Kafka/RocketMQ事務(wù)消息對(duì)比

一、Kafka事務(wù)消息

Kafka的事務(wù)概念類似于數(shù)據(jù)庫提供的事務(wù),即經(jīng)典的ACID,原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability)。保證多條消息原子性地寫入到目標(biāo)分區(qū),同時(shí)也能保證Consumer只能看到事務(wù)成功提交的消息。

事務(wù)消息實(shí)現(xiàn)

設(shè)置事務(wù)型Producer,需要滿足兩個(gè)要求:

  • 開啟enable.idempotence = true
  • 設(shè)置Producer端參數(shù)transctional. id。最好為其設(shè)置一個(gè)有意義的名字。
producer.initTransactions();
try {
  producer.beginTransaction();
  producer.send(record1);
  producer.send(record2);
  producer.commitTransaction();
} catch (KafkaException e) {
  producer.abortTransaction();
}

和普通Producer代碼相比,事務(wù)型Producer的顯著特點(diǎn)是調(diào)用了一些事務(wù)API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,它們分別對(duì)應(yīng)事務(wù)的初始化、事務(wù)開始、事務(wù)提交以及事務(wù)終止。

這段代碼能夠保證Record1和Record2被當(dāng)作一個(gè)事務(wù)統(tǒng)一提交到Kafka,要么它們?nèi)刻峤怀晒?,要么全部寫入失敗。?shí)際上即使寫入失敗,Kafka也會(huì)把它們寫入到底層的日志中,也就是說Consumer還是會(huì)看到這些消息。因此在Consumer端,讀取事務(wù)型Producer發(fā)送的消息需要設(shè)置isolation.level參數(shù)的值。當(dāng)前這個(gè)參數(shù)有兩個(gè)取值:

  • read_uncommitted:這是默認(rèn)值,表明Consumer能夠讀取到Kafka寫入的任何消息,不論事務(wù)型Producer提交事務(wù)還是終止事務(wù),其寫入的消息都可以讀取。很顯然,如果你用了事務(wù)型Producer,那么對(duì)應(yīng)的Consumer就不要使用這個(gè)值。
  • read_committed:表明Consumer只會(huì)讀取事務(wù)型Producer成功提交事務(wù)寫入的消息。當(dāng)然了,它也能看到非事務(wù)型Producer寫入的所有消息。

二、RocketMQ事務(wù)消息

RocketMQ事務(wù)不同與Kafka事務(wù),它是基于2PC的方案實(shí)現(xiàn)的分布式事務(wù),分兩階段提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來處理二階段超時(shí)或者失敗的消息。


事務(wù)消息流程.png

1. 事務(wù)消息示例

public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
    final ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });
    TransactionMQProducer producer = new TransactionMQProducer("myGroup");
    producer.setExecutorService(executorService);
    producer.setTransactionListener(new TransactionListener() {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // 執(zhí)行本地事務(wù)
            try {
                // 執(zhí)行本地事務(wù)創(chuàng)建訂單
                executeLocal(arg);  // TODO
                // 如果沒拋異常說明執(zhí)行成功,提交事務(wù)消息
                return LocalTransactionState.COMMIT_MESSAGE;
            } catch (Throwable t) {
                // 失敗則直接回滾事務(wù)消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // 回查本地事務(wù)結(jié)果
            // 查詢本地事務(wù)執(zhí)行結(jié)果,若存在則提交事務(wù) COMMIT_MESSAGE
            // 若不存在,可能是本地事務(wù)失敗了,也可能是本地事務(wù)還在執(zhí)行,所以返回 UNKNOW
            boolean isLocalSuss = checkLocal(msg.getUserProperty("xxx"));  // TODO
            return isLocalSuss ? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
        }
    });
    producer.start();


    Message msg =
            new Message("TopicTest1234", "*", "KEY",
                    "Hello Transaction Msg".getBytes(RemotingHelper.DEFAULT_CHARSET));
    Object arg = getLocalTransactionParam();  // TODO
    SendResult sendResult = producer.sendMessageInTransaction(msg, arg);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
        // TODO
    }
}

