使用Java代码实现RocketMQ的生产与消费消息

 更新时间:2024年07月31日 08:43:23   作者:小威要向诸佬学习呀  
这篇文章介绍一下其他的小组件以及使用Java代码实现生产者对消息的生成,消费者消费消息等知识点,并通过代码示例介绍的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下

RocketMQ其他组件

在RocketMQ中,除了生产者,消费者,还有一些其他的小组件,接下来逐一介绍一下他们。

监听器(Listener)

定义:监听器是消费者用于处理消息的组件。在PushConsumer(推)模式下,消费者客户端必须设置消费监听器,以便在接收到消息时执行相应的处理逻辑。(比如一会儿下面的代码)

偏移量(Offset)

定义:偏移量是指在消费消息时,记录消费者已经消费到的消息位置的值。每个消息都有一个唯一的偏移量值,它代表了消息在消息队列中的位置。

偏移量具有很大的作用:它能够保证消费者在重启或宕机后能够从上次消费的位置继续消费消息,避免重复消费或漏消费。

简而言之就是它能告诉消费者已经消费到哪一条消息!!

  • 集群模式:在集群消费模式下,消息队列的消费进度保存在Broker端。消费者每次消费完消息后,会将最新的消费进度同步到Broker,以便在消费者重启或者故障转移的时候能够从上一次消费的位置继续消费。
  • 广播模式:在广播消费模式下,消息队列的消费进度保存在消费者本地。因为广播模式下每条消息都会被所有消费者消费,所以不需要在Broker端保存消费进度。

所以,偏移量的的实现方式有两种:包括存储在本地文件(OffsetStore)和存储在Broker中这两种方式。(这样一看,清晰了吧)

命名服务器(NameServer)

定义:命名服务器是RocketMQ中的轻量级路由服务,存储生产者和消费者与Broker之间的路由信息。

它的作用:提供Broker的动态注册与发现服务,生产者和消费者通过NameServer查询Broker的路由信息,从而进行消息的投递和消费。

消息组成

  • Topic:消息主题,对不同的业务消息进行分类。
  • Tag:消息标签,进一步区分某个Topic下的消息分类。使用Tag可以实现对Topic中的消息进行过滤。消费者可以根据Tag来订阅自己感兴趣的消息,而不是接收Topic下的所有消息。
  • Message Body:消息体,消息的实际内容。
  • Keys:消息的键值,标识消息的唯一性。在RocketMQ中,每个消息都可以设置Keys字段,以便在需要的时候根据Keys来查询或者定位消息。
  • 属性:除了上面滴,RocketMQ的消息还可以包含一系列的属性信息,比如消息的发送时间、生产者信息等等。这些属性信息以键值对的形式存在,随着消息一起被存储和传输。

实现生产与消费消息

按之前的步骤搭建完成RocketMQ集群后...

首先我们创建一个空的maven工程,在pom.xml文件中添加RocketMQ的依赖(RocketMQ的依赖版本需要与虚拟机中的保持一致,这里选择和之前一样的4.7.1版本):

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

生产消息

然后编写生产者生产消息的代码:

// 1.创建一个DefaultMQProducer实例,指定生产者组名 
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");  
// 2.设置NameServer的地址
producer.setNamesrvAddr("192.168.220.135:9876");    
// 3.启动生产者实例  
producer.start();   
// 4.使用for循环发送10条消息  
for (int i = 0; i < 10; i++) {  
    // 创建一条消息,指定Topic为"MyTopic1",Tag标签为"TagA",消息体为"hello rocketmq"加上循环变量的值,同时把字符串转换为字节数组  
    Message message = new Message("MyTopic1","TagA",("hello rocketmq"+i).getBytes(StandardCharsets.UTF_8));       
    // 5.发送消息并接收发送结果  
    SendResult sendResult = producer.send(message);  
    // 打印发送结果,包括消息ID、发送状态等信息  
    System.out.println(sendResult);  
}  
// 6.发送完所有消息后,关闭生产者实例,释放资源  
producer.shutdown();

生产者生产消息和消费者消费消息这块的代码都相对较为简单,已经在代码块中加了注释,这里就不再赘述了。

这个时候就可以访问虚拟机+端口号来搜索到发送的消息详情了!

消费消息

