SpringBoot3+Kafka实战指南

 更新时间:2025年09月19日 10:18:22   作者:neoooo  
本文主要介绍了SpringBoot3+Kafka实战指南,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1. 项目层级

像火锅店的分工:点单员、传菜员、食客清清楚楚。

kafka/
├── pom.xml                 # 根 POM(BOM对齐)
├── provider/               # 点单:生产者
│   ├── pom.xml             # 子模块 POM
│   └── src/main/java/org/example/provider/
│       ├── ProviderApplication.java
│       ├── conf/KafkaTopicsConfig.java
│       ├── controller/ProviderController.java
│       └── service/KafkaProducerService.java
│   └── src/main/resources/application.yaml
└── consumer/               # 上桌:消费者
    ├── pom.xml             # 子模块 POM
    └── src/main/java/org/example/consumer/
        ├── ConsumerApplication.java
        └── listener/KafkaConsumerListener.java
    └── src/main/resources/application.yaml

2. 根 POM(大厨的调料表)

<modules>
    <module>provider</module>
    <module>consumer</module>
</modules>


<properties>
    <java.version>17</java.version>
    <spring.boot.version>3.4.3</spring.boot.version>
    <spring.cloud.version>2024.0.2</spring.cloud.version>
</properties>

<!-- 关键:用 BOM 管理依赖版本(不用 parent 也行) -->
<dependencyManagement>
    <dependencies>
        <!-- Spring Boot 依赖版本对齐(含 starter、lombok 等) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

        <!-- Spring Cloud 依赖版本对齐 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>


<dependencies>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    
 <!-- hutool工具类 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-ai</artifactId>
        <version>5.8.38</version>
    </dependency>

</dependencies>

👉 全局版本对齐,避免“锅底和食材不搭”。

3. 子模块 POM

3.1 provider/pom.xml

<parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>org.example</groupId>
<artifactId>provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>provider</name>
<description>provider</description>
<packaging>jar</packaging>

<properties>
    <java.version>17</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3.2 consumer/pom.xml

<parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>org.example</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>consumer</name>
<description>consumer</description>
<packaging>jar</packaging>

<properties>
    <java.version>17</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.58</version>
    </dependency>
</dependencies>

4. 配置(菜单写清楚)

4.1 Provider(application.yaml - 生产者)

server:
  port: 1003  # 本模块 HTTP 端口

app:
  kafka:
    topic: demo.topic.v1         # 要发送/创建的主题名
    auto-create-topic: true      # 开启后,会注册 NewTopic bean 从而在启动时创建主题(见 KafkaTopicsConfig)

spring:
  kafka:
    bootstrap-servers: yiqiquhuxi.cn:9092
    # 数据网络IO 序列化方式
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer  # 发送字符串
      # 可靠性
      acks: all
      retries: 3

4.2 Consumer(application.yaml - 消费者)

server:
  port: 1004  # 本模块端口(通常只看日志)

app:
  kafka:
    topic: demo.topic.v1  # 要订阅的主题名(与 provider 保持一致)


spring:
  kafka:
    bootstrap-servers: yiqiquhuxi.cn:9092
    consumer:
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 接收字符串

# 说明:
# - JsonDeserializer 的默认类型键为 spring.json.value.default.type(源码常量 VALUE_DEFAULT_TYPE)。:contentReference[oaicite:7]{index=7}
# - @KafkaListener 支持使用 ${...} 占位符读取上述配置。:contentReference[oaicite:8]{index=8}

5. 核心代码(厨师上阵)

5.1 入口

@SpringBootApplication
public class ProviderApplication {
  public static void main(String[] args) { SpringApplication.run(ProviderApplication.class, args); }
}
@SpringBootApplication
public class ConsumerApplication {
  public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); }
}

5.2 消息模型

public record MessagePayload(String id, String content, long ts) {}

5.3 Provider(点菜 + 上菜)

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafka;

    @Value("${app.kafka.topic}")
    private String topic;


    public void send(String content) {
        MessagePayload payload = new MessagePayload(
                UUID.randomUUID().toString(),
                content,
                System.currentTimeMillis()
        );
        //序列化
        String jsonStr = JSONUtil.toJsonStr(payload);
        kafka.send(topic, jsonStr);
    }
}

@RestController
@RequestMapping("/provider")
public class ProviderController {
  @Autowired private KafkaProducerService producer;
  @GetMapping("/done") public String done() { producer.send("done"); return "done"; }
}

5.4 Provider(创建 Topic)

@Configuration
public class KafkaTopicsConfig {

    @Value("${app.kafka.topic}")
    private String topic;

