Kafka调试技巧及心得分享

 更新时间:2026年02月12日 09:09:37   作者:程序员Forlan  
Kafka消费组机制确保每个消息只被一个消费者消费,新增机器时根据策略分配分区,本地调试时,修改代码后可能需要重新测试和造消息,但可以通过配置参数控制消费偏移量,线上环境调试可以通过接口拉取处理

基础理念

对于我们有a,b,c,3台机器,那么我们的消息会被消费3次?

  • 可能会,也可能不会,这取决于你的配置和策略。
  • 消费者组机制:Kafka 使用消费组(Consumer Group)来确保每个消息只会被每个消费者组中的一个消费者消费一次。
  • 分区分配:Kafka 主题可以分为多个分区(Partitions),每个分区只能由一个消费者组中的一个消费者消费。

如果新增1台机器,那么他的偏移量从0开始?

在kafka中,消费组都会维护自己的偏移量(offset),以此来记录消费的消息位置,而当新增1台机器时,会根据分区分配策略,比如:范围分配、轮询分配,这就有2种情况,可能加入旧分区,也可能加入新分区,具体可以配置下策略参数auto.offset.reset,对应的值如下:

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw exception to the consumer.

本地调试

所以,当我们在调试kafka消费逻辑的时候,可能由于消费逻辑写的不对,改完代码需要重新测,重新去造1条消息?还是你会怎么去做,了解了前面的原理,我们改消费组是不可行的,他获取的是最新的偏移量,无法实现复用之前造的某条数据,特别是我们有不同逻辑,把每种类型的消息都重新推一波,这不仅麻烦,而且也容易出错,是否可以直接复用之前的消息,准确处理?

我们可以先了解下@KafkaListener里面的一些配置参数,具体如下:

@KafkaListener(topicPartitions = {@TopicPartition(topic = "yourTopic", partitionOffsets = {@PartitionOffset(partition = "指定分区", initialOffset = "初始偏移量")})})
public void forlanConsumer(ConsumerRecord<String, String> record) {
	String messageStr = record.value();
	log.info(“测试触达记录消费:offset = {}, res = {}”,record.offset(), messageStr);
}

上面只是指定了我们从什么偏移量开始消费,如果要限制范围,可以在代码里面加限制

@KafkaListener(topicPartitions = {@TopicPartition(topic = "yourTopic", partitionOffsets = {@PartitionOffset(partition = "指定分区", initialOffset = "初始偏移量")})})
public void forlanConsumer(ConsumerRecord<String, String> record) {
	if (record.offset() > 结束偏移量) return;
	String messageStr = record.value();
	log.info(“测试触达记录消费:offset = {}, res = {}”,record.offset(), messageStr);
}

测试或正式环境调试

上面只适合本地场景,如果是线上环境,我们本地一般是没有权限连接监听的,那么可以怎么做?其实也能做,只不过需要通过接口去拉取处理

@Autowired
private KafkaConfig kafkaConfig;

public void reconsumeMessage(String topic, int partition, long offset) {
		ConsumerFactory<Integer, String> consumerFactory = kafkaConfig.consumerFactory();
		Map<String, Object> configurationProperties = consumerFactory.getConfigurationProperties();

		Map<String, Object> customProps = new HashMap<>();
		customProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reconsume-temp-group-" + UUID.randomUUID());
		customProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 防止 offset 不存在时报错
		customProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		customProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");// 控制每次 poll 只拉取一条
		// 复用 kafkaConfig 中的基础配置
		Map<String, Object> props = new HashMap<>(configurationProperties);
		props.putAll(customProps);

		try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
			TopicPartition topicPartition = new TopicPartition(topic, partition);

			// 分配分区并定位偏移量
			consumer.assign(Collections.singletonList(topicPartition));
			consumer.seek(topicPartition, offset);

			// 拉取消息(设置超时时间)
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

			for (ConsumerRecord<String, String> record : records) {
				if (record.offset() == offset) {
					if (Objects.equals(topic, KafkaTopic.Forlan_MESSAGE_NOTIFY)) {
						// 调用@KafkaListener的方法执行逻辑
						forlanConsumer.userConsumer(record);
					}
					break;
				}
			}
		}
	}

