Kafka Java Producer代码实例详解

 更新时间:2020年06月04日 10:05:31   作者:liuming_1992  
这篇文章主要介绍了Kafka Java Producer代码实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

根据业务需要可以使用Kafka提供的Java Producer API进行产生数据,并将产生的数据发送到Kafka对应Topic的对应分区中,入口类为:Producer

Kafka的Producer API主要提供下列三个方法:

  •   public void send(KeyedMessage<K,V> message) 发送单条数据到Kafka集群
  •   public void send(List<KeyedMessage<K,V>> messages) 发送多条数据(数据集)到Kafka集群
  •   public void close() 关闭Kafka连接资源

一、JavaKafkaProducerPartitioner:自定义的数据分区器,功能是:决定输入的key/value键值对的message发送到Topic的那个分区中,返回分区id,范围:[0,分区数量); 这里的实现比较简单,根据key中的数字决定分区的值。具体代码如下:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaProducerPartitioner implements Partitioner {

  /**
   * 无参构造函数
   */
  public JavaKafkaProducerPartitioner() {
    this(new VerifiableProperties());
  }

  /**
   * 构造函数,必须给定
   *
   * @param properties 上下文
   */
  public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
    // nothings
  }

  @Override
  public int partition(Object key, int numPartitions) {
    int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim());
    return num % numPartitions;
  }
}

二、 JavaKafkaProducer:通过Kafka提供的API进行数据产生操作的测试类;具体代码如下:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaProducer {
  private Logger logger = Logger.getLogger(JavaKafkaProducer.class);
  public static final String TOPIC_NAME = "test";
  public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
  public static final int chartsLength = charts.length;


  public static void main(String[] args) {
    String brokerList = "192.168.187.149:9092";
    brokerList = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";
    brokerList = "192.168.187.146:9092";
    Properties props = new Properties();
    props.put("metadata.broker.list", brokerList);
    /**
     * 0表示不等待结果返回<br/>
     * 1表示等待至少有一个服务器返回数据接收标识<br/>
     * -1表示必须接收到所有的服务器返回标识,及同步写入<br/>
     * */
    props.put("request.required.acks", "0");
    /**
     * 内部发送数据是异步还是同步
     * sync:同步, 默认
     * async:异步
     */
    props.put("producer.type", "async");
    /**
     * 设置序列化的类
     * 可选:kafka.serializer.StringEncoder
     * 默认:kafka.serializer.DefaultEncoder
     */
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    /**
     * 设置分区类
     * 根据key进行数据分区
     * 默认是:kafka.producer.DefaultPartitioner ==> 安装key的hash进行分区
     * 可选:kafka.serializer.ByteArrayPartitioner ==> 转换为字节数组后进行hash分区
     */
    props.put("partitioner.class", "JavaKafkaProducerPartitioner");

    // 重试次数
    props.put("message.send.max.retries", "3");

    // 异步提交的时候(async),并发提交的记录数
    props.put("batch.num.messages", "200");

    // 设置缓冲区大小,默认10KB
    props.put("send.buffer.bytes", "102400");

    // 2. 构建Kafka Producer Configuration上下文
    ProducerConfig config = new ProducerConfig(props);

    // 3. 构建Producer对象
    final Producer<String, String> producer = new Producer<String, String>(config);

    // 4. 发送数据到服务器,并发线程发送
    final AtomicBoolean flag = new AtomicBoolean(true);
    int numThreads = 50;
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    for (int i = 0; i < 5; i++) {
      pool.submit(new Thread(new Runnable() {
        @Override
        public void run() {
          while (flag.get()) {
            // 发送数据
            KeyedMessage message = generateKeyedMessage();
            producer.send(message);
            System.out.println("发送数据:" + message);

            // 休眠一下
            try {
              int least = 10;
              int bound = 100;
              Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }

          System.out.println(Thread.currentThread().getName() + " shutdown....");
        }
      }, "Thread-" + i));

    }

    // 5. 等待执行完成
    long sleepMillis = 600000;
    try {
      Thread.sleep(sleepMillis);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    flag.set(false);

    // 6. 关闭资源

    pool.shutdown();
    try {
      pool.awaitTermination(6, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
    } finally {
      producer.close(); // 最后之后调用
    }
  }

  /**
   * 产生一个消息
   *
   * @return
   */
  private static KeyedMessage<String, String> generateKeyedMessage() {
    String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
    StringBuilder sb = new StringBuilder();
    int num = ThreadLocalRandom.current().nextInt(1, 5);
    for (int i = 0; i < num; i++) {
      sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");
    }
    String message = sb.toString().trim();
    return new KeyedMessage(TOPIC_NAME, key, message);
  }

  /**
   * 产生一个给定长度的字符串
   *
   * @param numItems
   * @return
   */
  private static String generateStringMessage(int numItems) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < numItems; i++) {
      sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);
    }
    return sb.toString();
  }
}

