Springboot项目消费Kafka数据的方法

 更新时间:2025年01月14日 14:22:31   作者:布朗克168  
本文详细介绍了如何在Spring Boot项目中配置和实现Kafka消费者和生产者,结合实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧

一、引入依赖

你需要在 pom.xml 中添加 spring-kafka 相关依赖:

<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot Starter for Logging (optional but useful for debugging) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- Spring Boot Starter for Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

二、添加Kafka配置

在 application.yml 或 application.properties 文件中配置 Kafka 连接属性:

application.yml 示例:

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka服务器地址
    consumer:
      group-id: my-consumer-group   # 消费者组ID
      auto-offset-reset: earliest   # 消费者从头开始读取(如果没有已提交的偏移量)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串
    listener:
      missing-topics-fatal: false    # 如果主题不存在,不抛出致命错误

application.properties 示例:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 设置value的反序列化器为字符串

注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
以上配置说明Kafka生产的数据是json字符串,那么消费接收的数据默认也是json字符串,如果接收消息想用对象接受,需要自定义序列化器,比如以下配置

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 对 Key 使用 StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer  # 对 Value 使用 ErrorHandlingSerializer
      properties:
        spring.json.value.default.type: com.example.Order  # 默认的 JSON 反序列化目标类型为 Order

三、创建 Kafka 消费者

创建一个 Kafka 消费者类来处理消息。你可以使用 @KafkaListener 注解来监听 Kafka 中的消息

(一)Kafka生产的消息是JSON 字符串

1、方式一

