データ・フロー・グラフ

データ・フロー・グラフ#

データ・フロー・グラフにおけるノードは、データメッセージを送受信する計算です。一部のノードはメッセージの送信のみを、他のノードはメッセージの受信のみを行い、また他のノードは受信したメッセージに応答してメッセージを送信する場合があります。

次のデータ・フロー・グラフでは、左端のノードが 1 から 10 までの整数値を生成し、それを 2 つの後続ノードに渡します。後続処理の 1 つは、受信した各値を 2 乗し、その結果を下流に渡します。2 番目の後継処理では、受信した各値を 3 乗し、その結果を下流に渡します。右端のノードは、中央のノードの両方から値を受け取ります。各値を受け取ると、その値が合計に加算されます。アプリケーションが完了まで実行されると、合計値は 1 から 10 までの一連の平方数と立方数の合計に等しくなります。

単純なデータ・フロー・グラフ

image0

次のコード例は、上記の単純なデータ・フロー・グラフの実装を示しています。

int sum = 0; 
graph g; 
function_node< int, int > squarer( g, unlimited, [](const int &v) { 
    return v*v; 
} ); 
function_node< int, int > cuber( g, unlimited, [](const int &v) { 
    return v*v*v; 
} ); 
function_node< int, int > summer( g, 1, [&](const int &v ) -> int { 
    return sum += v; 
} ); 
make_edge( squarer, summer ); 
make_edge( cuber, summer ); 

for ( int i = 1; i <= 10; ++i ) { 
    squarer.try_put(i); 
    cuber.try_put(i); 
} 
g.wait_for_all(); 

cout << "Sum is " << sum << "\n";

上記の実装では、次の function_nodes が作成されます。

  • 2 乗の値を生成

  • 3 乗の値を生成

  • グローバル合計に値を加算

squarer ノードと cuber ノードは副作用がないため、無制限の同時実行で作成されます。summer ノードは、グローバル変数への参照を通じて合計を更新するため、並列実行は安全ではありません。したがって、同時実行制限が 1 で作成されます。上記の単純なデータ・フロー・グラフのノード F は、squarer ノードと cuber ノードの両方にメッセージを送るループとして実装されています。

最初の実装に対する改善点は、追加のノードタイプである broadcast_node を導入したことです。broadcast_node は受信したメッセージを後続ノードすべてにブロードキャストします。

これにより、ループ内の 2 つの try_put を 1 つの try_put に置き換えることができます。

broadcast_node<int> b(g); 
make_edge( b, squarer ); 
make_edge( b, cuber ); 
for ( int i = 1; i <= 10; ++i ) { 
    b.try_put(i); 
} 
g.wait_for_all();

さらに優れたオプションは、input_node を導入することです。これにより、実装は上記の単純なデータ・フロー・グラフにさらに近づきます。input_node は、その名前が示すとおり、メッセージを送信するだけで、メッセージを受信しません。コンストラクターは 2 つの引数を受け取ります。

template< typename Body > input_node( graph &g, Body body)

ボディーは関数オペレーターを含む関数オブジェクトまたはラムダ式です。

Output Body::operator()( oneapi::tbb::flow_control &fc );

ループを input_node に置き換えることもできます。

input_node< int > src( g, src_body(10) ); 
make_edge( src, squarer ); 
make_edge( src, cuber ); 
src.activate(); 
g.wait_for_all();

ランタイム・ライブラリーは、ボディー内で fc.stop() が呼び出されるまで、src_body 内の関数オペレーターを繰り返し呼び出します。したがって、上記の単純なデータ・フロー・グラフのループボディーのように動作するボディーを作成する必要があります。これらすべての変更を適用した最終的な実装を以下に示します。

class src_body { 
    const int my_limit; 
    int my_next_value; 
public: 
    src_body(int l) : my_limit(l), my_next_value(1) {} 
    int operator()( oneapi::tbb::flow_control& fc ) { 
        if ( my_next_value <= my_limit ) { 
            return my_next_value++; 
        } else { 
            fc.stop(); 
            return int(); 
        } 
    } 
}; 

int main() { 
    int sum = 0; 
    graph g; 
    function_node< int, int > squarer( g, unlimited, [](const int &v) { 
        return v*v; 
    } ); 
    function_node< int, int > cuber( g, unlimited, [](const int &v) { 
        return v*v*v; 
    } ); 
    function_node< int, int > summer( g, 1, [&](const int &v ) -> int { 
        return sum += v; 
    } ); 
    make_edge( squarer, summer ); 
    make_edge( cuber, summer ); 
    input_node< int > src( g, src_body(10) ); 
    make_edge( src, squarer ); 
    make_edge( src, cuber ); 
    src.activate(); 
    g.wait_for_all(); 
    cout << "Sum is " << sum << "\n"; 
}

この最終的な実装には、上記の単純なデータ・フロー・グラフのすべてのノードとエッジが含まれます。この単純な例では、明示的なループに換えて input_node を使用する利点はあまりありません。ただし、input_node は下流ノードの動作に反応できるため、より複雑なグラフではメモリー使用量を抑えることができます。詳細については、create_token_based_system を参照してください。