转载

重学Android——OkHttp3源码解读

官网的介绍——An HTTP & HTTP/2 client for Android and Java applications。

它的优点:

  1. 支持http2,对一台机器的所有请求共享同一个socket
  2. 支持连接池,支持连接复用,减少延迟
  3. 支持透明gzip压缩响应体
  4. 通过缓存避免重复的请求
  5. 请求失败时自动重试主机的其他ip,自动重定向
  6. API调用方便

OkHttp的使用

implementation 'com.squareup.okhttp3:okhttp:3.14.2'
复制代码
OkHttpClient okHttpClient = new OkHttpClient();
    okHttpClient.newCall(
        new Request.Builder()
        .url("https://api.github.com/users/apkcore")
        .build())
        //enqueue是异步执行
        .enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                Log.d(TAG, "onFailure: failure");
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                Log.d(TAG, "onResponse: success");
            }
        });
复制代码

它的使用是非常简单的,我们在Android中一般使用enqueue执行异步请求。

现在我们看它的源码实现

OkHttp源码分析

初构请求

先看newCall的源码

@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }
复制代码

可以看到,直接调用了realCall的方法,说明RealCall是Call的实现,继续看RealCall.newRealCall的源码。

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  // Safely publish the Call instance to the EventListener.
  //利用传递过来的参数,new一个RealCall
  RealCall call = new RealCall(client, originalRequest, forWebSocket);
  call.transmitter = new Transmitter(client, call);
  return call;
}
复制代码

可以看到,先调用RealCall的构造方法,其中第三个参数是否为webSocket(可以服务端主动向客户端推送)。

然后new Transmitter这个对象,放入call中,它的构造源码如下

/**
 * Bridge between OkHttp's application and network layers. This class exposes high-level application layer primitives: connections, requests, responses, and streams.
 * ...
 */
public final class Transmitter {

	public Transmitter(OkHttpClient client, Call call) {
    this.client = client;
    this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
    this.call = call;
      //主要是这句,连接的状态的时间点记录
    this.eventListener = client.eventListenerFactory().create(call);
    this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
  }
}
复制代码

可以看它的类注释可以知道,这是OkHttp的应用层和网络层之间的桥梁。中文又叫发射器,这个类执行连接,请求,响应和流的真正操作。

然后,我们继续看.enqueue方法的源码,直接点的话进入的是Call接口,刚刚我们已经发现了,RealCall才是Call的实现类,所以,在RealCall中找到.enqueue方法

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
        //每个请求只能执行一次
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
      //
    transmitter.callStart();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
复制代码

可以看到,每个Call只执行一次,否则会抛异常,这里先transmitter.callStart方法,绑定call,开始通知监听器组件网络请求开始了, 主要是通知 EventListener

然后到了关键代码 client.dispatcher().enqueue(new AsyncCall(responseCallback)); ,我们分开来阅读它的源码。

先看外层client.dispatcher()

final Dispatcher dispatcher;

	public Dispatcher dispatcher() {
    return dispatcher;
  }
复制代码

这个Dispatcher类的源码如下:

/**
 * Policy on when async requests are executed.
 *
 * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
 * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
 * of calls concurrently.
 */
public final class Dispatcher {
    //控制最大请求并发数
    private int maxRequests = 64;
    //单个主机最大并发数
    private int maxRequestsPerHost = 5;
    private @Nullable Runnable idleCallback;

    /** Executes calls. Created lazily. */
    //线程池
    private @Nullable ExecutorService executorService;

    /** Ready async calls in the order they'll be run. */
    //已经准备好异步执行的队列
    private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

    /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
    //正在执行的异步队列
    private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

    /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
    //正在执行的同步队列
    private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    ...
}
复制代码

可以知道, Dispatcher 本质上是异步请求的调度器,负责调度异步请求的执行,控制最大请求并发数和单个主机的最大并发数,并持有一个线程池来负面异步请求,对同步只做统计操作。

再看enqueue

void enqueue(AsyncCall call) {
        synchronized (this) {
            //先把AsyncCall加到准备队列中
            readyAsyncCalls.add(call);

            // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
            // the same host.
            //更改AsyncCall使其能共享同一主机
            if (!call.get().forWebSocket) {
                AsyncCall existingCall = findExistingCallWithHost(call.host());
                if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
            }
        }
        //关键调用
        promoteAndExecute();
    }
复制代码

通过分析,我们知道,enqueue先把AsycCall加入到准备队列中,然后调用promoteAndExecute,其实看方法名我们也知道,这是执行代码

