SpringBoot 2.2.5 整合Xxl-Job,實現(xiàn)各種任務(wù)調(diào)度

說明

  1. Xxl-Job是一個輕量級分布式任務(wù)調(diào)度平臺,其核心設(shè)計目標是開發(fā)迅速、學(xué)習(xí)簡單、輕量級、易擴展?,F(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用。
  2. Xxl-Job有兩個核心,一個調(diào)度中心,一個執(zhí)行器,需要同時部署才行。

特性

簡單:支持通過Web頁面對任務(wù)進行CRUD操作,操作簡單,一分鐘上手
動態(tài):支持動態(tài)修改任務(wù)狀態(tài)、啟動/停止任務(wù),以及終止運行中任務(wù),即時生效
調(diào)度中心HA(中心式):調(diào)度采用中心式設(shè)計,“調(diào)度中心”基于集群Quartz實現(xiàn)并支持集群部署,可保證調(diào)度中心HA
執(zhí)行器HA(分布式):任務(wù)分布式執(zhí)行,任務(wù)"執(zhí)行器"支持集群部署,可保證任務(wù)執(zhí)行HA
注冊中心: 執(zhí)行器會周期性自動注冊任務(wù), 調(diào)度中心將會自動發(fā)現(xiàn)注冊的任務(wù)并觸發(fā)執(zhí)行。同時,也支持手動錄入執(zhí)行器地址
彈性擴容縮容:一旦有新執(zhí)行器機器上線或者下線,下次調(diào)度時將會重新分配任務(wù)
路由策略:執(zhí)行器集群部署時提供豐富的路由策略,包括:第一個、最后一個、輪詢、隨機、一致性HASH、最不經(jīng)常使用、最近最久未使用、故障轉(zhuǎn)移、忙碌轉(zhuǎn)移等
故障轉(zhuǎn)移:任務(wù)路由策略選擇"故障轉(zhuǎn)移"情況下,如果執(zhí)行器集群中某一臺機器故障,將會自動Failover切換到一臺正常的執(zhí)行器發(fā)送調(diào)度請求。
阻塞處理策略:調(diào)度過于密集執(zhí)行器來不及處理時的處理策略,策略包括:單機串行(默認)、丟棄后續(xù)調(diào)度、覆蓋之前調(diào)度
任務(wù)超時控制:支持自定義任務(wù)超時時間,任務(wù)運行超時將會主動中斷任務(wù)
任務(wù)失敗重試:支持自定義任務(wù)失敗重試次數(shù),當(dāng)任務(wù)失敗時將會按照預(yù)設(shè)的失敗重試次數(shù)主動進行重試;其中分片任務(wù)支持分片粒度的失敗重試
任務(wù)失敗告警;默認提供郵件方式失敗告警,同時預(yù)留擴展接口,可方便的擴展短信、釘釘?shù)雀婢绞?分片廣播任務(wù):執(zhí)行器集群部署時,任務(wù)路由策略選擇"分片廣播"情況下,一次任務(wù)調(diào)度將會廣播觸發(fā)集群中所有執(zhí)行器執(zhí)行一次任務(wù),可根據(jù)分片參數(shù)開發(fā)分片任務(wù)
動態(tài)分片:分片廣播任務(wù)以執(zhí)行器為維度進行分片,支持動態(tài)擴容執(zhí)行器集群從而動態(tài)增加分片數(shù)量,協(xié)同進行業(yè)務(wù)處理;在進行大數(shù)據(jù)量業(yè)務(wù)操作時可顯著提升任務(wù)處理能力和速度
事件觸發(fā):除了"Cron方式"和"任務(wù)依賴方式"觸發(fā)任務(wù)執(zhí)行之外,支持基于事件的觸發(fā)任務(wù)方式。調(diào)度中心提供觸發(fā)任務(wù)單次執(zhí)行的API服務(wù),可根據(jù)業(yè)務(wù)事件靈活觸發(fā)
任務(wù)進度監(jiān)控:支持實時監(jiān)控任務(wù)進度
Rolling實時日志:支持在線查看調(diào)度結(jié)果,并且支持以Rolling方式實時查看執(zhí)行器輸出的完整的執(zhí)行日志
GLUE:提供Web IDE,支持在線開發(fā)任務(wù)邏輯代碼,動態(tài)發(fā)布,實時編譯生效,省略部署上線的過程。支持30個版本的歷史版本回溯
腳本任務(wù):支持以GLUE模式開發(fā)和運行腳本任務(wù),包括Shell、Python、NodeJS、PHP、PowerShell等類型腳本
命令行任務(wù):原生提供通用命令行任務(wù)Handler(Bean任務(wù),"CommandJobHandler");業(yè)務(wù)方只需要提供命令行即可
任務(wù)依賴:支持配置子任務(wù)依賴,當(dāng)父任務(wù)執(zhí)行結(jié)束且執(zhí)行成功后將會主動觸發(fā)一次子任務(wù)的執(zhí)行, 多個子任務(wù)用逗號分隔
一致性:“調(diào)度中心”通過DB鎖保證集群分布式調(diào)度的一致性, 一次任務(wù)調(diào)度只會觸發(fā)一次執(zhí)行
自定義任務(wù)參數(shù):支持在線配置調(diào)度任務(wù)入?yún)?,即時生效
調(diào)度線程池:調(diào)度系統(tǒng)多線程觸發(fā)調(diào)度運行,確保調(diào)度精確執(zhí)行,不被堵塞
數(shù)據(jù)加密:調(diào)度中心和執(zhí)行器之間的通訊進行數(shù)據(jù)加密,提升調(diào)度信息安全性
郵件報警:任務(wù)失敗時支持郵件報警,支持配置多郵件地址群發(fā)報警郵件
推送maven中央倉庫: 將會把最新穩(wěn)定版推送到maven中央倉庫, 方便用戶接入和使用
運行報表:支持實時查看運行數(shù)據(jù),如任務(wù)數(shù)量、調(diào)度次數(shù)、執(zhí)行器數(shù)量等;以及調(diào)度報表,如調(diào)度日期分布圖,調(diào)度成功分布圖等
全異步:任務(wù)調(diào)度流程全異步化設(shè)計實現(xiàn),如異步調(diào)度、異步運行、異步回調(diào)等,有效對密集調(diào)度進行流量削峰,理論上支持任意時長任務(wù)的運行
跨平臺:原生提供通用HTTP任務(wù)Handler(Bean任務(wù),"HttpJobHandler"),業(yè)務(wù)方只需要提供HTTP鏈接即可,不限制語言、平臺
國際化:調(diào)度中心支持國際化設(shè)置,提供中文、英文兩種可選語言,默認為中文
容器化:提供官方docker鏡像,并實時更新推送dockerhub,進一步實現(xiàn)產(chǎn)品開箱即用
線程池隔離:調(diào)度線程池進行隔離拆分,慢任務(wù)自動降級進入"Slow"線程池,避免耗盡調(diào)度線程,提高系統(tǒng)穩(wěn)定性
...

