Springboot 整合 RocketMQ 收发消息的配置过程

 更新时间:2021年12月29日 11:53:14   作者:liqiangbk  
这篇文章主要介绍了Springboot 整合 RocketMQ 收发消息,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

Springboot 整合 RocketMQ 收发消息

创建springboot项目

pom.xml添加rocketmq-spring-boot-starter依赖。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

yml 配置

application.yml

rocketmq:
  name-server: 192.168.64.141:9876

application-demo1.yml

使用 demo1 profile 指定生产者组组名

rocketmq:
  producer:
    group: producer-demo1

application-demo2.yml

使用 demo2 profile 指定生产者组组名

rocketmq:
  producer:
    group: producer-demo2

测试

demo 1

  • 发送普通消息
  • 发送 Spring 的通用 Message 对象
  • 发送异步消息
  • 发送顺序消息

生产者

package cn.tedu.demo2.m1;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
    @Autowired
    private RocketMQTemplate t ;
    public  void send(){
        //发送同步消息
        t.convertAndSend("Topic1:TagA", "Hello world! ");
        //发送spring的Message
        Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();
        t.send("Topic1:TagA",message);
        //发送异步消息
        t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功");
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("发送失败");
            }
        });
        //发送顺序消息
        t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
        t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
        t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
    }
}

消费者

package cn.tedu.demo2.m1;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")
public class Consumer  implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("收到"+s);
    }
}

主类

package cn.tedu.demo2.m1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}

测试类

需要放在 test 文件夹

激活 demo1 profile @ActiveProfiles("demo1")

package cn.tedu.demo2.m1;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
    @Autowired
    private  Producer producer;
    @Test
    public void test1(){
        producer.send();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

demo 2

发送事务消息

生产者

package cn.tedu.demo2.m2;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component

public class Producer {

    @Autowired
    private RocketMQTemplate t;

    public void send(){
        Message<String> message = MessageBuilder.withPayload("Hello world").build();
        //一旦发送消息,则执行监听器
        t.sendMessageInTransaction("Topic2",message,null);
    }
    @RocketMQTransactionListener
    class Lis implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
            System.out.println("执行本地事务");
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            System.out.println("执行事务回查");
            return RocketMQLocalTransactionState.COMMIT;
        }
    }

}

消费者

package cn.tedu.demo2.m2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("收到"+s);
    }
}

主类

package cn.tedu.demo2.m2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}

测试类

package cn.tedu.demo2.m2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("demo2")
public class Test2 {
    @Autowired
    private  Producer producer;
    @Test
    public void  test1(){
        producer.send();
        //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

到此这篇关于Springboot 整合 RocketMQ 收发消息的文章就介绍到这了,更多相关Springboot 整合 RocketMQ 收发消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 浅谈StringBuilder类的capacity()方法和length()方法的一些小坑

    浅谈StringBuilder类的capacity()方法和length()方法的一些小坑

    这篇文章主要介绍了StringBuilder类的capacity()方法和length()方法的一些小坑,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • Java中遍历Map集合的5种方式总结

    Java中遍历Map集合的5种方式总结

    这篇文章主要给大家介绍了关于Java中遍历Map集合的5种方式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • Redis 集成Spring的示例代码(spring-data-redis)

    Redis 集成Spring的示例代码(spring-data-redis)

    本篇文章主要介绍了Redis 集成Spring的示例代码(spring-data-redis) ,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-09-09
  • maven多模块依赖版本不一致问题解决

    maven多模块依赖版本不一致问题解决

    本文主要介绍了maven多模块依赖版本不一致问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-05-05
  • IntelliJ IDEA 的使用界面图文教程

    IntelliJ IDEA 的使用界面图文教程

    这篇文章主要介绍了IntelliJ IDEA 的使用界面图文教程,需要的朋友可以参考下
    2018-10-10
  • Jmeter安装及配置教程详解

    Jmeter安装及配置教程详解

    很多朋友私信小编Jmeter安装及配置教程能出一期教程吗?正巧赶上疫情,不是太忙,下面小编把Jmeter安装及配置教程分享到脚本之家平台,感兴趣的朋友跟随小编一起看看吧
    2021-12-12
  • shrio中hashedCredentialsMatcher密码匹配示例详解

    shrio中hashedCredentialsMatcher密码匹配示例详解

    shrio是一个轻量级权限管理框架,密码的匹配由框架内部完成。密码是否匹配由接口CredentialsMatcher定义实现类完成,CredentialsMatcher实现类有SimpleCredentialsMatcher和HashedCredentialsMatcher两个
    2021-10-10
  • 一文带你搞懂Redis分布式锁

    一文带你搞懂Redis分布式锁

    本篇文章主要来介绍一下如何Redis实现分布式锁的演进过程,以及为什么不能直接用Setnx实现分布式锁,文中的示例代码讲解详细,需要的可以参考一下
    2022-09-09
  • Java使用JDBC连接Oracle_MSSQL实例代码

    Java使用JDBC连接Oracle_MSSQL实例代码

    这篇文章主要介绍了Java使用JDBC连接Oracle_MSSQL实例代码,需要的朋友可以参考下
    2014-01-01
  • Java创建数组的3种方式代码举例

    Java创建数组的3种方式代码举例

    数组是相同类型数据的有序集合,数组描述的是若干个相同类型的数据按照一定的先后次序排列组合而成,其中每一个数据称为数组的元素,可以通过下标进行访问,这篇文章主要给大家介绍了关于Java创建数组的3种方式,需要的朋友可以参考下
    2024-01-01

最新评论