转载

RxJava2源码初探

众所周知RxJava有许多优点比如强大的链式调用,方便的线程调度,但是我对其原理还是了解的太少了,因此打算阅读下源码,先从一个最基本的例子开始

一、例子

这个例子只是为了示例,正常情况下也不会这么写

fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }.subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }
        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }
        override fun onNext(t: Int) {
            println("onNext $t")
        }
        override fun onError(e: Throwable) {
            println("onError")
        } 
    })
}
复制代码

输出结果:

onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 3
onComplete
复制代码

那么为什么会按这个顺序输出呢?从代码中也可以看出从始至终也只调用了create、subscribe两个方法,先来看看create的源码

// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
复制代码

可以看出当我们没有设置onObservableAssembly时其实就是直接创建了一个ObservableCreate实例返回,接着看看subscribe

public final void subscribe(Observer<? super T> observer) {
    subscribeActual(observer);
}
复制代码

可以看到内部就是调用了subscribeActual方法,而这个方法是个抽象方法,ObservableCreate实现了该方法

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    source.subscribe(parent);
}
复制代码

内部主要就是先创建了一个CreateEmitter实例,然后调用observer的onSubscribe方法,最后再调用source的subscribe方法,这就解释了onObserverSubscribe和onSourceSubscribe的输出,而source的subscribe方法又调用了三次onNext方法和一次onComplete方法,先看看onNext

// CreateEmitter.java
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
复制代码

如果还没dispose那么直接就调用了observer的onNext,这也就解释了onNext 1、onNext 2、onNext 3三个输出接着看onComplete

// CreateEmitter.java
public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}
复制代码

如果还没dispose就直接调用observer的onComplete,直接解释了onComplete的输出,我们注意到Observer还有一个onError回调,该方法可以通过调用emitter.onError手动触发

// CreateEmitter.java
public void onError(Throwable t) {
    if (!tryOnError(t)) {
        RxJavaPlugins.onError(t);
    }
}
public boolean tryOnError(Throwable t) {
    if (t == null) {
        t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    }
    if (!isDisposed()) {
        try {
            observer.onError(t);
        } finally {
            dispose();
        }
        return true;
    }
    return false;
}
复制代码

可以看到当还没被dispose就会调用到observer的onError方法,至此这个基本demo的源码已经分析完毕。 总结下上述代码其实就分为如下几步

  1. 创建Observable实例
  2. 调用该实例的subscribeActual方法
  3. 回调observer的onSubscribe方法
  4. 调用source的subscribe方法
  5. 上述的subscribe方法内部可以执行若干次onNext,最多一个onError,最多一次onComplete

上述实例的整体流程图如下

RxJava2源码初探

下面来从源码的角度研究研究RxJava中的几个基本方法

二、基本方法

首先从最基本的map方法开始

1. map

map方法将每个onNext事件都调用所传入的Function实例的apply方法来达到转化数据源的效果,如下图所示

RxJava2源码初探

实例代码如下所示

fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }
    .map {
       it + 1
    }
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }
        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }
        override fun onError(e: Throwable) {
            println("onError")
        }
        override fun onNext(t: Int) {
            println("onNext $t")
        }
    })
}
复制代码

输出结果:

onObserverSubscribe
onSourceSubscribe
onNext 2
onNext 3
onNext 4
onComplete
复制代码

很明显map方法会对所有的next的数据做一次变化这里是加1,接着看看map的源码实现

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
复制代码

内部创建了一个ObservableMap实例并将当前的Observable实例和Function实例传入,根据本文一开始的分析当调用Observable的subscribe方法其实会调用subscribeActual方法

// ObservableMap.java
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}
复制代码

创建了MapObserver实例将Observer实例进行包装然后调用source.subscribe,这个source其实就是上一级Observable实例本例中对应ObservableCreate实例,接着根据上文的分析会调用该MapObserver实例的onNext三次然后调用一次onComplete

// MapObserver.java
public void onNext(T t) {
    // 初始化的时候done为false
    if (done) {
        return;
    }
    // 初始化的时候就是NONE
    if (sourceMode != NONE) {
        downstream.onNext(null);
        return;
    }
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}
复制代码

我们可以看到内部调用了mapper.apply方法,接着将拿到的结果v当做参数调用downstream的onNext方法,注意这里的downStream就是外界创建的一个Observer对象。上述实例是整体流程图如下。注:绿色框表示对象,蓝色框表示方法调用,括号内为简称

RxJava2源码初探

综上我们可以知道map通过代理下游Observer实例完成数据转换,接着看看flatMap的源码实现

2. flatMap

flatMap方法用于将上游的每一个onNext事件都转换成一个Observable实例,如下图所示

