C++多线程

条件变量

条件变量顾名思义,就是就是线程A1,A2, A3.. 由于不满足某些条件而挂起。然后当条件被其他线程满足了后,A1,A2,A3…就因为满足了条件被唤醒继续自己的工作,参考以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>

std::mutex mtx; //互斥量
std::condition_variable cv; //条件变量
bool ready = false;
int current_hour = 0;

// 幽灵类
class Ghost {
public:
    Ghost(int id) : id(id) {}    
    void wait_for_signal() {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, []{ return ready; });
        std::cout << "Ghost " << id << " is moving at hour " << current_hour << ".\n";
    }
private:
    int id;
};

// 控制时间流动的函数
void control_time() {
    while (current_hour < 24) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::unique_lock<std::mutex> lock(mtx);
        current_hour++;
        if (current_hour == 12 || current_hour == 24) {
            ready = true;
            cv.notify_all(); // 满足条件,唤醒所有幽灵
        }
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back(&Ghost::wait_for_signal, Ghost(i + 1));
    }
    std::thread controller(control_time);
    for (auto& th : threads) {
        th.join();
    }
    controller.join();
    return 0;
}

条件变量condition_variable 经常和mutex互斥量一起使用,代码中有两个线程,一个线程叫ghost,一个线程叫control_time, 想要设计:control_time修改条件,以满足一定条件后放出ghosts。
对于ghost而言,会使用raii机制的lock去获取mtx互斥量,比如ghost1获取了mtx,其他ghost这个时候去获取是失败的进而阻塞,然后到运行cv.wait(lock, lamda),会把lock释放进而变成被在wait函数阻塞,然后其他线程相继获得了mtx,然后释放,然后阻塞在cv.wait函数。

对于control_time而言,也会使用raii机制lock去获取mtx,然后进而操作,需要等待一段时间才满足条件,当满足条件的时候把ready赋值为true,然后调用cv.notify_all函数唤醒所有线程。

对于被唤醒的ghost而言,重新一个一个相继获得了mtx的锁,然后执行操作,释放。

虚假唤醒

为什么这里cv.wait(lock, lamda),会需要传入一个lamda函数呢?因为由于操作系统的底层设计原因,调度线程的算法机制,为了兼顾性能,会周期性唤醒线程。这个唤醒不是由条件变量唤醒的,所以称之“虚假唤醒”,为了避免虚假唤醒,上述代码有个ready初始化为false,lamda表达式也返回ready,由于返回的ready一直为false,所以“虚假唤醒”,会被阻止。真正的满足了read=true,同时cv.notify()执行,才会唤醒线程继续操作。

互斥量

互斥量是为了避免同时访问共享资源。确保只有一个线程能访问某个共享资源。互斥量只有两种状态,锁定和没有锁定。且mutex互斥量的底层封装可能是操作系统的spinlokc,不同操作系统实现不同。而且除了普通的mutex还有衍生出

  • timed_mutex: 超时加锁
  • recursive_mutex: 递归加锁,在同一线程可以被加锁多次。
  • shared_mutex: C++17引入,和unique_lock, 和shared_lock配合
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <chrono>
    std::timed_mutex timed_mtx;
    void task(int id) {
        while (true) {
            if (timed_mtx.try_lock_for(std::chrono::milliseconds(100))) {
                std::cout << "Thread " << id << " acquired the lock." << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟任务
                timed_mtx.unlock();
                std::cout << "Thread " << id << " released the lock." << std::endl;
                break;
            } else {
                std::cout << "Thread " << id << " could not acquire the lock, retrying..." << std::endl;
            }
        }
    }
    int main() {
        std::thread t1(task, 1);
        std::thread t2(task, 2);
        t1.join();
        t2.join();
        return 0;
    }
    std::timed_mutex 允许线程在一定时间内尝试获取锁,如果在指定时间内未能获取到锁,则放弃获取锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <thread>
#include <mutex>
std::recursive_mutex recursive_mtx;
void recursive_function(int n) {
    if (n <= 0) return;
    std::lock_guard<std::recursive_mutex> lock(recursive_mtx);
    std::cout << "Lock acquired in recursive_function: " << n << std::endl;
    recursive_function(n - 1);
    std::cout << "Lock released in recursive_function: " << n << std::endl;
}
int main() {
    std::thread t1(recursive_function, 5);
    std::thread t2(recursive_function, 3);
    t1.join();
    t2.join();
    return 0;
}

std::recursive_mutex 允许同一个线程多次获取同一个互斥量而不会死锁。每次获取锁后,必须有相应次数的解锁操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <shared_mutex>
std::shared_mutex shared_mtx;
int shared_data = 0;
void reader(int id) {
    std::shared_lock<std::shared_mutex> lock(shared_mtx);
    std::cout << "Reader " << id << " reads shared_data: " << shared_data << std::endl;
}
void writer(int id) {
    std::unique_lock<std::shared_mutex> lock(shared_mtx);
    ++shared_data;
    std::cout << "Writer " << id << " increments shared_data to: " << shared_data << std::endl;
}
int main() {
    std::thread t1(reader, 1);
    std::thread t2(writer, 2);
    std::thread t3(reader, 3);
    t1.join();
    t2.join();
    t3.join();
    return 0;
}

