raft 協(xié)議是一個(gè)一致性算法,解決多臺(tái)機(jī)器之間數(shù)據(jù)一致的問(wèn)題。raft 聲稱(chēng)簡(jiǎn)潔明了,可以取代非常復(fù)雜的 PAXOS 算法。然而翻看 raft 的論文后,會(huì)發(fā)現(xiàn)即便聲稱(chēng)簡(jiǎn)潔明了,自己完整地實(shí)現(xiàn) raft 還是很麻煩的。
etcd是一個(gè)分布式的 key-value 存儲(chǔ)組件,它通過(guò) raft 算法保證多臺(tái)機(jī)器數(shù)據(jù)的一致性。那么 etcd 中的 raft 算法可以提取出來(lái)用在自己的項(xiàng)目中嗎?
答案是可以的。etcd 不僅實(shí)現(xiàn)了 raft,還把 raft 解耦得很完美,完全可以獨(dú)立使用。代碼庫(kù)點(diǎn)這兒:https://github.com/etcd-io/etcd/tree/master/raft。
美中不足的是,etcd raft的使用文檔寫(xiě)得很爛,文檔中列的代碼缺了很多關(guān)鍵部分,是跑不起來(lái)的。按照文檔中的代碼寫(xiě),不是報(bào)錯(cuò)就是 go panic,要不就是跑起來(lái)后機(jī)器都僵著不選舉。經(jīng)過(guò)筆者的實(shí)踐,補(bǔ)齊了缺失的代碼,完成了一個(gè)可以跑起來(lái)的示例,代碼見(jiàn)文章最后。
實(shí)踐過(guò)程中,使用文檔中沒(méi)有提及的幾個(gè)點(diǎn):
文檔說(shuō)
n := raft.StartNode()就可以啟動(dòng)一個(gè)節(jié)點(diǎn),實(shí)際這樣做會(huì) panic,要自己額外再封裝一個(gè) struct ,并且實(shí)現(xiàn)Process()方法才行(見(jiàn)本文 raft.go里的rNode)文檔說(shuō)集群中在收到對(duì)方節(jié)點(diǎn)的 RPC 消息時(shí),要調(diào)用
n.Step()方法:
func recvRaftRPC(ctx context.Context, m raftpb.Message) {
n.Step(ctx, m)
}
但這個(gè)recvRaftRPC() 又在哪調(diào)用呢?回顧第 1 條不是要自己封裝一個(gè) struct 嗎,n.Step() 應(yīng)該寫(xiě)在這個(gè) struct 的 Process() 方法里,而不是放在什么 recvRaftRPC() 里(見(jiàn)本文 raft.go 里的 rNode)。raft 算法會(huì)在接收到其他節(jié)點(diǎn)的RPC請(qǐng)求時(shí)調(diào)用 Process(),
- 還是
raft.StartNode(),文檔的這段代碼:
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
意思是三個(gè)節(jié)點(diǎn)的集群,如果當(dāng)前啟動(dòng)節(jié)點(diǎn) ID 是 0x01,那么啟動(dòng)時(shí) peer 列表只傳 0x02, 0x03,不傳自己,實(shí)際這樣做啟動(dòng)集群后會(huì)僵住不選舉。正確做法是把節(jié)點(diǎn)自己也傳入 peer 列表。
- 文檔中的
for-select循環(huán),是要寫(xiě)在一個(gè) go 協(xié)程里的。不然啟動(dòng)后集群會(huì)僵住不選舉。
示例代碼介紹
本文的示例代碼是一個(gè)三節(jié)點(diǎn)的集群,節(jié)點(diǎn)之前通過(guò) http 交換 raft 報(bào)文。
集群?jiǎn)?dòng)之后,0x01節(jié)點(diǎn)會(huì)每隔 1 秒申請(qǐng)?zhí)岚福ㄒ簿褪菢I(yè)務(wù)數(shù)據(jù)):
for {
log.Printf("Propose on node %v\n", *id)
n.node.Propose(context.TODO(), []byte("hello"))
time.Sleep(time.Second)
}
然后在代碼的 這個(gè)地方:
for _, entry := range rd.CommittedEntries {
switch entry.Type {
case raftpb.EntryNormal:
log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data))
....
}
集群的每個(gè)節(jié)點(diǎn)都會(huì)收到這個(gè)提案,這時(shí)后提案在集群里是一致的了,可以放心地持久化了。
完整代碼:
main.go
package main
import (
"context"
"flag"
"log"
"time"
)
func main() {
id := flag.Uint64("id", 1, "node id")
flag.Parse()
log.Printf("I'am node %v\n", *id)
cluster := map[uint64]string{
1: "http://127.0.0.1:22210",
2: "http://127.0.0.1:22220",
3: "http://127.0.0.1:22230",
}
n := newRaftNode(*id, cluster)
if *id == 1 {
time.Sleep(5 * time.Second)
for {
log.Printf("Propose on node %v\n", *id)
n.node.Propose(context.TODO(), []byte("hello"))
time.Sleep(time.Second)
}
}
select {}
}
raft.go
package main
import (
"context"
"log"
"net/http"
"strconv"
"strings"
"time"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
stats "go.etcd.io/etcd/etcdserver/api/v2stats"
"go.etcd.io/etcd/pkg/types"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
)
type rNode struct {
id uint64
peerMap map[uint64]string
node raft.Node
raftStorage *raft.MemoryStorage
transport *rafthttp.Transport
}
func newRaftNode(id uint64, peerMap map[uint64]string) *rNode {
n := &rNode{
id: id,
peerMap: peerMap,
raftStorage: raft.NewMemoryStorage(),
}
go n.startRaft()
return n
}
func (rn *rNode) startRaft() {
peers := []raft.Peer{}
for i := range rn.peerMap {
peers = append(peers, raft.Peer{ID: uint64(i)})
}
c := &raft.Config{
ID: rn.id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rn.raftStorage,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
}
rn.node = raft.StartNode(c, peers)
rn.transport = &rafthttp.Transport{
Logger: zap.NewExample(),
ID: types.ID(rn.id),
ClusterID: 0x1000,
Raft: rn,
ServerStats: stats.NewServerStats("", ""),
LeaderStats: stats.NewLeaderStats(strconv.Itoa(int(rn.id))),
ErrorC: make(chan error),
}
rn.transport.Start()
for peer, addr := range rn.peerMap {
if peer != rn.id {
rn.transport.AddPeer(types.ID(peer), []string{addr})
}
}
go rn.serveRaft()
go rn.serveChannels()
}
func (rn *rNode) serveRaft() {
addr := rn.peerMap[rn.id][strings.LastIndex(rn.peerMap[rn.id], ":"):]
server := http.Server{
Addr: addr,
Handler: rn.transport.Handler(),
}
server.ListenAndServe()
}
func (rn *rNode) serveChannels() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
rn.node.Tick()
case rd := <-rn.node.Ready():
rn.raftStorage.Append(rd.Entries)
rn.transport.Send(rd.Messages)
if !raft.IsEmptySnap(rd.Snapshot) {
rn.raftStorage.ApplySnapshot(rd.Snapshot)
}
for _, entry := range rd.CommittedEntries {
switch entry.Type {
case raftpb.EntryNormal:
log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data))
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
rn.node.ApplyConfChange(cc)
}
}
rn.node.Advance()
case err := <-rn.transport.ErrorC:
log.Fatal(err)
}
}
}
func (rn *rNode) Process(ctx context.Context, m raftpb.Message) error {
return rn.node.Step(ctx, m)
}
func (rn *rNode) IsIDRemoved(id uint64) bool { return false }
func (rn *rNode) ReportUnreachable(id uint64) {}
func (rn *rNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}