RxJava 2.x 学习笔记

这段时间学习 RxJava 2.x 的笔记

观察者、被观察者

Observable、Observer

// 1. 创建上游(被观察者)
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(ObservableEmitter emitter)throws Exception {
        emitter.onNext(1);
        emitter.onError();
        emitter.onComplete();
    }
});

// 2. 创建下游(观察者)
Observer observer = new Observer<Integer>() {
    Disposable disposable = null;

    @Override
    public void onSubscribe(Disposable d){
    Log.d(TAG, "onSubscribe: ");
        disposable = d;
    }

    @Override
    public void onNext(Integer o){
        log.d(TAG, "onNext: " + o)
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: ");
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: ");
    }
};

// 3. 建立连接(订阅)
observable.subscribe(observer);

// 4. 打印结果:
onSubscribe:
onNext: 1
onComplete:

ObservableEmitter

  • ObservableEmitter:可理解为发射器,可发射3种类型的事件,即调用onNext()、onComplete()、onError() 方法。
  • 上游可以发送多个 onNext() 和 onComplete(),只能发送一个 onError();可以不发送 onComplete() 和 onError()。
  • 上游发送 onComplete() 或者 onError() 后会继续发送其他事件,但是下游接收到 onComplete() 或者 onError() 事件之后不再接收其他事件。
  • 上游可以先发送 onError() 再发送 onComplete(),不能先发送 onComplete() 再发送 onError()。

Disposable

  • 调用 dispose() 方法可以切断上下游之间的连接,上游可以继续发送除 onError() 之外的事件,但是下游不再接收事件。
  • 可通过两种方式获取 Disposable 对象,分别为:
    • 在 onSubscribe() 回调方法的参数中获取;
    • 某些重载的订阅方法 subscribe() 返回值是 Disposable 对象:
      public final Disposable subscribe(){}
      public final Disposable subscribe(Consumer<?super T> onNext){}
      public final Disposable subscribe(Consumer<?super T> onNext, Consumer<? super Throwable> onError){} 
      public final Disposable subscribe(Consumer<?super T> onNext, Consumer<? super Throwable> onError, Action onComplete){}
      public final Disposable subscribe(Consumer<?super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe){}
      public final void subscribe(Observer<?super T> observer){}
      

Flowable、Subscriber

// 1. 创建上游
Flowable flowable = Flowable.create(new FlowableOnSubscribe() {
    @Override
    public void subscribe(FlowableEmitter emitter)throws Exception {
        emitter.onNext(1);
        emitter.onComplete();
        // emitter.onError();
    }
}, BackpressureStrategy.ERROR);

// 2. 创建下游
Subscriber subscriber = new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s){
        Log.d(TAG, "onSubscribe: ");
        s.request(1);
    }

    @Override
    public void onNext(Integer integer){
        Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t){
        Log.d(TAG, "onError: ");
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: ");
    }
};

// 3. 建立连接(订阅)
flowable.subscribe(subscriber);

BackpressureStrategy

  • 表示背压策略,有以下几种:
    MissingBackpressureException
    

Subscription

  • 和 Disposable 类似,调用 cancel() 表示请求停止发送事件,可切断上下游的连接
  • 必须显示调用 request(long n)
    ,表示下游可以处理 n 个事件

FlowableEmitter

  • 继承自 Emitter,即 onNext()、onError()、onComplete()
  • requested():表示下游的处理能力,即下游 s.request() 的大小
  • isCancelled():下游是否请求停止发送,即 s.cancel()

