RabbitMQ使用案例详解

 更新时间:2024年03月08日 10:51:58   作者:在无清风  
RabbitMQ是基于Erlang语言开发的开源的消息中间件,这篇文章给大家介绍RabbitMQ使用案例,感兴趣的朋友跟随小编一起看看吧

初识MQ

同步通讯和异步通讯

同步通讯

微服务间基于Feign的调用就属于同步方式,存在一些问题

同步调用存在的问题

  • 耦合度高:每次加入新需求,都要修改原来的代码
  • 性能下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用时间之和
  • 浪费资源:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下回极度浪费系统资源
  • 级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致镇整个微服务群故障

总结

同步调用优点:

  • 时效性较强,可以立即得到结果

同步调用的问题:

  • 耦合度高
  • 性能和吞吐能力下降
  • 有额外的资源消耗
  • 有级联失败问题

异步通讯

异步调用常见实现就是事件驱动模式

事件驱动优势

优势一:服务解耦

优势二:新能提示,吞吐量提高

优势三:服务没有强依赖,不担心级联失败问题

优势四:流浪削峰

总结

异步通信的优点:

  • 耦合度低
  • 吞吐量提示
  • 故障隔离
  • 流量削峰

异步通讯的缺点:

  • 依赖于Broker的可靠性,安全性,吞吐能力
  • 架构复杂了,业务没有明显的流程线,不好追踪管理

什么是MQ

MQ(MessageQueue),中文解释是消息队列,字面来看就是存放消息的队列,也就是时间驱动架构中的Broker。

RabbitMQ快速入门

RabbitMQ概述和安装

RabbitMQ是基于Erlang语言开发的开源的消息中间件,官网地址:RabbitMQ: easy to use, flexible messaging and streaming | RabbitMQ

安装RabbitMQ,参考课前资料

Docker运行RabbitMQ:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

输入端口,输入密码,进入RabbitMQ

RabbitMQ结构和概念

总结

RabbitMQ中的几个概念:

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息

cirtual host:虚拟主机,是对queue,exchange等资源的逻辑分组

常见消息模型

MQ的官方文档中给出了5个MQ的Demo示例,对应了几种不同的用法:

  • 基本消息队列(BasicQueue)

  • 工作消息队列(WorkQueue)

  • 发布订阅(Publish,Subscribe),又根据交换机类型不同分为三种:

1.Fanout Exchange:广播

2.Diect Exchange:路由

3.Topic Exchange:主题

不同消息模型

HelloWorld案例

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接收并缓存消息
  • consumer:订阅列队,处理队列中的消息

案例:完成官方Demo中Hello world案例

实现步骤:

  • 导入课前资料中的demo工程
  • 运行publisher服务中测试类publisherTest中的测试方法testSendMessage()
  • 查看RabbitMQ控制台的消息
  • 启动consumer服务,查看是否能接收消息

总结

基本消息队列的消息发送流程:

  • 创建connection
  • 创建channel
  • 利用channel声明队列
  • 利用channel向队列发送消息

基本消息队列的消息接收流程:

  • 创建connection
  • 创建channel
  • 利用channel声明队列
  • 定义consumer的消费行为handleDelivery()
  • 利用channel将消费者与队列绑定

SpringAMQP

什么是AMQP

Advanced Message Queuing Protocol,适用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

什么是SprngAMQP

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中Spring-AMPQ是基础抽象,Spring-rabbit是底层的默认实现

SpringAmqp的官方地址:Spring AMQP

Basic Queue 简单队列模型

案例:利用SpringAMQP实现Helloworld中的基础消息队列功能

发送消息

流程如下:

1.在父工程中引入spring-adqp的依赖

<!-- AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

在publisher服务中编写application.yml,添加mq连接信息:

spring:
  rabbitmq:
    host: ip #(填写自己的ip)
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /

