Kafka 在 java 中的基本使用步骤

 更新时间:2025年09月05日 10:55:38   作者:张小虎在学习  
本文给大家介绍Kafka在java中的基本使用,本文分步骤结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧

1. 使用 kafka 原生客户端

现在基本都直接使用 springboot 版本,但了解原生客户端,能更好的理解 springboot 版的 kafka 客户端原理。

步骤1:pom 引入核心依赖:

引入依赖时,尽量选择和 kafka 版本对应的依赖版本。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>4.0.0</version>
</dependency>

步骤2:提供者客户端代码:

提供者客户端要做三件事:

  1. 设置提供者客户端属性(可选属性都被定义在 ProducerConfig 类中)
  2. 设置要发送的消息
  3. 发送(有三种发送方式,下面代码中都有)
public class MyProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 第一步:设置提供者属性
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 第二步:设置要发送的消息
            ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "testKey", "testValue");
            // 第三部:发送消息
            // send(producer, record);
            // sendSync(producer, record);
            sendASync(producer, record);
        }
    }
    /**
     * 发送方式1:单向推送,不关心服务器的应答
     */
    private static void send(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record);
    }
    /**
     * 发送方式2:同步推送,得到服务器的应答前会阻塞当前线程
     */
    private static void sendSync(Producer<String, String> producer, ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println(metadata.topic());
        System.out.println(metadata.partition());
        System.out.println(metadata.offset());
    }
    /**
     * 发送方式3:异步推送,不需等待服务器应答,当服务器有应答后会触发函数回调
     */
    private static void sendASync(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                throw new RuntimeException("向 kafka 推送失败", exception);
            }
            System.out.println(metadata.topic());
            System.out.println(metadata.partition());
            System.out.println(metadata.offset());
        });
    }
}

步骤3:消费者客户端代码:

消费者客户端要做三件事:

  1. 设置消费者客户端属性(可选属性都被定义在 ConsumerConfig 类中)
  2. 设置消费者订阅的主题
  3. 拉取消息
  4. 提交 offset(有两种提交方式,下面代码中都有)
public class MyConsumer {
    public static void main(String[] args) {
        // 第一步:设置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 第二步:设置要订阅的主题
            consumer.subscribe(Collections.singletonList("testTopic"));
            while (true) {
                // 第三步:拉取消息,100 代表最大等待时间,如果时间到了还没有拉取到消息就不阻塞了继续往后执行
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
                // 第四步:提交 offset
                // consumer.commitSync(); // 同步提交,表示必须等到 offset 提交完毕,再去消费下⼀批数据
                consumer.commitSync(); // 异步提交,表示发送完提交 offset 请求后,就开始消费下⼀批数据了。不⽤等到Broker的确认。
            }
        }
    }
}

2. Kafka 集成 springboot

springboot 版本是最常用的,比原生客户端使用方便。但是道理是一样的,底层也是原生客户端。

pom 引入核心依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.0</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

yaml 配置文件:

这一步无非就是把原生客户端中的属性配置,写在 yaml 中

spring:
  kafka:
    bootstrap-servers: 192.168.2.28:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: testGroup
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

提供者客户端代码:

只需要两步:

  1. 注入 KafkaTemplate
  2. 发送
@RestController
public class ProducerController {
    /**
     * kafka
     */
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired
    public void setKafkaTemplate(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    @GetMapping("/test")
    public void send() {
        // 发送 kafka 消息
        kafkaTemplate.send("testTopic", "testKey", "testValue");
    }
}

消费者客户端代码:

只需要监听主题就可以

@RestController
public class ConsumerController {
    // 监听 kafka 消息
    @KafkaListener(topics = {"testTopic"})
    public void test(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}

到此这篇关于Kafka 在 java 中的基本使用的文章就介绍到这了,更多相关Kafka java使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 浅谈java中的路径表示

    浅谈java中的路径表示

    下面小编就为大家带来一篇浅谈java中的路径表示。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-04-04
  • SpringBoot中基于AOP和Semaphore实现API限流

    SpringBoot中基于AOP和Semaphore实现API限流

    调用速率限制是 Web API 中的常见要求,旨在防止滥用并确保公平使用资源,借助Spring Boot 中的 AOP,我们可以通过拦截方法调用并限制在特定时间范围内允许的请求数量来实现速率限制,需要的朋友可以参考下
    2024-10-10
  • ElasticSearch突然采集不到日志问题解决分析

    ElasticSearch突然采集不到日志问题解决分析

    这篇文章主要为大家介绍了ElasticSearch突然采集不到日志问题解决分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-04-04
  • Java如何获取发送请求的电脑的IP地址

    Java如何获取发送请求的电脑的IP地址

    文章介绍了如何通过HttpServletRequest获取客户端IP地址,特别是当客户端通过代理访问时,如何使用x-forwarded-for头来获取真实的IP地址
    2024-11-11
  • java数据结构和算法之马踏棋盘算法

    java数据结构和算法之马踏棋盘算法

    这篇文章主要为大家详细介绍了java数据结构和算法之马踏棋盘算法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-02-02
  • spring java 动态获取consul K/V的方法

    spring java 动态获取consul K/V的方法

    这篇文章主要介绍了spring java 动态获取consul K/V的相关资料,主要包括springConsul配置kv路径以及自动注入consulKV到服务中,本文给大家介绍的非常详细,需要的朋友可以参考下
    2023-10-10
  • Java二维数组与稀疏数组相互转换实现详解

    Java二维数组与稀疏数组相互转换实现详解

    在某些应用场景中需要大量的二维数组来进行数据存储,但是二维数组中却有着大量的无用的位置占据着内存空间,稀疏数组就是为了优化二维数组,节省内存空间
    2022-09-09
  • Spring Boot中slf4j日志依赖关系示例详解

    Spring Boot中slf4j日志依赖关系示例详解

    在项目开发中,记录日志是必做的一件事情。而当我们使用Springboot框架时,记录日志就变得极其简单了。下面这篇文章主要给大家介绍了关于Spring Boot中slf4j日志依赖关系的相关资料,需要的朋友可以参考下
    2018-11-11
  • Mybatis中resultMap标签和sql标签的设置方式

    Mybatis中resultMap标签和sql标签的设置方式

    这篇文章主要介绍了Mybatis中resultMap标签和sql标签的设置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • springboot 使用zookeeper实现分布式队列的基本步骤

    springboot 使用zookeeper实现分布式队列的基本步骤

    这篇文章主要介绍了springboot 使用zookeeper实现分布式队列,通过ZooKeeper的协调和同步机制,多个应用程序可以共享一个队列,并按照先进先出的顺序处理队列中的消息,需要的朋友可以参考下
    2023-08-08

最新评论