Java实现CompletionService并发编排消费任务

 更新时间:2026年05月18日 08:40:54   作者:NigulasiLiu  
RocketMQ批量拉取消息,消费端一条一条串行处理导致耗时较高,为了解决这个问题,文章提出使用CompletionService来实现并发处理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

假如出现了这种情况:RocketMQ 批量拉取了 128 条消息,但消费端是一条一条串行处理的。128 条消息,每条 50 毫秒,一轮消费就要 6.4 秒。

可以直接用批量消费吗?

串行消费的瓶颈

RocketMQ 的 setConsumeMessageBatchMaxSize(128) 让你一次拉 128 条消息过来,但如果你还是逐条同步处理,那批量拉取的意义就只剩「减少网络往返」了。处理速度的瓶颈,一条都没解开。

直觉上的解法很简单,交给线程池并发执行嘛。但这里藏着一个很容易忽略的问题。

如果异步线程执行失败了,RocketMQ 的 Broker 是不知道的。主线程已经返回了 CONSUME_SUCCESS,Broker 提交了偏移量,那条失败的消息就丢了。或者更常见的情况,主线程根本不知道哪些子线程成功了、哪些失败了,只能盲目地返回成功或重试。

我们需要一个办法,在主线程中感知每一个子线程的执行结果。全部成功才返回 CONSUME_SUCCESS,有一条失败就返回 RECONSUME_LATER 让 RocketMQ 整体重发。

这不是「能不能并发」的问题,是「并发了能不能控」的问题。

CompletionService,先完成先取结果的编排器

Java 并发包里有一个接口叫 CompletionService,位于 java.util.concurrent,专门干这件事,批量提交异步任务,按完成顺序逐个取结果。

它的唯一实现类是 ExecutorCompletionService,用法就三步。

  1. 提交所有任务到线程池。
  2. 循环取结果,发现失败立即标记。
  3. 全部成功返回 CONSUME_SUCCESS,否则返回 RECONSUME_LATER

代码逻辑如下。

@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
    consumer.setPullInterval(1000);
    consumer.setConsumeMessageBatchMaxSize(128);
    consumer.setPullBatchSize(64);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        log.info("NewBuyBatchMsgListener receive message size: {}", msgs.size());

        CompletionService<Boolean> completionService = new ExecutorCompletionService<>(executor);
        List<Future<Boolean>> futures = new ArrayList<>();

        // 1. 提交所有任务
        msgs.forEach(messageExt -> {
            Callable<Boolean> task = () -> {
                try {
                    OrderCreateRequest orderCreateRequest = JSON.parseObject(JSON.parseObject(messageExt.getBody()).getString("body"), OrderCreateRequest.class);
                    return doNewBuyExecute(orderCreateRequest);
                } catch (Exception e) {
                    log.error("Task failed", e);
                    return false; // 标记失败
                }
            };
            futures.add(completionService.submit(task));
        });

        // 2. 检查结果
        boolean allSuccess = true;
        try {
            for (int i = 0; i < msgs.size(); i++) {
                Future<Boolean> future = completionService.take();
                if (!future.get()) { // 3. 发现一个失败立即终止
                    allSuccess = false;
                    break;
                }
            }
        } catch (Exception e) {
            allSuccess = false;
        }

        // 3. 根据结果返回消费状态
        return allSuccess ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                : ConsumeConcurrentlyStatus.RECONSUME_LATER;
    });
}

128 条消息并发处理,总耗时从 6.4 秒降到最慢那条的耗时,通常几十毫秒就够了。

那么,「CompletionService 底层是怎么做到先完成先取的?」

底层原理,BlockingQueue + QueueingFuture

ExecutorCompletionService 的源码非常精炼,核心就三个成员变量。

private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;

executor 是你传入的线程池,completionQueue 是一个 LinkedBlockingQueue,用来存放已完成任务的 Future 对象。

关键在 submit 方法里。当你调用 completionService.submit(task) 时,它并没有直接把 task 丢给线程池,而是先包装了一层。

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

QueueingFuture 继承自 FutureTask,重写了 done() 方法。done()FutureTask 提供的钩子,任务无论正常完成还是异常终止,都会回调这个方法。

所以整个流程是这样的。你 submit 一个任务,它被包装成 QueueingFuture 交给线程池执行。任务跑完的那一刻,done() 触发,把对应的 Future 塞进 completionQueue。你调 take(),就是从 completionQueue 里阻塞地取一个出来。

谁先完成,谁的 Future 先入队,你就先取到谁的结果。提交顺序和完成顺序解耦了。

我觉得这个设计很漂亮。它没有用任何锁排序、没有用优先队列、没有用回调链,就是最朴素的「生产者往队列里放,消费者从队列里取」。BlockingQueue 天然线程安全,生产消费解耦,简单到几乎不可能出错。

那 CompletableFuture 呢?

Java 8 引入了 CompletableFuture,同样是处理异步任务的利器。它和 CompletionService 解决的问题有重叠,但设计哲学完全不同。

