socket.io 原理分析

在項(xiàng)目中用到socket.io在WEB端做消息推送,遂花了點(diǎn)時(shí)間看了socket.io實(shí)現(xiàn),做個(gè)簡(jiǎn)單分析,如有錯(cuò)漏,歡迎指正。

1 概述

socket.io是一個(gè)基于WebSocket的CS的實(shí)時(shí)通信庫(kù),它底層基于engine.io。engine.io使用WebSocket和xhr-polling(或jsonp)封裝了一套自己的協(xié)議,在不支持WebSocket的低版本瀏覽器中(支持websocket的瀏覽器版本見這里)使用了長(zhǎng)輪詢(long polling)來代替。socket.io在engine.io的基礎(chǔ)上增加了namespace,room,自動(dòng)重連等特性。

本文接下來會(huì)先簡(jiǎn)單介紹websocket協(xié)議,然后在此基礎(chǔ)上講解下engine.io和socket.io協(xié)議以及源碼分析,后續(xù)再通過例子說明socket.io的工作流程。

2 WebSocket協(xié)議

我們知道,在HTTP 協(xié)議開發(fā)的時(shí)候,并不是為了雙向通信程序準(zhǔn)備的,起初的 web 應(yīng)用程序只需要 “請(qǐng)求-響應(yīng)” 就夠了。由于歷史原因,在創(chuàng)建擁有雙向通信機(jī)制的 web 應(yīng)用程序時(shí),就只能利用 HTTP 輪詢的方式,由此產(chǎn)生了 “短輪詢” 和 “長(zhǎng)輪詢”(注意區(qū)分短連接和長(zhǎng)連接)。

短輪詢通過客戶端定期輪詢來詢問服務(wù)端是否有新的信息產(chǎn)生,缺點(diǎn)也是顯而易見,輪詢間隔大了則信息不夠?qū)崟r(shí),輪詢間隔過小又會(huì)消耗過多的流量,增加服務(wù)器的負(fù)擔(dān)。長(zhǎng)輪詢是對(duì)短輪詢的優(yōu)化,需要服務(wù)端做相應(yīng)的修改來支持??蛻舳讼蚍?wù)端發(fā)送請(qǐng)求時(shí),如果此時(shí)服務(wù)端沒有新的信息產(chǎn)生,并不立刻返回,而是Hang住一段時(shí)間等有新的信息或者超時(shí)再返回,客戶端收到服務(wù)器的應(yīng)答后繼續(xù)輪詢??梢钥吹介L(zhǎng)輪詢比短輪詢可以減少大量無用的請(qǐng)求,并且客戶端接收取新消息也會(huì)實(shí)時(shí)不少。

雖然長(zhǎng)輪詢比短輪詢優(yōu)化了不少,但是每次請(qǐng)求還是都要帶上HTTP請(qǐng)求頭部,而且在長(zhǎng)輪詢的連接結(jié)束之后,服務(wù)器端積累的新消息要等到下次客戶端連接時(shí)才能傳遞。更好的方式是只用一個(gè)TCP連接來實(shí)現(xiàn)客戶端和服務(wù)端的雙向通信,WebSocket協(xié)議正是為此而生。WebSocket是基于TCP的一個(gè)獨(dú)立的協(xié)議,它與HTTP協(xié)議的唯一關(guān)系就是它的握手請(qǐng)求可以作為一個(gè)Upgrade request經(jīng)由HTTP服務(wù)器解析,且與HTTP使用一樣的端口。WebSocket默認(rèn)對(duì)普通請(qǐng)求使用80端口,協(xié)議為ws://,對(duì)TLS加密請(qǐng)求使用443端口,協(xié)議為wss://。

握手是通過一個(gè)HTTP Upgrade request開始的,一個(gè)請(qǐng)求和響應(yīng)頭部示例如下(去掉了無關(guān)的頭部)。WebSocket握手請(qǐng)求頭部與HTTP請(qǐng)求頭部是兼容的(見RFC2616)。

## Request Headers ##
Connection: Upgrade
Host: socket.io.demo.com
Origin: http://socket.io.demo.com
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: mupA9l2rXciZKoMNQ9LphA==
Sec-WebSocket-Version: 13
Upgrade: websocket

