Quartz——分布式任務調(diào)度 任務獲取

簡介

熟悉Java編程的童鞋對于Quartz應該是比較屬性的了。一個開源的任務調(diào)度框架。當你的系統(tǒng)中需要定時、定期來執(zhí)行一次或多次任務的時候,那它一定是不二之選。

目前市面上也要好多開源的任務調(diào)度平臺,比如TBSchedule,Elastic-Job,XXL-JOB,大多的任務調(diào)度平臺都是以Quartz為基礎進行的二次封裝或者重新??梢奞uartz在調(diào)度框架中的地位還是比較高的。

本文主要從代碼層面介紹Quartz執(zhí)行任務調(diào)度時,任務的獲取、執(zhí)行、重試的流程。

案例介紹

場景:某個系統(tǒng)在用戶提交請求的時候,需要后臺異步處理數(shù)據(jù)。

搭建一個簡單的Quartz + spring boot + mysql 的系統(tǒng)
  1. pom.xml 引入依賴
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>


        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>

  1. 參數(shù)文件
#數(shù)據(jù)庫參數(shù)配置 application.properties
spring.datasource.primary.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.primary.url=jdbc:mysql://localhost:3306/tf?useUnicode=true&characterEncoding=utf-8
spring.datasource.primary.username=***
spring.datasource.primary.password=****
spring.datasource.primary.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.primary.initialSize=30
spring.datasource.primary.minIdle=10
spring.datasource.primary.maxActive=50
spring.datasource.primary.maxWait=60000
spring.datasource.primary.timeBetweenEvictionRunsMillis=60000
spring.datasource.primary.minEvictableIdleTimeMillis=300000
spring.datasource.primary.validationQuery=SELECT 1 
spring.datasource.primary.testWhileIdle=true
spring.datasource.primary.testOnBorrow=true
spring.datasource.primary.testOnReturn=true

### 配置Quartz參數(shù)  quartz.properties
org.quartz.scheduler.instanceName=TfScheduler
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.rmi.export=false
org.quartz.scheduler.rmi.proxy=false
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=20
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
org.quartz.jobStore.misfireThreshold=50000
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.clusterCheckinInterval=15000

  1. 配置Bean
配置數(shù)據(jù)
@Configuration
public class DuridDatasource {
    @Primary
    @Bean("dataSource")
    @ConfigurationProperties("spring.datasource.primary")
    public DataSource dataSourceOne() {
        return DruidDataSourceBuilder.create().build();
    }
}

配置Quartz
@Configuration
public class QuartzConfig {
    @Autowired
    private JobFactory jobFactory;
    @Autowired
    @Qualifier("dataSource")
    private DataSource primaryDataSource;

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        //獲取配置屬性
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        //在quartz.properties中的屬性被讀取并注入后再初始化對象
        propertiesFactoryBean.afterPropertiesSet();
        //創(chuàng)建SchedulerFactoryBean
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setQuartzProperties(propertiesFactoryBean.getObject());
        //使用數(shù)據(jù)源,自定義數(shù)據(jù)源
        factory.setDataSource(primaryDataSource);
        factory.setJobFactory(jobFactory);
        factory.setWaitForJobsToCompleteOnShutdown(true);//這樣當spring關(guān)閉時,會等待所有已經(jīng)啟動的quartz job結(jié)束后spring才能完全shutdown。
        factory.setOverwriteExistingJobs(false);
        factory.setStartupDelay(10);
        return factory;
    }
}

配置JOB能夠注入Spring Bean
@Component
public class QuartzJobFactory extends AdaptableJobFactory {
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        //調(diào)用父類的方法
        Object jobInstance = super.createJobInstance(bundle);
        //進行注入
        capableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}
  1. 編寫一個簡單的JOB處理數(shù)據(jù)
public class TfCommandJob implements Job {

    private static final Logger log = LoggerFactory.getLogger(TfCommandJob.class);

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("start do tf command");
        try {
            TimeUnit.SECONDS.sleep(30);
        } catch (Exception e) {
            log.error("error in sleep", e);
        }
        log.info("end");
    }
}
  1. 編寫觸發(fā)入口
@RestController
@RequestMapping("/test")
public class TestController {

    private static final String NAME = "tf_command_%s";
    private static final String GROUP = "tf_command_group";
    private static final String TRIGGER_NAME = "tf_command_trigger_%s";
    private static final String TRIGGER_GROUP = "tf_command_trigger_group";

    @Resource
    private Scheduler quartzScheduler;

    private static final Logger logger = LoggerFactory.getLogger(TestController.class);

