Kafka调试技巧及心得分享

 更新时间:2026年02月12日 09:09:37   作者:程序员Forlan  
Kafka消费组机制确保每个消息只被一个消费者消费,新增机器时根据策略分配分区,本地调试时,修改代码后可能需要重新测试和造消息,但可以通过配置参数控制消费偏移量,线上环境调试可以通过接口拉取处理

基础理念

对于我们有a,b,c,3台机器,那么我们的消息会被消费3次?

  • 可能会,也可能不会,这取决于你的配置和策略。
  • 消费者组机制:Kafka 使用消费组(Consumer Group)来确保每个消息只会被每个消费者组中的一个消费者消费一次。
  • 分区分配:Kafka 主题可以分为多个分区(Partitions),每个分区只能由一个消费者组中的一个消费者消费。

如果新增1台机器,那么他的偏移量从0开始?

在kafka中,消费组都会维护自己的偏移量(offset),以此来记录消费的消息位置,而当新增1台机器时,会根据分区分配策略,比如:范围分配、轮询分配,这就有2种情况,可能加入旧分区,也可能加入新分区,具体可以配置下策略参数auto.offset.reset,对应的值如下:

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw exception to the consumer.

本地调试

所以,当我们在调试kafka消费逻辑的时候,可能由于消费逻辑写的不对,改完代码需要重新测,重新去造1条消息?还是你会怎么去做,了解了前面的原理,我们改消费组是不可行的,他获取的是最新的偏移量,无法实现复用之前造的某条数据,特别是我们有不同逻辑,把每种类型的消息都重新推一波,这不仅麻烦,而且也容易出错,是否可以直接复用之前的消息,准确处理?

我们可以先了解下@KafkaListener里面的一些配置参数,具体如下:

@KafkaListener(topicPartitions = {@TopicPartition(topic = "yourTopic", partitionOffsets = {@PartitionOffset(partition = "指定分区", initialOffset = "初始偏移量")})})
public void forlanConsumer(ConsumerRecord<String, String> record) {
	String messageStr = record.value();
	log.info(“测试触达记录消费:offset = {}, res = {}”,record.offset(), messageStr);
}

上面只是指定了我们从什么偏移量开始消费,如果要限制范围,可以在代码里面加限制

@KafkaListener(topicPartitions = {@TopicPartition(topic = "yourTopic", partitionOffsets = {@PartitionOffset(partition = "指定分区", initialOffset = "初始偏移量")})})
public void forlanConsumer(ConsumerRecord<String, String> record) {
	if (record.offset() > 结束偏移量) return;
	String messageStr = record.value();
	log.info(“测试触达记录消费:offset = {}, res = {}”,record.offset(), messageStr);
}

测试或正式环境调试

上面只适合本地场景,如果是线上环境,我们本地一般是没有权限连接监听的,那么可以怎么做?其实也能做,只不过需要通过接口去拉取处理

@Autowired
private KafkaConfig kafkaConfig;

public void reconsumeMessage(String topic, int partition, long offset) {
		ConsumerFactory<Integer, String> consumerFactory = kafkaConfig.consumerFactory();
		Map<String, Object> configurationProperties = consumerFactory.getConfigurationProperties();

		Map<String, Object> customProps = new HashMap<>();
		customProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reconsume-temp-group-" + UUID.randomUUID());
		customProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 防止 offset 不存在时报错
		customProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		customProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");// 控制每次 poll 只拉取一条
		// 复用 kafkaConfig 中的基础配置
		Map<String, Object> props = new HashMap<>(configurationProperties);
		props.putAll(customProps);

		try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
			TopicPartition topicPartition = new TopicPartition(topic, partition);

			// 分配分区并定位偏移量
			consumer.assign(Collections.singletonList(topicPartition));
			consumer.seek(topicPartition, offset);

			// 拉取消息(设置超时时间)
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

			for (ConsumerRecord<String, String> record : records) {
				if (record.offset() == offset) {
					if (Objects.equals(topic, KafkaTopic.Forlan_MESSAGE_NOTIFY)) {
						// 调用@KafkaListener的方法执行逻辑
						forlanConsumer.userConsumer(record);
					}
					break;
				}
			}
		}
	}

项目中如果没有配置kafkaConfig,也可以自定义一个,只要能拿到连接就行

private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap();
    props.put("bootstrap.servers", this.bootstrapServers);
    props.put("group.id", this.groupid);
    props.put("enable.auto.commit", this.autoCommit);
    props.put("auto.commit.interval.ms", this.interval);
    props.put("session.timeout.ms", this.timeout);
    props.put("key.deserializer", this.keyDeserializer);
    props.put("value.deserializer", this.valueDeserializer);
    props.put("auto.offset.reset", this.offsetReset);
    props.put("max.poll.records", this.maxPollRecords);
    return props;
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Minio环境部署过程及如何配置HTTPS域名

    Minio环境部署过程及如何配置HTTPS域名

    MinIO 是一个对象存储系统,数据需要存储在宿主机上,容器的重启不影响数据,因此我们需要为 MinIO 创建一个挂载目录,用于持久化存储数据,本文详细介绍了如何部署MinIO,并通过配置反向代理和HTTPS来提升其安全性,感兴趣的朋友一起看看吧
    2025-03-03
  • 使用自定义注解和@Aspect实现责任链模式的组件增强的详细代码

    使用自定义注解和@Aspect实现责任链模式的组件增强的详细代码

    责任链模式是一种行为设计模式,其作用是将请求的发送者和接收者解耦,从而可以灵活地组织和处理请求,本文讲给大家介绍如何使用自定义注解和@Aspect实现责任链模式的组件增强,文中有详细的代码示例供大家参考,感兴趣的同学可以借鉴一下
    2023-05-05
  • 一文带你搞懂Maven的继承与聚合

    一文带你搞懂Maven的继承与聚合

    这篇文章主要为大家详细介绍了Maven的继承和聚合以及二者的区别,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-07-07
  • Java 字符串转float运算 float转字符串的方法

    Java 字符串转float运算 float转字符串的方法

    今天小编就为大家分享一篇Java 字符串转float运算 float转字符串的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • Hibernatede 一对多映射配置方法(分享)

    Hibernatede 一对多映射配置方法(分享)

    下面小编就为大家带来一篇Hibernatede 一对多映射配置方法(分享)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-09-09
  • 区块链java代码实现

    区块链java代码实现

    这篇文章主要为大家详细介绍了区块链java代码实现,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-01-01
  • eclipse 安装lombok插件

    eclipse 安装lombok插件

    这篇文章主要介绍了eclipse 安装lombok插件的详细步骤,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-07-07
  • Java BigDecimal类的一般使用、BigDecimal转double方式

    Java BigDecimal类的一般使用、BigDecimal转double方式

    这篇文章主要介绍了Java BigDecimal类的一般使用、BigDecimal转double方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • java解析任意层数json字符串的方法

    java解析任意层数json字符串的方法

    一个方法解析任意层数的json字符窜:使用正则表达式,递归算法,将jsonArray解析出后添加到List, JsonObject添加至Map
    2014-02-02
  • Spring AOP 的组成和实现

    Spring AOP 的组成和实现

    这篇文章主要介绍了Spring AOP 的组成和实现,AOP 是一种思想,Spring AOP 是这种思想的具体实现,本文结合实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2023-07-07

最新评论