转载

[译]RxJava 的全面介绍:Observable 类型、背压、错误处理

RxJava 是一个不断更新的工具库,适用于除 Android 以外的许多平台的开发人员(如: RxSwift )。 RxJava 最大的优势是 以不使用回调的方式处理异步操作

相反, ObservablesObservers 结合使用来发射数据(一次或多次),并且还可以通过各自包含的方法来处理每次数据发射时要做的事情。

什么是 Observable 和 Observer

val myObservable = Observable.just(1, 2, 3)
 
val myObserver = myObservable.subscribe { receivedNumber ->
    Log.d(LOG_TAG, "Received Number $receivedNumber")
}
复制代码

Observable– 发射数据流并与接受改数据的 Observer 一起工作的对象。 Observer – 订阅一个 Observable 让它开始发射数据,并处理接受数据时要做的事情。

在上面的例子中,Observable.just(1, 2, 3) 将按照顺序发射整数 1, 2, 3。 Observer 一旦订阅 Observable ,将以相同的顺序接受这些数字。

Received Number 1
Received Number 2
Received Number 3
复制代码

Observable 的生命周期

Observable 有两个重要的方法,来处理传入的数据。

  • onNext - 每当发射新数据时调用,正如你在上面示例中的 lambda 函数看到的一样(在 subscribe 之后调用)。

  • onComplete - 在没有更多数据发射时调用。顾名思义,数据流完全发射完毕。

不同类型的 Observable

最基本的 Observable 会发射连续的数据流,直到调用 onComplete 为止,这并不总是你想要的。你可能想发射 单个值 ,或者发射一个 无法接受该值的值 ,亦或是 在执行没有返回值的异步任务后调用其他函数

val mySingle = Single.just(1)
val singleObserver = mySingle.subscribe { data ->
    Log.d(LOG_TAG, "Received $data")
}
 
val myMaybe = Maybe.empty<Int>()
val maybeObserver = myMaybe
        .defaultIfEmpty(1)
        .subscribe { data ->
            Log.d(LOG_TAG, "Received $data")
        }
 
val myCompletable = Completable.complete()
val completableObserver = myCompletable
        .subscribe {
            Log.d(LOG_TAG, "Task completed")
        }
复制代码

Single仅发射一个值。 onNext 只调用一次,并且 onComplete 将立即被调用。

Mayble发射一个或零个值,当发射零个值时,将跳过 onNext 并立即调用 onComplete 。可以使用 defalutIfEmpyty 函数发射默认值。

Completable不发射任何值,你可以像没有返回值的回调一样订阅它。

Flowables 和背压

还有一种类型的 Observable ,它就是 Flowable 。和 Observable 一样, Flowable 也发射连续的数据流,直到完成为止。但有一个关键的区别:

想象一下,上游 Observable 数据发射的速度,大于下游 Observer 处理数据的速度,这就是 背压 。在大部分情况下,将会导致错误发生。 Flowable 是一种包含背压策略的 Observable ,具有当背压发生时,如何处理数据的能力。

val myFlowable = Observable.range(1, 100000).toFlowable(BackpressureStrategy.DROP)
val flowableObserver = myFlowable.subscribe {data ->
    Log.d(LOG_TAG, "Received $data")
}
复制代码

有 5 种不同的背压策略,我们需要了解下:

  • Buffer - 在内存中缓存事件,直到 Observer 可以处理它们。默认情况下,在错误发生之前,缓冲区的大小为 128 个 items 。可以修改缓冲区的大小,但请注意,这将会带来性能上的开销。
  • Drop - 丢弃 Observer 无法处理的事件。
  • Latest - 仅保留最新发射的值,直到 Observer 可以使用它并丢弃其它的值。
  • Error - 如果发生背压,将抛出异常。
  • Missing - 缺乏背压策咯,如果你想在客户端处理背压,你可以使用它(因为背压策咯是由 Observable 创建的)。未能说出策咯会在背压下抛出异常。

Observable vs Flowable

已知的是,当你明确知道发射的数据,不会导致发生背压。你应当使用 Observable ,而不是 Flowable 。老实说,我还没有发现使用 Flowable 的场景,也许 Flowables 将使用额外的内存?

错误处理

没有任何代码可以避免错误,你已经知道,如果不处理背压将导致错误发生。最重要的是,在 Observersubscribe 方法中,由自己的代码发生的异常,都将被视为由 Observer 应处理的错误。

高兴的是, RxJava 包含了几种处理这些错误的方法。

  • doOnError - 当错误发生,只需执行该方法
val observer = myObservable
    .doOnError { Log.e(LOG_TAG, "ErrorOccurred") }
    .subscribe()
复制代码
  • onErrorReturnItem - 如果发生错误,返回一个默认值
val observer = myObservable
    .onErrorReturnItem(0)
    .subscribe()
    }
复制代码
  • onErrorReturn - 和 onErrorReturnItem 一样,但接受一个返回所需数据类型的函数(对于动态默认值)
val observer = myObservable
        .onErrorReturn{ throwable -> throwable.message}
        .subscribe()
复制代码
  • onErrorResumeNext - 如果发生错误,则返回一个默认数据流,也可以为动态数据采取功能。
val observer = myObservable
        .onErrorResumeNext(Observable.just(2, 4, 6))
        .subscribe()
复制代码
  • retry - 如果发生错误,尝试重新订阅 Observable ,你可以设置最多的重试次数,或者设置空值以便无限次重试。
val observer = myObservable
        .retry(3)
        .subscribe()
复制代码

你也可以通过布尔值,来实现 重试条件

val observer = myObservable
        .retry{ integer, throwable -> integer > 0 }
        .subscribe()
复制代码

如果没有使用上述的操作符,在错误发生时,将导致程序崩溃。

结语

到目前为止,此博客 的大部分内容都是关于Firebase的。有充分的理由相信,Google 通过强大的云数据库 、机器学习 和云服务,将推动Firebase 成为最优秀的云解决方案之一,并也为服务器开发打开了新的世界。

虽然我会回到Firebase 上,但从现在开始,我将潜心学习 RxJava 并深入了解它的细节。因为它还有更多的闪光点待我去学习:修复回调地狱、提高行能、线程调度.....

原文  https://juejin.im/post/5c4e57bdf265da6110376748
正文到此结束
Loading...