目錄
1. PhxPaxos源碼分析之關(guān)于PhxPaxos
2. PhxPaxos分析之網(wǎng)絡(luò)基礎(chǔ)部件
3. PhxPaxos源碼分析之Proposer、Acceptor
4. PhxPaxos源碼分析之Learner
5. PhxPaxos源碼分析之狀態(tài)機(jī)
6. PhxPaxos源碼分析之歸檔機(jī)制
7. PhxPaxos源碼分析之整體架構(gòu)
2.1 背景
Paxos 算法解決是一個(gè)分布式系統(tǒng)如何就某個(gè)值(決議)達(dá)成一致,因此網(wǎng)絡(luò)通信是該算法可正常運(yùn)行的基礎(chǔ)。
在Paxos中,涉及網(wǎng)絡(luò)通信的角色包括:
- Proposer:Paxos提案發(fā)起者。
- Acceptor:Paxos提案接收者。
- Learner:Paxos確定值的學(xué)習(xí)者。
除算法本身角色外,PhxPaxos還設(shè)定了下述角色需要網(wǎng)絡(luò)通信:
-
Follower Node:Paxos集群的跟隨者。
Follower指定一個(gè)運(yùn)行Paxos協(xié)議的節(jié)點(diǎn)用于數(shù)據(jù)同步,它不參與Paxos算法。Follower更像傳統(tǒng)意義上的同步備,當(dāng)Paxos算法節(jié)點(diǎn)確定一個(gè)值后,將數(shù)據(jù)同步到Follower節(jié)點(diǎn)。但若follow的節(jié)點(diǎn)無法同步數(shù)據(jù),F(xiàn)ollower可以向整個(gè)Paxos集群發(fā)起learn請求。 -
CheckpointMgr:鏡像數(shù)據(jù)管理者。
指引業(yè)務(wù)生成鏡像數(shù)據(jù),一旦指定instance id之前的鏡像數(shù)據(jù)產(chǎn)生,理論上就可以移除該instance id之前的Paxos Log數(shù)據(jù),以免空間的無限擴(kuò)展。關(guān)于CheckpointMgr后面將單獨(dú)討論。
2.2 網(wǎng)絡(luò)層架構(gòu)
按協(xié)議劃分,PhxPaxos支持TCP、UDP兩種通信協(xié)議;按操作劃分,支持網(wǎng)絡(luò)數(shù)據(jù)讀、寫兩種操作;按角色劃分,分為客戶端、服務(wù)器兩種角色。在PhxPaxos中,服務(wù)器只負(fù)責(zé)讀取操作,客戶端只負(fù)責(zé)寫入操作。因此,在最簡場景下,我們需要4個(gè)封裝類,分別如下:
- TCPClient:TCP客戶端。
- TCPServer:TCP服務(wù)器。
- UDPSender:UDP數(shù)據(jù)發(fā)送器。
- UDPReceiver:UDP數(shù)據(jù)接收器。
而PhxPaxos中的實(shí)際實(shí)現(xiàn)要比這復(fù)雜的多,來看一組網(wǎng)絡(luò)架構(gòu)圖:

大致分為如下幾部分:
-
UDP封裝層
UDPRecv:開啟指定端口的UDP通信通道,啟動(dòng)獨(dú)立線程、通過poll方式接收網(wǎng)絡(luò)消息,最后將數(shù)據(jù)交給NetWork處理。
UDPSend:創(chuàng)建UDP Socket,啟動(dòng)獨(dú)立線程、異步觸發(fā)式發(fā)送消息。提供AddMessage接口接收需發(fā)送的數(shù)據(jù)內(nèi)容,將其放入消息隊(duì)列,供線程消費(fèi)。 -
TCP封裝層
包括ServerSocket、Socket、Event、EventLoop、TcpAcceptor、TcpClient、TcpRead、TcpWrite、TcpIoThread。
見2.3節(jié)。 -
整體網(wǎng)絡(luò)抽象層
包括NetWork、DfNetWork、MsgTransport、Communicate。
見2.4節(jié)。
2.3 TCP封裝層
為了清楚了解作者意圖,來看TCP封裝層類圖:

