RxJava 已经成为了 Android 开发必备的技能,鉴于大部分敏捷开发团队,掌握 RxJava 和 Retroft 快速开发网络框架,能大幅度减少网络框架重构时间。
有大量文章教程在写 RxJava 如何使用,如扔物线的RxJava详解,还有南尘的RxJava2.x 教程,那我为什么写这多余的文章呢?
本文不是一个指导如何使用 RxJava 特性和基本的探索,而是从更高的角度上挖掘它,去了解代码库是怎么做的,内部工作原理又是怎样的,相比其他网络请求框架有什么优势?
RxJava 在 GitHub 主页上的简介是
"Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM."
翻译过来就是:活性扩展JVM库编写异步和基于事件的程序使用Java VM可观察序列。
很费解的一句话,你可以这么理解,压缩版能优简化代码的异步请求库。
RxJava , Retrofit , Rxandroid 相关依赖 implementation 'com.squareup.retrofit2:retrofit:2.5.0'
implementation 'com.squareup.retrofit2:converter-gson:2.5.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
implementation 'io.reactivex.rxjava2:rxjava:2.2.8'
复制代码
<uses-permission android:name="android.permission.INTERNET"/> 复制代码
访问 小木箱 github 仓库,通过get请求得到了以下报文:
然后,通过 Gsonformat 得到相关实体类对象:
class MicroKibacoRepo {
private int id;
private String node_id;
private String name;
private String full_name;
// ---为了避免浪费篇幅,省略无用代码---
}
复制代码
Single interface 作为 Web Service 的请求集合,在⾥⾯⽤注解( Annotation )写⼊需要配置的请求方法 public interface Api {
@GET("users/{username}/repos")
Single<List<MicroKibacoRepo>> listRepos(@Path("username") String user);
}
复制代码
Retrofit 创建出 interface 的实例 Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.github.com/")
.addConverterFactory(GsonConverterFactory.create(gson))
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
Api api = retrofit.create(Api.class);
复制代码
observeOn , observeOn 和 subscribe 等订阅事件切换线程达到网络请求效果。 api.listRepos("MicroKibaco")
.subscribeOn(Schedulers.io()) // 切换网络请求
.observeOn(AndroidSchedulers.mainThread()) // 切换到UI线程
.subscribe(new SingleObserver<List<MicroKibacoRepo>>() {
@Override
public void onSubscribe(Disposable d) {
// 订阅者模式,我向你订阅事件
mDisposable = d;
}
@Override
public void onSuccess(List<MicroKibacoRepo> microKibacoRepos) {
text_view.setText(microKibacoRepos.get(0).getName());
}
@Override
public void onError(Throwable e) {
String msg = e.getMessage();
if (msg == null) {
msg = e.getClass().getName();
}
text_view.setText(msg);
}
});
复制代码
一个简单的 RxJava 网络处理库就做好了,下面我们来分析一下,具体 Api 的底层实现: RxJava 的整体结构是一条链,其中:
Observable 。 Observer 。 Observable ,又是上游的 Observer 。 我们访问 Single 的 just 方法发现: 里面有一个关键的钩子方法: onAssembly 。
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "item is null");
// 钩子方法
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
// 查询静态对象,是否存在
Function<? super Single, ? extends Single> f = onSingleAssembly;
// 如果存在,额外处理
if (f != null) {
return apply(f, source);
}
// 如果不存在,直接返回
return source;
}
复制代码
整个 Single.just 内部其实创建了一个SingleJust, SingleJust 里面有一个关键方法: subscribeActual 而 subscribeActual 有一个比较重要的抽象方法 onSubscribe 和 onSuccess 。
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
}
复制代码
整个方法做的事情就是优先执行 onSubscribe ,后执行 onSuccess 。
// just 方法创建了一个上层被观察的对象
Single<String> single = Single.just("1");
single = single.subscribeOn(Schedulers.io());
single = single.subscribeOn(AndroidSchedulers.mainThread());
// subscribe 把观察者传进来
single.subscribe(new SingleObserver<String>() {
// subscribe 内部会调用 subscribeActual
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
text_view.setText(s);
}
@Override
public void onError(Throwable e) {
String msg = e.getMessage();
if (msg == null) {
msg = e.getClass().getName();
}
text_view.setText(msg);
}
});
复制代码
而 Disposable 的 disposed 方法其实是:在我不需要订阅关系的时候,我们切断这种关系。
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
复制代码
可以通过 dispose() 方法来让上游停止工作,达到 『丢弃』 的效果
原理: 在 Scheduler 指定的线程里启动 subscribe()
原理: 在内部创建的 Observer 的 onNext() onError() onSuccess() 等回调方法里面,通过 Scheduler 指定的线程 来调用下级 Observer 的 对应回调方法
Schedulers.newThread() 和 Schedulers.io():
AndroidSchedulers.mainThread():
通过内部的 Handle 把任务放到主线程去做