SpringBoot线程池ThreadPoolTaskExecutor异步处理百万级数据

 更新时间:2024年03月13日 11:03:53   作者:princeAladdin  
本文主要介绍了SpringBoot线程池ThreadPoolTaskExecutor异步处理百万级数据,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

一、背景:

利用ThreadPoolTaskExecutor多线程异步批量插入,提高百万级数据插入效率。ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。ThreadPoolTaskExecutor是ThreadPoolExecutor的封装,所以,性能更加优秀,推荐ThreadPoolTaskExecutor。

二、ThreadPoolTaskExecutor异步处理

2.1、配置application.yml

异步线程配置 自定义使用参数

async:
  executor:
    thread:
      core_pool_size:  10  # 配置核心线程数 默认8个 核数*2+2
      max_pool_size:  100   # 配置最大线程数
      queue_capacity:  99988  # 配置队列大小
      keep_alive_seconds:  20  #设置线程空闲等待时间秒s
      name:
        prefix: async-thread-  # 配置线程池中的线程的名称前缀

2.2、ThreadPoolConfig配置注入Bean

package com.wonders.common.config;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Description: TODO:利用ThreadPoolTaskExecutor多线程批量执行相关配置
 * 自定义线程池
 * 发现不是线程数越多越好,具体多少合适,网上有一个不成文的算法:CPU核心数量*2 +2 个线程。
 */
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {
    //自定义使用参数
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;   //配置核心线程数
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;    //配置最大线程数
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;

    //1、自定义asyncServiceExecutor线程池
    @Bean(name = "asyncServiceExecutor")
    public ThreadPoolTaskExecutor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor......");
        //在这里修改
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //设置线程空闲等待时间 s
        executor.setKeepAliveSeconds(keepAliveSeconds);
        //配置队列大小 设置任务等待队列的大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        //设置线程池内线程名称的前缀-------阿里编码规约推荐--方便出错后进行调试
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
    /**
     * 2、公共线程池,利用系统availableProcessors线程数量进行计算
     */
    @Bean(name = "commonThreadPoolTaskExecutor")
    public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量
        int corePoolSize = (int) (processNum / (1 - 0.2));
        int maxPoolSize = (int) (processNum / (1 - 0.5));
        pool.setCorePoolSize(corePoolSize); // 核心池大小
        pool.setMaxPoolSize(maxPoolSize); // 最大线程数
        pool.setQueueCapacity(maxPoolSize * 1000); // 队列程度
        pool.setThreadPriority(Thread.MAX_PRIORITY);
        pool.setDaemon(false);
        pool.setKeepAliveSeconds(300);// 线程空闲时间
        return pool;
    }
   //3自定义defaultThreadPoolExecutor线程池
    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {
        int maxNumPool=Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(3,
                maxNumPool,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                //置线程名前缀,例如设置前缀为hutool-thread-,则线程名为hutool-thread-1之类。
                new ThreadFactoryBuilder().setNamePrefix("default-executor-thread-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
    }

}

2.3、创建异步线程,业务类

 //1、自定义asyncServiceExecutor线程池
    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync(List<Student> students,
                             StudentService studentService,
                             CountDownLatch countDownLatch) {
        try{
            log.info("start executeAsync");
            //异步线程要做的事情
            studentService.saveBatch(students);
            log.info("end executeAsync");
        }finally {
            countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
        }
    }

2.4、拆分集合工具类

package com.wonders.threads;

import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;

/**
 * @Description: TODO:拆分工具类
 * 1、获取需要进行批量更新的大集合A,对大集合进行拆分操作,分成N个小集合A-1 ~ A-N;
 * 2、开启线程池,针对集合的大小进行调参,对小集合进行批量更新操作;
 * 3、对流程进行控制,控制线程执行顺序。按照指定大小拆分集合的工具类
 */
public class SplitListUtils {
    /**
     * 功能描述:拆分集合
     * @param <T> 泛型对象
     * @MethodName: split
     * @MethodParam: [resList:需要拆分的集合, subListLength:每个子集合的元素个数]
     * @Return: java.util.List<java.util.List<T>>:返回拆分后的各个集合组成的列表
     * 代码里面用到了guava和common的结合工具类
     */
    public static <T> List<List<T>> split(List<T> resList, int subListLength) {
        if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
            return Lists.newArrayList();
        }
        List<List<T>> ret = Lists.newArrayList();
        int size = resList.size();
        if (size <= subListLength) {
            // 数据量不足 subListLength 指定的大小
            ret.add(resList);
        } else {
            int pre = size / subListLength;
            int last = size % subListLength;
            // 前面pre个集合,每个大小都是 subListLength 个元素
            for (int i = 0; i < pre; i++) {
                List<T> itemList = Lists.newArrayList();
                for (int j = 0; j < subListLength; j++) {
                    itemList.add(resList.get(i * subListLength + j));
                }
                ret.add(itemList);
            }
            // last的进行处理
            if (last > 0) {
                List<T> itemList = Lists.newArrayList();
                for (int i = 0; i < last; i++) {
                    itemList.add(resList.get(pre * subListLength + i));
                }
                ret.add(itemList);
            }
        }
        return ret;
    }

    /**
     * 功能描述:方法二:集合切割类,就是把一个大集合切割成多个指定条数的小集合,方便往数据库插入数据
     * 推荐使用
     * @MethodName: pagingList
     * @MethodParam:[resList:需要拆分的集合, subListLength:每个子集合的元素个数]
     * @Return: java.util.List<java.util.List<T>>:返回拆分后的各个集合组成的列表
  
     */
    public static <T> List<List<T>> pagingList(List<T> resList, int pageSize){
        //判断是否为空
        if (CollectionUtils.isEmpty(resList) || pageSize <= 0) {
            return Lists.newArrayList();
        }
        int length = resList.size();
        int num = (length+pageSize-1)/pageSize;
        List<List<T>> newList =  new ArrayList<>();
        for(int i=0;i<num;i++){
            int fromIndex = i*pageSize;
            int toIndex = (i+1)*pageSize<length?(i+1)*pageSize:length;
            newList.add(resList.subList(fromIndex,toIndex));
        }
        return newList;
    }

    // 运行测试代码 可以按顺序拆分为11个集合
    public static void main(String[] args) {
        //初始化数据
        List<String> list = Lists.newArrayList();
        int size = 19;
        for (int i = 0; i < size; i++) {
            list.add("hello-" + i);
        }
        // 大集合里面包含多个小集合
        List<List<String>> temps = pagingList(list, 100);
        int j = 0;
        // 对大集合里面的每一个小集合进行操作
        for (List<String> obj : temps) {
            System.out.println(String.format("row:%s -> size:%s,data:%s", ++j, obj.size(), obj));
        }
    }

}

