java如何多线程批量更新10万级的数据

 更新时间:2024年10月12日 08:42:30   作者:carter171717  
在处理大数据量的批量更新时,直接使用mybatis的updateBatch可能导致效率低下甚至OOM,通过每次处理5000条数据的方式虽然安全但效率低,更优的解决方案是使用多线程处理,将数据分批并多线程执行,有效提高了处理速度并保证了系统稳定性

java多线程批量更新10万级的数据

好久没有写文章,今天刚好没啥事,就动手记录一下,好记性不如烂笔头!言归正传,我最近接到的一个工作任务大概内容是,有一张数据量在十万+级别的表,需要新增一个字段,并且要写入初始化值。

业务其实非常的简单,全部查询出来一个列表,然后用mybatis的updateBatch批量更新,其实在我的实践过程中也没什么问题,但是执行的效率是很低的,而且一旦数据量过大,如果机器配置不太行的话,很可能会直接OOM,如果在正式环境出现这个问题,那完犊子,准备删库跑路!

所以呢,我就想了一个比较保险但是比较低级的办法,每次查询出5000条数据,去做批量更新,确保内存不会溢出导致服务崩盘,这当然也是可以解决问题,但是就是修复数据需要执行很多次,显得比较愚蠢一点。

如何用比较方便并高效的方式来修复大数量的数据

第一反应肯定是多线程啦,方案是:

  • 1.查询出全部的数据(10万条)
  • 2.对数据进行分批,每批5000条,
  • 3.多线程同时处理多批数据
  • 4.等待执行完成,返回成功

直接上核心代码

写一个通用的分批工具类,把一个List集合,拆分成多个小的List集合

/**
 * 拆分集合
 *
 * @param <T> 泛型对象
 * @param resList 需要拆分的集合
 * @param subListLength 每个子集合的元素个数
 * @return 返回拆分后的各个集合组成的列表
 **/
public static <T> List<List<T>> splitList(List<T> resList, int subListLength) {
    if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
        return new ArrayList<>();
    }
    List<List<T>> ret = new ArrayList<>();
    int size = resList.size();
    if (size <= subListLength) {
        // 数据量不足 subListLength 指定的大小
        ret.add(resList);
    } else {
        int pre = size / subListLength;
        int last = size % subListLength;
        // 前面pre个集合,每个大小都是 subListLength 个元素
        for (int i = 0; i < pre; i++) {
            List<T> itemList = new ArrayList<>(subListLength);
            for (int j = 0; j < subListLength; j++) {
                itemList.add(resList.get(i * subListLength + j));
            }
            ret.add(itemList);
        }

        // last的进行处理
        if (last > 0) {
            List<T> itemList = new ArrayList<>(last);
            for (int i = 0; i < last; i++) {
                itemList.add(resList.get(pre * subListLength + i));
            }
            ret.add(itemList);
        }
    }
    return ret;
}

然后就是用多线程业务处理了,代码如下

 public static void doThreadBusiness(List<String> totalList) {
        Long startTime = System.currentTimeMillis();
        System.out.println("本次更新任务开始");
        System.out.println("本机CPU核心数:"+Runtime.getRuntime().availableProcessors());
        List<String> updateList = new ArrayList();
        // 初始化线程池, 参数一定要一定要一定要调好!!!!
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 50,
                4, TimeUnit.SECONDS, new ArrayBlockingQueue(10), new ThreadPoolExecutor.DiscardPolicy());
        // 大集合拆分成N个小集合,然后用多线程去处理数据,确保不会因为数据量过大导致执行过慢
        List<List<String>> splitNList = SplitListUtils.splitList(totalList, 5000);
        // 记录单个任务的执行次数
        CountDownLatch countDownLatch = new CountDownLatch(splitNList.size());
        // 对拆分的集合进行批量处理, 先拆分的集合, 再多线程执行
        for (List<String> singleList : splitNList) {
            // 线程池执行
            threadPool.execute(new Thread(new Runnable(){
                @Override
                public void run() {
                    //模拟执行时间
                    System.out.println("当前线程:"+Thread.currentThread().getName());
                    List<String> batchUpdateVipList = new ArrayList<>();
                    for (String str : singleList) {
                        //组装要执行的批量更新数据
                        batchUpdateVipList.add(str);
                    }
                    // 这里模拟执行数据库批量更新操作
                    System.out.println("本次批量更新数据量:"+ batchUpdateVipList.size());
                    // 任务个数 - 1, 直至为0时唤醒await()
                    countDownLatch.countDown();
                }
            }));
        }
        try {
            // 让当前线程处于阻塞状态,直到锁存器计数为零
            countDownLatch.await();
        } catch (Exception e) {
            System.out.println("系统出现异常");
        }
        Long endTime = System.currentTimeMillis();
        Long useTime = endTime - startTime;
        System.out.println("本次更新任务结束,共计用时"+useTime+"毫秒");
    }

