springboot使用kafka的过程

 更新时间:2025年06月18日 09:17:31   作者:小鱼小鱼.oO  
本文介绍了Spring Boot集成Kafka的步骤,包括启动服务、配置生产者与消费者,以及Kafka从依赖Zookeeper到Kraft模式的版本演进,本文结合实例代码给大家介绍的非常详细,需要的朋友参考下吧

启动kafka

确保本地已安装并启动 Kafka 服务(或连接远程 Kafka 集群 ),比如通过 Kafka 官网下载解压后,启动 Zookeeper(老版本 Kafka 依赖,新版本用 KRaft 可不依赖 )和 Kafka 服务:

# 启动 Zookeeper(若用 KRaft 模式可跳过)

bin/zookeeper-server-start.sh config/zookeeper.properties

# 启动 Kafka 服务

bin/kafka-server-start.sh config/server.properties

版本:

Kafka 从2.8.0版本开始引入了 KIP-500,提供了无 Zookeeper 的早期访问功能1。不过,此时的实现并不完全,不建议在生产环境中使用。

3.0版本开始真正全面摒弃 Zookeeper,使用新的元数据管理方式 Kraft,提高了 Kafka 的可扩展性、可用性和性能4。

4.0版本是第一个完全无需 Apache Zookeeper 运行的重大版本,将不再支持以 ZK 模式运行或从 ZK 模式迁移。

项目引依赖

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.0</version> <!-- 版本按需选,建议用较新稳定版 -->
    </dependency>
</dependencies>

创建 Producer 类(编写生产者代码)

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerDemo {
    public static void main(String[] args) {
        // 1. 配置 Kafka 连接、序列化等参数
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器
        // 2. 创建 Producer 实例
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 3. 构造消息(指定主题、键、值)
        String topic = "test_topic"; // 要发送到的主题,需提前在 Kafka 创建或允许自动创建
        String key = "key1";
        String value = "Hello, Kafka from IDEA!";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        // 4. 发送消息(异步发送 + 回调处理结果)
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("消息发送失败:" + exception.getMessage());
                } else {
                    System.out.printf("消息发送成功!主题:%s,分区:%d,偏移量:%d%n", 
                        metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        });
        // 5. 关闭 Producer(实际生产环境可能在程序结束时或合适时机关闭)
        producer.close();
    }
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
    private final static String TOPIC = "mytopic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                producer.send(new ProducerRecord<>(TOPIC, message));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

