OkHttp使用分析—WebSocket篇

OkHttp使用分析—WebSocket篇

我們先看一下怎么使用OKhtttp完成WebSocket的請求:

  //設(shè)置連接超時(shí)時(shí)間
        mOkHttpClient = new OkHttpClient.Builder().connectTimeout(9 * 10, TimeUnit.SECONDS).build();
        Request request = new Request.Builder().url(BASE_URL).build();
        mWebSocket = mOkHttpClient.newWebSocket(request, this);

重點(diǎn)在這里,打開OkHttpClient.class查找newWebSocket()方法:

  /**
   * Uses {@code request} to connect a new web socket.
   */
  @Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
    RealWebSocket webSocket = new RealWebSocket(request, listener, new Random());
    webSocket.connect(this);
    return webSocket;
  }

這里傳入request對象和websocket的專用監(jiān)聽WebSocketListener,WebSocketListener 對象稍后再做贅述,主流程還是看RealWebSocket.class的connect()方法:
步驟1:

 client = client.newBuilder()
        .protocols(ONLY_HTTP1)
        .build();

我們都知道普通的請求時(shí)client是需要被bulid的,這里拿到OkHttpClient又重新創(chuàng)建了一遍,一開始就創(chuàng)建好了干嘛還要?jiǎng)?chuàng)建創(chuàng)建呢?看這個(gè)方法:protocols(ONLY_HTTP1),

 private static final List<Protocol> ONLY_HTTP1 = Collections.singletonList(Protocol.HTTP_1_1);

步驟2:

 final Request request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .build();

對request對象的頭部加工,

步驟3:

 call = Internal.instance.newWebSocketCall(client, request);

從OkHttpClient中 獲取WebSocket的call對象(回調(diào)使用),這個(gè)Internal.instance雖然是接口方法,其實(shí)現(xiàn)是在OkHttpClient中,直接看對應(yīng)方法:

 @Override public Call newWebSocketCall(OkHttpClient client, Request originalRequest) {
        return new RealCall(client, originalRequest, true);
      }

步驟4:搜嘎 原來enqueue()方法是使用RealCall.class的enqueue()方法,這是一個(gè)入隊(duì)的方法,而且是個(gè)異步的方法。這就說明webSocket建立連接后才響應(yīng)回調(diào)。而且如果是長連接那么這個(gè)線程就一直在線程池里不會(huì)被釋放掉。

call.enqueue(new Callback() {
      @Override public void onResponse(Call call, Response response) {
        try {
          checkResponse(response);
        } catch (ProtocolException e) {
          failWebSocket(e, response);
          closeQuietly(response);
          return;
        }

照現(xiàn)在的進(jìn)度已經(jīng)到了設(shè)置好的回調(diào)要開始執(zhí)行了,那就轉(zhuǎn)戰(zhàn)RealCall

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

其實(shí)我對okhttp同步請求有幾點(diǎn)疑惑:
1一開始我沒有創(chuàng)建線程,那么這個(gè)請求就是在主線程中嗎?
2如果是同步請求那么如果同時(shí)多次請求是不是如果前面的請求在執(zhí)行后面的請求在進(jìn)入等待的狀態(tài)了呢?
其實(shí)這些問題就需要從dispatcher()的線程池入手了。

這個(gè)dispatcher在一開始介紹ok的時(shí)候已經(jīng)介紹過了,我們來看dispatcher中的enqueue()方法:
嘿嘿嘿,又到了OkHttp請求里了 而且 這時(shí)候realCall內(nèi)部創(chuàng)建了AsyncCall(異步的Call),其實(shí)看方法名就應(yīng)該知道的,ok的webSocket都是使用異步的,而且我們要明白現(xiàn)在只是一個(gè)最初的socket,之后的通信,都會(huì)在該線程池的一個(gè)線程中進(jìn)行。

問題1:ok的websocket是異步的,并不會(huì)阻塞主線程,而且也不需要單獨(dú)開辟一個(gè)子線程來創(chuàng)建連接。
問題2:會(huì)不會(huì)阻塞首先我們再次看看這個(gè)executorService的線程池結(jié)構(gòu)。雖然在同步篇對dispatcher的線程池做過介紹,但是在我看來還是很解釋不夠清晰的地方:
首先 這個(gè)是dispatcher線程池的結(jié)構(gòu)

executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));

我在這里做一個(gè)詳細(xì)的說明:首先,SynchronousQueue是一個(gè)無緩存的阻塞的隊(duì)列,什么意思呢?我們可以理解為當(dāng)這個(gè)隊(duì)列中有元素的時(shí)候,這個(gè)元素沒有被取走(take方法)之前是不允許繼續(xù)對之后的內(nèi)容進(jìn)行操作。

注意1:它一種阻塞隊(duì)列,其中每個(gè) put 必須等待一個(gè) take,反之亦然。同步隊(duì)列沒有任何內(nèi)部容量,甚至連一個(gè)隊(duì)列的容量都沒有。
注意2:它是線程安全的,是阻塞的。
注意3:不允許使用 null 元素。
注意4:公平排序策略是指調(diào)用put的線程之間,或take的線程之間。公平排序策略可以查考ArrayBlockingQueue中的公平策略。
所以這又解決了一個(gè)困擾我多年的難題:
okhttp的能同時(shí)執(zhí)行多少個(gè)請求?
這個(gè)線程池的配置其實(shí)就是Executors提供的線程池配置方案之一,構(gòu)造一個(gè)緩沖功能的線程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一個(gè)無容量的阻塞隊(duì)列 SynchronousQueue,因此任務(wù)提交之后,將會(huì)創(chuàng)建新的線程執(zhí)行;線程空閑超過60s將會(huì)銷毀:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }


用一個(gè)形象的比喻就是一個(gè)傳球手,當(dāng)從主線程傳進(jìn)了任務(wù),就創(chuàng)建一個(gè)runnable來接收。


ThreadPoolExecutor.jpg

這里是Dispatcher的異步啟動(dòng)方法:

 synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

在這里專門用runningAsyncCalls來記錄在執(zhí)行的Call,每次執(zhí)行都會(huì)記錄,當(dāng)向executor添加call的時(shí)候,根據(jù)2,將任務(wù)放入SynchronousQueue中等待前面的request被取出才能執(zhí)行之后的request,這里maxRequests 被定為64.超出64的將會(huì)被放入readyAsyncCalls。
ready和running之間怎么傳遞呢?
這就需要我們對比分析下RealCall這個(gè)類:
同步的時(shí)候是調(diào)用RealCall的:@Override public Response execute() throws IOException
異步的時(shí)候是調(diào)用AsyncCall的:

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

事件的回調(diào)已經(jīng)具備了,回收需要看這里.finished(this)方法,最終會(huì)調(diào)用這個(gè):

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

那么問題又來了,
請對比分析Ok與Volley的優(yōu)缺點(diǎn)。

websocket篇:

此前我先聲明一點(diǎn),一個(gè)websocket鏈接的建立是在一個(gè)子線程當(dāng)中,如果鏈接不關(guān)閉這個(gè)子線程一直存在,
在鏈接前 我們創(chuàng)建了一個(gè)RealWebSocket.class我們進(jìn)它的構(gòu)造里看看也許有個(gè)驚喜:

public RealWebSocket(Request request, WebSocketListener listener, Random random) {
//省略部分代碼  
this.writerRunnable = new Runnable() {
  @Override public void run() {
try {
  while (writeOneFrame()) {
  }
} catch (IOException e) {
  failWebSocket(e, null);
}
  }
};
  }

在這里創(chuàng)建了一個(gè)寫的線程,writerRunnable
再看connect()方法:這次只需要看call的回調(diào)就可以。根據(jù)現(xiàn)在的流程,鏈接成功,走了成功的回調(diào),Call的onResponse方法:

 try {
  listener.onOpen(RealWebSocket.this, response);
  String name = "OkHttp WebSocket " + request.url().redact();
  initReaderAndWriter(name, pingIntervalMillis, streams);
  streamAllocation.connection().socket().setSoTimeout(0);
  loopReader();
} catch (Exception e) {
  failWebSocket(e, null);
}
  }

核心代碼在這里:
1.initReaderAndWriter()初始化讀寫者。這是為同服務(wù)器交互進(jìn)行準(zhǔn)備?

 this.writer = new WebSocketWriter(streams.client, streams.sink, random);
 this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));

準(zhǔn)備了Writer,準(zhǔn)備了定時(shí)任務(wù)(心跳鏈接ping——pong)
runWriter();方法都做了什么呢?
private void runWriter() {
assert (Thread.holdsLock(this));

if (executor != null) {
  executor.execute(writerRunnable);
}
  }

哈哈 原來是為心跳鏈接做準(zhǔn)備啊,定時(shí)進(jìn)行通知服務(wù)器 我還在哈。

