Java中RabbitMQ消息队列的交换机详解

 更新时间:2023年07月31日 10:15:09   作者:迷鹿小女子  
这篇文章主要介绍了Java中的RabbitMQ交换机详解,消息队列是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成,是在消息的传输过程中保存消息的容器,需要的朋友可以参考下

RabbitMQ交换机

在这里插入图片描述

交换机属性

  • Name:交换机名称
  • Type:交换机类型 direct、topic、fanout、headers
  • Durability:是否需要持久化,true为持久化
  • Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
  • Arguments:扩展参数,用于扩展AMQP协议,定制化使用

直流交换机

直连交换机Direct Exchange(完全匹配路由key)

所有发送到Direct Exchange的消息会被转发到RouteKey中指定的Queue

注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃;

在这里插入图片描述

消费端代码

package com.xieminglu.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4DirectExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        //表示声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生产端代码

package com.xieminglu.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @author 小李飞刀
 * @site www.javaxl.com
 * @company
 * @create  2019-11-18 10:22
 */
public class Producer4DirectExchange {
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
//        String routingKey = "test.direct111"; //收不到
        //5 发送
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
}

代码的区别: 一条消息只会发送在一个队列里

在这里插入图片描述

创建一个交换机与队列

在这里插入图片描述

所绑定的交换机

在这里插入图片描述

控制台输出

在这里插入图片描述

主题交换机

主题交换机Topic Exchange(匹配路由规则的交换机)

所有发送到Topic Exchange的消息被转发到所有关系RouteKey中指定Topic的Queue上;

Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic;

注意:可以使用通配符进行模糊匹配

  • 符号:“#” 匹配一个或者多个词
  • 符号:“” 匹配不多不少一个词

列如:

  • “log.#” 能够匹配到 “log.info.oa”
  • “log.” 能够匹配到 “log.err”

在这里插入图片描述

消费端代码

package com.xieminglu.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.#";
//        String routingKey = "user.*";
        // 1 声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // 2 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 3 建立交换机和队列的绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生产端代码

package com.xieminglu.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4TopicExchange {
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 发送
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
        channel.close();
        connection.close();
    }
}

代码的区别: 一条消息会发送在多个队列里 消费端:

在这里插入图片描述

生产端:

在这里插入图片描述

控制台输出

在这里插入图片描述

并且可以同时绑定多个交换机

在这里插入图片描述

输出交换机

输出交换机Fanout Exchange(不做路由)

  • 不处理路由键,只需要简单的将队列绑定到交换机上;
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上;
  • Fanout交换机转发消息是最快的

在这里插入图片描述

消费端代码

package com.xieminglu.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "";    //不设置路由键
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生产端代码

