? ? ? ? ?ElasticJob 社區(qū)在經過之前幾年的停滯狀況之后并入到apache,并且作為 Apache ShardingSphere 的子項目繼續(xù)發(fā)光發(fā)熱,終于迎來了它的3.0時代,作為一個老碼農我也趕緊跟著時代的腳本,進行了一次深入學習和研究,這里記錄下學習筆記。
首先elasticjob底層還是基于quartz實現(xiàn)的,熟悉quartz的朋友都知道quartz有三個非常重要的概念:
1、scheduler是一個計劃調度器容器,容器里面可以有眾多的JobDetail和trigger,當容器啟動后,里面的每個JobDetail都會根據trigger按部就班自動去執(zhí)行。
2、JobDetail是一個可執(zhí)行的工作,它本身可能是有狀態(tài)的。
3、Trigger代表一個調度參數(shù)的配置,什么時候去調。
當JobDetail和Trigger在scheduler容器上注冊后,形成了裝配好的作業(yè)(JobDetail和Trigger所組成的一對兒),就可以伴隨容器啟動而調度執(zhí)行了。
scheduler是個容器,容器中有一個線程池,用來并行調度執(zhí)行每個作業(yè),這樣可以提高容器效率。
elasticjob3.0.0相較之前不同之處
1、通過閱讀官網發(fā)現(xiàn)elasticjob3.0.0新增了http類型的作業(yè),可以直接調用http請求,并在請求頭中將分片上下文以json串的形式傳遞,同時也增加了HttpJobExecutor來支撐HttpJob的執(zhí)行
我們看看官網上的一段代碼

文檔關于這部分的介紹就寥寥數(shù)筆,沒有寫得很細致。
不過通過自己寫個demo來體驗再加上結合之前用ElasitcJob的經驗仔細想想,個人總結了一下這樣做的好處:之前都是將job任務處理的核心邏輯集中在job服務本身,job不僅要負責調度還需要處理核心業(yè)務邏輯,這樣做之后job服務本身只需關注任務的調度和管理,核心邏輯可以分離到其他服務中去,有助于縷清微服務盛行的當下業(yè)務邊界問題同時減少job服務本身的壓力。
2、job監(jiān)聽器采用SPI的方式由ElasticJobServiceLoader來進行加載和管理



再來看官網上的一段代碼

上面就調用了ScheduleJobBootstrap類的schedule()方法,job的啟動就是從這里開始的,我們看看這個方法

繼續(xù)跟進去發(fā)現(xiàn)是調用了JobScheduleController的scheduleJob(String corn)方法

發(fā)現(xiàn)調用了scheduler的start方法,而scheduler是quartz的任務調度器,到這里就接上了quartz的任務調度過程,有沒有瞬間就懂了?
接下來我們關注下job的線程池,查看了下demo在執(zhí)行job的調用棧

關注一下SimpleThreadpool,QuartzSchedulerThread,JobRunShell,LiteJob這4個類
其中QuartzSchedulerThread是一個線程類,負責查詢并觸發(fā)Triggers,該線程類的主要工作分為以下幾個步驟:
1、等待QuartzScheduler啟動
2、查詢待觸發(fā)的Trigger
3、等待Trigger觸發(fā)時間到來
4、觸發(fā)Trigger
5、循環(huán)上述步驟
具體的源碼我將在單獨的文章中進行解析,這里不再展開。
在底部發(fā)現(xiàn)由quartz的SimpleThreadpool拉起的,而這個線程池是由StdSchedulerFactory完成初始化的

看下StdSchedulerFactory的instantiate()方法,SimpleThreadpool和QuartzSchedulerThread都是這個方法中初始化的



可以看到QuartzScheduler構造方法中初始化了QuartzSchedulerThread并在線程池中啟動
我們看下QuartzSchedulerThread的run方法

這里getThreadPool獲取的線程池就是前面所述的 SimpleThreadPool,調用runInThread方法傳入JobRunShell線程對象,并將JobRunShell賦值給SimpleThreadPool內部類定義的工作線程 ,進入runInThread方法之后啟動了SimpleThreadPool內部類定義的工作線程?

在WorkThread的run方法中調用了JobRunShell的run方法,

這里的job就是LiteJob

這里實際上調用的是ElasticJobExecutor的execute方法,進入到方法里面

1、檢查job執(zhí)行環(huán)境
2、job執(zhí)行前的處理
3、正常執(zhí)行job
4、如果發(fā)生misfire則執(zhí)行job
5、job執(zhí)行后的處理
這里2和5就是調用ElasticJobListener進行處理的,經過輪詢監(jiān)聽器然后調用對應的方法跟進execute方法最后發(fā)現(xiàn)調用的是JobItemExecutor.process方法 ,那前面我們知道有 SimpleJob、DataFlowJob、ScriptJob、HttpJob四種不同類型的作業(yè),這個方法是怎么處理這四種不同的作業(yè)的呢?然后JobItemExecutor也是elasitcJob提供的一個接口,也是采用SPI的方式進行擴展的 ,根據不同的job類型加載不同的作業(yè)處理器進行對應作業(yè)的處理,我們來看看它的類結構

有興趣的同學可以看看對應的job處理器是怎么處理作業(yè)的,這里就不再展開了。