C++线程池实现代码

 更新时间:2021年12月30日 09:34:17   作者:mrbone11  
C++11中,线程我们可以理解为对应一个thread对象,任务可以理解为要执行的函数,通常是耗时的函数。线程过多或者频繁创建和销毁线程会带来调度开销,进而影响缓存局部性和整体性能

前言

这段时间看了《C++并发编程实战》的基础内容,想着利用最近学的知识自己实现一个简单的线程池。

什么是线程池

线程池(thread pool)是一种线程使用模式。线程过多或者频繁创建和销毁线程会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着管理器分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价,以及保证了线程的可复用性。线程池不仅能够保证内核的充分利用,还能防止过分调度。

思路

个人对线程池的理解是:利用已经创建的固定数量的线程去执行指定的任务,从而避免线程重复创建和销毁带来的额外开销。
C++11中,线程我们可以理解为对应一个thread对象,任务可以理解为要执行的函数,通常是耗时的函数。
我们的任务多少和顺序并非固定的,因此需要有一个方法能添加指定的任务,任务存放的地方应该是一个任务队列,因为我们的线程数量有限,当任务很多时同时执行的任务数量也有限,因此任务需要排队,遵循先来后到的原则。
当要执行一个任务时,意味着先将这个任务从队列取出,再执行相应任务,而“取出”动作的执行者是线程池中的线程,这意味我们的队列需要考虑多个线程在同一队列上执行“取出”操作的问题,实际上,取出任务操作和添加任务操作也不能同时进行,否则会产生竞争条件;另一方面,程序本身如果就是多线程的,多个线程同时添加任务的操作也应该是互斥的。
当没有任务可以执行时,所有线程应该什么也不做,当出现了一个任务时,应该将这个任务分配到任一线程中执行。实现上我们固然可以使用轮询的方式判断当前队列是否有任务,有则取出(即使加了互斥锁似乎也无法避免竞争条件?),但这样会消耗无谓的CPU资源,写轮询周期难以选取。其实,我们可以使用condition_variable代替轮询。
上述任务的创建和取出其实就是经典的生产者消费者模型。
我们将上面的内容都封装在一个类中,取名ThreadPool,用户可以在构造ThreadPool对象时指定线程池大小,之后可以随时添加要执行的任务。

实现

class ThreadPool
{
public:
	ThreadPool(int n);
	~ThreadPool();

	void pushTask(packaged_task<void()> &&task);

private:
	vector<thread*> threadPool;
	deque<packaged_task<void()>> taskQueue;

	void taskConsumer();
	mutex taskMutex;
	condition_variable taskQueueCond;
};

ThreadPool::ThreadPool(int n)
{
	for (int i = 0; i < n; i++)
	{
		thread *t = new thread(&ThreadPool::taskConsumer,this);
		threadPool.push_back(t);
		t->detach();
	}
}

ThreadPool::~ThreadPool()
{
	while (!threadPool.empty())
	{
		thread *t=threadPool.back();
		threadPool.pop_back();
		delete t;
	}
}

void ThreadPool::pushTask(packaged_task<void()> &&task)
{
	{
		lock_guard<mutex> guard(taskMutex);
		taskQueue.push_back(std::move(task));
	}
	taskQueueCond.notify_one();
}

void ThreadPool::taskConsumer()
{
	while (true)
	{
		unique_lock<mutex> lk(taskMutex);
		taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
		packaged_task<void()> task=std::move(taskQueue.front());
		taskQueue.pop_front();
		lk.unlock();
		task();
	}
}

这里我使用packaged_task作为任务,每当添加一个任务,就调用condition_variable::notify_one方法,调用condition_variable::wait的线程就会被唤醒,并检查等待条件。这里有个小细节是notify_one在解锁后执行,这样避免线程唤醒后还要等待互斥锁解锁。
使用示例:

void Task1()
{
	Sleep(1000);
	cout << "Task1"<<endl;
}

void Task5()
{
	Sleep(5000);
	cout << "Task5" << endl;
}

class Worker
{
public:
	void run();
};

void Worker::run()
{
	cout << "Worker::run start" << endl;
	Sleep(5000);
	cout << "Worker::run end" << endl;
}

int main()
{
	ThreadPool pool(2);
	pool.pushTask(packaged_task<void()>(Task5));
	pool.pushTask(packaged_task<void()>(Task1));
	pool.pushTask(packaged_task<void()>(Task1));
	Worker worker;
	pool.pushTask(packaged_task<void()>(bind(&Worker::run,&worker)));
	pool.pushTask(packaged_task<void()>([&](){worker.run();}));
	Sleep(20000);
}

这个线程池目前有几个缺点:

  • 只能传入调用形式为void()形式的函数或可调用对象,不能返回任务执行的值,只能通过其他方式同步任务执行结果(如果有)
  • 传入参数较为复杂,必须封装一层packaged_task,调用对象方法时需要使用bind或者lambda表达式的方法封装

以上缺点在当前版本的实现不予解决,日后另写博文优化。
2021/12/29 更新之一
事实上,我们只要将packaged_task改为funtion模板类,就可以简化我们的调用参数:

class ThreadPool
{
public:
	ThreadPool(int n);
	~ThreadPool();

	void pushTask(function<void()> task);

private:
	vector<thread*> threadPool;
	deque<function<void()>> taskQueue;

	void taskConsumer();
	mutex taskMutex;
	condition_variable taskQueueCond;
};

