C++简单实现消息队列的示例代码

 更新时间:2025年11月07日 10:06:29   作者:两片空白  
消息队列是一种应用间的通讯方式,消息发送后可以立即放回,由消息系统来确保消息的可靠传递,本文主要介绍了C++简单实现消息队列的示例代码,感兴趣的可以了解一下

简介

消息队列是一种应用间的通讯方式,消息发送后可以立即放回,由消息系统来确保消息的可靠传递。消息发布者只需要将消息发布到消息队列中,而不需要管谁来取。消息使用者只管从消息队列中取消息而不管谁发布的。这样发布者和使用者都不同知道对方的存在。
消息队列普遍使用在生产者和消费者模型中。

  • 优点
  1. 应用解耦: 应用之间不用那么多的同步调用,发消息到消息队列就行,消费者可以自己消费,消费生产者不用管了,降低应用之间的耦合。
  2. 降低延时: 应用之间用同步调用,需要等待对方响应,等待时间比较长,用消息之后,发送消息到消息队列就行,应用就可以返回了,对客户来讲降低了应用延时。
  3. 削峰填谷:请求比较多的时候,应用处理不过来,会丢弃请求;请求比较少时,应用不饱和。
    请求比较多时,把请求放到消息队列,消费者按特定处理速度来处理,请求少时,也让应用有事情可以做;能做到忙时不丢请求,闲时不闲置应用资源。

主流的消息队列:Kafka、ActiveMQ、RabbitMQ、RocketMQ

下面使用C++实现一个简单的消息队列。

具体实现

  • 消息队列类
    里面主要包含一个数组和一个队列,都保存消息。
    应用从数组中拿消息处理,当数组满的时候,消息保存到队列中,当数组中消息处理完,从队列中取消息,再处理。
    数组实现的是一个环形数组,记录下消息写游标和读游标,超过数组大小,对数组大小取余。
//消息长度和消息
typedef std::pair<size_t, char*> msgPair;
#define MsgQueueSize 102400

class zMsgQueue
{
  //消息对,first表示是否存放消息
  typedef std::pair<bool, msgPair> msgQueue;
public:
  zMsgQueue();
  ~zMsgQueue();
  void* msgMalloc(const size_t len);
  void msgFree(void* p);
  //获得一个消息
  msgPair* get();
  //放入一个消息
  bool put(const void* msg, size_t msgLen);
  //将队列中的消息放到消息数组中
  bool putMsgQueue2Arr();
  //删除一个消息
  void erase();
  bool empty();

  bool msgArrEmpty();
private:
  void clear();
  


  // 保存正在处理的消息
  msgQueue msgArr_[MsgQueueSize];
  // 保存等待处理的消息
  std::queue<msgPair> msgQueue_;
  
  //消息写游标
  size_t queueWrite_;
  //消息读游标
  size_t queueRead_;
};

实现:

zMsgQueue::zMsgQueue()
{
  bzero(msgArr_, sizeof(msgArr_));
  queueWrite_ = 0;
  queueRead_ = 0;
}


zMsgQueue::~zMsgQueue()
{
  clear();
}

void* zMsgQueue::msgMalloc(const size_t len)
{
  char* p = (char*)malloc(len + 1);
  return (void*)(p + 1);
}

void zMsgQueue::msgFree(void* p)
{
  free((char*)p - 1);
}

  //获得一个消息
msgPair* zMsgQueue::get()
{
  if(queueRead_ >= MsgQueueSize)
    return NULL;

  if(msgArrEmpty())
    putMsgQueue2Arr();
  
  msgPair* ret = NULL;
  if(msgArr_[queueRead_].first)
    ret = &msgArr_[queueRead_].second;

  return ret;
}

