一、Kafka事務(wù)消息
Kafka的事務(wù)概念類似于數(shù)據(jù)庫提供的事務(wù),即經(jīng)典的ACID,原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability)。保證多條消息原子性地寫入到目標(biāo)分區(qū),同時(shí)也能保證Consumer只能看到事務(wù)成功提交的消息。
事務(wù)消息實(shí)現(xiàn)
設(shè)置事務(wù)型Producer,需要滿足兩個(gè)要求:
- 開啟enable.idempotence = true
- 設(shè)置Producer端參數(shù)transctional. id。最好為其設(shè)置一個(gè)有意義的名字。
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
和普通Producer代碼相比,事務(wù)型Producer的顯著特點(diǎn)是調(diào)用了一些事務(wù)API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,它們分別對(duì)應(yīng)事務(wù)的初始化、事務(wù)開始、事務(wù)提交以及事務(wù)終止。
這段代碼能夠保證Record1和Record2被當(dāng)作一個(gè)事務(wù)統(tǒng)一提交到Kafka,要么它們?nèi)刻峤怀晒?,要么全部寫入失敗。?shí)際上即使寫入失敗,Kafka也會(huì)把它們寫入到底層的日志中,也就是說Consumer還是會(huì)看到這些消息。因此在Consumer端,讀取事務(wù)型Producer發(fā)送的消息需要設(shè)置isolation.level參數(shù)的值。當(dāng)前這個(gè)參數(shù)有兩個(gè)取值:
-
read_uncommitted:這是默認(rèn)值,表明Consumer能夠讀取到Kafka寫入的任何消息,不論事務(wù)型Producer提交事務(wù)還是終止事務(wù),其寫入的消息都可以讀取。很顯然,如果你用了事務(wù)型Producer,那么對(duì)應(yīng)的Consumer就不要使用這個(gè)值。 -
read_committed:表明Consumer只會(huì)讀取事務(wù)型Producer成功提交事務(wù)寫入的消息。當(dāng)然了,它也能看到非事務(wù)型Producer寫入的所有消息。
二、RocketMQ事務(wù)消息
RocketMQ事務(wù)不同與Kafka事務(wù),它是基于2PC的方案實(shí)現(xiàn)的分布式事務(wù),分兩階段提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來處理二階段超時(shí)或者失敗的消息。

