怎样给Kafka新增分区

 更新时间:2022年12月27日 15:19:16   作者:KK架构  
这篇文章主要介绍了怎样给Kafka新增分区问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

给Kafka新增分区

数据量猛增的时候,需要给 kafka 的 topic 新增分区,增大处理的数据量,可以通过以下步骤

1、修改 topic 的分区

kafka-topics --zookeeper hadoop004:2181 --alter --topic flink-test-04 --partitions 3

2、迁移数据

生成迁移计划,手动新建一个 json 文件

{
"topics": [
{"topic": "flink-test-03"}
],
"version": 1
}

生成迁移计划

kafka-reassign-partitions --zookeeper hadoop004:2181 --topics-to-move-json-file topic.json --broker-list “120,121,122” --generate

Current partition replica assignment:

{"version":1,"partitions":[{"topic":"flink-test-02","partition":5,"replicas":[120]},{"topic":"flink-test-02","partition":0,"replicas":[121]},{"topic":"flink-test-02","partition":2,"replicas":[120]},{"topic":"flink-test-02","partition":1,"replicas":[122]},{"topic":"flink-test-02","partition":4,"replicas":[122]},{"topic":"flink-test-02","partition":3,"replicas":[121]}]}

新建一个文件reassignment.json,保存上边这些信息

3、迁移

kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --execute

4、验证

kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --verify

Kafka分区原理机制

分区结构

kafka的消息总共是三层结构

Topic(第一层结构,表示一个主题)-> Partition(分区,每个消息可以有多个分区) -> 消息实例(具体的消息文本等等,一个消息实例只可能在一个分区里面,不会出现在多个分区中)


在这里插入图片描述

分区优点

分区其实是一个负载均衡的思想。如此设计能使每一个分区独自处理单独的读写请求,提高吞吐量。

分区策略

  • 轮询策略Round-robin(未指定key新版本默认策略)
  • 随机策略Randomness(老版本默认策略)
  • 消息键排序策略Key-ordering(指定了key,则使用该策略)
  • 根据地理位置进行分区
  • 自定义分区 需要在生产者端实现org.apache.kafka.clients.producer.Partitioner接口,并配置一下实现类的全限定名

根据分区策略实现消息的顺序消费

可以只设置一个分区,这样子消息都是放在一个partition,肯定是先进先出进行消费,然而这种场景无法利用kafka多分区的高吞吐量以及负载均衡的优势。

将需要顺序消费的消息设置key,这个时候根据默认的分区策略,kafka会将所有的相同的key放在一个partition上面,这样既可以使用kafka的partition又可以实现顺序消费。

默认分区策略源码

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    public void configure(Map<String, ?> configs) {}
    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }
    public void close() {}
}

从类注释当中已经很明显的看出来分区逻辑

3. 如果指定了分区,则使用指定分区

4. 如果没有指定分区,但是有key,则使用hash过的key放置消息

5. 如果没有指定分区,也没有key,则使用轮询

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java Swing组件文件选择器JFileChooser简单用法示例

    Java Swing组件文件选择器JFileChooser简单用法示例

    这篇文章主要介绍了Java Swing组件文件选择器JFileChooser简单用法,结合实例形式分析了Swing组件中的文件选择器JFileChooser的简单使用方法,需要的朋友可以参考下
    2017-11-11
  • Java唤醒本地应用的两种方法详解

    Java唤醒本地应用的两种方法详解

    这篇文章主要为大家介绍了Java唤醒本地应用的两种方法详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • 你真的懂java的日志系统吗

    你真的懂java的日志系统吗

    日志管理的第一件事,就是日志的收集,日志收集是开发者必备的技巧,不管是哪个开发语言,哪个开发平台,日志收集的插件都是有很多选择的,下面这篇文章主要给大家介绍了关于java日志系统的相关资料,需要的朋友可以参考下
    2022-02-02
  • java编程FinalReference与Finalizer原理示例详解

    java编程FinalReference与Finalizer原理示例详解

    这篇文章主要为大家介绍了java编程FinalReference与Finalizer的核心原理以及示例源码的分析详解,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2022-01-01
  • 如何利用泛型封装通用的service层

    如何利用泛型封装通用的service层

    这篇文章主要介绍了如何利用泛型封装通用的service层,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • SpringBoot集成PostgreSQL并设置最大连接数

    SpringBoot集成PostgreSQL并设置最大连接数

    本文主要介绍了SpringBoot集成PostgreSQL并设置最大连接数,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-11-11
  • Java详细介绍单例模式的应用

    Java详细介绍单例模式的应用

    单例模式(Singleton Pattern)是 Java 中最简单的设计模式之一。这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式
    2022-09-09
  • RocketMQ NameServer架构设计启动流程

    RocketMQ NameServer架构设计启动流程

    这篇文章主要为大家介绍了RocketMQ NameServer架构设计启动流程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-02-02
  • Spring MVC中使用Google kaptcha验证码的方法详解

    Spring MVC中使用Google kaptcha验证码的方法详解

    kaptcha 是一个非常实用的验证码生成工具。有了它,你可以生成各种样式的验证码,因为它是可配置的,下面这篇文章主要给大家介绍了关于Spring MVC中使用Google kaptcha验证码的方法,需要的朋友可以参考借鉴,下面来一起看看吧。
    2017-10-10
  • Java实现的猜数字游戏示例

    Java实现的猜数字游戏示例

    这篇文章主要介绍了Java实现的猜数字游戏,涉及Java数学运算与判断相关操作技巧,需要的朋友可以参考下
    2018-06-06

最新评论