## Response Headers ##
101 Web Socket Protocol Handshake
Connection: upgrade
Sec-WebSocket-Accept: s4VAqh7eedG0a11ziQlwTzJUY3s=
Sec-WebSocket-Origin: http://socket.io.demo.com
Server: nginx/1.6.2
Upgrade: WebSocket
  • Upgrade 是HTTP/1.1中規(guī)定的用于轉(zhuǎn)換當(dāng)前連接的應(yīng)用層協(xié)議的頭部,表示客戶端希望用現(xiàn)有的連接轉(zhuǎn)換到新的應(yīng)用層協(xié)議WebSocket協(xié)議。

  • Origin 用于防止跨站攻擊,瀏覽器一般會(huì)使用這個(gè)來標(biāo)識(shí)原始域,對(duì)于非瀏覽器的客戶端應(yīng)用可以根據(jù)需要使用。

  • 請(qǐng)求頭中的 Sec-WebSocket-Version 是WebSocket版本號(hào),Sec-WebSocket-Key 是用于握手的密鑰。Sec-WebSocket-Extensions 和 Sec-WebSocket-Protocol 是可選項(xiàng),暫不討論。

  • 響應(yīng)頭中的 Sec-WebSocket-Accept 是將請(qǐng)求頭中的 Sec-WebSocket-Key 的值加上一個(gè)固定魔數(shù)258EAFA5-E914-47DA-95CA-C5AB0DC85B11經(jīng)SHA1+base64編碼后得到。計(jì)算過程的python代碼示例(uwsgi中的實(shí)現(xiàn)見 core/websockets.c的 uwsgi_websocket_handshake函數(shù)):

    magic_number = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    key = 'mupA9l2rXciZKoMNQ9LphA=='
    accept = base64.b64encode(hashlib.sha1(key + magic_number).digest())
    assert(accept == 's4VAqh7eedG0a11ziQlwTzJUY3s=')
    
  • 客戶端會(huì)檢查響應(yīng)頭中的status code 和 Sec-WebSocket-Accept 值是否是期待的值,如果發(fā)現(xiàn)Accept的值不正確或者狀態(tài)碼不是101,則不會(huì)建立WebSocket連接,也不會(huì)發(fā)送WebSocket數(shù)據(jù)幀。

WebSocket協(xié)議使用幀(Frame)收發(fā)數(shù)據(jù),幀格式如下?;?a target="_blank" rel="nofollow">安全考量,客戶端發(fā)送給服務(wù)端的幀必須通過4字節(jié)的掩碼(Masking-key)加密,服務(wù)端收到消息后,用掩碼對(duì)數(shù)據(jù)幀的Payload Data進(jìn)行異或運(yùn)算解碼得到數(shù)據(jù)(詳見uwsgi的 core/websockets.c 中的uwsgi_websockets_parse函數(shù)),如果服務(wù)端收到未經(jīng)掩碼加密的數(shù)據(jù)幀,則應(yīng)該馬上關(guān)閉該WebSocket。而服務(wù)端發(fā)給客戶端的數(shù)據(jù)則不需要掩碼加密,客戶端如果收到了服務(wù)端的掩碼加密的數(shù)據(jù),則也必須關(guān)閉它。

 0                   1                   2                   3
      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
     +-+-+-+-+-------+-+-------------+-------------------------------+
     |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
     |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
     |N|V|V|V|       |S|             |   (if payload len==126/127)   |
     | |1|2|3|       |K|             |                               |
     +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
     |     Extended payload length continued, if payload len == 127  |
     + - - - - - - - - - - - - - - - +-------------------------------+
     |                               |Masking-key, if MASK set to 1  |
     +-------------------------------+-------------------------------+
     | Masking-key (continued)       |          Payload Data         |
     +-------------------------------- - - - - - - - - - - - - - - - +
     :                     Payload Data continued ...                :
     +---------------------------------------------------------------+

幀分為控制幀和數(shù)據(jù)幀,控制幀不能分片,數(shù)據(jù)幀可以分片。主要字段說明如下:

  • FIN: 沒有分片的幀的FIN為1,分片幀的第一個(gè)分片的FIN為0,最后一個(gè)分片F(xiàn)IN為1。
  • opcode: 幀類型編號(hào),其中控制幀:0x8 (Close), 0x9 (Ping), and 0xA (Pong),數(shù)據(jù)幀主要有:0x1 (Text), 0x2 (Binary)。
  • MASK:客戶端發(fā)給服務(wù)端的幀MASK為1,Masking-key為加密掩碼。服務(wù)端發(fā)往客戶端的MASK為0,Masking-key為空。
  • Payload len和Payload Data分別是幀的數(shù)據(jù)長(zhǎng)度和數(shù)據(jù)內(nèi)容。

3 engine.io和socket.io

前面提到socket.io是基于engine.io的封裝,engine.io(協(xié)議版本3)有一套自己的協(xié)議,任何engine.io服務(wù)器都必須支持polling(包括jsonp和xhr)和websocket兩種傳輸方式。engine.io使用websocket時(shí)有一套自己的ping/pong機(jī)制,使用的是opcode為0x1(Text)類型的數(shù)據(jù)幀,不是websocket協(xié)議規(guī)定的ping/pong類型的幀,標(biāo)準(zhǔn)的 ping/pong 幀被uwsgi使用。