RxJava2源码初探
fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }
    .flatMap {
        Observable.just(it, it + 1)
    }
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }

        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }

        override fun onError(e: Throwable) {
            println("onError")
        }

        override fun onNext(t: Int) {
            println("onNext $t")
        }
    })
}
复制代码

输出结果:

onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 2
onNext 3
onNext 3
onNext 4
onComplete
复制代码

很显然flatMap将每一个事件比如1转换成一个拥有1、2两个事件的Observable实例,来看看其源码实现

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码

默认delayErrors为false表示当一个事件出现异常就会停止整个事件序列,默认并发数为Int的最大值,默认缓存大小为128,然后根据这些参数和当前Observable实例构建出一个ObservableFlatMap实例,我们看看其subscribeActual方法

// ObservableFlatMap.java
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码

内部又通过这些参数和下游的Observer实例构建了一个MergeObserver实例,直接看看其onSubscribe方法

// MergeObserver.java
public void onSubscribe(Disposable d) {
    // 只会回调一次下游的onSubscribe方法
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        downstream.onSubscribe(this);
    }
}
复制代码

如果已经有上游了就不做任何处理不然进行上游的赋值,然后回调了下游也就是自定义的那个Observer的onSubscribe方法,接着看看其onNext方法是怎么把一个输入源转化成一个Observable的

public void onNext(T t) {
    ObservableSource<? extends U> p;
    p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    subscribeInner(p);
}
复制代码

先是调用了传入的apply方法将每个onNext数据源转化为Observable实例,接着调用subscribeInner方法

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) {
            ...
        } else {
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}
boolean addInner(InnerObserver<T, U> inner) {
    for (;;) {
        InnerObserver<?, ?>[] a = observers.get();
        int n = a.length;
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = inner;
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}
复制代码

为每个Observable对象都创建了一个InnerObserver实例,然后将其放入到一个数组中去,最后调用subscribe方法进行订阅,由于apply方法返回了一个ObservableFromArray实例,所以看看其subscribeActual方法

// ObservableFromArray.java
public void subscribeActual(Observer<? super T> observer) {
    FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
    observer.onSubscribe(d);
    if (d.fusionMode) {
        return;
    }
    d.run();
}
复制代码

observer指代InnerObserver,看看其onSubscribe方法

public void onSubscribe(Disposable d) {
    // 第一次调用会返回true,d就是FromArrayDisposable实例其派生自QueueDisposable
    if (DisposableHelper.setOnce(this, d)) {
        if (d instanceof QueueDisposable) {
            QueueDisposable<U> qd = (QueueDisposable<U>) d;
            // 相当于传入了7
            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
            if (m == QueueDisposable.SYNC) {
                fusionMode = m;
                queue = qd;
                done = true;
                parent.drain();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                fusionMode = m;
                queue = qd;
            }
        }
    }
}
//FromArrayDisposable.java
public int requestFusion(int mode) {
    // 很明显7 & 1 != 0
    if ((mode & SYNC) != 0) {
        fusionMode = true;
        return SYNC;
    }
    return NONE;
}
复制代码

这里暂时还没法理解这个fusionMode(混合模式)是干什么用的,接着会调用到MergeObserver的drain方法

void drain() {
    // 只会执行一次,循环将所有事件取出
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}
// 当取消了或者出现了错误并其dealyErrors为false时会将所有InnerObserver都dispose掉
boolean checkTerminate() {
    if (cancelled) {
        return true;
    }
    Throwable e = errors.get();
    if (!delayErrors && (e != null)) {
        disposeAll();
        e = errors.terminate();
        if (e != ExceptionHelper.TERMINATED) {
            downstream.onError(e);
        }
        return true;
    }
    return false;
}
void drainLoop() {
    // 这里的downstream就是外界自定义的Observer实例
    final Observer<? super U> child = this.downstream;
    for (;;) {
        ...
        for (int i = 0; i < n; i++) {
            if (checkTerminate()) {
                return;
            }
            InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
            SimpleQueue<U> q = is.queue;
            if (q != null) {
                for (;;) {
                    U o;
                    try {
                        o = q.poll();
                    } catch (Throwable ex) {
                        ...
                        if (checkTerminate()) {
                            return;
                        }
                        continue sourceLoop;
                    }
                    // 每取出一个便会调用下游的onNext方法
                    child.onNext(o);
                }
                // 会把一个Observable源所有的数据都取完了以后才会进入下一个
                if (o == null) {
                    break;
                }
            }
            ...
        }
        ..
    }
}
复制代码

drainLoop内部会从数组中一个个取出InnerObserver实例,并取出所对应的数据源然后每取出一个回调下游Observer的onNext方法,下面用一张图来总结下实例的流程

RxJava2源码初探
原文  https://juejin.im/post/5ceb8ee2f265da1b6f435332
正文到此结束
Loading...