主要涉及幾個(gè)類:

  • TransactionMQProducer – 事務(wù)消息生產(chǎn)者,主要實(shí)現(xiàn)事務(wù)消息發(fā)送;
  • TransactionListener – 事務(wù)監(jiān)聽器,主要實(shí)現(xiàn)本地事務(wù)執(zhí)行及事務(wù)狀態(tài)回查;
  • ExecutorService – 事務(wù)回查線程池,用于回查事務(wù)執(zhí)行狀態(tài);

2. 事務(wù)消息發(fā)送

public TransactionSendResult sendMessageInTransaction(final Message msg,
        final TransactionListener tranExecuter, final Object arg) throws MQClientException {
    // 1. 預(yù)處理,在擴(kuò)展字段中設(shè)置消息類型, TRAN_MSG:當(dāng)前時(shí)事務(wù)half消息 / PGROUP:生產(chǎn)者組名
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    // 2. 發(fā)送事務(wù)消息,跟發(fā)送普通消息一致
    sendResult = this.send(msg);
    // 3. 回調(diào)用戶自定義代碼,執(zhí)行本地事務(wù)
    localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
    // 4. 結(jié)束事務(wù),提交或回滾
    this.endTransaction(sendResult, localTransactionState, localException);
}

這里羅列了發(fā)送事務(wù)消息的主要流程:
1. 對(duì)消息添加屬性TRAN_MSG=true標(biāo)識(shí)消息為事務(wù)消息;
2. 發(fā)送事務(wù)消息,跟處理普通消息相比唯一的區(qū)別是在構(gòu)造發(fā)送到Broker的SendMessageRequestHeader時(shí)需要設(shè)置sysFlag;

# DefaultMQProducerImpl.sendKernelImpl
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
requestHeader.setSysFlag(sysFlag);

3. 回調(diào)事務(wù)監(jiān)聽器TransactionListener用戶自定義執(zhí)行本地事務(wù)的方法;
4. 結(jié)束事務(wù),根據(jù) half 消息的狀態(tài)和本地事務(wù)執(zhí)行結(jié)果決定第二階段提交還是回滾;

3. 事務(wù)消息存儲(chǔ)

# SendMessageProcessor.sendMessage
// 通過擴(kuò)展字段 TRAN_MSG 進(jìn)行判斷是否事務(wù)消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    // 事務(wù)消息,先prepareMessage
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    // 非事務(wù)消息,直接putMessage
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
  • 根據(jù) TRAN_MSG 屬性判斷是否為事務(wù)消息;
  • 再判斷rejectTransactionMessage(默認(rèn)為false支持事務(wù)消息)是否支持事務(wù)消息;
  • 事務(wù)消息執(zhí)行prepare消息存儲(chǔ)方法prepareMessage,否則按照普通消息進(jìn)行處理;
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // 將原消息的 Topic queueId sysFlag 存儲(chǔ)在消息的擴(kuò)展字段中,并且
    // 修改Topic 為RMQ_SYS_TRANS_HALF_TOPIC, queueId 為 0
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}
  • parseHalfMessageInner對(duì)事務(wù)消息進(jìn)行了主題更換操作,備份了原先的topic、隊(duì)列id之后,將事務(wù)消息的topic統(tǒng)一更換為 RMQ_SYS_TRANS_HALF_TOPIC,隊(duì)列id統(tǒng)一更換為0;
  • 通過store.putMessage對(duì)消息進(jìn)行了存儲(chǔ),這里事務(wù)消息最終落盤其實(shí)還是按照普通消息的方式落盤,區(qū)別只是對(duì)topic和隊(duì)列id進(jìn)行了變換,以便該事務(wù)消息在提交之前不會(huì)被消費(fèi)者消費(fèi)到;

4. 事務(wù)消息提交/回滾

在 事務(wù)消息發(fā)送 結(jié)束事務(wù) endTransaction 中會(huì)發(fā)送結(jié)束事務(wù)的請(qǐng)求到Broker

this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());

因?yàn)橛谢夭闄C(jī)制的保證,所以這里是Oneway的方式發(fā)送

Broker 端通過 EndTransactionProcessor 對(duì)該請(qǐng)求進(jìn)行處理(Broker啟動(dòng)時(shí)注冊(cè)了 EndTransactionProcessor)

this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);

NettyRemotingAbstract.processRequestCommand接收請(qǐng)求后通過注冊(cè)的code 找到該P(yáng)rocessor 進(jìn)行處理

final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // 通過偏移量獲取原 prepareMessage
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 通過half消息,將原始消息還原 topic / consume queue等信息
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // 將還原后的消息發(fā)送到 CommitLog中,之后消費(fèi)者就可以正常拉取并消費(fèi)
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // 成功后刪除half消息,內(nèi)部實(shí)現(xiàn)為將prepare消息轉(zhuǎn)儲(chǔ)到RMQ_SYS_TRANS_OP_HALF TOPIC 主題中;標(biāo)識(shí)該消息已被處理,為事務(wù)回查提供依據(jù)
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // 通過偏移量獲取原 prepareMessage
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 刪除半消息
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}
  • Commit請(qǐng)求

1. commitMessage 實(shí)際就是通過偏移量獲取原 prepareMessage(這里不明白為什么方法命名叫commitMessage);
2. endMessageTransaction 從之前prepareMessage消息屬性中還原原始消息的 topic / queueId等信息;

// 從屬性中恢復(fù)消息的原topic
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
// 從屬性中恢復(fù)消息的原隊(duì)列id
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

3. sendFinalMessage 將還原后的消息寫到 CommitLog中,之后消費(fèi)者就可以正常拉取并消費(fèi);
4. deletePrepareMessage 寫到 CommitLog成功后刪除half消息,這里是邏輯刪除,內(nèi)部實(shí)現(xiàn)為將half消息轉(zhuǎn)儲(chǔ)到RMQ_SYS_TRANS_OP_HALF TOPIC 主題中,表示該消息已被處理,后續(xù)通過該主題進(jìn)行事務(wù)回查;

private void writeOp(Message message, MessageQueue mq) {
    MessageQueue opQueue;
    if (opQueueMap.containsKey(mq)) {
        opQueue = opQueueMap.get(mq);
    } else {
        opQueue = getOpQueueByHalf(mq);
        MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
        if (oldQueue != null) {
            opQueue = oldQueue;
        }
    }
    if (opQueue == null) {
        // topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
        opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
    }
    // 構(gòu)造 RMQ_SYS_TRANS_OP_HALF_TOPIC 存到 CommitLog 中
    putMessage(makeOpMessageInner(message, opQueue));
}
  • Rollback請(qǐng)求

1. rollbackMessage 根據(jù)消息的物理偏移commitLogOffset獲取消息MessageExt;
2. deletePrepareMessage 將half半消息進(jìn)行刪除,實(shí)現(xiàn)方式與事務(wù)提交相同;

5. 事務(wù)消息回查

事務(wù)回查實(shí)現(xiàn)是通過線程TransactionalMessageCheckService實(shí)現(xiàn),TransactionalMessageCheckService也是在Broker啟動(dòng)時(shí)start;

public void run() {
    log.info("Start transaction check service thread!");
    // checkInterval為回查任務(wù)的間隔時(shí)間,默認(rèn)為60秒
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}
  • 回查時(shí)間間隔為60s一次,每次執(zhí)行的超時(shí)時(shí)間為3秒;最大回查次數(shù)為5次,超過最大回查次數(shù)則丟棄消息,相當(dāng)有對(duì)事務(wù)進(jìn)行了回滾。
