序言
筆者在《軟件設(shè)計(jì)的演變過程》一文中,將通信系統(tǒng)軟件的DDD分層模型最終演進(jìn)為五層模型,即調(diào)度層(Schedule)、事務(wù)層(Transaction DSL)、環(huán)境層(Context)、領(lǐng)域?qū)?Domain)和基礎(chǔ)設(shè)施層(Infrastructure),我們簡單回顧一下:

- 調(diào)度層:維護(hù)UE的狀態(tài)模型,只包括業(yè)務(wù)的本質(zhì)狀態(tài),將接收到的消息派發(fā)給事務(wù)層。
- 事務(wù)層:對應(yīng)一個(gè)業(yè)務(wù)流程,比如UE Attach,將各個(gè)同步消息或異步消息的處理組合成一個(gè)事務(wù),當(dāng)事務(wù)失敗時(shí),進(jìn)行回滾。當(dāng)事務(wù)層收到調(diào)度層的消息后,委托環(huán)境層的Action進(jìn)行處理。
- 環(huán)境層:以Action為單位,處理一條同步消息或異步消息,將Domain層的領(lǐng)域?qū)ο骳ast成合適的role,讓role交互起來完成業(yè)務(wù)邏輯。
- 領(lǐng)域?qū)樱翰粌H包括領(lǐng)域?qū)ο蠹捌渲g關(guān)系的建模,還包括對象的角色role的顯式建模。
- 基礎(chǔ)實(shí)施層:為其他層提供通用的技術(shù)能力,比如事務(wù)模型的框架、消息通信機(jī)制、對象持久化機(jī)制和通用的算法等
本文將聚焦于事務(wù)層,主要討論事務(wù)模型,代碼抽象層次和業(yè)務(wù)流程圖一一對應(yīng)。
同步模型
毫無疑問,異步模型是復(fù)雜的。但在管理域的組件中,對實(shí)時(shí)性和性能并沒有極致的要求,同時(shí)協(xié)程(比如,Goroutine)非常輕量級,所以使用同步模型是一種非常聰明且簡單的處理方式,如下圖所示:

在一個(gè)同步模型里,一個(gè)系統(tǒng)一旦發(fā)出一個(gè)請求消息,并需要等待其應(yīng)答,則當(dāng)前協(xié)程就會(huì)進(jìn)入休眠態(tài),直到應(yīng)答消息來臨或超時(shí)為止。協(xié)程可以看做是用戶態(tài)輕量級的線程,占用資源非常少,當(dāng)前系統(tǒng)同時(shí)可以有成百上千個(gè)協(xié)程運(yùn)行。
假定Action是一條同步消息的交互,那么業(yè)務(wù)的流程圖就對應(yīng)一個(gè)Action序列。
事務(wù)
事務(wù)(Transaction,簡寫為Trans)一詞來源于數(shù)據(jù)處理的概念,下面是Wikipedia 對事務(wù)的定義:
In computer science, transaction processing is information processing thatis divided into individual, indivisible operations, called transactions. Each transaction must succeed or fail as a complete unit; it cannot remain in an intermediate state.
一般情況下,一個(gè)單一場景的用戶流程圖就對應(yīng)一個(gè)事務(wù),而事務(wù)則由一個(gè)Action序列組成。

