Spring4.x 手動(dòng)事務(wù),監(jiān)聽處理未關(guān)閉事務(wù)的幾點(diǎn)思路,文末彩蛋

簡書 慢黑八
轉(zhuǎn)載請(qǐng)注明原創(chuàng)出處,謝謝!
如果讀完覺得有收獲的話,歡迎點(diǎn)贊加關(guān)注

背景

由于某項(xiàng)目獨(dú)特的特色需要手動(dòng)開啟事務(wù)。然而,在手動(dòng)開啟事務(wù)后,事務(wù)能否正常結(jié)束 commit or rollback 就出現(xiàn)了各式各樣的不確定情況。如果commit or rollback未執(zhí)行或執(zhí)行失敗,將會(huì)導(dǎo)致該事務(wù)持有的數(shù)據(jù)庫連接無法正常歸還到連接池中。高并發(fā)場景下的現(xiàn)象就是連接池中的可用連接越來越少,最后導(dǎo)致獲取連接超時(shí)的異常。

以下為手動(dòng)事務(wù)工具類
@Service
public class TransactionTool {

    //spring注入事務(wù)管理對(duì)象
    @Resource(name = "transactionManager")
    private PlatformTransactionManager transManager ;
      
    public TransactionStatus getTransSatus(int propagate) {

        // TransactionStatus.
        // TransactionDefinition
        // 事務(wù)定義
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        // 傳播范圍
        def.setPropagationBehavior(propagate);

        TransactionStatus transactionStatus = transManager.getTransaction(def);
        return transactionStatus;
    }
}

下面是開啟事務(wù)的業(yè)務(wù)處理邏輯
@Service
public class BizService{
    @Autowired
    TransactionTool transactionTool;

    public void bizMethod(){
        //以下代碼手動(dòng)開啟事務(wù)
        TransactionStatus transactionStatus = null;

        try{
            transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
            // ..業(yè)務(wù)邏輯
            transactionManager.commit(transactionStatus);
        }catch(Exception e){
            transactionManager.rollback(transactionStatus);
        }finally{
            //略掉一些分庫分表的特殊處理
        }
    }
}
主要導(dǎo)致事務(wù)沒有正常結(jié)束的三種場景
  • 場景 1、處理業(yè)務(wù)邏輯時(shí),拋出的是Error而不是Exception,catch接不住,導(dǎo)致rollback不能正常執(zhí)行,這也意味著事務(wù)無法正?;貪L,造成連接泄露。
  • 場景 2、處理業(yè)務(wù)邏輯時(shí),未執(zhí)行到commitreturn了,這樣也會(huì)導(dǎo)致了該事務(wù)沒有正常結(jié)束,connection沒有正常歸還連接池,造成泄露。
  • 場景3、同一個(gè)方法中事務(wù)雙開,雙關(guān),按照以下順序執(zhí)行
    開啟事務(wù)1(requires_new)-> 然后開事務(wù)2(requires_new) -> 之后提交事務(wù)1(commit) -> 在提交事務(wù)2(commit)
    事務(wù)上下文狀態(tài)切換如下:
    TS=TransactionStatus ???? TE=TransactionEvent ??? ? T=Transaction
步驟 事務(wù)操作 TransactionSynchronizationManager 掛起\執(zhí)行
1 TS1=getTransaction(REQUIRES_NEW)
publish TE1
T1(con1)、TE1 掛起 NULL
2 TS2=getTransaction(REQUIRES_NEW)
publish TE2
T2(con2)、TE2 掛起T1,TE1
3 commit(TS1) TE2執(zhí)行,同步器清理T2
解掛步驟1掛起的null事務(wù)資源
執(zhí)行T1.commit成功
con1歸還連接池
4 commit(TS2) 當(dāng)前事務(wù)資源為null導(dǎo)致同步器
事件處理出現(xiàn)異常,導(dǎo)致con2
不能正常歸還到連接池,造成
連接泄露
執(zhí)行 T2.commit失敗
con2泄露

