簡書 慢黑八
轉(zhuǎn)載請(qǐng)注明原創(chuàng)出處,謝謝!
如果讀完覺得有收獲的話,歡迎點(diǎn)贊加關(guān)注
背景
由于某項(xiàng)目獨(dú)特的特色需要手動(dòng)開啟事務(wù)。然而,在手動(dòng)開啟事務(wù)后,事務(wù)能否正常結(jié)束 commit or rollback 就出現(xiàn)了各式各樣的不確定情況。如果commit or rollback未執(zhí)行或執(zhí)行失敗,將會(huì)導(dǎo)致該事務(wù)持有的數(shù)據(jù)庫連接無法正常歸還到連接池中。高并發(fā)場景下的現(xiàn)象就是連接池中的可用連接越來越少,最后導(dǎo)致獲取連接超時(shí)的異常。
以下為手動(dòng)事務(wù)工具類
@Service
public class TransactionTool {
//spring注入事務(wù)管理對(duì)象
@Resource(name = "transactionManager")
private PlatformTransactionManager transManager ;
public TransactionStatus getTransSatus(int propagate) {
// TransactionStatus.
// TransactionDefinition
// 事務(wù)定義
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 傳播范圍
def.setPropagationBehavior(propagate);
TransactionStatus transactionStatus = transManager.getTransaction(def);
return transactionStatus;
}
}
下面是開啟事務(wù)的業(yè)務(wù)處理邏輯
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
public void bizMethod(){
//以下代碼手動(dòng)開啟事務(wù)
TransactionStatus transactionStatus = null;
try{
transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..業(yè)務(wù)邏輯
transactionManager.commit(transactionStatus);
}catch(Exception e){
transactionManager.rollback(transactionStatus);
}finally{
//略掉一些分庫分表的特殊處理
}
}
}
主要導(dǎo)致事務(wù)沒有正常結(jié)束的三種場景
- 場景 1、處理
業(yè)務(wù)邏輯時(shí),拋出的是Error而不是Exception,catch接不住,導(dǎo)致rollback不能正常執(zhí)行,這也意味著事務(wù)無法正?;貪L,造成連接泄露。 - 場景 2、處理
業(yè)務(wù)邏輯時(shí),未執(zhí)行到commit就return了,這樣也會(huì)導(dǎo)致了該事務(wù)沒有正常結(jié)束,connection沒有正常歸還連接池,造成泄露。 - 場景3、同一個(gè)方法中事務(wù)雙開,雙關(guān),按照以下順序執(zhí)行
開啟事務(wù)1(requires_new)-> 然后開事務(wù)2(requires_new) -> 之后提交事務(wù)1(commit) -> 在提交事務(wù)2(commit)
事務(wù)上下文狀態(tài)切換如下:
TS=TransactionStatus ???? TE=TransactionEvent ??? ? T=Transaction
| 步驟 | 事務(wù)操作 | TransactionSynchronizationManager | 掛起\執(zhí)行 |
|---|---|---|---|
| 1 | TS1=getTransaction(REQUIRES_NEW) publish TE1 |
T1(con1)、TE1 | 掛起 NULL |
| 2 | TS2=getTransaction(REQUIRES_NEW) publish TE2 |
T2(con2)、TE2 | 掛起T1,TE1 |
| 3 | commit(TS1) | TE2執(zhí)行,同步器清理T2 解掛步驟1掛起的null事務(wù)資源 |
執(zhí)行T1.commit成功 con1歸還連接池 |
| 4 | commit(TS2) | 當(dāng)前事務(wù)資源為null導(dǎo)致同步器 事件處理出現(xiàn)異常,導(dǎo)致con2 不能正常歸還到連接池,造成 連接泄露 |
執(zhí)行 T2.commit失敗 con2泄露 |
在開啟事務(wù)1的時(shí)候掛起的事務(wù)資源為空,在commit事務(wù)1的之后,會(huì)解掛當(dāng)前線程的事務(wù)資源為:null,提交事務(wù)2時(shí)候,如果當(dāng)前線程的事務(wù)資源為null,會(huì)拋空指針異常,最后在解綁資源unbindResource()的時(shí)候拋出以下代碼塊中的IllegalStateException異常(遺憾的是,該異常被spring框架捕獲后沒有打印出來)。最終導(dǎo)致事務(wù)2持有的連接不能正常釋放。TransactionEvent 會(huì)在事務(wù)結(jié)束的時(shí)候執(zhí)行當(dāng)前TransactionSynchronizationManager線程本地變量中的synchronizations事件。
public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
return value;
}
以上3中情況,在自動(dòng)事務(wù)@Transactional的處理邏輯中都不會(huì)出現(xiàn)。首先spring-tx都進(jìn)行了統(tǒng)一封裝充分考慮了非正常的可以。其次,在嵌套事務(wù)雙開的時(shí)候,都是先開的事務(wù)后關(guān)。所以,手動(dòng)事務(wù)一定要遵循先開的事務(wù)后關(guān)這個(gè)原則。
監(jiān)控解決未關(guān)閉事務(wù)的幾個(gè)思路
-
思路1:采用spring的
ApplicationEventPublisher的事件發(fā)布監(jiān)聽機(jī)制。
訂閱@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)事務(wù)完成階段的監(jiān)聽,對(duì)“一定時(shí)間內(nèi)”未關(guān)閉的事件進(jìn)行預(yù)警,發(fā)現(xiàn)后整改。 -
思路2:在finally中對(duì)事務(wù)進(jìn)行統(tǒng)一關(guān)閉。
調(diào)整catch的范圍,從Exception修改為Throwable捕捉到所有Exception或者Error的情況,把commit移動(dòng)到finally中。commit的前置條件是transactionStatus!=null&&transactionStatus.isNewTransaction() && !transactionStatus.isCompleted(),這樣會(huì)對(duì)所有 新建的且未完成的 事務(wù)進(jìn)行commit。如果小伙伴覺得思路2改動(dòng)方式比較激進(jìn),想暫時(shí)先觀察一下那些服務(wù)存在 事務(wù)未正常結(jié)束 的情況,可以參考思路3。
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
public void bizMethod(){
//以下代碼手動(dòng)開啟事務(wù)
TransactionStatus transactionStatus = null;
try{
transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..業(yè)務(wù)邏輯
}catch(Throwable t){
transactionManager.rollback(transactionStatus);
}finally{
//try..catch內(nèi)容可提煉成公共方法
try {
if (transactionStatus != null && transactionStatus.isNewTransaction()
&& !transactionStatus.isCompleted()) {
//TODO: arms日志輸出 堆棧相關(guān)信息
transactionManager.commit(transactionStatus);
}
} catch (Exception e) {
e.printStackTrace();
}
//略掉一些分庫分表的特殊處理
}
}
}
-
思路3: 在finally中檢查未完成的事物并進(jìn)行預(yù)警。
預(yù)警的前提條件是transactionStatus!=null&&transactionStatus.isNewTransaction() && !transactionStatus.isCompleted(),這樣會(huì)對(duì)所有 新建的且未完成的 事務(wù)進(jìn)行預(yù)警日志信息輸出。該思路在finally中增加try..catch塊進(jìn)行檢查,對(duì)應(yīng)用程序改動(dòng)影響較小。
需要注意的是:這種方式仍然監(jiān)控不到上文中場景3連接泄露的問題,如果想解決場景3的問題,需要從TransactionStatus中獲取事務(wù)對(duì)象,抽取ConnectionHolder中的數(shù)據(jù)庫Connection,用conn.isClosed()來判斷連接是否已經(jīng)關(guān)閉。另外還需要修改DataSourceTransactionManager源碼,把內(nèi)部類DataSourceTransactionObject的訪問修飾符從private修改為public。
參考如下代碼:
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
public void bizMethod(){
//以下代碼手動(dòng)開啟事務(wù)
TransactionStatus transactionStatus = null;
try{
transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..業(yè)務(wù)邏輯
transactionManager.commit(transactionStatus);
}catch(Throwable t){
transactionManager.rollback(transactionStatus);
}finally{
//try..catch內(nèi)容可提煉成公共方法
try {
if (transactionStatus != null && transactionStatus.isNewTransaction()) {
if(!transactionStatus.isCompleted()) {
// arms日志輸出 堆棧相關(guān)信息
System.out.println("事務(wù)未結(jié)束原因[事務(wù)-未完成]");
printStackTrace(Thread.currentThread().getStackTrace());
}else {
Connection conn = null;
DefaultTransactionStatus defaultTransactionStatus = (DefaultTransactionStatus)transactionStatus;
if(defaultTransactionStatus.getTransaction().getClass().getClassLoader() == DataSourceTransactionObject.class.getClassLoader()) {
conn = ((DataSourceTransactionObject)defaultTransactionStatus.getTransaction()).getConnectionHolder().getConnection();
if(conn != null && conn.isClosed()==false) {
System.out.println("事務(wù)未結(jié)束原因[連接-未關(guān)閉]");
printStackTrace(Thread.currentThread().getStackTrace());
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
//略掉一些分庫分表的特殊處理
}
}
}
接下來說下上面三種思路的可行性
[X ] 思路1,不可行
[ok] 思路2,可行
[ok] 思路3為過渡監(jiān)控性的解決方案,可行
[ok] 思路2+思路3為最終解決方案,可行
思路1中,基于spring事件的發(fā)布訂閱模式會(huì)存在什么問題?
使用spring的ApplicationEventPublisher的事件發(fā)布監(jiān)聽機(jī)制。
訂閱@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)事務(wù)完成階段**的監(jiān)聽,對(duì)“一定時(shí)間內(nèi)”未關(guān)閉的事件進(jìn)行預(yù)警,發(fā)現(xiàn)后整改。
1、改造TransactionTool在執(zhí)行getTransSatus方法時(shí)調(diào)用publishTransactionEvent(transactionStatus , propagate)發(fā)布包含transactionId 的 "新事務(wù)事件" ,然后把需要監(jiān)控的事務(wù)事件存放在aliveTransactionMap中 。
@Service
public class TransactionTool {
private AtomicLong transactionId = new AtomicLong(0);
// transcatioId,BizTransactionEvent 存儲(chǔ)存活的事務(wù)事件
public static ConcurrentHashMap<String, BizTransactionEvent> aliveTransactionMap =
new ConcurrentHashMap<String, BizTransactionEvent>();
//spring注入事務(wù)管理對(duì)象
@Resource(name = "transactionManager")
private PlatformTransactionManager transManager ;
@Autowired
private ApplicationEventPublisher publisher;
public TransactionStatus getTransSatus(int propagate) {
// TransactionStatus.
// TransactionDefinition
// 事務(wù)定義
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 傳播范圍
def.setPropagationBehavior(propagate);
TransactionStatus transactionStatus = transManager.getTransaction(def);
// 增加事務(wù)監(jiān)聽
publishTransactionEvent(transactionStatus , propagate);
return transactionStatus;
}
public void publishEvent(long tid,int propagate) {
long temp = tid;
StackTraceElement[] stackTraceElementArray = Thread.currentThread().getStackTrace();
if(stackTraceElementArray.length>2) {
if(transactionId.longValue() == Long.MAX_VALUE) {
transactionId.compareAndSet(Long.MAX_VALUE, 0);
}
BizTransactionEvent bizTransactionEvent = new BizTransactionEvent();
bizTransactionEvent.setTransactionId(""+temp);
bizTransactionEvent.setTransactionName(stackTraceElementArray[3].getClassName()+":"
+stackTraceElementArray[3].getMethodName()+":"+stackTraceElementArray[3].getLineNumber());
bizTransactionEvent.setCurrentTimeMillis(System.currentTimeMillis());
bizTransactionEvent.setStackTraceElement(stackTraceElementArray);
bizTransactionEvent.setPropagate(propagate);
System.out.println("[NEWTX"+bizTransactionEvent.getTransactionId()+"]"+bizTransactionEvent.toString());
publisher.publishEvent(bizTransactionEvent);
//在這里處理新建的事務(wù)操作,可以放入一個(gè)map中
TransactionTool.aliveTransactionMap.put(bizTransactionEvent.getTransactionId(), bizTransactionEvent);
}
}
}
2、增加事物事件類1BizTransactionEvent ,事務(wù)監(jiān)聽類BizTransactionEventListener,通過事務(wù)commit時(shí)候,同步調(diào)用標(biāo)有注解@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)的afterCompletion方法把aliveTransactionMap中transactionId對(duì)應(yīng)的事務(wù)事件刪掉。
事務(wù)事件監(jiān)聽類
@Component
public class BizTransactionEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void afterCompletion(PayloadApplicationEvent<BizTransactionEvent> event) {
System.out.println("[NEWTX" + event.getPayload().getTransactionId() + "-REMOVE] " + event.toString()
+ "Duration:" + (System.currentTimeMillis() - event.getPayload().getCurrentTimeMillis()) + "ms");
TransactionTool.aliveTransactionMap.remove(event.getPayload().getTransactionId());
}
}
事務(wù)事件類
public class BizTransactionEvent {
private static final int STACK_TRACE_ELEMENT_DEEP = 4;
private String transactionId;
private String transactionName;
private StackTraceElement[] stackTraceElement;
private long currentTimeMillis;
private int propagate;
public String getTransactionName() {
return transactionName;
}
public void setTransactionName(String transactionName) {
this.transactionName = transactionName;
}
public int getPropagate() {
return propagate;
}
public void setPropagate(int propagate) {
this.propagate = propagate;
}
public String getTransactionId() {
return transactionId;
}
public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}
public long getCurrentTimeMillis() {
return currentTimeMillis;
}
public void setCurrentTimeMillis(long currentTimeMillis) {
this.currentTimeMillis = currentTimeMillis;
}
public StackTraceElement[] getStackTraceElement() {
return stackTraceElement;
}
public void setStackTraceElement(StackTraceElement[] stackTraceElement) {
this.stackTraceElement = stackTraceElement;
}
@Override
public String toString() {
return "BizTransactionEvent [transactionId=" + transactionId + ", transactionName=" + transactionName
+ ", stackTraceElement=" + Arrays.toString(Arrays.copyOf(stackTraceElement, STACK_TRACE_ELEMENT_DEEP))
+ ", currentTimeMillis=" + currentTimeMillis + ", propagate=" + propagate + "]";
}
}
3、我們可以通過監(jiān)控aliveTransactionMap中的事務(wù)事件存活時(shí)間來尋找發(fā)現(xiàn)事務(wù)未關(guān)閉的業(yè)務(wù)代碼。
代碼略...
4、我們看下以下邏輯中問題出在哪:
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
@Transactional
public void bizMethod(){
//以下代碼手動(dòng)開啟事務(wù)
TransactionStatus transactionStatus1 = null;
TransactionStatus transactionStatus2 = null;
try{
transactionStatus1 = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..業(yè)務(wù)邏輯
//transactionManager.commit(transactionStatus1);
}catch(Exception){
transactionManager.rollback(transactionStatus1);
}finally{
//略掉一些分庫分表的特殊處理
}
try{
transactionStatus2 = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..業(yè)務(wù)邏輯
transactionManager.commit(transactionStatus2);
}catch(Exception){
transactionManager.rollback(transactionStatus2);
}finally{
//略掉一些分庫分表的特殊處理
}
}
}
事務(wù)上下文狀態(tài)切換如下:
TS=TransactionStatus ???? TE=TransactionEvent ??? ? T=Transaction
| 步驟 | 事務(wù)操作 | TransactionSynchronizationManager | 掛起\執(zhí)行 |
|---|---|---|---|
| 1 | @Transactional TS0=getTransaction(REQUIRESD) | T0(con0) | 掛起 NULL |
| 2 | TS1=getTransaction(REQUIRES_NEW) publish TE1 |
T1(con1)、TE1 | 掛起T0 |
| 3 | commit(TS1)被注掉了,不執(zhí)行 | . | con1連接泄露 |
| 4 | TS2=getTransaction(REQUIRES_NEW) publish TE2 |
T2(con1)、TE2 | 掛起T1、TE1 |
| 5 | commit(TS2) | TE2執(zhí)行,同步器清理T2 解掛步驟4的T1、TE1 |
執(zhí)行T2.commit成功 con2歸還連接池 |
| 6 | commit(TS0) | TE1執(zhí)行,同步器清理T1 解掛步驟2的T0 |
執(zhí)行 T0.commit成功 con0歸還連接池 |
這種方式的最大問題在于,程序執(zhí)行完成后,當(dāng)前線程在事務(wù)同步器中仍存在解掛的事務(wù)資源(T0),并且事務(wù)commit(TS1)沒有執(zhí)行,TE1卻被正常執(zhí)行了,同時(shí)aliveTransactionMap中的TE1被移除了,失去了后續(xù)的監(jiān)控基礎(chǔ)。
所以對(duì)于手動(dòng)事務(wù)來說,思路1比較失敗
文末彩蛋:簡述手動(dòng)Spring事務(wù)處理邏輯
spring-tx、spring-jdbc中比較重要的四個(gè)關(guān)鍵處理類:
-
AbstractPlatformTransactionManager:事務(wù)核心處理類,開啟事務(wù),掛起/恢復(fù),釋放資源等功能 -
DataSourceTransactionManager:數(shù)據(jù)庫操作都有這個(gè)類來完成,例如:setAutoCommit,commit,rollback -
TransactionSynchronizationManager:這里的TransactionSynchronizationManager都是以線程為單位來記錄相關(guān)的資源息。resources中記錄了,key為datasource,value為ConnectionHolder的map結(jié)構(gòu)信息。上文中publisher.publishEvent(bizTransactionEvent)會(huì)把事務(wù)事件到synchronizations中,后續(xù)事務(wù)在提交的時(shí)候會(huì)執(zhí)行synchronizations中的事件。 -
DefaultTransactionStatus:存放當(dāng)前事務(wù),掛起的事務(wù)資源,事務(wù)定義等內(nèi)容。
自動(dòng)事務(wù)cglib代理可參考TransactionAspectSupport類

