SpringCloud Stream RabbitMQ动态路由Key问题

 更新时间:2026年05月06日 09:17:26   作者:这个饕字怎么读  
文章主要描述了使用消息中间件实现不同操作后发送不同邮件的功能,通过设置路由key和group,使得生产者能够根据邮件类型发送消息至对应队列,消费者则通过路由key筛选消息

前言

这里有个业务是这样的,我需要在不同的操作后给用户发送不同的邮件,由于比较耗时所以引入消息中间件,不同的邮件对应的消息类型是不一样的,所以需要生产者往队列里发送数据时绑定好路由key,例如:

图里表示交换机根据路由key绑定了不同的队列。

要达到这种效果,首先消费者肯定是可以根据路由key来决定消息是不是发送给自己的,对于生产者则需要用到routingKeyExpression 来决定往哪个路由key发送数据(大概是这个意思)。

然后就是stream中的group其实对应到rabbitMQ中就是队列的概念,所以我们这里设置两个不同的group来对应到不同的队列,区分开业务;

例子

这里我定义了两个服务对应消费者和生产者。

生产者

spring:
  application:
    name: producer
  cloud:
    stream:
      binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
        etpmsRabbitMQ: # 给Binder定义的名称,⽤于后⾯的关联
          type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
          environment: # MQ环境配置(⽤户名、密码等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: admin
                password: xxxxxx
      bindings: # 关联整合通道和binder对象
        output: # output是我们定义的通道名称,此处不能乱改
          destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
          content-type: text/plain # application/json # 消息类型设置,⽐如json
          binder: etpmsRabbitMQ # 关联MQ服务
      rabbit:
        bindings:
          output:
            producer:
              # 生产者配置RabbitMq的动态路由键
              routingKeyExpression: headers.type
package top.chenyt.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

/**
 * @author yantao.chen
 */
@Service
public class ProviderService {
    /**
     * 将MessageChannel的封装对象Source注⼊到这⾥使⽤
     */
    @Autowired
    private Source source;

    public void sendMessage(String content, String type) {
        // 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)
        // 使⽤通道向外发出消息(指的是Source⾥⾯的output通道)
        source.output().send(MessageBuilder.withPayload(content).setHeader("type",type).build());
    }
}
package top.chenyt;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

/**
 * @ClassName etpms-parent
 * @Author Jinondo
 * @Date 2022/1/31 12:42
 */
@SpringBootApplication
@Slf4j
@EnableBinding({Source.class})
public class ProducerApplication {

    public static void main(String[] args)  {
        SpringApplication.run(ProducerApplication.class, args);
    }

}

主要是yml配置添加:routingKeyExpression: headers.type

发送消息的时候setHeader一下

消费者

spring:
  application:
    name: consumer
  cloud:
    stream:
      binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
        etpmsRabbitMQ: # 给Binder定义的名称,⽤于后⾯的关联
          type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
          environment: # MQ环境配置(⽤户名、密码等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: admin
                password: xxxxx
      bindings: # 关联整合通道和binder对象
        input: # input是我们定义的通道名称,此处不能乱改
          destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
          content-type: text/plain # application/json # 消息类型设置,⽐如json,自动将对象转为json
          binder: etpmsRabbitMQ # 关联MQ服务
          group: register
        my-input:
          destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
          content-type: text/plain # application/json # 消息类型设置,⽐如json,自动将对象转为json
          binder: etpmsRabbitMQ # 关联MQ服务
          group: task
      rabbit:
        bindings:
          my-input:
            consumer:
              bindingRoutingKey: task
          input:
            consumer:
              bindingRoutingKey: register

这里我就定义了两个通道,一个是默认的input,一个是自定的

package top.chenyt.consumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {

    String MY_INPUT = "my-input";

    @Input(MY_INPUT)
    SubscribableChannel myinput();

}

package top.chenyt.consumer;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * @ClassName etpms-parent
 * @Author Jinondo
 * @Date 2022/1/31 12:42
 */
@Service
public class ConsumerMsg {

    @StreamListener(Sink.INPUT)
    public void receiveMessages(Message<String> message) {
        System.out.println("========= input接收到的消息:" + message.getPayload());
    }

    @StreamListener(MySink.MY_INPUT)
    public void receiveMessages02(Message<String> message) {
        System.out.println("========= myinput接收到的消息:" + message.getPayload());
    }
}
package top.chenyt;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import top.chenyt.consumer.MySink;

/**
 * @ClassName etpms-parent
 * @Author Jinondo
 * @Date 2022/1/31 12:42
 */
@SpringBootApplication
@Slf4j
@EnableBinding({Sink.class, MySink.class})
public class ConsumerApplication {

    public static void main(String[] args)  {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

这样就能实现根据不同的消息类型对应到不同的队列且不同的路由key去了

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • java中创建写入文件的6种方式详解与源码实例

    java中创建写入文件的6种方式详解与源码实例

    这篇文章主要介绍了java中创建写入文件的6种方式详解与源码实例,Files.newBufferedWriter(Java 8),Files.write(Java 7 推荐),PrintWriter,File.createNewFile,FileOutputStream.write(byte[] b) 管道流,需要的朋友可以参考下
    2022-12-12
  • JAVA8发送带有Body的HTTP GET请求

    JAVA8发送带有Body的HTTP GET请求

    本文主要介绍了JAVA8发送带有Body的HTTP GET请求,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-06-06
  • java使用BeanUtils.copyProperties方法对象复制同名字段类型不同赋值为空问题解决方案

    java使用BeanUtils.copyProperties方法对象复制同名字段类型不同赋值为空问题解决方案

    这篇文章主要给大家介绍了关于java使用BeanUtils.copyProperties方法对象复制同名字段类型不同赋值为空问题的解决方案,文中通过代码介绍的非常详细,对大家的学习或者工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-11-11
  • k8s部署java项目的实现

    k8s部署java项目的实现

    本文主要介绍了k8s部署java项目的实现,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-12-12
  • Java Optional用法面试题精讲

    Java Optional用法面试题精讲

    这篇文章主要为大家介绍了Java Optional用法面试题精讲,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-09-09
  • 浅谈Timer和TimerTask与线程的关系

    浅谈Timer和TimerTask与线程的关系

    下面小编就为大家带来一篇浅谈Timer和TimerTask与线程的关系。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-03-03
  • MyBatis-Plus 中 的动态SQL 片段(sqlSegment)详解

    MyBatis-Plus 中 的动态SQL 片段(sqlSegment)详解

    MyBatis-Plus的sqlSegment通过Wrapper动态生成SQL片段,支持XML中${ew.customSqlSegment}引用,结合Lambda表达式避免硬编码,适用于动态查询、逻辑删除等场景,提升代码可维护性与灵活性,本文给大家介绍MyBatis-Plus中的动态SQL片段(sqlSegment)讲解,感兴趣的朋友一起看看吧
    2025-06-06
  • Spark SQL配置及使用教程

    Spark SQL配置及使用教程

    SparkSQL是spark的一个模块,主入口是SparkSession,将SQL查询与Spark程序无缝混合,这篇文章主要介绍了Spark SQL配置及使用,需要的朋友可以参考下
    2021-12-12
  • java实现斗地主游戏

    java实现斗地主游戏

    这篇文章主要为大家详细介绍了java实现斗地主游戏,洗牌、发牌、看牌,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-01-01
  • Json读写本地文件实现代码

    Json读写本地文件实现代码

    今天没事研究了下Gson,写了个工具类,需要的朋友可以参考下
    2014-03-03

最新评论