项目中如果没有配置kafkaConfig,也可以自定义一个,只要能拿到连接就行

private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap();
    props.put("bootstrap.servers", this.bootstrapServers);
    props.put("group.id", this.groupid);
    props.put("enable.auto.commit", this.autoCommit);
    props.put("auto.commit.interval.ms", this.interval);
    props.put("session.timeout.ms", this.timeout);
    props.put("key.deserializer", this.keyDeserializer);
    props.put("value.deserializer", this.valueDeserializer);
    props.put("auto.offset.reset", this.offsetReset);
    props.put("max.poll.records", this.maxPollRecords);
    return props;
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • RabbitMQ交换机与Springboot整合的简单实现

    RabbitMQ交换机与Springboot整合的简单实现

    这篇文章主要介绍了RabbitMQ交换机与Springboot整合的简单实现,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-07-07
  • 在Java中如何对类进行排序详解

    在Java中如何对类进行排序详解

    这篇文章主要给大家介绍了关于如何在Java中使用Arrays.toString()对类进行排序的相关资料,文中通过代码示例介绍的非常详细,需要的朋友可以参考下
    2023-08-08
  • SpringBoot整合Sharding-JDBC实现MySQL8读写分离

    SpringBoot整合Sharding-JDBC实现MySQL8读写分离

    本文是一个基于SpringBoot整合Sharding-JDBC实现读写分离的极简教程,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的可以了解一下
    2021-07-07
  • 教你Java中的Lock锁底层AQS到底是如何实现的

    教你Java中的Lock锁底层AQS到底是如何实现的

    本文是基于ReentrantLock来讲解,ReentrantLock加锁只是对AQS的api的调用,底层的锁的状态(state)和其他线程等待(Node双向链表)的过程其实是由AQS来维护的,对Java Lock锁AQS实现过程感兴趣的朋友一起看看吧
    2022-05-05
  • 对handlerexecutionchain类的深入理解

    对handlerexecutionchain类的深入理解

    下面小编就为大家带来一篇对handlerexecutionchain类的深入理解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-07-07
  • Java实战之用Swing实现通讯录管理系统

    Java实战之用Swing实现通讯录管理系统

    今天给大家带来的是Java实战的相关知识,文章围绕着Swing实现通讯录管理系统展开,文中有非常详细的代码示例,需要的朋友可以参考下
    2021-06-06
  • IDEA中创建properties配置文件

    IDEA中创建properties配置文件

    我们在j2ee当中,连接数据库的时候经常会用到properties配置文件,本文主要介绍了IDEA中创建properties配置文件,具有一定的参考价值, 感兴趣的可以了解一下
    2024-04-04
  • Java中的@Repeatable注解使用详解

    Java中的@Repeatable注解使用详解

    这篇文章主要介绍了Java中的@Repeatable注解使用详解,@Repeatable注解是java8为了解决同一个注解不能重复在同一类/方法/属性上使用的问题,本文提供了解决思路和部分实现代码,需要的朋友可以参考下
    2024-02-02
  • m1 Mac设置多jdk版本并动态切换的实现

    m1 Mac设置多jdk版本并动态切换的实现

    本文主要介绍 Mac 下如何安装 JDK 并且多版本如何切换,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08
  • 浅析Java如何实现动态线程池的任务编排

    浅析Java如何实现动态线程池的任务编排

    动态线程池是在程序运行期间,动态调整线程池参数而无需重启程序的技术,那么如何在动态线程池中进行任务编排呢,下面小编就来和大家详细介绍一下吧
    2025-09-09

最新评论