etcdctl put k v
etcdctl 作為 etcd 的客戶端工具,通過他操作 kv 時(shí),它會(huì)將命令轉(zhuǎn)換為一個(gè) grpc請(qǐng)求,發(fā)送到 etcd 的client監(jiān)聽端口
etcdctl put k v 對(duì)應(yīng)的會(huì)請(qǐng)求grpc注冊(cè)方法:/etcdserverpb.KV/Put
etcd收到這個(gè) 請(qǐng)求后,會(huì)把這個(gè)請(qǐng)求包裝為一個(gè) raft 請(qǐng)求,交給 raft 狀態(tài)機(jī)處理,并返回 raft 處理結(jié)果
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
ctx = context.WithValue(ctx, traceutil.StartTimeKey{}, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
if err != nil {
return nil, err
}
return resp.(*pb.PutResponse), nil
}
轉(zhuǎn)換 raft 請(qǐng)求
首先會(huì)檢查當(dāng)前 server狀態(tài),已經(jīng) commit 的index不能落后 apply index太多,commit意味著記錄已經(jīng)通過了 raft 提議,即被超過半數(shù)節(jié)點(diǎn)認(rèn)可;apply意味著記錄已完成了持久化,數(shù)據(jù)寫入了 db 文件;如果當(dāng)前 server 存在大量沒有落盤的記錄,就會(huì)拒絕后續(xù)請(qǐng)求
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+5000{
return nil, errors.ErrTooManyRequests
}
然后初始化一個(gè)隨機(jī)請(qǐng)求ID
r.Header = &pb.RequestHeader{
ID: s.reqIDGen.Next(),
}
請(qǐng)求序列化,檢測(cè)序列化后的大小不能超過最大的請(qǐng)求大小,默認(rèn)是1.5M
data, err := r.Marshal()
if len(data) > int(1.5M) {
return nil, errors.ErrRequestTooLarge
}
將請(qǐng)求注冊(cè)到map中,等待 raft 處理完成后,再?gòu)倪@里通知處理結(jié)果
ch := s.w.Register(id)
func (w *list) Register(id uint64) <-chan any {
idx := id % 64
newCh := make(chan any, 1)
w.e[idx].l.Lock()
defer w.e[idx].l.Unlock()
if _, ok := w.e[idx].m[id]; !ok {
w.e[idx].m[id] = newCh
} else {
log.Panicf("dup id %x", id)
}
return newCh
}
像 raft 狀態(tài)機(jī)提交這個(gè)請(qǐng)求
err = s.r.Propose(cctx, data)
阻塞等待 raft 處理結(jié)果
select {
case x := <-ch:
return x.(*apply2.Result), nil
}
提交 raft 請(qǐng)求
先將請(qǐng)求數(shù)據(jù)包裝為一個(gè)消息,類型是pb.MsgProp,請(qǐng)求數(shù)據(jù)作為 entry
func (n *node) Propose(ctx context.Context, data []byte) error {
return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
將上述消息發(fā)送到 raft_node 的 prop_channel,即提議一條消息
propc: make(chan msgWithResult) ;prop_channel 是一個(gè)無(wú)緩沖的通道,通道的讀寫操作都會(huì)阻塞
對(duì)于不需要返回結(jié)果的消息,會(huì)直接返回
對(duì)于需要返回結(jié)果的會(huì)進(jìn)一步等待這條消息被提交到 raft 狀態(tài)機(jī);注意只是等待這條消息被正常提議,而不是等待實(shí)際的請(qǐng)求執(zhí)行結(jié)果,實(shí)際的執(zhí)行結(jié)果還是通過上述注冊(cè)Register的通道返回
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
ch := n.propc
pm := msgWithResult{m: m}
if wait {
pm.result = make(chan error, 1)
}
select {
case ch <- pm:
if !wait {
return nil
}
}
select {
case err := <-pm.result:
if err != nil {
return err
}
}
return nil
}
raft_node 會(huì)監(jiān)聽在 prop_channel 上,收到消息后會(huì)向 raft 提議這條消息,并返回提議結(jié)果,一般用于快速失敗
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
raft 處理請(qǐng)求
Follower
如果這個(gè)請(qǐng)求是 Follower 收到的,F(xiàn)ollower 是不能處理 pb.MsgProp 類型的消息的,需要把這個(gè)消息進(jìn)一步轉(zhuǎn)發(fā)給 Leader
switch m.Type {
case pb.MsgProp:
m.To = r.lead
r.send(m)
這里的發(fā)送也是簡(jiǎn)單的把這個(gè)消息追加到 msgs 消息列表里
r.msgs = append(r.msgs, m)
Step完成之后,自動(dòng)進(jìn)入下一次循環(huán),此時(shí)會(huì)檢測(cè)到 raft 狀態(tài)機(jī)有變化,因?yàn)?code>HasReady方法里會(huì)檢測(cè) msgs 消息列表里是否存在消息;然后構(gòu)建 Ready 信號(hào)發(fā)送到 ready_channel
for {
if advancec == nil && n.rn.HasReady() {
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
select {
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
case readyc <- rd:
n.rn.acceptReady(rd)
advancec = n.advancec
readyc = nil
上層 server 同時(shí)監(jiān)聽在 ready_channel ,將消息通過 Peer的Stream連接發(fā)送給Leader
case rd := <-r.readyc:
r.transport.Send(msgs)
Leader
Leader 是可以直接處理 pb.MsgProp 這個(gè)類型的消息的,他會(huì)先嘗試把這個(gè)消息追加到本地,然后廣播這些消息給所有Followers
switch m.Type {
case pb.MsgProp:
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
可以看到這個(gè)流程和當(dāng)選 Leader 后發(fā)送的空 entry 的流程基本上是一致的,不同的是這里的 entry 是攜帶了 kv 數(shù)據(jù)的
因此這里也需要等待超過半數(shù) Members 對(duì)這個(gè) entry 進(jìn)行 commit 確認(rèn);確認(rèn)后 Leader 會(huì)先更新自己的 commit index;然后再將最新的 commit index 廣播給所有 Followers,來推動(dòng) Followers 的 commit index 的更新
func (l *raftLog) commitTo(tocommit uint64) {
// never decrease commit
if l.committed < tocommit {
l.committed = tocommit
}
}
r.bcastAppend()
以 Leader 為例,更新了 commit index 后,而 hard_state 是和 commit index 息息相關(guān)的
func (r *raft) hardState() pb.HardState {
return pb.HardState{
Term: r.Term,
Vote: r.Vote,
Commit: r.raftLog.committed,
}
}
因此下一輪會(huì)通過 hard_state 檢測(cè)到 Ready 信號(hào)再次就緒,而在 Ready 信號(hào)中,會(huì)通過 CommittedEntries 攜帶已經(jīng)確認(rèn)的那些 entry,讀取的就是上一次 apply index 到 這次 commit index 之間的這些 entry;上述已經(jīng)通過 raft 流程的 kv entry 就會(huì)在這里獲取到
rd := Ready{
Entries: r.raftLog.nextUnstableEnts(),
CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()),
Messages: r.msgs,
}
上層 server 通過 ready_channel 獲取到 CommittedEntries后,會(huì)包裝為一個(gè) Apply 對(duì)象,并發(fā)送到 apply_channel
ap := toApply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
raftAdvancedC: raftAdvancedC,
}
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
而上層 server 中另外的協(xié)程來處理 apply_channel 中的數(shù)據(jù)
for {
select {
case ap := <-s.r.apply():
具體是通過一個(gè) FIFO 的隊(duì)列來順序處理收到的 apply 消息,在沒有消息的時(shí)候,F(xiàn)IFO 隊(duì)列會(huì)一直阻塞,收到任務(wù)后開始順序執(zhí)行
sched := schedule.NewFIFOScheduler(lg)
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
在處理 apply 消息時(shí),會(huì)按照順序依次處理每一個(gè) entry 數(shù)據(jù)
for i := range apply.entries {
switch e.Type {
case raftpb.EntryNormal:
// gofail: var beforeApplyOneEntryNormal struct{}
s.applyEntryNormal(&e, shouldApplyV3)
對(duì)于每一條 entry 數(shù)據(jù),會(huì)先反序列化成 InternalRaftRequest 對(duì)象·
var raftReq pb.InternalRaftRequest
if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
}
然后再次通過上述初始化的隨機(jī)請(qǐng)求ID,來查詢這個(gè)請(qǐng)求是否注冊(cè)過了,注冊(cè)過的就是需要返回結(jié)果的
id := raftReq.ID
needResult := s.w.IsRegistered(id)
然后在根據(jù)請(qǐng)求類型來分別處理,這里是 Put 請(qǐng)求,因此會(huì)走 Put 的邏輯
switch {
case r.Range != nil:
op = "Range"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Range(r.Range)
case r.Put != nil:
op = "Put"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(r.Put)
case r.DeleteRange != nil:
op = "DeleteRange"
ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(r.DeleteRange)
Put 數(shù)據(jù)
put 操作與前文中關(guān)于 MVCC 中的處理就完全一致了
Del 數(shù)據(jù)
刪除數(shù)據(jù) 和 Put 數(shù)據(jù)的邏輯是完全一致的,也是通過追加 entry 的方式來經(jīng)過完整的一遍 raft 流程,只不過 entry 中的數(shù)據(jù)是pb.InternalRaftRequest{DeleteRange: r} , 即 Delete 請(qǐng)求數(shù)據(jù)
走完 raft 流程后,也是通過 apply 來更新 MVCC 和 boltdb 中的數(shù)據(jù)
數(shù)據(jù)的更新方式與前文中關(guān)于 MVCC 中的數(shù)據(jù)刪除處理完全一致了
總結(jié)
Put 和 Del 操作都需要經(jīng)歷完整的 raft 流程,并且都是通過追加 entry 的方式發(fā)送 pb.MsgProp 消息
Followers 不處理 pb.MsgProp 類型的消息,而是轉(zhuǎn)發(fā)給 Leader 處理
Leader 發(fā)起 raft 流程,并將消息廣播給 Followers,等待超過半數(shù) Members 對(duì) entry 進(jìn)行 commit
commit 之后的 entry 會(huì)通過 Ready 信號(hào)通知上層 server 進(jìn)行 apply 操作,apply 操作會(huì)依次對(duì) commit 的 entries 進(jìn)行處理,每一個(gè) entry 都會(huì)經(jīng)歷 MVCC 中的 Put 或者 Delete 完整流程