在publisher服务中新建一个测试类,编写测试方法:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @Test
    public void testSendMessage2SimpleQueue(){
        String queueName = "simple.queue";
        String message = "Hello World!spring amqp!!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

在consumer服务中编写消费逻辑,绑定simple.queue这个队列

 String queueName = "simple.queue";
    String message = "Hello World!spring amqp!!";
    rabbitTemplate.convertAndSend(queueName, message);

总结

什么是AMQP?

应用间消息通信的一种协议,与语言和平台无关。

SpringAMQP如何发送消息?

  • 引入amqp的starter依赖
  • 配置RabbitMQ地址
  • 利用RabbitTemplate的convertAndSend方法

接收消息

在consumer中编写消费逻辑,监听simple.queue

在consumer服务中编写application.yml,添加mq连接信息:

spring:
  rabbitmq:
    host: ip
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /

在consumer服务中新建一个类,编写消费逻辑:

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("消费者接收到simple.queue的消息:{"+msg+"}");
    }
}

总结

SpringAMQP如何接收消息?

  • 映入amqp的starter依赖
  • 配置RabbitMQ地址
  • 定义类,添加@Component注解

类中声明方法,添加@RabbitListener注解,方法参数就时消息

Work Queue 工作队列模型

案例:模拟Work Queue,实现一个队列绑定多个消费者

基本思路如下:

在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "Hello World!spring amqp!!__";
    for (int i = 1; i < 50; i++) {
        rabbitTemplate.convertAndSend(queueName, message+i);
        Thread.sleep(20);
    }
}

在consumer服务中定义两个消息监听者,都监听simple.queue队列

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到simple.queue的消息:{"+msg+"}"+ LocalTime.now());
    Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.out.println("消费者2接收到simple.queue的消息:{"+msg+"}"+ LocalTime.now());
    Thread.sleep(50);
}

消费者 1 每秒处理50条消息,消费者 2 每秒处理10条消息

总结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条信息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

发布,订阅模型-Fanout

发布订阅与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入exchange(交换机)。

案例:利用SpringAMQP演示FanoutExchange的使用

实现思路

在consumer服务中,利用代码声明队列,交换机,将两者绑定

在consumer服务声明Exchang,Queue,Bingding

SpringAMQP提供了交换机,队列,绑定关系的API,例如:

@Configuration
public class FanoutConfig {
    //itcast.fanout(交换机)
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    //itcast.queue1(队列一)
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    //绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue1).
                to(fanoutExchange);
    }
    //itcast.queue2(队列二)
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    //绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder
               .bind(fanoutQueue2).
                to(fanoutExchange);
    }
}

在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg){
    System.out.println("消费者2接收到fanout.queue1的消息:{"+msg+"}");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg){
    System.out.println("消费者2接收到fanout.queue2的消息:{"+msg+"}");
}

