1 LiteFlow
1.1 前言
在日常的開(kāi)發(fā)過(guò)程中,經(jīng)常會(huì)遇到一些串行或者并行的業(yè)務(wù)流程問(wèn)題,而業(yè)務(wù)之間不必存在相關(guān)性。
在這樣的場(chǎng)景下,使用策略和模板模式的結(jié)合可以很好的解決這個(gè)問(wèn)題,但是使用編碼的方式會(huì)使得文件太多,在業(yè)務(wù)的部分環(huán)節(jié)可以這樣操作,在項(xiàng)目角度就無(wú)法一眼洞穿其中的環(huán)節(jié)和邏輯。
1.2 LiteFlow
1.2.1 簡(jiǎn)介
liteflow 是一個(gè)輕巧而且強(qiáng)大的規(guī)則引擎,能夠?qū)崿F(xiàn)開(kāi)箱即用,可以在短時(shí)間內(nèi)就可以完成復(fù)雜的規(guī)則編排,下圖是 liteflow 的整體架構(gòu)。liteflow 支持較多的規(guī)則文件格式,比如 xml/json/yaml, 對(duì)于規(guī)則文件的存儲(chǔ)方式可以有sql/zk/nacos/apollo 等。
通過(guò)LiteFlow我們可以把業(yè)務(wù)邏輯都定義到不同組件之中,然后使用簡(jiǎn)潔的規(guī)則文件來(lái)串聯(lián)整個(gè)流程,從而實(shí)現(xiàn)復(fù)雜的業(yè)務(wù)邏輯。
LiteFlow主要特性如下:
- 組件定義統(tǒng)一:所有的邏輯都是組件,直接使用
Spring原生注解@Component定義即可。 - 規(guī)則輕量:基于規(guī)則文件來(lái)編排流程,學(xué)習(xí)規(guī)則表達(dá)式入門(mén)僅需5分鐘。
- 規(guī)則多樣化:規(guī)則支持xml、json、yml三種規(guī)則文件寫(xiě)法,喜歡哪種用哪個(gè)。
- 任意編排:同步異步混編,再?gòu)?fù)雜的邏輯過(guò)程,都能輕易實(shí)現(xiàn)。
- 規(guī)則能從任意地方加載:框架中提供本地文件配置源和zk配置源的實(shí)現(xiàn),也提供了擴(kuò)展接口。
- 優(yōu)雅熱刷新機(jī)制:規(guī)則變化,無(wú)需重啟應(yīng)用,即時(shí)改變應(yīng)用的
LiteFlowX 規(guī)則引擎官方網(wǎng)址:https://liteflow.yomahub.com
1.2.2 架構(gòu)原理

liteflow 的使用是從獲取上下文開(kāi)始的,通過(guò)數(shù)據(jù)上下文來(lái)解析對(duì)應(yīng)的規(guī)則文件,通過(guò) liteflow 執(zhí)行器來(lái)執(zhí)行對(duì)應(yīng)的鏈路,每個(gè)鏈路上都有需要執(zhí)行的業(yè)務(wù) node(即節(jié)點(diǎn)組件,可以支持多種語(yǔ)言腳本, groovy/js/python/lua等), 各個(gè)業(yè)務(wù)node 之間是獨(dú)立的。
liteflow 可以支持如下所示的復(fù)雜流程

此外,liteflow 可以支持熱部署,可以實(shí)時(shí)替換或者增加節(jié)點(diǎn),即修改規(guī)則文件后可以實(shí)時(shí)生效。

1.3 插件及簡(jiǎn)單使用
LiteFlow 還擁有自己的IDEA插件LiteFlowX,通過(guò)該插件能支持規(guī)則文件的智能提示、語(yǔ)法高亮、組件與規(guī)則文件之間的跳轉(zhuǎn)及LiteFlow工具箱等功能,強(qiáng)烈建議大家安裝下。
首先我們?cè)?code>IDEA的插件市場(chǎng)中安裝該插件;

安裝好LiteFlowX插件后,我們代碼中所定義的組件和規(guī)則文件都會(huì)顯示特定的圖標(biāo);

當(dāng)我們編輯規(guī)則文件時(shí),會(huì)提示我們已經(jīng)定義好的組件,并支持從規(guī)則文件中跳轉(zhuǎn)到組件;

還支持從右側(cè)打開(kāi)工具箱,快捷查看組件和規(guī)則文件。