    @RequestMapping("/startJob")
    public JsonResponse startJob() {
        JobDataMap jobDataMap = new JobDataMap();

        Long time = new Date().getTime();
        JobDetail job = JobBuilder
                .newJob(TfCommandJob.class)
                .withIdentity(String.format(NAME, time), GROUP)
                .setJobData(jobDataMap)
                .requestRecovery()
                .storeDurably()
                .build();

        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(String.format(TRIGGER_NAME, time), TRIGGER_GROUP)
                .forJob(job)
                .startNow().build();
        try {
            quartzScheduler.scheduleJob(job, trigger);
        } catch (Exception e) {
            return JsonResponse.fail(ResultConst.ERROR_CODE, ResultConst.ERROR_MSG);
        }
        return JsonResponse.succ(null);
    }
}

  1. 最后,在執(zhí)行案例程序之前需要創(chuàng)建Quartz用到的數(shù)據(jù)庫表。創(chuàng)建表的SQL文件在quartz源碼包的 org.quartz.impl.jdbcjobstore 目錄下,根據(jù)不同的數(shù)據(jù)庫選擇不同的 tables*.sql ,本案例使用的是mysql,所以我選擇的 tables_mysql.sql 文件執(zhí)行。

案例分析

配置相關(guān)的不具體說了。
在該案例中,我們編寫了一個需要異步執(zhí)行一個JOB:TfCommandJob.class
為了讓這個JOB可以異步執(zhí)行,并且 :

  1. 可以在任務失敗的時候被重試
  2. 在實例被停止,重啟的時候JOB可以被重新執(zhí)行(不會被丟失)
  3. 多實例運行的時候任務不會被重復執(zhí)行
  4. 任務數(shù)量過多的時候,可以控制并發(fā),控制執(zhí)行數(shù)量,避免JVM OOM

所以用到了Quartz任務調(diào)度框架。
配置好了Quartz,編寫好了JOB之后。需要把JOB注冊給調(diào)度器,在TestController 中,創(chuàng)建 JobDetail (代表一個具體的可執(zhí)行的調(diào)度程序), 在創(chuàng)建一個 Trigger,講兩者交給調(diào)度器。
這樣調(diào)度器就會根據(jù)Trigger指定的時間規(guī)則執(zhí)行JOB。
通過PostMan 請求 localhost:8080/test/startJob ,后臺顯示

執(zhí)行日志

組件介紹

在這里主要提幾個概念:

  • 任務(Job): 定義任務執(zhí)行的任務,需要實現(xiàn)Job接口,重寫execute方法。在案例中是 TfCommandJob ,他表示的是一個實際需要執(zhí)行的任務。在實際執(zhí)行的過程中,我們可以通過JobBuilder進行實例化為一個JobDetail對象,然后配合合適的Trigger,交給Scheduler進行調(diào)度。
        JobDetail job = JobBuilder
              //指定JOB的class
                .newJob(TfCommandJob.class)
               //指定Name唯一識別
                .withIdentity(String.format(NAME, time), GROUP)
                //指定傳入到JOB執(zhí)行時時候使用到的參數(shù)數(shù)據(jù)
                .setJobData(jobDataMap)
                //當Quartz服務被中止后,再次啟動任務時會嘗試恢復執(zhí)行之前未完成的所有任務
                .requestRecovery() 
                //標識job是持久的,刪除所有觸發(fā)器的時候不被刪除
                .storeDurably()
                .build();
  • 觸發(fā)器(Trigger):為任務設定出發(fā)條件,比如馬上,即刻,每天早上8點,每月1號12點……。在案例中是
Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(String.format(TRIGGER_NAME, time), TRIGGER_GROUP)
                .forJob(job)
                .startNow().build();

該trigger是指馬上觸發(fā)。它還有很多類的Trigger,比如

  • 調(diào)度器(Scheduler):將任務與觸發(fā)器相結(jié)合,注冊到調(diào)度器中,待執(zhí)行。這個是在配置文件 schedulerFactoryBean 定義了Scheduler構(gòu)造工廠,并且Controller中注入使用
quartzScheduler.scheduleJob(job, trigger);

調(diào)度流程分析

分析其調(diào)度過程之前,有幾個問題:

  1. 調(diào)度如何控制并發(fā)
  2. 如何做到多實例情況下不被重復執(zhí)行
  3. 如何保證實例停止的時候任務能被別的實例接管

就算大家沒有這幾個問題我也要提出并且講清楚這幾個問題 ^ _ ^

調(diào)度框架的啟動

我們先來分析調(diào)度框架的啟動流程。

