ETCD《二》--Peer監(jiān)聽

Peer監(jiān)聽

Peer 監(jiān)聽通常監(jiān)聽在 2380 端口上,用于提供 Peer 之間的 https 服務(wù)

注冊的一些服務(wù)端點:

  • /raft:raftHandler處理

  • /members:peerMembersHandler處理

  • /members/promote/:peerMemberPromoteHandler處理

  • /leases、/leases/internal:leaseHandler處理

  • /downgrade/enabled:downgradeEnabledHandler處理

  • /members/hashkv:hashKVHandler處理

  • /version:versionHandler處理

Raft Handler

Raft Handler有可以進(jìn)一步劃分為

  • /raft:pipelineHandler處理

  • /raft/stream/:streamHandler處理

  • /raft/snapshot:snapHandler處理

  • /raft/probing:httpHealth處理

raft handler在初始化的時候會創(chuàng)建兩類 Transport,一種是pipeline transport, 另一種是 stream transport

func (t *Transport) Start() error {
    var err error
    t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)

    t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)

    return nil
}
  • stream transport:長連接不斷開,Peer 之間相互推送的方式

  • pipeline transport:短鏈接,請求一次,響應(yīng)一次;發(fā)送 snapshot 或者 stream transport 不可用時才會使用這個;snapshot 也使用這個主要考慮是 snapshot 較大 ,避免阻塞其他消息

func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
    var ok bool
    
    if isMsgSnap(m) {
        return p.pipeline.msgc, pipelineMsg
    }  else if writec, ok = p.writer.writec(); ok {
        return writec, streamMsg
    }
    return p.pipeline.msgc, pipelineMsg
}

Stream Handler

每個 Member 啟動的時候都會相互添加 Peer ,每個 Peer 都會維護(hù)一個寫通道、一個讀通道

讀協(xié)程

讀協(xié)程的初始化

p.msgAppReader = &streamReader{
        lg:     t.Logger,
        peerID: peerID,
        typ:    "message",
        tr:     t,
        picker: picker,
        status: status,
        recvc:  p.recvc,
        propc:  p.propc,
        rl:     rate.NewLimiter(t.DialRetryFrequency, 1),
    }

讀通道啟動的時候就會嘗試 dial 對應(yīng)的 Peer 地址

rc, err := cr.dial(t)

dial 實際上就是向 Peer 發(fā)送請求 GET /raft/stream/message/local_member_id,使用的就是 stream transport

uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String())

resp, err := cr.tr.streamRt.RoundTrip(req)

Peer 端已經(jīng)注冊了 /raft/stream/ 開頭的請求由 streamHandler 處理,這個處理器會先返回200狀態(tài)碼給發(fā)送方

    w.WriteHeader(http.StatusOK)
    w.(http.Flusher).Flush()

然后將當(dāng)前連接發(fā)送給寫通道,當(dāng)前的 Writer 就是 ResponseWriter , Flusher 用于將寫入到 Response 緩沖區(qū)的消息強(qiáng)制推送給對方

c := newCloseNotifier()
    conn := &outgoingConn{
        t:       t,
        Writer:  w,
        Flusher: w.(http.Flusher),
        Closer:  c,
        localID: h.tr.ID,
        peerID:  from,
    }
    select {
    case streamWWriter.connc <- conn:
        return true
    case <-cw.done:
        return false
    }

最后將連接保持不斷開,連接兩端的消息依靠 Flusher 來推送

<-c.closeNotify()

而發(fā)送方接收到對方推送過來的 200 狀態(tài)碼后,會不斷嘗試從 Response 中讀取數(shù)據(jù),讀到數(shù)據(jù)后就進(jìn)行反序列化,再將反序列化后的消息發(fā)送到 recv_channel 或者 prop_channel

