C++中线程池ThreadPool源码解析

 更新时间:2022年09月05日 08:53:27   作者:喂喂喂–学编程  
线程池(threadpool)作为五大池之一(内存池、连接池、线程池、进程池、协程池),线程池的应用非常广泛,不管事客户端程序还是后台服务端,都是提高业务处理能力的必备模块

什么是线程

线程是进程中的⼀个执⾏单元,负责当前进程中程序的执⾏,⼀个进程中⾄少有⼀个线程。⼀个进程中是可以有多个线程的,这个应⽤程序也可以称之为多线程程序。多线程程序作为一种多任务、并发的工作方式

并发与并⾏

早期计算机的 CPU 都是单核的,一个 CPU 在同一时间只能执行一个进程/线程,当系统中有多个进程/线程等待执行时,CPU 只能执行完一个再执行下一个。为了提高 CPU 利用率,减少等待时间,人们提出了一种 CPU 并发工作的理论.

并发:指两个或多个事件在同⼀个时间段内发⽣,当系统中有多个进程/线程等待执行时,CPU只能执行完一个再执行下一个。

并⾏:指两个或多个事件在同⼀时刻发⽣(同时发⽣),多核 CPU 的每个核心都可以独立地执行一个任务,而且多个核心之间不会相互干扰。在不同核心上执行的多个任务,是真正地同时运行,这种状态就叫做并行。。

什么是线程池

顾名思义:线程池就是线程的池子,有很多线程,但是数量不会超过池子的限制。需要用到多执行流进行任务出路的时候,就从池子中取出一个线程去处理,线程池就类似于一个实现了消费者业务的生产者与消费者模型。

本质上:这就是一个基于生产者消费者模型来实现的线程池,那么同样遵守三种规则,生产者和生产者之间存在互斥,处理任务的线程之间存在互斥关系,生产者和消费者之间存在同步和互斥关系

线程池解决什么问题

线程池维护者多个线程,等待着分配可并发执行的任务,可以避免在短时间创建和销毁大量线程带来时间成本。

总结为三点:

1.避免线程因为不限制创建数量导致的资源耗尽风险

2.任务队列缓冲任务,支持忙线不均的作用

3.节省了大量频繁创建/销毁线程的时间成本

怎么用线程池

下面展示一些 threadpool实现,源码来自openharmony。

/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef NETSTACK_THREAD_POOL
#define NETSTACK_THREAD_POOL
#include <atomic>
#include <condition_variable>
#include <queue>
#include <thread>
#include <vector>
namespace OHOS::NetStack {
template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
public:
    /**
     * disallow default constructor
     */
    ThreadPool() = delete;
    /**
     * disallow copy and move
     */
    ThreadPool(const ThreadPool &) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool &operator=(const ThreadPool &) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool(ThreadPool &&) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool &operator=(ThreadPool &&) = delete;
    /**
     * make DEFAULT_THREAD_NUM threads
     * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated
     */
    explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true)
    {
        for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) {
            std::thread([this] { RunTask(); }).detach();
        }
    }
    /**
     * if ~ThreadPool, terminate all thread
     */
    ~ThreadPool()
    {
        // set needRun_ = false, and notify all the thread to wake and terminate
        needRun_ = false;
        while (runningNum_ > 0) {
            needRunCondition_.notify_all();
        }
    }
    /**
     * push it to taskQueue_ and notify a thread to run it
     * @param task new task to Execute
     */
    void Push(const Task &task)
    {
        PushTask(task);
        if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
            std::thread([this] { RunTask(); }).detach();
        }
        needRunCondition_.notify_all();
    }
private:
    bool IsQueueEmpty()
    {
        std::lock_guard<std::mutex> guard(mutex_);
        return taskQueue_.empty();
    }
    bool GetTask(Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        // if taskQueue_ is empty, means timeout
        if (taskQueue_.empty()) {
            return false;
        }
        // if run to this line, means that taskQueue_ is not empty
        task = taskQueue_.top();
        taskQueue_.pop();
        return true;
    }
    void PushTask(const Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        taskQueue_.push(task);
    }
    class NumWrapper {
    public:
        NumWrapper() = delete;
        explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num)
        {
            ++num_;
        }
        ~NumWrapper()
        {
            --num_;
        }
    private:
        std::atomic<uint32_t> &num_;
    };
    void Sleep()
    {
        std::mutex needRunMutex;
        std::unique_lock<std::mutex> lock(needRunMutex);
        /**
         * if the thread is waiting, it is idle
         * if wake up, this thread is not idle:
         *     1 this thread should return
         *     2 this thread should run task
         *     3 this thread should go to next loop
         */
        NumWrapper idleWrapper(idleThreadNum_);
        (void)idleWrapper;
        needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
                                   [this] { return !needRun_ || !IsQueueEmpty(); });
    }
    void RunTask()
    {
        NumWrapper runningWrapper(runningNum_);
        (void)runningWrapper;
        while (needRun_) {
            Task task;
            if (GetTask(task)) {
                task.Execute();
                continue;
            }
            Sleep();
            if (!needRun_) {
                return;
            }
            if (GetTask(task)) {
                task.Execute();
                continue;
            }
            if (runningNum_ > DEFAULT_THREAD_NUM) {
                return;
            }
        }
    }
