转载

Rxjava 中的源码解析(二)线程切换

在 Android 开发中,使用最多就是把耗时任务放到其他线程去执行,其他线程执行完了后,就切换到 UI 线程(也叫主线程)进行接收数据;常用代码如下:

声明一个 Observer

class IntegerObserver : Observer<Int> {
    override fun onError(e: Throwable?) {
        Log.d("Observable", "onError: $e")
    }
    override fun onCompleted() {
        Log.d("Observable", "onCompleted:" + Thread.currentThread())
        Log.d(tag, "onCompleted")
    }
    override fun onNext(t: Int) {
        Log.d("Observable", "onNext: $t")
    }
}
复制代码

实例代码

var observable: Observable<Int> = Observable.create {
        Log.d("Observable", "call:" + Thread.currentThread())
        it.onNext(1)
        it.onNext(2)
        it.onNext(3)
        it.onCompleted()
    } // 1.生成一个 observer
    observable = observable.subscribeOn(Schedulers.io())// 2.工作线程池设置,
    observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看
    var subscription: Subscription = observable.subscribe(IntegerObserver())// 4.触发任务(订阅开始)
    subscription.isUnsubscribed

复制代码

打印结果:

D/Observable: call:Thread[RxIoScheduler-2,5,main]
D/Observable: onNext: 1
D/Observable: onNext: 2
D/Observable: onNext: 3
D/Observable: onCompleted:Thread[main,5,main]
D/Observable: onCompleted
复制代码

以上这种模式大家都非常熟悉,那么我想从源码里面去看看,为什么,他是怎么做到线程之间的切换的呢?代码中 14 上一篇文章已经说明过了; 为了便于理解, 我把上一篇文章中的主要几个角色的图再贴一下; 命名: 图一

Rxjava 中的源码解析(二)线程切换
好了那么我们就来分析一下源码中 23

;

一、分析一下 "2.工作线程池设置" 代码

1.1 源码分析

做这个代码分析的时候,为了便于理解,可以先把 3 这个代码注释掉;

observable = observable.subscribeOn(Schedulers.io()) // 2.工作线程池设置
//observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看
复制代码

图一 中我们可以看出来,创建一个 Observerable 需要一个 OnSubscribe ;以上这个地方创建 Observable 主要是需要 rx.internal.operators.OperatorSubscribeOn 这么一个 OnSubscribe对象,然后我们可以看下这个OperatorSubscribeOn 这个对象的源码:

Rxjava 中的源码解析(二)线程切换

之前设置的 scheduler 就是在这里的,然后在看 call 方法中会创建 Woker inner , 这个 inner 执行 schedule, 就会在异步线程执行到 source.unsafeSubscribe() ;

Rxjava 中的源码解析(二)线程切换
代码的 8666 行,这里执行的是 onSubscribe.call() 方法,这个方法其实就是会执行到自己的 1 这个地方的代码; 1 代码执行完后,又会回调执行到 OperatorSubscribeOn 中的第50行这里的代码,这里代码就是会执行onNext(), 这样就完整的走完一个异步任务的过程;这个线程切换最关键的代码在 OperatorSubscribeOn 中;是由这个 OperatorSubscribeOn

call() 方法去触发异步线程工作的;

1.2 结论

从 1.1 上分析得到一个结论: subscribeOn 这个代码过程中主要是对 OnSubscibe 进行封装;然后在 subscribe() 触发的时候直接执行 OnSubscibe 的 call 方法;call 方法里面参数是 Subscriber, 可以直接执行 Subscriber 的 3 个生命周期方法;这就完成了回调过程;

subcriber() 
  —> OnSubscribe.call(subscriber) 
  —> schedule() 
  —> 内部类的 Subscriber 
  —> 触发内部类中 onNext() 
  —> 触发被内部类包裹的 subscriber.onNext()
复制代码

二、分析一下 "3.把查看数据,放到主线程中看"

2.1 源码分析

observable = observable.observeOn(AndroidSchedulers.mainThread()) //3.把查看数据,放到主线程中看
复制代码

同理以上的 observable 的创建过程需要创建一个 OnSubscribe , 那我们就先看看 observable.observeOn() 创建的 OnSubscribe 创建的对象是由哪个实现类完成, 我们一起来跟踪一下源码。

Rxjava 中的源码解析(二)线程切换

到这里个地方需要注意一下这里有个 lift 方法, lift() 方法接收的参数是 Operator 这个一个重要的角色类; 这里先留放着,等会回来说;我们再看看 lift() 方法;

Rxjava 中的源码解析(二)线程切换

