Java kafka如何实现自定义分区类和拦截器

 更新时间:2020年06月09日 09:42:23   作者:护花使者  
这篇文章主要介绍了Java kafka如何实现自定义分区类和拦截器,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

生产者发送到对应的分区有以下几种方式:

(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)

(2)未指定patition但指定key,通过对key的value进行hash出一个patition;

(3)patition和key都未指定,使用轮询选出一个patition。

但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:

1、实现一个自定义分区类,CustomPartitioner实现Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

  /**
   *
   * @param topic 当前的发送的topic
   * @param key  当前的key值
   * @param keyBytes 当前的key的字节数组
   * @param value 当前的value值
   * @param valueBytes 当前的value的字节数组
   * @param cluster
   * @return
   */
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //这边根据返回值就是分区号, 这边就是固定发送到三号分区
    return 3;
  }

  @Override
  public void close() {

  }
  @Override
  public void configure(Map<String, ?> configs) {

  }

}

2、producer配置文件指定,具体的分区类

// 具体的分区类
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer拦截器

拦截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。

许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。

所使用的类为:

org.apache.kafka.clients.producer.ProducerInterceptor

我们可以编码测试下:

1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.UUID;

public class MessageInterceptor implements ProducerInterceptor<String, String> {

  @Override
  public void configure(Map<String, ?> configs) {
    System.out.println("这是MessageInterceptor的configure方法");
  }

  /**
   * 这个是消息发送之前进行处理
   *
   * @param record
   * @return
   */
  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    // 创建一个新的record,把uuid入消息体的最前部
    System.out.println("为消息添加uuid");
    return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
        UUID.randomUUID().toString().replace("-", "") + "," + record.value());
  }

  /**
   * 这个是生产者回调函数调用之前处理
   * @param metadata
   * @param exception
   */
  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    System.out.println("MessageInterceptor拦截器的onAcknowledgement方法");
  }

  @Override
  public void close() {
    System.out.println("MessageInterceptor close 方法");
  }
}

2、定义计数拦截器

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String>{
  private int errorCounter = 0;
  private int successCounter = 0;

  @Override
  public void configure(Map<String, ?> configs) {
    System.out.println("这是CounterInterceptor的configure方法");
  }

  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    System.out.println("CounterInterceptor计数过滤器不对消息做任何操作");
    return record;
  }

  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    // 统计成功和失败的次数
    System.out.println("CounterInterceptor过滤器执行统计失败和成功数量");
    if (exception == null) {
      successCounter++;
    } else {
      errorCounter++;
    }
  }

  @Override
  public void close() {
    // 保存结果
    System.out.println("Successful sent: " + successCounter);
    System.out.println("Failed sent: " + errorCounter);
  }
}

3、producer客户端:

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Producer1 {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Kafka服务端的主机名和端口号
    props.put("bootstrap.servers", "localhost:9092");
    // 等待所有副本节点的应答
    props.put("acks", "all");
    // 消息发送最大尝试次数
    props.put("retries", 0);
    // 一批消息处理大小
    props.put("batch.size", 16384);
    // 请求延时,可能生产数据太快了
    props.put("linger.ms", 1);
    // 发送缓存区内存大小,数据是先放到生产者的缓冲区
    props.put("buffer.memory", 33554432);
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 具体的分区类
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
    //定义拦截器
    List<String> interceptors = new ArrayList<>();
    interceptors.add("kafka.MessageInterceptor");
    interceptors.add("kafka.CounterInterceptor");
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 1; i++) {
      producer.send(new ProducerRecord<String, String>("test_0515", i + "", "xxx-" + i), new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
          System.out.println("这是producer回调函数");
        }
      });
    }
    /*System.out.println("现在执行关闭producer");
    producer.close();*/
    producer.close();
  }
}

总结,我们可以知道拦截器链各个方法的执行顺序,假如有A、B拦截器,在一个拦截器链中:

(1)执行A的configure方法,执行B的configure方法

(2)执行A的onSend方法,B的onSend方法

(3)生产者发送完毕后,执行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)执行producer自身的callback回调函数。

(5)执行A的close方法,B的close方法。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • SpringBoot 整合 ElasticSearch操作各种高级查询搜索

    SpringBoot 整合 ElasticSearch操作各种高级查询搜索

    这篇文章主要介绍了SpringBoot 整合 ES 进行各种高级查询搜索的实践记录,本文主要围绕 SpringBoot 整合 ElasticSearch 进行各种高级查询的介绍,需要的朋友可以参考下
    2022-06-06
  • Java日志相关技术_动力节点Java学院整理

    Java日志相关技术_动力节点Java学院整理

    这篇文章主要介绍了Java日志相关技术_动力节点Java学院整理的相关资料,需要的朋友可以参考下
    2017-07-07
  • 基于Java8并行流(parallelStream)的注意点

    基于Java8并行流(parallelStream)的注意点

    这篇文章主要介绍了Java8并行流(parallelStream)的注意点,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • Java责任链模式定义与用法分析

    Java责任链模式定义与用法分析

    这篇文章主要介绍了Java责任链模式定义与用法,结合具体实例分析了java责任链模式的功能、定义、使用方法、适用情况等,需要的朋友可以参考下
    2017-06-06
  • RocketMQ中消费者的消费进度管理

    RocketMQ中消费者的消费进度管理

    这篇文章主要介绍了RocketMQ中消费者的消费进度管理,业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS ,RocketMQ才会认为这批消息(默认是1条)是消费完成的,需要的朋友可以参考下
    2023-10-10
  • spring boot环境抽象的实现方法

    spring boot环境抽象的实现方法

    在实际开发中,开发人员在编写springboot的时候通常要在本地环境测试然后再部署到Production环境,这两种环境一般来讲是不同的,最主要的区别就是数据源的不同。本文主要介绍了这两种,感兴趣的可以了解一下
    2019-04-04
  • Sentinel热门词汇限流的实现详解

    Sentinel热门词汇限流的实现详解

    这篇文章主要介绍了使用Sentinel对热门词汇进行限流的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • SpringCloud使用Feign实现服务调用

    SpringCloud使用Feign实现服务调用

    这篇文章主要为大家详细介绍了SpringCloud使用Feign实现服务调用,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-04-04
  • JDBC核心技术详解

    JDBC核心技术详解

    这篇文章主要介绍了JDBC核心技术详解,文中有非常详细的代码示例,对正在学习JDBC的小伙伴们有很好的帮助,需要的朋友可以参考下
    2021-05-05
  • 使用Feign实现微服务间文件下载

    使用Feign实现微服务间文件下载

    这篇文章主要为大家详细介绍了使用Feign实现微服务间文件下载,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-04-04

最新评论