Project Reactor源码解析publishOn使用示例

 更新时间:2022年08月15日 17:05:15   作者:夜尽天明_  
这篇文章主要为大家介绍了Project Reactor源码解析publishOn使用示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

功能分析

相关示例源码:github.com/chentianmin…

public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)

onNext()onComplete()onError()方法进行线程切换,publishOn()使得它下游的消费阶段异步执行。

  • scheduler:线程切换的调度器,Scheduler用来生成实际执行异步任务的Worker
  • delayError:是否延时转发Error。如果为true,当收到上游的Error时,会等队列中的元素消费完毕后再向下游转发Error。否则会立即转发Error,可能导致队列中的元素丢失。默认为true
  • prefetch:预取元素的数量,同时也是队列的容量。默认值为Queues.SMALL_BUFFER_SIZE,该值通过配置进行修改。

代码示例

prefetch

/**
 * 每隔delayMillis生产一个元素
 */
protected Flux<Integer> delayPublishFlux(int delayMillis, int startInclusive, int endExclusive) {
    return Flux.create(fluxSink -> {
        IntStream.range(startInclusive, endExclusive)
                .forEach(i -> {
                    // 同步next
                    sleep(delayMillis);
                    logInt(i, "生产");
                    fluxSink.next(i);
                });
        fluxSink.complete();
    });
}
@Test
public void testPreFetch() {
    delayPublishFlux(1000, 1, 5)
            .doOnRequest(i -> logLong(i, "request"))
            .publishOn(Schedulers.boundedElastic(), 2)
            .subscribe(i -> logInt(i, "消费"));
    sleep(10000);
}

每次会都向上游请求2个元素。另外还能发现,从第二个request开始,线程发生了切换。

delayError

/**
 * 每隔delayMillis生产一个元素,最后发送Error
 */
protected Flux<Integer> delayPublishFluxError(int delayMillis, int startInclusive, int endExclusive) {
    return Flux.create(fluxSink -> {
        IntStream.range(startInclusive, endExclusive)
                .forEach(i -> {
                    // 同步next
                    sleep(delayMillis);
                    logInt(i, "生产");
                    fluxSink.next(i);
                });
        fluxSink.error(new RuntimeException("发布错误!"));
    });
}
@Test
public void testDelayError() {
    delayPublishFluxError(500, 1, 5)
            .publishOn(Schedulers.boundedElastic())
            // 只是为了消费慢一点
            .doOnNext(i -> sleep(1000))
            .subscribe(i -> logInt(i, "消费"));
    sleep(10000);
}

元素消费完才触发Error

@Test
public void testNotDelayError() {
    delayPublishFluxError(500, 1, 5)
            .publishOn(Schedulers.boundedElastic(), false, 256)
            // 只是为了消费慢一点
            .doOnNext(i -> sleep(1000))
            .subscribe(i -> logInt(i, "消费"));
    sleep(10000);
}

元素还没消费完就触发Error

源码分析

首先看一下publishOn()操作符在装配阶段做了什么,直接查看Flux#publishOn()源码。

Flux#publishOn()

publishOn()装配阶段重点是创建了FluxPublishOn对象。

接下来,我们分析订阅阶段发生了什么。一个Publisher在订阅的时候调用的是其subscribe()方法,因此我们继续看Flux#subscribe()源码。

Flux#subscribe()

Flux#subscribe()方法的实现中,如果上游PublisherOptimizableOperator类型,实际的Subscriber是通过调用该InternalFluxOperator#subscribeOrReturn()方法返回的。如果返回值为null,直接return

对于publishOn()操作符来说,装配阶段创建的FluxPublishOn就是OptimizableOperator类型。所以继续查看FluxPublishOn#subscribeOrReturn()源码。

FluxPublishOn#subscribeOrReturn()

可以看到,方法返回的是PublishOnSubscriber,它包装了原始的Subscriber

在后续的订阅阶段一定会调用其onSubscribe()方法,在运行阶段一定会调用其onNext()方法。我们先看FluxPublishOn#onSubscribe()源码。

FluxPublishOn#onSubscribe()

onSubscribe()实现中,分为同步队列融合、异步队列融合以及非融合方式处理。

