SpringBoot集成XXL-JOB实现灵活控制的分片处理方案

 更新时间:2024年09月26日 11:39:21   作者:码到三十五  
因为需要并行处理同一张数据表里的数据,所以比较自然地想到了分片查询数据,可以利用对 id 取模的方法进行分片,避免同一条数据被重复处理,所以本文给大家介绍了SpringBoot集成XXL-JOB实现灵活控制的分片处理方案,需要的朋友可以参考下

场景

一个应用需要支持大量数据的批处理任务,要求:

  • 并行处理能力:应用需能够同时处理多个数据块,即实现并行处理。
  • 灵活的并发控制:可以灵活调整并行处理的任务数量,以确保资源利用最大化且不过载。
  • 均衡负载分配:应将任务均匀分配到不同的服务器节点上,以平衡各节点的负载,避免单点压力过大。

解决思路

因为需要并行处理同一张数据表里的数据,所以比较自然地想到了分片查询数据,可以利用对 id 取模的方法进行分片,避免同一条数据被重复处理。那XXL-JOB 的路由策略「分片广播 & 动态分片」很贴合这种场景」来调度定时任务;

实现DEMO

SpringBoot环境下,我们集成xxl-job来实现上述方案。
SpringBoot如何集成xxl-job查看官网即可,这里不再叙述,下面看下分片调度的代码:

1.xxl-job调度管理页面配置分片调度任务

路由策略选择: 分片广播

2. 编写task代码:

要获取分片总数和当前分片序号,作为参数传给sql语句:

	@Resource
    private OrderDataMapper orderDataMapper;
	
    @XxlJob("orderDataStatusTask")
    public void orderDataStatusTask() {
		// 计时器
        Stopwatch timer = Stopwatch.createStarted();
        
		// 获取xxl-job的localThread中的总的分片数和当前分片
		OrderDataParam param = new OrderDataParam();
        param.setShardIndex(XxlJobHelper.getShardIndex());
        param.setShardTotal(XxlJobHelper.getShardTotal());
        // 其他参数设置,略了....
        
		// 根据分片数拉取当前分片的数据
        List<OrderData> orderDataList = orderDataMapper.getInitStatusOrder(param);
        XxlJobHelper.log("获取待处理订单数据:分片号={},数据量={},总分片数={}", XxlJobHelper.getShardIndex(), orderDataList.size(), XxlJobHelper.getShardTotal());
        if (CollUtil.isEmpty(orderDataList)) {
            return;
        }
        // 处理逻辑,略了....
        
        XxlJobHelper.log("当前分片({})处理完成,耗时={}秒", XxlJobHelper.getShardIndex(), timer.stop().elapsed(TimeUnit.SECONDS));
    }

这里服务启动了4个实例,总分片数ShardTotal就是4,每个实例的ShardIndex分别是0,1,2,3

3. mybatis中编写sql语句

