springboot项目配置多个kafka的示例代码
更新时间:2023年04月24日 14:44:41 作者:CccccDi
这篇文章主要介绍了springboot项目配置多个kafka,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
1.spring-kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency>
2.配置文件相关信息
kafka.bootstrap-servers=localhost:9092
kafka.consumer.group.id=20230321
#可以并发消费的线程数 (通常与partition数量一致)
kafka.consumer.concurrency=10
kafka.consumer.enable.auto.commit=false
kafka.bootstrap-servers.pic=localhost:29092
kafka.consumer.group.id.pic=20230322_pic
kafka.consumer.concurrency.pic=10
kafka.consumer.enable.auto.commit.pic=false3.kafka配置类
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
@Value("${kafka.consumer.enable.auto.commit}")
private String autoCommit;
@Value("${kafka.bootstrap-servers}")
private String bootstrapServer;
@Value("${kafka.consumer.group.id.pic}")
private String groupIdPic;
@Value("${kafka.consumer.concurrency.pic}")
private int concurrencyPic;
@Value("${kafka.consumer.enable.auto.commit.pic}")
private String autoCommitPic;
@Value("${kafka.bootstrap-servers.pic}")
private String bootstrapServerPic;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
String bootstrapServers = bootstrapServer;
Map<String, Object> configProps = new HashMap<>(16);
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactoryPic() {
String bootstrapServers = bootstrapServerPic;
Map<String, Object> configProps = new HashMap<>(16);
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryPic());
factory.setConcurrency(concurrencyPic);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}4.消费主题消息
@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
try {
String jsonString = message.value();
if (StringUtils.isNoneBlank(jsonString)) {
log.info("消费:{}",jsonString);
//TODO ....
}
} catch (Exception e) {
log.error(" receive topic error ", e);
} finally {
ack.acknowledge();
}
}
@KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
try {
if (StringUtils.isNoneBlank(message.value())) {
//TODO ....
}
} catch (Exception e) {
logger.error(" receive topic error ", e);
} finally {
ack.acknowledge();
}
}到此这篇关于springboot项目配置多个kafka的文章就介绍到这了,更多相关springboot配置多个kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
SpringBoot+Spring Data JPA整合H2数据库的示例代码
H2数据库是一个开源的关系型数据库,本文重点给大家介绍SpringBoot+Spring Data JPA整合H2数据库的示例代码,感兴趣的朋友跟随小编一起看看吧2022-02-02
springboot docker jenkins 自动化部署并上传镜像的步骤详解
这篇文章主要介绍了springboot docker jenkins 自动化部署并上传镜像的相关资料,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2020-05-05
SpringBoot普通类获取spring容器中bean的操作
这篇文章主要介绍了SpringBoot普通类获取spring容器中bean的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧2020-09-09
SpringBoot+JavaMailSender实现腾讯企业邮箱配置
这篇文章主要介绍了SpringBoot+JavaMailSender实现腾讯企业邮箱配置,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2021-04-04


最新评论