ZooKeeper Watcher Mechanism: Source Code Analysis (Part 1)

The basic Watcher flow

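ZooKeeper's Watcher mechanism can be divided into three phases: the client registers a Watcher, the server processes the Watcher, and the client invokes the Watcher callback.

A client can register a watcher in three ways: getData, exists and getChildren. The following code serves as the example for analyzing how the whole triggering mechanism works: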

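```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

ZooKeeper zookeeper = new ZooKeeper("192.168.11.152:2181", 4000, new Watcher() {
    @Override
    public void process(WatchedEvent event) { // called when an event is delivered to this watcher
        System.out.println(event.getType());
    }
});

zookeeper.create("/mic", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // create the node
zookeeper.exists("/mic", true);                // register a watch
zookeeper.setData("/mic", "1".getBytes(), -1); // change the node's data, which fires the watch
```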
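The three phases can be made concrete with a minimal in-memory sketch in which a single process plays both client and server. The class and method names (`OneShotWatchDemo`, `register`, `setData`) are hypothetical and are not ZooKeeper's API. The sketch mirrors one behaviour of standard ZooKeeper watches: a watch fires at most once and has to be registered again to see later changes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of the three watcher phases; not ZooKeeper's API.
class OneShotWatchDemo {
    private final Map<String, String> data = new HashMap<>();                   // "server" data tree
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>(); // path -> pending watchers

    // Phase 1: the client registers interest in a path (like exists(path, watcher)).
    void register(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Phase 2: the "server" applies a change and collects the watchers for that path.
    // Watches are one-shot, so they are removed before being fired.
    void setData(String path, String value) {
        data.put(path, value);
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            // Phase 3: the client-side callback runs once for each removed watcher.
            for (Consumer<String> w : fired) {
                w.accept("NodeDataChanged " + path);
            }
        }
    }

    public static void main(String[] args) {
        OneShotWatchDemo zk = new OneShotWatchDemo();
        List<String> events = new ArrayList<>();
        zk.register("/mic", events::add);
        zk.setData("/mic", "1"); // fires the watch
        zk.setData("/mic", "2"); // no watch left, so nothing fires
        System.out.println(events); // prints [NodeDataChanged /mic]
    }
}
```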

Initialization of the ZooKeeper API

```java
ZooKeeper zookeeper = new ZooKeeper("192.168.11.152:2181", 4000, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event.getType());
    }
});
```

When we create a ZooKeeper client instance, we pass a default Watcher to the constructor through new Watcher(). This Watcher serves as the default Watcher for the entire ZooKeeper session and is kept in the defaultWatcher field of the client's ZKWatchManager. The code is as follows:

```java
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly, HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    watchManager = defaultWatchManager();
    watchManager.defaultWatcher = watcher; // the watcher is stored in ZKWatchManager here
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    hostProvider = aHostProvider;
    // create the ClientCnxn, then call cnxn.start()
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
```
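In the source version analyzed here, `exists(path, true)` delegates to `exists(path, watch ? watchManager.defaultWatcher : null)`. A boolean "watch" argument therefore always means "use the watcher passed to the constructor". The sketch below isolates only that selection rule. `MiniWatchManager` and `resolve` are hypothetical names, and `Consumer<String>` stands in for the real `Watcher` interface.

```java
import java.util.function.Consumer;

// Hypothetical model of how a boolean "watch" flag maps to the default watcher.
class MiniWatchManager {
    // Set once from the constructor argument, like ZKWatchManager.defaultWatcher.
    final Consumer<String> defaultWatcher;

    MiniWatchManager(Consumer<String> defaultWatcher) {
        this.defaultWatcher = defaultWatcher;
    }

    // Mirrors the rule "watch ? defaultWatcher : null" used by exists(path, boolean).
    Consumer<String> resolve(boolean watch) {
        return watch ? defaultWatcher : null;
    }

    public static void main(String[] args) {
        Consumer<String> printer = e -> System.out.println("default watcher got " + e);
        MiniWatchManager m = new MiniWatchManager(printer);
        System.out.println(m.resolve(true) == printer); // prints true
        System.out.println(m.resolve(false));           // prints null
    }
}
```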

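ClientCnxn is the main class for communication between the ZooKeeper client and server and for handling event notifications. It contains two inner thread classes:

1. SendThread: handles data communication between the client and the server, including the transmission of event information.

2. EventThread: invokes the registered Watchers on the client side to process notifications.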
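The split between SendThread and EventThread is essentially a pair of producer/consumer queues. The following is a simplified model that uses plain `BlockingQueue`s and a fake "server" that echoes each request back as an event. `TwoThreadClientDemo` and its fields are hypothetical and do not reproduce ClientCnxn's real protocol handling. The model only illustrates why callbacks run on a thread separate from the one doing I/O.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical two-thread model: one thread does the "I/O", the other runs callbacks.
class TwoThreadClientDemo {
    final BlockingQueue<String> outgoing = new LinkedBlockingQueue<>(); // plays the role of outgoingQueue
    final BlockingQueue<String> events = new LinkedBlockingQueue<>();   // events waiting for the callback thread
    final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    List<String> run(List<String> requests) {
        int n = requests.size();
        CountDownLatch done = new CountDownLatch(n);
        Thread send = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    String req = outgoing.take();   // block until the caller enqueues work
                    events.put("event for " + req); // pretend the server answered with an event
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "SendThread");
        Thread event = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    delivered.add(events.take());   // "invoke the watcher" off the I/O thread
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "EventThread");
        send.start();
        event.start();
        requests.forEach(outgoing::add);            // the caller only enqueues, like queuePacket
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        System.out.println(new TwoThreadClientDemo().run(List.of("exists /mic", "setData /mic")));
        // prints [event for exists /mic, event for setData /mic]
    }
}
```

Each queue has exactly one producer and one consumer, so the order of requests is preserved all the way to the callbacks.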

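ClientCnxn initialization

The seven-argument constructor invoked from ZooKeeper delegates to the one below, passing a sessionId of 0 and an empty 16-byte session password.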

```java
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;
    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;
    sendThread = new SendThread(clientCnxnSocket); // create the SendThread
    eventThread = new EventThread();               // create the EventThread
    this.clientConfig = zooKeeper.getClientConfig();
}

public void start() { // start both threads
    sendThread.start();
    eventThread.start();
}
```
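The two timeouts set in the constructor come from plain integer arithmetic. The worked example below assumes the article's sessionTimeout of 4000 ms. It shows a single host (hostProvider.size() == 1) and, for comparison, three hosts. The helper names are hypothetical. Dividing by the host count is presumably meant to let one pass over every host fit into a single session timeout. These are only initial values: once connected, the client recomputes both from the session timeout negotiated with the server.

```java
// Reproduces the integer arithmetic from the ClientCnxn constructor (helper names are hypothetical).
class TimeoutMath {
    static int connectTimeout(int sessionTimeout, int hostCount) {
        return sessionTimeout / hostCount;   // per-host share of the session timeout
    }

    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;       // integer division truncates
    }

    public static void main(String[] args) {
        System.out.println(connectTimeout(4000, 1)); // prints 4000
        System.out.println(connectTimeout(4000, 3)); // prints 1333
        System.out.println(readTimeout(4000));       // prints 2666
    }
}
```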

Registering a watch from the client via exists

```java
zookeeper.exists("/mic", true); // register a watch
```

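The watch is registered by the exists method. `exists(path, true)` delegates to the `exists(path, Watcher)` overload shown below. When the flag is true it passes the session's default watcher (watchManager.defaultWatcher); otherwise it passes null: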

```java
public Stat exists(final String path, Watcher watcher)
        throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new ExistsWatchRegistration(watcher, clientPath); // build an ExistsWatchRegistration
    }

    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.exists);                 // operation type: exists
    ExistsRequest request = new ExistsRequest();      // build the ExistsRequest
    request.setPath(serverPath);
    request.setWatch(watcher != null);                // whether a watch should be registered
    SetDataResponse response = new SetDataResponse(); // holder for the server's response
    // queue the RequestHeader, ExistsRequest, SetDataResponse and WatchRegistration for sending
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
            return null;
        }
        throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                clientPath);
    }

    // return the Stat obtained by exists
    return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
```

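cnxn.submitRequest

exists calls a four-argument overload of submitRequest. That overload forwards to the version below, with watchDeregistration set to null.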

```java
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    // wrap everything into a Packet and add it to the send queue
    Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);
    synchronized (packet) {
        while (!packet.finished) { // block until the packet has been fully processed
            packet.wait();
        }
    }
    return r;
}
```
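submitRequest turns an asynchronous pipeline into a synchronous call. The caller waits on the Packet's monitor until another thread marks the packet finished. In the ZooKeeper source, that other thread is the response path: for synchronous calls, finishPacket sets `finished = true` and calls `notifyAll()` under the packet's lock. The sketch below models only this handshake. `MiniPacket`, `complete` and `awaitReply` are hypothetical names.

```java
// Hypothetical model of the synchronous wait in submitRequest.
class MiniPacket {
    boolean finished;   // guarded by this packet's monitor
    String reply;

    // Called by the "response" thread, analogous to finishPacket for synchronous calls.
    void complete(String reply) {
        synchronized (this) {
            this.reply = reply;
            this.finished = true;
            notifyAll();            // wake the thread blocked in awaitReply()
        }
    }

    // Called by the requesting thread; the loop guards against spurious wakeups.
    String awaitReply() throws InterruptedException {
        synchronized (this) {
            while (!finished) {
                wait();
            }
            return reply;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniPacket p = new MiniPacket();
        new Thread(() -> p.complete("stat for /mic"), "SendThread").start();
        System.out.println(p.awaitReply()); // prints stat for /mic
    }
}
```

Checking `finished` inside the `while` loop makes the order of the two threads irrelevant. If the reply arrives before the caller starts waiting, the loop never calls `wait()` at all.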

```java
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) {
    // convert the transport-related objects into a Packet
    Packet packet = null;
    packet = new Packet(h, r, request, response, watchRegistration);
    packet.cb = cb;
    packet.ctx = ctx;
    packet.clientPath = clientPath;
    packet.serverPath = serverPath;
    packet.watchDeregistration = watchDeregistration;
    synchronized (state) {
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet); // add the packet to outgoingQueue
        }
    }
    // multiplexing: wake up the I/O side (the Selector, in the NIO implementation)
    // to tell it a new packet has been added
    sendThread.getClientCnxnSocket().packetAdded();
    return packet;
}
```

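In ZooKeeper, a Packet is the smallest unit of the communication protocol, i.e. a data packet. Packets carry data over the network between client and server, and any object that needs to be transmitted must be wrapped in a Packet. In ClientCnxn the WatchRegistration is wrapped into the Packet as well. queuePacket, called on the requesting thread, then puts the Packet into the send queue (outgoingQueue), where it waits for SendThread to send it. This is another asynchronous step; asynchronous communication is a very common technique in distributed systems.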
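The packetAdded() call at the end of queuePacket matters because, in the NIO implementation, SendThread may be blocked in `Selector.select(timeout)`. The sketch below uses only `java.nio` and a hypothetical `WakeupDemo` class to show the pattern. One thread blocks in select. Another thread enqueues a packet and calls `Selector.wakeup()`, which makes the pending (or next) select return at once so the queue can be drained.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo of "enqueue, then wake the selector" (not ZooKeeper code).
class WakeupDemo {
    static List<String> drainAfterWakeup(String packet) throws IOException, InterruptedException {
        ConcurrentLinkedQueue<String> outgoing = new ConcurrentLinkedQueue<>();
        List<String> sent = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            Thread io = new Thread(() -> {
                try {
                    // Each select would block for up to 60 s; wakeup() makes it return early.
                    // The loop also guards against spurious returns.
                    while (outgoing.isEmpty()) {
                        selector.select(60_000);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                String p;
                while ((p = outgoing.poll()) != null) {
                    sent.add(p);          // stand-in for writing the packet to the socket
                }
            }, "io");
            io.start();
            outgoing.add(packet);         // like outgoingQueue.add(packet)
            selector.wakeup();            // like packetAdded(): break out of select()
            io.join();                    // join also makes "sent" safely visible here
        }
        return sent;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(drainAfterWakeup("exists /mic")); // prints [exists /mic]
    }
}
```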

How SendThread sends data

When the connection was initialized, ZooKeeper created and started two threads. Next we analyze how SendThread sends data. Since it is a thread, starting it runs SendThread.run():

```java
public void run() {
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    while (state.isAlive()) {
        try {
            if (!clientCnxnSocket.isConnected()) { // not connected yet: start a connection
                // don't re-establish connection if we are closing
                if (closing) {
                    break;
                }
                startConnect(); // initiate the connection
                clientCnxnSocket.updateLastSendAndHeard();
            }

            if (state.isConnected()) { // connected: handle SASL authentication
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        try {
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
                            LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            // An authentication error occurred during authentication with the Zookeeper Server.
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        } else {
                            if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }
                    }
                    if (sendAuthEvent == true) {
                        eventThread.queueEvent(new WatchedEvent(
                                Watcher.Event.EventType.None,
                                authState, null));
                    }
                }
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

            // "to" is how much time the client has left before timing out
            if (to <= 0) { // already timed out
                String warnInfo;
                warnInfo = "Client session timed out, have not heard from server in "
                        + clientCnxnSocket.getIdleRecv()
                        + "ms"
                        + " for sessionid 0x"
                        + Long.toHexString(sessionId);
                LOG.warn(warnInfo);
                throw new SessionTimeoutException(warnInfo);
            }
            if (state.isConnected()) {
                // compute when the next ping is due
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
                        ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    sendPing(); // send a ping request
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }

            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout =
                            Math.min(2*pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }

            // Hand over to clientCnxnSocket to perform the actual transfer.
            // pendingQueue holds packets that have been sent and are awaiting a response.
            // clientCnxnSocket defaults to ClientCnxnSocketNIO (remember where it was set?
            // when the ZooKeeper object was instantiated).
            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            if (closing) {
                if (LOG.isDebugEnabled()) {
                    // closing so this is expected
                    LOG.debug("An exception was thrown while closing send thread for session 0x"
                            + Long.toHexString(getSessionId())
                            + " : " + e.getMessage());
                }
                break;
            } else {
                // this is ugly, you have a better way speak up
                if (e instanceof SessionExpiredException) {
                    LOG.info(e.getMessage() + ", closing socket connection");
                } else if (e instanceof SessionTimeoutException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof EndOfStreamException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof RWServerFoundException) {
                    LOG.info(e.getMessage());
                } else {
                    LOG.warn(
                            "Session 0x"
                            + Long.toHexString(getSessionId())
                            + " for server "
                            + clientCnxnSocket.getRemoteSocketAddress()
                            + ", unexpected error"
                            + RETRY_CONN_MSG, e);
                }
                // At this point, there might still be new packets appended to outgoingQueue.
                // they will be handled in next connection or cleared up if closed.
                cleanup();
                if (state.isAlive()) {
                    eventThread.queueEvent(new WatchedEvent(
                            Event.EventType.None,
                            Event.KeeperState.Disconnected,
                            null));
                }
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }
        }
    }
    synchronized (state) {
        // When it comes to this point, it guarantees that later queued
        // packet to outgoingQueue will be notified of death.
        cleanup();
    }
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                Event.KeeperState.Disconnected, null));
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
            "SendThread exited loop for session: 0x"
            + Long.toHexString(getSessionId()));
}
```
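The ping scheduling inside run() reduces to a small formula. Here it is pulled out as a pure function so the numbers can be checked. The example assumes a readTimeout of 2666 ms (what 4000 * 2 / 3 gives). `PingSchedule` is a hypothetical helper, and the idleSend values (milliseconds since the last send) are made up for illustration.

```java
// Hypothetical extraction of the ping decision from SendThread.run().
class PingSchedule {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in the source

    // Milliseconds until the next ping is due; a value <= 0 means "ping now".
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - ((idleSend > 1000) ? 1000 : 0);
    }

    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        int readTimeout = 2666;                                 // 4000 * 2 / 3
        System.out.println(timeToNextPing(readTimeout, 200));   // prints 1133
        System.out.println(timeToNextPing(readTimeout, 1000));  // prints 333
        System.out.println(shouldPing(readTimeout, 1001));      // prints true (1333 - 1001 - 1000 < 0)
        System.out.println(shouldPing(readTimeout, 200));       // prints false
    }
}
```

With this readTimeout, any idle period longer than one second triggers a ping immediately, because the extra 1000 ms term pushes the result below zero.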

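Network interaction between client and server

The doTransport shown here is the ClientCnxnSocketNetty implementation, as the Netty channel and the firstConnect latch show. The default ClientCnxnSocketNIO implements the same method on top of a java.nio Selector.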

```java
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
        throws IOException, InterruptedException {
    try {
        if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
            return;
        }
        Packet head = null;
        if (needSasl.get()) {
            if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
        } else {
            // wait for a packet in outgoingQueue; return if none arrives in time
            if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                return;
            }
        }
        // check if being waken up on closing.
        if (!sendThread.getZkState().isAlive()) {
            // adding back the packet to notify of failure in conLossPacket().
            addBack(head);
            return;
        }
        // channel disconnection happened
        if (disconnected.get()) { // error path: the channel is closed, so put the packet back via addBack
            addBack(head);
            throw new EndOfStreamException("channel for sessionid 0x"
                    + Long.toHexString(sessionId)
                    + " is lost");
        }
        // if there is a packet to send, call doWrite;
        // pendingQueue holds packets that have been sent and are awaiting a response
        if (head != null) {
            doWrite(pendingQueue, head, cnxn);
        }
    } finally {
        updateNow();
    }
}
```

The doWrite method

```java
private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
    updateNow();
    while (true) {
        if (p != WakeupPacket.getInstance()) {
            // the packet has a request header and is neither a ping nor an auth request
            if ((p.requestHeader != null) &&
                    (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
                    (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                // assign an increasing xid, used to match the server's reply to this request
                p.requestHeader.setXid(cnxn.getXid());
                synchronized (pendingQueue) {
                    pendingQueue.add(p); // track the packet in pendingQueue until its reply arrives
                }
            }
            sendPkt(p); // send the packet
        }
        if (outgoingQueue.isEmpty()) {
            break;
        }
        p = outgoingQueue.remove();
    }
}
```
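Every non-ping, non-auth packet gets an increasing xid and is appended to pendingQueue in send order. When a reply arrives, the client takes the head of pendingQueue and checks that its xid matches the reply's. A mismatch is treated as an "Xid out of order" error. The simplified sketch below models that bookkeeping and assumes replies arrive in request order. `XidMatcher`, `send` and `onReply` are hypothetical names, not ClientCnxn methods.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of xid assignment and in-order reply matching.
class XidMatcher {
    private int xid = 1;                                       // next request id to hand out
    private final Deque<Integer> pending = new ArrayDeque<>(); // xids sent, awaiting replies

    // Stamp a request with the next xid and remember it, like doWrite + pendingQueue.add.
    int send() {
        int id = xid++;
        pending.addLast(id);
        return id;
    }

    // A reply must match the oldest outstanding request; otherwise the stream is out of order.
    void onReply(int replyXid) {
        Integer expected = pending.pollFirst();
        if (expected == null || expected != replyXid) {
            throw new IllegalStateException("Xid out of order: expected " + expected + ", got " + replyXid);
        }
    }

    int outstanding() {
        return pending.size();
    }

    public static void main(String[] args) {
        XidMatcher m = new XidMatcher();
        int a = m.send(), b = m.send();
        m.onReply(a);
        m.onReply(b);
        System.out.println(m.outstanding()); // prints 0
    }
}
```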

sendPkt

```java
private void sendPkt(Packet p) {
    // Assuming the packet will be sent out successfully. Because if it fails,
    // the channel will close and clean up queues.
    p.createBB();     // serialize the request data
    updateLastSend(); // record the time of the last send
    sentCount++;      // count packets sent
    channel.write(ChannelBuffers.wrappedBuffer(p.bb)); // write the bytes to the server over the Netty channel
}
```

createBB

```java
public void createBB() {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len"); // We'll fill this in later
        if (requestHeader != null) {
            requestHeader.serialize(boa, "header"); // serialize the header (requestHeader)
        }
        if (request instanceof ConnectRequest) {
            request.serialize(boa, "connect");
            // append "am-I-allowed-to-be-readonly" flag
            boa.writeBool(readOnly, "readOnly");
        } else if (request != null) {
            request.serialize(boa, "request"); // serialize the request body
        }
        baos.close();
        this.bb = ByteBuffer.wrap(baos.toByteArray());
        this.bb.putInt(this.bb.capacity() - 4);
        this.bb.rewind();
    } catch (IOException e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}
```

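createBB shows that, when serializing for network transport, ZooKeeper writes only two fields into the underlying byte array: requestHeader and request. The WatchRegistration is never transmitted. The server learns that a watch was requested only from the boolean watch flag inside ExistsRequest, while the Watcher object itself stays on the client.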
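The following hand-rolled imitation of createBB's framing shows what goes on the wire for `exists("/mic", true)`. It uses only java.io and java.nio. It assumes jute's binary encoding for these types: an int as 4 big-endian bytes, a string as an int length followed by UTF-8 bytes, and a boolean as one byte. It also assumes the exists opcode value is 3. `FrameDemo` is hypothetical; the real client uses BinaryOutputArchive, not this code. The frame contains the header (xid, type), the path and a single watch flag, and nothing else.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical imitation of Packet.createBB() for an exists request (not the jute library).
class FrameDemo {
    static ByteBuffer existsFrame(int xid, String path, boolean watch) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(-1);                 // "len" placeholder, patched below
        out.writeInt(xid);                // RequestHeader.xid
        out.writeInt(3);                  // RequestHeader.type (assumed value of OpCode.exists)
        byte[] p = path.getBytes(StandardCharsets.UTF_8);
        out.writeInt(p.length);           // ExistsRequest.path: length prefix...
        out.write(p);                     // ...followed by the UTF-8 bytes
        out.writeBoolean(watch);          // ExistsRequest.watch: the only trace of the Watcher
        out.close();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.capacity() - 4);     // overwrite the placeholder with the body length
        bb.rewind();
        return bb;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer bb = existsFrame(1, "/mic", true);
        System.out.println(bb.capacity()); // prints 21
        System.out.println(bb.getInt());   // prints 17
    }
}
```

The frame is 4 + 4 + 4 + 4 + 4 + 1 = 21 bytes. The length prefix excludes itself, so it reads 17.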

Summary

After the user calls exists to register a watch, the following happens:

  1. The request data is wrapped into a Packet and added to outgoingQueue.

  2. The SendThread performs the actual sending, transmitting the packets in outgoingQueue to the server.

  3. The sending goes through clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this). ClientCnxnSocket encapsulates the connection between the ZooKeeper client and server and has two concrete implementations, ClientCnxnSocketNetty and ClientCnxnSocketNIO. Which one is used is decided when the ZooKeeper object is instantiated; the code is as follows:

```java
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
        hostProvider, sessionTimeout, this, watchManager,
        getClientCnxnSocket(), canBeReadOnly);
```

```java
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
    String clientCnxnSocketName = getClientConfig().getProperty(
            ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
    if (clientCnxnSocketName == null) {
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
    }
    try {
        Constructor<?> clientCxnConstructor =
                Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
        ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
        return clientCxnSocket;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + clientCnxnSocketName);
        ioe.initCause(e);
        throw ioe;
    }
}
```

  4. Building on step 3, the request packet is finally sent to the server. In the ClientCnxnSocketNetty implementation analyzed above, this is done by its sendPkt method.
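getClientCnxnSocket is a small reflection-based factory. It reads a class name from configuration (ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, the key "zookeeper.clientCnxnSocket"), falls back to the NIO class, and invokes the chosen class's constructor. The sketch below reproduces that pattern with JDK reflection only. The `Transport` interface and the two implementation classes are hypothetical stand-ins for ClientCnxnSocket and its subclasses, and a plain `Map` takes the place of ZKClientConfig.

```java
import java.lang.reflect.Constructor;
import java.util.Map;

// Hypothetical stand-ins for ClientCnxnSocket and its two implementations.
interface Transport {
    String name();
}

class NioTransport implements Transport {
    public NioTransport(Map<String, String> config) { }
    public String name() { return "nio"; }
}

class NettyTransport implements Transport {
    public NettyTransport(Map<String, String> config) { }
    public String name() { return "netty"; }
}

class TransportFactory {
    static final String KEY = "zookeeper.clientCnxnSocket"; // same key string ZooKeeper uses

    // Same shape as getClientCnxnSocket(): configured class name, else the NIO default.
    static Transport create(Map<String, String> config) throws ReflectiveOperationException {
        String className = config.get(KEY);
        if (className == null) {
            className = NioTransport.class.getName();
        }
        Constructor<?> ctor = Class.forName(className).getDeclaredConstructor(Map.class);
        return (Transport) ctor.newInstance(config);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println(create(Map.of()).name());                                    // prints nio
        System.out.println(create(Map.of(KEY, NettyTransport.class.getName())).name()); // prints netty
    }
}
```

Loading the class by name keeps the caller independent of the Netty dependency. The Netty-based implementation only has to be on the classpath when it is actually configured.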

