参考文章:C++多线程开发基础入门教程C++学习笔记:并发与多线程
参考视频:C++编程进阶教程

基本概念

什么是线程、进程、并发

线程(Thread)是操作系统能够进行CPU调度的最小单位,它被包含在进程(program)之中,一个进程可包含单个或者多个线程。
一条线程指的是进程中单一的控制流,每条线程执行不同的任务。

进程是运行起来的可执行程序。
并发是指多个任务同时发生,一个程序同时执行多个独立任务。

进程与线程的关系

一个进程可以包含多个线程,这些线程共享相同的进程空间(代码段、数据段、堆)系统资源(文件描述符、信号处理),但各自有独立的栈空间和线程控制块,不共享内存。

🚀详细区分:

  1. 容器:每个进程都有相应的线程。进程是线程的容器
  2. 最小单位:进程是资源分配的最小单位,而线程是程序执行的最小单位。
  3. 地址空间:进程有自己独立的地址空间,而线程没有独立的地址空间,同一进程的线程共享本进程的地址空间
  4. 资源:进程之间的资源是独立的,而同一进程内的线程共享本进程的资源
  5. 并发执行:进程可以并发执行,同一进程内的多个线程也可以并发执行。

锅炉爷爷是一个进程,每条手可以被看成一个线程。

线程的特点

  1. 轻量级:与进程相比,线程的创建和销毁成本低。因为线程是进程的一个执行流,共享进程的大部分资源,只需要少量的额外开销来维护进程的状态和控制信息。
  2. 共享地址空间和资源:同一进程内的线程共享进程地址空间和全局变量等资源,这使得线程间通信更加便捷。但是这也带来了数据同步和互斥问题,需要适当的同步机制来避免数据竞争和死锁的问题。
  3. 并发执行:多个线程可以在同一时间内并发执行,提高了程序的执行效率。但是由于线程的执行顺序和速度受到操作系统调度策略和硬件性能的影响,因此线程的执行结果可能是不确定的。
  4. 独立调度:线程是独立调度的基本单位。在多线程操作系统中,调度器根据线程的优先级、状态等因素来决定线程的调度顺序和执行时间。

什么是多线程编程

一个程序中创建多个线程并发的执行,每个线程执行不同的任务。

为什么使用多线程/并发(优点)

  1. 充分利用 CPU 资源
  2. 提高程序响应速度
  3. 便于程序设计和维护

并发与并行的区别

  • 并发:同一时间段,多个任务交替执行。
  • 并行:同一时间段,多个任务同时执行。

线程的声明周期

  1. 新建状态 New
  2. 就绪状态 Runnable
  3. 运行状态 Running
  4. 阻塞状态 Blocked
  5. 死亡状态 Dead

Thread 创建线程

总体代码

需要包含头文件 <thread>。可通过回调函数、仿函数、Lambda表达式创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
//通过函数来创建线程:
void myprint()//初始函数 //回调函数
{
cout << "自己创建的线程开始执行" << endl;
cout << "自己创建的线程执行中1" << endl;
cout << "自己创建的线程执行中2" << endl;
cout << "自己创建的线程执行中3" << endl;
cout << "自己创建的线程执行中4" << endl;
cout << "自己创建的线程执行中5" << endl;
cout << "自己创建的线程执行完毕" << endl;
}

void test05_01() //通过函数来创建线程
{
thread myobj(myprint); //thread:创建线程的类;//创建线程,调用myprint,从myprint开始执行;
// 🚩创建该线程以后,线程就已经开始执行,并不是等到调用join()或者detach()时才开始执行

if (myobj.joinable())//判断能不能join或者detach,true表示可以join或者detach;
{
cout << "joinable() = true" << endl;
}
else
{
cout << "joinable() = false" << endl;
}

// 🚩当线程启动后,一定要在和线程相关的thread对象被销毁前,对线程调用join()或者detach()方法
myobj.join();//阻塞主线程,让主线程等待子线程执行完毕,然后子线程和主线程汇合;
//myobj.detach(); //一旦使用detach()后,与这个主线程关联的thread对象就失去了与这个主线程的关联,此时这个子线程就会在后台运行(守护线程);
//这个子线程就相当于被C++运行时库接管,这个子线程运行完后,由运行时库清理该线程相关资源;就不能再用join()接管回来了;

if (myobj.joinable())
{
cout << "joinable2() = true" << endl;
}
else
{
cout << "joinable2() = false" << endl; //join()之后也不能再join
}

cout << "主线程运行结束" << endl;
cout << "主线程运行结束2" << endl;
cout << "主线程运行结束3" << endl;
cout << "主线程运行结束4" << endl;
cout << "主线程运行结束5" << endl;
cout << "主线程运行结束6" << endl;
}

/**
* 通过类来创建线程,类作为可调用对象(仿函数):
*/
class TA
{
public:
TA(int i):m_i(i) //有参构造 //不能用引用int &i
{
cout << "TA有参构造函数被执行" << endl;
}

TA(const TA &ta) :m_i(ta.m_i) //拷贝构造
{
cout << "TA拷贝构造函数被执行" << endl;
}

~TA() //析构
{
cout << "TA析构函数被执行" << endl;
}

void operator()()//不能带参数 //operator()()就是线程的入口点
{
cout << "operator()线程开始执行" << endl;
cout << "m_i = " << m_i << endl;
cout << "m_i = " << m_i << endl;
cout << "m_i = " << m_i << endl;
cout << "m_i = " << m_i << endl;
cout << "m_i = " << m_i << endl;
cout << "m_i = " << m_i << endl;
cout << "operator()线程执行完毕" << endl;
}

//int &m_i;
int m_i;
};

void test05_02() //通过类来创建线程
{
int myi = 6;
TA ta(myi);
thread myjob(ta);//ta: 可调用对象 //ta是被复制到了线程中(拷贝构造),主线程结束后这个被复杂的对象依然存在;只要没有引用、指针就不会有问题;
myjob.join();//等待线程结束
//如果用detach(),因为有局部变量ta,且构造函数里是引用,主线程运行结束后会回收这块局部变量ta的内存,在子线程的构造函数里就获取不到这个地址了,会产生不可预料的后果;
cout << "主线程与子线程汇合" << endl;
}

//用Lambda表达式创建线程:(前提是和test05_03一样在局部作用域中才行,不能赋值给全局变量)
auto mylamthread = []() //线程入口
{
cout << "Lambda线程开始执行" << endl;
cout << "Lambda线程执行结束" << endl;
};

void test05_03()//用Lambda表达式创建线程
{
thread myjob(mylamthread);
myjob.join();

cout << "主线程与子线程汇合" << endl;
}

通过可调用对象创建线程

  1. 函数指针
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void FunctionPtrTask()
{
cout << "Executed" << endl;
}

void (*ptr)() = []() // 另一种
{
std::cout << "Executed" << endl;
};

int main()
{
thread t(FunctionPtrTask);
t.join();
return 0;
}
  1. 成员函数指针
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class TestClass
{
public:
void MemberFunctionTask()
{
cout << "Executed" << endl;
}
}

int main()
{
TestClass obj;
thread t(&TestClass::MemberFunctionTask, &obj);
t.join();
return 0;
}
  1. Lambda 表达式
1
2
3
4
5
6
7
8
9
10
int main()  
{
// Lambda表达式放在全局作用域中会导致类型问题,建议放在局部作用域中使用
thread t([]()
{
std::cout << "Executed" << endl;
});
t.join();
return 0;
}

或者:

1
2
3
4
5
6
7
8
9
10
int main()  
{
function<void(void)> Lambda = []()
{
std::cout << "Executed" << endl;
};
thread t(Lambda);
t.join();
return 0;
}

🤔例子:不在同一作用域,有警告
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function<void(void)> WrongLambda = []()  //警告
{
std::cout << "Executed" << endl;
}

void (*ptr)() = []() //函数指针同理
{
std::cout << "Executed" << endl;
};

int main()
{
thread t(WrongLambda);
t.join();
return 0;
}

  1. 仿函数 functor(也叫函数对象)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct Functor
{
void operator()() //不能带参数
{
cout << "Executed" << endl;
}
}

int main()
{
Functor functor;
thread t(functor);
t.join();
return 0;
}
  1. 绑定对象(std::bind 创建)
1
2
3
4
5
6
7
8
9
10
11
12
void FunctionWithArgs(int a, int b)
{
cout << "a = " << a << ",b = " << b << endl;
}

int main()
{
auto BoundFunc = std::bind(FunctionWithArgs, 1, 2);
thread t(BoundFunc);
t.join();
return 0;
}

线程参数传递

通过值传递

见前文,1-4 都没传递任何参数。如果要值传递参数且不使用 std::bind,参考以下代码:

1
2
3
4
5
6
7
8
9
10
int main()  
{
void (*ptr)(int) = [](int x)
{
std::cout << x << endl;
};
thread t(ptr, 45); //输出45
t.join();
return 0;
}

或者在使用了 std::bind 的情况下:

1
2
3
4
5
6
7
8
9
10
11
12
void FunctionWithArgs(int a, int b)  
{
cout << "a = " << a << ",b = " << b << endl;
}

int main()
{
auto BoundFunc = std::bind(FunctionWithArgs, 1, placeholders::_1);
thread t(BoundFunc, 2); //a = 1,b = 2
t.join();
return 0;
}

通过引用传递

这意味着原始数据和线程中使用的数据是同一个实体(而不是副本),这就需要小心了,如果你在一个线程中修改了数据,这些修改将在所有引用该数据的线程中可见。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void ModifyAndPrint(int& a, int& b)
{
a += 5;
b += 10;
cout << "Modified Values:" << a << "," << b << endl;
}

int main()
{
int x = 5;
int y = 10;
thread t(ModifyAndPrint, std::ref(x), std::ref(y));
t.join();
cout << "Values in main thread: " << x << ", " << y << endl;
return 0;
}

//Modified Values:10,20
//Values in main thread: 10, 20

这里使用了 std::ref 来包装 xy 的引用并进行参数传递。

🚀注意事项:

  • 线程安全性:引用传递数据时需要保证对数据的访问是线程安全的,否则可能遇到数据竞争等并发问题。
  • 资源管理:特别是在新线程使用了动态分配的内存时,确保这些资源在不再需要的时候被正确释放。
  • 异常处理:新线程中抛出的异常不会自动传播到创建该子线程的线程,所以需要确保在新线程中处理可能产生的所有异常(不要尝试在主线程中捕获子线程中可能产生的异常)。