switch resp.StatusCode {
    case http.StatusOK:
        return resp.Body, nil

for {
        m, err := dec.decode()
        recvc := cr.recvc
        if m.Type == raftpb.MsgProp {
            recvc = cr.propc
        }

        select {
        case recvc <- m:

而這兩個 channel 都有專門的協(xié)程監(jiān)聽,而這里的 Process 就是通過 step 方法將消息流入到 raft 流程里

go func() {
        for {
            select {
            case mm := <-p.recvc:
                if err := r.Process(ctx, mm); err != nil {
                    
                }
            case <-p.stopc:
                return
            }
        }
    }()

func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
    
    return s.r.Step(ctx, m)
}

寫協(xié)程

寫協(xié)程的初始化

w := &streamWriter{
        lg: lg,

        localID: local,
        peerID:  id,

        status: status,
        fs:     fs,
        r:      r,
        msgc:   make(chan raftpb.Message, 4096),
        connc:  make(chan *outgoingConn),
        stopc:  make(chan struct{}),
        done:   make(chan struct{}),
    }

寫通道在啟動的時候會監(jiān)聽在conn_channel 上,而讀通道在在啟動的時候會將連接發(fā)送到這個通道,這里寫通道就能正常收到這個連接,并完成一些初始化功能,比如將 working 設(shè)置為true,意味著 stream transport 是正常工作的,那么在 Peer 之間相互發(fā)送消息的時候就會優(yōu)先選擇 stream transport,而不會退化到 pipeline transport;同時再初始化 msg_channel,意味著可以開始接收消息了

case conn := <-cw.connc:
            cw.working = true
            heartbeatc, msgc = tickc.C, cw.msgc

在 Peer 之間相互發(fā)送消息的時候,首先會挑選一個寫入通道,正如前面所說,發(fā)送 snapshot 或者 stream transport 不可用時才會使用 pipeline transport,其它消息或者 stream transport 正常初始化時都會使用 stream 的寫入通道

func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
    var ok bool
    
    if isMsgSnap(m) {
        return p.pipeline.msgc, pipelineMsg
    }  else if writec, ok = p.writer.writec(); ok {
        return writec, streamMsg
    }
    return p.pipeline.msgc, pipelineMsg
}

發(fā)送時直接向消息寫入到 pick 到的寫通道里

writec, name := p.pick(m)
    select {
    case writec <- m:

而寫協(xié)程同時也監(jiān)聽著 msg_channel ,對于需要發(fā)送給 Peer 的消息會先編碼,再選擇合適的時機(jī)推送給 Peer ,如沒有更多消息 或者 消息累計到了 緩沖區(qū)的一半,就會通過 Flusher 將 Response 緩沖區(qū)的消息強(qiáng)制推送給對方

case m := <-msgc:
            err := enc.encode(&m)
            if err == nil {
                unflushed += m.Size()

                if len(msgc) == 0 || batched > 4096 / 2 {
                    flusher.Flush()
                    sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
                    unflushed = 0
                    batched = 0
                } else {
                    batched++
                }

                continue
            }

Pipeline Handler

相比較之下,pipeline handler 的工作原理就簡單很多了;按照 pick 的規(guī)則,發(fā)送 snapshot 或者 stream transport 不可用時才會使用 pipeline transport

此時需要發(fā)送給 Peer 的消息,就會 pick 到這個 pipeline 的寫通道,而 pipeline 在啟動時會監(jiān)聽在這個寫通道上,收到消息后就構(gòu)建 HTTP 請求發(fā)送給 Peer

func (p *pipeline) handle() {
    defer p.wg.Done()

    for {
        select {
        case m := <-p.msgc:
            start := time.Now()
            err := p.post(pbutil.MustMarshal(&m))

這里構(gòu)建的是 HTTP POST請求,請求的就是 POST /raft路徑,使用的就是 pipeline transport

req, err := http.NewRequest(http.MethodPost, "/raft", body)

req.Header.Set("Content-Type", "application/protobuf")

resp, err := p.tr.pipelineRt.RoundTrip(req)

正如前面所說,POST /raft路徑的請求會交給 pipelineHandler處理,pipelineHandler 的處理也很簡單,直接通過 Process 調(diào)用 step 方法將消息發(fā)給 raft 流程處理

func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if err := h.r.Process(context.TODO(), m); err != nil {

小結(jié)

  • 每個 Member 都會 維護(hù)到其它 Peers 的兩個連接,一種是pipeline transport, 另一種是 stream transport;發(fā)送 snapshot 或者 stream transport 不可用時才會使用 pipeline transport;否則 raft 相關(guān)的消息都會通過 stream transport 發(fā)送

  • 每個 Member 到每個 Peer 都會維護(hù)兩個協(xié)程:

    • 一個讀協(xié)程,會不斷從 stream transport 中讀取數(shù)據(jù),然后發(fā)送到 recv_channel 或者 prop_channel,而這兩個通道都會通過 step 方法將消息發(fā)送給 raft 流程處理

    • 一個寫協(xié)程,需要發(fā)送給 Peer 的消息,首先會 pick 一個寫通道,pick 的原則就是 發(fā)送 snapshot 或者 stream transport 不可用時才會使用 pipeline transport,否則都會 pick 到 stream transport;然后將消息寫入到這個寫通道,而這個寫通道對應(yīng)的就是 Peer 連接中的 Response,負(fù)責(zé)將消息推送給 Peer

    • Peer 的讀協(xié)程就負(fù)責(zé)從 Response 中讀取數(shù)據(jù),再發(fā)送給自己的 raft 流程

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容