SpringBoot实现发布/订阅广播消息的示例代码

 更新时间:2026年05月28日 08:42:16   作者:希望永不加班  
在 RabbitMQ 的五大工作模式中,发布/订阅(Publish/Subscribe)广播模式是分布式系统中非常核心的通信方式,本文给大家介绍了SpringBoot实现发布/订阅广播消息的示例代码,需要的朋友可以参考下

引言

在 RabbitMQ 的五大工作模式中,发布/订阅(Publish/Subscribe)广播模式是分布式系统中非常核心的通信方式。

我们日常使用的普通点对点队列,一条消息只会被一个消费者消费(竞争消费);而广播模式可以实现一条消息、多服务、多消费者同时接收,完美实现一对多通知。

像 缓存刷新、配置更新、全局通知、多节点日志同步、服务状态广播 等场景,全部依赖 Fanout 广播模式。

一、什么是 MQ 广播(发布/订阅)模式?

1. 核心定义

广播模式基于 FanoutExchange(扇形交换机) 实现,核心逻辑:

生产者发送一条消息到 Fanout 交换机,所有绑定该交换机的队列,都会完整收到这条消息。

不管路由键是什么、不管队列名称,只要完成绑定,就会无条件广播投递。

2. 核心特性

  • 无视routingKey,路由键传空、传任意值都不生效
  • 纯广播、全量投递、一对多分发
  • 每条消息独立进入每一个绑定队列
  • 天然支持多服务、多节点同步通知
  • 无匹配规则,绑定即接收

3. 适用业务场景

  • 分布式缓存全局刷新(多节点统一清空缓存)
  • 系统配置动态推送、热更新
  • 全站公告、全局消息推送
  • 微服务多节点日志采集、链路追踪
  • 服务上下线、状态同步广播
  • 多端消息同步(PC/APP/小程序)

二、四大交换机模式核心对比

交换机类型匹配规则消费模式核心场景
Direct(直连)完全匹配 routingKey点对点竞争消费订单、支付、任务处理
Topic(主题)通配符模糊匹配选择性多消费日志分级、消息订阅
Fanout(广播)无视路由键,全部投递全员订阅消费缓存刷新、全局通知
Headers匹配消息头参数自定义匹配极少使用

三、关键认知误区

1:同一个队列多消费者可以实现广播

绝对错误!

同一个队列下的多个消费者,默认是竞争消费,一条消息只会被一个消费者消费。

广播必备条件:每个消费者对应一个独立队列,全部绑定同一个 Fanout 交换机。

2:Fanout 交换机需要配置路由键

Fanout 交换机底层逻辑直接忽略 routingKey,无论发送时传什么值,都不会影响广播效果。

3:广播消息天然可靠、不会丢失

默认非持久化、自动ACK 场景下,广播消息极易丢失,生产必须做持久化+手动ACK

四、SpringBoot 完整实现

1. 基础依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 生产级配置文件

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        # 手动ACK 保证广播消息不丢
        acknowledge-mode: manual
        # 限制预取数,防止单节点消息堆积
        prefetch: 5
        # 开启消费重试
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000

3. 广播交换机、队列、绑定配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutBroadcastConfig {
    // 广播交换机名称
    public static final String FANOUT_EXCHANGE = "system.fanout.broadcast.exchange";
    // 三个独立消费者队列
    public static final String QUEUE_CACHE_REFRESH = "queue.cache.refresh";
    public static final String QUEUE_NOTICE = "queue.system.notice";
    public static final String QUEUE_LOG = "queue.log.collect";
    // 声明 Fanout 广播交换机:持久化、不自动删除
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE, true, false);
    }
    // 队列1:缓存刷新队列
    @Bean
    public Queue cacheRefreshQueue() {
        return new Queue(QUEUE_CACHE_REFRESH, true);
    }
    // 队列2:系统通知队列
    @Bean
    public Queue noticeQueue() {
        return new Queue(QUEUE_NOTICE, true);
    }
    // 队列3:日志采集队列
    @Bean
    public Queue logQueue() {
        return new Queue(QUEUE_LOG, true);
    }
    // 全部绑定到广播交换机
    @Bean
    public Binding bindingCacheRefresh(Queue cacheRefreshQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(cacheRefreshQueue).to(fanoutExchange);
    }
    @Bean
    public Binding bindingNotice(Queue noticeQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(noticeQueue).to(fanoutExchange);
    }
    @Bean
    public Binding bindingLog(Queue logQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(logQueue).to(fanoutExchange);
    }
}

4. 广播消息生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class BroadcastProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/send/broadcast")
    public String sendBroadcastMsg(@RequestParam String content) {
        // Fanout广播:路由键传空字符串
        rabbitTemplate.convertAndSend(
                FanoutBroadcastConfig.FANOUT_EXCHANGE,
                "",
                content
        );
        return "✅ 广播消息发送成功:" + content;
    }
}

5. 多消费者实现(全员接收)

