通過Packetbeat抓取Finagle協(xié)議數(shù)據(jù)包(自定義Thrift協(xié)議)總結(jié)

寫在前面

最近一年多一直在做服務(wù)治理相關(guān)的開發(fā)工作. 起初服務(wù)監(jiān)控采用了成本比較低的方式來實(shí)現(xiàn)(提供者,消費(fèi)者自己按分鐘維度上報(bào)健康數(shù)據(jù)到Redis,但是這種方式只是在Java的服務(wù)提供者和消費(fèi)者做到了很好的實(shí)現(xiàn), 其他語言目前只能上報(bào)很少部分的監(jiān)控?cái)?shù)據(jù)). 因?yàn)楣镜拈_發(fā)語言是多樣的, 其中包括: Nodejs, Ruby, Golang, Java, Scala等, 那么將來要對(duì)監(jiān)控?cái)?shù)據(jù)的模型拓展, 需求變更等, 將很難快速推廣實(shí)現(xiàn). 隨著公司業(yè)務(wù)的高速發(fā)展, 以及將來所有服務(wù)部署Docker化, 服務(wù)的監(jiān)控預(yù)警已經(jīng)是服務(wù)治理工作中的重中之重. 服務(wù)監(jiān)控最好可以同時(shí)監(jiān)控基礎(chǔ)服務(wù)(Mysql, Redis等),業(yè)務(wù)服務(wù). 我們的業(yè)務(wù)服務(wù)是采用的Twitter的Finagle-thrift實(shí)現(xiàn)多語言之間的RPC調(diào)用. Balabala說了這么多, 就是我們現(xiàn)在要做全鏈路監(jiān)控, 做監(jiān)控首先第一步是需要可以收集到這些網(wǎng)絡(luò)調(diào)用的原始數(shù)據(jù), 這個(gè)時(shí)候ElasticStack中的Beats項(xiàng)目進(jìn)入了我們的視線, Beats項(xiàng)目中的Packetbeat子項(xiàng)目可以抓取到像Mysql, Redis, Thrift等協(xié)議的數(shù)據(jù)包. 但是,我們業(yè)務(wù)使用的通信協(xié)議是Finagle-thrift, 這里面為了滿足一些拓展(比如:用于RPC調(diào)用鏈跟蹤的Zipkin),Finagle-thrift在原生Thrift上做了二次封裝, 接下來需要讓Packetbeat對(duì)Finagle-thrift協(xié)議支持. 下面我將分析過程整理如下, 方便以后溫習(xí)回顧.

Packetbeat項(xiàng)目介紹

更詳細(xì)的請(qǐng)參考 Medcl的一個(gè)教程

整個(gè)Beats項(xiàng)目都是用的Golang語言開發(fā), Golang這幾天也是現(xiàn)學(xué)現(xiàn)賣, 我在整個(gè)調(diào)試過程中沒有找到可以比較方便進(jìn)行Debug的方式, 只能通過fmt.Println進(jìn)行各種調(diào)試信息的輸出, 這個(gè)過程比較痛苦. 這里我順便記錄一下怎么配置Go的環(huán)境, 有幾個(gè)概念比較懵,在此記錄一下.

安裝GO

這里 獲取對(duì)應(yīng)的操作系統(tǒng)的GO安裝bao

GOPATH

  • 安裝好Go后需要設(shè)置環(huán)境變量,如下:

    #這是Go的安裝路徑
    export GOROOT=/usr/local/go
    export GOBIN=$GOROOT/bin
    
    #這里可以理解為Go項(xiàng)目的工作空間, 這里允許有多個(gè)目錄,注意用":"分割
    #當(dāng)有多個(gè)GOPATH時(shí),執(zhí)行 go get命令的內(nèi)容默認(rèn)會(huì)放在第一個(gè)目錄下
    export GOPATH=/work/goworkspace
    
  • GOPATH的的幾個(gè)目錄約定

    • src 放置Go項(xiàng)目的源碼
    • pkg Go項(xiàng)目中使用的第三方包
    • bin 編譯后生成的可執(zhí)行文件, 可以把此目錄加入到 PATH 變量中