2.loopReader()開始輪訓(xùn)讀取消息(隨時(shí)準(zhǔn)備接受來自服務(wù)器的消息)

 public void loopReader() throws IOException {
while (receivedCloseCode == -1) {
  // This method call results in one or more onRead* methods being called on this thread.
  reader.processNextFrame();
}
  }

這不,一直循環(huán)調(diào)用reader.processNextFrame();

 /**
   * Process the next protocol frame.
   *
   * <ul>
   * <li>If it is a control frame this will result in a single call to {@link FrameCallback}.
   * <li>If it is a message frame this will result in a single call to {@link
   * FrameCallback#onReadMessage}. If the message spans multiple frames, each interleaved
   * control frame will result in a corresponding call to {@link FrameCallback}.
   * </ul>
   */
  void processNextFrame() throws IOException {
readHeader();
if (isControlFrame) {
  readControlFrame();
} else {
  readMessageFrame();
}
  }

沒辦法 注釋寫的太好了,我忍不住都粘貼了進(jìn)來:
1如果是控制幀將會(huì)有一個(gè)單一的callback:FrameCallback
2如果是消息幀也會(huì)有一個(gè)單一的callback:FrameCallback#onReadMessage

看到這里websocket基本上已經(jīng)完了,剩下的就是調(diào)用監(jiān)聽了。
~~~~~~~~~~~~~~ 補(bǔ)充部分 ~~~~~~~~~~~~~~~

感謝網(wǎng)友朋友細(xì)心指導(dǎo),因?yàn)閷戇@篇文章比較早(細(xì)節(jié)忘了很多,尷尬)還原問題:
“框架會(huì)自動(dòng)發(fā)送ping包嗎? 怎么設(shè)置發(fā)送間隔時(shí)間呢?”

真的會(huì),而且在而且OkHttpClient也支持設(shè)置心跳間隔:

 // Promote the HTTP streams into web socket streams.
        StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);

還對 ping pong的次數(shù)進(jìn)行了記錄:至于怎么發(fā)送ping 需要看這個(gè):

  initReaderAndWriter(name, pingIntervalMillis, streams);

沒錯(cuò) 又追蹤到了初始化讀寫者,在初始化讀寫者的時(shí)候有這樣一句(多看一句就能回答 讀者的問題了 甚是慚愧):

      if (pingIntervalMillis != 0) {
        executor.scheduleAtFixedRate(
            new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
      }

由此可見:
1 如果pingIntervalMillis 設(shè)置為0的時(shí)候 心跳executor是不會(huì)執(zhí)行的。
2 executor 原來也負(fù)責(zé)心跳包的定時(shí)任務(wù)

讓我們看看 pingrunnable里都做了什么吧:

  private final class PingRunnable implements Runnable {
    PingRunnable() {
    }

    @Override public void run() {
      writePingFrame();
    }
  }

  void writePingFrame() {
    WebSocketWriter writer;
    synchronized (this) {
      if (failed) return;
      writer = this.writer;
    }

    try {
      writer.writePing(ByteString.EMPTY);
    } catch (IOException e) {
      failWebSocket(e, null);
    }
  }

果然簡單實(shí)用:
一個(gè)runnable 調(diào)用writer的writePing方法。想一想還是很合理啊,畢竟發(fā)送消息就是需要 writer來做,所以 writer有這些方法也不足為其。具體writer怎么寫 我們看下:

 /** Send a ping with the supplied {@code payload}. */
  void writePing(ByteString payload) throws IOException {
    synchronized (this) {
      writeControlFrameSynchronized(OPCODE_CONTROL_PING, payload);
    }
  }

  /** Send a pong with the supplied {@code payload}. */
  void writePong(ByteString payload) throws IOException {
    synchronized (this) {
      writeControlFrameSynchronized(OPCODE_CONTROL_PONG, payload);
    }
  }

順便一瞅 就在下邊有個(gè)pong的發(fā)送方法,分析一下:
1 入?yún)ayload 是ByteString.EMPTY 就是一個(gè)空的字節(jié),
2 最終都是相同的方法writeControlFrameSynchronized,
3 對于消息的區(qū)分:依靠writeControlFrameSynchronized的第一個(gè)入?yún)pcode,
4 writeControlFrameSynchronized這個(gè)方法雖然沒有注釋 但是 即然寫消息都需要調(diào)用這個(gè)方法,相比這個(gè)方法才是writer的實(shí)力擔(dān)當(dāng):

  private void writeControlFrameSynchronized(int opcode, ByteString payload) throws IOException {
    assert Thread.holdsLock(this);

    if (writerClosed) throw new IOException("closed");

    int length = payload.size();
    if (length > PAYLOAD_BYTE_MAX) {
      throw new IllegalArgumentException(
          "Payload size must be less than or equal to " + PAYLOAD_BYTE_MAX);
    }

    int b0 = B0_FLAG_FIN | opcode;
    sink.writeByte(b0);

    int b1 = length;
    if (isClient) {
      b1 |= B1_FLAG_MASK;
      sink.writeByte(b1);

      random.nextBytes(maskKey);
      sink.write(maskKey);

      byte[] bytes = payload.toByteArray();
      toggleMask(bytes, bytes.length, maskKey, 0);
      sink.write(bytes);
    } else {
      sink.writeByte(b1);
      sink.write(payload);
    }

    sink.flush();
  }

