簡介
熟悉Java編程的童鞋對于Quartz應該是比較屬性的了。一個開源的任務調(diào)度框架。當你的系統(tǒng)中需要定時、定期來執(zhí)行一次或多次任務的時候,那它一定是不二之選。
目前市面上也要好多開源的任務調(diào)度平臺,比如TBSchedule,Elastic-Job,XXL-JOB,大多的任務調(diào)度平臺都是以Quartz為基礎進行的二次封裝或者重新??梢奞uartz在調(diào)度框架中的地位還是比較高的。
本文主要從代碼層面介紹Quartz執(zhí)行任務調(diào)度時,任務的獲取、執(zhí)行、重試的流程。
案例介紹
場景:某個系統(tǒng)在用戶提交請求的時候,需要后臺異步處理數(shù)據(jù)。
搭建一個簡單的Quartz + spring boot + mysql 的系統(tǒng)
- pom.xml 引入依賴
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
- 參數(shù)文件
#數(shù)據(jù)庫參數(shù)配置 application.properties
spring.datasource.primary.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.primary.url=jdbc:mysql://localhost:3306/tf?useUnicode=true&characterEncoding=utf-8
spring.datasource.primary.username=***
spring.datasource.primary.password=****
spring.datasource.primary.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.primary.initialSize=30
spring.datasource.primary.minIdle=10
spring.datasource.primary.maxActive=50
spring.datasource.primary.maxWait=60000
spring.datasource.primary.timeBetweenEvictionRunsMillis=60000
spring.datasource.primary.minEvictableIdleTimeMillis=300000
spring.datasource.primary.validationQuery=SELECT 1
spring.datasource.primary.testWhileIdle=true
spring.datasource.primary.testOnBorrow=true
spring.datasource.primary.testOnReturn=true
### 配置Quartz參數(shù) quartz.properties
org.quartz.scheduler.instanceName=TfScheduler
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.rmi.export=false
org.quartz.scheduler.rmi.proxy=false
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=20
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
org.quartz.jobStore.misfireThreshold=50000
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.clusterCheckinInterval=15000
- 配置Bean
配置數(shù)據(jù)
@Configuration
public class DuridDatasource {
@Primary
@Bean("dataSource")
@ConfigurationProperties("spring.datasource.primary")
public DataSource dataSourceOne() {
return DruidDataSourceBuilder.create().build();
}
}
配置Quartz
@Configuration
public class QuartzConfig {
@Autowired
private JobFactory jobFactory;
@Autowired
@Qualifier("dataSource")
private DataSource primaryDataSource;
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
//獲取配置屬性
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
//在quartz.properties中的屬性被讀取并注入后再初始化對象
propertiesFactoryBean.afterPropertiesSet();
//創(chuàng)建SchedulerFactoryBean
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setQuartzProperties(propertiesFactoryBean.getObject());
//使用數(shù)據(jù)源,自定義數(shù)據(jù)源
factory.setDataSource(primaryDataSource);
factory.setJobFactory(jobFactory);
factory.setWaitForJobsToCompleteOnShutdown(true);//這樣當spring關(guān)閉時,會等待所有已經(jīng)啟動的quartz job結(jié)束后spring才能完全shutdown。
factory.setOverwriteExistingJobs(false);
factory.setStartupDelay(10);
return factory;
}
}
配置JOB能夠注入Spring Bean
@Component
public class QuartzJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//調(diào)用父類的方法
Object jobInstance = super.createJobInstance(bundle);
//進行注入
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
- 編寫一個簡單的JOB處理數(shù)據(jù)
public class TfCommandJob implements Job {
private static final Logger log = LoggerFactory.getLogger(TfCommandJob.class);
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
log.info("start do tf command");
try {
TimeUnit.SECONDS.sleep(30);
} catch (Exception e) {
log.error("error in sleep", e);
}
log.info("end");
}
}
- 編寫觸發(fā)入口
@RestController
@RequestMapping("/test")
public class TestController {
private static final String NAME = "tf_command_%s";
private static final String GROUP = "tf_command_group";
private static final String TRIGGER_NAME = "tf_command_trigger_%s";
private static final String TRIGGER_GROUP = "tf_command_trigger_group";
@Resource
private Scheduler quartzScheduler;
private static final Logger logger = LoggerFactory.getLogger(TestController.class);
@RequestMapping("/startJob")
public JsonResponse startJob() {
JobDataMap jobDataMap = new JobDataMap();
Long time = new Date().getTime();
JobDetail job = JobBuilder
.newJob(TfCommandJob.class)
.withIdentity(String.format(NAME, time), GROUP)
.setJobData(jobDataMap)
.requestRecovery()
.storeDurably()
.build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(String.format(TRIGGER_NAME, time), TRIGGER_GROUP)
.forJob(job)
.startNow().build();
try {
quartzScheduler.scheduleJob(job, trigger);
} catch (Exception e) {
return JsonResponse.fail(ResultConst.ERROR_CODE, ResultConst.ERROR_MSG);
}
return JsonResponse.succ(null);
}
}
- 最后,在執(zhí)行案例程序之前需要創(chuàng)建Quartz用到的數(shù)據(jù)庫表。創(chuàng)建表的SQL文件在quartz源碼包的 org.quartz.impl.jdbcjobstore 目錄下,根據(jù)不同的數(shù)據(jù)庫選擇不同的 tables*.sql ,本案例使用的是mysql,所以我選擇的 tables_mysql.sql 文件執(zhí)行。
案例分析
配置相關(guān)的不具體說了。
在該案例中,我們編寫了一個需要異步執(zhí)行一個JOB:TfCommandJob.class
為了讓這個JOB可以異步執(zhí)行,并且 :
- 可以在任務失敗的時候被重試
- 在實例被停止,重啟的時候JOB可以被重新執(zhí)行(不會被丟失)
- 多實例運行的時候任務不會被重復執(zhí)行
- 任務數(shù)量過多的時候,可以控制并發(fā),控制執(zhí)行數(shù)量,避免JVM OOM
所以用到了Quartz任務調(diào)度框架。
配置好了Quartz,編寫好了JOB之后。需要把JOB注冊給調(diào)度器,在TestController 中,創(chuàng)建 JobDetail (代表一個具體的可執(zhí)行的調(diào)度程序), 在創(chuàng)建一個 Trigger,講兩者交給調(diào)度器。
這樣調(diào)度器就會根據(jù)Trigger指定的時間規(guī)則執(zhí)行JOB。
通過PostMan 請求 localhost:8080/test/startJob ,后臺顯示

