SpringBoot集成RocketMQ的使用示例

 更新时间:2023年11月20日 09:26:57   作者:小月亮与六便士  
RocketMQ是阿里巴巴开源的一款消息中间件,性能优秀,功能齐全,被广泛应用在各种业务场景,本文就来介绍一下SpringBoot集成RocketMQ的使用示例,感兴趣的可以了解一下

一、RocketMQ基本概念

消息模型(Message Model)

RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中。ConsumerGroup由多个Consumer实例构成。

1、在springBoot项目中添加Maven依赖 

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.4</version>
        </dependency>

2、添加配置:

application.yml 文件中添加如下配置:

rocketmq:
  name-server: 192.168.152.165:9876
  producer:
    group: my-group

SpringBoot 集成 RocketMQ代码:

生产者: 消息发送的三种方式

package com.rocketmq.springbootrocketmq;


import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.TimeUnit;


@RunWith(SpringRunner.class)
@SpringBootTest
public class T {


    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    //同步消息
    @Test
    public void testRocketMQ() {
        Message msg = MessageBuilder.withPayload("boot发送同步消息").build();
        rocketMQTemplate.send("helloTopicBoot", msg);
        System.out.println("success send");
    }

    //异步消息
    @Test
    public void sendASYCMsg() throws InterruptedException {
        Message message = MessageBuilder.withPayload("boot发送异步消息").build();
        rocketMQTemplate.asyncSend("helloTopicBoot", message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送状态:"+sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送失败");
            }
        });
        TimeUnit.SECONDS.sleep(5);
    }

    //一次性消息
    @Test
    public void sendOneWayRocketMQ() {
        Message msg = MessageBuilder.withPayload("boot发送一次性消息").build();
        rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
    }

}

消费者:

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;

@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot")
public class HelloTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

消息消费的两种模式

集群模式:默认模式

广播模式:

消费者:messageModel = MessageModel.BROADCASTING

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;

@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot",messageModel = MessageModel.BROADCASTING)
public class HelloTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

顺序消息

生产者:

    //顺序消息
    @Test
    public void sendOrderlyMsg(){
        //设置队列选择器
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
                String orderIdStr = (String) o;
                long orderId = Long.parseLong(orderIdStr);
                int index = (int)orderId % list.size();
                return list.get(index);
            }
        });

        List<OrderStep> orderSteps = OrderUtil.buildOrders();
        for (OrderStep orderStep : orderSteps) {
            Message msg = MessageBuilder.withPayload(orderStep.toString()).build();
            rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot",msg,String.valueOf(orderStep.getOrderId()));

        }
    }

消费者:

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;

@Component
@RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot",topic = "orderlyTopicBoot",consumeMode = ConsumeMode.ORDERLY)
public class OrderlyTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("当前线程:" + Thread.currentThread() + "队列ID"+messageExt.getQueueId() + ",消息内容:" + new String(messageExt.getBody(),Charset.defaultCharset()));
    }
}

延迟消息

生产者:

    //延迟消息
    @Test
    public void sendDelayRocketMQ() {
        Message msg = MessageBuilder.withPayload("boot发送延时消息,发送时间:"+new Date()).build();
        rocketMQTemplate.syncSend("helloTopicBoot", msg,3000,3);
    }

消费者:

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.Date;

@Component
@RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot")
public class DelayTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

消息Tag条件过滤

生成者

    //Tag消息
    @Test
    public void sendTagFilterRocketMQ() {
        Message msg1 = MessageBuilder.withPayload("消息A").build();
        rocketMQTemplate.sendOneWay("tagFilterBoot:TagA", msg1);
        Message msg2 = MessageBuilder.withPayload("消息B").build();
        rocketMQTemplate.sendOneWay("tagFilterBoot:TagB", msg2);
        Message msg3 = MessageBuilder.withPayload("消息C").build();
        rocketMQTemplate.sendOneWay("tagFilterBoot:TagC", msg3);
    }

消费者:

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.Date;

@Component
@RocketMQMessageListener(consumerGroup = "tagFilterGroupBoot",topic = "tagFilterBoot",selectorExpression = "TagA || TagC")
public class TagFilterTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

SQL92消息过滤