Single、SingleObserver

  • 上游可以发送多个事件,但是下游只能接收到一个事件
  • onSuccess() 和 onError() 是互斥
    // 1. 创建上游
    Single<Integer> single = Single.create(new SingleOnSubscribe<Integer>() {
        @Override
        public void subscribe(SingleEmitter<Integer> e)throws Exception {
            for (int i = 0; i < 3; i++) {
                e.onSuccess(i);
                Log.d(TAG, "subscribe: " + i);
            }
        }
    });
    
    // 2. 创建下游
    SingleObserver<Integer> observer = new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable d){
            Log.d(TAG, "onSubscribe: ");
        }
    
        @Override
        public void onSuccess(Integer integer){
            // 相当于 onNext() 和 onComplete()
            Log.d(TAG, "onSuccess: " + integer);
        }
    
        @Override
        public void onError(Throwable e){
            Log.d(TAG, "onError: ");
        }
    };
    
    // 3. 建立连接(订阅)
    single.subscribe(observer);
    
    // 4. 打印结果:
    onSubscribe: 
    onSuccess: 0
    subscribe: 0
    subscribe: 1
    subscribe: 2
    

SingleEmitter

  • 是个接口,onSuccess()、onError()

Completable、CompletableObserver

// 1. 创建上游
Completable completable = Completable.create(new CompletableOnSubscribe() {
    @Override
    public void subscribe(CompletableEmitter e)throws Exception {
        for (int i = 0; i < 3; i++) {
            e.onComplete();
            Log.d(TAG, "subscribe: " + i);
        }
    }
});

// 2. 创建下游
CompletableObserver observer = new CompletableObserver() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: ");
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: ");
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: " + e.getMessage());
    }
};

// 3. 建立连接(订阅)
completable.subscribe(observer);

// 4. 打印结果:
onSubscribe: 
onComplete:
subscribe: 0
subscribe: 1
subscribe: 2
  • 上游可以发送多个事件,但是下游只能接收到一个事件
  • onComplete() 和 onError() 是互斥

Maybe、MaybeObserver

// 1. 创建上游
Maybe<Integer> maybe = Maybe.create(new MaybeOnSubscribe<Integer>() {
    @Override
    public void subscribe(MaybeEmitter<Integer> e)throws Exception {
        for (int i = 0; i < 3; i++) {
            e.onSuccess(i);
            // 不会起作用
            e.onComplete();
        }
    }
});

// 2. 创建下游
MaybeObserver<Integer> observer = new MaybeObserver<Integer>() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: ");
    }

    @Override
    public void onSuccess(Integer integer){
        Log.d(TAG, "onSuccess: " + integer);
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: ");
    }
};

// 3. 建立连接(订阅)
maybe.subscribe(observer);

// 4. 打印结果:
onSubscribe: 
onSuccess: 0
  • 类似于 Single 和 Completable 的混合体,onSuccess()、onComplete()、onError() 三者互斥

线程调度

subscribeOn()

  • 表示上游发送事件的线程
  • 有多个 subscribeOn() 时,上游只会在第一个 subscribeOn() 表示的线程发送事件

observeOn()

  • 表示下游接收事件的线程
  • 有多个 observeOn() 时,下游只会在最后一个 observeOn() 表示的线程接收事件

Scheduler

  • Schedulers.immediate()
    : 默认的 Scheduler
    ,直接在当前线程运行。
  • Schedulers.newThread()
    : 总是启用新线程,并在新线程执行操作。
  • Schedulers.io()
    : I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
    。行为模式和 newThread()
    差不多,区别在于 io()
    的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()
    newThread()
    更有效率。不要把计算工作放在 io()
    中,可以避免创建不必要的线程。
  • Schedulers.computation()
    : 计算所使用的 Scheduler
    。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler
    使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation()
    中,否则 I/O 操作的等待时间会浪费 CPU。
  • AndroidSchedulers.mainThread()
    : Android 专用的,它指定的操作将在 Android 主线程运行。

背压策略

