java实现请求缓冲合并的示例代码

 更新时间:2024年04月12日 11:23:56   作者:muguazhi  
我们对外提供了一个rest接口给第三方业务进行调用,但是由于第三方框架限制,导致会发送大量相似无效请求,这篇文章主要介绍了java实现请求缓冲合并,需要的朋友可以参考下

业务背景:

我们对外提供了一个rest接口给第三方业务进行调用,但是由于第三方框架限制,导致会发送大量相似无效请求,例如:接口入参json包含两个字段,createBy和receiverList,完整的入参json示例如下:

{
	"createBy": "aa",
	"receiverList": [
		"bb",
		"cc"
	]
}

实际第三方业务会进行多次调用接口,每次传递的数据可能如下:

{
	"createBy": "aa",
	"receiverList": [
		"bb"
	]
}
或者
{
	"createBy": "aa",
	"receiverList": [
		"cc"
	]
}
或者
{
	"createBy": "bb",
	"receiverList": [
		"cc"
	]
}
或者
{
	"createBy": "aa",
	"receiverList": [
		"bb",
		"cc"
	]
}

所有需要对第三方业务传递过来的数据进行缓冲合并处理,减轻真正的后台服务的压力。

代码实现

package com.demo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Description: 请求合并管理类
 */
@Slf4j
@Component
public class RequestMerger {
    // 线程池核心线程数
    private final int corePoolSize = 200;
    // 任务执行超时时间,单位:毫秒
    private final int timeout = 5 * 60 * 1000;
    // 队列,队列长度为Integer.MAX_VALUE
    private final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
    // 定时器,所有任务共用线程池
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(corePoolSize,
            new CustomizableThreadFactory("schedule-executor-"));
    // 是否关闭标志
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    /**
     * 构造函数,用于初始化请求合并器。
     *
     * @param batchSize   每次合并的最大请求数量。
     * @param delayMillis 合并请求的周期间隔,单位为毫秒。
     */
    public RequestMerger(int batchSize, long delayMillis) {
        // 启动定时器,定期合并请求,延迟delayMillis后开始,之后每隔delayMillis执行一次
        scheduler.scheduleAtFixedRate(() -> {
            if (!isShutdown.get()) {
                List<String> batch = new ArrayList<>(batchSize);
                int drainedCount = requestQueue.drainTo(batch, batchSize);
                log.info("==>scheduler,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());
                if (!batch.isEmpty()) {
                    // 异步执行任务,防止业务执行时间过长导致业务整体延迟过大
                    scheduler.submit(() -> {
                        sendRequestBatch(batch);
                    });
                }
            }
        }, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
    }
    /**
     * 发送请求批次的方法。
     *
     * @param batch 请求批次。
     */
    private void sendRequestBatch(List<String> batch) {
        Future<?> future = scheduler.submit(() -> {
            try {
                // 在这里实现你的请求发送逻辑
                // 可以使用HTTP客户端库(如Apache HttpClient或OkHttp)来发送请求
                // ...
                System.out.println("Sending batch of " + batch.size() + " requests");
            } catch (Exception e) {
                // 异常处理逻辑
                System.err.println("Error sending requests: " + e.getMessage());
            }
        });
        // 尝试获取任务结果,如果超过超时时间则抛出TimeoutException异常,进行取消任务
        try {
            // 超时时间,单位:毫秒
            future.get(timeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // 超时或执行异常时取消任务
            future.cancel(true);
        } catch (Exception e) {
            log.error("==>任务执行异常", e);
            // 任务执行异常
            future.cancel(true);
        }
    }
    /**
     * 在对象销毁前执行的关闭操作。
     * 该方法从请求队列中拉取所有未处理的请求,并将它们批量发送。
     * 无参数和返回值。
     */
    @PreDestroy
    public void shutdown() {
        isShutdown.set(true);
        List<String> batch = new ArrayList<>();
        // 获取请求队列中的剩余所有请求
        int drainedCount = requestQueue.drainTo(batch);
        log.info("==>shutdown,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());
        // 批量发送收集到的剩余请求
        sendRequestBatch(batch);
        // 关闭定时执行器
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
                log.error("Scheduler did not terminate gracefully within 60 seconds, force shutting down.");
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            log.warn("Interrupted during scheduler termination, force shutting down.");
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    /**
     * 向请求队列中添加一个请求。如果服务未关闭,则直接添加到请求队列中;
     * 如果服务已关闭,则将该请求作为一批请求发送。
     *
     * @param request 要添加的请求字符串。
     */
    public void addRequest(String request) throws InterruptedException {
        // 检查服务是否已关闭
        if (!isShutdown.get()) {
            // 未关闭,直接添加到请求队列
            requestQueue.put(request);
        } else {
            // 已关闭,将当前请求作为一批发送
            List<String> batch = new ArrayList<>();
            batch.add(request);
            sendRequestBatch(batch);
        }
    }
}

参考资料

https://gitee.com/huangjuncong/mumux-framework/tree/master/merge-request/src/main/java/com/mumux/concurrent

注意:此代码容易导致数据丢失。例如:调用add方法将10个元素放入队列,但是真正获取到9个元素。
造成原因:FlushThread#add()中使用offer方法将数据放入队列,如果此时队列已满,返回值为false,实际数据未进入队列,需要额外对数据进行处理。
修改建议:调大队列长度,并且将offer方法改为put方法,保证数据最终进入队列。

到此这篇关于java实现请求缓冲合并的文章就介绍到这了,更多相关java请求缓冲合并内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 简介Java编程中的Object类

    简介Java编程中的Object类

    这篇文章主要介绍了简介Java编程中的Object类,是Java入门学习中的基础知识,需要的朋友可以参考下
    2015-09-09
  • 详解SpringBoot 快速整合Mybatis(去XML化+注解进阶)

    详解SpringBoot 快速整合Mybatis(去XML化+注解进阶)

    本篇文章主要介绍了详解SpringBoot 快速整合Mybatis(去XML化+注解进阶),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-11-11
  • Java结合Kotlin实现宝宝年龄计算

    Java结合Kotlin实现宝宝年龄计算

    这篇文章主要为大家介绍了Java结合Kotlin实现宝宝年龄计算示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06
  • SpringBoot项目中HTTP请求体只能读一次的解决方案

    SpringBoot项目中HTTP请求体只能读一次的解决方案

    在基于Spring开发Java项目时,可能需要重复读取HTTP请求体中的数据,例如使用拦截器打印入参信息等,但当我们重复调用getInputStream()或者getReader()时,通常会遇到SpringBoot HTTP请求只读一次的问题,本文给出了几种解决方案,需要的朋友可以参考下
    2024-08-08
  • JVM钩子函数的使用场景详解

    JVM钩子函数的使用场景详解

    当jvm进程退出的时候,或者受到了系统的中断信号,hook线程就会启动,一个线程可以注入多个钩,下面这篇文章主要给大家介绍了关于JVM钩子函数使用的相关资料,需要的朋友可以参考下
    2021-08-08
  • Java跨环境部署的完整指南(开发/测试/生产配置隔离)

    Java跨环境部署的完整指南(开发/测试/生产配置隔离)

    在现代软件开发中,一次编写,到处运行的 Java 理念虽然广为人知,但真正实现 跨环境无缝部署 却远非易事,本文将深入探讨如何在 Java 项目中实现 开发(dev)、测试(test)、生产(prod) 等多环境的配置隔离与部署策略,需要的朋友可以参考下
    2026-03-03
  • Java Timer单线程下的定时任务举例详解

    Java Timer单线程下的定时任务举例详解

    在日常的项目开发中,多多少少都会涉及到一些定时任务的需求,下面这篇文章主要介绍了Java Timer单线程下定时任务的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2025-10-10
  • 详解Java Socket通信封装MIna框架

    详解Java Socket通信封装MIna框架

    Mina异步IO使用的Java底层JNI框架,Mina提供服务端和客户端,将我们的业务解耦开发,真正做到高内聚低耦合的思想。
    2021-06-06
  • Java基础教程之String深度分析

    Java基础教程之String深度分析

    这篇文章主要给大家介绍了关于Java基础教程之String的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-06-06
  • Java 迭代器Iterator完整示例解析

    Java 迭代器Iterator完整示例解析

    迭代器(Iterator)是Java集合框架中的一个核心接口,位于java.util包下,本文给大家讲解Java迭代器Iterator完整示例,感兴趣的朋友跟随小编一起看看吧
    2025-09-09

最新评论