フローグラフの基本: 予約

フローグラフの基本: 予約#

oneAPI スレッディング・ビルディング・ブロック (oneTBB) のフローグラフ join_node には、次の 4 つのポリシーがあります: queueingreservingkey_matchingtag_matchingjoin_nodes は、出力メッセージを作成する前に、すべての入力でメッセージを必要とします。予約側の join_node には内部バッファーがなく、各入力にメッセージが到着するまで、入力からメッセージを抽出しません。出力メッセージを作成するには、各入力ポートでメッセージを一時的に予約し、すべての入力ポートがメッセージの予約に成功した場合にのみ出力メッセージが作成されます。いずれかの入力ポートがメッセージを予約できない場合、join_node によってメッセージはプルされません。

join_node の予約をサポートするため、一部のノードは出力の予約をサポートしています。予約の仕組みは次のとおりです。

  • プッシュ状態の予約 join_node に接続されたノードがメッセージをプッシュしようとすると、join_node は常にプッシュを拒否し、ノードを接続するエッジはプルモードに切り替わります。

  • 予約入力ポートは、プル状態の各エッジで try_reserve を呼び出します。これは失敗する可能性があります。その場合、予約入力ポートはエッジをプッシュ状態に切り替え、プル状態のエッジで接続されている次のノードを予約しようとします。入力ポートの先行ノードが予約状態にある間、他のノードは予約された値を取得できません。

  • 各入力ポートがプル状態のエッジを正常に予約すると、予約 join_node は予約されたメッセージを使用してメッセージを作成し、結果のメッセージをそれに接続されているすべてのノードにプッシュします。

  • メッセージが後続ノードに正常にプッシュされると、予約されていた先行ノードにメッセージが使用されたことが通知されます (try_consume() 呼び出しで)。これらのメッセージは正常にプッシュされているため、先行ノードによって破棄されます。

  • メッセージが後続ノードへのプッシュに失敗すると、予約されていた先行ノードにはメッセージが使用されなかったことが通知されます (try_release() 呼び出しで)。この時点で、メッセージは他のノードにプッシュされたり、他のノードによってプルされる可能性があります。

予約 join_node は、各入力ポートに少なくとも 1 つのエッジがプル状態である場合にのみプッシュを試行し、すべての入力ポートがメッセージの予約に成功した場合にのみメッセージを作成してプッシュを試行するため、予約 join_node の各入力ポートの先行ポートの少なくとも 1 つは予約可能である必要があります。

次の例は、予約 join_node の動作を示しています。buffer_nodes は出力をバッファリングするため、出力エッジのプッシュモードからプルモードへの切り替えを受け入れます。broadcast_nodes はメッセージをバッファリングせず、try_get() または try_reserve() をサポートしません。

void run_example2() { // Flow_Graph_Reservation.xml の例 
    graph g; 
    broadcast_node<int> bn(g); 
    buffer_node<int> buf1(g);
    buffer_node<int> buf2(g); 
    typedef join_node<tuple<int,int>, reserving> join_type; 
    join_type jn(g); 
    buffer_node<join_type::output_type> buf_out(g); 
    join_type::output_type tuple_out; 
    int icnt; 

    // join_node の先行ノードは両方とも予約可能な buffer_nodes です 
    make_edge(buf1,input_port<0>(jn)); 
    make_edge(bn,input_port<0>(jn)); // broadcast_node を接続 
    make_edge(buf2,input_port<1>(jn)); make_edge(jn, buf_out); 
    bn.try_put(2); 
    buf1.try_put(3); 
    buf2.try_put(4); 
    buf2.try_put(7); 
    g.wait_for_all(); 
    while (buf_out.try_get(tuple_out)) { 
        printf("join_node output == (%d,%d)\n",get<0>(tuple_out), get<1>(tuple_out) ); 
    } 
    if(buf1.try_get(icnt)) printf("buf1 had %d\n", icnt); 
    else printf("buf1 was empty\n"); 
    if(buf2.try_get(icnt)) printf("buf2 had %d\n", icnt); 
    else printf("buf2 was empty\n"); 
}

上記の例では、予約 join_node jn のポート 0 には、buffer_node buf1broadcast_node bn の 2 つの先行ノードがあります。join_node のポート 1 には、1 つの先行ノード、buffer_node buf2 があります。

image0

実行シーケンスの 1 つについて考えてみます (タスクのスケジュールは若干異なることがありますが、最終結果は同じになります)。

bn.try_put(2);

bn は 2 を jn に転送しようとします。jn は値を受け入れず、bn から jn へ反転します。bn も jn もメッセージをバッファリングしないため、メッセージは失われます。jn へのすべての入力に利用可能な先行入力があるわけではないので、jn はそれ以上何も行いません。

警告

予約をサポートしていないノードは、予約 join_node に接続しても正しく動作しません。このプログラムは、発生の原因を示しています。予約をサポートしないノードを予約のサポートを必要とするノードに接続することは、推奨される方法ではありません

イメージ1

buf1.try_put(3);

buf1 は 3 を jn に転送しようとします。jn は値を受け入れず、buf1 から jn へ反転します。jn へのすべての入力に利用可能な先行入力があるわけではないので、jn はそれ以上何も行いません。

イメージ2

buf2.try_put(4);

buf2 は 4 を jn に転送しようとします。jn は値を受け入れず、buf2 から jn へ反転します。これで、jn の両方の入力に先行処理が存在するため、jn からメッセージを構築して転送するタスクが生成されます。タスクはまだ実行されていないものと想定します。

イメージ3

buf2.try_put(7);

buf2 には後継ノードがないため (jn へ反転しているため)、値 7 が格納されます。

image4

これで、jn を実行するタスクがスポーンされます。

  • jnbn を予約しようとしますが、失敗します。bn への流れは、前方向に戻ります。

  • jnbuf1 を予約しようとし、成功します (予約されたノードは灰色で表示されます)。jnbuf1 から値 3 を受信しますが、値は buf1 に残ります (jn からのメッセージの転送が失敗した場合に備えて)。

  • jnbuf2 を予約しようとし、成功します。jnbuf2 から値 4 を受け取りますが、値は buf2 に残ります。

  • jn は出力メッセージ tuple<3,4> を構築します。

image5

ここで、jn はメッセージを buf_out にプッシュし、buf_out はそれを受け入れます。プッシュが成功したため、jn は予約された値が使用されたことを buf1buf2 に通知し、バッファーはそれらの値を破棄します。ここで、jn は再度予約を試みます。

  • bn から jn へのエッジはプッシュ状態にあるため、bn からのプルは試行されません。

  • jnbuf1 を予約しようとしますが、失敗します。buf1 への流れは、前方向に戻ります。

  • jn はそれ以上のアクションを試行しません。

image6

グラフではこれ以上のアクティビティーは発生せず、wait_for_all() が完了します。このコードの出力は以下です。

join_node output == (3,4) 
buf1 was empty 
buf2 had 7