RabbitMQ中的Publish-Subscribe模式最佳实践记录

 更新时间:2024年12月19日 14:11:42   作者:AllenBright  
Publish/Subscribe 模式是 RabbitMQ 中一种强大且灵活的消息传递模式,适用于需要将消息广播给多个订阅者的场景,这篇文章主要介绍了RabbitMQ中的Publish-Subscribe模式,需要的朋友可以参考下

在现代分布式系统中,消息队列(Message Queue)是实现异步通信和解耦系统的关键组件。RabbitMQ 是一个功能强大且广泛使用的开源消息代理,支持多种消息传递模式。其中,Publish/Subscribe(发布/订阅)模式是一种常见且重要的模式,它允许消息发布者将消息广播给多个订阅者。

本文将深入探讨 RabbitMQ 中的 Publish/Subscribe 模式,包括其工作原理、实现方式、适用场景以及最佳实践。

1. Publish/Subscribe 模式简介

1.1 什么是 Publish/Subscribe 模式?

Publish/Subscribe(发布/订阅)模式是一种消息传递模式,它将消息的发送者(发布者)和接收者(订阅者)解耦。发布者将消息发布到一个交换机(Exchange),而订阅者通过绑定到交换机的**队列(Queue)**来接收消息。

与点对点模式(如工作队列)不同,Publish/Subscribe 模式允许多个订阅者接收相同的消息,从而实现消息的广播。

1.2 核心概念

在 RabbitMQ 中,Publish/Subscribe 模式依赖以下核心组件:

  • 发布者(Publisher):发送消息的客户端。
  • 交换机(Exchange):接收发布者发送的消息,并根据规则将消息路由到队列。
  • 队列(Queue):存储消息的缓冲区。
  • 订阅者(Subscriber):从队列中消费消息的客户端。
  • 绑定(Binding):定义交换机和队列之间的关系。

2. Publish/Subscribe 模式的工作原理

2.1 交换机的作用

在 RabbitMQ 中,消息不会直接发送到队列,而是发送到交换机。交换机根据绑定规则将消息路由到相应的队列。

RabbitMQ 提供了多种类型的交换机,其中最常用的是:

  • Fanout 交换机:将消息广播到所有绑定到它的队列,忽略路由键(Routing Key)。
  • Direct 交换机:根据消息的路由键将消息路由到匹配的队列。
  • Topic 交换机:支持更复杂的路由规则,允许使用通配符匹配路由键。
  • Headers 交换机:根据消息的头部属性进行路由。

在 Publish/Subscribe 模式中,通常使用 Fanout 交换机,因为它能够将消息广播到所有绑定的队列。

2.2 消息的广播过程

  • 发布者将消息发送到交换机。
  • 交换机接收到消息后,将消息广播到所有绑定的队列。
  • 订阅者从队列中消费消息。

3. Java 实现 Publish/Subscribe 模式

以下是使用 Java 和 RabbitMQ Java Client 实现 Publish/Subscribe 模式的完整示例。

3.1 添加依赖

在 Maven 项目中,添加 RabbitMQ Java Client 依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

3.2 创建发布者(Publisher)

发布者负责将消息发送到交换机。以下是发布者的代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Publisher {
    private static final String EXCHANGE_NAME = "publisher_subscriber";
    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明一个 Fanout 交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 发布消息
            String message = "Hello, Subscribers!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

3.3 创建订阅者(Subscriber)

订阅者负责从队列中消费消息。以下是订阅者的代码:

import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class Subscriber {
    private static final String EXCHANGE_NAME = "publisher_subscriber";
    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明一个 Fanout 交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 创建一个临时队列,并绑定到交换机
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 定义消息处理函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // 开始消费消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

3.4 运行示例

启动多个订阅者,在不同的终端窗口中运行多个订阅者实例

启动多个订阅者后,能在RabbitMQ终端页面,能看到多个临时的队列,但交换机只有一个publisher_subscriber

启动发布者,在另一个终端窗口中运行发布者 3.4.1 观察输出

所有订阅者都会收到发布者发送的消息。例如:

发布者输出:

 [x] Sent 'Hello, Subscribers!'

订阅者输出:

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello, Subscribers!'

4. 代码解析

4.1 发布者代码解析

  • 连接工厂ConnectionFactory 用于创建到 RabbitMQ 服务器的连接。
  • 交换机声明channel.exchangeDeclare(EXCHANGE_NAME, "fanout") 声明一个 Fanout 交换机。
  • 消息发布channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)) 将消息发送到交换机。

4.2 订阅者代码解析

  • 临时队列channel.queueDeclare().getQueue() 创建一个非持久化的、独占的临时队列。
  • 队列绑定channel.queueBind(queueName, EXCHANGE_NAME, "") 将队列绑定到交换机。
  • 消息处理DeliverCallback 定义了如何处理接收到的消息。
  • 消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }) 开始消费消息。

