spring-kafka使消费者动态订阅新增的topic问题

 更新时间:2022年12月27日 11:31:56   作者:DayDayUp丶  
这篇文章主要介绍了spring-kafka使消费者动态订阅新增的topic问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

一、前言

在Java中使用kafka,方式很多,例如:

  • 直接使用kafka-clients这类原生的API;
  • 也可以使用Spring对其的包装API,即spring-kafka,同其它包装API一样(如JdbcTemplate、RestTemplate、RedisTemplate等等),KafkaTemplate是其生产者核心类,KafkaListener是其消费者核心注解;
  • 也有包装地更加抽象的SpringCloudStream等。

这里讨论的话题是,如何在spring-kafka中,使得一个消费者可以动态订阅新增的topic?

本文不讨论利用SpringCloudConfig或Apollo等分布式配置中心,利用@RefreshScope的方式来达到目的,这种方式有点杀鸡用牛刀,也会增加系统复杂度和维护成本。

我的环境:jdk 1.8,Spring 2.1.3.RELEASE,kafka_2.12-2.3.0单节点。

二、需求分析

上面已经提到,spring-kafka通过 @KafkaListener 的方式配置订阅的topic,最常用的属性可能是 topics,而要实现本文的需求,就要使用另一个属性 topicPattern,查看它的属性说明:

The topic pattern for this listener. 
The entries can be 'topic pattern', a'property-placeholder key' or an 'expression'. 
The framework will create acontainer that subscribes to all topics matching the specified pattern to getdynamically assigned partitions. 
The pattern matching will be performedperiodically against topics existing at the time of check. 
An expression mustbe resolved to the topic pattern (String or Pattern result types are supported). 

将其翻译过来:

此侦听器的主题模式。条目可以是“主题模式”,“属性占位符键”或“表达式”。
该框架将创建一个容器,该容器订阅与指定模式匹配的所有主题以获取动态分配的分区。
模式匹配将针对检查时存在的主题【定期执行】。
表达式必须解析为主题模式(支持字符串或模式结果类型)。

注意:从说明信息来看,topicPattern 已经可以做到定期检查topic列表,然后将新加入的topic分配至某个消费者。

下面列出消费端的核心测试代码:

@Component
public class SinkConsumer {
    @KafkaListener(topicPattern = "test_topic2.*")
    public void listen2(ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic2.* = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

代码实现很简洁,就是期待我们新增一个符合 topicPattern 的topic后,spring-kafka能否自动为新建的topic分配到此目标消费者。

三、测试运行

3.1 启动消费者服务

配置文件中,spring该配的配,kafka该配的配,接着启动即可。

3.2 新建topic

新建 test_topic2_3,刚创建完不能立刻分配到目标消费者,从 topicPattern 的注释得知spring-kafka会定期扫描topic列表,我们要给它几分钟等待扫描到新topic,并为它成功分配到目标消费者后,再去发送第一条消息(所以可以先去洗个手,此时19:02)。

3.3 等待topic被分配到消费者

洗手期间的控制台日志提示:已为新建的 test_topic2_3 分配到我们的目标消费者,并将offset设置到起始位置0,日志如下:

2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Revoking previously assigned partitions [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] (Re-)joining group
2019-11-15 19:05:15.757  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] Attempt to heartbeat failed since group is rebalancing
2019-11-15 19:05:15.761  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] Revoking previously assigned partitions [test_topic-0]
2019-11-15 19:05:15.762  INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [test_topic-0]
2019-11-15 19:05:15.762  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] (Re-)joining group
2019-11-15 19:05:16.025  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.025  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.026  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Setting newly assigned partitions [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]
2019-11-15 19:05:16.026  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] Setting newly assigned partitions [test_topic-0]
2019-11-15 19:05:16.028  INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test_topic-0]
2019-11-15 19:05:16.032  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_3-0 to offset 0.
2019-11-15 19:05:16.032  INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]

3.4 发送第一条消息

洗手完毕,看到3.3小节里的日志,然后确认成功分配到目标消费者,且offset被设为0之后,发送第一条消息【我是第1个test_topic2_3的消息】,控制台日志打印出此消息信息,代表成功消费:

topic2.* = test_topic2_3, offset = 0, value = {"date":"2019-11-15 19:11:13","msg":"我是第1个test_topic2_3的消息"} 

3.5 注意事项

若不等到offset被设为0之后,过早发送消息,则会在消费端丢失过早发送的消息,并且当spring-kafka自动设置offset的时候,日志提示,offset被设置为1,而不是起始位置0:

INFO o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_1-0 to offset 1.

在上面的3.1至3.4的整个过程中,可能会日志警告,代表暂时不能为新增的topic分配到目标消费者:

WARN o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] The following subscribed topics are not assigned to any members: [test_topic2_3] 

所以只需等待日志提示可以成功分配到目标消费者,且offset被设为0之后,即可发送第一条消息。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • mybatis多个区间处理方式(双foreach循环)

    mybatis多个区间处理方式(双foreach循环)

    这篇文章主要介绍了mybatis多个区间处理方式(双foreach循环),具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • 关于Java反射给泛型集合赋值问题

    关于Java反射给泛型集合赋值问题

    这篇文章主要介绍了Java反射给泛型集合赋值,需要的朋友可以参考下
    2022-01-01
  • Java开启线程的四种方法案例详解

    Java开启线程的四种方法案例详解

    这篇文章主要介绍了Java开启线程的四种方法,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-02-02
  • SpringBoot中使用tkMapper的方法详解

    SpringBoot中使用tkMapper的方法详解

    这篇文章主要介绍了SpringBoot中使用tkMapper的方法详解
    2022-11-11
  • java简单实现多线程及线程池实例详解

    java简单实现多线程及线程池实例详解

    这篇文章主要为大家详细介绍了java简单实现多线程,及java爬虫使用线程池实例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-03-03
  • Java Scanner对象中hasNext()与next()方法的使用

    Java Scanner对象中hasNext()与next()方法的使用

    这篇文章主要介绍了Java Scanner对象中hasNext()与next()方法的使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • Java反射中java.beans包学习总结

    Java反射中java.beans包学习总结

    本篇文章通过学习Java反射中java.beans包,吧知识点做了总结,并把相关内容做了关联,对此有需要的朋友可以学习参考下。
    2018-02-02
  • 详解Java多线程和IO流的应用

    详解Java多线程和IO流的应用

    这篇文章主要介绍了详解Java多线程和IO流的应用,无论是本地文件复制,还是网络多线程下载,对于流的使用都是一样的,需要的朋友可以参考下
    2023-04-04
  • 使用maven自定义插件开发

    使用maven自定义插件开发

    这篇文章主要介绍了使用maven自定义插件开发,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • Java8之Lambda表达式使用解读

    Java8之Lambda表达式使用解读

    这篇文章主要介绍了Java8之Lambda表达式使用解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11

最新评论