我们眼中 RxJava 式的网络请求

诞生 5 年之久的 RxJava,已经不只是一个开源库,可以说它的诞生 改变了我们写代码的方式 ,把它比作「神兵利器」也毫不为过。我们现在已经能看到各式各样名为「最佳实践」的使用教程,如果我们没能用好这把利器,不仅不会发挥它的作用,反而会伤着我们自己。

回顾它的诞生原因,是为了解决 回调地狱 (callback hell) 以及 麻烦的线程切换 。在 Android 开发中,哪个地方最会出现 多层的回调嵌套 以及 频繁的线程切换 呢?对!没错!是「网络请求」。所以 RxJava、Retrofit 这俩兄弟总会一起出现的,我们项目中关于 RxJava 的使用,也几乎都和网络请求相关。

过去的经验

最初我们对 RxJava + Retrofit 的使用经验都是来源于 RxJava 与 Retrofit 结合的最佳实践 这篇文章,相信大家都看过。这篇文章中的基本封装思想是: 订阅每个网络请求的流,将流的订阅结果再通过回调的方式返给流(也就是网络请求)的创建者。 如下所示:

//HttpMethod
public void getTopMovie(final ResultListener listener, int start, int count){
     movieService.getTopMovie(start, count)
         .subscribeOn(Schedulers.io())
         .unsubscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(new Subscriber(){
            @Override
            public void onStart() {}

            @Override
            public void onNext(Subject t) {
                 listener.onNext(t)
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onCompleted() {}

        })
}

//Activity
HttpMethods.getInstance().getTopMovie(new ResultListener(){
    @Override
    public void onNext(Subject t){
        //handle result
    }
}, 0, 10)

有什么问题?

这种封装方式,对于初步的使用以及简单的项目,是没有问题的。但是遇到复杂一点的网络请求,它的扩展性就不那么灵活了:

  1. 多个连续的网络请求怎么写?按照上面的那种封装方式,我们有两种选择
    1. 拆解这个请求,在 subscribe 之前通过 flatMap 发起第二个或者第三个网络请求。这种写法肯定会影响项目中已有的外部调用。
    2. onNext 中发起第二个请求,再在第二个网络请求的 onNext 中发起第三个网络请求……这一层又一层的回调嵌套,正是用 RxJava 所能解决、避免的这样写,我们就又回到了最初的原点。
  2. 怎么取消网络请求?不取消,意味着内存泄露的风险。

回到 RxJava 本身

RxJava 提供给我们的、我们所中意的强大之处在哪?在于它的「操作符」, mapflatMapzip 等等,甚至线程的切换 subscribeOnobserveOn 也是操作符。RxJava 的各种强大的功能就是通过各式各样的「操作符」实现的。

操作符操作的是什么?流。流( ObservableFlowable )是 RxJava 的基本单位。所以一套链式请求拆开应该是这样的:


我们眼中 RxJava 式的网络请求

所以说,网络请求库对外提供网络请求的结果应该是以「流」的形式进行提供:

  • 单个网络请求,对外提供单个「流」
  • 多个网络请求,将多个网络请求结果流通过「操作符」组合成一个「流」对外提供
  • 持久化:网络请求结果流和持久化的缓存流,总能通过「操作符」组合成一个对外提供的结果「流」

我们需要背压吗?

当生产者大于消费者,


则市场价格会降低,
则会产生背压问题(Backpressure)。解决背压有很多种策略,RxJava2 中的
Flowable 天然支持背压。所以
Flowable

这个万金油,不管三七二十一,直接拿来用是没有问题的。

但是, 网络请求,会产生背压问题吗?不会,为了防止抬杠,可以说大部分情况下是不会的。 网络请求的每一个流,即用即走,上游的生产者( Request )和下游的消费者( Responese ),永远是一对一的关系,不会出现连续的事件流。杀鸡焉用牛刀,所以我们可以退一步,改用 Observable

网络请求不会出现连续的事件流,在 onNext 出现之后, onComplete 马上就会被调用,所以只需要这两者中的一个就够了,也就不用考虑 Observable ,同样 Maybe 也是可以排除的。

剩下的也就只有 SingleCompletable 了,相对于 SingleCompletable 没有 mapflatMap 方法。所以需要进一步处理网络请求结果的我们,可以选择使用 Single

抛出异常

网络请求过程,协议层的异常会自动抛至 onError() ,如 404、503 错误。对于如下 有请求结果但无目标请求数据 ,我们也应当作为异常来处理:

{
    "code": "6002",
    "msg": "公钥为空"
}

毕竟这样的请求结果,是后端经过异常处理返回给我们的。

假定我们的请求结果是这样的范式:

data CommonResult<T>(
    var code: Int = 0,
    var data: T? = null,
    var message: String? = null
)

我们活用 RxJava 的操作符,用 map 来处理请求到的 ResponseBody (这也是前面选择 Single 的原因),为了便于复用,可以定义一个这样的 mapper:

class CommonResultMapper<T> : Function<CommonResult<T>, T> {
    override fun apply(t: CommonResult<T>): T {
        val data = t.data
        if (t.code == SUCCESS_CODE && data != null) {
            return data
        } else {
            //抛出异常
            throw Throwable("请求 $t 失败")
        }
    }
}

使用这个定义好的 mapper:

@GET(PUSH_URL)
fun fetchTag(@Query("udid") udid: String): Single<CommonResult<Tag>>

fun fetchTagResult(udid: String): Single<Tag> {
    return netService.fetchTag(udid)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .map(CommonResultMapper())
}

如果你愿意,你还可以将 线程切换数据处理 结合在一起,使用 RxJava 的 Transformer

//定义一个 transformer
fun <T> resultTransformer(): SingleTransformer<CommonResult<T>, T> {
    return SingleTransformer { single ->
        single.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(CommonResultMapper())
    }
}

//使用
fun fetchTagResult(udid: String): Single<Tag> {
    return netService.fetchTag(udid)
        .compose(resultTransformer())
}

使用这样封装,结果归结果,异常归异常。

fetchTagResult("123321")
    .subscribeBy(
        onSuccess = { tag ->
            //结果
        },
        onError = { e ->
            //异常
        }
    )

回顾上图中的 对内封装对外可见 ,在得到真正想要的网络请求结果之前,需要一直保持 对内封装 的状态。因此,如果需要同时或者按顺序发起多个网络请求,那么就应该在 对内封装 中进行操作,例如可以使用 flatMap 按顺序发起第二个网络请求:

fun fetchUserSingle(tag: Tag): Single<User> {
    return netService.fetchUser(tag)
}

fun fetchUserResult(udid: String): Single<User> {
    return netService.fetchTag(udid)
        .compose(resultTransformer())
        .flatMap{ tag ->
            //使用第一个请求的结果作为第二个请求的参数
            return@flatMap fetchUserSingle(tag)
        }
}

无论如何,善用 操作符 ,我们的代码总会是「链式」的。

取消网络请求与内存泄漏

最后还需要关注一下这里的内存泄漏问题,在 Activity 销毁时,要及时取消掉这些已经失去上下文意义的网络请求。这里我们及时 unsubscribe 就好了。

同时在管理生命周期方面,也有更成熟的方案: RxLifecycle 。

以上

关注公众号,了解更多:point_down:

我们眼中 RxJava 式的网络请求

原文 

https://qiantao.one/2019/02/25/best-practice-in-rxjava-network-method/

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 我们眼中 RxJava 式的网络请求

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址