engine.io的數(shù)據(jù)編碼分為Packet和Payload,其中 Packet是數(shù)據(jù)包,有6種類型:

  • 0 open:從服務(wù)端發(fā)出,標(biāo)識(shí)一個(gè)新的傳輸方式已經(jīng)打開。
  • 1 close:請(qǐng)求關(guān)閉這條傳輸連接,但是它本身并不關(guān)閉這個(gè)連接。
  • 2 ping:客戶端周期性發(fā)送ping,服務(wù)端響應(yīng)pong。注意這個(gè)與uwsgi自帶的ping/pong不一樣,uwsgi里面發(fā)送ping,而瀏覽器返回pong。
  • 3 pong:服務(wù)端發(fā)送。
  • 4 message:實(shí)際發(fā)送的消息。
  • 5 upgrade:在轉(zhuǎn)換transport前,engine.io會(huì)發(fā)送探測(cè)包測(cè)試新的transport(如websocket)是否可用,如果OK,則客戶端會(huì)發(fā)送一個(gè)upgrade消息給服務(wù)端,服務(wù)端關(guān)閉老的transport然后切換到新的transport。
  • 6 noop:空操作數(shù)據(jù)包,客戶端收到noop消息會(huì)將之前等待暫停的輪詢暫停,用于在接收到一個(gè)新的websocket強(qiáng)制一個(gè)新的輪詢周期。

而Payload是指一系列綁定到一起的編碼后的Packet,它只用在poll中,websocket里面使用websocket幀里面的Payload字段來傳輸數(shù)據(jù)。如果客戶端不支持XHR2,則payload格式如下,其中l(wèi)ength是數(shù)據(jù)包Packet的長(zhǎng)度,而packet則是編碼后的數(shù)據(jù)包內(nèi)容(測(cè)試發(fā)現(xiàn)客戶端發(fā)送給服務(wù)端的poll請(qǐng)求中的payload用的這種字符編碼)。

<length1>:<packet1>[<length2>:<packet2>[...]]

若支持XHR2,則payload中內(nèi)容全部以字節(jié)編碼,其中第1位0表示字符串,1表示二進(jìn)制數(shù)據(jù),而后面接著的數(shù)字則是表示packet長(zhǎng)度,然后以\xff結(jié)尾。如果一個(gè)長(zhǎng)度為109的字符類型的數(shù)據(jù)包,則前面長(zhǎng)度編碼是 \x00\x01\x00\x09\xff,然后后面接packet內(nèi)容。(測(cè)試發(fā)現(xiàn)服務(wù)端返回給客戶端的payload為這種字節(jié)編碼)

<0 for string data, 1 for binary data><Any number of numbers between 0 and 9><The number 255><packet1 (first type,
then data)>[...]

engine.io服務(wù)器維護(hù)了一個(gè)socket的字典結(jié)構(gòu)用于管理連接到該機(jī)的客戶端,而客戶端的標(biāo)識(shí)就是sid。如果有多個(gè)worker,則需要保證同一個(gè)客戶端的請(qǐng)求落在同一臺(tái)worker上(可以配置nginx根據(jù)sid分發(fā))。因?yàn)槊總€(gè)worker只維護(hù)了一部分客戶端連接,如果要支持廣播,room等特性,則后端需要使用 redis 或者 RabbitMQ 消息隊(duì)列,使用redis的話則是通過redis的訂閱發(fā)布機(jī)制實(shí)現(xiàn)多機(jī)多worker之間的消息推送。

socket.io是engine.io的封裝,在其基礎(chǔ)上增加了自動(dòng)重連,多路復(fù)用,namespace,room等特性。socket.io本身也有一套協(xié)議,它Packet類型分為(CONNECT 0, DISCONNECT 1, EVENT 2, ACK 3, ERROR 4, BINARY_EVENT 5, BINARY_ACK 6)。注意與engine.io的Packet類型有所不同,但是socket.io的packet實(shí)際是借助的engine.io的Message類型發(fā)送的,在后面實(shí)例中可以看到Packet的編碼方式。當(dāng)連接出錯(cuò)的時(shí)候,socket.io會(huì)通過自動(dòng)重連機(jī)制重新連接。

4 源碼分析

在建立連接后,每個(gè)客戶端會(huì)被自動(dòng)加入到一個(gè)默認(rèn)的命名空間/。在每個(gè)命名空間中,socket會(huì)被默認(rèn)加入兩個(gè)名為Nonesid的房間。None的房間用于廣播,而sid是當(dāng)前客戶端的session id,用于單播。除默認(rèn)的房間外,我們可以根據(jù)需要將對(duì)應(yīng)socket加入自定義房間,roomid唯一即可。socket.io基于engine.io,支持websocket和long polling。如果是long polling,會(huì)定時(shí)發(fā)送GET, POST請(qǐng)求,當(dāng)沒有數(shù)據(jù)時(shí),GET請(qǐng)求在拉取隊(duì)列消息時(shí)會(huì)hang住(超時(shí)時(shí)間為pingTimeout),如果hang住期間服務(wù)器一直沒有數(shù)據(jù)產(chǎn)生,則需要等到客戶端發(fā)送下一個(gè)POST請(qǐng)求時(shí),此時(shí)服務(wù)器會(huì)往隊(duì)列中存儲(chǔ)POST請(qǐng)求中的消息,這樣上一個(gè)GET請(qǐng)求才會(huì)返回。如果upgrade到了websocket連接,則探測(cè)成功之后會(huì)定期ping/pong來?;钸B接。流程如下圖所示:

socketio通信流程圖

為方便描述,下面提到的engine.io服務(wù)器對(duì)應(yīng)源文件是engineio/server.py,engine.io套接字對(duì)應(yīng)源文件engineio/socket.py,而socket.io服務(wù)器則對(duì)應(yīng)socketio/server.py。下面分析下socket.io連接建立、消息接收和發(fā)送、連接關(guān)閉過程。socket.io版本為1.9.0,engine.io版本為2.0.4。

連接建立

首先,客戶端會(huì)發(fā)送一個(gè)polling請(qǐng)求來建立連接。此時(shí)的請(qǐng)求參數(shù)沒有sid,表示要建立連接。 engine.io服務(wù)器通過handle_get_request()handle_post_request()方法來分別處理初始化連接以及長(zhǎng)輪詢中的 GET 和 POST 請(qǐng)求。

socket.io在初始化時(shí)便注冊(cè)了3個(gè)事件到engine.io的handlers中,分別是connect(處理函數(shù)_handle_eio_connect),message(_handle_eio_message),disconnect(_handle_eio_disconnect),在engine.io套接字接收到了上述三個(gè)類型的消息后,在自身做了對(duì)應(yīng)處理后都會(huì)觸發(fā)socket.io中的對(duì)應(yīng)的處理函數(shù)做進(jìn)一步處理。

當(dāng)接收到GET請(qǐng)求且沒有sid參數(shù)時(shí),則engine.io服務(wù)器會(huì)調(diào)用 _handle_connect()方法來建立連接。這個(gè)方法主要工作是為當(dāng)前客戶端生成sid,創(chuàng)建Socket對(duì)象并保存到engine.io服務(wù)器的sockets集合中。做了這些初始化工作后,engine.io服務(wù)器會(huì)發(fā)送一個(gè)OPEN類型的數(shù)據(jù)包給客戶端,接著會(huì)觸發(fā)socket.io服務(wù)器的connect事件。

客戶端第一次連接的時(shí)候,socket.io也要做一些初始化的工作,這是在socket.io服務(wù)器的_handle_eio_connect()處理的。這里做的事情主要有幾點(diǎn):

  • 初始化manager,比如用的是redis做后端隊(duì)列的話,則需要初始化redis_manager,包括設(shè)置redis連接配置,如果沒有訂閱頻道則還要訂閱頻道flask_socketio(默認(rèn)頻道是"socket.io"),如果用到gevent,則還要對(duì)redis模塊的socket庫(kù)打monkey-patch等。

  • 將該客戶端加入到默認(rèn)房間None,sid中。

  • 調(diào)用代碼中對(duì)connect事件注冊(cè)的函數(shù)。如下面這個(gè),注意下,socket.io中也有個(gè)用于事件處理的handlers,它保存的是在后端代碼中對(duì)socket.io事件注冊(cè)的函數(shù)(開發(fā)者定義的),而engine.io的handlers中保存的函數(shù)是socket.io注冊(cè)的那三個(gè)針對(duì)connect,message和disconnect事件的固定的處理函數(shù)。

    socketio.on("connect")
    def test_connect():
        print "client connected"
    
  • 發(fā)送一個(gè)sockeio的connect數(shù)據(jù)包給客戶端。

最后在響應(yīng)中engine.io會(huì)為客戶端設(shè)置一個(gè)名為io值為sid的cookie,響應(yīng)內(nèi)容payload包括兩個(gè)數(shù)據(jù)包,一個(gè)是engine.io的OPEN數(shù)據(jù)包,內(nèi)容為sid,pingTimeout等配置和參數(shù);另一個(gè)是socket.io的connect數(shù)據(jù)包,內(nèi)容為40。其中4表示的是engine.io的message消息,0則表示socket.io的connect消息,以字節(jié)流返回。這里的pingTimeout客戶端和服務(wù)端共享這個(gè)配置,用于檢測(cè)對(duì)端是否超時(shí)。

