springboot配置kafka批量消费,并发消费方式

 更新时间:2024年12月30日 16:50:22   作者:梵法利亚  
文章介绍了如何在Spring Boot中配置Kafka进行批量消费,并发消费,需要注意的是,并发量必须小于等于分区数,否则会导致线程空闲,文章还总结了创建Kafka分区的命令,并鼓励读者分享经验

springboot配置kafka批量消费,并发消费

 @KafkaListener(id = "id0",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"0"})},
            containerFactory = "batchFactory")
    public void listener0(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
          //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id1",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"1"})},
            containerFactory = "batchFactory")
    public void listener1(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id2",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"2"})},
            containerFactory = "batchFactory")
    public void listener2(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id3",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"3"})},
            containerFactory = "batchFactory")
    public void listener3(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;


@Slf4j
@Configuration
public class KafkaConsumerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServersConfig;





    public Map<String,Object> consumerConfigs(){
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "forest_fire_ql_firecard_test_info3");
        log.info("bootstrapServersConfig:自定义配置="+ bootstrapServersConfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,3);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"20000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory(KafkaProperties properties) {
        //Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //并发数量
        factory.setConcurrency(3);
        //开启批量监听,消费
        factory.setBatchListener(true);
        //factory.set
        return factory;
    }

}

按照以上配置内容即可,可以达到kafka批量消费的能力。

但是,要特别需要注意的一个点是:

  • 并发量根据实际的分区数量决定
  • 必须小于等于分区数
  • 否则会有线程一直处于空闲状态

下面是创建4个分区的命令写法

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic   personnel_card_real_time_recordinfo    --partitions 4 --replication-factor 1

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • JAVA如何获取客户端IP地址和MAC地址

    JAVA如何获取客户端IP地址和MAC地址

    本篇文章主要介绍了JAVA如何获取客户端IP地址和MAC地址非常具有实用价值,这里整理了详细的代码,需要的朋友可以参考下
    2017-08-08
  • SpringBoot实现防止XSS攻击的示例详解

    SpringBoot实现防止XSS攻击的示例详解

    这篇文章主要为大家详细介绍了SpringBoot如何实现防止XSS攻击,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2024-03-03
  • Java如何获取文件夹下所有压缩包下指定文件

    Java如何获取文件夹下所有压缩包下指定文件

    在Java中,通过遍历文件夹并对压缩包进行解析,可以实现提取指定文件的功能,如文档、PDF等,该过程中可增加过滤条件来适应不同需求,例如文件类型或文件名过滤,该方法适用于处理大量数据时的文件管理和数据提取
    2024-09-09
  • java 指定某个jdk版本方法

    java 指定某个jdk版本方法

    这篇文章主要介绍了java 指定某个jdk版本方法的相关资料,需要的朋友可以参考下
    2017-05-05
  • Java分布式锁的三种实现方案

    Java分布式锁的三种实现方案

    本文主要介绍了Java分布式锁的三种实现方案。具有一定的参考价值,下面跟着小编一起来看下吧
    2017-01-01
  • RabbitMQ使用案例详解

    RabbitMQ使用案例详解

    RabbitMQ是基于Erlang语言开发的开源的消息中间件,这篇文章给大家介绍RabbitMQ使用案例,感兴趣的朋友跟随小编一起看看吧
    2024-03-03
  • java实现单链表中的增删改

    java实现单链表中的增删改

    这篇文章主要为大家详细介绍了java实现单链表中的增删改,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05
  • 一文教你Java如何快速构建项目骨架

    一文教你Java如何快速构建项目骨架

    在 Java 项目开发过程中,构建项目骨架是一项繁琐但又基础重要的工作,Java 领域有许多代码生成工具可以帮助我们快速完成这一任务,下面就跟随小编一起来了解下吧
    2025-05-05
  • springboot web项目打jar或者war包并运行的实现

    springboot web项目打jar或者war包并运行的实现

    这篇文章主要介绍了springboot web项目打jar或者war包并运行的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-11-11
  • 基于指针pointers和引用references的区别分析

    基于指针pointers和引用references的区别分析

    本篇文章介绍了,基于指针pointers和引用references的区别分析。需要的朋友参考下
    2013-05-05

最新评论