The Spring transaction interceptor is
org.springframework.transaction.interceptor.TransactionInterceptor
When Spring intercepts a method for transaction handling, it calls the invoke method of TransactionInterceptor.
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Invoke the method inside a transaction so that it becomes transactional
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
The invokeWithinTransaction method
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
throws Throwable {
// Prepare the data: transaction attributes, transaction manager, join point name
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 1. Create a transaction (if necessary)
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// 2. Run the business code
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
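// Handle the exception (rolls back when the rollback rules match)
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// Restore the previous TransactionInfo on the current thread
cleanupTransactionInfo(txInfo);
}
// 3. Commit the transaction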
commitTransactionAfterReturning(txInfo);
return retVal;
}
// ...other code omitted
}
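The control flow above can be reduced to a few lines of plain Java. The following is only a sketch of the begin / proceed / rollback-or-commit shape: RecordingTxManager and AroundAdviceSketch are hypothetical names, the manager merely records calls, and unlike Spring it rolls back on every unchecked exception without consulting any rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a transaction manager; it only records what was called.
class RecordingTxManager {
    final List<String> events = new ArrayList<>();
    void begin()    { events.add("begin"); }
    void commit()   { events.add("commit"); }
    void rollback() { events.add("rollback"); }
}

class AroundAdviceSketch {
    // Same shape as invokeWithinTransaction: begin, proceed, roll back on failure, commit on success.
    static <T> T invokeWithinTransaction(RecordingTxManager tm, Supplier<T> business) {
        tm.begin();                          // 1. create the transaction
        T result;
        try {
            result = business.get();         // 2. run the business code
        } catch (RuntimeException | Error ex) {
            tm.rollback();                   // failure path: roll back, then rethrow
            throw ex;
        }
        tm.commit();                         // 3. commit after a normal return
        return result;
    }
}
```

Calling invokeWithinTransaction with a supplier that returns normally records "begin" followed by "commit"; a supplier that throws records "begin" followed by "rollback", and the exception still reaches the caller.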
Four types are central to the whole transaction handling process:
- TransactionAttribute
- PlatformTransactionManager
- TransactionStatus
- TransactionInfo
TransactionAttribute
public interface TransactionAttribute extends TransactionDefinition {
String getQualifier();
boolean rollbackOn(Throwable ex);
}
Its parent interface, TransactionDefinition, is defined as:
public interface TransactionDefinition {
int getIsolationLevel();
String getName();
int getPropagationBehavior();
boolean isReadOnly();
int getTimeout();
}
TransactionAttribute holds the attribute values declared on the @Transactional annotation.
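The rollbackOn method is what decides, on the exception path, whether the transaction is rolled back or committed anyway. As a minimal sketch, assuming no rollbackFor / noRollbackFor rules are configured, the default behaves like the rule in the following code (unchecked exceptions and errors roll back, checked exceptions do not). DefaultRollbackRuleSketch is a hypothetical name, not a Spring class.

```java
class DefaultRollbackRuleSketch {
    // Default rule: roll back on RuntimeException and Error, not on checked exceptions.
    static boolean rollbackOn(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    // What the exception path does with the transaction, given the rule above.
    static String completeAfterThrowing(Throwable ex) {
        return rollbackOn(ex) ? "rollback" : "commit";
    }
}
```

This is why a checked exception such as java.io.IOException thrown from a @Transactional method does not roll the transaction back unless a rollback rule is declared for it.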
PlatformTransactionManager
package org.springframework.transaction;
public interface PlatformTransactionManager {
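// Start a new transaction, or return the current one, depending on the propagation behavior
TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
// Commit the transaction
void commit(TransactionStatus status) throws TransactionException;
// Roll back the transaction
void rollback(TransactionStatus status) throws TransactionException;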
}
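PlatformTransactionManager is the type that actually performs the transaction operations, much like transaction handling in plain JDBC. Pseudocode for handling a transaction with PlatformTransactionManager:
PlatformTransactionManager pm;
TransactionDefinition definition;
// Start the transaction
TransactionStatus status = pm.getTransaction(definition);
try {
// Run the business logic
.....
// Commit the transaction
pm.commit(status);
} catch (Throwable e) {
// Roll back the transaction, then let the caller see the failure
pm.rollback(status);
throw e;
}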
**TransactionStatus**
public interface TransactionStatus{
boolean isNewTransaction();
boolean hasSavepoint();
void setRollbackOnly();
boolean isRollbackOnly();
void flush();
boolean isCompleted();
}
TransactionStatus represents the state of a single transaction operation.
TransactionInfo
protected final class TransactionInfo {
private final PlatformTransactionManager transactionManager;
private final TransactionAttribute transactionAttribute;
private final String joinpointIdentification;
private TransactionStatus transactionStatus;
private TransactionInfo oldTransactionInfo;
}
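TransactionInfo bundles the three types above together, that is, everything known about the current transaction.
oldTransactionInfo holds the information of the parent (enclosing) transaction.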
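The oldTransactionInfo field effectively turns the thread-bound TransactionInfo into a stack: entering a transactional method pushes a new entry, and cleanup pops it. A minimal sketch of that idea follows; TxInfoSketch is a hypothetical class, and its method names are only modeled on the corresponding Spring methods.

```java
class TxInfoSketch {
    // The TransactionInfo of the innermost transactional call on this thread.
    private static final ThreadLocal<TxInfoSketch> CURRENT = new ThreadLocal<>();

    final String joinpoint;
    private TxInfoSketch oldInfo;   // the enclosing call's info, or null

    TxInfoSketch(String joinpoint) {
        this.joinpoint = joinpoint;
    }

    // Remember whatever was bound before, then make this info the current one.
    void bindToThread() {
        oldInfo = CURRENT.get();
        CURRENT.set(this);
    }

    // Put the enclosing call's info back (done in the finally block of the interceptor).
    void restoreThreadLocalStatus() {
        CURRENT.set(oldInfo);
    }

    static TxInfoSketch current() {
        return CURRENT.get();
    }
}
```

Binding an "outer" and then an "inner" info makes "inner" the current one; restoring "inner" makes "outer" current again, which matches what cleanupTransactionInfo achieves in the finally block of invokeWithinTransaction.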
The actual transaction work in Spring is done by PlatformTransactionManager; the following sections go through its three methods.
1. getTransaction
TransactionStatus org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(TransactionDefinition definition)
The logic of getTransaction can be simplified to:
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
// 1. Get the current transaction object, e.g. the holder of the database connection
Object transaction = doGetTransaction();
// 2. If a transaction already exists, build the TransactionStatus according to the propagation behavior
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// 3. If no transaction exists, build the TransactionStatus according to the propagation behavior
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
return status;
}
}
The doGetTransaction method
Taking DataSourceTransactionManager, one implementation of PlatformTransactionManager, as an example:
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
TransactionSynchronizationManager.getResource looks up the database connection holder in the thread context: it returns the holder if one is bound to the current thread, and null otherwise.
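The "thread context" here is essentially a per-thread map from a key (the DataSource) to a value (the ConnectionHolder). A minimal sketch of such a registry, using a hypothetical ThreadBoundResources class rather than the real TransactionSynchronizationManager, might look like this:

```java
import java.util.HashMap;
import java.util.Map;

class ThreadBoundResources {
    // One map per thread; resources bound by one thread are invisible to all others.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    // Returns the resource bound for this key on the current thread, or null.
    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    // Binds a resource; binding twice for the same key is treated as a programming error.
    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key " + key);
        }
    }

    // Removes and returns the bound resource, or null if nothing was bound.
    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }
}
```

Because the lookup key is the data source, any code on the same thread that uses the same data source finds the same holder, which is what allows a transaction to span several DAO or mapper calls.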
isExistingTransaction
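Determines whether a transaction already exists. In DataSourceTransactionManager this only needs to check that a ConnectionHolder is present and that its transaction is active.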
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());
}
handleExistingTransaction
This is the key method for transaction propagation. Depending on the propagation behavior in effect, handleExistingTransaction creates a different TransactionStatus.
1.PROPAGATION_NEVER
The current method must not run in a transaction; if a transaction already exists, an exception is thrown.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
So the code simply throws an exception.
2.PROPAGATION_NOT_SUPPORTED
The current method must not run in a transaction; if a transaction already exists, it is suspended until the method finishes.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
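As described earlier, the transaction object is obtained from the thread context. Suspending it with suspend mainly removes the transaction's ConnectionHolder and the related transaction information from the thread context, so that business code asking for a database connection can no longer obtain the parent transaction's connection from the thread context.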
DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, Object suspendedResources)
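prepareTransactionStatus does two things:
- It assembles the DefaultTransactionStatus.
- If the status carries a new synchronization (newSynchronization == true and no synchronization is active yet), it updates the thread context with information about the current transaction, such as the isolation level, the read-only flag and the name. The newTransaction flag serves a different purpose: it indicates whether this TransactionStatus is responsible for committing or rolling back the transaction.
When the propagation behavior is PROPAGATION_NOT_SUPPORTED, prepareTransactionStatus is called with transaction == null and newTransaction == false, so no connection is bound to the thread and nothing is committed or rolled back at this level. Business code that asks for a database connection gets a fresh connection from the data source.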
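The effect of suspending for PROPAGATION_NOT_SUPPORTED can be illustrated with a small model. This is a sketch under simplifying assumptions: a plain String stands in for the ConnectionHolder, SuspendResumeSketch is a hypothetical class, and synchronization callbacks are left out entirely.

```java
import java.util.function.Supplier;

class SuspendResumeSketch {
    // Thread-bound "connection holder"; a String stands in for the real holder.
    private static final ThreadLocal<String> BOUND = new ThreadLocal<>();

    static void bind(String holder) { BOUND.set(holder); }
    static String current()         { return BOUND.get(); }

    // suspend: take the holder off the thread and hand it back to the caller.
    static String suspend() {
        String suspended = BOUND.get();
        BOUND.remove();
        return suspended;
    }

    // resume: put a previously suspended holder back on the thread.
    static void resume(String suspended) {
        if (suspended != null) {
            BOUND.set(suspended);
        }
    }

    // NOT_SUPPORTED: run the business code with nothing bound, then restore the outer holder.
    static <T> T runNotSupported(Supplier<T> business) {
        String suspended = suspend();
        try {
            return business.get();
        } finally {
            resume(suspended);
        }
    }
}
```

While the supplier runs, current() returns null, so code that looks for a thread-bound connection finds none and has to go to the data source; once it returns, the outer holder is bound again.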
3.PROPAGATION_REQUIRES_NEW
The current method must run in a newly started transaction. If a transaction already exists, it is suspended until the new transaction completes.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
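This takes three steps:
- Suspend the current transaction.
- Obtain a new connection from the data source, set its autoCommit to false, and bind it to the current thread (the doBegin method) so that business code can pick it up.
- Update the thread context with the information about the current transaction.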
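The three steps can be sketched as follows. The model is deliberately small and rests on stated assumptions: connections are represented by generated String names, RequiresNewSketch and its methods are hypothetical, and commit or rollback of the inner transaction is reduced to unbinding its connection.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RequiresNewSketch {
    private static final ThreadLocal<String> BOUND = new ThreadLocal<>();
    private static final AtomicInteger IDS = new AtomicInteger();

    static String currentConnection() {
        return BOUND.get();
    }

    // Stand-in for doBegin: take a "new connection" and bind it to the current thread.
    static String begin() {
        String connection = "connection-" + IDS.incrementAndGet();
        BOUND.set(connection);
        return connection;
    }

    static <T> T requiresNew(Supplier<T> business) {
        String suspended = BOUND.get();   // step 1: suspend the current transaction
        BOUND.remove();
        try {
            begin();                      // steps 2 and 3: new connection, bound to the thread
            return business.get();        // business code sees only the new connection
        } finally {
            BOUND.remove();               // inner transaction finished: unbind its connection
            if (suspended != null) {
                BOUND.set(suspended);     // resume the outer transaction
            }
        }
    }
}
```

Inside requiresNew the business code observes a different connection from the one the outer transaction uses, and after it returns the outer connection is the current one again. That separation is why the inner transaction can commit or roll back independently of the outer one.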
........
The other propagation behaviors are omitted here.
2.commit
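commit is relatively simple. It mainly does three things:
- If the current transaction is a new transaction (newTransaction), commit it.
- Restore the connection's original settings and release the connection.
- Resume the suspended transaction, if there is one.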
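The order of these steps, and the role of the newTransaction flag, can be sketched as a function that lists which template methods would run for a given status. CommitSketch and its nested Status class are hypothetical simplifications; synchronization callbacks and rollback-only handling are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

class CommitSketch {
    // Hypothetical, reduced view of a transaction status.
    static final class Status {
        final boolean newTransaction;   // does this status own the physical transaction?
        final String suspended;         // name of a suspended outer transaction, or null

        Status(boolean newTransaction, String suspended) {
            this.newTransaction = newTransaction;
            this.suspended = suspended;
        }
    }

    // Returns the steps a commit would perform for the given status, in order.
    static List<String> commit(Status status) {
        List<String> steps = new ArrayList<>();
        if (status.newTransaction) {
            steps.add("doCommit");                    // commit on the connection
            steps.add("doCleanupAfterCompletion");    // reset and release the connection
        }
        if (status.suspended != null) {
            steps.add("resume " + status.suspended);  // re-bind the outer transaction
        }
        return steps;
    }
}
```

A status that merely participates in an existing transaction (newTransaction == false, nothing suspended) yields an empty list: the physical commit is left to the outermost status that started the transaction.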
Committing the connection
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
// Commit on the database connection
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
Restoring the connection's settings and releasing the connection
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// Remove the connection holder from the thread, if exposed.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(this.dataSource);
}
// Reset connection.
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
}
DataSourceUtils.releaseConnection(con, this.dataSource);
}
txObject.getConnectionHolder().clear();
}
Resuming the suspended transaction
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}
3.rollback
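rollback is analogous to commit and is not explained separately.
Other notes
How does Spring's @Transactional annotation take effect in MyBatis?
Spring does this by controlling which connection MyBatis obtains.
MyBatis obtains its database connection from the MyBatis transaction object: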
package org.apache.ibatis.executor;
public abstract class BaseExecutor{
protected Connection getConnection(Log statementLog) throws SQLException {
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}
}
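The MyBatis transaction type is:
org.apache.ibatis.transaction.Transaction
A Transaction is produced by a MyBatis transaction factory:
org.apache.ibatis.transaction.TransactionFactory
When MyBatis is integrated through mybatis-spring, the default transaction factory is:
org.mybatis.spring.transaction.SpringManagedTransactionFactory
See the following mybatis-spring code: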
package org.mybatis.spring;
public class SqlSessionFactoryBean{
protected SqlSessionFactory buildSqlSessionFactory(){
// ...some code omitted
if (this.transactionFactory == null) {
this.transactionFactory = new SpringManagedTransactionFactory();
// ...some code omitted
}
}
}
The transaction class produced by SpringManagedTransactionFactory is:
org.mybatis.spring.transaction.SpringManagedTransaction
getConnection of SpringManagedTransaction ends up calling openConnection:
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
if (logger.isDebugEnabled()) {
logger.debug(
"JDBC Connection ["
+ this.connection
+ "] will"
+ (this.isConnectionTransactional ? " " : " not ")
+ "be managed by Spring");
}
}
DataSourceUtils.getConnection eventually calls DataSourceUtils.doGetConnection:
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
Assert.notNull(dataSource, "No DataSource specified");
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource");
conHolder.setConnection(dataSource.getConnection());
}
return conHolder.getConnection();
}
// Else we either got no holder or an empty thread-bound holder here.
logger.debug("Fetching JDBC Connection from DataSource");
Connection con = dataSource.getConnection();
if (TransactionSynchronizationManager.isSynchronizationActive()) {
logger.debug("Registering transaction synchronization for JDBC Connection");
// Use same Connection for further JDBC actions within the transaction.
// Thread-bound object will get removed by synchronization at transaction completion.
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(con);
}
else {
holderToUse.setConnection(con);
}
holderToUse.requested();
TransactionSynchronizationManager.registerSynchronization(
new ConnectionSynchronization(holderToUse, dataSource));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
}
}
return con;
}
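The code above first tries to obtain the database connection from the thread context; if none is bound there, it takes one directly from the data source. Inside a Spring-managed transaction, MyBatis therefore works on the very connection that the transaction manager opened and will later commit or roll back.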
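The lookup order is the whole trick, and it fits in a few lines. The following sketch assumes a String in place of a JDBC Connection and a Supplier in place of the DataSource; ConnectionLookupSketch is a hypothetical class, and the branch of doGetConnection that registers a synchronization for non-transactional use is left out.

```java
import java.util.function.Supplier;

class ConnectionLookupSketch {
    // The connection bound by the transaction manager, if a transaction is running.
    private static final ThreadLocal<String> BOUND = new ThreadLocal<>();

    static void bind(String connection) { BOUND.set(connection); }
    static void unbind()                { BOUND.remove(); }

    // Thread-bound connection first; fall back to the data source only if nothing is bound.
    static String getConnection(Supplier<String> dataSource) {
        String bound = BOUND.get();
        if (bound != null) {
            return bound;
        }
        return dataSource.get();
    }
}
```

Without a bound connection, getConnection returns whatever the supplier provides; once the transaction manager has bound one, every caller on that thread receives the bound connection, regardless of which persistence library is asking.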