AsyncDispatcher是yarn的事件異步調(diào)度器,異步事件調(diào)用應(yīng)用于yarn的很多組件中
下面舉出幾個具體事件調(diào)度實例:
應(yīng)用于resourceManager:
NodesListManagerEventType:節(jié)點管理事件
SchedulerEventType:程序調(diào)度事件具體包括:
NODE_ADDED:節(jié)點添加
NODE_REMOVED:節(jié)點移除
NODE_UPDATE:節(jié)點更新
NODE_RESOURCE_UPDATE:節(jié)點資源更新
APP_ADDED:應(yīng)用任務(wù)添加
APP_REMOVED:應(yīng)用任務(wù)移除
上面列出了resourceManager部分需要處理的事件,都是基于AsyncDispatcher完成。
應(yīng)用于NodeManager:
ContainerManagerEventType具體包括:
FINISH_APPS,
FINISH_CONTAINERS,
UPDATE_CONTAINERS,
SIGNAL_CONTAINERS
NodeManagerEventType具體包括事件如下:
SHUTDOWN, RESYNC
還有一個比較重要的服務(wù)叫ContainerManagerImpl,主要用于管理container資源,封裝的事件包括:
ContainerEventType具體包括:
INIT_CONTAINER,
KILL_CONTAINER,
UPDATE_DIAGNOSTICS_MSG,
CONTAINER_DONE,
REINITIALIZE_CONTAINER,
ROLLBACK_REINIT,
PAUSE_CONTAINER,
RESUME_CONTAINER,
UPDATE_CONTAINER_TOKEN等
ContainersLauncherEventType具體包括:
LAUNCH_CONTAINER,
RELAUNCH_CONTAINER,
RECOVER_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself.
CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
SIGNAL_CONTAINER,
PAUSE_CONTAINER,
RESUME_CONTAINER,
RECOVER_PAUSED_CONTAINER
上面只是列出的部分例子,我們可以看到,異步事件調(diào)度處理算是hadoop-yarn內(nèi)部流程運轉(zhuǎn)的核心。幾乎我們所有對yarn的操作,都會封裝成一個個事件,在yarn內(nèi)部流轉(zhuǎn)并通過AsyncDispatcher進行調(diào)度處理。我們可以看一下org.apache.hadoop.yarn.event.AbstractEvent的實現(xiàn)類,了解yarn內(nèi)部的事件種類。
下面我們來查看一下AsyncDispatcher的內(nèi)部實現(xiàn)邏輯:
首先看它的構(gòu)造函數(shù):AsyncDispatcher內(nèi)部默認使用阻塞隊列來保存事件,所有事件的處理都是圍繞著這個阻塞隊列進行。

事件入隊列:



通過上面的三張代碼截圖,我們可以得知AsyncDispatcher提供了一個內(nèi)部handler用于供給外界將事件保存到阻塞隊列中.具體調(diào)用可通過如下方式:
AsyncDispatcher dispatcher = new?AsyncDispatcher()
dispatcher.getEventHandler().handler(event);
事件處理:
AsyncDispatcher通過一個單獨的線程來循環(huán)處理內(nèi)部隊列中的事件。并且這個線程通過serviceStart()方法啟動。我們可以看到,當阻塞隊列中有事件時,線程會取出事件并通過dispatch方法進行調(diào)度。


事件調(diào)度:
AsyncDispatcher提供register方法用于注冊某種事件類型需要由某個handler進行處理,其中eventDispatchers是一個hashMap,內(nèi)部保存了事件type-->具體handler。當進行事件處理時,dispatch會從eventDispatchers中取得事件類型對用的handler,然后由handler進行具體的事件操作。


通常由AsyncDispatcher的調(diào)用者進行事件類型與handler的注冊, 比如在ContainerManagerImpl服務(wù)中,

以上就是AsyncDispatcher對于事件處理的簡要說明。
值得注意的是,當AsyncDispatcher服務(wù)停止時,會先將隊列內(nèi)的事件處理完,再停止服務(wù),就是通過下面那段代碼實現(xiàn),

waitForDrained為對象鎖,當阻塞隊列內(nèi)數(shù)據(jù)不為空時,servicestop將處于阻塞狀態(tài),當隊列內(nèi)事件都處理完后,drained字段為true,然后通過調(diào)用waitForDrainted.notify()使得serviceStop解除阻塞。
