上一篇说了 rxjava2
的线程池原理,这篇我们来说说 rxjava
的变换。
变换和线程切换算是 rxjava
最关键的两个功能。常见的变换有 map()
, flatMap()
。我们先从 map
方法说起吧。
我们先举一个简单的例子,来看看 map
能做什么:
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Function<Student, String>() {
@Override
public String apply(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
复制代码
上面的例子是一个功能,打印一个班级里students的名字。很简单,通过 from
方法对student进行遍历,一个 map
方法将student变换成name,然后下游打印就完事了。我们知道 rxjava2
里面是有很多泛型设定的,如果类型错误是会直接标红。 from
方法返回的下游数据类型是student,而 subscriber
中接收的数据类型必须是String。很显然,这里map就将下游的数据类型进行了变换。
具体在源码中是如何实现的呢?我们先看 map
的源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
复制代码
还是老样子,抛开判空代码和hock机制,直接看 ObservableMap
类。不过在此之前,先看看 map
方法里面设定的泛型。T是Observable里设定的上游数据类型,map方法会返回一个Observable,这里就将整个链条的数据类型进行了变换。
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
复制代码
看过前面的几篇就知道,这里还是老套路,还是装饰器模式,还是创建一个内部处理器 MapObserver
。内部处理器 MapObserver
负责与上游绑定,所以它的处理数据类型仍为T。 ObservableMap
与下游进行绑定订阅,所以 ObservableMap
中数据的类型为R。我们在看 MapObserver
之前,先看看 Function
是什么。
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
复制代码
OK,Function是一个接口,只有一个接口方法 apply
。 Function
规定了两个泛型:T、R。其中T是 apply
的参数类型,R是返回值类型。我们在使用过程中,重写 apply
方法进行数据类型变换,然后再用 map
方法插入到整条流水线中,就达到了变换的目的。
下面看看 MapObserver
中具体是怎么实现的:
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
复制代码
很简单, MapObserver
的 onNext
负责处理上游下来的数据,在 onNext
方法中调用 Function
的 apply
方法,将 T
变换为下游需要的 U
(也就是前面的 R
),然后再将数据传递下去,达到变换的目的。
map的使用和源码都很简单,我们来看看 flatMap
的。
还是先用一个简单的例子来看 flatMap
的用途:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Function<Student, Observable<Course>>() {
@Override
public Observable<Course> apply(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
复制代码
产品说功能要改一改,不是打印每个student的名字,而是要打印每个sutdent所有课程名称。正常情况下,我们在 subscriber
中获取到每个student,然后用个for循环进行遍历打印就行,但是 flatMap
可以直接一步搞定。
细心的已经发现,这里的 Function
比较奇怪,它的返回值类型竟然是 Observable
。具体怎么回事,我们看看源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
//这里的delayErrors,maxConcurrency,bufferSize都是默认值。
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码
先解释一下, delayErrors
, maxConcurrency
, bufferSize
这几个参数的意义:
delayErrors
表示异常是否需要延迟到所有内部数据都传输完毕后抛出。默认值是 false
。 maxConcurrency
表示最大并发数,默认值为 Integer.MAX_VALUE
。 bufferSize
缓存的内部被观察者事件总数大小,默认值为128.
老样子,我们直接看 ObservableFlatMap
:
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码
还是原来的配方,还是原来的味道。我们来看看 MergeObserver
的源码一探究竟:
@Override
public void onNext(T t) {
//调用apply方法,获取到转换的Observable
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
return;
}
//隐藏了一些判断代码
subscribeInner(p);
}
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
//这里会走到else
if (p instanceof Callable) {
...
} else {
//这里新建一个InnerObserver,调用addInner添加到队列中,然后用apply中生成的Observable与之订阅。
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
复制代码
如注释中所示,这里根据上游每一个数据,生成一个 Observable
,然后新建一个 InnerObserver
,将这个 InnerObserver
添加到内部处理器队列中,并将 Observable
与这个 InnerObserver
进行订阅。
我们以 Observable.from()
为例,看看这中间的流程是什么样的。
//from 方法返回一个ObservableFromArray装饰器
public static <T> Observable<T> fromArray(T... items) {
//省略部分判空代码
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
//ObservableFromArray源码
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
//订阅后,创建一个FromArrayDisposable内部类对象
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
//这个方法很关键,我们待会可以看看InnerObserver的onSubscribe方法。
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
//FromArrayDisposable不是一个处理器,他只是一个带简单队列的Disposable
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> downstream;
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.downstream = actual;
this.array = array;
}
// 这里显然是返回同步
@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
//poll方法会逐个返回队列中的数据
@Nullable
@Override
public T poll() {
int i = index;
T[] a = array;
if (i != a.length) {
index = i + 1;
return ObjectHelper.requireNonNull(a[i], "The array element is null");
}
return null;
}
@Override
public boolean isEmpty() {
return index == array.length;
}
@Override
public void clear() {
index = array.length;
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
//在run方法中,开始向下游传递数据。不过这时候已经不重要了,因为在InnerObserver的onSubscribe方法中,已经通过poll方法将队列中的数据都传递出去了。当然这仅仅是在这个示例中是这样
void run() {
T[] a = array;
int n = a.length;
//开始向下游传递数据
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
}
}
复制代码
如上面注释所示, from
方法返回一个简单的 ObservableFromArray
, ObservableFromArray
的 subscribe
中,调用下游处理器的 onSubscribe
方法,然后调用自身的 run
方法。我们看看 InnerObserver
中是怎么处理的:
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
implements Observer<U> {
private static final long serialVersionUID = -4606175640614850599L;
final long id;
final MergeObserver<T, U> parent;
volatile boolean done;
volatile SimpleQueue<U> queue;
int fusionMode;
//这里会用一个独特的ID来给每个InnerObserver做标记
InnerObserver(MergeObserver<T, U> parent, long id) {
this.id = id;
this.parent = parent;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
//FromArrayDisposable满足这个条件
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<U> qd = (QueueDisposable<U>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//由上面FromArrayDisposable的源码可知这里返回SYNC
if (m == QueueDisposable.SYNC) {
fusionMode = m;
queue = qd;
//这里直接将done设置为true,是因为下面的parent.drain()会直接取出所有数据并传递给下游
done = true;
//数据在这其中进行下发和传递
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
@Override
public void onNext(U t) {
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
//当上游执行到这里时,数据已经被传递完毕了。这里单指这次示例
parent.drain();
}
}
....
}
复制代码
具体的信息都写在上面的注释中,我们直接来看 MergeObserver
的 drain()
方法。
void drain() {
//这里进行判断,确保drainLoop还在执行时不会被再次调用
if (getAndIncrement() == 0) {
drainLoop();
}
}
void drainLoop() {
//获取到下游Observer
final Observer<? super U> child = this.downstream;
int missed = 1;
for (;;) {
//判断是否有error
if (checkTerminate()) {
return;
}
...
boolean d = done;
svq = queue;
InnerObserver<?, ?>[] inner = observers.get();
int n = inner.length;
int nSources = 0;
...
int innerCompleted = 0;
if (n != 0) {
//初始lastId lastIndex都为0
long startId = lastId;
int index = lastIndex;
...
int j = index;
sourceLoop:
for (int i = 0; i < n; i++) {
//获取到当前InnerObserver
@SuppressWarnings("unchecked")
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
//q就是FromArrayDisposable。
SimpleQueue<U> q = is.queue;
if (q != null) {
for (;;) {
U o;
try {
//在这里循环调取FromArrayDisposable队列中数据,然后传递到下游
o = q.poll();
} catch (Throwable ex) {
....
}
if (o == null) {
break;
}
child.onNext(o);
...
}
}
//前面标记过,在onSubscribe中已经将done设置为true.
boolean innerDone = is.done;
SimpleQueue<U> innerQueue = is.queue;
//由于上面已经将数据处理完毕,这里innerQueue.isEmpty()返回为true。
if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
//将该InnerObserver从队列中移除
removeInner(is);
if (checkTerminate()) {
return;
}
innerCompleted++;
}
j++;
if (j == n) {
j = 0;
}
}
lastIndex = j;
lastId = inner[j].id;
}
...
//这里与开头getAndIncrement()相呼应,确保drainLoop在执行时不会被再次调用
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
复制代码
OK,整个流程就清晰了,划重点:
flatMap()
是基础装饰器 Observable
的一个方法,参数是一个 Function
,只不过这个 Function
中 apply()
方法返回类型为一个 Observable
。 flatMap()
返回一个 ObservableFlatMap
装饰器对象。 ObservableFlatMap
被订阅后会调用 subscribeActual()
方法,在此方法中,会创建一个内部类 MergeObserver
对象,并将上游装饰器与之订阅。 MergeObserver
在接收到上游数据后,会调用 Function
中 apply()
方法,将数据转换为一个 Observable
,并创建一个内部 InnerObserver
,将这个 InnerObserver
放入队列中,然后将生成的 Observable
与之订阅。 InnerObserver
的 onSubscribe()
方法会直接调用 MergeObserver
的 drain()
方法,将数据全部都直接传递给下游。从而完成整个流程。
观察代码会发现,同步仅仅是 flatMap
的一个简单情况,更复杂的情况在于异步。具体的大家可以去源码里研究一下,毕竟这篇的篇幅已经够长了。下一篇预告一下,我们来看看背压。