在SpringBoot中利用RocketMQ实现批量消息消费功能

 更新时间:2024年11月07日 11:53:31   作者:魔道不误砍柴功  
RocketMQ 是一款分布式消息队列,支持高吞吐、低延迟的消息传递,对于需要一次处理多条消息的场景,RocketMQ 提供了批量消费的机制,这篇文章将展示如何在 Spring Boot 中实现这一功能,感兴趣的小伙伴跟着小编一起来看看吧

准备工作

在开始之前,请确保你已经安装和配置好 RocketMQ。如果还没安装,请参考 RocketMQ 官网 获取安装指南。

项目依赖

首先,我们需要在 Spring Boot 项目中添加 RocketMQ 的依赖。打开 pom.xml 文件,添加以下内容:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

这个依赖包包含了与 RocketMQ 集成所需的所有内容。

配置 RocketMQ

在 application.yml 文件中添加 RocketMQ 的相关配置:

rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: batchConsumerGroup
  producer:
    group: batchProducerGroup
  • name-server:RocketMQ 服务的地址
  • consumer.group:消息消费的分组
  • producer.group:消息生产的分组

确保 name-server 地址是正确的,指向你的 RocketMQ 服务。

生产批量消息

创建一个消息生产者,用于发送批量消息。以下是 BatchProducer.java 的示例代码:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class BatchProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendBatchMessages() {
        List<Message<String>> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload("Hello RocketMQ " + i).build();
            messages.add(message);
        }
        
        rocketMQTemplate.syncSend("BatchTopic", messages, 10000);
        System.out.println("批量消息发送成功!");
    }
}
  • 这里,我们创建了 10 条消息并将它们添加到列表 messages 中。
  • 调用 rocketMQTemplate.syncSend 方法将消息批量发送到主题 BatchTopic

消费批量消息

接下来,我们创建一个消息消费者,用于批量消费消息。以下是 BatchConsumer.java 的示例代码:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@RocketMQMessageListener(topic = "BatchTopic", consumerGroup = "batchConsumerGroup", selectorExpression = "*", consumeMessageBatchMaxSize = 10)
public class BatchConsumer implements RocketMQListener<List<String>> {

    @Override
    public void onMessage(List<String> messages) {
        System.out.println("批量接收到消息:");
        messages.forEach(message -> System.out.println("消息内容:" + message));
    }
}

在这段代码中:

  • @RocketMQMessageListener 注解用于标识这是一个 RocketMQ 的消息监听器,指定了监听的主题 BatchTopic 和消费分组 batchConsumerGroup
  • consumeMessageBatchMaxSize = 10 表示每次批量消费最多 10 条消息。
  • onMessage 方法会处理接收到的消息列表,并逐条打印出消息内容。

测试批量消息发送和消费

创建一个简单的 Spring Boot 控制器,用于触发批量消息发送。以下是 MessageController.java 的代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private BatchProducer batchProducer;

    @GetMapping("/sendBatchMessages")
    public String sendBatchMessages() {
        batchProducer.sendBatchMessages();
        return "批量消息已发送";
    }
}

通过访问 http://localhost:8080/sendBatchMessages 触发消息发送。

  • 调用这个接口会将批量消息发送到 RocketMQ 主题 BatchTopic
  • BatchConsumer 会自动接收并批量处理这些消息。

总结

我们成功在 Spring Boot 中实现了 RocketMQ 的批量消息发送与消费:

  • 使用 BatchProducer 类批量发送消息。
  • 使用 BatchConsumer 类批量消费消息,并设置最大批量大小。
  • 通过简单的 REST API 控制消息发送,确保一切顺利。

批量消息处理可以提高消息传递的效率,适合高并发场景。这种方式可以减少网络开销,并有效利用系统资源。

到此这篇关于在SpringBoot中利用RocketMQ实现批量消息消费功能的文章就介绍到这了,更多相关SpringBoot RocketMQ消息消费内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring Boot 和 Spring 到底有啥区别你知道吗

    Spring Boot 和 Spring 到底有啥区别你知道吗

    Spring Boot框架的核心就是自动配置,只要存在相应的jar包,Spring就帮我们自动配置。接下来通过本文给大家介绍Spring与Spring boot的区别介绍,非常不错,需要的朋友参考下吧
    2021-08-08
  • 电脑上安装多个JDK版本时该如何自由切换(详细图文)

    电脑上安装多个JDK版本时该如何自由切换(详细图文)

    我们在学习的过程中经常用到不同的jdk版本,那么如何在一台电脑上同时安装多个jdk版本并进行切换呢,这篇文章主要给大家介绍了关于电脑上安装多个JDK版本时该如何自由切换的相关资料,需要的朋友可以参考下
    2023-10-10
  • java中使用sax解析xml的解决方法

    java中使用sax解析xml的解决方法

    本篇文章介绍了,在java中使用sax解析xml的解决方法。需要的朋友参考下
    2013-05-05
  • Java中instanceof的基本语法与用法详解

    Java中instanceof的基本语法与用法详解

    这篇文章主要介绍了Java中instanceof的基本语法与用法的相关资料,instanceof是Java中用于检查对象是否是某个类或接口的实例的二元运算符,需要的朋友可以参考下
    2025-03-03
  • JAVA通过Filter实现允许服务跨域请求的方法

    JAVA通过Filter实现允许服务跨域请求的方法

    这里的域指的是这样的一个概念:我们认为若协议 + 域名 + 端口号均相同,那么就是同域即我们常说的浏览器请求的同源策略。这篇文章主要介绍了JAVA通过Filter实现允许服务跨域请求,需要的朋友可以参考下
    2018-11-11
  • vue vxe-table 实现财务记账凭证的方案

    vue vxe-table 实现财务记账凭证的方案

    使用 vxe-table 实现财务记账凭证非常简单,实现在线实时编辑的记账凭证、自动合计金额等,这篇文章主要介绍了vue vxe-table 实现财务记账凭证的方案,需要的朋友可以参考下
    2024-12-12
  • SpringBoot统一功能处理实现的全过程

    SpringBoot统一功能处理实现的全过程

    最近在做项目时需要对异常进行全局统一处理,主要是一些分类入库以及记录日志等,下面这篇文章主要给大家介绍了关于SpringBoot统一功能处理实现的相关资料,文中通过图文以及实例代码介绍的非常详细,需要的朋友可以参考下
    2023-01-01
  • Mybatis拦截器实现自定义需求

    Mybatis拦截器实现自定义需求

    本文主要介绍了Mybatis拦截器实现自定义需求,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-05-05
  • Spring中的@Cacheable缓存注解详解

    Spring中的@Cacheable缓存注解详解

    这篇文章主要介绍了Spring中的@Cacheable缓存注解详解,数据库查找的流程是先要从磁盘拿到数据,再刷新到内存,再返回数据。磁盘相比于内存来说,速度是很慢的,为了提升性能,就出现了基于内存的缓存,需要的朋友可以参考下
    2023-05-05
  • java反编译工具jd-gui-osx for mac M1芯片无法使用的问题及解决

    java反编译工具jd-gui-osx for mac M1芯片无法使用的问题及解决

    这篇文章主要介绍了java反编译工具jd-gui-osx for mac M1芯片无法使用的问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01

最新评论