5. Publish/Subscribe 模式的适用场景

5.1 日志记录

在分布式系统中,日志记录是一个常见的需求。使用 Publish/Subscribe 模式,可以将日志消息广播给多个日志处理器,分别将日志写入文件、数据库或发送到监控系统。

5.2 实时通知

在社交网络或即时通讯应用中,可以使用 Publish/Subscribe 模式向多个用户发送实时通知。例如,当用户发布新动态时,通知所有关注者。

5.3 分布式缓存更新

在分布式缓存系统中,当缓存数据更新时,可以使用 Publish/Subscribe 模式通知所有缓存节点同步更新。

5.4 事件驱动架构

在事件驱动架构中,Publish/Subscribe 模式用于实现事件的广播。例如,当用户注册成功时,发布一个事件,通知多个服务(如邮件服务、积分服务)执行相应的操作。

6. 最佳实践

6.1 使用持久化

为了确保消息不会丢失,建议将交换机和队列设置为持久化。例如:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
channel.queueDeclare("my_queue", true, false, false, null);

6.2 处理消息确认

在生产环境中,建议启用消息确认机制,确保消息被成功消费。例如:

channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });

6.3 避免消息积压

在高并发场景下,可能会出现消息积压的情况。可以通过设置队列的最大长度或使用**死信队列(DLX)**来处理积压的消息。

6.4 监控和报警

使用 RabbitMQ 的管理界面或监控工具(如 Prometheus + Grafana)监控消息队列的状态,并设置报警规则,及时发现和解决问题。

7. 总结

Publish/Subscribe 模式是 RabbitMQ 中一种强大且灵活的消息传递模式,适用于需要将消息广播给多个订阅者的场景。通过使用 Fanout 交换机,可以轻松实现消息的广播,同时结合持久化、消息确认和监控机制,可以构建高可靠性的分布式系统。

到此这篇关于RabbitMQ中的Publish-Subscribe模式的文章就介绍到这了,更多相关RabbitMQ Publish-Subscribe模式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot配置嵌入式Servlet容器和使用外置Servlet容器的教程图解

    SpringBoot配置嵌入式Servlet容器和使用外置Servlet容器的教程图解

    这篇文章主要介绍了SpringBoot配置嵌入式Servlet容器和使用外置Servlet容器的教程,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-07-07
  • java结合keytool如何实现非对称签名和验证详解

    java结合keytool如何实现非对称签名和验证详解

    这篇文章主要给大家介绍了关于java结合keytool如何实现非对称签名和验证的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-08-08
  • 老生常谈java中的fail-fast机制

    老生常谈java中的fail-fast机制

    下面小编就为大家带来一篇老生常谈java中的fail-fast机制。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • Java8中的LocalDateTime和Date一些时间操作方法

    Java8中的LocalDateTime和Date一些时间操作方法

    这篇文章主要介绍了Java8中的LocalDateTime和Date一些时间操作方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-04-04
  • Java多线程下载文件实例详解

    Java多线程下载文件实例详解

    这篇文章主要为大家详细介绍了Java多线程下载文件的实例代码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-04-04
  • 用Set类判断Map里key是否存在的示例代码

    用Set类判断Map里key是否存在的示例代码

    本篇文章主要是对用Set类判断Map里key是否存在的示例代码进行了介绍,需要的朋友可以过来参考下,希望对大家有所帮助
    2013-12-12
  • idea取消git托管方式(删除git文件)

    idea取消git托管方式(删除git文件)

    遇到Git文件传输错误或打不开问题时,需进行Git清理和重新配置,首先删除项目中的.git文件和.gitignore文件,若找不到,检查是否为隐藏文件,接着在IDE的设置中,删除所有版本控制模块,最后,若想重新使用Git,可在设置里重新启用并配置,连接至GitHub仓库即可恢复正常
    2024-10-10
  • 使用 Apache POI 在 Java 中写入 Excel 文件的方法

    使用 Apache POI 在 Java 中写入 Excel

    这篇文章详细介绍了如何使用ApachePOI在Java中编写Excel文件的技巧,包括创建工作簿、工作表、行和单元格,以及如何处理不同版本的Excel文件,通过详细的步骤和代码示例,读者可以快速掌握ApachePOI的基本使用方法,感兴趣的朋友一起看看吧
    2025-02-02
  • Java日期工具类时间校验实现

    Java日期工具类时间校验实现

    一般项目中需要对入参进行校验,比如必须是一个合法的日期,本文就来介绍一下Java日期工具类时间校验实现,具有一定的参考价值,感兴趣的可以了解一下
    2023-12-12
  • SpringBoot读写xml上传到AWS存储服务S3的示例

    SpringBoot读写xml上传到AWS存储服务S3的示例

    这篇文章主要介绍了SpringBoot读写xml上传到S3的示例,帮助大家更好的理解和使用springboot框架,感兴趣的朋友可以了解下
    2020-10-10

最新评论