众所周知RxJava有许多优点比如强大的链式调用,方便的线程调度,但是我对其原理还是了解的太少了,因此打算阅读下源码,先从一个最基本的例子开始
这个例子只是为了示例,正常情况下也不会这么写
fun main() {
Observable.create {
emitter: ObservableEmitter<Int> ->
println("onSourceSubscribe")
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}.subscribe(object : Observer<Int> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onObserverSubscribe")
}
override fun onNext(t: Int) {
println("onNext $t")
}
override fun onError(e: Throwable) {
println("onError")
}
})
}
复制代码
输出结果:
onObserverSubscribe onSourceSubscribe onNext 1 onNext 2 onNext 3 onComplete 复制代码
那么为什么会按这个顺序输出呢?从代码中也可以看出从始至终也只调用了create、subscribe两个方法,先来看看create的源码
// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
复制代码
可以看出当我们没有设置onObservableAssembly时其实就是直接创建了一个ObservableCreate实例返回,接着看看subscribe
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
复制代码
可以看到内部就是调用了subscribeActual方法,而这个方法是个抽象方法,ObservableCreate实现了该方法
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
复制代码
内部主要就是先创建了一个CreateEmitter实例,然后调用observer的onSubscribe方法,最后再调用source的subscribe方法,这就解释了onObserverSubscribe和onSourceSubscribe的输出,而source的subscribe方法又调用了三次onNext方法和一次onComplete方法,先看看onNext
// CreateEmitter.java
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
复制代码
如果还没dispose那么直接就调用了observer的onNext,这也就解释了onNext 1、onNext 2、onNext 3三个输出接着看onComplete
// CreateEmitter.java
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
复制代码
如果还没dispose就直接调用observer的onComplete,直接解释了onComplete的输出,我们注意到Observer还有一个onError回调,该方法可以通过调用emitter.onError手动触发
// CreateEmitter.java
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
复制代码
可以看到当还没被dispose就会调用到observer的onError方法,至此这个基本demo的源码已经分析完毕。 总结下上述代码其实就分为如下几步
上述实例的整体流程图如下
下面来从源码的角度研究研究RxJava中的几个基本方法
首先从最基本的map方法开始
map方法将每个onNext事件都调用所传入的Function实例的apply方法来达到转化数据源的效果,如下图所示
实例代码如下所示
fun main() {
Observable.create {
emitter: ObservableEmitter<Int> ->
println("onSourceSubscribe")
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}
.map {
it + 1
}
.subscribe(object : Observer<Int> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onObserverSubscribe")
}
override fun onError(e: Throwable) {
println("onError")
}
override fun onNext(t: Int) {
println("onNext $t")
}
})
}
复制代码
输出结果:
onObserverSubscribe onSourceSubscribe onNext 2 onNext 3 onNext 4 onComplete 复制代码
很明显map方法会对所有的next的数据做一次变化这里是加1,接着看看map的源码实现
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
复制代码
内部创建了一个ObservableMap实例并将当前的Observable实例和Function实例传入,根据本文一开始的分析当调用Observable的subscribe方法其实会调用subscribeActual方法
// ObservableMap.java
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
复制代码
创建了MapObserver实例将Observer实例进行包装然后调用source.subscribe,这个source其实就是上一级Observable实例本例中对应ObservableCreate实例,接着根据上文的分析会调用该MapObserver实例的onNext三次然后调用一次onComplete
// MapObserver.java
public void onNext(T t) {
// 初始化的时候done为false
if (done) {
return;
}
// 初始化的时候就是NONE
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
复制代码
我们可以看到内部调用了mapper.apply方法,接着将拿到的结果v当做参数调用downstream的onNext方法,注意这里的downStream就是外界创建的一个Observer对象。上述实例是整体流程图如下。注:绿色框表示对象,蓝色框表示方法调用,括号内为简称
综上我们可以知道map通过代理下游Observer实例完成数据转换,接着看看flatMap的源码实现
flatMap方法用于将上游的每一个onNext事件都转换成一个Observable实例,如下图所示
fun main() {
Observable.create {
emitter: ObservableEmitter<Int> ->
println("onSourceSubscribe")
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}
.flatMap {
Observable.just(it, it + 1)
}
.subscribe(object : Observer<Int> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onObserverSubscribe")
}
override fun onError(e: Throwable) {
println("onError")
}
override fun onNext(t: Int) {
println("onNext $t")
}
})
}
复制代码
输出结果:
onObserverSubscribe onSourceSubscribe onNext 1 onNext 2 onNext 2 onNext 3 onNext 3 onNext 4 onComplete 复制代码
很显然flatMap将每一个事件比如1转换成一个拥有1、2两个事件的Observable实例,来看看其源码实现
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码
默认delayErrors为false表示当一个事件出现异常就会停止整个事件序列,默认并发数为Int的最大值,默认缓存大小为128,然后根据这些参数和当前Observable实例构建出一个ObservableFlatMap实例,我们看看其subscribeActual方法
// ObservableFlatMap.java
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码
内部又通过这些参数和下游的Observer实例构建了一个MergeObserver实例,直接看看其onSubscribe方法
// MergeObserver.java
public void onSubscribe(Disposable d) {
// 只会回调一次下游的onSubscribe方法
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
复制代码
如果已经有上游了就不做任何处理不然进行上游的赋值,然后回调了下游也就是自定义的那个Observer的onSubscribe方法,接着看看其onNext方法是怎么把一个输入源转化成一个Observable的
public void onNext(T t) {
ObservableSource<? extends U> p;
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
subscribeInner(p);
}
复制代码
先是调用了传入的apply方法将每个onNext数据源转化为Observable实例,接着调用subscribeInner方法
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
...
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
boolean addInner(InnerObserver<T, U> inner) {
for (;;) {
InnerObserver<?, ?>[] a = observers.get();
int n = a.length;
InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = inner;
if (observers.compareAndSet(a, b)) {
return true;
}
}
}
复制代码
为每个Observable对象都创建了一个InnerObserver实例,然后将其放入到一个数组中去,最后调用subscribe方法进行订阅,由于apply方法返回了一个ObservableFromArray实例,所以看看其subscribeActual方法
// ObservableFromArray.java
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
复制代码
observer指代InnerObserver,看看其onSubscribe方法
public void onSubscribe(Disposable d) {
// 第一次调用会返回true,d就是FromArrayDisposable实例其派生自QueueDisposable
if (DisposableHelper.setOnce(this, d)) {
if (d instanceof QueueDisposable) {
QueueDisposable<U> qd = (QueueDisposable<U>) d;
// 相当于传入了7
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
fusionMode = m;
queue = qd;
done = true;
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
//FromArrayDisposable.java
public int requestFusion(int mode) {
// 很明显7 & 1 != 0
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
复制代码
这里暂时还没法理解这个fusionMode(混合模式)是干什么用的,接着会调用到MergeObserver的drain方法
void drain() {
// 只会执行一次,循环将所有事件取出
if (getAndIncrement() == 0) {
drainLoop();
}
}
// 当取消了或者出现了错误并其dealyErrors为false时会将所有InnerObserver都dispose掉
boolean checkTerminate() {
if (cancelled) {
return true;
}
Throwable e = errors.get();
if (!delayErrors && (e != null)) {
disposeAll();
e = errors.terminate();
if (e != ExceptionHelper.TERMINATED) {
downstream.onError(e);
}
return true;
}
return false;
}
void drainLoop() {
// 这里的downstream就是外界自定义的Observer实例
final Observer<? super U> child = this.downstream;
for (;;) {
...
for (int i = 0; i < n; i++) {
if (checkTerminate()) {
return;
}
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
SimpleQueue<U> q = is.queue;
if (q != null) {
for (;;) {
U o;
try {
o = q.poll();
} catch (Throwable ex) {
...
if (checkTerminate()) {
return;
}
continue sourceLoop;
}
// 每取出一个便会调用下游的onNext方法
child.onNext(o);
}
// 会把一个Observable源所有的数据都取完了以后才会进入下一个
if (o == null) {
break;
}
}
...
}
..
}
}
复制代码
drainLoop内部会从数组中一个个取出InnerObserver实例,并取出所对应的数据源然后每取出一个回调下游Observer的onNext方法,下面用一张图来总结下实例的流程