RabbitMQ的Direct Exchange模式实现的消息发布案例(示例代码)

 更新时间:2024年09月12日 10:57:19   作者:Xwzzz_  
本文介绍了RabbitMQ的DirectExchange模式下的消息发布和消费的实现,详细说明了如何在DirectExchange模式中进行消息的发送和接收,以及消息处理的基本方法,感兴趣的朋友跟随小编一起看看吧

Producer生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQProducer {
    private final static String EXCHANGE_NAME = "direct_message_exchange";
    private final static String EXCHANGE_TYPE = "direct";
    public static void main(String[] args) {
        // 1. 创建连接工厂,设置连接参数
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672); // RabbitMQ默认端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // 2. 声明交换机 (direct类型,持久化)
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);
            // 3. 声明队列 (持久化,非独占,连接断开时不自动删除)
            channel.queueDeclare("queue5", true, false, false, null);
            channel.queueDeclare("queue6", true, false, false, null);
            channel.queueDeclare("queue7", true, false, false, null);
            // 4. 绑定队列到交换机,设置路由键
            channel.queueBind("queue5", EXCHANGE_NAME, "order");
            channel.queueBind("queue6", EXCHANGE_NAME, "order");
            channel.queueBind("queue7", EXCHANGE_NAME, "course");
            // 5. 准备要发送的消息
            String message = "你好,学相伴:www.kuangstudy.com";
            // 6. 向交换机发送消息,使用路由键 "course"
            channel.basicPublish(EXCHANGE_NAME, "course", null, message.getBytes("UTF-8"));
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            // 捕获异常并打印堆栈信息
            ex.printStackTrace();
            System.out.println("消息发送出现异常...");
        } finally {
            // 在try-with-resources中,不再需要显式关闭连接和通道
            // 会自动关闭连接和通道
        }
    }
}

功能点:

  • 声明了一个Direct类型的交换机,并绑定了三个队列(queue5queue6queue7)。其中queue5queue6都绑定到order路由键,而queue7绑定到course路由键。
  • 发送了一条消息到course路由键绑定的队列中(即queue7)。

Consumer消费者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class RabbitMQConsumer {
    private final static String QUEUE_NAME = "queue7"; // 与生产者的绑定一致
    private final static String EXCHANGE_NAME = "direct_message_exchange";
    private final static String EXCHANGE_TYPE = "direct";
    public static void main(String[] args) {
        // 1. 创建连接工厂,设置连接参数
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672); // RabbitMQ默认端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // 2. 声明交换机和队列,与生产者保持一致
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 3. 绑定队列到交换机,路由键为"course"
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "course");
            System.out.println(" [*] 等待接收消息...");
            // 4. 定义接收消息的回调函数
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 接收到的消息: '" + message + "'");
                // 这里可以添加进一步的消息处理逻辑
            };
            // 5. 开始消费消息 (自动应答)
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (Exception ex) {
            // 捕获异常并打印堆栈信息
            ex.printStackTrace();
            System.out.println("消费者运行中出现异常...");
        }
    }
}

功能点: 

   1.  与生产者保持一致:消费者的队列名称、交换机名称和路由键与生产者保持一致,即监听queue7队列,并接收路由键为course的消息。

   2. 回调函数处理消息:使用DeliverCallback来定义收到消息后的处理逻辑。在回调函数中,delivery.getBody()获取消息内容,随后可以对消息进行处理、存储或其他业务逻辑操作。

   3 自动应答basicConsume中的true表示自动应答(auto-acknowledge),即消息处理完毕后,RabbitMQ会自动确认消息已成功处理。如果需要手动应答,可以将true替换为false,并在处理完成后调用channel.basicAck()来手动确认消息。

到此这篇关于RabbitMQ的Direct Exchange模式实现的消息发布案例的文章就介绍到这了,更多相关RabbitMQ Direct Exchange消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java手机号最新校验规则

    Java手机号最新校验规则

    在Java中,进行手机号校验通常使用正则表达式(Regex)来匹配手机号的格式,以下是一个基于当前(截至2024年)中国手机号规则的校验方法,感兴趣的朋友跟随小编一起看看吧
    2024-05-05
  • JDK10新特性之本地变量类型var的深入理解

    JDK10新特性之本地变量类型var的深入理解

    这篇文章主要给大家介绍了J关于DK10新特性之本地变量类型var的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用JDK10具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2020-05-05
  • spring boot 自定义starter的实现教程

    spring boot 自定义starter的实现教程

    下面小编就为大家分享一篇spring boot 自定义starter的实现教程,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2017-12-12
  • 使用nacos实现自定义文本配置的实时刷新

    使用nacos实现自定义文本配置的实时刷新

    我们都知道,使用Nacos时,如果将Bean使用@RefreshScope标注之后,这个Bean中的配置就会做到实时刷新,本文给大家介绍了如何使用nacos实现自定义文本配置的实时刷新,需要的朋友可以参考下
    2024-05-05
  • Java中Integer类型值相等判断方法

    Java中Integer类型值相等判断方法

    这篇文章主要给大家介绍了关于Java中Integer类型值相等判断的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-02-02
  • Spring Boot 编写Servlet、Filter、Listener、Interceptor的方法

    Spring Boot 编写Servlet、Filter、Listener、Interceptor的方法

    这篇文章给大家介绍了spring-boot中如何定义过滤器、监听器和拦截器,对Spring Boot 编写Servlet、Filter、Listener、Interceptor的相关知识感兴趣的朋友一起看看吧
    2017-07-07
  • Java 实现微信和支付宝支付功能

    Java 实现微信和支付宝支付功能

    这篇文章主要介绍了Java 实现微信和支付宝支付功能,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • java正则匹配HTML中a标签里的中文字符示例

    java正则匹配HTML中a标签里的中文字符示例

    这篇文章主要介绍了java正则匹配HTML中a标签里的中文字符,涉及java中文正则及HTML元素操作技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2017-01-01
  • 完美解决idea moudle没有蓝色的小方块的问题

    完美解决idea moudle没有蓝色的小方块的问题

    这篇文章主要介绍了完美解决idea moudle没有蓝色的小方块的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02
  • 详解Java动态加载数据库驱动

    详解Java动态加载数据库驱动

    本篇文章主要介绍了详解Java动态加载数据库驱动,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-05-05

最新评论