join 与 detach 的区别

  • join:阻塞调用,等待线程完成,适用于需要同步和资源管理的场景。
  • detach:非阻塞调用,线程独立运行,适用于后台任务或不需要同步的场景。
  • 必须调用 joindetach:否则程序会崩溃。
  • 调用 joindetach 后,线程对象不再与实际线程相关联,线程对象可以安全销毁。
  • 一个线程对象只能调用一次 join / detach 方法。

✅表格:

特性joindetach
阻塞性阻塞调用,等待线程完成非阻塞调用,立即返回(从主线程分离,和主线程并发执行)
资源管理线程完成后,资源被回收线程独立运行,资源由操作系统管理
线程控制主线程需要等待线程完成主线程不需要等待线程完成
线程对象状态调用 join 后,线程对象变为无效调用 detach 后,线程对象变为无效
使用场景需要等待线程结果或同步时线程独立运行,不需要同步(如后台任务)

💻代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void SayHello()
{
int i = 0;
while(i < 1000)
{
cout << "Hello: " << i << endl;
i++;
}
}

int main()
{
thread t(SayHello);
t.join(); //阻塞主线程
//t.detach(); //从主线程中分离出去
cout << "This is main Thread." << endl;
return 0;
}

//使用t.join();
//Hello: 0
//Hello: 1
//......
//Hello: 998
//Hello: 999
//This is main Thread.

//使用t.detach();
//This is main Thread.Hello: //顺序是乱的,因为detach后的线程t和主线程是并发执行的

主线程包含的内容

  1. 线程 ID:系统中的唯一标识符,区分不同线程。
  2. 线程栈 (Thread Stack):每个线程
  3. 线程状态:新建 New,就绪 Ready,运行 Running,阻塞 Blocked,终止 Terminated
  4. 线程上下文(Thread Context)
  5. 线程函数(Thread Function)
  6. 线程优先级
  7. 线程属性
  8. 线程同步原语:互斥锁 Mutex、条件变量、信号量 Semaphore,帮助线程在访问共享资源时避免冲突和竞态条件。

this_thread

std::this_thread 是 C++11 引入的一个命名空间,提供了与当前线程相关的操作。它包含一些静态成员函数,用于获取当前线程的信息或控制当前线程的行为。

  1. 获取当前线程的 IDthis_thread::get_id()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void threadFunction() {
cout << "Thread ID: " << this_thread::get_id() << endl;
}

int main() {
thread t(threadFunction);
t.join();

cout << "Main Thread ID: " << std::this_thread::get_id() << endl;
return 0;
}

//Thread ID: 140234567890176
//Main Thread ID: 140234576282880
  1. 暂停当前线程一段时间this_thread::sleep_for()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;

void threadFunction() {
cout << "Thread starts" << endl;
// 参数为一个 std::chrono::duration 类型的对象
this_thread::sleep_for(std::chrono::seconds(2)); // 暂停2秒
cout << "Thread resumes after 2 seconds" << endl;
}

int main() {
thread t(threadFunction);
t.join();
return 0;
}
  1. 暂停当前线程到指定时间点std::this_thread::sleep_until()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;

void threadFunction() {
auto now = std::chrono::system_clock::now();
auto endTime = now + std::chrono::seconds(2); // 当前时间 + 2 秒

cout << "Thread starts" << endl;
// 参数为一个 std::chrono::time_point 类型的对象
this_thread::sleep_until(endTime); // 暂停到指定时间点
cout << "Thread resumes after 2 seconds" << endl;
}

int main() {
thread t(threadFunction);
t.join();
return 0;
}

//Thread starts
//Thread resumes after 2 seconds
  1. 主动让出 CPU 时间片std::this_thread::yield()
1
2
3
4
5
6
7
8
9
10
11
12
void threadFunction() {
for (int i = 0; i < 5; ++i) {
cout << "Thread is running" << endl;
this_thread::yield(); // 让出CPU时间片,允许其他线程运行。用于实现线程的协作式调度,避免长时间占用CPU
}
}

int main() {
thread t(threadFunction);
t.join();
return 0;
}

线程同步

概念

线程同步(Thread Synchronization)是指通过一定机制来控制多个线程之间的执行顺序,以确保它们能够正确地访问和修改共享资源,从而避免数据竞争和不一致性问题。
|861
✅C++提供了多种线程同步机制(重要):

  1. 互斥锁(Mutex):当一个线程想要访问共享资源时,它首先会尝试获取与该资源关联的互斥锁。如果锁已经被其他线程持有,则该线程被阻塞,直到锁被释放。这样可以确保任何时候只有一个线程能够访问共享资源。包含于头文件 <mutex> 中。
  2. 条件变量(Condition Value):使线程在满足某条件前等待,通常与互斥锁一起使用,以便在等待条件成立时释放锁,并在条件成立时创建锁。这允许线程在等待期间不占用锁,提高并发性能。包含于头文件 <condition_variable> 中。
  3. 信号量(Semaphore):允许多个线程同时访问共享资源,但限制同时访问的线程数量。信号量内部维护一个计数器,用于表示可用资源的数量。当线程需要访问资源时,它会尝试减少计数器的值;当线程释放资源时,它会增加计数器的值。当计数器的值小于零时,尝试获取资源的线程将被阻塞。(PV 操作)
  4. 原子操作(Atomic Operations):原子操作是不可中断的操作(执行过程中不会被其他线程打断),用于安全地更新共享数据,而无需使用互斥锁等同步机制。包含于 C++11 及以后的头文件 <atomic> 中。
  5. 读写锁:允许多个线程同时读取,但只允许一个线程写入
  6. 栅栏(Barrier):用于协调多个线程的执行,使得它们在某个同步点等待,直到所有线程都到达该点(C++20 新特性)。

🚩互斥锁 mutex

当一个线程想要访问共享资源时,它首先会尝试获取与该资源关联的互斥锁。如果锁已经被其他线程持有,则该线程被阻塞,直到锁被释放。这样可以确保任何时候只有一个线程能够访问共享资源。

💻代码示例:

未引入互斥锁时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//共享变量
int counter = 0;

void increment_counter(int times)
{
for (int i = 0; i < times; i++)
{
//数据竞争,多个线程可能同时执行这行代码
counter++;
}
}

int main()
{
thread t1(increment_counter, 100000);
thread t2(increment_counter, 100000);

t1.join();
t2.join();

cout << "最终结果:" << counter << endl; // 145593
return 0;
}

🚩引入互斥锁(需要头文件 <mutex>):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int counter = 0;  
mutex mtx;

void increment_counter(int times)
{
for (int i = 0; i < times; i++)
{
mtx.lock(); //访问临界变量前先加锁
counter++;
mtx.unlock(); //访问完成,解锁
}
}

int main()
{
thread t1(increment_counter, 100000);
thread t2(increment_counter, 100000);

t1.join();
t2.join();

cout << "最终结果:" << counter << endl; // 200000
return 0;
}

注意事项

  1. 死锁
    如果线程在持有互斥量的情况下调用了另一个阻塞操作(如另一个互斥量的 lock()),并且这个阻塞操作永远不会完成(因为其他线程持有它需要的资源),那么就会发生死锁。避免死锁的一种方法是始终按照相同的顺序锁定互斥量,或者使用更高级的同步原语,如 std::lock_guardstd::unique_lock,它们可以自动管理锁的获取和释放。
  2. 异常安全
    如果在锁定互斥量后抛出异常,那么必须确保互斥量被正确解锁。使用 std::lock_guardstd::unique_lock 可以自动处理这种情况,因为它们在析构时会释放锁。
  3. 不要手动解锁未锁定的互斥量
    调用 unlock 之前,必须确保互斥量已经被 lock 锁定。否则该行为是未定义的。
  4. 不要多次锁定同一个互斥量
    对于非递归互斥量(如 std::mutex),不要在同一线程中多次锁定它,这会导致未定义行为。如果需要递归锁定,使用 std::recursive_mutex
  5. 使用 RAII 管理锁
    使用 RAII(资源获取即初始化)原则来管理锁的生命周期,通过 std::lock_guard 或者 std::unique_lock 来确保锁在不需要时自动释放。
  6. 避免长时间持有锁
    尽量缩短持有锁的时间,以减少线程之间的争用,提高程序并发性能。
  7. 考虑使用更高级的同步原语
    除了 std::mutex,还可以使用条件变量 std::condition_variable、读写锁 std::shared_mutex 等等。

🚀使用 std::lock_guardstd::unique_lock(遵循 RAII 原则)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>  
#include <mutex>

std::mutex mtx;

void SafeFunction()
{
std::lock_guard<std::mutex> lock(mtx); //锁定互斥量
//这里执行需要互斥访问的代码。
//如果抛出异常,lock_guard会在析构时自动解锁 mtx try {
if (/*一些可能造成异常的代码*/)
{
throw std::runtime_error("An error occurred!");
}
} catch (const std::exception& e){
//处理异常,不需要担心解锁,lock_guard会处理
std::cerr << e.what() << std::endl;
}
//lock_guard会在析构/离开作用域时自动解锁 mtx
}

✅关于RAII

全称为 Resource Acquisition Is Initialization(资源获取即初始化)。它是 C++ 中管理资源(如内存、文件句柄、互斥锁等)的一种重要机制。RAII 的核心思想是将资源的获取和释放与对象的生命周期绑定,从而确保资源在对象创建时自动获取,在对象销毁时自动释放

优点:自动管理资源、异常安全、简化代码。

RAII 的实现依赖于构造和析构函数。💻示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <string>
using namespace std;

class FileHandler {
public:
FileHandler(const string& filename) {
cout << "Opening file: " << filename << endl;
// 模拟打开文件
// ...
}

~FileHandler() {
cout << "Closing file" << endl;
// 模拟关闭文件
// ...
}

void write(const string& data) {
cout << "Writing data: " << data << endl;
// 模拟写入文件
// ...
}
};

int main() {
FileHandler file("example.txt"); // 资源获取
file.write("Hello, RAII!"); // 使用资源
// 资源在 file 对象销毁时自动释放
return 0;
}

//Opening file: example.txt
//Writing data: Hello, RAII!
//Closing file
  • 构造函数:在 FileHandler 对象创建时,构造函数会自动打开文件(模拟)。
  • 析构函数:在 FileHandler 对象销毁时,析构函数会自动关闭文件(模拟)。
  • 资源管理:通过对象的生命周期管理文件的打开和关闭,避免了手动调用 close() 的复杂性。

🚩RAII 在标准库中应用:

  1. 动态内存管理:unique_ptrshared_ptr
  2. 互斥锁管理:lock_guardunique_lock
  3. 文件管理: ifstreamofstream

