RxJava(六):过滤操作符

博客主页

RxJava 的过滤操作符主要包括以下几种:

  • filter:过滤数据
  • takeLast:只发射最后的 N 项数据
  • last :只发射最后一项数据
  • lastOrDefault:只发射最后一项数据,如果 Observable 空,就发射默认值。
  • takeLastBuffer :将最后的 N 项数据当作单个数据发射
  • skip :跳过开始的 N 项数据
  • skipLast:跳过最后的 N 项数据
  • take :只发射开始的 N 项数据
  • first and takeFirst:只发射第一项数据,或者满足某种条件的第一项数据
  • firstOrDefault :只发射第一项数据,如果 Observable 空,就发射默认值
  • elementAt :发射第 N 项数据
  • elementAtOrDefault:发射第 N 项数据,如果 Observable 数据少于 N 项,就发射默认值
  • sample or throttleLast:定期发射 Observable 最近的数据
  • throttleFirst :定期发射 Observable 发射的第一项数据
  • throttleWithTimeout or debounce:只有当 Observable 在指定的时间段后还没有发射数据时,才发射一个数据
  • timeout:如果在一个指定的时间段后还没发射数据,就发射一个异常
  • distinct:过滤掉重复的数据
  • distinctUntilChanged:过滤掉连续重复的数据
  • ofType:只发射指定类型的数据
  • ignoreElements:丢弃所有的正常数据,只发射错误或完成通知

1. first 和 last

1.1 first 操作符

只发射第 一 项(或者满足某个条件的第一项)数据

RxJava(六):过滤操作符

如果只对 Observable 发射的第一项数据,或者满足某个条件的第一项数据感兴趣,那么就可以使用 first 操作符。

在 RxJava 2.x 中,使用 first() 需要一个默认的 Item ,对于 Observable 而言,使用了 first()会返回 Single 类型。

public final Single<T> first(T defaultItem) {
    return elementAt(0L, defaultItem);
}

Observable.just(3, 4, 5)
        .first(8)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        });

// 执行结果
 Next-> 3

如果 Observable 不发射任何数据,那么 first 操作符的默认值就起了作用。

Observable.<Integer>empty()
        .first(8)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        });

// 执行结果
 Next-> 8

在 R.Java 2.x 中,还有 firstElement 操作符表示只取第一个数据,没有默认值。 firstOrError 操作符表示要么能取到第一个数据,要么执行 onError 方法,它们分别返回 Maybe 类型和 Single 类型。

1.2 last 操作符

只发射最后一项(或者满足某个条件的最后一项)数据

RxJava(六):过滤操作符

如果只对 Observable 发射的最后一项数据, 或者满足某个条件的最后一项数据感兴趣,那么就可以使用 last 操作符。

last 操作符跟 first 操作符类似,需要一个默认的 Item ,也是返回 Single 类型。

public final Single<T> last(T defaultItem) {
    ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
    return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, defaultItem));
}

Observable.just(3, 4, 5)
        .last(8)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        });

// 执行结果
 Next-> 5

在 RxJava 2.x 中,有 lastElement 操作符和 lastOrError 操作符。

2. take 和 takeLast

2.1 take 操作符

只发射前面的 N 项数据

RxJava(六):过滤操作符

使用 take 操作符可以只修改 Observable 的行为,返回前面的 N 项数据,发射完成通知,忽略剩余的数据

Observable.just(1, 2, 3, 4, 5)
        .take(3)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 1
 Next-> 2
 Next-> 3
 Complete.

如果对一个 Observable 使用 take 操作符,而那个 Observabl 发射的数据少于 N 项,那么 take 操作符生成的 Observable 就不会抛出异常或者发射 Error 通知,而是仍然会发射那些数据

Observable.just(1, 2, 3, 4, 5)
        .take(6)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 1
 Next-> 2
 Next-> 3
 Next-> 4
 Next-> 5
 Complete.

take 有一个重载方法能够接受一个时长而不是数量参数。它会丢掉发射 Observable 开始的那段时间发射的数据,时长和时间单位通过参数指定。