組件介紹
在這里主要提幾個概念:
- 任務(Job): 定義任務執(zhí)行的任務,需要實現(xiàn)Job接口,重寫execute方法。在案例中是 TfCommandJob ,他表示的是一個實際需要執(zhí)行的任務。在實際執(zhí)行的過程中,我們可以通過JobBuilder進行實例化為一個JobDetail對象,然后配合合適的Trigger,交給Scheduler進行調(diào)度。
JobDetail job = JobBuilder
//指定JOB的class
.newJob(TfCommandJob.class)
//指定Name唯一識別
.withIdentity(String.format(NAME, time), GROUP)
//指定傳入到JOB執(zhí)行時時候使用到的參數(shù)數(shù)據(jù)
.setJobData(jobDataMap)
//當Quartz服務被中止后,再次啟動任務時會嘗試恢復執(zhí)行之前未完成的所有任務
.requestRecovery()
//標識job是持久的,刪除所有觸發(fā)器的時候不被刪除
.storeDurably()
.build();
- 觸發(fā)器(Trigger):為任務設定出發(fā)條件,比如馬上,即刻,每天早上8點,每月1號12點……。在案例中是
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(String.format(TRIGGER_NAME, time), TRIGGER_GROUP)
.forJob(job)
.startNow().build();
該trigger是指馬上觸發(fā)。它還有很多類的Trigger,比如
- 調(diào)度器(Scheduler):將任務與觸發(fā)器相結(jié)合,注冊到調(diào)度器中,待執(zhí)行。這個是在配置文件 schedulerFactoryBean 定義了Scheduler構(gòu)造工廠,并且Controller中注入使用
quartzScheduler.scheduleJob(job, trigger);
調(diào)度流程分析
分析其調(diào)度過程之前,有幾個問題:
- 調(diào)度如何控制并發(fā)
- 如何做到多實例情況下不被重復執(zhí)行
- 如何保證實例停止的時候任務能被別的實例接管
就算大家沒有這幾個問題我也要提出并且講清楚這幾個問題 ^ _ ^
調(diào)度框架的啟動
我們先來分析調(diào)度框架的啟動流程。
在案例中的 QuartzConfig 文件,初始化了 SchedulerFactoryBean ,通過這個Factory創(chuàng)建了 QuartzScheduler , QuartzScheduler 這個類是Quartz的 核心 ,間接繼承了Scheduler接口,包含了調(diào)度Job、注冊JobListener實例的方法, 通過這個 Scheduler 創(chuàng)建了實際執(zhí)行調(diào)度的調(diào)度線程: QuartzSchedulerThread
而 QuartzSchedulerResources 相當于一個配置信息保存的空間,它包含創(chuàng)建QuartzScheduler實例所需的所有資源(JobStore,ThreadPool等),當然也擁有一個執(zhí)行QuartzSchedulerThread的執(zhí)行器。