mutex 的四种类型

  1. std::mutex:最基本的互斥锁,适用于简单的资源保护。
  2. std::recursive_mutex:支持递归锁定,适用于递归函数或多次锁定场景。
  3. std::timed_mutex:支持超时锁定,适用于需要避免长时间阻塞的场景。
  4. std::recursive_timed_mutex:结合递归锁定和超时锁定功能,适用于复杂的同步需求。

💻代码示例:
recursive_mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;

std::recursive_mutex mtx; // 定义一个递归互斥锁

void recursiveFunction(int count) {
std::lock_guard<std::recursive_mutex> lock(mtx); //锁定互斥量
cout << "Count: " << count << endl;
if (count > 0) {
recursiveFunction(count - 1); // 递归调用
}
}

int main() {
std::thread t(recursiveFunction, 5);
t.join();
return 0;
}
//Count: 5
//Count: 4
//Count: 3
//Count: 2
//Count: 1
//Count: 0

//或者:
void recursiveFunction() {
mtx.lock(); //第一次锁定
std::cout << "Thread: " << std::this_thread::get_id() << " locked mutex." << std::endl;
mtx.lock(); //可以多次锁定
std::cout << "Thread: " << std::this_thread::get_id() << " locked mutex." << std::endl;

mtx.unlock();
std::cout << "Thread: " << std::this_thread::get_id() << " unlocked mutex." << std::endl;
mtx.unlock();
std::cout << "Thread: " << std::this_thread::get_id() << " unlocked mutex." << std::endl;
}

timed_mutex:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
using namespace std;

std::timed_mutex mtx; // 定义一个定时互斥锁

void threadFunction() {
if (mtx.try_lock_for(std::chrono::seconds(1))) { // 尝试锁定 1 秒,使用try_lock_for或者try_lock_until
cout << "Thread ID: " << std::this_thread::get_id() << " acquired the lock" << endl;
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟工作
mtx.unlock();
} else {
cout << "Thread ID: " << std::this_thread::get_id() << " failed to acquire the lock" << endl;
}
}

int main() {
std::thread t1(threadFunction);
std::thread t2(threadFunction);

t1.join();
t2.join();

return 0;
}

//Thread ID: 140234567890176 acquired the lock
//Thread ID: 140234576282880 failed to acquire the lock

void threadFunction() {
auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1); // 设置截止时间

if (mtx.try_lock_until(deadline)) { // 尝试在截止时间之前获取锁
cout << "Thread ID: " << std::this_thread::get_id() << " acquired the lock" << endl;
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟工作
mtx.unlock();
} else {
cout << "Thread ID: " << std::this_thread::get_id() << " failed to acquire the lock" << endl;
}
}

recursive_timed_mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
using namespace std;

std::recursive_timed_mutex mtx; // 定义一个递归定时互斥锁

void recursiveFunction(int count) {
if (mtx.try_lock_for(std::chrono::seconds(1))) { // 尝试锁定 1 秒
cout << "Count: " << count << endl;
if (count > 0) {
recursiveFunction(count - 1); // 递归调用
}
mtx.unlock();
} else {
cout << "Failed to acquire the lock at count: " << count << endl;
}
}

int main() {
std::thread t(recursiveFunction, 5);
t.join();
return 0;
}

//Count: 5
//Count: 4
//Count: 3
//Count: 2
//Count: 1
//Count: 0

(了解)关于 std::chrono
它提供了高精度的时间操作和计时功能,允许开发者以灵活的方式处理时间点和时间段。std::chrono 的核心概念包括 时间点(time_point)时间段(duration)时钟(clock)

常用操作有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//时间段duration的操作---------------------------------------
//创建时间段
std::chrono::seconds sec(10); // 10 秒
std::chrono::milliseconds ms(500); // 500 毫秒
std::chrono::microseconds us(1000); // 1000 微秒
//时间段转换
std::chrono::seconds sec = std::chrono::duration_cast<std::chrono::seconds>(ms);
//时间段运算
std::chrono::seconds sec1(10);
std::chrono::seconds sec2(20);
std::chrono::seconds sec3 = sec1 + sec2; // 30 秒

//时间点time_point的操作--------------------------------------
//获取当前时间点
auto now = std::chrono::system_clock::now();
//时间点转换为时间戳
std::time_t now_time_t = std::chrono::system_clock::to_time_t(now);
//时间点运算
auto now = std::chrono::system_clock::now();
auto future = now + std::chrono::seconds(10); // 当前时间 + 10 秒

//时钟clock的操作--------------------------------------------
//获取当前时间点
auto now = std::chrono::system_clock::now();
//获取稳定时间点
auto steady_now = std::chrono::steady_clock::now();

lock_guard

什么是 lock_guard?为什么需要 lock_guard

lock_guard 是一个模板类,位于 <mutex> 头文件中,符合 RAII 风格,用于管理 mutex 的生命周期,解决了手动管理 mutex 锁定和解锁时可能出现的问题(忘记解锁、异常情况下未解锁等)。
可以理解为,lock_guard 是对 mutex 的一种管理封装,它可以更好地管理 mutex。

💻代码示例

通过设定作用域,使得std::lock_guard在合适的地方被析构(在互斥量锁定到互斥量解锁之间的代码叫做临界区(需要互斥访问共享资源的那段代码称为临界区),临界区范围应该尽可能的小,即lock互斥量后应该尽早unlock),通过使用{}来调整作用域范围,可使得互斥量m在合适的地方被解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include<iostream>
#include<thread>
#include<mutex>
using namespace std;

mutex m;//实例化m对象,不要理解为定义变量

void proc1(int a)
{
lock_guard<mutex> g1(m);//用此语句替换了m.lock();lock_guard传入一个参数时,该参数为互斥量,此时调用了lock_guard的构造函数,申请锁定m
cout << "proc1函数正在改写a" << endl;
cout << "原始a为" << a << endl;
cout << "现在a为" << a + 2 << endl;
}//此时不需要写m.unlock(),g1出了作用域被释放,自动调用析构函数,于是m被解锁

void proc2(int a)
{
{
lock_guard<mutex> g2(m);
cout << "proc2函数正在改写a" << endl;
cout << "原始a为" << a << endl;
cout << "现在a为" << a + 1 << endl;
}//通过使用{}来调整作用域范围,可使得m在合适的地方被解锁
cout << "作用域外的内容3" << endl;
cout << "作用域外的内容4" << endl;
cout << "作用域外的内容5" << endl;
}

int main()
{
int a = 0;
thread t1(proc1, a);
thread t2(proc2, a);
t1.join();
t2.join();
return 0;
}

//proc1函数正在改写a
//原始a为0
//现在a为2
//proc2函数正在改写a
//原始a为0
//现在a为1
//作用域外的内容3
//作用域外的内容4
//作用域外的内容5

std::lock_guard 也可以传入两个参数,第一个参数被 adopt_lock 标识时,表示构造函数中不再进行互斥量锁定,因此此时需要提前手动锁定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include<iostream>
#include<thread>
#include<mutex>
using namespace std;

mutex m;//实例化m对象,不要理解为定义变量

void proc1(int a)
{
m.lock();//手动锁定
lock_guard<mutex> g1(m, std::adopt_lock); // adapt_lock是一个标记(tag),告诉 lock_guard 互斥锁 m 已经被锁定
cout << "proc1函数正在改写a" << endl;
cout << "原始a为" << a << endl;
cout << "现在a为" << a + 2 << endl;
}//自动解锁

void proc2(int a)
{
lock_guard<mutex> g2(m);//自动锁定
cout << "proc2函数正在改写a" << endl;
cout << "原始a为" << a << endl;
cout << "现在a为" << a + 1 << endl;
}//自动解锁

int main()
{
int a = 0;
thread t1(proc1, a);
thread t2(proc2, a);
t1.join();
t2.join();
return 0;
}

  • lock_guard 禁用了拷贝构造函数和拷贝复制运算符,所以它不支持拷贝语义,只能通过直接创建对象来使用。这样可以避免多个 lock_guard 对象同时管理同一个互斥锁而导致的错误行为。
  • lock_guard 仅用于管理 mutex 的锁定和解锁(单一职责),对于更复杂的锁定需求,使用 std::unique_lock

unique_lock

为什么需要 unique_lock

mutex 在管理方面有瑕疵,因此使用互斥量封装器 lock_guard 来智能管理 mutex,但其功能较弱,需要功能更加强大的 unique_lock(也叫”灵活锁”)。

lock_guard 的瑕疵在于?

下表给出二者区别:

特性lock_guardunique_lock
自动锁定
自动解锁
手动加锁
延迟锁定不支持支持
手动解锁不支持,只能通过作用域自动解锁支持
参数支持 adopt_lock支持 adopt_lock/try_to_lock/defer_lock
与条件变量结合不支持支持
递归锁不支持支持
灵活性
性能开销
适用场景简单的锁管理复杂的锁管理

💻代码示例

构造(多种锁定策略)

  • std::defer_lock:延迟锁定,互斥锁在构造时不锁定。需要时调用 lock 手动锁定,结束时自动解锁。
  • std::try_to_lock:尝试锁定,如果互斥锁已被占用,则立即返回。
  • std::adopt_lock:接管已经锁定的互斥锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
mutex mtx;
timed_mutex TimeMtx;
unique_lock<mutex> lock(mtx); //自动上锁
unique_lock<timed_mutex> TimeLock(TimeMtx, std::chrono::seconds(1));

unique_lock<mutex> lock1(mtx, std::defer_lock); //延迟锁定,不要自动锁定
unique_lock<mutex> lock2(mtx, std::try_to_lock);//尝试锁定
unique_lock<mutex> lock3(mtx, std::adopt_lock); //接收已经锁定的mutex

//所有权转移(unique_lock对象之间的转移)
unique_lock<mutex> lock4(move(lock)); //所有权转移,此时由lock来管理互斥量mtx

//判断是否锁成功
if (lock2.owns_lock())
{
cout << "锁成功" << endl;
}

🚩条件变量 condition_variable

什么是条件变量 std::condition_variable

std::condition_variable 是 C++11 引入的一个同步原语,用于实现线程间的通信和同步。它通常与 std::mutex 配合使用,允许一个线程等待另一个线程的通知,从而实现线程间的协作。

条件变量的使用场景?

  • 生产者-消费者模型:一个线程生产数据,另一个线程消费数据,通过条件变量实现同步。
  • 事件通知:一个线程等待某个事件的发生,另一个线程在事件发生时通知它。
  • 线程协作:多个线程需要协作完成任务,通过条件变量实现同步。

