转载

Android的三方库 - RxJava:RxJava的使用和基本订阅流程

GitHub关于RxJava的介绍:

a library for composing asynchronous and event-based programs by using observable sequences

他的意思就是 一个通过可观测的序列来组成异步和基于事件的库。

RxJava的出现消除同步问题、线程安全等问题

总的来说就是方便我们异步编程。

二:RxJava的优点和缺点

优点

异步

链式调用结构

使用复杂的异步调用方式的时候依旧可以保持简洁

缺点

学习成本比较高,入门的门槛比较高

难以理解的API,需要查看源码才能理解API的具体效果

三:RxJava的基础使用

首先明白他的基础使用步骤:

  1. 创建被观察者(Observable)
  2. 创建观察者(Observer)
  3. 订阅(subscribe)

1.创建被观察者

正常创建被观察者:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("ONE");
                emitter.onNext("TWO");
                emitter.onNext("THREE");
                emitter.onComplete();
            }
        });
复制代码

在这里面一共产生了四个事件:One、Two、Three、结束。

PS:

非正常创建第一弹:

Observable observable = Observable.just("ONE","TWO","THREE");

非正常创建第二弹:

String[] values = {"ONE", "TWO", "THREE"};
Observable observable = Observable.fromArray(values);
复制代码

其实这样的非正常创建是内部将这些信息包装成onNext()这样的事件发送给观察者。

2.创建观察者

正常创建:

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("z", "onSubscribe: ");
            }

            @Override
            public void onNext(String s) {
                Log.i("z", "onNext: s = " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("z", "onError: ");
            }

            @Override
            public void onComplete() {
                Log.i("z", "onComplete: ");
            }
        };
复制代码

非正常创建:

Consumer<String> observer = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("z", "accept: s = " + s);
            }
        };
        
复制代码

3.订阅

observable.subscribe(observer);

你已经注意到不一样的地方,为什么被观察者订阅了观察者?

之所以会这样,是因为RxJava为了保持链式调用的流畅性。

4. 异步调用

RxJava既然是异步库,当然对于异步的处理会更好

在我们看RxJava的异步调用之前,我们先来学习下其中比较重要的两个点

  • subscribeOn()
  • observeOn()

subscribeOn

这个表示Observable在一个指定的环境下创建,只能使用一次,多次创建的话会以第一次为准。

observeOn

表示 事件传递和 最终处理发生在哪个环境下,可以多次调用,每次指定之后,下一步就生效。

比如:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("ONE");
                emitter.onNext("TWO");
                emitter.onNext("THREE");
                emitter.onComplete();
            }
        }) // 被观察者在一个新的线程中创建
        .subscribeOn(Schedulers.newThread()) 
                // 下面这个操作是在io线程中
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s.toLowerCase();
                    }
                })
                  // 切换,观察者是在主线程中
                .observeOn(AndroidSchedulers.mainThread()) 
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        
                    }
                });

复制代码

四:RxJava的基础订阅流程

先看一下基础的调用方式:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.i(TAG, "subscribe: ");
                emitter.onNext("ONE");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe: ");
            }


            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: s = " + s);
            }


            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: e = " + e.getMessage());
            }


            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
            }
        });
    }
});
复制代码

结果:

onSubscribe:
subscribe:
onNext: s = ONE
复制代码

我们先从订阅开始看,也就是 subscribe 方法

public final void subscribe(Observer<? super T> observer) {
        ... // 忽略部分源码
        subscribeActual(observer);
        ... // 忽略部分源码
}
复制代码

直接找到主要的方法 subscribeActual(observer) ,这个是抽象的方法,会被实现在子类中。

所以我们接着看看 Observable 的子类实现:

我们进入到 create 方法中:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
复制代码

其实 返回的就是 ObservableCreate 的对象,

需要注意的是: ObservableCreateObservable 的一个子类 ObservableCreate 被创建都会传入一个 source 的字段,这个 source 就是 ObservableOnSubscribe

ObservableCreate 具体实现了 subscribeActual 方法

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 在这里触发 observer#onSubscribe()
    observer.onSubscribe(parent);

    try {
        // 在这里回调
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
复制代码

在这里方法可以看到 观察者 observeronSubscribe 会先于回调发生。

然后调用 ObservableOnSubscribe 的方法 subscribe

具体的事件后由开发者去做, 可以看到在案例中调用了 CreateEmitter ,可以进入到 CreateEmitter 看看 onNext() 的实现

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
复制代码

可以看到 在 CreateEmitteronNext() 中调用了 观察者 observeronNext() 方法.

然后可以看到案例中的调用:

@Override
public void onNext(String s) {
   Log.i(TAG, "onNext: s = " + s);
 }
复制代码
原文  https://juejin.im/post/5e1c66e35188252c6e181c73
正文到此结束
Loading...