Peer監(jiān)聽
Peer 監(jiān)聽通常監(jiān)聽在 2380 端口上,用于提供 Peer 之間的 https 服務(wù)
注冊的一些服務(wù)端點:
/raft:raftHandler處理
/members:peerMembersHandler處理
/members/promote/:peerMemberPromoteHandler處理
/leases、/leases/internal:leaseHandler處理
/downgrade/enabled:downgradeEnabledHandler處理
/members/hashkv:hashKVHandler處理
/version:versionHandler處理
Raft Handler
Raft Handler有可以進(jìn)一步劃分為
/raft:pipelineHandler處理
/raft/stream/:streamHandler處理
/raft/snapshot:snapHandler處理
/raft/probing:httpHealth處理
raft handler在初始化的時候會創(chuàng)建兩類 Transport,一種是pipeline transport, 另一種是 stream transport
func (t *Transport) Start() error {
var err error
t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
return nil
}
stream transport:長連接不斷開,Peer 之間相互推送的方式
pipeline transport:短鏈接,請求一次,響應(yīng)一次;發(fā)送 snapshot 或者 stream transport 不可用時才會使用這個;snapshot 也使用這個主要考慮是 snapshot 較大 ,避免阻塞其他消息
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
var ok bool
if isMsgSnap(m) {
return p.pipeline.msgc, pipelineMsg
} else if writec, ok = p.writer.writec(); ok {
return writec, streamMsg
}
return p.pipeline.msgc, pipelineMsg
}
Stream Handler
每個 Member 啟動的時候都會相互添加 Peer ,每個 Peer 都會維護(hù)一個寫通道、一個讀通道
讀協(xié)程
讀協(xié)程的初始化
p.msgAppReader = &streamReader{
lg: t.Logger,
peerID: peerID,
typ: "message",
tr: t,
picker: picker,
status: status,
recvc: p.recvc,
propc: p.propc,
rl: rate.NewLimiter(t.DialRetryFrequency, 1),
}
讀通道啟動的時候就會嘗試 dial 對應(yīng)的 Peer 地址
rc, err := cr.dial(t)
dial 實際上就是向 Peer 發(fā)送請求 GET /raft/stream/message/local_member_id,使用的就是 stream transport
uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String())
resp, err := cr.tr.streamRt.RoundTrip(req)
Peer 端已經(jīng)注冊了 /raft/stream/ 開頭的請求由 streamHandler 處理,這個處理器會先返回200狀態(tài)碼給發(fā)送方
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
然后將當(dāng)前連接發(fā)送給寫通道,當(dāng)前的 Writer 就是 ResponseWriter , Flusher 用于將寫入到 Response 緩沖區(qū)的消息強(qiáng)制推送給對方
c := newCloseNotifier()
conn := &outgoingConn{
t: t,
Writer: w,
Flusher: w.(http.Flusher),
Closer: c,
localID: h.tr.ID,
peerID: from,
}
select {
case streamWWriter.connc <- conn:
return true
case <-cw.done:
return false
}
最后將連接保持不斷開,連接兩端的消息依靠 Flusher 來推送
<-c.closeNotify()
而發(fā)送方接收到對方推送過來的 200 狀態(tài)碼后,會不斷嘗試從 Response 中讀取數(shù)據(jù),讀到數(shù)據(jù)后就進(jìn)行反序列化,再將反序列化后的消息發(fā)送到 recv_channel 或者 prop_channel
switch resp.StatusCode {
case http.StatusOK:
return resp.Body, nil
for {
m, err := dec.decode()
recvc := cr.recvc
if m.Type == raftpb.MsgProp {
recvc = cr.propc
}
select {
case recvc <- m:
而這兩個 channel 都有專門的協(xié)程監(jiān)聽,而這里的 Process 就是通過 step 方法將消息流入到 raft 流程里
go func() {
for {
select {
case mm := <-p.recvc:
if err := r.Process(ctx, mm); err != nil {
}
case <-p.stopc:
return
}
}
}()
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
return s.r.Step(ctx, m)
}
寫協(xié)程
寫協(xié)程的初始化
w := &streamWriter{
lg: lg,
localID: local,
peerID: id,
status: status,
fs: fs,
r: r,
msgc: make(chan raftpb.Message, 4096),
connc: make(chan *outgoingConn),
stopc: make(chan struct{}),
done: make(chan struct{}),
}
寫通道在啟動的時候會監(jiān)聽在conn_channel 上,而讀通道在在啟動的時候會將連接發(fā)送到這個通道,這里寫通道就能正常收到這個連接,并完成一些初始化功能,比如將 working 設(shè)置為true,意味著 stream transport 是正常工作的,那么在 Peer 之間相互發(fā)送消息的時候就會優(yōu)先選擇 stream transport,而不會退化到 pipeline transport;同時再初始化 msg_channel,意味著可以開始接收消息了
case conn := <-cw.connc:
cw.working = true
heartbeatc, msgc = tickc.C, cw.msgc
在 Peer 之間相互發(fā)送消息的時候,首先會挑選一個寫入通道,正如前面所說,發(fā)送 snapshot 或者 stream transport 不可用時才會使用 pipeline transport,其它消息或者 stream transport 正常初始化時都會使用 stream 的寫入通道
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
var ok bool
if isMsgSnap(m) {
return p.pipeline.msgc, pipelineMsg
} else if writec, ok = p.writer.writec(); ok {
return writec, streamMsg
}
return p.pipeline.msgc, pipelineMsg
}
發(fā)送時直接向消息寫入到 pick 到的寫通道里
writec, name := p.pick(m)
select {
case writec <- m:
而寫協(xié)程同時也監(jiān)聽著 msg_channel ,對于需要發(fā)送給 Peer 的消息會先編碼,再選擇合適的時機(jī)推送給 Peer ,如沒有更多消息 或者 消息累計到了 緩沖區(qū)的一半,就會通過 Flusher 將 Response 緩沖區(qū)的消息強(qiáng)制推送給對方
case m := <-msgc:
err := enc.encode(&m)
if err == nil {
unflushed += m.Size()
if len(msgc) == 0 || batched > 4096 / 2 {
flusher.Flush()
sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
unflushed = 0
batched = 0
} else {
batched++
}
continue
}
Pipeline Handler
相比較之下,pipeline handler 的工作原理就簡單很多了;按照 pick 的規(guī)則,發(fā)送 snapshot 或者 stream transport 不可用時才會使用 pipeline transport
此時需要發(fā)送給 Peer 的消息,就會 pick 到這個 pipeline 的寫通道,而 pipeline 在啟動時會監(jiān)聽在這個寫通道上,收到消息后就構(gòu)建 HTTP 請求發(fā)送給 Peer
func (p *pipeline) handle() {
defer p.wg.Done()
for {
select {
case m := <-p.msgc:
start := time.Now()
err := p.post(pbutil.MustMarshal(&m))
這里構(gòu)建的是 HTTP POST請求,請求的就是 POST /raft路徑,使用的就是 pipeline transport
req, err := http.NewRequest(http.MethodPost, "/raft", body)
req.Header.Set("Content-Type", "application/protobuf")
resp, err := p.tr.pipelineRt.RoundTrip(req)
正如前面所說,POST /raft路徑的請求會交給 pipelineHandler處理,pipelineHandler 的處理也很簡單,直接通過 Process 調(diào)用 step 方法將消息發(fā)給 raft 流程處理
func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := h.r.Process(context.TODO(), m); err != nil {
小結(jié)
每個 Member 都會 維護(hù)到其它 Peers 的兩個連接,一種是pipeline transport, 另一種是 stream transport;發(fā)送 snapshot 或者 stream transport 不可用時才會使用 pipeline transport;否則 raft 相關(guān)的消息都會通過 stream transport 發(fā)送
-
每個 Member 到每個 Peer 都會維護(hù)兩個協(xié)程:
一個讀協(xié)程,會不斷從 stream transport 中讀取數(shù)據(jù),然后發(fā)送到 recv_channel 或者 prop_channel,而這兩個通道都會通過 step 方法將消息發(fā)送給 raft 流程處理
一個寫協(xié)程,需要發(fā)送給 Peer 的消息,首先會 pick 一個寫通道,pick 的原則就是 發(fā)送 snapshot 或者 stream transport 不可用時才會使用 pipeline transport,否則都會 pick 到 stream transport;然后將消息寫入到這個寫通道,而這個寫通道對應(yīng)的就是 Peer 連接中的 Response,負(fù)責(zé)將消息推送給 Peer
Peer 的讀協(xié)程就負(fù)責(zé)從 Response 中讀取數(shù)據(jù),再發(fā)送給自己的 raft 流程