生产者:

    //SQL92消息
    @Test
    public void sendSQL92FilterRocketMQ() {
        Message msg1 = MessageBuilder.withPayload("小红,年龄22,体重45").setHeader("age","22").setHeader("weight",45).build();
        rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg1);
        Message msg2 = MessageBuilder.withPayload("小明,年龄25,体重60").setHeader("age","25").setHeader("weight",60).build();
        rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg2);
        Message msg3 = MessageBuilder.withPayload("小蓝,年龄40,体重70").setHeader("age","40").setHeader("weight",70).build();
        rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg3);
    }

消费者:

package com.example.springbooTRocketMQConsumer.listener;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.Date;

@Component
@RocketMQMessageListener(consumerGroup = "SQL92FilterGroupBoot",topic = "SQL92FilterBoot",selectorType = SelectorType.SQL92,selectorExpression = "age > 23 and weight > 60")
public class SQL92FilterTopicListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset()));
    }
}

 到此这篇关于SpringBoot集成RocketMQ的使用示例的文章就介绍到这了,更多相关SpringBoot集成RocketMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • MyBatis自定义映射关系和关联查询实现方法详解

    MyBatis自定义映射关系和关联查询实现方法详解

    这篇文章主要介绍了MyBatis自定义映射关系和关联查询实现方法,当POJO属性名与数据库列名不一致时,需要自定义实体类和结果集的映射关系,在MyBatis注解开发中,使用@Results定义并使用自定义映射,使用 @ResultMap使用自定义映射
    2023-04-04
  • Spring Data JPA实现排序与分页查询超详细流程讲解

    Spring Data JPA实现排序与分页查询超详细流程讲解

    在介绍Spring Data JPA的时候,我们首先认识下Hibernate。Hibernate是数据访问解决技术的绝对霸主,使用O/R映射技术实现数据访问,O/R映射即将领域模型类和数据库的表进行映射,通过程序操作对象而实现表数据操作的能力,让数据访问操作无须关注数据库相关的技术
    2022-10-10
  • Java不是内部或者外部命令,也不是可运行的程序的解决办法

    Java不是内部或者外部命令,也不是可运行的程序的解决办法

    这篇文章主要介绍了Java不是内部或者外部命令,也不是可运行的程序的解决办法,出现这种情况一般来说是没有配置环境变量或者是没有配置好,文中通过图文将解决办法介绍的非常详细,需要的朋友可以参考下
    2025-11-11
  • 举例讲解Java中Piped管道输入输出流的线程通信控制

    举例讲解Java中Piped管道输入输出流的线程通信控制

    Java中的PipedWriter、PipedReader类管道的读写依赖于PipedOutputStream、PipedInputStream两个管道输入输出类,这里我们将来举例讲解Java中Piped管道输入输出流的线程通信控制:
    2016-06-06
  • Java 注解学习笔记

    Java 注解学习笔记

    一直都在使用注解,但是一直都没有用的很明白,后来被逼的发现不搞明白真的就没有办法愉快的写代码了,所以,这篇《Java中的注解学习笔记》就呼之欲出了
    2020-10-10
  • Java中基于maven实现zxing二维码功能

    Java中基于maven实现zxing二维码功能

    这篇文章主要介绍了Java中基于maven实现zxing二维码功能,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2017-02-02
  • Java面试重点中的重点之Elasticsearch核心原理

    Java面试重点中的重点之Elasticsearch核心原理

    ElasticSearch是一个基于Lucene的搜索引擎,是用Java语言开发的,能够达到实时搜索,稳定,可靠,快速,安装使用方便,作为Apache许可条款下的开放源码发布,是一种流行的企业级搜索引擎,是最受欢迎的企业搜索引擎
    2022-01-01
  • 学生视角手把手带你写Java 线程池初版

    学生视角手把手带你写Java 线程池初版

    作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门
    2022-03-03
  • 如何在Java中使用WebSocket协议

    如何在Java中使用WebSocket协议

    WebSocket是一种基于 TCP 协议的全双工通信协议,可以在浏览器和服务器之间建立实时、双向的数据通信,下面这篇文章主要给大家介绍了关于如何在Java中使用WebSocket协议的相关资料,需要的朋友可以参考下
    2024-02-02
  • 详解spring-cloud与netflixEureka整合(注册中心)

    详解spring-cloud与netflixEureka整合(注册中心)

    这篇文章主要介绍了详解spring-cloud与netflixEureka整合(注册中心),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-02-02

最新评论