从上面的代码里面,我可以看出来这个 Observable 的创建需要 rx.internal.operators.OnSubscribeLift 这个实现类,这个实现类是 implements OnSubscribe<R> ,所以我们去看下这个类的 call 方法;

Rxjava 中的源码解析(二)线程切换

这里的call 方法里面调用了红色箭头指定的地方,这里看到没有,有个 operator, 这里的 operator 就是之前在 observable.observeOn() 中实例化了一个 rx.internal.operators.OperatorObserveOn 这个类的对象;那我们继续看下这个对象的 call() 方法;

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); //
            parent.init(); // 初始化操作
            return parent;
        }
    }
复制代码

这个 call 的作用就是把一个 Subscriber 包装一下,换成另外一个 Subscriber 是 rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber 这是一个内部类,同样的他会被触发 call 方法, 再看下 call() 方法;

Rxjava 中的源码解析(二)线程切换

从上面那个 for 可以看出来,这个想要一个死循环;从 localChild.onNext(localOn.getValue(v)) 这个对于这个 Subscriber 进行传递回调;那这里还有一个问题,就是这个call 怎么回调到主线程中呢?或者怎么进行线程之间的切换呢?在回退一步到 rx.internal.operators.OperatorObserveOn 的 call 方法

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); //
            parent.init(); // 初始化操作
            return parent;
        }
    }
复制代码

这里有个 parent.init()

void init() {
            // don't want this code in the constructor because `this` can escape through the 
            // setProducer call
            Subscriber<? super T> localChild = child;
            
            localChild.setProducer(new Producer() {

                @Override
                public void request(long n) {
                    if (n > 0L) {
                        BackpressureUtils.getAndAddRequest(requested, n);
                        schedule(); // 这个地方启动的线程切换操作
                    }
                }

            });
            localChild.add(recursiveScheduler);
            localChild.add(this);
        }
复制代码

从上面这个地方来看,schedule() 就是把接收消息的事情,切换到自己的线程池里面去操作了;这个线程池中主要是 Scheduler.Worker 在工作, Worker 主要是接收 Action0, Action0 的回调就是 call(), 也就是 rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber 中的 call(), 这个 call() 里面有个循环,用来接收和处理所有的消息,代码就是上面说的那个 localChild.onNext(localOn.getValue(v)) 这个调用地方是在循环体内,所以这个地方会被调用多次,就是把值传递给 subsciber.onNext(), 也就是我们的订阅者(Subscriber),我们自己的注册的回调;这个步骤主要是一直在封装 Subscriber;

2.2 结论

从 2.1 代码分析上可以看出来, observable.observeOn() 里面的代码来看,主要也是对 OnSubscibe 进行装饰;装饰完后,然后去触发执行 Subscriber;这里有个特别需要注意的地方,在装饰 OnSubscibe 的时候,引入了一个 Operator 角色;这个角色是 OnSubscribeLift 的必要成员变量, OnSubscribeLift 的 call 方法会触发 Operator 的 call 方法;这个 call 方法里面会包装一个 新的 Subscriber 给 OnSubscribe 使用;归纳一下这个触发过程

subscribe() 
  —> OnSubscribeLift.call() 
  —> hook.onLift(operator).call(o) 
  —> OperatorObserveOn.call() 
  —> rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber. call()  
  —> 创建线程循环接收可能被创建了 
  —> ObserveOnSubscriber.onNext, onComplete, onNext 
  —> work.schedule() 
  -> 这个在循环体内 localChild.onNext(localOn.getValue(v));
复制代码

三、总结

  • Observable 创建必须要有一个OnSubscribe, OnSubscribe 的 call 方法里面传递的必须是一个 Subscriber 对象;
  • 在 rxjava 中 OnSubscribe 会因为多次创建 Observable; 也需要对 OnSubscribe 进行多次创建,OnSubscribe 的 call 方法可以把执行任务放到一个线程池里面进行;
  • 当在 observable.observeOn() 做线程切换的时候,会有一个 Operator 角色出现,这个类的内部类会有一个死循环,用来接听自己的Subscriber 下发 onNext, onComplete, onError 触发监听这些回调函数会触发 schedule(), 这个schedule() 本质会按照一定的条件触发那个死循环,并且把收到的数据在这个死循环里面传递下去;
  • 看源码的过程中,所有的关键代码都在 call() 方法里面,这点很重要;
  • 从目前看源码的感受,最重要是 3 个角色,Observable, OnSubscribe, Subscriber; 大部分情况下,主要是对这3种角色对象,进行装饰;
原文  https://juejin.im/post/5d391c46f265da1bba594877
正文到此结束
Loading...