使用Java代码实现RocketMQ的生产与消费消息

 更新时间:2024年07月31日 08:43:23   作者:小威要向诸佬学习呀  
这篇文章介绍一下其他的小组件以及使用Java代码实现生产者对消息的生成,消费者消费消息等知识点,并通过代码示例介绍的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下

RocketMQ其他组件

在RocketMQ中,除了生产者,消费者,还有一些其他的小组件,接下来逐一介绍一下他们。

监听器(Listener)

定义:监听器是消费者用于处理消息的组件。在PushConsumer(推)模式下,消费者客户端必须设置消费监听器,以便在接收到消息时执行相应的处理逻辑。(比如一会儿下面的代码)

偏移量(Offset)

定义:偏移量是指在消费消息时,记录消费者已经消费到的消息位置的值。每个消息都有一个唯一的偏移量值,它代表了消息在消息队列中的位置。

偏移量具有很大的作用:它能够保证消费者在重启或宕机后能够从上次消费的位置继续消费消息,避免重复消费或漏消费。

简而言之就是它能告诉消费者已经消费到哪一条消息!!

  • 集群模式:在集群消费模式下,消息队列的消费进度保存在Broker端。消费者每次消费完消息后,会将最新的消费进度同步到Broker,以便在消费者重启或者故障转移的时候能够从上一次消费的位置继续消费。
  • 广播模式:在广播消费模式下,消息队列的消费进度保存在消费者本地。因为广播模式下每条消息都会被所有消费者消费,所以不需要在Broker端保存消费进度。

所以,偏移量的的实现方式有两种:包括存储在本地文件(OffsetStore)和存储在Broker中这两种方式。(这样一看,清晰了吧)

命名服务器(NameServer)

定义:命名服务器是RocketMQ中的轻量级路由服务,存储生产者和消费者与Broker之间的路由信息。

它的作用:提供Broker的动态注册与发现服务,生产者和消费者通过NameServer查询Broker的路由信息,从而进行消息的投递和消费。

消息组成

  • Topic:消息主题,对不同的业务消息进行分类。
  • Tag:消息标签,进一步区分某个Topic下的消息分类。使用Tag可以实现对Topic中的消息进行过滤。消费者可以根据Tag来订阅自己感兴趣的消息,而不是接收Topic下的所有消息。
  • Message Body:消息体,消息的实际内容。
  • Keys:消息的键值,标识消息的唯一性。在RocketMQ中,每个消息都可以设置Keys字段,以便在需要的时候根据Keys来查询或者定位消息。
  • 属性:除了上面滴,RocketMQ的消息还可以包含一系列的属性信息,比如消息的发送时间、生产者信息等等。这些属性信息以键值对的形式存在,随着消息一起被存储和传输。

实现生产与消费消息

按之前的步骤搭建完成RocketMQ集群后...

首先我们创建一个空的maven工程,在pom.xml文件中添加RocketMQ的依赖(RocketMQ的依赖版本需要与虚拟机中的保持一致,这里选择和之前一样的4.7.1版本):

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

生产消息

然后编写生产者生产消息的代码:

// 1.创建一个DefaultMQProducer实例,指定生产者组名 
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");  
// 2.设置NameServer的地址
producer.setNamesrvAddr("192.168.220.135:9876");    
// 3.启动生产者实例  
producer.start();   
// 4.使用for循环发送10条消息  
for (int i = 0; i < 10; i++) {  
    // 创建一条消息,指定Topic为"MyTopic1",Tag标签为"TagA",消息体为"hello rocketmq"加上循环变量的值,同时把字符串转换为字节数组  
    Message message = new Message("MyTopic1","TagA",("hello rocketmq"+i).getBytes(StandardCharsets.UTF_8));       
    // 5.发送消息并接收发送结果  
    SendResult sendResult = producer.send(message);  
    // 打印发送结果,包括消息ID、发送状态等信息  
    System.out.println(sendResult);  
}  
// 6.发送完所有消息后,关闭生产者实例,释放资源  
producer.shutdown();

生产者生产消息和消费者消费消息这块的代码都相对较为简单,已经在代码块中加了注释,这里就不再赘述了。

这个时候就可以访问虚拟机+端口号来搜索到发送的消息详情了!

消费消息