private boolean promoteAndExecute() {
        assert (!Thread.holdsLock(this));

        List<AsyncCall> executableCalls = new ArrayList<>();
        boolean isRunning;
        synchronized (this) {
            for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
                AsyncCall asyncCall = i.next();

                //如果超出最大同步线程,当然这任务就老实呆在准备队列里
                if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
                //同理,超过单一主机连接数,继续遍历下一个asyncCall
                if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

                i.remove();
                asyncCall.callsPerHost().incrementAndGet();
                //添加进executableCalls
                executableCalls.add(asyncCall);
                //添加到运行队列
                runningAsyncCalls.add(asyncCall);
            }
            isRunning = runningCallsCount() > 0;
        }

        for (int i = 0, size = executableCalls.size(); i < size; i++) {
            AsyncCall asyncCall = executableCalls.get(i);
            //执行线程任务
            asyncCall.executeOn(executorService());
        }

        return isRunning;
    }
复制代码

最后,我们看一下asyncCall.executeOn(executorService());这个代码里面的调用

//创建一个线程池
	public synchronized ExecutorService executorService() {
        if (executorService == null) {
            executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                                                     new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
    }
复制代码

AsyncCall

传入进入的参数AsyncCall

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;
    private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
      ...
  }
复制代码

可以看到,AsyncCall是一个runnable类,由enqueue的源码分析我们可以知道,最后调用了 asyncCall.executeOn 方法

void executeOn(ExecutorService executorService) {
        ...
        try {
            //执行线程池
            executorService.execute(this);
            success = true;
        } catch (RejectedExecutionException e) {
            ...
        } finally {
            if (!success) {
                client.dispatcher().finished(this); // This call is no longer running!
            }
        }
    }
复制代码

可以看到,线程池执行时,就会调用AsyncCall的run方法,我们去找它的run方法,发现实现在父类

public abstract class NamedRunnable implements Runnable {
  ...
  @Override public final void run() {
    ...
      execute();
    ...
  }
    ...
}
复制代码

可以看到,就是调用了execute方法,这个方法在AsyncCall中有重写

@Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();//1
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
复制代码

可以看到,真正的Respone是通过getResponseWithInterceptorChain这个方法获取的,其他的代码只是控件与回调,这也是OkHttp的最精彩的设计Interceptor(拦截器)与Chain(调用链)

okhttp中execute同步执行

上面我们分析了enqueue异步执行的源码,其实同步执行的源码也差不多

@Override public Response execute() throws IOException {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        transmitter.timeoutEnter();
        transmitter.callStart();
        try {
            //先调用Dispatcher有executed方法
            client.dispatcher().executed(this);
            //然后调用getResponseWithInterceptorChain方法
            return getResponseWithInterceptorChain方法();
        } finally {
            client.dispatcher().finished(this);
        }
    }
复制代码

dispatcher的executed方法只是一个把任务添加到运行同步队列的方法

synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
复制代码

OkHttp中的属性

这里对我们阅读源码没什么影响,不过可以扩展我们对网络的一些知识的了解。我们回到最开始,看 OkHttpClient 的源码中的属性

final Dispatcher dispatcher;//线程调度器,负责异步请求的执行,会平衡性能,对同步操作只做统计
  final @Nullable Proxy proxy;//可配置的代理,比如翻墙访问
  final List<Protocol> protocols;//给服务端列出的所支持的协议的版本
  final List<ConnectionSpec> connectionSpecs;//配置的是使用的Http还是Https,以及可支持的tls等
  final List<Interceptor> interceptors;//拦截器
  final List<Interceptor> networkInterceptors;//
  final EventListener.Factory eventListenerFactory;//创建EventListener
  final ProxySelector proxySelector;//设置代理的用户名密码
  final CookieJar cookieJar;//cookie存储器
  final @Nullable Cache cache;//缓存
  final @Nullable InternalCache internalCache;//内部缓存接口
  final SocketFactory socketFactory;//创建tcp连接端口
  final SSLSocketFactory sslSocketFactory;//ssl的factory
  final CertificateChainCleaner certificateChainCleaner;//整理证书链
  final HostnameVerifier hostnameVerifier;//主机名验证器,给HTTPs用的
  final CertificatePinner certificatePinner;//用来验证自定义证书的
  final Authenticator proxyAuthenticator;//用于监听代理权限不足的回调
  final Authenticator authenticator;//用于监听权限不足的回调
  final ConnectionPool connectionPool;//连接池。带缓存的集合
  final Dns dns;//用于设置dns,根据域名拿到ip列表
  final boolean followSslRedirects;//http与https互跳时
  final boolean followRedirects;//有重定向时要不要跳转
  final boolean retryOnConnectionFailure;//请求失败是否重连
  final int callTimeout;//call的超时时间
  final int connectTimeout;//tcp超时时间
  final int readTimeout;//下载超时时间
  final int writeTimeout;//写入请求的超时时间
  final int pingInterval;//心跳包ping的间隔