// 1.和生产者一样,创建一个DefaultMQPushConsumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");  
// 2.设置NameServer的地址,消费者通过这个地址与NameServer进行通信,来获取Broker的地址信息  
consumer.setNamesrvAddr("192.168.220.135:9876");  
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息。我们订阅了"MyTopic1",使用"*"来匹配此Topic下的所有Tag  
consumer.subscribe("MyTopic1", "*");  
// 3.注册消息监听器,用于处理从Broker接收到的消息。使用MessageListenerConcurrently接口的实现,表示并行消费  
consumer.registerMessageListener(new MessageListenerConcurrently() {  
    @Override  
    // 4.当收到消息时,方法会被调用。
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {  
        // 5.遍历消息列表,并打印每条消息的内容(注意:这里直接打印msg对象不会得到预期的消息内容字符串)  
        for (MessageExt msg : msgs) {  
            // 所以我们打印msg.getBody()的内容,为了保留消息原样  
            System.out.println("已收到消息" + msg);  
        }  
        // 6.返回消费状态,这里表示消息已成功消费  
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
    }  
});  
// 7.启动消费者实例  
consumer.start();  
// 8.打印日志消费者已启动  
System.out.println("消费者已启动");

这里需要注意,msg包含了消息的详细信息,包括消息体、标签、属性等等。如果想打印消息内容,应该使用msg.getBody()方法获取消息体的字节数组,并且把它转换为字符串(如果消息体是文本的话)。

本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!

以上就是使用Java代码实现RocketMQ的生产与消费消息的详细内容,更多关于Java实现RocketMQ生产与消费的资料请关注脚本之家其它相关文章!

相关文章

  • spring rocketmq集成方案

    spring rocketmq集成方案

    本文详细介绍了如何在Spring Boot项目中集成RocketMQ 5.x,包括前置条件、核心依赖、配置、生产者和消费者实现、测试验证以及关键注意事项,感兴趣的朋友跟随小编一起看看吧
    2026-03-03
  • 彻底理解Java 中的ThreadLocal

    彻底理解Java 中的ThreadLocal

    这篇文章主要介绍了彻底理解Java 中的ThreadLocal的相关资料,需要的朋友可以参考下
    2017-07-07
  • spring boot只需两步优雅整合activiti示例解析

    spring boot只需两步优雅整合activiti示例解析

    这篇文章主要主要来教大家spring boot优雅整合activiti只需两步就可完成测操作示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助祝大家多多进步
    2022-03-03
  • SpringBoot轻松实现ip解析(含源码)

    SpringBoot轻松实现ip解析(含源码)

    IP地址一般以数字形式表示,如192.168.0.1,IP解析是将这个数字IP转换为包含地区、城市、运营商等信息的字符串形式,如“广东省深圳市 电信”,这样更方便人理解和使用,本文给大家介绍了SpringBoot如何轻松实现ip解析,需要的朋友可以参考下
    2023-10-10
  • 使用Java实现将ppt转换为文本

    使用Java实现将ppt转换为文本

    这篇文章主要为大家详细介绍了如何使用Java实现将ppt转换为文本,文中的示例代码简洁易懂,具有一定的借鉴价值,有需要的小伙伴可以参考下
    2024-01-01
  • SpringBoot 开发提速神器 Lombok+MybatisPlus+SwaggerUI

    SpringBoot 开发提速神器 Lombok+MybatisPlus+SwaggerUI

    这篇文章主要介绍了SpringBoot 开发提速神器 Lombok+MybatisPlus+SwaggerUI,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-03-03
  • Java中的CyclicBarrier同步屏障详解

    Java中的CyclicBarrier同步屏障详解

    这篇文章主要介绍了Java中的CyclicBarrier同步屏障详解,CyclicBarrier也叫同步屏障,在JDK1.5被引入,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,屏障才会开门,所有被阻塞的线程才会继续执行,需要的朋友可以参考下
    2023-09-09
  • Mybatis 中的<![CDATA[ ]]>浅析

    Mybatis 中的<![CDATA[ ]]>浅析

    本文给大家解析使用<![CDATA[ ]]>解决xml文件不被转义的问题, 对mybatis 中的<![CDATA[ ]]>相关知识感兴趣的朋友一起看看吧
    2017-09-09
  • Java实用小技能之快速创建List常用几种方式

    Java实用小技能之快速创建List常用几种方式

    java集合可以说无论是面试、刷题还是工作中都是非常常用的,下面这篇文章主要给大家介绍了关于Java实用小技能之快速创建List常用的几种方式,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-12-12
  • Java程序员的10道常见的XML面试问答题(XML术语详解)

    Java程序员的10道常见的XML面试问答题(XML术语详解)

    包括web开发人员的Java面试在内的各种面试中,XML面试题在各种编程工作的面试中很常见。XML是一种成熟的技术,经常作为从一个平台到其他平台传输数据的标准
    2014-04-04

最新评论