kafka运维consumer-groups.sh消费者组管理

 更新时间:2022年11月11日 10:32:25   作者:石臻臻的杂货铺  
这篇文章主要为大家介绍了kafka运维consumer-groups.sh消费者组管理,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

消费者组管理 kafka-consumer-groups.sh

1. 查看消费者列表--list

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list 

先调用MetadataRequest拿到所有在线Broker列表 再给每个Broker发送ListGroupsRequest请求获取 消费者组数据

2. 查看消费者组详情--describe

DescribeGroupsRequest

查看消费组详情--group 或 --all-groups

查看指定消费组详情--group sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group

查看所有消费组详情--all-groups sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups 查看该消费组 消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID等等信息

查询消费者成员信息--members

所有消费组成员信息 sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090

指定消费组成员信息 sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090

查询消费者状态信息--state

所有消费组状态信息 sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090

指定消费组状态信息 sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090

3. 删除消费者组--delete

DeleteGroupsRequest

删除消费组--delete

删除指定消费组--group sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090

删除所有消费组--all-groups sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090

PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常

Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

4. 重置消费组的偏移量 --reset-offsets

能够执行成功的一个前提是 消费组这会是不可用状态;

下面的示例使用的参数是: --dry-run ;这个参数表示预执行,会打印出来将要处理的结果; 等你想真正执行的时候请换成参数--excute ;

下面示例 重置模式都是 --to-earliest 重置到最早的;

请根据需要参考下面 相关重置Offset的模式 换成其他模式;

重置指定消费组的偏移量 --group

重置指定消费组的所有Topic的偏移量--all-topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic

重置指定消费组的指定Topic的偏移量--topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2

重置所有消费组的偏移量 --all-group

重置所有消费组的所有Topic的偏移量--all-topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic

重置所有消费组中指定Topic的偏移量--topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets 后面需要接重置的模式

相关重置Offset的模式

参数描述例子
--to-earliest :重置offset到最开始的那条offset(找到还未被删除最早的那个offset) 
--to-current:直接重置offset到当前的offset,也就是LOE 
--to-latest:重置到最后一个offset 
--to-datetime:重置到指定时间的offset;格式为:YYYY-MM-DDTHH:mm:SS.sss;--to-datetime "2021-6-26T00:00:00.000"
--to-offset重置到指定的offset,但是通常情况下,匹配到多个分区,这里是将匹配到的所有分区都重置到这一个值; 如果 1.目标最大offset<--to-offset, 这个时候重置为目标最大offset;2.目标最小offset>--to-offset ,则重置为最小; 3.否则的话才会重置为--to-offset的目标值; 一般不用这个--to-offset 3465
--shift-by按照偏移量增加或者减少多少个offset;正的为往前增加;负的往后退;当然这里也是匹配所有的;--shift-by 100 、--shift-by -100
--from-file根据CVS文档来重置; 这里下面单独讲解 

--to-offset 例子

--to-offset 3465

--from-file着重讲解一下

上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的offset;不过**--from-file**可以让我们更灵活一点;

先配置cvs文档 格式为: Topic:分区号: 重置目标偏移量

test2,0,100
test2,1,200
test2,2,300

执行命令

sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv

5. 删除偏移量delete-offsets

能够执行成功的一个前提是 消费组这会是不可用状态;

偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费;

sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2

相关可选参数

参数描述例子
--bootstrap-server指定连接到的kafka服务;--bootstrap-server localhost:9092
--list列出所有消费组名称--list
--describe查询消费者描述信息--describe
--group指定消费组 
--all-groups指定所有消费组 
--members查询消费组的成员信息 
--state查询消费者的状态信息 
--offsets在查询消费组描述信息的时候,这个参数会列出消息的偏移量信息; 默认就会有这个参数的; 
dry-run重置偏移量的时候,使用这个参数可以让你预先看到重置情况,这个时候还没有真正的执行,真正执行换成--excute;默认为dry-run 
--excute真正的执行重置偏移量的操作; 
--to-earliest将offset重置到最早 
to-latest将offset重置到最近

以上就是kafka运维consumer-groups.sh消费者组管理的详细内容,更多关于kafka运维consumer groups sh的资料请关注脚本之家其它相关文章!

相关文章

  • 浅析如何在Java应用中优雅的发送短信

    浅析如何在Java应用中优雅的发送短信

    很多业务场景里,我们都需要发送短信,比如登陆验证码、告警、营销通知、节日祝福等等,这篇文章,我们聊聊 Java 应用中如何优雅的发送短信,文中有详细的代码示例供大家参考,需要的朋友可以参考下
    2023-11-11
  • MybatisPlus出现Error attempting to get column ‘xxx字段‘ from result set异常解决

    MybatisPlus出现Error attempting to get col

    本文重点分析使用@EnumValue注解转换时遇到的一下错误原因,及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-11-11
  • Java中Jedis基本使用

    Java中Jedis基本使用

    Redis的Java实现的客户端,本文主要介绍了Java中Jedis基本使用,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-05-05
  • Java实现邮箱找回密码实例代码

    Java实现邮箱找回密码实例代码

    本篇文章主要介绍了Java实现邮箱找回密码实例代码,可以通过邮箱找回丢失密码,具有一定的参考价值,有需要的可以了解一下。
    2016-11-11
  • 指定springboot的jar运行内存方式

    指定springboot的jar运行内存方式

    这篇文章主要介绍了指定springboot的jar运行内存方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • IDEA使用技巧之如何将本地项目和git远程项目关联

    IDEA使用技巧之如何将本地项目和git远程项目关联

    这篇文章主要介绍了IDEA使用技巧之如何将本地项目和git远程项目关联问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02
  • Java编程实现基于TCP协议的Socket聊天室示例

    Java编程实现基于TCP协议的Socket聊天室示例

    这篇文章主要介绍了Java编程实现基于TCP协议的Socket聊天室,结合实例形式详细分析了java基于TCP协议的Socket聊天室客户端与服务器端相关实现与使用技巧,需要的朋友可以参考下
    2018-01-01
  • spring动态控制定时任务的实现

    spring动态控制定时任务的实现

    在实际项目中,经常需要动态的控制定时任务,比如通过接口增加、启动、停止、删除定时任务,本文主要介绍了spring动态控制定时任务的实现,感兴趣的可以了解一下
    2024-01-01
  • Mybatis-Plus使用saveOrUpdate及问题解决方法

    Mybatis-Plus使用saveOrUpdate及问题解决方法

    本文主要介绍了Mybatis-Plus使用saveOrUpdate及问题解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-01-01
  • java中对象和JSON格式的转换方法代码

    java中对象和JSON格式的转换方法代码

    JSON格式可以轻松地以面向对象的方式转换为Java对象,下面这篇文章主要给大家介绍了关于java中对象和JSON格式的转换方法,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2023-12-12

最新评论