在開啟事務(wù)1的時(shí)候掛起的事務(wù)資源為空,在commit事務(wù)1的之后,會(huì)解掛當(dāng)前線程的事務(wù)資源為:null,提交事務(wù)2時(shí)候,如果當(dāng)前線程的事務(wù)資源為null,會(huì)拋空指針異常,最后在解綁資源unbindResource()的時(shí)候拋出以下代碼塊中的IllegalStateException異常(遺憾的是,該異常被spring框架捕獲后沒有打印出來)。最終導(dǎo)致事務(wù)2持有的連接不能正常釋放。TransactionEvent 會(huì)在事務(wù)結(jié)束的時(shí)候執(zhí)行當(dāng)前TransactionSynchronizationManager線程本地變量中的synchronizations事件。

public static Object unbindResource(Object key) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Object value = doUnbindResource(actualKey);
    if (value == null) {
        throw new IllegalStateException(
                "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }
    return value;
}

以上3中情況,在自動(dòng)事務(wù)@Transactional的處理邏輯中都不會(huì)出現(xiàn)。首先spring-tx都進(jìn)行了統(tǒng)一封裝充分考慮了非正常的可以。其次,在嵌套事務(wù)雙開的時(shí)候,都是先開的事務(wù)后關(guān)。所以,手動(dòng)事務(wù)一定要遵循先開的事務(wù)后關(guān)這個(gè)原則。

監(jiān)控解決未關(guān)閉事務(wù)的幾個(gè)思路
  • 思路1:采用spring的ApplicationEventPublisher的事件發(fā)布監(jiān)聽機(jī)制。
    訂閱@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)事務(wù)完成階段的監(jiān)聽,對(duì)“一定時(shí)間內(nèi)”未關(guān)閉的事件進(jìn)行預(yù)警,發(fā)現(xiàn)后整改。
  • 思路2:在finally中對(duì)事務(wù)進(jìn)行統(tǒng)一關(guān)閉。
    調(diào)整catch的范圍,從Exception修改為Throwable捕捉到所有Exception 或者Error的情況,把commit移動(dòng)到finally中。commit的前置條件是transactionStatus!=null&&transactionStatus.isNewTransaction() && !transactionStatus.isCompleted(),這樣會(huì)對(duì)所有 新建的且未完成的 事務(wù)進(jìn)行commit。如果小伙伴覺得思路2改動(dòng)方式比較激進(jìn),想暫時(shí)先觀察一下那些服務(wù)存在 事務(wù)未正常結(jié)束 的情況,可以參考思路3。
@Service
public class BizService{
    @Autowired
    TransactionTool transactionTool;