代码很 简单一看就懂了,这里要说一下CountDownLatch的使用,其实开发中并不常用,但是面试却很常用,这里蛮写一下,我使用CountDownLatch来阻塞主线程,等待多线程执行完毕后,再继续主线程,返回更新的结果,这个场景其实很经常使用到。

如果不用CountDownLatch,主线程会马上返回,如果是数据量大的情况下,往往会执行蛮久的,但是结果秒返回,就会给人一种错觉。

CountDownLatch countDownLatch = new CountDownLatch(splitNList.size());

在线程执行完毕后,需要调用一下countDown,

// 任务个数 - 1, 直至为0时唤醒await()
countDownLatch.countDown();

在主线程用await()进行阻塞等待,这样主线程就会一直等到所有的子线程都执行完成了,继续执行主线程的后续代码

 countDownLatch.await();

其实这里面还有一个非常重要的面试点,就是多线程的七大参数如何设置,

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 
50,
4, 
TimeUnit.SECONDS, 
new ArrayBlockingQueue(10), 
new ThreadPoolExecutor.DiscardPolicy()
);

有兴趣了解具体是设置方法可以另行查询资料,这也是面试必问的考点。

最后贴一下返回的打印结果吧,如下图所示:

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • java开源区块链jdchain入门

    java开源区块链jdchain入门

    这篇文章主要介绍了java开源区块链jdchain入门,文中为大家讲解了关于部署及组件遇到的一些问题,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步
    2022-02-02
  • Idea插件StopCoding的安装使用教程

    Idea插件StopCoding的安装使用教程

    这篇文章主要介绍了Idea插件StopCoding的安装使用教程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-01-01
  • Java源码解析之TypeVariable详解

    Java源码解析之TypeVariable详解

    这篇文章主要介绍了Java源码解析之TypeVariable详解,具有一定参考价值,需要的朋友可以了解下。
    2017-10-10
  • JAVA中Context的详细介绍和实例分析

    JAVA中Context的详细介绍和实例分析

    这篇文章主要介绍了JAVA中Context的详细介绍和实例分析,Context是维持android各组件能够正常工作的一个核心功能类。如果感兴趣来学习一下
    2020-07-07
  • java foreach循环为什么不能赋值的讲解

    java foreach循环为什么不能赋值的讲解

    这篇文章主要介绍了java foreach循环为什么不能赋值的讲解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • 工厂方法在Spring框架中的运用

    工厂方法在Spring框架中的运用

    这篇文章介绍了工厂方法在Spring框架中的运用,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-10-10
  • Spring Data JPA 关键字Exists的用法说明

    Spring Data JPA 关键字Exists的用法说明

    这篇文章主要介绍了Spring Data JPA 关键字Exists的用法说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • springboot中shiro使用自定义注解屏蔽接口鉴权实现

    springboot中shiro使用自定义注解屏蔽接口鉴权实现

    本文主要介绍了springboot中shiro使用自定义注解屏蔽接口鉴权实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • Java lambda表达式与泛型整理总结

    Java lambda表达式与泛型整理总结

    Lambda 表达式(lambda expression)是一个匿名函数,Lambda表达式基于数学中的λ演算得名。泛型编程,故如其名,是一个泛化的编程方式。其实现原理为程序员编写一个函数/类的代码示例,让编译器去填补出不同的函数实现
    2022-07-07
  • Java编写简单猜数游戏

    Java编写简单猜数游戏

    这篇文章主要为大家详细介绍了Java编写简单猜数游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-01-01

最新评论