C++ 多线程
https://zh.cppreference.com/w/cpp/thread
chrono
自:C++11 高精度时间库
auto start = std::chrono::system_clock::now(); // 获取当前时间点
auto end = std::chrono::system_clock::now(); // 获取当前时间点
std::chrono::steady_clock::duration elapsed = end - start;
long long elapsed_seconds = std::chrono::duration_cast<std::chrono::seconds>(elapsed).count();
std::cout << elapsed_seconds << std::endl;
// high_resolution_clock 替代 steady_clock
auto start = std::chrono::system_clock::now(); // 获取当前时间点
auto end = std::chrono::system_clock::now(); // 获取当前时间点
std::chrono::high_resolution_clock::duration elapsed = end - start;
long long elapsed_seconds = std::chrono::duration_cast<std::chrono::seconds>(elapsed).count();
std::cout << elapsed_seconds << std::endl;同步和异步
同步:按顺序执行。只有一个线程执行临界区代码,多线程之间,互相交换锁的持有权。调用线程可能阻塞,但是共享资源访问。
异步:任务发起后不阻塞当前线程,允许主线程继续执行其他操作,通过回调或轮询获取结果。一般是做 IO 等耗时操作。
mutex
自:C++11 传统的互斥锁(直译就是互斥)
提供加锁和解锁操作,以确保只有一个线程可以访问某段代码或数据
成员函数 lock unlock
注意:这个东西是一个锁,而不是一共需要操作的变量。锁意味着操作的权利,有锁才能操作,无锁不行。它不是变量! 一般是,锁放在一个所有线程都能拿到的位置。每个线程,执行操作之前,先加锁,操作完毕,解锁。
互斥锁的作用就是互斥,mutual exclusive,是用来保护 [临界区](critical section) 的。所谓临界区就是代码的一个区间,如果两个线程同时执行就有可能出问题,所以需要互斥锁来保护。
std::vector 就是线程不安全的,其模型比较简单,不内置锁
(同步代码例子)
// 共享的欲修改的变量,和一个锁,放在所有线程可见的位置
int shared_counter = 0;
std::mutex mtx;
// 线程函数:对共享资源进行访问
void increment(int id, int times)
{
for (int i = 0; i < times; ++i)
{
// 加锁保护共享资源
mtx.lock();
++shared_counter;
std::cout << "Thread " << id << " incremented counter to " << shared_counter << std::endl;
mtx.unlock();
// 模拟其他操作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}注意:mtx.lock(); 如果无法拿,会异常
锁对比 bool
有的时候我们试图在一段代码开始前 bool=ture,在别处检查 bool,结束后置为 false
但是别处检查的操作和最开始置 true 的操作不是原子的,可能同时进行,这导致异常
而 mutex 是基于底层硬件指令或操作系统来实现的
即使你用 RAII 思想包装也没用
所以你如果想把 bool 换成锁,那就把 bool 放在 atomic 里
atomic
自:C++11
为了避免使用 std::mutex 过于复杂,而发明的模板类
该模板类里可以存储一些值,操作时,自动上锁,解锁。被包装的类型需要是“平凡可复制”
也就是说,如果是 mutex,那么你需要手动释放锁,容易遗忘。如果你加锁后操作的对象恰好是 int 这种,那么给你提供一个现成的。
大概是这样。
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
std::atomic<int> counter(0); // 原子依然是放在了全局,否则访问不到
void increment() {
for (int i = 0; i < 1000; ++i) {
counter.fetch_add(1, std::memory_order_relaxed);
}
}
int main() {
const int numThreads = 4;
std::vector<std::thread> threads;
for (int i = 0; i < numThreads; ++i) {
threads.emplace_back(increment);
}
for (auto& t : threads) {
t.join();
}
std::cout << "Final counter: " << counter.load() << std::endl;
return 0;
}thread
自:C++11
线程的构造函数:第一个参数接受一个函数指针,第二个参数为可变长参数,接受任何变量
线程构造函数走完,立刻开始执行
A 线程 调用 B 线程的 Join 可以在 A 线程,等待 B 线程结束,自己再运行。
这个有的语言叫做 wait,或者 await
void DoWork(int x, int y)
{
std::cout << "DoWork...\n";
}
int main() {
std::thread worker(DoWork, 0, 1);
worker.join();
std::cout << "Finish";
return 0;
}future
自:C++11
std::future<T> 是将来某个时刻获取的,异步操作的结果
异步任务可以通过 std::async 或者与 std::promise 搭配使用来创建
std::future 是 std::async 的返回值
前者是一个模板类,后者是一个函数
int compute(int a, int b) {
std::this_thread::sleep_for(std::chrono::seconds(2));
return a + b;
}
int main() {
// 启动异步任务,计算 3 + 4
std::future<int> fut = std::async(std::launch::async, compute, 3, 4);
std::cout << "等待异步任务的结果。..\n";
// 等待任务完成并获取返回值
// 做其它是事情
int result = fut.get();
std::cout << "计算结果:" << result << std::endl;
return 0;
}也就是说,future<T>.get() 会像 thread.join() 那样,在当前线程等待,那个其它线程执行完任务,再执行
Q:感觉只是让线程有个位置放结果,你直接 join,把结果放其它位置也差不多。甚至直接在主函数里算不是一样?
A:你没有利用异步的并发性,比如在等待期间做其他事情。否则主线程期间无法执行任务,这里主线程还可以做事。也就是说,如果你直接是 thread::join 则会直接停下来等,但是 future 说这里将会有一个变量,你可以后续用,只有你 get 了,get 的时候,如果没有,才会等。
补充:感觉这个思想有点类似于其他高级语言的异步了
async
用于简化异步任务的启动和管理。它能在后台启动一个任务,并返回一个 std::future 对象
目前标准 C++ 中并没有 std::await 这个接口
promise
和 future 一起用,实现线程间的配合
promise 承诺一个之后会准备好的值,当值被设置后,future 会被通知,future 可以通过 get 拿到这个值
即 future 不仅可以是 async 的返回值,还可以是 promise 的 get 的结果
消费者线程(这里是主线程)创造一个 promise,然后送给生产者,生产者计算后,给 promise 赋值,之后消费者通过 future 判断能不能拿到值
void produce(std::promise<int>& prom) {
// 模拟某种处理
int result = 42;
// 将结果设置到 promise 中
prom.set_value(result);
}
int main() {
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread producer(produce, std::ref(prom));
// 在主线程中等待并获取结果
int result = fut.get();
std::cout << "通过 promise 获取的结果:" << result << std::endl;
producer.join();
return 0;
}lock_guard
虽然 atomic 提供了锁,但是只针对于一个平凡变量,如果还是需要 mutex,还是需要手动操作
因此需要一个 RAII 模板类,在构造函数中要求一个 mutex,析构时自动释放
lock_guard 应该放在临界区里,而不是整个线程执行的函数最外层,以实现即使对锁的释放
lock_guard 如果拿不到,会抛出异常 system_error。win 上 code=10,显示资源被占用。
// 线程函数:对共享资源进行访问
void increment(int id, int times) {
for (int i = 0; i < times; ++i) {
// 加锁保护共享资源
std::lock_guard<std::mutex> this_lock(mtx);
++shared_counter;
std::cout << "Thread " << id << " incremented counter to " << shared_counter << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}condition_variable
自:C++11
用于线程之间的通知,唤醒。与 std::mutex 等配合使用
它上面有一个叫做 wait 的函数,此函数接受一个 std::mutex 和一个 lambda 表达式,可以让当前线程进入等待状态。
这个 lambda 表达式,叫做谓词 Predicate,花括号内,返回一个 bool
notify_one 通知等待该条件的一个线程
条件变量是允许多个线程相互交流的同步原语。它允许一定量的线程等待(可以定时)另一线程的提醒,然后再继续。条件变量始终关联到一个互斥体。
例子你看后面的 unique_lock 就行了
条件变量一般用于:一个线程不清楚自己能不能做某件事情
在以前的模型中,都是生产,没人消费,但是如果自己是消费者,那么要搞清楚自己能不能消费
办法 1:通过轮询某个变量,看是否能消费
办法 2:查询一次,如果不行,就先休息,再也不查询了,并告诉生产者,产出后告诉自己。减小了 CPU 占用。
条件变量就是办法 2
unique_lock
提供比 lock_guard 更多的功能,和 condition_variable 配合使用
一般的流程如下:
1、构造 mutex condition_variable 和 一个要操作的共享数据
2、当生产者线程,去操作时,它首先用 unique_lock 获取锁(以免别人读),生产数据后,使用 condition_variable 通知消费者线程去消费
3、当消费者线程,去操作时,操作前使用 unique_lock 获取锁,然后使用 wait 进行等待,等生产者告诉自己,能够开始消费了,它才进行消费
生产者和消费者都在一个死循环里,不停的工作
// 一个例子,构造了一个线程安全的队列,基于 std::queue,外加了 锁 与 条件变量
template <typename T>
class ThreadSafeQueue {
public:
ThreadSafeQueue() {}
void push(const T& value) { //供生产者使用的函数,push 时,先拿锁,然后放值,放好以后通知一个消费者消费
std::unique_lock<std::mutex> lock(mtx_);
queue_.push(value);
cv_.notify_one();
}
void pop(T& value) { //消费函数,首先依然拿锁,并检查条件变量的状态,如果不行,则进入等待,并等待生产者唤醒自己;如果可以,就弹出
std::unique_lock<std::mutex> lock(mtx_); // !! 线程在检查条件前必须先获取互斥锁
cv_.wait(lock, [this]() { return !queue_.empty(); }); //调用 wait() 时,线程会释放锁并进入阻塞状态
value = queue_.front();
queue_.pop();
}
private:
std::queue<T> queue_;
std::mutex mtx_;
std::condition_variable cv_;
};
void producer(ThreadSafeQueue<int>& queue) {
for (int i = 0; i < 10; ++i) {
queue.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(ThreadSafeQueue<int>& queue) {
int value;
for (int i = 0; i < 10; ++i) {
queue.pop(value);
std::cout << "Consumer " << std::this_thread::get_id() << " popped " << value << std::endl;
}
}
int main() {
ThreadSafeQueue<int> queue; // 餐桌,这个队列,是主线程造的,但是数据是在堆上,主线程把指针给了消费者和生产者,那么他们能看到。主线程开了饭店
// 一个生产,俩消费
std::thread t1(producer, std::ref(queue));
std::thread t2(consumer, std::ref(queue));
std::thread t3(consumer, std::ref(queue));
t1.join();
t2.join();
t3.join();
return 0;
}核心
当构造出一个线程时,它到底和原来的线程是否运行在同一个核上?
核心是指逻辑处理器
线程是调度的基本单位,而进程是资源分配的单位
现代操作系统通常调度线程而非进程,所以当一个进程有多个线程时,它们可能被分配到不同核心
默认:操作系统可能倾向于让线程在同一个核心上运行,以利用 CPU 缓存
默认:除非当前核心负载极高,否则不会将线程迁移到其他核心。这样会刷上下文、缓存,慢。但是,线程因等待 I/O 被挂起 → 恢复后可能分配到其他核心
核心分配与线程调度的独立性
现代操作系统(如 Windows、Linux)通常采用节能优先的策略,在轻度负载时会尽量将任务集中在少数几个核心上,以便让其他核心进入低功耗状态
在轻度负载下,比如我开启多个软件,如 IDE、网页,那么也只有一到两个核在工作,而其它核闲置。不管我怎么重启 IDE,那些空闲核永远不会工作,永远是那个负载没满的核有更多负载,去运行这个 IDE。而不是一个空闲的核来工作。只有真的的重量级计算任务到来,空闲核才会被利用。
Intel/AMD CPU 在少数核心高负载时,会自动提升频率(如从 3GHz 飙到 5GHz),单核性能更高,足以应对轻度任务
临界区变量设计
放在全局区里不好,那么
将变量封装到类里,并且将锁、条件变量都合并到类里,通过管理类,来管理这些变量。共享数据与锁的生命周期绑定
主函数负责造这个类,确保生命周期覆盖所有使用线程,其它线程拿到这个类,产出食物,吃食物
eg:线程安全队列
哲学家就餐
Dijkstra 提出的
五个哲学家围坐在一张圆桌旁,每个哲学家面前有一碗饭,每两人之间放一根筷子。
哲学家们的生活就是思考(不需要筷子)和吃饭,但吃饭时需要两根筷子,也就是左右两边的各一根。
问题在于如何设计一个算法,让所有哲学家都能公平地进餐而不会出现死锁或饥饿的情况
1. 资源分级法(有序拿筷子)
- 为每根筷子编号(如 0 到 4)。
- 要求每位哲学家 先拿编号小的筷子,再拿编号大的。
- 作用:破坏“循环等待”条件,避免死锁。
void philosopher(int id) {
int left = id;
int right = (id + 1) % 5;
if (left > right) std::swap(left, right); // 确保先拿编号小的
pickup(left);
pickup(right);
eat();
putdown(left);
putdown(right);
}2. 限制并发数
- 只允许最多 4 位哲学家 同时尝试拿筷子。
- 原理:至少有一位哲学家能拿到两根筷子,避免全阻塞。
- 实现:使用信号量(如
semaphore(4))。
3. 交替行为(非对称策略)
- 奇数编号的哲学家先拿左边筷子,偶数编号先拿右边。
- 作用:打破对称性,降低死锁概率。
4. 超时机制
- 若等待筷子超时,则主动释放已拿到的筷子。
- 缺点:可能导致活锁(反复尝试失败)。
信号量与 PV 操作
信号量大概是对共享资源的访问的协调。它可以用于多线程,也可以是多进程。
在实现上,它就是一个可以原子化操作的类,里面包装了一个 int,int 的意义是,当前还能有几个人使用这个资源。用 condition_variable 和 unique_lock 配合实现功能。
1、初始化。sem_init(&sem, 0, 5)表示最多 5 人同时访问。
2、P = wait。请求。尝试获取资源,如成功,则计数器 -1,并继续,否则阻塞。
3、V = signal。释放。计数器 +1,并唤醒等待的线程。
之所以叫 PV,是荷兰语中尝试、增加的首字母。
之所以不叫 lock 是因为其只允许一次仅一个线程访问资源。
class Semaphore {
public:
Semaphore(int count = 0) : count_(count) {}
void notify() {
std::unique_lock<std::mutex> lock(mtx_);
count_++;
cv_.notify_one();
}
void wait() {
std::unique_lock<std::mutex> lock(mtx_);
while (count_ <= 0) {
cv_.wait(lock);
}
count_--;
}
private:
std::mutex mtx_;
std::condition_variable cv_;
int count_;
};大概就是给之前的封装的类里面,再加上一个 count 计数。