创建 Consumer 类(编写消费者代码)

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        // 1. 配置 Kafka 连接、反序列化、消费者组等参数
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址
        props.put("group.id", "test_group"); // 消费者组 ID,同一组内消费者协调消费
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 键的反序列化器
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器
        props.put("auto.offset.reset", "earliest"); // 没有已提交偏移量时,从最早消息开始消费
        // 2. 创建 Consumer 实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3. 订阅主题
        String topic = "test_topic";
        consumer.subscribe(Collections.singletonList(topic));
        // 4. 循环拉取消息(长轮询)
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("收到消息:主题=%s,分区=%d,偏移量=%d,键=%s,值=%s%n", 
                        record.topic(), record.partition(), record.offset(), 
                        record.key(), record.value());
                }
                // 手动提交偏移量(也可配置自动提交,生产环境建议手动更可靠)
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 5. 关闭 Consumer
            consumer.close();
        }
    }
}
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
    private final static String TOPIC = "mytopic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "mygroup";
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", GROUP_ID);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                // 处理接收到的消息
                records.forEach(record -> {
                    System.out.println("Received message: " + record.value());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

必须要素:

  • 必要配置
    • bootstrap.servers:Kafka 集群地址。
    • group.id:消费者组 ID(相同组内的消费者会负载均衡消费)。
    • key.deserializer 和 value.deserializer:消息键和值的反序列化器。
    • auto.offset.reset:消费位置重置策略(如 earliest 从最早消息开始消费)。
  • 订阅主题:通过 consumer.subscribe() 订阅目标主题。
  • 消息消费:通过 consumer.poll() 轮询拉取消息,并处理 ConsumerRecords
  • 偏移量管理:自动提交(enable.auto.commit=true)或手动提交(consumer.commitSync())消费偏移量。
  • 资源管理:使用后调用 consumer.close() 关闭连接。

与 Kafka 的对比

Kafka的Producer和Consumer需要手动管理连接和资源的关闭,因此在使用完毕后需要调用close方法来关闭Producer(或Consumer)。

总结来说,可以使用KafkaProducer的send方法来替代RabbitTemplate的convertAndSend方法在Kafka中发送消息。

Spring AMQP 是 Spring 框架提供的一个用于简化 AMQP(Advanced Message Queuing Protocol) 消息中间件开发的模块。它基于 AMQP 协议,提供了一套高层抽象和模板类,帮助开发者更便捷地实现消息发送和接收,支持多种 AMQP 消息中间件(如 RabbitMQ、Apache Qpid 等)。

维度Spring AMQP(RabbitMQ)Spring Kafka
协议AMQP(高级消息队列协议)Kafka 自研协议
消息模型支持多种交换器类型(Direct、Topic 等)基于主题(Topic)和分区(Partition)
顺序性单队列内保证顺序分区内保证顺序,多分区需按 Key 路由
吞吐量中等(万级 TPS)高(十万级 TPS)
适用场景企业集成、任务调度、事务性消息大数据、日志收集、实时流处理

到此这篇关于springboot使用kafka的文章就介绍到这了,更多相关springboot使用kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Druid关闭监控页面关闭不了的问题及解决

    Druid关闭监控页面关闭不了的问题及解决

    这篇文章主要介绍了Druid关闭监控页面关闭不了的问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-05-05
  • springboot+thymeleaf打包成jar后找不到静态资源的坑及解决

    springboot+thymeleaf打包成jar后找不到静态资源的坑及解决

    这篇文章主要介绍了springboot+thymeleaf打包成jar后找不到静态资源的坑及解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • java String类常量池分析及

    java String类常量池分析及"equals"和"==”区别详细介绍

    这篇文章主要介绍了java String类常量池分析及"equals"和"==”区别详细介绍的相关资料,需要的朋友可以参考下
    2016-12-12
  • Mybatis批量操作sql写法示例(批量新增、更新)

    Mybatis批量操作sql写法示例(批量新增、更新)

    Mybatis技术,现在是工作中使用频率越来越高,我们在对数据库进行操作的时候,经常会遇到批量操作的需求,这篇文章主要给大家介绍了关于Mybatis批量操作sql写法的相关资料,需要的朋友可以参考下
    2021-05-05
  • 深入探讨Druid动态数据源的实现方式

    深入探讨Druid动态数据源的实现方式

    Druid是一个高性能的实时分析数据库,它可以处理大规模数据集的快速查询和聚合操作,在Druid中,动态数据源是一种可以在运行时动态添加和删除的数据源,使用动态数据源,您可以在Druid中轻松地处理不断变化的数据集,本文讲给大家介绍一下Druid动态数据源该如何实现
    2023-08-08
  • Java中如何使用Gson将对象转换为JSON字符串

    Java中如何使用Gson将对象转换为JSON字符串

    这篇文章主要给大家介绍了关于Java中如何使用Gson将对象转换为JSON字符串的相关资料,Gson是Google的一个开源项目,可以将Java对象转换成JSON,也可能将JSON转换成Java对象,需要的朋友可以参考下
    2023-11-11
  • Spring如何消除代码中的if-else/switch-case

    Spring如何消除代码中的if-else/switch-case

    这篇文章主要给大家介绍了关于Spring如何消除代码中if-else/switch-case的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Spring具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-04-04
  • java简单选择排序实例

    java简单选择排序实例

    这篇文章主要为大家详细介绍了java简单选择排序实例,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-08-08
  • 为spring get请求添加自定义的参数处理操作(如下划线转驼峰)

    为spring get请求添加自定义的参数处理操作(如下划线转驼峰)

    这篇文章主要介绍了为spring get请求添加自定义的参数处理操作(如下划线转驼峰),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • springboot+thymeleaf找不到视图的解决方案

    springboot+thymeleaf找不到视图的解决方案

    这篇文章主要介绍了springboot+thymeleaf找不到视图的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06

最新评论