public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
    ......
    String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
    // 獲取 RMQ_SYS_TRANS_HALF_TOPIC 半消息中的所有隊(duì)列
    Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
    log.info("Check topic={}, queues={}", topic, msgQueues);
    for (MessageQueue messageQueue : msgQueues) {
        while (true) {
            listener.resolveHalfMsg(msgExt);
        }
    }
    ......
}
  • 這里是控制回查的方法,其中控制是否需要回查的邏輯比較復(fù)雜,這里不做詳細(xì)分析,大概意思是遍歷 RMQ_SYS_TRANS_HALF_TOPIC ,通過跟 RMQ_SYS_TRANS_OP_HALF_TOPIC 的比對(duì)判斷哪些half消息需要進(jìn)行回查,再結(jié)合前面的回查次數(shù)/超時(shí)時(shí)間等條件來控制回查頻率;
public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}
  • 提交到executorService線程池(也是Broker啟動(dòng)時(shí)創(chuàng)建的)中進(jìn)行回查,內(nèi)部會(huì)組裝回查請(qǐng)求通過netty通信發(fā)送到Producer進(jìn)行回查;

Producer執(zhí)行回查

Producer通過ClientRemotingProcessor.checkTransactionState處理Broker發(fā)送的回查請(qǐng)求;

public void checkTransactionState(final String addr, final MessageExt msg,
    final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        private final MessageExt message = msg;
        @Override
        public void run() {
            // 用戶自定義監(jiān)聽器,示例代碼中的 TransactionListener
            TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
            // 回調(diào)用戶自定義回查邏輯,示例代碼中的 checkLocalTransaction 方法
            localTransactionState = transactionCheckListener.checkLocalTransaction(message);
            // 回查結(jié)果構(gòu)造響應(yīng)對(duì)象返回 Broker
            this.processTransactionState(localTransactionState,group,exception);
        }

        private void processTransactionState(
            final LocalTransactionState localTransactionState,
            final String producerGroup,
            final Throwable exception) {
            // 通過endTransactionOneway將事務(wù)回查狀態(tài)發(fā)送給broker
            DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                3000);
        }
    };
    // 提交到回查線程池(示例代碼中的ExecutorService )中執(zhí)行
    this.checkExecutor.submit(request);
}
  • 這里邏輯比較簡單,就是回調(diào)用戶自定義的 TransactionListener.checkLocalTransaction方法獲取本地事務(wù)的執(zhí)行狀態(tài),然后構(gòu)造響應(yīng)結(jié)果發(fā)送回Broker;

6. 小結(jié)

總結(jié)一下RMQ的事務(wù)消息過程就是兩階段提交 + 回查

  • 一階段:發(fā)送half消息;
  • 二階段:根據(jù)half消息發(fā)送結(jié)果以及本地事務(wù)執(zhí)行結(jié)果決定發(fā)送commit或rollback;
  • 回查:broker端通過定時(shí)任務(wù),默認(rèn)以1分鐘為回查頻率,對(duì)half消息存儲(chǔ)隊(duì)列(RMQ_SYS_TRANS_HALF_TOPIC)及半消息處理隊(duì)列(RMQ_SYS_TRANS_OP_HALF_TOPIC存儲(chǔ)已經(jīng)提交或者回滾的消息)中的消息進(jìn)行比較,對(duì)需要進(jìn)行回查的half消息發(fā)送給客戶端進(jìn)行回查;根據(jù)回查結(jié)果最終決定對(duì)半消息進(jìn)行commit/rollback操作。

三、總結(jié)

Kafka和RMQ的事務(wù)消息完全是兩個(gè)概念,Kafka事務(wù)是針對(duì)經(jīng)典的ACID本地事務(wù)(跟Mysql/Rides事務(wù)類似);而RMQ事務(wù)消息是對(duì)經(jīng)典的2PC分布式事務(wù)的實(shí)現(xiàn);

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容