在案例中的 QuartzConfig 文件,初始化了 SchedulerFactoryBean ,通過這個Factory創(chuàng)建了 QuartzScheduler , QuartzScheduler 這個類是Quartz的 核心 ,間接繼承了Scheduler接口,包含了調(diào)度Job、注冊JobListener實例的方法, 通過這個 Scheduler 創(chuàng)建了實際執(zhí)行調(diào)度的調(diào)度線程: QuartzSchedulerThread
QuartzSchedulerResources 相當于一個配置信息保存的空間,它包含創(chuàng)建QuartzScheduler實例所需的所有資源(JobStore,ThreadPool等),當然也擁有一個執(zhí)行QuartzSchedulerThread的執(zhí)行器。

QuartzScheduler
  1. StdSchedulerFactory.instantiate():生成StdScheduler過程中會new一個QuartzScheduler實例

  2. 生成StdScheduler過程中會new一個QuartzScheduler實例

  3. 在QuartzScheduler的構(gòu)造器方法里面可以看到創(chuàng)建QuartzSchedulerThread的代碼邏輯,并通過QuartzSchedulerResources對象獲取ThreadExecutor對象,最后execute新建的QuartzSchedulerThread。

    public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);
        
        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }

調(diào)度框架啟動完成之后

所有任務調(diào)度的工作都交給了 QuartzSchedulerThread 來完成:

QuartzSchedulerThread

  1. 流程圖中,QuartzSchedulerThread整個的運行方法都包含在while語句中,在執(zhí)行JOB之前會判斷 halted,paused的狀態(tài)

  2. 然后通過 QuartzSchedulerResources 這個資源存儲實例獲取線程池,查看是否有可用的線程。如果有再往下進行。再次也回答了之前提到的幾個問題中的第四個問題 任務數(shù)量過多的時候,可以控制并發(fā),控制執(zhí)行數(shù)量,避免JVM OOM

  3. 再有可用的線程之后再調(diào)用QuartzSchedulerResources 獲取JobStore. 這個的JobStore是負責跟蹤所有scheduler的“工作數(shù)據(jù)”,包括Jobs,Triggers,比較常用的有:

    • RAMJobStore :RAMJobStore 正如它名字描述的一樣,它保存數(shù)據(jù)在內(nèi)存中的,所以只要服務重啟,任務就會丟失,而且它無法做到分布式任務調(diào)度,只能做到單機任務調(diào)度。
    • JDBCJobStore : 與上面的不同,它以JDBC的方式保存數(shù)據(jù)在數(shù)據(jù)庫中,它比RAMJobStore的配置復雜一點,當然,肯定也沒有RAMJobStore快。其廣泛用于 Oracle, PostgreSQL, MySQL, MS SQLServer, HSQLDB, 和DB2。使用JDBCJobStore之前你必須首先創(chuàng)建一系列Quartz要使用的表。這個表在上文的案例中有提到。由于其數(shù)據(jù)都是存儲在數(shù)據(jù)庫中,所以實例在被停止重啟之后,JOB不會丟失 這也就回答了第二個問題

JobStore

在執(zhí)行的過程中有這么幾個方法調(diào)用

  1. 獲取triggers
    qsRsrcs.getJobStore().acquireNextTriggers
  2. 觸發(fā)執(zhí)行
    qsRsrcs.getJobStore().triggersFired
  3. 釋放trigger
    qsRsrcs.getJobStore().releaseAcquiredTrigger
  4. 完成觸發(fā)任務后的調(diào)用
    qsRsrcs.getJobStore().triggeredJobComplete

這幾個方法都是由JobStore來調(diào)用的。而實現(xiàn)分布式任務調(diào)度需要使用到的JobStore實現(xiàn)類為數(shù)據(jù)庫類型

so .

Quartz核心表介紹

先來介紹一下Quartz的各個表。暫時先了解有這么幾個表。

Table Name Description
QRTZ_CALENDARS 存儲Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存儲CronTrigger,包括Cron表達式和時區(qū)信息
QRTZ_FIRED_TRIGGERS 存儲與已觸發(fā)的Trigger相關(guān)的狀態(tài)信息,以及相聯(lián)Job的執(zhí)行信息
QRTZ_PAUSED_TRIGGER_GRPS 存儲已暫停的Trigger組的信息
QRTZ_SCHEDULER_STATE 存儲少量的有關(guān)Scheduler的狀態(tài)信息,和別的Scheduler實例
QRTZ_LOCKS 存儲程序的悲觀鎖的信息
QRTZ_JOB_DETAILS 存儲每一個已配置的Job的詳細信息
QRTZ_JOB_LISTENERS 存儲有關(guān)已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS 存儲簡單的Trigger,包括重復次數(shù)、間隔、以及已觸的次數(shù)
QRTZ_BLOG_TRIGGERS Trigger作為Blob類型存儲
QRTZ_TRIGGER_LISTENERS 存儲已配置的TriggerListener的信息
QRTZ_TRIGGERS 存儲已配置的Trigger的信息
  1. acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow)
    查詢、獲取一段時間內(nèi)將要被調(diào)度的triggers.我們走進源碼看一下他的實現(xiàn):