💻代码示例

  • 等待(wait):一个线程调用 wait() 方法,进入等待状态,直到另一个线程调用 notify_one()notify_all() 通知它。
    线程进入等待期间,会释放与之关联的互斥锁,允许其他线程访问共享数据。当线程被唤醒后,会重新获取互斥锁并继续执行。
  • 通知(notify):另一个线程调用 notify_one()notify_all() 方法,唤醒等待的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//两个线程交替执行
#include<iostream>
#include<thread>
#include<mutex>
#include<queue>
#include<condition_variable>
using namespace std;

mutex mtx;
condition_variable cv;
int MyValue = 0;
bool turn = false; //控制哪个线程该执行

void Increment(int thread_id)
{
for (int i = 0; i < 3; ++i) {
unique_lock<mutex> lock(mtx);
cv.wait(lock, [&] { return thread_id == 1 ? !turn : turn; });
++MyValue;
cout << "Thread " << thread_id << " incremented value to: " << MyValue << endl;
// 切换turn标志并通知另一个线程
turn = !turn;
cv.notify_all(); //这里只需要notify_one()即可
}
}

int main()
{
thread t1(Increment, 1);
thread t2(Increment, 2);
t1.join();
t2.join();
cout << "Final Value: " << MyValue << endl;
return 0;
}

//Thread 1 incremented value to: 1
//Thread 2 incremented value to: 2
//Thread 1 incremented value to: 3
//Thread 2 incremented value to: 4
//Thread 1 incremented value to: 5
//Thread 2 incremented value to: 6
//Final Value: 6
  1. wait(std::unique_lock<std::mutex>&, Predicate),其中 Predicate 指可调用对象。
  2. wait_for(std::unique_lock<std::mutex>&, std::chrono::seconds, Predicate):允许指定一个超时时间,这段时间内没有收到唤醒信号/条件不满足,则函数会返回,并且线程会重新获取互斥锁。
  3. notify_one():唤醒一个等待的线程,适用于只有一个线程需要被唤醒的场景。
    例如,生产者-消费者模型中,生产者生产一个数据后,只需要唤醒一个消费者。
  4. notify_all():唤醒所有等待的线程,适用于多个线程需要被唤醒的场景。
    例如,多个消费者等待同一个条件变量时,生产者生产数据后,需要唤醒所有消费者。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//生产者-消费者模型
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;

std::mutex mtx; // 定义一个互斥锁
std::condition_variable cv; // 定义一个条件变量
std::queue<int> SharedQueue; // 共享队列

void producer(bool notify_all) {
for (int i = 0; i < 50000; ++i) { //样本数较大时效果更加直观
std::unique_lock<std::mutex> lock(mtx); // 自动锁定
SharedQueue.push(i); // 生产数据
cout << "Produced: " << i << endl;
if (notify_all) {
cv.notify_all(); // 通知所有消费者
} else {
cv.notify_one(); // 通知一个消费者
}
}
}

void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx); // 自动锁定
cv.wait(lock, [] { return !SharedQueue.empty(); }); // 等待队列非空
int value = SharedQueue.front(); // 消费数据
SharedQueue.pop();
cout << "Consumer " << id << " consumed: " << value << endl;
if (value == 4) break; // 结束条件
}
}

int main() {
bool notify_all = true; // 选择 notify_all 或 notify_one
std::thread producerThread(producer, notify_all);
std::thread consumerThread1(consumer, 1);
std::thread consumerThread2(consumer, 2);

producerThread.join();
consumerThread1.join();
consumerThread2.join();

return 0;
}

选择 notify_one():等待队列中的第一个线程(和 id 等无关)执行函数

1
2
3
4
5
6
7
8
9
10
11
12
13
Produced: 0
Produced: 1
Produced: 2
...
Consumer 1 consumed: 0
Consumer 1 consumed: 1
Produced: 9
Produced: 10
Consumer 1 consumed: 3
Consumer 1 consumed: 4
Consumer 1 consumed: 5
...
Consumer 1 consumed: 49999

选择 notify_all():由于消费者线程之间的竞争,很可能出现仅有 consumer 1/2 参与的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
Produced: 0
Produced: 1
Produced: 2
...
Consumer 1 consumed: 0
Consumer 1 consumed: 1
Produced: 9
Produced: 10
Consumer 2 consumed: 3
Consumer 2 consumed: 4
Consumer 2 consumed: 5
...
Consumer 1 consumed: 49999

每次运行的输出结果不会完全一致,但大体逻辑不变。
|916

🚩读写锁 shared_mutex

基本概念

什么是读写锁?

读写锁(共享锁、独占锁)是一种同步机制,允许多个线程同时读取资源,但是同一时间只允许一个线程写入资源。

基本特征?

读读之间不互斥,写写/读写之间互斥。

特性描述
共享锁- 允许多个线程同时持有
- 多个线程可以并发地读取共享资源
- 不能与独占锁同时持有
独占锁- 只能有一个线程持有
- 持有独占锁的线程可以写入共享资源
- 当持有独占锁时,其他线程不能持有共享锁或独占锁
读写分离- 读操作和写操作分开处理,提高并发性能
- 当持有独占锁时,其他线程不能持有共享锁或独占锁
互斥性- 读写锁中的写操作与其他写操作、读操作互斥
- 读操作之间不互斥
公平性- 有些读写锁实现提供公平性机制,确保读线程和写线程不会被长期阻塞
性能- 在读多写少的情况下能显著提高性能
- 在写多的情况下,读写锁的性能优势可能不明显

代码实现

shared_mutex

std::shared_mutex 是 C++17 引入的一种互斥锁,支持多读单写的并发访问模式。
它允许多个线程同时持有共享锁(读锁),但在持有独占锁时,其他线程不能再持有任何类型的锁。
|566
虽然其存在排他性锁定的成员函数(lock/unlock 等),但是一般使用:
unique_lock / lock_guard 管理独占锁(排他性锁定),shared_mutex 管理共享锁定
unique_lockshared_mutex 都会自动上锁(构造函数实现了)。前者位于头文件 <mutex> 中,后者位于头文件 <shared_mutex> 中。

shared_lock

C++17 引入的一种锁管理器,用于管理 shared_mutex 的共享锁。shared_lock 同样可以自动获取和释放锁。
shared_lock 也称通用共享互斥所有权包装器,unique_lock 也称独占所有权包装器,二者配合使用对 shared_mutex 进行管理,实现读写锁。

成员函数作用
std::shared_lock(mutex_type& m)构造时获取共享锁
std::shared_lock(mutex_type& m, std::defer_lock)构造时不获取锁
std::shared_lock(mutex_type& m, std::try_to_lock)构造时尝试获取共享锁
std::shared_lock(mutex_type& m, std::adopt_lock)构造时认为调用者已经持有共享锁
lock()获取共享锁
try_lock()尝试获取共享锁
unlock()释放共享锁
owns_lock()返回结果:锁是否被持有
release()释放锁的所有权,但不解锁

💻代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <iostream>  
#include <vector>
#include <thread>
#include <shared_mutex>
#include <mutex>

std::shared_mutex rw_mutex; // 读写锁
int shared_data = 0; // 共享数据

// 读线程函数
void reader() {
std::shared_lock lock(rw_mutex); // 申请共享锁
std::cout << "Reader thread: " << std::this_thread::get_id() << " reads value: " << shared_data << std::endl;
}

// 写线程函数
void writer(int value) {
std::unique_lock lock(rw_mutex); // 申请独占锁
shared_data = value; // 修改共享数据
std::cout << "Writer thread: " << std::this_thread::get_id() << " writes value: " << shared_data << std::endl;
}

int main() {
std::vector<std::thread> threads; // 用于存储线程的向量

// 创建读线程
for (int i = 0; i < 5; ++i) {
threads.emplace_back(reader);
}
// 创建写线程
for (int i = 0; i < 2; ++i) {
threads.emplace_back(writer, i);
}
// 等待所有线程完成
for (std::thread& t : threads) {
t.join();
}
return 0;
}

输出结果:
1
2
3
4
5
6
7
Reader thread: Reader thread: 3 reads value: Reader thread: 5 reads value: 00
Reader thread: 4 reads value: 0

Reader thread: 6 reads value: 0
2 reads value: 0
Writer thread: 7 writes value: 0
Writer thread: 8 writes value: 1

读的输出比较混乱,是因为读与读操作之间并不互斥。

🚩原子变量和原子操作 atomic

基本概念

原子变量是什么?

原子变量是指使用 std::atomic 模板类定义的变量,确保在多线程环境中,对变量的读写操作是线程安全的,不会被其他线程中断或干扰。

1
2
3
4
//定义原子变量
#include <atomic>

std::atomic<int> atomicInt(10); //int型的原子变量

原子变量的特性

  • 原子性:对原子变量的读、写、修改操作是不可分割的,即操作要么完全完成,要么完全不完成
  • 无锁:原子操作不需要锁机制,因此不会引起上下文切换,具有更高的性能。(上下文切换:指操作系统从一个线程转换到另一线程的过程)
  • 易用性:标准库提供了原子操作接口,简化了多线程编程中的同步问题

原子操作是什么?

原子操作是指对原子变量进行的不可分割的操作。不可分割的意思是这些操作要么完全执行,要么完全不执行,不会在执行过程中被其他线程打断。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>  
#include <thread>
#include <atomic>
using namespace std;

atomic<int> num = 0;

void AddNum()
{
for (int i = 0; i < 1000000; ++i) {
num++;
//num += 1; //结果为2000000
//num = num + 1; //结果为1259262
}
}

int main() {
thread t1(AddNum);
thread t2(AddNum);
t1.join();
t2.join();
cout << num << endl; //2000000
return 0;
}

✅注意:原子变量可以进行原子操作,原子操作才是真正保证变量是线程安全的根本原因。即使使用原子变量,如果进行的不是原子操作,也无法保证线程安全。
如果使用 num = num + 1,会发现输出结果并不是预期的 2000000,这是因为该操作并不是原子操作(没有被 atomic 类重载)。

