Linux之生产者消费者模型用法解读
一、生产者消费者模型
举个例子:

我们现实生活中:工厂,超市,人[消费者]之间的关系就是一个典型的生产者消费者模型
这个模型的基本工作流程是:
- 工厂制造商品供货给超市,消费者到超市购买商品
现实生活中,为什么会优化出这样的一种模型?
为什么消费者不直接去工厂买东西?
主要有以下3个原因:
为了效率
1.对于工厂来说,消费者如果直接来买商品,一个消费者一次购买的商品数量非常有限,而且一个工厂的产品种类一般并不多,所以消费者还得去不同工厂买东西,工厂为了防止卖不完,只能减少商品的生产,即降低了工厂的生产效率,但是如果卖给超市,就可以一次性卖一大车货物,多找几个超市,就可以做到:生产多少卖多少了
2.对于消费者,工厂的占地面积比较大,所以一般都距离居民地比较远,而超市一般都比工厂占地面积小的多,离消费者很近,所以消费者去超市买东西比去工厂更快,而且一个工厂的产品种类一般并不多,但是超市售卖的产品种类繁多
有了超市就可以做到生产者和消费者之间的解藕
1.一个工厂关门了,超市换一个工厂进货就行了,只要进的货还是一样的,就丝毫不影响消费者消费。事实就是如此,我们消费者从超市买东西时,根本就不知道这个商品是哪个工厂生产的,我们也不关心消费者只管购买商品就行了
2.到超市消费者是谁,是什么群体,工厂也不知道,也不关心,这个是超市该关心的,因为工厂只把商品卖给超市,超市卖给谁和工厂无关,工厂只管生产就行了
3.有了超市的存在
- 左侧的工厂发生变化不影响右侧的消费者
- 右侧的消费者发生变化不影响左侧的工厂
- 这不就
解藕了吗?
支持忙闲不均
有了超市这个缓冲区的存在
1.在购物潮之前,超市就可以通知工厂多生产一些商品,超市多进货,方便应对更多消费者
2.在囤积的商品多的时候,超市也可以搞活动吸引消费者,并且通知工厂生产地慢一点
超市本质上就是工厂和消费者之间的缓存
即:超市支持工厂“预加载”产品,消费者可以一定程度“预定”产品
上述例子对应到计算机中的生产者消费者模型就是:
- ①工厂:生产者线程
- ②消费者:消费者线程
- ③超市:以某种数据结构组织的内存区域
- ④商品:数据
和线程安全再对应一下:
- ①超市:
共享/临界资源 - ②我们要研究生产者消费者模型,就需要研究清楚多个生产者和多个消费者之间的同步互斥关系!
一共有3种关系:
1.生产者和生产者之间:互斥
- 因为如果多个生产者同时向超市(共享资源)写数据,可能会产生线程安全问题
- 因为同时有多个线程对同一份共享资源进行修改,很容易互相覆盖,出现并发线程安全问题 即:线程之间切换时可能导致数据不一致问题
- 其次共享资源的空间就那么大,多个生产者线程同时去写的话,一个线程写的多了,另一个线程就只能少写一点了
- 或者线程a刚向共享资源中的一个位置写了一个1,线程b就跑过来把那个位置的1改成10了
- 所以生产者线程之间是竞争关系
2.消费者和消费者之间:互斥
- 消费者线程,虽然是进入临界资源读取数据,但是其实也会对共享资源进行修改
- 即:线程拿走了一个数据a,其他线程看待这个这个数据a就是过期数据了(和到超市买东西一样,买了一包方便面,超市就少了一包方便面)
- 既然会修改,那么多个消费者线程同时进入共享资源进行修改的话,也可能会出现并发切换导致的线程安全问题
- 比如:线程a和线程b同时访问共享资源,线程a先把数据X消费走了,但是因为是同时进入线程b任认为数据X还是有效的,也拿了数据X(如果共享资源是数组,那可以把数据X理解为数组中的一个元素)
- 就会导致一份数据,被使用了两次
3.生产者和消费者之间:互斥+同步
- 因为不管是生产者线程还是消费者线程,访问共享资源时,都会进行修改,多线程并发修改共享资源,就可能会出现并发线程安全问题
- 所以它们首先得是
互斥的 - 但是如果只有互斥,那么消费者如果不知道有没有数据,就只能不断地去轮询检测,每次轮询都要申请锁,那生产者线程就可能很难抢到锁,就一直生产不了数据,消费者线程就一直拿不到数据,造成恶性循环
- 就可能会导致锁的饥饿问题
- 所以需要
同步关系来提高生产者和消费者模型的效率 即: - 设置条件变量,让消费者线程在条件变量的等待队列中等,当生产者线程生产数据之后,才唤醒消费者线程,去读取数据
二、生产者消费者模型的阻塞队列版本
生产者消费者模型一般会使用一个阻塞队列来作为共享资源,进而实现多线程协作
生产者消费者模型的阻塞队列的特点:
- 如果队列为空,那么一个消费者线程如果来拿数据,它就会被阻塞
- 如果队列为满,那么一个生产者线程如果还要向队列里写数据,它就会被阻塞
- 如果队列不空也不满,那么生产者就可以向队列尾部写数据,消费者就可以向队列头部拿数据
阻塞队列类的简单实现
成员变量
1.存储数据的容器
直接使用STL的queue,因为数据的类型不确定,所以阻塞队列类是模板类
2.一把锁
阻塞队列自己会被所有线程看见,所以它是共享资源,所以需要锁来保护自己
要几把锁呢?
因为所有生产者线程之间,所有消费者线程之间,以及生产者和消费者之间
都是互斥的,所以它们得用同一把锁来实现互斥
3.生产者线程的条件变量
因为在满足一定条件(比如:阻塞队列满了,或者阻塞队列满了4/5了等)时,可以让所有生产者线程暂时暂停生产(即去条件变量的等待队列中阻塞)
4.消费者线程的条件变量
因为在满足一定条件(比如:阻塞队列为空,或者阻塞队列空了4/5了等)时,可以让所有消费者线程暂时暂停消费(即去条件变量的等待队列中阻塞)
为什么要搞两个条件变量?
一个条件变量虽然也可以实现生产者线程和消费者线程之间的同步
但是实现起来非常麻烦,而且不能区分条件变量的等待队列下的是生产者线程还是消费者线程
而且
两个条件变量可以很好地支持:
生产者消费者模型的第3个优点:忙闲不均
5.int _cap:阻塞队列的最大容量
6.int _csleep_num:在消费者条件变量的等待队列中等待的线程个数
7.int _psleep_num:在生产者条件变量的等待队列中等待的线程个数
6和7成员变量的存在主要是为了方便实现线程之间的互相唤醒机制
(即生产者线程生产了之后,可以唤醒消费者线程来消费,反之同理)
成员函数
- Equeue:生产数据
void Equeue(const T& in)
{
pthread_mutex_lock(&_mutex);
//生产者调用
while(IsFull())
{
_psleep_num++;
cout << "生产者, 进入休眠了:" << _psleep_num << endl;
pthread_cond_wait(&_full_cond, &_mutex);
_psleep_num--;
}
//100% 队列有空间
_q.push(in);
if(_csleep_num > 0)
{
pthread_cond_signal(&_empty_cond);
cout << "唤醒消费者..." << endl;
}
pthread_mutex_unlock(&_mutex);
}
代码细节:
伪唤醒问题的解决
- 即:判断线程是否要进入条件变量的等待队列时,判断不能用if而要用while
- 不然就有可能出现伪唤醒问题:即在条件变量下等待的线程,唤醒条件其实并不满足
- 但是因为程序员编码的问题,可能意外被唤醒了
例如:
- 生产者消费者模型中,因为阻塞队列中没有数据,所以全部都5个消费者线程在条件变量的等待队列中等待
- 生产者线程生产了一个数据,意外地把唤醒了多个消费者线程
- 然后一个消费者线程抢到锁之后,把阻塞队列中那唯一的一个数据抢走了,它解锁之后
- 因为唤醒了多个消费者线程
- 所以锁可能又被一个消费者线程抢到了,但是此时阻塞队列中根本没有数据!
此时:
1.如果此时是使用if进行“线程是否需要进入条件变量的等待队列"的判断的这个被伪唤醒的线程,重新申请并拿到锁之后,就直接"饿虎出笼"去肆意妄为了
2.如果是使用while进行“线程是否需要进入条件变量的等待队列”的判断的,这个被唤醒的线程,重新申请并拿到锁之后,也还是不能直接出循环,因为要再判断一下循环条件是否不满足了
虽然循环条件是"线程需要进入等待队列"的条件,但是如果这个条件满足,不就意味着线程不应该被唤醒吗?
- Pop:获取并删除数据
T Pop()
{
//消费者调用
pthread_mutex_lock(&_mutex);
while(IsEmpty())
{
_csleep_num++;
pthread_cond_wait(&_empty_cond, &_mutex);
_csleep_num--;
}
T data = _q.front();
_q.pop();
if(_psleep_num > 0)
{
pthread_cond_signal(&_full_cond);
cout << "唤醒生产者.." << endl;
}
pthread_mutex_unlock(&_mutex);
return data;
}
- IsEmpty:阻塞队列是否为空
- IsFull:阻塞队列是否为满
bool IsFull()
{
return _q.size() >= _cap;
}
bool IsEmpty()
{
return _q.empty();
}
源码
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include <string>
using namespace std;
int defalutcap = 5;
template<typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _q.size() >= _cap;
}
bool IsEmpty()
{
return _q.empty();
}
public:
BlockQueue(int cap = defalutcap)
: _cap(cap),
_csleep_num(0),
_psleep_num(0)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_full_cond, nullptr);
pthread_cond_init(&_empty_cond, nullptr);
}
void Equeue(const T& in)
{
pthread_mutex_lock(&_mutex);
//生产者调用
while(IsFull())
{
_psleep_num++;
cout << "生产者, 进入休眠了:" << _psleep_num << endl;
pthread_cond_wait(&_full_cond, &_mutex);
_psleep_num--;
}
//100% 队列有空间
_q.push(in);
if(_csleep_num > 0)
{
pthread_cond_signal(&_empty_cond);
cout << "唤醒消费者..." << endl;
}
pthread_mutex_unlock(&_mutex);
}
T Pop()
{
//消费者调用
pthread_mutex_lock(&_mutex);
while(IsEmpty())
{
_csleep_num++;
pthread_cond_wait(&_empty_cond, &_mutex);
_csleep_num--;
}
T data = _q.front();
_q.pop();
if(_psleep_num > 0)
{
pthread_cond_signal(&_full_cond);
cout << "唤醒生产者.." << endl;
}
pthread_mutex_unlock(&_mutex);
return data;
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full_cond);
pthread_cond_destroy(&_empty_cond);
}
private:
//临界资源
queue<T> _q;
//大小
int _cap;
pthread_mutex_t _mutex;
pthread_cond_t _full_cond;
pthread_cond_t _empty_cond;
int _csleep_num;//消费者休眠的个数
int _psleep_num;//生产者休眠的个数
};
三、POSIX信号量
POSIX信号量和SystemV信号量作⽤相同,都是⽤于同步操作,达到⽆冲突的访问共享资源⽬的。但POSIX可以⽤于线程间同步。
初始化信号量
sem_init
作用: 用于初始化一个未命名的POSIX信号量(也称为匿名信号量),通常用于线程间同步或共享内存的进程间同步。
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);
- sem_t *sem: 指向要初始化的信号量对象的指针。
- pshared: 0表⽰线程间共享,⾮零表⽰进程间共享
- value:信号量初始值
返回值
- 成功:返回 0
- 失败:非零
销毁信号量
sem_destroy
int sem_destroy(sem_t *sem);
等待信号量
sem_wait
是 POSIX 信号量的 P操作(等待/获取信号量),用于对信号量进行原子减1操作。
它的主要作用是:
- 如果信号量值 > 0:立即将其减 1,线程继续执行。
- 如果信号量值 = 0:线程阻塞,直到信号量值变为正数(其他线程或进程调用 sem_post 释放资源)。
int sem_wait(sem_t *sem); //P()
发布信号量
sem_post
是 POSIX 信号量的 V操作(释放/增加信号量),用于对信号量进行原子加1操作。它的主要作用是:
- 将信号量的值 +1,表示释放一个资源。
- 如果有线程阻塞在 sem_wait,则唤醒其中一个线程(取决于系统调度策略)。
int sem_post(sem_t *sem);//V()
四、基于环形队列的⽣产消费模型
上⼀节⽣产者-消费者的例⼦是基于queue的,其空间可以动态分配,现在基于固定⼤⼩的环形队列重写这个程序(POSIX信号量):
Sem的封装
#include <iostream>
#include <semaphore.h>
#include <pthread.h>
namespace SemMoudle
{
const int defaultvalue = 1;
class Sem
{
public:
Sem(unsigned int sem_vlaue = defaultvalue)
{
sem_init(&_sem, 0, sem_vlaue);
}
void P()
{
//等待信号量,会将信号量的值减1
int n = sem_wait(&_sem);//原子的
(void)n;
}
void V()
{
//发布信号量,释放资源,会将信号量的值加1
int n = sem_post(&_sem);//原子的
(void)n;
}
~Sem()
{
sem_destroy(&_sem);
}
private:
sem_t _sem;
};
}
Mutex的封装
#pragma once
#include <iostream>
#include <pthread.h>
namespace MutexModue
{
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
(void)n;
}
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
(void)n;
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
pthread_mutex_t* Get()
{
return &_mutex;
}
private:
pthread_mutex_t _mutex;
};
class LockGuard
{
public:
LockGuard(Mutex& mutex):_mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex& _mutex;
};
}
环形队列
#pragma once
#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"
using namespace std;
static const int gcap = 5;
using namespace SemMoudle;
using namespace MutexModue;
//环形队列
template<typename T>
class RingQueue
{
public:
RingQueue(int cap = gcap)
:_cap(cap),
_rq(cap),
_blank_sem(cap),
_p_step(0),
_data_sem(0),
_c_step(0)
{}
void Equeue(const T& in)
{
//生产者
//1.申请信号量,空位置信号量
_blank_sem.P();
{
LockGuard lockguard(_pmutex);
//2.生产
_rq[_p_step] = in;
//3.更新下标
++_p_step;
//4.维持环形特性
_p_step %= _cap;
}
_data_sem.V();
}
void Pop(T* out)
{
//消费者
//1.申请信号量,数据信号量
_data_sem.P();
{
LockGuard lockguard(_cmutex);
//2.消费
*out = _rq[_c_step];
//3.更新下标
++_c_step;
//4.维持环形特性
_c_step %= _cap;
}
_blank_sem.V();
}
private:
vector<T> _rq;
int _cap;
//生产者
Sem _blank_sem;//空位置
int _p_step;
//消费者
Sem _data_sem;//数据
int _c_step;
//维护多生产,多消费,2把锁
Mutex _cmutex;
Mutex _pmutex;
};
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
Apache Pulsar 微信大流量实时推荐场景下实践详解
这篇文章主要为大家介绍了Apache Pulsar 微信大流量实时推荐场景下实践详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2022-11-11


最新评论