private:
    /**
     * other thread put a task to the taskQueue_
     */
    std::mutex mutex_;
    std::priority_queue<Task> taskQueue_;
    /**
     * 1 terminate the thread if it is idle for timeout_ seconds
     * 2 wait for the thread started util timeout_
     * 3 wait for the thread notified util timeout_
     * 4 wait for the thread terminated util timeout_
     */
    uint32_t timeout_;
    /**
     * if idleThreadNum_ is zero, make a new thread
     */
    std::atomic<uint32_t> idleThreadNum_;
    /**
     * when ThreadPool object is deleted, wait until runningNum_ is zero.
     */
    std::atomic<uint32_t> runningNum_;
    /**
     * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
     */
    std::atomic_bool needRun_;
    std::condition_variable needRunCondition_;
};
} // namespace OHOS::NetStack
#endif /* NETSTACK_THREAD_POOL */

这份源码的实现,没有使用一些较难理解的语法,基本上就是使用线程+优先级队列实现的。提前创建指定数目的线程,每次取一个任务并执行。任务队列负责存放线程需要处理的任务,工作线程负责从任务队列中取出和运行任务,可以看成是一个生产者和多个消费者的模型。

#include "doctest.h"
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN
#include <stdexcept>
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END
//#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
//#define DOCTEST_CONFIG_DISABLE
#include <string>
#include <iostream>
#include "thread_pool.h"
//
// Created by Administrator on 2022/8/10.
//
class Task {
public:
    Task() = default;
    explicit Task(std::string context){
        mContext = context;
    }
    bool operator<(const Task &e) const{
        return priority_ < e.priority_;
    }
    void Execute(){
        std::lock_guard<std::mutex> guard(mutex_);
        std::cout <<  "task is execute,name is:"<<mContext<<std::endl;
    }
public:
    uint32_t priority_;
private:
    std::string mContext;
    static std::mutex mutex_;
};
#define DEFAULT_THREAD_NUM 3
#define MAX_THREAD_NUM 6
#define TIME_OUT 500
std::mutex Task::mutex_;
static int threadpoolTest(){
    static OHOS_NetStack::ThreadPool<Task, DEFAULT_THREAD_NUM, MAX_THREAD_NUM> threadPool_(TIME_OUT);
    Task task1("name_1");
    Task task2("name_2");
    Task task3("name_3");
    Task task4("name_4");
    threadPool_.Push(task1);
    threadPool_.Push(task2);
    threadPool_.Push(task3);
    threadPool_.Push(task4);
    return 0;
}
TEST_CASE("threadPool simple use example, test by doctest unit tool") {
    threadpoolTest();
}

以上该版本thread_pool的简单使用示例,可以看到使用稍微麻烦了些。必须定义格式如下的task类,必须实现operator<和Execute()方法,不过整体实现还是很不错的,通俗易懂!

总结

线程池的应用场景:当有大量的数据请求,需要多执行流并发/并行处理时,可以采用线程池来处理任务,可避免大量线程频繁创建或销毁所带来的时间成本,也可避免在峰值压力下,系统资源耗尽的风险。

到此这篇关于C++中线程池ThreadPool源码解析的文章就介绍到这了,更多相关C++ ThreadPool内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • C++实现动态顺序表(vector)

    C++实现动态顺序表(vector)

    这篇文章主要为大家详细介绍了C++实现动态顺序表,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-05-05
  • Qt利用QSortFilterProxyModel代理实现自定义排序与联合过滤

    Qt利用QSortFilterProxyModel代理实现自定义排序与联合过滤

    QsortFilterProxyModel类用来为model和view之间提供强大的排序和过滤支持。这篇文章将利用QSortFilterProxyModel代理实现自定义排序与联合过滤,需要的可以参考一下
    2022-11-11
  • Qt键盘事件实现图片在窗口上下左右移动

    Qt键盘事件实现图片在窗口上下左右移动

    这篇文章主要为大家详细介绍了Qt键盘事件实现图片在窗口上下左右移动,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-08-08
  • C++ 动态内存管理详情解说

    C++ 动态内存管理详情解说

    这篇文章主要介绍了C++ 动态内存管理详情解说,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下,希望对你的学习有所帮助
    2022-07-07
  • C++11 智能指针的具体使用

    C++11 智能指针的具体使用

    本文主要介绍了C++11 智能指针的具体使用,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08
  • Qt基础开发之Qt多线程类QThread与Qt定时器类QTimer的详细方法与实例

    Qt基础开发之Qt多线程类QThread与Qt定时器类QTimer的详细方法与实例

    这篇文章主要介绍了Qt基础开发之Qt多线程类QThread与Qt定时器类QTimer的详细方法与实例,需要的朋友可以参考下
    2020-03-03
  • C++ 转换函数用法案例详解

    C++ 转换函数用法案例详解

    这篇文章主要介绍了C++ 转换函数用法案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-09-09
  • C++动态加载so/dll库的实现

    C++动态加载so/dll库的实现

    本文主要介绍了C++动态加载so/dll库的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07
  • 详解Qt使用QImage类实现图像基本操作

    详解Qt使用QImage类实现图像基本操作

    这篇文章主要介绍了Qt如何利用QImage类实现对图像的基本操作,包括图像显示、图像缩放、图像旋转等,感兴趣的小伙伴可以跟随小编一起动手尝试一下
    2022-06-06
  • C++实现哈夫曼树简单创建与遍历的方法

    C++实现哈夫曼树简单创建与遍历的方法

    这篇文章主要介绍了C++实现哈夫曼树简单创建与遍历的方法,对于C++算法的学习来说不失为一个很好的借鉴实例,需要的朋友可以参考下
    2014-07-07

最新评论