說明
- Xxl-Job是一個輕量級分布式任務(wù)調(diào)度平臺,其核心設(shè)計目標是開發(fā)迅速、學(xué)習(xí)簡單、輕量級、易擴展?,F(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用。
- Xxl-Job有兩個核心,一個調(diào)度中心,一個執(zhí)行器,需要同時部署才行。
特性
簡單:支持通過Web頁面對任務(wù)進行CRUD操作,操作簡單,一分鐘上手
動態(tài):支持動態(tài)修改任務(wù)狀態(tài)、啟動/停止任務(wù),以及終止運行中任務(wù),即時生效
調(diào)度中心HA(中心式):調(diào)度采用中心式設(shè)計,“調(diào)度中心”基于集群Quartz實現(xiàn)并支持集群部署,可保證調(diào)度中心HA
執(zhí)行器HA(分布式):任務(wù)分布式執(zhí)行,任務(wù)"執(zhí)行器"支持集群部署,可保證任務(wù)執(zhí)行HA
注冊中心: 執(zhí)行器會周期性自動注冊任務(wù), 調(diào)度中心將會自動發(fā)現(xiàn)注冊的任務(wù)并觸發(fā)執(zhí)行。同時,也支持手動錄入執(zhí)行器地址
彈性擴容縮容:一旦有新執(zhí)行器機器上線或者下線,下次調(diào)度時將會重新分配任務(wù)
路由策略:執(zhí)行器集群部署時提供豐富的路由策略,包括:第一個、最后一個、輪詢、隨機、一致性HASH、最不經(jīng)常使用、最近最久未使用、故障轉(zhuǎn)移、忙碌轉(zhuǎn)移等
故障轉(zhuǎn)移:任務(wù)路由策略選擇"故障轉(zhuǎn)移"情況下,如果執(zhí)行器集群中某一臺機器故障,將會自動Failover切換到一臺正常的執(zhí)行器發(fā)送調(diào)度請求。
阻塞處理策略:調(diào)度過于密集執(zhí)行器來不及處理時的處理策略,策略包括:單機串行(默認)、丟棄后續(xù)調(diào)度、覆蓋之前調(diào)度
任務(wù)超時控制:支持自定義任務(wù)超時時間,任務(wù)運行超時將會主動中斷任務(wù)
任務(wù)失敗重試:支持自定義任務(wù)失敗重試次數(shù),當(dāng)任務(wù)失敗時將會按照預(yù)設(shè)的失敗重試次數(shù)主動進行重試;其中分片任務(wù)支持分片粒度的失敗重試
任務(wù)失敗告警;默認提供郵件方式失敗告警,同時預(yù)留擴展接口,可方便的擴展短信、釘釘?shù)雀婢绞?分片廣播任務(wù):執(zhí)行器集群部署時,任務(wù)路由策略選擇"分片廣播"情況下,一次任務(wù)調(diào)度將會廣播觸發(fā)集群中所有執(zhí)行器執(zhí)行一次任務(wù),可根據(jù)分片參數(shù)開發(fā)分片任務(wù)
動態(tài)分片:分片廣播任務(wù)以執(zhí)行器為維度進行分片,支持動態(tài)擴容執(zhí)行器集群從而動態(tài)增加分片數(shù)量,協(xié)同進行業(yè)務(wù)處理;在進行大數(shù)據(jù)量業(yè)務(wù)操作時可顯著提升任務(wù)處理能力和速度
事件觸發(fā):除了"Cron方式"和"任務(wù)依賴方式"觸發(fā)任務(wù)執(zhí)行之外,支持基于事件的觸發(fā)任務(wù)方式。調(diào)度中心提供觸發(fā)任務(wù)單次執(zhí)行的API服務(wù),可根據(jù)業(yè)務(wù)事件靈活觸發(fā)
任務(wù)進度監(jiān)控:支持實時監(jiān)控任務(wù)進度
Rolling實時日志:支持在線查看調(diào)度結(jié)果,并且支持以Rolling方式實時查看執(zhí)行器輸出的完整的執(zhí)行日志
GLUE:提供Web IDE,支持在線開發(fā)任務(wù)邏輯代碼,動態(tài)發(fā)布,實時編譯生效,省略部署上線的過程。支持30個版本的歷史版本回溯
腳本任務(wù):支持以GLUE模式開發(fā)和運行腳本任務(wù),包括Shell、Python、NodeJS、PHP、PowerShell等類型腳本
命令行任務(wù):原生提供通用命令行任務(wù)Handler(Bean任務(wù),"CommandJobHandler");業(yè)務(wù)方只需要提供命令行即可
任務(wù)依賴:支持配置子任務(wù)依賴,當(dāng)父任務(wù)執(zhí)行結(jié)束且執(zhí)行成功后將會主動觸發(fā)一次子任務(wù)的執(zhí)行, 多個子任務(wù)用逗號分隔
一致性:“調(diào)度中心”通過DB鎖保證集群分布式調(diào)度的一致性, 一次任務(wù)調(diào)度只會觸發(fā)一次執(zhí)行
自定義任務(wù)參數(shù):支持在線配置調(diào)度任務(wù)入?yún)?,即時生效
調(diào)度線程池:調(diào)度系統(tǒng)多線程觸發(fā)調(diào)度運行,確保調(diào)度精確執(zhí)行,不被堵塞
數(shù)據(jù)加密:調(diào)度中心和執(zhí)行器之間的通訊進行數(shù)據(jù)加密,提升調(diào)度信息安全性
郵件報警:任務(wù)失敗時支持郵件報警,支持配置多郵件地址群發(fā)報警郵件
推送maven中央倉庫: 將會把最新穩(wěn)定版推送到maven中央倉庫, 方便用戶接入和使用
運行報表:支持實時查看運行數(shù)據(jù),如任務(wù)數(shù)量、調(diào)度次數(shù)、執(zhí)行器數(shù)量等;以及調(diào)度報表,如調(diào)度日期分布圖,調(diào)度成功分布圖等
全異步:任務(wù)調(diào)度流程全異步化設(shè)計實現(xiàn),如異步調(diào)度、異步運行、異步回調(diào)等,有效對密集調(diào)度進行流量削峰,理論上支持任意時長任務(wù)的運行
跨平臺:原生提供通用HTTP任務(wù)Handler(Bean任務(wù),"HttpJobHandler"),業(yè)務(wù)方只需要提供HTTP鏈接即可,不限制語言、平臺
國際化:調(diào)度中心支持國際化設(shè)置,提供中文、英文兩種可選語言,默認為中文
容器化:提供官方docker鏡像,并實時更新推送dockerhub,進一步實現(xiàn)產(chǎn)品開箱即用
線程池隔離:調(diào)度線程池進行隔離拆分,慢任務(wù)自動降級進入"Slow"線程池,避免耗盡調(diào)度線程,提高系統(tǒng)穩(wěn)定性
...
完整代碼地址在結(jié)尾!!
- 克隆完代碼后,進入db目錄,將sql復(fù)制到MySQL執(zhí)行一遍創(chuàng)建數(shù)據(jù)庫。
- 進入xxl-job-admin目錄,修改application.properties文件,將數(shù)據(jù)源的配置信息修改為自己剛剛創(chuàng)建的數(shù)據(jù)庫。
- 啟動xxl-job-admin項目,訪問http://localhost:8080/xxl-job-admin,用戶名:admin,密碼:123456,到此調(diào)度中心部署完畢。
第二步,部署執(zhí)行器,克隆的代碼里面有官方的示例項目,可以直接運行,也可以集成到自己的項目中,下面集成到自己的項目,在pom.xml加入依賴,如下
<!-- 任務(wù)調(diào)度xxl-job -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.2.0</version>
</dependency>
第三步,編寫application.yml配置文件,如下
server:
port: 8186
spring:
application:
name: xxljob-demo-server
# xxl-job配置
xxl:
job:
admin:
# 調(diào)度中心部署跟地址 [選填]:如調(diào)度中心集群部署存在多個地址則用逗號分隔。執(zhí)行器將會使用該地址進行"執(zhí)行器心跳注冊"和"任務(wù)結(jié)果回調(diào)";為空則關(guān)閉自動注冊;
addresses: http://localhost:8080/xxl-job-admin
executor:
# 執(zhí)行器注冊 [選填]:優(yōu)先使用該配置作為注冊地址,為空時使用內(nèi)嵌服務(wù) ”IP:PORT“ 作為注冊地址。從而更靈活的支持容器類型執(zhí)行器動態(tài)IP和動態(tài)映射端口問題。
address:
# 執(zhí)行器AppName [選填]:執(zhí)行器心跳注冊分組依據(jù);為空則關(guān)閉自動注冊
appname: demo-app
# 執(zhí)行器IP [選填]:默認為空表示自動獲取IP,多網(wǎng)卡時可手動設(shè)置指定IP,該IP不會綁定Host僅作為通訊實用;地址信息用于 "執(zhí)行器注冊" 和 "調(diào)度中心請求并觸發(fā)任務(wù)";
ip:
# 執(zhí)行器端口號 [選填]:小于等于0則自動獲?。荒J端口為9999,單機部署多個執(zhí)行器時,注意要配置不同執(zhí)行器端口;
port: 0
# 執(zhí)行器運行日志文件存儲磁盤路徑 [選填] :需要對該路徑擁有讀寫權(quán)限;為空則使用默認路徑;
logpath: /Users/luoyu/Documents/log/xxl-job/jobhandler
# 執(zhí)行器日志文件保存天數(shù) [選填] : 過期日志自動清理, 限制值大于等于3時生效; 否則, 如-1, 關(guān)閉自動清理功能;
logretentiondays: 15
# 執(zhí)行器通訊TOKEN [選填]:非空時啟用;
accessToken: xxx
注意
- 重點關(guān)注appname,后面要用到
- 需要調(diào)度中心跟執(zhí)行器進行認證的話,把兩者的accessToken配置一樣的即可
第四步,創(chuàng)建XxlJobConfig配置文件,如下
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 針對多網(wǎng)卡、容器內(nèi)部署等情況,可借助 "spring-cloud-commons" 提供的 "InetUtils" 組件靈活定制注冊IP;
*
* 1、引入依賴:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器啟動變量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、獲取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
第五步,創(chuàng)建XxlJobService服務(wù)類,如下
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.TimeUnit;
/**
* XxlJob開發(fā)示例(Bean模式)
*
* 開發(fā)步驟:
* 1、在Spring Bean實例中,開發(fā)Job方法,方式格式要求為 "public ReturnT<String> execute(String param)"
* 2、為Job方法添加注解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")",注解value值對應(yīng)的是調(diào)度中心新建任務(wù)的JobHandler屬性的值。
* 3、執(zhí)行日志:需要通過 "XxlJobLogger.log" 打印執(zhí)行日志;
*
* @author xuxueli 2019-12-11 21:52:51
*/
@Slf4j
@Component
public class XxlJobService {
/**
* 1、簡單任務(wù)示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return ReturnT.SUCCESS;
}
/**
* 2、分片廣播任務(wù)
*/
@XxlJob("shardingJobHandler")
public ReturnT<String> shardingJobHandler(String param) throws Exception {
// 分片參數(shù)
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片參數(shù):當(dāng)前分片序號 = {}, 總分片數(shù) = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 業(yè)務(wù)邏輯
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return ReturnT.SUCCESS;
}
/**
* 3、命令行任務(wù)
*/
@XxlJob("commandJobHandler")
public ReturnT<String> commandJobHandler(String param) throws Exception {
String command = param;
int exitValue = -1;
BufferedReader bufferedReader = null;
try {
// command process
Process process = Runtime.getRuntime().exec(command);
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
// command log
String line;
while ((line = bufferedReader.readLine()) != null) {
XxlJobLogger.log(line);
}
// command exit
process.waitFor();
exitValue = process.exitValue();
} catch (Exception e) {
XxlJobLogger.log(e);
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}
if (exitValue == 0) {
return IJobHandler.SUCCESS;
} else {
return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed");
}
}
/**
* 4、跨平臺Http任務(wù)
*/
@XxlJob("httpJobHandler")
public ReturnT<String> httpJobHandler(String param) throws Exception {
// request
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(param);
connection = (HttpURLConnection) realUrl.openConnection();
// connection setting
connection.setRequestMethod("GET");
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
// do connection
connection.connect();
//Map<String, List<String>> map = connection.getHeaderFields();
// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
}
// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String responseMsg = result.toString();
XxlJobLogger.log(responseMsg);
return ReturnT.SUCCESS;
} catch (Exception e) {
XxlJobLogger.log(e);
return ReturnT.FAIL;
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
XxlJobLogger.log(e2);
}
}
}
/**
* 5、生命周期任務(wù)示例:任務(wù)初始化與銷毀時,支持自定義相關(guān)邏輯;
*/
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
public ReturnT<String> demoJobHandler2(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
return ReturnT.SUCCESS;
}
public void init(){
log.info("init");
}
public void destroy(){
log.info("destory");
}
}
注意
- 重點關(guān)注@XxlJob,后面要用到
第六步,使用流程,如下
- 啟動執(zhí)行器項目,在調(diào)度中心->執(zhí)行器管理,創(chuàng)建新執(zhí)行器,AppName要與執(zhí)行器配置文件里面配置的名稱一致,選擇自動注冊即可
- 在調(diào)度中心->任務(wù)管理,創(chuàng)建新任務(wù),選擇我們剛剛創(chuàng)建的新執(zhí)行器,重點關(guān)注JobHandler這個配置,名稱要跟相應(yīng)的任務(wù)方法上@XxlJob注解里面的名稱一致,才能找到相應(yīng)的任務(wù)去執(zhí)行
- 創(chuàng)建完成后,在操作那里進行執(zhí)行一次,查看對應(yīng)的日志輸出,可知配置成功
- 還有多種任務(wù)的玩法,這里就不一一舉例了,需要的請自行百度查詢
注:此工程包含多個module,本文所用代碼均在xxljob-demo模塊下
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。