详解Java使用双异步后如何保证数据一致性

 更新时间:2024年01月22日 10:20:19   作者:哪吒编程  
这篇文章主要为大家详细介绍了Java使用双异步后如何保证数据一致性,文中的示例代码讲解详细,具有一定的借鉴价值,有需要的小伙伴可以了解下

一、前情提要

在上一篇文章中,我们通过双异步的方式导入了10万行的Excel,有个小伙伴在评论区问我,如何保证插入后数据的一致性呢?

很简单,通过对比Excel文件行数和入库数量是否相等即可。

那么,如何获取异步线程的返回值呢?

二、通过Future获取异步返回值

我们可以通过给异步方法添加Future返回值的方式获取结果。

FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口。因此,FutureTask 可以交给 Executor 执行,也可以由调用线程直接执行FutureTask.run()。

1、FutureTask 是基于 AbstractQueuedSynchronizer实现的

AbstractQueuedSynchronizer简称AQS,它是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及 维护被阻塞线程的队列。 基于 AQS 实现的同步器包括: ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。

基于 AQS实现的同步器包含两种操作:

  • acquire,阻塞调用线程,直到AQS的状态允许这个线程继续执行,在FutureTask中,get()就是这个方法;
  • release,改变AQS的状态,使state变为非阻塞状态,在FutureTask中,可以通过run()和cancel()实现。

2、FutureTask执行流程

执行@Async异步方法;

建立新线程async-executor-X,执行Runnable的run()方法,(FutureTask实现RunnableFuture,RunnableFuture实现Runnable);

判断状态state;

  • 如果未新建或者不处于AQS,直接返回;
  • 否则进入COMPLETING状态,执行异步线程代码;

如果执行cancel()方法改变AQS的状态时,会唤醒AQS等待队列中的第一个线程线程async-executor-1;

线程async-executor-1被唤醒后

  • 将自己从AQS队列中移除;
  • 然后唤醒next线程async-executor-2;
  • 改变线程async-executor-1的state;
  • 等待get()线程取值。

next等待线程被唤醒后,循环线程async-executor-1的步骤

  • 被唤醒
  • 从AQS队列中移除
  • 唤醒next线程
  • 改变异步线程状态

新建线程async-executor-N,监听异步方法的state

  • 如果处于EXCEPTIONAL以上状态,抛出异常;
  • 如果处于COMPLETING状态,加入AQS队列等待;
  • 如果处于NORMAL状态,返回结果;

3、get()方法执行流程

get()方法通过判断状态state观测异步线程是否已结束,如果结束直接将结果返回,否则会将等待节点扔进等待队列自旋,阻塞住线程。

自旋直至异步线程执行完毕,获取另一边的线程计算出结果或取消后,将等待队列里的所有节点依次唤醒并移除队列。

  • 如果state小于等于COMPLETING,表示任务还在执行中;
    • 如果线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常;
    • 如果state大于COMPLETING;
      • 如果已有等待节点WaitNode,将线程置空;
      • 返回当前状态;
    • 如果任务正在执行,让出时间片;
    • 如果还未构造等待节点,则new一个新的等待节点;
    • 如果未入队列,CAS尝试入队;
    • 如果有超时时间参数;
      • 计算超时时间;
      • 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state;
      • 阻塞队列nanos毫秒。
    • 否则阻塞队列;
  • 如果state大于COMPLETING;
    • 如果执行完毕,返回结果;
    • 如果大于等于取消状态,则抛出异常。

很多小朋友对读源码,嗤之以鼻,工作3年、5年,还是没认真读过任何源码,觉得读了也没啥用,或者读了也看不懂~

其实,只要把源码的执行流程通过画图的形式呈现出来,你就会幡然醒悟,原来是这样的~

简而言之:

1. 如果异步线程还没执行完,则进入CAS自旋; 2. 其它线程获取结果或取消后,重新唤醒CAS队列中等待的线程; 3. 再通过get()判断状态state; 4. 直至返回结果或(取消、超时、异常)为止。

三、FutureTask源码具体分析

1、FutureTask源码

通过定义整形状态值,判断state大小,这个思想很有意思,值得学习。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
public class FutureTask<V> implements RunnableFuture<V> {

	// 最初始的状态是new 新建状态
	private volatile int state;
    private static final int NEW          = 0; // 新建状态
    private static final int COMPLETING   = 1; // 完成中
    private static final int NORMAL       = 2; // 正常执行完
    private static final int EXCEPTIONAL  = 3; // 异常
    private static final int CANCELLED    = 4; // 取消
    private static final int INTERRUPTING = 5; // 正在中断
    private static final int INTERRUPTED  = 6; // 已中断

	public V get() throws InterruptedException, ExecutionException {
	    int s = state;
	    // 任务还在执行中
	    if (s <= COMPLETING)
	        s = awaitDone(false, 0L);
	    return report(s);
	}
	
	private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
        	// 线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 任务已执行完毕或取消
            if (s > COMPLETING) {
            	// 如果已有等待节点WaitNode,将线程置空
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 任务正在执行,让出时间片
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 还未构造等待节点,则new一个新的等待节点
            else if (q == null)
                q = new WaitNode();
            // 未入队列,CAS尝试入队
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果有超时时间参数
            else if (timed) {
            	// 计算超时时间
                nanos = deadline - System.nanoTime();
                // 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 阻塞队列nanos毫秒
                LockSupport.parkNanos(this, nanos);
            }
            else
            	// 阻塞队列
                LockSupport.park(this);
        }
    }
    
	private V report(int s) throws ExecutionException {
		// 获取outcome中记录的返回结果
        Object x = outcome;
        // 如果执行完毕,返回结果
        if (s == NORMAL)
            return (V)x;
            // 如果大于等于取消状态,则抛出异常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
}