StdSchedulerFactory.instantiate():生成StdScheduler過程中會new一個QuartzScheduler實例
生成StdScheduler過程中會new一個QuartzScheduler實例
在QuartzScheduler的構(gòu)造器方法里面可以看到創(chuàng)建QuartzSchedulerThread的代碼邏輯,并通過QuartzSchedulerResources對象獲取ThreadExecutor對象,最后execute新建的QuartzSchedulerThread。
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
jobMgr = new ExecutingJobsManager();
addInternalJobListener(jobMgr);
errLogger = new ErrorLogger();
addInternalSchedulerListener(errLogger);
signaler = new SchedulerSignalerImpl(this, this.schedThread);
getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}
調(diào)度框架啟動完成之后
所有任務調(diào)度的工作都交給了 QuartzSchedulerThread 來完成:

流程圖中,QuartzSchedulerThread整個的運行方法都包含在while語句中,在執(zhí)行JOB之前會判斷 halted,paused的狀態(tài)
然后通過 QuartzSchedulerResources 這個資源存儲實例獲取線程池,查看是否有可用的線程。如果有再往下進行。再次也回答了之前提到的幾個問題中的第四個問題 任務數(shù)量過多的時候,可以控制并發(fā),控制執(zhí)行數(shù)量,避免JVM OOM
-
再有可用的線程之后再調(diào)用QuartzSchedulerResources 獲取JobStore. 這個的JobStore是負責跟蹤所有scheduler的“工作數(shù)據(jù)”,包括Jobs,Triggers,比較常用的有:
- RAMJobStore :RAMJobStore 正如它名字描述的一樣,它保存數(shù)據(jù)在內(nèi)存中的,所以只要服務重啟,任務就會丟失,而且它無法做到分布式任務調(diào)度,只能做到單機任務調(diào)度。
- JDBCJobStore : 與上面的不同,它以JDBC的方式保存數(shù)據(jù)在數(shù)據(jù)庫中,它比RAMJobStore的配置復雜一點,當然,肯定也沒有RAMJobStore快。其廣泛用于 Oracle, PostgreSQL, MySQL, MS SQLServer, HSQLDB, 和DB2。使用JDBCJobStore之前你必須首先創(chuàng)建一系列Quartz要使用的表。這個表在上文的案例中有提到。由于其數(shù)據(jù)都是存儲在數(shù)據(jù)庫中,所以實例在被停止重啟之后,JOB不會丟失 這也就回答了第二個問題
JobStore
在執(zhí)行的過程中有這么幾個方法調(diào)用
- 獲取triggers
qsRsrcs.getJobStore().acquireNextTriggers - 觸發(fā)執(zhí)行
qsRsrcs.getJobStore().triggersFired - 釋放trigger
qsRsrcs.getJobStore().releaseAcquiredTrigger - 完成觸發(fā)任務后的調(diào)用
qsRsrcs.getJobStore().triggeredJobComplete
這幾個方法都是由JobStore來調(diào)用的。而實現(xiàn)分布式任務調(diào)度需要使用到的JobStore實現(xiàn)類為數(shù)據(jù)庫類型
so .
Quartz核心表介紹
先來介紹一下Quartz的各個表。暫時先了解有這么幾個表。
| Table Name | Description |
|---|---|
| QRTZ_CALENDARS | 存儲Quartz的Calendar信息 |
| QRTZ_CRON_TRIGGERS | 存儲CronTrigger,包括Cron表達式和時區(qū)信息 |
| QRTZ_FIRED_TRIGGERS | 存儲與已觸發(fā)的Trigger相關(guān)的狀態(tài)信息,以及相聯(lián)Job的執(zhí)行信息 |
| QRTZ_PAUSED_TRIGGER_GRPS | 存儲已暫停的Trigger組的信息 |
| QRTZ_SCHEDULER_STATE | 存儲少量的有關(guān)Scheduler的狀態(tài)信息,和別的Scheduler實例 |
| QRTZ_LOCKS | 存儲程序的悲觀鎖的信息 |
| QRTZ_JOB_DETAILS | 存儲每一個已配置的Job的詳細信息 |
| QRTZ_JOB_LISTENERS | 存儲有關(guān)已配置的JobListener的信息 |
| QRTZ_SIMPLE_TRIGGERS | 存儲簡單的Trigger,包括重復次數(shù)、間隔、以及已觸的次數(shù) |
| QRTZ_BLOG_TRIGGERS | Trigger作為Blob類型存儲 |
| QRTZ_TRIGGER_LISTENERS | 存儲已配置的TriggerListener的信息 |
| QRTZ_TRIGGERS | 存儲已配置的Trigger的信息 |
- acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow)
查詢、獲取一段時間內(nèi)將要被調(diào)度的triggers.我們走進源碼看一下他的實現(xiàn):

