从零实现一个轻量级C++线程池
一、引言
本文目标:从零实现一个轻量、可用、可扩展的 C++ 线程池。
关键技术点:
- std::thread
- std::mutex / std::unique_lock
- std::condition_variable
- std::function / future / packaged_task / bind
二、什么是线程池
线程池是一种用于管理和复用线程的并发编程模型。它的核心思想是预先创建一组工作线程,并将它们放入一个“池”中进行管理。当有新任务需要处理时,不再创建和销毁线程,而是直接将任务提交给线程池,由池中空闲的线程来执行。

三、为什么需要线程池
频繁地创建和销毁线程会带来显著的系统开销,包括内存分配、切换到内核态等。线程池通过复用线程,有效解决了这个问题,并带来了以下优势:
- 降低资源开销:避免了频繁创建和销毁线程的开销,提升了系统性能。
- 提高响应速度:任务到达时无需等待线程创建,可以立即被分配给空闲线程执行。
- 提高线程的可管理性:线程是稀缺资源,无限制地创建线程会消耗大量系统内存,甚至导致内存溢出。线程池可以对线程数量进行统一分配、调优和监控。
四、线程池的核心组成
- 工作线程集合 (Worker Threads):池中预先创建好的一组线程,它们会持续运行,不断从任务队列中获取并执行任务。
- 任务队列 (Task Queue):一个线程安全的队列,用于存放所有待执行的任务。它作为任务提交者和工作线程之间的缓冲区。
- 同步机制 (Synchronization):
- 互斥锁 (Mutex):用于保护任务队列,确保在多线程环境下对队列的访问是安全的,防止竞态条件。
- 条件变量 (Condition Variable):用于工作线程的等待和唤醒。当任务队列为空时,工作线程会进入等待状态;当有新任务加入时,会通知(唤醒)一个或所有等待的线程。
- 任务接口 (Task Interface):一个用于提交任务的方法,允许外部将各种类型的任务(函数、Lambda表达式等)提交到线程池中。
五、C++线程池的实现
因为在代码中使用到了一些异步编程技术,所以先做个简单的介绍。
std::condition_variable——条件变量,它是一种线程间的同步机制,当没有任务时,它会阻塞工作线程。生产者线程将任务加入队列后,会通过同一个条件变量唤醒在该条件变量下等待的线程。这么做的好处是,避免了工作线程循环检测队列中有没有任务带来的CPU开销。代码中用到的接口主要有3个:
void wait (unique_lock<mutex>& lck, Predicate pred);
第一个参数是互斥锁,该函数内部会将锁释放,避免线程休眠时持有锁,导致其他活跃线程拿不到锁。
第二个参数是一个可调用对象,这个可调用对象必须能够返回true或false。而且,这个可调用对象会循环的执行,直到它的返回结果是true。
pred可调用对象返回true后,线程被唤醒,重新获取锁,向下执行。
void notify_one() noexcept;
唤醒一个在该条件变量下等待的线程。
void notify_all() noexcept;
唤醒所有在该条件变量下等待的线程。
std::future,用来获取异步执行的结果。如果没有std::future,在C++中想要获取其他线程的返回值的话,我们需要将这个返回值写入全局变量,这样其他的线程才可以看到。全局变量是共享资源,多线程场景下是需要加锁保护的,std::future封装了这些底层的细节,提供了一种同步获取结果的方式。
get()方法
会阻塞调用线程,直到拿到结果才继续往下执行,所以说它是一种同步获取结果的方式。
std::function,是一种函数包装器,统一函数的类型。为什么这么说?函数指针、lambda表达式虽然都是可调用对象,但是它们的类型是完全不同的,所以就不能够放到同一个容器中统一管理。但是,经过function的包装后,它们就有了统一的类型。下面在实现线程池中就可以看到如何使用。
std::packaged_task,是一种任务包装器,用来获取异步执行任务的结果的。它内部有一个关联的future对象,函数的返回值会被写入到这个future对象中,同时它还提供了一个获取这个future对象的接口get_future(),其他线程拿到这个关联的future后,就可以调用get()方法获取异步执行的结果。顺便说一下,packaged_task内部重载了(),可以直接通过packaged_task对象执行它包装的任务。
std::bind,绑定函数参数,并返回一个可调用对象。比如,函数Add(int a, int b)原本是需要传入两个参数的,是以这种形式 Add(10, 20) 调用的。但是,经过std::bind绑定后,例如 auto func = std::bind(Add, 10, 20); 在调用的时候,就不用传参了,直接这样 func() 调用,效果是一样的。为什么要绑定参数,在下面的线程池实现中,就很明白了。
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <future>
#include <mutex>
#include <functional>
#include <chrono>
#include <type_traits>
#include <condition_variable>
class ThreadPool {
public:
ThreadPool(size_t thread_num = 4):
_thread_num(thread_num),
_start(false),
_stop(false)
{}
~ThreadPool(){
if (_start && !_stop) stop();
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
void start() {
std::unique_lock<std::mutex> lock(_mutex);
if (_start) return;
_workers.reserve(_thread_num);
for (size_t i = 0; i < _thread_num; i++) {
_workers.emplace_back(std::thread([this](){
work_loop();
}));
}
_start = true;
}
void stop() {
{
std::unique_lock<std::mutex> lock(_mutex);
if (!_start || _stop) return;
// 在join回收线程之前,必须先将_stop置为true,
// 否则工作线程可能会一直阻塞在条件变量上,导致无法正常退出,甚至会导致程序崩溃
_stop = true;
}
_cond.notify_all();
for (auto& worker : _workers) {
if (worker.joinable()) worker.join();
}
}
template<class F, class... Args>
auto submit(F&& f, Args&&... args)->std::future<std::invoke_result_t<F, Args...>> {
using return_type = std::invoke_result_t<F, Args...>;
// 绑定函数参数,并交给任务包装器
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
// 获取关联的future
std::future<return_type> res = task->get_future();
// 加锁+入队列
{
std::unique_lock<std::mutex> lock(_mutex);
if (_stop || !_start) throw std::runtime_error("线程池未启动!");
_tasks.emplace([task](){(*task)();});
}
_cond.notify_one();
return res;
}
private:
void work_loop() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [this](){
return _stop || !_tasks.empty();
});
if (_stop && _tasks.empty()) return;
task = std::move(_tasks.front());
_tasks.pop();
}
task();
}
}
private:
std::vector<std::thread> _workers;
std::queue<std::function<void()>> _tasks;
std::mutex _mutex;
std::condition_variable _cond;
size_t _thread_num;
bool _start;
bool _stop;
};
int add(int a, int b) {
return a + b;
}
void print() {
std::cout << "-------------------print-------------------" << std::endl;
std::cout << "Hello World!" << std::endl;
}
int main() {
ThreadPool pool(4);
pool.start();
std::cout << "==================ThreadPoolTest==================" << std::endl;
pool.submit([](){
std::cout << "-------------------lambda-------------------" << std::endl;
std::cout << "this is a lambda!" << std::endl;
});
std::this_thread::sleep_for(std::chrono::seconds(3));
auto ret1 = pool.submit(add, 10, 20);
std::cout << "-------------------add-------------------" << std::endl;
std::cout << "10 + 20 = " << ret1.get() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
pool.submit(print);
pool.stop();
return 0;
}运行结果:

