springboot配置kafka批量消费,并发消费方式

 更新时间:2024年12月30日 16:50:22   作者:梵法利亚  
文章介绍了如何在Spring Boot中配置Kafka进行批量消费,并发消费,需要注意的是,并发量必须小于等于分区数,否则会导致线程空闲,文章还总结了创建Kafka分区的命令,并鼓励读者分享经验

springboot配置kafka批量消费,并发消费

 @KafkaListener(id = "id0",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"0"})},
            containerFactory = "batchFactory")
    public void listener0(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
          //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id1",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"1"})},
            containerFactory = "batchFactory")
    public void listener1(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id2",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"2"})},
            containerFactory = "batchFactory")
    public void listener2(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }


    @KafkaListener(id = "id3",groupId = "forest_fire_ql_firecard_test_info3",
            topicPartitions = {@TopicPartition(topic = CommonConstants.KafkaTop.personnelCardRealTimeRecord,partitions = {"3"})},
            containerFactory = "batchFactory")
    public void listener3(List<String> record, Consumer<String,String> consumer){
        consumer.commitSync();
        try {
            //业务处理
        } catch (Exception e) {
            log.error(e.toString());
        }
    }
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;


@Slf4j
@Configuration
public class KafkaConsumerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServersConfig;





    public Map<String,Object> consumerConfigs(){
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "forest_fire_ql_firecard_test_info3");
        log.info("bootstrapServersConfig:自定义配置="+ bootstrapServersConfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,3);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"20000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory(KafkaProperties properties) {
        //Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //并发数量
        factory.setConcurrency(3);
        //开启批量监听,消费
        factory.setBatchListener(true);
        //factory.set
        return factory;
    }

}

按照以上配置内容即可,可以达到kafka批量消费的能力。

但是,要特别需要注意的一个点是:

  • 并发量根据实际的分区数量决定
  • 必须小于等于分区数
  • 否则会有线程一直处于空闲状态

下面是创建4个分区的命令写法

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic   personnel_card_real_time_recordinfo    --partitions 4 --replication-factor 1

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • jdk配置完之后java -version还是默认的jdk版本问题解决过程

    jdk配置完之后java -version还是默认的jdk版本问题解决过程

    在CentOS7中配置JDK后,java -version仍显示默认JDK,因为系统默认的/usr/bin/java软链接优先级高于PATH环境变量,这篇文章主要介绍了jdk配置完之后java -version还是默认的jdk版本问题的解决过程,需要的朋友可以参考下
    2026-01-01
  • JAXB解析xml转换成类的实现方式

    JAXB解析xml转换成类的实现方式

    本文主要介绍了如何使用JAXB将XML配置项转换为Java类,JAXB提供了多种注解,如@XmlRootElement、@XmlElement、@XmlElementWrapper、@XmlAttribute等,可以方便地将XML元素映射为Java对象,并且可以控制生成的XML结构,同时,文章也提到了一些需要注意的问题
    2025-11-11
  • 详解java创建一个女朋友类(对象啥的new一个就是)==建造者模式,一键重写

    详解java创建一个女朋友类(对象啥的new一个就是)==建造者模式,一键重写

    这篇文章主要介绍了java建造者模式一键重写,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-04-04
  • springboot 整合 SA-Token 使用详解

    springboot 整合 SA-Token 使用详解

    本文详细介绍了SA-Token这款安全框架的使用,并结合实际操作演示了如何集成到springboot项目中,感兴趣的朋友跟随小编一起看看吧
    2024-08-08
  • java使用POI批量导入excel数据的方法

    java使用POI批量导入excel数据的方法

    这篇文章主要为大家详细介绍了java使用POI批量导入excel数据的方法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-07-07
  • Java实战之酒店人事管理系统的实现

    Java实战之酒店人事管理系统的实现

    这篇文章主要介绍了如何用Java实现酒店人事管理系统,文中采用的技术有:JSP、Spring、SpringMVC、MyBatis等,感兴趣的小伙伴可以学习一下
    2022-03-03
  • SpringBoot集成Session的实现示例

    SpringBoot集成Session的实现示例

    Session是一个在Web开发中常用的概念,它表示服务器和客户端之间的一种状态管理机制,用于跟踪用户在网站或应用程序中的状态和数据,本文主要介绍了SpringBoot集成Session的实现示例,感兴趣的可以了解一下
    2023-09-09
  • java二维数组遍历的2种代码

    java二维数组遍历的2种代码

    这篇文章主要介绍了java二维数组遍历的2种代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • Java @Pointcut注解表达式案例详解

    Java @Pointcut注解表达式案例详解

    这篇文章主要介绍了Java @Pointcut注解表达式案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-09-09
  • 老生常谈 MyBatis 复杂查询

    老生常谈 MyBatis 复杂查询

    这篇文章主要介绍了 MyBatis 复杂查询的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-01-01

最新评论