ETCD《六》--Put/Del操作

etcdctl put k v

etcdctl 作為 etcd 的客戶端工具,通過他操作 kv 時(shí),它會(huì)將命令轉(zhuǎn)換為一個(gè) grpc請(qǐng)求,發(fā)送到 etcd 的client監(jiān)聽端口

etcdctl put k v 對(duì)應(yīng)的會(huì)請(qǐng)求grpc注冊(cè)方法:/etcdserverpb.KV/Put

etcd收到這個(gè) 請(qǐng)求后,會(huì)把這個(gè)請(qǐng)求包裝為一個(gè) raft 請(qǐng)求,交給 raft 狀態(tài)機(jī)處理,并返回 raft 處理結(jié)果

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    ctx = context.WithValue(ctx, traceutil.StartTimeKey{}, time.Now())
    resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
    if err != nil {
        return nil, err
    }
    return resp.(*pb.PutResponse), nil
}

轉(zhuǎn)換 raft 請(qǐng)求

首先會(huì)檢查當(dāng)前 server狀態(tài),已經(jīng) commit 的index不能落后 apply index太多,commit意味著記錄已經(jīng)通過了 raft 提議,即被超過半數(shù)節(jié)點(diǎn)認(rèn)可;apply意味著記錄已完成了持久化,數(shù)據(jù)寫入了 db 文件;如果當(dāng)前 server 存在大量沒有落盤的記錄,就會(huì)拒絕后續(xù)請(qǐng)求

ai := s.getAppliedIndex()
    ci := s.getCommittedIndex()
    if ci > ai+5000{
        return nil, errors.ErrTooManyRequests
    }

然后初始化一個(gè)隨機(jī)請(qǐng)求ID

r.Header = &pb.RequestHeader{
        ID: s.reqIDGen.Next(),
    }

請(qǐng)求序列化,檢測(cè)序列化后的大小不能超過最大的請(qǐng)求大小,默認(rèn)是1.5M

data, err := r.Marshal()

if len(data) > int(1.5M) {
        return nil, errors.ErrRequestTooLarge
    }

將請(qǐng)求注冊(cè)到map中,等待 raft 處理完成后,再?gòu)倪@里通知處理結(jié)果

ch := s.w.Register(id)

func (w *list) Register(id uint64) <-chan any {
    idx := id % 64
    newCh := make(chan any, 1)
    w.e[idx].l.Lock()
    defer w.e[idx].l.Unlock()
    if _, ok := w.e[idx].m[id]; !ok {
        w.e[idx].m[id] = newCh
    } else {
        log.Panicf("dup id %x", id)
    }
    return newCh
}

像 raft 狀態(tài)機(jī)提交這個(gè)請(qǐng)求

err = s.r.Propose(cctx, data)

阻塞等待 raft 處理結(jié)果

select {
    case x := <-ch:
        return x.(*apply2.Result), nil
    }

提交 raft 請(qǐng)求

先將請(qǐng)求數(shù)據(jù)包裝為一個(gè)消息,類型是pb.MsgProp,請(qǐng)求數(shù)據(jù)作為 entry

func (n *node) Propose(ctx context.Context, data []byte) error {
    return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

將上述消息發(fā)送到 raft_node 的 prop_channel,即提議一條消息

  • propc: make(chan msgWithResult) ;prop_channel 是一個(gè)無(wú)緩沖的通道,通道的讀寫操作都會(huì)阻塞

  • 對(duì)于不需要返回結(jié)果的消息,會(huì)直接返回

  • 對(duì)于需要返回結(jié)果的會(huì)進(jìn)一步等待這條消息被提交到 raft 狀態(tài)機(jī);注意只是等待這條消息被正常提議,而不是等待實(shí)際的請(qǐng)求執(zhí)行結(jié)果,實(shí)際的執(zhí)行結(jié)果還是通過上述注冊(cè)Register的通道返回

func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
    
    ch := n.propc
    pm := msgWithResult{m: m}
    if wait {
        pm.result = make(chan error, 1)
    }
    select {
    case ch <- pm:
        if !wait {
            return nil
        }
    
    }
    select {
    case err := <-pm.result:
        if err != nil {
            return err
        }
    }
    return nil
}