std::shared_mutex 允许多个线程同时读取共享资源,但只有一个线程能写入。读操作不会阻塞其他读操作,但会阻塞写操作。写操作会阻塞所有其他操作。

互斥量包装器(基于 RAII 的思想)

这里要介绍的是unique_lock类模板,才有RAII的方式对锁进行封装,且用独占所有权的方式管理对mutex上锁和解锁。很方便,析构的时候自动解锁。且提供了很多灵活的函数。

  • 上锁/解锁操作:lock、try_lock、try_lock_for、try_lock_until 和 unlock;
  • 修改操作:支持移动赋值、交换(swap:与另一个 unique_lock 对象互换所管理的互斥量所有权)、释放(release:返回它所管理的互斥量对象的指针,并释放所有权)。
  • 获取属性:owns_lock (返回当前对象是否上了锁)、operator bool() (与 owns_lock() 的功能相同)、mutex(返回当前 unique_lock 所管理的互斥量的指针)。

信号量(二元信号量、计数信号量)

两种信号量:(计数信号量是C++20引入)

binary_semaphore:二元信号量类似于互斥量,信号量只有 0 与 1 。

counting_semaphore:计数信号量

所有关于信号量的定义参考头文件 #include ,计数信号量是一种轻量级同步原语,可以控制对共享资源的访问。与 std::mutex 不同的是,acounting_semaphore 至少允许 LeastMaxValue 并发访问者对同一资源进行多个并发访问。Acounting_semaphore 包含一个由构造函数初始化的内部计数器。该计数器可以通过 acquire() 获取资源访问权限,并通过调用 release() 来释放资源从而递增计数器。当计数器为零时,调用 acquire() 时就会阻塞直到计数器增加,但是调用 try_acquire( ) 不阻塞;try_acquire_for() 和 try_acquire_until() 阻塞直到计数器增加或达到超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <thread>
#include <semaphore>
#include <vector>
#include <chrono>
const int max_resources = 3;
std::counting_semaphore<max_resources> counting_sem(max_resources);
void resource_user(int id) {
    counting_sem.acquire(); // 获取一个资源
    std::cout << "Resource user " << id << " acquired a resource." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟资源使用
    std::cout << "Resource user " << id << " releasing a resource." << std::endl;
    counting_sem.release(); // 释放一个资源
}
int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(resource_user, i + 1);
    }
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}

barrier 和 latch

std::latch 是一个一次性同步原语,用于等待一组线程到达某个同步点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <thread>
#include <latch>
#include <vector>

const int num_threads = 5;
std::latch latch(num_threads);

void worker(int id) {
    std::cout << "Worker " << id << " is working...\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * id)); // 模拟工作
    std::cout << "Worker " << id << " has finished work and is waiting at the latch...\n";
    latch.count_down(); // 递减计数
    latch.wait(); // 等待所有线程到达同步点
    std::cout << "Worker " << id << " is proceeding after latch...\n";
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(worker, i + 1);
    }

    for (auto& t : threads) {
        t.join();
    }

    return 0;
}

和latch相比,barrier的不同之处在于是多次使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <thread>
#include <barrier>
#include <vector>
const int num_threads = 5;
std::barrier barrier(num_threads, [](){ std::cout << "All threads have reached the barrier, proceeding to next step...\n"; });
void worker(int id) {
    for (int phase = 1; phase <= 3; ++phase) {
        std::cout << "Worker " << id << " is working on phase " << phase << "...\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(100 * id)); // 模拟工作
        std::cout << "Worker " << id << " has finished phase " << phase << " and is waiting at the barrier...\n";
        barrier.arrive_and_wait(); // 到达屏障并等待
    }
}
int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(worker, i + 1);
    }
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}

call_once

C++ 11 以后支持 call_once。确保某个操作只被执行一次(成功执行才算),即使是多线程环境下也确保只执行一次,在多线程下初始化单例模式特别有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <thread>
#include <mutex>
std::once_flag flag1, flag2;
void simple_do_once()
{
    std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}
 
int main()
{
    std::thread st1(simple_do_once);
    std::thread st2(simple_do_once);
    std::thread st3(simple_do_once);
    std::thread st4(simple_do_once);
    st1.join();
    st2.join();
    st3.join();
    st4.join();
}

一些其他

join & detach

对于join和detach,是启动线程和脱离线程的方法,join会启用线程直到结束,主线程才会结束。detach就是线程和主线程已经没有多大的关系了。

async & future

std::async

std::async 用于启动一个异步任务并返回一个 std::future 对象,通过该对象可以获取任务的结果。std::async 可以选择不同的策略(立即执行或延迟执行)。

std::future

对象用于从异步任务中获取结果。std::future 提供了 get() 方法来阻塞并等待异步任务的完成,并返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <future>
#include <chrono>
int compute_value() {
    std::cout << "Computing value...\n";
    std::this_thread::sleep_for(std::chrono::seconds(3));
    return 42;
}
int main() {
    // 启动异步任务
    std::future<int> result = std::async(std::launch::async, compute_value);
    std::cout << "Main thread continues...\n";
    // 获取异步任务的结果
    int value = result.get(); // 阻塞直到结果准备好
    std::cout << "Computed value: " << value << std::endl;
    return 0;
}