可以看到他又調(diào)用了另一個獲取方法acquireNextTrigger


在這個方法中我們可以看到實際的執(zhí)行方法有:
getDelegate().selectTriggerToAcquire
直接執(zhí)行語句:
SELECT
TRIGGER_NAME,
TRIGGER_GROUP,
NEXT_FIRE_TIME,
PRIORITY
FROM
{ 0 } TRIGGERS
WHERE
SCHED_NAME = { 1 }
AND TRIGGER_STATE = ?
AND NEXT_FIRE_TIME <= ? AND ( MISFIRE_INSTR = - 1 OR ( MISFIRE_INSTR != - 1 AND NEXT_FIRE_TIME >= ?))
ORDER BY
NEXT_FIRE_TIME ASC,
PRIORITY DESC
getDelegate().updateTriggerStateFromOtherState
UPDATE { 0 } TRIGGERS
SET TRIGGER_STATE = ?
WHERE
SCHED_NAME = { 1 }
AND TRIGGER_NAME = ?
AND TRIGGER_GROUP = ?
AND TRIGGER_STATE = ?
getDelegate().insertFiredTrigger
INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY)
VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
第一個SQL語句,目的就是為了獲取當前所有可執(zhí)行的Trigger的記錄。
第二個SQL語句,目的是為了實現(xiàn)任務鎖定,采用的是CAS模式
// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
if (rowsUpdated <= 0) {
continue; // next trigger
}
從把Triggers里面的記錄,從一個狀態(tài)更新為另一個狀態(tài),如果更新成功,表示該任務被本實例獲取了。如果更新失敗,則表示另外一個實例搶先更改了狀態(tài)。
第三個SQL語句,在本實例已經(jīng)獲取到了Trigger的情況下,往FIRED_TRIGGERS插入一條記錄,表示當前的任務處于執(zhí)行中。
- executeInNonManagedTXLock
如果同一時間執(zhí)行Trigger的數(shù)量大于1,則屬性 org.quartz.jobStore.acquireTriggersWithinLock 應設置為true,開啟分布式悲觀鎖,可以避破壞數(shù)據(jù)。
executeInNonManagedTXLock方法源碼如下:

總體的步驟為: 獲取鎖、執(zhí)行callback、提交事務、最后釋放鎖
getLockHandler().obtainLock 表示獲取鎖
txCallback.execute(conn) 表示執(zhí)行邏輯
commitConnection(conn) 表示提交事務
releaseLock(lockName, transOwner) 表示釋放鎖

getLockHandler()方法會獲得一個信號量,如果是單機環(huán)境使用的是SimpleSemaphore,如果集群環(huán)境使用的是DBSemaphore。在集群環(huán)境下obtainLock()方法如下:

obtainLock首先判斷是否已經(jīng)獲取到鎖,如果沒有執(zhí)行方法executeSQL,其中有兩條重要的SQL,分別是:expandedSQL和expandedInsertSQL,以SCHED_NAME = ‘TfScheduler’為例:
expandedSQL:
SELECT * FROM QRTZ_LOCKS WHERE SCHED_NAME = 'TfScheduler' AND LOCK_NAME = ? FOR UPDATE
expandedInsertSQL:
INSERT INTO QRTZ_LOCKS(SCHED_NAME, LOCK_NAME) VALUES ('TfScheduler', ?)
select語句后面添加了FOR UPDATE是使用了MySQL的悲觀鎖,如果LOCK_NAME存在,當多個節(jié)點去執(zhí)行此SQL時,只有第一個節(jié)點會成功,其他的節(jié)點都將進入等待;
如果LOCK_NAME不存在,多個節(jié)點同時執(zhí)行expandedInsertSQL,只會有一個節(jié)點插入成功,執(zhí)行插入失敗的節(jié)點將進入重試,重新執(zhí)行expandedSQL;
參考: