springboot+kafka中@KafkaListener动态指定多个topic问题

 更新时间:2022年12月27日 11:35:56   作者:Forward233  
这篇文章主要介绍了springboot+kafka中@KafkaListener动态指定多个topic问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

说明

本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的消费注解@KafkaListener

首先,application.properties中配置用逗号隔开的多个topic。

方法:利用Spring的SpEl表达式,将topics 配置为:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”)

运行程序,console打印的效果如下:

因为只开了一条消费者线程,所以所有的topic和分区都分配给这条线程。

如果你想开多条消费者线程去消费这些topic,添加@KafkaListener注解的参数concurrency的值为自己想要的消费者个数即可(注意,消费者数要小于等于你开的所有topic的分区数总和)

运行程序,console打印的效果如下:

总结一下大家问的最多的一个问题

如何在程序运行的过程中,改变topic,消费者能够消费修改后的topic?

ans: 经过尝试,使用@KafkaListener注解实现不了此需求,在程序启动的时候,程序就会根据@KafkaListener的注解信息初始化好消费者去消费指定好的topic。如果在程序运行的过程中,修改topic,不会让此消费者修改消费者的配置再重新订阅topic的。

不过我们可以有个折中的办法,就是利用@KafkaListener的topicPattern参数来进行topic匹配。

具体如何操作的可以看下这篇文章:

https://www.jb51.net/article/271098.htm

终极方法

思路

不使用@KafkaListener,使用kafka原生客户端依赖,手动初始化消费者,开启消费者线程。

在消费者线程中,每次循环都从配置、数据库或者其他配置源获取最新的topic信息,与之前的topic比较,如果发生变化,重新订阅topic或者初始化消费者。

实现

加入kafka客户端依赖(本次测试服务端kafka版本:2.12-2.4.0)

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.3.0</version>
</dependency>

代码

@Service
@Slf4j
public class KafkaConsumers implements InitializingBean {

    /**
     * 消费者
     */
    private static KafkaConsumer<String, String> consumer;
    /**
     * topic
     */
    private List<String> topicList;

    public static String getNewTopic() {
        try {
            return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件)
     *
     * @param topicList
     * @return
     */
    public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) {
        //配置信息
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "192.168.9.185:9092");
        //必须指定消费者组
        props.put("group.id", "haha");
        //设置数据key和value的序列化处理类
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        //创建消息者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅topic的消息
        consumer.subscribe(topicList);
        return consumer;
    }

    /**
     * 开启消费者线程
     * 异常请自己根据需求自己处理
     */
    @Override
    public void afterPropertiesSet() {
        // 初始化topic
        topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) {
            consumer = getInitConsumer(topicList);
            // 开启一个消费者线程
            new Thread(() -> {
                while (true) {
                    // 模拟从配置源中获取最新的topic(字符串,逗号隔开)
                    final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
                    // 如果topic发生变化
                    if (!topicList.equals(newTopic)) {
                        log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList);
                        // method one:重新订阅topic:
                        topicList = newTopic;
                        consumer.subscribe(newTopic);
                        // method two:关闭原来的消费者,重新初始化一个消费者
                        //consumer.close();
                        //topicList = newTopic;
                        //consumer = getInitConsumer(newTopic);
                        continue;
                    }
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("key:" + record.key() + "" + ",value:" + record.value());
                    }
                }
            }).start();
        }
    }
}

说一下第72行代码:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

上面这行代码表示:在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回。

在修改topic后,必须等到此次poll拉取的消息处理完,while(true)循环的时候检测topic发生变化,才能重新订阅topic.

poll()方法一次拉取得消息数默认为:500,如下图,kafka客户端源码中设置的。

如果想自定义此配置,可在初始化消费者时加入

运行结果(测试的topic中都无数据)

注意:KafkaConsumer是线程不安全的,不要用一个KafkaConsumer实例开启多个消费者,要开启多个消费者,需要new 多个KafkaConsumer实例。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 深入分析Comparable与Comparator及Clonable三个Java接口

    深入分析Comparable与Comparator及Clonable三个Java接口

    接口不是类,而是对类的一组需求描述,这些类要遵从接口描述的统一格式进行定义,这篇文章主要为大家详细介绍了Java的Comparable,Comparator和Cloneable的接口,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-05-05
  • 详解Java token主流框架之JWT

    详解Java token主流框架之JWT

    JWT(JSON Web Token)是一种基于JSON格式的轻量级的、用于身份认证的开放标准,它通过在用户和服务器之间传递一个安全的、可靠的、独立的JSON对象来进行身份验证和授权,本文将详细给大家介绍Java token主流框架之JWT,需要的朋友可以参考下
    2023-05-05
  • java分割字符串多种方法(附例子)

    java分割字符串多种方法(附例子)

    这篇文章主要给大家介绍了关于java分割字符串多种方法的相关资料,Java中有多种方法可以实现字符串分割,文中将每张方法都给出了代码示例,需要的朋友可以参考下
    2023-10-10
  • 实例讲解Java基础之反射

    实例讲解Java基础之反射

    今天小编就为大家分享一篇关于实例讲解Java基础之反射,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-03-03
  • springboot使用小工具之Lombok、devtools、Spring Initailizr详解

    springboot使用小工具之Lombok、devtools、Spring Initailizr详解

    这篇文章主要介绍了springboot使用小工具之Lombok、devtools、Spring Initailizr详解,Lombok可以代替手写get、set、构造方法等,需要idea装插件lombok,本文通过示例代码给大家介绍的非常详细,需要的朋友可以参考下
    2022-10-10
  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    详解RabbitMQ中死信队列和延迟队列的使用详解

    这篇文章主要为大家介绍了RabbitMQ中死信队列和延迟队列的原理与使用,这也是Java后端面试中常见的问题,感兴趣的小伙伴可以了解一下
    2022-05-05
  • java获取json中的全部键值对实例

    java获取json中的全部键值对实例

    下面小编就为大家分享一篇java获取json中的全部键值对实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-03-03
  • idea复制module(项目)并在一个窗口展示的教程详解

    idea复制module(项目)并在一个窗口展示的教程详解

    这篇文章主要介绍了idea复制module(项目)并在一个窗口展示的方法,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-06-06
  • SpringBoot3集成Kafka的方法详解

    SpringBoot3集成Kafka的方法详解

    Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,下面我们就来看看SpringBoot3是如何集成Kafka的吧
    2023-08-08
  • 聊聊Java和CPU的关系

    聊聊Java和CPU的关系

    java和cpu关系不大,但是也有点关系,下面我们来聊一聊java和cpu的关系,感兴趣的朋友一起看看吧
    2016-08-08

最新评论