转载

深入理解 RxJava2:揭秘 subscribeOn(3)

前言

欢迎来到深入理解 RxJava2 系列第三篇。在上一篇中,我们详细地介绍了 Scheduler 与 Worker 的概念,并分析了 ComputationSchedulerIoScheduler 的实现,以帮助大家加深理解。本篇文章将基于 Scheduler ,来和大家分享 RxJava2 非常重要的概念:线程操作符。顺带提一下,本系列文章所有内容如不特别说明,均是指 Flowable 相关的概念,因为这是 RxJava2 遵循 RS 的实现。

定义

Scheduler 相关操作符

RxJava 有很多基于 Scheduler 的操作符,如 timerintervaldebounce 等,但是笔者认为这些操作符与 subscribeOnunsubscribeOnobserveOn 有本质上的区别。

其他的操作符,把 Scheduler 当做了计时工具,而 Scheduler 的调度导致线程切换是其附带属性,其核心是操作符本身的特性,如:

  • buffer / window 按照时间段缓存数据
  • throttle / debounce / throttle / skip 按照时间段采样数据
  • timer/interval 按照时间段产生数据
  • delay 延迟数据
  • ...

线程操作符

因此笔者定义狭义上的线程操作符,其目的是为了改变上下游的某些操作所在的线程。更严格的说法是,其目的是将上下游的某些操作由目标 Scheduler 调度执行,因为某些 Scheduler 的调度并不一定会切换线程,如 Schedulers.trampoline() 。虽然如此,但是我们还是称之为线程操作符,因为通常我们的本意是为了切换线程。

以下是所有的线程操作符:

  • subscribeOn:调度上游的 Flowablesubscribe 方法,可能会调度上游 Subscriptionrequest 方法
  • unsubscribeOn:调度上游的 Subscriptioncancel 方法
  • observeOn:调度下游 SubscriberonNext / onError / onComplete 方法

详解

通常 subscribeOnobserveOn 更受大家关注一些,因为 unsubscribeOn 使用的场景很少。因此本文就不会再花费过多笔墨在 unsubscribeOn 上,而且这个操作符本身的实现就非常简单,诸位一览便知。

subscribeOn

subscribeOn 顾名思义,改变了上游的 subscribe 所在的线程。在传统的 Observable 中,只是改变了 Observable.subscribe 所在的线程,而在 Flowable 中不仅如此,还同样的改变了 Subscription.request 所在的线程。

这里就涉及到 subscribeOn 设计的用途,它最主要的目标是改变发射数据源的线程。因此在 Observable 中数据的发射,也就是耗时操作一般在 subscribe 所在的线程(这里不考虑在 onSubscribe 后内部开线程异步回调的情况)。

而在 RS 的规范中数据的回调是由消费者主动调用 Subscription.request 来触发的,因此在 Flowable 的实现中也要处理 request 的情况。

Asynchronous 数据源

上面我们提到 RS 的规范中由消费者主动调用 Subscription.request 来触发回调数据,但是有些数据是异步产生的,可能在 subscribe 的一刻或者在那之前,譬如下面 2 个 API:

create

create 方法接受 FlowableOnSubscribe 作为真正的数据源。这个方法其实相比 RxJava1 已经做了很大的限制,通过封装了一层来支持 Backpressure。

关于此方法的细节,不再详细介绍,笔者之前有写过一篇文章分析过这个方法 《Rx2:小create,大文章》 ,有兴趣的读者可以去看看。

但是即便封装后支持了 Backpressure,背压的逻辑更多的还是隐藏在操作符内部了,对外部的使用者还是尽量屏蔽了这些细节。 FlowableEmitter 唯一能与 Backpressure 交互的接口仅是 long requested(); ,并不能实时的响应 Subscription.request

unsafeCreate / fromPublisher

这两者是几乎一致的,接受一个 Publisher 作为数据源,外面封了一层 Flowable 代理该 Publisher 对象,通过这种方式来提供 Flowable 的丰富的操作符。

换种角度来看,其实这两个方法更像 RxJava1.x 中的 create 方法。因为数据源是来自 Publisher ,因此使用更加自由与随意。

强与弱

