转载

Rxjava(七):条件操作符和布尔操作符

博客主页

RxJava 的条件操作符主要包括以下几个:

  • amb :给定多个 Observable ,只让第一个发射数据的 Observable 发射全部数据
  • defaultlfEmpty :发射来自原始 Observable 的数据,如果原始 Observable 没有发射数据,则发射一个默认数据
  • skipUntil :丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据
  • skipWhile :丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据
  • takeUntil :发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知
  • takeWhile and takeWhileWithIndex:发射原始 Observable 的数据,直到一个特定的条件为真,然后跳过剩余的数据

RxJava 的布尔操作符主要包括:

  • all :判断是否所有的数据项都满足某个条件
  • contains :判断 Observable 是否会发射一个指定的值
  • exists and isEmpty :判断 Observable 是否发射了一个值
  • sequenceEqual :判断两个 Observables 发射的序列是否相等

1. all、contains 和 amb

1.1 all 操作符

判定 Observable 发射的所有数据是否都满足某个条件

Rxjava(七):条件操作符和布尔操作符

传递一个谓词函数给 all 操作符,这个函数接受原始 Observable 发射的数据,根据计算返回一个布尔值。 all 返回一个只发射单个布尔值的 Observable,如果原始 Observable 正常终止并且每一项数据都满足条件,就返回 true。如果原始 Observabl 的任意一项数据不满足条件,就返回false

Observable.just(1, 2, 3, 4, 5)
        .all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer < 10;
            }
        }).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
});

// 执行结果
 Success: true

判断 Observable 发射的所有数据是否都大于 3

Observable.just(1, 2, 3, 4, 5)
        .all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer > 3;
            }
        }).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
});

// 执行结果
 Success: false

1.2 contains 操作符

判定一个 Observable 是否发射了一个特定的值

Rxjava(七):条件操作符和布尔操作符

给 contains 传一个指定的值,如果原始 Observable 发射了那个值,那么返回的 Observable 将发射 true,否则发射 false 。与它相关的一个操作符是 isEmpty ,用于判定原始 Observable 是否未发射任何数据。

Observable.just(2, 30, 22, 5, 60, 1)
        .contains(22)
        .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.d(TAG, "Success: " + aBoolean);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        });

// 执行结果
 Success: true

1.3 amb 操作符

给定两个或多个 Observable ,它只发射首先发射数据或通知的那个 Observable 的所有数据

Rxjava(七):条件操作符和布尔操作符

当传递多个 Observable 给 amb 时,它只发射其中一个 Observable 数据和通知: 首先发送通知给 amb 的那个 Observable ,不管发射的是一项数据 ,还是一个 onError 或 onCompleted 通知。 amb 忽略和丢弃其他所有 Observables 的发射物。

在 RxJava 中, amb 还有一个类似的操作符 ambWith。 例如, Observable.amb(ol, o2 )和

ol.ambWith(o2)是等价的

在 RxJava 2.x 中, amb 需要传递 Iterable 对象,或者使用 ambArray 来传递可变参数。

Observable.ambArray(
        Observable.just(1, 2, 3),
        Observable.just(4, 5, 6)
).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 执行结果
 Next: 1
 Next: 2
 Next: 3

修改一下代码,第一个 Observable 延迟 ls 后再发射数据

Observable.ambArray(
        Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS),
        Observable.just(4, 5, 6)
).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 执行结果
 Next: 4
 Next: 5
 Next: 6

由于第一个 Observable 延迟发射,因此我们只消费了第二个 Observable 的数据,第一个 Observable 发射的数据就不再处理了。

2. defaultlfEmpty

发射来自原始 Observable 值,如果原始 Observable 没有发射任何值,就发射一个默认值

Rxjava(七):条件操作符和布尔操作符

defaultIfEmpty 简单精确地发射原始 Observable 的值,如果原始 Observable 没有发射任何数据,就正常终止(以 onComplete 形式了),那么 defaultlfEmpty 返回的 Observable 就发射一个我们提供的默认值。

Observable.empty()
        .defaultIfEmpty(8)
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                Log.d(TAG, "Next: " + o);
            }
        });

// 执行结果
 Next: 8

在 defaultIfEmpty 方法内部,其实调用的是 switchIfEmpty 操作符,源码如下:

public final Observable<T> defaultIfEmpty(T defaultItem) {
    ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
    return switchIfEmpty(just(defaultItem));
}

defaultIfEmpty 和 switchIfEmpty 的区别是, defaultIfEmpty 操作符只能在被观察者不发送数据时发送一个默认的数据 ,如果想要发送更多数据,则可以使用 switchIfEmpty 操作符,发送自定义的被观察者作为替代。

