リダクション#
問題
データセット全体で結合リダクション操作を実行します。
コンテキスト
多くのシリアル・アルゴリズムは、一連の項目をスキャンしてサマリー情報を収集します。
強制
サマリーは、データセットに対する結合操作として表現できます。または、少なくとも再結合が問題にならないほど結合に近いものになります。
解決方法
oneAPI スレッディング・ビルディング・ブロック (oneTBB) には 2 つのソリューションがあります。どちらを選択するかは、いくつかの考慮事項によって異なります。
演算は結合的かつ可換的か?
リダクション・タイプのインスタンスの構築と破棄にはコストがかかります。例えば、浮動小数点数は構築コストが安価です。疎な浮動小数点行列の構築には髙いコストがかかる可能性があります。
オブジェクトの構築コストがそれほどかからない場合は、oneapi::tbb::parallel_reduce を使用します。リダクション操作が可換でない場合でも機能します。
リダクション操作が可換であり、タイプのインスタンスのコストが高い場合は、oneapi::tbb::parallel_for と oneapi::tbb::combinable を使用します。
操作が正確に結合的ではなく、正確に決定論的な結果が必要な場合は、再帰的なリダクションを使用し、oneapi::tbb::parallel_invoke で並列化します。
例
ここで紹介する例では、さまざまなソリューションといくつかのトレードオフを示しています。
最初の例では、oneapi::tbb::parallel_reduce を使用して、タイプ T のシーケンスに対して + リダクションを実行します。シーケンスは、半開区間 [first,last) によって定義されます。
T AssociativeReduce( const T* first, const T* last, T identity ) {
return oneapi::tbb::parallel_reduce(
// リダクションのインデックス・レンジ
oneapi::tbb::blocked_range<const T*>(first,last),
// アイデンティティー要素
identity,
// サブレンジと部分和をリデュース
[&]( oneapi::tbb::blocked_range<const T*> r, T partial_sum )->float {
return std::accumulate( r.begin(), r.end(), partial_sum );
},
// 2 つの部分和をリデュース
std::plus<T>()
);
}この形式の parallel_reduce の 3 番目と 4 番目の引数は、凝集パターンのビルトイン形式です。リダクション前に要素ごとのアクションを実行する場合、それを 3 番目の引数 (サブレンジのリダクション) に組み込むと、参照の局所性が向上しパフォーマンスが向上する可能性があります。凝集のブロックサイズは明示的に指定されていないことに注意してください。parallel_reduce は、暗黙的に使用される oneapi::tbb::auto_partitioner の助けを借りて、ブロックを自動的に定義します。
2 番目の例では、+ が T 上で可換であると想定しています。これは、T オブジェクトの構築にコストがかかる場合に適したソリューションです。
T CombineReduce( const T* first, const T* last, T identity ) {
oneapi::tbb::combinable<T> sum(identity);
oneapi::tbb::parallel_for(
oneapi::tbb::blocked_range<const T*>(first,last),
[&]( oneapi::tbb::blocked_range<const T*> r ) {
sum.local() += std::accumulate(r.begin(), r.end(), identity);
}
);
return sum.combine( []( const T& x, const T& y ) {return x+y;} );
}場合によっては、部分的な結果を破壊的に使用して最終結果を生成することが望ましいことがあります。例えば、部分的な結果がリストである場合、それらを結合して最終結果を形成できます。その場合は、combinable の代わりに oneapi::tbb::enumerable_thread_specific クラスを使用します。分割統治の ParallelFindCollisions の例でこの手法が示されています。
浮動小数点の加算と乗算はほぼ結合的です。再関連付けを行うと、丸めの影響により差が生じる可能性があります。これまで示した手法では、用語は非決定論的に再関連付けされます。完全に結合的ではない操作に対する決定論的な並列リダクションには、決定論的な再結合を使用する必要があります。以下のコードは、T タイプの値のシーケンスに対して + リダクションを実行するテンプレートの形式でこれを示しています。
template<typename T>
T RepeatableReduce( const T* first, const T* last, T identity ) {
if( last-first<=1000 ) {
// シリアル・リダクションを使用
return std::accumulate( first, last, identity );
} else {
// 並列分割統治リダクションを実行
const T* mid = first+(last-first)/2;
T left, right;
oneapi::tbb::parallel_invoke(
[&]{left=RepeatableReduce(first,mid,identity);},
[&]{right=RepeatableReduce(mid,last,identity);}
);
return left+right;
}
}外側の if-else 文は、再帰計算の凝集パターンのインスタンスです。リダクション・グラフは厳密な二分木ではありませんが、完全に決定論的です。したがって、すべてのスレッドが同じ浮動小数点丸めを行うと仮定すると、特定の入力シーケンスの結果は常に同じになります。
oneapi::tbb::parallel_deterministic_reduce は、再現可能な非結合的リダクションを実現するシンプルで効率的な方法です。これは oneapi::tbb::parallel_reduce とよく似ていますが、後者とは異なり、決定論的なリダクション・グラフを構築します。これにより、RepeatableReduce のサンプルは AssociativeReduce とほぼ同じになります。
template<typename T>
T RepeatableReduce( const T* first, const T* last, T identity ) {
return oneapi::tbb::parallel_deterministic_reduce(
// リダクションのインデックス・レンジ
oneapi::tbb::blocked_range<const T*>(first,last,1000),
// アイデンティティー要素
identity,
// サブレンジと部分和をリデュース
[&]( oneapi::tbb::blocked_range<const T*> r, T partial_sum )->float {
return std::accumulate( r.begin(), r.end(), partial_sum );
},
// 2つの部分和をリデュース
std::plus<T>()
);
}関数名の変更に加えて、oneapi::tbb::blocked_range に指定された粒度 1000 にも留意してください。凝集に必要なブロックサイズを定義します。非決定性のため、自動ブロックサイズ選択は使用されません。
最後の例は、通常はリダクションとして見なされない問題を、リダクションとすることでどのように並列化できるかを示しています。問題は、データセット全体の計算に対する浮動小数点例外フラグを取得することです。シリアルコードは次のようになります。
feclearexcept(FE_ALL_EXCEPT);
for( int i=0; i<N; ++i )
C[i]=A[i]*B[i];
int flags = fetestexcept(FE_ALL_EXCEPT);
if (flags & FE_DIVBYZERO) ...;
if (flags & FE_OVERFLOW) ...;
...ループのチャンクを個別に計算し、各チャンクの浮動小数点フラグをマージすることで、コードを並列化できます。tbb:parallel_reduce でこれを行うには、まず以下に示すように “ボディー” タイプを定義します。
struct ComputeChunk {
int flags; // これまでに確認された浮動小数点例外を保持
void reset_fpe() {
flags=0;
feclearexcept(FE_ALL_EXCEPT);
}
ComputeChunk () {
reset_fpe();
}
// レンジをサブレンジに分割するときに parallel_reduce によって呼び出される「分割コンストラクター」
ComputeChunk ( const ComputeChunk&, oneapi::tbb::split ) {
reset_fpe();
}
// チャンクを操作し、浮動小数点例外状態を flags メンバーに収集します
void operator()( oneapi::tbb::blocked_range<int> r ) {
int end=r.end();
for( int i=r.begin(); i!=end; ++i )
C[i] = A[i]/B[i];
// ここで = ではなく |= を実行することが重要です。
// そうしないと、同じスレッドからの以前の例外が失われる可能性があります。
flags |= fetestexcept(FE_ALL_EXCEPT);
}
// 2 つのサブレンジの結果を結合するときに parallel_reduce によって呼び出されます。
void join( Body& other ) {
flags |= other.flags;
}
};次のように呼び出します。
// cc の構築により、FP 例外状態が暗黙的にリセットされます。
ComputeChunk cc;
oneapi::tbb::parallel_reduce( oneapi::tbb::blocked_range<int>(0,N), cc );
if (cc.flags & FE_DIVBYZERO) ...;
if (cc.flags & FE_OVERFLOW) ...;
...