非常适合新手学生的Java线程池优化升级版

 更新时间:2022年03月30日 17:16:47   作者:摸鱼打酱油  
作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门

升级版线程池的优化

1:新增了4种拒绝策略。分别为:MyAbortPolicy、MyDiscardPolicy、MyDiscardOldestPolicy、MyCallerRunsPolicy

2:对线程池MyThreadPoolExecutor的构造方法进行优化,增加了参数校验,防止乱传参数现象。

3:这是最重要的一个优化。

  • 移除线程池的线程预热功能。因为线程预热会极大的耗费内存,当我们不用线程池时也会一直在运行状态。
  • 换来的是在调用execute方法添加任务时通过检查workers线程集合目前的大小与corePoolSize的值去比较,再通过new MyWorker()去创建添加线程到线程池,这样好处就是当我们创建线程池如果不使用的话则对当前内存没有一点影响,当使用了才会创建线程并放入线程池中进行复用。

线程池构造器

    public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        this.workers=new HashSet<>(corePoolSize);
        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new NullPointerException("线程池参数不合法");
        }
    }

线程池拒绝策略

策略接口:MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * 自定义拒绝策略
 * @author 游政杰
 */
public interface MyRejectedExecutionHandle {

    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);

}

策略内部实现类

/**
     * 实现自定义拒绝策略
     */
    //抛异常策略(默认)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");
        }
    }
    //默默丢弃策略
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

        }
    }
    //丢弃掉最老的任务策略
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭
                threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了
                threadPoolExecutor.execute(runnable); //把新任务加入到队列中
            }
        }
    }
    //由调用者调用策略
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭
                runnable.run();
            }
        }
    }

封装拒绝方法

    protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }

    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }

execute方法

    @Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程
                MyWorker worker = new MyWorker(); //通过构造方法添加线程
            }
            return true;
        }
    }

可以看出只有当往线程池放任务时才会创建线程对象。

手写线程池源码

MyExecutorService

package com.springframework.concurrent;

import java.util.concurrent.BlockingQueue;

/**
 * 自定义线程池业务接口
 * @author 游政杰
 */
public interface MyExecutorService {

    boolean execute(Runnable runnable);

    void shutdown();

    void shutdownNow();

    boolean isShutdown();

    BlockingQueue<Runnable> getWaitingQueue();

}

MyRejectedExecutionException

package com.springframework.concurrent;

/**
 * 自定义拒绝异常
 */
public class MyRejectedExecutionException extends RuntimeException {

    public MyRejectedExecutionException() {
    }
    public MyRejectedExecutionException(String message) {
        super(message);
    }

    public MyRejectedExecutionException(String message, Throwable cause) {
        super(message, cause);
    }

    public MyRejectedExecutionException(Throwable cause) {
        super(cause);
    }

}

MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * 自定义拒绝策略
 * @author 游政杰
 */
public interface MyRejectedExecutionHandle {

    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);

}

核心类MyThreadPoolExecutor

package com.springframework.concurrent;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 纯手撸线程池框架
 * @author 游政杰
 */
public class MyThreadPoolExecutor implements MyExecutorService{

    private static final AtomicInteger taskcount=new AtomicInteger(0);//执行任务次数
    private static final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号
    private static volatile int corePoolSize; //核心线程数
    private final HashSet<MyWorker> workers; //工作线程
    private final BlockingQueue<Runnable> waitingQueue; //等待队列
    private static final String THREADPOOL_NAME="MyThread-Pool-";//线程名称
    private volatile boolean isRunning=true; //是否运行
    private volatile boolean STOPNOW=false; //是否立刻停止
    private volatile ThreadFactory threadFactory; //线程工厂
    private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默认拒绝策略
    private volatile MyRejectedExecutionHandle handle; //拒绝紫略

    public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
    }
    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
        this.workers=new HashSet<>(corePoolSize);
        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
            this.corePoolSize=corePoolSize;
            this.waitingQueue=waitingQueue;
            this.threadFactory=threadFactory;
            this.handle=handle;
        }else {
            throw new NullPointerException("线程池参数不合法");
        }
    }
    /**
     * 实现自定义拒绝策略
     */
    //抛异常策略(默认)
    public static class MyAbortPolicy implements MyRejectedExecutionHandle{
        public MyAbortPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
            throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");
        }
    }
    //默默丢弃策略
    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

        public MyDiscardPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

        }
    }
    //丢弃掉最老的任务策略
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
        public MyDiscardOldestPolicy() {
        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭
                threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了
                threadPoolExecutor.execute(runnable); //把新任务加入到队列中
            }
        }
    }
    //由调用者调用策略
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
        public MyCallerRunsPolicy(){

        }
        @Override
        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
            if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭
                runnable.run();
            }
        }
    }
    //call拒绝方法
    protected final void reject(Runnable runnable){
        this.handle.rejectedExecution(runnable, this);
    }

    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
        this.handle.rejectedExecution(runnable, threadPoolExecutor);
    }

    /**
     * MyWorker就是我们每一个线程对象
     */
    private final class MyWorker implements Runnable{

        final Thread thread; //为每个MyWorker

        MyWorker(){
            Thread td = threadFactory.newThread(this);
            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
            this.thread=td;
            this.thread.start();
            workers.add(this);
        }

        //执行任务
        @Override
        public void run() {
            //循环接收任务
                while (true)
                {
                    //循环退出条件:
                    //1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。
                    //2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。
                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
                    {
                        break;
                    }else {
                        //不断取任务,当任务!=null时则调用run方法处理任务
                        Runnable runnable = waitingQueue.poll();
                        if(runnable!=null){
                            runnable.run();
                            System.out.println("task==>"+taskcount.incrementAndGet());
                        }
                    }
                }
        }
    }

    //往线程池中放任务
    @Override
    public boolean execute(Runnable runnable)
    {
        if (!this.waitingQueue.offer(runnable)) {
            this.reject(runnable);
            return false;
        }
        else {
            if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程
                MyWorker worker = new MyWorker(); //通过构造方法添加线程
            }
            return true;
        }
    }
    //优雅的关闭
    @Override
    public void shutdown()
    {
        this.isRunning=false;
    }
    //暴力关闭
    @Override
    public void shutdownNow()
    {
        this.STOPNOW=true;
    }

    //判断线程池是否关闭
    @Override
    public boolean isShutdown() {
        return !this.isRunning||STOPNOW;
    }

    //获取等待队列
    @Override
    public BlockingQueue<Runnable> getWaitingQueue() {
        return this.waitingQueue;
    }
}

