springboot使用@KafkaListener监听多个kafka配置实现

 更新时间:2024年04月09日 09:36:45   作者:道不平  
当服务中需要监听多个kafka时, 需要配置多个kafka,本文主要介绍了springboot使用@KafkaListener监听多个kafka配置实现,具有一定的参考价值,感兴趣的可以了解一下

背景

使用springboot整合kafka时, springboot默认读取配置文件中 spring.kafka...配置初始化kafka, 使用@KafkaListener时指定topic即可, 当服务中需要监听多个kafka时, 需要配置多个kafka, 这种方式不适用

方案

可以手动读取不同kafka配置信息, 创建不同的Kafka 监听容器工厂, 使用@KafkaListener时指定相应的容器工厂, 代码如下:

1. 导入依赖

        <dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

2. yml配置

kafka:
  # 默认消费者配置
  default-consumer:
    # 自动提交已消费offset
    enable-auto-commit: true
    # 自动提交间隔时间
    auto-commit-interval: 1000
    # 消费的超时时间
    poll-timeout: 1500
    # 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量
    auto.offset.reset: latest
    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
    session.timeout.ms: 120000
    # 消费请求超时时间
    request.timeout.ms: 180000
  # 1号kafka配置
  test1:
    bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
    consumer:
      group-id: xxx
      sasl.mechanism: xxxx
      security.protocol: xxxx
      sasl.jaas.config: xxxx
  # 2号kafka配置
  test2:
    bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
    consumer:
      group-id: xxx
      sasl.mechanism: xxxx
      security.protocol: xxxx
      sasl.jaas.config: xxxx

3. 容器工厂配置

package com.zhdx.modules.backstage.config;

import com.google.common.collect.Maps;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.Map;

/**
 * kafka监听容器工厂配置
 * <p>
 * 拓展其他消费者配置只需配置指定的属性和bean即可
 */
@EnableKafka
@Configuration
@RefreshScope
public class KafkaListenerContainerFactoryConfig {

    /**
     *  test1 kafka配置
     */
    @Value("${kafka.test1.bootstrap-servers}")
    private String test1KafkaServerUrls;

    @Value("${kafka.test1.consumer.group-id}")
    private String test1GroupId;

    @Value("${kafka.test1.consumer.sasl.mechanism}")
    private String test1SaslMechanism;

    @Value("${kafka.test1.consumer.security.protocol}")
    private String test1SecurityProtocol;

    @Value("${kafka.test1.consumer.sasl.jaas.config}")
    private String test1SaslJaasConfig;
    /**
     *  test2 kafka配置
     */
    @Value("${kafka.test2.bootstrap-servers}")
    private String test2KafkaServerUrls;

    @Value("${kafka.test2.consumer.group-id}")
    private String test2GroupId;

    @Value("${kafka.test2.consumer.sasl.mechanism}")
    private String test2SaslMechanism;

    @Value("${kafka.test2.consumer.security.protocol}")
    private String test2SecurityProtocol;

    @Value("${kafka.test2.consumer.sasl.jaas.config}")
    private String test2SaslJaasConfig;

    /**
     * 默认消费者配置
     */
    @Value("${kafka.default-consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.default-consumer.poll-timeout}")
    private int pollTimeout;

    @Value("${kafka.default-consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.default-consumer.session.timeout.ms}")
    private int sessionTimeoutMs;

    @Value("${kafka.default-consumer.request.timeout.ms}")
    private int requestTimeoutMs;