獲取項(xiàng)目

 #創(chuàng)建相應(yīng)目錄
 mkdir -p $GOPATH/src/github.com/elastic/ 
 cd $GOPATH/src/github.com/elastic
    
 #簽出源碼
 git clone https://github.com/elastic/beats.git
 cd beats
    
 #修改官方倉庫為upstream源,設(shè)置自己的倉庫為origin源
 git remote rename origin upstream
 git remote add origin git@github.com:medcl/packetbeat.git
    
 #獲取上游最新的代碼,如果是剛fork的話可不用管
 git pull upstream master
    
 #簽出一個(gè)名為finagle的分支,用于開發(fā)這個(gè)功能
 git checkout -b finagle
    
 #切換到packetbeat模塊
 cd packetbeat
    
 #獲取依賴信息
 (mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone  https://github.com/golang/tools.git )
 go get github.com/tools/godep
    
 #編譯
 make

yml配置文件說明

interfaces:
  #如果提供者消費(fèi)者在本機(jī),直接寫成lo0 
  device: en0
protocols:
    # 自定義協(xié)議名
  finaglethrift:
    ports: [20880, 9090, 9091, 9099, 9098]
    # 自定義Thrift的Transport type一定要是frame的方式, 否則解析不出來
    transport_type: framed
    protocol_type: binary
    # idl文件一定要有
    idl_files: ["test_cfg/result.thrift","test_cfg/order.thrift","test_cfg/hello.thrift"]


output:
  elasticsearch:
    hosts: ["192.168.10.235:9200"]
  kafka:
    hosts: ["192.168.5.159:9092"]
    topic: "packetbeat_test_qqq"
shipper:
logging:
  files:
    path: /tmp/mybeat

需要修改哪些文件

  • 新增協(xié)議目錄, packetbeat啟動(dòng)時(shí)會(huì)自動(dòng)掃描protos目錄下的協(xié)議包


    因?yàn)槭且獙?duì)Thrift協(xié)議進(jìn)行拓展, 所以之前很多代碼是可以復(fù)用的, 直接將原來的thrift目錄在當(dāng)前目錄下復(fù)制一份, 直接改名為finaglethrift

  • 文件修改
    為了便于區(qū)分, 我們將原來所有文件名中的thrift變更為finaglethrift, 變更之后我們只需要修改finaglethrift.go文件即可.

    • 將包名從thrift變更為finaglethrift

    • 修改協(xié)議注冊(cè)名,這里的名稱直接匹配yml配置文件中的協(xié)議名

  • 協(xié)議解析的具體方法修改, 主要業(yè)務(wù)抓包分析將在這個(gè)方法中完成,我們本次改動(dòng)也是針對(duì)這個(gè)方法的修改

原生Thrift簡(jiǎn)單分析

通訊協(xié)議格式

  • TCompactProtocol

  • TBinaryProtocol(我們主要采用這種格式進(jìn)行通訊)
    TBinaryProtocol下通信方式采用TFramedTransport,即以幀的方式對(duì)數(shù)據(jù)進(jìn)行傳輸

    注意: 服務(wù)端, 服務(wù)端需要采用Framed的方式進(jìn)行通信, packetbeat采用Framed的方式進(jìn)行抓包分析, 如果thrift的傳輸方式不是這種方式, packetbeat將解析不出

  • TJSONProtocol

核心模型

  • TTransport, 這是一個(gè)基類,我們使用的傳輸方式是Framed, 那么直接使用的TFramedTransport將繼承TTransport. TFramedTransport會(huì)將數(shù)據(jù)寫入到一個(gè)buf中, 等全部寫完之后會(huì)調(diào)用flush方法,首先計(jì)算出buf中的數(shù)據(jù)長(zhǎng)度,將4個(gè)字節(jié)的幀長(zhǎng)度和數(shù)據(jù)內(nèi)容進(jìn)行封裝進(jìn)行發(fā)送. 針對(duì)解析方怎么判斷是否解析完,都是通過發(fā)送的data中頭四個(gè)字節(jié)判斷.具體如下圖:


    具體封裝源代碼:

    @Override
    public void flush() throws TTransportException {
        byte[] buf = writeBuffer_.get();
        int len = writeBuffer_.len();
        writeBuffer_.reset();
        # 封裝成 4個(gè)字節(jié) + 幀內(nèi)容
        encodeFrameSize(len, i32buf);
        transport_.write(i32buf, 0, 4);
        transport_.write(buf, 0, len);
        transport_.flush();
    }
    
  • TProtocol, 協(xié)議接口, 我們主要是采用TBinaryProtocol的協(xié)議類進(jìn)行通信, 其中實(shí)現(xiàn)了接口中的操作協(xié)議的方法. TBinaryProtocol需要為消息體封裝一個(gè)Header, 其中還定義了Thrift中的讀寫模式(這里很重要,如果模式不匹配將無法正常解析),主要分為: 嚴(yán)謹(jǐn)?shù)淖x寫, 普通讀寫. 因?yàn)槲覀冎饕槍?duì)嚴(yán)謹(jǐn)讀寫模式進(jìn)行抓包分析, 下面將重點(diǎn)解析一下在嚴(yán)謹(jǐn)讀寫模式下的消息體內(nèi)容都是什么, 具體如下圖:

    2016-04-30_11-39-09