    // 只有当 app.kafka.auto-create-topic=true(或缺省并 matchIfMissing=true)才注册 NewTopic
    @Bean
    @ConditionalOnProperty(name = "app.kafka.auto-create-topic", havingValue = "true", matchIfMissing = true)
    public NewTopic demoTopic() {
        // 分区/副本按你的集群实际调整;单 Broker 可用 (3,1)
        return new NewTopic(topic, 3, (short) 1);
    }
}

👉 有了它,就不用手动 kafka-topics.sh --create,Spring Boot 启动时就能帮你“先起锅烧水”。

5.5 Consumer(开吃)

@Slf4j
@Component
public class KafkaConsumerListener {


    @KafkaListener(
            topics = "${app.kafka.topic}",
            groupId = "${spring.kafka.consumer.group-id}"
    )
    public void onMessage(String msg) {
        try {
            // json反序列化成对象
            MessagePayload payload = JSON.parseObject(msg, MessagePayload.class);
            log.info("✅ received: id={}, content={}, ts={}",
                    payload.id(), payload.content(), payload.ts());
        } catch (Exception e) {
            log.error("❌ JSON解析失败,原始消息: {}", msg, e);
        }
    }
}

6. 运行流程

  1. 点火:Kafka Broker 先启动
  2. 开店:先跑 consumer,再跑 provider
  3. 点单GET http://localhost:1003/provider/done
  4. 吃菜:consumer 日志里出现 🍜 → 成功!

7. 常见坑

  • 锅点不着bootstrap-servers 不通,先查网络
  • 没菜:topic 不存在?开 auto-create-topic
  • 吃不到:改 group-id 或加 auto-offset-reset=earliest
  • 串味了:序列化不匹配 Producer 默认用 StringSerializer,Consumer 却用 JsonDeserializer,两边火候不对,消息就“夹生”了。
    • 建议:
      • 如果传字符串,就都用 StringSerializer / StringDeserializer
      • 如果传对象,就统一用 JsonSerializer / JsonDeserializer,并在 application.yaml 里显式声明 spring.json.value.default.type

8. 总结

Spring Boot + Kafka 的套路:
👉 Provider 点单,Kafka 传菜,Consumer 开吃。

到此这篇关于SpringBoot3+Kafka实战指南的文章就介绍到这了,更多相关SpringBoot3 Kafka实战内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:

相关文章

  • 详解SpringBoot项目整合Vue做一个完整的用户注册功能

    详解SpringBoot项目整合Vue做一个完整的用户注册功能

    本文主要介绍了SpringBoot项目整合Vue做一个完整的用户注册功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • SSH框架网上商城项目第14战之商城首页UI的设计

    SSH框架网上商城项目第14战之商城首页UI的设计

    这篇文章主要为大家详细介绍了SSH框架网上商城项目第14战之商城首页UI的设计,感兴趣的小伙伴们可以参考一下
    2016-06-06
  • MyBatis-Plus QueryWrapper及LambdaQueryWrapper的使用详解

    MyBatis-Plus QueryWrapper及LambdaQueryWrapper的使用详解

    这篇文章主要介绍了MyBatis-Plus QueryWrapper及LambdaQueryWrapper的使用详解,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-03-03
  • java微信企业号开发之开发模式的开启

    java微信企业号开发之开发模式的开启

    这篇文章主要为大家详细介绍了java微信企业号开发之开发模式的开启方法,感兴趣的小伙伴们可以参考一下
    2016-06-06
  • Spring AI + ollama 本地搭建聊天 AI 功能

    Spring AI + ollama 本地搭建聊天 AI 功能

    这篇文章主要介绍了Spring AI + ollama 本地搭建聊天 AI ,本文通过实例图文相结合给大家讲解的非常详细,需要的朋友可以参考下
    2024-11-11
  • mybatis-plus中BaseMapper入门使用

    mybatis-plus中BaseMapper入门使用

    本文主要介绍了mybatis-plus中BaseMapper入门使用,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08
  • Spring Boot2配置Swagger2生成API接口文档详情

    Spring Boot2配置Swagger2生成API接口文档详情

    这篇文章主要介绍了Spring Boot2配置Swagger2生成API接口文档详情,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-09-09
  • Springboot项目如何获取所有的接口

    Springboot项目如何获取所有的接口

    这篇文章主要介绍了Springboot项目如何获取所有的接口,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • SpringBoot新手入门的快速教程

    SpringBoot新手入门的快速教程

    这篇文章主要给大家介绍了关于SpringBoot新手入门的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用SpringBoot具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-12-12
  • Java流程控制语句之If选择结构

    Java流程控制语句之If选择结构

    今天继续带大家复习Java流程控制语句的相关知识,本文对If选择结构作了非常详细的介绍及代码示例,对正在学习的小伙伴们很有帮助,需要的朋友可以参考下
    2021-06-06

最新评论