完整代碼地址在結(jié)尾!!

第一步,先部署調(diào)度中心,克隆地址:https://github.com/xuxueli/xxl-job,本文用的是2.2.0-SNAPSHOT,需要下載對應(yīng)版本,步驟如下

  1. 克隆完代碼后,進入db目錄,將sql復(fù)制到MySQL執(zhí)行一遍創(chuàng)建數(shù)據(jù)庫。
  2. 進入xxl-job-admin目錄,修改application.properties文件,將數(shù)據(jù)源的配置信息修改為自己剛剛創(chuàng)建的數(shù)據(jù)庫。
  3. 啟動xxl-job-admin項目,訪問http://localhost:8080/xxl-job-admin,用戶名:admin,密碼:123456,到此調(diào)度中心部署完畢。

第二步,部署執(zhí)行器,克隆的代碼里面有官方的示例項目,可以直接運行,也可以集成到自己的項目中,下面集成到自己的項目,在pom.xml加入依賴,如下

<!-- 任務(wù)調(diào)度xxl-job -->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.2.0</version>
</dependency>

第三步,編寫application.yml配置文件,如下

server:
  port: 8186

spring:
  application:
    name: xxljob-demo-server

# xxl-job配置
xxl:
  job:
    admin:
      # 調(diào)度中心部署跟地址 [選填]:如調(diào)度中心集群部署存在多個地址則用逗號分隔。執(zhí)行器將會使用該地址進行"執(zhí)行器心跳注冊"和"任務(wù)結(jié)果回調(diào)";為空則關(guān)閉自動注冊;
      addresses: http://localhost:8080/xxl-job-admin
    executor:
      # 執(zhí)行器注冊 [選填]:優(yōu)先使用該配置作為注冊地址,為空時使用內(nèi)嵌服務(wù) ”IP:PORT“ 作為注冊地址。從而更靈活的支持容器類型執(zhí)行器動態(tài)IP和動態(tài)映射端口問題。
      address:
      # 執(zhí)行器AppName [選填]:執(zhí)行器心跳注冊分組依據(jù);為空則關(guān)閉自動注冊
      appname: demo-app
      # 執(zhí)行器IP [選填]:默認為空表示自動獲取IP,多網(wǎng)卡時可手動設(shè)置指定IP,該IP不會綁定Host僅作為通訊實用;地址信息用于 "執(zhí)行器注冊" 和 "調(diào)度中心請求并觸發(fā)任務(wù)";
      ip:
      # 執(zhí)行器端口號 [選填]:小于等于0則自動獲?。荒J端口為9999,單機部署多個執(zhí)行器時,注意要配置不同執(zhí)行器端口;
      port: 0
      # 執(zhí)行器運行日志文件存儲磁盤路徑 [選填] :需要對該路徑擁有讀寫權(quán)限;為空則使用默認路徑;
      logpath: /Users/luoyu/Documents/log/xxl-job/jobhandler
      # 執(zhí)行器日志文件保存天數(shù) [選填] : 過期日志自動清理, 限制值大于等于3時生效; 否則, 如-1, 關(guān)閉自動清理功能;
      logretentiondays: 15
    # 執(zhí)行器通訊TOKEN [選填]:非空時啟用;
    accessToken: xxx

