关于RxJava在业务上的一些思考

最近在工作中,频繁的使用了Rxjava来解决一些问题,在使用过程中也给予了自己一些思考,如何使用好RxJava,在什么样的场景中才能发挥它更好的作用,如何脱离代码来理解RxJava的工作机制,下面是自己一些浅显的思考。

示例

太多示例喜欢链式的把RxJava的流程表述起来,这个地方我把观察者和订阅者拆开来看。

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("123");
            }
        });

 Observer observer = new Observer<String>() {
            ...
            @Override
            public void onNext(String s) {
                Log.i("TAG", "onNext: " + s);
            }
        };

observable.subscribe(observer);
复制代码

这个简单的例子大家应该都知道,只要subcribe产生了订阅,onNext方法将会收到 emitter.onNext("123"); 发射出去的数据

这个地方让我产生思考主要是有一次去吃自助餐,大家在打酸奶的时候,都会拿着一个杯子对准出口,然后按住开关,酸奶就会自动流到杯子中。在这个过程中,我们不妨把酸奶机看做 Observable ,酸奶机里面的酸奶是许许多多的 emitter.onNext("123") ,按住开关的那一刻产生了 subscribe 订阅,然后我们是用杯子 Observer 去接牛奶的,当然,我们还有橙子机、酸梅汤机等,则机子内盛的饮料类型就是 Observable<String> 。我们知道,酸奶机有很多个开关入口,这时候,又来一个人,拿着杯子 Observer 来打牛奶,那么,我和他是一块共享这酸奶机里面的酸奶,我们俩都能接收到酸奶,等我们不需要接酸奶了,我们就dispose关闭开关。

eg:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                //模拟耗时任务
                for (int i = 0; i < 5; i++) {
                    observableEmitter.onNext(""+i);
                }
            }
        }).subscribeOn(Schedulers.io());
//杯子1
observable.subscribe(observer1);
//杯子2
observable.subscribe(observer2);
复制代码

result:

observer1 onNext: 0
observer2 onNext: 0
observer1 onNext: 1
observer2 onNext: 1
observer1 onNext: 2
observer2 onNext: 2
observer1 onNext: 3
observer2 onNext: 3
observer1 onNext: 4
observer2 onNext: 4
复制代码

事件驱动的思考

之前在思考事件驱动这一块,如何更好的通知其他业务组件,业界比较有名的当属EventBus,但EventBus用起来很杂乱无章,当项目规模大起来,业务复杂起来时,都不敢修改这个post,虽然解耦了,但事件变得更乱了,所以,自己重新思考了事件驱动这一块。

鉴于EventBus提供的的思路,我打算用RxJava的方式来实现。以酸奶机为例,当前页面我想订阅一个事件,等待被触发,我完全可以先准备一个杯子(Observer),然后将他们存到一个集合里面,待酸奶机(Observable)里面有酸奶了(observableEmitter.onNext),然后订阅(subcribe)这个杯子的集合,将酸奶倒到杯子里,鉴于此思路,用代码大致的实现下。

List<Observer> list = new ArrayList<>();
    
    //注册事件
    public void registerObserver(Observer observer) {
        list.add(observer);
    }

    //驱动事件
    public void postEvent() {
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("123");
            }
        });

        for (int i = 0; i < list.size(); i++) {
            observable.subscribeOn(Schedulers.io()).subscribe(list.get(i));
        }
    }

    @Test
    public void Test() {
        Observer observer = new Observer<String>() {
            ...
            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

        };
        //注册事件
        registerObserver(observer);

        //发送事件
        postEvent();
    }
复制代码

这里只给了大致的思路,用了一个平时都可见的例子来实现了事件驱动。

异步回调的思考

最近有一个业务场景,需要监听RTK当前状态的变化,业务场景是:

关于RxJava在业务上的一些思考

有一个前提,RTK必须先设置账户,才能使用后续的服务。如果用户按照应用准则走,进入主页后,先去设置页面设置RTK,然后回到主页,进入任务执行页,这时候监听RTK state是可用的;但如果用户忘了设置的步骤,或是有强迫症的用户,我就不往你提示的方式走,我就要先进任务执行员页,这时候RTK State监听是不可用的,我们会引导用户进入设置页面设置RTK,这个地方又要分情况,如果用户设置好RTK账户,然后返回了任务页,那么任务页的RTK state监听到了用户设置了账户就会返回可用,那么这次任务是可使用的;但如果用户设置好了账户,想去诊断RTK当前连接的状态的话,RTK state监听事件就会被诊断RTK页面给设置,也就意味着,任务页的RTK state监听就会失效,那么返回任务页的话,任务页是不会有任何反应的。但这一块也是有解决办法的,就是任务页的RTK state监听放在 onResume 方法里面,即设置页面返回任务页后,触发任务页的 onResume 方法,重新夺回RTK state的监听。 办法都是有的,但依赖生命周期去做到这点,感觉并不是特别的可靠,我们可以参考上面事件驱动的例子,将RTK state看成是酸奶机,然后哪个页面(杯子)需要RTK state信息的话,就可以订阅(subcribe)酸奶机,如果想要牛奶的话,就post一个信息出去,告诉酸奶机我要酸奶,下面,我给出一份示例:

