SpringCloud Stream RabbitMQ动态路由Key问题

 更新时间:2026年05月06日 09:17:26   作者:这个饕字怎么读  
文章主要描述了使用消息中间件实现不同操作后发送不同邮件的功能,通过设置路由key和group,使得生产者能够根据邮件类型发送消息至对应队列,消费者则通过路由key筛选消息

前言

这里有个业务是这样的,我需要在不同的操作后给用户发送不同的邮件,由于比较耗时所以引入消息中间件,不同的邮件对应的消息类型是不一样的,所以需要生产者往队列里发送数据时绑定好路由key,例如:

图里表示交换机根据路由key绑定了不同的队列。

要达到这种效果,首先消费者肯定是可以根据路由key来决定消息是不是发送给自己的,对于生产者则需要用到routingKeyExpression 来决定往哪个路由key发送数据(大概是这个意思)。

然后就是stream中的group其实对应到rabbitMQ中就是队列的概念,所以我们这里设置两个不同的group来对应到不同的队列,区分开业务;

例子

这里我定义了两个服务对应消费者和生产者。

生产者

spring:
  application:
    name: producer
  cloud:
    stream:
      binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
        etpmsRabbitMQ: # 给Binder定义的名称,⽤于后⾯的关联
          type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
          environment: # MQ环境配置(⽤户名、密码等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: admin
                password: xxxxxx
      bindings: # 关联整合通道和binder对象
        output: # output是我们定义的通道名称,此处不能乱改
          destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
          content-type: text/plain # application/json # 消息类型设置,⽐如json
          binder: etpmsRabbitMQ # 关联MQ服务
      rabbit:
        bindings:
          output:
            producer:
              # 生产者配置RabbitMq的动态路由键
              routingKeyExpression: headers.type
package top.chenyt.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

/**
 * @author yantao.chen
 */
@Service
public class ProviderService {
    /**
     * 将MessageChannel的封装对象Source注⼊到这⾥使⽤
     */
    @Autowired
    private Source source;

    public void sendMessage(String content, String type) {
        // 向mq中发送消息(并不是直接操作mq,应该操作的是spring cloud stream)
        // 使⽤通道向外发出消息(指的是Source⾥⾯的output通道)
        source.output().send(MessageBuilder.withPayload(content).setHeader("type",type).build());
    }
}
package top.chenyt;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

/**
 * @ClassName etpms-parent
 * @Author Jinondo
 * @Date 2022/1/31 12:42
 */
@SpringBootApplication
@Slf4j
@EnableBinding({Source.class})
public class ProducerApplication {

    public static void main(String[] args)  {
        SpringApplication.run(ProducerApplication.class, args);
    }

}

主要是yml配置添加:routingKeyExpression: headers.type

发送消息的时候setHeader一下

消费者

spring:
  application:
    name: consumer
  cloud:
    stream:
      binders: # 绑定MQ服务信息(此处我们是RabbitMQ)
        etpmsRabbitMQ: # 给Binder定义的名称,⽤于后⾯的关联
          type: rabbit # MQ类型,如果是Kafka的话,此处配置kafka
          environment: # MQ环境配置(⽤户名、密码等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: admin
                password: xxxxx
      bindings: # 关联整合通道和binder对象
        input: # input是我们定义的通道名称,此处不能乱改
          destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
          content-type: text/plain # application/json # 消息类型设置,⽐如json,自动将对象转为json
          binder: etpmsRabbitMQ # 关联MQ服务
          group: register
        my-input:
          destination: testExchange # 要使⽤的Exchange名称(消息队列主题名称)
          content-type: text/plain # application/json # 消息类型设置,⽐如json,自动将对象转为json
          binder: etpmsRabbitMQ # 关联MQ服务
          group: task
      rabbit:
        bindings:
          my-input:
            consumer:
              bindingRoutingKey: task
          input:
            consumer:
              bindingRoutingKey: register

这里我就定义了两个通道,一个是默认的input,一个是自定的

package top.chenyt.consumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {

    String MY_INPUT = "my-input";

    @Input(MY_INPUT)
    SubscribableChannel myinput();

}

package top.chenyt.consumer;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * @ClassName etpms-parent
 * @Author Jinondo
 * @Date 2022/1/31 12:42
 */