在上述代码实现中,用到了C++17的语法。
这里我解释一下队列中的任务参数为空,并且返回值为void,但是带返回值的add函数为什么可以插入队列中。
通过std::bind绑定函数的所有参数,所有就做到了“无参”。
通过std::packaged_task获取函数的返回值,所以不担心返回值拿不到。
通过lambda封装一层,不管原本是否有参数,是否有返回值,加入队列中的任务都是满足无参和无返回值的。
这三个操作组合在一起,不管函数有无返回值,都能适配到任务队列中。
另外,在绑定参数时,不仅对函数参数进行了完美转发,还对函数本身进行了完美转发。对参数进行完美转发是因为要保持它本身的左右值属性。对函数进行完美转发是因为,我们在使用时,可能会直接在submit函数传入lambda,这时候lambda它是一个右值,采用完美转发可以保持它的右值属性,触发移动语义,也就避免了std::bind内部对它进行拷贝。
以上就是从零实现一个轻量级C++线程池的详细内容,更多关于轻量级C++线程池实现的资料请关注脚本之家其它相关文章!
相关文章
QT自定义QTextEdit实现大数据的实时刷新显示功能实例
TextEdit是我们常用的Qt控件,用来显示文本信息,下面这篇文章主要给大家介绍了关于QT自定义QTextEdit实现大数据的实时刷新显示功能的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下2022-05-05
如何用c++表驱动替换if/else和switch/case语句
本文将介绍使用表驱动法,替换复杂的if/else和switch/case语句,想了解详细内容,请看下文2021-08-08


最新评论