SpringCloudStream中的消息分区数详解

 更新时间:2022年12月28日 11:52:34   作者:DayDayUp丶  
这篇文章主要介绍了SpringCloudStream中的消息分区数,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

一、前言

本文仅针对 Kafka 来聊消息分区数相关的话题。

SpringCloudStream 中的消息分区数如何配置?

或者说消息分区数会受到哪些配置的影响。

  • SpringCloudStream:Greenwich.SR2
  • Kafka:kafka_2.12-2.3.0

二、影响因素

2.1 Kafka服务端

首先应该想到的,Kafka 配置文件 server.properties 中默认每一个 topic 的分区数 num.partitions=1

# The default number of log partitions per topic. More partitions allow greater
num.partitions=1

2.2 生产者端

SpringCloudStream的配置中可以看到,生产者可以指定分区数,默认1:

spring.cloud.stream.bindings.<channelName>.partitionCount.producer=n

【说明】:当分区功能开启时,使用该参数来配置消息数据的分区数。

如果消息生产者已经配置了分区键的生成策略,那么它的值必须大于1。

2.3 消费者端

SpringCloudStream 允许通过配置,使得消费者能够自动创建分区。

#输入通道消费者的并发数,默认1
spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2

若想以上配置生效,还需添加如下通用配置:

#Kafka绑定器允许在需要的时候自动创建分区。默认false
spring.cloud.stream.kafka.binder.autoAddPartitions=true

消费者端如此配置以后,将表现为一个消费者服务或进程中,会有2个线程各自消费1个分区,即2个消费者线程同时消费。

以下是该配置的效果验证步骤:

消费者代码:

1个 @StreamListener 消费自己的 topic 或自己的输出channel:

@EnableBinding(SpiderSink.class)
@Slf4j
public class SpiderSinkReceiver {
 
    @Autowired
    private SpiderMessageService spiderMessageService;
 
    @StreamListener(SpiderSink.INPUT)
    public void receive(Object payload) {
        log.info("SPIDER-SINK received: {}", payload);
    }
}

方式一:通过日志验证:

通过在 log4j 日志中,打印线程名称的方式,验证 spring.cloud.stream.bindings.<channelName>.consumer.concurrency 的配置确确实实会新增1个消费者线程。

[INFO ] 2020-05-09 01:19:34,700 [thread: [Ljava.lang.String;@5b40de43.container-1-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)
[INFO ] 2020-05-09 01:19:35,888 [thread: [Ljava.lang.String;@5b40de43.container-0-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)

方式二:直接查看分区数来验证:

另外,也可在启动一个生产者服务时,等待自动创建一个新 topic 后(此时默认分区数为1),比如我们创建的 topic 为“topic-spider-dev”,此时通过kafka命令查看分区数,此时分区数为1:

[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev
Topic:topic-spider-dev  PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: topic-spider-dev Partition: 0    Leader: 1       Replicas: 1     Isr: 1

然后,配置消费者服务的 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2,启动一个消费者服务,再次查看分区数,已经变为2了:

[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev
Topic:topic-spider-dev  PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: topic-spider-dev Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-spider-dev Partition: 1    Leader: 2       Replicas: 2     Isr: 2

同时查看消费者端的应用日志,看到2个消费者线程各自分配了一个分区:

[INFO ] 2020-05-12 17:22:43,940 [thread: [Ljava.lang.String;@299dd381.container-0-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363)
partitions assigned: [topic-spider-dev-0]
[INFO ] 2020-05-12 17:22:44,004 [thread: [Ljava.lang.String;@299dd381.container-1-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363)
partitions assigned: [topic-spider-dev-1]

最终,确确实实地验证了 concurrency 配置对消费者线程数和分区数的影响。

2.4 其他因素

比如,SpringCloudStream 中 Kafka 绑定器的配置中,也有一个相关的影响因素:

#最小分区数,默认1
spring.cloud.stream.kafka.binder.minPartitionCount=n

【说明】:该参数仅在设置了 autoCreateTopics 和 autoAddPartitions 时生效,用来设置该绑定器所使用主题的全局分区最小数量。

如果当生产者的 partitionCount 参数或 instanceCount * concurrency 设置大于该参数配置时,该参数值将被覆盖。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • SpringCloud配置中心Config过程解析

    SpringCloud配置中心Config过程解析

    这篇文章主要介绍了SpringCloud配置中心Config过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • 解决jackson反序列化失败InvalidFormatException:Can not deserialize value of type java.util.Date

    解决jackson反序列化失败InvalidFormatException:Can not dese

    这篇文章主要介绍了解决jackson反序列化失败InvalidFormatException:Can not deserialize value of type java.util.Date问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • String与XML互转以及从XML取节点值并修改的方法

    String与XML互转以及从XML取节点值并修改的方法

    今天小编就为大家分享一篇String与XML互转以及从XML取节点值并修改的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • springboot结合全局异常处理实现登录注册验证

    springboot结合全局异常处理实现登录注册验证

    这篇文章主要介绍了springboot结合全局异常处理实现登录注册验证,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-05-05
  • SpringBoot自定义配置项过程

    SpringBoot自定义配置项过程

    在SpringBoot项目中,通过在application.properties文件中添加配置项,然后使用@ConfigurationProperties注解将这些配置项与实体Bean进行绑定,可以实现配置项与实体类字段的自动关联,进而方便地读取配置文件中的数据,这种方法不仅简化了配置管理
    2024-11-11
  • java排序算法图文详解

    java排序算法图文详解

    这篇文章主要为大家详细介绍了Java经典排序算法之归并排序,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望可以对你有所帮助
    2021-07-07
  • java向上转型与向下转型详解

    java向上转型与向下转型详解

    这篇文章主要为大家详细介绍了java向上转型与向下转型,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-09-09
  • mybatis框架之mybatis中dao层开发的两种方法

    mybatis框架之mybatis中dao层开发的两种方法

    这篇文章主要介绍了mybatis框架之mybatis中dao层开发的两种方法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • Java中的反射,枚举及lambda表达式的使用详解

    Java中的反射,枚举及lambda表达式的使用详解

    这篇文章主要为大家详细介绍了Java的反射,枚举及lambda表达式,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-03-03
  • 带你重新认识MyBatis的foreach

    带你重新认识MyBatis的foreach

    这篇文章主要介绍了重新认识MyBatis的foreach,本文提出了一种简化<foreach>写法的设想,更重要的是通过解决空集时生成的SQL语法问题,更深刻地理解MyBatis的foreach的生成机制,需要的朋友可以参考下
    2022-11-11

最新评论