根据分片总数和当前分片数据对Id哈希取模, 这里做了两次hash,主要作用是用id最后一位hash方便直接看出数据被哪个分片调度了。

	// 获取未处理的订单数据
	// 根据id末位数取hash后分片拉取
	<select id="getInitStatusOrder" parameterType="com.xxx.OrderDataParam"
            resultType="com.xxx.OrderData">
			select id,order_no,customer_code,
            from tt_order_data t
            where t.status = 0
				  and t.fail_count <![CDATA[ < ]]> #{retryCount}
				  and t.update_time <![CDATA[ >= ]]> #{lastUpdateTime}
				  and mod(mod(t.id, 10) , #{shardTotal}) = #{shardIndex}
			limit 0,200	
	</select>	

4.最后看下调度日志

同一次调度任务,4个实例个调度一次,并且拉取到各自部分的数据进行处理:

第3个实例的调度日志:

	    2024-09-25 08:31:40 [com.xxl.job.core.thread.JobThread#run]-[130]-[Thread-144] 
		----------- xxl-job job execute start -----------
		----------- Param:{"lastHoursAgoModify":4,"rows":3000,"lastMonthAgoCreate":6,"retryCount":1}
		2024-09-25 08:31:40 [com.xxx.xxxx#orderDataStatusTask]-[47]-[Thread-144] 获取待处理订单数据:分片号=3,数据量=100,总分片数=4
		2024-09-25 08:31:41 [com.xxx.xxxx#orderDataStatusTask]-[53]-[Thread-144] 当前分片(3)处理完成,耗时=1秒
		2024-09-25 08:31:41 [com.xxl.job.core.thread.JobThread#run]-[176]-[Thread-144] 
		----------- xxl-job job execute end(finish) -----------
		----------- Result: handleCode=200, handleMsg = null
		2024-09-25 08:31:41 [com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread] 
        ----------- xxl-job job callback finish.

第4个实例的调度日志:

	    2024-09-25 08:31:40 [com.xxl.job.core.thread.JobThread#run]-[130]-[Thread-144] 
		----------- xxl-job job execute start -----------
		----------- Param:{"lastHoursAgoModify":4,"rows":3000,"lastMonthAgoCreate":6,"retryCount":1}
		2024-09-25 08:31:40 [com.xxx.xxxx#orderDataStatusTask]-[47]-[Thread-144] 获取待处理订单数据:分片号=4,数据量=80,总分片数=4
		2024-09-25 08:31:41 [com.xxx.xxxx#orderDataStatusTask]-[53]-[Thread-144] 当前分片(4)处理完成,耗时=1秒
		2024-09-25 08:31:41 [com.xxl.job.core.thread.JobThread#run]-[176]-[Thread-144] 
		----------- xxl-job job execute end(finish) -----------
		----------- Result: handleCode=200, handleMsg = null
		2024-09-25 08:31:41 [com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread] 
        ----------- xxl-job job callback finish.

到此这篇关于SpringBoot集成XXL-JOB实现灵活控制的分片处理方案的文章就介绍到这了,更多相关SpringBoot XXL-JOB分片处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot使用MockMvc进行Web集成测试的示例详解

    SpringBoot使用MockMvc进行Web集成测试的示例详解

    MockMvc 是一个测试框架,可以模拟 HTTP 请求和响应,在本文中,我们将介绍如何使用MockMvc进行Web集成测试,以及如何编写测试用例来测试Spring MVC控制器,希望对大家有所帮助
    2023-06-06
  • java基础--自己动手实现一个LRU

    java基础--自己动手实现一个LRU

    这篇文章主要介绍了运用方案如何实现LUR,文章中通过代码讲解的非常详细,对大家的工作或学习有一定的参考价值,感兴趣的朋友可以参考一下
    2021-08-08
  • 使用Spark SQL实现读取不带表头的txt文件

    使用Spark SQL实现读取不带表头的txt文件

    这篇文章主要为大家详细介绍了如何使用Spark SQL实现读取不带表头的txt文件,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2024-03-03
  • Java 改造ayui表格组件实现多重排序

    Java 改造ayui表格组件实现多重排序

    layui 的表格组件目前只支持单列排序,在实际应用中并不能很好的支撑我们的业务需求。今天一时手痒,决定改造一番以支持多重排序。
    2021-04-04
  • Java自动化设置PDF文档属性的示例代码

    Java自动化设置PDF文档属性的示例代码

    在现代开发中,PDF 文件已经成为广泛使用的文档格式,本文将展示如何使用 Java 设置 PDF 文档的属性,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下
    2026-01-01
  • Java利用递归实现树形结构的工具类

    Java利用递归实现树形结构的工具类

    有时候,我们的数据是带有层级的,比如常见的省市区三级联动,就是一层套着一层。而我们在数据库存放数据的时候,往往是列表形式的,这个时候可能就需要递归处理为树形结构了。本文就为大家介绍了Java利用递归实现树形结构的工具类,希望对大家有所帮助
    2023-03-03
  • Java的静态方法Arrays.asList()使用指南

    Java的静态方法Arrays.asList()使用指南

    Arrays.asList() 是一个 Java 的静态方法,它可以把一个数组或者多个参数转换成一个 List 集合,这个方法可以作为数组和集合之间的桥梁,方便我们使用集合的一些方法和特性,本文将介绍 Arrays.asList() 的语法、应用场景、坑点和总结
    2023-09-09
  • IDEA中设置代码自动提示为Alt+/的具体做法

    IDEA中设置代码自动提示为Alt+/的具体做法

    很多公司都强制性要求使用Intellij IDEA,其实Intellij IDEA也确实很好用,但是一下子从Eclipse跳转到Intellij IDEA转也是需要一段时间的,为了迎合之前的习惯,就需要在Intellij IDEA中改变一些设置,如代码自动生成,本文给大家分享设置方法,感兴趣的朋友一起看看吧
    2023-01-01
  • Spring之SseEmitter实现让你的进度条实时更新

    Spring之SseEmitter实现让你的进度条实时更新

    Spring SseEmitter是一种实现服务器端推送事件(SSE)的机制,支持单向通信,适用于实时数据传输需求,通过代码示例和应用场景分析,展示了如何在服务端和客户端使用SseEmitter进行实时数据推送
    2025-02-02
  • 详解Mybatis中javaType和ofType的区别

    详解Mybatis中javaType和ofType的区别

    本文主要介绍了详解Mybatis中javaType和ofType的区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-05-05

最新评论