ローカル・シリアライザー#
コンテキスト
対話型プログラムを考えてみます。同時実行性と応答性を最大化するため、ユーザーが要求した操作をタスクとして実装できます。操作の順序が重要であることがあります。例えば、プログラムが編集可能なテキストをユーザーに提示するとします。テキストを選択したり、選択したテキストを削除する操作ががあります。同じバッファーに対する「選択」と「削除」操作の順序を逆にするのは、適切ではありません。ただし、異なるバッファーでの操作の交換には問題がない場合もあります。したがって、目標は、特定のオブジェクトに関連付けられたタスクの順序を確立することであり、異なるオブジェクト間のタスクの順序を制限することではありません。
強制
特定のオブジェクトに関連付けられた操作は、順番に実行する必要があります。
ロックを使用してシリアル化することは無駄です。なぜなら、スレッドはロックで待機している間、他の場所で有用なワークを行うことができないためです。
解決方法
FIFO (先入先出構造) によってワーク項目を順序付けます。可能であれば、常に項目を処理状態にしておきます。ワーク項目が出現したときに実行中の項目がない場合は、その項目を実行中にします。それ以外の場合は、項目を FIFO にプッシュします。処理中の項目が完了したら、FIFO から別の項目をポップして処理中にします。
このロジックは、FIFO に concurrent_queue を使用し、atomic<int> を使用して待機中および処理中の項目数をカウントすることで、ミューテックスなしでも実装できます。この例ではアカウンティングについて詳しく説明します。
例
次の例は、ノンプリエンプティブな優先順位の例をベースにして、優先順位に加えてローカルでのシリアル化を実装します。3 つの優先度レベルとローカル・リアライザーを実装します。ユーザー・インターフェイスは次のとおりです。
enum Priority {
P_High,
P_Medium,
P_Low
};
template<typename Func>
void EnqueueWork( Priority p, Func f, Serializer* s=NULL );テンプレート関数 EnqueueWork は、次の表の 3 つの条件が満たされたときに関数 f を実行します。
条件 |
クラスで解決… |
|---|---|
|
|
スレッドが利用可能である。 |
|
より優先度の高いワークの実行準備ができていない。 |
|
特定の関数に対する条件は、表の上から下に向かって解決されます。s が NULL の場合、最初の条件はありません。EnqueueWork の実装では、関数を SerializedWorkItem にパッケージ化し、ワーク間の最初の関連する条件を適用するクラスにルーティングします。
template<typename Func>
void EnqueueWork( Priority p, Func f, Serializer* s=NULL ) {
WorkItem* item = new SerializedWorkItem<Func>( p, f, s );
if( s )
s->add(item);
else
ReadyPile.add(item);
}SerializedWorkItem は WorkItem から派生したもので、ワークの詳細を知らなくても、優先順位付けされたワークを伝達する手段として機能します。
// 優先順位が付けられたワークの抽象基本クラス
class WorkItem {
public:
WorkItem( Priority p ) : priority(p) {}
// 派生クラスは実際のワークを定義
virtual void run() = 0;
const Priority priority;
};
template<typename Func>
class SerializedWorkItem: public WorkItem {
Serializer* serializer;
Func f;
/*override*/
void run() {
f();
Serializer* s = serializer;
// Serializer の next 関数を実行する前に f を破棄
delete this;
if( s )
s->noteCompletion();
}
public:
SerializedWorkItem( Priority p, const Func& f_, Serializer* s ) : WorkItem(p), serializer(s), f(f_)
{}
};基本クラス WorkItem は、ノンプリエンプティブな優先度の例の WorkItem クラスと同じです。シリアル制約の概念は基本クラスから完全に隠匿されているため、フレームワークは他の種類の制約や制約の欠如を拡張することが可能になります。SerializedWorkItem クラスは、本質的には、ノンプリエンプティブな優先順位の例の ConcreteWorkItem を基に、Serializer 機能を拡張したものです。
関数を実行すると、仮想メソッドrun() が呼び出されます。次の 3 つのステップを実行します。
ファンクターを実行。
ファンクターを破棄。
ファンクターが完了したことを
Serializerに通知し、次に待機中のファンクターの制約を解除します。
ステップ 3 は、ConcreteWorkItem::run 操作と異なります。いくつかのコンテキストでは、同時実行性をわずかに向上させるため、ステップ 2 をステップ 3 の後に実行できます。ただし、ステップ 2 に時間がかかる場合は、次の関数が実行される前に完了の必要がある副作用が発生する可能性が髙いため、提示された順序が推奨されます。
Serializer クラスは、ローカル Serializer パターンの中核を実装します。
class Serializer {
oneapi::tbb::concurrent_queue<WorkItem*> queue;
std::atomic<int> count; // キューに投入された項目と処理中の項目の数
void moveOneItemToReadyPile() { // キューから ReadyPile にアイテムを転送
WorkItem* item;
queue.try_pop(item);
ReadyPile.add(item);
}
public:
void add( WorkItem* item ) {
queue.push(item);
if( ++count==1 )
moveOneItemToReadyPile();
}
void noteCompletion() { // ワーク項目が完了したときに呼び出されます
if( --count!=0 )
moveOneItemToReadyPile();
}
};このクラスは 2 つのメンバーを維持します。
前のワークが完了するのを待機する WorkItem のキュー。
キューに投入されたワークまたは処理中のワークの数。
ミューテックスは、concurrent_queue<WorkItem*> と atomic<int> を用いることで回避され、操作の順序を慎重にすることで実現されています。count の遷移は、Serializer クラスがどのように動作するか理解する鍵となります。
addメソッドがcountを 0 から 1 にインクリメントする場合、他に処理中のワークがないことを示しており、そのためワークをReadyPileに移動する必要があります。noteCompletionメソッドが count をデクリメントし、それが 1 から 0 でない場合、キューは空ではなく、キュー内の別の項目をReadyPileに移動する必要があります。
ReadyPile クラスについては、ノンプリエンプティブな優先度の例で説明されています。
優先順位が必要ない場合、moveOneItemToReadyPile メソッドを使用しますが、2 つのバリエーションがありそれぞれ意味が異なります。
moveOneItemToReadyPileメソッドは、item->run()を直接呼び出すことができます。このアプローチは、特定のSerializerに対してオーバーヘッドが比較的低く、スレッドの局所性が高くなります。しかし公正ではありません。Serializerにタスクのストリームが継続的に存在する場合、その上で動作するスレッドは他のタスクを除外してそれらのタスクを処理し続けます。moveOneItemToReadyPileメソッドは、task::enqueueを呼び出して、item->run()を呼び出すタスクをキューに登録できます。それにより、最初のアプローチよりもオーバーヘッドが高くなり、局所性は低くなりますが、スターベーションを回避できます。
公平性と最大局所性の間の矛盾は根本的なものです。最善の解決策は状況によって異なります。
このパターンは、Serializer クラスで管理される制約よりも一般的なワーク項目に対する制約に一般化されます。一般化された Serializer::add は、ワーク項目が制約されていないか判断され、制約されていない場合はすぐに実行します。一般化された Serializer::noteCompletion は、現在のワーク項目の完了によって制約が解除された、それまで制約されていたすべてのワークを実行します。「実行」という用語は、ワークをすぐに実行するか、さらに制約がある場合はワークを次の制約リゾルバーに転送することを意味します。