Observable.empty()
        .switchIfEmpty(Observable.just(1, 2, 3))
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                Log.d(TAG, "Next: " + o);
            }
        });

// 执行结果
 Next: 1
 Next: 2
 Next: 3

3. sequenceEqual

判定两个 Observable 是否发射相同的数据序列

Rxjava(七):条件操作符和布尔操作符

传递两个 Observable 给 sequenceEqual 操作符时,它会比较两个 Observable 发射物,如果两个序列相同(相同的数据,相同的顺序,相同的终止状态〉 ,则发射 true 否则发射 false

Observable.sequenceEqual(
        Observable.just(1, 2, 3, 4, 5),
        Observable.just(1, 2, 3, 4, 5)
).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
});

// 执行结果
 Success: true

将两个 Observable 改成不一致

Observable.sequenceEqual(
        Observable.just(1, 2, 3, 4, 5),
        Observable.just(1, 2, 3, 4, 5, 6)
).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
});

// 执行结果
 Success: false

sequenceEqual 还有一个版本接受第三个参数,可以传递一个函数用于比较两个数据项是否相同。对于复杂对象的比较,用三个参数的版本更为合适。

Observable.sequenceEqual(
        Observable.just(1, 2, 3, 4, 5),
        Observable.just(1, 2, 3, 4, 5),
        new BiPredicate<Integer, Integer>() {
            @Override
            public boolean test(Integer integer, Integer integer2) throws Exception {
                return integer == integer2;
            }
        }
).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
});

// 执行结果
 Success: true

4. skipUntil 和 skipWhile

4.1 skipUntil 操作符

丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一项数据

Rxjava(七):条件操作符和布尔操作符

skipUntil 订阅原始的 Observable,但是忽略它的发射物,直到第二个 Observable 发射一项数据那一刻,它才开始发射原始的 Observabl。 skipUntil 默认不在任何特定的调度器上执行。

Observable.intervalRange(1, 9, 0, 1, TimeUnit.MILLISECONDS)
        .skipUntil(Observable.timer(4, TimeUnit.MILLISECONDS))
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next: " + aLong);
            }
        });

// 执行结果
 Next: 4
 Next: 5
 Next: 6
 Next: 7
 Next: 8
 Next: 9

上述代码,原始的 Observable 发射 1 到 9 这 9 个数 ,初始延迟时间是 0,每间隔 lms。由于使用 skipUntil,因此它会发射原始 Observable 在 3ms 之后的数据。

4.2 skipWhile 操作符

丢弃 Observable 发射的数据,直到一个指定的条件不成立。

Rxjava(七):条件操作符和布尔操作符

skipWhile 订阅原始的 Observable ,但是忽略它的发射物,直到指定的某个条件变为 false。它才开始发射原始的 Observable。skipWhile 默认不在任何特定的调度器上执行

Observable.just(1, 2, 3, 4, 5)
        .skipWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer <= 2;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 执行结果
 Next: 3
 Next: 4
 Next: 5

5. takeUntil 和 takeWhile

5.1 takeUntil 操作符

当第二个 Observable 发射了一项数据或者终止时,丢弃原始 Observable 发射的任何数据

Rxjava(七):条件操作符和布尔操作符

takeUntil 订阅并开始发射原始 Observable ,它还监视你提供的第二个 Observable。如果第二个 Observable 发射了一项数据或者发射了一个终止通知,则 takeUntil 返回的 Observable 会停止发射原始 Observable 并终止。

Observable.just(1, 2, 3, 4, 5, 6)
        .takeUntil(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer == 4;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 执行结果
 Next: 1
 Next: 2
 Next: 3
 Next: 4

5.2 takeWhile 操作符

发射原始 Observable 发射的数据,直到一个指定的条件不成立

Rxjava(七):条件操作符和布尔操作符

takeWhile 发射原始的 Observable 直到某个指定的条件不成立,它会立即停止发射原始 Observable,并终止自己的 Observable。

RxJava 中的 takeWhile 操作符返回一个原始 Observable 行为的 Observable,直到某项数据,指定的函数返回 false ,这个新的 Observable 就会发射 onComplete 终止通知

Observable.just(1, 2, 3, 4, 5, 6)
        .takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer <= 2;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 执行结果
 Next: 1
 Next: 2
 Complete.

如果我的文章对您有帮助,不妨点个赞鼓励一下(^_^)

原文  https://segmentfault.com/a/1190000021602935
正文到此结束
Loading...