如果消息是 JSON 字符串,你可以使用 StringDeserializer 获取消息后,再使用 ObjectMapper 将其转换为
Java 对象(如 Order)。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
@Service
@EnableKafka  // 启用 Kafka 消费者
public class KafkaConsumer {
    private final ObjectMapper objectMapper = new ObjectMapper();
    // 监听 Kafka 中的 order-topic 主题
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(String message) {
        try {
            // 将 JSON 字符串反序列化为 Order 对象
            Order order = objectMapper.readValue(message, Order.class);
            System.out.println("Received order: " + order);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

说明:

@KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
topics 表示监听的 Kafka 主题,groupId 表示消费者所属的消费者组。
listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中,我们打印出消息内容。

2、方式二:需要直接访问消息元数据

可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据(如
topic、partition、offset)的场景,也适合手动管理消息消费和偏移量提交的情况。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
    // 监听 Kafka 中的 order-topic 主题
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, String> record) {
        // 获取消息的详细信息
        String key = record.key();           // 获取消息的 key
        String value = record.value();       // 获取消息的 value
        String topic = record.topic();       // 获取消息的 topic
        int partition = record.partition(); // 获取消息的分区
        long offset = record.offset();      // 获取消息的偏移量
        long timestamp = record.timestamp(); // 获取消息的时间戳
        // 处理消息(这里我们只是打印消息)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}

(二)Kafka生产的消息是对象Order

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
    // 监听 Kafka 中的 order-topic 主题
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, Order> record) {
        // 获取消息的详细信息
        String key = record.key();           // 获取消息的 key
        Order value = record.value();       // 获取消息的 value
        String topic = record.topic();       // 获取消息的 topic
        int partition = record.partition(); // 获取消息的分区
        long offset = record.offset();      // 获取消息的偏移量
        long timestamp = record.timestamp(); // 获取消息的时间戳
        // 处理消息(这里我们只是打印消息)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}

四、创建 启动类

确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}

五、配置 Kafka 生产者(可选)

(一)消息类型为json串

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
@EnableKafka
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;  // 发送的是 String 类型消息
    private ObjectMapper objectMapper = new ObjectMapper();  // Jackson ObjectMapper 用于序列化
    // 发送订单到 Kafka
    public void sendOrder(String topic, Order order) {
        try {
            // 将 Order 对象转换为 JSON 字符串
            String orderJson = objectMapper.writeValueAsString(order);
            // 发送 JSON 字符串到 Kafka
            kafkaTemplate.send(topic, orderJson);  // 发送字符串消息
            System.out.println("Order JSON sent to Kafka: " + orderJson);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(二)消息类型为对象Order

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
@Service
@EnableKafka
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    // 发送订单到 Kafka
    public void sendOrder(String topic, Order order) {
        kafkaTemplate.send(topic, order);  // 发送订单对象,Spring Kafka 会自动将 Order 转换为 JSON
    }
}

六、启动 Kafka 服务

启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

七、测试 Kafka 消费者

你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题,可以使用 KafkaProducer 来发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
    @Autowired
    private KafkaProducer kafkaProducer;
    @GetMapping("/sendOrder")
    public String sendOrder() {
        Order order = new Order();
        order.setOrderId(1L);
        order.setUserId(123L);
        order.setProduct("Laptop");
        order.setQuantity(2);
        order.setStatus("Created");
        kafkaProducer.sendOrder("order-topic", order);
        return "Order sent!";
    }
}

当你访问 /sendOrder端点时,KafkaProducer 会将消息发送到 Kafka,KafkaConsumer 会接收到这条消息并打印出来。

九、测试和调试

你可以通过查看 Kafka 消费者日志,确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息,并确保 Kafka 生产者和消费者之间的连接正常。

十、 结语

至此,你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能,例如处理更复杂的消息类型、批量消费等。

到此这篇关于Springboot项目如何消费Kafka数据的文章就介绍到这了,更多相关Springboot消费Kafka数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 如何计算Java对象占用了多少空间?

    如何计算Java对象占用了多少空间?

    在Java中没有sizeof运算符,所以没办法知道一个对象到底占用了多大的空间,但是在分配对象的时候会有一些基本的规则,我们根据这些规则大致能判断出来对象大小,需要的朋友可以参考下
    2016-01-01
  • Java编程细节重构之为什么if-else不是好代码详析

    Java编程细节重构之为什么if-else不是好代码详析

    这篇文章主要给大家介绍了关于Java编程细节重构之为什么if-else不是好代码的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学些学习吧
    2018-09-09
  • SpringBoot中加密模块的使用

    SpringBoot中加密模块的使用

    本文主要介绍了SpringBoot中加密模块的使用,包括对称加密、非对称加密和哈希加密等,同时还会提供相应的代码示例,感兴趣的朋友可以参考一下
    2023-05-05
  • Spring Boot 内置工具类ReflectionUtils的实现

    Spring Boot 内置工具类ReflectionUtils的实现

    ReflectionUtils是一个反射工具类,它封装了Java反射的操作,使得我们能够更轻松地操作和访问类的方法、字段,本文主要介绍了Spring Boot 内置工具类ReflectionUtils的实现,感兴趣的可以了解一下
    2023-11-11
  • Java使用Cookie实现认证跳转功能

    Java使用Cookie实现认证跳转功能

    在 Web 开发中,用户身份认证是一个基础而关键的功能点,本文将通过一个简单的前后端示例系统,介绍如何基于 Cookie 实现 Token 保存与自动跳转认证的功能,并结合 Cookie 与 Header 的区别、使用场景、安全性等维度做全面分析,需要的朋友可以参考下
    2025-06-06
  • Java实现操作excel表格

    Java实现操作excel表格

    在日常工作中,对Excel工作表格的操作处理可是多的数不清楚,下面是java语言对其的操作,有需要的小伙伴可以参考下
    2015-10-10
  • mybatis的坑-integer类型为0的数据if test失效问题

    mybatis的坑-integer类型为0的数据if test失效问题

    这篇文章主要介绍了mybatis的坑-integer类型为0的数据if test失效问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • 使用SpringBoot-JPA进行自定义保存及批量保存功能

    使用SpringBoot-JPA进行自定义保存及批量保存功能

    这篇文章主要介绍了使用SpringBoot-JPA进行自定义的保存及批量保存功能,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-06-06
  • SpringBoot应用快速部署到K8S的详细教程

    SpringBoot应用快速部署到K8S的详细教程

    这篇文章主要介绍了SpringBoot应用快速部署到K8S的详细教程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • Java中的ThreadLocal线程变量详解

    Java中的ThreadLocal线程变量详解

    这篇文章主要介绍了Java中的ThreadLocal线程变量详解,ThreadLocal叫做线程变量,意思是在ThreadLocal中填充的变量属于当前线程,该变量对其他线程而言是隔离的,它是用来提供线程内部的局部变量,需要的朋友可以参考下
    2024-01-01

最新评论