定時(shí)任務(wù)管理系統(tǒng)(Quartz和Spring的整合)開源和源碼簡述(四)

利用學(xué)習(xí)的時(shí)間這里寫了個(gè)Spring和Quartz結(jié)合的一個(gè)web項(xiàng)目,純后端的項(xiàng)目,restful接口
實(shí)現(xiàn)對定時(shí)任務(wù)的增、刪、改、查、停止, 啟動(dòng)、定時(shí)規(guī)則修改、立即執(zhí)行等。github地址:holly-quartz-web,這里剛開始是為了學(xué)習(xí)源碼,后來有了一些改動(dòng),再后來就想做一些業(yè)務(wù)上的改造,所以clone了一個(gè)quartz-core的項(xiàng)目進(jìn)行改造,后期打算對其集群方式進(jìn)行改造等等。github地址:quartz-core,有一起感興趣的朋友可以一起改造,目前的項(xiàng)目比較簡單可以作為學(xué)習(xí)入門的項(xiàng)目,也可以作為搭建job管理系統(tǒng)的初期項(xiàng)目,慢慢迭代。

三的時(shí)候講到了QuartzSchedulerThread這個(gè)類,QuartzSchedulerThread是主處理線程,在三的時(shí)候我們發(fā)現(xiàn)創(chuàng)建Scheduler的時(shí)候已經(jīng)啟動(dòng)了該線程。它作為一個(gè)非守護(hù)線程運(yùn)行在正常優(yōu)先級(jí)下。
看一下該類的run方法

   public void run() {
        boolean lastAcquireFailed = false;
        //
        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }
                    if (halted.get()) {
                        break;
                    }
                }
                /獲取當(dāng)前線程池中線程的數(shù)量
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
                    List<OperableTrigger> triggers = null;
                    long now = System.currentTimeMillis();
                    clearSignaledSchedulingChange();
                    try {
                        //調(diào)度器在trigger隊(duì)列中尋找30秒內(nèi)一定數(shù)目的trigger準(zhǔn)備執(zhí)行調(diào)度,
                        //參數(shù)1:nolaterthan = now+3000ms,參數(shù)2 最大獲取數(shù)量,大小取線程池線程剩余量與定義值得較小者
                        //參數(shù)3 時(shí)間窗口 默認(rèn)為0,程序會(huì)在nolaterthan后加上窗口大小來選擇trigger
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        //上一步獲取成功將失敗標(biāo)志置為false;
                        lastAcquireFailed = false;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if(!lastAcquireFailed) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        //捕捉到異常則值標(biāo)志為true,再次嘗試獲取
                        lastAcquireFailed = true;
                        continue;
                    } catch (RuntimeException e) {
                        if(!lastAcquireFailed) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        lastAcquireFailed = true;
                        continue;
                    }
                    if (triggers != null && !triggers.isEmpty()) {
                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;//計(jì)算距離trigger觸發(fā)的時(shí)間
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                //如果這時(shí)調(diào)度器發(fā)生了改變,新的trigger添加進(jìn)來,那么有可能新添加的trigger比當(dāng)前待執(zhí)行的trigger
                                //更急迫,那么需要放棄當(dāng)前trigger重新獲取,然而,這里存在一個(gè)值不值得的問題,如果重新獲取新trigger
                                //的時(shí)間要長于當(dāng)前時(shí)間到新trigger出發(fā)的時(shí)間,那么即使放棄當(dāng)前的trigger,仍然會(huì)導(dǎo)致xntrigger獲取失敗,
                                //但我們又不知道獲取新的trigger需要多長時(shí)間,于是,我們做了一個(gè)主觀的評(píng)判,若jobstore為RAM,那么
                                //假定獲取時(shí)間需要7ms,若jobstore是持久化的,假定其需要70ms,當(dāng)前時(shí)間與新trigger的觸發(fā)時(shí)間之差小于
                                // 這個(gè)值的我們認(rèn)為不值得重新獲取,返回false
                                //這里判斷是否有上述情況發(fā)生,值不值得放棄本次trigger,若判定不放棄,則線程直接等待至trigger觸發(fā)的時(shí)刻
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            //該方法調(diào)用了上面的判定方法,作為再次判定的邏輯
                            //到達(dá)這里有兩種情況1.決定放棄當(dāng)前trigger,那么再判定一次,如果仍然有放棄,那么清空triggers列表并
                            // 退出循環(huán) 2.不放棄當(dāng)前trigger,且線程已經(jīng)wait到trigger觸發(fā)的時(shí)刻,那么什么也不做
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                            //這時(shí)觸發(fā)器已經(jīng)即將觸發(fā),值會(huì)<2
                        }
                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;
                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                //觸發(fā)triggers,結(jié)果付給bndles,注意,從這里返回后,trigger在數(shù)據(jù)庫中已經(jīng)經(jīng)過了鎖定,解除鎖定,這一套過程
                                //所以說,quratz定不是等到j(luò)ob執(zhí)行完才釋放trigger資源的占有,而是讀取完本次觸發(fā)所需的信息后立即釋放資源
                                //然后再執(zhí)行jobs
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }
                        }
                        //迭代trigger的信息,分別跑job
                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();
                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }
                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            //在特殊情況下,bndle可能為null,看triggerFired方法可以看到,當(dāng)從數(shù)據(jù)庫獲取trigger時(shí),如果status不是
                            //STATE_ACQUIRED,那么會(huì)直接返回空.quratz這種情況下本調(diào)度器啟動(dòng)重試流程,重新獲取4次,若仍有問題,
                            // 則拋出異常.
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }
                            //執(zhí)行job
                            JobRunShell shell = null;
                            try {
                                //創(chuàng)建一個(gè)job的Runshell
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
                            //把runShell放在線程池里跑
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }
                        }
                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }
                //保證負(fù)載平衡的方法,每次執(zhí)行一輪觸發(fā)后,本scheduler會(huì)等待一個(gè)隨機(jī)的時(shí)間,這樣就使得其他節(jié)點(diǎn)上的scheduler可以得到資源.
                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }
            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)
        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