消费者1:缓存刷新消费者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class CacheRefreshConsumer {
    @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_CACHE_REFRESH)
    public void consume(String msg, Message message, Channel channel) throws IOException {
        try {
            System.out.println("【缓存服务】接收广播消息:" + msg);
            // 执行缓存刷新业务逻辑
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消费失败,重回队列重试
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

消费者2:系统通知消费者

@Component
public class SystemNoticeConsumer {
    @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_NOTICE)
    public void consume(String msg, Message message, Channel channel) throws IOException {
        try {
            System.out.println("【通知服务】接收广播消息:" + msg);
            // 执行消息推送业务
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

消费者3:日志采集消费者

@Component
public class LogCollectConsumer {
    @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_LOG)
    public void consume(String msg, Message message, Channel channel) throws IOException {
        try {
            System.out.println("【日志服务】接收广播消息:" + msg);
            // 执行日志采集业务
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

五、测试效果

访问接口:

http://localhost:8080/send/broadcast?content=全局缓存刷新通知

控制台输出:

【缓存服务】接收广播消息:全局缓存刷新通知
【通知服务】接收广播消息:全局缓存刷新通知
【日志服务】接收广播消息:全局缓存刷新通知

一条消息,多服务同时消费,广播生效!

六、总结

1. 必须开启持久化

交换机、队列全部设置持久化,防止重启丢失广播配置。

2. 强制手动ACK

广播场景多为重要通知、缓存同步,自动ACK会导致业务未执行完成消息丢失。

3. 每个服务独立队列

不同微服务必须使用独立队列,避免竞争消费,保证广播全覆盖。

4. 广播消息建议做幂等

MQ 重试、网络抖动会导致广播消息重复推送,核心业务必须基于消息ID做幂等防重。

5. 禁止设置复杂路由键

Fanout 无视路由键,统一传空字符串,保持代码规范。

写在最后

广播发布订阅模式是微服务分布式通信的重要基石,区别于传统的点对点任务消费,它主打全局通知、多节点同步、状态广播,是缓存刷新、配置热更新、系统公告等场景的最优解。

很多开发者一直混淆“竞争消费”和“广播消费”的本质,导致线上通知不全、同步失效等隐性问题。吃透 Fanout 交换机的底层原理与落地规范,能帮你彻底解决分布式多节点同步难题。

以上就是SpringBoot实现发布/订阅广播消息的示例代码的详细内容,更多关于SpringBoot发布/订阅广播消息的资料请关注脚本之家其它相关文章!

相关文章

  • SpringCloud使用Feign实现远程调用流程详细介绍

    SpringCloud使用Feign实现远程调用流程详细介绍

    OpenFeign源于Netflix的Feign,是http通信的客户端。屏蔽了网络通信的细节,直接面向接口的方式开发,让开发者感知不到网络通信细节。所有远程调用,都像调用本地方法一样完成
    2023-02-02
  • Springcloud sentinel安装和使用方法解析

    Springcloud sentinel安装和使用方法解析

    这篇文章主要介绍了Springcloud sentinel安装和使用方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-12-12
  • Mybatis中的缓存机制解析

    Mybatis中的缓存机制解析

    这篇文章给大家介绍mybatis中的缓存机制,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧
    2025-11-11
  • MyBatisPlus的IService接口实现

    MyBatisPlus的IService接口实现

    MyBatisPlus是一个为MyBatis提供增强的工具,它通过IService接口简化了数据库的CRUD操作,IService接口封装了一系列常用的数据操作方法,本文就来介绍一下,感兴趣的可以了解一下
    2024-10-10
  • Spring核心IoC容器的依赖注入接口和层级包命名规范

    Spring核心IoC容器的依赖注入接口和层级包命名规范

    这篇文章主要介绍了Spring核心IoC容器的依赖注入接口和层级包命名规范,IOC又名控制反转,把对象创建和对象之间的调用过程,交给Spring进行管理,目的是为了降低耦合度,需要的朋友可以参考下
    2023-05-05
  • java实现中缀表达式转后缀的方法

    java实现中缀表达式转后缀的方法

    这篇文章主要为大家详细介绍了java实现中缀表达式转后缀的表达式方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-11-11
  • IDEA中关于enter键换行的问题

    IDEA中关于enter键换行的问题

    这篇文章主要介绍了IDEA中关于enter键换行的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • Spring Boot前后端分离开发模式中的跨域问题及解决方法

    Spring Boot前后端分离开发模式中的跨域问题及解决方法

    本文介绍了解决Spring Boot前端Vue跨域问题的实战经验,并提供了后端和前端的配置示例,通过配置后端和前端,我们可以轻松解决跨域问题,实现正常的前后端交互,需要的朋友可以参考下
    2023-09-09
  • MyBatis集成Spring流程详解

    MyBatis集成Spring流程详解

    在实际开发中不仅仅是要展示数据,还要构成数据模型添加数据,这篇文章主要介绍了SpringBoot集成Mybatis操作数据库,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-08-08
  • java实现的日期时间转换工具类完整示例

    java实现的日期时间转换工具类完整示例

    这篇文章主要介绍了java实现的日期时间转换工具类,结合完整实例形式分析了java针对日期时间常见的转换、计算、格式化等相关操作与封装技巧,需要的朋友可以参考下
    2019-10-10

最新评论