@Service
public class ConsumerMsg {

    @StreamListener(Sink.INPUT)
    public void receiveMessages(Message<String> message) {
        System.out.println("========= input接收到的消息:" + message.getPayload());
    }

    @StreamListener(MySink.MY_INPUT)
    public void receiveMessages02(Message<String> message) {
        System.out.println("========= myinput接收到的消息:" + message.getPayload());
    }
}
package top.chenyt;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import top.chenyt.consumer.MySink;

/**
 * @ClassName etpms-parent
 * @Author Jinondo
 * @Date 2022/1/31 12:42
 */
@SpringBootApplication
@Slf4j
@EnableBinding({Sink.class, MySink.class})
public class ConsumerApplication {

    public static void main(String[] args)  {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

这样就能实现根据不同的消息类型对应到不同的队列且不同的路由key去了

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • java将html转成图片代码实例(html2image)

    java将html转成图片代码实例(html2image)

    这篇文章主要介绍了java将html转成图片的相关资料,在Java开发中,将HTML转换为图片可以使用html2image库,文中通过代码及图文介绍的非常详细,需要的朋友可以参考下
    2024-09-09
  • idea复制module(项目)并在一个窗口展示的教程详解

    idea复制module(项目)并在一个窗口展示的教程详解

    这篇文章主要介绍了idea复制module(项目)并在一个窗口展示的方法,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-06-06
  • SpringBoot Bean实例化流程解析

    SpringBoot Bean实例化流程解析

    在SpringBoot启动过程中会执行refreshContext()方法,而在其执行过程中,又会调用finishBeanFactoryInitialization()方法,该方法负责了Bean的实例化,那么本文将从源码跟读的角度来解析一下具体流程
    2023-08-08
  • SpringCloud Finchley Gateway 缓存请求Body和Form表单的实现

    SpringCloud Finchley Gateway 缓存请求Body和Form表单的实现

    在接入Spring-Cloud-Gateway时,可能有需求进行缓存Json-Body数据或者Form-Urlencoded数据的情况。这篇文章主要介绍了SpringCloud Finchley Gateway 缓存请求Body和Form表单的实现,感兴趣的小伙伴们可以参考一下
    2019-01-01
  • IDEA设置JVM运行参数的方法步骤

    IDEA设置JVM运行参数的方法步骤

    这篇文章主要介绍了IDEA设置JVM运行参数的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • Java 基于UDP协议实现消息发送

    Java 基于UDP协议实现消息发送

    这篇文章主要介绍了Java 基于UDP协议实现消息发送,帮助大家更好的理解和学习Java网络编程,感兴趣的朋友可以了解下
    2020-11-11
  • Spring无法解决循环依赖的五种场景分析

    Spring无法解决循环依赖的五种场景分析

    本文详细分析Spring框架中五类循环依赖问题(构造器注入、原型作用域、@Async、配置类、BeanPostProcessor),提出应急方案如@Lazy、重构设计,并强调通过单一职责、依赖倒置等设计原则避免循环依赖,需要的朋友可以参考下
    2025-05-05
  • 解决使用json-lib包实现xml转json时空值被转为空中括号的问题

    解决使用json-lib包实现xml转json时空值被转为空中括号的问题

    网上能查到的xml转json的jar包大部分是net.sf.json-lib,但是JSON json =xmlSerializer.read(xml); 方法会出现将空值转化为[]的问题,下面为大家提供两种解决方法
    2018-03-03
  • SpringBoot自动配置原理分析

    SpringBoot自动配置原理分析

    这篇文章主要介绍了SpringBoot自动配置原理分析,SpringBoot是我们经常使用的框架,那么你能不能针对SpringBoot实现自动配置做一个详细的介绍。如果可以的话,能不能画一下实现自动配置的流程图。牵扯到哪些关键类,以及哪些关键点
    2022-08-08
  • Java受检异常的一些思考

    Java受检异常的一些思考

    受检异常是否真的有必要?这是一个争论了很久的问题,至今仍然没有一个确定的答案。Java的受检异常,被很多人吐槽,也被很多人喜爱,当然他们都可以拿出很多的理由来证明自己的观点。
    2020-12-12

最新评论