Reactor 多任务并发执行且结果按顺序返回第一个

 更新时间:2022年09月22日 15:06:38   作者:​​​​​​​六七十三  
这篇文章主要介绍了Reactor 多任务并发执行且结果按顺序返回第一个,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下

1 场景

调用多个平级服务,按照服务优先级返回第一个有效数据。

具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级。每次只需要返回第一个有数据的弹窗。但是又希望所有弹窗之间的数据获取是异步的。这种场景使用 Reactor 怎么实现呢?

2 创建 service

2.1 创建基本接口和实体类

public interface TestServiceI {
    Mono request();
}

提供一个 request 方法,返回一个 Mono 对象。

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class TestUser {
    private String name;
}

2.2 创建 service 实现

@Slf4j
public class TestServiceImpl1 implements TestServiceI {
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        return Mono.fromSupplier(() -> {
                    try {
                        System.out.println("service1.threadName=" + Thread.currentThread().getName());
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "";
                })
                .map(name -> {
                    return new TestUser(name);
                });
    }
}

第一个 service 执行耗时 500ms。返回空对象;

创建第二个 service 执行耗时 1000ms。返回空对象;代码如上,改一下sleep时间即可。

继续创建第三个 service 执行耗时 1000ms。返回 name3。代码如上,改一下 sleep 时间,以及返回为 name3。

3 主体方法

public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        TestServiceI testServiceImpl4 = new TestServiceImpl4();
        TestServiceI testServiceImpl5 = new TestServiceImpl5();
        TestServiceI testServiceImpl6 = new TestServiceImpl6();
        List<TestServiceI> serviceIList = new ArrayList<>();
        serviceIList.add(testServiceImpl4);
        serviceIList.add(testServiceImpl5);
        serviceIList.add(testServiceImpl6);

    // 执行 service 列表,这样有多少个 service 都可以
        Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList)
                .map(service -> {
                    return service.request();
                });
    // flatMap(或者flatMapSequential) + map 实现异常继续下一个执行
        Flux flux = monoFlux.flatMapSequential(mono -> {
            return mono.map(user -> {
                        TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class);
                        if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) {
                            return testUser;
                        }
            // null 在 reactor 中是异常数据。
                        return null;
                    })
                    .onErrorContinue((err, i) -> {
                        log.info("onErrorContinue={}", i);
                    });
        });
        Mono mono = flux.elementAt(0, Mono.just(""));
        Object block = mono.block();
        System.out.println(block + "blockFirst 执行耗时ms:" + (System.currentTimeMillis() - startTime));
    }
  • 1、Flux.fromIterable 执行 service 列表,可以随意增删 service 服务。
  • 2、flatMap(或者flatMapSequential) + map + onErrorContinue 实现异常继续下一个执行。具体参考:Reactor中的onErrorContinue 和 onErrorResume
  • 3、Mono mono = flux.elementAt(0, Mono.just("")); 返回第一个正常数据。

执行输出:

20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service1.threadName=main
20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service5.threadName=main
20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service6.threadName=main
TestUser(name=name3)blockFirst 执行耗时ms:2895

  • 1、service1 和 service2 因为返回空,所以继续下一个,最终返回 name3。
  • 2、查看总耗时:2895ms。service1 耗时 500,service2 耗时1000,service3 耗时 1000。发现耗时基本上等于 service1 + service2 + service3 。这是怎么回事呢?查看返回执行的线程,都是 main。

总结:这样实现按照顺序返回第一个正常数据。但是执行并没有异步。下一步:如何实现异步呢?

4 实现异步

4.1 subcribeOn 实现异步

修改 service 实现。增加 .subscribeOn(Schedulers.boundedElastic())

如下:

@Slf4j
public class TestServiceImpl1 implements TestServiceI {
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        return Mono.fromSupplier(() -> {
                    try {
                        System.out.println("service1.threadName=" + Thread.currentThread().getName());
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "";
                })
                //增加subscribeOn
                .subscribeOn(Schedulers.boundedElastic())
                .map(name -> {
                    return new TestUser(name);
                });
    }
}