常见原子操作方法

  1. 加载和存储操作
    1
    2
    int value = atomicInt.load();	//原子加载
    atomicInt.store(10); //原子存储
  2. 读写操作(等价于 load 和 store)
    1
    2
    int value = atomicInt;	//原子读取
    atomicInt = 10; //原子写入
  3. 自增自减操作
    1
    2
    3
    4
    atomicInt++;
    atomicInt--;
    ++atomicInt;
    --atomicInt;
  4. 复合赋值操作
    1
    2
    atomicInt += 5;	//原子加
    atomicInt &= 3; //原子位与
  5. 高级操作
    compare_exchange:比较当前值和期望值,如果相等则将当前值替换为新值,否则更新期望值。
    fetch_add / fetch_sub / fetch_and:对当前值进行加法/减法/按位与操作,并返回操作前的值。
    exchange:交换当前值和新值,并返回旧值。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    int expected = 0;
    int desired = 1;
    if (atomicInt.compare_exchange_strong(expected, desired))
    {
    // 如果atomicInt == expected,则让atomicInt = desired并返回true
    // 否则expected = atomicInt,返回false
    }

    atomicInt.fetch_add(1, memory_order_relaxed);

    std::atomic<int> value(10);
    int old_value = value.exchange(20); // 交换值,返回旧值 10,现在value = 20
    以上原子操作仅针对原子变量生效。

    原子操作的内存序问题

    原子操作可以指定不同的内存顺序,以控制操作的可见性和排序。
    内存序不会影响原子操作的原子性,但会影响操作的可见性和顺序。
顺序特点
memory_order_relaxed没有同步或顺序约束,仅保证原子性
memory_order_acquire确保该操作之前的所有读操作不会被重排序到该操作之后
memory_order_release确保该操作之后的所有写操作不会被重排序到该操作之前
memory_order_acq_rel同时具备 acquire 和 release 的属性
memory_order_seq_cst顺序一致性,保证所有线程的操作按照顺序发生

🚩信号量 semaphore

基本概念

什么是信号量?

信号量(Semaphore)是一种用于管理和协调多线程或多进程访问共享资源的同步机制。它通过计数器来控制对资源的访问数量,确保多个线程或进程能够安全地使用共享资源而不会发生数据竞争或死锁。传统的锁(如互斥锁)可以用来保护共享资源,但对于某些场景(如资源的计数管理),信号量提供了更灵活和高效的解决方案。

信号量的类型?

  1. 计数信号量:允许对资源的多次访问,计数信号量的值可以是任意非负整数,表示可以同时访问资源的线程或进程的数量。
  2. 二元信号量:也称为互斥信号量 mutex,其值只能是 0 或 1,类似于互斥锁,用于实现对资源的互斥访问。

信号量的作用?

|684

信号量的基本操作?

  • P 操作(等待,wait):如果信号量的值大于 0,则将其减 1,否则线程会被阻塞,直到信号量的值大于 0。
  • V 操作(释放,signal):将信号量的值加 1,如果有线程被阻塞在 P 操作上,则唤醒一个阻塞的线程。
    PV 操作均为原子操作。类似加锁解锁的操作。
    |590

    counting_semaphore

    std::counting_semaphore 是 C++20 标准库中的一个类模板,实现了一个计数信号量。需要头文件 <semaphore>
    禁止拷贝构造,禁止拷贝赋值操作(不允许 a(b) 或者 a = b)。
成员函数作用
void acquire()尝试获取信号量,如果信号量的计数值为 0,则阻塞当前线程,直到计数值大于 0。
bool try_acquire()- 尝试获取信号量。如果信号量的计数值大于 0,则将其减 1 并返回 true
- 如果计数值为 0,则立即返回 false,不会阻塞。
bool try_acquire_for(const chrono::duration&)- 如果在指定时间内信号量的计数值大于 0,则将其减 1 并返回 true
- 如果超时仍未获取到信号量,则返回 false
bool try_acquire_until(const chrono::time_point&)- 如果在指定时间点之前信号量的计数值大于 0,则将其减 1 并返回 true
- 如果超时仍未获取到信号量,则返回 false
void release(std::ptrdiff_t update = 1)释放信号量,增加信号量的计数值。默认加 1。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <semaphore>

// 创建一个最大值为 5 的计数信号量
std::counting_semaphore<5> semaphore(3); // 初始值为 3
semaphore.acquire(); // 计数值减 1,变为 2
semaphore.release(); // 计数值增加 1,变为 3
semaphore.release(2); // 计数值增加 2,变为 5
// 创建一个动态最大值的计数信号量
std::counting_semaphore<> dynamic_semaphore(2); // 初始值为 2

if (semaphore.try_acquire_for(std::chrono::seconds(1))) {
// 成功获取信号量
} else {
// 超时未获取到信号量
}

💻代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>  
#include <thread>
#include <semaphore>
#include <vector>

using namespace std;

std::counting_semaphore<1> sem(1); //初始值为1的计数信号量; sem.max()为1

void worker(int id)
{
sem.acquire(); //P操作,等待信号量大于0并将其减1
cout << "worker " << id << " is working" << endl;
std::this_thread::sleep_for(std::chrono::seconds(1)); //模拟工作
cout << "worker " << id << " is done" << endl;
sem.release(); //V操作,将信号量加1
}