接著會(huì)發(fā)送一個(gè)輪詢請(qǐng)求和websocket握手請(qǐng)求,如果websocket握手成功后客戶端會(huì)發(fā)送2 probe探測(cè)幀,服務(wù)端回應(yīng)3 probe,然后客戶端會(huì)發(fā)送內(nèi)容為5的Upgrade幀,服務(wù)端回應(yīng)內(nèi)容為6的noop幀。探測(cè)幀檢查通過后,客戶端停止輪詢請(qǐng)求,將傳輸通道轉(zhuǎn)到websocket連接,轉(zhuǎn)到websocket后,接下來就開始定期(默認(rèn)是25秒)的 ping/pong(這是socket.io自定義的ping/pong,除此之外,uwsgi也會(huì)定期(默認(rèn)30秒)對(duì)客戶端ping,客戶端回應(yīng)pong,這個(gè)在chrome的Frames里面是看不到的,需要借助wireshark或者用其他瀏覽器插件來觀察)。

服務(wù)端消息接收流程

對(duì)接收消息的則統(tǒng)一通過engine.io套接字的receive()函數(shù)處理:

  • 對(duì)于輪詢,一旦收到了polling的POST請(qǐng)求,則會(huì)調(diào)用receive往該socket的消息隊(duì)列里面發(fā)送消息,從而釋放之前hang住的GET請(qǐng)求。
  • 對(duì)于websocket:
    • 收到了ping,則會(huì)馬上響應(yīng)一個(gè)pong。
    • 接收到了upgrade消息,則馬上發(fā)送一個(gè)noop消息。
    • 接收到了message,則調(diào)用socket.io注冊(cè)到engine.io的_handle_eio_message方法來處理socket.io自己定義的各種消息。
  • 因?yàn)榉?wù)端接收消息并沒有用到消息隊(duì)列來處理,所以要求同一個(gè)客戶端的請(qǐng)求必須落到同一個(gè)worker上面,否則接收消息時(shí)會(huì)報(bào)Invalid session錯(cuò)誤。

服務(wù)端消息發(fā)送流程

而服務(wù)端要給客戶端發(fā)送消息,則需要通過socket.io服務(wù)器的emit方法,注意emit方法是針對(duì)room來發(fā)送消息的,如果是context-aware的,則emit默認(rèn)是對(duì)namespace為/且room名為sid的房間發(fā)送,如果是context-free的,則默認(rèn)是廣播即對(duì)所有連接的客戶端發(fā)送消息(當(dāng)然在context-free的場(chǎng)景下面,你也可以指定room來只給指定room推送消息)。

socket.io要實(shí)現(xiàn)多進(jìn)程以及廣播,房間等功能,勢(shì)必需要接入一個(gè)redis之類的消息隊(duì)列,進(jìn)而socket.io的emit會(huì)調(diào)用對(duì)應(yīng)隊(duì)列管理器pubsub_manager的emit方法,比如用redis做消息隊(duì)列則最終調(diào)用 redis_manager中的_publish() 方法通過redis的訂閱發(fā)布功能將消息推送到flask_socketio頻道。另一方面,每個(gè)進(jìn)程在初始化時(shí)都訂閱了 flask_socketio頻道,而且都有一個(gè)協(xié)程(或線程)在監(jiān)聽頻道中是否有消息,一旦有消息,就會(huì)調(diào)用pubsub_manager._handle_emit()方法對(duì)本機(jī)對(duì)應(yīng)的socket發(fā)送對(duì)應(yīng)的消息,最終是通過socket.io服務(wù)器的_emit_internal()方法實(shí)現(xiàn)對(duì)本機(jī)中room為sid的所有socket發(fā)送消息的,如果room為None,則就是廣播,即對(duì)所有連接到本機(jī)的所有客戶端推送消息。

socket.io服務(wù)器發(fā)送消息要基于engine.io消息包裝,所以歸結(jié)到底還是調(diào)用的engine.io套接字中的send()方法。engine.io為每個(gè)客戶端都會(huì)維護(hù)一個(gè)消息隊(duì)列,發(fā)送數(shù)據(jù)都是先存到隊(duì)列里面待拉取,websocket除了探測(cè)幀之外的其他數(shù)據(jù)幀也都是通過該消息隊(duì)列發(fā)送。

關(guān)閉連接(只分析websocket)

websocket可能異常關(guān)閉的情況很多。比如客戶端發(fā)了ping后等待pong超時(shí)關(guān)閉,服務(wù)端接收到ping跟上一個(gè)ping之間超過了pingTimeout;用的uwsgi的話,uwsgi發(fā)送ping,如果在websockets-pong-tolerance(默認(rèn)3秒)內(nèi)接收不到pong回應(yīng),也會(huì)關(guān)閉連接;還有如果nginx的proxy_read_timeout配置的比pingInterval小等。

只要不是客戶端主動(dòng)關(guān)閉連接,socket.io就會(huì)在連接出錯(cuò)后不斷重試以建立連接。重試間隔和重試次數(shù)由reconnectionDelayMax(默認(rèn)5秒)和reconnectionAttempts(默認(rèn)一直重連)設(shè)定。下面討論客戶端正常關(guān)閉的情況,各種異常關(guān)閉情況請(qǐng)具體情況具體分析。

