Introduction to C++ Coroutines Through a Thread Scheduling Demonstration
这个视频简单介绍了协程,并且有 compiler view,可以方便的看出协程的行为,之后实现了一个简单的调度器,很有学习价值。
协程的基础使用方法已经在文章 Writing custom C++20 coroutine systems 中介绍的足够详细,所以视频中关于协程基础的用法我会选择性翻译或者直接略过。
也就是说,本文假设你已经了解了协程的基础知识,例如 Promise,Awaitable 等等
重点摘录后半部分对于调度的讲述内容。
Why coroutine
视频中举了一个例子,实际上这一部分也可以参考隔壁微软 C# 的异步文档,道理是一样的。
协程非常有用,如果你需要异步计算(例如GPU,TPU)或者做异步I/O
没有协程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| void cpu_work() {
cpu_matmul(matA, matB...);
}
void gpu_work() {
cudaStream_t stream;
cudaStreamCreate(stream);
gpu_matmul<<<8, 256, 0>>>(matA, matB, ...);
cudaStreamSynchronize(stream); // 这里同步,意味着需要等待 gpu 计算的结果
cudaStreamDestroy(stream);
}
// 实际上 CPU(假设CPU只有一个线程) 和 GPU 是独立的,所以完全没必要顺序执行。
int main() {
cpu_work();
gpu_work();
// 或者
gpu_work();
cpu_work();
}
|
使用协程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| void cpu_work() {
cpu_matmul(matA, matB...);
}
Coro gpu_work() {
cudaStream_t stream;
cudaStreamCreate(stream);
gpu_matmul<<<8, 256, 0>>>(matA, matB, ...);
while (cudaStreamQuery(stream) != cudaSuccess) {
co_await std::suspend_always{};
}
cudaStreamDestroy(stream);
}
int main() {
auto coro = gpu_work();
cpu_work();
while (!coro.done()) { coro.resume(); }
}
|
Coroutine/Promise Compiler’s View
1
2
3
4
5
6
7
8
9
| Coro gpu_work() {
cudaStream_t stream;
cudaStreamCreate(stream);
gpu_matmul<<<8, 256, 0>>>(matA, matB, ...);
while (cudaStreamQuery(stream) != cudaSuccess) {
co_await std::suspend_always{};
}
cudaStreamDestroy(stream);
}
|
↓ Complier View
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| Coro gpu_work() {
Coro::promise_type p();
Coro coro_obj = p.get_return_object();
try {
co_await p.initial_suspend();
cudaStream_t stream;
cudaStreamCreate(stream);
gpu_matmul<<<8, 256, 0>>>(matA, matB, ...);
while (cudaStreamQuery(stream) != cudaSuccess) {
co_await std::suspend_always{};
}
cudaStreamDestroy(stream);
} catch (...) {
p.unhandled_exception();
}
co_await p.final_suspend();
}
|
这就是 promise 和 coroutine 的行为。
对于 co_await std::suspend_always{};
in Compiler View:
1
2
3
4
5
6
7
| // compiler transform
auto&& awaiter = std::suspend_always{};
if (!awaiter.await_ready()) {
awaiter.await_suspend(std::coroutine_handle<>...);
// <suspend/resume> 协程在这里选择性的暂停
}
awaiter.await_resume();
|
总而言之一句话:
Promise 控制协程的行为:initial_suspend()
, final_suspend()
, exception handling…
Awaitable 控制暂停点的行为
Coroutine Handle
coroutine handle 就像是指向协程的指针,你可以通过它访问 promise
而 std::coroutine_handle<>
则是类型擦除版本,它可以代表所有协程,但也代表了你不能用它访问 promise
Coroutine Tasks 以及 Scheduler APIs
接下来举个例子,来编写一个调度器
单线程调度器
在编写调度器之前,我们先看看想要调度什么样的协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| Task TaskA(Scheduler& sch) {
std::cout << "Hello from TaskA\n";
co_await sch.suspend();
std::cout << "Executing the TaskA\n";
co_await sch.suspend();
std::cout << "TaskA finished.\n";
}
Task TaskB(Scheduler& sch) {
std::cout << "Hello from TaskB\n";
co_await sch.suspend();
std::cout << "Executing the TaskB\n";
co_await sch.suspend();
std::cout << "TaskB finished.\n";
}
// emplace: emplace a coroutine handle(task)
// schedule: 调度所有 emplaced task
// get_handle: 获取 coroutine handle
int main() {
Scheduler sch;
sch.emplace(TaskA(sch).get_handle());
sch.emplace(TaskB(sch).get_handle());
std::cout << "Start scheduling...\n";
sch.schedule();
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| struct Task {
struct promise_type {
std::suspend_always initial_suspend() {return {};}
std::suspend_always final_suspend() noexcept {return {};}
Task get_return_object() {
return std::coroutine_handle<promise_type>::from_promise(*this);
}
void return_void() {}
void unhandled_exception() {}
};
Task(std::coroutine_handle<promise_type> handle) : handle(handle) {}
auto get_handle() {return handle;}
std::coroutine_handle<promise_type> handle;
};
|
由于 initial_suspend 是 std::suspend_always ,所以在 emplace 后,只会创建协程,而并不执行。
final_suspend 这里返回什么要思考一下,例如返回 suspend_never 就会很容易 ub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| class Scheduler {
std::queue<std::coroutine_handle<>> tasks_;
public:
void emplace(std::coroutine_handle<> task) {
tasks_.push(task);
}
void schedule() {
while(!tasks_.empty()) {
auto task = tasks_.front();
tasks_.pop();
task.resume();
if (!task.done()) {
tasks_.push(task);
}
}
}
auto suspend() {
return std::syspend_always{};
}
};
|
如果使用队列的话,结果会是:
1
2
3
4
5
6
7
| Start scheduling...
Hello from TaskA
Hello from TaskB
Executing the TaskA
Executing the TaskB
TaskA finished
TaskB finished
|
如果把 scheduler 的队列换成栈呢?变成了类似普通的函数
1
2
3
4
5
6
7
| Start scheduling...
Hello from TaskB
Executing the TaskB
TaskB finished
Hello from TaskA
Executing the TaskA
TaskA finished
|
多线程调度器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| Task TaskA(Scheduler& sch) {
std::cout << "Hello from TaskA\n";
co_await sch.suspend();
std::cout << "Executing the TaskA\n";
co_await sch.suspend();
std::cout << "TaskA finished.\n";
}
Task TaskB(Scheduler& sch) {
std::cout << "Hello from TaskB\n";
co_await sch.suspend();
std::cout << "Executing the TaskB\n";
co_await sch.suspend();
std::cout << "TaskB finished.\n";
}
// emplace: emplace a coroutine handle(task)
// schedule: 调度所有 emplaced task
// get_handle: 获取 coroutine handle
int main() {
Scheduler sch;
sch.emplace(TaskA(sch).get_handle());
sch.emplace(TaskB(sch).get_handle());
std::cout << "Start scheduling...\n";
sch.schedule();
sch.wait();
}
|
执行的代码大多数没有任何区别,区别在于 Scheduler 的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
| class Scheduler {
public:
Scheduler(size_t num_threads);
void emplace(std::coroutine_handle<> task);
auto suspend();
void schedule();
void wait();
private:
void enqueue(std::coroutine_handle<> task); // 插入任务准备执行
void process(std::coroutine_handle<> task); // 恢复任务
std::vector <std::coroutine_handle<>> tasks_; // 存储所有的任务
std::queue<std::coroutine_handle<>> pending_tasks_; // 存储可以被恢复的任务
std::vector<std::thread> workers_; // 存储所有线程
std::mutex mtx_;
std::condition_variable cv_; // block/unblock 现成
bool stop_{false}; // 让所有线程返回
std::atomic<size_t> finished_{}; // 记录完成的任务
};
Scheduler::Scheduler(size_t num_threads) {
workers_.reserve(num_threads);
for (size_t t = 0; t < num_threads; ++t) {
workers_.emplace_back([this]() {
while(true) {
// 这里跟普通的调度器唯一的区别就是,std::function<void()> 变成了 std::coroutine_handle<>
std::coroutine_handle<> task;
{
std::unique_lock lock(mtx_);
// 如果谓词为 true,那么就不阻塞。
cv_.wait(lock, [this]{
return stop_ || (!pending_tasks_.empty());
});
}
// 先检查 stop_,如果其为真,那意味着所有的任务执行完毕。
if (stop_) return;
task = pending_tasks.front();
pending_tasks.pop();
if (task) {
process_(task);
}
}
});
}
}
// 恢复任务
// 如果任务没有完成,就把它添加到 等待队列
// 如果任务完成,就增加 finished_,并检查是否所有任务已经完成
void Scheduler::process_(std::coroutine_handle<> task) {
task.resume();
if (!task.done()) {
enqueue(task);
} else {
if (finished.fetch_add(1) + 1 == tasks_.size()) {
std::unique_lock lock(mtx_);
stop_ = true;
}
cv_.notify_all();
}
}
void Scheduler::enqueue(std::coroutine_handle<> task) {
{
std::unique_lock lock(mtx_);
pending_tasks_.push(task);
}
cv_.notify_one();
}
void Scheduler::emplace(std::coroutine_handle<> task) {
tasks_.emplace_back(task);
}
void Scheduler::schedule() {
for (auto task : tasks_) {
enqueue(task);
}
}
void Scheduler::wait() {
for(auto& w : workers_) {
w.join();
}
}
|
代码实现还可以,不是特别难。
接下来的部分是 CPU-GPU 的调度器,这个比较难,对我个人来说也不涉及,所以暂时不翻译。