int main() {
cout << "max " << sem.max() << endl;
vector<thread> threads;
for (int i = 0; i < 5; ++i) {
threads.emplace_back(worker, i);
}
for (thread& t : threads) {
t.join();
}
return 0;
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
max 1
worker 0 is working
worker 0 is done
worker 3 is working
worker 3 is done
worker 4 is working
worker 4 is done
worker 1 is working
worker 1 is done
worker 2 is working
worker 2 is done

使用多个信号量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 信号量,用于线程同步,初始计数值为0,用于控制 work 线程的执行
std::counting_semaphore<1> ready(0);

// 信号量,用于线程同步,初始计数值为1,用于控制 prepare 线程的执行
std::counting_semaphore<1> done(1);

void prepare() {
done.acquire(); // 减少信号量 done 的计数值,确保 prepare 在 work 完成之前不再执行
std::cout << "Preparing...\n";
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟准备工作的耗时
std::cout << "Preparation done.\n";
ready.release(); // 增加信号量 ready 的计数值,通知 work 线程可以开始工作
}

void work() {
ready.acquire(); // 等待 ready 信号量,确保准备工作完成后再执行
std::cout << "Working...\n";
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟工作的耗时
std::cout << "Work done.\n";
done.release(); // 增加信号量 done 的计数值,通知 prepare 线程可以重新进入准备阶段
}

int main() {
std::thread t1(prepare); // 创建 prepare 线程,执行准备工作
std::thread t2(work); // 创建 work 线程,执行主要工作

t1.join(); // 等待 prepare 线程执行完成
t2.join(); // 等待 work 线程执行完成

std::cout << "All tasks completed.\n"; // 所有工作完成

return 0;
}

输出结果:
1
2
3
4
5
Preparing...
Preparation done.
Working...
Work done.
All tasks completed.

栅栏 barrier

基本概念

什么是栅栏?

栅栏是一种同步原语,用于协调多个线程的执行,使得它们能够在某个特定的点(即栅栏)等待。直到所有线程都达到这一个点,才能继续执行。
位于 C++20 的头文件 <barrier> 中。

栅栏的作用?

确保并发任务在某些关键时刻同步,比如等待所有线程完成某个阶段的工作,然后再进入下一阶段。
|807
用于阶段同步和批处理。

栅栏的特点?

  1. 同步点:栅栏用于创建一个同步点,确保多个线程在同一时刻同步。
  2. 计数器:栅栏内部维护一个计数器,记录到达栅栏的线程数量。当计数器达到预设值时,所有等待的线程被同时唤醒。
  3. 可重用性:C++20 中的 std::barrier 是可重用的,线程可以反复使用同一个栅栏对象进行同步。

💻代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <iostream>  
#include <vector>
#include <thread>
#include <barrier>

using namespace std;

void worker(int id, std::barrier<>& sync_point) //同步点需要为引用
{
cout << "Worker " << id << " is doing phase 1 work.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100 * id)); //模拟不同的工作时间

// 到达栅栏,等待其他线程
sync_point.arrive_and_wait();

cout << "Worker " << id << " has completed phase 1 and is doing phase 2 work.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100 * id)); //模拟不同的工作时间
}

int main() {
const int num_threads = 5;
std::barrier sync_point(num_threads); //创建barrier对象,记录同步点

vector<thread> threads;
for (int i = 1; i <= num_threads; ++i) {
threads.emplace_back(worker, i, std::ref(sync_point));
}

for (thread& t : threads){
t.join();
}
std::cout << "All tasks completed.\n"; // 所有工作完成
return 0;
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
Worker Worker 2 is doing phase 1 work.
1 is doing phase 1 work.
Worker 3 is doing phase 1 work.
Worker 4 is doing phase 1 work.
Worker 5 is doing phase 1 work.
Worker Worker 5 has completed phase 1 and is doing phase 2 work.
Worker 1 has completed phase 1 and is doing phase 2 work.
Worker 3 has completed phase 1 and is doing phase 2 work.
4 has completed phase 1 and is doing phase 2 work.
Worker 2 has completed phase 1 and is doing phase 2 work.
All tasks completed.

死锁

什么是死锁?

死锁是指两个或者多个进程互相等待对方释放资源,从而导致所有进程或线程都无法继续执行的现象。
|647

死锁的必要条件?

  1. 互斥:资源一次只能被一个资源占用。
  2. 请求和保持:线程已经持有至少一个资源,同时又申请新的资源,而新资源已经被其他线程占有。
  3. 不剥夺:已经获得的资源在未使用完之前,不能被强行剥夺,只能在使用完毕后自己释放。
  4. 循环等待:存在一个线程循环的链,链中的每个线程都持有下一个线程需要的资源。
    要想解决死锁这一问题,就需要避开上面四点,可以使用高级同步工具(如 unique_lock 等)。

💻代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//当线程1 持有 mtx1 并等待 mtx2 时,线程2 持有 mtx2 并等待 mtx1。
//两个线程互相等待对方释放锁,导致死锁
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx1; // 第一个互斥锁
std::mutex mtx2; // 第二个互斥锁

void thread1() {
std::lock_guard<std::mutex> lock1(mtx1); // 先锁定 mtx1
std::cout << "Thread 1 acquired mtx1\n";

// 模拟一些工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));

std::lock_guard<std::mutex> lock2(mtx2); // 再锁定 mtx2
std::cout << "Thread 1 acquired mtx2\n";
}

void thread2() {
std::lock_guard<std::mutex> lock2(mtx2); // 先锁定 mtx2
std::cout << "Thread 2 acquired mtx2\n";

// 模拟一些工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));

std::lock_guard<std::mutex> lock1(mtx1); // 再锁定 mtx1
std::cout << "Thread 2 acquired mtx1\n";
}

int main() {
std::thread t1(thread1);
std::thread t2(thread2);

t1.join();
t2.join();

std::cout << "All threads completed.\n";
return 0;
}

//Thread 1 acquired mtx1
//Thread 2 acquired mtx2
//导致死锁
  1. 同时锁定两个锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    void thread1() {
    lock(mtx1, mtx2); // 同时锁定 mtx1 和 mtx2
    std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
    std::cout << "Thread 1 acquired both locks\n";
    }

    void thread2() {
    std::lock(mtx1, mtx2); // 同时锁定 mtx1 和 mtx2
    std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
    std::cout << "Thread 2 acquired both locks\n";
    }
  2. 统一锁定顺序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    void thread1() {
    std::lock_guard<std::mutex> lock1(mtx1); // 先锁定 mtx1
    std::lock_guard<std::mutex> lock2(mtx2); // 再锁定 mtx2
    std::cout << "Thread 1 acquired both locks\n";
    }

    void thread2() {
    std::lock_guard<std::mutex> lock1(mtx1); // 先锁定 mtx1
    std::lock_guard<std::mutex> lock2(mtx2); // 再锁定 mtx2
    std::cout << "Thread 2 acquired both locks\n";
    }
  3. 使用 std::unique_lockstd::defer_lock

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    void thread1() {
    std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
    std::lock(lock1, lock2); // 同时锁定
    std::cout << "Thread 1 acquired both locks\n";
    }

    void thread2() {
    std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
    std::lock(lock1, lock2); // 同时锁定
    std::cout << "Thread 2 acquired both locks\n";
    }

异步编程

概念

什么是异步?

允许程序在等待某些操作完成时继续执行其他任务,而不是阻塞或等待这些操作完成。

异步编程的优点?为什么选择异步编程?

  • 提高性能:通过并发执行多个任务,异步编程可以更高效地利用 CPU 资源。
  • 提高响应速度:异步编程时程序在等待某些操作完成时继续响应用户输入,提高用户体验。
  • 简化 I/O 操作:异步编程非常适合处理 I/O 密集型操作,比如文件读取、网络请求等。

🚩async

基本概念

什么是 async

async 是一个函数模板,包含在头文件 <future> 中,用于启动一个异步任务。
它接受一个可调用对象作为参数,并在一个单独的线程上异步执行该对象。
std::async 自动管理异步任务的生命周期,并返回一个 std::future 对象,该对象可用于获取异步操作的结果。

什么是 future

std::future 是一个模板类,用于表示异步操作的结果。
std::future 对象需要借助 std::asyncstd::promisestd::packaged_task 结合使用。

💻代码实现

🚀future 的基本成员函数:

成员函数作用
get()获取异步任务的返回值。如果任务尚未完成,会阻塞直到任务完成。
wait()等待异步任务完成,但不获取返回值。
wait_for()等待异步任务完成,直到指定的时间。
如果任务在指定时间内完成,返回 std::future_status::ready
如果任务未完成,返回 std::future_status::timeout
wait_until()等待异步任务完成,直到指定的时间点。
如果任务在指定时间点前完成,返回 std::future_status::ready
如果任务未完成,返回 std::future_status::timeout
valid()检查 std::future 对象是否持有有效的异步任务。
如果对象有效,返回 true;否则返回 false
share()返回一个 std::shared_future 对象,允许多个线程共享异步任务的结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <future>
using namespace std;

int myfunc(int a, int b) {
cout << "This is myfunc" << " ";
return a + b;
}

int main() {
// async的返回对象必须要赋值使用,不能直接async(myfunc, 1, 2);
future<int> MyFunc = async(myfunc, 1, 2); /* 等价于 thread t(myfunc, 1, 2); t.join(); 所以会输出"This is my func" */
cout << "MyFunc == " << MyFunc.get() << " test" << endl; // MyFunc.get()的返回值为3
return 0;
}

//MyFunc == This is myfunc 3 test //输出顺序不固定,但test一定在3之后输出,3一定在两句以后输出

✅注意:

  • get() 等待异步操作完成后,再获取结果。如果异步操作尚未完成,调用 get() 的线程将被阻塞,直到操作完成。
  • 一旦调用 get(),它将返回异步操作的结果,并且 future 对象将变为无效状态,不能再调用 get()。一个 future 对象只能调用一次 get()
  • wait() 等待异步操作完成,但不获取结果。与 get() 不同之处在于,调用 wait() 后,future 对象仍然有效,可以继续使用 get() 来获取结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>  
#include <future>

using namespace std;

int myfunc(int a, int b) {
cout << "This is myfunc ";
return a + b;
}

int main() {
future<int> MyFunc = async(myfunc, 1, 2);
MyFunc.wait(); //等待异步操作完成,之后进行下一步操作
cout << "MyFunc == " << MyFunc.get() << " test" << endl;
return 0;
}

//This is myfunc MyFunc == 3 test //此时输出顺序固定

三种启动策略

启动策略是否创建新线程任务执行时机适用场景
std::launch::async立即执行,创建新线程需要并发执行的任务,任务可以立即开始执行。
std::launch::deferred延迟执行,调用 get()wait()不需要立即执行的任务,任务可以在需要时才执行。
默认由实现决定由实现决定不确定任务是否需要立即执行,由实现自动选择执行方式。
  1. launch::async

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    #include <iostream>  
    #include <thread> // this_thread需要头文件和std命名空间
    #include <future>

    using namespace std;

    int myfunc(int a, int b) {
    cout << "This is myfunc: " << this_thread::get_id() << endl;
    return a + b;
    }

    int main() {
    future<int> MyFunc = async(launch::async, myfunc, 1, 2);
    cout << "This is main: " << this_thread::get_id() << endl;
    cout << "This is from myfunc: " << MyFunc.get() << endl;
    return 0;
    }

    //This is main: This is myfunc: 21
    //This is from myfunc:
    //3

    这里由于策略原因,多句话立即同时执行了,async 创建的新线程和主线程同时进行。
    加上 wait() 即可观察正确的线程 id:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    int main() {  
    future<int> MyFunc = async(launch::async, myfunc, 1, 2);
    MyFunc.wait();
    cout << "This is main: " << this_thread::get_id() << endl;
    cout << "This is from myfunc: " << MyFunc.get() << endl;
    return 0;
    }

    //This is myfunc: 2
    //This is main: 1
    //This is from myfunc: 3
  2. launch::deferred
    需要注意的是,可能隐藏潜在的并发问题,因为任务可能在查询结果时才执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    int main() {  
    future<int> MyFunc = async(launch::deferred, myfunc, 1, 2);
    cout << "This is main: " << this_thread::get_id() << endl;
    cout << "This is from myfunc: " << MyFunc.get() << endl; // 调用get()方法时才执行myfunc()
    return 0;
    }

    //This is main: 1
    //This is from myfunc: This is myfunc: 1
    //3

    加入 wait 方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    int main() {  
    future<int> MyFunc = async(launch::deferred, myfunc, 1, 2);
    MyFunc.wait();
    cout << "This is main: " << this_thread::get_id() << endl;
    cout << "This is from myfunc: " << MyFunc.get() << endl;
    return 0;
    }

    //This is myfunc: 1
    //This is main: 1
    //This is from myfunc: 3

    可以看出,launch::deferred 策略并没有创建新的线程,并且在调用 get 方法时才执行 myfunc()

🚩promise

基本概念

promise 是什么?

std::promise 是一个用于设置异步操作结果的机制。它允许在一个线程中设置值或异常,然后在另一个线程中通过 std::future 对象检索这些值或异常。
通常与 std::asyncstd::packaged_taskstd::thread 结合使用。需要头文件 <future>

一个 promise 对象可以创建多个 future 对象。
|607

💻代码实现

设置值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void AsyncTask(promise<int> prom) {  
std::this_thread::sleep_for(std::chrono::seconds(1));
prom.set_value(3); // 设置异步操作的结果
}

int main() {
std::promise<int> prom;
std::future<int> FutureObj = prom.get_future(); //获取与promise关联的future。<int>指prom.set_value的参数类型

std::thread t(AsyncTask, std::move(prom)); //启动异步任务,并传递参数promise

cout << "The result is: " << FutureObj.get() << endl;

t.join(); //等待线程结束。必须加上
return 0;
}

//The result is: 3

promiseasync 更加灵活,拥有更多功能。

成员函数作用
get_future()获取与 std::promise 关联的 std::future 对象。
set_value()设置 std::promise 的值。
set_exception()设置 std::promise 的异常。
set_value_at_thread_exit()在线程退出时设置值。
set_exception_at_thread_exit()在线程退出时设置异常。

处理异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>  
#include <thread>
#include <future>
using namespace std;

void producer(std::promise<int>& promise) {
try {
throw std::runtime_error("An error occurred!");
} catch (const std::exception& e) {
promise.set_exception(std::current_exception()); // 设置异常
}
}

void consumer(std::future<int>& future) {
try {
int value = future.get(); // 获取值,此时会捕获到抛出的异常
std::cout << "Received value: " << value << std::endl;
} catch (const std::exception& e) {
std::cout << "Exception caught: " << e.what() << std::endl;
}
}

int main() {
std::promise<int> promise;
std::future<int> future = promise.get_future();

std::thread producer_thread(producer, std::ref(promise));
std::thread consumer_thread(consumer, std::ref(future));

producer_thread.join();
consumer_thread.join();

return 0;
}
//Exception caught: An error occurred!

注意事项

  1. 只能设置一次值或异常std::promise 只能设置一次值或异常。如果多次调用 set_value()set_exception(),会导致未定义行为。
  2. 线程安全std::promisestd::future 是线程安全的,但需要确保在适当的时机调用 set_value()get()
  3. 不能被直接复制std::promise 对象不能被复制,只能通过 std::move 转移控制权。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    std::promise<int> promise;
    std::promise<int> promise2 = std::move(promise);

    void MyFunc(promise<int> prom) {
    prom.set_exception(std::current_exception());
    }
    std::thread t(MyFunc, std::move(promise)); //值传递

    void MyFunc(promise<int>& prom) { //不可以为const,因为set_exception()
    prom.set_exception(std::current_exception());
    }
    std::thread t(MyFunc, std::ref(promise)); //引用传递

🚩packaged_task

基本概念

什么是 packaged_task

packaged_task 是一个模板类,用于封装可调用对象,并将任务的执行与结果的获取分离。同样需要头文件 <future>

💻代码实现

在另一个线程中执行任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int main() {  
std::packaged_task<int(int,int)> task([](int a, int b) { return a + b; });
std::future<int> result = task.get_future();

std::thread t(std::move(task), 1, 2); //packaged_task同样不可以复制
t.join();

cout << result.get() << endl;
return 0;
}

//另一种构造写法:
int Multiply(int a, int b) {
return a * b;
}

int main() {
std::packaged_task<decltype(Multiply)> task(Multiply);
//...
}

重置对象,获取新的 future 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main() {  
std::packaged_task<int(int,int)> task([](int a, int b) { return a + b; });
std::future<int> result = task.get_future();

task(1, 2); //执行任务
cout << result.get() << endl;

task.reset(); //重置packaged_task,因为其只能执行一次任务
result = task.get_future(); //必须获取新的future对象,因为每个future对象在调用get方法后会失效

task(3, 4);
cout << result.get() << endl;

return 0;
}
//3
//7

成员函数作用
get_future()获取与 std::packaged_task 关联的 std::future 对象。
operator()执行封装的可调用对象,并将结果存储在 std::future 中。
make_ready_at_thread_exit()在线程退出时执行任务,并将结果存储在 std::future 中。
reset()重置 std::packaged_task,允许重新执行任务。

注意事项

  1. std::packaged_task 使用 operator() 执行任务,并将结果存储在 std::future 中。
  2. 不可复制packaged_task 可移动,但不可复制。因此,构建 thread 中值传递 std::packaged_task 时需要使用 std::move(引用传递用 std::ref)。
  3. 只能执行一次任务std::packaged_task 只能执行一次任务。如果需要重新执行任务,应使用 reset() 重置 std::packaged_task
  4. 线程安全std::packaged_taskstd::future 是线程安全的,但需要确保在适当的时机调用 get_future()get()

三种异步工具的比较

特性asyncpromisepackaged_task
代码写法简单,自动管理线程和任务执行复杂,手动管理线程和任务执行复杂,手动管理线程和任务执行
成员函数自动设置值或异常get_future()set_value()set_exception()get_future()operator()reset()
应用场景简单异步任务复杂线程间通信,灵活性最高任务调度、线程池、封装可调用对象

共同点是,它们均为 C++11 新特性,包含在头文件 <future> 中,并且都需要搭配 future 对象实现功能。

之间的关系

以下内容摘自 Thread、Future、Promise、Packaged_task、Async之间有什么关系?
|852

  • packaged_taskpromise + function
  • asyncthread + packaged_task
  • 通过 promiseget_future() 可拿到 future 对象
  • 通过 future 对象的 share() 可拿到 shared_future 对象

promise 只能 set_value,不太好执行复杂的逻辑,有执行函数+阻塞的需求时,就可以考虑使用 packaged_task

shared_future
普通的 future 有个特点,它不能拷贝,只能移动,这就意味着只能有一个线程一个实例可以通过 get() 拿到对应的结果。
如果想要多个线程多个实例拿到结果,就可以使用 shared_future,那怎么拿到 shared_future,可以通过普通 futureshared() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <iostream>  
#include <future>
using namespace std;

int main() {
std::promise<int> prom;
future<int> fu = prom.get_future();
shared_future<int> shared_fu = fu.share();
future<void> f1 = std::async(std::launch::async, [shared_fu]() { std::cout << shared_fu.get() << std::endl; });
future<void> f2 = std::async(std::launch::async, [shared_fu]() { std::cout << shared_fu.get() << std::endl; });
prom.set_value(102);
f1.get();
f2.get();
return 0;
}
//102102 //这里的输出顺序无法确定(毕竟因为异步)


线程池

基本概念

什么是线程池?作用是什么?

线程池是指一种预先创建一组线程的机制,这些线程在应用程序启动时就已经创建好,等待执行任务。
每当有新的任务需要执行时,线程池会从线程集合中分配一个空闲线程来执行该任务,而不是每次都重新创建和销毁线程。(池化思想,如 epoll、SQL)

为什么使用线程池?

  1. 提高性能创建和销毁线程开销较大,线程池通过重用的线程,减少了这种开销,提高程序性能。
  2. 控制并发量:线程池允许限制并发线程的数量,防止系统因创建过多线程而出现资源耗尽的问题(如 CPU 过载、内存不足)。
  3. 简化线程管理:使用线程池可以避免手动管理线程的生命周期,减少代码复杂性。线程池通常还提供了任务排队和调度的功能,使得多线程编程更加容易。

线程池的使用场景?

  1. 服务器应用:比如 Web 服务器,处理每个客户端请求时不必为每个请求创建一个新线程,而是从线程池中取出线程来处理请求。
  2. 高性能计算
  3. 异步任务处理

一个线程池应该包含什么?

  1. 线程池管理器(ThreadPool Manager):负责创建并管理线程池,包括线程的创建、销毁、任务分配等。
  2. 工作线程(Worker Thread):线程池中的线程,负责执行具体的任务。这些线程通常是预先创建好的,并且在任务执行完毕后不会立即销毁,而是返回到线程池中等待下一个任务。
  3. 任务队列(Task Queue):用于存储待执行的任务。当有新的任务提交到线程池时,任务会被放入任务队列中,等待工作线程来执行。
  4. 任务(Task):需要执行的工作单元,通常是一个函数或方法。
    图例:
    |954

💻代码实现

来自 threadpool-lzpong

threadpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#pragma once
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <atomic>
#include <future>
//#include <condition_variable>
//#include <thread>
//#include <functional>
#include <stdexcept>

namespace std
{
//线程池最大容量,应尽量设小一点
#define THREADPOOL_MAX_NUM 16
//线程池是否可以自动增长(如果需要,且不超过 THREADPOOL_MAX_NUM)
//#define THREADPOOL_AUTO_GROW

//线程池,可以提交变参函数或Lambda\表达式的匿名函数执行,可以获取执行返回值
//不直接支持类成员函数, 支持类静态成员函数或全局函数,Operator()函数等
class threadpool
{
unsigned short _initSize; //初始化线程数量
using Task = function<void()>; //定义类型
vector<thread> _pool; //线程池
queue<Task> _tasks; //任务队列
mutex _lock; //任务队列同步锁
#ifdef THREADPOOL_AUTO_GROW
mutex _lockGrow; //线程池增长同步锁
#endif // !THREADPOOL_AUTO_GROW
condition_variable _task_cv; //条件阻塞
atomic<bool> _run{ true }; //线程池是否执行
atomic<int> _idlThrNum{ 0 }; //空闲线程数量

public:
inline threadpool(unsigned short size = 4) { _initSize = size; addThread(size); }
inline ~threadpool()
{
_run=false;
_task_cv.notify_all(); // 唤醒所有线程执行
for (thread& thread : _pool) {
//thread.detach(); // 让线程“自生自灭”
if (thread.joinable())
thread.join(); // 等待任务结束, 前提:线程一定会执行完
}
}

public:
// 提交一个任务
// 调用.get()获取返回值会等待任务执行完,获取返回值
// 有两种方法可以实现调用类成员,
// 一种是使用 bind: .commit(std::bind(&Dog::sayHello, &dog));
// 一种是用 mem_fn: .commit(std::mem_fn(&Dog::sayHello), this)
template<class F, class... Args>
auto commit(F&& f, Args&&... args) -> future<decltype(f(args...))>
{
if (!_run) // stoped ??
throw runtime_error("commit on ThreadPool is stopped.");

using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, 函数 f 的返回值类型
auto task = make_shared<packaged_task<RetType()>>(
bind(forward<F>(f), forward<Args>(args)...)
); // 把函数入口及参数,打包(绑定)
future<RetType> future = task->get_future();
{ // 添加任务到队列
lock_guard<mutex> lock{ _lock };//对当前块的语句加锁 lock_guard 是 mutex 的 stack 封装类,构造的时候 lock(),析构的时候 unlock()
_tasks.emplace([task]() { // push(Task{...}) 放到队列后面
(*task)();
});
}
#ifdef THREADPOOL_AUTO_GROW
if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM)
addThread(1);
#endif // !THREADPOOL_AUTO_GROW
_task_cv.notify_one(); // 唤醒一个线程执行

return future;
}
// 提交一个无参任务, 且无返回值
template <class F>
void commit2(F&& task)
{
if (!_run) return;
{
lock_guard<mutex> lock{ _lock };
_tasks.emplace(std::forward<F>(task));
}
#ifdef THREADPOOL_AUTO_GROW
if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM)
addThread(1);
#endif // !THREADPOOL_AUTO_GROW
_task_cv.notify_one();
}
//空闲线程数量
int idlCount() { return _idlThrNum; }
//线程数量
int thrCount() { return _pool.size(); }

#ifndef THREADPOOL_AUTO_GROW
private:
#endif // !THREADPOOL_AUTO_GROW
//添加指定数量的线程
void addThread(unsigned short size)
{
#ifdef THREADPOOL_AUTO_GROW
if (!_run) // stoped ??
throw runtime_error("Grow on ThreadPool is stopped.");
unique_lock<mutex> lockGrow{ _lockGrow }; //自动增长锁
#endif // !THREADPOOL_AUTO_GROW
for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
{ //增加线程数量,但不超过 预定义数量 THREADPOOL_MAX_NUM
_pool.emplace_back( [this]{ //工作线程函数
while (true) //防止 _run==false 时立即结束,此时任务队列可能不为空
{
Task task; // 获取一个待执行的 task
{
// unique_lock 相比 lock_guard 的好处是:可以随时 unlock() 和 lock()
unique_lock<mutex> lock{ _lock };
_task_cv.wait(lock, [this] { // wait 直到有 task, 或需要停止
return !_run || !_tasks.empty();
});
if (!_run && _tasks.empty())
return;
_idlThrNum--;
task = move(_tasks.front()); // 按先进先出从队列取一个 task
_tasks.pop();
}
task();//执行任务
#ifdef THREADPOOL_AUTO_GROW
if (_idlThrNum>0 && _pool.size() > _initSize) //支持自动释放空闲线程,避免峰值过后大量空闲线程
return;
#endif // !THREADPOOL_AUTO_GROW
{
unique_lock<mutex> lock{ _lock };
_idlThrNum++;
}
}
});
{
unique_lock<mutex> lock{ _lock };
_idlThrNum++;
}
}
}
};

}

