okhttp源碼分析(-)

基本使用

GET請求

private fun doGet() {
    val client = OkHttpClient()
    val request = Request.Builder()
            .url("http://square.github.io/okhttp/")
            .build()
    val call = client.newCall(request)

    //todo 1. 同步請求
//        val response = call.execute()
    //獲取響應結果
    //String
//        val string = response.body()?.string()
    //byte
//        val bytes = response.body()?.bytes()
    //IO
//        val inputStream = response.body()?.byteStream()

    //todo 2. 異步請求
    call.enqueue(object : Callback {
        override fun onFailure(call: Call?, e: IOException?) {
            Log.i(TAG, "response onFailure $call: ${e?.message}")
        }

        override fun onResponse(call: Call?, response: Response?) {
            Log.i(TAG, "response onResponse $call: ${response?.body()?.string()}")
        }
    })
}

POST 請求

private fun doPost() {
    val client = OkHttpClient()
    //表單形式
    val requestBody = FormBody.Builder()
            .add("code", "209")
            .addEncoded(URLEncoder.encode("姓名"), URLEncoder.encode("廖少"))
            .build()
    //JSON形式
//        val requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"),
//                "{\"code\":\"209\",\"name\":\"廖少\"}")

    //文件形式
//        val requestBody = RequestBody.create(MediaType.parse("application/octet-stream"),
//                File("xx/xxx.png"))

    val request = Request.Builder()
            .url("http://square.github.io/okhttp/")
            .post(requestBody)
            .build()
    val call = client.newCall(request)

    //todo 1. 同步請求
//        val response = call.execute()
    //獲取響應結果
    //String
//        val string = response.body()?.string()
    //byte
//        val bytes = response.body()?.bytes()
    //IO
//        val inputStream = response.body()?.byteStream()

    //todo 2. 異步請求
    call.enqueue(object : Callback {
        override fun onFailure(call: Call?, e: IOException?) {
            Log.i(TAG, "response onFailure $call: ${e?.message}")
        }

        override fun onResponse(call: Call?, response: Response?) {
            Log.i(TAG, "response onResponse $call: ${response?.body()?.string()}")
        }
    })
}

內部請求分析

  1. val client = OkHttpClient()
  • 通過OkHttpClient的Builder的默認構造方法來初始化網絡所需的各種成員:
public OkHttpClient() {
    this(new Builder());
}

......

public Builder() {
    //線程池,線程調度器
    dispatcher = new Dispatcher();
    //協(xié)議,默認http/1.1與HTTP/2
    protocols = DEFAULT_PROTOCOLS;
    //連接規(guī)則
    connectionSpecs = DEFAULT_CONNECTION_SPECS;
    //發(fā)起dns請求等一系列監(jiān)聽
    eventListenerFactory = EventListener.factory(EventListener.NONE);
    //代理選擇器
    proxySelector = ProxySelector.getDefault();
    //Cookie緩存相關
    cookieJar = CookieJar.NO_COOKIES;
    //創(chuàng)建socket工廠,單例模式
    socketFactory = SocketFactory.getDefault();
    //主機校驗
    hostnameVerifier = OkHostnameVerifier.INSTANCE;
    //SSL證書相關
    certificatePinner = CertificatePinner.DEFAULT;
    //安全認證相關
    proxyAuthenticator = Authenticator.NONE;
    authenticator = Authenticator.NONE;
    //連接池,默認最大空閑鏈接5個,連接有限5分鐘
    connectionPool = new ConnectionPool();
    dns = Dns.SYSTEM;
    //是否允許SSL重定向
    followSslRedirects = true;
    followRedirects = true;
    //失敗是否重試
    retryOnConnectionFailure = true;
    //連接超時時長
    connectTimeout = 10_000;
    //讀寫超時時長
    readTimeout = 10_000;
    writeTimeout = 10_000;
    pingInterval = 0;
}
  • 構造函數(shù)傳入Builder賦值OkHttpClient成員變量:
