Observable 、 Flowable 、 Subject Observer 、 Subscrption Observable#subscribe 才开始请求上游发送数据。当下游请求 dispose() 停止通知上游停止发送。 Rxjava1 开始就有人说Rxjava可以看作流水线,上游怎么加工对于下游来说是无感知的,下游只要负责接收响应对应数据事件就行。 对于rxajva的思考,可以参考一下:Rxjava沉思录系列和 Rxjava主要负责人系列博客
一般cold Observable创建都是通过 just 、 create 、 fromXX 、 just 创建的。最简单粗暴的创建方式:
Observable.create<String> { it.onNext("") }.subscribe()
复制代码
//[仅关注点相关代码]
//ObservableOnSubscribe仅一个subscribe方法
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}
public abstract class Observable<T> implements ObservableSource<T> {
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//RxJavaPlugins这是一个全局Hook,#onAssembly不实现默认直接返回
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public final void subscribe(Observer<? super T> observer) {
try {
........
//真正调用subscribe的实现
subscribeActual(observer);
}
......
}
//整个Observable唯一的抽象方法,由子Observable实现,通过这个方法将上游和下游关联起来
protected abstract void subscribeActual(Observer<? super T> observer);
}
复制代码
由 Observable#create 真正返回的是 ObservableCreate ,当调用 Observable#subscribe 才真正通知上游 Observable 开始发送数据。其实质是通过 #subscribeActual 将上下游建立联系,并调用上游 #subscribe (在 ObservableCreate 中就是 ObservableOnSubscribe#subscribe )方法通知上游,下游已订阅可以开始发送数据。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//source(上游)即Observable#create传入的ObservableOnSubscribe
//这里就将上下游真正的联系了起来。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
.....
}
}
复制代码
所以实质就是下游通知上游,下游已产生订阅触发上游下发数据/事件,上游再通过下发数据/事件,最终下游通过指定方法响应上游下发的数据/事件。所以一开始说的流水线方式就可以理解了。
因为每次下游产生一次订阅都会通知到上游的 #subscribe ,所以如果上游只在 #subscribe 中去创建初始数据源就可以每个做到不同下游的数据不关联
Observable.create<String> { it.onNext("") }.subscribe() 流程图如下:
//Observable#map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//**ObservableMap将上游Observable和当前的转换Function建立联系
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
//ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//将下游包装成MapObserver,并将MapObserver和上游建立联系
//这样上游下发时,先通过MapObserver处理才下发给真正的Observer
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
....
U v;
try {
//通过Function获取到map后的数据
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch ....
//向下游下发数据
actual.onNext(v);
}
...
}
}
复制代码
可以看到 map 操作符的作用就是通过将上游拦截返回 ObservableMap 提供给下游订阅,并在map上游返回数据前通过 mapper 将上游数据转化并下发给下游。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//emmmm,是不是点眼熟
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
//ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//onSubscribe()方法执行在 订阅处所在的线程
s.onSubscribe(parent);
//将上游放入scheduler中调用,且立即执行
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
//scheduler#scheduleDirect中执行完后
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//该方法调用已经在scheduler中调用
source.subscribe(parent);
}
}
}
复制代码
由源码可以看出由 scheduler.scheduleDirect -> SubscribeTask#run -> SubscribeOnObserver#subscribe(observer) 将整个调度切换到指定线程中。
因为订阅是用下自上的,所以 subscribeOn 也总是离源最近的一个生效。因为触发源的 subscribe 是离源最近一个。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
可以看出Rxjava的操作符套路基本是将源 Observable 通过装饰者模式封装一层再返回新的Observable
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;//默认false
final int bufferSize;//一般128
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//创建主线程调度器
Scheduler.Worker w = scheduler.createWorker();
//关联上下游,触发上游订阅过程
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
复制代码
这里可以看出 ObservableObserveOn 还是很简单的,上游订阅过程并不用关心,下游的触发则由 ObserveOnObserver 处理。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
//上游数据的缓存队列
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
......
//创建对接缓存数据
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//回调下游onSubscribe
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {//执行过complete/error则done为true
return;
}
if (sourceMode != QueueDisposable.ASYNC) {//非异步数据,默认同步数据
queue.offer(t);//入队列
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;//标记已完成
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;//标记已完成
schedule();
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return cancelled;
}
void schedule() {
//自旋+1,!=0则表示worker.schedule已在执行无需在调度
if (getAndIncrement() == 0) {
worker.schedule(this);//通过调度器处理,将数据取出下发到下游
}
}
@Override
public void run() {
if (outputFused) {//默认false
drainFused();
} else {
drainNormal();//取出数据下发
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (; ; ) {
//检测是否不用再处理
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (; ; ) {
boolean d = done;
T v;
try {
v = q.poll();//取出一个数据
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {//可能已经提前disposed了
return;
}
if (empty) {//数据为空队列无数据,退出下发循环
break;
}
//下发
a.onNext(v);
}
//可能有错过的schedule,再次循环检测
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
//检测是否compelte/error/队列已空
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {//已经disposed
queue.clear();
return true;
}
if (d) {//是否已结束
Throwable e = error;
if (delayError) {//延迟error,等待队列清空
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
a.onError(e);//下发error
worker.dispose();
return true;
} else if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}
复制代码
ObserveOnObserver 继承于 BasicIntQueueDisposable 继承于 AtomicInteger ,通过自身的原子性(自旋/CAS)来消除多线程对 #schedule 的调用。
可以看出 #observeOn 只对下游有影响。