哪些網(wǎng)站容易收錄阿里巴巴國際站關(guān)鍵詞推廣
1.?問題引發(fā)
由某個服務(wù)BI-collector-xx隊列出現(xiàn)阻塞,影響很整個rabbitMQ集群服務(wù)不可用,多個應(yīng)用MQ生產(chǎn)者服務(wù)出現(xiàn)假死狀態(tài),系統(tǒng)影響面較廣,業(yè)務(wù)影響很大。當(dāng)時為了應(yīng)急處理,恢復(fù)系統(tǒng)可用,運(yùn)維相對粗暴的把一堆阻塞隊列信息清空,然后重啟整個集群。
在復(fù)盤整個故障過程中,我心中有不少疑惑,至少存在以下幾個問題點(diǎn):
- 為什么出現(xiàn)隊列阻塞?
- 某個隊列出現(xiàn)阻塞為什么會影響到其他隊列的運(yùn)行(即多隊列間相互影響)?
- 某個應(yīng)用MQ隊列出現(xiàn)問題,為什么會導(dǎo)致應(yīng)用不可用呢?
2.?試驗(yàn)隊列阻塞
某天周末在家里,找個測試環(huán)境,安裝rabbitmq嘗試重現(xiàn)這過程,并做模擬測試。
寫兩個測試應(yīng)用Demo(假設(shè)是兩個項目應(yīng)用)分別有生產(chǎn)者和消費(fèi)者,并分別使用隊列testA和testB。
為了盡可能還原生產(chǎn)的情況,一開始測試使用了同一個vhost,后面分別設(shè)置不同vhost。
生產(chǎn)者A,示例代碼如下
消費(fèi)者A
MQ配置
生產(chǎn)者B,每次生產(chǎn)10萬條消息
?
消費(fèi)者B,代碼故意寫錯(模擬出現(xiàn)異常的情況),不是正常的json串導(dǎo)致解釋json時拋出異常
先了解一下Rabbitmq客戶端啟動連接工作過程,通過wireshark抓包分析,如下
?
先對AMQP做一個簡單的介紹,請求的AMQP協(xié)議方法信息,AMQP協(xié)議方法包含類名+方法名+參數(shù),這一列主要展示了類名和方法名
Connection.Start
:請求服務(wù)端開始建立連接Channel.Open
:
請求服務(wù)端建立信道Queue.Declare
:
聲明隊列Basic.Consume
:
開始一個消費(fèi)者,請求指定隊列的消息
詳細(xì)方法可以查看amqp
官網(wǎng)https://www.rabbitmq.com/amqp-0-9-1-reference.html
工作過程分析:
Basic.Publish
:
?客戶端發(fā)送Basic.Publish
方法請求,將消息發(fā)布到exchange
,
rabbitmq server
會根據(jù)路由規(guī)則轉(zhuǎn)發(fā)到隊列中;
Basic.Deliver
:
?服務(wù)端發(fā)送Basic.Deliver
方法請求,投遞消息到監(jiān)聽隊列的客戶端消費(fèi)者;
Basic.Ack
:
?客戶端發(fā)送Basic.Ack
方法請求,告知rabbimq server,消息已接收處理。
兩個應(yīng)用程序啟動后,通過rabbitmq管理控制臺可以觀察一些參數(shù)和監(jiān)控指標(biāo)
?
?
一開始A應(yīng)用生產(chǎn)和消費(fèi)都是正常的。
B消費(fèi)端錯誤代碼異常,狂刷報錯信息
?
經(jīng)過大概30分鐘運(yùn)行,觀察A生產(chǎn)者應(yīng)用控制臺也有出現(xiàn)異常信息
?
查看服務(wù)端連接狀態(tài)出現(xiàn)blocked情況,與生產(chǎn)故障發(fā)生情景很類似。
?
此時客戶端即本機(jī)器,CPU和內(nèi)存上漲明顯,風(fēng)扇聲音很響,明顯卡頓,再過30分鐘應(yīng)用基本不可用狀態(tài)。
分析原因
上面錯誤代碼展示了消費(fèi)者B無法ack,由于沒有進(jìn)行ack導(dǎo)致隊里阻塞。那么問題來了,這是為什么呢?其實(shí)這是RabbitMQ的一種保護(hù)機(jī)制。防止當(dāng)消息激增的時候,海量的消息進(jìn)入consumer而引發(fā)consumer宕機(jī)。
?RabbitMQ提供了一種QOS(服務(wù)質(zhì)量保證)功能,即在非自動確認(rèn)的消息的前提下,限制信道上的消費(fèi)者所能保持的最大未確認(rèn)的數(shù)量??梢酝ㄟ^設(shè)置prefetchCount實(shí)現(xiàn),自動確認(rèn)prefetchCount設(shè)置無效。
舉例說明:可以理解為在consumer前面加了一個緩沖容器,容器能容納最大的消息數(shù)量就是PrefetchCount。如果容器沒有滿RabbitMQ就會將消息投遞到容器內(nèi),如果滿了就不投遞了。當(dāng)consumer對消息進(jìn)行ack以后就會將此消息移除,從而放入新的消息。
通過上面的配置發(fā)現(xiàn)prefetch初始我只配置了2,并且concurrency配置的只有1,所以當(dāng)我發(fā)送了2條錯誤消息以后,由于解析失敗這2條消息一直沒有被ack。將緩沖區(qū)沾滿了,這個時候RabbitMQ認(rèn)為這個consumer已經(jīng)沒有消費(fèi)能力了就不繼續(xù)給它推送消息了,所以就造成了隊列阻塞。
判斷隊列是否有阻塞的風(fēng)險。
??當(dāng)ack
模式為manual
,并且線上出現(xiàn)了unacked
消息,這個時候不用慌。由于QOS是限制信道channel
上的消費(fèi)者所能保持的最大未確認(rèn)的數(shù)量。所以允許出現(xiàn)unacked
的數(shù)量可以通過channelCount * prefetchCount *
消費(fèi)節(jié)點(diǎn)數(shù)量
得出。
channlCount
就是由concurrency,max-concurrency
決定的。
min = concurrency * prefetch *
消費(fèi)節(jié)點(diǎn)數(shù)量
max = max-concurrency * prefetch *
消費(fèi)節(jié)點(diǎn)數(shù)量
由此可以得出結(jié)論
unacked_msg_count < min
?隊列不會阻塞。但需要及時處理unacked
的消息。unacked_msg_count >= min
?可能會出現(xiàn)堵塞。unacked_msg_count >= max
?隊列一定阻塞。
重點(diǎn)注意
1
、
unacked
的消息在consumer
切斷連接后(如重啟)再連接,會自動回到隊頭。
2、若將ack
模式改成auto
自動,這樣會使QOS不生效。會出現(xiàn)大量消息涌入consumer
從而可能造成consumer
宕機(jī)風(fēng)險。
再回看程序配置,做一些分析和調(diào)整
對B消費(fèi)端問題代碼加個try-catch-finally
,不管中間有何問題,都進(jìn)行消息簽收ACK。
?
代碼調(diào)整之后,兩個隊列正常運(yùn)行,客戶端兩個應(yīng)用也正常運(yùn)行。
?
?
經(jīng)過一段時間消費(fèi),B消費(fèi)者端已經(jīng)把堆積的消息消費(fèi)完了。
?
3、????第三個問題原因分析
還是查看抓包信息
Basic.Reject
: 客戶端發(fā)送Basic.Reject方法請求,表示無法處理消息,拒絕消息,此時的requeue參數(shù)為true,將消息返回原來的隊列;
Basic.Deliver
: 服務(wù)端調(diào)用Basic.Deliver方法,和第一次Basic.Deliver方法不同的是,此時的redeliver參數(shù)為true,表示重新投遞消息到監(jiān)聽隊列的消費(fèi)者,然后這兩步會一直重復(fù)下去。
RabbitMQ消息監(jiān)聽程序異常時,consumer會向rabbitmq server發(fā)送Basic.Reject
,表示消息拒絕接受,由于Spring默認(rèn)requeue-rejected
配置為true
,消息會重新入隊,然后rabbitmq server重新投遞。就相當(dāng)于死循環(huán)了,所以容易導(dǎo)致消費(fèi)端資源占用過高,特別是TCP連接數(shù)、線程數(shù)、IO飆升,如果個別程序帶事務(wù)或數(shù)據(jù)庫操作等連接資源得不到釋放也會占滿,導(dǎo)致應(yīng)用假死狀態(tài)(出現(xiàn)問題的時候,查看問題應(yīng)用出現(xiàn)大量的connection timeout錯誤報錯日志)。
因此針對性的,有些業(yè)務(wù)場景(不強(qiáng)調(diào)數(shù)據(jù)強(qiáng)一致性的場景,比如日志收集)可以設(shè)置default-requeue-rejected: false
即可。
factory.setDefaultRequeueRejected(false);
??會根據(jù)異常類型選擇直接丟棄或加入dead-letter-exchange中。
消費(fèi)者端正確的使用手動確認(rèn)示例結(jié)構(gòu)代碼,很重要!
try {// 業(yè)務(wù)邏輯。
}catch (Exception e){// 輸出錯誤日志。
}finally {// 消息簽收。
}
4、????驗(yàn)證隊列設(shè)置最大長度限制
設(shè)置queueLengthLimit隊列最大長度限制 x-max-length=5
?
生產(chǎn)者原本想要生產(chǎn)10條消息
?
由于受到隊列最大長度限制,實(shí)際上只有5條入隊列里面。
?
消費(fèi)者拿出來的消息,僅有5條,從NO.6~NO.10
改變消費(fèi)者程序,讓生產(chǎn)者一直產(chǎn)生消息,消費(fèi)者消費(fèi)速度明顯趕不上生產(chǎn)者的生產(chǎn)速度。
?
?
從消費(fèi)端來看消息是隨機(jī)性入隊的,隊列里面一直最多5條消息,發(fā)再多也進(jìn)不了,消息者和生產(chǎn)者也不會發(fā)生什么異常,只是消息會隨機(jī)性丟失(并沒有全部入隊)。
運(yùn)行情況良好,除了消息沒有全部入隊列 ,沒有出現(xiàn)異常情況
?
消費(fèi)比較慢,本機(jī)器CPU和內(nèi)存各項指標(biāo)正常,沒有異常。
搞一個異常情況出現(xiàn)unack,最大隊列長度限制,是不算unack數(shù)量的,如下圖所示
?
異常之后,此觀察MQ監(jiān)控管理后臺
?
生產(chǎn)者不停一直在生產(chǎn)消息,運(yùn)行30分鐘,觀察生產(chǎn)者應(yīng)用也是正常的的,就是消息入不了隊列。
?
?
5、??檢查實(shí)際的業(yè)務(wù)端代碼
再看我們業(yè)務(wù)系統(tǒng)消費(fèi)端代碼,消費(fèi)端各種不規(guī)范寫法都有,以下例舉幾個典型
1、手動簽收有ACK,但是沒有try-catch-finally結(jié)構(gòu),消費(fèi)端業(yè)務(wù)代碼如下:
2、有try-catch-finally結(jié)構(gòu),但是deliverTag是一個固定值0,一樣的會出問題。
?
3、自動簽收確認(rèn)的,大量消息的時候,容易搞死消費(fèi)端應(yīng)用。
?
6.?總結(jié)
- 生產(chǎn)環(huán)境不建議使用自動ack模式,這樣會使QOS無法生效。
- 在使用手動ack的時候,需要非常注意消息簽收,業(yè)務(wù)代碼使用try-catch-finally處理結(jié)構(gòu),防止業(yè)務(wù)代碼異常時無法簽收。
- 規(guī)范約束mq客戶端代碼,正確的使用Rabbitmq配置。
- 不同業(yè)務(wù)項目設(shè)置不同的vhost可以隔離一些影響,提升rabbitmq資源使用。
- 考慮設(shè)置dead-letter-exchange,當(dāng)設(shè)置了?requeue=false時,可以放入dead-letter-exchange,可以快速排查定位問題。
- Exchange和隊列的最大長度限制可以是限制消息的數(shù)量(參數(shù):x-max-length),或者是消息的總字節(jié)數(shù)(總字節(jié)數(shù)表示的是所有的消息體的字節(jié)數(shù),忽略消息的屬性和任何頭部信息),又或者兩者都進(jìn)行了限制,兩者取小值生效,只有處于ready狀態(tài)的消息被計數(shù),未被確認(rèn)的消息不會被計數(shù)受到limit的限制。最大隊列設(shè)置可以限制生產(chǎn)端,但會造成消息丟失風(fēng)險,最大消息數(shù)量限制,不能完全解決隊列阻塞問題。
- 盡量使用Direct-exchange,Direct 類型的 Exchange 投遞消息是最快的。
- Direct:處理路由鍵,需要將一個隊列綁定到交換機(jī)上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機(jī)上要求路由鍵為“A”,則只有路由鍵為“A”的消息才被轉(zhuǎn)發(fā),不會轉(zhuǎn)發(fā)路由鍵為"B",只會轉(zhuǎn)發(fā)路由鍵為“A”;
- Topic:將路由鍵和某模式進(jìn)行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”只能匹配一個詞;
- Fanout:不處理路由鍵。只需要簡單的將隊列綁定到交換機(jī)上。一個發(fā)送到該類型交換機(jī)的消息都會被廣播到與該交換機(jī)綁定的所有隊列上;
- Headers:不處理路由鍵,而是根據(jù)發(fā)送的消息內(nèi)容中的 headers 屬性進(jìn)行匹配。在綁定 Queue 與 Exchange 時指定一組鍵值對;當(dāng)消息發(fā)送到 RabbitMQ 時會取到該消息的 headers 與 Exchange 綁定時指定的鍵值對進(jìn)行匹配;如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列。