基于rocketmq的有序消费模式和并发消费模式的区别说明

 更新时间:2021年06月22日 10:18:59   作者:从心归零  
这篇文章主要介绍了基于rocketmq的有序消费模式和并发消费模式的区别说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

rocketmq消费者注册监听有两种模式

有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同。

MessageListenerOrderly

正确消费返回

ConsumeOrderlyStatus.SUCCESS

稍后消费返回

ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently

正确消费返回

ConsumeConcurrentlyStatus.CONSUME_SUCCESS

稍后消费返回

ConsumeConcurrentlyStatus.RECONSUME_LATER

顾名思义,有序消费模式是按照消息的顺序进行消费,但是除此之外,在实践过程中我发现和并发消费模式还有很大的区别的。

第一,速度,下面我打算用实验来探究一下。

使用mq发送消息,消费者使用有序消费模式消费,具体的业务是阻塞100ms

Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerOrderly() { 
	@Override
	public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
			ConsumeOrderlyContext context) {
        logger.info("==========CONSUME_START===========");  
		logger.info(Thread.currentThread().getName()  
                            + " Receive New Messages: " + msgs.size());  
        try {
        	if(date1 == null)
        		date1 = new Date();//在第一次消费时初始化
        	Thread.sleep(100);
       		logger.info("total:"+(++total));
        	date2 = new Date();
       		totalTime = (date2.getTime() - date1.getTime());
       		logger.info("totalTime:"+totalTime);
            logger.info("==========CONSUME_SUCCESS===========");  
            return ConsumeOrderlyStatus.SUCCESS;  
        }catch (Exception e) {
            logger.info("==========RECONSUME_LATER===========");  
            logger.error(e.getMessage(),e);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
	}
}

消费100条消息

速度挺快的,为了让结果更准确,将消息加到1000条

消费1000条消息

可以看到每一条消息平均耗时25ms,然而业务是阻塞100ms,这说明有序消费模式和同步消费可能并不是一回事,那如果不阻塞代码我们再来看一下结果

不阻塞过后速度明显提高了,那么我阻塞300ms会怎么样呢?

时间相比阻塞100ms多了2倍