Builder(OkHttpClient okHttpClient) {
    this.dispatcher = okHttpClient.dispatcher;
    this.proxy = okHttpClient.proxy;
    this.protocols = okHttpClient.protocols;
    this.connectionSpecs = okHttpClient.connectionSpecs;
    this.interceptors.addAll(okHttpClient.interceptors);
    this.networkInterceptors.addAll(okHttpClient.networkInterceptors);
    this.eventListenerFactory = okHttpClient.eventListenerFactory;
    this.proxySelector = okHttpClient.proxySelector;
    this.cookieJar = okHttpClient.cookieJar;
    this.internalCache = okHttpClient.internalCache;
    this.cache = okHttpClient.cache;
    this.socketFactory = okHttpClient.socketFactory;
    this.sslSocketFactory = okHttpClient.sslSocketFactory;
    this.certificateChainCleaner = okHttpClient.certificateChainCleaner;
    this.hostnameVerifier = okHttpClient.hostnameVerifier;
    this.certificatePinner = okHttpClient.certificatePinner;
    this.proxyAuthenticator = okHttpClient.proxyAuthenticator;
    this.authenticator = okHttpClient.authenticator;
    this.connectionPool = okHttpClient.connectionPool;
    this.dns = okHttpClient.dns;
    this.followSslRedirects = okHttpClient.followSslRedirects;
    this.followRedirects = okHttpClient.followRedirects;
    this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure;
    this.connectTimeout = okHttpClient.connectTimeout;
    this.readTimeout = okHttpClient.readTimeout;
    this.writeTimeout = okHttpClient.writeTimeout;
    this.pingInterval = okHttpClient.pingInterval;
}
  1. 創(chuàng)建Request對象
  • 通過Request的Builder默認構造方法,設置請求方式為GET與初始化請求頭
public Builder() {
  this.method = "GET";
  this.headers = new Headers.Builder();
}
  • 通過Builder的url()方式設置請求鏈接,最后調用build()方法返回request對象
public final class Request {
    final HttpUrl url;
    final String method;
    final Headers headers;
    final @Nullable RequestBody body;
    final Map<Class<?>, Object> tags;
    
    private volatile CacheControl cacheControl; // Lazily initialized.
  
    ......
  
    public Builder url(String url) {
      if (url == null) throw new NullPointerException("url == null");
    
      // Silently replace web socket URLs with HTTP URLs.
      if (url.regionMatches(true, 0, "ws:", 0, 3)) {
        url = "http:" + url.substring(3);
      } else if (url.regionMatches(true, 0, "wss:", 0, 4)) {
        url = "https:" + url.substring(4);
      }
    
      return url(HttpUrl.get(url));
    }
    ......
    public Request build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new Request(this);
    }
    
    ......
    Request(Builder builder) {
        this.url = builder.url;
        this.method = builder.method;
        this.headers = builder.headers.build();
        this.body = builder.body;
        this.tags = Util.immutableMap(builder.tags);
    }
    ......
}
  1. 將Request請求封裝:val call = client.newCall(request)
  • 通過OkhttpClient的newCall()方法,構建一個RealCall對象
@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}
  • 創(chuàng)建RealCall對象的同時還會創(chuàng)建一個RetryAndFollowUpInterceptor攔截器
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }
  1. 執(zhí)行同步請求:val response = call.execute()
  • 調用RealCall的execute()方法,該方法先檢查該RealCall對象是否已經執(zhí)行過該方法了,重復執(zhí)行會拋出異常。
@Override public Response execute() throws IOException {
    synchronized (this) {
    //檢查是否已經執(zhí)行過該請求了
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
    //提交網絡請求
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
}
  • 獲取Dispatcher,執(zhí)行executed()方法
/**正在執(zhí)行的同步請求隊列,包括取消的請求和還沒有完成的請求*/
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
......  
//添加在的同步請求隊列對尾
synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
}
  • 執(zhí)行RellCall的getResponseWithInterceptorChain()方法
Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    //初始化OkhttpClient的攔截器
    interceptors.addAll(client.interceptors());
    //重試和重定向的攔截器,最多20次
    interceptors.add(retryAndFollowUpInterceptor);
    //橋接轉換攔截器:負責請求構建與響應
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //緩存攔截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //鏈接攔截器:負責socket的IO操作,這里使用了Okio提供的封裝
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        //初始化OkhttpClient的網絡攔截器
        interceptors.addAll(client.networkInterceptors());
    }
    //服務器回調攔截器:向服務器發(fā)送請求,將請求header和body寫入socket中,然后讀取響應header和body,返回最后需要的響應數(shù)據(jù).
    interceptors.add(new CallServerInterceptor(forWebSocket));
    
    //創(chuàng)建RealInterceptorChain對象
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    
    return chain.proceed(originalRequest);
}

RealInterceptorChain構造方法

