RxJava2 操作符总结

本文总结项目中常用的 Rxjava2
操作符。

RxJava2 操作符总结

使用RxJava2原因

  • ★★★★★ 封装了线程切换,不用自己封装线程池、Handler了
  • ★★★★☆ 链式调用,一气呵成

单值发射

Single
:大多数场景都是单值发射,所以使用 Single
即可覆盖大部分场景。

Case1. 在非UI线程执行且不关注结果

  • fromCallable
Single.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Logger.d(TAG, "test: fromCallable() invoked on %s", Thread.currentThread().getName());
                return generateRandom();
            }
        }).subscribeOn(Schedulers.io()).subscribe();
复制代码

从名字 Callable
就能看出,这是个回调函数,在 io 线程运行

  • defer
Single.defer(new Callable<SingleSource<Integer>>() {
            @Override
            public SingleSource<Integer> call() throws Exception {
                Logger.d(TAG, "test: defer() invoked on %s", Thread.currentThread().getName());
                return Single.just(generateRandom());
            }
        }).subscribeOn(Schedulers.io()).subscribe();
复制代码

注意,不能直接使用 Single.just(generateRandom()).subscribeOn(Schedulers.io()).subscribe()
,这么写将直接在当前线程调用 generateRandom()
,无法实现在 io 线程执行的效果。

Case2. 在非UI线程执行并关注结果

需要关注结果的场景,建议都订阅 Consumer<Throwable>
,因为 RxJava
内部捕获了 Exception
,导致外部无感知

Single.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return generateRandom();
            }
        }).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Logger.d(TAG, "test: accept(Integer integer) invoked on %s", Thread.currentThread().getName());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Logger.d(TAG, "test: accept(Throwable throwable) invoked on %s", Thread.currentThread().getName());
            }
        });
复制代码

Case3. 在非UI线程执行但在UI线程关注结果

Single.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return generateRandom();
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Logger.d(TAG, "test: accept(Integer integer) invoked on %s", Thread.currentThread().getName());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Logger.d(TAG, "test: accept(Integer integer) invoked on %s", Thread.currentThread().getName());
            }
        });
复制代码

Case4. 变换返回值

Single.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return generateRandom();
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return String.valueOf(integer + "_mapped");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Logger.d(TAG, "test: " + s);
            }
        });
复制代码

Case5. 按顺序做某事且下一件事依赖上一件事的结果[常用于网络请求接口依赖时]

Single.defer(new Callable<SingleSource<String>>() {
            @Override
            public SingleSource<String> call() throws Exception {
                return getUserId();
            }
        }).flatMap(new Function<String, SingleSource<UserInfo>>() {
            @Override
            public SingleSource<UserInfo> apply(String userId) throws Exception {
                return getUserInfo(userId);
            }
        }).subscribeOn(Schedulers.io()).subscribe(new Consumer<UserInfo>() {
            @Override
            public void accept(UserInfo userInfo) throws Exception {
                Logger.d(TAG, "test: get userInfo success: " + userInfo);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Logger.e(TAG, "test: get userInfo error.", throwable);
            }
        });
复制代码

Case6. 并发读取不同数据源,转换成同类型后,合并

Single<IBook> novel = Single.fromCallable(new Callable<Novel>() {
            @Override
            public Novel call() throws Exception {
                return getNovel();
            }
        }).map(new Function<Novel, IBook>() {
            @Override
            public IBook apply(Novel novel) throws Exception {
                return new NovelAdapter(novel);
            }
        }).subscribeOn(Schedulers.io());

        Single<IBook> rxJava2Tutorial = Single.fromCallable(new Callable<RxJava2Tutorial>() {
            @Override
            public RxJava2Tutorial call() throws Exception {
                return getRxJava2Tutorial();
            }
        }).map(new Function<RxJava2Tutorial, IBook>() {
            @Override
            public IBook apply(RxJava2Tutorial rxJava2Tutorial) throws Exception {
                return new RxJava2TutorialAdapter(rxJava2Tutorial);
            }
        }).subscribeOn(Schedulers.io());

        Single.zip(novel, rxJava2Tutorial, new BiFunction<IBook, IBook, List<IBook>>() {
            @Override
            public List<IBook> apply(IBook iBook, IBook iBook2) throws Exception {
                List<IBook> books = new ArrayList<>(2);
                books.add(iBook);
                books.add(iBook2);
                return books;
            }
        }).subscribe(new Consumer<List<IBook>>() {
            @Override
            public void accept(List<IBook> iBooks) throws Exception {
                Logger.d(TAG, "test: books are " + iBooks);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Logger.d(TAG, "test: get books error.", throwable);
            }
        });
复制代码

多值发射

Observable
:使用场景较少,比如搜索功能需要不断发射搜索关键字。

Case1. 搜索频率限制

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
                mEditText.addTextChangedListener(new TextWatcher() {
                    @Override
                    public void beforeTextChanged(CharSequence s, int start, int count, int after) {
                    }

                    @Override
                    public void onTextChanged(CharSequence s, int start, int before, int count) {
                    }

                    @Override
                    public void afterTextChanged(Editable s) {
                        if (!emitter.isDisposed()) {
                            emitter.onNext(s.toString().trim());
                        }
                    }
                });
            }
        }).debounce(200, TimeUnit.MILLISECONDS).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String keyword) throws Exception {
                        mTextView.setText(search(keyword));
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Logger.e(TAG, "test: emitter keyword error.", throwable);
                    }
                });
复制代码

背压

Flowable
:使用场景最少,目前仅发射下载进度时可以用上。

原文 

https://juejin.im/post/5c2d960af265da61285a3cfc

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » RxJava2 操作符总结

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址