博客主页
RxJava 的条件操作符主要包括以下几个:
RxJava 的布尔操作符主要包括:
判定 Observable 发射的所有数据是否都满足某个条件
传递一个谓词函数给 all 操作符,这个函数接受原始 Observable 发射的数据,根据计算返回一个布尔值。 all 返回一个只发射单个布尔值的 Observable,如果原始 Observable 正常终止并且每一项数据都满足条件,就返回 true。如果原始 Observabl 的任意一项数据不满足条件,就返回false
Observable.just(1, 2, 3, 4, 5)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 10;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "Success: " + aBoolean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "Error: " + throwable);
}
});
// 执行结果
Success: true
判断 Observable 发射的所有数据是否都大于 3
Observable.just(1, 2, 3, 4, 5)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 3;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "Success: " + aBoolean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "Error: " + throwable);
}
});
// 执行结果
Success: false
判定一个 Observable 是否发射了一个特定的值
给 contains 传一个指定的值,如果原始 Observable 发射了那个值,那么返回的 Observable 将发射 true,否则发射 false 。与它相关的一个操作符是 isEmpty ,用于判定原始 Observable 是否未发射任何数据。
Observable.just(2, 30, 22, 5, 60, 1)
.contains(22)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "Success: " + aBoolean);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "Error: " + throwable);
}
});
// 执行结果
Success: true
给定两个或多个 Observable ,它只发射首先发射数据或通知的那个 Observable 的所有数据
当传递多个 Observable 给 amb 时,它只发射其中一个 Observable 数据和通知: 首先发送通知给 amb 的那个 Observable ,不管发射的是一项数据 ,还是一个 onError 或 onCompleted 通知。 amb 忽略和丢弃其他所有 Observables 的发射物。
在 RxJava 中, amb 还有一个类似的操作符 ambWith。 例如, Observable.amb(ol, o2 )和
ol.ambWith(o2)是等价的
在 RxJava 2.x 中, amb 需要传递 Iterable 对象,或者使用 ambArray 来传递可变参数。
Observable.ambArray(
Observable.just(1, 2, 3),
Observable.just(4, 5, 6)
).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Next: " + integer);
}
});
// 执行结果
Next: 1
Next: 2
Next: 3
修改一下代码,第一个 Observable 延迟 ls 后再发射数据
Observable.ambArray(
Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS),
Observable.just(4, 5, 6)
).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Next: " + integer);
}
});
// 执行结果
Next: 4
Next: 5
Next: 6
由于第一个 Observable 延迟发射,因此我们只消费了第二个 Observable 的数据,第一个 Observable 发射的数据就不再处理了。
发射来自原始 Observable 值,如果原始 Observable 没有发射任何值,就发射一个默认值
defaultIfEmpty 简单精确地发射原始 Observable 的值,如果原始 Observable 没有发射任何数据,就正常终止(以 onComplete 形式了),那么 defaultlfEmpty 返回的 Observable 就发射一个我们提供的默认值。
Observable.empty()
.defaultIfEmpty(8)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG, "Next: " + o);
}
});
// 执行结果
Next: 8
在 defaultIfEmpty 方法内部,其实调用的是 switchIfEmpty 操作符,源码如下:
public final Observable<T> defaultIfEmpty(T defaultItem) {
ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
return switchIfEmpty(just(defaultItem));
}
defaultIfEmpty 和 switchIfEmpty 的区别是, defaultIfEmpty 操作符只能在被观察者不发送数据时发送一个默认的数据 ,如果想要发送更多数据,则可以使用 switchIfEmpty 操作符,发送自定义的被观察者作为替代。
Observable.empty()
.switchIfEmpty(Observable.just(1, 2, 3))
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG, "Next: " + o);
}
});
// 执行结果
Next: 1
Next: 2
Next: 3
判定两个 Observable 是否发射相同的数据序列
传递两个 Observable 给 sequenceEqual 操作符时,它会比较两个 Observable 发射物,如果两个序列相同(相同的数据,相同的顺序,相同的终止状态〉 ,则发射 true 否则发射 false
Observable.sequenceEqual(
Observable.just(1, 2, 3, 4, 5),
Observable.just(1, 2, 3, 4, 5)
).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "Success: " + aBoolean);
}
});
// 执行结果
Success: true
将两个 Observable 改成不一致
Observable.sequenceEqual(
Observable.just(1, 2, 3, 4, 5),
Observable.just(1, 2, 3, 4, 5, 6)
).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "Success: " + aBoolean);
}
});
// 执行结果
Success: false
sequenceEqual 还有一个版本接受第三个参数,可以传递一个函数用于比较两个数据项是否相同。对于复杂对象的比较,用三个参数的版本更为合适。
Observable.sequenceEqual(
Observable.just(1, 2, 3, 4, 5),
Observable.just(1, 2, 3, 4, 5),
new BiPredicate<Integer, Integer>() {
@Override
public boolean test(Integer integer, Integer integer2) throws Exception {
return integer == integer2;
}
}
).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "Success: " + aBoolean);
}
});
// 执行结果
Success: true
丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一项数据
skipUntil 订阅原始的 Observable,但是忽略它的发射物,直到第二个 Observable 发射一项数据那一刻,它才开始发射原始的 Observabl。 skipUntil 默认不在任何特定的调度器上执行。
Observable.intervalRange(1, 9, 0, 1, TimeUnit.MILLISECONDS)
.skipUntil(Observable.timer(4, TimeUnit.MILLISECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "Next: " + aLong);
}
});
// 执行结果
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Next: 9
上述代码,原始的 Observable 发射 1 到 9 这 9 个数 ,初始延迟时间是 0,每间隔 lms。由于使用 skipUntil,因此它会发射原始 Observable 在 3ms 之后的数据。
丢弃 Observable 发射的数据,直到一个指定的条件不成立。
skipWhile 订阅原始的 Observable ,但是忽略它的发射物,直到指定的某个条件变为 false。它才开始发射原始的 Observable。skipWhile 默认不在任何特定的调度器上执行
Observable.just(1, 2, 3, 4, 5)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer <= 2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Next: " + integer);
}
});
// 执行结果
Next: 3
Next: 4
Next: 5
当第二个 Observable 发射了一项数据或者终止时,丢弃原始 Observable 发射的任何数据
takeUntil 订阅并开始发射原始 Observable ,它还监视你提供的第二个 Observable。如果第二个 Observable 发射了一项数据或者发射了一个终止通知,则 takeUntil 返回的 Observable 会停止发射原始 Observable 并终止。
Observable.just(1, 2, 3, 4, 5, 6)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer == 4;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Next: " + integer);
}
});
// 执行结果
Next: 1
Next: 2
Next: 3
Next: 4
发射原始 Observable 发射的数据,直到一个指定的条件不成立
takeWhile 发射原始的 Observable 直到某个指定的条件不成立,它会立即停止发射原始 Observable,并终止自己的 Observable。
RxJava 中的 takeWhile 操作符返回一个原始 Observable 行为的 Observable,直到某项数据,指定的函数返回 false ,这个新的 Observable 就会发射 onComplete 终止通知
Observable.just(1, 2, 3, 4, 5, 6)
.takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer <= 2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Next: " + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "Error: " + throwable);
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Complete.");
}
});
// 执行结果
Next: 1
Next: 2
Complete.