package com.xieminglu.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_fanout_exchange";
        //5 发送
        for(int i = 0; i < 10; i ++) {
            String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费端:

在这里插入图片描述

生产端:

在这里插入图片描述

在这里插入图片描述

控制台输出

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

Binding-绑定

  • Exchange和Exchange、Queue之间的连接关系;
  • Binding中可以包含RoutingKey或者参数

Queue-消息队列

  • 消息队列,实际存储消息数据
  • Durability:是否持久化
  • Durable:是,Transient:否
  • Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除

Message-消息

  • 服务器和应用程序之间传递的数据
  • 本质上就是一段数据,由Properties和Payload(Body)组成
  • 常用属性:delivery model、headers(自定义属性)

Message-其他属性

  • content_type、content_encoding、priority
  • correlation_id、reply_to、expiration、message_id
  • Timestamp、type、user_id、app_id、cluster_id

Virtual host-虚拟主机

  • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
  • 一个Virtual Host里面可以有若干个Exchange和Queue
  • 同一个Virtual Host里面不能有相同名称的Exchange或Queue

总结一下

交换机的概念

在没有交换机的时候,我们的消息队列会处理所有发给这个消息队列的消息,然后由消费者一个一个消费这个队列里面的消息,如果由集群的话还会分摊对这个消息队列的处理。只不过这里面有一个

Message acknowledgment的概念

这将会导致严重的bug——Queue中堆积的消息会越来越多

在这里插入图片描述

当然一般的消息中间件都不会这么干,我们使用了交换机后,我们看到我们的三种策略,其实都可以说由交换机去找跟它所绑定的消息队列,如果生产端的路由键不符合要求或找不到消息队列定好的路由键的话就会进行其他处理。

在这里插入图片描述

到此这篇关于Java中RabbitMQ消息队列的交换机详解的文章就介绍到这了,更多相关RabbitMQ交换机内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java实现List去重的方法详解

    Java实现List去重的方法详解

    本文用示例介绍Java的List(ArrayList、LinkedList等)的去重的方法。List去重的常用方法一般是:JDK8的stream的distinct、转为HashSet、转为TreeSet等,感兴趣的可以了解一下
    2022-05-05
  • java获取手机已安装APK的签名摘要

    java获取手机已安装APK的签名摘要

    这篇文章主要介绍了java获取手机已安装APK的签名摘要的相关资料,需要的朋友可以参考下
    2016-02-02
  • Spring boot使用logback实现日志管理过程详解

    Spring boot使用logback实现日志管理过程详解

    这篇文章主要介绍了Spring boot使用logback实现日志管理过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-06-06
  • Maven指令打包SpringBoot项目提示没有主清单文件问题

    Maven指令打包SpringBoot项目提示没有主清单文件问题

    在Java开发中,打包Jar时常会遇到“没有主清单属性”的错误,这通常是因为在pom.xml文件中没有正确配置maven插件导致的,特别是在使用自定义的<parent/>节点而非spring-boot-starter-parent时
    2024-09-09
  • nacos配置中心的配置修改之后,无需重启服务的实现过程

    nacos配置中心的配置修改之后,无需重启服务的实现过程

    本文介绍Nacos配置自动刷新的两种方式:@RefreshScope注解和@ConfigurationProperties,强调需将配置写入当前服务的配置文件以确保优先加载和动态更新,避免重启服务
    2025-08-08
  • Java线程中的ThreadLocal原理及源码解析

    Java线程中的ThreadLocal原理及源码解析

    这篇文章主要介绍了Java线程中的ThreadLocal原理及源码解析,ThreadLocal 的作用是为每个线程保存一份局部变量的引用,实现多线程之间的数据隔离,从而避免了线程不安全情况的发生,需要的朋友可以参考下
    2023-12-12
  • Stream流排序数组和List 详解

    Stream流排序数组和List 详解

    这篇文章主要介绍了Stream流排序数组和List 详解,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下
    2022-09-09
  • Redisson分布式锁原理深入分析

    Redisson分布式锁原理深入分析

    文章介绍了Redisson分布式锁的工作原理,并详细解释了加锁、续期、解锁的核心流程,Redisson利用Redis的单线程、自动续期和Lua脚本原子性等特点,避免了传统锁常见的死锁、锁争用和误删等问题,此外,还总结了使用Redisson分布式锁时的注意事项和避坑点
    2026-04-04
  • Java实战员工绩效管理系统的实现流程

    Java实战员工绩效管理系统的实现流程

    只学书上的理论是远远不够的,只有在实战中才能获得能力的提升,本篇文章手把手带你用java+SSM+Mysql+Maven+HTML实现一个员工绩效管理系统,大家可以在过程中查缺补漏,提升水平
    2022-01-01
  • java如何实现自动生成数据库设计文档

    java如何实现自动生成数据库设计文档

    以前我们还需要手写数据库设计文档、现在可以通过引入screw核心包来实现Java 数据库文档一键生成。本文将具体介绍一下如何通过java自动生成数据库设计文档,需要的朋友可以参考下
    2021-11-11

最新评论