/**
*@param index 用來標記執(zhí)行到了Interceptors集合中哪一個攔截器的intercept()方法。
*/
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
  HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
  EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;
    this.connection = connection;
    this.streamAllocation = streamAllocation;
    this.httpCodec = httpCodec;
    this.index = index;
    this.request = request;
    this.call = call;
    this.eventListener = eventListener;
    this.connectTimeout = connectTimeout;
    this.readTimeout = readTimeout;
    this.writeTimeout = writeTimeout;
}
  • 執(zhí)行RealInterceptorChain的proceed()方法
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.執(zhí)行下一個攔截器
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
}

重點看下這三句代碼:

RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
    connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
    writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

上面在proceed()方法里面有創(chuàng)建了新的RealInterceptorChain對象,且將index=index+1,而且RealInterceptorChain作為參數(shù)傳給當前執(zhí)行的第index個攔截器,而攔截器執(zhí)行了intercept()方法,在這個方法里面有執(zhí)行了RealInterceptorChain的proceed()方法,如此遞歸直到執(zhí)行完所有的攔截器。

  • 下面重點來看下連接攔截器ConnectInterceptor
public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

具體的封裝細節(jié),后面單獨留一個文章再敘述,現(xiàn)在先按流程走。最后執(zhí)行CallServerInterceptor這個攔截器,通過將請求headers與body寫入socket,向服務端發(fā)起請求,然后讀取返回的header與body。

 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    //向socket寫入header
    realChain.eventListener().requestHeadersStart(realChain.call());
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
      //向socket寫入body
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    //讀取服務端返回的header與body
    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      responseBuilder = httpCodec.readResponseHeaders(false);

      response = responseBuilder
              .request(request)
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();

      code = response.code();
    }

    realChain.eventListener()
            .responseHeadersEnd(realChain.call(), response);

    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
  • 執(zhí)行結束后返回Response對象,不管執(zhí)行成功或者失敗都會回調到 client.dispatcher().finished(this);
void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
        //從runningSyncCalls隊列中移除當前請求的RealCall對象
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //這里是異步的時候執(zhí)行
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
    
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
}
  1. 同步請求步驟總結
  • 通過Builder創(chuàng)建OkHttpClient與Request對象
  • 調用OkHttpClient的newCall方法創(chuàng)建RealCall對象,同時在RealCall構造方法里面創(chuàng)建了RetryAndFollowUpInterceptor攔截器
  • 調用RealCall的execute方法,在方法里面先判斷該請求是否已經執(zhí)行了,如果已經執(zhí)行了則拋出IllegalStateException("Already Executed")異常
  • RealCall的execute方法里面調用了client.dispatcher()獲取Dispatcher對象,在調用Dispatcher的executed方法,向正在運行的同步請求隊列runningSyncCalls中添加
  • 調用RealCall的getResponseWithInterceptorChain方法,執(zhí)行請求的攔截器鏈,結束后返回Response
  • 最后調用Dispatcher的finished方法,使該請求從runningSyncCalls隊列中移除,請求結束,所以在整個同步請求過程中,Dispatcher扮演的角色主要是同步隊列的添加與移除

異步請求分析

關于異步請求,其實前面創(chuàng)建OkhttpClient,Request,RealCall創(chuàng)建都是一樣的,區(qū)別在于在RealCall里面執(zhí)行的方法不一樣,下面我們重點來看下RealCall的enqueue方法:

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
  1. 同樣也是先去判斷是否已經執(zhí)行了,然后調用Dispatcher對象的enqueue方法
  • 在看enqueue方法之前,我們先來看下AsyncCall是什么東西?
  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
    ......
  }
  
  public abstract class NamedRunnable implements Runnable {
    ......
  }