线程池测试类

package com.springframework.test;

import com.springframework.concurrent.MyThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;

public class ThreadPoolTest {

  public static void main(String[] args) {


//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyAbortPolicy());

//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardPolicy());

//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardOldestPolicy());

      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyCallerRunsPolicy());


      for(int i=0;i<11;i++){

          int finalI = i;
          myThreadPoolExecutor.execute(()->{
              System.out.println(Thread.currentThread().getName()+">>>>"+ finalI);
          });

      }

      myThreadPoolExecutor.shutdown();

//      myThreadPoolExecutor.shutdownNow();




  }
}

好了升级版线程池就优化到这了,后面可能还会出完善版,不断进行优化。

到此这篇关于非常适合新手学生的Java线程池升级版的文章就介绍到这了,更多相关Java 线程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java中Socket设置连接超时的代码分享

    Java中Socket设置连接超时的代码分享

    在我们日常连接中,如果超时时长过长的话,在开发时会影响测试,下面这篇文章主要给大家分享了关于Java中Socket设置连接超时的代码,需要的朋友可以参考借鉴,下面来一起看看吧。
    2017-06-06
  • Java编程实现快速排序及优化代码详解

    Java编程实现快速排序及优化代码详解

    这篇文章主要介绍了Java编程实现快速排序及优化代码详解,具有一定借鉴价值,需要的朋友可以了解下。
    2017-12-12
  • 使用JWT创建解析令牌及RSA非对称加密详解

    使用JWT创建解析令牌及RSA非对称加密详解

    这篇文章主要介绍了JWT创建解析令牌及RSA非对称加密详解,JWT是JSON Web Token的缩写,即JSON Web令牌,是一种自包含令牌,一种情况是webapi,类似之前的阿里云播放凭证的功能,另一种情况是多web服务器下实现无状态分布式身份验证,需要的朋友可以参考下
    2023-11-11
  • java解析.yml文件方式

    java解析.yml文件方式

    这篇文章主要介绍了java解析.yml文件方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • Java中Cookie和Session详解及区别总结

    Java中Cookie和Session详解及区别总结

    这篇文章主要介绍了Java中Cookie和Session详解,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下
    2022-06-06
  • Java IPage分页操作 附加自定义sql

    Java IPage分页操作 附加自定义sql

    这篇文章主要介绍了Java IPage分页加自定义sql,主要包括引入依赖,impl常规操作,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-06-06
  • 快速搭建Spring Boot+MyBatis的项目IDEA(附源码下载)

    快速搭建Spring Boot+MyBatis的项目IDEA(附源码下载)

    这篇文章主要介绍了快速搭建Spring Boot+MyBatis的项目IDEA(附源码下载),本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • Mybatis Plus批处理操作的实现示例

    Mybatis Plus批处理操作的实现示例

    MyBatis Plus 提供了强大的批处理支持,可以帮助我们高效地处理大规模数据,本文主要介绍了Mybatis Plus批处理操作的实现示例,具有一定的参考价值,感兴趣的可以了解一下
    2024-07-07
  • Java类加载器与双亲委派机制和线程上下文类加载器专项解读分析

    Java类加载器与双亲委派机制和线程上下文类加载器专项解读分析

    类加载器负责读取Java字节代码,并转换成java.lang.Class类的一个实例的代码模块。本文主要和大家聊聊JVM类加载器ClassLoader的使用,需要的可以了解一下
    2022-12-12
  • Java数据结构通关时间复杂度和空间复杂度

    Java数据结构通关时间复杂度和空间复杂度

    对于一个算法,其时间复杂度和空间复杂度往往是相互影响的,当追求一个较好的时间复杂度时,可能会使空间复杂度的性能变差,即可能导致占用较多的存储空间,这篇文章主要给大家介绍了关于Java时间复杂度、空间复杂度的相关资料,需要的朋友可以参考下
    2022-05-05

最新评论