客戶端主動(dòng)關(guān)閉

假定客戶端調(diào)用socket.close()主動(dòng)關(guān)閉websocket連接,則會(huì)先發(fā)送一個(gè)消息41(4:engine.io的message,1:socket.io的disconnect)再關(guān)閉連接。如前面提到,engine.io套接字接收到消息后會(huì)交給socket.io服務(wù)器注冊(cè)的 _handle_eio_message()處理。最終是調(diào)用的socket.io的_handle_disconnect(),該函數(shù)工作包括調(diào)用socketio.on("disconnect")注冊(cè)的函數(shù),將該客戶端從加入的房間中移除,清理環(huán)境變量等。

uwsgi而接收到客戶端關(guān)閉websocket連接消息后會(huì)關(guān)閉服務(wù)端到客戶端的連接。engine.io服務(wù)器的websocket數(shù)據(jù)接收例程ws.wait()因?yàn)檫B接關(guān)閉報(bào)IOError,觸發(fā)服務(wù)端循環(huán)收發(fā)數(shù)據(jù)過程停止,并從維護(hù)的sockets集合中移除這個(gè)關(guān)閉的sid。然后調(diào)用engine.io套接字的close(wait=True, abort=True)方法,由于是客戶端主動(dòng)關(guān)閉,這里就不會(huì)再給客戶端發(fā)送一個(gè)CLOSE消息。而 engine.io服務(wù)器的close方法一樣會(huì)觸發(fā)socket.io之前注冊(cè)的disconnect事件處理函數(shù),由于前面已經(jīng)調(diào)用_handle_disconnect()處理了關(guān)閉連接事件,所以這里_handle_eio_disconnect()不需要再做其他操作(這個(gè)操作不是多余的,其作用見后一節(jié))。

瀏覽器關(guān)閉

直接關(guān)閉瀏覽器發(fā)送的是websocket的標(biāo)準(zhǔn)CLOSE消息,opcode為8。socket.io服務(wù)端處理方式基本一致,由于這種情況下并沒有發(fā)送socket.io的關(guān)閉消息41,socket.io的關(guān)閉操作需要等到engine.io觸發(fā)的_handle_eio_disconnect()中處理,這就是前一節(jié)中為什么engine.io服務(wù)器后面還要多調(diào)用一次 _handle_eio_disconnect()的原因所在。

5 實(shí)例

協(xié)議說明容易讓人有點(diǎn)迷糊,websocket,engine.io,socket.io,各自協(xié)議是如何工作的,看看實(shí)例可能會(huì)比較清晰,為了方便測(cè)試,我寫了個(gè)Dockerfile,安裝了docker的童鞋可以拉取代碼執(zhí)行 bin/start.sh 即可啟動(dòng)擁有完整的 nginx+uwsgi+gevent+flask_socketio測(cè)試環(huán)境的容器開始測(cè)試,瀏覽器打開http://127.0.0.1即可測(cè)試。flask_socketio支持的異步模式有threading, eventlet, gevent 和 gevent_uwsgi等,我的測(cè)試環(huán)境async_mode用的是gevent_uwsgi,完整代碼見 這里。

對(duì)于不支持websocket的低版本瀏覽器,socket.io會(huì)退化為長(zhǎng)輪詢的方式,通過定期的發(fā)送GET, POST請(qǐng)求來拉取數(shù)據(jù)。沒有數(shù)據(jù)時(shí),會(huì)將請(qǐng)求數(shù)據(jù)的GET請(qǐng)求hang住,直到服務(wù)端有數(shù)據(jù)產(chǎn)生或者客戶端的POST請(qǐng)求將GET請(qǐng)求釋放,釋放之后會(huì)緊接著再次發(fā)送一個(gè)GET請(qǐng)求,除此之外,數(shù)據(jù)編解碼和處理流程與websocket方式基本一致。實(shí)例只針對(duì)websocket進(jìn)行分析,如果要測(cè)試長(zhǎng)輪詢,可以將nginx配置中的proxy_set_header中的Connection和Upgrade去掉即可。

為了觀察socket.io客戶端的調(diào)用流程,可以設(shè)置localStorage.debug = '*';,測(cè)試的前段代碼片段如下(完整代碼見倉(cāng)庫(kù)):

 <script type="text/javascript" charset="utf-8">
    var socket = io.connect('/', {
        "reconnectionDelayMax": 10000,
        "reconnectionAttempts": 10
    });
    socket.on('connect', function() {
        $('#log').append('<br>' + $('<div/>').text('connected').html());
    })

    $(document).ready(function() {

        socket.on('server_response', function(msg) {
            $('#log').append('<br>' + $('<div/>').text('Received from server: ' + ': ' + msg.data).html());
        });

        $('form#emit').submit(function(event) {
            socket.emit('client_event', {data: $('#emit_data').val()});
            return false;
        });
    });

 </script>