從S1到S2的一次同步請求處理過程中,站在S1的視角是一個(gè)Action,而站在S2的視角卻是一個(gè)事務(wù)。
事務(wù)過程控制
基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)
TransInfo
TransInfo是事務(wù)模型中一個(gè)非常重要的數(shù)據(jù)結(jié)構(gòu),用于事務(wù)執(zhí)行過程中的數(shù)據(jù)傳遞,比如事務(wù)層注入到環(huán)境層的數(shù)據(jù),Action之間串聯(lián)的數(shù)據(jù)。
TransInfo在基礎(chǔ)設(shè)施層的trans-dsl框架中定義,如下:
type TransInfo struct {
// trans dsl framework params
Times int
RepeatIdx int
// user app info
AppInfo interface{}
}
S1Obj
當(dāng)前系統(tǒng)為S2,當(dāng)收到來自S1的同步請求時(shí),S2啟動(dòng)一個(gè)協(xié)程處理該請求。當(dāng)該協(xié)程調(diào)用到調(diào)度層后,執(zhí)行相關(guān)事務(wù)。如果該事務(wù)執(zhí)行失敗,則進(jìn)行回滾。
一個(gè)簡化的事務(wù)接口調(diào)用代碼,如下:
func scheduleS1ReqTrans(req []byte) error {
transInfo := &transdsl.TransInfo{AppInfo: &context.S2Info{}}
s1ReqTrans := trans.NewS1ReqTrans()
err = s1ReqTrans.Exec(transInfo)
if err != nil {
s1ReqTrans.RollBack(transInfo)
}
return err
}
Fragment
從語義層次上看,一個(gè)Fragment是一個(gè)流程片段。
從代碼層次上看,一個(gè)Fragment是一個(gè)interface。
Fragment在基礎(chǔ)設(shè)施層的trans-dsl框架中定義,如下:
type Fragment interface {
Exec(transInfo *TransInfo) error
RollBack(transInfo *TransInfo)
}
Action
Action是一條同步消息的交互。所有具體的Action在環(huán)境層中定義,是提供給事務(wù)層的原子操作,是一個(gè)粒度最小的流程片段,需要實(shí)現(xiàn)Fragment接口,具體實(shí)現(xiàn)和業(yè)務(wù)緊密相關(guān)。
Procedure
Procedure是多條關(guān)系緊密的同步消息的交互,在基礎(chǔ)設(shè)施層的trans-dsl框架中定義,是一個(gè)比Action更大的復(fù)用單元,也是一個(gè)流程片段,需要實(shí)現(xiàn)Fragment接口。
Procedure本身又是一個(gè)由Action或Procedre組成的序列,其中Action是葉子節(jié)點(diǎn),Procedure是中間節(jié)點(diǎn),所以Procedre是一棵多叉樹。
Procedure的定義如下:
type Procedure struct {
Fragments []Fragment
}
在事務(wù)層創(chuàng)建一個(gè)具體的Procedure的代碼如下:
func newProcedure1() transdsl.Fragment {
procedure := &transdsl.Procedure {
Fragments: []transdsl.Fragment {
new(context.Action11),
newProcedure2(),
new(context.Action12),
},
}
return procedure
}
func newProcedure2() transdsl.Fragment {
procedure := &transdsl.Procedure {
Fragments: []transdsl.Fragment {
new(context.Action21),
new(context.Action22),
},
}
return procedure
}
Transaction
Transaction對應(yīng)一次業(yè)務(wù)處理,在基礎(chǔ)設(shè)施層的trans-dsl框架中定義,是該業(yè)務(wù)中最大的Procedure,當(dāng)然也是一個(gè)流程片段,需要實(shí)現(xiàn)Fragment接口。
Transaction的定義如下:
type Transaction struct {
Fragments []Fragment
errIndex int
}
說明:errIndex用于事務(wù)回滾。
在事務(wù)層創(chuàng)建一個(gè)具體的Transaction的代碼如下:
func NewS1ReqTrans() *transdsl.Transaction {
trans := &transdsl.Transaction {
Fragments: []transdsl.Fragment {
new(context.Action1),
newProcedure1(),
new(context.Action2),
},
}
return trans
}
Repeat
從語義層次來看,Repeat用來修飾Action或Procedure,說明該Action或Procedure可以執(zhí)行多次,并且至少執(zhí)行一次,同時(shí)產(chǎn)生一個(gè)新的Procedure。
從代碼層次上看,Repeat在基礎(chǔ)設(shè)施層的trans-dsl框架中定義,實(shí)現(xiàn)了接口Fragment。
Repeat的定義如下:
type Repeat struct {
Fragments []Fragment
FuncVar fun() Fragment
}
說明:Fragments用于事務(wù)回滾。
Repeat的執(zhí)行次數(shù)是動(dòng)態(tài)確定的,即由上一個(gè)Action寫入TransInfo。
有了Repeat后,我們可以在事務(wù)層定義一個(gè)事務(wù)如下:
func NewS1ReqTrans() *transdsl.Transaction {
trans := &transdsl.Transaction {
Fragments: []transdsl.Fragment {
new(context.Action1),
new(context.Action2),
&transdsl.Repeat {
FuncVar: newProcedure1,
},
new(context.Action3),
},
}
return trans
}
Optional
Optional與Repeat類似-。
從語義層次來看,Optional用來修飾Action或Procedure,說明該Action或Procedure最多執(zhí)行一次,并且可以不執(zhí)行。
從代碼層次來看,Optional在基礎(chǔ)設(shè)施層的trans-dsl框架中定義,實(shí)現(xiàn)了接口Fragment。
Optional的定義如下:
type Optional struct {
Spec Specification
Fragment Fragment
isExec bool
}
說明:isExec用于事務(wù)回滾。
Optional的執(zhí)行次數(shù)由謂詞Specification確定,Specification是一個(gè)interface,它也在環(huán)境層定義,如下:
type Specification interface {
Ok(transInfo *TransInfo) bool
}
謂詞的實(shí)例來自兩個(gè)方面的確認(rèn):
- 系統(tǒng)的某個(gè)開關(guān)是否打開,即開關(guān)打開時(shí),謂詞為真,執(zhí)行一次Action或Procedure,否則執(zhí)行零次。
- 系統(tǒng)的當(dāng)前狀態(tài)是否滿足某個(gè)條件,即條件滿足時(shí),謂詞為真,執(zhí)行一次Action或Procedure,否則執(zhí)行零次。
有了Optional后,我們可以在事務(wù)層定義一個(gè)事務(wù)如下:
func NewS1ReqTrans() *transdsl.Transaction {
trans := &transdsl.Transaction {
Fragments: []transdsl.Fragment {
new(context.Action1),
&transdsl.Optional {
Spec: new(context.ShouldExecAction2),
Fragment: new(context.Action2),
},
&transdsl.Repeat {
FuncVar: newProcedure1,
},
new(context.Action3),
},
}
return trans
}
默認(rèn)
從語義層次來看, 沒有Repeat或Optional修飾的Action或Procedure就是默認(rèn)的情況,說明Action或Procedure僅且執(zhí)行一次。
事務(wù)回滾
對于事務(wù)來說,執(zhí)行要么成功,要么失敗。當(dāng)事務(wù)執(zhí)行失敗時(shí),必須觸發(fā)回滾,使得系統(tǒng)無資源泄露或殘留。
當(dāng)事務(wù)執(zhí)行失敗時(shí),肯定是在某一個(gè)Fragment執(zhí)行時(shí)失敗,我們記作fragments[i],事務(wù)回滾的過程為:
- fragments[i]完成自己已分配的資源的回收和自己已寫入的數(shù)據(jù)的清理;
- 從fragments[i-1]到fragments[0],依次調(diào)用它的RollBack方法。
Action
Action是事務(wù)的原子執(zhí)行者,從葉子節(jié)點(diǎn)來看,事務(wù)都是Action序列。
當(dāng)某個(gè)Action執(zhí)行失敗時(shí),在Exec方法內(nèi)進(jìn)行該Action相關(guān)的資源回收或數(shù)據(jù)清理,不會(huì)調(diào)用該Action的RollBack函數(shù)。
Action的RollBack方法實(shí)現(xiàn)很簡單,僅進(jìn)行該Action相關(guān)的所有資源回收和數(shù)據(jù)清理。
舉個(gè)例子:
Action5在執(zhí)行失敗前,打開了文件file1,在表table1中寫了一條記錄,那么它在返回error前要?jiǎng)h除表table1中的記錄,并關(guān)閉文件file1,即逆序的進(jìn)行資源回收和數(shù)據(jù)清理。
至于Action1到Action4中打開了什么資源或?qū)懥耸裁磾?shù)據(jù),Action5一點(diǎn)都不care。
Action5返回錯(cuò)誤后,事務(wù)回滾框架會(huì)自動(dòng)依次調(diào)用[Action4,Action3, Action2, Action1]的Rollback函數(shù),從而完成事務(wù)的回滾。
Procedure
如果Procedure執(zhí)行失敗,則在Exec方法中進(jìn)行“錯(cuò)誤處理”:
func (this *Procedure) Exec(transInfo *TransInfo) error {
index, err := forEachFragments(this.Fragments, transInfo)
if err != nil {
if index <= 0 {
return err
}
backEachFragments(this.Fragments, transInfo, index)
}
return err
}
Exec方法在實(shí)現(xiàn)中使用了事務(wù)層的原語forEachFragments和backEachFragments:
- 對于forEachFragments原語,正向遍歷Fragments,依次調(diào)用它的Exec方法。
- 對于backEachFragments原語,從index - 1開始反向遍歷Fragments,依次調(diào)用它的RollBack方法。
如果Procedure執(zhí)行成功,回滾時(shí)直接調(diào)用RollBack方法即可:
func (this *Procedure) RollBack(transInfo *TransInfo) {
backEachFragments(this.Fragments, transInfo, len(this.Fragments))
}
Repeat
如果Repeat執(zhí)行失敗,則進(jìn)行“錯(cuò)誤處理”:
func (this *Repeat) Exec(transInfo *TransInfo) error {
this.Fragments = make([]Fragment, transInfo.Times)
for i := 0; i < transInfo.Times; i++ {
transInfo.RepeatIdx = i
this.Fragments[i] = this.FuncVar()
err := this.Fragments[i].Exec(transInfo)
if err != nil {
if IsErrorEqual(err, ErrContinue) {
continue
}
if i == 0 {
return err
}
i--
for j := i; j >= 0; j-- {
transInfo.RepeatIdx = j
this.Fragments[j].RollBack(transInfo)
}
return err
}
}
return nil
}
這里的transInfo.RepeatIdx需要解釋一下:
- 在this.Fragments[i].Exec之前賦值為i,是為了Repeat在執(zhí)行Action或Procedure時(shí),找到對應(yīng)的領(lǐng)域?qū)ο蟆?/li>
- this.Fragments[j].RollBack之前賦值為j,是為了Repeat在“錯(cuò)誤處理”時(shí),即回滾已經(jīng)完成的Action或Procedure時(shí),找到對應(yīng)的領(lǐng)域?qū)ο?。舉個(gè)例子,比如Repeat的最大次數(shù)是5,當(dāng)進(jìn)行到第4次時(shí)發(fā)生了錯(cuò)誤,這時(shí)需要回滾前3次的Action或Procedure。
如果repeat執(zhí)行成功,回滾時(shí)直接調(diào)用RollBack方法即可:
func (this *Repeat) RollBack(transInfo *TransInfo) {
for i := transInfo.Times; i >= 0; i-- {
transInfo.RepeatIdx = i
this.Fragments[i].RollBack(transInfo)
}
}
Optional
optional就比較簡單了,如果執(zhí)行過程中發(fā)生了錯(cuò)誤,則啥也不用干,因?yàn)锳ction或Procedure已完成了錯(cuò)誤處理,如下所示:
func (this *Optional) Exec(transInfo *TransInfo) error {
if this.Spec.Ok(s1Obj, transInfo) {
this.isExec = true
return this.Fragment.Exec(s1Obj, transInfo)
}
return nil
}
如果optional執(zhí)行成功,回滾時(shí)需要根據(jù)是否執(zhí)行過Action或Procedure來進(jìn)行Action或Procedure的回滾,如下所示:
func (this *Optional) RollBack(transInfo *TransInfo) {
if this.isExec {
this.Fragment.RollBack(transInfo)
}
}
事務(wù)并發(fā)
事務(wù)的執(zhí)行過程是一個(gè)同步模型,而事務(wù)之間卻是異步的。多個(gè)事務(wù)間可能共享資源,所以要對事務(wù)進(jìn)行并發(fā)控制。
在Golang中,協(xié)程之間的并發(fā)控制一般使用channel,非常簡單且高效。
假設(shè)一組協(xié)程使用一個(gè)共享資源,這時(shí)通過一個(gè)channel控制,那么多組協(xié)程就需要多個(gè)channel來控制。我們可以使用map,key為shareId,value為channel。
讀channel
根據(jù)業(yè)務(wù)流程,要在某個(gè)Specification(謂詞,Optional的第一個(gè)參數(shù))中讀channel。假設(shè)該謂詞為IsSomethingNotExist,示例代碼如下:
func (this *IsSomethingNotExist) Ok(transInfo *transdsl.TransInfo) bool {
...
s2Info := transInfo.AppInfo.(*S2Info)
<- s2Info.Chan
s2Info.ChanFlag = true
...
}
要讀channel,必須先注入。根據(jù)局部化原則,我們在謂詞IsSomethingNotExist中進(jìn)行注入,而不在前面的Action或Specification中進(jìn)行注入,于是示例代碼變?yōu)椋?/p>
func (this *IsSomethingNotExist) Ok(transInfo *transdsl.TransInfo) bool {
...
s2Info := transInfo.AppInfo.(*S2Info)
concurrencyctrl.ChanMapLock.Lock()
value, ok := concurrencyctrl.ChanMap[shareId]
if ok {
s2Info.Chan = value
} else {
s2Info.Chan = make(chan int, 1)
s2Info.Chan <- 1
concurrencyctrl.ChanMap[shareId] = s2Info.Chan
}
concurrencyctrl.ChanMapLock.Unlock()
<- s2Info.Chan
s2Info.ChanFlag = true
...
}
寫channel
根據(jù)業(yè)務(wù)流程,要在讀channel的Specification之后的某個(gè)Action中寫channel。假設(shè)該Action為DiscussAction,示例代碼如下:
func (this *DiscussAction) Exec(transInfo *transdsl.TransInfo) error {
...
s2Info := transInfo.AppInfo.(*S2Info)
s2Info.Chan <- 1
s2Info.ChanFlag = false
...
}
細(xì)心的讀者可能已經(jīng)發(fā)現(xiàn),上面的描述“要在讀channel的Specification之后的某個(gè)Action中寫channel”存在兩種情況:
- 該Specification是optional的第一個(gè)參數(shù),而該Action或包含該Action的Procedure是對應(yīng)的第二個(gè)參數(shù)
- 該Action在該Specification對應(yīng)的optional操作之后
不管Specification的Ok方法是否返回true,第二種情況總是會(huì)進(jìn)行寫channel操作,而第一種情況則未必,即當(dāng)Specification的Ok方法返回為false時(shí),并不會(huì)進(jìn)行寫channel操作,所以有瑕疵。該瑕疵的修復(fù)方法是在該Specification的Ok方法內(nèi)進(jìn)行判斷,如果返回值為false,則進(jìn)行寫channel操作。假設(shè)該謂詞為IsSomethingNeedDel,則示例代碼為:
func (this *IsSomethingNeedDel) Ok(transInfo *transdsl.TransInfo) bool {
...
s2Info := transInfo.AppInfo.(*S2Info)
concurrencyctrl.ChanMapLock.Lock()
value, ok := concurrencyctrl.ChanMap[shareId]
if ok {
s2Info.Chan = value
} else {
s2Info.Chan = make(chan int, 1)
s2Info.Chan <- 1
concurrencyctrl.ChanMap[shareId] = s2Info.Chan
}
concurrencyctrl.ChanMapLock.Unlock()
<- s2Info.Chan
s2Info.ChanFlag = true
...
if flag {
log.Infof("***IsSomethingNeedDel: true***")
} else {
s2Info.Chan <- 1
s2Info.ChanFlag = false
log.Infof("***IsSomethingNeedDel: false***")
}
return flag
}
錯(cuò)誤和異常處理
channel的閉合操作
在事務(wù)執(zhí)行過程中,不管是遇到錯(cuò)誤還是發(fā)生了異常(panic),可能會(huì)出現(xiàn)對于channel讀了沒有寫的情況,即在事務(wù)處理過程中沒有實(shí)現(xiàn)channel的閉合操作,這將導(dǎo)致該組的其他協(xié)程(Goroutine)也阻塞了。
該問題的解決思路是在事務(wù)調(diào)度的入口方法中使用defer修飾的閉包對異常進(jìn)行捕獲,同時(shí)針對錯(cuò)誤或異常都對channel嘗試閉合操作,示例代碼如下:
func scheduleS1ReqTrans(req []byte) (err error) {
transInfo := &transdsl.TransInfo{AppInfo: &context.S2Info{}}
defer func() {
if p := recover(); p != nil {
str, ok := p.(string)
if ok {
err = errors.New(str)
} else {
err = errors.New("panic")
}
log.Info("S1ReqTrans panic recover start!")
log.Error("Stack:", string(debug.Stack()))
log.Info("S1ReqTrans panic recover end!")
}
s2Info := transInfo.AppInfo.(*S2Info)
if s2Info.ChanFlag {
s2Info.Chan <- 1
}
}()
s1ReqTrans := trans.NewS1ReqTrans()
err = s1ReqTrans.Exec(transInfo)
if err != nil {
s1ReqTrans.RollBack(transInfo)
}
return err
}
事務(wù)的閉合操作
在事務(wù)執(zhí)行的過程中,如果發(fā)生了panic,則會(huì)被調(diào)度層的recover函數(shù)恢復(fù),然后將panic轉(zhuǎn)換為error。雖然知道這次業(yè)務(wù)是失敗的,但是卻沒有觸發(fā)事務(wù)回滾操作,從而釋放已經(jīng)申請的資源,所以必須在Context層顯式的對panic進(jìn)行處理:
- 在Action的defer函數(shù)中釋放已經(jīng)成功申請的資源
- 在Specification或Action的defer函數(shù)中將panic轉(zhuǎn)換為error,觸發(fā)事務(wù)回滾流程
小結(jié)
在管理域的組件中,對實(shí)時(shí)性和性能并沒有極致的要求,同時(shí)Goroutine非常輕量級,所以使用同步模型是一種非常聰明且簡單的處理方式。本文所討論的事務(wù)模型針對的就是同步過程,先詳細(xì)闡述了事務(wù)的過程控制,然后對事務(wù)的回滾給出了通用的設(shè)計(jì)框架,最后對事務(wù)的并發(fā)控制給出了簡單高效的解決方案。事務(wù)模型在DDD的分層架構(gòu)中位于第四層,代碼抽象層次高且表達(dá)力強(qiáng),和業(yè)務(wù)流程圖一一對應(yīng),同時(shí)代碼可以以Action或Procedure為粒度進(jìn)行復(fù)用。