结合线程池实现apache kafka消费者组的误区及解决方法

 更新时间:2022年07月07日 11:03:27   作者:字母哥哥  
这篇文章主要介绍了结合线程池实现apache kafka消费者组的误区及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

一个错误:多线程使用单一消费者

下图显现了一种错误的使用KafkaConsumer的方法

  • 创建多个线程用来消费kafka数据
  • 多线程使用同一个KafkaConsumer对象
  • 在单线程中使用这个KafkaConsumer对象,完成数据拉取、处理、提交偏移量。

在这里插入图片描述

这种方式之所以错误的原因是:KafkaConsumer是线程不安全的,可能出现把同一批数据既给线程A处理,也交给线程B处理重复消费的问题。

一个误区:多线程就是消费者组

下图中体现的是一种正常的KafkaConsumer使用方式

  • 使用一个KafkaConsumer拉取数据
  • 拉取数据后将一个批次的数据交给一个线程去处理

在这里插入图片描述

这个处理方式不是错误,但是他只是一个消费者在消费kafka消息队列中的数据,不是消费者组的方式消费数据。无法充分利用kafka分区提升消息处理的吞吐量。

常规正确做法:使用线程池实现消费者组

下面的方法是常规的正确实现方式

在这里插入图片描述

  • 因为KafkaConsumer是线程不安全的,所以不能跨线程使用KafkaConsumer
  • 每个线程持有一个KafkaConsumer对象
  • 多个线程的实现可以使用线程池,线程池的线程数量等于消费者组内消费者的数量
public class MyConsumerGroup {
    public void groupConsumer(){
        ExecutorService executorService = Executors.newFixedThreadPool(6);
        for (int i = 0; i < 6; i++) {
            MyConsumer myConsumer = new MyConsumer();
            executorService.execute(myConsumer);
        }
    }
}

MyConsumer方法需要实现Runnable接口,并在run方法中调用MyConsumer#pollData。MyConsumer的代码参考本专栏的《消费者Java实现》( 集成apache kafka-clients实现数据消费者)

@Override
public void run() {
    MyConsumer myConsumer = new MyConsumer();
    myConsumer.pollData();
}

到此这篇关于结合线程池实现apache kafka消费者组的文章就介绍到这了,更多相关apache kafka消费者组内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot事务不回滚的解决方案

    SpringBoot事务不回滚的解决方案

    这篇文章主要介绍了SpringBoot事务不回滚的解决方案的相关资料,需要的朋友可以参考下
    2022-09-09
  • ES结合java代码聚合查询详细示例

    ES结合java代码聚合查询详细示例

    es查询有一个很常用的一种叫聚合查询,相当于mysql中的分组group by 后拿各组数量进行统计,实现起来也是很简单,下面这篇文章主要给大家介绍了关于ES结合java代码聚合查询的相关资料,需要的朋友可以参考下
    2023-05-05
  • c语言来实现贪心算法之装箱问题

    c语言来实现贪心算法之装箱问题

    这篇文章主要介绍了c语言来实现贪心算法之装箱问题,需要的朋友可以参考下
    2015-03-03
  • Java语言实现数据结构栈代码详解

    Java语言实现数据结构栈代码详解

    这篇文章主要介绍了Java语言实现数据结构栈代码详解,简单介绍了栈的概念,然后分享了线性栈和链式栈的Java代码,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • ireport数据表格报表的简单使用

    ireport数据表格报表的简单使用

    这篇文章给大家介绍了如何画一个报表模板,这里介绍下画表格需要用到的组件,文中通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2021-10-10
  • Java 实现分布式服务的调用链跟踪

    Java 实现分布式服务的调用链跟踪

    分布式服务中完成某一个业务动作,需要服务之间的相互协作才能完成,在这一次动作引起的多服务的联动我们需要用1个唯一标识关联起来,关联起来就是调用链的跟踪。本文介绍了Java 实现分布式服务的调用链跟踪的步骤
    2021-06-06
  • mybatis中返回多个map结果问题

    mybatis中返回多个map结果问题

    这篇文章主要介绍了mybatis中返回多个map结果问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • maven私服搭建与使用教程

    maven私服搭建与使用教程

    在使用maven进行Java项目的开发过程中,难免会有些公共的私有库,这些库是不太方便放到中央仓库的,可以通过Nexus搭建一个私有仓库,这篇文章主要介绍了maven私服搭建与使用,需要的朋友可以参考下
    2023-03-03
  • Java current并发包超详细分析

    Java current并发包超详细分析

    current并发包、在JDK1.5之前Java并没有提供线程安全的一些工具类去操作多线程,需要开发人员自行编写实现线程安全,但仍然无法完全避免低性能、死锁、资源管理等问题。在JDK1.5时新增了java.util.current并发包,其中提供了许多供我们使用的并发编程工具类
    2023-02-02
  • IDEA解决src和resource下创建多级目录的操作

    IDEA解决src和resource下创建多级目录的操作

    这篇文章主要介绍了IDEA解决src和resource下创建多级目录的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02

最新评论