复制代码

小结

可以看到,无论是execute还是enqueue,都是调用了Disapatch的的对应execute与enqueue方法,真正的response都是通过 getResponseWithInterceptorChain 来获取的,区别就是execute的执行是在dispatch中只做一个同步统计,而enqueue方法会在dispatch中根据当前并发数选择是否执行队列中等待的AsyncCall。

拦截器与调用链

上面分析到了,网络请求的入口实际上就是在RealCall的 getResponseWithInterceptorChain 这个方法

Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        //创建完整的拦截器集合
        List<Interceptor> interceptors = new ArrayList<>();
        //用户定义的一系列拦截器,如日志等
        interceptors.addAll(client.interceptors());
        //负责失败重试和重连的拦截器
        interceptors.add(new RetryAndFollowUpInterceptor(client));
        //把用户构造的请求转化为发送到服务器的请求,把服务器返回的结果转化为用户友好的结果
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        //负责缓存策略的拦截器
        interceptors.add(new CacheInterceptor(client.internalCache()));
        //负责与服务器连接的拦截器
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
            interceptors.addAll(client.networkInterceptors());
        }
        //负责向服务器发送数据和从服务器获取数据的拦截器
        interceptors.add(new CallServerInterceptor(forWebSocket));

        //把这个集合转换成链
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,originalRequest, this, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());

        boolean calledNoMoreExchanges = false;
        try {
            //关键执行代码
            Response response = chain.proceed(originalRequest);
            ...
            return response;
        } catch (IOException e) {
            ...
        } finally {
           ...
        }
    }
复制代码

从源码中可以看出,它会先把所有的拦截器,转换成一条链RealInterceptorChain,再执行RealInterceptorChain中的proceed方法

public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
        throws IOException {
        if (index >= interceptors.size()) throw new AssertionError();

        ...
            // Call the next interceptor in the chain.
            //从下一位开始的拦截器链
            RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
                                                                 index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
        Interceptor interceptor = interceptors.get(index);
        //调用对应拦截器的intercept方法
        Response response = interceptor.intercept(next);

        ...
        return response;
    }
复制代码

而每一个interceptor.intercept的方法体里,都调用了realChain.proceed方法,这样chain与interceptor互相递归调用,直到世界(链)的尽头。

在这里其实是用到了责任链模式,比如,CacheInterceptor中

@Override 
    public Response intercept(Chain chain) throws IOException {
        Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            ...
            Request networkRequest = strategy.networkRequest;
        	...
            // If we don't need the network, we're done.
            // 如果不需要网络直接可以返回缓存结果
            if (networkRequest == null) {
                return cacheResponse.newBuilder()
                    .cacheResponse(stripBody(cacheResponse))
                    .build();
            }
        ...
    }
复制代码

在源码中,如果可以直接取缓存里的数据时,是直接return的,不需要走后面的网络请求流程,这也是责任链模式所带来的便利性。

小结

使用了责任链模式,每个拦截器完成的大致流程是

  1. 先经过用户拦截器
  2. RetryAndFollowUpInterceptor负责自动重试和进行必要的重定向
  3. BridgeIntercetro负责将用户Request转换成一个实际的网络请求的Request,再调用下层的拦截器获取Response,最后再将网络Reponse转换成用户的Response
  4. CacheInterceptor负责控制缓存
  5. ConnectInterceptor负责进行连接主机
  6. 网络拦截器进行拦截
  7. CallServerInterceptor是真正的和服务器通信,完成Http请求。

可以看到,其实如果仔细看ConnectInterceptor,CallServerInterceptor等的源码,我们能发现,okhttp在内部自己完整的实现了一套TCP、TLS/SSL连接建立,HTTP的报文传输,各种http特性的支持(重试,跳转,cache,cookie等)。有兴趣的盆友可以自己跟着源码阅读一下,参考地址 dieyidezui.com/okhttp-3-4-…

原文  https://juejin.im/post/5d010926e51d45508c2fb832
正文到此结束
Loading...