Spring Boot集成Apache Kafka的实战指南

 更新时间:2025年06月23日 15:33:55   作者:超级小忍  
Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道、日志聚合系统和事件溯源架构,下面我们来看看如何在 Spring Boot 项目中集成 Kafka吧

Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道、日志聚合系统和事件溯源架构。Spring Boot 提供了对 Kafka 的良好集成支持,使得开发者可以非常便捷地在项目中使用 Kafka。

本文将手把手教你如何在 Spring Boot 项目中集成 Kafka,包括生产者(Producer)和消费者(Consumer)的实现,并提供完整的代码示例。

开发环境准备

Java 17+

Maven 或 Gradle

Spring Boot 3.x

Apache Kafka 3.0+(本地或远程)

IDE(如 IntelliJ IDEA、VS Code)

创建 Spring Boot 项目

你可以通过 Spring Initializr 创建一个新的 Spring Boot 项目,选择以下依赖:

  • Spring Web
  • Spring for Apache Kafka

或者手动添加 pom.xml 中的依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Spring Boot 会自动管理版本兼容性,无需手动指定版本号。

配置 Kafka 连接信息

在 application.yml 或 application.properties 文件中配置 Kafka 相关参数:

application.yml 示例:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

编写 Kafka 生产者(Producer)

创建一个服务类用于发送消息到 Kafka 主题。

KafkaProducer.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
        System.out.println("Sent message: " + message);
    }
}

编写 Kafka 消费者(Consumer)

使用 @KafkaListener 注解监听特定主题的消息。

KafkaConsumer.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.printf("Received message: topic - %s, partition - %d, offset - %d, key - %s, value - %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}

添加 REST 接口用于测试发送消息

为了方便测试,我们可以创建一个简单的 REST 控制器来触发消息发送。

KafkaController.java

import org.springframework.web.bind.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @PostMapping("/send")
    public String sendMessage(@RequestParam String msg) {
        kafkaProducer.sendMessage("test-topic", msg);
        return "Message sent: " + msg;
    }
}

启动 Kafka 环境(可选)

如果你还没有运行 Kafka,可以按照以下步骤快速启动:

启动 Zookeeper(Kafka 依赖)

bin/zookeeper-server-start.sh config/zookeeper.properties

启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

创建测试 Topic

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

测试接口

启动 Spring Boot 应用后,访问如下接口发送消息:

POST http://localhost:8080/kafka/send?msg=HelloKafka

观察控制台输出,确认是否收到类似以下内容:

Received message: topic - test-topic, partition - 0, offset - 5, key - null, value - HelloKafka

扩展功能建议

使用 JSON 格式传输对象(自定义序列化/反序列化)

多消费者组配置与负载均衡

异常处理与重试机制(@DltHandler, SeekToCurrentErrorHandler)

Kafka Streams 实现实时流处理逻辑

配置 SSL、SASL 安全认证

结合 Spring Cloud Stream 构建云原生事件驱动架构

到此这篇关于Spring Boot集成Apache Kafka的实战指南的文章就介绍到这了,更多相关SpringBoot集成Apache Kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • FreeMarker配置(Configuration)

    FreeMarker配置(Configuration)

    所有与该configuration 对象关联的模版实例都就可以通过获得to_upper 转换器,company 来获得字符串,因此你不需要再一次次的往root 中添加这些变量了。如果你往root 添加同名的变量,那么你新添加的变量将会覆盖之前的共享变量。
    2016-04-04
  • Java的JSTL标签库详解

    Java的JSTL标签库详解

    JSTL包含用于编写和开发JSP页面的一组标准标签,它可以为用户提供一个无脚本环境。在此环境中,用户可以使用标签编写代码,而无须使用Java脚本
    2023-05-05
  • Java接口RandomAccess全面了解

    Java接口RandomAccess全面了解

    下面小编就为大家带来一篇Java接口RandomAccess全面了解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-09-09
  • Spring Boot 控制层之参数传递方法详解

    Spring Boot 控制层之参数传递方法详解

    这篇文章主要介绍了Spring Boot 控制层之参数传递方法详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-09-09
  • mybatis批量更新与插入方式

    mybatis批量更新与插入方式

    这篇文章主要介绍了mybatis批量更新与插入方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • 一次由Lombok的@AllArgsConstructor注解引发的错误及解决

    一次由Lombok的@AllArgsConstructor注解引发的错误及解决

    这篇文章主要介绍了一次由Lombok的@AllArgsConstructor注解引发的错误及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • SpringBoot常见问题小结

    SpringBoot常见问题小结

    这篇文章主要介绍了SpringBoot常见问题小结,需要的朋友可以参考下
    2017-07-07
  • 利用exe4j生成java的exe文件

    利用exe4j生成java的exe文件

    本文主要介绍了利用exe4j生成java的exe文件,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-09-09
  • 将Mybatis升级为Mybatis-Plus的详细过程

    将Mybatis升级为Mybatis-Plus的详细过程

    本文详细介绍了在若依管理系统(v3.8.8)中将MyBatis升级为MyBatis-Plus的过程,旨在提升开发效率,通过本文,开发者可实现系统功能无损升级,同时享受MyBatis-Plus带来的便捷特性,如代码简化和性能优化,需要的朋友可以参考下
    2025-04-04
  • Java常用对象操作工具代码实例

    Java常用对象操作工具代码实例

    这篇文章主要介绍了Java常用对象操作工具代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-12-12

最新评论