Set<Observer> set = new HashSet<>();
    //模拟一个RTK state 单例
    public Observable getObservableInstance() {
        return Observable.create(new ObservableOnSubscribe<RTKState>() {
            @Override
            public void subscribe(ObservableEmitter<RTKState> emitter) throws Exception {
                RTK rtk = DjiSettingUtils.getRTK();
                rtk.setStateCallback(new RTKState.Callback() {
                    @Override
                    public void onUpdate(@NonNull RTKState rtkState) {
                        emitter.onNext(rtkState);
                    }
                });
            }
        });
    }

    //驱动事件
    public void postEvent(Observer observer) {
        if (!set.contains(observer)) {
            getObservableInstance().subscribeOn(Schedulers.io()).subscribe(observer);
        }
    }

    @Test
    public void onCreate() {
        Observer observer = new Observer<RTKState>() {
            ...
            @Override
            public void onNext(RTKState s) {
                System.out.println("onNext: " + s.isRTKBeingUsed());
            }
        };
        //发送事件
        postEvent(observer);
    }
复制代码

之后,我们只需要关注 onCreate 方法,在任务页我们发起一个订阅事件,接收RTK state信息,在诊断页面也发起一个订阅,接收RTK信息,这样就不会像上面那样,抢断监听事件的问题。

多图上传的思考

业务场景中有需要从无人机中读取缩略图,并将缩略图上传至服务器图片上传我们使用的是七牛,因为一次任务产生的缩略图非常多,基本上都在百张左右,我们不可能为了在上传过程中,因为某些原因导致了断开了,让用户重新上传所有的缩略图,所以,我们打算让百张缩略图采用顺序上传,当哪个节点发生错误的时候,记住index,等用户点击重新上传时,我们再从index的位置继续上传,如果按照传统方式来做的话,第一张上传成功后,如何通知第二张上传呢,我这里给个大致的代码:

List<File> list=new ArrayList<>();
int index=0;

public void uploadPic(){
    uploadManager.put(list.get(index), key, token, new UpCompletionHandler() {
                @Override
                public void complete(String key, ResponseInfo info, JSONObject res) {
                    if (info.isOK()) {
                       index++;
                       uploadPic()
                    } else {
                        //弹框提示用户,当前index上传失败
                    }
                }
            }, null);
}  
@Test
public void test(){
    uploadPic()
}


复制代码

每次上传成功后都调用自身的方法,如果上传失败了,则记住index的位置,提示用户,用户点击重试上传,那么就继续调用 uploadPic 方法,上传的拿到的文件还是从index位置开始拿,所以,也是没有任何问题的。 但是,总觉得这么设计不那么优雅,比如我想知道上传进度的话,那也就意味着我需要在index++方法下面加一个设置进度条的功能,那如果业务需要再加一个上传完成的操作的话,那是不是又要在index++下面多加一个 index==list.size() 的判断呢,其实,这样设计下去的话,整个上传功能就变得特别的松散,移植性也不强,所以,是时候发挥RxJava的 Observer 了。

鉴于异步回调的思考,我打算把上传任务封装成一个 ObservableOnSubcribe ,每次执行任务成功后,就将事件流onNext交给下游,告诉他我完成了一次上传,如果上传失败了,则发射onError异常。

public class QiNiuBitmapOnSubscribe implements ObservableOnSubscribe<QiniuParam> {
    ...
    @Override
    public void subscribe(final ObservableEmitter<QiniuParam> emitter) throws Exception {
        //上传操作
        uploadManager.put(file, key, token, new UpCompletionHandler() {
            @Override
            public void complete(String key, ResponseInfo info, JSONObject res) {
                if (info.isOK()) {
                    emitter.onNext(new QiniuParam(key, info, res));
                    emitter.onComplete();
                } else {
                    emitter.onError(new ServerException(-1, res.toString()));
                }
            }
        }, null);
    }
}
复制代码

由于图片是存储在一个集合中,那么就肯定要用到RxJava的 fromIterable遍历集合,由于需要保证图片是有序上传,就需要用到 concatMap 操作符 , 所以,大致代码如下

Observable.fromIterable(fileList)
                .concatMap(new Function<QiNiuFile, ObservableSource<QiniuParam>>() {
                    @Override
                    public ObservableSource<QiniuParam> apply(QiNiuFile qiniuFile) throws Exception {
                    //返回七牛云上传
                    return Observable.create(new QiNiuFileOnSubscribe(uploadManager,
                                    qiniuFile.getFile(), qiniuFile.getKey(), qiniuFile.getUploadToken()));
                    }
                }).subscribe(new Observer<QiniuParam>() {
             ...
            @Override
            public void onNext(QiniuParam qiniuParam) {
                index++;
                //通知上传进度
                uploadCallBack.onUploadProcess(index);
            }

            @Override
            public void onError(Throwable e) {
                //通知断传的位置
                uploadCallBack.onUploadQiNiuError(index);
            }

            @Override
            public void onComplete() {
               //上传成功
                uploadCallBack.onUploadQiNiuComplete();
            }
        });
复制代码

对于 Observer 来说,他是一个干净的接收流,他不关心上游发生的事情,只专注结果的处理。

思考

以上思考有的地方可能不是特别的完善,还需要多思考,RxJava用的人确实很多,但要想玩的溜的话,确实任重而道远。

原文 

https://juejin.im/post/5bffabe3e51d453f321947a1

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 关于RxJava在业务上的一些思考

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址