SpringCloudStream中的消息分区数详解

 更新时间:2022年12月28日 11:52:34   作者:DayDayUp丶  
这篇文章主要介绍了SpringCloudStream中的消息分区数,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

一、前言

本文仅针对 Kafka 来聊消息分区数相关的话题。

SpringCloudStream 中的消息分区数如何配置?

或者说消息分区数会受到哪些配置的影响。

  • SpringCloudStream:Greenwich.SR2
  • Kafka:kafka_2.12-2.3.0

二、影响因素

2.1 Kafka服务端

首先应该想到的,Kafka 配置文件 server.properties 中默认每一个 topic 的分区数 num.partitions=1

# The default number of log partitions per topic. More partitions allow greater
num.partitions=1

2.2 生产者端

SpringCloudStream的配置中可以看到,生产者可以指定分区数,默认1:

spring.cloud.stream.bindings.<channelName>.partitionCount.producer=n

【说明】:当分区功能开启时,使用该参数来配置消息数据的分区数。

如果消息生产者已经配置了分区键的生成策略,那么它的值必须大于1。

2.3 消费者端

SpringCloudStream 允许通过配置,使得消费者能够自动创建分区。

#输入通道消费者的并发数,默认1
spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2

若想以上配置生效,还需添加如下通用配置:

#Kafka绑定器允许在需要的时候自动创建分区。默认false
spring.cloud.stream.kafka.binder.autoAddPartitions=true

消费者端如此配置以后,将表现为一个消费者服务或进程中,会有2个线程各自消费1个分区,即2个消费者线程同时消费。

以下是该配置的效果验证步骤:

消费者代码:

1个 @StreamListener 消费自己的 topic 或自己的输出channel:

@EnableBinding(SpiderSink.class)
@Slf4j
public class SpiderSinkReceiver {
 
    @Autowired
    private SpiderMessageService spiderMessageService;
 
    @StreamListener(SpiderSink.INPUT)
    public void receive(Object payload) {
        log.info("SPIDER-SINK received: {}", payload);
    }
}

方式一:通过日志验证:

通过在 log4j 日志中,打印线程名称的方式,验证 spring.cloud.stream.bindings.<channelName>.consumer.concurrency 的配置确确实实会新增1个消费者线程。

[INFO ] 2020-05-09 01:19:34,700 [thread: [Ljava.lang.String;@5b40de43.container-1-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)
[INFO ] 2020-05-09 01:19:35,888 [thread: [Ljava.lang.String;@5b40de43.container-0-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)

方式二:直接查看分区数来验证:

另外,也可在启动一个生产者服务时,等待自动创建一个新 topic 后(此时默认分区数为1),比如我们创建的 topic 为“topic-spider-dev”,此时通过kafka命令查看分区数,此时分区数为1:

[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev
Topic:topic-spider-dev  PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: topic-spider-dev Partition: 0    Leader: 1       Replicas: 1     Isr: 1

然后,配置消费者服务的 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2,启动一个消费者服务,再次查看分区数,已经变为2了:

[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev
Topic:topic-spider-dev  PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: topic-spider-dev Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-spider-dev Partition: 1    Leader: 2       Replicas: 2     Isr: 2

同时查看消费者端的应用日志,看到2个消费者线程各自分配了一个分区:

[INFO ] 2020-05-12 17:22:43,940 [thread: [Ljava.lang.String;@299dd381.container-0-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363)
partitions assigned: [topic-spider-dev-0]
[INFO ] 2020-05-12 17:22:44,004 [thread: [Ljava.lang.String;@299dd381.container-1-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363)
partitions assigned: [topic-spider-dev-1]

最终,确确实实地验证了 concurrency 配置对消费者线程数和分区数的影响。

2.4 其他因素

比如,SpringCloudStream 中 Kafka 绑定器的配置中,也有一个相关的影响因素:

#最小分区数,默认1
spring.cloud.stream.kafka.binder.minPartitionCount=n

【说明】:该参数仅在设置了 autoCreateTopics 和 autoAddPartitions 时生效,用来设置该绑定器所使用主题的全局分区最小数量。

如果当生产者的 partitionCount 参数或 instanceCount * concurrency 设置大于该参数配置时,该参数值将被覆盖。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java NIO原理图文分析及代码实现

    Java NIO原理图文分析及代码实现

    本文主要介绍Java NIO原理的知识,这里整理了详细资料及简单示例代码和原理图,有需要的小伙伴可以参考下
    2016-09-09
  • 详解Java 微服务架构

    详解Java 微服务架构

    这篇文章主要介绍了Java 微服务架构的相关资料,帮助大家更好的理解和使用Java,感兴趣的朋友可以了解下
    2021-02-02
  • Java时区转换实例代码解析

    Java时区转换实例代码解析

    这篇文章主要介绍了Java时区转换实例代码解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • 关于Java中反射的作用义及使用方法

    关于Java中反射的作用义及使用方法

    这篇文章主要介绍了关于Java中反射的作用义及使用方法,反射使得程序可以在运行时对类进行检查和操作,而不需要在编译时知道类的完整信息,需要的朋友可以参考下
    2023-07-07
  • JAVA设计模式中的策略模式你了解吗

    JAVA设计模式中的策略模式你了解吗

    这篇文章主要为大家详细介绍了JAVA设计模式中的策略模式,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-03-03
  • java线程本地变量ThreadLocal详解

    java线程本地变量ThreadLocal详解

    ThreadLocal则为每一个线程提供了一个变量副本,从而隔离了多个线程访问数据的冲突,ThreadLocal提供了线程安全的对象封装,下面我们就来详细了解一下吧
    2019-06-06
  • java 常规轮询长轮询Long polling实现示例详解

    java 常规轮询长轮询Long polling实现示例详解

    这篇文章主要为大家介绍了java 常规轮询长轮询Long polling实现示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12
  • java实现解析二进制文件的方法(字符串、图片)

    java实现解析二进制文件的方法(字符串、图片)

    本篇文章主要介绍了java实现解析二进制文件的方法(字符串、图片),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-02-02
  • Matlab及Java实现小时钟效果

    Matlab及Java实现小时钟效果

    这篇文章主要为大家详细介绍了Matlab及Java实现小时钟效果,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-05-05
  • 详解Java的按位操作符

    详解Java的按位操作符

    Java的位操作符用来操作整数基本数据类型中的单个“比特”(bit),即代进制位。下面通过本文给大家分享Java的按位操作符,感兴趣的朋友一起看看吧
    2017-09-09

最新评论