在事務(wù)處理的過程中參考如下步驟,偷個(gè)懶不畫時(shí)序圖了,大家按照序號(hào),腦補(bǔ)一下
[package:spring-tx]AbstractPlatformTransactionManager
1、首先調(diào)用getTransaction()方法,獲取連接,獲取當(dāng)前事務(wù)狀態(tài)
4、調(diào)用handleExistingTransaction()處理已存在的事務(wù)
- 如果是
REQUIRES_NEW就要掛起當(dāng)前存在事務(wù)、創(chuàng)建新事務(wù)把掛起的事務(wù)資源放入新事務(wù)中,并且切換TransactionSynchronizationManager的本地線程變量為新事務(wù)相關(guān)內(nèi)容,解綁當(dāng)前事務(wù)資源。 - 如果是
NESTED則需要?jiǎng)?chuàng)建保存點(diǎn) - 如果是
REQUIRED,創(chuàng)建新把newTransaction設(shè)定為false。
5、掛起資源SuspendedResourcesHolder結(jié)構(gòu)與TransactionSynchronizationManager相同,用于解掛時(shí)恢復(fù)TransactionSynchronizationManager中的本地線程變量。
7、調(diào)用prepareSynchronization方法,初始化當(dāng)前線程的事務(wù)同步管理器,設(shè)置Threadlocal相關(guān)內(nèi)容,并反回新的TransactionStatus對(duì)象。
以下為事務(wù)提交后的操作
8、調(diào)用commit方法提交事務(wù)。這里會(huì)調(diào)用processCommit方法,在這個(gè)方法中會(huì)調(diào)用事務(wù)事件監(jiān)聽邏輯。通過ApplicationListenerMethodTransactionalAdapter處理各個(gè)不同階段的transactionEvent,需要注意的是待處理的transactionEvent是從TransactionSynchronizationManager.getSynchronizations()當(dāng)前的本地線程變量中獲取的。
9、cleanupAfterCompletion設(shè)置事務(wù)狀態(tài)為完成,清理當(dāng)前線程TransactionSynchronizationManager資源,解綁connection資源,設(shè)置autocommit=true。還原connection屬性,回并且把連接歸還給連接池。
10、調(diào)用resume()方法還原掛起的資源,繼續(xù)執(zhí)行。
[package:spring-jdbc]DataSourceTransactionManager
2、調(diào)用doGetTransaction() 獲取事務(wù)對(duì)象DataSourceTransactionObject
3、檢索綁定到當(dāng)前線程(TransactionSynchronizationManager)的資源(ConnectionHolder),把ConnectionHolder放入DataSourceTransactionObject中
6、調(diào)用dobegin開啟事務(wù)con.setAutoCommit(false);并且修改transactionActive為true。如果連接資源為空則獲取新的連接,并且在TransactionSynchronizationManager進(jìn)行資源綁定。
8.1、調(diào)用doCommit提交事務(wù)