QuartzSchedulerThread 的主處理輪循步驟:

  1. 檢查是否為scheuler是否為停止?fàn)顟B(tài)
  2. 檢查是否為暫停狀態(tài),暫停的話會(huì)嘗試去獲得信號(hào)鎖,并wait一會(huì),這里算是樂觀鎖方式吧,直到獲得鎖并重新運(yùn)行。
  3. 從線程池獲取可用的線程(will always be true, due to semantics of blockForAvailableThreads...)
  4. 獲取需要下次執(zhí)行的triggers
  5. 根據(jù)一些條件過濾下triggers
  6. 根據(jù)triggers獲取或者創(chuàng)建job并構(gòu)建JobRunShell
  7. 利用線程池里面的線程去執(zhí)行job(qsRsrcs.getThreadPool().runInThread(shell))
  8. 數(shù)據(jù)庫release正在執(zhí)行的job
  9. 去檢查job是否在執(zhí)行的時(shí)候變更了(如果變?yōu)楝F(xiàn)在需要立即執(zhí)行那就不在做等待了 QTZ-336 A job might have been completed in the mean time and we might have missed the scheduled changed signal by not waiting for the notify() yet Check that before waiting for too long in case this very job needs to be scheduled very soon)
  10. while (!halted)
    基本的大致操作就是這樣,在細(xì)節(jié)中涉及到分布式的實(shí)現(xiàn)。我們現(xiàn)在來看線quzrtz的分布式實(shí)現(xiàn)方式
  1. 總體來說是借助數(shù)據(jù)庫


    摘自網(wǎng)絡(luò),但愿原作者莫怪
  2. 借助的語句是 select for update 操作

  3. 表示QUARTZ_LOCKS表。每個(gè)sheduler會(huì)有兩行數(shù)據(jù)

數(shù)據(jù)庫表記錄

多個(gè)Quartz服務(wù)器去獲取trigger節(jié)點(diǎn)的時(shí)候 會(huì)鎖住TRIGGER_ACCESS這行記錄,別的節(jié)點(diǎn)獲得不到后就會(huì)發(fā)現(xiàn)已經(jīng)有別的服務(wù)器節(jié)點(diǎn)處理了這個(gè)Scheduler 所以不需要自己執(zhí)行了。
另一行記錄是 在執(zhí)行Trigger對應(yīng)的Job的時(shí)候 狀態(tài)變更的一個(gè)鎖。因?yàn)榧偃缒骋淮螆?zhí)行的時(shí)間過長,下一次執(zhí)行過來的時(shí)候上一次還沒執(zhí)行完,那么久需要等待在這里。 這兩個(gè)條記錄就實(shí)現(xiàn)了分布式。多么美好的事情。哈哈哈
在網(wǎng)上別人博客找到的組件圖和時(shí)序圖 我覺得相當(dāng)細(xì)致和完整貼過來。

組件圖原作者莫怪
時(shí)序圖原作者莫怪
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容