フローグラフで単一メッセージを待機#
説明#
この機能により、フローグラフ内の受信ノードに新しい try_put_and_wait インターフェイスが追加されます。この関数は、メッセージをフローグラフへの入力として配置し、そのメッセージに関連するすべてのワークが完了するまで待機します。graph::wait_for_all は、入力メッセージに関係のないワークも含め、すべてのワークが完了するまで待機するため、try_put_and_wait は graph::wait_for_all を呼び出す場合と比べてレイテンシーを短縮できる可能性があります。
node.try_put_and_wait(msg) はノード上で node.try_put(msg) を実行し、msg でワークが完了するまで待機します。したがって、次の条件が当てはまります。
フローグラフ内の任意のノードによって開始されたタスクで、
msgの操作またはmsgから計算されたその他の中間結果の処理を完了します。msgから計算された中間結果はグラフ内のいづれのバッファーにも残りません。
警告
try_put_and_wait 呼び出しが無限に待機するのを防ぐには、最終結果はフローグラフによって自動的に消費されないため、フローグラフの最後でバッファーノードを使用てはなりません。
警告
現在、この機能では multifunction_node クラスと async_node クラスはサポートされていません。フローグラフにこれらのノードの 1 つを含めると、初期入力メッセージの計算が進行中であっても、try_put_and_wait が早期に終了する可能性があります。
API#
ヘッダー#
#define TBB_PREVIEW_FLOW_GRAPH_FEATURES // マクロオプション 1
#define TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // マクロオプション 2
#include <oneapi/tbb/flow_graph.h>概要#
namespace oneapi {
namespace tbb {
template <typename Output, typename Policy = /*default-policy*/>
class continue_node {
public:
bool try_put_and_wait(const continue_msg& input);
}; // class continue_node
template <typename Input, typename Output = continue_msg, typename Policy = /*default-policy*/>
class function_node {
public:
bool try_put_and_wait(const Input& input);
}; // class function_node
template <typename T>
class overwrite_node {
public:
bool try_put_and_wait(const T& input);
}; // class overwrite_node
template <typename T>
class write_once_node {
public:
bool try_put_and_wait(const T& input);
}; // class write_once_node
template <typename T>
class buffer_node {
public:
bool try_put_and_wait(const T& input);
}; // class buffer_node
template <typename T>
class queue_node {
public:
bool try_put_and_wait(const T& input);
}; // class queue_node
template <typename T, typename Compare = std::less<T>>
class priority_queue_node {
public:
bool try_put_and_wait(const T& input);
}; // class priority_queue_node
template <typename T>
class sequencer_node {
public:
bool try_put_and_wait(const T& input);
}; // class sequencer_node
template <typename T, typename DecrementType = continue_msg>
class limiter_node {
public:
bool try_put_and_wait(const T& input);
}; // class limiter_node
template <typename T>
class broadcast_node {
public:
bool try_put_and_wait(const T& input);
}; // class broadcast_node
template <typename TupleType>
class split_node {
public:
bool try_put_and_wait(const TupleType& input);
}; // class split_node
} // namespace tbb
} // namespace oneapiメンバー関数#
template <typename Output, typename Policy>
bool continue_node<Output, Policy>::try_put_and_wait(const continue_msg& input)効果: 入力信号を受信した回数をインクリメントします。インクリメントされたカウント値が既知の先行処理数と等しい場合、body 関数オブジェクトを実行します。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: true。
template <typename Input, typename Output, typename Policy>
bool function_node<Input, Output, Policy>::try_put_and_wait(const Input& input)効果: 同時実行の制限が適用されると、受信メッセージ input に対するユーザー定義 body を実行します。それ以外の場合は、ノードの Policy に応じて受信メッセージ input をキューに投入するか拒否します。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: 入力が受け付けられた場合は true、それ以外は false。
template <typename T>
bool overwrite_node<T>::try_put_and_wait(const T& input)効果: 内部の単一項目のバッファーに input を格納し、すべての後続処理にブロードキャストします。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: true。
警告
入力要素は、後続ノードが受け入れると overwrite_node から取得されないため、clear() メソッドを明示的に呼び出すか別の要素で上書きして取得し、try_put_and_wait が無期限に待機するのを防ぎます。
template <typename T>
bool write_once_node<T>::try_put_and_wait(const T& input)効果: 有効な値がまだ含まれていない場合は、input を内部の単一項目バッファーに格納します。新しい値が設定されるとノードはすべての後続ノードにその値をブロードキャストします。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: 構築後または clear() の最初の呼び出しには true を返します。
警告
入力要素は、後続ノードが受け入れると write_once_node から取得されないため、clear() メソッドを明示的に呼び出すか別の要素で上書きして取得し、try_put_and_wait が無期限に待機するのを防ぎます。
template <typename T>
bool buffer_node<T>::try_put_and_wait(const T& input)効果: ノードが管理する項目セットに input を追加して後続ノードに転送します。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: true。
template <typename T>
bool queue_node<T>::try_put_and_wait(const T& input)効果: ノードが管理する項目セットに input を追加して、最後に追加された項目を後続ノードに転送します。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: true。
template <typename T, typename Compare>
bool priority_queue_node<T>::try_put_and_wait(const T& input)効果: Input を priority_queue_node に追加し、ノードに追加されたまま、まだ後続ノードに転送されていないすべての項目の中で、最も優先度の高い項目を転送します。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: true。
template <typename T>
bool sequencer_node<T>::try_put_and_wait(const T& input)効果: input を sequencer_node に追加し、シーケンス内の次の項目を後続のノードに転送します。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: true。
template <typename T, typename DecrementType>
bool limiter_node<T, DecrementType>::try_put_and_wait(const T& input)効果: ブロードキャスト・カウントがしきい値未満の場合、すべての後続ノードに input がブロードキャストされます。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: Input がブロードキャストされる場合は true、それ以外の場合は false。
template <typename T>
bool broadcast_node<T>::try_put_and_wait(const T& input)効果: すべての後続に input をブロードキャストします。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: メッセージを後続ノードに転送できなかった場合でも常に true を返します。
template <typename TupleType>
bool split_node<TupleType>::try_put_and_wait(const TupleType& input);効果: 受け取ったタプルの各要素を、split_nodeの出力ポートに接続されているノードにブロードキャストします。input のインデックス i の要素は、出力ポート番号 i を通じてブロードキャストされます。
フローグラフ内の input が完了するまで待機します。つまり、各ノードによって作成され、 input に関連するすべてのタスクが実行され、グラフ内のどのバッファーにも関連するオブジェクトが残りません。
戻り値: true。
例#
#define TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT 1
#include <oneapi/tbb/flow_graph.h>
#include <oneapi/tbb/parallel_for.h>
#include <tuple>
struct f1_body;
struct f2_body;
struct f3_body;
struct f4_body;
int main() {
using namespace oneapi::tbb;
flow::graph g;
flow::broadcast_node<int> start_node(g);
flow::function_node<int, int> f1(g, flow::unlimited, f1_body{});
flow::function_node<int, int> f2(g, flow::unlimited, f2_body{});
flow::function_node<int, int> f3(g, flow::unlimited, f3_body{});
flow::join_node<std::tuple<int, int>> join(g);
flow::function_node<std::tuple<int, int>, int> f4(g, flow::serial, f4_body{});
flow::make_edge(start_node, f1);
flow::make_edge(f1, f2);
flow::make_edge(start_node, f3);
flow::make_edge(f2, flow::input_port<0>(join));
flow::make_edge(f3, flow::input_port<1>(join));
flow::make_edge(join, f4);
// グラフにワークを送信
parallel_for(0, 100, [&](int input) {
start_node.try_put_and_wait(input);
// 入力結果の後処理
});
}parallel_for の各反復は、フローグラフに入力を送信します。try_put_and_wait(input) から戻った後、グラフ内のすべてのノードによって input の完了に関連するすべてのワークが実行されることが保証されます。他の呼び出しで送信された入力に関連するタスクの完了は保証されません。