Kafka批量消费&逐条消费详解

 更新时间:2026年02月06日 15:21:51   作者:C_Knight  
文章介绍了Kafka消费者的配置参数,包括批量消费和逐条消费的设置,在逐条消费模式下,消息会被分割成字符串数组,总结并提供了个人经验供参考

Kafka批量消费&逐条消费

消费者配置参数

    private Map<String, Object> defaultGoodsConsumerConfig() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:port");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer
");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer
");
        props.put("listener.type", "batch");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "modify-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, “SASL_PLAINTEXT”);
        props.put(SaslConfigs.SASL_MECHANISM, defaultKafkaProperties.getSaslMechanism());
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="###" password="###";
");

        return props;
    }

    @Bean(name = "defaultListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> defaultListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(defaultGoodsConsumerConfig()));
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        log.info("KafkaDefaultConsumer factory获取实例:"+ JSON.toJSONString(factory));
        return factory;
    }
       ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
       factory.setConsumerFactory(new DefaultKafkaConsumerFactory(defaultGoodsConsumerConfig()));
       factory.setConcurrency(4);
       //批量消费,如果不设置默认是单条消费
       factory.setBatchListener(true);
       factory.getContainerProperties().setPollTimeout(3000);

消费者监听消息

    /**
     * 监听goods变更消息
     */
    @KafkaListener(id="sync-modify-goods", topics = "${kafka.sync.goods.topic}", concurrency = "4", containerFactory = "defaultListenerContainerFactory")
    public void updateListener(List<ConsumerRecord<String, String>> records){
        for (ConsumerRecord<String, String> msg:records) {
            GoodsChangeMsg changeMsg = null;
            try {
                changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class);
                syncGoodsProcessor.handle(changeMsg);
            }catch (Exception exception) {
                log.error("解析失败{}", msg, exception);
            }
        }
    }

List<ConsumerRecord<String, String>> records可以是String[] message

如果是逐条消费,这里配置list,kafka会根据字符串中的逗号进行分割,所以碰见该现象不要慌,看一下批量消费的配置。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java中String字符串使用避坑指南

    Java中String字符串使用避坑指南

    Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容易造成线上事故( OOM),服务器崩溃,甚至难以察觉的Bug!今天我们就来聊聊String使用中的一些常见坑点,以及如何优雅避坑
    2025-02-02
  • Java  Thread多线程详解及用法解析

    Java Thread多线程详解及用法解析

    本文主要介绍Java 多线程详解及用法,这里整理了详细资料及简单实现代码,有需要的小伙伴可以参考下
    2016-09-09
  • SpringBoot 自动装配的原理详解分析

    SpringBoot 自动装配的原理详解分析

    这篇文章主要介绍了SpringBoot 自动装配的原理详解分析,文章通过通过一个案例来看一下自动装配的效果展开详情,感兴趣的小伙伴可以参考一下
    2022-08-08
  • Java int类型如何获取高低位

    Java int类型如何获取高低位

    这篇文章主要介绍了Java int类型如何获取高低位,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • java IO流 之 输出流 OutputString()的使用

    java IO流 之 输出流 OutputString()的使用

    这篇文章主要介绍了java IO流 之 输出流 OutputString()的使用的相关资料,需要的朋友可以参考下
    2016-12-12
  • springboot集成camunda的实现示例

    springboot集成camunda的实现示例

    本文主要介绍了springboot集成camunda的实现示例,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-10-10
  • Java静态static与实例instance方法示例

    Java静态static与实例instance方法示例

    这篇文章主要为大家介绍了Java静态static与实例instance方法示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-08-08
  • Spring AOP详解面向切面编程思想

    Spring AOP详解面向切面编程思想

    Spring是一个广泛应用的框架,SpringAOP则是Spring提供的一个标准易用的aop框架,依托Spring的IOC容器,提供了极强的AOP扩展增强能力,对项目开发提供了极大地便利
    2022-06-06
  • Java修改PowerPoint幻灯片批注信息

    Java修改PowerPoint幻灯片批注信息

    这篇文章主要介绍了Java修改PowerPoint幻灯片批注信息,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • java实现自定义日期选择器的方法实例

    java实现自定义日期选择器的方法实例

    日期选择器是我们日常开发中经常需要用到的一个功能,下面这篇文章主要给大家介绍了关于利用java实现自定义日期选择器的相关资料,文中给出了详细的示例代码,需要的朋友可以参考借鉴,下面随着小编来一起学习学习吧。
    2017-10-10

最新评论