    public void bizMethod(){
        //以下代碼手動(dòng)開啟事務(wù)
        TransactionStatus transactionStatus = null;

        try{
            transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
            // ..業(yè)務(wù)邏輯
        }catch(Throwable t){
            transactionManager.rollback(transactionStatus);
        }finally{
            //try..catch內(nèi)容可提煉成公共方法
            try {
                if (transactionStatus != null && transactionStatus.isNewTransaction() 
                        && !transactionStatus.isCompleted()) {
                    //TODO: arms日志輸出 堆棧相關(guān)信息
                    transactionManager.commit(transactionStatus);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            //略掉一些分庫分表的特殊處理
        }
    }
}
  • 思路3: 在finally中檢查未完成的事物并進(jìn)行預(yù)警。
    預(yù)警的前提條件是transactionStatus!=null&&transactionStatus.isNewTransaction() && !transactionStatus.isCompleted(),這樣會(huì)對(duì)所有 新建的且未完成的 事務(wù)進(jìn)行預(yù)警日志信息輸出。該思路在finally中增加try..catch塊進(jìn)行檢查,對(duì)應(yīng)用程序改動(dòng)影響較小。
    需要注意的是:這種方式仍然監(jiān)控不到上文中場景3連接泄露的問題,如果想解決場景3的問題,需要從TransactionStatus中獲取事務(wù)對(duì)象,抽取ConnectionHolder中的數(shù)據(jù)庫Connection,用conn.isClosed()來判斷連接是否已經(jīng)關(guān)閉。另外還需要修改DataSourceTransactionManager源碼,把內(nèi)部類DataSourceTransactionObject的訪問修飾符從private修改為public
    參考如下代碼:
@Service
public class BizService{
    @Autowired
    TransactionTool transactionTool;

    public void bizMethod(){
        //以下代碼手動(dòng)開啟事務(wù)
        TransactionStatus transactionStatus = null;

        try{
            transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
            // ..業(yè)務(wù)邏輯
            transactionManager.commit(transactionStatus);
        }catch(Throwable t){
            transactionManager.rollback(transactionStatus);
        }finally{
            //try..catch內(nèi)容可提煉成公共方法
            try {
                if (transactionStatus != null && transactionStatus.isNewTransaction()) {
                    if(!transactionStatus.isCompleted()) {
                        // arms日志輸出 堆棧相關(guān)信息
                        System.out.println("事務(wù)未結(jié)束原因[事務(wù)-未完成]");
                        printStackTrace(Thread.currentThread().getStackTrace());
                    }else {
                        Connection conn = null;
                        DefaultTransactionStatus defaultTransactionStatus = (DefaultTransactionStatus)transactionStatus;
                        if(defaultTransactionStatus.getTransaction().getClass().getClassLoader() == DataSourceTransactionObject.class.getClassLoader()) {
                            conn = ((DataSourceTransactionObject)defaultTransactionStatus.getTransaction()).getConnectionHolder().getConnection();  
                            if(conn != null && conn.isClosed()==false) {
                                System.out.println("事務(wù)未結(jié)束原因[連接-未關(guān)閉]");
                                printStackTrace(Thread.currentThread().getStackTrace());
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            //略掉一些分庫分表的特殊處理
        }
    }
}
接下來說下上面三種思路的可行性

[X ] 思路1,不可行
[ok] 思路2,可行
[ok] 思路3為過渡監(jiān)控性的解決方案,可行
[ok] 思路2+思路3為最終解決方案,可行

思路1中,基于spring事件的發(fā)布訂閱模式會(huì)存在什么問題?

使用spring的ApplicationEventPublisher的事件發(fā)布監(jiān)聽機(jī)制。
訂閱@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
事務(wù)完成階段**的監(jiān)聽,對(duì)“一定時(shí)間內(nèi)”未關(guān)閉的事件進(jìn)行預(yù)警,發(fā)現(xiàn)后整改。
1、改造TransactionTool在執(zhí)行getTransSatus方法時(shí)調(diào)用publishTransactionEvent(transactionStatus , propagate)發(fā)布包含transactionId 的 "新事務(wù)事件" ,然后把需要監(jiān)控的事務(wù)事件存放在aliveTransactionMap中 。

@Service
public class TransactionTool {
    private AtomicLong transactionId = new AtomicLong(0);
    // transcatioId,BizTransactionEvent 存儲(chǔ)存活的事務(wù)事件
    public static ConcurrentHashMap<String, BizTransactionEvent> aliveTransactionMap = 
        new ConcurrentHashMap<String, BizTransactionEvent>();

    //spring注入事務(wù)管理對(duì)象
    @Resource(name = "transactionManager")
    private PlatformTransactionManager transManager ;

    @Autowired
    private ApplicationEventPublisher publisher;
    public TransactionStatus getTransSatus(int propagate) {

        // TransactionStatus.
        // TransactionDefinition
        // 事務(wù)定義
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        // 傳播范圍
        def.setPropagationBehavior(propagate);

        TransactionStatus transactionStatus = transManager.getTransaction(def);
        // 增加事務(wù)監(jiān)聽
        publishTransactionEvent(transactionStatus , propagate);
        return transactionStatus;
    }

    public void publishEvent(long tid,int propagate) {
        long temp = tid;
        StackTraceElement[] stackTraceElementArray = Thread.currentThread().getStackTrace();
        if(stackTraceElementArray.length>2) {
            if(transactionId.longValue() == Long.MAX_VALUE) {
                transactionId.compareAndSet(Long.MAX_VALUE, 0);
            }
            BizTransactionEvent bizTransactionEvent = new BizTransactionEvent();
            bizTransactionEvent.setTransactionId(""+temp);
            bizTransactionEvent.setTransactionName(stackTraceElementArray[3].getClassName()+":"
                    +stackTraceElementArray[3].getMethodName()+":"+stackTraceElementArray[3].getLineNumber());
            bizTransactionEvent.setCurrentTimeMillis(System.currentTimeMillis());
            bizTransactionEvent.setStackTraceElement(stackTraceElementArray);
            bizTransactionEvent.setPropagate(propagate);
            System.out.println("[NEWTX"+bizTransactionEvent.getTransactionId()+"]"+bizTransactionEvent.toString());
            publisher.publishEvent(bizTransactionEvent);
            //在這里處理新建的事務(wù)操作,可以放入一個(gè)map中
            TransactionTool.aliveTransactionMap.put(bizTransactionEvent.getTransactionId(), bizTransactionEvent);
        }
    }
}

2、增加事物事件類1BizTransactionEvent ,事務(wù)監(jiān)聽類BizTransactionEventListener,通過事務(wù)commit時(shí)候,同步調(diào)用標(biāo)有注解@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)afterCompletion方法把aliveTransactionMap中transactionId對(duì)應(yīng)的事務(wù)事件刪掉。

事務(wù)事件監(jiān)聽類

@Component
public class BizTransactionEventListener {
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
    public void afterCompletion(PayloadApplicationEvent<BizTransactionEvent> event) {
        System.out.println("[NEWTX" + event.getPayload().getTransactionId() + "-REMOVE]  " + event.toString()
                + "Duration:" + (System.currentTimeMillis() - event.getPayload().getCurrentTimeMillis()) + "ms");
        TransactionTool.aliveTransactionMap.remove(event.getPayload().getTransactionId());
    }
}

事務(wù)事件類

public class BizTransactionEvent {
    
    private static final int STACK_TRACE_ELEMENT_DEEP = 4;
    private String transactionId;
    private String transactionName;
    private StackTraceElement[] stackTraceElement;
    private long currentTimeMillis;
    private int propagate;
    
    
    public String getTransactionName() {
        return transactionName;
    }
    public void setTransactionName(String transactionName) {
        this.transactionName = transactionName;
    }

    
    public int getPropagate() {
        return propagate;
    }
    public void setPropagate(int propagate) {
        this.propagate = propagate;
    }
    public String getTransactionId() {
        return transactionId;
    }
    public void setTransactionId(String transactionId) {
        this.transactionId = transactionId;
    }

    public long getCurrentTimeMillis() {
        return currentTimeMillis;
    }
    public void setCurrentTimeMillis(long currentTimeMillis) {
        this.currentTimeMillis = currentTimeMillis;
    }

    public StackTraceElement[] getStackTraceElement() {
        return stackTraceElement;
    }
    public void setStackTraceElement(StackTraceElement[] stackTraceElement) {
        this.stackTraceElement = stackTraceElement;
    }
    @Override
    public String toString() {
        return "BizTransactionEvent [transactionId=" + transactionId + ", transactionName=" + transactionName
                + ", stackTraceElement=" + Arrays.toString(Arrays.copyOf(stackTraceElement, STACK_TRACE_ELEMENT_DEEP))
                + ", currentTimeMillis=" + currentTimeMillis + ", propagate=" + propagate + "]";
    }
}

3、我們可以通過監(jiān)控aliveTransactionMap中的事務(wù)事件存活時(shí)間來尋找發(fā)現(xiàn)事務(wù)未關(guān)閉的業(yè)務(wù)代碼。

代碼略...

4、我們看下以下邏輯中問題出在哪:

@Service
public class BizService{
    @Autowired
    TransactionTool transactionTool;

    @Transactional 
    public void bizMethod(){
        //以下代碼手動(dòng)開啟事務(wù)
        TransactionStatus transactionStatus1 = null;
        TransactionStatus transactionStatus2 = null;
        try{
            transactionStatus1 = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
            // ..業(yè)務(wù)邏輯
            //transactionManager.commit(transactionStatus1);
        }catch(Exception){
            transactionManager.rollback(transactionStatus1);
        }finally{
            //略掉一些分庫分表的特殊處理
        }

        try{
            transactionStatus2 = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
            // ..業(yè)務(wù)邏輯
            transactionManager.commit(transactionStatus2);
        }catch(Exception){
            transactionManager.rollback(transactionStatus2);
        }finally{
            //略掉一些分庫分表的特殊處理
        }
    }
}

事務(wù)上下文狀態(tài)切換如下:
TS=TransactionStatus ???? TE=TransactionEvent ??? ? T=Transaction

步驟 事務(wù)操作 TransactionSynchronizationManager 掛起\執(zhí)行
1 @Transactional TS0=getTransaction(REQUIRESD) T0(con0) 掛起 NULL
2 TS1=getTransaction(REQUIRES_NEW)
publish TE1
T1(con1)、TE1 掛起T0
3 commit(TS1)被注掉了,不執(zhí)行 . con1連接泄露
4 TS2=getTransaction(REQUIRES_NEW)
publish TE2
T2(con1)、TE2 掛起T1、TE1
5 commit(TS2) TE2執(zhí)行,同步器清理T2
解掛步驟4的T1、TE1
執(zhí)行T2.commit成功
con2歸還連接池
6 commit(TS0) TE1執(zhí)行,同步器清理T1
解掛步驟2的T0
執(zhí)行 T0.commit成功
con0歸還連接池

這種方式的最大問題在于,程序執(zhí)行完成后,當(dāng)前線程在事務(wù)同步器中仍存在解掛的事務(wù)資源(T0),并且事務(wù)commit(TS1)沒有執(zhí)行,TE1卻被正常執(zhí)行了,同時(shí)aliveTransactionMap中的TE1被移除了,失去了后續(xù)的監(jiān)控基礎(chǔ)。
所以對(duì)于手動(dòng)事務(wù)來說,思路1比較失敗

文末彩蛋:簡述手動(dòng)Spring事務(wù)處理邏輯

spring-tx、spring-jdbc中比較重要的四個(gè)關(guān)鍵處理類:

  • AbstractPlatformTransactionManager:事務(wù)核心處理類,開啟事務(wù),掛起/恢復(fù),釋放資源等功能
  • DataSourceTransactionManager:數(shù)據(jù)庫操作都有這個(gè)類來完成,例如:setAutoCommit,commit,rollback
  • TransactionSynchronizationManager:這里的TransactionSynchronizationManager都是以線程為單位來記錄相關(guān)的資源息。resources中記錄了,key為datasource,value為ConnectionHolder的map結(jié)構(gòu)信息。上文中publisher.publishEvent(bizTransactionEvent)會(huì)把事務(wù)事件到synchronizations中,后續(xù)事務(wù)在提交的時(shí)候會(huì)執(zhí)行synchronizations中的事件。
  • DefaultTransactionStatus:存放當(dāng)前事務(wù),掛起的事務(wù)資源,事務(wù)定義等內(nèi)容。

自動(dòng)事務(wù)cglib代理可參考TransactionAspectSupport

spring事務(wù)

在事務(wù)處理的過程中參考如下步驟,偷個(gè)懶不畫時(shí)序圖了,大家按照序號(hào),腦補(bǔ)一下

[package:spring-tx]AbstractPlatformTransactionManager
1、首先調(diào)用getTransaction()方法,獲取連接,獲取當(dāng)前事務(wù)狀態(tài)
4、調(diào)用handleExistingTransaction()處理已存在的事務(wù)

  • 如果是REQUIRES_NEW就要掛起當(dāng)前存在事務(wù)、創(chuàng)建新事務(wù)把掛起的事務(wù)資源放入新事務(wù)中,并且切換TransactionSynchronizationManager的本地線程變量為新事務(wù)相關(guān)內(nèi)容,解綁當(dāng)前事務(wù)資源。
  • 如果是NESTED則需要?jiǎng)?chuàng)建保存點(diǎn)
  • 如果是REQUIRED,創(chuàng)建新把newTransaction設(shè)定為false。

5、掛起資源SuspendedResourcesHolder結(jié)構(gòu)與TransactionSynchronizationManager相同,用于解掛時(shí)恢復(fù)TransactionSynchronizationManager中的本地線程變量。
7、調(diào)用prepareSynchronization方法,初始化當(dāng)前線程的事務(wù)同步管理器,設(shè)置Threadlocal相關(guān)內(nèi)容,并反回新的TransactionStatus對(duì)象。


以下為事務(wù)提交后的操作
8、調(diào)用commit方法提交事務(wù)。這里會(huì)調(diào)用processCommit方法,在這個(gè)方法中會(huì)調(diào)用事務(wù)事件監(jiān)聽邏輯。通過ApplicationListenerMethodTransactionalAdapter處理各個(gè)不同階段的transactionEvent,需要注意的是待處理的transactionEvent是從TransactionSynchronizationManager.getSynchronizations()當(dāng)前的本地線程變量中獲取的。
9、cleanupAfterCompletion設(shè)置事務(wù)狀態(tài)為完成,清理當(dāng)前線程TransactionSynchronizationManager資源,解綁connection資源,設(shè)置autocommit=true。還原connection屬性,回并且把連接歸還給連接池。
10、調(diào)用resume()方法還原掛起的資源,繼續(xù)執(zhí)行。

[package:spring-jdbc]DataSourceTransactionManager
2、調(diào)用doGetTransaction() 獲取事務(wù)對(duì)象DataSourceTransactionObject
3、檢索綁定到當(dāng)前線程(TransactionSynchronizationManager)的資源(ConnectionHolder),把ConnectionHolder放入DataSourceTransactionObject中
6、調(diào)用dobegin開啟事務(wù)con.setAutoCommit(false);并且修改transactionActive為true。如果連接資源為空則獲取新的連接,并且在TransactionSynchronizationManager進(jìn)行資源綁定。
8.1、調(diào)用doCommit提交事務(wù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • java事務(wù)的處理 轉(zhuǎn) https://www.cnblogs.com/Bonker/p/5417967.html...
    小小的Jobs閱讀 1,642評(píng)論 0 1
  • 前面我們講到了Spring在進(jìn)行事務(wù)邏輯織入的時(shí)候,無論是事務(wù)開始,提交或者回滾,都會(huì)觸發(fā)相應(yīng)的事務(wù)事件。本文首先...
    AI喬治閱讀 1,794評(píng)論 0 1
  • 這部分的參考文檔涉及數(shù)據(jù)訪問和數(shù)據(jù)訪問層和業(yè)務(wù)或服務(wù)層之間的交互。 Spring的綜合事務(wù)管理支持覆蓋很多細(xì)節(jié),然...
    竹天亮閱讀 1,110評(píng)論 0 0
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,211評(píng)論 2 11
  • IOC和DI是什么? Spring IOC 的理解,其初始化過程? BeanFactory 和 FactoryBe...
    justlpf閱讀 3,599評(píng)論 1 21

友情鏈接更多精彩內(nèi)容