Observable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS)
        .take(3, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next-> " + aLong);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 0
 Next-> 1
 Next-> 2
 Complete.

上述代码使用了 intervalRange 操作符表示每隔 ls 会发射一个数据,它们从 0 开始到 9 结束,发射 10 个数据。由于在这里使用了 take 操作符,最后只打印前 3 个数据.

take 的这个重载方法默认在 computation 调度器上执行,也可以使用参数来指定其他调度器。

2.2 takeLast 操作符

发射 Observable 发射的最后 N 项数据

RxJava(六):过滤操作符

使用 takeLast 操作符修改原始 Observable,我们可以只发射 Observable 发射的最后 N 项数据,忽略前面的数据。

Observable.just(1, 2, 3, 4, 5)
        .takeLast(3)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 3
 Next-> 4
 Next-> 5
 Complete.

同样,如果对一个 Observable 使用 takeLast(n) 操作符,而那个 Observable 发射的数据少于 N 项,那么 takeLast 操作符生成的 Observable 不会抛出异常或者发射 onError 通知,而是仍然发射那些数据。

takeLast 也有一个重载方法能够接受一个时长而不是数量参数。它会发射在原始 Observable 生命周期内最后一段时间发射的数据,时长和时间单位通过参数指定。

Observable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS)
        .takeLast(3, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next-> " + aLong);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 7
 Next-> 8
 Next-> 9
 Complete.

3. skip 和 skipLast

3.1 skip 操作符

抑制 Observable 发射的前 N 项数据

RxJava(六):过滤操作符

使用 skip 操作符,可以忽略 Observable 发射的前 N 项数据,只保留之后的数据

Observable.just(1, 2, 3, 4, 5)
        .skip(3)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 4
 Next-> 5
 Complete.

skip 有一个重载方法能够接受一个时长而不是数量参数。它会丢弃原始 Observable 开始那段时间发射的数据,时长和时间单位通过参数指定。

Observable.interval(1, TimeUnit.SECONDS)
        .skip(3, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next-> " + aLong);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 3
 Next-> 4
 Next-> 5
 Next-> 6
 Next-> 7
 Next-> 8
 Next-> 9
 ......

3.2 skipLast 操作符

抑制 Observable 发射的后 N 项数据

RxJava(六):过滤操作符

使用 skipLast 操作符修改原始 Observable,可以忽略 Observable 发射后 N 项数据,只保留前面的数据。

Observable.just(1, 2, 3, 4, 5)
        .skipLast(3)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 1
 Next-> 2
 Complete.

同样, skipLast 也有一个重载方法接受一个时长而不是数量参数。它会丢弃在原始 Observable 生命周期最后一段时间内发射的数据,时长和时间单位通过参数指定。

Observable.interval(1, TimeUnit.SECONDS)
        .skipLast(3, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next-> " + aLong);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 0
 Next-> 1
 Next-> 2
 Next-> 3
 Next-> 4
 Next-> 5
 Next-> 6
 ......

4. elementAt 和 ignoreElements

4.1 elementAt 操作符

只发射第 N 项数据

RxJava(六):过滤操作符

elementAt 操作符获取原始 Observable 发射的数据序列指定索引位置的数据项,然后当作自己的唯一数据发射

它传递一个基于 0 的索引值,发射原始 Observable 数据序列对应索引位置的值,如果传递给 elementAt 的值为 5,那么它会发射第 6 项数据。如果传递的是一个负数,则将会抛出 IndexOutOfBoundsException 异常。

Observable.just(1, 2, 3, 4, 5)
        .elementAt(2)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next-> 3

elementAt(index)返回一个 Maybe 类型。

public final Maybe<T> elementAt(long index) {
    if (index < 0) {
        throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
    }
    return RxJavaPlugins.onAssembly(new ObservableElementAtMaybe<T>(this, index));
}

如果原始 Observable 的数据项数小于 index+1 ,那么会调用 onComplete 方法(在 RxJava l.x 中也会抛出一个 IndexOutOfBoundsException 异常)。所以 elementAt 还提供了一个带默认值的方法,它返回一个 Single 类型。

Observable.just(1, 2, 3, 4, 5)
        .elementAt(8, 10)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Success: " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        });

// 执行结果
 Success: 10

如果 index 超出了索引范围,那么取默认值

4.2 ignoreElements

不发射任何数据,只发射 Observable 终止通知

RxJava(六):过滤操作符

ignoreElements 操作符抑制原始 Observable 发射的所有数据,只允许它的终止通知( onError 或 onComplete )通过。它返回 Completable 类型

如果我们不关心一个 Observable 发射的数据,但是希望在它完成时或遇到错误终止时收到通知,那么就可以对 Observable 使用 gnoreElements 操作符,它将确保永远不会调用观察者的 onNext 方法。

Observable.just(1, 2, 3, 4, 5)
        .ignoreElements()
        .subscribe(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error->" + throwable);
            }
        });

// 执行结果
 Complete.

5. distinct 和 filter

5.1 distinct 操作符

过滤掉重复的数据项

RxJava(六):过滤操作符

distinct 的过滤规则是: 只允许还没有发射过的数据项通过

Observable.just(1, 2, 2, 3, 4, 4, 4, 5)
        .distinct()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next->" + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error->" + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next->1
 Next->2
 Next->3
 Next->4
 Next->5
 Complete.

distinct 还能接受 Function 作为参数,这个函数根据原始 Observable 发射的数据项产生一个 Key ,然后比较这些 Key 而不是数据本身,来判定两个数据是否不同。

与 distinct 类似的是 distinctUntilChanged 操作符,该操作符与 distinct 的区别是:它只判定一个数据和它的直接前驱是否不同

Observable.just(1, 2, 1, 2, 3, 4, 4, 4, 5)
        .distinctUntilChanged()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next->" + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error->" + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next->1
 Next->2
 Next->1
 Next->2
 Next->3
 Next->4
 Next->5
 Complete.

5.2 filter 操作符

只发射通过谓词测试的数据项

RxJava(六):过滤操作符

filter 操作符使用你指定的一个谓词函数测试数据项,只有通过测试的数据才会被发射。

Observable.just(2, 30, 22, 5, 60, 1)
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer > 10;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next->" + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable);
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 执行结果
 Next->30
 Next->22
 Next->60
 Complete.

6. debounce

仅在过了一段指定的时间还没发射数据的才发射一个数据

RxJava(六):过滤操作符

debounce 操作符会过滤掉发射速率过快的数据项

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        if (emitter.isDisposed()) return;

        try {
            for (int i = 0; i < 10; i++) {
                emitter.onNext(i);
                Thread.sleep(i * 100);
            }
            emitter.onComplete();
        } catch (Exception e) {
            emitter.onError(e);
        }
    }
}).debounce(500, TimeUnit.MILLISECONDS)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next->" + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error->" + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 执行结果
 Next->6
 Next->7
 Next->8
 Next->9
 Complete.

debounce 还有另外一种形式,使用一个 Function 函数来限制发送的数据。

跟 debounce 类似的是由throttleWithTimeout 操作符,它与只使用时间参数来限流的 debounce 的功能相同。

如果我的文章对您有帮助,不妨点个赞鼓励一下(^_^)

原文 

https://segmentfault.com/a/1190000021600716

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » RxJava(六):过滤操作符

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址