在TBinaryProtocol中有對(duì)消息體的讀取和寫入操作, 具體代碼如下:

  public TMessage readMessageBegin() throws TException {
        int size = readI32();
        if (size < 0) {
          int version = size & VERSION_MASK;
          if (version != VERSION_1) {
            throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");
          }
          return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());
        } else {
          if (strictRead_) {
            throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
          }
          return new TMessage(readStringBody(size), readByte(), readI32());
        }
  } 
    
    public void writeMessageBegin(TMessage message) throws TException {
        if (strictWrite_) {
          int version = VERSION_1 | message.type;
          writeI32(version);
          writeString(message.name);
          writeI32(message.seqid);
        } else {
          writeString(message.name);
          writeByte(message.type);
          writeI32(message.seqid);
        }
  }
    
    
    /**
     * Message type constants in the Thrift protocol.
     *
     */
    public final class TMessageType {
          public static final byte CALL  = 1;
          public static final byte REPLY = 2;
          public static final byte EXCEPTION = 3;
          public static final byte ONEWAY = 4;
    }


  • TMessage, 服務(wù)提供者,消費(fèi)者在進(jìn)行RPC通信時(shí)都會(huì)講傳遞的數(shù)據(jù)封裝成TMessage, 主要包含三部分
    • 名稱
    • 序號(hào)
    • 類型

Finagle-thrift協(xié)議分析

因?yàn)镕inagle-thrift是在Thrift協(xié)議之上做了封裝, 我們主要對(duì)著兩個(gè)協(xié)議中具體的數(shù)據(jù)進(jìn)行比對(duì).

測(cè)試數(shù)據(jù)IDL

為了讓測(cè)試具有代表性, 構(gòu)建的IDL文件中既有簡(jiǎn)單的沒有入?yún)?返回值的finaglePing方法, 也有有入?yún)?復(fù)雜返回值的detail方法

    
include "result.thrift"
    
/*訂單*/
struct Order {
    1:i32 userId
    /*買家*/
    2:string userName,
    /*訂單ID*/
    3:string orderId,
}
    
struct OrderResult {
    1:result.Result result,
    2:optional Order order
}
    
service OrderServ{
    /*訂單詳情*/
    OrderResult detail(1:i32 userId, 2:string userName, 3:string orderId)
    void finaglePing()
}   
    
    
    
    
/************************復(fù)雜返回值Result的定義***************************/
    
struct FailDesc {
    1:string name,
    2:string failCode,
    3:string desc
}
    
struct Result {
    
    1:i32 code,
    
    2:optional list<FailDesc> failDescList
}
    
struct StringResult {
    1:Result result,
    
    2:optional string value,
    
    3:optional string extend
}   
    
    

一次RPC調(diào)用的差異

原生Thrift調(diào)用

我們針對(duì)finaglePing方法通過原生Thrift進(jìn)行一次RPC調(diào)用,并在Client端TcpDump出產(chǎn)生的數(shù)據(jù)包


2016-04-30_12-33-13

從圖上可以看出,包含了3次握手, 1次Client與Server的業(yè)務(wù)請(qǐng)求交互, 4次揮手關(guān)閉連接.

下面我們看Client發(fā)送請(qǐng)求時(shí)的具體數(shù)據(jù)包內(nèi)容如下圖:


2016-04-30_12-39-12

這里包含數(shù)據(jù)長(zhǎng)度, Thrift是否是嚴(yán)謹(jǐn)讀寫,消息類型, 消息內(nèi)容等信息.

Fiangle-thrift調(diào)用及分析

我們針對(duì)finaglePing方法同樣通過Fiangle-thrift方式進(jìn)行一次RPC調(diào)用,并在Client端TcpDump出產(chǎn)生的數(shù)據(jù)包