操作太6 ,表示職能看懂個(gè)大概 , 都被寫入這個(gè)sink中了!??!

問題來了:sink是什么東西?

 /** Writes must be guarded(被守護(hù)的) by synchronizing on 'this'. */
  final BufferedSink sink;

沒有交代,但是有這樣一個(gè)提醒,對sink寫的時(shí)候必須是被synchronizing保護(hù)的 這樣我算是明白為嘛ping和pong的方法都會(huì)加鎖了(他說咋做就咋做 嘻嘻 稍后看)。

我們先從單詞上理解這個(gè)變量的意義吧:sink,水槽,洗滌池,什么鬼?看不懂。。。我還是看BufferedSink吧:

  • A sink that keeps a buffer internally so that callers can do small writes
  • 在內(nèi)部保留緩沖區(qū)的接收器,以便調(diào)用方可以執(zhí)行小的寫入操作。
  • without a performance penalty.

都說了是個(gè)小型的緩沖池,因此在寫的時(shí)候會(huì)對大小進(jìn)行限制:
static final long PAYLOAD_BYTE_MAX = 125L;

雖然是個(gè)接口但是已經(jīng)給了我們足夠多的有效信息,讓我們看看在創(chuàng)建的時(shí)候是怎么實(shí)現(xiàn)這個(gè)BufferedSink,回到最初writer創(chuàng)建的地方:

  this.writer = new WebSocketWriter(streams.client, streams.sink, random);

哦?在初始化的時(shí)候從Stream中獲取的。在向上找當(dāng)初的stream是怎么創(chuàng)建的:
當(dāng)鏈接成功后就會(huì) 返回一個(gè)Call:

   @Override public void onResponse(Call call, Response response) 

  // Promote the HTTP streams into web socket streams.
  // 促進(jìn) http流初始化這個(gè)socket流
  StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
   // Prevent connection pooling!
   // 防止連接共用
        streamAllocation.noNewStreams(); 
  //創(chuàng)建 Stream
   Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);

看來一切的謎底都在 RealConnection的newWebSockerStreams里:

 public RealWebSocket.Streams newWebSocketStreams(final StreamAllocation streamAllocation) {
    return new RealWebSocket.Streams(true, source, sink) {
      @Override public void close() throws IOException {
        streamAllocation.streamFinished(true, streamAllocation.codec());
      }
    };
  }

呵呵,看到真相我有點(diǎn)想放棄, new RealWebSocket.Streams(true, source, sink) sink就是這樣被賦予的,讓我回想一下,RealConnection還是挺熟悉的,是在什么時(shí)候創(chuàng)建的呢?
今天先研究到這里我容我仔細(xì)研究一番。。。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 參考資源 官網(wǎng) 國內(nèi)博客 GitHub官網(wǎng) 鑒于一些關(guān)于OKHttp3源碼的解析文檔過于碎片化,本文系統(tǒng)的,由淺入...
    風(fēng)骨依存閱讀 12,722評論 11 82
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,699評論 19 139
  • OkHttp源碼的samples的簡單使用的示例: public static void main(String....
    _warren閱讀 879評論 0 1
  • OkHttp解析系列 OkHttp解析(一)從用法看清原理OkHttp解析(二)網(wǎng)絡(luò)連接OkHttp解析(三)關(guān)于...
    Hohohong閱讀 21,126評論 4 58
  • OkHttp源碼分析-同步篇 很早就想拿okhttp開刀了,這次就記一次使用OKhttp的網(wǎng)絡(luò)請求。首先需要說明的...
    埃賽爾閱讀 1,062評論 1 2

友情鏈接更多精彩內(nèi)容