ERROR

  • error:当下游没有请求数据时,上游最多只能发送128个事件,多于 128 时将会调用 onError() 抛出 MissingBackpressureException 异常;当上下游流速均衡(即上游发送数据和下游处理数据的速度相同)时,上游可以发送无限数据,不会出现 OOM
  • 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 ErrorAsyncEmitter 的父类 NoOverflowBaseAsyncEmitter 的 onNext()
    方法中完成的:如果 AtomicLong 不等于 0 就发送一个数据并且 AtomicLong 减 1,前 128 个数据都会到达下游的缓存队列中进行缓存;当 AtomicLong 为 0 时不再发送数据到下游的缓存队列,而是调用 ErrorAsyncEmitter 的实现方法 onOverflow()
    onOverflow()
    方法里面调用 onError()
    方法抛出 MissingBackpressureException 异常
  • 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的 runAsync()
    方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++
    ,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;
    ,然后再请求上游发送 96 个数据到下游的缓存队列(此时如果上游继续发送数据(e.onNext(i)),由于 AtomicLong 大于 0 会继续发送数据到下游缓存队列,否则就不发送数据到下游),与此同时下游继续从缓存队列取数据发送出去,发送一个数据就 e++
    ,直到 while(e != r)
    不成立导致不再发送给外界。此时如果外界主动调用 s.request(n)
    请求数据将继续发送数据给外界
  • 上游发送完全部数据之前,如果上游发送过的所有数据比下游请求过的所有数据 >= 96
    时抛出 MissingBackpressureException 异常。因为每次下游都是请求 96 个数据,96 保存在上游的 AtomicLong 中,发送一个数据就减 1,当 AtomicLong 为 0 时就抛出 MissingBackpressureException 异常

BUFFER

  • buffer: 上游可以发送无限个数据,不会出现 MissingBackpressureException 异常,但是会 OOM
  • 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 BufferAsyncEmitter 的 onNext()
    方法中先用 queue.offer(t)
    保存发送过的所有数据,然后再调用 drain()
    方法完成数据发送:每发送一个数据就减 1,前 128 个数据都会到达下游的缓存队列中进行缓存,当 AtomicLong 为 0 时不再发送数据到下游的缓存队列
  • 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的 runAsync()
    方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++
    ,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;
    ,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就 e++
    ,这时发送了 4 个数据后 while(e != r)
    不成立导致不再发送给外界
  • 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的 request()
    方法设置 AtomicLong 的值为 96,再去调用 BufferAsyncEmitter 实现的 onRequested()
    方法, onRequested()
    中再调用 drain()
    方法完成数据的发送。 drain()
    方法会从 queue 中取出未发送过的数据发送给下游的缓存队列,然后下游再从缓存队列中取出数据发送给外界
  • 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的 request()
    方法中去调用空方法 onRequested()
    ,而 onNext()
    方法不再被调用导致不再发送数据到下游缓存队列,仅仅是从下游缓存队列中取出数据发送给外界

DROP

  • drop: 上游可发送无限个数据
  • 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 DropAsyncEmitter 的父类 NoOverflowBaseAsyncEmitter 的 onNext()
    方法中完成的:如果 AtomicLong 不等于 0 就发送一个数据并且 AtomicLong 减 1,前 128 个数据都会到达下游的缓存队列中进行缓存;当 AtomicLong 为 0 时不再发送数据到下游的缓存队列,而是调用 onOverflow()
    onOverflow()
    是个空方法,也就是丢弃数据
  • 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的 runAsync()
    方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++
    ,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;
    ,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就 e++
    ,这时发送了 4 个数据后 while(e != r)
    不成立导致不再发送给外界
  • 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的 request()
    方法中去调用空方法 onRequested()
    ,而 onNext()
    方法不再被调用导致不再发送数据到下游缓存队列,仅仅是从下游缓存队列中取出数据发送给外界

LATEST

  • latest: 上游可以发送无限个数据
  • 当下游没有请求数据时,上游用 AtomicLong 记录需要发送给下游的数据个数,默认是 128 个;发送数据是在 LatestAsyncEmitter 的 onNext()
    方法中先用 queue(AtomicReference对象) 保存当前发送的数据, 所以发送完所有数据后 queue 保存的是最后一个数据
    ,然后再调用 drain()
    方法完成数据发送:每发送一个数据就减 1,前 128 个数据都会到达下游的缓存队列中进行缓存,当 AtomicLong 为 0 时不再发送数据到下游的缓存队列
  • 当下游请求数据时(s.request(100)),请求的个数保存在 requested 变量中;请求的数据最终是在 FlowableObserveOn 的子类 ObserveOnSubscriber 的 runAsync()
    方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++
    ,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;
    ,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就 e++
    ,这时发送了 4 个数据后 while(e != r)
    不成立导致不再发送给外界
  • 当上游发送完全部数据下游再请求数据时,最终会到达 BaseEmitter 的 request()
    方法中去调用 LatestAsyncEmitter 实现的 onRequested()
    方法, onRequested()
    中再调用 drain()
    方法完成数据的发送。 drain()
    方法会把 queue 中保存的最后一个数据发送给下游的缓存队列,然后下游再从缓存队列中取出数据发送给外界,所以 LATEST 策略总是可以请求到上游的最后一个数据