2016-04-30_12-45-41

從上圖看出, 一次RPC調(diào)用包含了, 3次握手, 1次fiangle確認(rèn)協(xié)議的請(qǐng)求交互, 1次Client與Server的業(yè)務(wù)請(qǐng)求交互, 4次揮手關(guān)閉連接.

關(guān)于Client發(fā)送的請(qǐng)求和原生Thrift還不太一樣, 在創(chuàng)建完連接之后, 需要發(fā)送
一次帶有__can__finagle__trace__v3__信息的請(qǐng)求已確認(rèn)是否是Finagle-thrift協(xié)議, 確認(rèn)成功之后才會(huì)進(jìn)行真正的業(yè)務(wù)交互, 這次確認(rèn)是一次標(biāo)準(zhǔn)的Thrift通信,具體如下圖:

2016-04-30_12-52-52

下面是在確認(rèn)Fiangle標(biāo)識(shí)之后進(jìn)行的真正的業(yè)務(wù)通信,具體如下圖:

2016-04-30_12-57-36

我們上面這張圖中可以看出在標(biāo)準(zhǔn)的Thrift協(xié)議數(shù)據(jù)之前Finagle-thrift自己又加了很多自己的數(shù)據(jù),具體加了什么, 我們來看一下Fiangle的源碼, 具體如下:

```
/**
 * ThriftClientFramedCodec implements a framed thrift transport that
 * supports upgrading in order to provide TraceContexts across
 * requests.
 */
object ThriftClientFramedCodec {
  /**
   * Create a [[com.twitter.finagle.thrift.ThriftClientFramedCodecFactory]].
   * Passing a ClientId will propagate that information to the server iff the server is a finagle
   * server.
   */
  def apply(clientId: Option[ClientId] = None) =
    new ThriftClientFramedCodecFactory(clientId)

  def get() = apply()
}

class ThriftClientFramedCodecFactory(
    clientId: Option[ClientId],
    _useCallerSeqIds: Boolean,
    _protocolFactory: TProtocolFactory)
  extends CodecFactory[ThriftClientRequest, Array[Byte]]#Client {

  def this(clientId: Option[ClientId]) = this(clientId, false, Protocols.binaryFactory())

  def this(clientId: ClientId) = this(Some(clientId))

  // Fix this after the API/ABI freeze (use case class builder)
  def useCallerSeqIds(x: Boolean): ThriftClientFramedCodecFactory =
    new ThriftClientFramedCodecFactory(clientId, x, _protocolFactory)

  /**
   * Use the given protocolFactory in stead of the default `TBinaryProtocol.Factory`
   */
  def protocolFactory(pf: TProtocolFactory) =
    new ThriftClientFramedCodecFactory(clientId, _useCallerSeqIds, pf)

  /**
   * Create a [[com.twitter.finagle.thrift.ThriftClientFramedCodec]]
   * with a default TBinaryProtocol.
   */
  def apply(config: ClientCodecConfig) =
    new ThriftClientFramedCodec(_protocolFactory, config, clientId, _useCallerSeqIds)
}

class ThriftClientFramedCodec(
  protocolFactory: TProtocolFactory,
  config: ClientCodecConfig,
  clientId: Option[ClientId] = None,
  useCallerSeqIds: Boolean = false
) extends Codec[ThriftClientRequest, Array[Byte]] {

  private[this] val preparer = ThriftClientPreparer(
    protocolFactory, config.serviceName,
    clientId, useCallerSeqIds)

  def pipelineFactory: ChannelPipelineFactory =
    ThriftClientFramedPipelineFactory

  override def prepareConnFactory(
    underlying: ServiceFactory[ThriftClientRequest, Array[Byte]]
  ) = preparer.prepare(underlying)

  override val protocolLibraryName: String = "thrift"
}       
```

Scala源碼看起來太費(fèi)勁, 既然知道了原理, 為了可以解析出具體的Fiangle-thrift中的東西, 我只需要設(shè)置FrameSize和data的offset的位置, 獲取到原生的Thrift協(xié)議中的Framed數(shù)據(jù)即可, 然后復(fù)用Packetbeat自帶的針對(duì)Thrift協(xié)議包的抓取與組合邏輯.

