An Introduction to Multithreading in C++20 - Anthony Williams - ACCU 2022
CppCon 2022 也讲了,以前记录过。这次选用 ACCU 的版本,因为内容更全一些。
Anthony 应该都认识,写《C++并发编程》第二版的那个作者。这是他在 ACCU/CppCon 的讲座的总结,内容是关于 C++20 的所有并发 API。
首先那个并发执行策略直接跳过,17进入标准,马上就要废弃了纯纯没用。
取消操作
假设你的 GUI 下载程序支持取消操作,你点一下下载,发现需要很长时间。你可能会说,啊,快取消这个操作,然后让其他线程帮他清理资源。
C++20 提供了两个工具,std::stop_source
, std::stop_token
来解决协作任务的取消。
std::stop_source
用来检查是否有人让你取消操作。
取消是一个协作任务,如果别人让你取消,但你不检查 std::stop_source
,那么什么都不会发生。
具体的用法如下:
- 创建
std::stop_source
- 从
std::stop_source
获取std::stop_token
- 将
std::stop_token
传递给一个新的线程或者 task(std::async) - 当你想取消操作的时候,就调用
source.request_stop()
- 周期性检测:
token.stop_requested()
来查看是否有人让你暂停 - 如果你不检查,那么什么都不发生
|
|
他们背后并没有什么同步机制,总之你得自己检查。
你可以使用 std::stop_callback
来提供你自己的取消机制,如:
|
|
这样可以在结束后执行回调。
管理线程
std::jthread
C++20 后,std::jthread
才是你的首选;很少的情况下也许会使用 std::async
。
std::jthread
会自动提供你 stop_token
(当然,前提是你的函数接收这个参数,不接受的话就不会给你)
|
|
我们来看看 std::jthread
的 API:
std::jthread
默认构造- 创建一个空的对象
std::jthread x{Callable, Args...}
- 创建新的
std::stop_source
- src - 创建新线程调用
Callable(src.get_token(), Args...)
orCallable(Args...)
- 创建新的
- 析构函数
- 调用
src.request_stop()
,然后等待线程结束后帮你 join
- 调用
x.get_id()
x.join()
等待线程结束,不会析构对象
std::jthread
是一个值类型,已经是一个 handle 了,是可移动的,可以转移所有权,可以被存储在容器里。不要 new。
Anthony 表示求你了,人家已经是一个 handle 了,别再 new 了。(说实话我也不懂为什么有人这么干。对 std::thread 就有人这么干。难道是写
->
运算符看起来高级吗?)
线程:Callable and Args
callabe 和 args 会被拷贝到新线程的 local。
这个主要是防止悬垂引用和竞争,如果想用引用请用 std::ref()
或者 lambda。
取消 API
|
|
同步工具
多数情况下,线程之间有交互,例如数据交互。那么要小心数据竞争。在 C++ 中,数据竞争全是 ub。
C++20 提供了一大堆同步工具:
- latches
- barriers
- futures
- mutexes
- semaphores
- atomics
这个顺序是一会讲解的顺序,也是推荐你使用的顺序,按照这个顺序依次思考,这个组件是否能解决你的问题。
Anthony 表示,例如 atomic 就很容易用错,不过你的团队里总有人知道怎么正确使用。所以普通人不懂的就不要乱用。
(确实,不懂内存序那也用不明白 atomic。)
Latches
std::latch
是只能使用一次的计数器,线程们等待它归零后通行。
- 创建一个非 0 count 的 latch
- 一个或者多个线程减少 count(原子的)
- 其他线程可能等待 latch 被触发
- 当 latch 归零的时候,会保持触发,所有的等待它的线程都会被唤醒
|
|
就像一个结界,latch 归 0,线程就都能通过了。
这个也可以用于测试并发,让所有的线程等待一个 latch,之后执行你的可能会发生 data race 的代码。
Barriers
std::barrier<>
是一个模板类:
- 构造一个 barrier,需要一个 count,以及一个 Completion Function
- 一个或者多个线程到达 barrier
- 等待 barrier 被触发
- count 归 0,触发 barrier,会调用 Completion Function,然后重复以上过程。
|
|
在游戏渲染可能比较有用,因为游戏每一帧都要同步。而 nvidia 对这俩都有特殊的指令支持。
barrier 可以循环使用,提供了 completion function 也方便在触发后执行一些操作,比如写入文件。
Futures
std::future
有时你只是单纯想把数据从一个线程传递到另外一个线程,future 就是干这个的。
futures 用于线程间的数据传递。
std::async
发起任务,并返回一个值。推荐的使用方法时,使用std::async
来立马进行一个计算,之后会使用 future 获取值。否则你可能需要的是std::packaged_task
std::promise
显式设定值std::packaged_task
将返回一个值的任务封装。它会保存一个任务,你可以在未来对其进行调用,之后获取 future。可以使用这个配合线程池,来获取返回值
以上所有这些,你都可以使用 std::future<T>
来获取结果。
|
|
std::promise
promise 可能是你用的最多的:
|
|
使用 future/promise 传递数据:
|
|
注意,future 是一次性的,所以你需要注意移动。此外,调用 get() 后,其就不拥有值了。
对于异常:
|
|
std::async
还可以使用 std::async
来发起一个任务:
std::async
可以用来创建一个线程,只要你第一个参数传递 std::launch::async
,否则的话是否新建一个线程是实现定义。
|
|
f.get()
会返回 func 的结果f 拥有一个类似
jthread
的线程,析构函数会帮你处理现成的
std::shared_future
std::shared_future
允许你让多个线程收到执行结果。
|
|
std::mutex
future 比 mutex 好一点,但因为它是一次性的。所以你也许需要 mutex
C++ 提供了 6 种 mutex,实则有 5 个没啥用
|
|
对于上锁和解锁,我们也有 RAII 封装:
|
|
你把所有的锁都给 std::scoped_lock
,他会一起给你上锁,而且保证不会死锁。
等待数据
如何等待数据完成?
忙等吗?这个是个 bad idea,浪费 cpu 性能。
|
|
所以我们有 std::condition_variable
来通知,他并不能同步数据,但可以避免忙等待。
我们需要一个 std::unique_lock
,因为其跟 scoped_lock 不合
|
|
首先你需要给 mutex
上锁,然后你再调用条件变量来传递 lock
以及一个 lambda,表明:**你到底在等待什么?**当你的 lambda 返回 true 时,就会替你解锁。
对于 std::condition_variable
,使用忙等实现是合法的,但是操作系统不会这么做,因为操作系统知道这样做不好。如何通知呢?你需要调用一下 notify()
|
|
如何使用条件变量配合取消操作呢?使用 std::condition_variable_any
|
|
现在 wait 会返回一个布尔变量,告诉你你的谓词是否返回 true,如果返回 false,就代表我需要暂停操作,因为某些线程告诉我 stop。
当然,如果同时发生 stop 和数据准备好两个事件,那么依然会返回 true,那么依然会处理数据。具体怎么做取决于你,如果你的操作很快,那可以继续,如果想取消,就显式判断 stop。
信号量
如果以上的工具都不能满足你(显然都不满足你有点难度),那么你可能需要信号量。
信号量代表一些可以被使用的 “slots”。如果你 acquire 一个 slot,那么信号量会减少,直到你 release 该 slot。
acquire 一个 slot,但是 count 为 0 时,会阻塞或者失败。
信号量可以实现所有的同步机制,包括 latches, barriers, mutexes,当然,大部分情况下你直接用更高级的结构即可。
binary_semaphore
有 2 个状态:1 slot free,no slots free,就像 mutex 一样使用。
C++20 提供了 std::counting_semaphore<max_count>
, std::binary_semaphore
是其 max_count = 1
时的特化。
有阻塞的 sem.acquire()
也有 sem.try_acquire()
,..for, …until,替代阻塞的版本。
原子变量
原子变量是最最底层的同步设施了。在 C++ 里是 std::atomic<T>
T 必须 可平凡拷贝,Bitwise comparable
原子变量 lock_free 吗?
std::atomic<T>
可能不是 lock-free 的,可能会使用内部 mutex。
std::atomic_flags
, std::atomic_signed_lock_free
, std::atomic_unsigned_lock_free
是唯三保证 lock-free 的类型。
在大多数平台上,std::atomic<integral-type>
以及 std::atomic<T*>
是 lock-free 的。
你可以通过 std::atomic<T>::is_always_lock_free
来查看到底是不是 lock-free。