網(wǎng)站項目建設背景seo發(fā)外鏈的網(wǎng)站
文章目錄
- 1、消息整體處理過程
- Producer發(fā)送消息階段
- 手段一:提供SYNC的發(fā)送消息方式,等待broker處理結(jié)果。
- 手段二:發(fā)送消息如果失敗或者超時,則重新發(fā)送。
- 手段三:broker提供多master模式,即使某臺broker宕機了,保證消息可以投遞到另外一臺正常的broker上。
- Broker處理消息階段
- 手段四:提供同步刷盤的策略
- 手段五:提供主從模式,同時主從支持同步雙寫
- Consumer消費消息階段
- 手段六:consumer默認提供的是At least Once機制
- 手段七:消費消息重試機制
- 2、如何保證消息不被重復消費
- 所以第二個問題來了,怎么保證消息隊列消費的冪等性?
1、消息整體處理過程
這里我們將消息的整體處理階段分為3個階段進行分析:
- Producer發(fā)送消息階段。
- Broker處理消息階段。
- Consumer消費消息階段。
Producer發(fā)送消息階段
發(fā)送消息階段涉及到Producer到broker的網(wǎng)絡通信,因此丟失消息的幾率一定會有,那RocketMQ在此階段用了哪些手段保證消息不丟失了(或者說降低丟失的可能性)。
手段一:提供SYNC的發(fā)送消息方式,等待broker處理結(jié)果。
RocketMQ提供了3種發(fā)送消息方式,分別是:
- 同步發(fā)送:Producer 向 broker 發(fā)送消息,阻塞當前線程等待 broker 響應 發(fā)送結(jié)果。
- 異步發(fā)送:Producer 首先構(gòu)建一個向 broker 發(fā)送消息的任務,把該任務提交給線程池,等執(zhí)行完該任務時,回調(diào)用戶自定義的回調(diào)函數(shù),執(zhí)行處理結(jié)果。
- Oneway發(fā)送:Oneway 方式只負責發(fā)送請求,不等待應答,Producer只負責把請求發(fā)出去,而不處理響應結(jié)果。
我們在調(diào)用producer.send方法時,不指定回調(diào)方法,則默認采用同步發(fā)送消息的方式,這也是丟失幾率最小的一種發(fā)送方式。
手段二:發(fā)送消息如果失敗或者超時,則重新發(fā)送。
發(fā)送重試源碼如下,本質(zhì)其實就是一個for循環(huán),當發(fā)送消息發(fā)生異常的時候重新循環(huán)發(fā)送。默認重試3次,重試次數(shù)可以通過producer指定。
手段三:broker提供多master模式,即使某臺broker宕機了,保證消息可以投遞到另外一臺正常的broker上。
如果broker只有一個節(jié)點,則broker宕機了,即使producer有重試機制,也沒用,因此利用多主模
式,當某臺broker宕機了,換一臺broker進行投遞。
總結(jié)
producer消息發(fā)送方式雖然有3種,但為了減小丟失消息的可能性盡量采用同步的發(fā)送方式,同步等待發(fā)送結(jié)果,利用同步發(fā)送+重試機制+多個master節(jié)點,盡可能減小消息丟失的可能性。
Broker處理消息階段
手段四:提供同步刷盤的策略
public enum FlushDiskType {
SYNC_FLUSH, //同步刷盤
ASYNC_FLUSH//異步刷盤(默認)
}
我們知道,當消息投遞到broker之后,會先存到page cache,然后根據(jù)broker設置的刷盤策略是否立即刷盤,也就是如果刷盤策略為異步,broker并不會等待消息落盤就會返回producer成功,也就是說當broker所在的服務器突然宕機,則會丟失部分頁的消息。
手段五:提供主從模式,同時主從支持同步雙寫
即使broker設置了同步刷盤,如果主broker磁盤損壞,也是會導致消息丟失。
因此可以給broker指定slave,同時設置master為SYNC_MASTER,然后將slave設置為同步刷盤策略。
此模式下,producer每發(fā)送一條消息,都會等消息投遞到master和slave都落盤成功了,broker才會當作消息投遞成功,保證休息不丟失。
總結(jié)
在broker端,消息丟失的可能性主要在于刷盤策略和同步機制。
RocketMQ默認broker的刷盤策略為異步刷盤,如果有主從,同步策略也默認的是異步同步,這樣子可以提高broker處理消息的效率,但是會有丟失的可能性。因此可以通過同步刷盤策略+同步slave策略+主從的方式解決丟失消息的可能。
Consumer消費消息階段
手段六:consumer默認提供的是At least Once機制
從producer投遞消息到broker,即使前面這些過程保證了消息正常持久化,但如果consumer消費消息沒有消費到也不能理解為消息絕對的可靠。因此RockerMQ默認提供了At least Once機制保證消息可靠消費。
何為At least Once?
Consumer先pull 消息到本地,消費完成后,才向服務器返回ack。
通常消費消息的ack機制一般分為兩種思路:
- 先提交后消費;
- 先消費,消費成功后再提交;
思路一可以解決重復消費的問題但是會丟失消息,因此Rocketmq默認實現(xiàn)的是思路二,由各自consumer業(yè)務方保證冪等來解決重復消費問題。
手段七:消費消息重試機制
當消費消息失敗了,如果不提供重試消息的能力,則也不能算完全的可靠消費,因此RocketMQ本身提供了重新消費消息的能力。
總結(jié)
consumer端要保證消費消息的可靠性,主要通過At least Once+消費重試機制保證。
2、如何保證消息不被重復消費
回答這個問題,首先你別聽到重復消息這個事兒,就一無所知吧,你先大概說一說可能會有哪些重復消費的問題。
首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能會出現(xiàn)消息重復消費的問題,正常。因為這問題通常不是 MQ 自己保證的,是由我們開發(fā)來保證的。挑一個 Kafka 來舉個例子,說說怎么重復消費吧。
Kafka 實際上有個 offset 的概念,就是每個消息寫進去,都有一個 offset,代表消息的序號,然后 consumer 消費了數(shù)據(jù)之后,每隔一段時間(定時定期),會把自己消費過的消息的 offset 提交一下,表示“我已經(jīng)消費過了,下次我要是重啟啥的,你就讓我繼續(xù)從上次消費到的 offset 來繼續(xù)消費吧”。
但是凡事總有意外,比如我們之前生產(chǎn)經(jīng)常遇到的,就是你有時候重啟系統(tǒng),看你怎么重啟了,如果碰到點著急的,直接 kill 進程了,再重啟。這會導致 consumer 有些消息處理了,但是沒來得及提交 offset,尷尬了。重啟之后,少數(shù)消息會再次消費一次。
有這么個場景。數(shù)據(jù) 1/2/3 依次進入 kafka,kafka 會給這三條數(shù)據(jù)每條分配一個 offset,代表這條數(shù)據(jù)的序號,我們就假設分配的 offset 依次是 152/153/154。消費者從 kafka 去消費的時候,也是按照這個順序去消費。假如當消費者消費了 offset=153 的這條數(shù)據(jù),剛準備去提交 offset 到 zookeeper,此時消費者進程被重啟了。
那么此時消費過的數(shù)據(jù) 1/2 的 offset 并沒有提交,kafka 也就不知道你已經(jīng)消費了 offset=153 這條數(shù)據(jù)。那么重啟之后,消費者會找 kafka 說,嘿,哥兒們,你給我接著把上次我消費到的那個地方后面的數(shù)據(jù)繼續(xù)給我傳遞過來。由于之前的 offset 沒有提交成功,那么數(shù)據(jù) 1/2 會再次傳過來,如果此時消費者沒有去重的話,那么就會導致重復消費。
如果消費者干的事兒是拿一條數(shù)據(jù)就往數(shù)據(jù)庫里寫一條,會導致說,你可能就把數(shù)據(jù) 1/2 在數(shù)據(jù)庫里插入了 2 次,那么數(shù)據(jù)就錯啦。
其實重復消費不可怕,可怕的是你沒考慮到重復消費之后,怎么保證冪等性。
舉個例子吧。假設你有個系統(tǒng),消費一條消息就往數(shù)據(jù)庫里插入一條數(shù)據(jù),要是你一個消息重復兩次,你不就插入了兩條,這數(shù)據(jù)不就錯了?但是你要是消費到第二次的時候,自己判斷一下是否已經(jīng)消費過了,若是就直接扔了,這樣不就保留了一條數(shù)據(jù),從而保證了數(shù)據(jù)的正確性。
一條數(shù)據(jù)重復出現(xiàn)兩次,數(shù)據(jù)庫里就只有一條數(shù)據(jù),這就保證了系統(tǒng)的冪等性。
冪等性,通俗點說,就一個數(shù)據(jù),或者一個請求,給你重復來多次,你得確保對應的數(shù)據(jù)是不會改變的,不能出錯。
所以第二個問題來了,怎么保證消息隊列消費的冪等性?
其實還是得結(jié)合業(yè)務來思考,我這里給幾個思路:
比如你拿個數(shù)據(jù)要寫庫,你先根據(jù)主鍵查一下,如果這數(shù)據(jù)都有了,你就別插入了,update 一下好吧。
比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
比如你不是上面兩個場景,那做的稍微復雜一點,你需要讓生產(chǎn)者發(fā)送每條數(shù)據(jù)的時候,里面加一個全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費到了之后,先根據(jù)這個 id 去比如 Redis 里查一下,之前消費過嗎?如果沒有消費過,你就處理,然后這個 id 寫 Redis。如果消費過了,那你就別處理了,保證別重復處理相同的消息即可。
比如基于數(shù)據(jù)庫的唯一鍵來保證重復數(shù)據(jù)不會重復插入多條。因為有唯一鍵約束了,重復數(shù)據(jù)插入只會報錯,不會導致數(shù)據(jù)庫中出現(xiàn)臟數(shù)據(jù)。