RocketMq同组消费者如何自动设置InstanceName

 更新时间:2024年06月18日 08:55:46   作者:曾令胜  
这篇文章主要介绍了RocketMq同组消费者如何自动设置InstanceName问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

一、背景

同组多于1个消费者,如果没单独设置instanceName,默认为DEFAULT。

启动时会报如下错误:

org.apache.rocketmq.client.exception.MQClientException: The consumer group[group_03] has been created before, specify another name please.

二、处理方法

创建MqBeanPost,利用后置处理器获取到想要设置的bean,把instanceName设置成随机数。

@Component
public class MqBeanPost implements BeanPostProcessor {
    @Autowired
    MqJudgePacsConfig mqJudgePacsConfig;
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof DefaultRocketMQListenerContainer){
            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
            String topic = container.getTopic();
            if(topic.equals(mqJudgePacsConfig.getTopic())){
                DefaultMQPushConsumer consumer =   container.getConsumer();
                consumer.setInstanceName(UUID.fastUUID().toString());
            }
        }
        return bean;
    }
}

三、源码分析

一、springboot整合rocketmq启动流程:

(1)SpringBootApplication

(2)@EnableAutoConfiguration

(3)AutoConfigurationImportSelector实现了ImportSelector 接口,所以执行selectImports方法

  • ->getAutoConfigurationEntry
  • ->getCandidateConfigurations
  • ->SpringFactoriesLoader.loadFactoryNames
  • ->loadSpringFactories此方法会读取所有META-INF/spring.factories文件,转成Map<String, List>,最后getOrDefault(factoryTypeName, Collections.emptyList())获取key 为org.springframework.boot.autoconfigure.EnableAutoConfiguration的值为需要加载到容器类的全类名的集合。

(4)rocketmq和springboot整合jar中spring.factories位置。

(5)RocketMQAutoConfiguration中@import注入ListenerContainerConfiguration。

ListenerContainerConfiguration 实现了SmartInitializingSingleton类所以当spring容器创建ListenerContainerConfiguration是会进入afterSingletonsInstantiated方法。

(6)此方法中,获取带有RocketMQMessageListener注解类的集合,遍历执行registerContainer。

public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerContainer);
}

(7)重点分析一下画红的代码。createRocketMQListenerContainer方法是获取注解中的属性,创建出DefaultRocketMQListenerContainer对象。最后注册到容器。

(8)DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class);

spring容器创建DefaultRocketMQListenerContainer对象。

创建对象的流程不在赘述(可以翻看我以前博客创建对象流程)。

主要分析后置处理器。使用后置处理器来处理instanceName。

(9)invokeInitMethods方法中,执行afterPropertiesSet初始化方法。

invokeInitMethods方法执行前会调用applyBeanPostProcessorsBeforeInitialization,方法执行后会调用applyBeanPostProcessorsAfterInitialization。

(10)DefaultRocketMQListenerContainer 实现了InitializingBean所以在执行初始化方法时,调用afterPropertiesSet,然后继续调用initRocketMQPushConsumer 

(11)initRocketMQPushConsumer 方法会创建DefaultMQPushConsumer对象,默认的instanceName就是在此创建。

所以如果想给DefaultMQPushConsumer设置instanceName,就可以在applyBeanPostProcessorsAfterInitialization中设置。

为何不能在applyBeanPostProcessorsBeforeInitialization执行的时候,因为DefaultMQPushConsumer还未创建。

(12)拓展:initRocketMQPushConsumer 中画红的地方。

如果消费端实现了RocketMQPushConsumerLifecycleListener或RocketMQPushConsumerLifecycleListener类的话,可以重写prepareStart方法。

在prepareStart方法中设置instanceName。

但是这种方法如果有多个消费端的话,要写多次。

四、总结

此方法的切入点是DefaultRocketMQListenerContainer类创建过程中,使用后置处理器设置instanceName。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • ThreadLocal原理及内存泄漏原因

    ThreadLocal原理及内存泄漏原因

    这篇文章主要介绍了ThreadLocal原理及内存泄漏原因,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • springboot跨域问题解决方案

    springboot跨域问题解决方案

    这篇文章主要介绍了springboot跨域问题解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-01-01
  • 零基础学Java:Java开发工具 Eclipse 安装过程创建第一个Java项目及Eclipse的一些基础使用技巧

    零基础学Java:Java开发工具 Eclipse 安装过程创建第一个Java项目及Eclipse的一些基础使用技巧

    这篇文章主要介绍了零基础学Java:Java开发工具 Eclipse 安装过程创建第一个Java项目及Eclipse的一些基础使用技巧,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-09-09
  • Java 生成任意长度的验证码过程解析

    Java 生成任意长度的验证码过程解析

    这篇文章主要介绍了Java 生成任意长度的验证码过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • Java mysql详细讲解双数据源配置使用

    Java mysql详细讲解双数据源配置使用

    在开发过程中我们常常会用到两个数据库,一个数据用来实现一些常规的增删改查,另外一个数据库用来实时存数据。进行数据的统计分析。可以读写分离。可以更好的优化和提高效率;或者两个数据存在业务分离的时候也需要多个数据源来实现
    2022-06-06
  • SpringBoot中的@EnableConfigurationProperties注解原理及用法

    SpringBoot中的@EnableConfigurationProperties注解原理及用法

    在SpringBoot中,@EnableConfigurationProperties注解是一个非常有用的注解,它可以用于启用对特定配置类的支持,在本文中,我们将深入探讨@EnableConfigurationProperties注解,包括它的原理和如何使用,需要的朋友可以参考下
    2023-06-06
  • Java中throw和throws异常处理完整例子说明

    Java中throw和throws异常处理完整例子说明

    这篇文章主要给大家介绍了关于Java中throw和throws异常处理的相关资料, throw关键字是用于在方法内抛出异常,而throws关键字是在方法声明中指定可能抛出的异常,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-06-06
  • Java中Dijkstra算法求解最短路径的实现

    Java中Dijkstra算法求解最短路径的实现

    Dijkstra算法是一种解决最短路径问题的常用算法,本文主要介绍了Java中Dijkstra算法求解最短路径的实现,具有一定的参考价值,感兴趣的可以了解一下
    2023-09-09
  • MyBatis使用接口映射的方法步骤

    MyBatis使用接口映射的方法步骤

    映射器是MyBatis中最核心的组件之一,本文主要介绍了MyBatis使用接口映射的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-07-07
  • SpringBoot中的异常处理与参数校验的方法实现

    SpringBoot中的异常处理与参数校验的方法实现

    这篇文章主要介绍了SpringBoot中的异常处理与参数校验的方法实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-04-04

最新评论