要求:会使用 rxjava 进行日常开发,复杂功能可以通过搜索和查阅官方文档解决即可
想要用 RxJava 必须要在 build.gradle 内加入依赖
implementation 'io.reactivex.rxjava2:rxjava:2.2.6' implementation 'io.reactivex.rxjava2:rxandroid:2.1.1' 复制代码
使用步骤就是:1. 创建被观察者,2. 创建观察者,3. 订阅
// 创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
})
复制代码
ObservableEmitter emitter 对象是发射器的意思,有三种发射的方法 void onNext(T value)、void onError(Throwable error)、void onComplete(),onNext 方法可以无限调用,Observer(观察者)所有的都能接收到,onError和onComplete是互斥的,Observer(观察者)只能接收到一个,OnComplete 可以重复调用,但是Observer(观察者)只会接收一次,而 onError 不可以重复调用,第二次调用就会报异常。
// 创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
复制代码
onNext、onError、onComplete 都是跟被观察者发射的方法一一对应的,这里就相当于接收了。需要特别说明的是 onSubscribe 方法中的 Disposable 参数,它只有两个方法 dispose() 和 isDisposed(),前者是取消对 Observable(被观察者)的订阅,后面很明显就是查看订阅的状态。
// 订阅 observable.subscribe(observer); 复制代码
为了保证链式编程,这里的逻辑好像是 Observable 订阅了 Observer。其实我认为还有其他的原因,熟悉「观察者模式」的应该知道「具体被观察者」会持有一个容器用来存储观察者,这样才能实现数据更新之后通知所有的观察者。
如果认为 RxJava 只是实现了一个「观察者模式」那就大错特错,其实 RxJava 主要是给我们提供了一个异步编程的工具。
在介绍异步之前,我们先看一下 Observable.just() 方法,这个方法最多可以接受 10 个参数,并且对这些参数依次调用 onNext 方法,执行完 onNext 之后还会调用 onComplete。
public static <T> Observable<T> just(T item1, T item2) 复制代码
其实 RxJava 的异步使用也很简单,在订阅之前给 Observable 加上 subscribeOn(Scheduler scheduler) 和 observeOn(Scheduler scheduler) 描述,subscribeOn() 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程,或者叫做事件产生的线程。observeOn() 指定 Observer 所运行的线程,或者叫事件消费的线程。
所以简单的使用如下:
Observable.just(11, 22, 33)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Observer 的回调发生在主线程
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
复制代码
由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件内容 11,22,33 将会在 IO 线程发出。而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 Observer 数字的打印将发生在主线程 。
这种策略非常适合大多数的「后台线程处理数据,主线程显示结果」的情况。
所谓变换,就是将事件序列中的对象或者整个序列进行加工处理,转换成不同的事件或者事件序列。
map() 变换 将对象集合转化成对象
Observable.just(1, 2, 3)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "I'm " + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
复制代码
在这里面最重要的是 Function 接口,它就一个方法,就是将构造函数中的第一个参数转为第二个参数进行返回。
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
复制代码
flatMap() 变换
private void flatMap() {
Log.d(TAG, "flatMap: =============");
List<Person> personList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
List<Plan> plans = new ArrayList<>();
// 计划一
Plan plan1 = new Plan("8:30", "上班");
// 填充准备做的事情
List<String> actions1 = new ArrayList<>();
actions1.add("1打开电脑");
actions1.add("1打开AS");
actions1.add("1打开WX");
plan1.setActionList(actions1);
// 计划二
Plan plan2 = new Plan("12:00", "吃饭");
// 填充准备做的事情
List<String> actions2 = new ArrayList<>();
actions2.add("2下楼");
actions2.add("2买饭");
actions2.add("2开吃");
plan2.setActionList(actions2);
// 计划三
Plan plan3 = new Plan("18:00", "下班");
// 填充准备做的事情
List<String> actions3 = new ArrayList<>();
actions3.add("3关闭WX");
actions3.add("3关闭AS");
actions3.add("3关闭电脑");
plan3.setActionList(actions3);
plans.add(plan1);
plans.add(plan2);
plans.add(plan3);
Person person = new Person("tom" + i, plans);
personList.add(person);
}
Observable.fromIterable(personList)
.flatMap(new Function<Person, ObservableSource<Plan>>() {
@Override
public ObservableSource<Plan> apply(Person person) throws Exception {
if ("tom1".equals(person.getName())) {
return Observable.fromIterable(person.getPlanList()).delay(10, TimeUnit.SECONDS);
}
return Observable.fromIterable(person.getPlanList());
}
})
.flatMap(new Function<Plan, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Plan plan) throws Exception {
return Observable.fromIterable(plan.getActionList());
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
复制代码