SpringBoot整合Kafka生产环境标准配置

 更新时间:2026年04月16日 08:26:17   作者:别掉进我的异常  
本文介绍了在SpringBoot 3.x项目中集成Apache Kafka的完整方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

一、环境准备

组件版本说明
Java17+Spring Boot 3.x 需 Java 17+
Apache Kafka3.6.x下载地址:Kafka 官网
Spring Boot3.1.0+通过 Spring Initializr 生成项目
Maven/Gradle最新项目构建工具

✅ 启动 Kafka 服务(本地开发示例):

# 启动 Zookeeper(Kafka 3.0+ 已内置)
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 Kafka 服务
bin/kafka-server-start.sh config/server.properties

二、项目搭建(Maven 依赖)

1. 添加 Spring Kafka 依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.0</version>
</dependency>

💡 关键点:Spring Boot 3.x 已移除 spring-boot-starter-kafka,直接使用 spring-kafka。

三、YAML 配置(application.yml)

✅ 重点:YAML 配置替代 properties,支持对象序列化器配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.example.config.AttendanceSerializer # 自定义序列化器
      batch-size: 16384 # 批量发送优化
      linger-ms: 100 # 批量发送延迟
      retries: 3
      acks: all
    consumer:
      group-id: attendance-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.example.config.AttendanceDeserializer # 自定义反序列化器
      auto-offset-reset: earliest
      enable-auto-commit: true
      max-poll-records: 500
      isolation.level: read_committed
      # 事务配置(可选)
      transaction-id-prefix: attendance-trans

🌟 YAML 配置优势

  • 层级清晰,避免 . 重复
  • 支持嵌套配置(如 producer.batch-size
  • 与 Spring Boot 3.x 完美兼容

四、自定义序列化器(核心代码)

1. 定义消息实体(AttendanceStatisticsSingleDto)

package jnpf.model.attendance.event;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.util.Date;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AttendanceStatisticsSingleDto implements Serializable {
    private static final long serialVersionUID = 1L;
    @NotBlank(message = "租户Id不能为空")
    private String tenantId;
    @NotBlank(message = "考勤组Id不能为空")
    private String groupId;
    @NotBlank(message = "用户Id不能为空")
    private String userId;
    @NotNull(message = "日期不能为空")
    private Date day;
}

2. 创建序列化器(AttendanceSerializer.java)

package com.example.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import jnpf.model.attendance.event.AttendanceStatisticsSingleDto;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class AttendanceSerializer implements Serializer<AttendanceStatisticsSingleDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 配置方法,无需额外操作
    }
    @Override
    public byte[] serialize(String topic, AttendanceStatisticsSingleDto data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("序列化失败: " + data, e);
        }
    }
    @Override
    public void close() {
        // 无需关闭资源
    }
}

3. 创建反序列化器(AttendanceDeserializer.java)

package com.example.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import jnpf.model.attendance.event.AttendanceStatisticsSingleDto;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class AttendanceDeserializer implements Deserializer<AttendanceStatisticsSingleDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 配置方法,无需额外操作
    }
    @Override
    public AttendanceStatisticsSingleDto deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(data, AttendanceStatisticsSingleDto.class);
        } catch (Exception e) {
            throw new RuntimeException("反序列化失败: " + topic, e);
        }
    }
    @Override
    public void close() {
        // 无需关闭资源
    }
}

💡 为什么需要自定义序列化器?
Spring Kafka 默认只支持 String/byte[],复杂对象必须通过序列化器转换为字节数组。

五、完整集成示例

1. 生产者服务(发送 AttendanceStatisticsSingleDto)

