转载

RxJava概览

处理完相应的操作,将结果通知被观察者

Observable observable  = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                emitter.onNext("are");
                emitter.onNext("you");
                emitter.onNext("ok");
                emitter.onComplete();
            }
        });
复制代码

Observer观察者

接收被观察者的通知

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            //                d.dispose();  //取消订阅
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: "+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");

            }
        };

				
 observable.subscribe(observer);
复制代码

Schedulers线程调度器

调度程序执行的线程

调度器类型 效果
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread() Android的主线程
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("are");
                emitter.onNext("you");
                emitter.onNext("ok");
                emitter.onComplete();
            }
        }).observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
                //                d.dispose();  //取消订阅
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: "+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");

            }
        });
复制代码

Operator操作符

创建事件序列的方法
create
interval
defer
emptyNeverError
repeat
timer
from
just
range



变换操作
mapCast
flatMap2contactMap
flatMapExample
flatMapIterable
buffer
groupBy
scan
window



过滤操作/条件操作符
filter
element
distinct
skip
take
ignoreElements
debounce
ofType
all
contains
isEmpty
defaultIfEmpty
amb



组合操作
concat
merge
startWith
zip
combineLast
reduce
collect
count



功能操作符/辅助操作
delay
doSeries
retry
subscribeOn
observeOn



RxKotlin扩展库
rkExExample



额外其他
compose

复制代码
原文  https://juejin.im/post/5d8229b4e51d4561db5e3b03
正文到此结束
Loading...