Spring Boot 异步框架的使用详解

 更新时间:2019年01月17日 14:21:19   作者:JavaDog  
这篇文章主要介绍了Spring Boot 异步框架的使用详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

1. 前言

随着数据量和调用量的增长,用户对应用的性能要求越来越高。另外,在实际的服务中,还存在着这样的场景:系统在组装数据的时候,对于数据的各个部分的获取实际上是没有前后依赖关系的。这些问题都很容易让我们想到将这些同步调用全都改造为异步调用。不过自己实现起来比较麻烦,还容易出错。好在Spring已经提供了该问题的解决方案,而且使用起来十分方便。

2.Spring异步执行框架的使用方法

2.1 maven 依赖

Spring异步执行框架的相关bean包含在spring-context和spring-aop模块中,所以只要引入上述的模块即可。

2.2 开启异步任务支持

Spring提供了@EnableAsync的注解来标注是否启用异步任务支持。使用方式如下:

@Configuration
@EnableAsync
public class AppConfig {
}

Note: @EnableAsync必须要配合@Configuration使用,否则会不生效

2.3 方法标记为异步调用

将同步方法的调用改为异步调用也很简单。对于返回值为void的方法,直接加上@Async注解即可。对于有返回值的方法,除了加上上述的注解外,还需要将方法的返回值修改为Future类型和将返回值用AsyncResult包装起来。如下所示:

// 无返回值的方法直接加上注解即可。 
@Async
public void method1() {
 ...
}

// 有返回值的方法需要修改返回值。
@Async
public Future<Object> method2() {
 ...
 return new AsyncResult<>(Object);
}  

2.4 方法调用

对于void的方法,和普通的调用没有任何区别。对于非void的方法,由于返回值是Future类型,所以需要用get()方法来获取返回值。如下所示:

public static void main(String[] args) {
  service.method1();
  Future<Object> futureResult = service.method2();
  Object result;
  try {
     result = futureResult.get(); 
    } catch (InterruptedException | ExecutionException e) {
      ...
    }
}

3. 原理简介

这块的源码的逻辑还是比较简单的,主要是Spring帮我们生成并管理了一个线程池,然后方法调用的时候使用动态代理将方法的执行包装为Callable类型并提交到线程池中执行。核心的实现逻辑在AsyncExecutionInterceptor类的invoke()方法中。如下所示:

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
  Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
  final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

  AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
  if (executor == null) {
    throw new IllegalStateException(
        "No executor specified and no default executor set on AsyncExecutionInterceptor either");
  }

  Callable<Object> task = new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      try {
        Object result = invocation.proceed();
        if (result instanceof Future) {
          return ((Future<?>) result).get();
        }
      }
      catch (ExecutionException ex) {
        handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
      }
      catch (Throwable ex) {
        handleError(ex, userDeclaredMethod, invocation.getArguments());
      }
      return null;
    }
  };

  return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

4.自定义taskExecutor及异常处理

4.1自定义taskExecutor

Spring查找TaskExecutor逻辑是:

1. 如果Spring context中存在唯一的TaskExecutor bean,那么就使用这个bean。

2. 如果1中的bean不存在,那么就会查找是否存在一个beanName为taskExecutor且是java.util.concurrent.Executor实例的bean,有则使用这个bean。

3. 如果1、2中的都不存在,那么Spring就会直接使用默认的Executor,即SimpleAsyncTaskExecutor。

在第2节的实例中,我们直接使用的是Spring默认的TaskExecutor。但是对于每一个新的任务,SimpleAysncTaskExecutor都是直接创建新的线程来执行,所以无法重用线程。具体的执行的代码如下:

@Override
public void execute(Runnable task, long startTimeout) {
  Assert.notNull(task, "Runnable must not be null");
  Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
  if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
    this.concurrencyThrottle.beforeAccess();
    doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
  }
  else {
    doExecute(taskToUse);
  }
}

protected void doExecute(Runnable task) {
  Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
  thread.start();
} 

所以我们在使用的时候,最好是使用自定义的TaskExecutor。结合上面描述的Spring查找TaskExecutor的逻辑,最简单的自定义的方法是使用@Bean注解。示例如下:

// ThreadPoolTaskExecutor的配置基本等同于线程池
@Bean("taskExecutor")
public Executor getAsyncExecutor() {
  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
  taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
  taskExecutor.setQueueCapacity(CORE_POOL_SIZE * 10);
  taskExecutor.setThreadNamePrefix("wssys-async-task-thread-pool");
  taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  taskExecutor.setAwaitTerminationSeconds(60 * 10);
  taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
  return taskExecutor;
}

另外,Spring还提供了一个AsyncConfigurer接口,通过实现该接口,除了可以实现自定义Executor以外,还可以自定义异常的处理。代码如下:

@Configuration
@Slf4j
public class AsyncConfig implements AsyncConfigurer {

  private static final int MAX_POOL_SIZE = 50;

  private static final int CORE_POOL_SIZE = 20;

  @Override
  @Bean("taskExecutor")
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

    taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
    taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
    taskExecutor.setQueueCapacity(CORE_POOL_SIZE * 10);
    taskExecutor.setThreadNamePrefix("async-task-thread-pool");
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setAwaitTerminationSeconds(60 * 10);
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    return taskExecutor;
  }

  @Override
  public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return (ex, method, params) -> log.error("invoke async method occurs error. method: {}, params: {}",
      method.getName(), JSON.toJSONString(params), ex);
  }

}

Note:

