kafka自定义分区器使用详解

 更新时间:2025年11月19日 11:20:38   作者:princeAladdin  
本文介绍了如何根据企业需求自定义Kafka分区器,只需实现Partitioner接口并重写partition()方法,示例中,包含"cuihaida"的数据发送到0号分区,否则发送到1号分区,在生产者配置中添加分区器参数即可使用

kafka自定义分区器

根据企业需求,自己重新实现分区器

只需要定义类实现Partitioner接口,然后重写partition()方法即可

假设现在有一个需求

发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区

package com.example.kafkademo.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 1. 实现接口Partitioner
 * 2. 实现3个方法:partition,close,configure
 * 3. 编写partition方法,返回分区号
 */
public class MyPartitioner implements Partitioner {

    /**
     * 重写这个方法
     * @param topic 主题
     * @param key 消息的key
     * @param keyBytes 消息的key序列化后的字节数组
     * @param value 消息的值
     * @param valueBytes 消息的值序列化后的字节数组
     * @param cluster 集群元数据可以查看分区信息
     * @return 信息对应的分区
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取消息
        String msgValue = value.toString();
        // 发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区
        return msgValue.contains("cuihaida") ? 0 : 1;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

使用分区器的方法

在生产者的配置中添加分区器参数

package com.example.kafkademo.util;

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class CommonUtils {
    /**
     * kafka生产者配置配置
     * @return 配置内容
     */
    public static Properties buildKafkaProperties() {
        // 1. 创建kafka生产者配置对象
        Properties properties = new Properties();
        // 2. 给kafka的配置对象添加信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key, value初始化【必须有】
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        
        // =========> 添加自定义分区器 <============
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafkademo.producer.MyPartitioner")
        
        return properties;
    }
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • java通过Excel批量上传数据的实现示例

    java通过Excel批量上传数据的实现示例

    Excel批量上传是常见的一种功能,本文就来介绍一下java通过Excel批量上传数据的实现示例,具有一定的参考价值,感兴趣的可以了解一下
    2023-10-10
  • Java中logback 自动刷新不生效的问题解决

    Java中logback 自动刷新不生效的问题解决

    本文主要介绍了Java中logback 自动刷新不生效的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-05-05
  • tomcat301与java301解析

    tomcat301与java301解析

    这篇文章主要介绍了omcat301与java301,有需要的朋友可以参考一下
    2014-01-01
  • Java实现复制文件并命名的超简洁写法

    Java实现复制文件并命名的超简洁写法

    这篇文章主要介绍了Java实现复制文件并命名的超简洁写法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • Java类初始化顺序详解

    Java类初始化顺序详解

    这篇文章主要介绍了Java类初始化顺序详解,java语言在使用过程中最先开始就是初始化,在工作中如果遇到什么问题需 要定位往往到最后也可能是初始化的问题,因此掌握初始化的顺序很重要,需要的朋友可以参考下
    2023-08-08
  • 基于JavaCore文件的深入分析

    基于JavaCore文件的深入分析

    本篇文章介绍了,对JavaCore文件的深入分析。需要的朋友参考下
    2013-05-05
  • 关于MD5算法原理与常用实现方式

    关于MD5算法原理与常用实现方式

    这篇文章主要介绍了关于MD5算法原理与常用实现方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-08-08
  • 解决java.lang.StringIndexOutOfBoundsException: String index out of range: -1错误问题

    解决java.lang.StringIndexOutOfBoundsException: String&nbs

    这篇文章主要介绍了解决java.lang.StringIndexOutOfBoundsException: String index out of range: -1错误问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2025-03-03
  • Java BigInteger类,BigDecimal类,Date类,DateFormat类及Calendar类用法示例

    Java BigInteger类,BigDecimal类,Date类,DateFormat类及Calendar类用法示例

    这篇文章主要介绍了Java BigInteger类,BigDecimal类,Date类,DateFormat类及Calendar类用法,结合实例形式详细分析了Java使用BigInteger类,BigDecimal类,Date类,DateFormat类及Calendar类进行数值运算与日期运算相关操作技巧,需要的朋友可以参考下
    2019-03-03
  • Java方法参数传递如何实现

    Java方法参数传递如何实现

    这篇文章主要介绍了Java方法参数传递如何实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05

最新评论