acquireNextTriggers

可以看到他又調(diào)用了另一個獲取方法acquireNextTrigger


在這個方法中我們可以看到實際的執(zhí)行方法有:
getDelegate().selectTriggerToAcquire
直接執(zhí)行語句:

SELECT
    TRIGGER_NAME,
    TRIGGER_GROUP,
    NEXT_FIRE_TIME,
    PRIORITY 
FROM
    { 0 } TRIGGERS 
WHERE
    SCHED_NAME = { 1 } 
    AND TRIGGER_STATE = ? 
    AND NEXT_FIRE_TIME <= ? AND ( MISFIRE_INSTR = - 1 OR ( MISFIRE_INSTR != - 1 AND NEXT_FIRE_TIME >= ?)) 
ORDER BY
    NEXT_FIRE_TIME ASC,
    PRIORITY DESC

getDelegate().updateTriggerStateFromOtherState

UPDATE { 0 } TRIGGERS 
SET TRIGGER_STATE = ? 
WHERE
    SCHED_NAME = { 1 } 
    AND TRIGGER_NAME = ? 
    AND TRIGGER_GROUP = ? 
    AND TRIGGER_STATE = ?

getDelegate().insertFiredTrigger

INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) 
VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

第一個SQL語句,目的就是為了獲取當前所有可執(zhí)行的Trigger的記錄。
第二個SQL語句,目的是為了實現(xiàn)任務鎖定,采用的是CAS模式

// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
if (rowsUpdated <= 0) {
    continue; // next trigger
}

從把Triggers里面的記錄,從一個狀態(tài)更新為另一個狀態(tài),如果更新成功,表示該任務被本實例獲取了。如果更新失敗,則表示另外一個實例搶先更改了狀態(tài)。

第三個SQL語句,在本實例已經(jīng)獲取到了Trigger的情況下,往FIRED_TRIGGERS插入一條記錄,表示當前的任務處于執(zhí)行中。

  1. executeInNonManagedTXLock

如果同一時間執(zhí)行Trigger的數(shù)量大于1,則屬性 org.quartz.jobStore.acquireTriggersWithinLock 應設置為true,開啟分布式悲觀鎖,可以避破壞數(shù)據(jù)。

executeInNonManagedTXLock方法源碼如下:

分布式鎖

總體的步驟為: 獲取鎖、執(zhí)行callback、提交事務、最后釋放鎖

getLockHandler().obtainLock 表示獲取鎖
txCallback.execute(conn) 表示執(zhí)行邏輯
commitConnection(conn) 表示提交事務
releaseLock(lockName, transOwner) 表示釋放鎖

獲取鎖邏輯

getLockHandler()方法會獲得一個信號量,如果是單機環(huán)境使用的是SimpleSemaphore,如果集群環(huán)境使用的是DBSemaphore。在集群環(huán)境下obtainLock()方法如下:

image.png

obtainLock首先判斷是否已經(jīng)獲取到鎖,如果沒有執(zhí)行方法executeSQL,其中有兩條重要的SQL,分別是:expandedSQL和expandedInsertSQL,以SCHED_NAME = ‘TfScheduler’為例:

expandedSQL:
SELECT * FROM QRTZ_LOCKS WHERE SCHED_NAME = 'TfScheduler' AND LOCK_NAME = ? FOR UPDATE
expandedInsertSQL:
INSERT INTO QRTZ_LOCKS(SCHED_NAME, LOCK_NAME) VALUES ('TfScheduler', ?)

select語句后面添加了FOR UPDATE是使用了MySQL的悲觀鎖,如果LOCK_NAME存在,當多個節(jié)點去執(zhí)行此SQL時,只有第一個節(jié)點會成功,其他的節(jié)點都將進入等待;

如果LOCK_NAME不存在,多個節(jié)點同時執(zhí)行expandedInsertSQL,只會有一個節(jié)點插入成功,執(zhí)行插入失敗的節(jié)點將進入重試,重新執(zhí)行expandedSQL;

參考:

http://m.itdecent.cn/p/38e5e0953e56

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容