三、Pom.xml依赖配置如下

<properties>
  <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>${kafka.version}</version>
  </dependency>
</dependencies>

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • Java解析变量公式的简单示例

    Java解析变量公式的简单示例

    在Java编程中,经常会遇到需要解析表达式或公式的情况,特别是涉及到动态计算或配置项的场景,在本篇文章中,我将介绍如何在Java中解析变量公式,并给出一个简单的实现示例,需要的朋友可以参考下
    2024-10-10
  • Java缩小文件内存占用的方法技巧分享

    Java缩小文件内存占用的方法技巧分享

    在Java应用程序中,处理大文件时经常会遇到内存占用过高的问题,为了缩小文件的内存占用,我们可以采取一些有效的方法来优化和管理内存的使用,本文将介绍一些在Java中缩小文件内存占用的技巧,需要的朋友可以参考下
    2024-10-10
  • 如何解决shardingsphere报错Missing the data source name:‘null‘

    如何解决shardingsphere报错Missing the data source name:‘null‘

    使用ShardingSphere进行分库操作时,如果遇到“Missing the datasource name: ‘null’”的错误,通常是因为所操作的表没有配置相关的路由信息,例如,如果在properties中仅配置了health_record和health_task的路由规则
    2024-11-11
  • java swing实现贪吃蛇双人游戏

    java swing实现贪吃蛇双人游戏

    这篇文章主要为大家详细介绍了java swing实现贪吃蛇双人小游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-01-01
  • springBoot项目如何实现启动多个实例

    springBoot项目如何实现启动多个实例

    这篇文章主要介绍了springBoot项目如何实现启动多个实例的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • Java编写时间工具类ZTDateTimeUtil的示例代码

    Java编写时间工具类ZTDateTimeUtil的示例代码

    这篇文章主要为大家详细介绍了如何利用Java编写时间工具类ZTDateTimeUtil,文中的示例代码讲解详细,有需要的小伙伴可以跟随小编一起学习一下
    2023-11-11
  • Java获取XML节点总结之读取XML文档节点的方法

    Java获取XML节点总结之读取XML文档节点的方法

    下面小编就为大家带来一篇Java获取XML节点总结之读取XML文档节点的方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-10-10
  • java中redissonClient 分布式锁的使用

    java中redissonClient 分布式锁的使用

    在集群的情况下,用户多次请求接口时,存入的内容可能会导致重复,这时候就可以使用分布式锁来限制,本文就来介绍一下java中redissonClient 分布式锁的使用,感兴趣的可以了解一下
    2024-03-03
  • spring boot admin 搭建详解

    spring boot admin 搭建详解

    本篇文章主要介绍了spring boot admin 搭建详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-04-04
  • 95%的Java程序员人都用不好Synchronized详解

    95%的Java程序员人都用不好Synchronized详解

    这篇文章主要为大家介绍了95%的Java程序员人都用不好Synchronized详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-03-03

最新评论