public class Biz {
public static Single<String> method() {
RespTransformer<String> transformer = RespTransform.newInstance();
return ApiGenerator.createApi(Api.class)
.method()
.compose(transformer)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
}
public class presenter {
public void method() {
Biz.method()
.subscribe(new Consumer<List<Resp>>() {
@Override
public void accept(List<Resp> resps) {
...
}
}, new BaseRespThrowableObserver() {
@Override
public void onV2Error(String code, String errorMsg) {
...
}
}));
}
}
public class ApiGenerator {
public static <S> createApi(Class<S> apiClass) {
// serviceCreator()返回Retrofit实例;
return NetManager.getInstance().serviceCreator().create(apiClass);
}
}
复制代码
上面就是Retrofit的调用流程, 下面根据这个调用流程来进行分析, Retrofit是如何完成一次网络请求的,
@SuppressWarnings("unchecked") // Single-interface proxy creation guarded by parameter safety.
public <T> T create(final Class<T> service) {
validateServiceInterface(service);
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
private final Object[] emptyArgs = new Object[0];
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
}
});
}
复制代码
retrofit使用到 动态代理
的技术, 使得不同的接口调用, 最终都走到这里, 进行网络请求.
Retrofit.loadServiceMethod
--->ServiceMethod.parseAnnotations
--->HttpServiceMethod.parseAnnotations
复制代码
static <ResponseT, ReturnT> HttpServiceMethod<ResponseT, ReturnT> parseAnnotations(
Retrofit retrofit, Method method, RequestFactory requestFactory) {
boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction;
boolean continuationWantsResponse = false;
boolean continuationBodyNullable = false;
Annotation[] annotations = method.getAnnotations();
Type adapterType;
adapterType = method.getGenericReturnType();
// 1.获取adapter;
CallAdapter<ResponseT, ReturnT> callAdapter = createCallAdapter(retrofit, method, adapterType, annotations);
Type responseType = callAdapter.responseType();
// 2.获取convert;
Converter<ResponseBody, ResponseT> responseConverter = createResponseConverter(retrofit, method, responseType);
okhttp3.Call.Factory callFactory = retrofit.callFactory;
// 3.构建CallAdapter;
return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter);
}
复制代码
retrofit的好处, 支持自定义convert与adapter, 默认okhttp只是单纯的网络请求, 与rxjava并没有关系, 但是通过Retrofit可以实现okhttp与rxjava的结合. 贴出来一段代码retrofit如何将okhttp与rxjava结合
public void initRetrofit() {
Retrofit retrofit = new Retrofit.Builder()
.addConverterFactory(GsonConverterFactory.create(/*自定义Gson*/))
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}
public final class RxJava2CallAdapterFactory extends CallAdapter.Factory {
public static RxJava2CallAdapterFactory create() {
return new RxJava2CallAdapterFactory(null, false);
}
}
复制代码
class HttpServiceMethod::createCallAdapter
private static <ResponseT, ReturnT> CallAdapter<ResponseT, ReturnT> createCallAdapter(
Retrofit retrofit, Method method, Type returnType, Annotation[] annotations) {
return (CallAdapter<ResponseT, ReturnT>) retrofit.callAdapter(returnType, annotations);
}
class Retrofit::callAdapter
public CallAdapter<?, ?> nextCallAdapter(CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) {
Objects.requireNonNull(returnType, "returnType == null");
Objects.requireNonNull(annotations, "annotations == null");
int start = callAdapterFactories.indexOf(skipPast) + 1;
for (int i = start, count = callAdapterFactories.size(); i < count; i++) {
CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this);
if (adapter != null) {
return adapter;
}
}
}
class RxJava2CallAdapterFactory::get
@Override
public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
Class<?> rawType = getRawType(returnType);
if (rawType == Completable.class) {
// Completable is not parameterized (which is what the rest of this method deals with) so it
// can only be created with a single configuration.
return new RxJava2CallAdapter(Void.class, scheduler, isAsync, false, true, false, false,
false, true);
}
boolean isFlowable = rawType == Flowable.class;
boolean isSingle = rawType == Single.class;
boolean isMaybe = rawType == Maybe.class;
// ...
// 这里为okhttp与rxjava结合做了铺垫
return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable,
isSingle, isMaybe, false);
}
复制代码
class CallAdapted::invoke
@Override
final ReturnT invoke(Object[] args) {
Call<ResponseT> call = new OkHttpCall<>(requestFactory, args, callFactory, responseConverter);
return adapt(call, args);
}
@Override
protected ReturnT adapt(Call<ResponseT> call, Object[] args) {
// callAdapter指向的是初始化Retrofit中传入的RxJava2CallAdapter
return callAdapter.adapt(call);
}
复制代码
2.1模块loadServiceMethod
返回CallAdapted对象, 当有网络请求时, 触发CallAdapted.invoke方法的执行 @Override
public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return RxJavaPlugins.onAssembly(observable);
}
复制代码
retrofit端作为网络请求的入口, 将请求权和结果的处理权都交给了rxjava, 实现了rxjava与okhttp的结合.
结合rxjava的源码可知, subsrcibe()会触发CallExecuteObservable.subscribeActual的执行
@Override
protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
//...
Response<T> response = call.execute();
//...
}
复制代码
@Override
public Response<T> execute() throws IOException {
okhttp3.Call call;
synchronized (this) {
executed = true;
// call = RealCall;
call = getRawCall();
}
return parseResponse(call.execute());
}
复制代码
override fun execute(): Response {
timeout.enter()
callStart()
// 流程省略, client指向OkHttpClient
client.dispatcher.executed(this)
// 网络请求的处理使用了责任链模式
return getResponseWithInterceptorChain()
}
复制代码
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
val response = chain.proceed(originalRequest)
return response
}
复制代码