转载

RxJava(九):背压

博客主页

1. 背压

在 RxJava 中, 会遇到被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息,这就是典型的背压( Back Pressure )场景。

在 RxJava 官方的维基百科中关于 Back Pressure 是这样描述的:

In ReactiveX it is not difficult to get into a situation in which an Observable is emitting items more rapidly than an operator or observer can consume them. This presents the problem of what to do with such a growing backlog of unconsumed items.

Back Pressure 经常被翻译为背压, 背压的字面意思比较晦涩,难以理解。它是指在异步场景下,被观察者发送事件速度远快于观察者处理的速度,从而导致下游的 buffer 溢出,这种现象叫作背压。

首先,背压必须是在异步的场景下才会出现,即被观察者和观察者处于不同的线程中。

其次, RxJava 是基于 Push 模型 。对于 Pull 模型而言,当消费者请求数据的时候,如果生产者比较慢 ,则消费者会阻塞等待。如果生产者比较快,生产者会等待消费者处理完后再生产新的数据,所以不会出现背压的情况。然而在 RxJava 中,只要生产者数据准备好了就会发射出去。如果生产者比较慢,则消费者会等待新的数据到来。如果生产者比较快,则会有很多数据发射给消费者,而不管消费者当前有没有能力处理数据,这样就会导致背压。

最后,在 RxJava 2.x 中,只有新增的 Flowable 类型是支持背压的,并且 Flowable 很多操作

符内部都使用了背压策略,从而避免过多的数据填满内部的队列。

在 RxJava l.x 中,有很多事件因为不能被正确地背压,从而抛出

MissingBackpressureException 。在 RxJava l.x 中的 observeOn ,由于切换了观察者的线程,因此内部实现用队列存储事件。

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; ; i++) {
            subscriber.onNext(i);
        }
    }
}).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "Next: " + integer);
            }
        });

这段代码其实并不会产生背压,只会出现 ANR (application not responding),因为被观察者和订阅者处在同一个线程中,只有二者不在同一个线程时,才会出现背压。

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; ; i++) {
                    subscriber.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "Next: " + integer);
                    }
                });

// 执行结果
Caused by: rx.exceptions.MissingBackpressureException

修改完之后立即引起了 App Crash,查看日志之后发现,出现 MissingBackpressureException 异常。

在 RxJava l.x 中, Observable 是支持背压的,从 Observable 的源码中可以看到,在 RxJava

1.x 中的 Buffer 的大小只有 16

public final <B> Observable<List<T>> buffer(Observable<B> boundary) {
    return buffer(boundary, 16);
}

也就是说,刚才的代码无须发无限次,只要发 17 次就可以引起异常。下面的代码将原先的无数次改成了 17 次。

果然可以抛出 MissingBackpressureException ,符合预期。如果把 17 改成 16 ,则程序可以正常运行,打印出 0-15。

在 RxJava l.x 中,不是所有的 Observable 都支持背压。我们知 Observable 有 Hot 和 Cold 之分。 Rx.Java 1.x 中, Hot Observables 是不支持背压的,而 Cold Observables 中也有一部分不支持背压。

2. RxJava 2.x 的背压策略

在 RxJava 2.x 中, Observable 不再支持背压,而是改用 Flowable 来专门支持背压。默认队列大小为 128 ,并且要求所有的操作符强制支持背压。

从 BackpressureStrategy 的源码可以看到, Flowable 一共有 5 种背压策略:

public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

2.1 MISSING

此策略表示,通过 Create 方法创建的 Flowable 没有指定背压策略,不会对通过 OnNext 发射的数据做缓存或丢弃处理,需要下游通过背压操作符 (onBackpressureBuffer/onBackpressureDrop/onBackpressureLatest)指定背压策略。

2.2 ERROR

此策略表示,如果放入 Flowable 的异步缓存池中的数据超限了,则会抛出 MissingBackpressureException 异常。

Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; i < 129; i++) {
            emitter.onNext(i);
        }
    }
}, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        });

// 执行结果
Caused by: io.reactivex.exceptions.MissingBackpressureException

运行这段代码后,会立刻引起 App Crash ,查看 LogCat 之后发现,出现 MissingBackpressureException 异常

因为 Flowable 的默认队列是 128, 所以将上述代码的 129 改成 128 程序就可以正常运行了。

2.3 BUFFER

此策略表示, Flowable 的异步缓存池同 Observable 的一样,没有固定大小,可以无限制添加数据,不会 MissingBackpressureException 异常 但会导致 OOM

Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; ; i++) {
            emitter.onNext(i);
        }
    }
}, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        });

在 Android 中运行的话只会引起 ANR。

2.4 DROP

此策略表示,如果 Flowable 的异步缓存池满了,则会丢掉将要放入缓存池中的数据。

Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; i < 129; i++) {
            emitter.onNext(i);
        }
    }
}, BackpressureStrategy.DROP)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        });

在 Android 中运行这段代码,不会引起 Crash ,但只会打印 0~127,第 128 则被丢弃,因 Flowable 的内部队列己经满了。

2.5 LATEST

此策略表示,如果缓存池满了,会丢掉将要放入缓存池中的数据。这一点与 DROP 策略一样,不同的是,不管缓存池的状态如何, LATEST 策略会将最后一条数据强行放入缓存池中。

Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; i < 1000; i++) {
            emitter.onNext(i);
        }
    }
}, BackpressureStrategy.LATEST)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        });

在 Android 中运行这段代码,也不会引起 Crash,并且会打印出 0-127 以及 999。因为 999 是最后一条数据。

Flowable 不仅可以通过 create 创建时需要指定背压策略,还可以在通过其他创建操作符,例如 just、fromArray 等创建后通过背压操作符指定背压策略。例如, onBackpressureBuffer() 对应

BackpressureStrategy.BUFFER , onBackpressureDrop() 对应 BackpressureStrategy.DROP ,

onBackpressureLatest() 对应 BackpressureStrategy.LATEST

Flowable.interval(1, TimeUnit.SECONDS)
        .onBackpressureBuffer()
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next: " + aLong);
            }
        });

如果我的文章对您有帮助,不妨点个赞鼓励一下(^_^)

原文  https://segmentfault.com/a/1190000021606688
正文到此结束
Loading...