SpringBoot实现kafka多源配置的示例代码

 更新时间:2024年06月06日 10:42:53   作者:it噩梦  
实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置,本文介绍了SpringBoot实现kafka多源配置,需要的朋友可以参考下

背景

实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置。

核心配置

自动化配置类

import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister;
import com.example.kafka.autoconfig.kafkaConsumerConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@Configuration(
        proxyBeanMethods = false
)
@ConditionalOnWebApplication
@EnableConfigurationProperties({kafkaConsumerConfig.class})
@Import({CustomKafkaDataSourceRegister.class})
public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor {
    public MyKafkaAutoConfiguration() {
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        beanFactory.getBean(CustomKafkaDataSourceRegister.class);
    }
}

注册生产者、消费者核心bean到spring

public void afterPropertiesSet() {
        Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories();
        if (factories != null && !factories.isEmpty()) {
            factories.forEach((factoryName, consumerConfig) -> {
                KafkaProperties.Listener listener = consumerConfig.getListener();
                Integer concurrency = consumerConfig.getConcurrency();
                // 创建监听容器工厂
                ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);
                // 注册到容器
                if (!beanFactory.containsBean(factoryName)) {
                    beanFactory.registerSingleton(factoryName, containerFactory);
                }
            });
        }
        Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates();
        if (!ObjectUtils.isEmpty(templates)) {
            templates.forEach((templateName, producerConfig) -> {
                //registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues);
                //注册spring bean的两种方式
                registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class, producerFactoryValues(producerConfig.buildProperties()));
            });
        }
    }

配置spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.kafka.MyKafkaAutoConfiguration

yml配置

spring:
  kafka:
    multiple:
      consumer:
        factories:
          test-factory:
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            bootstrap-servers: 192.168.56.112:9092
            group-id: group_a
            concurrency: 25
            fetch-min-size: 1048576
            fetch-max-wait: 3000
            listener:
              type: batch
            properties:
              spring-json-trusted-packages: '*'
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto-offset-reset: latest
      producer:
        templates:
          test-template:
            bootstrap-servers: 192.168.56.112:9092
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

使用

在这里插入图片描述

在这里插入图片描述

到此这篇关于SpringBoot实现kafka多源配置的示例代码的文章就介绍到这了,更多相关SpringBoot kafka多源配置内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java8中parallelStream性能测试及结果分析

    java8中parallelStream性能测试及结果分析

    本篇文章给大家用代码实例做了segmentfaultjava8中parallelStream性能测试,并对测试结果做了说明,需要的朋友学习下吧。
    2018-01-01
  • Hibernate中5个核心接口知识点整理

    Hibernate中5个核心接口知识点整理

    在本篇文章里小编给大家整理的是一篇关于Hibernate中5个核心接口知识点整理等内容,有兴趣的朋友们跟着学习参考下。
    2021-08-08
  • Java Calendar类常用示例_动力节点Java学院整理

    Java Calendar类常用示例_动力节点Java学院整理

    从JDK1.1版本开始,在处理日期和时间时,系统推荐使用Calendar类进行实现。接下来通过实例代码给大家详细介绍Java Calendar类相关知识,需要的朋友参考下吧
    2017-04-04
  • 线上dubbo线程池耗尽CyclicBarrier线程屏障异常解决记录

    线上dubbo线程池耗尽CyclicBarrier线程屏障异常解决记录

    系统相关使用人员反馈系统故障,这篇文章主要介绍了线上dubbo线程池耗尽CyclicBarrier线程屏障异常解决的记录,有需要的朋友可以借鉴参考下
    2022-03-03
  • 客户端Socket与服务端ServerSocket串联实现网络通信

    客户端Socket与服务端ServerSocket串联实现网络通信

    这篇文章主要为大家介绍了客户端Socket与服务端ServerSocket串联实现网络通信的内容详解,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2022-03-03
  • HttpClient详细使用示例详解

    HttpClient详细使用示例详解

    这篇文章主要介绍了HttpClient详细使用示例详解,本文给大家介绍的非常想详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-01-01
  • dubbo如何设置连接zookeeper权限

    dubbo如何设置连接zookeeper权限

    这篇文章主要介绍了dubbo如何设置连接zookeeper权限问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-05-05
  • 项目为什么引入log4j而不是logback代码

    项目为什么引入log4j而不是logback代码

    这篇文章主要介绍了项目为什么引入log4j而不是logback代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-10-10
  • 在Java8与Java7中HashMap源码实现的对比

    在Java8与Java7中HashMap源码实现的对比

    这篇文章主要介绍了在Java8与Java7中HashMap源码实现的对比,内容包括HashMap 的原理简单介绍、结合源码在Java7中是如何解决hash冲突的以及优缺点,结合源码以及在Java8中如何解决hash冲突,balance tree相关源码介绍,需要的朋友可以参考借鉴。
    2017-01-01
  • Java实现获取服务器资源(内存,负载,磁盘容量)

    Java实现获取服务器资源(内存,负载,磁盘容量)

    这篇文章主要为大家详细介绍了如何Java实现获取服务器资源信息,包括内存,负载,磁盘容量等内容,感兴趣的小伙伴可以跟随小编一起学习一下
    2025-07-07

最新评论