关于Java中RabbitMQ的高级特性

 更新时间:2023年07月10日 11:16:10   作者:卑微小钟  
这篇文章主要介绍了关于Java中RabbitMQ的高级特性,MQ全称为Message Queue,即消息队列,"消息队列"是在消息的传输过程中保存消息的容器,它是典型的:生产者、消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息,需要的朋友可以参考下

RabbitMQ高级特性

1.消息的可靠投递

在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或投递失败场景。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

RabbitMQ整个消息投递的路径为:producer>rabbitMQ broker> exchange > queue > consumer

  • 消息从producer到exchange则会返回一个confirmCallback
  • 消息从exchange到queue投递失败则会返回一个returnCallback

利用这两个callback来控制消息的可靠性传递。

1.1 confirm 确认模式

(1)开启确认模式

在创建连接工厂的时候要开启确认模式,关键字:publisher-confirms,默认为false

<rabbit:connection-factory id="connectionFactory" 
                           host="${rabbitmq.host}"
                           port="${rabbitmq.port}"
                           username="${rabbitmq.username}"
                           password="${rabbitmq.password}"
                           virtual-host="${rabbitmq.virtual-host}"
                           publisher-confirms="true"
/>

(2)RabbitTemplate设置回调

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
    /**
     * 注入RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 测试默认的队列发送消息
     */
    @Test
    public void testConfirmCallback() throws InterruptedException {
        // 设置回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 回调方法
             * @param correlationData 回调的相关数据。
             * @param ack true 表示发送成功, false 发送失败
             * @param cause 失败原因,ack==true->null
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("发送成功");
                } else {
                    System.out.println("发送失败,原因:" + cause);
                    // 失败后处理流程
                }
            }
        });
        rabbitTemplate.convertAndSend("spring_queue", "hello world");
        // 防止发送完成后,未完成回调关闭通道
        Thread.sleep(5000);
    }
}
  • public void confirm(CorrelationData correlationData, boolean ack, String cause)

    • correlationData 参数,发送数据的时候可以携带上
    • ack 是否发送成功,成功为true,失败为false
    • cause 失败的原因,成功时为null
  • Thread.sleep(5000);防止发送完成后,未完成回调关闭通道

    如果没有加上会

    clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)

1.2 return 回退模式

(1)开启回退模式

<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                           port="${rabbitmq.port}"
                           username="${rabbitmq.username}"
                           password="${rabbitmq.password}"
                           virtual-host="${rabbitmq.virtual-host}"
                           publisher-returns="true"
/>

(2)RabbitTemplate设置回调

@Test
    public void testReturnCallback() throws InterruptedException {
        // 设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
        // 设置回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 返回消息
             * @param message 消息对象
             * @param replyCode 错误码
             * @param replyText 交换信息
             * @param exchange 交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("消息对象:" + new String(message.getBody()));
                System.out.println("错误码:" + replyCode);
                System.out.println("交换信息:" + replyText);
                System.out.println("交换机:" + exchange);
                System.out.println("路由键:" + routingKey);
            }
        });
        rabbitTemplate.convertAndSend("spring_direct_exchange", "direct_key_3",
                "spring_direct_exchange_direct_key_1");
        // 防止发送完成后,未完成回调关闭通道
        Thread.sleep(5000);
    }

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)

  • message 消息对象
  • replyCode 错误码
  • replyText 交换信息
  • exchange 交换机
  • routingKey 路由键

mandatory属性的优先级高于publisher-returns的优先级
mandatory结果为true、false时会忽略掉publisher-returns属性的值
mandatory结果为null(即不配置)时结果由publisher-returns确定

2.Consumer Ack(消费端)

Ack指Acknowledge,确认。表示消费端接收到消息后的确认方式。

有三种确认方式:

  • 自动确认:acknowledge="none"
  • 手动确认:acknowledge="manual"
  • 根据异常情况确认:acknowledge="auto"

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应message 从RabbitMQ的消息缓存中移除。