1. 事務(wù)消息示例
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
final ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
TransactionMQProducer producer = new TransactionMQProducer("myGroup");
producer.setExecutorService(executorService);
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執(zhí)行本地事務(wù)
try {
// 執(zhí)行本地事務(wù)創(chuàng)建訂單
executeLocal(arg); // TODO
// 如果沒拋異常說明執(zhí)行成功,提交事務(wù)消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Throwable t) {
// 失敗則直接回滾事務(wù)消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事務(wù)結(jié)果
// 查詢本地事務(wù)執(zhí)行結(jié)果,若存在則提交事務(wù) COMMIT_MESSAGE
// 若不存在,可能是本地事務(wù)失敗了,也可能是本地事務(wù)還在執(zhí)行,所以返回 UNKNOW
boolean isLocalSuss = checkLocal(msg.getUserProperty("xxx")); // TODO
return isLocalSuss ? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
}
});
producer.start();
Message msg =
new Message("TopicTest1234", "*", "KEY",
"Hello Transaction Msg".getBytes(RemotingHelper.DEFAULT_CHARSET));
Object arg = getLocalTransactionParam(); // TODO
SendResult sendResult = producer.sendMessageInTransaction(msg, arg);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// TODO
}
}
主要涉及幾個(gè)類:
- TransactionMQProducer – 事務(wù)消息生產(chǎn)者,主要實(shí)現(xiàn)事務(wù)消息發(fā)送;
- TransactionListener – 事務(wù)監(jiān)聽器,主要實(shí)現(xiàn)本地事務(wù)執(zhí)行及事務(wù)狀態(tài)回查;
- ExecutorService – 事務(wù)回查線程池,用于回查事務(wù)執(zhí)行狀態(tài);
2. 事務(wù)消息發(fā)送
public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener tranExecuter, final Object arg) throws MQClientException {
// 1. 預(yù)處理,在擴(kuò)展字段中設(shè)置消息類型, TRAN_MSG:當(dāng)前時(shí)事務(wù)half消息 / PGROUP:生產(chǎn)者組名
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
// 2. 發(fā)送事務(wù)消息,跟發(fā)送普通消息一致
sendResult = this.send(msg);
// 3. 回調(diào)用戶自定義代碼,執(zhí)行本地事務(wù)
localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
// 4. 結(jié)束事務(wù),提交或回滾
this.endTransaction(sendResult, localTransactionState, localException);
}
這里羅列了發(fā)送事務(wù)消息的主要流程:
1. 對(duì)消息添加屬性TRAN_MSG=true標(biāo)識(shí)消息為事務(wù)消息;
2. 發(fā)送事務(wù)消息,跟處理普通消息相比唯一的區(qū)別是在構(gòu)造發(fā)送到Broker的SendMessageRequestHeader時(shí)需要設(shè)置sysFlag;
# DefaultMQProducerImpl.sendKernelImpl
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
requestHeader.setSysFlag(sysFlag);
3. 回調(diào)事務(wù)監(jiān)聽器TransactionListener用戶自定義執(zhí)行本地事務(wù)的方法;
4. 結(jié)束事務(wù),根據(jù) half 消息的狀態(tài)和本地事務(wù)執(zhí)行結(jié)果決定第二階段提交還是回滾;
3. 事務(wù)消息存儲(chǔ)
# SendMessageProcessor.sendMessage
// 通過擴(kuò)展字段 TRAN_MSG 進(jìn)行判斷是否事務(wù)消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
// 事務(wù)消息,先prepareMessage
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
// 非事務(wù)消息,直接putMessage
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
- 根據(jù) TRAN_MSG 屬性判斷是否為事務(wù)消息;
- 再判斷rejectTransactionMessage(默認(rèn)為false支持事務(wù)消息)是否支持事務(wù)消息;
- 事務(wù)消息執(zhí)行prepare消息存儲(chǔ)方法prepareMessage,否則按照普通消息進(jìn)行處理;
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 將原消息的 Topic queueId sysFlag 存儲(chǔ)在消息的擴(kuò)展字段中,并且
// 修改Topic 為RMQ_SYS_TRANS_HALF_TOPIC, queueId 為 0
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
-
parseHalfMessageInner對(duì)事務(wù)消息進(jìn)行了主題更換操作,備份了原先的topic、隊(duì)列id之后,將事務(wù)消息的topic統(tǒng)一更換為 RMQ_SYS_TRANS_HALF_TOPIC,隊(duì)列id統(tǒng)一更換為0; - 通過
store.putMessage對(duì)消息進(jìn)行了存儲(chǔ),這里事務(wù)消息最終落盤其實(shí)還是按照普通消息的方式落盤,區(qū)別只是對(duì)topic和隊(duì)列id進(jìn)行了變換,以便該事務(wù)消息在提交之前不會(huì)被消費(fèi)者消費(fèi)到;
4. 事務(wù)消息提交/回滾
在 事務(wù)消息發(fā)送 結(jié)束事務(wù) endTransaction 中會(huì)發(fā)送結(jié)束事務(wù)的請(qǐng)求到Broker
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
因?yàn)橛谢夭闄C(jī)制的保證,所以這里是Oneway的方式發(fā)送
Broker 端通過 EndTransactionProcessor 對(duì)該請(qǐng)求進(jìn)行處理(Broker啟動(dòng)時(shí)注冊(cè)了 EndTransactionProcessor)
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
NettyRemotingAbstract.processRequestCommand接收請(qǐng)求后通過注冊(cè)的code 找到該P(yáng)rocessor 進(jìn)行處理
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 通過偏移量獲取原 prepareMessage
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 通過half消息,將原始消息還原 topic / consume queue等信息
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
// 將還原后的消息發(fā)送到 CommitLog中,之后消費(fèi)者就可以正常拉取并消費(fèi)
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// 成功后刪除half消息,內(nèi)部實(shí)現(xiàn)為將prepare消息轉(zhuǎn)儲(chǔ)到RMQ_SYS_TRANS_OP_HALF TOPIC 主題中;標(biāo)識(shí)該消息已被處理,為事務(wù)回查提供依據(jù)
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 通過偏移量獲取原 prepareMessage
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 刪除半消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
- Commit請(qǐng)求
1. commitMessage 實(shí)際就是通過偏移量獲取原 prepareMessage(這里不明白為什么方法命名叫commitMessage);
2. endMessageTransaction 從之前prepareMessage消息屬性中還原原始消息的 topic / queueId等信息;
// 從屬性中恢復(fù)消息的原topic
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
// 從屬性中恢復(fù)消息的原隊(duì)列id
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
3. sendFinalMessage 將還原后的消息寫到 CommitLog中,之后消費(fèi)者就可以正常拉取并消費(fèi);
4. deletePrepareMessage 寫到 CommitLog成功后刪除half消息,這里是邏輯刪除,內(nèi)部實(shí)現(xiàn)為將half消息轉(zhuǎn)儲(chǔ)到RMQ_SYS_TRANS_OP_HALF TOPIC 主題中,表示該消息已被處理,后續(xù)通過該主題進(jìn)行事務(wù)回查;
private void writeOp(Message message, MessageQueue mq) {
MessageQueue opQueue;
if (opQueueMap.containsKey(mq)) {
opQueue = opQueueMap.get(mq);
} else {
opQueue = getOpQueueByHalf(mq);
MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
if (oldQueue != null) {
opQueue = oldQueue;
}
}
if (opQueue == null) {
// topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
}
// 構(gòu)造 RMQ_SYS_TRANS_OP_HALF_TOPIC 存到 CommitLog 中
putMessage(makeOpMessageInner(message, opQueue));
}
- Rollback請(qǐng)求
1. rollbackMessage 根據(jù)消息的物理偏移commitLogOffset獲取消息MessageExt;
2. deletePrepareMessage 將half半消息進(jìn)行刪除,實(shí)現(xiàn)方式與事務(wù)提交相同;
5. 事務(wù)消息回查
事務(wù)回查實(shí)現(xiàn)是通過線程TransactionalMessageCheckService實(shí)現(xiàn),TransactionalMessageCheckService也是在Broker啟動(dòng)時(shí)start;
public void run() {
log.info("Start transaction check service thread!");
// checkInterval為回查任務(wù)的間隔時(shí)間,默認(rèn)為60秒
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
- 回查時(shí)間間隔為60s一次,每次執(zhí)行的超時(shí)時(shí)間為3秒;最大回查次數(shù)為5次,超過最大回查次數(shù)則丟棄消息,相當(dāng)有對(duì)事務(wù)進(jìn)行了回滾。
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
......
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
// 獲取 RMQ_SYS_TRANS_HALF_TOPIC 半消息中的所有隊(duì)列
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
log.info("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
while (true) {
listener.resolveHalfMsg(msgExt);
}
}
......
}
- 這里是控制回查的方法,其中控制是否需要回查的邏輯比較復(fù)雜,這里不做詳細(xì)分析,大概意思是遍歷 RMQ_SYS_TRANS_HALF_TOPIC ,通過跟 RMQ_SYS_TRANS_OP_HALF_TOPIC 的比對(duì)判斷哪些half消息需要進(jìn)行回查,再結(jié)合前面的回查次數(shù)/超時(shí)時(shí)間等條件來控制回查頻率;
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
- 提交到executorService線程池(也是Broker啟動(dòng)時(shí)創(chuàng)建的)中進(jìn)行回查,內(nèi)部會(huì)組裝回查請(qǐng)求通過netty通信發(fā)送到Producer進(jìn)行回查;
Producer執(zhí)行回查
Producer通過ClientRemotingProcessor.checkTransactionState處理Broker發(fā)送的回查請(qǐng)求;
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
private final MessageExt message = msg;
@Override
public void run() {
// 用戶自定義監(jiān)聽器,示例代碼中的 TransactionListener
TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
// 回調(diào)用戶自定義回查邏輯,示例代碼中的 checkLocalTransaction 方法
localTransactionState = transactionCheckListener.checkLocalTransaction(message);
// 回查結(jié)果構(gòu)造響應(yīng)對(duì)象返回 Broker
this.processTransactionState(localTransactionState,group,exception);
}
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
// 通過endTransactionOneway將事務(wù)回查狀態(tài)發(fā)送給broker
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
}
};
// 提交到回查線程池(示例代碼中的ExecutorService )中執(zhí)行
this.checkExecutor.submit(request);
}
- 這里邏輯比較簡單,就是回調(diào)用戶自定義的 TransactionListener.checkLocalTransaction方法獲取本地事務(wù)的執(zhí)行狀態(tài),然后構(gòu)造響應(yīng)結(jié)果發(fā)送回Broker;
6. 小結(jié)
總結(jié)一下RMQ的事務(wù)消息過程就是兩階段提交 + 回查
-
一階段:發(fā)送half消息; -
二階段:根據(jù)half消息發(fā)送結(jié)果以及本地事務(wù)執(zhí)行結(jié)果決定發(fā)送commit或rollback; -
回查:broker端通過定時(shí)任務(wù),默認(rèn)以1分鐘為回查頻率,對(duì)half消息存儲(chǔ)隊(duì)列(RMQ_SYS_TRANS_HALF_TOPIC)及半消息處理隊(duì)列(RMQ_SYS_TRANS_OP_HALF_TOPIC存儲(chǔ)已經(jīng)提交或者回滾的消息)中的消息進(jìn)行比較,對(duì)需要進(jìn)行回查的half消息發(fā)送給客戶端進(jìn)行回查;根據(jù)回查結(jié)果最終決定對(duì)半消息進(jìn)行commit/rollback操作。
三、總結(jié)
Kafka和RMQ的事務(wù)消息完全是兩個(gè)概念,Kafka事務(wù)是針對(duì)經(jīng)典的ACID本地事務(wù)(跟Mysql/Rides事務(wù)類似);而RMQ事務(wù)消息是對(duì)經(jīng)典的2PC分布式事務(wù)的實(shí)現(xiàn);