再次执行输出如下:

21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service4.threadName=boundedElastic-1
21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service2.threadName=boundedElastic-2
service3.threadName=boundedElastic-3
21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
TestUser(name=name6)blockFirst 执行耗时ms:1242

  • 1、发现具体实现 sleep 的线程都不是 main 线程,而是 boundedElastic
  • 2、最终执行耗时 1242ms,只比执行时间最长的 service2 和 service3 耗时 1000ms,多一些。证明是异步了。

4.2 CompletableFuture 实现异步

修改 service 实现,使用 CompletableFuture 执行耗时操作(这里是sleep,具体到项目中可能是外部接口调用,DB 操作等);然后使用 Mono.fromFuture 返回 Mono 对象。

@Slf4j
public class TestServiceImpl1 implements TestServiceI{
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("service1.threadName=" + Thread.currentThread().getName());
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "testname1";
        });

        return Mono.fromFuture(uCompletableFuture).map(name -> {
            return new TestUser(name);
        });
    }
}

执行返回如下:

21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service2.threadName=ForkJoinPool.commonPool-worker-1
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service3.threadName=ForkJoinPool.commonPool-worker-2
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1 
service1.threadName=ForkJoinPool.commonPool-worker-3
21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
TestUser(name=testname1)blockFirst 执行耗时ms:1238

  • 1、耗时操作都是使用 ForkJoinPool 线程池中的线程执行。
  • 2、最终耗时和方法1基本差不多。

到此这篇关于Reactor 多任务并发执行且结果按顺序返回第一个的文章就介绍到这了,更多相关Reactor 多任务执行内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Springboot整合mqtt服务的示例代码

    Springboot整合mqtt服务的示例代码

    MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。本文为大家分享了Springboot整合mqtt服务的示例代码,需要的可以参考一下
    2022-03-03
  • RabbitMQ 的七种队列模式和应用场景

    RabbitMQ 的七种队列模式和应用场景

    最近学习RabbitMQ,本文就记录一下RabbitMQ 的七种队列模式和应用场景,方便以后使用,也方便和大家共享,相互交流
    2021-05-05
  • spring 重复注解和aop拦截的实现示例

    spring 重复注解和aop拦截的实现示例

    本文主要介绍了spring 重复注解和aop拦截的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-08-08
  • java compiler没有1.8怎么解决

    java compiler没有1.8怎么解决

    这篇文章主要介绍了java compiler没有1.8的解决方法,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-08-08
  • Spring源码BeanFactoryPostProcessor详解

    Spring源码BeanFactoryPostProcessor详解

    BeanFactoryPostProcessor的执行时机是在Spring扫描完成后,Bean初始化前,当我们实现BeanFactoryPostProcessor接口,可以在Bean的初始化之前对Bean进行属性的修改,下面通过本文看下Spring源码分析-BeanFactoryPostProcessor的实例代码,感兴趣的朋友一起看看吧
    2021-11-11
  • springboot整合jquery和bootstrap框架过程图解

    springboot整合jquery和bootstrap框架过程图解

    这篇文章主要介绍了springboot整合jquery和bootstrap框架过程图解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12
  • java 抽象类的实例详解

    java 抽象类的实例详解

    这篇文章主要介绍了java 抽象类的实例详解的相关资料,希望通过本大家能理解掌握这部分内容,需要的朋友可以参考下
    2017-09-09
  • 使用JPA自定义VO类型转换(EntityUtils工具类)

    使用JPA自定义VO类型转换(EntityUtils工具类)

    这篇文章主要介绍了使用JPA自定义VO类型转换(EntityUtils工具类),具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • java网络之基于UDP的聊天程序示例解析

    java网络之基于UDP的聊天程序示例解析

    这篇文章主要介绍了java网络之基于UDP的聊天程序示例解析,文中通过步骤及示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • 基于java SSM springboot实现景区行李寄存管理系统

    基于java SSM springboot实现景区行李寄存管理系统

    这篇文章主要介绍了基于java SSM springboot实现的景区行李寄存管理系统,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-08-08

最新评论