Java中RabbitMQ消息队列的交换机详解

 更新时间:2023年07月31日 10:15:09   作者:迷鹿小女子  
这篇文章主要介绍了Java中的RabbitMQ交换机详解,消息队列是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成,是在消息的传输过程中保存消息的容器,需要的朋友可以参考下

RabbitMQ交换机

在这里插入图片描述

交换机属性

  • Name:交换机名称
  • Type:交换机类型 direct、topic、fanout、headers
  • Durability:是否需要持久化,true为持久化
  • Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
  • Arguments:扩展参数,用于扩展AMQP协议,定制化使用

直流交换机

直连交换机Direct Exchange(完全匹配路由key)

所有发送到Direct Exchange的消息会被转发到RouteKey中指定的Queue

注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃;

在这里插入图片描述

消费端代码

package com.xieminglu.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4DirectExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        //表示声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生产端代码

package com.xieminglu.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @author 小李飞刀
 * @site www.javaxl.com
 * @company
 * @create  2019-11-18 10:22
 */
public class Producer4DirectExchange {
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
//        String routingKey = "test.direct111"; //收不到
        //5 发送
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
}

代码的区别: 一条消息只会发送在一个队列里

在这里插入图片描述

创建一个交换机与队列

在这里插入图片描述

所绑定的交换机

在这里插入图片描述

控制台输出

在这里插入图片描述

主题交换机

主题交换机Topic Exchange(匹配路由规则的交换机)

所有发送到Topic Exchange的消息被转发到所有关系RouteKey中指定Topic的Queue上;

Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic;

注意:可以使用通配符进行模糊匹配

  • 符号:“#” 匹配一个或者多个词
  • 符号:“” 匹配不多不少一个词

列如:

  • “log.#” 能够匹配到 “log.info.oa”
  • “log.” 能够匹配到 “log.err”

在这里插入图片描述

消费端代码

package com.xieminglu.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.#";
//        String routingKey = "user.*";
        // 1 声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // 2 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 3 建立交换机和队列的绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生产端代码

package com.xieminglu.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4TopicExchange {
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 发送
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
        channel.close();
        connection.close();
    }
}

代码的区别: 一条消息会发送在多个队列里 消费端:

在这里插入图片描述

生产端:

在这里插入图片描述

控制台输出

在这里插入图片描述

并且可以同时绑定多个交换机

在这里插入图片描述

输出交换机

输出交换机Fanout Exchange(不做路由)

  • 不处理路由键,只需要简单的将队列绑定到交换机上;
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上;
  • Fanout交换机转发消息是最快的

在这里插入图片描述

消费端代码

package com.xieminglu.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "";    //不设置路由键
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生产端代码

package com.xieminglu.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_fanout_exchange";
        //5 发送
        for(int i = 0; i < 10; i ++) {
            String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费端:

在这里插入图片描述

生产端:

在这里插入图片描述

在这里插入图片描述

控制台输出

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

Binding-绑定

  • Exchange和Exchange、Queue之间的连接关系;
  • Binding中可以包含RoutingKey或者参数

Queue-消息队列

  • 消息队列,实际存储消息数据
  • Durability:是否持久化
  • Durable:是,Transient:否
  • Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除

Message-消息

  • 服务器和应用程序之间传递的数据
  • 本质上就是一段数据,由Properties和Payload(Body)组成
  • 常用属性:delivery model、headers(自定义属性)

Message-其他属性

  • content_type、content_encoding、priority
  • correlation_id、reply_to、expiration、message_id
  • Timestamp、type、user_id、app_id、cluster_id

Virtual host-虚拟主机

  • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
  • 一个Virtual Host里面可以有若干个Exchange和Queue
  • 同一个Virtual Host里面不能有相同名称的Exchange或Queue

总结一下

交换机的概念

在没有交换机的时候,我们的消息队列会处理所有发给这个消息队列的消息,然后由消费者一个一个消费这个队列里面的消息,如果由集群的话还会分摊对这个消息队列的处理。只不过这里面有一个

Message acknowledgment的概念

这将会导致严重的bug——Queue中堆积的消息会越来越多

在这里插入图片描述

当然一般的消息中间件都不会这么干,我们使用了交换机后,我们看到我们的三种策略,其实都可以说由交换机去找跟它所绑定的消息队列,如果生产端的路由键不符合要求或找不到消息队列定好的路由键的话就会进行其他处理。

在这里插入图片描述

到此这篇关于Java中RabbitMQ消息队列的交换机详解的文章就介绍到这了,更多相关RabbitMQ交换机内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring Boot 部署过程解析(jar or war)

    Spring Boot 部署过程解析(jar or war)

    这篇文章主要介绍了Spring Boot 部署过程解析(jar or war),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09
  • maven配置阿里云仓库的实现方法(2022年)

    maven配置阿里云仓库的实现方法(2022年)

    本文主要介绍了maven配置阿里云仓库的实现方法,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-03-03
  • Java中内存分配的几种方法

    Java中内存分配的几种方法

    本文主要介绍Java中几种分配内存的方法。我们会看到如何使用sun.misc.Unsafe来统一操作任意类型的内存。以前用C语言开发的同学通常都希望能在Java中通过较底层的接口来操作内存,他们一定会对本文中要讲的内容感兴趣
    2014-03-03
  • java7改善后的异常处理

    java7改善后的异常处理

    在本篇文章里小编给大家整理的是关于java7改善后的异常处理知识点总结,有需要的朋友们参考下。
    2019-11-11
  • IDEA Debug模式下改变各类型变量值的方法

    IDEA Debug模式下改变各类型变量值的方法

    这篇文章主要介绍了IDEA Debug模式下改变各类型变量值的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-04-04
  • springboot使用外置tomcat启动方式

    springboot使用外置tomcat启动方式

    这篇文章主要介绍了springboot使用外置tomcat启动方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • MyBatis学习教程(七)-Mybatis缓存介绍

    MyBatis学习教程(七)-Mybatis缓存介绍

    MyBatis缓存分为一级缓存和二级缓存一级缓存,本文给大家介绍mybatis缓存知识,非常不错具有参考借鉴价值,感兴趣的朋友一起学习吧
    2016-05-05
  • spring boot整合mongo查询converter异常排查记录

    spring boot整合mongo查询converter异常排查记录

    这篇文章主要为大家介绍了spring boot整合mongo查询时抛出converter异常的排查解决记录,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2022-03-03
  • java使用Graphics2D绘图/画图方式

    java使用Graphics2D绘图/画图方式

    这篇文章主要介绍了java使用Graphics2D绘图/画图方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • Java上传文件图片到服务器的方法

    Java上传文件图片到服务器的方法

    这篇文章主要为大家详细介绍了Java上传文件图片到服务器的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-01-01

最新评论