raft_node 會(huì)監(jiān)聽在 prop_channel 上,收到消息后會(huì)向 raft 提議這條消息,并返回提議結(jié)果,一般用于快速失敗

case pm := <-propc:
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }

raft 處理請(qǐng)求

Follower

如果這個(gè)請(qǐng)求是 Follower 收到的,F(xiàn)ollower 是不能處理 pb.MsgProp 類型的消息的,需要把這個(gè)消息進(jìn)一步轉(zhuǎn)發(fā)給 Leader

switch m.Type {
    case pb.MsgProp:
        m.To = r.lead
        r.send(m)

這里的發(fā)送也是簡(jiǎn)單的把這個(gè)消息追加到 msgs 消息列表里

r.msgs = append(r.msgs, m)

Step完成之后,自動(dòng)進(jìn)入下一次循環(huán),此時(shí)會(huì)檢測(cè)到 raft 狀態(tài)機(jī)有變化,因?yàn)?code>HasReady方法里會(huì)檢測(cè) msgs 消息列表里是否存在消息;然后構(gòu)建 Ready 信號(hào)發(fā)送到 ready_channel

for {
        if advancec == nil && n.rn.HasReady() {
            rd = n.rn.readyWithoutAccept()
            readyc = n.readyc
        }

        select {
        case pm := <-propc:
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }
        case readyc <- rd:
            n.rn.acceptReady(rd)
            advancec = n.advancec
            readyc = nil

上層 server 同時(shí)監(jiān)聽在 ready_channel ,將消息通過 Peer的Stream連接發(fā)送給Leader

case rd := <-r.readyc:
                    r.transport.Send(msgs)

Leader

Leader 是可以直接處理 pb.MsgProp 這個(gè)類型的消息的,他會(huì)先嘗試把這個(gè)消息追加到本地,然后廣播這些消息給所有Followers

switch m.Type {
    case pb.MsgProp:
        if !r.appendEntry(m.Entries...) {
            return ErrProposalDropped
        }
        r.bcastAppend()
        return nil

可以看到這個(gè)流程和當(dāng)選 Leader 后發(fā)送的空 entry 的流程基本上是一致的,不同的是這里的 entry 是攜帶了 kv 數(shù)據(jù)的

因此這里也需要等待超過半數(shù) Members 對(duì)這個(gè) entry 進(jìn)行 commit 確認(rèn);確認(rèn)后 Leader 會(huì)先更新自己的 commit index;然后再將最新的 commit index 廣播給所有 Followers,來推動(dòng) Followers 的 commit index 的更新

func (l *raftLog) commitTo(tocommit uint64) {
    // never decrease commit
    if l.committed < tocommit {
        l.committed = tocommit
    }
}

r.bcastAppend()

以 Leader 為例,更新了 commit index 后,而 hard_state 是和 commit index 息息相關(guān)的

func (r *raft) hardState() pb.HardState {
    return pb.HardState{
        Term:   r.Term,
        Vote:   r.Vote,
        Commit: r.raftLog.committed,
    }
}

因此下一輪會(huì)通過 hard_state 檢測(cè)到 Ready 信號(hào)再次就緒,而在 Ready 信號(hào)中,會(huì)通過 CommittedEntries 攜帶已經(jīng)確認(rèn)的那些 entry,讀取的就是上一次 apply index 到 這次 commit index 之間的這些 entry;上述已經(jīng)通過 raft 流程的 kv entry 就會(huì)在這里獲取到

rd := Ready{
        Entries:          r.raftLog.nextUnstableEnts(),
        CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()),
        Messages:         r.msgs,
    }

上層 server 通過 ready_channel 獲取到 CommittedEntries后,會(huì)包裝為一個(gè) Apply 對(duì)象,并發(fā)送到 apply_channel

ap := toApply{
                    entries:       rd.CommittedEntries,
                    snapshot:      rd.Snapshot,
                    notifyc:       notifyc,
                    raftAdvancedC: raftAdvancedC,
                }

select {
                case r.applyc <- ap:
                case <-r.stopped:
                    return
                }

而上層 server 中另外的協(xié)程來處理 apply_channel 中的數(shù)據(jù)

for {
        select {
        case ap := <-s.r.apply():

具體是通過一個(gè) FIFO 的隊(duì)列來順序處理收到的 apply 消息,在沒有消息的時(shí)候,F(xiàn)IFO 隊(duì)列會(huì)一直阻塞,收到任務(wù)后開始順序執(zhí)行

sched := schedule.NewFIFOScheduler(lg)

f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)

在處理 apply 消息時(shí),會(huì)按照順序依次處理每一個(gè) entry 數(shù)據(jù)

for i := range apply.entries {
        switch e.Type {
        case raftpb.EntryNormal:
            // gofail: var beforeApplyOneEntryNormal struct{}
            s.applyEntryNormal(&e, shouldApplyV3)

對(duì)于每一條 entry 數(shù)據(jù),會(huì)先反序列化成 InternalRaftRequest 對(duì)象·

var raftReq pb.InternalRaftRequest
    if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
        
    }

然后再次通過上述初始化的隨機(jī)請(qǐng)求ID,來查詢這個(gè)請(qǐng)求是否注冊(cè)過了,注冊(cè)過的就是需要返回結(jié)果的

id := raftReq.ID
needResult := s.w.IsRegistered(id)

然后在根據(jù)請(qǐng)求類型來分別處理,這里是 Put 請(qǐng)求,因此會(huì)走 Put 的邏輯

switch {
    case r.Range != nil:
        op = "Range"
        ar.Resp, ar.Trace, ar.Err = a.applyV3.Range(r.Range)
    case r.Put != nil:
        op = "Put"
        ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(r.Put)
    case r.DeleteRange != nil:
        op = "DeleteRange"
        ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(r.DeleteRange)

Put 數(shù)據(jù)

put 操作與前文中關(guān)于 MVCC 中的處理就完全一致了

Del 數(shù)據(jù)

刪除數(shù)據(jù) 和 Put 數(shù)據(jù)的邏輯是完全一致的,也是通過追加 entry 的方式來經(jīng)過完整的一遍 raft 流程,只不過 entry 中的數(shù)據(jù)是pb.InternalRaftRequest{DeleteRange: r} , 即 Delete 請(qǐng)求數(shù)據(jù)

走完 raft 流程后,也是通過 apply 來更新 MVCC 和 boltdb 中的數(shù)據(jù)

數(shù)據(jù)的更新方式與前文中關(guān)于 MVCC 中的數(shù)據(jù)刪除處理完全一致了

總結(jié)

  • Put 和 Del 操作都需要經(jīng)歷完整的 raft 流程,并且都是通過追加 entry 的方式發(fā)送 pb.MsgProp 消息

  • Followers 不處理 pb.MsgProp 類型的消息,而是轉(zhuǎn)發(fā)給 Leader 處理

  • Leader 發(fā)起 raft 流程,并將消息廣播給 Followers,等待超過半數(shù) Members 對(duì) entry 進(jìn)行 commit

  • commit 之后的 entry 會(huì)通過 Ready 信號(hào)通知上層 server 進(jìn)行 apply 操作,apply 操作會(huì)依次對(duì) commit 的 entries 進(jìn)行處理,每一個(gè) entry 都會(huì)經(jīng)歷 MVCC 中的 Put 或者 Delete 完整流程

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容