注意

  1. 重點關(guān)注appname,后面要用到
  2. 需要調(diào)度中心跟執(zhí)行器進行認證的話,把兩者的accessToken配置一樣的即可

第四步,創(chuàng)建XxlJobConfig配置文件,如下

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class XxlJobConfig {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    /**
     * 針對多網(wǎng)卡、容器內(nèi)部署等情況,可借助 "spring-cloud-commons" 提供的 "InetUtils" 組件靈活定制注冊IP;
     *
     *      1、引入依賴:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器啟動變量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、獲取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */

}

第五步,創(chuàng)建XxlJobService服務(wù)類,如下

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.TimeUnit;

/**
 * XxlJob開發(fā)示例(Bean模式)
 *
 * 開發(fā)步驟:
 * 1、在Spring Bean實例中,開發(fā)Job方法,方式格式要求為 "public ReturnT<String> execute(String param)"
 * 2、為Job方法添加注解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")",注解value值對應(yīng)的是調(diào)度中心新建任務(wù)的JobHandler屬性的值。
 * 3、執(zhí)行日志:需要通過 "XxlJobLogger.log" 打印執(zhí)行日志;
 *
 * @author xuxueli 2019-12-11 21:52:51
 */
@Slf4j
@Component
public class XxlJobService {

    /**
     * 1、簡單任務(wù)示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    public ReturnT<String> demoJobHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");

        for (int i = 0; i < 5; i++) {
            XxlJobLogger.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        return ReturnT.SUCCESS;
    }

    /**
     * 2、分片廣播任務(wù)
     */
    @XxlJob("shardingJobHandler")
    public ReturnT<String> shardingJobHandler(String param) throws Exception {

        // 分片參數(shù)
        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        XxlJobLogger.log("分片參數(shù):當(dāng)前分片序號 = {}, 總分片數(shù) = {}", shardingVO.getIndex(), shardingVO.getTotal());

        // 業(yè)務(wù)邏輯
        for (int i = 0; i < shardingVO.getTotal(); i++) {
            if (i == shardingVO.getIndex()) {
                XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
            } else {
                XxlJobLogger.log("第 {} 片, 忽略", i);
            }
        }

        return ReturnT.SUCCESS;
    }

    /**
     * 3、命令行任務(wù)
     */
    @XxlJob("commandJobHandler")
    public ReturnT<String> commandJobHandler(String param) throws Exception {
        String command = param;
        int exitValue = -1;

        BufferedReader bufferedReader = null;
        try {
            // command process
            Process process = Runtime.getRuntime().exec(command);
            BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
            bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

            // command log
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                XxlJobLogger.log(line);
            }

            // command exit
            process.waitFor();
            exitValue = process.exitValue();
        } catch (Exception e) {
            XxlJobLogger.log(e);
        } finally {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
        }

        if (exitValue == 0) {
            return IJobHandler.SUCCESS;
        } else {
            return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed");
        }
    }

    /**
     * 4、跨平臺Http任務(wù)
     */
    @XxlJob("httpJobHandler")
    public ReturnT<String> httpJobHandler(String param) throws Exception {

        // request
        HttpURLConnection connection = null;
        BufferedReader bufferedReader = null;
        try {
            // connection
            URL realUrl = new URL(param);
            connection = (HttpURLConnection) realUrl.openConnection();

            // connection setting
            connection.setRequestMethod("GET");
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setUseCaches(false);
            connection.setReadTimeout(5 * 1000);
            connection.setConnectTimeout(3 * 1000);
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
            connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

            // do connection
            connection.connect();

            //Map<String, List<String>> map = connection.getHeaderFields();

            // valid StatusCode
            int statusCode = connection.getResponseCode();
            if (statusCode != 200) {
                throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
            }

            // result
            bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
            StringBuilder result = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                result.append(line);
            }
            String responseMsg = result.toString();

            XxlJobLogger.log(responseMsg);
            return ReturnT.SUCCESS;
        } catch (Exception e) {
            XxlJobLogger.log(e);
            return ReturnT.FAIL;
        } finally {
            try {
                if (bufferedReader != null) {
                    bufferedReader.close();
                }
                if (connection != null) {
                    connection.disconnect();
                }
            } catch (Exception e2) {
                XxlJobLogger.log(e2);
            }
        }

    }

    /**
     * 5、生命周期任務(wù)示例:任務(wù)初始化與銷毀時,支持自定義相關(guān)邏輯;
     */
    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public ReturnT<String> demoJobHandler2(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");
        return ReturnT.SUCCESS;
    }

    public void init(){
        log.info("init");
    }

    public void destroy(){
        log.info("destory");
    }

}

注意

  1. 重點關(guān)注@XxlJob,后面要用到

第六步,使用流程,如下

  1. 啟動執(zhí)行器項目,在調(diào)度中心->執(zhí)行器管理,創(chuàng)建新執(zhí)行器,AppName要與執(zhí)行器配置文件里面配置的名稱一致,選擇自動注冊即可
  2. 在調(diào)度中心->任務(wù)管理,創(chuàng)建新任務(wù),選擇我們剛剛創(chuàng)建的新執(zhí)行器,重點關(guān)注JobHandler這個配置,名稱要跟相應(yīng)的任務(wù)方法上@XxlJob注解里面的名稱一致,才能找到相應(yīng)的任務(wù)去執(zhí)行
  3. 創(chuàng)建完成后,在操作那里進行執(zhí)行一次,查看對應(yīng)的日志輸出,可知配置成功
  4. 還有多種任務(wù)的玩法,這里就不一一舉例了,需要的請自行百度查詢

完整代碼地址:https://github.com/Jinhx128/springboot-demo

注:此工程包含多個module,本文所用代碼均在xxljob-demo模塊下

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容