MISSING

  • missing: 上游没有背压策略,需要下游通过背压操作符( onBackpressureBuffer()
    onBackpressureDrop()
    onBackpressureLatest()
    )来指定背压策略
  • 当下游没有指定背压策略时会抛出 MissingBackpressureException 异常

创建操作符

just

  • 最多只能发送 10 个数据,最后发送 onComplete
  • 当发送数据超过 2 个时,内部调用 fromArray()
Observable.just(1, 2).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: ");
    }

    @Override
    public void onNext(Integer integer){
        Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: ");
    }
});

// 打印结果:
onSubscribe: 
onNext: 1
onNext: 2
onComplete:

fromArray

fromArray(T... items)
  • 如果参数是个空数组,直接调用 empty() 创建符;如果只有一个元素,则调用 just()
    public static <T> Observable<T>fromArray(T... items){
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        } else
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
    
int[] ints = new int[]{1, 2};

Observable.fromArray(1, 2).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: ");
    }

    @Override
    public void onNext(Integer integer){
        // 如果传入的是数组,此处的参数也是数组
        Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: ");
    }
});

// 打印结果:
onSubscribe: 
onNext: 1
onNext: 2
onComplete:

empty

  • 只发送 onComplete
Observable.empty().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: ");
    }

    @Override
    public void onNext(Object o){
        Log.d(TAG, "onNext: " + o);
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: ");
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: ");
    }
});

// 打印结果:
onSubscribe: 
onComplete:

fromIterable

List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
        
Observable.fromIterable(list).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: ");
    }

    @Override
    public void onNext(Integer integer){
        Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: ");
    }
});

// 输出结果:
onSubscribe: 
onNext: 1, Thread[main,5,main]
onNext: 2, Thread[main,5,main]
onComplete:

timer

timer(long delay, TimeUnit unit)
timer(long delay, TimeUnit unit, Scheduler scheduler)
  • 延迟 delay 发送一个 0 和 onComplete
  • 默认在子线程发送事件,可指定发送事件所在的线程
Observable.timer(1, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: ");
    }

    @Override
    public void onNext(Long aLong){
        Log.d(TAG, "onNext: " + aLong);
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: ");
    }
});

// 输入结果:
onSubscribe: Thread[main,5,main]
onNext: 0, Thread[RxComputationThreadPool-1,5,main]
onComplete: Thread[RxComputationThreadPool-1,5,main]

interval

interval(long period, TimeUnit unit)    // 每隔 period 发送一次 onNext() 事件
interval(long period, TimeUnit unit, Scheduler scheduler)   // 每隔 period 发送一次 onNext() 事件,可指定发送事件所在的线程
interval(long initialDelay, long period, TimeUnit unit) // 一开始延迟 initialDelay,后面每隔 period 发送一次 onNext() 事件
interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)    // 一开始延迟 initialDelay,后面每隔 period 发送一次 onNext() 事件,可指定发送事件所在的线程
  • 默认在子线程发送事件,可通过参数指定发送事件所在的线程
  • 从 0 开始无限制的发送 onNext 事件
Observable.interval(1, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: " + Thread.currentThread());
    }

    @Override
    public void onNext(Long aLong){
        Log.d(TAG, "onNext: " + aLong + ", " + Thread.currentThread());
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: " + Thread.currentThread());
    }
});

intervalRange

intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
  • 一开始延迟 initialDelay,然后从 start 开始每隔 period 发送一次 onNext,一共发送 count 个事件,可指定发送事件所在的线程
  • 默认是在子线程发送事件,发送了 count 个 onNext 后会发送 onComplete
Observable.intervalRange(3, 3, 0, 1, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: " + Thread.currentThread());
    }

    @Override
    public void onNext(Long aLong){
        Log.d(TAG, "onNext: " + aLong + ", " + Thread.currentThread());
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: " + Thread.currentThread());
    }
});

// 输出结果:
onSubscribe: Thread[main,5,main]
onNext: 3, Thread[RxComputationThreadPool-1,5,main]
onNext: 4, Thread[RxComputationThreadPool-1,5,main]
onNext: 5, Thread[RxComputationThreadPool-1,5,main]
onComplete: Thread[RxComputationThreadPool-1,5,main]

range、rangeLong

range(final int start, final int count)
rangeLong(long start, long count)
  • 从 start 开始发送 count 个 onNext,最后发送 onComplete
Observable.range(1, 3).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d){
        Log.d(TAG, "onSubscribe: " + Thread.currentThread());
    }

    @Override
    public void onNext(Integer integer){
        Log.d(TAG, "onNext: " + integer + ", " + Thread.currentThread());
    }

    @Override
    public void onError(Throwable e){
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete(){
        Log.d(TAG, "onComplete: " + Thread.currentThread());
    }
});

// 输出结果:
onSubscribe: Thread[main,5,main]
onNext: 1, Thread[main,5,main]
onNext: 2, Thread[main,5,main]
onNext: 3, Thread[main,5,main]
onComplete: Thread[main,5,main]

zip

  • zip:将多个 Observable 发送的事件组合起来,然后再发送这个新的事件
  • 严格按照发送事件的顺序来组合新的事件
  • 下游收到的事件数量和上游发送最少的事件相同,即 observable1 发送 1 个事件,observable2 发送 2 个事件,下游会收到 1 个事件
    // observable1
    Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {    
        @Override           
        public void subscribe(ObservableEmitter<Integer> emitter)throws Exception {       
            emitter.onNext(i);                                
        }                  
    }).subscribeOn(Schedulers.io());
    
    // observable2
    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {      
        @Override                 
        public void subscribe(ObservableEmitter<String> emitter)throws Exception {        
            emitter.onNext("A");               
        }           
    }).subscribeOn(Schedulers.io());
    
    // zip
    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                 
        @Override       
        public String apply(Integer integer, String s)throws Exception {                  
            return integer + s;                     
        }                                                                                  
    }).observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<String>() {                               
        @Override                        
        public void accept(String s)throws Exception { 
            Log.d(TAG, s);                   
        }                                                                                  
    }, new Consumer<Throwable>() {                   
        @Override                          
        public void accept(Throwable throwable)throws Exception {   
            Log.w(TAG, throwable);                      
        }                        
    });
    

sample

  • sample:每隔一定的时间从上游取一个事件发送给下游
    // 每隔 2 秒从上游取出一个事件发送给下游
    Observable.create(...).sample(2, TimeUnit.SECONDS)
    

filter

  • filter:过滤事件,符合条件的才发送到下游
    Observable.create(...).filter(new Predicate<Object>() {
        @Override
        public boolean test(Object o)throws Exception {
            // 返回 true 才继续往下走
            return ...;
        }
    })
    

take

take(long count)    // 发送 count 个事件给下游
take(long time, TimeUnit unit)  // 发送多久
take(long time, TimeUnit unit, Scheduler scheduler) // 指定线程发送多久
// 发送 3 个事件
Flowable.interval(1, TimeUnit.SECONDS)
        .take(3)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer)throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        });
        
// 发送 300 毫秒
Flowable.interval(1, TimeUnit.SECONDS)
        .take(3000, TimeUnit.MILLISECONDS)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer)throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        });

原文 

https://ljuns.itscoder.com/2018/06/27/RxJava-简介/

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » RxJava 2.x 学习笔记

分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址