2.5、造数据,多线程异步插入

public int batchInsertWay() throws Exception {
        log.info("开始批量操作.........");
        Random rand = new Random();
        List<Student> list = new ArrayList<>();
        //造100万条数据
        for (int i = 0; i < 1000003; i++) {
            Student student=new Student();
            student.setStudentName("大明:"+i);
            student.setAddr("上海:"+rand.nextInt(9) * 1000);
            student.setAge(rand.nextInt(1000));
            student.setPhone("134"+rand.nextInt(9) * 1000);
            list.add(student);
        }
        //2、开始多线程异步批量导入
        long startTime = System.currentTimeMillis(); // 开始时间
        //boolean a=studentService.batchInsert(list);
        List<List<Student>> list1=SplitListUtils.pagingList(list,100);  //拆分集合
        CountDownLatch countDownLatch = new CountDownLatch(list1.size());
        for (List<Student> list2 : list1) {
            asyncService.executeAsync(list2,studentService,countDownLatch);
        }
        try {
            countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
            long endTime = System.currentTimeMillis(); //结束时间
            log.info("一共耗时time: " + (endTime - startTime) / 1000 + " s");
            // 这样就可以在下面拿到所有线程执行完的集合结果
        } catch (Exception e) {
            log.error("阻塞异常:"+e.getMessage());
        }
        return list.size();

    }

2.6、测试结果

在这里插入图片描述

结论:对不同线程数的测试,发现不是线程数越多越好,具体多少合适,网上有一个不成文的算法:CPU核心数量*2 +2 个线程。

个人推荐配置:

int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));

到此这篇关于SpringBoot线程池ThreadPoolTaskExecutor异步处理百万级数据的文章就介绍到这了,更多相关SpringBoot异步处理百万级数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

相关文章

  • Spring Boot中@RequestParam参数的5种情况说明

    Spring Boot中@RequestParam参数的5种情况说明

    这篇文章主要介绍了Spring Boot中@RequestParam参数的5种情况说明,具有很好的参考价值,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • Spring中的@ResponseStatus使用

    Spring中的@ResponseStatus使用

    这篇文章主要介绍了Spring中的@ResponseStatus使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11
  • 详解Java中跳跃表的原理和实现

    详解Java中跳跃表的原理和实现

    跳跃表(Skip list)是有序链表的扩展,简称跳表,它在原有的有序链表上增加了多级索引,通过索引来实现快速查找,实质上是一种可以进行二分查找的有序链表。本文主要为大家介绍了跳跃表的原理和实现,需要的可以参考一下
    2022-12-12
  • SpringBoot、mybatis返回树结构的数据实现

    SpringBoot、mybatis返回树结构的数据实现

    本文主要介绍了SpringBoot、mybatis返回树结构的数据实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-04-04
  • SpringMVC实现RESTful风格:@PathVariable注解的使用方式

    SpringMVC实现RESTful风格:@PathVariable注解的使用方式

    这篇文章主要介绍了SpringMVC实现RESTful风格:@PathVariable注解的使用方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • 详解springboot启动时是如何加载配置文件application.yml文件

    详解springboot启动时是如何加载配置文件application.yml文件

    这篇文章主要介绍了详解springboot启动时是如何加载配置文件application.yml文件,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-06-06
  • java web将数据导出为pdf格式文件代码片段

    java web将数据导出为pdf格式文件代码片段

    这篇文章主要为大家详细介绍了java web将数据导出为pdf格式文件代码片段,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-01-01
  • 详解MyBatis XML配置解析

    详解MyBatis XML配置解析

    这篇文章主要介绍了详解MyBatis XML配置解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • Java Rabbitmq中四种集群架构的区别详解

    Java Rabbitmq中四种集群架构的区别详解

    这篇文章主要为大家详细介绍了Java Rabbitmq中四种集群架构的区别,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-02-02
  • 使用@SpringBootTest注解进行单元测试

    使用@SpringBootTest注解进行单元测试

    这篇文章主要介绍了使用@SpringBootTest注解进行单元测试,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09

最新评论