測(cè)試代碼比較簡(jiǎn)單,引入socket.io的js庫(kù)文件,然后在連接成功后在頁(yè)面顯示“connected”,在輸入框輸入文字,可以通過連接發(fā)送至服務(wù)器,然后服務(wù)器將瀏覽器發(fā)送的字符串加上server標(biāo)識(shí)回顯回來。

建立連接

在chrome中打開頁(yè)面可以看到發(fā)了3個(gè)請(qǐng)求,分別是:

1 http://127.0.0.1/socket.io/?EIO=3&transport=polling&t=MAkXxBR
2 http://127.0.0.1/socket.io/? EIO=3&transport=polling&t=MAkXxEz&sid=9c54f9c1759c4dbab8f3ce20c1fe43a4
3 ws://127.0.0.1/socket.io/?EIO=3&transport=websocket&sid=9c54f9c1759c4dbab8f3ce20c1fe43a4

請(qǐng)求默認(rèn)路徑是/socket.io,注意命名空間并不會(huì)在路徑中,而是在參數(shù)中傳遞。第1個(gè)請(qǐng)求是polling,EIO是engine.io協(xié)議的版本號(hào),t是一個(gè)隨機(jī)字符串,第一個(gè)請(qǐng)求時(shí)還還沒有生成sid。服務(wù)端接收到消息后會(huì)調(diào)用engine.io/server.py_handle_connect()建立連接。

返回的結(jié)果是

## Response Headers: Content-Type: application/octet-stream ##
??0{"pingInterval":25000,"pingTimeout":60000,"upgrades":["websocket"],"sid":"9c54f9c1759c4dbab8f3ce20c1fe43a4"}??40

可以看到,這里返回的是字節(jié)流的payload,content-type為"application/octet-stream"。這個(gè)payload其實(shí)包含兩個(gè)packet,第一個(gè)packet是engine.io的OPEN消息,類型為0,它的內(nèi)容為pingInterval,pingTimeout,sid等;第二個(gè)packet類型是4(message),而它的數(shù)據(jù)內(nèi)容是0,表示socket.io的CONNECT。而其中的看起來亂碼的部分實(shí)則是前面提到的payload編碼中的長(zhǎng)度的編碼\x00\x01\x00\x09\xff\x00\x02\xff。

如果在js代碼中將io.connect的namespace參數(shù)不用默認(rèn)的/,而設(shè)置為/demo,那么連接時(shí)還會(huì)發(fā)一個(gè)POST請(qǐng)求帶上7:40/demo的字符格式payload(其中7是數(shù)據(jù)長(zhǎng)度,4是engineio的message,0則是表示socket.io的connect類型消息),服務(wù)器接收到該P(yáng)OST請(qǐng)求后會(huì)將該客戶端再加入到/demo命名空間中。

  • 第2個(gè)請(qǐng)求是輪詢請(qǐng)求,如果websocket建立并測(cè)試成功(使用內(nèi)容為probe的ping/pong幀)后,會(huì)暫停輪詢請(qǐng)求??梢钥吹捷喸冋?qǐng)求一直hang住到websocket建立并測(cè)試成功后才返回,響應(yīng)結(jié)果是??6,前面亂碼部分是payload長(zhǎng)度編碼\x00\x01\xff,后面的數(shù)字6是engine.io的noop消息。

  • 第3個(gè)請(qǐng)求是websocket握手請(qǐng)求,握手成功后,可以在chrome的Frames里面看到websocket的數(shù)據(jù)幀交互流程,可以看到如前面分析,確實(shí)是先發(fā)的探測(cè)幀,然后是Upgrade幀,接著就是定期的ping/pong幀了。

    2probe
    3probe
    5
    2
    3
    ...
    

客戶端發(fā)送消息給服務(wù)端

如果要發(fā)送消息給服務(wù)器,在瀏覽器輸入框輸入test,點(diǎn)擊echo按鈕,可以看到websocket發(fā)送的幀的內(nèi)容如下,其中4是engine.io的message類型標(biāo)識(shí),2是socket.io的EVENT類型標(biāo)識(shí),而后面則是事件名稱和數(shù)據(jù),數(shù)據(jù)可以是字符串,字典,列表等類型。

42["client_event",{"data":"test"}]

服務(wù)端接收消息流程

而服務(wù)端接收消息并返回一個(gè)新的event為"server_response",數(shù)據(jù)為"TEST",代碼如下,其中socketio是flask_socketio模塊的SocketIO對(duì)象,它提供了裝飾器方法 on將自定義的client_event和處理函數(shù)test_client_event注冊(cè)到sockerio服務(wù)器的handlers中。