1.4 規(guī)則表達(dá)式
接下來(lái)我們學(xué)習(xí)下規(guī)則表達(dá)式,也就是規(guī)則文件的編寫(xiě),入門(mén)表達(dá)式非常簡(jiǎn)單,但這對(duì)使用LiteFlow非常有幫助
1.4.1 串行編排
當(dāng)我們想要依次執(zhí)行a、b、c、d四個(gè)組件時(shí),直接使用THEN關(guān)鍵字即可。
<chain name="chain1">
THEN(a, b, c, d);
</chain>
1.4.2 并行編排
如果想并行執(zhí)行a、b、c三個(gè)組件的話(huà),可以使用WHEN關(guān)鍵字。
<chain name="chain1">
WHEN(a, b, c);
</chain>
1.4.3 選擇編排
如果想實(shí)現(xiàn)代碼中的switch邏輯的話(huà),例如通過(guò)a組件的返回結(jié)果進(jìn)行判斷,如果返回的是組件名稱(chēng)b的話(huà)則執(zhí)行b組件,可以使用SWITCH關(guān)鍵字。
<chain name="chain1">
SWITCH(a).to(b, c, d);
</chain>
1.4.4 條件編排
如果想實(shí)現(xiàn)代碼中的if邏輯的話(huà),例如當(dāng)x組件返回為true時(shí)執(zhí)行a,可以使用IF關(guān)鍵字
<chain name="chain1">
IF(x, a);
</chain>
如果想實(shí)現(xiàn)if的三元運(yùn)算符邏輯的話(huà),例如x組件返回為true時(shí)執(zhí)行a組件,返回為false時(shí)執(zhí)行b組件,可以編寫(xiě)如下規(guī)則文件。
<chain name="chain1">
IF(x, a, b);
</chain>
如果想實(shí)現(xiàn)if else邏輯的話(huà),可以使用ELSE關(guān)鍵字,和上面實(shí)現(xiàn)效果等價(jià)。
<chain name="chain1">
IF(x, a).ELSE(b);
</chain>
如果想實(shí)現(xiàn)else if邏輯的話(huà),可以使用ELIF關(guān)鍵字。
<chain name="chain1">
IF(x1, a).ELIF(x2, b).ELSE(c);
</chain>
1.4.5 使用子流程
當(dāng)某些流程比較復(fù)雜時(shí),我們可以定義子流程,然后在主流程中引用,這樣邏輯會(huì)比較清晰。
例如我們有如下子流程,執(zhí)行C、D組件。
<chain name="subChain">
THEN(C, D);
</chain>
然后我們直接在主流程中引用子流程即可。
<chain name="mainChain">
THEN(
A, B,
subChain,
E
);
</chain>
1.5 使用
1.5.1 配置
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-spring-boot-starter</artifactId>
<version>2.10.6</version>
</dependency>
接下來(lái)修改配置文件application.yml,配置好LiteFlow的規(guī)則文件;在 liteflow 中,需要配置的內(nèi)容有規(guī)則文件地址,節(jié)點(diǎn)重試(執(zhí)行報(bào)錯(cuò)時(shí)可以進(jìn)行重試,類(lèi)似于 spring-retry), 流程并行執(zhí)行線(xiàn)程池參數(shù)配置,流程的請(qǐng)求ID配置。
server:
port: 8580
liteflow:
#規(guī)則文件路徑
rule-source: liteflow/*.el.xml
retry-count: 0
print-execution-log: true
monitor:
enable-log: true
period: 300000
request-id-generator-class: com.platform.orderserver.config.AppRequestIdGenerator
# 上下文的最大數(shù)量槽
slot-size : 10240
# 線(xiàn)程數(shù),默認(rèn)為64
main-executor-works: 64
# 異步線(xiàn)程最長(zhǎng)等待時(shí)間 秒
when-max-wait-seconds: 15
# when 節(jié)點(diǎn)全局異步線(xiàn)程池最大線(xiàn)程數(shù)
when-max-workers: 16
# when 節(jié)點(diǎn)全局異步線(xiàn)程池隊(duì)列數(shù)
when-queue-limit: 5120
# 在啟動(dòng)的時(shí)候就解析規(guī)則
parse-on-start: true
enable: true
1.5.2 組件
1.5.2.1 組件講解
首先我們需要定義好各個(gè)組件,普通組件需要繼承NodeComponent并實(shí)現(xiàn)process()方法,還需設(shè)置@Component注解的名稱(chēng),可以通過(guò)重寫(xiě)isAccess方法來(lái)決定是否執(zhí)行該組件;
liteflow 的組件在規(guī)則文件中即對(duì)應(yīng)的節(jié)點(diǎn),組件對(duì)應(yīng)的種類(lèi)有很多,具體的如下所示:
- 普通組件
普通組件需要集成的是NodeComponent, 可以用在when 和 then邏輯中,具體的業(yè)務(wù)需要在process中去執(zhí)行。同時(shí)在node節(jié)點(diǎn)中,可以覆蓋isAccess方法,表示是否進(jìn)入該節(jié)點(diǎn)執(zhí)行業(yè)務(wù)邏輯,isContinueOnError判斷在出錯(cuò)的情況下是否繼續(xù)執(zhí)行下一個(gè)組件,默認(rèn)為 false。isEnd方法表示是否終止流程,默認(rèn)為true。 - 選擇組件
選擇組件是通過(guò)業(yè)務(wù)邏輯來(lái)判斷接下來(lái)的動(dòng)作要執(zhí)行哪一個(gè)節(jié)點(diǎn),類(lèi)似于Java中的 switch, 在代碼中則需要繼承NodeSwitchComponent實(shí)現(xiàn)processWitch方法來(lái)處理業(yè)務(wù)。 - 條件組件
條件組件稱(chēng)之為if組件,返回的結(jié)果是 true 或者 false, 代碼需要集成NodeIfComponent重寫(xiě)processIf方法,返回對(duì)應(yīng)的業(yè)務(wù)節(jié)點(diǎn),這個(gè)和選擇組件類(lèi)似。
1.5.2.2 組件使用
@Component("couponCmp")
public class CouponCmp extends NodeComponent {
@Override
public void process() throws Exception {
PriceContext context = this.getContextBean(PriceContext.class);
/**這里Mock下根據(jù)couponId取到的優(yōu)惠卷面值為15元**/
Long couponId = context.getCouponId();
BigDecimal couponPrice = new BigDecimal(15);
BigDecimal prePrice = context.getLastestPriceStep().getCurrPrice();
BigDecimal currPrice = prePrice.subtract(couponPrice);
context.addPriceStep(new PriceStepVO(PriceTypeEnum.COUPON_DISCOUNT,
couponId.toString(),
prePrice,
currPrice.subtract(prePrice),
currPrice,
PriceTypeEnum.COUPON_DISCOUNT.getName()));
}
@Override
public boolean isAccess() {
PriceContext context = this.getContextBean(PriceContext.class);
if(context.getCouponId() != null){
return true;
}else{
return false;
}
}
}
較特殊組件,比如用于判斷是按國(guó)內(nèi)運(yùn)費(fèi)計(jì)算規(guī)則來(lái)計(jì)算還是境外規(guī)則的條件組件,需要繼承NodeSwitchComponent并實(shí)現(xiàn)processSwitch()方法;
@Component("postageCondCmp")
public class PostageCondCmp extends NodeSwitchComponent {
@Override
public String processSwitch() throws Exception {
PriceContext context = this.getContextBean(PriceContext.class);
//根據(jù)參數(shù)oversea來(lái)判斷是否境外購(gòu),轉(zhuǎn)到相應(yīng)的組件
boolean oversea = context.isOversea();
if(oversea){
return "overseaPostageCmp";
}else{
return "postageCmp";
}
}
}
定義好組件之后就可以通過(guò)規(guī)則文件將所有流程連接起來(lái)了
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="promotionChain">
THEN(fullCutCmp, fullDiscountCmp, rushBuyCmp);
</chain>
</flow>
最后在Controller中添加接口,然后調(diào)用FlowExecutor類(lèi)的執(zhí)行方法即可;
@Controller
public class PriceExampleController {
@Resource
private FlowExecutor flowExecutor;
@RequestMapping(value = "/submit", method = RequestMethod.POST)
@ResponseBody
public String submit(@Nullable @RequestBody String reqData) {
try {
PriceCalcReqVO req = JSON.parseObject(reqData, PriceCalcReqVO.class);
LiteflowResponse response = flowExecutor.execute2Resp("promotionChain", req, PriceContext.class);
return response.getContextBean(PriceContext.class).getPrintLog();
} catch (Throwable t) {
t.printStackTrace();
return "error";
}
}
}
1.5.3 數(shù)據(jù)上下文
我們平時(shí)在寫(xiě)復(fù)雜代碼時(shí),后面一步經(jīng)常會(huì)用到前面一步的結(jié)果,然而使用LiteFlow之后,組件里并沒(méi)有參數(shù)傳遞,那么各個(gè)流程中參數(shù)是這么處理的?其實(shí)是LiteFlow中有個(gè)上下文的概念,流程中的所有數(shù)據(jù)都統(tǒng)一存放在此,比如上面的PriceContext類(lèi);
在 liteflow 中,數(shù)據(jù)上下文的概念非常重要,上下文對(duì)象起到參數(shù)傳遞的作用,因?yàn)椴煌瑯I(yè)務(wù)需要的輸入輸出參數(shù)是不同的,所以上下文非常的重要。
# 執(zhí)行流程時(shí),需要傳遞el文件,初始化參數(shù)以及上下文對(duì)象,這里的上下文可以設(shè)置多個(gè)
LiteflowResponse response = flowExecutor.execute2Resp("chain1", 流程初始參數(shù), CustomContext.class);
因?yàn)樯舷挛膫魅氲氖且粋€(gè) class 類(lèi)型參數(shù),流程參數(shù)是可以傳入?yún)?shù)的,一般情況下是在第一個(gè)節(jié)點(diǎn)中,將傳入?yún)?shù)設(shè)置到上下文對(duì)象中。
1.6 業(yè)務(wù)實(shí)踐
使用電商場(chǎng)景的應(yīng)用,訂單完成后,進(jìn)行積分的發(fā)放,消息發(fā)送,同時(shí)并行發(fā)送短信和郵件。
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="test_flow">
THEN(
prepareTrade, grantScore, sendMq, WHEN(sendEmail, sendPhone)
);
</chain>
</flow>
在訂單完成之后異步執(zhí)行,傳遞參數(shù)并執(zhí)行相應(yīng)的規(guī)則流程。
/**
*處理 交易完成后任務(wù),異步執(zhí)行
*/
@Async(value = "getAsyncExecutor")
public void handleApp(AppFlowDto flowDto){
// 使用的規(guī)則文件,傳遞參數(shù),上下文對(duì)象
LiteflowResponse response = flowExecutor.execute2Resp("test_flow", flowDto, AppFLowContext.class);
// 獲取流程執(zhí)行后的結(jié)果
if (!response.isSuccess()) {
Exception e = response.getCause();
Log.warn(" error is {}", e.getCause(),e);
}
AppFlowContext context = response.getContextBean(AppFlowContext.class);
log.info("handleApp 執(zhí)行完成后 context {}",JSONObject.toJSONString(context));
}
在正式處理業(yè)務(wù)流程之前,需要先進(jìn)行數(shù)據(jù)的預(yù)處理,將流程入?yún)⑥D(zhuǎn)轉(zhuǎn)換成上下文對(duì)象,方便參數(shù)的傳遞和設(shè)置。
/**
@Description 數(shù)據(jù)準(zhǔn)備和校驗(yàn)處理
*/
@Slf4j
@Component(valve = "prepareTrade")
public class PrepareTrade extends NodeComponent {
@Override
public void process() throws Exception {
log.info("交易完成后業(yè)務(wù)處理數(shù)據(jù)準(zhǔn)備和校驗(yàn)");
//拿到請(qǐng)求參數(shù)
AppFlowDto req = this.getslot().getRequestData();
log.info("請(qǐng)求參數(shù) {}",JSONObject.toJSONString(req));
// 停止任務(wù)
// setIsEnd(Boolean.TRUE);
AppFlowContext context = this.getContextBean(AppFlowContext.class);
log.info("設(shè)置上下文對(duì)象{}",JSONObject.toJSONString(context));
}
在具體的業(yè)務(wù)處理環(huán)節(jié),以積分發(fā)放為例,可以獲取上下文對(duì)象進(jìn)行業(yè)務(wù)操作,同時(shí)也可以重寫(xiě) isAccess 方法,來(lái)判斷是否處理該節(jié)點(diǎn)。
@Slf4j
@Component(value="grantScore")
public class GrantScore extends NodeComponent {
@Override
public void process() throws Exception {
AppFlowContext context = this.getContextBean(AppFlowContext.class);
log.info("business cxt {}",JSONObject.toJSONString(context));
TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,20));
}
//是否處理該節(jié)點(diǎn)
@Override
public boolean isAccess() throws Exception {
AppFlowContext context = this.getContextBean(AppFlowContext.class);
log.info("判斷是否處理該節(jié)點(diǎn) cxt {}",JSONObject.toJSONString(context));
return Boolean.TRUE;
}
}
如上所示,具體的業(yè)務(wù)流程都可以抽象成一個(gè) node 節(jié)點(diǎn),存放在 test_flow.el.xml 中進(jìn)行執(zhí)行