#endif //https://github.com/lzpong/

main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include "threadpool.h"
#include <iostream>
#include <windows.h>

void fun1(int slp)
{
printf(" hello, fun1 ! %d\n" ,std::this_thread::get_id());
if (slp>0) {
printf(" ======= fun1 sleep %d ========= %d\n",slp, std::this_thread::get_id());
std::this_thread::sleep_for(std::chrono::milliseconds(slp));
//Sleep(slp );
}
}

struct gfun {
int operator()(int n) {
printf("%d hello, gfun ! %d\n" ,n, std::this_thread::get_id() );
return 42;
}
};

class A { //函数必须是 static 的才能使用线程池
public:
static int Afun(int n = 0) {
std::cout << n << " hello, Afun ! " << std::this_thread::get_id() << std::endl;
return n;
}

static std::string Bfun(int n, std::string str, char c) {
std::cout << n << " hello, Bfun ! "<< str.c_str() <<" " << (int)c <<" " << std::this_thread::get_id() << std::endl;
return str;
}
};

int main() {
try {
std::threadpool executor{ 50 };
A a;
std::future<void> ff = executor.commit(fun1,0);
std::future<int> fg = executor.commit(gfun{},0);
std::future<int> gg = executor.commit(a.Afun, 9999); //IDE提示错误,但可以编译运行
std::future<std::string> gh = executor.commit(A::Bfun, 9998,"mult args", 123);
std::future<std::string> fh = executor.commit([]()->std::string { std::cout << "hello, fh ! " << std::this_thread::get_id() << std::endl; return "hello,fh ret !"; });

std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::microseconds(900));

for (int i = 0; i < 50; i++) {
executor.commit(fun1,i*100 );
}
std::cout << " ======= commit all ========= " << std::this_thread::get_id()<< " idlsize="<<executor.idlCount() << std::endl;

std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));

