盤它,okhttp3吊炸天源碼分析(一):同步、異步請求源碼解析

本篇文章已授權微信公眾號guolin_blog(郭霖)獨家發(fā)布

okhttp一經(jīng)推出,讓其他的網(wǎng)絡請求框架變得黯然失色。網(wǎng)上關于okhttp的介紹文章一大堆,這里我就不繼續(xù)bb了,今天我嘗試從源碼的角度去分析一下okhttp的一個工作流程。其實在動筆寫這篇文章之前,我已經(jīng)在網(wǎng)上看過不少關于okhttp源碼分析的文章,怎么說呢,有些文章,一上來就是一個大總結或者一個UML圖,讓人看得晦澀難懂。對于大部分沒有閱讀過okhttp源碼的人來說,全盤托出的一上來一大堆接口啊類啊什么的,容易把人整暈。今天我嘗試用通俗易懂的語言來描述一下同步、異步的操作流程,以及okhttp背后到底干了啥。

先貼一下 okhttp的github地址:okhttp項目地址。最新的源碼是3.0+,而3.0與舊版本多少還是有些差別的,這里我就不介紹了,感興趣的話自行去百度。

同步請求

 //  構建okHttpClient,相當于請求的客戶端,Builder設計模式
 OkHttpClient okHttpClient = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build();
        // 構建一個請求體,同樣也是Builder設計模式
        Request request = new Request.Builder().url("http://www.baidu.com").build();
        //  生成一個Call對象,該對象是接口類型,后面會說
        Call call = okHttpClient.newCall(request);
        try {
            //  拿到Response
            Response response = call.execute();
            Log.i("TAG",response.body().string());
        } catch (IOException e) {
            
        }

以上就是一個簡單的同步請求示例代碼,我們來看一看要經(jīng)過哪些步驟:

1.通過Builder模式創(chuàng)建OkHttpClient對象和Request對象
2.調用OkHttpClient的newCall方法,獲取一個Call對象,參數(shù)是Request
3.調用execute方法獲取一個Respone

(一) OkHttpClient源碼分析

public static final class Builder {
    Dispatcher dispatcher;
    ...
    ...
    public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }
   
}
  

 public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

Builder是OkHttpClient一個靜態(tài)內部類,在Builder的構造函數(shù)中進行了一系列的初始化操作,其中Dispatcher中文是分發(fā)器的意思,和攔截器不同的是分發(fā)器不做事件處理,只做事件流向。他負責將每一次Requst進行分發(fā),壓棧到自己的線程池,并通過調用者自己不同的方式進行異步和同步處理,那具體是怎么操作的呢?后面會講。ConnectionPool是一個連接池對象,它可以用來管理連接對象,從它的構造方法中可以看到連接池的默認空閑連接數(shù)為5個,keepAlive時間為5分鐘。

(二) Request源碼分析

public static class Builder {
    @Nullable HttpUrl url;
    String method;
    Headers.Builder headers;
    @Nullable RequestBody body;

    public Builder() {
      this.method = "GET";
      this.headers = new Headers.Builder();
    }
}

Builder()構造函數(shù)中設置了默認的請求方法是GET方法,Request類的build()方法是用來創(chuàng)建一個Request對象,將當前的Builder對象傳進去,并完成了對象的賦值。我們來看一下Request的構造函數(shù):

Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tags = Util.immutableMap(builder.tags);
  }

將Builder類的相關屬性賦值給Request的相關屬性,這也是Builder模式的精髓。

(三) Call對象的創(chuàng)建:newCall()執(zhí)行分析

@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false);
  }

可以看到newCall方法內部調用了RealCall的newRealCall方法,Call是一個接口,RealCall是它的實現(xiàn)類,接下來我們去newRealCall方法內部看看

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

在newRealCall方法里,完成了RealCall對象的創(chuàng)建,并把它返回出去。至此,Call對象已經(jīng)創(chuàng)建完畢,實際上創(chuàng)建的對象是Call的實現(xiàn)類RealCall 對象。

(四) Response對象的創(chuàng)建: call.execute()執(zhí)行分析

前面已經(jīng)說到 Call call = okHttpClient.newCall(request)拿到的是一個RealCall對象,所以我們直接去RealCall類的execute()方法看它的源碼

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    timeout.enter();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      e = timeoutExit(e);
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }

在同步代碼塊中,首先判斷excuted是不是true,它的含義是否有在執(zhí)行,如果是,拋出異常,如果沒有執(zhí)行過,將excuted置為true。eventListener.callStart(this)開啟事件監(jiān)聽,eventListener在RealCall對象創(chuàng)建的時候,也一起創(chuàng)建了。

接下來我們看看execute()的核心方法: client.dispatcher().executed(this)、 Response result = getResponseWithInterceptorChain()、client.dispatcher().finished(this)

public Dispatcher dispatcher() {
    return dispatcher;
  }

  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

首先調用了Dispatcher的dispatcher()方法,dispatcher()方法返回一個Dispatcher對象,緊接著調用了Dispatcher的executed方法,往runningSyncCalls對象中添加了一個 call對象,runningSyncCalls是一個存放同步請求的隊列,Dispatcher類中維護了3種類型的請求隊列:

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  • readyAsyncCalls 是異步請求的就緒隊列
  • runningAsyncCalls 是異步請求的執(zhí)行隊列
  • runningSyncCalls 是同步請求的執(zhí)行隊列

調用完Dispatcher的execute()方法后,緊接著調用了getResponseWithInterceptorChain()方法。

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

該方法返回一個Response對象,我們可以看到,在方法的第一行中就創(chuàng)建了interceptors 集合,然后緊接著放進去很多攔截器對象,此處使用了責任鏈設計模式,依次在攔截器中做相應的操作。

在Response的excute方法的finally模塊中,最后調用了 client.dispatcher().finished(this)。我們點進去瞧一瞧

void finished(RealCall call) {
    finished(runningSyncCalls, call);
  }

finished方法內部調用了它的重載的方法,并把同步請求的消息隊列對象和RealCall對象傳過去,我們繼續(xù)往下看。

private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

在同步代碼快中,將call對象從同步請求消息隊列中移除,如果移除出問題,就會拋出異常。 緊接著調用了promoteAndExecute方法:

 private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

同步代碼塊有一個for循環(huán),去迭代readyAsyncCalls,也就是待準備消息隊列。但是我們從前面一步步分析過來,并沒有往readyAsyncCalls添加過數(shù)據(jù),所以當前的for循環(huán)并不會執(zhí)行,之后的一個for循環(huán)也不會執(zhí)行,isRunning返回false.

promoteAndExecute()方法返回false。

private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

并且idleCallback 已經(jīng)完成了初始化,所以會執(zhí)行 idleCallback的run()方法

總結:通過以上的分析,我們發(fā)現(xiàn)在同步請求中Dispatcher主要負責了兩件事,同步請求的保存和移除。
save.png

remove.png

至此,okhttp3的同步請求源碼已經(jīng)分析完了,接下來,我們看看okhttp3的異步請求源碼分析。

異步請求

 //  構建okHttpClient,相當于請求的客戶端,Builder設計模式
 OkHttpClient okHttpClient = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build();
        // 構建一個請求體,同樣也是Builder設計模式
        Request request = new Request.Builder().url("http://www.baidu.com").build();
        //  生成一個Call對象,該對象是接口類型,后面會說
        Call call = okHttpClient.newCall(request);
        call.enqueue(new Callback() {
                    @Override
                    public void onFailure(Call call, IOException e) {
                        
                    }

                    @Override
                    public void onResponse(Call call, Response response) throws IOException {

                    }
                });

簡單的看下異步請求的幾個步驟:

1.通過Builder模式創(chuàng)建OkHttpClient對象和Request對象
2.調用OkHttpClient的newCall方法,獲取一個Call對象,參數(shù)是Request
3.調用call對象的enqueue()方法

步驟1和步驟2跟同步請求的步驟一致,主要看一下步驟3

異步請求源碼:call.enqueue() 源碼分析

異步請求和同步請求的步驟1和步驟2一致,都是在做準備工作,并沒有發(fā)起請求,所以這次我們直接忽略了步驟1和步驟2的分析,直接分析步驟3的源碼,我們點開RealCall的enqueue方法:

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

我們看看它的核心代碼: client.dispatcher().enqueue(new AsyncCall(responseCallback)),
client.dispatcher()返回一個Dispatcher對象沒什么可講的,緊接著調用Dispatcher的enqueue方法,參數(shù)是AsyncCall對象,我們先看看AsyncCall是什么?

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;
    private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    AtomicInteger callsPerHost() {
      return callsPerHost;
    }
  
  }