// 1.和生产者一样,创建一个DefaultMQPushConsumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");  
// 2.设置NameServer的地址,消费者通过这个地址与NameServer进行通信,来获取Broker的地址信息  
consumer.setNamesrvAddr("192.168.220.135:9876");  
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息。我们订阅了"MyTopic1",使用"*"来匹配此Topic下的所有Tag  
consumer.subscribe("MyTopic1", "*");  
// 3.注册消息监听器,用于处理从Broker接收到的消息。使用MessageListenerConcurrently接口的实现,表示并行消费  
consumer.registerMessageListener(new MessageListenerConcurrently() {  
    @Override  
    // 4.当收到消息时,方法会被调用。
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {  
        // 5.遍历消息列表,并打印每条消息的内容(注意:这里直接打印msg对象不会得到预期的消息内容字符串)  
        for (MessageExt msg : msgs) {  
            // 所以我们打印msg.getBody()的内容,为了保留消息原样  
            System.out.println("已收到消息" + msg);  
        }  
        // 6.返回消费状态,这里表示消息已成功消费  
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
    }  
});  
// 7.启动消费者实例  
consumer.start();  
// 8.打印日志消费者已启动  
System.out.println("消费者已启动");

这里需要注意,msg包含了消息的详细信息,包括消息体、标签、属性等等。如果想打印消息内容,应该使用msg.getBody()方法获取消息体的字节数组,并且把它转换为字符串(如果消息体是文本的话)。

本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!

以上就是使用Java代码实现RocketMQ的生产与消费消息的详细内容,更多关于Java实现RocketMQ生产与消费的资料请关注脚本之家其它相关文章!

相关文章

  • Spring Cloud Nacos 和 Eureka区别解析

    Spring Cloud Nacos 和 Eureka区别解析

    Spring Cloud Nacos 和 Spring Cloud Eureka 都是 Spring Cloud 微服务框架中的服务注册和发现组件,用于帮助开发者轻松地构建和管理微服务应用,这篇文章主要介绍了Spring Cloud Nacos 和 Eureka区别,需要的朋友可以参考下
    2023-08-08
  • 全面解析java中的hashtable

    全面解析java中的hashtable

    以下是对java中的hashtable进行了详细的分析介绍。需要的朋友可以过来参考下
    2013-08-08
  • 如何更好的使用Java8中方法引用详解

    如何更好的使用Java8中方法引用详解

    在Java8中,我们可以直接通过方法引用来简写lambda表达式中已经存在的方法,这种特性就叫做方法引用(Method Reference)。下面这篇文章主要给大家介绍了关于如何更好的使用Java8中方法引用的相关资料,需要的朋友可以参考下。
    2017-09-09
  • spring整合JMS实现同步收发消息(基于ActiveMQ的实现)

    spring整合JMS实现同步收发消息(基于ActiveMQ的实现)

    本篇文章主要介绍了spring整合JMS实现同步收发消息(基于ActiveMQ的实现),具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-10-10
  • springboot如何解决跨域后session获取不到sessionId不一致

    springboot如何解决跨域后session获取不到sessionId不一致

    这篇文章主要介绍了springboot如何解决跨域后session获取不到sessionId不一致问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • SpringBoot开发技巧之如何处理跨域请求CORS

    SpringBoot开发技巧之如何处理跨域请求CORS

    CORS(Cross-Origin Resource Sharing)"跨域资源共享",是一个W3C标准,它允许浏览器向跨域服务器发送Ajax请求,打破了Ajax只能访问本站内的资源限制
    2021-10-10
  • SpringBoot中对应2.0.x版本的Redis配置详解

    SpringBoot中对应2.0.x版本的Redis配置详解

    这篇文章主要为大家介绍了SpringBoot中对应2.0.x版本的Redis配置详解,文中的实现步骤讲解详细,感兴趣的小伙伴们可以了解一下
    2022-06-06
  • 解决mybatis使用char类型字段查询oracle数据库时结果返回null问题

    解决mybatis使用char类型字段查询oracle数据库时结果返回null问题

    这篇文章主要介绍了mybatis使用char类型字段查询oracle数据库时结果返回null问题的解决方法,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-06-06
  • Java并发 线程间的等待与通知

    Java并发 线程间的等待与通知

    这篇文章主要介绍了Java并发 线程间的等待与通知,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • Spring Boot 整合 Druid 并开启监控的操作方法

    Spring Boot 整合 Druid 并开启监控的操作方法

    本文介绍了如何在SpringBoot项目中引入和配置Druid数据库连接池,并开启其监控功能,通过添加依赖、配置数据源、开启监控、自定义配置以及访问监控页面,开发者可以有效提高数据库访问效率并监控连接池状态,感兴趣的朋友跟随小编一起看看吧
    2025-01-01

最新评论