在publisher中编写测试方法,向itcast.fanout发送消息

    @Test
    public void testSendFanoutExchange(){
        //交换机名称
        String exchangeName = "itcast.fanout";
        //消息
        String message = "hello,every one!";
        //发送消息,参数分别是:交换机名称,RoutingKey(暂时为空),消息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }

总结

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失

FanoutExchange的会将消息路由到每个绑定的队列

声明队列,交换机,绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding

发布,订阅模型-Direct

发布订阅-DirectExchange

Direct Exchange会将接收到的消息根据规则路由指定的Queue,因此称为路由模式(routes)。

案例:利用SpringAMQP演示DirectExchange的使用

实现思路如下:

实现思路如下:

1.利用@RabbitListener声明Exchange,Queue,RoutingKey

  • 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  • 并利用@RabbitListener声明Exchange,Queue,RoutingKey

2.在consumer服务中,编写两个消费则方法,分别监听direct.queue1和direct.queue2

@RabbitListener(bindings = @QueueBinding(
        //队列
        value = @Queue(name = "direct.queue1"),
        //交换机
        exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
        //邦定机置
        key = {"red","blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:{"+msg+"}");
}
@RabbitListener(bindings = @QueueBinding(
        //队列
        value = @Queue(name = "direct.queue2"),
        //交换机
        exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
        //邦定机置
        key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息:{"+msg+"}");
}

3.在publisher中编写测试方法,向itcast.direct发送消息

@Test
public void testSendDirectExchange(){
    //交换机名称
    String exchangeName = "itcast.direct";
    //消息
    String message = "hello,blue one!";
    //发送消息,参数分别是:交换机名称,RoutingKey("red"),消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

总结

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见的注解?

  • @Queue
  • @Exchange

发布,订阅模型-Topic

发布订阅-TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单纯的列表,并且以 . 分割。

例如:

china.news:代表有中国新闻消息;

china.weather:代表中国的天气消息;

japan.news:代表日本新闻;

japan.weather:代表日本的天气消息;

案例:利用SpringAMQP演示TopicExchange的使用

实现思路如下:

  • 利用@RabbitListener声明Exchange,Queue,RoutingKey
  • 在consumer服务中,编写两个消费则方法,分别监听topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:{"+msg+"}");
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
        key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:{"+msg+"}");
}

在publisher中编写测试方法,向itcast.topic发送消息

@Test
public void testSendTopicExchange(){
    //交换机名称
    String exchangeName = "itcast.topic";
    //消息
    String message = "中国的新闻!";
    //发送消息,参数分别是:交换机名称,RoutingKey,消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

总结

描述下Direct交换机与Topic交换机的差异?

自己总结

消息转换器

测试发送Object类型消息

说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。(也就是是对象会序列化为字节)

修改序列化(推荐JSON序列化)

发送消息

我们在publisher服务引入依赖

<!--rabbitmq使用json序列化-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

我们在publisher服务声明MessageConverter:

@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}

发送消息

@Test
public void testSendObjectQueue(){
    Map<String,Object> msg = new HashMap<>();
    msg.put("name","留言");
    msg.put("age",21);
    rabbitTemplate.convertAndSend("object.queue",msg);
}

接送消息

我们在consumer服务引入Jackson依赖:

<!--rabbitmq使用json序列化-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

我们在consumer服务定义MessageConverter:

@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}

然后定义一个消费者,监听object.queue队列并消费信息:

@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg){
    System.out.println("消费者接收到object.queue的消息:{"+msg+"}");
}

总结

SpringAMQP中消息的序列化和反序列化是怎么实现的?

  • 利用MessageConverter实现的,默认是JDK的序列化
  • 注意发送方与接收方必须使用相同的MessageConverter

到此这篇关于RabbitMQ使用案例详解的文章就介绍到这了,更多相关RabbitMQ使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 关于mybatis plus 中的查询优化问题

    关于mybatis plus 中的查询优化问题

    这篇文章主要介绍了关于mybatis plus 中的查询优化问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-01-01
  • MyBatis-Plus 查询返回实体对象还是map

    MyBatis-Plus 查询返回实体对象还是map

    这篇文章主要介绍了MyBatis-Plus 查询返回实体对象还是map,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • Python连接Java Socket服务端的实现方法

    Python连接Java Socket服务端的实现方法

    这篇文章主要介绍了Python连接Java Socket服务端的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • SpringBoot自定义监听器的项目实践

    SpringBoot自定义监听器的项目实践

    Spring Boot提供了强大的事件模型,其中包括多种内置监听器,同时也支持开发者自定义监听器,下面就来介绍下SpringBoot自定义监听器,感兴趣的可以了解一下
    2024-03-03
  • Mybatis中 SQL语句复用

    Mybatis中 SQL语句复用

    这篇文章主要介绍了Mybatis中 SQL语句复用,需要的朋友可以参考下
    2017-03-03
  • Kotlin内存陷阱inline使用技巧示例详解

    Kotlin内存陷阱inline使用技巧示例详解

    这篇文章主要为大家介绍了Kotlin内存陷阱inline使用技巧示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-10-10
  • 一篇文章带你了解java Object根类中关于toString,equals的方法

    一篇文章带你了解java Object根类中关于toString,equals的方法

    这篇文章主要介绍了Object类toString()和equals()方法使用解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2021-09-09
  • Spring中@PropertySource的使用方法和运行原理详解

    Spring中@PropertySource的使用方法和运行原理详解

    这篇文章主要介绍了Spring中@PropertySource的使用方法和运行原理详解,PropertySource注解可以方便和灵活的向Spring的环境容器(org.springframework.core.env.Environment Environment)中注入一些属性,这些属性可以在Bean中使用,需要的朋友可以参考下
    2023-11-11
  • Java系统的高并发解决方法详解

    Java系统的高并发解决方法详解

    这篇文章主要介绍了Java系统的高并发解决方法,内容十分丰富,在这里分享给大家,需要的朋友可以参考。
    2017-09-09
  • SpringCloud之Feign远程接口映射的实现

    SpringCloud之Feign远程接口映射的实现

    这篇文章主要介绍了SpringCloud之Feign远程接口映射的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09

最新评论