在 Android 开发中,使用最多就是把耗时任务放到其他线程去执行,其他线程执行完了后,就切换到 UI 线程(也叫主线程)进行接收数据;常用代码如下:
class IntegerObserver : Observer<Int> {
override fun onError(e: Throwable?) {
Log.d("Observable", "onError: $e")
}
override fun onCompleted() {
Log.d("Observable", "onCompleted:" + Thread.currentThread())
Log.d(tag, "onCompleted")
}
override fun onNext(t: Int) {
Log.d("Observable", "onNext: $t")
}
}
复制代码
var observable: Observable<Int> = Observable.create {
Log.d("Observable", "call:" + Thread.currentThread())
it.onNext(1)
it.onNext(2)
it.onNext(3)
it.onCompleted()
} // 1.生成一个 observer
observable = observable.subscribeOn(Schedulers.io())// 2.工作线程池设置,
observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看
var subscription: Subscription = observable.subscribe(IntegerObserver())// 4.触发任务(订阅开始)
subscription.isUnsubscribed
复制代码
打印结果:
D/Observable: call:Thread[RxIoScheduler-2,5,main] D/Observable: onNext: 1 D/Observable: onNext: 2 D/Observable: onNext: 3 D/Observable: onCompleted:Thread[main,5,main] D/Observable: onCompleted 复制代码
以上这种模式大家都非常熟悉,那么我想从源码里面去看看,为什么,他是怎么做到线程之间的切换的呢?代码中 1 和 4 上一篇文章已经说明过了; 为了便于理解, 我把上一篇文章中的主要几个角色的图再贴一下; 命名: 图一
2 和
3
;
做这个代码分析的时候,为了便于理解,可以先把 3 这个代码注释掉;
observable = observable.subscribeOn(Schedulers.io()) // 2.工作线程池设置 //observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看 复制代码
从 图一 中我们可以看出来,创建一个 Observerable 需要一个 OnSubscribe ;以上这个地方创建 Observable 主要是需要 rx.internal.operators.OperatorSubscribeOn 这么一个 OnSubscribe对象,然后我们可以看下这个OperatorSubscribeOn 这个对象的源码:
之前设置的 scheduler 就是在这里的,然后在看 call 方法中会创建 Woker inner , 这个 inner 执行 schedule, 就会在异步线程执行到 source.unsafeSubscribe() ;
1 这个地方的代码;
1 代码执行完后,又会回调执行到
OperatorSubscribeOn 中的第50行这里的代码,这里代码就是会执行onNext(), 这样就完整的走完一个异步任务的过程;这个线程切换最关键的代码在
OperatorSubscribeOn 中;是由这个
OperatorSubscribeOn
call() 方法去触发异步线程工作的;
从 1.1 上分析得到一个结论: subscribeOn 这个代码过程中主要是对 OnSubscibe 进行封装;然后在 subscribe() 触发的时候直接执行 OnSubscibe 的 call 方法;call 方法里面参数是 Subscriber, 可以直接执行 Subscriber 的 3 个生命周期方法;这就完成了回调过程;
subcriber() —> OnSubscribe.call(subscriber) —> schedule() —> 内部类的 Subscriber —> 触发内部类中 onNext() —> 触发被内部类包裹的 subscriber.onNext() 复制代码
observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看 复制代码
同理以上的 observable 的创建过程需要创建一个 OnSubscribe , 那我们就先看看 observable.observeOn() 创建的 OnSubscribe 创建的对象是由哪个实现类完成, 我们一起来跟踪一下源码。
到这里个地方需要注意一下这里有个 lift 方法, lift() 方法接收的参数是 Operator 这个一个重要的角色类; 这里先留放着,等会回来说;我们再看看 lift() 方法;
从上面的代码里面,我可以看出来这个 Observable 的创建需要 rx.internal.operators.OnSubscribeLift 这个实现类,这个实现类是 implements OnSubscribe<R> ,所以我们去看下这个类的 call 方法;
这里的call 方法里面调用了红色箭头指定的地方,这里看到没有,有个 operator, 这里的 operator 就是之前在 observable.observeOn() 中实例化了一个 rx.internal.operators.OperatorObserveOn 这个类的对象;那我们继续看下这个对象的 call() 方法;
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); //
parent.init(); // 初始化操作
return parent;
}
}
复制代码
这个 call 的作用就是把一个 Subscriber 包装一下,换成另外一个 Subscriber 是 rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber 这是一个内部类,同样的他会被触发 call 方法, 再看下 call() 方法;
从上面那个 for 可以看出来,这个想要一个死循环;从 localChild.onNext(localOn.getValue(v)) 这个对于这个 Subscriber 进行传递回调;那这里还有一个问题,就是这个call 怎么回调到主线程中呢?或者怎么进行线程之间的切换呢?在回退一步到 rx.internal.operators.OperatorObserveOn 的 call 方法
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); //
parent.init(); // 初始化操作
return parent;
}
}
复制代码
这里有个 parent.init()
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule(); // 这个地方启动的线程切换操作
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
复制代码
从上面这个地方来看,schedule() 就是把接收消息的事情,切换到自己的线程池里面去操作了;这个线程池中主要是 Scheduler.Worker 在工作, Worker 主要是接收 Action0, Action0 的回调就是 call(), 也就是 rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber 中的 call(), 这个 call() 里面有个循环,用来接收和处理所有的消息,代码就是上面说的那个 localChild.onNext(localOn.getValue(v)) 这个调用地方是在循环体内,所以这个地方会被调用多次,就是把值传递给 subsciber.onNext(), 也就是我们的订阅者(Subscriber),我们自己的注册的回调;这个步骤主要是一直在封装 Subscriber;
从 2.1 代码分析上可以看出来, observable.observeOn() 里面的代码来看,主要也是对 OnSubscibe 进行装饰;装饰完后,然后去触发执行 Subscriber;这里有个特别需要注意的地方,在装饰 OnSubscibe 的时候,引入了一个 Operator 角色;这个角色是 OnSubscribeLift 的必要成员变量, OnSubscribeLift 的 call 方法会触发 Operator 的 call 方法;这个 call 方法里面会包装一个 新的 Subscriber 给 OnSubscribe 使用;归纳一下这个触发过程
subscribe() —> OnSubscribeLift.call() —> hook.onLift(operator).call(o) —> OperatorObserveOn.call() —> rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber. call() —> 创建线程循环接收可能被创建了 —> ObserveOnSubscriber.onNext, onComplete, onNext —> work.schedule() -> 这个在循环体内 localChild.onNext(localOn.getValue(v)); 复制代码