ThreadPool::ThreadPool(int n)
{
	for (int i = 0; i < n; i++)
	{
		thread *t = new thread(&ThreadPool::taskConsumer,this);
		threadPool.push_back(t);
		t->detach();
	}
}

ThreadPool::~ThreadPool()
{
	while (!threadPool.empty())
	{
		thread *t=threadPool.back();
		threadPool.pop_back();
		delete t;
	}
}

void ThreadPool::pushTask(function<void()> task)
{
	{
		lock_guard<mutex> guard(taskMutex);
		taskQueue.push_back(std::move(task));
	}
	taskQueueCond.notify_one();
}

void ThreadPool::taskConsumer()
{
	while (true)
	{
		unique_lock<mutex> lk(taskMutex);
		taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
		function<void()> task=taskQueue.front();
		taskQueue.pop_front();
		lk.unlock();
		task();
	}
}

调用代码改为如下:

ThreadPool pool(2);

pool.pushTask(&Task5);

pool.pushTask(&Task1);

pool.pushTask(&Task1);

Worker worker;

pool.pushTask((bind(&Worker::run, &worker)));

pool.pushTask([&](){worker.run(); });//1

Sleep(15000);

我们可以执行指定的函数,也可以将要执行的代码放入lambda表达式的函数体中,正如1处所示,这样就能在其他线程中执行指定的代码了。
2021/12/29 更新之二
我们发现,main最后都要调用sleep函数来避免主线程在线程任务完成之前就退出,因此我们希望添加一个接口,等待线程所有任务完成,改进如下,其他函数同前:

class ThreadPool
{
public:
	ThreadPool(int n);
	~ThreadPool();

	void pushTask(function<void()> task);
	void waitAllTask();

private:
	vector<thread*> threadPool;
	deque<function<void()>> taskQueue;
	atomic<int> busyCount;
	bool bStop;

	void taskConsumer();
	mutex taskQueueMutex;
	condition_variable taskQueueCond;
	condition_variable taskFinishedCond;
};

void ThreadPool::taskConsumer()
{
	while (!bStop)
	{
		unique_lock<mutex> lk(taskQueueMutex);
		taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
		busyCount++;
		function<void()> task=taskQueue.front();
		taskQueue.pop_front();
		lk.unlock();
		task();
		busyCount--;
		taskFinishedCond.notify_one();
	}
}

void ThreadPool::waitAllTask()
{
	unique_lock<mutex> lk(taskQueueMutex);
	taskFinishedCond.wait(lk, [&] {return taskQueue.empty() && busyCount==0; });//所有任务均已完成
}

这样我们只要调用waitAllTask就可以等待所有任务完成啦。

到此这篇关于C++线程池实现代码的文章就介绍到这了,更多相关C++线程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • C语言利用cJSON解析JSON格式全过程

    C语言利用cJSON解析JSON格式全过程

    cJSON是用于解析json格式字符串的一套api,非常好用,下面这篇文章主要给大家介绍了关于C语言利用cJSON解析JSON格式的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2023-04-04
  • C语言函数的递归和调用实例分析

    C语言函数的递归和调用实例分析

    一个函数在它的函数体内调用它自身称为递归调用。这种函数称为递归函数。C语言允许函数的递归调用。在递归调用中,主调函数又是被调函数。执行递归函数将反复调用其自身,每调用一次就进入新的一层
    2013-07-07
  • C语言 for循环示例详解

    C语言 for循环示例详解

    本文将详细介绍for循环的用法并提供相关的可编译运行的C代码示例,代码简单易懂,对大家的学习或工作具有一定的参考借鉴价值,感兴趣的朋友一起看看吧
    2023-06-06
  • C++ 自增自减运算符的实现示例

    C++ 自增自减运算符的实现示例

    本文主要介绍了C++ 自增自减运算符的实现示例,自增和自减运算符在C++中主要用于循环语句中,使循环变量的值自动+1或者-1,具有一定的参考价值,感兴趣的可以了解一下
    2023-08-08
  • C语言实现代码雨效果

    C语言实现代码雨效果

    这篇文章主要为大家详细介绍了C语言实现代码雨效果,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05
  • C语言基础指针详解教程

    C语言基础指针详解教程

    此处对于指针做一些简要的介绍,作者实属初学,写博客也是作者学习的一个过程,难免文章中有内容理解不到位或者有不当之处,还请朋友们不吝指正,希望大家给予支持
    2021-11-11
  • C++ Boost Xpressive示例分析使用

    C++ Boost Xpressive示例分析使用

    Boost是为C++语言标准库提供扩展的一些C++程序库的总称。Boost库是一个可移植、提供源代码的C++库,作为标准库的后备,是C++标准化进程的开发引擎之一,是为C++语言标准库提供扩展的一些C++程序库的总称
    2022-11-11
  • cocos2dx实现刮奖效果

    cocos2dx实现刮奖效果

    这篇文章主要为大家详细介绍了cocos2dx实现刮奖效果,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-12-12
  • C++中的对象数组详细解析

    C++中的对象数组详细解析

    在建立数组时,同样要调用构造函数。如果有50个元素,就需要调用50次构造函数。在需要的时候,可以在定义数组时提供实参以实现初始化
    2013-10-10
  • C++中inet_pton、inet_ntop函数的用法

    C++中inet_pton、inet_ntop函数的用法

    这篇文章主要介绍了C++中inet_pton、inet_ntop函数的用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-08-08

最新评论