接下来我们测试并发消费模式

Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(  
                       List< MessageExt > msgs, ConsumeConcurrentlyContext context) {  
 
    		logger.info(Thread.currentThread().getName()  
                                 + " Receive New Messages: " + msgs.size()); 
    		try {
    			if(date1 == null)
    				date1 = new Date();
            	Thread.sleep(100);
           		logger.info("total:"+(++total));
           		date2 = new Date();
           		totalTime = (date2.getTime() - date1.getTime());
           		logger.info("totalTime:"+totalTime);
                logger.info("==========CONSUME_SUCCESS===========");  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            } catch (Exception e) {
                logger.info("==========RECONSUME_LATER===========");  
                logger.error(e.getMessage(),e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
    }  
}

基于上次的经验,同样测试三种情况,消费1000条不阻塞,消费1000条阻塞100ms,消费1000条阻塞300ms

消费1000条不阻塞的情况

和有序消费模式差不多,快个一两秒。

消费1000条阻塞100ms

竟然比不阻塞的情况更快,可能是误差把

消费1000条阻塞300ms

速度稍慢,但是还是比有序消费快得多。

结论是并发消费的消费速度要比有序消费更快。

另一个区别是消费失败时的处理不同,有序消费模式返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT后,消费者会立马消费这条消息,而使用并发消费模式,返回ConsumeConcurrentlyStatus.RECONSUME_LATER后,要过好几秒甚至十几秒才会再次消费。

我是在只有一条消息的情况下测试的。更重要的区别是,

返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT并不会增加消息的消费次数,mq消息有个默认最大消费次数16,消费次数到了以后,这条消息会进入死信队列,这个最大消费次数是可以在mqadmin中设置的。

mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g MonitorCumsumerGroupName -r 3

我测试后发现,并发模式下返回ConsumeConcurrentlyStatus.RECONSUME_LATER,同一个消息到达最大消费次数之后就不会再出现了。这说明有序消费模式可能并没有这个机制,这意味着你再有序消费模式下抛出固定异常,那么这条异常信息将会被永远消费,并且很可能会影响之后正常的消息。下面依然做个试验

Map<String, Integer> map = new HashMap<>();//保存消息错误消费次数
new MessageListenerOrderly() {
 
	@Override
	public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
			ConsumeOrderlyContext context) {
        try {
        	if(1 == 1)
        			throw new Exception();
            return ConsumeOrderlyStatus.SUCCESS;  
        }catch (Exception e) {
        	MessageExt msg = msgs.get(0);
			if(map.containsKey(msg.getKeys())) {//消息每消费一次,加1
			    map.put(msg.getKeys(), map.get(msg.getKeys()) + 1);
			}else {
			    map.put(msg.getKeys(), 1);
			}
			logger.info(msg.getKeys()+":"+map.get(msg.getKeys()));
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
	}	
}

发送了十条消息

可以看到虽然我发了十条消息,但是一直在消费同样四条消息,这可能跟消息broker有默认四条队列有关系。同时从时间可以看到,消费失败后,会马上拉这条信息。

至于并发消费模式则不会无限消费,而且消费失败后不会马上再消费。具体的就不尝试了。

结论是有序消费模式MessageListenerOrderly要慎重地处理异常,我则是用全局变量记录消息的错误消费次数,只要消费次数达到一定次数,那么就直接返回ConsumeOrderlyStatus.SUCCESS。

突然想到之前测试有序消费模式MessageListenerOrderly的时候为什么1000条消息阻塞100ms耗时25000ms了,因为有序消费模式是同时拉取四条队列消息的,这就对上了。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java核心库实现AOP过程

    Java核心库实现AOP过程

    给大家分享一下利用Java核心库实现简单的AOP的经验分享和教学,需要的读者们参考下吧。
    2017-12-12
  • 浅谈关于Java正则和转义中\\和\\\\的理解

    浅谈关于Java正则和转义中\\和\\\\的理解

    这篇文章主要介绍了浅谈关于Java正则和转义中\\和\\\\的理解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08
  • 简单讲解Java的Future编程模式

    简单讲解Java的Future编程模式

    这篇文章主要介绍了Java的Future编程模式,包括对异步和并发的一些设计思维,需要的朋友可以参考下
    2015-11-11
  • Java RandomAccessFile的用法详解

    Java RandomAccessFile的用法详解

    下面小编就为大家带来一篇Java RandomAccessFile的用法详解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-06-06
  • SpringMVC Cron定时器Demo常见问题解决方案

    SpringMVC Cron定时器Demo常见问题解决方案

    这篇文章主要介绍了SpringMVC Cron定时器Demo常见问题解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • Android开发在轮播图片上加入点击事件的方法

    Android开发在轮播图片上加入点击事件的方法

    这篇文章主要介绍了Android开发在轮播图片上加入点击事件的方法,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2016-11-11
  • macOS中搭建Java8开发环境(基于Intel x86 64-bit)

    macOS中搭建Java8开发环境(基于Intel x86 64-bit)

    这篇文章主要介绍了macOS中搭建Java8开发环境(基于Intel x86 64-bit) 的相关资料,需要的朋友可以参考下
    2022-12-12
  • JUnit4 Hamcrest匹配器常用方法总结

    JUnit4 Hamcrest匹配器常用方法总结

    这篇文章主要介绍了JUnit4 Hamcrest匹配器常用方法总结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • Idea集成ApiFox插件及使用小结

    Idea集成ApiFox插件及使用小结

    本文介绍了如何使用Apifox和IntelliJ IDEA插件来整理和生成接口文档,包括配置Apifox和IntelliJ IDEA插件、代码案例以及使用方法,感兴趣的朋友跟随小编一起看看吧
    2024-11-11
  • Log4j2 重大漏洞编译好的log4j-2.15.0.jar包下载(替换过程)

    Log4j2 重大漏洞编译好的log4j-2.15.0.jar包下载(替换过程)

    Apache 开源项目 Log4j 的远程代码执行漏洞细节被公开,由于 Log4j 的广泛使用,该漏洞一旦被攻击者利用会造成严重危害,下面小编给大家带来了Log4j2 重大漏洞编译好的log4j-2.15.0.jar包下载,感兴趣的朋友一起看看吧
    2021-12-12

最新评论