低レベルタスク API から移行#
スレッディング・ビルディング・ブロック (TBB) の低レベルのタスク API は、複雑でエラーが発生しやすいと考えられていたため、これが oneAPI スレッディング・ビルディング・ブロック (oneTBB) から削除された主な理由です。このガイドは、低レベルのタスク API が使用されるユースケースを、TBB から oneTBB へ移行するのに役立ちます。
独立したタスクのスポーン#
ほとんどのユースケースでは、個別の独立したタスクのスポーンは、oneapi::tbb::task_group または oneapi::tbb::parallel_invoke のどちらかに置き換えることができます。
例えば、RootTask、ChildTask1、および ChildTask2 は、tbb::task を継承し、そのインターフェイスを実装するユーザー・ファンクターです。そして、互いに並行して実行し RootTask を待機できる ChildTask1 タスクと ChildTask2 タスクのスポーンは、次のように実装されます。
#include <tbb/task.h>
int main() {
// RootTask、ChildTask1、および ChildTask2 が定義されていると仮定。
RootTask& root = *new(tbb::task::allocate_root()) RootTask{};
ChildTask1& child1 = *new(root.allocate_child()) ChildTask1{/*params*/};
ChildTask2& child2 = *new(root.allocate_child()) ChildTask2{/*params*/};
root.set_ref_count(3);
tbb::task::spawn(child1);
tbb::task::spawn(child2);
root.wait_for_all();
}oneapi::tbb::task_group を使用#
上記のコードは、oneapi::tbb::task_group を使用して、次のように書き直すことができます。
#include <oneapi/tbb/task_group.h>
int main() {
// ChildTask1 および ChildTask2 が定義されていると仮定。
oneapi::tbb::task_group tg;
tg.run(ChildTask1{/*params*/});
tg.run(ChildTask2{/*params*/});
tg.wait();
}コードが簡潔になりました。また、ラムダ関数も有効になり、tbb::task* tbb::task::execute() 仮想メソッドをオーバーライドする tbb::task インターフェイスも実装する必要がなくなります。この新しいアプローチでは、void operator() const を実装することで、C++ 標準に従った方法で関数を操作します。
struct Functor {
// このタイプのオブジェクトが oneapi::tbb::task_group::run() メソッドに
// 渡されたときに呼び出されるメンバー
void operator()() const {}
};oneapi::tbb::parallel_invoke を使用#
oneapi::tbb::parallel_invoke を使用して元のコードを書き直し、さらに簡潔にすることもできます。
#include <oneapi/tbb/parallel_invoke.h>
int main() {
// ChildTask1 および ChildTask2 が定義されていると仮定。
oneapi::tbb::parallel_invoke(
ChildTask1{/*params*/},
ChildTask2{/*params*/}
);
}タスク実行中にワークを追加#
oneapi::tbb::parallel_invoke はブロック化スタイルのプログラミングに従います。つまり、並列パターンに渡されたすべての関数が実行を終了したときにのみ完了します。
TBB では、ワークの量が事前に不明であり、並列アルゴリズムの実行中にワークを追加する必要がある場合、ほとんどは tbb::parallel_do 高レベル並列パターンによってカバーされていました。tbb::parallel_do アルゴリズムは、次のようにタスク API を使用して実装できます。
#include <cstddef>
#include <vector>
#include <tbb/task.h>
// RootTask と OtherWork が定義され、tbb::task インターフェイスを実装していると仮定。
struct Task : public tbb::task {
Task(tbb::task& root, int i)
: m_root(root), m_i(i)
{}
tbb::task* execute() override {
// ... 項目 m_i のワークを実行 ...
if (add_more_parallel_work) {
tbb::task& child = *new(m_root.allocate_child()) OtherWork;
tbb::task::spawn(child);
}
return nullptr;
}
tbb::task& m_root;
int m_i;
};
int main() {
std::vector<int> items = { 0, 1, 2, 3, 4, 5, 6, 7 };
RootTask& root = *new(tbb::task::allocate_root()) RootTask{/*params*/};
root.set_ref_count(items.size() + 1);
for (std::size_t i = 0; i < items.size(); ++i) {
Task& task = *new(root.allocate_child()) Task(root, items[i]);
tbb::task::spawn(task);
}
root.wait_for_all();
return 0;
}oneTBB では tbb::parallel_do インターフェイスが削除されました。代わりに、新しくワークを追加する機能が oneapi::tbb::parallel_for_each インターフェイスに組み込まれました。
前述のユースケースは、oneTBB では次のように書き換えることができます。
#include <vector>
#include <oneapi/tbb/parallel_for_each.h>
int main() {
std::vector<int> items = { 0, 1, 2, 3, 4, 5, 6, 7 };
oneapi::tbb::parallel_for_each(
items.begin(), items.end(),
[](int& i, tbb::feeder<int>& feeder) {
// ... 項目 i のワークを実行 ...
if (add_more_parallel_work)
feeder.add(i);
}
);
}TBB と oneTBB はどちらもネストされた式をサポートしているため、すでに実行中のファンクター内から追加のファンクターを実行できます。
前述のユースケースは、oneapi::tbb::task_group 使用して次のように書き換えることができます。
#include <cstddef>
#include <vector>
#include <oneapi/tbb/task_group.h>
int main() {
std::vector<int> items = { 0, 1, 2, 3, 4, 5, 6, 7 };
oneapi::tbb::task_group tg;
for (std::size_t i = 0; i < items.size(); ++i) {
tg.run([&i = items[i], &tg] {
// ... 項目 i のワークを実行 ...
if (add_more_parallel_work)
// OtherWork が定義されていると仮定
tg.run(OtherWork{});
});
}
tg.wait();
}タスクの再利用#
*this を oneapi::tbb::task_group::run() メソッドに渡すことで、関数を再実行できます。この場合、関数はコピーされますが、その状態はインスタンス間で共有できます。
#include <memory>
#include <oneapi/tbb/task_group.h>
struct SharedStateFunctor {
std::shared_ptr<Data> m_shared_data;
oneapi::tbb::task_group& m_task_group;
void operator()() const {
// m_share_data を処理するワークを実行
if (has_more_work)
m_task_group.run(*this);
// 既に m_shared_data に同時にアクセスしている可能性があることに注意
}
};
int main() {
// Data が定義されていると仮定
std::shared_ptr<Data> data = std::make_shared<Data>(/*params*/);
oneapi::tbb::task_group tg;
tg.run(SharedStateFunctor{data, tg});
tg.wait();
}このようなパターンは、関数内のワークが完了していないが、タスク・スケジューラーがグループ実行のキャンセルなどの外部状況に反応する必要がある場合に役立ちます。並行アクセスの問題を回避するには、最後のステップとして再実行を送信することをお勧めします。
#include <memory>
#include <oneapi/tbb/task_group.h>
struct SharedStateFunctor {
std::shared_ptr<Data> m_shared_data;
oneapi::tbb::task_group& m_task_group;
void operator()() const {
// m_share_data を処理するワークを実行
if (need_to_yield) {
m_task_group.run(*this);
return;
}
}
};
int main() {
// Data が定義されていると仮定
std::shared_ptr<Data> data = std::make_shared<Data>(/*params*/);
oneapi::tbb::task_group tg;
tg.run(SharedStateFunctor{data, tg});
tg.wait();
}子として再利用または継続#
oneTBB では、この種の再利用は手作業で行われます。タスクを実行するタイミングを追跡する必要があります。
#include <cstddef>
#include <vector>
#include <atomic>
#include <cassert>
#include <oneapi/tbb/task_group.h>
struct ContinuationTask {
ContinuationTask(std::vector<int>& data, int& result)
: m_data(data), m_result(result)
{}
void operator()() const {
for (const auto& item : m_data)
m_result += item;
}
std::vector<int>& m_data;
int& m_result;
};
struct ChildTask {
ChildTask(std::vector<int>& data, int& result,
std::atomic<std::size_t>& tasks_left, std::atomic<std::size_t>& tasks_done,
oneapi::tbb::task_group& tg)
: m_data(data), m_result(result), m_tasks_left(tasks_left), m_tasks_done(tasks_done), m_tg(tg)
{}
void operator()() const {
std::size_t index = --m_tasks_left;
m_data[index] = produce_item_for(index);
std::size_t done_num = ++m_tasks_done;
if (index % 2 != 0) {
// 子として再利用
m_tg.run(*this);
return;
} else if (done_num == m_data.size()) {
assert(m_tasks_left == 0);
// リダクションを行う継続を生成
m_tg.run(ContinuationTask(m_data, m_result));
}
}
std::vector<int>& m_data;
int& m_result;
std::atomic<std::size_t>& m_tasks_left;
std::atomic<std::size_t>& m_tasks_done;
oneapi::tbb::task_group& m_tg;
};
int main() {
int result = 0;
std::vector<int> items(10, 0);
std::atomic<std::size_t> tasks_left{items.size()};
std::atomic<std::size_t> tasks_done{0};
oneapi::tbb::task_group tg;
for (std::size_t i = 0; i < items.size(); i+=2) {
tg.run(ChildTask(items, result, tasks_left, tasks_done, tg));
}
tg.wait();
}スケジューラーのバイパス#
TBB の task::execute() メソッドは、現在のスレッドで次に実行できるタスクへのポインターを返すことができます。これにより、直接 spawn に比べてスケジュールのオーバーヘッドが削減される可能性があります。spawn と同様に、返されたタスクが現在のスレッドによって実行されることは保証されません。
#include <tbb/task.h>
// OtherTask が定義されていると仮定
struct Task : tbb::task {
task* execute(){
// ワークを実行 ...
auto* other_p = new(this->parent().allocate_child()) OtherTask{};
this->parent().add_ref_count();
return other_p;
}
};
int main(){
// RootTask が定義されていると仮定
RootTask& root = *new(tbb::task::allocate_root()) RootTask{};
Task& child = *new(root.allocate_child()) Task{/*params*/};
root.add_ref_count();
tbb::task_spawn(child);
root.wait_for_all();
}oneTBB では、oneapi::tbb::task_group を使用してこれを実行できます。
#include <oneapi/tbb/task_group.h>
// OtherTask が定義されていると仮定
int main(){
oneapi::tbb::task_group tg;
tg.run([&tg](){
//なにかワークを実行 ...
return tg.defer(OtherTask{});
});
tg.wait();
}ここで、oneapi::tbb::task_group::defer は tg に新しいタスクを追加します。ただし、タスクは oneapi::tbb::task_group::run を介して実行準備が整ったタスクのキューに投入されるのではなく、関数の戻り値を介して実行中のスレッドに直接バイパスされます。
遅延タスク作成#
TBB の低レベルタスク API は、タスクの作成と実際のスポーンを分離します。この分離により、タスクのスポーンを延期することができますが、親タスクと最終結果の生成が早期に終了することがブロックされます。例えば、RootTask、ChildTask、および CallBackTask は、tbb::task を継承し、そのインターフェイスを実装するユーザー・ファンクターです。次に、RootTask が途中で終了するのをブロックして待機する処理は、次のように実装されます。
#include <tbb/task.h>
int main() {
// RootTask、ChildTask、および CallBackTask が定義されていると仮定。
RootTask& root = *new(tbb::task::allocate_root()) RootTask{};
ChildTask& child = *new(root.allocate_child()) ChildTask{/*params*/};
CallBackTask& cb_task = *new(root.allocate_child()) CallBackTask{/*params*/};
root.set_ref_count(3);
tbb::task::spawn(child);
register_callback([cb_task&](){
tbb::task::enqueue(cb_task);
});
root.wait_for_all();
// 制御フローは、ChildTask と CallBackTask の両方が実行された後にのみここに到達
// つまり、コールバックが呼び出された後
}oneTBB では、oneapi::tbb::task_group を使用してこれを実行できます。
#include <oneapi/tbb/task_group.h>
int main(){
oneapi::tbb::task_group tg;
oneapi::tbb::task_arena arena;
// ChildTask と CallBackTask が定義されていると仮定
auto cb = tg.defer(CallBackTask{/*params*/});
register_callback([&tg, c = std::move(cb), &arena]{
arena.enqueue(c);
});
tg.run(ChildTask{/*params*/});
tg.wait();
// ChildTask と CallBackTask の両方が実行されると、制御フローはここに到達
// つまり、コールバックが呼び出された後
}ここで、oneapi::tbb::task_group::defer は tg に新しいタスクを追加します。ただし、oneapi::tbb::task_arena::enqueue が呼び出されるまでタスクはスポーンされません。
注
oneapi::tbb::task_group::wait の呼び出しは、ChildTask と CallBackTask の両方が実行されるまで制御を戻しません。