從上面我們可以看出AsyncCall其實就是Runnable,其實想想也是,畢竟后面是需要加入線程池中執(zhí)行的。

  • 然后我們回來看enqueue方法
  synchronized void enqueue(AsyncCall call) {
  //判斷當前運行的異步線程任務執(zhí)行條件
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

方法里面首先判斷當前運行的異步隊列大小是否超過了maxRequests(源碼里面定義64)最大請求以及同一個域名下的請求是否超過了maxRequestsPerHost(固定值5)最大請求,如果都滿足了這兩個條件,則將請求任務加入runningAsyncCalls隊列中,然后調用線程池執(zhí)行該請求;如果不滿足這兩個條件,則將任務加入到準備執(zhí)行異步請求隊列readyAsyncCalls中等待調度執(zhí)行。

這里為什么要加入這兩個條件呢?要弄明白這個我們先來看下Dispatcher中創(chuàng)建線程池的代碼:

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

可以看到它是單列模式創(chuàng)建的,而且corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,也就是說該線程池對于任務請求大小沒有任何限制,可以無限添加,另外他們的空閑存活時長只有60秒,這樣是為了最大提升OkHttp的請求吞吐且60s能及時清理掉線程池占用的內存,而這樣就會導致一個問題,當大量請求過來的時候,就會無限的創(chuàng)建線程執(zhí)行,這顯然是不行了,會導致內存占用不斷擴大,所以就在執(zhí)行的時候添加了上面兩個條件的限制,最大請求不超過64個,同一個域名下請求不超過5個,這個就很好的解決了并發(fā)的問題。

  1. 執(zhí)行AsyncCall的execute()方法
    AsyncCall這個是繼承自Runnable的,也就是說會執(zhí)行run方法,到時AsyncCall沒有run方法,而是execute方法,這是因為在NamedRunnable類中run方法調用了execute方法,所以最終是執(zhí)行了execute方法:
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

代碼上可以看出,同樣是調用了getResponseWithInterceptorChain()方法,即執(zhí)行請求攔截器鏈,返回Response,另外根據(jù)判斷調用相應的回調方法。

  1. 調用Dispatcher的finished方法,從正在異步隊列runningAsyncCalls中移除當前請求RealCall對象
  /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

另外異步的時候調用的時候promoteCalls=true的,進而執(zhí)行了promoteCalls()方法,下面我們看下這個方法:

  private void promoteCalls() {
  //條件判斷
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
      //輪詢從等待隊列中獲取且移除,另外將請求Call加入到正在運行隊列runningAsyncCalls中,加入線程池執(zhí)行
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

從這里可以看出是將任務取出執(zhí)行,直到所有的任務結束,這即便是Dispatcher的任務調度功能。

  1. 異步請求步驟
  • 通過Builder創(chuàng)建OkHttpClient與Request對象
  • 調用OkHttpClient的newCall方法創(chuàng)建RealCall對象,同時在RealCall構造方法里面創(chuàng)建了RetryAndFollowUpInterceptor攔截器
  • 調用RealCall的enqueue方法,在方法里面先判斷該請求是否已經執(zhí)行了,如果已經執(zhí)行了則拋出IllegalStateException("Already Executed")異常
  • RealCall的enqueue方法里面調用了client.dispatcher()獲取Dispatcher對象,在調用Dispatcher的enqueue方法,且傳入AsyncCall對象
  • 在enqueue方法里面判斷正在請求的任務大小是否滿足條件,滿足條件則加入runningAsyncCalls隊列,加入線程池執(zhí)行,否則加入readyAsyncCalls隊列中,等待Dispatcher的調度
  • 隨后執(zhí)行AsyncCall的execute方法,在execute方法里面調用getResponseWithInterceptorChain方法,執(zhí)行請求的攔截器鏈,結束后返回Response
  • 最后調用Dispatcher的finished方法,使該請求從runningSyncCalls隊列中移除,且通過輪詢獲取下一個任務,添加至runningAsyncCalls隊列中且執(zhí)行該請求,所以在整個異步請求過程中,Dispatcher扮演的角色主要是同步隊列的添加與移除,以及對隊列任務進行調度
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • OkHttp源碼分析 在現(xiàn)在的Android開發(fā)中,請求網絡獲取數(shù)據(jù)基本上成了我們的標配。在早期的Android開...
    BlackFlag閱讀 418評論 0 5
  • 那么我今天給大家簡單地講一下Okhttp這款網絡框架及其原理。它是如何請求數(shù)據(jù),如何響應數(shù)據(jù)的 有什么優(yōu)點?它的應...
    卓而不群_0137閱讀 348評論 0 1
  • 主目錄見:Android高級進階知識(這是總目錄索引)?OkHttp的知識點實在是不少,優(yōu)秀的思想也有很多,這里只...
    ZJ_Rocky閱讀 2,401評論 2 6
  • OkHttp源碼分析-同步篇 很早就想拿okhttp開刀了,這次就記一次使用OKhttp的網絡請求。首先需要說明的...
    埃賽爾閱讀 1,062評論 1 2
  • 1、難過篇 “半夜的時候突然醒了,眼淚嘩啦啦的掉?!?“晚上準時睡覺,早上準時起床,對著鏡子哭一會,再若無其事的去...
    蝸牛一只閱讀 676評論 1 3

友情鏈接更多精彩內容