spring-Kafka中的@KafkaListener深入源码解读

 更新时间:2023年02月20日 09:45:33   作者:柏油  
本文主要通过深入了解源码,梳理从spring启动到真正监听kafka消息的这套流程,从spring启动开始处理@KafkaListener,本文结合实例流程图给大家讲解的非常详细,需要的朋友参考下

前言

本文主要通过深入了解源码,梳理从spring启动到真正监听kafka消息的这套流程

一、总体流程

从spring启动开始处理@KafkaListener,到start消息监听整体流程图

二、源码解读

1、postProcessAfterInitialization

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
		    Class<?> targetClass = AopUtils.getTargetClass(bean);
		    
		    // 扫描@KafkaListener注解
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			
			......
			
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(bean.getClass());
				this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
			}
			else {
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
					Method method = entry.getKey();
					// 遍历扫描到的所有@KafkaListener注解并开始处理
					for (KafkaListener listener : entry.getValue()) {
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
			}
			// 处理在类上的@KafkaListener注解
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}

1.1、processKafkaListener

KafkaListenerAnnotationBeanPostProcessor#processKafkaListener

	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}

1.2、processListener

KafkaListenerAnnotationBeanPostProcessor#processListener

将每个kafkaListener转变成MethodKafkaListenerEndpoint并注册到KafkaListenerEndpointRegistrar容器,方便后续统一启动监听

	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
			Object bean, Object adminTarget, String beanName) {

		String beanRef = kafkaListener.beanRef();
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.addListener(beanRef, bean);
		}
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
		endpoint.setTopics(resolveTopics(kafkaListener));
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
		String group = kafkaListener.containerGroup();

        ......
      
        // 注册已经封装好的消费端-endpoint
		this.registrar.registerEndpoint(endpoint, factory);
		
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.removeListener(beanRef);
		}
	}

1.3、registerEndpoint

KafkaListenerEndpointRegistrar#registerEndpoint

	public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
		
	    ......
		
		KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
		synchronized (this.endpointDescriptors) {
		    // 如果到了需要立即启动监听的阶段就直接注册并监听(也就是创建消息监听容器并启动)
			if (this.startImmediately) { // Register and start immediately
				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
						resolveContainerFactory(descriptor), true);
			}
			else {
			    // 一般情况都先走这一步,添加至此列表,待bean后续的生命周期 统一注册并启动
				this.endpointDescriptors.add(descriptor);
			}
		}
	}

	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
			boolean startImmediately) {

        ......
        
		synchronized (this.listenerContainers) {
		
			......
			
			// 1.创建消息监听容器
			MessageListenerContainer container = createListenerContainer(endpoint, factory);
			this.listenerContainers.put(id, container);
			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
				List<MessageListenerContainer> containerGroup;
				if (this.applicationContext.containsBean(endpoint.getGroup())) {
					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
				}
				else {
					containerGroup = new ArrayList<MessageListenerContainer>();
					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
				}
				containerGroup.add(container);
			}
            
            // 2.是否立即启动消息监听
			if (startImmediately) {
				startIfNecessary(container);
			}
		}
	}

1.4、startIfNecessary

KafkaListenerEndpointRegistry#startIfNecessary
启动消息监听

	private void startIfNecessary(MessageListenerContainer listenerContainer) {
		if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
		    // 启动消息监听
		    // 到这一步之后,消息监听以及处理都是KafkaMessageListenerContainer的逻辑
		    // 到此也就打通了@KafkaListener到MessageListenerContainer消息监听容器的逻辑
			listenerContainer.start();
		}
	}

2、afterSingletonsInstantiated

这一步是实例化(此处的实例化是已经创建对象并完成了初始化操作)之后,紧接着的操作

KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated

	public void afterSingletonsInstantiated() {
		this.registrar.setBeanFactory(this.beanFactory);

        // 对"注册员"信息的完善
		if (this.beanFactory instanceof ListableBeanFactory) {
			Map<String, KafkaListenerConfigurer> instances =
					((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
			for (KafkaListenerConfigurer configurer : instances.values()) {
				configurer.configureKafkaListeners(this.registrar);
			}
		}

		if (this.registrar.getEndpointRegistry() == null) {
			if (this.endpointRegistry == null) {
				Assert.state(this.beanFactory != null,
						"BeanFactory must be set to find endpoint registry by bean name");
				this.endpointRegistry = this.beanFactory.getBean(
						KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
						KafkaListenerEndpointRegistry.class);
			}
			this.registrar.setEndpointRegistry(this.endpointRegistry);
		}

		......

		// Actually register all listeners
		// 整个方法这里才是关键
		// 创建MessageListenerContainer并注册
		this.registrar.afterPropertiesSet();
	}

2.1、afterPropertiesSet

KafkaListenerEndpointRegistrar#afterPropertiesSet

	public void afterPropertiesSet() {
		registerAllEndpoints();
	}

2.2、registerAllEndpoints

KafkaListenerEndpointRegistrar#registerAllEndpoints

	protected void registerAllEndpoints() {
		synchronized (this.endpointDescriptors) {
			for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
			    // 这里是真正的创建ListenerContainer监听对象并注册
				this.endpointRegistry.registerListenerContainer(
						descriptor.endpoint, resolveContainerFactory(descriptor));
			}
			// 启动时所有消息监听对象都注册之后,便将参数置为true
			this.startImmediately = true;  // trigger immediate startup
		}
	}

总结

以上便是整个流程,总体感觉就是将kafka消息监听融入到spring生命周期中,并完美契合

调试及相关源码版本:

org.springframework.boot::2.3.3.RELEASE
spring-kafka:2.5.4.RELEASE

相关参考:

spring-kafka官方文档
spring容器之refresh方法

到此这篇关于spring-Kafka中的@KafkaListener深入源码解读的文章就介绍到这了,更多相关spring-Kafka @KafkaListener内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot整合RabbitMQ实现延迟队列的示例详解

    SpringBoot整合RabbitMQ实现延迟队列的示例详解

    这篇文章主要为大家详细介绍了SpringBoot如何整合RabbitMQ实现延迟队列,文中的示例代码讲解详细,具有一定的学习价值,感兴趣的可以了解一下
    2023-04-04
  • SpringSecurity6.4中一次性令牌登录(One-Time Token Login)实现

    SpringSecurity6.4中一次性令牌登录(One-Time Token Login)实现

    Spring Security为一次性令牌认证提供了支持,本文就来介绍一下SpringSecurity6.4中一次性令牌登录(One-Time Token Login)实现,具有一定的参考价值,感兴趣的可以了解一下
    2025-03-03
  • Java并发编程之Fork/Join框架的理解

    Java并发编程之Fork/Join框架的理解

    今天带大家学习Java并发编程的相关知识,文中对Fork/Join框架作了非常详细的介绍,对正在学习有关知识的小伙伴们很有帮助,需要的朋友可以参考下
    2021-06-06
  • Java实现斗地主案例

    Java实现斗地主案例

    这篇文章主要为大家详细介绍了Java实现斗地主案例,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-04-04
  • 使用@NonNull注解遇到的小问题及解决

    使用@NonNull注解遇到的小问题及解决

    这篇文章主要介绍了使用@NonNull注解遇到的小问题及解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-01-01
  • java如何遍历对象中的所有属性(字段)和类型

    java如何遍历对象中的所有属性(字段)和类型

    这篇文章主要介绍了java如何遍历对象中的所有属性(字段)和类型问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • java数据结构与算法之简单选择排序详解

    java数据结构与算法之简单选择排序详解

    这篇文章主要介绍了java数据结构与算法之简单选择排序,结合实例形式分析了选择排序的原理、实现方法与相关操作技巧,需要的朋友可以参考下
    2017-05-05
  • Java实现简易图书借阅系统

    Java实现简易图书借阅系统

    这篇文章主要为大家详细介绍了Java实现简易图书借阅系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-03-03
  • idea安装hsdis的方法

    idea安装hsdis的方法

    这篇文章主要介绍了idea安装hsdis,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-03-03
  • MyBatis按时间排序方式

    MyBatis按时间排序方式

    这篇文章主要介绍了MyBatis按时间排序方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01

最新评论