参考文章:C++多线程开发基础入门教程 ,C++学习笔记:并发与多线程 参考视频:C++编程进阶教程
基本概念 什么是线程、进程、并发 线程 (Thread)是操作系统能够进行CPU调度的最小单位 ,它被包含在进程(program)之中 ,一个进程可包含单个或者多个线程。 一条线程指的是进程中单一的控制流,每条线程执行不同的任务。
进程 是运行起来的可执行程序。并发 是指多个任务同时发生,一个程序同时执行多个独立任务。
进程与线程的关系 一个进程可以包含多个线程,这些线程共享 相同的进程空间(代码段、数据段、堆) 和系统资源(文件描述符、信号处理) ,但各自有独立的栈空间和线程控制块 ,不共享内存。
🚀详细区分:
容器 :每个进程都有相应的线程。进程是线程的容器 。最小单位 :进程是资源分配 的最小单位,而线程是程序执行 的最小单位。地址空间 :进程有自己独立的地址空间 ,而线程没有独立的地址空间,同一进程 的线程共享本进程的地址空间 。资源 :进程之间的资源是独立的 ,而同一进程内的线程共享本进程的资源 。并发执行 :进程可以并发执行,同一进程 内的多个线程也可以并发执行。锅炉爷爷是一个进程,每条手可以被看成一个线程。
线程的特点 轻量级 :与进程相比,线程的创建和销毁成本低。因为线程是进程的一个执行流,共享进程的大部分资源,只需要少量的额外开销来维护进程的状态和控制信息。共享地址空间和资源 :同一进程内的线程共享进程地址空间和全局变量等资源,这使得线程间通信更加便捷。但是这也带来了数据同步和互斥问题,需要适当的同步机制来避免数据竞争和死锁的问题。并发执行 :多个线程可以在同一时间内并发执行,提高了程序的执行效率。但是由于线程的执行顺序和速度受到操作系统调度策略和硬件性能的影响,因此线程的执行结果可能是不确定的。独立调度 :线程是独立调度的基本单位。在多线程操作系统中,调度器根据线程的优先级、状态等因素来决定线程的调度顺序和执行时间。什么是多线程编程 一个程序中创建多个线程并发 的执行,每个线程执行不同的任务。
为什么使用多线程/并发(优点) 充分利用 CPU 资源 提高程序响应速度 便于程序设计和维护 并发与并行的区别 并发:同一时间段,多个任务交替 执行。 并行:同一时间段,多个任务同时 执行。 线程的声明周期 新建状态 New 就绪状态 Runnable 运行状态 Running 阻塞状态 Blocked 死亡状态 Dead Thread 创建线程 总体代码 需要包含头文件 <thread>
。可通过回调函数、仿函数、Lambda表达式创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 void myprint () { cout << "自己创建的线程开始执行" << endl; cout << "自己创建的线程执行中1" << endl; cout << "自己创建的线程执行中2" << endl; cout << "自己创建的线程执行中3" << endl; cout << "自己创建的线程执行中4" << endl; cout << "自己创建的线程执行中5" << endl; cout << "自己创建的线程执行完毕" << endl; } void test05_01 () { thread myobj (myprint) ; if (myobj.joinable ()) { cout << "joinable() = true" << endl; } else { cout << "joinable() = false" << endl; } myobj.join (); if (myobj.joinable ()) { cout << "joinable2() = true" << endl; } else { cout << "joinable2() = false" << endl; } cout << "主线程运行结束" << endl; cout << "主线程运行结束2" << endl; cout << "主线程运行结束3" << endl; cout << "主线程运行结束4" << endl; cout << "主线程运行结束5" << endl; cout << "主线程运行结束6" << endl; } class TA { public : TA (int i):m_i (i) { cout << "TA有参构造函数被执行" << endl; } TA (const TA &ta) :m_i (ta.m_i) { cout << "TA拷贝构造函数被执行" << endl; } ~TA () { cout << "TA析构函数被执行" << endl; } void operator () () { cout << "operator()线程开始执行" << endl; cout << "m_i = " << m_i << endl; cout << "m_i = " << m_i << endl; cout << "m_i = " << m_i << endl; cout << "m_i = " << m_i << endl; cout << "m_i = " << m_i << endl; cout << "m_i = " << m_i << endl; cout << "operator()线程执行完毕" << endl; } int m_i; }; void test05_02 () { int myi = 6 ; TA ta (myi) ; thread myjob (ta) ; myjob.join (); cout << "主线程与子线程汇合" << endl; } auto mylamthread = []() { cout << "Lambda线程开始执行" << endl; cout << "Lambda线程执行结束" << endl; }; void test05_03 () { thread myjob (mylamthread) ; myjob.join (); cout << "主线程与子线程汇合" << endl; }
通过可调用对象创建线程 函数指针 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void FunctionPtrTask () { cout << "Executed" << endl; } void (*ptr)() = []() { std::cout << "Executed" << endl; }; int main () { thread t (FunctionPtrTask) ; t.join (); return 0 ; }
成员函数指针 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class TestClass { public : void MemberFunctionTask () { cout << "Executed" << endl; } } int main () { TestClass obj; thread t (&TestClass::MemberFunctionTask, &obj) ; t.join (); return 0 ; }
Lambda 表达式 1 2 3 4 5 6 7 8 9 10 int main () { thread t ([]() { std::cout << "Executed" << endl; }) ; t.join (); return 0 ; }
或者:
1 2 3 4 5 6 7 8 9 10 int main () { function<void (void )> Lambda = []() { std::cout << "Executed" << endl; }; thread t (Lambda) ; t.join (); return 0 ; }
🤔例子:不在同一作用域,有警告1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 function<void (void )> WrongLambda = []() { std::cout << "Executed" << endl; } void (*ptr)() = []() { std::cout << "Executed" << endl; }; int main () { thread t (WrongLambda) ; t.join (); return 0 ; }
仿函数 functor(也叫函数对象) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 struct Functor { void operator () () { cout << "Executed" << endl; } } int main () { Functor functor; thread t (functor) ; t.join (); return 0 ; }
绑定对象(std::bind
创建) 1 2 3 4 5 6 7 8 9 10 11 12 void FunctionWithArgs (int a, int b) { cout << "a = " << a << ",b = " << b << endl; } int main () { auto BoundFunc = std::bind (FunctionWithArgs, 1 , 2 ); thread t (BoundFunc) ; t.join (); return 0 ; }
线程参数传递 通过值传递 见前文,1-4 都没传递任何参数。如果要值传递参数且不使用 std::bind
,参考以下代码:
1 2 3 4 5 6 7 8 9 10 int main () { void (*ptr)(int ) = [](int x) { std::cout << x << endl; }; thread t (ptr, 45 ) ; t.join (); return 0 ; }
或者在使用了 std::bind
的情况下:
1 2 3 4 5 6 7 8 9 10 11 12 void FunctionWithArgs (int a, int b) { cout << "a = " << a << ",b = " << b << endl; } int main () { auto BoundFunc = std::bind (FunctionWithArgs, 1 , placeholders::_1); thread t (BoundFunc, 2 ) ; t.join (); return 0 ; }
通过引用传递 这意味着原始数据和线程中使用的数据是同一个实体(而不是副本),这就需要小心了,如果你在一个线程中修改了数据,这些修改将在所有引用该数据的线程中可见。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void ModifyAndPrint (int & a, int & b) { a += 5 ; b += 10 ; cout << "Modified Values:" << a << "," << b << endl; } int main () { int x = 5 ; int y = 10 ; thread t (ModifyAndPrint, std::ref(x), std::ref(y)) ; t.join (); cout << "Values in main thread: " << x << ", " << y << endl; return 0 ; }
这里使用了 std::ref
来包装 x
和 y
的引用并进行参数传递。
🚀注意事项:
线程安全性:引用传递数据时需要保证对数据的访问是线程安全的,否则可能遇到数据竞争等并发问题。 资源管理:特别是在新线程使用了动态分配的内存时,确保这些资源在不再需要的时候被正确释放。 异常处理:新线程中抛出的异常不会自动传播到创建该子线程的线程,所以需要确保在新线程中处理可能产生的所有异常(不要尝试在主线程中捕获子线程中可能产生的异常)。 join 与 detach 的区别 join
:阻塞调用,等待线程完成,适用于需要同步和资源管理的场景。detach
:非阻塞调用,线程独立运行,适用于后台任务或不需要同步的场景。必须调用 join
或 detach
:否则程序会崩溃。调用 join
或 detach
后,线程对象不再与实际线程相关联,线程对象可以安全销毁。 一个线程对象只能调用一次 join
/ detach
方法。 ✅表格:
特性 join detach 阻塞性 阻塞调用,等待线程完成 非阻塞调用,立即返回(从主线程分离,和主线程并发执行) 资源管理 线程完成后,资源被回收 线程独立运行,资源由操作系统管理 线程控制 主线程需要等待线程完成 主线程不需要等待线程完成 线程对象状态 调用 join
后,线程对象变为无效 调用 detach
后,线程对象变为无效 使用场景 需要等待线程结果或同步时 线程独立运行,不需要同步(如后台任务)
💻代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void SayHello () { int i = 0 ; while (i < 1000 ) { cout << "Hello: " << i << endl; i++; } } int main () { thread t (SayHello) ; t.join (); cout << "This is main Thread." << endl; return 0 ; }
主线程包含的内容 线程 ID:系统中的唯一标识符,区分不同线程。 线程栈 (Thread Stack):每个线程 线程状态:新建 New,就绪 Ready,运行 Running,阻塞 Blocked,终止 Terminated 线程上下文(Thread Context) 线程函数(Thread Function) 线程优先级 线程属性 线程同步原语:互斥锁 Mutex、条件变量、信号量 Semaphore,帮助线程在访问共享资源时避免冲突和竞态条件。 this_thread std::this_thread
是 C++11 引入的一个命名空间 ,提供了与当前线程相关的操作。它包含一些静态成员函数,用于获取当前线程的信息或控制当前线程的行为。
获取当前线程的 ID :this_thread::get_id()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void threadFunction () { cout << "Thread ID: " << this_thread::get_id () << endl; } int main () { thread t (threadFunction) ; t.join (); cout << "Main Thread ID: " << std::this_thread::get_id () << endl; return 0 ; }
暂停当前线程一段时间 :this_thread::sleep_for()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <iostream> #include <thread> #include <chrono> using namespace std;void threadFunction () { cout << "Thread starts" << endl; this_thread::sleep_for (std::chrono::seconds (2 )); cout << "Thread resumes after 2 seconds" << endl; } int main () { thread t (threadFunction) ; t.join (); return 0 ; }
暂停当前线程到指定时间点 :std::this_thread::sleep_until()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include <iostream> #include <thread> #include <chrono> using namespace std;void threadFunction () { auto now = std::chrono::system_clock::now (); auto endTime = now + std::chrono::seconds (2 ); cout << "Thread starts" << endl; this_thread::sleep_until (endTime); cout << "Thread resumes after 2 seconds" << endl; } int main () { thread t (threadFunction) ; t.join (); return 0 ; }
主动让出 CPU 时间片 :std::this_thread::yield()
1 2 3 4 5 6 7 8 9 10 11 12 void threadFunction () { for (int i = 0 ; i < 5 ; ++i) { cout << "Thread is running" << endl; this_thread::yield (); } } int main () { thread t (threadFunction) ; t.join (); return 0 ; }
线程同步 概念 线程同步(Thread Synchronization)是指通过一定机制来控制多个线程之间的执行顺序,以确保它们能够正确地访问和修改共享资源 ,从而避免数据竞争和不一致性问题。 ✅C++提供了多种线程同步机制(重要):
互斥锁(Mutex) :当一个线程想要访问共享资源时,它首先会尝试获取与该资源关联的互斥锁 。如果锁已经被其他线程持有,则该线程被阻塞,直到锁被释放。这样可以确保任何时候只有一个线程能够访问共享资源。包含于头文件 <mutex>
中。条件变量(Condition Value) :使线程在满足某条件前等待,通常与互斥锁一起使用,以便在等待条件成立时释放锁,并在条件成立时创建锁 。这允许线程在等待期间不占用锁,提高并发性能。包含于头文件 <condition_variable>
中。信号量(Semaphore) :允许多个线程同时访问共享资源,但限制同时访问的线程数量 。信号量内部维护一个计数器,用于表示可用资源的数量。当线程需要访问资源时,它会尝试减少计数器的值;当线程释放资源时,它会增加计数器的值。当计数器的值小于零时,尝试获取资源的线程将被阻塞。(PV 操作)原子操作(Atomic Operations) :原子操作是不可中断的操作(执行过程中不会被其他线程打断 ),用于安全地更新共享数据,而无需使用互斥锁等同步机制 。包含于 C++11 及以后的头文件 <atomic>
中。读写锁 :允许多个线程同时读取 ,但只允许一个线程写入 。栅栏(Barrier) :用于协调多个线程的执行,使得它们在某个同步点等待 ,直到所有线程都到达该点(C++20 新特性)。🚩互斥锁 mutex 当一个线程想要访问共享资源时,它首先会尝试获取与该资源关联的互斥锁 。如果锁已经被其他线程持有,则该线程被阻塞,直到锁被释放。这样可以确保任何时候只有一个线程能够访问共享资源。
💻代码示例:
未引入互斥锁时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 int counter = 0 ;void increment_counter (int times) { for (int i = 0 ; i < times; i++) { counter++; } } int main () { thread t1 (increment_counter, 100000 ) ; thread t2 (increment_counter, 100000 ) ; t1.join (); t2.join (); cout << "最终结果:" << counter << endl; return 0 ; }
🚩引入互斥锁(需要头文件 <mutex>
):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 int counter = 0 ; mutex mtx; void increment_counter (int times) { for (int i = 0 ; i < times; i++) { mtx.lock (); counter++; mtx.unlock (); } } int main () { thread t1 (increment_counter, 100000 ) ; thread t2 (increment_counter, 100000 ) ; t1.join (); t2.join (); cout << "最终结果:" << counter << endl; return 0 ; }
注意事项 死锁 : 如果线程在持有互斥量的情况下调用了另一个阻塞操作(如另一个互斥量的 lock()
),并且这个阻塞操作永远不会完成(因为其他线程持有它需要的资源),那么就会发生死锁。避免死锁的一种方法是始终按照相同的顺序锁定互斥量 ,或者使用更高级的同步原语 ,如 std::lock_guard
或 std::unique_lock
,它们可以自动管理锁的获取和释放。异常安全 : 如果在锁定互斥量后抛出异常,那么必须确保互斥量被正确解锁。使用 std::lock_guard
或 std::unique_lock
可以自动处理这种情况,因为它们在析构时会释放锁。不要手动解锁未锁定的互斥量 : 调用 unlock
之前,必须确保互斥量已经被 lock
锁定。否则该行为是未定义的。不要多次锁定同一个互斥量 : 对于非递归互斥量(如 std::mutex
),不要在同一线程中多次锁定它,这会导致未定义行为。如果需要递归锁定,使用 std::recursive_mutex
。使用 RAII 管理锁 : 使用 RAII(资源获取即初始化)原则来管理锁的生命周期,通过 std::lock_guard
或者 std::unique_lock
来确保锁在不需要时自动释放。避免长时间持有锁 : 尽量缩短持有锁的时间,以减少线程之间的争用 ,提高程序并发性能。考虑使用更高级的同步原语 : 除了 std::mutex
,还可以使用条件变量 std::condition_variable
、读写锁 std::shared_mutex
等等。🚀使用 std::lock_guard
或 std::unique_lock
(遵循 RAII 原则)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #include <iostream> #include <mutex> std::mutex mtx; void SafeFunction () { std::lock_guard<std::mutex> lock (mtx) ; if () { throw std::runtime_error ("An error occurred!" ); } } catch (const std::exception& e){ std::cerr << e.what () << std::endl; } }
✅关于RAII 全称为 Resource Acquisition Is Initialization (资源获取即初始化)。它是 C++ 中管理资源(如内存、文件句柄、互斥锁等)的一种重要机制。RAII 的核心思想是将资源的获取和释放与对象的生命周期绑定,从而确保资源在对象创建时自动获取,在对象销毁时自动释放 。
优点 :自动管理资源、异常安全、简化代码。
RAII 的实现依赖于构造和析构函数。💻示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #include <iostream> #include <string> using namespace std;class FileHandler {public : FileHandler (const string& filename) { cout << "Opening file: " << filename << endl; } ~FileHandler () { cout << "Closing file" << endl; } void write (const string& data) { cout << "Writing data: " << data << endl; } }; int main () { FileHandler file ("example.txt" ) ; file.write ("Hello, RAII!" ); return 0 ; }
构造函数 :在 FileHandler
对象创建时,构造函数会自动打开文件(模拟)。析构函数 :在 FileHandler
对象销毁时,析构函数会自动关闭文件(模拟)。资源管理 :通过对象的生命周期管理文件的打开和关闭,避免了手动调用 close()
的复杂性。🚩RAII 在标准库中应用:
动态内存管理:unique_ptr
和 shared_ptr
互斥锁管理:lock_guard
和 unique_lock
文件管理: ifstream
和 ofstream
mutex 的四种类型 std::mutex
:最基本的互斥锁,适用于简单的资源保护。std::recursive_mutex
:支持递归锁定,适用于递归函数或多次锁定场景。std::timed_mutex
:支持超时锁定,适用于需要避免长时间阻塞的场景。std::recursive_timed_mutex
:结合递归锁定和超时锁定功能,适用于复杂的同步需求。💻代码示例:recursive_mutex
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include <iostream> #include <thread> #include <mutex> using namespace std;std::recursive_mutex mtx; void recursiveFunction (int count) { std::lock_guard<std::recursive_mutex> lock (mtx) ; cout << "Count: " << count << endl; if (count > 0 ) { recursiveFunction (count - 1 ); } } int main () { std::thread t (recursiveFunction, 5 ) ; t.join (); return 0 ; } void recursiveFunction () { mtx.lock (); std::cout << "Thread: " << std::this_thread::get_id () << " locked mutex." << std::endl; mtx.lock (); std::cout << "Thread: " << std::this_thread::get_id () << " locked mutex." << std::endl; mtx.unlock (); std::cout << "Thread: " << std::this_thread::get_id () << " unlocked mutex." << std::endl; mtx.unlock (); std::cout << "Thread: " << std::this_thread::get_id () << " unlocked mutex." << std::endl; }
timed_mutex
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 #include <iostream> #include <thread> #include <mutex> #include <chrono> using namespace std;std::timed_mutex mtx; void threadFunction () { if (mtx.try_lock_for (std::chrono::seconds (1 ))) { cout << "Thread ID: " << std::this_thread::get_id () << " acquired the lock" << endl; std::this_thread::sleep_for (std::chrono::seconds (2 )); mtx.unlock (); } else { cout << "Thread ID: " << std::this_thread::get_id () << " failed to acquire the lock" << endl; } } int main () { std::thread t1 (threadFunction) ; std::thread t2 (threadFunction) ; t1.join (); t2.join (); return 0 ; } void threadFunction () { auto deadline = std::chrono::system_clock::now () + std::chrono::seconds (1 ); if (mtx.try_lock_until (deadline)) { cout << "Thread ID: " << std::this_thread::get_id () << " acquired the lock" << endl; std::this_thread::sleep_for (std::chrono::seconds (2 )); mtx.unlock (); } else { cout << "Thread ID: " << std::this_thread::get_id () << " failed to acquire the lock" << endl; } }
recursive_timed_mutex
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 #include <iostream> #include <thread> #include <mutex> #include <chrono> using namespace std;std::recursive_timed_mutex mtx; void recursiveFunction (int count) { if (mtx.try_lock_for (std::chrono::seconds (1 ))) { cout << "Count: " << count << endl; if (count > 0 ) { recursiveFunction (count - 1 ); } mtx.unlock (); } else { cout << "Failed to acquire the lock at count: " << count << endl; } } int main () { std::thread t (recursiveFunction, 5 ) ; t.join (); return 0 ; }
(了解)关于 std::chrono
: 它提供了高精度的时间操作和计时功能,允许开发者以灵活的方式处理时间点和时间段。std::chrono
的核心概念包括 时间点(time_point) 、时间段(duration) 和 时钟(clock) 。
常用操作有:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 std::chrono::seconds sec (10 ) ; std::chrono::milliseconds ms (500 ) ; std::chrono::microseconds us (1000 ) ; std::chrono::seconds sec = std::chrono::duration_cast <std::chrono::seconds>(ms); std::chrono::seconds sec1 (10 ) ; std::chrono::seconds sec2 (20 ) ; std::chrono::seconds sec3 = sec1 + sec2; auto now = std::chrono::system_clock::now ();std::time_t now_time_t = std::chrono::system_clock::to_time_t (now); auto now = std::chrono::system_clock::now ();auto future = now + std::chrono::seconds (10 ); auto now = std::chrono::system_clock::now ();auto steady_now = std::chrono::steady_clock::now ();
lock_guard 什么是 lock_guard
?为什么需要 lock_guard
?
lock_guard
是一个模板类,位于 <mutex>
头文件中,符合 RAII 风格,用于管理 mutex
的生命周期,解决了手动管理 mutex
锁定和解锁时可能出现的问题(忘记解锁、异常情况下未解锁等)。 可以理解为,lock_guard
是对 mutex 的一种管理封装,它可以更好地管理 mutex。
💻代码示例
通过设定作用域,使得std::lock_guard在合适的地方被析构 (在互斥量锁定到互斥量解锁之间的代码叫做临界区(需要互斥访问共享资源的那段代码称为临界区),临界区范围应该尽可能的小,即lock互斥量后应该尽早unlock),通过使用{}来调整作用域范围,可使得互斥量m在合适的地方被解锁 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include <iostream> #include <thread> #include <mutex> using namespace std;mutex m; void proc1 (int a) { lock_guard<mutex> g1 (m) ; cout << "proc1函数正在改写a" << endl; cout << "原始a为" << a << endl; cout << "现在a为" << a + 2 << endl; } void proc2 (int a) { { lock_guard<mutex> g2 (m) ; cout << "proc2函数正在改写a" << endl; cout << "原始a为" << a << endl; cout << "现在a为" << a + 1 << endl; } cout << "作用域外的内容3" << endl; cout << "作用域外的内容4" << endl; cout << "作用域外的内容5" << endl; } int main () { int a = 0 ; thread t1 (proc1, a) ; thread t2 (proc2, a) ; t1.join (); t2.join (); return 0 ; }
std::lock_guard
也可以传入两个参数,第一个参数被 adopt_lock
标识时,表示构造函数中不再进行互斥量锁定,因此此时需要提前手动锁定 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include <iostream> #include <thread> #include <mutex> using namespace std;mutex m; void proc1 (int a) { m.lock (); lock_guard<mutex> g1 (m, std::adopt_lock) ; cout << "proc1函数正在改写a" << endl; cout << "原始a为" << a << endl; cout << "现在a为" << a + 2 << endl; } void proc2 (int a) { lock_guard<mutex> g2 (m) ; cout << "proc2函数正在改写a" << endl; cout << "原始a为" << a << endl; cout << "现在a为" << a + 1 << endl; } int main () { int a = 0 ; thread t1 (proc1, a) ; thread t2 (proc2, a) ; t1.join (); t2.join (); return 0 ; }
lock_guard
禁用了拷贝构造函数和拷贝复制运算符,所以它不支持拷贝语义 ,只能通过直接创建对象 来使用。这样可以避免多个 lock_guard
对象同时管理同一个互斥锁而导致的错误行为。lock_guard
仅用于管理 mutex 的锁定和解锁(单一职责),对于更复杂的锁定需求,使用 std::unique_lock
。unique_lock 为什么需要 unique_lock
?
mutex
在管理方面有瑕疵,因此使用互斥量封装器 lock_guard
来智能管理 mutex,但其功能较弱,需要功能更加强大的 unique_lock
(也叫”灵活锁”)。
lock_guard
的瑕疵在于?
下表给出二者区别:
特性 lock_guard unique_lock 自动锁定 ✅ ✅ 自动解锁 ✅ ✅ 手动加锁 ✅ ✅ 延迟锁定 不支持 支持 手动解锁 不支持,只能通过作用域自动解锁 支持 参数 支持 adopt_lock 支持 adopt_lock/try_to_lock/defer_lock 与条件变量结合 不支持 支持 递归锁 不支持 支持 灵活性 低 高 性能开销 低 高 适用场景 简单的锁管理 复杂的锁管理
💻代码示例 构造(多种锁定策略)
std::defer_lock
:延迟锁定,互斥锁在构造时不锁定。需要时调用 lock
手动锁定,结束时自动解锁。std::try_to_lock
:尝试锁定,如果互斥锁已被占用,则立即返回。std::adopt_lock
:接管已经锁定的互斥锁。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 mutex mtx; timed_mutex TimeMtx; unique_lock<mutex> lock (mtx) ; unique_lock<timed_mutex> TimeLock (TimeMtx, std::chrono::seconds(1 )) ;unique_lock<mutex> lock1 (mtx, std::defer_lock) ; unique_lock<mutex> lock2 (mtx, std::try_to_lock) ;unique_lock<mutex> lock3 (mtx, std::adopt_lock) ; unique_lock<mutex> lock4 (move(lock)) ; if (lock2.owns_lock ()){ cout << "锁成功" << endl; }
🚩条件变量 condition_variable 什么是条件变量 std::condition_variable
?
std::condition_variable
是 C++11 引入的一个同步原语,用于实现线程间的通信和同步。它通常与 std::mutex
配合使用,允许一个线程等待另一个线程的通知 ,从而实现线程间的协作。
条件变量的使用场景?
生产者-消费者模型 :一个线程生产数据,另一个线程消费数据,通过条件变量实现同步。事件通知 :一个线程等待某个事件的发生,另一个线程在事件发生时通知它。线程协作 :多个线程需要协作完成任务,通过条件变量实现同步。💻代码示例 等待(wait) :一个线程调用 wait()
方法,进入等待状态,直到另一个线程调用 notify_one()
或 notify_all()
通知它。 线程进入等待期间,会释放与之关联的互斥锁,允许其他线程访问共享数据。当线程被唤醒后,会重新获取互斥锁并继续执行。通知(notify) :另一个线程调用 notify_one()
或 notify_all()
方法,唤醒等待的线程。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #include <iostream> #include <thread> #include <mutex> #include <queue> #include <condition_variable> using namespace std; mutex mtx; condition_variable cv; int MyValue = 0 ; bool turn = false ; void Increment (int thread_id) { for (int i = 0 ; i < 3 ; ++i) { unique_lock<mutex> lock (mtx) ; cv.wait (lock, [&] { return thread_id == 1 ? !turn : turn; }); ++MyValue; cout << "Thread " << thread_id << " incremented value to: " << MyValue << endl; turn = !turn; cv.notify_all (); } } int main () { thread t1 (Increment, 1 ) ; thread t2 (Increment, 2 ) ; t1.join (); t2.join (); cout << "Final Value: " << MyValue << endl; return 0 ; }
wait(std::unique_lock<std::mutex>&, Predicate)
,其中 Predicate
指可调用对象。wait_for(std::unique_lock<std::mutex>&, std::chrono::seconds, Predicate)
:允许指定一个超时时间,这段时间内没有收到唤醒信号/条件不满足,则函数会返回,并且线程会重新获取互斥锁。notify_one()
:唤醒一个等待的线程,适用于只有一个线程需要被唤醒的场景。 例如,生产者-消费者模型中,生产者生产一个数据后,只需要唤醒一个消费者。notify_all()
:唤醒所有等待的线程,适用于多个线程需要被唤醒的场景。 例如,多个消费者等待同一个条件变量时,生产者生产数据后,需要唤醒所有消费者。代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> using namespace std; std::mutex mtx; std::condition_variable cv; std::queue<int > SharedQueue; void producer (bool notify_all) { for (int i = 0 ; i < 50000 ; ++i) { std::unique_lock<std::mutex> lock (mtx) ; SharedQueue.push (i); cout << "Produced: " << i << endl; if (notify_all) { cv.notify_all (); } else { cv.notify_one (); } } } void consumer (int id) { while (true ) { std::unique_lock<std::mutex> lock (mtx) ; cv.wait (lock, [] { return !SharedQueue.empty (); }); int value = SharedQueue.front (); SharedQueue.pop (); cout << "Consumer " << id << " consumed: " << value << endl; if (value == 4 ) break ; } } int main () { bool notify_all = true ; std::thread producerThread (producer, notify_all) ; std::thread consumerThread1 (consumer, 1 ) ; std::thread consumerThread2 (consumer, 2 ) ; producerThread.join (); consumerThread1.join (); consumerThread2.join (); return 0 ; }
选择 notify_one()
:等待队列中的第一个线程(和 id 等无关)执行函数
1 2 3 4 5 6 7 8 9 10 11 12 13 Produced: 0 Produced: 1 Produced: 2 ... Consumer 1 consumed: 0 Consumer 1 consumed: 1 Produced: 9 Produced: 10 Consumer 1 consumed: 3 Consumer 1 consumed: 4 Consumer 1 consumed: 5 ... Consumer 1 consumed: 49999
选择 notify_all()
:由于消费者线程之间的竞争,很可能出现仅有 consumer 1/2 参与的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 Produced: 0 Produced: 1 Produced: 2 ... Consumer 1 consumed: 0 Consumer 1 consumed: 1 Produced: 9 Produced: 10 Consumer 2 consumed: 3 Consumer 2 consumed: 4 Consumer 2 consumed: 5 ... Consumer 1 consumed: 49999
每次运行的输出结果不会完全一致,但大体逻辑不变。
🚩读写锁 shared_mutex 基本概念 什么是读写锁?
读写锁(共享锁、独占锁)是一种同步机制,允许多个线程同时读取资源,但是同一时间只允许一个线程写入资源。
基本特征?
读读之间不互斥,写写/读写之间互斥。
特性 描述 共享锁 - 允许多个线程同时持有 - 多个线程可以并发地读取共享资源 - 不能与独占锁同时持有 独占锁 - 只能有一个线程持有 - 持有独占锁的线程可以写入共享资源 - 当持有独占锁时,其他线程不能持有共享锁或独占锁 读写分离 - 读操作和写操作分开处理,提高并发性能 - 当持有独占锁时,其他线程不能持有共享锁或独占锁 互斥性 - 读写锁中的写操作与其他写操作、读操作互斥 - 读操作之间不互斥 公平性 - 有些读写锁实现提供公平性机制,确保读线程和写线程不会被长期阻塞 性能 - 在读多写少 的情况下能显著提高性能 - 在写多的情况下,读写锁的性能优势可能不明显
代码实现 shared_mutex std::shared_mutex
是 C++17 引入的一种互斥锁,支持多读单写的并发访问模式。 它允许多个线程同时持有共享锁(读锁),但在持有独占锁时,其他线程不能再持有任何类型的锁。 虽然其存在排他性锁定的成员函数(lock/unlock 等),但是一般使用:unique_lock
/ lock_guard
管理独占锁 (排他性锁定),shared_mutex
管理共享锁定 。unique_lock
和 shared_mutex
都会自动上锁(构造函数实现了)。前者位于头文件 <mutex>
中,后者位于头文件 <shared_mutex>
中。
shared_lock C++17 引入的一种锁管理器,用于管理 shared_mutex
的共享锁。shared_lock
同样可以自动获取和释放锁。shared_lock
也称通用共享互斥所有权包装器,unique_lock
也称独占所有权包装器,二者配合使用对 shared_mutex
进行管理,实现读写锁。
成员函数 作用 std::shared_lock(mutex_type& m)
构造时获取共享锁 std::shared_lock(mutex_type& m, std::defer_lock)
构造时不获取锁 std::shared_lock(mutex_type& m, std::try_to_lock)
构造时尝试获取共享锁 std::shared_lock(mutex_type& m, std::adopt_lock)
构造时认为调用者已经持有共享锁 lock()
获取共享锁 try_lock()
尝试获取共享锁 unlock()
释放共享锁 owns_lock()
返回结果:锁是否被持有 release()
释放锁的所有权,但不解锁
💻代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include <iostream> #include <vector> #include <thread> #include <shared_mutex> #include <mutex> std::shared_mutex rw_mutex; int shared_data = 0 ; void reader () { std::shared_lock lock (rw_mutex) ; std::cout << "Reader thread: " << std::this_thread::get_id () << " reads value: " << shared_data << std::endl; } void writer (int value) { std::unique_lock lock (rw_mutex) ; shared_data = value; std::cout << "Writer thread: " << std::this_thread::get_id () << " writes value: " << shared_data << std::endl; } int main () { std::vector<std::thread> threads; for (int i = 0 ; i < 5 ; ++i) { threads.emplace_back (reader); } for (int i = 0 ; i < 2 ; ++i) { threads.emplace_back (writer, i); } for (std::thread& t : threads) { t.join (); } return 0 ; }
输出结果:1 2 3 4 5 6 7 Reader thread: Reader thread: 3 reads value: Reader thread: 5 reads value: 00 Reader thread: 4 reads value: 0 Reader thread: 6 reads value: 0 2 reads value: 0 Writer thread: 7 writes value: 0 Writer thread: 8 writes value: 1
读的输出比较混乱,是因为读与读操作之间并不互斥。
🚩原子变量和原子操作 atomic 基本概念 原子变量是什么?
原子变量是指使用 std::atomic
模板类定义的变量,确保在多线程环境中,对变量的读写操作是线程安全的,不会被其他线程中断或干扰。
1 2 3 4 #include <atomic> std::atomic<int > atomicInt (10 ) ;
原子变量的特性
原子性 :对原子变量的读、写、修改操作是不可分割的,即操作要么完全完成,要么完全不完成 。无锁 :原子操作不需要锁机制 ,因此不会引起上下文切换,具有更高的性能。(上下文切换:指操作系统从一个线程转换到另一线程的过程)易用性 :标准库提供了原子操作接口,简化了多线程编程中的同步问题 。原子操作是什么?
原子操作是指对原子变量进行的不可分割的操作。不可分割的意思是这些操作要么完全执行,要么完全不执行,不会在执行过程中被其他线程打断。
代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #include <iostream> #include <thread> #include <atomic> using namespace std; atomic<int > num = 0 ; void AddNum () { for (int i = 0 ; i < 1000000 ; ++i) { num++; } } int main () { thread t1 (AddNum) ; thread t2 (AddNum) ; t1.join (); t2.join (); cout << num << endl; return 0 ; }
✅注意:原子变量可以进行原子操作,原子操作才是真正保证变量是线程安全的根本原因。即使使用原子变量,如果进行的不是原子操作,也无法保证线程安全。 如果使用 num = num + 1
,会发现输出结果并不是预期的 2000000,这是因为该操作并不是原子操作(没有被 atomic
类重载)。
常见原子操作方法 加载和存储操作1 2 int value = atomicInt.load (); atomicInt.store (10 );
读写操作(等价于 load 和 store)1 2 int value = atomicInt; atomicInt = 10 ;
自增自减操作1 2 3 4 atomicInt++; atomicInt--; ++atomicInt; --atomicInt;
复合赋值操作1 2 atomicInt += 5 ; atomicInt &= 3 ;
高级操作compare_exchange
:比较当前值和期望值,如果相等则将当前值替换为新值,否则更新期望值。fetch_add
/ fetch_sub
/ fetch_and
:对当前值进行加法/减法/按位与操作,并返回操作前的值。exchange
:交换当前值和新值,并返回旧值。1 2 3 4 5 6 7 8 9 10 11 12 int expected = 0 ;int desired = 1 ;if (atomicInt.compare_exchange_strong (expected, desired)){ } atomicInt.fetch_add (1 , memory_order_relaxed); std::atomic<int > value (10 ) ;int old_value = value.exchange (20 );
以上原子操作仅针对原子变量生效。原子操作的内存序问题 原子操作可以指定不同的内存顺序,以控制操作的可见性和排序。 内存序不会影响原子操作的原子性,但会影响操作的可见性和顺序。 顺序 特点 memory_order_relaxed
没有同步或顺序约束,仅保证原子性 memory_order_acquire
确保该操作之前的所有读操作不会被重排序到该操作之后 memory_order_release
确保该操作之后的所有写操作不会被重排序到该操作之前 memory_order_acq_rel
同时具备 acquire 和 release 的属性 memory_order_seq_cst
顺序一致性,保证所有线程的操作按照顺序发生
🚩信号量 semaphore 基本概念 什么是信号量?
信号量(Semaphore)是一种用于管理和协调多线程或多进程访问共享资源的同步机制。它通过计数器来控制对资源的访问数量 ,确保多个线程或进程能够安全地使用共享资源而不会发生数据竞争或死锁。传统的锁(如互斥锁)可以用来保护共享资源,但对于某些场景(如资源的计数管理 ),信号量提供了更灵活和高效的解决方案。
信号量的类型?
计数信号量:允许对资源的多次访问,计数信号量的值可以是任意非负整数,表示可以同时访问资源的线程或进程的数量。 二元信号量:也称为互斥信号量 mutex,其值只能是 0 或 1,类似于互斥锁,用于实现对资源的互斥访问。 信号量的作用?
信号量的基本操作?
P 操作(等待,wait):如果信号量的值大于 0,则将其减 1,否则线程会被阻塞,直到信号量的值大于 0。 V 操作(释放,signal):将信号量的值加 1,如果有线程被阻塞在 P 操作上,则唤醒一个阻塞的线程。 PV 操作均为原子操作。类似加锁解锁的操作。counting_semaphore std::counting_semaphore
是 C++20 标准库中的一个类模板,实现了一个计数信号量。需要头文件 <semaphore>
禁止拷贝构造,禁止拷贝赋值操作(不允许 a(b)
或者 a = b
)。 成员函数 作用 void acquire()
尝试获取信号量,如果信号量的计数值为 0,则阻塞当前线程,直到计数值大于 0。 bool try_acquire()
- 尝试获取信号量。如果信号量的计数值大于 0,则将其减 1 并返回 true
。 - 如果计数值为 0,则立即返回 false
,不会阻塞。 bool try_acquire_for(const chrono::duration&)
- 如果在指定时间内信号量的计数值大于 0,则将其减 1 并返回 true
。 - 如果超时仍未获取到信号量,则返回 false
。 bool try_acquire_until(const chrono::time_point&)
- 如果在指定时间点之前信号量的计数值大于 0,则将其减 1 并返回 true
。 - 如果超时仍未获取到信号量,则返回 false
。 void release(std::ptrdiff_t update = 1)
释放信号量,增加信号量的计数值。默认加 1。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 #include <semaphore> std::counting_semaphore<5> semaphore (3 ) ; semaphore.acquire (); semaphore.release (); semaphore.release (2 ); std::counting_semaphore<> dynamic_semaphore (2 ); if (semaphore.try_acquire_for (std::chrono::seconds (1 ))) { } else { }
💻代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #include <iostream> #include <thread> #include <semaphore> #include <vector> using namespace std; std::counting_semaphore<1> sem (1 ) ; void worker (int id) { sem.acquire (); cout << "worker " << id << " is working" << endl; std::this_thread::sleep_for (std::chrono::seconds (1 )); cout << "worker " << id << " is done" << endl; sem.release (); } int main () { cout << "max " << sem.max () << endl; vector<thread> threads; for (int i = 0 ; i < 5 ; ++i) { threads.emplace_back (worker, i); } for (thread& t : threads) { t.join (); } return 0 ; }
输出结果:
1 2 3 4 5 6 7 8 9 10 11 max 1 worker 0 is working worker 0 is done worker 3 is working worker 3 is done worker 4 is working worker 4 is done worker 1 is working worker 1 is done worker 2 is working worker 2 is done
使用多个信号量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 std::counting_semaphore<1> ready (0 ) ;std::counting_semaphore<1> done (1 ) ;void prepare () { done.acquire (); std::cout << "Preparing...\n" ; std::this_thread::sleep_for (std::chrono::seconds (2 )); std::cout << "Preparation done.\n" ; ready.release (); } void work () { ready.acquire (); std::cout << "Working...\n" ; std::this_thread::sleep_for (std::chrono::seconds (2 )); std::cout << "Work done.\n" ; done.release (); } int main () { std::thread t1 (prepare) ; std::thread t2 (work) ; t1.join (); t2.join (); std::cout << "All tasks completed.\n" ; return 0 ; }
输出结果:1 2 3 4 5 Preparing... Preparation done. Working... Work done. All tasks completed.
栅栏 barrier 基本概念 什么是栅栏?
栅栏是一种同步原语,用于协调多个线程的执行,使得它们能够在某个特定的点(即栅栏)等待 。直到所有线程都达到这一个点,才能继续执行。 位于 C++20 的头文件 <barrier>
中。
栅栏的作用?
确保并发任务在某些关键时刻同步,比如等待所有线程完成某个阶段的工作,然后再进入下一阶段。 用于阶段同步和批处理。
栅栏的特点?
同步点 :栅栏用于创建一个同步点,确保多个线程在同一时刻同步。计数器 :栅栏内部维护一个计数器,记录到达栅栏的线程数量。当计数器达到预设值时,所有等待的线程被同时唤醒。可重用性 :C++20 中的 std::barrier
是可重用的,线程可以反复使用同一个栅栏对象进行同步。💻代码实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 #include <iostream> #include <vector> #include <thread> #include <barrier> using namespace std; void worker (int id, std::barrier<>& sync_point) { cout << "Worker " << id << " is doing phase 1 work.\n" ; std::this_thread::sleep_for (std::chrono::milliseconds (100 * id)); sync_point.arrive_and_wait (); cout << "Worker " << id << " has completed phase 1 and is doing phase 2 work.\n" ; std::this_thread::sleep_for (std::chrono::milliseconds (100 * id)); } int main () { const int num_threads = 5 ; std::barrier sync_point (num_threads) ; vector<thread> threads; for (int i = 1 ; i <= num_threads; ++i) { threads.emplace_back (worker, i, std::ref (sync_point)); } for (thread& t : threads){ t.join (); } std::cout << "All tasks completed.\n" ; return 0 ; }
输出结果:
1 2 3 4 5 6 7 8 9 10 11 Worker Worker 2 is doing phase 1 work. 1 is doing phase 1 work. Worker 3 is doing phase 1 work. Worker 4 is doing phase 1 work. Worker 5 is doing phase 1 work. Worker Worker 5 has completed phase 1 and is doing phase 2 work. Worker 1 has completed phase 1 and is doing phase 2 work. Worker 3 has completed phase 1 and is doing phase 2 work. 4 has completed phase 1 and is doing phase 2 work. Worker 2 has completed phase 1 and is doing phase 2 work. All tasks completed.
死锁 什么是死锁?
死锁是指两个或者多个进程互相等待对方释放资源 ,从而导致所有进程或线程都无法继续执行的现象。
死锁的必要条件?
互斥 :资源一次只能被一个资源占用。请求和保持 :线程已经持有至少一个资源,同时又申请新的资源,而新资源已经被其他线程占有。不剥夺 :已经获得的资源在未使用完之前,不能被强行剥夺,只能在使用完毕后自己释放。循环等待 :存在一个线程循环的链,链中的每个线程都持有下一个线程需要的资源。 要想解决死锁这一问题,就需要避开上面四点,可以使用高级同步工具(如 unique_lock
等)。💻代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 #include <iostream> #include <thread> #include <mutex> std::mutex mtx1; std::mutex mtx2; void thread1 () { std::lock_guard<std::mutex> lock1 (mtx1) ; std::cout << "Thread 1 acquired mtx1\n" ; std::this_thread::sleep_for (std::chrono::milliseconds (100 )); std::lock_guard<std::mutex> lock2 (mtx2) ; std::cout << "Thread 1 acquired mtx2\n" ; } void thread2 () { std::lock_guard<std::mutex> lock2 (mtx2) ; std::cout << "Thread 2 acquired mtx2\n" ; std::this_thread::sleep_for (std::chrono::milliseconds (100 )); std::lock_guard<std::mutex> lock1 (mtx1) ; std::cout << "Thread 2 acquired mtx1\n" ; } int main () { std::thread t1 (thread1) ; std::thread t2 (thread2) ; t1.join (); t2.join (); std::cout << "All threads completed.\n" ; return 0 ; }
同时锁定两个锁
1 2 3 4 5 6 7 8 9 10 11 12 13 void thread1 () { lock (mtx1, mtx2); std::lock_guard<std::mutex> lock1 (mtx1, std::adopt_lock) ; std::lock_guard<std::mutex> lock2 (mtx2, std::adopt_lock) ; std::cout << "Thread 1 acquired both locks\n" ; } void thread2 () { std::lock (mtx1, mtx2); std::lock_guard<std::mutex> lock1 (mtx1, std::adopt_lock) ; std::lock_guard<std::mutex> lock2 (mtx2, std::adopt_lock) ; std::cout << "Thread 2 acquired both locks\n" ; }
统一锁定顺序
1 2 3 4 5 6 7 8 9 10 11 void thread1 () { std::lock_guard<std::mutex> lock1 (mtx1) ; std::lock_guard<std::mutex> lock2 (mtx2) ; std::cout << "Thread 1 acquired both locks\n" ; } void thread2 () { std::lock_guard<std::mutex> lock1 (mtx1) ; std::lock_guard<std::mutex> lock2 (mtx2) ; std::cout << "Thread 2 acquired both locks\n" ; }
使用 std::unique_lock
和 std::defer_lock
1 2 3 4 5 6 7 8 9 10 11 12 13 void thread1 () { std::unique_lock<std::mutex> lock1 (mtx1, std::defer_lock) ; std::unique_lock<std::mutex> lock2 (mtx2, std::defer_lock) ; std::lock (lock1, lock2); std::cout << "Thread 1 acquired both locks\n" ; } void thread2 () { std::unique_lock<std::mutex> lock1 (mtx1, std::defer_lock) ; std::unique_lock<std::mutex> lock2 (mtx2, std::defer_lock) ; std::lock (lock1, lock2); std::cout << "Thread 2 acquired both locks\n" ; }
异步编程 概念 什么是异步?
允许程序在等待某些操作完成时继续执行其他任务,而不是阻塞或等待这些操作完成。
异步编程的优点?为什么选择异步编程?
提高性能 :通过并发执行多个任务,异步编程可以更高效地利用 CPU 资源。提高响应速度 :异步编程时程序在等待某些操作完成时继续响应用户输入,提高用户体验。简化 I/O 操作 :异步编程非常适合处理 I/O 密集型操作,比如文件读取、网络请求等。🚩async 基本概念 什么是 async
?
async
是一个函数模板,包含在头文件 <future>
中,用于启动一个异步任务。 它接受一个可调用对象作为参数,并在一个单独的线程上异步执行该对象。std::async
自动管理异步任务的生命周期 ,并返回一个 std::future
对象,该对象可用于获取异步操作的结果。
什么是 future
?
std::future
是一个模板类,用于表示异步操作的结果。std::future
对象需要借助 std::async
、std::promise
、std::packaged_task
结合使用。
💻代码实现 🚀future
的基本成员函数:
成员函数 作用 get()
获取异步任务的返回值。如果任务尚未完成,会阻塞直到任务完成。 wait()
等待异步任务完成,但不获取返回值。 wait_for()
等待异步任务完成,直到指定的时间。 如果任务在指定时间内完成,返回 std::future_status::ready
; 如果任务未完成,返回 std::future_status::timeout
。 wait_until()
等待异步任务完成,直到指定的时间点。 如果任务在指定时间点前完成,返回 std::future_status::ready
; 如果任务未完成,返回 std::future_status::timeout
。 valid()
检查 std::future
对象是否持有有效的异步任务。 如果对象有效,返回 true
;否则返回 false
。 share()
返回一个 std::shared_future
对象,允许多个线程共享异步任务的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <iostream> #include <future> using namespace std; int myfunc (int a, int b) { cout << "This is myfunc" << " " ; return a + b; } int main () { future<int > MyFunc = async (myfunc, 1 , 2 ); cout << "MyFunc == " << MyFunc.get () << " test" << endl; return 0 ; }
✅注意:
get()
等待异步操作完成 后,再获取结果。如果异步操作尚未完成,调用 get()
的线程将被阻塞,直到操作完成。一旦调用 get()
,它将返回异步操作的结果,并且 future
对象将变为无效状态 ,不能再调用 get()
。一个 future
对象只能调用一次 get()
。 wait()
等待异步操作完成,但不获取结果。与 get()
不同之处在于,调用 wait()
后,future
对象仍然有效,可以继续使用 get()
来获取结果。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 #include <iostream> #include <future> using namespace std; int myfunc (int a, int b) { cout << "This is myfunc " ; return a + b; } int main () { future<int > MyFunc = async (myfunc, 1 , 2 ); MyFunc.wait (); cout << "MyFunc == " << MyFunc.get () << " test" << endl; return 0 ; }
三种启动策略 启动策略 是否创建新线程 任务执行时机 适用场景 std::launch::async
是 立即执行,创建新线程 需要并发执行的任务,任务可以立即开始执行。 std::launch::deferred
否 延迟执行,调用 get()
或 wait()
时 不需要立即执行的任务,任务可以在需要时才执行。 默认 由实现决定 由实现决定 不确定任务是否需要立即执行,由实现自动选择执行方式。
launch::async
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #include <iostream> #include <thread> #include <future> using namespace std; int myfunc (int a, int b) { cout << "This is myfunc: " << this_thread::get_id () << endl; return a + b; } int main () { future<int > MyFunc = async (launch::async, myfunc, 1 , 2 ); cout << "This is main: " << this_thread::get_id () << endl; cout << "This is from myfunc: " << MyFunc.get () << endl; return 0 ; }
这里由于策略原因,多句话立即同时执行了,async
创建的新线程和主线程同时进行。 加上 wait()
即可观察正确的线程 id:
1 2 3 4 5 6 7 8 9 10 11 int main () { future<int > MyFunc = async (launch::async, myfunc, 1 , 2 ); MyFunc.wait (); cout << "This is main: " << this_thread::get_id () << endl; cout << "This is from myfunc: " << MyFunc.get () << endl; return 0 ; }
launch::deferred
: 需要注意的是,可能隐藏潜在的并发问题,因为任务可能在查询结果时才执行。
1 2 3 4 5 6 7 8 9 10 int main () { future<int > MyFunc = async (launch::deferred, myfunc, 1 , 2 ); cout << "This is main: " << this_thread::get_id () << endl; cout << "This is from myfunc: " << MyFunc.get () << endl; return 0 ; }
加入 wait
方法:
1 2 3 4 5 6 7 8 9 10 11 int main () { future<int > MyFunc = async (launch::deferred, myfunc, 1 , 2 ); MyFunc.wait (); cout << "This is main: " << this_thread::get_id () << endl; cout << "This is from myfunc: " << MyFunc.get () << endl; return 0 ; }
可以看出,launch::deferred
策略并没有创建新的线程,并且在调用 get
方法时才执行 myfunc()
🚩promise 基本概念 promise 是什么?
std::promise
是一个用于设置异步操作结果的机制。它允许在一个线程中设置值或异常 ,然后在另一个线程中通过 std::future
对象检索这些值或异常。 通常与 std::async
、std::packaged_task
或 std::thread
结合使用。需要头文件 <future>
。
一个 promise 对象可以创建多个 future 对象。
💻代码实现 设置值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void AsyncTask (promise<int > prom) { std::this_thread::sleep_for (std::chrono::seconds (1 )); prom.set_value (3 ); } int main () { std::promise<int > prom; std::future<int > FutureObj = prom.get_future (); std::thread t (AsyncTask, std::move(prom)) ; cout << "The result is: " << FutureObj.get () << endl; t.join (); return 0 ; }
promise
比 async
更加灵活,拥有更多功能。
成员函数 作用 get_future()
获取与 std::promise
关联的 std::future
对象。 set_value()
设置 std::promise
的值。 set_exception()
设置 std::promise
的异常。 set_value_at_thread_exit()
在线程退出时设置值。 set_exception_at_thread_exit()
在线程退出时设置异常。
处理异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #include <iostream> #include <thread> #include <future> using namespace std; void producer (std::promise<int >& promise) { try { throw std::runtime_error ("An error occurred!" ); } catch (const std::exception& e) { promise.set_exception (std::current_exception ()); } } void consumer (std::future<int >& future) { try { int value = future.get (); std::cout << "Received value: " << value << std::endl; } catch (const std::exception& e) { std::cout << "Exception caught: " << e.what () << std::endl; } } int main () { std::promise<int > promise; std::future<int > future = promise.get_future (); std::thread producer_thread (producer, std::ref(promise)) ; std::thread consumer_thread (consumer, std::ref(future)) ; producer_thread.join (); consumer_thread.join (); return 0 ; }
注意事项 只能设置一次值或异常 :std::promise
只能设置一次值或异常。如果多次调用 set_value()
或 set_exception()
,会导致未定义行为。线程安全 :std::promise
和 std::future
是线程安全的,但需要确保在适当的时机调用 set_value()
和 get()
。不能被直接复制 :std::promise
对象不能被复制 ,只能通过 std::move
转移控制权。1 2 3 4 5 6 7 8 9 10 11 12 std::promise<int > promise; std::promise<int > promise2 = std::move (promise); void MyFunc (promise<int > prom) { prom.set_exception (std::current_exception ()); } std::thread t (MyFunc, std::move(promise)) ; void MyFunc (promise<int >& prom) { prom.set_exception (std::current_exception ()); } std::thread t (MyFunc, std::ref(promise)) ;
🚩packaged_task 基本概念 什么是 packaged_task
?
packaged_task
是一个模板类,用于封装可调用对象,并将任务的执行与结果的获取分离 。同样需要头文件 <future>
。
💻代码实现 在另一个线程中执行任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 int main () { std::packaged_task<int (int ,int ) > task ([](int a, int b) { return a + b; }) ; std::future<int > result = task.get_future (); std::thread t (std::move(task), 1 , 2 ) ; t.join (); cout << result.get () << endl; return 0 ; } int Multiply (int a, int b) { return a * b; } int main () { std::packaged_task<decltype (Multiply) > task (Multiply) ; }
重置对象,获取新的 future
对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 int main () { std::packaged_task<int (int ,int ) > task ([](int a, int b) { return a + b; }) ; std::future<int > result = task.get_future (); task (1 , 2 ); cout << result.get () << endl; task.reset (); result = task.get_future (); task (3 , 4 ); cout << result.get () << endl; return 0 ; }
成员函数 作用 get_future()
获取与 std::packaged_task
关联的 std::future
对象。 operator()
执行封装的可调用对象,并将结果存储在 std::future
中。 make_ready_at_thread_exit()
在线程退出时执行任务,并将结果存储在 std::future
中。 reset()
重置 std::packaged_task
,允许重新执行任务。
注意事项 std::packaged_task
使用 operator()
执行任务,并将结果存储在 std::future
中。不可复制 : packaged_task
可移动,但不可复制。因此,构建 thread
中值传递 std::packaged_task
时需要使用 std::move
(引用传递用 std::ref
)。只能执行一次任务 :std::packaged_task
只能执行一次任务。如果需要重新执行任务,应使用 reset()
重置 std::packaged_task
。线程安全 :std::packaged_task
和 std::future
是线程安全的,但需要确保在适当的时机调用 get_future()
和 get()
。三种异步工具的比较 特性 async promise packaged_task 代码写法 简单,自动管理线程和任务执行 复杂,手动管理线程和任务执行 复杂,手动管理线程和任务执行 成员函数 自动设置值或异常 get_future()
、set_value()
、set_exception()
get_future()
、operator()
、reset()
应用场景 简单异步任务 复杂线程间通信,灵活性最高 任务调度、线程池、封装可调用对象
共同点是,它们均为 C++11 新特性,包含在头文件 <future>
中,并且都需要搭配 future
对象实现功能。
之间的关系 以下内容摘自 Thread、Future、Promise、Packaged_task、Async之间有什么关系?
packaged_task
≈ promise
+ functionasync
≈ thread
+ packaged_task
通过 promise
的 get_future()
可拿到 future
对象 通过 future
对象的 share()
可拿到 shared_future
对象 promise
只能 set_value
,不太好执行复杂的逻辑,有执行函数+阻塞的需求时,就可以考虑使用 packaged_task
。
shared_future
: 普通的 future
有个特点,它不能拷贝,只能移动,这就意味着只能有一个线程一个实例可以通过 get()
拿到对应的结果。 如果想要多个线程多个实例拿到结果,就可以使用 shared_future
,那怎么拿到 shared_future
,可以通过普通 future
的 shared()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #include <iostream> #include <future> using namespace std; int main () { std::promise<int > prom; future<int > fu = prom.get_future (); shared_future<int > shared_fu = fu.share (); future<void > f1 = std::async (std::launch::async, [shared_fu]() { std::cout << shared_fu.get () << std::endl; }); future<void > f2 = std::async (std::launch::async, [shared_fu]() { std::cout << shared_fu.get () << std::endl; }); prom.set_value (102 ); f1.get (); f2.get (); return 0 ; }
线程池 基本概念 什么是线程池?作用是什么?
线程池是指一种预先创建一组线程 的机制,这些线程在应用程序启动时就已经创建好,等待执行任务。 每当有新的任务需要执行时,线程池会从线程集合中分配一个空闲线程来执行该任务,而不是每次都重新创建和销毁线程。(池化思想,如 epoll、SQL)
为什么使用线程池?
提高性能 :创建和销毁线程开销较大 ,线程池通过重用的线程,减少了这种开销,提高程序性能。控制并发量 :线程池允许限制并发线程的数量 ,防止系统因创建过多线程而出现资源耗尽的问题(如 CPU 过载、内存不足)。简化线程管理 :使用线程池可以避免手动管理线程的生命周期 ,减少代码复杂性。线程池通常还提供了任务排队和调度 的功能,使得多线程编程更加容易。线程池的使用场景?
服务器应用 :比如 Web 服务器,处理每个客户端请求时不必为每个请求创建一个新线程,而是从线程池中取出线程来处理请求。高性能计算 异步任务处理 一个线程池应该包含什么?
线程池管理器(ThreadPool Manager) :负责创建并管理线程池,包括线程的创建、销毁、任务分配等。工作线程(Worker Thread) :线程池中的线程,负责执行具体的任务。这些线程通常是预先创建好的,并且在任务执行完毕后不会立即销毁,而是返回到线程池中等待下一个任务。任务队列(Task Queue) :用于存储待执行的任务。当有新的任务提交到线程池时,任务会被放入任务队列中,等待工作线程来执行。任务(Task) :需要执行的工作单元,通常是一个函数或方法。 图例:💻代码实现 来自 threadpool-lzpong
threadpool.h
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 #pragma once #ifndef THREAD_POOL_H #define THREAD_POOL_H #include <vector> #include <queue> #include <atomic> #include <future> #include <stdexcept> namespace std{ #define THREADPOOL_MAX_NUM 16 class threadpool { unsigned short _initSize; using Task = function<void ()>; vector<thread> _pool; queue<Task> _tasks; mutex _lock; #ifdef THREADPOOL_AUTO_GROW mutex _lockGrow; #endif condition_variable _task_cv; atomic<bool > _run{ true }; atomic<int > _idlThrNum{ 0 }; public : inline threadpool (unsigned short size = 4 ) { _initSize = size; addThread (size); } inline ~threadpool () { _run=false ; _task_cv.notify_all (); for (thread& thread : _pool) { if (thread.joinable ()) thread.join (); } } public : template <class F, class ... Args> auto commit (F&& f, Args&&... args) -> future<decltype (f(args...)) > { if (!_run) throw runtime_error ("commit on ThreadPool is stopped." ); using RetType = decltype (f (args...)); auto task = make_shared<packaged_task<RetType ()>>( bind (forward<F>(f), forward<Args>(args)...) ); future<RetType> future = task->get_future (); { lock_guard<mutex> lock{ _lock }; _tasks.emplace ([task]() { (*task)(); }); } #ifdef THREADPOOL_AUTO_GROW if (_idlThrNum < 1 && _pool.size () < THREADPOOL_MAX_NUM) addThread (1 ); #endif _task_cv.notify_one (); return future; } template <class F > void commit2 (F&& task) { if (!_run) return ; { lock_guard<mutex> lock{ _lock }; _tasks.emplace (std::forward<F>(task)); } #ifdef THREADPOOL_AUTO_GROW if (_idlThrNum < 1 && _pool.size () < THREADPOOL_MAX_NUM) addThread (1 ); #endif _task_cv.notify_one (); } int idlCount () { return _idlThrNum; } int thrCount () { return _pool.size (); } #ifndef THREADPOOL_AUTO_GROW private :#endif void addThread (unsigned short size) {#ifdef THREADPOOL_AUTO_GROW if (!_run) throw runtime_error ("Grow on ThreadPool is stopped." ); unique_lock<mutex> lockGrow{ _lockGrow }; #endif for (; _pool.size () < THREADPOOL_MAX_NUM && size > 0 ; --size) { _pool.emplace_back ( [this ]{ while (true ) { Task task; { unique_lock<mutex> lock{ _lock }; _task_cv.wait (lock, [this ] { return !_run || !_tasks.empty (); }); if (!_run && _tasks.empty ()) return ; _idlThrNum--; task = move (_tasks.front ()); _tasks.pop (); } task (); #ifdef THREADPOOL_AUTO_GROW if (_idlThrNum>0 && _pool.size () > _initSize) return ; #endif { unique_lock<mutex> lock{ _lock }; _idlThrNum++; } } }); { unique_lock<mutex> lock{ _lock }; _idlThrNum++; } } } }; } #endif
main.cpp
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 #include "threadpool.h" #include <iostream> #include <windows.h> void fun1 (int slp) { printf (" hello, fun1 ! %d\n" ,std::this_thread::get_id ()); if (slp>0 ) { printf (" ======= fun1 sleep %d ========= %d\n" ,slp, std::this_thread::get_id ()); std::this_thread::sleep_for (std::chrono::milliseconds (slp)); } } struct gfun { int operator () (int n) { printf ("%d hello, gfun ! %d\n" ,n, std::this_thread::get_id () ); return 42 ; } }; class A { public : static int Afun (int n = 0 ) { std::cout << n << " hello, Afun ! " << std::this_thread::get_id () << std::endl; return n; } static std::string Bfun (int n, std::string str, char c) { std::cout << n << " hello, Bfun ! " << str.c_str () <<" " << (int )c <<" " << std::this_thread::get_id () << std::endl; return str; } }; int main () { try { std::threadpool executor{ 50 }; A a; std::future<void > ff = executor.commit (fun1,0 ); std::future<int > fg = executor.commit (gfun{},0 ); std::future<int > gg = executor.commit (a.Afun, 9999 ); std::future<std::string> gh = executor.commit (A::Bfun, 9998 ,"mult args" , 123 ); std::future<std::string> fh = executor.commit ([]()->std::string { std::cout << "hello, fh ! " << std::this_thread::get_id () << std::endl; return "hello,fh ret !" ; }); std::cout << " ======= sleep ========= " << std::this_thread::get_id () << std::endl; std::this_thread::sleep_for (std::chrono::microseconds (900 )); for (int i = 0 ; i < 50 ; i++) { executor.commit (fun1,i*100 ); } std::cout << " ======= commit all ========= " << std::this_thread::get_id ()<< " idlsize=" <<executor.idlCount () << std::endl; std::cout << " ======= sleep ========= " << std::this_thread::get_id () << std::endl; std::this_thread::sleep_for (std::chrono::seconds (3 )); ff.get (); std::cout << fg.get () << " " << fh.get ().c_str ()<< " " << std::this_thread::get_id () << std::endl; std::cout << " ======= sleep ========= " << std::this_thread::get_id () << std::endl; std::this_thread::sleep_for (std::chrono::seconds (3 )); std::cout << " ======= fun1,55 ========= " << std::this_thread::get_id () << std::endl; executor.commit (fun1,55 ).get (); std::cout << "end... " << std::this_thread::get_id () << std::endl; std::threadpool pool (4 ) ; std::vector< std::future<int > > results; for (int i = 0 ; i < 8 ; ++i) { results.emplace_back ( pool.commit ([i] { std::cout << "hello " << i << std::endl; std::this_thread::sleep_for (std::chrono::seconds (1 )); std::cout << "world " << i << std::endl; return i*i; }) ); } std::cout << " ======= commit all2 ========= " << std::this_thread::get_id () << std::endl; for (auto && result : results) std::cout << result.get () << ' ' ; std::cout << std::endl; return 0 ; } catch (std::exception& e) { std::cout << "some unhappy happened... " << std::this_thread::get_id () << e.what () << std::endl; } }
C++11 语言细节:
using Task = function<void()>
是类型别名,简化了 typedef
的用法。function<void()>
可以认为是一个函数类型,接受任意原型是 void()
的函数,或是函数对象,或是匿名函数。void()
意思是不带参数,没有返回值。pool.emplace_back([this]{...})
和 pool.push_back([this]{...})
功能一样,只不过前者性能会更好。pool.emplace_back([this]{...})
构造了一个线程对象,执行函数是 Lambda匿名函数。所有对象的初始化方式均采用了 {}
,而不再使用 ()
方式,因为风格不够一致且容易出错。 匿名函数: [this]{...}
不多说。[]
是捕捉器,this
是引用域外的变量 this指针,内部使用死循环, 由 cv_task.wait(lock,[this]{...})
来阻塞线程。 delctype(expr)
用来推断 expr
的类型,和 auto
是类似的,相当于类型占位符,占据一个类型的位置;auto f(A a, B b) -> decltype(a+b)
是一种用法,不能写作 decltype(a+b) f(A a, B b)
。commit
方法是不是略奇葩!可以带任意多的参数,第一个参数是 f,后面依次是函数 f 的参数(注意:参数要传struct/class的话,建议用pointer,小心变量的作用域 )!可变参数模板是 c++11 的一大亮点。commit
直接使用智能调用 stdcall
函数,但有两种方法可以实现调用类成员,一种是使用 bind:commit(std::bind(&Dog::sayHello, &dog));
一种是用 mem_fn: commit(std::mem_fn(&Dog::sayHello), &dog);
。make_shared()
用来构造 shared_ptr
智能指针。用法大体是 shared_ptr p = make_shared(4)
然后 *p == 4
。智能指针的好处就是自动 delete。bind
函数,接受函数 f 和部分参数,返回currying后的匿名函数,譬如 bind(add, 4)
可以实现类似 add(4)
的函数。forward()
函数,类似于 move()
函数,后者是将参数右值化,前者是不改变最初传入的类型的引用类型(左值还是左值,右值还是右值)。packaged_task
就是任务函数的封装类,通过 get_future()
获取 future
,然后通过 future
可以获取函数的返回值(future.get()
);packaged_task
本身可以像函数一样调用。queue
是队列类, front()
获取头部元素, pop()
移除头部元素;back()
获取尾部元素,push()
尾部添加元素。lock_guard
是 mutex
的 stack 封装类,构造的时候 lock()
,析构的时候 unlock()
,是 C++ RAII 的 idea。condition_variable cv;
条件变量,需要配合 unique_lock
使用;unique_lock
相比 lock_guard
的好处是:可以随时 unlock()
和 lock()
。 cv.wait()
之前需要持有 mutex,wait()
本身会解锁,如果条件满足则会重新持有锁。最后线程池析构的时候,join()
可以等待任务都执行完再结束,很安全。 简单写法 ThreadPool.hpp
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 #pragma once #include <iostream> #include <thread> #include <future> #include <queue> #include <mutex> #include <condition_variable> #include <functional> #include <vector> #include <type_traits> class ThreadPool { public : explicit ThreadPool (int ThreadNums) ; ~ThreadPool (); template <typename F, typename ...Arg> auto EnterQueues (F&& f, Arg&&... arg) -> std::future<typename std::invoke_result<F, Arg...>::type> ; private : void worker () ; bool IsStop; std::condition_variable cv; std::mutex mtx; std::vector<std::thread> workers; std::queue<std::function<void ()>> MyQueue; }; ThreadPool::ThreadPool (int ThreadNums) : IsStop (false ) { for (size_t i = 0 ; i < ThreadNums; ++i) { workers.emplace_back ([this ]() { this ->worker (); }); } } ThreadPool::~ThreadPool () { { std::unique_lock<std::mutex> lock (mtx) ; IsStop = true ; } cv.notify_all (); for (std::thread& OneThread : workers) { OneThread.join (); } } template <typename F, typename ...Arg> auto ThreadPool::EnterQueues (F&& f, Arg&&... arg) -> std::future<typename std::invoke_result<F, Arg...>::type> { using functype = typename std::result_of<F (Arg...)>::type; auto task = std::make_shared<std::packaged_task<functype ()>>( std::bind (std::forward<F>(f), std::forward<Arg>(arg)...) ); std::future<functype> rsFuture = task->get_future (); { std::lock_guard<std::mutex> lockGuard (mtx) ; if (IsStop) { throw std::runtime_error ("出错:线程池已经停止了" ); } MyQueue.emplace ([task]() { (*task)(); }); } cv.notify_one (); return rsFuture; } void ThreadPool::worker () { while (true ) { std::function<void ()> task; { std::unique_lock<std::mutex> uniqueLock (mtx) ; cv.wait (uniqueLock, [this ]() { return !this ->MyQueue.empty () || this ->IsStop; }); if (IsStop && MyQueue.empty ()) { return ; } task = std::move (MyQueue.front ()); MyQueue.pop (); } task (); } }
main.cpp
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #include <iostream> #include "ThreadPool.hpp" int main () { system ("chcp 65001" ); ThreadPool MyPool (4 ) ; for (int i = 0 ; i < 20 ; ++i) { std::future<int > MyFuture = MyPool.EnterQueues ([](int a, int b) -> int { std::cout << "当前线程: " << std::this_thread::get_id () << std::endl; return a + b; }, 10 * i, 10 * i); std::cout << "thread rs: " << MyFuture.get () << std::endl; } return 0 ; }
✅注:关于 hpp 文件 HPP(Header Plus Plus)是C++中的一种特殊头文件格式,它将类的声明和实现代码放在同一个文件中。与传统的 .h
和 .cpp
文件不同,HPP文件允许在头文件中直接包含实现代码,从而减少了代码文件的数量和编译次数。这种文件格式特别适合用于编写模板类和开源库,因为它简化了代码的管理和使用。
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 thread rs: 当前线程: 2 0 thread rs: 当前线程: 3 20 thread rs: 当前线程: 4 40 thread rs: 当前线程: 5 60 thread rs: 当前线程: 2 80 thread rs: 当前线程: 3 100 thread rs: 当前线程: 4 120 thread rs: 当前线程: 5 140 thread rs: 当前线程: 2 160 thread rs: 当前线程: 3 180 thread rs: 当前线程: 4 200 thread rs: 当前线程: 5 220 thread rs: 当前线程: 2 240 thread rs: 当前线程: 3 260 thread rs: 当前线程: 4 280 thread rs: 当前线程: 5 300 thread rs: 当前线程: 2 320 thread rs: 当前线程: 3 340 thread rs: 当前线程: 4 360 thread rs: 当前线程: 5 380