package com.example.service;
import com.example.config.AttendanceSerializer;
import jnpf.model.attendance.event.AttendanceStatisticsSingleDto;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class AttendanceProducer {
    private final KafkaTemplate<String, AttendanceStatisticsSingleDto> kafkaTemplate;
    public AttendanceProducer(KafkaTemplate<String, AttendanceStatisticsSingleDto> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    public void sendAttendanceData(AttendanceStatisticsSingleDto attendance) {
        // 发送消息到主题 attendance-topic
        kafkaTemplate.send("attendance-topic", attendance.getUserId(), attendance);
        System.out.println("✅ 消息发送成功: " + attendance.getUserId() + " | " + attendance.getDay());
    }
}

2. 消费者服务(接收 AttendanceStatisticsSingleDto)

package com.example.consumer;
import com.example.config.AttendanceDeserializer;
import jnpf.model.attendance.event.AttendanceStatisticsSingleDto;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Service
public class AttendanceConsumer {
    @KafkaListener(
        topics = "attendance-topic",
        groupId = "attendance-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void listen(AttendanceStatisticsSingleDto attendance, 
                      Acknowledgment ack,
                      Headers headers) {
        // 获取 Kafka 元数据
        int partition = Integer.parseInt(headers.lastHeader("partition").value().toString());
        long offset = headers.lastHeader("offset").value().toString().getBytes()[0];
        System.out.println("✅ 消息消费成功 | 分区: " + partition + 
                          " | 偏移量: " + offset + 
                          " | 用户: " + attendance.getUserId());
        // 手动提交偏移量(可选)
        ack.acknowledge();
    }
}

3. 配置 Kafka 监听器容器工厂(关键!)

package com.example.config;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.ContainerCustomizer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.DeserializationException;
@Configuration
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, AttendanceStatisticsSingleDto> consumerFactory() {
        // 配置消费者工厂
        return new DefaultKafkaConsumerFactory<>(
            Map.of(
                "bootstrap.servers", "localhost:9092",
                "group.id", "attendance-group",
                "key.deserializer", StringDeserializer.class.getName(),
                "value.deserializer", AttendanceDeserializer.class.getName()
            )
        );
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, AttendanceStatisticsSingleDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, AttendanceStatisticsSingleDto> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new FixedBackOff(1000L, 3), // 重试3次,间隔1秒
            new TopicPartitionOffset("attendance-topic.DLQ", 0) // 死信队列
        ));
        // 启用自动提交
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

🌟 关键配置说明

  • setCommonErrorHandler:实现自动重试 + 死信队列
  • setAckMode(ContainerProperties.AckMode.MANUAL):手动提交偏移量(推荐)
  • AttendanceDeserializer:与生产者序列化器严格匹配

六、测试用例(JUnit 5)

package com.example;
import com.example.service.AttendanceProducer;
import jnpf.model.attendance.event.AttendanceStatisticsSingleDto;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
@SpringBootTest
public class KafkaIntegrationTest {
    @Autowired
    private AttendanceProducer producer;
    @Test
    public void testSendAttendanceData() {
        AttendanceStatisticsSingleDto attendance = AttendanceStatisticsSingleDto.builder()
            .tenantId("tenant_001")
            .groupId("group_001")
            .userId("user_001")
            .day(new Date())
            .build();
        producer.sendAttendanceData(attendance);
        // 实际测试需等待消费者处理(此处简化)
        System.out.println("✅ 测试消息已发送");
    }
}

✅ 测试执行流程

  1. 启动 Kafka 服务
  2. 运行 Spring Boot 应用
  3. 执行测试用例
  4. 消费者控制台输出日志

七、最佳实践与配置优化

场景配置说明
批量发送优化spring.kafka.producer.batch-size=16384
spring.kafka.producer.linger-ms=100
提升吞吐量 30%+
消费者线程控制spring.kafka.consumer.max-poll-records=500防止单次拉取过多消息
事务保障spring.kafka.consumer.transaction-id-prefix=attendance-trans确保消息与数据库操作原子性
死信队列error-handler.dlq-topic=attendance-topic.DLQ消费失败消息自动移至 DLQ
监控指标micrometer.kafka.enabled=true集成 Prometheus 监控

八、常见问题排查(YAML 配置特有)

