南昌自助建站模板今天上海最新新聞事件
事務(wù)消息為 Apache RocketMQ 中的高級特性消息,本文為您介紹事務(wù)消息的應(yīng)用場景、功能原理、使用限制、使用方法和使用建議。
事務(wù)消息為 Apache RocketMQ 中的高級特性消息,本文為您介紹事務(wù)消息的應(yīng)用場景、功能原理、使用限制、使用方法和使用建議。
以電商交易場景為例,用戶支付訂單這一核心操作的同時會涉及到下游物流發(fā)貨、積分變更、購物車狀態(tài)清空等多個子系統(tǒng)的變更。當前業(yè)務(wù)的處理分支包括:
-
主分支訂單系統(tǒng)狀態(tài)更新:由未支付變更為支付成功。
-
物流系統(tǒng)狀態(tài)新增:新增待發(fā)貨物流記錄,創(chuàng)建訂單物流記錄。
-
積分系統(tǒng)狀態(tài)變更:變更用戶積分,更新用戶積分表。
-
購物車系統(tǒng)狀態(tài)變更:清空購物車,更新用戶購物車記錄。
傳統(tǒng)XA事務(wù)方案:性能不足
為了保證上述四個分支的執(zhí)行結(jié)果一致性,典型方案是基于XA協(xié)議的分布式事務(wù)系統(tǒng)來實現(xiàn)。將四個調(diào)用分支封裝成包含四個獨立事務(wù)分支的大事務(wù)。基于XA分布式事務(wù)的方案可以滿足業(yè)務(wù)處理結(jié)果的正確性,但最大的缺點是多分支環(huán)境下資源鎖定范圍大,并發(fā)度低,隨著下游分支的增加,系統(tǒng)性能會越來越差。
基于普通消息方案:一致性保障困難
將上述基于XA事務(wù)的方案進行簡化,將訂單系統(tǒng)變更作為本地事務(wù),剩下的系統(tǒng)變更作為普通消息的下游來執(zhí)行,事務(wù)分支簡化成普通消息+訂單表事務(wù),充分利用消息異步化的能力縮短鏈路,提高并發(fā)度。
該方案中消息下游分支和訂單系統(tǒng)變更的主分支很容易出現(xiàn)不一致的現(xiàn)象,例如:
-
消息發(fā)送成功,訂單沒有執(zhí)行成功,需要回滾整個事務(wù)。
-
訂單執(zhí)行成功,消息沒有發(fā)送成功,需要額外補償才能發(fā)現(xiàn)不一致。
-
消息發(fā)送超時未知,此時無法判斷需要回滾訂單還是提交訂單變更。
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?基于分布式事務(wù)消息:支持最終一致性
上述普通消息方案中,普通消息和訂單事務(wù)無法保證一致的原因,本質(zhì)上是由于普通消息無法像單機數(shù)據(jù)庫事務(wù)一樣,具備提交、回滾和統(tǒng)一協(xié)調(diào)的能力。
而基于Apache RocketMQ實現(xiàn)的分布式事務(wù)消息功能,在普通消息基礎(chǔ)上,支持二階段的提交能力。將二階段提交和本地事務(wù)綁定,實現(xiàn)全局提交結(jié)果的一致性。
Apache RocketMQ事務(wù)消息的方案,具備高性能、可擴展、業(yè)務(wù)開發(fā)簡單的優(yōu)勢。具體事務(wù)消息的原理和流程,請參見下文的功能原理。
功能原理?
什么是事務(wù)消息
事務(wù)消息是 Apache RocketMQ 提供的一種高級消息類型,支持在分布式場景下保障消息生產(chǎn)和本地事務(wù)的最終一致性。
事務(wù)消息處理流程
事務(wù)消息交互流程如下圖所示。
-
生產(chǎn)者將消息發(fā)送至Apache RocketMQ服務(wù)端。
-
Apache RocketMQ服務(wù)端將消息持久化成功之后,向生產(chǎn)者返回Ack確認消息已經(jīng)發(fā)送成功,此時消息被標記為"暫不能投遞",這種狀態(tài)下的消息即為半事務(wù)消息。
-
生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。
-
生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認結(jié)果(Commit或是Rollback),服務(wù)端收到確認結(jié)果后處理邏輯如下:
-
二次確認結(jié)果為Commit:服務(wù)端將半事務(wù)消息標記為可投遞,并投遞給消費者。
-
二次確認結(jié)果為Rollback:服務(wù)端將回滾事務(wù),不會將半事務(wù)消息投遞給消費者。
-
-
在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到發(fā)送者提交的二次確認結(jié)果,或服務(wù)端收到的二次確認結(jié)果為Unknown未知狀態(tài),經(jīng)過固定時間后,服務(wù)端將對消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實例發(fā)起消息回查。?說明?服務(wù)端回查的間隔時間和最大回查次數(shù),請參見參數(shù)限制。
-
生產(chǎn)者收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
-
生產(chǎn)者根據(jù)檢查到的本地事務(wù)的最終狀態(tài)再次提交二次確認,服務(wù)端仍按照步驟4對半事務(wù)消息進行處理。
事務(wù)消息生命周期
-
初始化:半事務(wù)消息被生產(chǎn)者構(gòu)建并完成初始化,待發(fā)送到服務(wù)端的狀態(tài)。
-
事務(wù)待提交:半事務(wù)消息被發(fā)送到服務(wù)端,和普通消息不同,并不會直接被服務(wù)端持久化,而是會被單獨存儲到事務(wù)存儲系統(tǒng)中,等待第二階段本地事務(wù)返回執(zhí)行結(jié)果后再提交。此時消息對下游消費者不可見。
-
消息回滾:第二階段如果事務(wù)執(zhí)行結(jié)果明確為回滾,服務(wù)端會將半事務(wù)消息回滾,該事務(wù)消息流程終止。
-
提交待消費:第二階段如果事務(wù)執(zhí)行結(jié)果明確為提交,服務(wù)端會將半事務(wù)消息重新存儲到普通存儲系統(tǒng)中,此時消息對下游消費者可見,等待被消費者獲取并消費。
-
消費中:消息被消費者獲取,并按照消費者本地的業(yè)務(wù)邏輯進行處理的過程。 此時服務(wù)端會等待消費者完成消費并提交消費結(jié)果,如果一定時間后沒有收到消費者的響應(yīng),Apache RocketMQ會對消息進行重試處理。具體信息,請參見消費重試。
-
消費提交:消費者完成消費處理,并向服務(wù)端提交消費結(jié)果,服務(wù)端標記當前消息已經(jīng)被處理(包括消費成功和失敗)。 Apache RocketMQ默認支持保留所有消息,此時消息數(shù)據(jù)并不會立即被刪除,只是邏輯標記已消費。消息在保存時間到期或存儲空間不足被刪除前,消費者仍然可以回溯消息重新消費。
-
消息刪除:Apache RocketMQ按照消息保存機制滾動清理最早的消息數(shù)據(jù),將消息從物理文件中刪除。更多信息,請參見消息存儲和清理機制。
使用限制?
消息類型一致性
事務(wù)消息僅支持在 MessageType 為 Transaction 的主題內(nèi)使用,即事務(wù)消息只能發(fā)送至類型為事務(wù)消息的主題中,發(fā)送的消息的類型必須和主題的類型一致。
消費事務(wù)性
Apache RocketMQ 事務(wù)消息保證本地主分支事務(wù)和下游消息發(fā)送事務(wù)的一致性,但不保證消息消費結(jié)果和上游事務(wù)的一致性。因此需要下游業(yè)務(wù)分支自行保證消息正確處理,建議消費端做好消費重試,如果有短暫失敗可以利用重試機制保證最終處理成功。
中間狀態(tài)可見性
Apache RocketMQ 事務(wù)消息為最終一致性,即在消息提交到下游消費端處理完成之前,下游分支和上游事務(wù)之間的狀態(tài)會不一致。因此,事務(wù)消息僅適合接受異步執(zhí)行的事務(wù)場景。
事務(wù)超時機制
Apache RocketMQ 事務(wù)消息的命周期存在超時機制,即半事務(wù)消息被生產(chǎn)者發(fā)送服務(wù)端后,如果在指定時間內(nèi)服務(wù)端無法確認提交或者回滾狀態(tài),則消息默認會被回滾。事務(wù)超時時間,請參見參數(shù)限制。
使用示例?
創(chuàng)建主題
Apache RocketMQ 5.0版本下創(chuàng)建主題操作,推薦使用mqadmin工具,需要注意的是,對于消息類型需要通過屬性參數(shù)添加。示例如下:
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Transaction
發(fā)送消息
事務(wù)消息相比普通消息發(fā)送時需要修改以下幾點:
-
發(fā)送事務(wù)消息前,需要開啟事務(wù)并關(guān)聯(lián)本地的事務(wù)執(zhí)行。
-
為保證事務(wù)一致性,在構(gòu)建生產(chǎn)者時,必須設(shè)置事務(wù)檢查器和預(yù)綁定事務(wù)消息發(fā)送的主題列表,客戶端內(nèi)置的事務(wù)檢查器會對綁定的事務(wù)主題做異常狀態(tài)恢復。
創(chuàng)建事務(wù)主題
NORMAL類型Topic不支持TRANSACTION類型消息,生產(chǎn)消息會報錯。
./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION
- -c 集群名稱
- -t Topic名稱
- -n nameserver地址
- -a 額外屬性,本例給主題添加了
message.type
為TRANSACTION
的屬性用來支持事務(wù)消息
?以Java語言為例,使用事務(wù)消息示例參考如下:
//演示demo,模擬訂單表查詢服務(wù),用來確認訂單事務(wù)是否提交成功。private static boolean checkOrderById(String orderId) {return true;}//演示demo,模擬本地事務(wù)的執(zhí)行結(jié)果。private static boolean doLocalTransaction() {return true;}public static void main(String[] args) throws ClientException {ClientServiceProvider provider = new ClientServiceProvider();MessageBuilder messageBuilder = new MessageBuilder();//構(gòu)造事務(wù)生產(chǎn)者:事務(wù)消息需要生產(chǎn)者構(gòu)建一個事務(wù)檢查器,用于檢查確認異常半事務(wù)的中間狀態(tài)。Producer producer = provider.newProducerBuilder().setTransactionChecker(messageView -> {/*** 事務(wù)檢查器一般是根據(jù)業(yè)務(wù)的ID去檢查本地事務(wù)是否正確提交還是回滾,此處以訂單ID屬性為例。* 在訂單表找到了這個訂單,說明本地事務(wù)插入訂單的操作已經(jīng)正確提交;如果訂單表沒有訂單,說明本地事務(wù)已經(jīng)回滾。*/final String orderId = messageView.getProperties().get("OrderId");if (Strings.isNullOrEmpty(orderId)) {// 錯誤的消息,直接返回Rollback。return TransactionResolution.ROLLBACK;}return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();//開啟事務(wù)分支。final Transaction transaction;try {transaction = producer.beginTransaction();} catch (ClientException e) {e.printStackTrace();//事務(wù)分支開啟失敗,直接退出。return;}Message message = messageBuilder.setTopic("topic")//設(shè)置消息索引鍵,可根據(jù)關(guān)鍵字精確查找某條消息。.setKeys("messageKey")//設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。.setTag("messageTag")//一般事務(wù)消息都會設(shè)置一個本地事務(wù)關(guān)聯(lián)的唯一ID,用來做本地事務(wù)回查的校驗。.addProperty("OrderId", "xxx")//消息體。.setBody("messageBody".getBytes()).build();//發(fā)送半事務(wù)消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);} catch (ClientException e) {//半事務(wù)消息發(fā)送失敗,事務(wù)可以直接退出并回滾。return;}/*** 執(zhí)行本地事務(wù),并確定本地事務(wù)結(jié)果。* 1. 如果本地事務(wù)提交成功,則提交消息事務(wù)。* 2. 如果本地事務(wù)提交失敗,則回滾消息事務(wù)。* 3. 如果本地事務(wù)未知異常,則不處理,等待事務(wù)消息回查。**/boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();} catch (ClientException e) {// 業(yè)務(wù)可以自身對實時性的要求選擇是否重試,如果放棄重試,可以依賴事務(wù)消息回查機制進行事務(wù)狀態(tài)的提交。e.printStackTrace();}} else {try {transaction.rollback();} catch (ClientException e) {// 建議記錄異常信息,回滾異常時可以無需重試,依賴事務(wù)消息回查機制進行事務(wù)狀態(tài)的提交。e.printStackTrace();}}}
使用建議?
避免大量未決事務(wù)導致超時
Apache RocketMQ支持在事務(wù)提交階段異常的情況下發(fā)起事務(wù)回查,保證事務(wù)一致性。但生產(chǎn)者應(yīng)該盡量避免本地事務(wù)返回未知結(jié)果。大量的事務(wù)檢查會導致系統(tǒng)性能受損,容易導致事務(wù)處理延遲。
正確處理"進行中"的事務(wù)
消息回查時,對于正在進行中的事務(wù)不要返回Rollback或Commit結(jié)果,應(yīng)繼續(xù)保持Unknown的狀態(tài)。 一般出現(xiàn)消息回查時事務(wù)正在處理的原因為:事務(wù)執(zhí)行較慢,消息回查太快。解決方案如下:
-
將第一次事務(wù)回查時間設(shè)置較大一些,但可能導致依賴回查的事務(wù)提交延遲較大。
-
程序能正確識別正在進行中的事務(wù)。