Java中基于DeferredResult的异步服务详解

 更新时间:2023年12月15日 10:02:27   作者:爱喝咖啡的程序员  
这篇文章主要介绍了Java中基于DeferredResult的异步服务详解,DeferredResult字面意思是"延迟结果",它允许Spring MVC收到请求后,立即释放(归还)容器线程,以便容器可以接收更多的外部请求,提升吞吐量,需要的朋友可以参考下

一. 简介

Servlet3.0提供了基于servlet的异步处理api,Spring MVC只是将这些api进行了一系列的封装,从而实现了DeferredResult。

DeferredResult字面意思是"延迟结果",它允许Spring MVC收到请求后,立即释放(归还)容器线程,以便容器可以接收更多的外部请求,提升吞吐量,与此同时,DeferredResult将陷入阻塞,直到我们主动将结果set到DeferredResult,最后,DeferredResult会重新申请容器线程,并将本次请求返回给客户端。

二. 使用

1. 监听器 onTimeout()

当deferredResult被创建出来之后,执行setResult()之前,这之间的时间超过设定值时(比如下方案例中设置为5秒超时),则被判定为超时。

DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 设置超时事件
deferredResult.onTimeout(() -> {
    System.out.println("异步线程执行超时, 异步线程的名称: " + Thread.currentThread().getName());
    deferredResult.setResult("异步线程执行超时");
});

2. 监听器 onError()

当onTimeout()或onCompletion()等回调函数中的代码报错时,则会执行监听器onError()的回调函数。

PS: DeferredResult之外的代码报错不会影响到onError()。

DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 设置异常事件
deferredResult.onError((throwable) -> {
    System.out.println("异步请求出现错误,异步线程的名称: " + Thread.currentThread().getName() + "异常: " + throwable);
    deferredResult.setErrorResult("异步线程执行出错");
});

3. 监听器 onCompletion()

代码任意位置调用了同一个DeferredResult的setResult()后,则会被DeferredResult的onCompletion()监听器捕获到。

Spring会任选一条容器线程来执行onCompletion( )中的代码(由于请求线程已被释放(归还),所以此处可能再次由同一条请求线程来处理,也可能由其他线程来处理)。

DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 设置完成事件
deferredResult.onCompletion(() -> {
    System.out.println("异步线程执行完毕,异步线程的名称: " + Thread.currentThread().getName());
});

完整的代码为:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
public class DemoController {
    // 自定义线程池
    public static ExecutorService exec = Executors.newCachedThreadPool();
    @GetMapping("/demo")
    public DeferredResult<String> demoResult() {
        System.out.println("容器线程: " + Thread.currentThread().getName());
        // 创建DeferredResult对象,设置超时时长 20秒
        DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
        // 设置超时事件
        deferredResult.onTimeout(() -> {
            System.out.println("异步线程执行超时, 异步线程的名称: " + Thread.currentThread().getName());
            // throw new RuntimeException("超时事件报错了!");
            deferredResult.setResult("异步线程执行超时");
        });
        // 设置异常事件
        deferredResult.onError((throwable) -> {
            System.out.println("异步请求出现错误,异步线程的名称: " + Thread.currentThread().getName() + "异常: " + throwable);
            deferredResult.setErrorResult("异步线程执行出错");
        });
        // 设置完成事件
        deferredResult.onCompletion(() -> {
            System.out.println("异步线程执行完毕,异步线程的名称: " + Thread.currentThread().getName());
        });
        exec.execute(() -> {
            System.out.println("[线程池] 异步线程的名称: " + Thread.currentThread().getName());
            deferredResult.setResult("异步线程执行完毕");
        });
        System.out.println("Servlet thread release");
        return deferredResult;
    }
}

三. 拓展

有些业务场景下,我们希望新的请求触发(激活)之前陷入阻塞的请求,此外可以通过不同的key来区分不同的请求。

比如apollo在实现时就利用了DeferredResult。客户端向服务器端发送轮询请求,服务端收到请求后,会立刻释放容器线程,并阻塞本次请求,若apollo托管的配置文件没有发生任何改变,则轮询请求会超时(返回304)。当有新的配置发布时,服务端会调用DeferredResult setResult()方法,进入onCompletion(),并使尚未超时的轮寻请求正常返回(200)。

大概如下: 