通過比對(duì)兩個(gè)業(yè)務(wù)包我知道中間Fiangle-thrift自己添加的信息字節(jié)大小固定為129個(gè)字節(jié),這只是Client在發(fā)送請(qǐng)求時(shí)才會(huì)添加這些附加信息, Server端返回值則是在原生Thrift協(xié)議中添加了1個(gè)字節(jié), 其中我還需要排除創(chuàng)建連接之后發(fā)送的Finagle-thrift協(xié)議確認(rèn)請(qǐng)求.

我們完成了普通Finagle-thrift協(xié)議的解析,接下來還要針對(duì)附帶Zipkin信息的Finagle-thrift協(xié)議的解析, Zipkin是參考Google的Dapper完成的可以對(duì)RPC調(diào)用鏈進(jìn)行跟蹤的框架, 這已經(jīng)是業(yè)內(nèi)針對(duì)分布式系統(tǒng)之間RPC調(diào)用鏈跟蹤的通用解決方案. Zipkin無非就是在RPC調(diào)用時(shí)多傳輸了TraceId, SpanId, ParentSpanId, IsSample等信息, 通過下面的Zipkin源碼可以確定這些信息的大小也是固定字節(jié),并且大小為4個(gè)字節(jié). Zipkin關(guān)于這塊的源碼如下:

```
/**
 * The wire format is (big-endian):
 *     ''spanId:8 parentId:8 traceId:8 flags:8''
 */
def tryUnmarshal(body: Buf): Try[TraceId] = {
      if (body.length != 32)
        return Throw(new IllegalArgumentException("Expected 32 bytes"))

      val bytes = local.get()
      body.write(bytes, 0)

      val span64 = ByteArrays.get64be(bytes, 0)
      val parent64 = ByteArrays.get64be(bytes, 8)
      val trace64 = ByteArrays.get64be(bytes, 16)
      val flags64 = ByteArrays.get64be(bytes, 24)

      val flags = Flags(flags64)
      val sampled = if (flags.isFlagSet(Flags.SamplingKnown)) {
        if (flags.isFlagSet(Flags.Sampled)) someTrue else someFalse
      } else None

      val traceId = TraceId(
        if (trace64 == parent64) None else Some(SpanId(trace64)),
        if (parent64 == span64) None else Some(SpanId(parent64)),
        SpanId(span64),
        sampled,
        flags)

      Return(traceId)
}
```

下面是Fiangle-thrift針對(duì)Zipkin關(guān)閉和開啟的一個(gè)抓包對(duì)比圖:


2016-04-30_13-46-49

根據(jù)上面的分析邏輯,我就可以在Packetbeat中的messageParser方法中通過一些字節(jié)特征修正FrameSize和data的offset來把數(shù)據(jù)包變成原生的Thrift協(xié)議數(shù)據(jù)包, 具體代碼如下:

func (thrift *Thrift) messageParser(s *ThriftStream) (bool, bool) {
        var ok, complete bool
        var m = s.message
        for s.parseOffset < len(s.data) {
            dataStr := string(s.data)
            switch s.parseState {
            case ThriftStartState:
                m.start = s.parseOffset
                if thrift.TransportType == ThriftTFramed {
                    if len(s.data) < 4 {
                        return true, false
                    }
                    frameSize := common.Bytes_Ntohl(s.data[:4])
                    m.FrameSize = frameSize
                    s.parseOffset = 4
                    
                    if (!strings.Contains(dataStr, "__can__finagle__trace__v3__")) {
                        var thriftFlagIndex1 int = bytes.LastIndex(s.data, thriftFlag1)
                        if thriftFlagIndex1> -1 {// 如果標(biāo)識(shí)為80010001 那么代表是client->server
                            // client -> server
                            m.FrameSize = common.Bytes_Ntohl(s.data[:4]) - uint32(thriftFlagIndex1) - 4  // 從8001位置之后開始
                            s.parseOffset = thriftFlagIndex1// 從8001位置開始(包括8001位置)
                        }else{//如果沒有標(biāo)識(shí)為80010001, 那么應(yīng)該有標(biāo)識(shí)位80010002, 那么代表是server->client
                            // finagle 返回值
                            if bytes.LastIndex(s.data, thriftFlag2)==5 {
                                m.FrameSize = frameSize - 1
                                s.parseOffset = 4 + 1
                            }
                        }
                    }
                }
        ... ...
}

參考

Thrift Tutorial

Packetbeat協(xié)議擴(kuò)展開發(fā)教程

由淺入深了解Thrift

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容