Spring还提供了一个AsyncConfigurerSupport类,该类也实现了AsyncConfigurer接口,且方法的返回值都是null,旨在提供一个方便的实现。

当getAsyncExecutor()方法返回null的时候,Spring会使用默认的处理器(强烈不推荐)。

当getAsyncUncaughtExceptionHandler()返回null的时候,Spring会使用SimpleAsyncUncaughtExceptionHandler来处理异常,该类会打印出异常的信息。

所以对该类的使用,最佳的实践是继承该类,并且覆盖实现getAsyncExecutor()方法。

4.2 异常处理

Spring异步框架对异常的处理如下所示:

// 所在类:AsyncExecutionAspectSupport
protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
  if (Future.class.isAssignableFrom(method.getReturnType())) {
    ReflectionUtils.rethrowException(ex);
  }
  else {
    // Could not transmit the exception to the caller with default executor
    try {
      this.exceptionHandler.handleUncaughtException(ex, method, params);
    }
    catch (Throwable ex2) {
      logger.error("Exception handler for async method '" + method.toGenericString() +
          "' threw unexpected exception itself", ex2);
    }
  }
}

从代码来看,如果返回值是Future类型,那么直接将异常抛出。如果返回值不是Future类型(基本上包含的是所有返回值void类型的方法,因为如果方法有返回值,必须要用Future包装起来),那么会调用handleUncaughtException方法来处理异常。

注意:在handleUncaughtException()方法中抛出的任何异常,都会被Spring Catch住,所以没有办法将void的方法再次抛出并传播到上层调用方的!!!

关于Spring 这个设计的缘由我的理解是:既然方法的返回值是void,就说明调用方不关心方法执行是否成功,所以也就没有必要去处理方法抛出的异常。如果需要关心异步方法是否成功,那么返回值改为boolean就可以了。

4.4 最佳实践的建议

  1. @Async可以指定方法执行的Executor,用法:@Async("MyTaskExecutor")。推荐指定Executor,这样可以避免因为Executor配置没有生效而Spring使用默认的Executor的问题。
  2. 实现接口AsyncConfigurer的时候,方法getAsyncExecutor()必须要使用@Bean,并指定Bean的name。如果不使用@Bean,那么该方法返回的Executor并不会被Spring管理。用java doc api的原话是:is not a fully managed Spring bean.(具体含义没有太理解,不过亲测不加这个注解无法正常使用)
  3. 由于其本质上还是基于代理实现的,所以如果一个类中有A、B两个异步方法,而A中存在对B的调用,那么调用A方法的时候,B方法不会去异步执行的。
  4. 在异步方法上标注@Transactional是无效的。
  5. future.get()的时候,最好使用get(long timeout, TimeUnit unit)方法,避免长时间阻塞。
  6. ListenableFuture和CompletableFuture也是推荐使用的,他们相比Future,提供了对异步调用的各个阶段或过程进行介入的能力。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • MySQL数据文件直接通过拷贝备份与恢复的操作方法

    MySQL数据文件直接通过拷贝备份与恢复的操作方法

    这篇文章主要介绍了MySQL数据文件直接通过拷贝备份与恢复的操作方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-09-09
  • Java中的snowflake算法详解

    Java中的snowflake算法详解

    这篇文章主要介绍了Java中的snowflake算法详解,Snowflake算法产生是为了满足Twitter每秒上万条消息的请求,每条消息都必须分配一条唯一的id,这些id还需要一些大致的顺序,并且在分布式系统中不同机器产生的id必须不同,需要的朋友可以参考下
    2023-08-08
  • Netty内存池泄漏问题以解决方案

    Netty内存池泄漏问题以解决方案

    这篇文章主要介绍了Netty内存池泄漏问题以解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • MyBatis学习教程(八)-Mybatis3.x与Spring4.x整合图文详解

    MyBatis学习教程(八)-Mybatis3.x与Spring4.x整合图文详解

    这篇文章主要介绍了MyBatis学习教程(八)-Mybatis3.x与Spring4.x整合图文详解的相关资料,需要的朋友可以参考下
    2016-05-05
  • Java多线程状态及方法实例解析

    Java多线程状态及方法实例解析

    这篇文章主要介绍了Java多线程状态及方法实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02
  • spring-cloud-stream结合kafka使用详解

    spring-cloud-stream结合kafka使用详解

    这篇文章主要介绍了spring-cloud-stream结合kafka使用详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • MyBatis3.X复杂Sql查询的语句

    MyBatis3.X复杂Sql查询的语句

    这篇文章主要介绍了MyBatis3.X复杂Sql查询的相关资料,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-04-04
  • Java中StringBuilder与StringBuffer使用及源码解读

    Java中StringBuilder与StringBuffer使用及源码解读

    我们前面学习的String就属于不可变字符串,因为理论上一个String字符串一旦定义好,其内容就不可再被改变,但实际上,还有另一种可变字符串,包括StringBuilder和StringBuffer两个类,那可变字符串有什么特点,又怎么使用呢,接下来就请大家跟我一起来学习吧
    2023-05-05
  • JVM执行引擎和垃圾回收要点总结

    JVM执行引擎和垃圾回收要点总结

    不论是在问题现场还是跳槽面试,我们面对JVM性能问题,依旧会束手无辞,它需要你对Java虚拟机的实现和优化,有极为深刻的理解。所以我在这里整理了一下 JVM的知识点。今天说说虚拟机执行引擎和垃圾回收,都是十足的干货,请各位看官耐心批阅!
    2021-06-06
  • servlet异步请求的实现

    servlet异步请求的实现

    本文主要介绍了servlet异步请求的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07

最新评论