各個(gè)類功能說明如下:
-
Socket
Socket客戶端通信封裝類,負(fù)責(zé)TCP連接建立,數(shù)據(jù)收發(fā). -
ServerSocket
Socket服務(wù)器通信封裝類,負(fù)責(zé)Tcp服務(wù)器啟動(dòng)、監(jiān)聽. -
Event
網(wǎng)絡(luò)事件接收器(handler),負(fù)責(zé)操作Socket讀寫 -
EventLoop
網(wǎng)絡(luò)事件分發(fā)器(dispatcher),支持基于socket的訂閱,每個(gè)socket配對一個(gè)event。當(dāng)前主要分發(fā)兩類網(wǎng)絡(luò)事件:發(fā)送(write)、接收(read)。采用epoll實(shí)現(xiàn)網(wǎng)絡(luò)事件監(jiān)聽處理,并對外屏蔽實(shí)現(xiàn)細(xì)節(jié)。 -
TcpAcceptor
本節(jié)點(diǎn)TCP服務(wù)器,負(fù)責(zé)監(jiān)聽其他節(jié)點(diǎn)的客戶端連接。每收到一個(gè)客戶端連接,配置一個(gè)讀操作的Socket并配對一個(gè)Event -
TcpClient
TCP客戶端,負(fù)責(zé)發(fā)送數(shù)據(jù)到其他節(jié)點(diǎn)。每個(gè)ip、port配置一個(gè)寫操作的Socket,并配對一個(gè)Event。 -
TcpRead
啟動(dòng)TCP Acceptor線程,接收其他客戶端連接。啟動(dòng)EventLoop,接收已連接客戶端的數(shù)據(jù)包。 -
TcpWrite
啟動(dòng)EventLoop,發(fā)送TcpClient接收到的數(shù)據(jù)包。 -
TcpIoThread
對外的TCP操作封裝類,其自身并不啟動(dòng)線程,僅負(fù)責(zé)整合TcpRead、TcpWrite,對外提供AddMessage接口。
void TcpIOThread :: Stop()
{
if (m_bIsStarted)
{
m_oTcpRead.Stop();
m_oTcpWrite.Stop();
}
PLHead("TcpIOThread [END]");
}
int TcpIOThread :: Init(const std::string& sListenIp, const int iListenPort)
{
int ret = m_oTcpRead.Init(sListenIp, iListenPort);
if (ret == 0)
{
return m_oTcpWrite.Init();
}
return ret;
}
void TcpIOThread :: Start()
{
m_oTcpWrite.start();
m_oTcpRead.start();
m_bIsStarted = true;
}
int TcpIOThread :: AddMessage(const std::string& sIP, const int iPort, const std::string& sMessage)
{
return m_oTcpWrite.AddMessage(sIP, iPort, sMessage);
}
代碼結(jié)構(gòu)還算清晰,但是不是總覺得哪里不對?沒關(guān)系,先順著作者的思路看完。
2.4 整體網(wǎng)絡(luò)抽象層
相比UDP、TCP封裝層,整體網(wǎng)絡(luò)抽象層是更高一級的概念。PhxPaxos中屬于該層的網(wǎng)絡(luò)抽象類如下:
-
NetWork
整體網(wǎng)絡(luò)抽象類,對外屏蔽上述所有TCP\UDP實(shí)現(xiàn),支持接收、發(fā)送網(wǎng)絡(luò)數(shù)據(jù)。接收到的網(wǎng)絡(luò)數(shù)據(jù)交由處理器處理(當(dāng)前為Node對象)。
class NetWork
{
public:
virtual void RunNetWork() = 0;
virtual void StopNetWork() = 0;
virtual int SendMessageTCP(const std::string& sIp, const int iPort, const std::string& sMessage) = 0;
virtual int SendMessageUDP(const std::string& sIp, const int iPort, const std::string& sMessage) = 0;
int OnReceiveMessage(const char* pcMessage, const int iMessageLen);
};
-
DfNetWork
內(nèi)置的網(wǎng)絡(luò)實(shí)現(xiàn)類,負(fù)責(zé)管理UDPRecv、UDPSend、TpcIoThread(內(nèi)含TcpRead、TcpWrite)。
class DFNetWork : public NetWork
{
public:
int Init(const std::string& sListenIp, const int iListenPort);
//super interface
...
private:
UDPRecv m_oUDPRecv;
UDPSend m_oUDPSend;
TcpIOThread m_oTcpIOThread;
};
-
MsgTransport
網(wǎng)絡(luò)消息發(fā)送器,接口類。
class MsgTransport
{
public:
virtual int SendMessage(const nodeid_t iSendtoNodeID, const std::string& sBuffer,
const int iSendType = Message_SendType_UDP) = 0;
virtual int BroadcastMessage(const std::string& sBuffer,
const int iSendType = Message_SendType_UDP) = 0;
virtual int BroadcastMessageFollower(const std::string& sBuffer,
const int iSendType = Message_SendType_UDP) = 0;
virtual int BroadcastMessageTempNode(const std::string& sBuffer,
const int iSendType = Message_SendType_UDP) = 0;
};
-
Communicate
網(wǎng)絡(luò)消息發(fā)送器,實(shí)現(xiàn)類。內(nèi)部讀取配置信息并對NetWork接口的簡單封裝。
class Communicate : public MsgTransport
{
public:
//super interface
...
private:
Config* m_poConfig;
NetWork* m_poNetwork;
nodeid_t m_iMyNodeID;
size_t m_iUDPMaxSize;
};
其實(shí),這還沒完,還有一個(gè)不屬于“整體網(wǎng)絡(luò)抽象層”的網(wǎng)絡(luò)抽象,在base.h中。來看和網(wǎng)絡(luò)相關(guān)的接口定義:
class Base
{
public:
int PackMsg(const PaxosMsg& oPaxosMsg, std::string& sBuffer);
int PackCheckpointMsg(const CheckpointMsg& oCheckpointMsg, std::string& sBuffer);
void PackBaseMsg(const std::string& sBodyBuffer, const int iCmd, std::string& sBuffer);
static int UnPackBaseMsg(const std::string& sBuffer, Header& oHeader, size_t& iBodyStartPos, size_t& iBodyLen);
protected:
virtual int SendMessage(const nodeid_t iSendtoNodeID, const PaxosMsg& oPaxosMsg, const int iSendType = Message_SendType_UDP);
virtual int BroadcastMessage(
const PaxosMsg& oPaxosMsg,
const int bRunSelfFirst = BroadcastMessage_Type_RunSelf_First,
const int iSendType = Message_SendType_UDP);
int BroadcastMessageToFollower(
const PaxosMsg& oPaxosMsg,
const int iSendType = Message_SendType_TCP);
int BroadcastMessageToTempNode(
const PaxosMsg& oPaxosMsg,
const int iSendType = Message_SendType_UDP);
protected:
int SendMessage(const nodeid_t iSendtoNodeID, const CheckpointMsg& oCheckpointMsg,
const int iSendType = Message_SendType_TCP);
protected:
Config* m_poConfig;
MsgTransport* m_poMsgTransport;
Instance* m_poInstance;
};
base是三個(gè)主要角色(Proposer、Accepor、Learner)的基類,這里網(wǎng)絡(luò)相關(guān)操作主要包括打包和發(fā)送兩種。和MsgTransport的發(fā)送接口相比有何區(qū)別呢?base中的發(fā)送函數(shù)負(fù)責(zé)打包、發(fā)送以及基于Instance對象的部分特殊處理。以其中的一個(gè)SendMessage為例:
int Base :: SendMessage(const nodeid_t iSendtoNodeID, const PaxosMsg& oPaxosMsg, const int iSendType)
{
if (m_bIsTestMode)
{
return 0;
}
BP->GetInstanceBP()->SendMessage();
//本節(jié)點(diǎn)立即處理
if (iSendtoNodeID == m_poConfig->GetMyNodeID())
{
m_poInstance->OnReceivePaxosMsg(oPaxosMsg);
return 0;
}
//打包
string sBuffer;
int ret = PackMsg(oPaxosMsg, sBuffer);
if (ret != 0)
{
return ret;
}
//發(fā)送
return m_poMsgTransport->SendMessage(iSendtoNodeID, sBuffer, iSendType);
}
2.5 架構(gòu)探討
前面將PhxPaxos的網(wǎng)絡(luò)層抽象類全部過了一遍,現(xiàn)在我們來窺視下作者的意圖,并進(jìn)一步探討是否存在更合理的架構(gòu)。
PhxPaxos的網(wǎng)絡(luò)抽象層存在一個(gè)明顯的分界點(diǎn):NetWork抽象類。NetWork及其之下抽象是基于純粹網(wǎng)絡(luò)的,業(yè)務(wù)無關(guān)的;NetWork之上的是業(yè)務(wù)相關(guān)的。NetWork做為明顯分界點(diǎn)到額另外一個(gè)原因是:允許PhxPaxos的使用者注冊自己的NetWork實(shí)現(xiàn)類,以替換內(nèi)置的DfNetWork。來看NetWork及其之下的抽象:
XXX
按文章中最初提到的觀點(diǎn),理想中的NetWork應(yīng)該只包含四個(gè)類:TCPClient、TCPServer、UDPSender、UDPReceiver。當(dāng)前UDP和預(yù)期一致,但TCP當(dāng)前一共有9個(gè)類,比預(yù)想的多了7個(gè)。其中Socket和ServerSocket的抽象是合理的、粒度恰當(dāng)?shù)?,Event概念引入及抽象略顯奇怪。EventLoop的定位應(yīng)該是一個(gè)Package Dispather(網(wǎng)絡(luò)數(shù)據(jù)分發(fā)器),它反向依賴了TcpAcceptor、TcpClient。而且在Phxpaxos場景下讀寫操作是完全隔離的。因此,Event、EventLoop的功能垂直拆分到TcpClient和TcpServer,薄抽象層TcpRead、TcpWrite、TcpIoThread移除或許更為合理。
針對NetWork層說幾點(diǎn):
- NetWork應(yīng)該是一個(gè)接口,而非抽象類。
- NetWork不應(yīng)該被TcpIOThread、TcpRead、TcpWrite、TcpAcceptor、TcpClient反向依賴。
- NetWork中的OnReceiveMessage應(yīng)該被移除,TCP中收到的消息應(yīng)該提供單獨(dú)的接口抽象(MessageHandler)?;赑hxPaxos的實(shí)現(xiàn),由Node實(shí)現(xiàn)這個(gè)接口是合適的。
class MessageHandler
{
virtual int OnReceiveMessage(const char* pcMessage, const int iMessageLen) = 0;
}
再來看MsgTransport、Communicate、base這三個(gè)類。因?yàn)镹etWork只做純網(wǎng)絡(luò)的抽象,直接交由上層使用并不合適,因此基于業(yè)務(wù)抽象的MsgTransport接口確有必要。但這一層抽象并不徹底,進(jìn)而又在base中做了二次封裝。從設(shè)計(jì)上來看,有如下幾個(gè)缺陷:
- base是Learner、Proposer、Accepor的基類,應(yīng)該只包含角色共識邏輯,網(wǎng)絡(luò)消息處理不屬于其職責(zé)。
- base中包含CheckpointMsg的消息處理,但該消息是Checkpoint機(jī)制專有的處理方式,并不適合放到公共的base類中。
個(gè)人認(rèn)為合理的做法應(yīng)該是:
- base中所有網(wǎng)絡(luò)相關(guān)操作移除,網(wǎng)絡(luò)發(fā)送部分功能移至Communicate。Communicate做為Phxpaxos的業(yè)務(wù)網(wǎng)絡(luò)抽象層。
- 網(wǎng)絡(luò)數(shù)據(jù)打包、解包部分拆分抽單獨(dú)的PackageUtil類處理,該類僅被Communicate使用。
2.6 總結(jié)
Phxpaxos基于socket、poll、epoll構(gòu)建自己的網(wǎng)絡(luò)層,支持UDP、TCP兩種通信方式。網(wǎng)絡(luò)層一共啟動(dòng)了如下五個(gè)線程;
-
UDPRecv線程
處理來自其他節(jié)點(diǎn)的UDP數(shù)據(jù)。 -
UDPSend線程
處理本節(jié)點(diǎn)發(fā)往其他節(jié)點(diǎn)的UDP數(shù)據(jù)。 -
TcpAcceptor線程
處理來自其他節(jié)點(diǎn)的連接請求。 -
TcpRead線程
處理來自其他節(jié)點(diǎn)的Tcp數(shù)據(jù)。 -
TcpWrite線程
處理本節(jié)點(diǎn)發(fā)往其他節(jié)點(diǎn)的Tcp數(shù)據(jù)。
NetWork接口對外屏蔽了上述細(xì)節(jié),并且允許用戶替換為自己的網(wǎng)絡(luò)部件。而Phxpaxos使用的是更上層的MsgTransport,其包含了和Phxpaxos業(yè)務(wù)相關(guān)的一些操作,如打包、本節(jié)點(diǎn)特殊處理等。
在完成了Communicate之后,我們有了一個(gè)有效的網(wǎng)絡(luò)通信機(jī)制,下一步,讓我們真正開始了解PhxPaxos的核心 --- Paxos算法實(shí)現(xiàn)。
【轉(zhuǎn)載請注明】隨安居士. 2. PhxPaxos分析之網(wǎng)絡(luò)基礎(chǔ)部件. 2017.11.13