SpringBoot实现kafka多源配置的示例代码
更新时间:2024年06月06日 10:42:53 作者:it噩梦
实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置,本文介绍了SpringBoot实现kafka多源配置,需要的朋友可以参考下
背景
实际开发中,不同的topic可能来自不同的集群,所以就需要配置不同的kafka数据源,基于springboot自动配置的思想,最终通过配置文件的配置,自动生成生产者及消费者的配置。
核心配置
自动化配置类
import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister;
import com.example.kafka.autoconfig.kafkaConsumerConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
@EnableKafka
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnWebApplication
@EnableConfigurationProperties({kafkaConsumerConfig.class})
@Import({CustomKafkaDataSourceRegister.class})
public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor {
public MyKafkaAutoConfiguration() {
}
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
beanFactory.getBean(CustomKafkaDataSourceRegister.class);
}
}
注册生产者、消费者核心bean到spring
public void afterPropertiesSet() {
Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories();
if (factories != null && !factories.isEmpty()) {
factories.forEach((factoryName, consumerConfig) -> {
KafkaProperties.Listener listener = consumerConfig.getListener();
Integer concurrency = consumerConfig.getConcurrency();
// 创建监听容器工厂
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);
// 注册到容器
if (!beanFactory.containsBean(factoryName)) {
beanFactory.registerSingleton(factoryName, containerFactory);
}
});
}
Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates();
if (!ObjectUtils.isEmpty(templates)) {
templates.forEach((templateName, producerConfig) -> {
//registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues);
//注册spring bean的两种方式
registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class, producerFactoryValues(producerConfig.buildProperties()));
});
}
}
配置spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.example.kafka.MyKafkaAutoConfiguration
yml配置
spring:
kafka:
multiple:
consumer:
factories:
test-factory:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
bootstrap-servers: 192.168.56.112:9092
group-id: group_a
concurrency: 25
fetch-min-size: 1048576
fetch-max-wait: 3000
listener:
type: batch
properties:
spring-json-trusted-packages: '*'
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
producer:
templates:
test-template:
bootstrap-servers: 192.168.56.112:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
使用


到此这篇关于SpringBoot实现kafka多源配置的示例代码的文章就介绍到这了,更多相关SpringBoot kafka多源配置内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Java Calendar类常用示例_动力节点Java学院整理
从JDK1.1版本开始,在处理日期和时间时,系统推荐使用Calendar类进行实现。接下来通过实例代码给大家详细介绍Java Calendar类相关知识,需要的朋友参考下吧2017-04-04
线上dubbo线程池耗尽CyclicBarrier线程屏障异常解决记录
系统相关使用人员反馈系统故障,这篇文章主要介绍了线上dubbo线程池耗尽CyclicBarrier线程屏障异常解决的记录,有需要的朋友可以借鉴参考下2022-03-03
客户端Socket与服务端ServerSocket串联实现网络通信
这篇文章主要为大家介绍了客户端Socket与服务端ServerSocket串联实现网络通信的内容详解,有需要的朋友可以借鉴参考下,希望能够有所帮助2022-03-03


最新评论