    /**
     * test1消费者配置
     */
    public Map<String, Object> test1ConsumerConfigs() {
        Map<String, Object> props = getDefaultConsumerConfigs();
        // broker server地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test1KafkaServerUrls);
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, test1GroupId);
        // 加密
        props.put(SaslConfigs.SASL_MECHANISM, test1SaslMechanism);
        props.put("security.protocol", test1SecurityProtocol);
        // 账号密码
        props.put(SaslConfigs.SASL_JAAS_CONFIG, test1SaslJaasConfig);
        return props;
    }
    
    /**
     * test2消费者配置
     */
    public Map<String, Object> test2ConsumerConfigs() {
        Map<String, Object> props = getDefaultConsumerConfigs();
        // broker server地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test2KafkaServerUrls);
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, test2GroupId);
        // 加密
        props.put(SaslConfigs.SASL_MECHANISM, test2SaslMechanism);
        props.put("security.protocol", test2SecurityProtocol);
        // 账号密码
        props.put(SaslConfigs.SASL_JAAS_CONFIG, test2SaslJaasConfig);
        return props;
    }

    /**
     * 默认消费者配置
     */
    private Map<String, Object> getDefaultConsumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        // 自动提交(按周期)已消费offset 批量消费下设置false
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        // 消费请求超时时间
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
        // 序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    /**
     * 消费者工厂类
     */
    public ConsumerFactory<String, String> initConsumerFactory(Map<String, Object> consumerConfigs) {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> initKafkaListenerContainerFactory(
        Map<String, Object> consumerConfigs) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(initConsumerFactory(consumerConfigs));
        // 是否开启批量消费
        factory.setBatchListener(false);
        // 消费的超时时间
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        return factory;
    }

    /**
     * 创建test1 Kafka 监听容器工厂。
     *
     * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象
     */
    @Bean(name = "test1KafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test1KafkaListenerContainerFactory() {
        Map<String, Object> consumerConfigs = this.test1ConsumerConfigs();
        return initKafkaListenerContainerFactory(consumerConfigs);
    }
    

    /**
     * 创建test2 Kafka 监听容器工厂。
     *
     * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象
     */
    @Bean(name = "test2KafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test2KafkaListenerContainerFactory() {
        Map<String, Object> consumerConfigs = this.test2ConsumerConfigs();
        return initKafkaListenerContainerFactory(consumerConfigs);
    }
}

4. @KafkaListener使用

package com.zhdx.modules.backstage.kafka;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka监听器
 */
@Slf4j
@Component
public class test1KafkaListener {
    @KafkaListener(containerFactory = "test1KafkaListenerContainerFactory", topics = "xxx")
    public void handleHyPm(ConsumerRecord<String, String> record) {
        log.info("消费到topic xxx消息:{}", JSON.toJSONString(record.value()));
    }
}

到此这篇关于springboot使用@KafkaListener监听多个kafka配置实现的文章就介绍到这了,更多相关springboot 监听多个kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

相关文章

  • JAVA Frame 窗体背景图片,首位相接滚动代码实例

    JAVA Frame 窗体背景图片,首位相接滚动代码实例

    这篇文章主要介绍了JAVA Frame 窗体背景图片,首位相接滚动代码示例,需要的朋友可以参考下复制代码
    2017-04-04
  • Java数据库连接池c3p0过程解析

    Java数据库连接池c3p0过程解析

    这篇文章主要介绍了Java数据库连接池c3p0过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • Java怎么获取多网卡本地ip

    Java怎么获取多网卡本地ip

    java获取本地ip,获取多网卡本地ip在项目中经常会用到,下面小编把代码分享到脚本之家平台,供大家参考
    2016-03-03
  • Java 中的内存映射 mmap

    Java 中的内存映射 mmap

    这篇文章主要介绍了Java 中的内存映射,mmap 是一种内存映射文件的方法,即将一个文件映射到进程的地址空间,实现文件磁盘地址和一段进程虚拟地址的映射,下面来看看详细内容,需要的朋友可以参考一下
    2021-11-11
  • Springboot项目快速实现拦截器功能

    Springboot项目快速实现拦截器功能

    上一篇文章介绍了Springboot项目如何快速实现过滤器功能,本篇文章接着来盘一盘拦截器,仔细研究后会发现,其实拦截器和过滤器的功能非常类似,可以理解为面向切面编程的一种具体实现。感兴趣的小伙伴可以参考阅读
    2023-03-03
  • Spring Aop 如何获取参数名参数值

    Spring Aop 如何获取参数名参数值

    这篇文章主要介绍了Spring Aop 如何获取参数名参数值的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • Java String字符串内容实现添加双引号

    Java String字符串内容实现添加双引号

    这篇文章主要介绍了Java String字符串内容实现添加双引号,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • 关于JpaRepository的关联查询和@Query查询

    关于JpaRepository的关联查询和@Query查询

    这篇文章主要介绍了JpaRepository的关联查询和@Query查询,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • MyBatis之关于动态SQL解读

    MyBatis之关于动态SQL解读

    这篇文章主要介绍了MyBatis之关于动态SQL解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • MyBatis执行Sql的流程实例解析

    MyBatis执行Sql的流程实例解析

    这篇文章主要介绍了MyBatis执行Sql的流程实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12

最新评论