2、将异步方法的返回值改为Future<Integer>,将返回值放到new AsyncResult<>();中;

@Async("async-executor")
public void readXls(String filePath, String filename) {
    try {
    	// 此代码为简化关键性代码
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
            futureList.add(sumFuture);
        }
    }catch (Exception e){
        logger.error("readXlsCacheAsync---插入数据异常:",e);
    }
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
    try {
        // 此代码为简化关键性代码
        return new AsyncResult<>(sum);
    }catch (Exception e){
        return new AsyncResult<>(0);
    }
}

3、通过Future<Integer>.get()获取返回值:

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow){
    int[] futureSumArr = new int[futureList.size()];
    for (int i = 0;i<futureList.size();i++) {
        try {
            Future<Integer> future = futureList.get(i);
            while (true) {
                if (future.isDone() && !future.isCancelled()) {
                    Integer futureSum = future.get();
                    logger.info("获取Future返回值成功"+"----Future:" + future
                            + ",Result:" + futureSum);
                    futureSumArr[i] += futureSum;
                    break;
                } else {
                    logger.info("Future正在执行---获取Future返回值中---等待3秒");
                    Thread.sleep(3000);
                }
            }
        } catch (Exception e) {
            logger.error("获取Future返回值异常: ", e);
        }
    }
    
    boolean insertFlag = getInsertSum(futureSumArr, excelRow);
    logger.info("获取所有异步线程Future的返回值成功,Excel插入结果="+insertFlag);
    return insertFlag;
}

4、这里也可以通过新线程+Future获取Future返回值

不过感觉多此一举了,就当练习Future异步取返回值了~

public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) {
    ExecutorService service = Executors.newSingleThreadExecutor();
    final boolean[] insertFlag = {false};
    service.execute(new Runnable() {
        public void run() {
            try {
                insertFlag[0] = getFutureResult(futureList, excelRow);
            } catch (Exception e) {
                logger.error("新线程+Future获取Future返回值异常: ", e);
                insertFlag[0] = false;
            }
        }
    });
    service.shutdown();
    return new AsyncResult<>(insertFlag[0]);
}

获取异步线程结果后,我们可以通过添加事务的方式,实现Excel入库操作的数据一致性。

但Future会造成主线程的阻塞,这个就很不友好了,有没有更优解呢?

以上就是详解Java使用双异步后如何保证数据一致性的详细内容,更多关于Java双异步如何保证数据一致性的资料请关注脚本之家其它相关文章!

相关文章

  • 关于Hadoop中Spark Streaming的基本概念

    关于Hadoop中Spark Streaming的基本概念

    这篇文章主要介绍了关于Hadoop中Spark Streaming的基本概念,Spark Streaming是构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力,Spark Streaming可结合批处理和交互式查询,需要的朋友可以参考下
    2023-07-07
  • Java用 Rhino/Nashorn 代替第三方 JSON 转换库

    Java用 Rhino/Nashorn 代替第三方 JSON 转换库

    本篇文章主要介绍了Java用 Rhino/Nashorn 代替第三方 JSON 转换库,非常具有实用价值,需要的朋友可以参考下
    2017-05-05
  • Java 阻塞队列和线程池原理分析

    Java 阻塞队列和线程池原理分析

    这篇文章主要介绍了Java 阻塞队列和线程池原理分析,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • JVM角度调试优化MyEclipse

    JVM角度调试优化MyEclipse

    这篇文章主要介绍了从JVM角度对MyEclipse进行调试优化,为大家分析调试优化MyEclipse的步骤,感兴趣的小伙伴们可以参考一下
    2016-05-05
  • Java8 Lambda表达式详解及实例

    Java8 Lambda表达式详解及实例

    这篇文章主要介绍了Java8 Lambda表达式详解的相关资料,需要的朋友可以参考下
    2016-09-09
  • JavaWeb开发入门第二篇Tomcat服务器配置讲解

    JavaWeb开发入门第二篇Tomcat服务器配置讲解

    JavaWeb开发入门第二篇主要介绍了Tomcat服务器配置的方法教大家如何使用Tomcat服务器,感兴趣的小伙伴们可以参考一下
    2016-04-04
  • Java开发微信公众号接收和被动回复普通消息

    Java开发微信公众号接收和被动回复普通消息

    这篇文章主要介绍了Java开发微信公众号接收和被动回复普通消息的相关资料,需要的朋友可以参考下
    2016-01-01
  • java实现文件夹解压和压缩

    java实现文件夹解压和压缩

    这篇文章主要为大家详细介绍了java实现文件夹解压和压缩,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-03-03
  • idea开启mybatis控制台SQL日志打印的代码示例

    idea开启mybatis控制台SQL日志打印的代码示例

    本文主要介绍了idea开启mybatis控制台SQL日志打印的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-12-12
  • JDBC以反射机制加载类注册驱动连接MySQL

    JDBC以反射机制加载类注册驱动连接MySQL

    这篇文章介绍了JDBC以反射机制加载类注册驱动连接MySQL的方法,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-01-01

最新评论