转载

Rxjava2解析-订阅流程

首先创建一个 observerobservable

new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
复制代码
Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {

            }
        });
复制代码
  • 先看 Observable 的创建过程, create() 只是把传进去 ObservableOnSubscribe 对象包装了一层返回,这里可以忽略

  • 然后是 ObservableOnSubscribe 类的内部

final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        // 1
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 2
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
        // 3 
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

1. 在这里把传进来的ObservableOnSubscribe对象保存起来

2. 这里new了一个Emitter,也就是我们用来onNext,onError的类,然后把Emitter传给observer中,由于CreateEmitter实现了Disposable接口,所以可以看到Observer里有一个 onSubscribe(Disposable d) ,我们可以用来控制流的结束等操作,实际上这个 Disposable 就是源Obserable创建的,

3. 这里把 Emitter 传给source,也就是传给我们new的 ObservableOnSubscribesubscribe()

到这里可以看出最简单的一个订阅流程是什么样的

变化操作

  • 以map操作来看,map变化返回了一个 ObservableMap ,也是一个 Observable
Rxjava2解析-订阅流程
Rxjava2解析-订阅流程
Rxjava2解析-订阅流程
  • 这里其实可以看出,每一个 Observable 都会有一个 subscribeActual() 方法,这个方法在 Observable 调用 subscrible() 之后会被调用,也就是说一个 Observable 在调用 subscrible() 之后,实际上的操作逻辑都是在 subscribeActual() 里面
  • ObservableMapsubscribeActual() 里,调用了 source.subscribe(new MapObserver<T, U>(t, function)); 这句代码, source 是上层的observable,参数为包装过后的 observerMapObserver ,由图三可以看出, MapObserver 内部的 onNext 是往下游传递一个经过 apply() 变化过后的数据,也就达到了我们用 map 变化数据的功能了。
原文  https://juejin.im/post/5d3724946fb9a07ed441420e
正文到此结束
Loading...