只截取了部分代碼,該類繼承自NamedRunnable,我們看看NamedRunnable:

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

NamedRunnable 實現(xiàn)了Runnable 接口。

我們直接去Dispatcher的enqueue()看看做了哪些操作。

void enqueue(AsyncCall call) {
   synchronized (this) {
     readyAsyncCalls.add(call);

     // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
     // the same host.
     if (!call.get().forWebSocket) {
       AsyncCall existingCall = findExistingCallWithHost(call.host());
       if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
     }
   }
   promoteAndExecute();
 }

在同步代碼塊中,將當前的call請求添加到待準備消息隊列中去,注意這里跟同步請求的區(qū)別,同步請求的時候,并沒有把當前的call添加到準備消息隊列中去。然后又調用了 promoteAndExecute()方法,同步請求的時候也調用了promoteAndExecute()方法

private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

此時,readyAsyncCalls不為空了,我們單獨的把這個for循環(huán)拎出來講:

for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }

這段代碼的含義是:把符合條件的call請求從readyAsyncCalls提升為runningAsyncCalls,我們看這段代碼中的兩個if語句,第一個if語句判斷當前異步請求的執(zhí)行隊列長度大于等于請求最大值,如果滿足直接跳出for循環(huán),maxRequests的值為64,第二個if語句判斷當前執(zhí)行的異步請求隊列中相同主機的請求數(shù)是否大于等于maxRequestsPerHost(每個主機最大請求數(shù),默認為5),如果這兩個條件都不滿足的情況下,把從readyAsyncCalls取出來的call請求,存到臨時的
executableCalls 隊列中去。

緊接著去遍歷executableCalls:

  for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

從executableCalls獲取AsyncCall對象,并且調用它的executeOn方法,executeOn()方法參數(shù)是executorService(),我們看看executorService():

 public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

可以看出,該方法是一個同步方法,返回的是一個線程池對象,ThreadPoolExecutor()的第二個參數(shù)傳入了Integer的最大值,即線程池所能容納的最大線程數(shù)為Integer.MAX_VALUE,雖然這里設置了很大的值,但是實際情況下并非會達到最大值,因為上面enqueue()方法中有做了判斷。

回到 asyncCall.executeOn(executorService())這里,executorService返回了一個線程池對象,緊接著調用線程池對象的execute方法,execute()方法傳入實現(xiàn)了Runable接口的AsyncCall對象,前面在分析同步請求的時候,說了AsyncCall實現(xiàn)了Runable接口

實現(xiàn)Runnable.png

ok,現(xiàn)在我們要看看線程池做了什么操作,直接去NamedRunnable的run方法看看做了什么操作。

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

execute()是一個抽象方法,所以我們直接去NamedRunnable的實現(xiàn)類AsyncCall的execute()方法看:

@Override protected void execute() {
      boolean signalledCallback = false;
      timeout.enter();
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        e = timeoutExit(e);
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

這段代碼才是真正執(zhí)行異步請求的邏輯,getResponseWithInterceptorChain()返回Response對象,然后判斷retryAndFollowUpInterceptor是否取消回調CallBack接口的onFailure()或onResponse()方法,最后finally中,和同步請求的處理一樣,調用了Dispatcher對象的finished()方法。

void finished(RealCall call) {
    finished(runningSyncCalls, call);
  }

  private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

看完promoteAndExecute()方法的具體操作,我們發(fā)現(xiàn),調用eneque的時候會把call請求添加到readyAsyncCalls(異步請求準備隊列)中,而readyAsyncCalls隊列中的請求什么時候執(zhí)行呢,看完promoteAndExecute的代碼就恍然大悟了。

總結:異步請求中,我們先去遍歷異步請求的就緒隊列,并判斷異步請求的執(zhí)行隊列的隊列大小是否小于設置的最大數(shù)的時候,如果條件滿足,把該請求添加到異步請求的執(zhí)行隊列中去,同時把該請求添加到臨時的異步請求的執(zhí)行隊列去中。之后,遍歷這個臨時的異步請求的執(zhí)行隊列,去執(zhí)行AsyncCall的execute()方法。

0a5f3743d5bd285e679ea2eae5dcde5.png

Android的技術討論Q群:947460837

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容