Kafka 在 java 中的基本使用步骤

 更新时间:2025年09月05日 10:55:38   作者:张小虎在学习  
本文给大家介绍Kafka在java中的基本使用,本文分步骤结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧

1. 使用 kafka 原生客户端

现在基本都直接使用 springboot 版本,但了解原生客户端,能更好的理解 springboot 版的 kafka 客户端原理。

步骤1:pom 引入核心依赖:

引入依赖时,尽量选择和 kafka 版本对应的依赖版本。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>4.0.0</version>
</dependency>

步骤2:提供者客户端代码:

提供者客户端要做三件事:

  1. 设置提供者客户端属性(可选属性都被定义在 ProducerConfig 类中)
  2. 设置要发送的消息
  3. 发送(有三种发送方式,下面代码中都有)
public class MyProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 第一步:设置提供者属性
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 第二步:设置要发送的消息
            ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "testKey", "testValue");
            // 第三部:发送消息
            // send(producer, record);
            // sendSync(producer, record);
            sendASync(producer, record);
        }
    }
    /**
     * 发送方式1:单向推送,不关心服务器的应答
     */
    private static void send(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record);
    }
    /**
     * 发送方式2:同步推送,得到服务器的应答前会阻塞当前线程
     */
    private static void sendSync(Producer<String, String> producer, ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println(metadata.topic());
        System.out.println(metadata.partition());
        System.out.println(metadata.offset());
    }
    /**
     * 发送方式3:异步推送,不需等待服务器应答,当服务器有应答后会触发函数回调
     */
    private static void sendASync(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                throw new RuntimeException("向 kafka 推送失败", exception);
            }
            System.out.println(metadata.topic());
            System.out.println(metadata.partition());
            System.out.println(metadata.offset());
        });
    }
}

步骤3:消费者客户端代码:

消费者客户端要做三件事:

  1. 设置消费者客户端属性(可选属性都被定义在 ConsumerConfig 类中)
  2. 设置消费者订阅的主题
  3. 拉取消息
  4. 提交 offset(有两种提交方式,下面代码中都有)
public class MyConsumer {
    public static void main(String[] args) {
        // 第一步:设置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 第二步:设置要订阅的主题
            consumer.subscribe(Collections.singletonList("testTopic"));
            while (true) {
                // 第三步:拉取消息,100 代表最大等待时间,如果时间到了还没有拉取到消息就不阻塞了继续往后执行
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
                // 第四步:提交 offset
                // consumer.commitSync(); // 同步提交,表示必须等到 offset 提交完毕,再去消费下⼀批数据
                consumer.commitSync(); // 异步提交,表示发送完提交 offset 请求后,就开始消费下⼀批数据了。不⽤等到Broker的确认。
            }
        }
    }
}

2. Kafka 集成 springboot

springboot 版本是最常用的,比原生客户端使用方便。但是道理是一样的,底层也是原生客户端。

pom 引入核心依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.0</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

yaml 配置文件:

这一步无非就是把原生客户端中的属性配置,写在 yaml 中

spring:
  kafka:
    bootstrap-servers: 192.168.2.28:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: testGroup
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

提供者客户端代码:

只需要两步:

  1. 注入 KafkaTemplate
  2. 发送
@RestController
public class ProducerController {
    /**
     * kafka
     */
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired
    public void setKafkaTemplate(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    @GetMapping("/test")
    public void send() {
        // 发送 kafka 消息
        kafkaTemplate.send("testTopic", "testKey", "testValue");
    }
}

消费者客户端代码:

只需要监听主题就可以

@RestController
public class ConsumerController {
    // 监听 kafka 消息
    @KafkaListener(topics = {"testTopic"})
    public void test(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}

到此这篇关于Kafka 在 java 中的基本使用的文章就介绍到这了,更多相关Kafka java使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java实现选择排序算法的实例教程

    Java实现选择排序算法的实例教程

    这篇文章主要介绍了Java实现选择排序算法的实例教程,选择排序的时间复杂度为О(n&sup2;),需要的朋友可以参考下
    2016-05-05
  • SpringBoot和前端Vue的跨域问题及解决方案

    SpringBoot和前端Vue的跨域问题及解决方案

    所谓跨域就是从 A 向 B 发请求,如若他们的地址协议、域名、端口都不相同,直接访问就会造成跨域问题,跨域是非常常见的现象,这篇文章主要介绍了解决SpringBoot和前端Vue的跨域问题,需要的朋友可以参考下
    2023-11-11
  • Java中Controller引起的Ambiguous mapping问题及解决

    Java中Controller引起的Ambiguous mapping问题及解决

    这篇文章主要介绍了Java中Controller引起的Ambiguous mapping问题及解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-10-10
  • Java通俗易懂系列设计模式之责任链模式

    Java通俗易懂系列设计模式之责任链模式

    这篇文章主要介绍了Java通俗易懂系列设计模式之责任链模式,对设计模式感兴趣的同学,一定要看一下
    2021-04-04
  • Spring Boot多模块化后,服务间调用的坑及解决

    Spring Boot多模块化后,服务间调用的坑及解决

    这篇文章主要介绍了Spring Boot多模块化后,服务间调用的坑及解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • springboot整合druid的完整流程记录

    springboot整合druid的完整流程记录

    Java程序很大一部分要操作数据库,为了提高性能操作数据库的时候,又不得不使用数据库连接池,Druid是阿里巴巴开源平台上一个数据库连接池实现,结合了DBCP等DB池的优点,同时加入了日志监控,这篇文章主要介绍了springboot整合druid的相关资料,需要的朋友可以参考下
    2026-02-02
  • Java实现简单的模板渲染

    Java实现简单的模板渲染

    这篇文章主要为大家详细介绍了Java实现简单的模板渲染的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-12-12
  • 一文搞懂Java并发AQS的共享锁模式

    一文搞懂Java并发AQS的共享锁模式

    这篇文章主要为大家阐述AQS另外一个重要模式,共享锁模式。共享锁可以由多个线程同时获取, 比较典型的就是读锁,感兴趣的小伙伴可以了解一下
    2022-10-10
  • 多线程_解决Runnable接口无start()方法的情况

    多线程_解决Runnable接口无start()方法的情况

    这篇文章主要介绍了多线程_解决Runnable接口无start()方法的情况,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-03-03
  • SpringCloud Alibaba Sentinel 从入门到精通

    SpringCloud Alibaba Sentinel 从入门到精通

    本文介绍了Sentinel的核心概念、使用方式及规则配置,Sentinel是阿里巴巴开源的流量控制框架,主要应用于流量控制、熔断降级等场景,文中有详细的规则配置示例,并介绍了如何与Feign整合,感兴趣的朋友一起看看吧
    2026-04-04

最新评论