1 つまたは複数の後続への送信

1 つまたは複数の後続への送信#

事前定義されたノードの重要な特性は、出力を単一の後続ノードにプッシュするか、すべての後続ノードにブロードキャストするかです。次の定義済みノードは、メッセージを単一の後続ノードにプッシュします。

  • buffer_node

  • queue_node

  • priority_queue_node

  • sequencer_node

他のノードは、メッセージを受け入れるすべての後続ノードにメッセージをプッシュします。

単一の後継ノードにのみプッシュするノードはすべてバッファーノードです。それらの目的は、メッセージが下流で消費されるまで一時的に保持することです。以下のサンプルを考えてみます。

void use_buffer_and_two_nodes() { 
  graph g; 

  function_node< int, int, rejecting > f1( g, 1, []( int i ) -> int { 
    spin_for(0.1); 
    cout << "f1 consuming " << i << "\n"; 
    return i; 
  } ); 

  function_node< int, int, rejecting > f2( g, 1, []( int i ) -> int { 
    spin_for(0.2); 
    cout << "f2 consuming " << i << "\n"; 
    return i; 
  } ); 

  priority_queue_node< int > q(g);

  make_edge( q, f1 ); 
  make_edge( q, f2 ); 
  for ( int i = 10; i > 0; --i ) { 
    q.try_put( i ); 
  } 
  g.wait_for_all(); 
}

まず、function_nodes はデフォルトで、入力時に受信したメッセージをキューに投入します。priority_queue_nodefunction_node で適切に動作させるため、上記の例では、バッファーポリシーを拒否に設定して function_nodes を構築しています。したがって、f1f2 は受信メッセージを内部的にバッファリングせず、代わりに priority_queue_node の上流バッファリングに依存します。

上記の例では、priority_queue_node によってバッファリングされた各メッセージは、f1 または f2 のいずれかに送信されますが、両方に送信されることはありません。

別の動作を考えてみます。つまり、priority_queue_node がすべての後続ノードにブロードキャストした場合はどうなるでしょうか? すべてのノードではなく一部のノードがメッセージを受け入れる場合はどうなるでしょう? すべてのノードがメッセージを受け入れるまでメッセージをバッファリングする必要がありますか、それとも受け入れるサブセットにのみ配信する必要がありますか? ノードがメッセージをバッファリングし続ける場合、最終的にすべてのノードに同じ順序でメッセージを配信する必要がありますか? それともノードが次のメッセージを受け取った時点での現在の優先順位で配信する必要がありますか? 例えば、後継ノード f1 が “9” を受け入れたが、別の後継ノード f2 がそれを拒否した場合、priority_queue_node には “9” のみが含まれているとします。その後、値 “100” が到着し、f2 がメッセージを受信できるようになります。f2 は次に “9” を受信するべきでしょうか、それとも優先度が高い “100” を受信するべきでしょうか? いずれにしても、すべての後続処理で各メッセージを確実に受信しようとすると、ガベージ・コレクションの問題が発生し、推論が複雑になります。したがって、これらのバッファーノードは、各メッセージを 1 つの後続ノードにのみプッシュします。また、この特性を利用して、上のグラフに示すような便利なグラフ構造を作成できます。このグラフ構造では、各メッセージは f1 または f2 によって優先順位に従って処理されます。

しかし、実際に f1f2 の両方にすべての値を優先順位に従って受け取ってもらいたい場合はどうすればよいでしょうか? この動作は、次に示すように、function_node ごとに 1 つの priority_queue_node を作成し、broadcast_node を介して両方のキューに各値をプッシュすることで簡単に作成できます。

graph g; 

function_node< int, int, rejecting > f1( g, 1, []( int i ) -> int { 
  spin_for(0.1); 
  cout << "f1 consuming " << i << "\n"; 
  return i; 
} ); 

function_node< int, int, rejecting > f2( g, 1, []( int i ) -> int { 
  spin_for(0.2); 
  cout << "f2 consuming " << i << "\n"; 
  return i; 
} ); 

priority_queue_node< int > q1(g); 
priority_queue_node< int > q2(g); 
broadcast_node< int > b(g); 

make_edge( b, q1 ); 
make_edge( b, q2 ); 
make_edge( q1, f1 ); 
make_edge( q2, f2 ); 
for ( int i = 10; i > 0; --i ) { 
  b.try_put( i ); 
} 
g.wait_for_all();

したがって、グラフ内のノードを複数の後続ノードに接続する場合、出力がすべての後続ノードにブロードキャストされるか、それとも 1 つの後続ノードにのみブロードキャストされるか理解しておく必要があります。