spring-kafka使消费者动态订阅新增的topic问题

 更新时间:2022年12月27日 11:31:56   作者:DayDayUp丶  
这篇文章主要介绍了spring-kafka使消费者动态订阅新增的topic问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

一、前言

在Java中使用kafka,方式很多,例如:

  • 直接使用kafka-clients这类原生的API;
  • 也可以使用Spring对其的包装API,即spring-kafka,同其它包装API一样(如JdbcTemplate、RestTemplate、RedisTemplate等等),KafkaTemplate是其生产者核心类,KafkaListener是其消费者核心注解;
  • 也有包装地更加抽象的SpringCloudStream等。

这里讨论的话题是,如何在spring-kafka中,使得一个消费者可以动态订阅新增的topic?

本文不讨论利用SpringCloudConfig或Apollo等分布式配置中心,利用@RefreshScope的方式来达到目的,这种方式有点杀鸡用牛刀,也会增加系统复杂度和维护成本。

我的环境:jdk 1.8,Spring 2.1.3.RELEASE,kafka_2.12-2.3.0单节点。

二、需求分析

上面已经提到,spring-kafka通过 @KafkaListener 的方式配置订阅的topic,最常用的属性可能是 topics,而要实现本文的需求,就要使用另一个属性 topicPattern,查看它的属性说明:

The topic pattern for this listener. 
The entries can be 'topic pattern', a'property-placeholder key' or an 'expression'. 
The framework will create acontainer that subscribes to all topics matching the specified pattern to getdynamically assigned partitions. 
The pattern matching will be performedperiodically against topics existing at the time of check. 
An expression mustbe resolved to the topic pattern (String or Pattern result types are supported). 

将其翻译过来:

此侦听器的主题模式。条目可以是“主题模式”,“属性占位符键”或“表达式”。
该框架将创建一个容器,该容器订阅与指定模式匹配的所有主题以获取动态分配的分区。
模式匹配将针对检查时存在的主题【定期执行】。
表达式必须解析为主题模式(支持字符串或模式结果类型)。

注意:从说明信息来看,topicPattern 已经可以做到定期检查topic列表,然后将新加入的topic分配至某个消费者。

下面列出消费端的核心测试代码:

@Component
public class SinkConsumer {
    @KafkaListener(topicPattern = "test_topic2.*")
    public void listen2(ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic2.* = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

代码实现很简洁,就是期待我们新增一个符合 topicPattern 的topic后,spring-kafka能否自动为新建的topic分配到此目标消费者。

三、测试运行

3.1 启动消费者服务

配置文件中,spring该配的配,kafka该配的配,接着启动即可。

3.2 新建topic

新建 test_topic2_3,刚创建完不能立刻分配到目标消费者,从 topicPattern 的注释得知spring-kafka会定期扫描topic列表,我们要给它几分钟等待扫描到新topic,并为它成功分配到目标消费者后,再去发送第一条消息(所以可以先去洗个手,此时19:02)。

3.3 等待topic被分配到消费者

洗手期间的控制台日志提示:已为新建的 test_topic2_3 分配到我们的目标消费者,并将offset设置到起始位置0,日志如下:

2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Revoking previously assigned partitions [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] (Re-)joining group
2019-11-15 19:05:15.757  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] Attempt to heartbeat failed since group is rebalancing
2019-11-15 19:05:15.761  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] Revoking previously assigned partitions [test_topic-0]
2019-11-15 19:05:15.762  INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [test_topic-0]
2019-11-15 19:05:15.762  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] (Re-)joining group
2019-11-15 19:05:16.025  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.025  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.026  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Setting newly assigned partitions [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]
2019-11-15 19:05:16.026  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] Setting newly assigned partitions [test_topic-0]
2019-11-15 19:05:16.028  INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test_topic-0]
2019-11-15 19:05:16.032  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_3-0 to offset 0.
2019-11-15 19:05:16.032  INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]

3.4 发送第一条消息

洗手完毕,看到3.3小节里的日志,然后确认成功分配到目标消费者,且offset被设为0之后,发送第一条消息【我是第1个test_topic2_3的消息】,控制台日志打印出此消息信息,代表成功消费:

topic2.* = test_topic2_3, offset = 0, value = {"date":"2019-11-15 19:11:13","msg":"我是第1个test_topic2_3的消息"} 

3.5 注意事项

若不等到offset被设为0之后,过早发送消息,则会在消费端丢失过早发送的消息,并且当spring-kafka自动设置offset的时候,日志提示,offset被设置为1,而不是起始位置0:

INFO o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_1-0 to offset 1.

在上面的3.1至3.4的整个过程中,可能会日志警告,代表暂时不能为新增的topic分配到目标消费者:

WARN o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] The following subscribed topics are not assigned to any members: [test_topic2_3] 

所以只需等待日志提示可以成功分配到目标消费者,且offset被设为0之后,即可发送第一条消息。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • maven私服搭建与使用教程

    maven私服搭建与使用教程

    在使用maven进行Java项目的开发过程中,难免会有些公共的私有库,这些库是不太方便放到中央仓库的,可以通过Nexus搭建一个私有仓库,这篇文章主要介绍了maven私服搭建与使用,需要的朋友可以参考下
    2023-03-03
  • Java下http下载文件客户端和上传文件客户端实例代码

    Java下http下载文件客户端和上传文件客户端实例代码

    这篇文章主要介绍了Java下http下载文件客户端和上传文件客户端实例代码,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2017-12-12
  • Java中的线程安全集合CopyOnWriteArrayList解析

    Java中的线程安全集合CopyOnWriteArrayList解析

    这篇文章主要介绍了Java中的线程安全CopyOnWriteArrayList解析,CopyOnWriteArrayList是ArrayList的线程安全版本,从他的名字可以推测,CopyOnWriteArrayList是在有写操作的时候会copy一份数据,然后写完再设置成新的数据,需要的朋友可以参考下
    2023-12-12
  • 解决Maven依赖冲突的方法

    解决Maven依赖冲突的方法

    本文主要介绍了解决Maven依赖冲突的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-05-05
  • Java创建表格实例详解

    Java创建表格实例详解

    这篇文章主要介绍了Java创建表格实例详解,需要的朋友可以参考下。
    2017-09-09
  • 使用springboot 获取控制器参数的几种方法小结

    使用springboot 获取控制器参数的几种方法小结

    这篇文章主要介绍了使用springboot 获取控制器参数的几种方法小结,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • java实现简单网络象棋游戏

    java实现简单网络象棋游戏

    这篇文章主要为大家详细介绍了java实现简单网络象棋游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-12-12
  • 十分钟理解Java中的动态代理

    十分钟理解Java中的动态代理

    十分钟帮助大家理解Java中的动态代理,什么是动态代理?感兴趣的小伙伴们可以参考一下
    2016-06-06
  • java中枚举的详细使用介绍

    java中枚举的详细使用介绍

    本篇文章介绍了,在java中枚举的详细使用。需要的朋友参考下
    2013-04-04
  • java web实现网上手机销售系统

    java web实现网上手机销售系统

    这篇文章主要为大家详细介绍了java web实现网上手机销售系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08

最新评论