条件变量 条件变量顾名思义,就是就是线程A1,A2, A3.. 由于不满足某些条件而挂起。然后当条件被其他线程满足了后,A1,A2,A3…就因为满足了条件被唤醒继续自己的工作,参考以下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <vector> std::mutex mtx; //互斥量 std::condition_variable cv; //条件变量 bool ready = false; int current_hour = 0; // 幽灵类 class Ghost { public: Ghost(int id) : id(id) {} void wait_for_signal() { std::unique_lock<std::mutex> lock(mtx); cv.wait(lock, []{ return ready; }); std::cout << "Ghost " << id << " is moving at hour " << current_hour << ".\n"; } private: int id; }; // 控制时间流动的函数 void control_time() { while (current_hour < 24) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::unique_lock<std::mutex> lock(mtx); current_hour++; if (current_hour == 12 || current_hour == 24) { ready = true; cv.notify_all(); // 满足条件,唤醒所有幽灵 } } } int main() { std::vector<std::thread> threads; for (int i = 0; i < 5; ++i) { threads.emplace_back(&Ghost::wait_for_signal, Ghost(i + 1)); } std::thread controller(control_time); for (auto& th : threads) { th.join(); } controller.join(); return 0; }
条件变量condition_variable 经常和mutex互斥量一起使用,代码中有两个线程,一个线程叫ghost,一个线程叫control_time, 想要设计:control_time修改条件,以满足一定条件后放出ghosts。 对于ghost而言,会使用raii机制的lock去获取mtx互斥量,比如ghost1获取了mtx,其他ghost这个时候去获取是失败的进而阻塞,然后到运行cv.wait(lock, lamda),会把lock释放进而变成被在wait函数阻塞,然后其他线程相继获得了mtx,然后释放,然后阻塞在cv.wait函数。
对于control_time而言,也会使用raii机制lock去获取mtx,然后进而操作,需要等待一段时间才满足条件,当满足条件的时候把ready赋值为true,然后调用cv.notify_all函数唤醒所有线程。
对于被唤醒的ghost而言,重新一个一个相继获得了mtx的锁,然后执行操作,释放。
虚假唤醒 为什么这里cv.wait(lock, lamda),会需要传入一个lamda函数呢?因为由于操作系统的底层设计原因,调度线程的算法机制,为了兼顾性能,会周期性唤醒线程。这个唤醒不是由条件变量唤醒的,所以称之“虚假唤醒”,为了避免虚假唤醒,上述代码有个ready初始化为false,lamda表达式也返回ready,由于返回的ready一直为false,所以“虚假唤醒”,会被阻止。真正的满足了read=true,同时cv.notify()执行,才会唤醒线程继续操作。
互斥量 互斥量是为了避免同时访问共享资源。确保只有一个线程能访问某个共享资源。互斥量只有两种状态,锁定和没有锁定。且mutex互斥量的底层封装可能是操作系统的spinlokc,不同操作系统实现不同。而且除了普通的mutex还有衍生出
timed_mutex: 超时加锁
recursive_mutex: 递归加锁,在同一线程可以被加锁多次。
shared_mutex: C++17引入,和unique_lock, 和shared_lock配合1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #include <iostream> #include <thread> #include <mutex> #include <chrono> std::timed_mutex timed_mtx; void task(int id) { while (true) { if (timed_mtx.try_lock_for(std::chrono::milliseconds(100))) { std::cout << "Thread " << id << " acquired the lock." << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟任务 timed_mtx.unlock(); std::cout << "Thread " << id << " released the lock." << std::endl; break; } else { std::cout << "Thread " << id << " could not acquire the lock, retrying..." << std::endl; } } } int main() { std::thread t1(task, 1); std::thread t2(task, 2); t1.join(); t2.join(); return 0; }
std::timed_mutex 允许线程在一定时间内尝试获取锁,如果在指定时间内未能获取到锁,则放弃获取锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 #include <iostream> #include <thread> #include <mutex> std::recursive_mutex recursive_mtx; void recursive_function(int n) { if (n <= 0) return; std::lock_guard<std::recursive_mutex> lock(recursive_mtx); std::cout << "Lock acquired in recursive_function: " << n << std::endl; recursive_function(n - 1); std::cout << "Lock released in recursive_function: " << n << std::endl; } int main() { std::thread t1(recursive_function, 5); std::thread t2(recursive_function, 3); t1.join(); t2.join(); return 0; }
std::recursive_mutex 允许同一个线程多次获取同一个互斥量而不会死锁。每次获取锁后,必须有相应次数的解锁操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include <iostream> #include <thread> #include <shared_mutex> std::shared_mutex shared_mtx; int shared_data = 0; void reader(int id) { std::shared_lock<std::shared_mutex> lock(shared_mtx); std::cout << "Reader " << id << " reads shared_data: " << shared_data << std::endl; } void writer(int id) { std::unique_lock<std::shared_mutex> lock(shared_mtx); ++shared_data; std::cout << "Writer " << id << " increments shared_data to: " << shared_data << std::endl; } int main() { std::thread t1(reader, 1); std::thread t2(writer, 2); std::thread t3(reader, 3); t1.join(); t2.join(); t3.join(); return 0; }
std::shared_mutex 允许多个线程同时读取共享资源,但只有一个线程能写入。读操作不会阻塞其他读操作,但会阻塞写操作。写操作会阻塞所有其他操作。
互斥量包装器(基于 RAII 的思想) 这里要介绍的是unique_lock类模板,才有RAII的方式对锁进行封装,且用独占所有权的方式管理对mutex上锁和解锁。很方便,析构的时候自动解锁。且提供了很多灵活的函数。
上锁/解锁操作:lock、try_lock、try_lock_for、try_lock_until 和 unlock;
修改操作:支持移动赋值、交换(swap:与另一个 unique_lock 对象互换所管理的互斥量所有权)、释放(release:返回它所管理的互斥量对象的指针,并释放所有权)。
获取属性:owns_lock (返回当前对象是否上了锁)、operator bool() (与 owns_lock() 的功能相同)、mutex(返回当前 unique_lock 所管理的互斥量的指针)。
信号量(二元信号量、计数信号量) 两种信号量:(计数信号量是C++20引入)
binary_semaphore :二元信号量类似于互斥量,信号量只有 0 与 1 。
counting_semaphore :计数信号量
所有关于信号量的定义参考头文件 #include ,计数信号量是一种轻量级同步原语,可以控制对共享资源的访问。与 std::mutex 不同的是,acounting_semaphore 至少允许 LeastMaxValue 并发访问者对同一资源进行多个并发访问。Acounting_semaphore 包含一个由构造函数初始化的内部计数器。该计数器可以通过 acquire() 获取资源访问权限,并通过调用 release() 来释放资源从而递增计数器。当计数器为零时,调用 acquire() 时就会阻塞直到计数器增加,但是调用 try_acquire( ) 不阻塞;try_acquire_for() 和 try_acquire_until() 阻塞直到计数器增加或达到超时。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #include <iostream> #include <thread> #include <semaphore> #include <vector> #include <chrono> const int max_resources = 3; std::counting_semaphore<max_resources> counting_sem(max_resources); void resource_user(int id) { counting_sem.acquire(); // 获取一个资源 std::cout << "Resource user " << id << " acquired a resource." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟资源使用 std::cout << "Resource user " << id << " releasing a resource." << std::endl; counting_sem.release(); // 释放一个资源 } int main() { std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) { threads.emplace_back(resource_user, i + 1); } for (auto& t : threads) { t.join(); } return 0; }
barrier 和 latch std::latch 是一个一次性同步原语,用于等待一组线程到达某个同步点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #include <iostream> #include <thread> #include <latch> #include <vector> const int num_threads = 5; std::latch latch(num_threads); void worker(int id) { std::cout << "Worker " << id << " is working...\n"; std::this_thread::sleep_for(std::chrono::milliseconds(100 * id)); // 模拟工作 std::cout << "Worker " << id << " has finished work and is waiting at the latch...\n"; latch.count_down(); // 递减计数 latch.wait(); // 等待所有线程到达同步点 std::cout << "Worker " << id << " is proceeding after latch...\n"; } int main() { std::vector<std::thread> threads; for (int i = 0; i < num_threads; ++i) { threads.emplace_back(worker, i + 1); } for (auto& t : threads) { t.join(); } return 0; }
和latch相比,barrier的不同之处在于是多次使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #include <iostream> #include <thread> #include <barrier> #include <vector> const int num_threads = 5; std::barrier barrier(num_threads, [](){ std::cout << "All threads have reached the barrier, proceeding to next step...\n"; }); void worker(int id) { for (int phase = 1; phase <= 3; ++phase) { std::cout << "Worker " << id << " is working on phase " << phase << "...\n"; std::this_thread::sleep_for(std::chrono::milliseconds(100 * id)); // 模拟工作 std::cout << "Worker " << id << " has finished phase " << phase << " and is waiting at the barrier...\n"; barrier.arrive_and_wait(); // 到达屏障并等待 } } int main() { std::vector<std::thread> threads; for (int i = 0; i < num_threads; ++i) { threads.emplace_back(worker, i + 1); } for (auto& t : threads) { t.join(); } return 0; }
call_once C++ 11 以后支持 call_once。确保某个操作只被执行一次(成功执行才算),即使是多线程环境下也确保只执行一次,在多线程下初始化单例模式特别有用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #include <iostream> #include <thread> #include <mutex> std::once_flag flag1, flag2; void simple_do_once() { std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; }); } int main() { std::thread st1(simple_do_once); std::thread st2(simple_do_once); std::thread st3(simple_do_once); std::thread st4(simple_do_once); st1.join(); st2.join(); st3.join(); st4.join(); }
一些其他 join & detach 对于join和detach,是启动线程和脱离线程的方法,join会启用线程直到结束,主线程才会结束。detach就是线程和主线程已经没有多大的关系了。
async & future std::async
std::async 用于启动一个异步任务并返回一个 std::future 对象,通过该对象可以获取任务的结果。std::async 可以选择不同的策略(立即执行或延迟执行)。
std::future
对象用于从异步任务中获取结果。std::future 提供了 get() 方法来阻塞并等待异步任务的完成,并返回结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <iostream> #include <future> #include <chrono> int compute_value() { std::cout << "Computing value...\n"; std::this_thread::sleep_for(std::chrono::seconds(3)); return 42; } int main() { // 启动异步任务 std::future<int> result = std::async(std::launch::async, compute_value); std::cout << "Main thread continues...\n"; // 获取异步任务的结果 int value = result.get(); // 阻塞直到结果准备好 std::cout << "Computed value: " << value << std::endl; return 0; }