//放入一个消息
bool zMsgQueue::put(const void* msg, size_t msgLen)
{
  char* buf = (char*)msgMalloc(msgLen);
  if(buf)
  {
    bcopy(msg, buf, msgLen);
    
    //先将队列中的消息放到数组中
    //数组中还有位置直接放到数组中
    //没有位置放到队列中
    if(!putMsgQueue2Arr() && !msgArr_[queueWrite_].first)
    {
      msgArr_[queueWrite_].first = true;
      msgArr_[queueWrite_].second.first = msgLen;
      msgArr_[queueWrite_].second.second = buf;
      queueWrite_++;
      queueWrite_ %= MsgQueueSize;
    }
    else 
    {
      msgQueue_.push(std::make_pair(msgLen, buf));
    }
    return true;
  } 
  return false;
}

//将队列中的消息放到消息数组中
bool zMsgQueue::putMsgQueue2Arr()
{
  bool isLeft = false;
  while(!msgQueue_.empty())
  {
    if(!msgArr_[queueWrite_].first)
    {
      msgArr_[queueWrite_].first = true;
      msgArr_[queueWrite_].second = msgQueue_.front();
      queueWrite_++;
      queueWrite_ %= MsgQueueSize;
      msgQueue_.pop();
    }
    else 
    {
      isLeft = true;
      break;
    }
  }
  return isLeft;
}

//删除一个消息
void zMsgQueue::erase()
{
  if(!msgArr_[queueRead_].first)
    return;

  msgFree(msgArr_[queueRead_].second.second);
  msgArr_[queueRead_].second.second = NULL;
  msgArr_[queueRead_].second.first = 0;
  msgArr_[queueRead_].first = false;

  queueRead_++;
  queueRead_ %= MsgQueueSize;
}

void zMsgQueue::clear()
{
  //队列中还有消息
  while(putMsgQueue2Arr())
  {
    //数组中还有消息
    while(get())
    {
      erase();
    }
  }
  //数组中还有消息
  while(get())
  {
    erase();
  }
}

bool zMsgQueue::empty()
{
  if(putMsgQueue2Arr()) return false;

  return msgArrEmpty();
}

bool zMsgQueue::msgArrEmpty()
{
  if(queueRead_ == queueWrite_ && !msgArr_[queueRead_].first)
  {
    return true;
  }
  return false;
}
  • 消息队列的封装
    对消息队列的封装主要是为了对消息进行解析和处理。
    消息解析和处理函数定义成了虚函数,当需要使用消息队列并处理消息时,只需要继承消息队列,然后重写虚函数,进行对应处理即可。
    类中还使用到了读写锁,当多线程的情况下,消息队列是一个临界资源,线程共享,需要进行上锁。单线程的情况下不需要加锁。
//T表示使用的消息队列
//msgT表示消息的类型,有的需要消息头,消息正文等,需要解析,这里是直接使用
template<class T=zMsgQueue, class msgT=char>
class messageQueue : public rwLocker
{
public:
  messageQueue()
  {}
  ~messageQueue()
  {}

  bool putMsg(const msgT* msg, const size_t msgLen)
  {
    rwLocker::wlock();
    msgQueue_.put(msg, msgLen);
    rwLocker::unlock();
    return true;
  }
  //解析消息,处理消息
  virtual bool msgParse(const msgT* msg, const size_t msgLen) = 0;
  //获取消息,解析消息,处理消息
  bool doCmd()
  {
    rwLocker::wlock();
    msgPair* msg = msgQueue_.get();
    while(msg)
    {
      msgParse(msg->second, msg->first);  
      msgQueue_.erase();
      msg = msgQueue_.get();
    }
    rwLocker::unlock();
    return true;
  }

  bool empty()
  {
    return msgQueue_.empty();
  }

private:
  T msgQueue_;
};
  • 读写锁的封装
    读写锁:可以多个线程进行读,只能一个线程进行写。写时独享资源,读时共享资源。写锁的优先级高。
  • 为什么读写锁需要读锁?

为了防止其他线程请求写锁。一个线程请求了读锁,其他线程在请求写锁会阻塞,但是请求读锁不会阻塞。一个线程请求了写锁,其他线程请求读锁和写锁都会阻塞。

#include <pthread.h>

class rwLock 
{
public:
  rwLock()
  {
    pthread_rwlock_init(&rwlc_, NULL);
  }

  ~rwLock()
  {
    pthread_rwlock_destroy(&rwlc_);
  }

