Kafka中Producer和Consumer的作用详解

 更新时间:2023年12月05日 09:45:59   作者:杨荧  
这篇文章主要介绍了Kafka中Producer和Consumer的作用详解,Kafka是一个分布式的流处理平台,它的核心是消息系统,Producer是Kafka中用来将消息发送到Broker的组件之一,它将消息发布到主题,并且负责按照指定的分区策略将消息分配到对应的分区中,需要的朋友可以参考下

一、Producer

Kafka是一个分布式的流处理平台,它的核心是消息系统。Producer是Kafka中用来将消息发送到Broker的组件之一。它将消息发布到主题(topic),并且负责按照指定的分区策略将消息分配到对应的分区中。

下面是使用Java语言编写的Kafka Producer示例代码:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
        props.put("acks", "all"); // 所有副本都响应了才认为发送成功
        props.put("retries", 0); // 发送失败时重试次数
        props.put("batch.size", 16384); // 缓冲区大小
        props.put("linger.ms", 1); // 延迟1ms发送以便等待更多的消息
        props.put("buffer.memory", 33554432); // 缓存总量
        // key和value序列化方式,这里使用默认的StringSerializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("test_topic", Integer.toString(i), "hello world" + i));
        }
        producer.close();
    }
}

上述代码中,我们先设置了Kafka集群地址、消息确认方式等参数。

然后使用这些参数创建一个KafkaProducer实例,并通过send方法发送消息到指定的主题。

在这个例子中,我们将10条带有字符串"hello world"的消息发送到名为"test_topic"的主题中。最后别忘了关闭producer连接。

二、Consumer

Kafka是一个分布式流媒体平台,其中Consumer是Kafka中消费数据的组件之一。

Kafka Consumer可以订阅一个或多个Topic,并从这些Topic中消费消息。

Kafka Consumer可以以不同的方式处理消息,例如将其写入到数据库、打印出来或进行其他自定义处理。

Kafka Consumer使用一组API来与Kafka Broker通信,并接收Broker返回的数据。

在接收到数据后,Consumer会将其提交给应用程序,由应用程序进一步处理。

以下是一个使用Java编写的Kafka Consumer样例代码:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key=%s value=%s%n", record.key(), record.value());
            }
        }
    }
}

在这个样例代码中,我们首先创建了一个Properties对象,其中包含连接Kafka Broker所需的配置信息。

然后,我们创建了一个Kafka Consumer实例,并订阅了名为“test-topic”的Topic。

最后,在while循环中,我们使用poll()方法从Broker获取消息,并在控制台上打印出每条消息的键和值。

三、Producer和Consumer有什么作用?

Kafka是一个分布式的消息队列系统,Producer和Consumer都是Kafka中的核心组件之一。

Producer负责向Kafka集群发送消息,将消息发布到一个或多个主题(topic)中。Producer可以选择在消息发送成功后等待确认(ack)或不等待,在等待确认时会阻塞,直到收到Broker返回的确认信息。

而Consumer则是从Kafka集群消费消息,并且订阅一个或多个主题。每个Consumer在消费消息时都有自己独立的offset(偏移量),用来标识该Consumer已经消费到哪个位置。消费者可以随时停止消费或重新开始消费,而不影响其他Consumer的消费进度。

总体来说,Producer和Consumer的作用是实现了消息的生产和消费,帮助用户构建高可靠、高性能的消息处理系统。

到此这篇关于Kafka中Producer和Consumer的作用详解的文章就介绍到这了,更多相关Producer和Consumer的作用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java实现简单树结构

    Java实现简单树结构

    这篇文章主要为大家详细介绍了Java实现简单树结构的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-01-01
  • 实现一个规则引擎的可视化具体方案

    实现一个规则引擎的可视化具体方案

    项目原因需要用到规则引擎,但是发现大部分不可以自由的进行规则定义,通过不断尝试变换关键字在搜索引擎搜索,最终在stackoverflow找到了一个探讨这个问题的帖子,特此将帖子中提到的方案分享一下,如果你跟我一样在研究同样的问题,也许对你有用
    2021-04-04
  • java freemarker实现动态生成excel文件

    java freemarker实现动态生成excel文件

    这篇文章主要为大家详细介绍了java如何通过freemarker实现动态生成excel文件,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-12-12
  • JAVA利用HttpClient进行POST请求(HTTPS)实例

    JAVA利用HttpClient进行POST请求(HTTPS)实例

    下面小编就为大家带来一篇JAVA利用HttpClient进行POST请求(HTTPS)实例。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起 小编过来看看吧
    2016-11-11
  • Java中八大包装类举例详解(通俗易懂)

    Java中八大包装类举例详解(通俗易懂)

    这篇文章主要介绍了Java中的包装类,包括它们的作用、特点、用途以及如何进行装箱和拆箱,包装类还提供了许多实用方法,如转换、获取基本类型值、比较和类型检测,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2025-02-02
  • Java核心技术之反射

    Java核心技术之反射

    本文非常详细的讲解了java反射的相关资料,java反射在现今的使用中很频繁,希望此文可以帮大家解答疑惑,可以帮助大家理解
    2021-11-11
  • Mybatis Plus使用条件构造器增删改查功能的实现方法

    Mybatis Plus使用条件构造器增删改查功能的实现方法

    这篇文章主要介绍了Mybatis-Plus使用条件构造器增删改查,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-05-05
  • Java初学者之五子棋游戏实现教程

    Java初学者之五子棋游戏实现教程

    这篇文章主要为大家详细介绍了Java初学者之五子棋游戏实现教程,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-10-10
  • SpringBoot如何配置文件给bean赋值问题

    SpringBoot如何配置文件给bean赋值问题

    这篇文章主要介绍了SpringBoot如何配置文件给bean赋值问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-05-05
  • Spring MVC InitBinder验证方法

    Spring MVC InitBinder验证方法

    这篇文章主要介绍了Spring MVC InitBinder验证方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-03-03

最新评论