RxJava实战之订阅流基本原理示例解析

 更新时间:2022年12月30日 16:21:38   作者:itbird01  
这篇文章主要为大家介绍了RxJava实战之订阅流基本原理示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

正文

本节,我们从Rxjava使用代码入手,去结合自己已有的知识体系,加查阅部分源码验证的方式,来一起探索一下Rxjava实现的基本原理。

为了本文原理分析环节,可以被更多的人理解、学习,所以小编从初学者的角度,从使用入手,一点点的分析了其中的源码细节、思想,建议大家随着本文的章节步骤,一步一步的来阅读,才能更快、更好的理解Rxjava的真正的思想精髓,也为我们之后的实践课程留一个好的底子。

订阅流

有人会问,小编,你到现在为止,只是讲了流程,而没有讲到具体每个中间操作符,在转换的对象里面的方法调用,这个问题,问的特别好!!!

还记得小编开篇说的那句话吗?我们从Rxjava的使用代码入手

private void test() {
	//第一步:just调用
    Observable.just("https://img-blog.csdn.net/20160903083319668")
    //第二步:map调用
            .map(new Function<String, Bitmap>() {
                @Override
                public Bitmap apply(String s) throws Exception {
                    //Bitmap bitmap = downloadImage(s);
                    return null;
                }
            })
            //第三步:subscribeOn、observeOn调用
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            //第四步:subscribe调用
            .subscribe(new Observer<Bitmap>() {
                @Override
                public void onSubscribe() {
                    Log.d(TAG, "onSubscribe");
                }
                @Override
                public void onNext(Bitmap s) {
                    Log.d(TAG, "onNext s = " + s);
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError ", e);
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

从上面的样例代码分析、分解,我们明面上看到四个步骤,暂且列下来:

  • 第一步:just调用
  • 第二步:map调用
  • 第三步:subscribeOn、observeOn调用
  • 第四步:subscribe调用

之所以我们没有讲到ObservableObserveOn、ObservableMap、ObservableJust等对象里面的具体方法调用,是因为到目前为止,从使用例子入手,根本就没有调用到,所以我们也就无从分析到。,本节,接下来我们分析subscribe调用,大家就发现,里面的某些方法开始调用上了。

subscribe的解读收下

我们知道,上面的just、map、subscribeOn、observeOn一系列调用下来,依然是一个Observable对象、

Observable是被观察者的意思,subscribe是订阅的意思,Observer是观察者的意思。

大家发现了没有?这里有个问题,这家伙和我们标准的观察者有很大的不同,标准观察者模式,是一种一对多的行为型设计模式,其实就是若干个观察者,将自身的接口引用注册到被观察者内部,被观察者状态发生变更时,遍历内部的list列表,一一通知观察者,如下图

Observable.just("https://img-blog.csdn.net/20160903083319668")
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe");
            }
            @Override
            public void onNext(@NonNull String s) {
                Log.d(TAG, "onNext s = " + s);
            }
            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "onError");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

可是我们看上面Rxjava的代码,与标准观察者模式有两点不同

  • 一是被观察者订阅了观察者
  • 二是从使用上看观察者和被观察者的订阅关系是一对一的

上面提出的两点不同,我们一边看源码,一边试着去理解一下。

  • 一对一的通知:因为响应式编程思想的重点在于,一个变化,另外一个要能感知到,那么通过这样变形的观察者模式,去实现一对一的通知,我觉得也没啥问题。
  • 被观察者订阅观察者:这个从理论上讲,就没办法去理解了,对吧,因为你再怎么变形标准观察者模式,那也肯定是观察者订阅被观察者,所以这里我们有必要简单通过源码去了解一下

我们从上面看到just是将传入的T,再次封装为了一个ObservableJust对象

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

我们看一下ObservableJust类代码

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }
    @Override
    public T call() {
        return value;
    }
}

从上面看到,ObservableJust仅仅是将传入的T封装了一层而已,它继承与Observable抽象类,而Observable抽象类实现了ObservableSource接口

public abstract class Observable<T> implements ObservableSource<T> {

而ObservableSource接口,就是我们外界调用的subscribe订阅方法的源头

public interface ObservableSource<T> {
    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

所以Observable肯定实现了subscribe方法,我们看一下Observable的subscribe方法干什么了

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //对象封装,暂时不是重点,我们跳过
        observer = RxJavaPlugins.onSubscribe(this, observer);
        //判空
        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

大家看到这里,其实关键在于,最终调用了一个subscribeActual方法,而这个方法是个啥?在哪里实现的?一看,这玩意原来是Observable类中的一个抽象方法

protected abstract void subscribeActual(Observer&lt;? super T&gt; observer);

所以这里绕回到开头,我们知道just,实际上是将传入的参数T,转换封装为了ObservableJust对象,而ObservableJust继承与 Observable,所以subscribeActual方法它肯定去了

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        //最终这里还是调用了观察者的相应方法
        observer.onSubscribe(sd);
        sd.run();
    }
    @Override
    public T call() {
        return value;
    }
}

小结

大家,发现了没有,这里绕了一圈,最终调用通过Observable的抽象方法subscribeActual的巧妙实现,最终还是观察者订阅了被观察者,被观察者内部最终调用了观察者的具体方法。

这里和标准观察者模式不同的是,被观察者立马去通知了观察者,说直接点,在调用被观察者的订阅方法时,其实就是直接调用了观察者相应的方法,只不过这里通过模板方法模式,巧妙的封装了,好了,Rxjava的观察者模式源码,我们简单理解到这里,我们试着自己去编写实现一下。

也就是订阅流的过程中,是以执行subscribe方法为开始,从右往左执行,这个执行过程中,每个节点,做两件事情

  • 对后面的observer节点,做一层包装代理,变为代理的observerProxy

由于构建流的执行,每个节点实际上拥有上一个节点observable对象的引用,所以执行 source.subscribe(observerProxy)

订阅流讲到现在,大家是否理解了?当然这里没有详细讲解其中ObservableSubscribeOn、ObservableObserveOn中的订阅,如何进行的线程切换,这个并非是不去讲,还是那句老话,饭要一点一点的吃,我们congoing使用方法入手,想要去了解的是Rxjava的整体框架原理。至于线程切换如何实现的?这个留个念想,大家可以认真想一下,不建议大家直接去看源码。我们在Rxjava实践环节,也会带大家一点一点的去实现这个核心功能。

以上就是RxJava实战之订阅流基本原理示例解析的详细内容,更多关于RxJava订阅流基本原理的资料请关注脚本之家其它相关文章!

相关文章

最新评论