  void rlock()
  {
    pthread_rwlock_rdlock(&rwlc_);
  }

  void wlock()
  {
    pthread_rwlock_wrlock(&rwlc_);
  }

  void unlock()
  {
    pthread_rwlock_unlock(&rwlc_);
  }

private:
    pthread_rwlock_t rwlc_;
};


class rwLocker
{
public:
  void rlock()
  {
    rwlc_.rlock();
  }

  void wlock()
  {
    rwlc_.wlock();
  }

  void unlock()
  {
    rwlc_.unlock();
  }

private:
  rwLock rwlc_;
};
  • Makefile:
# ini1=main.cpp 
# in2=messageQueue.cpp
out=main 
cc=g++
std=-std=c++11 -lpthread

#$(out):$(in1) $(in2)
$(out): main.cpp messageQueue.cpp rwlock.h
	$(cc) $^ -o $@ $(std)


.PHONY:clean
clean:
	rm -rf $(out)
  • 代码测试
    实现一个类继承消息队列,重写消息处理函数。
    定义对象,调用doCmd函数即可。
#include "messageQueue.h"

class test : public messageQueue<>
{
  bool msgParse(const char* msg, const size_t msgLen)
  {
    std::cout << msgLen << ":" << msg << std::endl;
    return true;
  }
};

int main()
{
  //模拟客户端发送消息
  char buf[256] = "hello world!";
  test t;
  //消息队列放消息
  t.putMsg(buf, strlen(buf));
  //处理消息
  t.doCmd();
  return 0;
}

到此这篇关于C++简单实现消息队列的示例代码的文章就介绍到这了,更多相关C++ 消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • C语言动态内存函数(malloc、calloc、realloc、free)详解

    C语言动态内存函数(malloc、calloc、realloc、free)详解

    在C语言中,动态内存函数是块重要的知识点,以往,我们开辟空间都是固定得,数组编译结束后就不能继续给它开辟空间了,开辟的空间满了,就不能在开辟空间了,学习本文章,我们就可以解决这个问题,向内存申请空间,感兴趣的小伙伴跟着小编一起来看看吧
    2023-08-08
  • C语言实现经典24点算法

    C语言实现经典24点算法

    这篇文章主要为大家详细介绍了C语言实现经典24点算法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-10-10
  • opencv3/C++ PHash算法图像检索详解

    opencv3/C++ PHash算法图像检索详解

    今天小编就为大家分享一篇opencv3/C++ PHash算法图像检索详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-12-12
  • C++中的内存对齐实例详解

    C++中的内存对齐实例详解

    这篇文章主要介绍了C++中的内存对齐实例详解的相关资料,这里不仅提供实现方法及代码还提供了手工制作图,来帮助到大家理解这部分知识,需要的朋友可以参考下
    2017-07-07
  • C++ 虚函数及虚函数表详解

    C++ 虚函数及虚函数表详解

    这篇文章主要介绍了c++ 虚函数及虚函数表详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-11-11
  • C语言实现学生考勤系统

    C语言实现学生考勤系统

    这篇文章主要为大家详细介绍了C语言实现学生考勤系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-03-03
  • C语言中回调函数的含义与使用场景详解

    C语言中回调函数的含义与使用场景详解

    这篇文章主要为大家详细介绍了C语言中回调函数的含义与使用场景,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-03-03
  • C语言实现贪吃蛇游戏代码

    C语言实现贪吃蛇游戏代码

    这篇文章主要为大家详细介绍了C语言实现贪吃蛇游戏代码,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-09-09
  • C++判断矩形相交的方法

    C++判断矩形相交的方法

    这篇文章主要介绍了C++判断矩形相交的方法,涉及C++针对平面坐标数学运算的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-07-07
  • sigsetjmp的用法总结

    sigsetjmp的用法总结

    sigsetjmp()会保存目前堆栈环境,然后将目前的地址作一个记号,而在程序其他地方调用siglongjmp()时便会直接跳到这个记号位置,然后还原堆栈,继续程序的执行
    2013-09-09

最新评论