维度CompletionServiceCompletableFuture
引入版本Java 5Java 8
核心机制BlockingQueue,先完成先取回调链,任务间可编排依赖
结果获取take() 阻塞等待下一个完成thenApply() / thenCompose() 非阻塞回调
任务关系批量独立任务,互不依赖可描述 A 完成后执行 B、A 和 B 都完成后执行 C
异常处理在 Future.get() 时抛 ExecutionExceptionexceptionally() / handle() 流式处理
适用场景批量同构任务,只关心结果是否全部成功异步流程编排,任务间有依赖和组合关系

一句话总结,CompletionService 是「批量并发,按完成顺序收结果」,CompletableFuture 是「异步编排,按依赖关系串流程」。

回到我们这个 RocketMQ 批量消费的场景,128 条消息之间没有任何依赖关系,我们只关心「全部成功还是有一个失败」。这就是 CompletionService 的主场。

如果你用 CompletableFuture 来写,也能做,但你需要自己维护一个 CompletableFuture.allOf() 来等全部完成,然后再遍历检查结果。代码更啰嗦,而且 allOf 会等所有任务都完成才能继续,哪怕第 2 条消息就失败了,你也要等剩下 126 条跑完才能返回。CompletionServicetake() 则是逐个检查,发现失败立即 break,省下了不必要的等待。

当然,如果你的场景是「查商品信息,再根据商品查库存和价格,最后组装结果」,任务之间有明确的先后依赖,那 CompletableFuture 的链式编排就比 CompletionService 的队列取值优雅得多。

工具没有好坏,只有合不合适。

并发不是目的,可控才是

串行消费慢,直觉反应是加并发。但加了并发之后,如果主线程无法感知子线程的成败,那并发就不是加速,是埋雷。消息丢了都不知道。

CompletionService 解决的不是「怎么并发」的问题,而是「并发了怎么收场」的问题。它用最朴素的 BlockingQueue 机制,让主线程能按完成顺序逐个检查结果,发现异常立即止损。

我觉得并发编程最难的部分从来不是「怎么让任务跑起来」,而是「跑起来之后怎么确保结果可控」。CompletionService 给了一个很干净的答案。

到此这篇关于Java实现CompletionService并发编排消费任务的文章就介绍到这了,更多相关Java CompletionService并发编排内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 关于垃圾回收的三色标记算法的使用解读

    关于垃圾回收的三色标记算法的使用解读

    这篇文章主要介绍了关于垃圾回收的三色标记算法的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2025-05-05
  • Java中关于OAuth2.0的原理分析

    Java中关于OAuth2.0的原理分析

    这篇文章主要介绍了Java中关于OAuth2.0的原理分析,OAuth是一个关于授权的开放网络标准,允许用户授权第三 方应用访问他们存储在另外的服务提供者上的信息,而不需要将用户名和密码提供给第三方移动应用或分享他们数据的所有内容,需要的朋友可以参考下
    2023-09-09
  • 一文带你入门JDK8新特性——Lambda表达式

    一文带你入门JDK8新特性——Lambda表达式

    这篇文章主要介绍了JDK8新特性——Lambda表达式的相关资料,帮助大家更好的理解和学习JAVA开发,感兴趣的朋友可以了解下
    2020-08-08
  • java断点续传功能实例(java获取远程文件)

    java断点续传功能实例(java获取远程文件)

    本文介绍了一种利用 Java 来实现断点续传的方法。
    2013-12-12
  • Springboot集成kafka高级应用实战分享

    Springboot集成kafka高级应用实战分享

    这篇文章主要介绍了Springboot集成kafka高级应用实战分享,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-08-08
  • 基于Java编写串口通信工具

    基于Java编写串口通信工具

    这篇文章主要为大家详细介绍了基于Java编写的一个带有图形界面的简单串口通信工具,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-12-12
  • Java基于二分搜索树、链表的实现的集合Set复杂度分析实例详解

    Java基于二分搜索树、链表的实现的集合Set复杂度分析实例详解

    这篇文章主要介绍了Java基于二分搜索树、链表的实现的集合Set复杂度分析,结合实例形式详细分析了Java基于二分搜索树、链表的实现的集合Set复杂度分析相关操作技巧与注意事项,需要的朋友可以参考下
    2020-03-03
  • springboot简单接入websocket的操作方法

    springboot简单接入websocket的操作方法

    这篇文章主要介绍了springboot简单接入websocket的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-05-05
  • springBoot静态资源加载不到,并且配置了也不生效问题及解决

    springBoot静态资源加载不到,并且配置了也不生效问题及解决

    这篇文章总结了一个在Spring Boot 2.6.x版本中,由于路径匹配策略改变导致静态资源无法加载的问题,并提供了解决方案:通过配置类或在配置文件中设置路径匹配策略为AntPathMatcher,或者直接降级Spring Boot版本
    2025-02-02
  • SpringBoot中application.properties、application.yaml、application.yml区别

    SpringBoot中application.properties、application.yaml、applicati

    本文主要介绍了SpringBoot中application.properties、application.yaml、application.yml区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-04-04

最新评论