处理完相应的操作,将结果通知被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("are");
emitter.onNext("you");
emitter.onNext("ok");
emitter.onComplete();
}
});
复制代码
接收被观察者的通知
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
// d.dispose(); //取消订阅
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
observable.subscribe(observer);
复制代码
调度程序执行的线程
| 调度器类型 | 效果 |
|---|---|
| Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
| Schedulers.from(executor) | 使用指定的Executor作为调度器 |
| Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
| Schedulers.newThread( ) | 为每个任务创建一个新线程 |
| Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
| AndroidSchedulers.mainThread() | Android的主线程 |
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("are");
emitter.onNext("you");
emitter.onNext("ok");
emitter.onComplete();
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
// d.dispose(); //取消订阅
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
复制代码
创建事件序列的方法 create interval defer emptyNeverError repeat timer from just range 变换操作 mapCast flatMap2contactMap flatMapExample flatMapIterable buffer groupBy scan window 过滤操作/条件操作符 filter element distinct skip take ignoreElements debounce ofType all contains isEmpty defaultIfEmpty amb 组合操作 concat merge startWith zip combineLast reduce collect count 功能操作符/辅助操作 delay doSeries retry subscribeOn observeOn RxKotlin扩展库 rkExExample 额外其他 compose 复制代码