结合线程池实现apache kafka消费者组的误区及解决方法

 更新时间:2022年07月07日 11:03:27   作者:字母哥哥  
这篇文章主要介绍了结合线程池实现apache kafka消费者组的误区及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

一个错误:多线程使用单一消费者

下图显现了一种错误的使用KafkaConsumer的方法

  • 创建多个线程用来消费kafka数据
  • 多线程使用同一个KafkaConsumer对象
  • 在单线程中使用这个KafkaConsumer对象,完成数据拉取、处理、提交偏移量。

在这里插入图片描述

这种方式之所以错误的原因是:KafkaConsumer是线程不安全的,可能出现把同一批数据既给线程A处理,也交给线程B处理重复消费的问题。

一个误区:多线程就是消费者组

下图中体现的是一种正常的KafkaConsumer使用方式

  • 使用一个KafkaConsumer拉取数据
  • 拉取数据后将一个批次的数据交给一个线程去处理

在这里插入图片描述

这个处理方式不是错误,但是他只是一个消费者在消费kafka消息队列中的数据,不是消费者组的方式消费数据。无法充分利用kafka分区提升消息处理的吞吐量。

常规正确做法:使用线程池实现消费者组

下面的方法是常规的正确实现方式

在这里插入图片描述

  • 因为KafkaConsumer是线程不安全的,所以不能跨线程使用KafkaConsumer
  • 每个线程持有一个KafkaConsumer对象
  • 多个线程的实现可以使用线程池,线程池的线程数量等于消费者组内消费者的数量
public class MyConsumerGroup {
    public void groupConsumer(){
        ExecutorService executorService = Executors.newFixedThreadPool(6);
        for (int i = 0; i < 6; i++) {
            MyConsumer myConsumer = new MyConsumer();
            executorService.execute(myConsumer);
        }
    }
}

MyConsumer方法需要实现Runnable接口,并在run方法中调用MyConsumer#pollData。MyConsumer的代码参考本专栏的《消费者Java实现》( 集成apache kafka-clients实现数据消费者)

@Override
public void run() {
    MyConsumer myConsumer = new MyConsumer();
    myConsumer.pollData();
}

到此这篇关于结合线程池实现apache kafka消费者组的文章就介绍到这了,更多相关apache kafka消费者组内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java程序运行之JDK,指令javac java解读

    Java程序运行之JDK,指令javac java解读

    这篇文章主要介绍了Java程序运行之JDK,指令javac java,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • AsyncHttpClient IOExceptionFilter异常过滤器

    AsyncHttpClient IOExceptionFilter异常过滤器

    这篇文章主要为大家介绍了AsyncHttpClient IOExceptionFilter异常过滤器代码流程解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12
  • WIN7系统JavaEE(java)环境配置教程(一)

    WIN7系统JavaEE(java)环境配置教程(一)

    这篇文章主要介绍了WIN7系统JavaEE(java+tomcat7+Eclipse)环境配置教程,本文重点在于java配置,感兴趣的小伙伴们可以参考一下
    2016-06-06
  • Spring中配置数据源的几种方式

    Spring中配置数据源的几种方式

    今天小编就为大家分享一篇关于Spring中配置数据源的几种方式,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • Java无需解压直接读取ZIP压缩包里的文件及内容

    Java无需解压直接读取ZIP压缩包里的文件及内容

    最近开发的时候遇到要获取到zip压缩包里面的文件内容,解决方案就是通过ZipInputStream来读取,下面通过实例代码介绍Java无需解压直接读取ZIP压缩包里的文件及内容,感兴趣的朋友跟随小编一起看看吧
    2024-03-03
  • Spring boot集成spring session实现session共享的方法

    Spring boot集成spring session实现session共享的方法

    这篇文章主要介绍了Spring boot集成spring session实现session共享的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-06-06
  • Java如何基于poi操作Wold工具类

    Java如何基于poi操作Wold工具类

    这篇文章主要介绍了Java如何基于poi操作Wold工具类,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • Java判断List中相同值元素的个数实例

    Java判断List中相同值元素的个数实例

    今天小编就为大家分享一篇Java判断List中相同值元素的个数实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • Java动态脚本Groovy

    Java动态脚本Groovy

    本文介绍了Java动态脚本Groovy,Groovy是用于Java虚拟机的一种敏捷的动态语言,它是一种成熟的面向对象编程语言,既可以用于面向对象编程,又可以用作纯粹的脚本语言。使用该种语言不必编写过多的代码,同时又具有闭包和动态语言中的其他特性,需要的朋友可以参考一下
    2021-12-12
  • 最长重复子数组 findLength示例详解

    最长重复子数组 findLength示例详解

    今天给大家分享一道比较常问的算法面试题,最长重复子数组 findLength,文中给大家分享解题思路,结合示例代码介绍的非常详细,需要的朋友参考下吧
    2023-08-08

最新评论