フローグラフの基本: 予約#
oneAPI スレッディング・ビルディング・ブロック (oneTBB) のフローグラフ join_node には、次の 4 つのポリシーがあります: queueing、reserving、key_matching、tag_matching。join_nodes は、出力メッセージを作成する前に、すべての入力でメッセージを必要とします。予約側の join_node には内部バッファーがなく、各入力にメッセージが到着するまで、入力からメッセージを抽出しません。出力メッセージを作成するには、各入力ポートでメッセージを一時的に予約し、すべての入力ポートがメッセージの予約に成功した場合にのみ出力メッセージが作成されます。いずれかの入力ポートがメッセージを予約できない場合、join_node によってメッセージはプルされません。
join_node の予約をサポートするため、一部のノードは出力の予約をサポートしています。予約の仕組みは次のとおりです。
プッシュ状態の予約
join_nodeに接続されたノードがメッセージをプッシュしようとすると、join_nodeは常にプッシュを拒否し、ノードを接続するエッジはプルモードに切り替わります。予約入力ポートは、プル状態の各エッジで
try_reserveを呼び出します。これは失敗する可能性があります。その場合、予約入力ポートはエッジをプッシュ状態に切り替え、プル状態のエッジで接続されている次のノードを予約しようとします。入力ポートの先行ノードが予約状態にある間、他のノードは予約された値を取得できません。各入力ポートがプル状態のエッジを正常に予約すると、予約
join_nodeは予約されたメッセージを使用してメッセージを作成し、結果のメッセージをそれに接続されているすべてのノードにプッシュします。メッセージが後続ノードに正常にプッシュされると、予約されていた先行ノードにメッセージが使用されたことが通知されます (
try_consume()呼び出しで)。これらのメッセージは正常にプッシュされているため、先行ノードによって破棄されます。メッセージが後続ノードへのプッシュに失敗すると、予約されていた先行ノードにはメッセージが使用されなかったことが通知されます (
try_release()呼び出しで)。この時点で、メッセージは他のノードにプッシュされたり、他のノードによってプルされる可能性があります。
予約 join_node は、各入力ポートに少なくとも 1 つのエッジがプル状態である場合にのみプッシュを試行し、すべての入力ポートがメッセージの予約に成功した場合にのみメッセージを作成してプッシュを試行するため、予約 join_node の各入力ポートの先行ポートの少なくとも 1 つは予約可能である必要があります。
次の例は、予約 join_node の動作を示しています。buffer_nodes は出力をバッファリングするため、出力エッジのプッシュモードからプルモードへの切り替えを受け入れます。broadcast_nodes はメッセージをバッファリングせず、try_get() または try_reserve() をサポートしません。
void run_example2() { // Flow_Graph_Reservation.xml の例
graph g;
broadcast_node<int> bn(g);
buffer_node<int> buf1(g);
buffer_node<int> buf2(g);
typedef join_node<tuple<int,int>, reserving> join_type;
join_type jn(g);
buffer_node<join_type::output_type> buf_out(g);
join_type::output_type tuple_out;
int icnt;
// join_node の先行ノードは両方とも予約可能な buffer_nodes です
make_edge(buf1,input_port<0>(jn));
make_edge(bn,input_port<0>(jn)); // broadcast_node を接続
make_edge(buf2,input_port<1>(jn)); make_edge(jn, buf_out);
bn.try_put(2);
buf1.try_put(3);
buf2.try_put(4);
buf2.try_put(7);
g.wait_for_all();
while (buf_out.try_get(tuple_out)) {
printf("join_node output == (%d,%d)\n",get<0>(tuple_out), get<1>(tuple_out) );
}
if(buf1.try_get(icnt)) printf("buf1 had %d\n", icnt);
else printf("buf1 was empty\n");
if(buf2.try_get(icnt)) printf("buf2 had %d\n", icnt);
else printf("buf2 was empty\n");
}上記の例では、予約 join_node jn のポート 0 には、buffer_node buf1 と broadcast_node bn の 2 つの先行ノードがあります。join_node のポート 1 には、1 つの先行ノード、buffer_node buf2 があります。
実行シーケンスの 1 つについて考えてみます (タスクのスケジュールは若干異なることがありますが、最終結果は同じになります)。
bn.try_put(2);bn は 2 を jn に転送しようとします。jn は値を受け入れず、bn から jn へ反転します。bn も jn もメッセージをバッファリングしないため、メッセージは失われます。jn へのすべての入力に利用可能な先行入力があるわけではないので、jn はそれ以上何も行いません。
警告
予約をサポートしていないノードは、予約 join_node に接続しても正しく動作しません。このプログラムは、発生の原因を示しています。予約をサポートしないノードを予約のサポートを必要とするノードに接続することは、推奨される方法ではありません。
buf1.try_put(3);buf1 は 3 を jn に転送しようとします。jn は値を受け入れず、buf1 から jn へ反転します。jn へのすべての入力に利用可能な先行入力があるわけではないので、jn はそれ以上何も行いません。
buf2.try_put(4);buf2 は 4 を jn に転送しようとします。jn は値を受け入れず、buf2 から jn へ反転します。これで、jn の両方の入力に先行処理が存在するため、jn からメッセージを構築して転送するタスクが生成されます。タスクはまだ実行されていないものと想定します。
buf2.try_put(7);buf2 には後継ノードがないため (jn へ反転しているため)、値 7 が格納されます。
これで、jn を実行するタスクがスポーンされます。
jnはbnを予約しようとしますが、失敗します。bnへの流れは、前方向に戻ります。jnはbuf1を予約しようとし、成功します (予約されたノードは灰色で表示されます)。jnはbuf1から値 3 を受信しますが、値はbuf1に残ります (jnからのメッセージの転送が失敗した場合に備えて)。jnはbuf2を予約しようとし、成功します。jnはbuf2から値 4 を受け取りますが、値はbuf2に残ります。jnは出力メッセージtuple<3,4>を構築します。
ここで、jn はメッセージを buf_out にプッシュし、buf_out はそれを受け入れます。プッシュが成功したため、jn は予約された値が使用されたことを buf1 と buf2 に通知し、バッファーはそれらの値を破棄します。ここで、jn は再度予約を試みます。
bnからjnへのエッジはプッシュ状態にあるため、bnからのプルは試行されません。jnはbuf1を予約しようとしますが、失敗します。buf1への流れは、前方向に戻ります。jnはそれ以上のアクションを試行しません。
グラフではこれ以上のアクティビティーは発生せず、wait_for_all() が完了します。このコードの出力は以下です。
join_node output == (3,4)
buf1 was empty
buf2 had 7






