wordpress主題插件seo工程師
消息何去何從
mandatory和immediate是channel.basicPublish方法的兩個參數(shù),都有消息傳遞過程中不可達目的地時將消息返回給生產(chǎn)者的功能。
mandatory參數(shù)
- true:交換器無法根據(jù)自身的類型 和路由鍵找到符合條件的隊列,rabbitmq調(diào)用Basic.Return命令將消息返回給生產(chǎn)者
- 生產(chǎn)者調(diào)用channel.addReturnListener添加ReturnListener監(jiān)聽器實現(xiàn)
- false:消息直接丟棄
immediate參數(shù)
告訴服務(wù)器至少將該消息路由到一個隊列中,否則將消息返回給生產(chǎn)者。
Rabbitmq3.0去掉了對immediate參數(shù)支持,建議采用TTL和DLX方法替代
- true:如果交換器在將消息路由到隊列時發(fā)現(xiàn)隊列上并不存在任何消費者,這條消息將不會存入隊列中,當與路由鍵匹配的所有隊列都沒有消費者時,該消息會通過Basic.Return返回至生產(chǎn)者。
備份交換器(AE)
生產(chǎn)者發(fā)送消息不設(shè)置mandatory,消息未被路由會丟失,設(shè)置了,需要添加ReturnListener。如果不想編程復(fù)雜也不想消息丟失使用備份交換器。
使用:
- 聲明交換器時添加alternate-exchange參數(shù)實現(xiàn)
- channel.exchangeDeclare(“myAe”,“fanout”,true,false,null);
- channel.queueDeclare(“unroutedQueue”,true,false,false,null);
- channel.queueBind(“unroutedQueue”,“myAe”,“”);
- 通過策略(Policy)實現(xiàn)
- rabbitmqctl set_policy AE “^normalExchange$” ‘{“alternate-exchange”:“myAE”}’
特殊情況:
- 如果設(shè)置了備份交換器不存在,客戶端和RabbitMQ服務(wù)端都不會有異常出現(xiàn),此時消息會丟失
- 如果備份交換器沒有綁定任何隊列,客戶端和rabbitmq服務(wù)端都不會有異常出現(xiàn),此時消息會丟失
- 如果備份交換器沒有任何匹配的隊列,客戶端和rabbitmq服務(wù)端都不會有異常出現(xiàn),此時消息會丟失
- 如果備份交換器和mandatory參數(shù)一起使用,那么mandatory參數(shù)無效
過期時間(Time to Live ,TTL)
設(shè)置消息TTL
-
通過隊列屬性設(shè)置,隊列中所有消息都有相同的過期時間(一旦過期,就從隊列中抹去,消息已經(jīng)在隊列頭部,只要定期從隊列頭部開始掃描即可)
-
channel.queueDeclare方法中加入x-message-ttl參數(shù)實現(xiàn),單位ms
-
Map<String,Object> args = new HashMap<String,Object>(); args.put("x-message-ttl",6000); channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);
-
-
通過Policy方式設(shè)置ttl
-
rabbitmqctl set_policy TTL ".*" '{"message-ttl":6000}' --apply-to queue
-
-
通過調(diào)用http api接口設(shè)置
-
-
通過對消息本身單獨設(shè)置,每條消息的ttl可以不同(即使過期,也不會馬上抹去,是否過期是在即將投遞到消費者之前判定的)
- 代碼設(shè)置
- 設(shè)置AMQP.BasicProperties屬性
- set屬性:deliveryMode(持久化消息),expiration(ttl時間)
- 通過 http api接口設(shè)置
- 代碼設(shè)置
-
如果兩個方法一起使用,消息的ttl以兩者之間較小的數(shù)值為準,消息在隊列中一旦超過設(shè)置的ttl時,就會變成死信,消費者將無法再收到該消息
-
不設(shè)置ttl,表示此消息不會過期,ttl=0,表示除非此時可以直接將消息投遞到消費者,否則立即丟棄。
設(shè)置隊列TTL
channel.queueDeclare方法中的x-expires參數(shù)可以控制隊列被自動刪除前處于未使用狀態(tài)的時間(未使用:隊列上沒有任何消費者,隊列也沒有被重新聲明,并在過期時間段內(nèi)也未調(diào)用過Basic.Get命令)
Map<String, Object> args = new HashMap<String,Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue",false,false,false,args);
死信隊列
當消息在一個隊列中變成死信后,能被重新被發(fā)送到另一個交換器中,這個交換器就是DLX,死信交換器,綁定DLX的隊列就是死信隊列
消息變成死信情況
- 消息被拒絕
- 消息過期
- 隊列達到最大長度
當隊列中存在死信時,rabbitmq會自動將這個消息重新發(fā)布到設(shè)置的DLX上去,進而被路由到另一個隊列,死信隊列,可以監(jiān)聽這個隊列的消息進行相應(yīng)處理
設(shè)置方法
- 代碼設(shè)置:
- channel.queueDeclare方法中設(shè)置x-dead-letter-exchange為隊列添加DLX
- 通過Policy方式設(shè)置
延遲隊列
延遲隊列存儲的對象是對應(yīng)的延遲消息(當消息被發(fā)送以后,并不想讓消費者立刻拿到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費)。
場景:
- 訂單系統(tǒng),30min內(nèi)未支付,進行異常處理
- 手機遙控設(shè)備指定時間工作
通過DLX 和TTL模擬延遲隊列的功能。
假設(shè)一個應(yīng)用中需要將每條消息都設(shè)置為10秒延遲,生產(chǎn)者通過exchange.normal交換器將發(fā)送的消息存儲在queue.normal隊列,消費者訂閱的是queue.dlx隊列,當消息從queue.normal整個隊列中過期之后被存入queue.dlx隊列,消費者恰巧消費到了延遲10秒的這條消息。
優(yōu)先級隊列
實現(xiàn):通過設(shè)置隊列的x-max-priority參數(shù)實現(xiàn)
默認最低優(yōu)先級為0,越高越優(yōu)先消費
前提:如果在消費速度大于生成者的速度且broker中沒有消息堆積的情況下,對發(fā)送的消息設(shè)置優(yōu)先級就沒什么意義了。
RPC實現(xiàn)
客戶端發(fā)送請求消息,服務(wù)端回復(fù)響應(yīng)的消息,為了接收響應(yīng)的消息,需要在請求消息中發(fā)送一個回調(diào)隊列
String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("","rpc_queue",props,message.getBytes());
- replyTo:用來設(shè)置一個回調(diào)隊列
- correlationId:用來關(guān)聯(lián)請求和其調(diào)用RPC之后的回復(fù),每一個請求設(shè)置一個唯一的correlationId
可以為每一個客戶端創(chuàng)建一個單一的回調(diào)隊列。
持久化
- 交換器持久化:通過在聲明隊列是將durable參數(shù)置為true實現(xiàn)的。如果不持久化,rabbitmq服務(wù)重啟后,相關(guān)的交換器元數(shù)據(jù)會丟失,消息不丟失,只是不能將消息發(fā)送到這個交換器中了。
- 隊列持久化:通過在聲明隊列時將durable置為true,如果不設(shè)置持久化,rabbitmq重啟后,相關(guān)隊列元數(shù)據(jù)會丟失,此時數(shù)據(jù)也會丟失。
- 消息持久化:通過將消息的投遞模式BasicProperties中的diliveryMode屬性設(shè)置為2即可實現(xiàn)消息的持久化
- 設(shè)置了隊列和消息的持久化,rabbitmq服務(wù)重啟后,消息依舊存在。
將交換器、隊列、消息都設(shè)置持久化后不能保證數(shù)據(jù)百分百丟失。
生產(chǎn)者確認
確定消息到底有沒有正確到達服務(wù)器??梢酝ㄟ^事務(wù)機制和發(fā)送方確認機制
事務(wù)機制
rabbitmq客戶端與事務(wù)機制相關(guān)方法
- channel.txSelect:用于當前的信道設(shè)置成事務(wù)模式
- channel.txCommit:用于提交事務(wù)
- channel.txRollback:用于事務(wù)回滾
開啟事務(wù)流程
- 客戶端發(fā)送Tx.select,將信道置為事務(wù)模式
- Broker回復(fù)Tx.Select-Ok,確認已將信道置為事務(wù)模式
- 在發(fā)送完消息后,客戶端發(fā)送Tx.Commit提交事務(wù)
- Broker回復(fù)Tx.Commit-Ok,確認提交事務(wù)
- 如果發(fā)生異常,在捕獲異常后,channel.txRollback()回滾
缺點:會有性能損失
發(fā)送方確認機制
- 生產(chǎn)者將信道設(shè)置成confirm模式(channel.confirmSelect),rabbitmq同意:Confirm.Select-Ok;
- 一旦信道進入confirm模式,所有在該信道上發(fā)布的消息都會被指派一個唯一id
- 一旦消息被投遞到匹配的隊列后,rabbitmq會發(fā)送一個確認給生產(chǎn)者(包含消息唯一id),使得生產(chǎn)者知曉消息已經(jīng)正確到達了目的地。
事務(wù)機制在一條消息發(fā)送后會使發(fā)送端阻塞,等待rabbitmq回應(yīng)后才發(fā)下一條消息,而發(fā)送發(fā)確認機制最大好處是異步的。生產(chǎn)者通過回調(diào)方法處理該確認消息。如果rabbitmq因自身內(nèi)部錯誤導(dǎo)致消息丟失,會發(fā)送一條nack命令,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理nack命令。
publisher confirm優(yōu)勢
- 批量confirm方法,每發(fā)送一批消息后,調(diào)用channel.waitForConfirms方法,等待服務(wù)器的確認返回
- 異步confirm方法:提供一個回調(diào)方法,服務(wù)端確認了一條或多條消息后客戶端會回調(diào)這個方法進行處理。
消費端要點介紹
消息分發(fā)
rabbitmq隊列擁有多個消費者時,隊列收到的消息將以輪詢的分發(fā)方式發(fā)送給消費者,每條消息只會發(fā)送給訂閱列表里的一個消費者。
- 問題:如果某些空閑,某些忙碌造成整體下降
- 方法:channel.basicQos方法允許限制信道上的消費者所能保持的最大未確認消息的數(shù)量。如果達到上限,就不會向這個消費者再發(fā)送任何消息,知道消費者確認了某條消息后,相應(yīng)計數(shù)減1,之后消費者可以繼續(xù)接受消息。
消息順序性
指消費者消費到的消息和發(fā)送者發(fā)布的消息的順序是一致的。
打破順序性的情形
- 如果生產(chǎn)者使用了事務(wù)機制,發(fā)送消息遇到異常進行了事務(wù)回滾,需重新補償發(fā)送,如果是另一個線程實現(xiàn),則出現(xiàn)亂序。
- 如果生產(chǎn)者發(fā)送的消息設(shè)置了不同的超時時間,并設(shè)置了死信隊列,順序不一致。
- 設(shè)置了優(yōu)先級,也不是順序的。
要保證消息的順序性,需要業(yè)務(wù)方使用rabbitmq之后進一步處理,例如在消息體內(nèi)添加全局有序標識實現(xiàn)。
棄用QueueingConsumer
缺陷
- 內(nèi)存溢出問題:隊列堆積較多的消息,導(dǎo)致消費者客戶端內(nèi)存溢出假死,不斷堆積
- 使用Basic.Qos限制某個消費者所保持未確認消息的數(shù)量。
- 會拖累同一個connection下的所有信道,性能降低
- 同步遞歸調(diào)用QueueingConsumer會產(chǎn)生死鎖
- rabbitmq的自動連接恢復(fù)機制不支持Queueing Consumer這種形式
- QueueingConsumer不是事件驅(qū)動的
消息傳輸保障
一般消息中間件消息傳輸保障分為三個層級
- 最多一次
- 最少一次
- 恰好一次
rabbitmq支持其中的最多一次和最少一次,其中最少一次投遞實現(xiàn)需要考慮
- 消息生產(chǎn)者需要開啟事務(wù)機制或publisher confirm機制,以確保消息可以可靠地傳輸?shù)絩abbitmq中
- 消息生產(chǎn)者需要配合使用mandatory參數(shù)或者備份交換器來確保消息能夠從交換器路由到隊列中,進而能夠保存下來而不會被丟棄
- 消息和隊列都需要進行持久化處理,以確保rabbitmq服務(wù)器在遇到異常情況時不會造成消息丟失
- 消費者在消費消息的同時需要將autoAck設(shè)置為false,然后通過手動確認的方式去確認已經(jīng)正確消費的消息,以避免在消費端引起不必要的消息丟失。
參考:《RabbitMQ實戰(zhàn)指南》