ff.get(); //调用.get()获取返回值会等待线程执行完,获取返回值
std::cout << fg.get() << " " << fh.get().c_str()<< " " << std::this_thread::get_id() << std::endl;

std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));

std::cout << " ======= fun1,55 ========= " << std::this_thread::get_id() << std::endl;
executor.commit(fun1,55).get(); //调用.get()获取返回值会等待线程执行完

std::cout << "end... " << std::this_thread::get_id() << std::endl;


std::threadpool pool(4);
std::vector< std::future<int> > results;

for (int i = 0; i < 8; ++i) {
results.emplace_back(
pool.commit([i] {
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i*i;
})
);
}
std::cout << " ======= commit all2 ========= " << std::this_thread::get_id() << std::endl;

for (auto && result : results)
std::cout << result.get() << ' ';
std::cout << std::endl;
return 0;
}
catch (std::exception& e) {
std::cout << "some unhappy happened... " << std::this_thread::get_id() << e.what() << std::endl;
}
}

C++11 语言细节:

  1. using Task = function<void()> 是类型别名,简化了 typedef 的用法。function<void()> 可以认为是一个函数类型,接受任意原型是 void() 的函数,或是函数对象,或是匿名函数。void() 意思是不带参数,没有返回值。
  2. pool.emplace_back([this]{...})pool.push_back([this]{...}) 功能一样,只不过前者性能会更好。
  3. pool.emplace_back([this]{...}) 构造了一个线程对象,执行函数是 Lambda匿名函数。
  4. 所有对象的初始化方式均采用了 {},而不再使用 () 方式,因为风格不够一致且容易出错。
  5. 匿名函数: [this]{...} 不多说。[] 是捕捉器,this 是引用域外的变量 this指针,内部使用死循环, 由 cv_task.wait(lock,[this]{...}) 来阻塞线程。
  6. delctype(expr) 用来推断 expr 的类型,和 auto 是类似的,相当于类型占位符,占据一个类型的位置;auto f(A a, B b) -> decltype(a+b) 是一种用法,不能写作 decltype(a+b) f(A a, B b)
  7. commit 方法是不是略奇葩!可以带任意多的参数,第一个参数是 f,后面依次是函数 f 的参数(注意:参数要传struct/class的话,建议用pointer,小心变量的作用域)!可变参数模板是 c++11 的一大亮点。
  8. commit 直接使用智能调用 stdcall 函数,但有两种方法可以实现调用类成员,一种是使用 bind:commit(std::bind(&Dog::sayHello, &dog)); 一种是用 mem_fn: commit(std::mem_fn(&Dog::sayHello), &dog);
  9. make_shared() 用来构造 shared_ptr 智能指针。用法大体是 shared_ptr p = make_shared(4) 然后 *p == 4 。智能指针的好处就是自动 delete。
  10. bind 函数,接受函数 f 和部分参数,返回currying后的匿名函数,譬如 bind(add, 4) 可以实现类似 add(4) 的函数。
  11. forward() 函数,类似于 move() 函数,后者是将参数右值化,前者是不改变最初传入的类型的引用类型(左值还是左值,右值还是右值)。
  12. packaged_task 就是任务函数的封装类,通过 get_future() 获取 future ,然后通过 future 可以获取函数的返回值(future.get());packaged_task 本身可以像函数一样调用。
  13. queue 是队列类, front() 获取头部元素, pop() 移除头部元素;back() 获取尾部元素,push() 尾部添加元素。
  14. lock_guardmutex 的 stack 封装类,构造的时候 lock(),析构的时候 unlock(),是 C++ RAII 的 idea。
  15. condition_variable cv; 条件变量,需要配合 unique_lock 使用;unique_lock 相比 lock_guard 的好处是:可以随时 unlock()lock()cv.wait() 之前需要持有 mutex,wait() 本身会解锁,如果条件满足则会重新持有锁。
  16. 最后线程池析构的时候,join() 可以等待任务都执行完再结束,很安全。

简单写法

ThreadPool.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#pragma once  

#include <iostream>
#include <thread>
#include <future>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <vector>
#include <type_traits>

//线程池
class ThreadPool
{
public:
explicit ThreadPool(int ThreadNums); //加上explicit避免产生诸如ThreadPool pool = 1;这样的隐式转换
~ThreadPool();

//添加任务到队列
//F&&万能引用,如果F是左值,那得到的就是左值;如果F是右值,那得到的就是右值
//F为可调用对象类型,Arg为其对应的参数类型
template<typename F, typename ...Arg> //...Arg为可变参数模板,任意个数的参数
auto EnterQueues(F&& f, Arg&&... arg) -> std::future<typename std::invoke_result<F, Arg...>::type>;
//result_of是类型萃取工具,利用::type来推导出可调用对象F在传入参数Arg...时的返回类型。头文件<type_traits>
//C++17中result_of被启用,使用invoke_result

private:
void worker(); //线程的执行内容
bool IsStop; //标识,当前线程池是不是停止
std::condition_variable cv;
std::mutex mtx;
std::vector<std::thread> workers; //线程集合(线程池)
std::queue<std::function<void()>> MyQueue; //任务队列,任务封装成了void返回类型的函数
//如果用户放进来的函数有返回值,要想获得这个返回值,就需要利用packaged_task返回一个future对象,利用该对象获取返回值
};

//构造函数
ThreadPool::ThreadPool(int ThreadNums) : IsStop(false)
{
for (size_t i = 0; i < ThreadNums; ++i) {
workers.emplace_back([this]() {
this->worker(); //此时线程就开始运行了
});
}
}

//析构函数
ThreadPool::~ThreadPool()
{
//更改停止标识
{ //添加作用域,离开后自动释放。加锁保证线程安全地将IsStop置为true
std::unique_lock<std::mutex> lock(mtx);
IsStop = true;
}

//通知所有阻塞中的线程,让它们继续进行
cv.notify_all();

//确保线程执行完成
for (std::thread& OneThread : workers)
{
OneThread.join(); //都加入(阻塞)到主线程中(不是指执行),当所有子线程释放以后,主线程才能释放
}
}

//添加任务
template<typename F, typename ...Arg> //...Arg为可变参数模板,任意个数的参数
auto ThreadPool::EnterQueues(F&& f, Arg&&... arg) -> std::future<typename std::invoke_result<F, Arg...>::type>
{
//获得f执行后的类型
using functype = typename std::result_of<F(Arg...)>::type;

//获得一个智能指针,指向一个被包装为functype()的task
//之所以使用指针而不是使用局部变量,是为了延长生命周期
auto task = std::make_shared<std::packaged_task<functype()>>(
std::bind(std::forward<F>(f), std::forward<Arg>(arg)...) //绑定成function<void()>类型
);
//std::forward完美转发,发的是右值则传右值,发的是左值则传左值

//获得future
std::future<functype> rsFuture = task->get_future();

//将任务添加到队列
{
std::lock_guard<std::mutex> lockGuard(mtx);
if (IsStop)
{
throw std::runtime_error("出错:线程池已经停止了");
}

MyQueue.emplace([task]() {
(*task)();
});
}

//通知线程去执行任务
cv.notify_one();

return rsFuture;
}

//每个具体的工作任务
void ThreadPool::worker()
{
while (true)
{
//定义任务
std::function<void()> task;

//从队列中取得一个任务
{
std::unique_lock<std::mutex> uniqueLock(mtx);

cv.wait(uniqueLock, [this]() {
return !this->MyQueue.empty() || this->IsStop;
});

if (IsStop && MyQueue.empty())
{
return;
}
task = std::move(MyQueue.front());
MyQueue.pop();
}

//执行任务
task();
}
}

main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <iostream>  
#include "ThreadPool.hpp"

int main() {
system("chcp 65001");
ThreadPool MyPool(4);
for (int i = 0; i < 20; ++i) { //即使是i < 20,仍然只有4个线程运行
std::future<int> MyFuture = MyPool.EnterQueues([](int a, int b) -> int {
std::cout << "当前线程: " << std::this_thread::get_id() << std::endl;
return a + b;
}, 10 * i, 10 * i);

std::cout << "thread rs: " << MyFuture.get() << std::endl;
}
return 0;
}

✅注:关于 hpp 文件
HPP(Header Plus Plus)是C++中的一种特殊头文件格式,它将类的声明和实现代码放在同一个文件中。与传统的 .h.cpp 文件不同,HPP文件允许在头文件中直接包含实现代码,从而减少了代码文件的数量和编译次数。这种文件格式特别适合用于编写模板类和开源库,因为它简化了代码的管理和使用。

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
thread rs: 当前线程: 2
0
thread rs: 当前线程: 3
20
thread rs: 当前线程: 4
40
thread rs: 当前线程: 5
60
thread rs: 当前线程: 2
80
thread rs: 当前线程: 3
100
thread rs: 当前线程: 4
120
thread rs: 当前线程: 5
140
thread rs: 当前线程: 2
160
thread rs: 当前线程: 3
180
thread rs: 当前线程: 4
200
thread rs: 当前线程: 5
220
thread rs: 当前线程: 2
240
thread rs: 当前线程: 3
260
thread rs: 当前线程: 4
280
thread rs: 当前线程: 5
300
thread rs: 当前线程: 2
320
thread rs: 当前线程: 3
340
thread rs: 当前线程: 4
360
thread rs: 当前线程: 5
380