SpringBoot实现发布/订阅广播消息的示例代码
引言
在 RabbitMQ 的五大工作模式中,发布/订阅(Publish/Subscribe)广播模式是分布式系统中非常核心的通信方式。
我们日常使用的普通点对点队列,一条消息只会被一个消费者消费(竞争消费);而广播模式可以实现一条消息、多服务、多消费者同时接收,完美实现一对多通知。
像 缓存刷新、配置更新、全局通知、多节点日志同步、服务状态广播 等场景,全部依赖 Fanout 广播模式。
一、什么是 MQ 广播(发布/订阅)模式?
1. 核心定义
广播模式基于 FanoutExchange(扇形交换机) 实现,核心逻辑:
生产者发送一条消息到 Fanout 交换机,所有绑定该交换机的队列,都会完整收到这条消息。
不管路由键是什么、不管队列名称,只要完成绑定,就会无条件广播投递。
2. 核心特性
- 无视routingKey,路由键传空、传任意值都不生效
- 纯广播、全量投递、一对多分发
- 每条消息独立进入每一个绑定队列
- 天然支持多服务、多节点同步通知
- 无匹配规则,绑定即接收
3. 适用业务场景
- 分布式缓存全局刷新(多节点统一清空缓存)
- 系统配置动态推送、热更新
- 全站公告、全局消息推送
- 微服务多节点日志采集、链路追踪
- 服务上下线、状态同步广播
- 多端消息同步(PC/APP/小程序)
二、四大交换机模式核心对比
| 交换机类型 | 匹配规则 | 消费模式 | 核心场景 |
| Direct(直连) | 完全匹配 routingKey | 点对点竞争消费 | 订单、支付、任务处理 |
| Topic(主题) | 通配符模糊匹配 | 选择性多消费 | 日志分级、消息订阅 |
| Fanout(广播) | 无视路由键,全部投递 | 全员订阅消费 | 缓存刷新、全局通知 |
| Headers | 匹配消息头参数 | 自定义匹配 | 极少使用 |
三、关键认知误区
1:同一个队列多消费者可以实现广播
绝对错误!
同一个队列下的多个消费者,默认是竞争消费,一条消息只会被一个消费者消费。
广播必备条件:每个消费者对应一个独立队列,全部绑定同一个 Fanout 交换机。
2:Fanout 交换机需要配置路由键
Fanout 交换机底层逻辑直接忽略 routingKey,无论发送时传什么值,都不会影响广播效果。
3:广播消息天然可靠、不会丢失
默认非持久化、自动ACK 场景下,广播消息极易丢失,生产必须做持久化+手动ACK。
四、SpringBoot 完整实现
1. 基础依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2. 生产级配置文件
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
# 手动ACK 保证广播消息不丢
acknowledge-mode: manual
# 限制预取数,防止单节点消息堆积
prefetch: 5
# 开启消费重试
retry:
enabled: true
max-attempts: 3
initial-interval: 10003. 广播交换机、队列、绑定配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutBroadcastConfig {
// 广播交换机名称
public static final String FANOUT_EXCHANGE = "system.fanout.broadcast.exchange";
// 三个独立消费者队列
public static final String QUEUE_CACHE_REFRESH = "queue.cache.refresh";
public static final String QUEUE_NOTICE = "queue.system.notice";
public static final String QUEUE_LOG = "queue.log.collect";
// 声明 Fanout 广播交换机:持久化、不自动删除
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE, true, false);
}
// 队列1:缓存刷新队列
@Bean
public Queue cacheRefreshQueue() {
return new Queue(QUEUE_CACHE_REFRESH, true);
}
// 队列2:系统通知队列
@Bean
public Queue noticeQueue() {
return new Queue(QUEUE_NOTICE, true);
}
// 队列3:日志采集队列
@Bean
public Queue logQueue() {
return new Queue(QUEUE_LOG, true);
}
// 全部绑定到广播交换机
@Bean
public Binding bindingCacheRefresh(Queue cacheRefreshQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(cacheRefreshQueue).to(fanoutExchange);
}
@Bean
public Binding bindingNotice(Queue noticeQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(noticeQueue).to(fanoutExchange);
}
@Bean
public Binding bindingLog(Queue logQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(logQueue).to(fanoutExchange);
}
}4. 广播消息生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class BroadcastProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/broadcast")
public String sendBroadcastMsg(@RequestParam String content) {
// Fanout广播:路由键传空字符串
rabbitTemplate.convertAndSend(
FanoutBroadcastConfig.FANOUT_EXCHANGE,
"",
content
);
return "✅ 广播消息发送成功:" + content;
}
}5. 多消费者实现(全员接收)
消费者1:缓存刷新消费者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class CacheRefreshConsumer {
@RabbitListener(queues = FanoutBroadcastConfig.QUEUE_CACHE_REFRESH)
public void consume(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("【缓存服务】接收广播消息:" + msg);
// 执行缓存刷新业务逻辑
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 消费失败,重回队列重试
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}消费者2:系统通知消费者
@Component
public class SystemNoticeConsumer {
@RabbitListener(queues = FanoutBroadcastConfig.QUEUE_NOTICE)
public void consume(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("【通知服务】接收广播消息:" + msg);
// 执行消息推送业务
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}消费者3:日志采集消费者
@Component
public class LogCollectConsumer {
@RabbitListener(queues = FanoutBroadcastConfig.QUEUE_LOG)
public void consume(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("【日志服务】接收广播消息:" + msg);
// 执行日志采集业务
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}五、测试效果
访问接口:
http://localhost:8080/send/broadcast?content=全局缓存刷新通知
控制台输出:
【缓存服务】接收广播消息:全局缓存刷新通知 【通知服务】接收广播消息:全局缓存刷新通知 【日志服务】接收广播消息:全局缓存刷新通知
一条消息,多服务同时消费,广播生效!
六、总结
1. 必须开启持久化
交换机、队列全部设置持久化,防止重启丢失广播配置。
2. 强制手动ACK
广播场景多为重要通知、缓存同步,自动ACK会导致业务未执行完成消息丢失。
3. 每个服务独立队列
不同微服务必须使用独立队列,避免竞争消费,保证广播全覆盖。
4. 广播消息建议做幂等
MQ 重试、网络抖动会导致广播消息重复推送,核心业务必须基于消息ID做幂等防重。
5. 禁止设置复杂路由键
Fanout 无视路由键,统一传空字符串,保持代码规范。
写在最后
广播发布订阅模式是微服务分布式通信的重要基石,区别于传统的点对点任务消费,它主打全局通知、多节点同步、状态广播,是缓存刷新、配置热更新、系统公告等场景的最优解。
很多开发者一直混淆“竞争消费”和“广播消费”的本质,导致线上通知不全、同步失效等隐性问题。吃透 Fanout 交换机的底层原理与落地规范,能帮你彻底解决分布式多节点同步难题。
以上就是SpringBoot实现发布/订阅广播消息的示例代码的详细内容,更多关于SpringBoot发布/订阅广播消息的资料请关注脚本之家其它相关文章!
相关文章
SpringCloud使用Feign实现远程调用流程详细介绍
OpenFeign源于Netflix的Feign,是http通信的客户端。屏蔽了网络通信的细节,直接面向接口的方式开发,让开发者感知不到网络通信细节。所有远程调用,都像调用本地方法一样完成2023-02-02
Spring Boot前后端分离开发模式中的跨域问题及解决方法
本文介绍了解决Spring Boot前端Vue跨域问题的实战经验,并提供了后端和前端的配置示例,通过配置后端和前端,我们可以轻松解决跨域问题,实现正常的前后端交互,需要的朋友可以参考下2023-09-09


最新评论