如果上游的SubscriptionQueueSubscription类型,则会进行队列融合。具体采用同步还是异步,取决于该QueueSubscription#requestFusion()实现。

  • 同步队列融合:复用当前队列,继续调用下游onSubscribe()方法,但不会继续调用上游request()方法。
  • 异步队列融合:复用当前队列,然后继续调用下游onSubscribe()以及上游request()方法,请求数量是prefetch
  • 非融合:创建一个新的队列,然后继续调用下游onSubscribe()以及上游request()方法,请求数量是prefetch

接下来,我们从源码角度分别介绍上述三种方式的处理逻辑,首先介绍非融合方式。

非融合

先看如下代码示例,该代码会以非融合方式执行。

@Test
public void testNoFuse() {
    delayPublishFlux(1000, 1, 5)
            .publishOn(Schedulers.boundedElastic())
            .subscribe(i -> logInt(i, "消费"));
    sleep(10000);
}

间隔1s生产消费元素!

在消费阶段,一定会调用FluxPublishOn#onNext()方法。

FluxPublishOn#onNext()

我们重点关注非融合方式执行逻辑,其实只做了2件事:

  • 将下发的元素添加到队列中,该队列就是onSubscribe()阶段创建的新队列。
  • 调用trySchedule()方法进行调度。

继续看FluxPublishOn#trySchedule()源码。

FluxPublishOn#trySchedule()

这里其实就是交由woker异步执行,后续会执行FluxPublishOn.run()方法。

FluxPublishOn#run()

在run()方法执行的时候,分为3段逻辑:

  • 如果是输出融合,执行runBackfused()方法。
  • 如果是同步队列融合,执行runSync()方法。
  • 否则,执行runAsync()方法。

对于当前例子,实际执行的是runAsync()方法,继续查看其源码。

FluxPublishOn#runAsync()

runAsync()做的事情比较简单,就是排空队列中的元素下发给下游。同时在这里会继续调用request()向上游请求数据,这也是前面说的从第二个request()开始会进行线程切换的原因。

另外这里还会调用checkTerminated(),检查终止情况。

FluxPublishOn#checkTerminated()

如果delayError=true,必须当前队列为空是才会转发Error。如果delayError=false,则直接转发Error。继续查看onComplete()方法。

FluxPublishOn#onComplete()

如果未结束,将done标记设置为true,然后再次调用trySchedule()进行调度。后续再被调度到的时候,如果队列已经排空,才会调用下游onComplete(),触发完成。

小结

简单总结一下非融合执行过程:

onSubscribe()时创建一个队列,在onNext()时将上游下发的元素添加到队列中,然后异步排空队列中的元素,继续下发给下游。

同步队列融合

以下代码会以同步队列融合方式执行。

@Test
public void testSyncFuse() {
    Flux.just(1, 2 ,3, 4, 5)
            .publishOn(Schedulers.boundedElastic())
            .subscribe(this::logInt);
    sleep(10000);
}

因为Flux.just()对应的SubscriptionSynchronousSubscription,其requestFusion()方法实现如下:

SynchronousSubscription#requestFusion()

此时返回的是SYNC,执行同步队列融合。

前面提到过,同步队列融合会复用当前队列,继续调用下游onSubscribe()方法,但不会继续调用上游request()方法。

这意味着,此时FluxPublishOn#onNext()FluxPublishOn#onComplete()方法并不会调用。但是FluxPublishOn#request()依然会被下游调用到。

FluxPublishOn#request()

request()方法中还是会调用trySchedule(),后续会异步调用runSync()方法(前面已经分析了)。

对于非融合方式,trySchedule()也会执行,只是这次调度的时候,队列中还没有数据被添加进去。

FluxPublishOn#runSync()

runSync()实现上runAsync()差不多,也是排空队列的元素,继续下发给下游。不同的点是少了request()调用,以及取消完成控制有差异。

小结

简单总结一下同步队列融合执行过程:

onSubsrribe()时直接复用上游QueueSubscription作为队列,不会调用上游request()请求数据,在自身request()时异步排空队列中的元素,继续下发给下游。

异步队列融合

以下代码会以异步队列融合方式执行。