當(dāng)接收到 client_event 消息時(shí),會(huì)通過sockerio/server.py中的 _handle_eio_message()方法處理消息,對(duì)于socket.io的EVENT類型的消息最終會(huì)通過_trigger_event()方法處理,該方法也就是從handlers中拿到client_event對(duì)應(yīng)的處理函數(shù)并調(diào)用之。

from flask_socketio import SocketIO, emit
socketio = SocketIO(...)
    
@socketio.on("client_event")
def test_client_event(msg):
    emit("server_response", {"data": msg["data"].upper()})

服務(wù)端發(fā)送消息到客戶端

服務(wù)端發(fā)送消息通過 flask_socketio提供的emit方法實(shí)現(xiàn),如前一節(jié)分析的,最終還是通過的engine.io包裝成engine.io的消息格式后發(fā)出。

42["server_response",{"data":"TEST"}]

關(guān)閉連接

客戶端要主動(dòng)關(guān)閉連接,在JS中調(diào)用 socket.close() 即可,此時(shí)發(fā)送的數(shù)據(jù)包為 41,其中4代表的是engine.io的消息類型message,而數(shù)據(jù)1則是指的socket.io的消息類型disconnect,關(guān)閉流程見上一章的說明。

幾個(gè)小點(diǎn)

假如客戶端連接時(shí)namespace為/demo,而服務(wù)端發(fā)送消息emit(namespace="/")指定的命名空間為默認(rèn)的/,那這個(gè)消息是否會(huì)發(fā)給客戶端?答案是會(huì)。因?yàn)榍懊嬲f到,每個(gè)客戶端默認(rèn)加入到了/中,所以,服務(wù)端的消息肯定會(huì)發(fā)給客戶端的,但是客戶端接收到消息會(huì)檢查namespace是否與其connect時(shí)的namespace一致,如果不一致,雖然接收到了消息但是并不會(huì)觸發(fā)客戶端的操作。

如果客戶端想知道自己發(fā)送的事件是否被服務(wù)端成功接收,可以在emit里面加回調(diào)函數(shù),如下所示。加了回調(diào)函數(shù)后客戶端發(fā)送的消息格式為421["client_event",{"data":"test"}],即在原來基礎(chǔ)上多加了一個(gè)id標(biāo)識(shí)1,服務(wù)端接收到事件后,發(fā)現(xiàn)消息中有id,則會(huì)多發(fā)送一個(gè)socket.io的ACK包給客戶端,內(nèi)容為該事件處理函數(shù)的返回值,客戶端收到ACK包后會(huì)調(diào)用下面的callback。

  socket.emit('client_event', {data: $('#message').val()}, callback);

而服務(wù)端如果要確認(rèn)發(fā)送的消息是否被客戶端接收到,可以在emit函數(shù)里面指定 callback參數(shù),而客戶端的事件監(jiān)聽里面回調(diào)函數(shù)加多一個(gè)ack參數(shù)并調(diào)用ack函數(shù)即可,這樣客戶端收到了服務(wù)端的消息后,調(diào)用ack時(shí)就會(huì)發(fā)送一個(gè)ACK消息給服務(wù)端,ack函數(shù)里面也可以傳參數(shù)給服務(wù)端。

### 服務(wù)端
flask_socketio.emit("server_response", {"data": "xxx"}, callback=callback)

### 客戶端
socket.on('server_response', function(msg, ack) {
      ...  
      ack();
 });

6 總結(jié)

本文示例中,為了便于分析,只用了默認(rèn)的namespace和room,而在項(xiàng)目中可以根據(jù)業(yè)務(wù)需要使用namespace,room等高級(jí)特性。在nginx+uwsgi使用socket.io時(shí),注意nginx的超時(shí)配置proxy_read_timeout和uwsgi的websocket超時(shí)配置websocket-ping-freq和websockets-pong-tolerance,配置不當(dāng)會(huì)導(dǎo)致socke.io因?yàn)閣ebsocket的ping/pong超時(shí)而不斷重連。如果要禁用websocket,可以在SocketIO參數(shù)里面加上allow_upgrades=False即可。

調(diào)研了一些其他系統(tǒng)WEB端的推送機(jī)制,微信網(wǎng)頁(yè)版沒有用websocket,而是統(tǒng)一用的長(zhǎng)輪詢的方式。今日頭條WEB版其實(shí)都沒有實(shí)時(shí)推送信息流,而是定時(shí)提示用戶去手動(dòng)點(diǎn)擊刷新。即刻WEB版則是用的短連接定期拉取是否有未讀消息,不過它也用到了socket.io。

需要注意,不要在服務(wù)端socketio.on("connect)"調(diào)用emit函數(shù)或者過多的其他操作,否則容易引起服務(wù)端連接不關(guān)閉的問題。

參考資料

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容