问题现象解决方案
序列化器未生效ClassCastException: String cannot be cast to AttendanceStatisticsSingleDto检查 value-serializer 配置路径是否正确
YAML 缩进错误Invalid configuration严格使用 2 空格缩进(避免 Tab)
死信队列未创建消息未进入 DLQ手动创建主题:kafka-topics.sh --create --topic attendance-topic.DLQ --partitions 1 --replication-factor 1
日期格式异常Invalid format for Date在 AttendanceStatisticsSingleDto 中添加 @JsonFormat 注解
消费者无法启动No available broker检查 bootstrap-servers 是否可访问

九、总结:Spring Boot + Kafka 核心流程

✅ 关键结论:

  1. YAML 配置更清晰:避免 properties 中的 spring.kafka.consumer.group-id 拼写错误
  2. 自定义序列化器是核心:必须与消息实体严格匹配
  3. 死信队列是生产必备:任何消费者都应配置 DLQ
  4. 手动提交偏移量:避免消息丢失(AckMode.MANUAL)

💡 立即实践:

  1. 创建 AttendanceStatisticsSingleDto 实体
  2. 实现 AttendanceSerializer/AttendanceDeserializer
  3. 配置 application.yml 中的序列化器路径
  4. 启动 Kafka + Spring Boot 服务
  5. 发送测试消息 → 查看消费者日志

到此这篇关于SpringBoot整合Kafka生产环境标准配置的文章就介绍到这了,更多相关SpringBoot整合Kafka生产环境配置内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java提取2个集合中的相同和不同元素代码示例

    Java提取2个集合中的相同和不同元素代码示例

    这篇文章主要介绍了Java提取2个集合中的相同和不同元素代码示例,涉及对removeall方法的简单介绍,然后分享了主要的示例代码,具有一定借鉴价值,需要的朋友可以参考下。
    2017-11-11
  • mybatis中方法返回泛型与resultType不一致的解决

    mybatis中方法返回泛型与resultType不一致的解决

    这篇文章主要介绍了mybatis中方法返回泛型与resultType不一致的解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • 在Java中实现线程之间的数据共享的几种方式总结

    在Java中实现线程之间的数据共享的几种方式总结

    在 Java 中实现线程间数据共享是并发编程的核心需求,但需要谨慎处理同步问题以避免竞态条件,本文通过代码示例给大家介绍了几种主要实现方式及其最佳实践,需要的朋友可以参考下
    2025-08-08
  • 手动部署java项目到k8s中的实现

    手动部署java项目到k8s中的实现

    本文主要介绍了手动部署java项目到k8s中的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-08-08
  • 浅析Java中的SPI原理

    浅析Java中的SPI原理

    SPI:由调用方制定接口标准,实现方来针对接口提供不同的实现,SPI其实就是"为接口查找实现"的一种服务发现机制。本文将浅谈一下SPI机制的原理,需要的可以参考一下
    2022-09-09
  • 如何将文件流转换成byte[]数组

    如何将文件流转换成byte[]数组

    这篇文章主要介绍了如何将文件流转换成byte[]数组,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • java交换排序之鸡尾酒排序实现方法

    java交换排序之鸡尾酒排序实现方法

    这篇文章主要介绍了java交换排序之鸡尾酒排序实现方法,实例分析了排序的原理与相关的实现技巧,需要的朋友可以参考下
    2015-02-02
  • Java实现U盘的插入和移除检测程序

    Java实现U盘的插入和移除检测程序

    在现代计算机系统中,U盘的使用非常普遍,那么如何在Java程序中实现对U盘的插入和移除进行检测,并对U盘中的文件进行递归处理呢,下面我们就来简单介绍一下吧
    2025-12-12
  • SpringBoot通过RedisTemplate执行Lua脚本的方法步骤

    SpringBoot通过RedisTemplate执行Lua脚本的方法步骤

    这篇文章主要介绍了SpringBoot通过RedisTemplate执行Lua脚本的方法步骤,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-02-02
  • java String类功能、原理与应用案例【统计、判断、转换等】

    java String类功能、原理与应用案例【统计、判断、转换等】

    这篇文章主要介绍了java String类功能、原理与应用案例,结合实例形式详细分析了java String类的基本功能、构造方法,以及使用String类实现统计、判断、转换等功能相关操作技巧,需要的朋友可以参考下
    2019-03-03

最新评论