@Test
public void testAsyncFuse() {
    Flux.just(1, 2, 3, 4, 5)
            .windowUntil(i -&gt; i % 3 == 0)
            .publishOn(Schedulers.boundedElastic())
            .flatMap(Function.identity())
            .subscribe(this::logInt);
    sleep(10000);
}

因为windowUntil()对应的SubscriptionWindowPredicateMain,其requestFusion()方法实现如下:

WindowPredicateMain#requestFusion()

此时返回ASYNC,执行异步队列融合。接下来再看一下FluxPublishOn#onNext()源码。

FluxPublishOn#onNext()

注意,此时onNext()方法参数是null,表明上游并没有真正下发元素,可以将其看做是一个触发Worker调度的信号。后续还是会异步执行runAsync()方法,这里就不再分析了。

这其实也很容易理解:异步队列融合直接复用了上游的QueueSubscription作为队列,真正的数据应该由这个队列下发。

总结

简单总结一下同步队列融合执行过程:

onSubsrribe()时直接复用上游QueueSubscription作为队列,在onNext()时接收上游信号,异步排空队列中的元素,继续下发给下游。

非融合、同步队列融合、异步队列融合比较如下:

以上就是Project Reactor源码解析publishOn使用示例的详细内容,更多关于Project Reactor publishOn的资料请关注脚本之家其它相关文章!

相关文章

  • springboot中使用FastJson解决long类型在js中失去精度的问题

    springboot中使用FastJson解决long类型在js中失去精度的问题

    这篇文章主要介绍了springboot中使用FastJson解决long类型在js中失去精度的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • CyclicBarrier线程同步共享变量底层原理示例解析

    CyclicBarrier线程同步共享变量底层原理示例解析

    这篇文章主要为大家介绍了CyclicBarrier线程同步共享变量底层原理示例解析详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07
  • Java中的IO流之字符流Reader和Writer

    Java中的IO流之字符流Reader和Writer

    这篇文章主要介绍了Java中的IO流之字符流Reader和Writer,Reader : 和InputStream的唯一的区别就在于读的数据单位不同,继承自Reader的流都是用于向程序中输入数据,且数据的单位为字符16bit,需要的朋友可以参考下
    2023-10-10
  • Java多线程模式之Balking模式详解

    Java多线程模式之Balking模式详解

    这篇文章主要介绍了Java多线程模式之Balking模式,结合实例形式较为详细的分析了Balking模式的原理、用法与相关注意事项,需要的朋友可以参考下
    2017-06-06
  • 详解在springmvc中解决FastJson循环引用的问题

    详解在springmvc中解决FastJson循环引用的问题

    本篇文章主要介绍了在springmvc中解决FastJson循环引用的问题,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-01-01
  • SSH框架网上商城项目第21战之详解易宝支付的流程

    SSH框架网上商城项目第21战之详解易宝支付的流程

    这篇文章主要为大家详细介绍了SSH框架网上商城项目第21战之易宝支付的流程,感兴趣的小伙伴们可以参考一下
    2016-06-06
  • Java中进程、协程与线程的区别详解

    Java中进程、协程与线程的区别详解

    这篇文章主要介绍了Java中进程,线程,协程的概念、区别以及使用场景的选择,早期的操作系统每个程序就是一个进程,知道一个程序运行完,才能进行下一个进程,就是"单进程时代",一切的程序只能串行发生,需要的朋友可以参考下
    2023-08-08
  • springboot + devtools(热部署)实例教程

    springboot + devtools(热部署)实例教程

    devtools是boot的一个热部署工具,当我们修改了classpath下的文件(包括类文件、属性文件、页面等)时,会重新启动应用。本文通过实例给大家介绍springboot+devtools热部署,感兴趣的朋友一起看看吧
    2017-04-04
  • 一文教你如何通过三级缓存解决Spring循环依赖

    一文教你如何通过三级缓存解决Spring循环依赖

    这篇文章主要介绍了如何通过三级缓存解决 Spring 循环依赖,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考价值,需要的朋友可以参考下
    2023-07-07
  • 详解Spring Cloud 跨服务数据聚合框架

    详解Spring Cloud 跨服务数据聚合框架

    这篇文章主要介绍了详解Spring Cloud 跨服务数据聚合框架,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-03-03

最新评论