但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用``channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()`方法,让其自动重新发送消息。

2.1 设置手动签收

(1)创建一个监听器接收消息

设置手动接收时,让监听器实现ChannelAwareMessageListener接口

如果消息成功处理,则调用channel.basicAck()

如果消息处理失败,则调用 channel.basicNack(),broker重新发送consumer

/**
 * @author zhong
 * <p>
 * Consumer Ack机制
 * 1.设置手动签收,acknowledge="manual"
 * 2.让监听器实现ChannelAwareMessageListener接口
 * 3.如果消息成功处理,则调用channel.basicAck()
 * 4.如果消息处理失败,则调用 channel.basicNack(),broker重新发送consumer
 */
@Component
public class AckSpringQueueListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // 接收消息
        System.out.println("Message:" + new String(message.getBody()));
        // 手动签收
        /**
         * deliveryTag: 标识id
         * multiple: 确认所有消息
         */
        channel.basicAck(deliveryTag, true);
        // 手动拒绝
        /**
         * requeue:如果被拒绝的消息应该被重新排队而不是被丢弃/死信
         */
        //channel.basicNack(deliveryTag, true, true);
    }
}

(2)设置手动,加入监听

设置手动签收,acknowledge=“manual”

<context:component-scan base-package="org.example"/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" auto-declare="true">
    <rabbit:listener ref="ackSpringQueueListener" queue-names="spring_queue"/>
</rabbit:listener-container>

3.消费端限流

MQ一个作用就是削峰填谷,通过消费端限流实现。

消费端限流包括一下操作:

  • <rabbit:listener-container>配置prefetch​​​​​​​属性设置
  • 消费端一次拉去多少消息消费端确认模式一定为手动确认。acknowledge="nanual"

(1)关键配置文件:

<context:component-scan base-package="org.example"/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"
                           auto-declare="true">
    <rabbit:listener ref="qosListener" queue-names="spring_queue"/>
</rabbit:listener-container>

(1)手动确认 acknowledge="manual"

(2)设置阈值 prefetch="1"

(2)关键监听器代码

/**
 * Consumer 限流机制
 * 1.确保ack机制为手动确认
 * 2.listener-container 配置属性
 * perfetch = 1 表示消费端每次从mq拉取一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
 */
@Component
public class QosListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("QosListener:" + new String(message.getBody()));
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // 签收消息
        Thread.sleep(1000);
        channel.basicAck(deliveryTag, true);
    }
}

4.TTL(存活时间/过期时间)

TTL全称Time To Live (存活时间/过期时间)。

  • 当消息到达存活时间后,还没有被消费,会被自动清除。
  • RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

4.1 控制台设置

RabbitMQ控制台可以设置队列的过期时间。

4.2 消息单独过期

@Test
public void testTTL() {
    // 消息后处理队列,设置一下消息参数信息
    MessagePostProcessor messagePostProcessor = message -> {
        // 1.设置message的消息
        message.getMessageProperties().setExpiration("50000");// 设置过期时间,字符串,毫秒
        // 2.返回消息
        return message;
    };
    // 传入
    rabbitTemplate.convertAndSend("spring_fanout_exchange", "key", "RabbitMQ", messagePostProcessor);
}

4.3 小结

如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。队列过期后,会将队列所有消息全部移除。消息过期后,只有消息在队列顶端,才会判断其是否过期(移除)。

5.死信队列

死信队列,英文缩写:DLX。Dead Letter Exchange(死信交换机)

当消息成为Dead Message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

到此这篇关于关于Java中RabbitMQ的高级特性 的文章就介绍到这了,更多相关RabbitMQ的高级特性 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python Prim算法通过遍历墙实现迷宫的生成

    Python Prim算法通过遍历墙实现迷宫的生成

    之前,我们在另外一篇文章中使用Prim算法生成了一个完美迷宫,利用的是遍历网格的方法,这一次,我们要教教大家用遍历墙的方法生成,感兴趣的可以收藏一下
    2023-01-01
  • 浅析Python的对象拷贝和内存布局

    浅析Python的对象拷贝和内存布局

    这篇文章主要为大家详细介绍了Python中的对象拷贝和内存布局的相关知识,文中的示例代码讲解详细,对我们学习Python有一定的帮助,需要的可以参考一下
    2022-12-12
  • Python logging模块handlers用法详解

    Python logging模块handlers用法详解

    这篇文章主要介绍了Python logging模块handlers用法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08
  • python global关键字的用法详解

    python global关键字的用法详解

    这篇文章主要介绍了python global关键字的用法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09
  • 基于Python的自媒体小助手---登录页面的实现代码

    基于Python的自媒体小助手---登录页面的实现代码

    这篇文章主要介绍了基于Python的自媒体小助手---登录页面的实现代码,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-06-06
  • Python "手绘风格"数据可视化方法实例汇总

    Python "手绘风格"数据可视化方法实例汇总

    这篇文章主要给大家介绍了关于Python "手绘风格"数据可视化方法实现的相关资料,本文分别给大家带来了Python-matplotlib手绘风格图表绘制、Python-cutecharts手绘风格图表绘制以及Python-py-roughviz手绘风格图表绘制,需要的朋友可以参考下
    2022-02-02
  • python使用cartopy在地图中添加经纬线的示例代码

    python使用cartopy在地图中添加经纬线的示例代码

    gridlines可以根据坐标系,自动绘制网格线,这对于普通绘图来说显然不必单独拿出来说说,但在地图中,经纬线几乎是必不可少的,本文将给大家介绍了python使用cartopy在地图中添加经纬线的方法,需要的朋友可以参考下
    2024-01-01
  • python使用ctypes模块调用windowsapi获取系统版本示例

    python使用ctypes模块调用windowsapi获取系统版本示例

    这篇文章主要介绍了python使用ctypes模块调用windowsapi获取系统版本示例,需要的朋友可以参考下
    2014-04-04
  • 通过shell+python实现企业微信预警

    通过shell+python实现企业微信预警

    这篇文章主要介绍了通过shell+python实现企业微信预警,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-03-03
  • Python 识别12306图片验证码物品的实现示例

    Python 识别12306图片验证码物品的实现示例

    这篇文章主要介绍了Python 识别12306图片验证码物品的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-01-01

最新评论