@SpringBootApplication
public class DemoApplication implements WebMvcConfigurer {
 
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
 
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Collection;
@RestController
public class ApolloController {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    //guava中的Multimap,多值map,对map的增强,一个key可以保持多个value
    private Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedSetMultimap(HashMultimap.create());
    //模拟长轮询
    @RequestMapping(value = "/watch/{namespace}", method = RequestMethod.GET, produces = "text/html")
    public DeferredResult<String> watch(@PathVariable("namespace") String namespace) {
        logger.info("Request received");
        DeferredResult<String> deferredResult = new DeferredResult<>();
        //当deferredResult完成时(不论是超时还是异常还是正常完成),移除watchRequests中相应的watch key
        deferredResult.onCompletion(new Runnable() {
            @Override
            public void run() {
                System.out.println("remove key:" + namespace);
                watchRequests.remove(namespace, deferredResult);
            }
        });
        watchRequests.put(namespace, deferredResult);
        logger.info("Servlet thread released");
        return deferredResult;
    }
    //模拟发布namespace配置
    @RequestMapping(value = "/publish/{namespace}", method = RequestMethod.GET, produces = "text/html")
    public Object publishConfig(@PathVariable("namespace") String namespace) {
        if (watchRequests.containsKey(namespace)) {
            Collection<DeferredResult<String>> deferredResults = watchRequests.get(namespace);
            Long time = System.currentTimeMillis();
            //通知所有watch这个namespace变更的长轮训配置变更结果
            for (DeferredResult<String> deferredResult : deferredResults) {
                deferredResult.setResult(namespace + " changed:" + time);
            }
        }
        return "success";
    }
}

当请求超时的时候会产生AsyncRequestTimeoutException,我们定义一个全局异常捕获类:

 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ControllerAdvice
class GlobalControllerExceptionHandler {
    protected static final Logger logger = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class);
    @ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304状态码
    @ResponseBody
    @ExceptionHandler(AsyncRequestTimeoutException.class) //捕获特定异常
    public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e, HttpServletRequest request) {
        System.out.println("handleAsyncRequestTimeoutException");
    }
}

然后我们通过postman工具发送请求//localhost:8080/watch/mynamespace,请求会挂起,60秒后,DeferredResult超时,客户端正常收到了304状态码,表明在这个期间配置没有变更过。

然后我们在模拟配置变更的情况,再次发起请求//localhost:8080/watch/mynamespace,等待个10秒钟(不要超过60秒),然后调用//localhost:8080/publish/mynamespace,发布配置变更。这时postman会立刻收到response响应结果:  

mynamespace changed:1538880050147

表明在轮训期间有配置变更过。

这里我们用了一个MultiMap来存放所有轮训的请求,Key对应的是namespace,value对应的是所有watch这个namespace变更的异步请求DeferredResult,需要注意的是:在DeferredResult完成的时候记得移除MultiMap中相应的key,避免内存溢出请求。

采用这种长轮询的好处是,相比一直循环请求服务器,实例一多的话会对服务器产生很大的压力,http长轮询的方式会在服务器变更的时候主动推送给客户端,其他时间客户端是挂起请求的,这样同时满足了性能和实时性。

四. DeferredResult与Callable的区别

DeferredResult和Callable都可以在Controller层的方法中直接返回,请求收到后,释放容器线程,在另一个线程中通过异步的方式执行任务,最后将请求返回给客户端。

不同之处在于,使用Callable时,当其它线程中的任务执行完毕后,请求会立刻返回给客户端,而DeferredResult则需要用户在代码中手动set值到DeferredResult,否则即便异步线程中的任务执行完毕,DeferredResult仍然不会向客户端返回任何结果。

到此这篇关于Java中基于DeferredResult的异步服务详解的文章就介绍到这了,更多相关基于DeferredResult的异步服务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java synchronized与死锁深入探究

    Java synchronized与死锁深入探究

    这篇文章主要介绍了Java synchronized与死锁,Java中提供了synchronized关键字,将可能引发安全问题的代码包裹在synchronized代码块中,表示这些代码需要进行线程同步
    2023-01-01
  • java文件读写操作实例详解

    java文件读写操作实例详解

    java的io流读取数据使用io流读取文件和向文件中写数据,这篇文章主要给大家介绍了关于java文件读写操作的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-02-02
  • SpringBoot如何实现文件下载

    SpringBoot如何实现文件下载

    这篇文章主要介绍了SpringBoot如何实现文件下载问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-11-11
  • Maven pom的distributionManagement配置方式

    Maven pom的distributionManagement配置方式

    文章主要介绍了Maven的distributionManagement配置方式,以及它的作用、配置方法和重要性,distributionManagement用于指定构件的发布位置,包括下载URL、状态等,文章还详细解释了如何配置repository和snapshotRepository,以及它们的用途和区别
    2025-01-01
  • Springboot中Jackson用法详解

    Springboot中Jackson用法详解

    Springboot自带默认json解析Jackson,可以在不引入其他json解析包情况下,解析json字段,下面我们就来聊聊Springboot中Jackson的用法吧
    2025-01-01
  • springboot前端传参date类型后台处理的方式

    springboot前端传参date类型后台处理的方式

    这篇文章主要介绍了springboot前端传参date类型后台处理的方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-07-07
  • 基于HttpServletRequest 相关常用方法的应用

    基于HttpServletRequest 相关常用方法的应用

    本篇文章小编为大家介绍,基于HttpServletRequest 相关常用方法的应用,需要的朋友参考下
    2013-04-04
  • Java实现简单的贪吃蛇游戏

    Java实现简单的贪吃蛇游戏

    这篇文章主要介绍了Java实现简单的贪吃蛇游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-07-07
  • Java中的观察者模式实例讲解

    Java中的观察者模式实例讲解

    这篇文章主要介绍了Java中的观察者模式实例讲解,本文先是讲解了观察者模式的概念,然后以实例讲解观察者模式的实现,以及给出了UML图,需要的朋友可以参考下
    2014-12-12
  • SpringBoot整合Pulsar的实现示例

    SpringBoot整合Pulsar的实现示例

    本文主要介绍了SpringBoot整合Pulsar的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07

最新评论