基于上述原因,在 subscribeOn 还提供了第二个参数来控制 request 的调度。

我们看一下方法的签名:

public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn)

再看一眼唯一使用该参数的地方:

void requestUpstream(final long n, final Subscription s) {
    if (nonScheduledRequests || Thread.currentThread() == get()) {
        s.request(n);
    } else {
        worker.schedule(new Request(s, n));
    }
}

注意这里 nonScheduledRequests = !requestOn ,该参数的作用就很明显了。

如果 requestOn = true ,确保 Subscription.request 方法一定在目标线程执行。反之 requestOn = false ,则直接在当前线程执行 request

我们再看一下重载的单一参数的方法:

public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}

这里解释一下 FlowableCreate 是 Flowable.create 方法返回的类名,也就是说除了 create 作为上游的 Flowable,其他都推荐用强调度的方式。为什么单单 create 不可以用强调度呢。

我们用一个例子演示一下:

举例

Flowable.<Integer>create(t -> {
    t.onNext(1);
    Thread.sleep(500);
    t.onNext(2);
    t.onNext(3);
    t.onComplete();
}, BackpressureStrategy.DROP)
// 注释 1 .map(i -> i + 1)
// 注释 2 .subscribeOn(Schedulers.io())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(1);
                new Thread(() -> {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException ignored) {
                    }
                    s.request(2);
                }).start();
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        });

我们在 create 中发射了一个 1,延时 500ms,再次发射 2、3,随后结束,但是我们在订阅的时候先请求了 1 个数据,随后延时 100ms 再次请求 2个数据。

按照正常的流程,虽然数据请求延迟 100ms,但是数据发射延迟了 500ms,因而 Subscriber 能正确的收到3个数据:

1
2
3
complete

非常棒,一切都很美好。此时我们把注释 2 处给取消掉,再次执行结果依然同上。

此时我们应该清楚,重载的函数传入的参数是 false。好我们再试一下,但是这次把注释 2 处的代码换成:

.subscribeOn(Schedulers.io(), true)

结果:
1
complete

很意外,2 和 3 去哪了?其实原因很简单,因为我们把参数改成 true 以后, request 方法要被 worker 调度后执行。

我们在 《深入理解 RxJava2:Scheduler(2)》 中强调过, Worker 有一个职责,保证入队的任务是串行执行的,换言之,我们的

t -> {
    t.onNext(1);
    Thread.sleep(500);
    t.onNext(2);
    t.onNext(3);
    t.onComplete();
}

是在 Worker 中执行的,因为这里的函数没有执行完,就无法执行后续的 request 任务。因此在数据发射过程中,上游自始至终都认为下游一开始只请求了一次数据,所以多发射的 2 与 3 就被丢弃了。

不仅如此,我们再把注释 1 与 2 同时取消掉:

.map(i -> i + 1)
.subscribeOn(Schedulers.io())

结果:
2
complete

如果读者能理解笔者上面分享的内容,就能知道是为什么,奥秘就在:

public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}

subscribeOn 前面增加了 map 操作符后,对象就不再是 FlowableCreate 了,而被 map 封了一层。所以导致 requestOn 错误的判别为 true ,最终导致线程锁住了 request 的个数。

因此别看 subscribeOn 简单,使用起来还是有不少道道的,望大家留心。

线程影响

上面我们提过 subscribeOn 会影响发射数据的线程,从而间接的影响了消费者的消费的线程。

但是,消费线程和生产线程依然是同一个线程,这里从官网取一张示意图:

深入理解 RxJava2:揭秘 subscribeOn(3)

数据产生后在传递给下游的过程中,是不会发生线程切换的,请大家谨记。

结语

笔者本想一起介绍 subscribeOnobserveOn 的,奈何洋洋洒洒地一写便收不住,为了避免文章过长导致读者厌倦, observeOn 以及这两者的结合与对比留待下篇分享。

感觉大家的阅读,欢迎关注笔者公众号,可以第一时间获取更新,同时欢迎留言沟通。

深入理解 RxJava2:揭秘 subscribeOn(3)

原文  http://dieyidezui.com/deep-into-rxjava2-subscribeon/
正文到此结束
Loading...