博彩網(wǎng)站做代理seo sem論壇
參考API :?Overview (RabbitMQ Java Client 5.20.0 API)?
參考文檔:??RabbitMQ: One broker to queue them all | RabbitMQ
目錄
結(jié)構(gòu)
?Hello World
?consumer
?producer
?創(chuàng)建連接API解析
創(chuàng)建連接工廠
生產(chǎn)者生產(chǎn)消息
?消費(fèi)者消費(fèi)消息
隊(duì)列聲明?
工作隊(duì)列Work Queues?
公平分發(fā)
輪訓(xùn)分發(fā)
消息應(yīng)答
發(fā)布確認(rèn)?
相關(guān)的實(shí)體類說(shuō)明
Delivery消息體?
Envelope元數(shù)據(jù)
持久化?
?發(fā)布訂閱
交換機(jī)
臨時(shí)隊(duì)列?
? ? ? ? 在此之前您需要了解生產(chǎn)者消費(fèi)者模型...? 學(xué)習(xí)的時(shí)候, 應(yīng)該結(jié)合下面這張圖一起看.
結(jié)構(gòu)
?Hello World
?consumer
package one;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消費(fèi)者*/
public class Consumer {public static final String QUEUE_NAME = "hello";// 接受消息public static void main(String[] args) throws IOException, TimeoutException {// 創(chuàng)建鏈接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("106.14.165.91");factory.setPassword("123");factory.setUsername("admin");// 進(jìn)行連接Connection connection = factory.newConnection();// 鏈接成功之后創(chuàng)建一個(gè)信道Channel channel = connection.createChannel();// 消費(fèi)者消費(fèi)消息/*** 參數(shù)* 1.消費(fèi)哪個(gè)隊(duì)列* 2.消費(fèi)成功之后,是否要自動(dòng)應(yīng)答,true表示自動(dòng)應(yīng)答, 否則false* 3.未消費(fèi)成功的回調(diào)方法* 4.消費(fèi)者取消消費(fèi)的回調(diào)*/channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {@Overridepublic void handle(String var1, Delivery var2) throws IOException {String msg = new String(var2.getBody());System.out.println(msg);}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("發(fā)生錯(cuò)誤:" + s);}});}
}
?producer
package one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static final String QUEUE_NAME = "hello";// 發(fā)消息public static void main(String[] args) throws IOException, TimeoutException {// 創(chuàng)建一個(gè)鏈接工廠ConnectionFactory factory = new ConnectionFactory();// 工廠的ip, 鏈接rabbit隊(duì)列factory.setHost("106.14.165.91");factory.setUsername("admin");factory.setPassword("123");// 建立連接Connection connection = factory.newConnection();// 獲取信道Channel channel1 = connection.createChannel();// 生成一個(gè)隊(duì)列/*** 第一個(gè)參數(shù): 隊(duì)列名稱* 第二個(gè)參數(shù): 消息是否持久化, true表示存儲(chǔ)在磁盤上, 否則表示存儲(chǔ)在內(nèi)存中(默認(rèn))* 第三個(gè)參數(shù): 該隊(duì)列是否消息共享, true表示可以多個(gè)消費(fèi)者消費(fèi), 否則只能一個(gè)消費(fèi)者消費(fèi)* 第四個(gè)參數(shù): 是否自動(dòng)刪除, 最后一個(gè)消費(fèi)者斷開連接之后, 該隊(duì)列是否自動(dòng)刪除,true表示自動(dòng)刪除* 其他參數(shù):*/channel1.queueDeclare(QUEUE_NAME,false,false,false,null);long nextPublishSeqNo = channel1.getNextPublishSeqNo();System.out.println(nextPublishSeqNo);// 發(fā)送消息String msg = "hello world";/** 參數(shù)列表* 1 : 發(fā)送到哪個(gè)交換機(jī)* 2 : 路由的key值, 本次是隊(duì)列名稱* 3 : 其他參數(shù)* 4 : 消息體*/channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes());long nextPublishSeqNo2 = channel1.getNextPublishSeqNo();System.out.println(nextPublishSeqNo);channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes());long nextPublishSeqNo3 = channel1.getNextPublishSeqNo();System.out.println("消息發(fā)送完畢over");}
}
?創(chuàng)建連接API解析
? ? ? ? 官方api網(wǎng)址:??Channel (RabbitMQ Java Client 5.20.0 API)declaration: package: com.rabbitmq.client, interface: Channelhttps://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html
????????Connection:publisher/consumer和broker之間的TCP連接,??Channel:如果每一次訪問(wèn) RabbitMQ 都建立一個(gè)Connection,在消息量大的時(shí)候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè)thread創(chuàng)建單獨(dú)的channel進(jìn)行通訊,AMQP method包含了channel id 幫助客戶端和message broker 識(shí)別 channel,所以channel之間是完全隔離的。
????????Channel作為輕量級(jí)的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷.
創(chuàng)建連接工廠
? ? ? ? 創(chuàng)建一個(gè)連接工廠之后, 設(shè)置對(duì)應(yīng)Rabbitmq在哪個(gè)服務(wù)器上面, 并提供安全訪問(wèn)的驗(yàn)證.
? ? ? ? 在建立連接工廠之后進(jìn)行連接, 就可以使用工廠創(chuàng)建連接.
ConnectionFactory factory = new ConnectionFactory();
// 工廠的ip, 鏈接rabbit隊(duì)列
factory.setHost("106.14.165.11");
factory.setUsername("usr");
factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
? ? ? ? ?創(chuàng)建鏈接之后就可以使用這個(gè)鏈接的對(duì)象來(lái)創(chuàng)建channel.
生產(chǎn)者生產(chǎn)消息
? ? ? ? 生產(chǎn)者生產(chǎn)消息, 然后通過(guò)channel發(fā)送給隊(duì)列. 通過(guò)創(chuàng)建的channel對(duì)象, 調(diào)用其中的basicPublish方法來(lái)將消息發(fā)送給隊(duì)列.
????????basicPublish
?是 RabbitMQ 中用于發(fā)布消息到指定交換機(jī)的方法。它的主要作用是允許生產(chǎn)者將消息發(fā)送到 RabbitMQ 的交換機(jī),然后交換機(jī)根據(jù)路由規(guī)則將消息發(fā)送到相應(yīng)的隊(duì)列中,以供消費(fèi)者消費(fèi)。
basicPublish參數(shù)解析:
basicPublish有三個(gè)重載版本:
void basicPublish(String exchange , String routingKey , AMQP.BasicProperties props, byte[] body ) throws IOException;
- exchange : 指定要發(fā)送的交換機(jī)的名稱, 如果設(shè)置空字符串, 那么消息會(huì)被發(fā)送到RabbitMQ的默認(rèn)交換機(jī).
- routingKey : 路由鍵, 用于指定消息要路由到的隊(duì)列.
- props : 消息的屬性, 這是一個(gè)可選參數(shù), 里面有: 消息類型, 格式, 優(yōu)先級(jí), 過(guò)期時(shí)間等等
- body : 消息體, 也就是要發(fā)送的消息本身
? ? ? ? exchange這個(gè)參數(shù), 如果指定默認(rèn)的交換機(jī), 也就是如下圖所示:
void basicPublish(String exchange, String routingKey, boolean var3, AMQP.BasicProperties props, byte[] body) throws IOException;
- exchange和?
routingKey
:與第一個(gè)方法中的意義相同,分別是交換機(jī)名稱和路由鍵。 var3
?(boolean):是否強(qiáng)制路由(mandatory routing)。如果設(shè)置為true
,并且消息無(wú)法路由到任何隊(duì)列(沒(méi)有匹配的綁定),那么RabbitMQ會(huì)返回一個(gè)錯(cuò)誤給生產(chǎn)者。如果設(shè)置為false
,消息將被丟棄。- props和 body:與第一個(gè)方法中的意義相同,分別是消息屬性和消息體。
void basicPublish(String var1, String var2, boolean var3, boolean var4, AMQP.BasicProperties var5, byte[] var6) throws IOException;
var1
?和?var2
:與前兩個(gè)方法中的意義相同,分別是交換機(jī)名稱和路由鍵。var3
?(boolean):是否強(qiáng)制路由,與第二個(gè)方法中的意義相同。var4
?(boolean):是否立即發(fā)布(immediate flag)。如果設(shè)置為true
,并且消息無(wú)法路由到任何消費(fèi)者(沒(méi)有匹配的隊(duì)列或消費(fèi)者不在線),那么RabbitMQ會(huì)返回一個(gè)錯(cuò)誤給生產(chǎn)者。如果設(shè)置為false
,消息將被存儲(chǔ)在隊(duì)列中等待消費(fèi)者。var5
?和?var6
:與第一個(gè)方法中的意義相同,分別是消息屬性和消息體。
需要注意的是, 如果你指定默認(rèn)的交換機(jī), 也就是default交換機(jī), 那么第二個(gè)參數(shù)routingKey的意思就變成了queue了, 也就是第二個(gè)參數(shù)改為 對(duì)應(yīng)的隊(duì)列的名稱.
?消費(fèi)者消費(fèi)消息
? ? ? ? ?消費(fèi)者消費(fèi)消息的方法為basicConsume() 這個(gè)方法有很多個(gè)重載, 如下:
? ? ? ? 地址:??Channel (RabbitMQ Java Client 5.20.0 API)
? ? ? ? 這里只講解最常見的, 也是初學(xué)者最常用的一個(gè)方法:
basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback
)
? ? ? ? 參數(shù)解析:
- String queue:
- 這個(gè)參數(shù)指定了消費(fèi)者要從中接收消息的隊(duì)列名稱。
- boolean autoAck:
- 這個(gè)參數(shù)決定了是否自動(dòng)確認(rèn)消息。如果設(shè)置為?
true
,則一旦消息被交付給消費(fèi)者,RabbitMQ 會(huì)自動(dòng)將其標(biāo)記為已確認(rèn),即使消費(fèi)者還沒(méi)有實(shí)際處理完這條消息。這種模式下,如果消費(fèi)者在處理消息時(shí)崩潰或發(fā)生錯(cuò)誤,那么這條消息就會(huì)丟失,因?yàn)?RabbitMQ 認(rèn)為它已經(jīng)被成功處理了。 - 如果設(shè)置為?
false
,則消費(fèi)者需要顯式地調(diào)用?basicAck
?方法來(lái)確認(rèn)消息已被成功處理。這樣,如果消費(fèi)者在處理消息時(shí)崩潰,RabbitMQ 會(huì)重新將這條消息放回隊(duì)列中,等待其他消費(fèi)者處理,從而保證了消息的可靠性。
- 這個(gè)參數(shù)決定了是否自動(dòng)確認(rèn)消息。如果設(shè)置為?
- Map<String,Object> arguments:
- 這個(gè)參數(shù)允許你傳遞額外的參數(shù)到消費(fèi)者,這些參數(shù)可以用來(lái)配置消費(fèi)者的行為。例如,你可以使用它來(lái)設(shè)置消費(fèi)者標(biāo)簽(consumer tag),該標(biāo)簽用于唯一標(biāo)識(shí)這個(gè)消費(fèi)者,或者在后續(xù)的操作中引用它。
- DeliverCallback deliverCallback:
- 這是一個(gè)回調(diào)函數(shù),當(dāng) RabbitMQ 向消費(fèi)者發(fā)送消息時(shí),會(huì)自動(dòng)調(diào)用這個(gè)回調(diào)?;卣{(diào)函數(shù)通常包含處理消息的邏輯,比如解析消息內(nèi)容、執(zhí)行業(yè)務(wù)邏輯等。
- 回調(diào)函數(shù)的參數(shù)通常包含消息的內(nèi)容、消息的元數(shù)據(jù)(如消息的交換機(jī)、路由鍵、消息ID等)以及一個(gè)通道(Channel)對(duì)象,通過(guò)這個(gè)通道對(duì)象,消費(fèi)者可以發(fā)送消息確認(rèn)、拒絕消息或進(jìn)行其他操作。
- CancelCallback cancelCallback:
- 這是一個(gè)可選的回調(diào)函數(shù),當(dāng)消費(fèi)者被取消(例如,由于連接斷開或消費(fèi)者顯式地調(diào)用?
basicCancel
)時(shí),會(huì)自動(dòng)調(diào)用這個(gè)回調(diào)。這個(gè)回調(diào)可以用于執(zhí)行清理工作,比如釋放資源、記錄日志等。
- 這是一個(gè)可選的回調(diào)函數(shù),當(dāng)消費(fèi)者被取消(例如,由于連接斷開或消費(fèi)者顯式地調(diào)用?
下面是?DeliverCallback 和CancelCallback 兩個(gè)接口的代碼:
@FunctionalInterface
public interface DeliverCallback {void handle(String var1, Delivery var2) throws IOException;
}
@FunctionalInterface
public interface CancelCallback {void handle(String var1) throws IOException;
}
? ? ? ? 我們需要重寫里面的handle方法, 示例如下:
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {@Overridepublic void handle(String var1, Delivery var2) throws IOException {// ...}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {// ...}});
? ? ? ?當(dāng)然你也可以使用lambda表達(dá)式;
隊(duì)列聲明?
? ? ? ? 生產(chǎn)者使用的是basicPublish來(lái)將消息推送至隊(duì)列, 也就是:
channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes());
? ? ? ? 但是存在一個(gè)問(wèn)題, 如果你basicPublish指定的交換機(jī)不存在? 那么你推送消息到你指定的交換機(jī), 就會(huì)發(fā)生異常, 所以除非你的RabbitMQ-server本地已經(jīng)創(chuàng)建了這個(gè)交換機(jī), 那么就不需要其他操作, 但是如果你沒(méi)有你指定的名稱的交換機(jī), 那么就應(yīng)該去聲明一個(gè)交換機(jī).
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
queueDeclare()
? ? ? ? ?也就是方法: queueDeclare() , 它可以指定參數(shù), 也可以不指定, 下面是他們的解釋:
queueDeclare
?是 RabbitMQ Java 客戶端庫(kù)中用于聲明隊(duì)列的方法。這個(gè)方法有兩個(gè)版本,一個(gè)不帶參數(shù),另一個(gè)帶有多個(gè)參數(shù)以提供隊(duì)列的詳細(xì)配置。下面我將詳細(xì)解釋這兩個(gè)方法及其參數(shù)的作用。
第一個(gè)方法:queueDeclare()
????????Actively declare a server-named exclusive, autodelete, non-durable queue.?
????????這個(gè)方法不帶任何參數(shù)。當(dāng)你調(diào)用這個(gè)方法時(shí),RabbitMQ 會(huì)為你創(chuàng)建一個(gè)新的隊(duì)列,該隊(duì)列的名稱將由 RabbitMQ 自動(dòng)生成,并且這個(gè)隊(duì)列是非持久的、排他的、自動(dòng)刪除的,且不帶任何額外的參數(shù)。
????????由于沒(méi)有指定隊(duì)列名稱,你通常無(wú)法預(yù)先知道隊(duì)列的確切名稱,這可能會(huì)在某些場(chǎng)景下造成不便,比如當(dāng)你需要多個(gè)消費(fèi)者共享同一個(gè)隊(duì)列時(shí)。此外,由于隊(duì)列是非持久的,如果 RabbitMQ 服務(wù)器重啟,這個(gè)隊(duì)列將會(huì)丟失,所有在隊(duì)列中的消息也會(huì)丟失。
第二個(gè)方法:queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
這個(gè)方法允許你更細(xì)致地配置隊(duì)列的屬性。下面是每個(gè)參數(shù)的解釋:
- String queue:
- 隊(duì)列的名稱。這個(gè)名稱必須是非空的,并且在 RabbitMQ 服務(wù)器上是唯一的。
- boolean durable:
- 是否持久化隊(duì)列。如果設(shè)置為?
true
,隊(duì)列會(huì)在 RabbitMQ 服務(wù)器重啟后依然存在。如果設(shè)置為?false
,隊(duì)列則是非持久的,服務(wù)器重啟后隊(duì)列將不存在。
- 是否持久化隊(duì)列。如果設(shè)置為?
- boolean exclusive:
- 是否排他。如果設(shè)置為?
true
,隊(duì)列只能被聲明它的連接使用,并且當(dāng)連接關(guān)閉時(shí),隊(duì)列會(huì)被自動(dòng)刪除。這通常用于臨時(shí)隊(duì)列。
- 是否排他。如果設(shè)置為?
- boolean autoDelete:
- 是否自動(dòng)刪除。如果設(shè)置為?
true
,當(dāng)最后一個(gè)消費(fèi)者斷開連接后,隊(duì)列會(huì)自動(dòng)刪除。如果設(shè)置為?false
,則不會(huì)自動(dòng)刪除隊(duì)列。
- 是否自動(dòng)刪除。如果設(shè)置為?
- Map<String,Object> arguments:
- 一組額外的隊(duì)列參數(shù),可以用來(lái)設(shè)置隊(duì)列的更多高級(jí)特性。例如,你可以設(shè)置隊(duì)列的最大長(zhǎng)度、消息生存時(shí)間等。
對(duì)比兩個(gè)方法
????????第一個(gè)方法(無(wú)參數(shù)版本)非常簡(jiǎn)單易用,但功能有限。它適用于那些不需要復(fù)雜隊(duì)列配置的場(chǎng)景,比如臨時(shí)測(cè)試或簡(jiǎn)單應(yīng)用。然而,由于它創(chuàng)建的隊(duì)列是非持久的,且名稱不可預(yù)知,因此它可能不適用于需要持久化存儲(chǔ)或精確控制隊(duì)列名稱的場(chǎng)景。
????????第二個(gè)方法(帶參數(shù)版本)提供了更豐富的隊(duì)列配置選項(xiàng),使得你可以更精確地控制隊(duì)列的行為。通過(guò)設(shè)置不同的參數(shù),你可以創(chuàng)建持久化隊(duì)列、排他隊(duì)列、自動(dòng)刪除隊(duì)列,以及帶有額外屬性的隊(duì)列。這使得這個(gè)方法適用于那些需要復(fù)雜隊(duì)列配置和高級(jí)特性的場(chǎng)景。
????????在實(shí)際應(yīng)用中,你應(yīng)該根據(jù)應(yīng)用的需求來(lái)選擇使用哪個(gè)方法。如果你只是需要一個(gè)簡(jiǎn)單的、臨時(shí)的隊(duì)列來(lái)傳遞消息,那么無(wú)參數(shù)版本可能足夠了。但如果你需要確保隊(duì)列的持久性、控制隊(duì)列的名稱、設(shè)置隊(duì)列的額外屬性等,那么你應(yīng)該使用帶參數(shù)版本。
?對(duì)于消費(fèi)者同樣如此?
? ? ? ? AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
?這個(gè)方法確實(shí)是用于聲明(或創(chuàng)建)一個(gè)隊(duì)列的。在RabbitMQ中,隊(duì)列的聲明是一個(gè)冪等操作,這意味著即使隊(duì)列已經(jīng)存在,再次聲明它也不會(huì)產(chǎn)生錯(cuò)誤或?qū)е氯魏尾黄谕男袨椤?/p>
當(dāng)調(diào)用這個(gè)方法時(shí),RabbitMQ會(huì)檢查是否已經(jīng)存在具有相同名稱的隊(duì)列:
-
如果隊(duì)列不存在:RabbitMQ會(huì)根據(jù)提供的參數(shù)(如
durable
、exclusive
、autoDelete
和arguments
)創(chuàng)建一個(gè)新的隊(duì)列。 -
如果隊(duì)列已經(jīng)存在:RabbitMQ會(huì)忽略聲明請(qǐng)求中的大多數(shù)參數(shù)(除了
exclusive
和autoDelete
,這兩個(gè)參數(shù)僅在首次聲明隊(duì)列時(shí)生效),并返回隊(duì)列的當(dāng)前屬性。重要的是要注意,即使隊(duì)列已經(jīng)存在,durable
標(biāo)志也不會(huì)影響現(xiàn)有隊(duì)列的持久性。如果隊(duì)列在原始聲明時(shí)是持久的,那么它將繼續(xù)是持久的,即使后續(xù)的聲明將其標(biāo)記為非持久的。
????????因此,如果你嘗試聲明一個(gè)已經(jīng)存在的隊(duì)列,RabbitMQ不會(huì)報(bào)錯(cuò)或采取任何特別的行動(dòng),除了驗(yàn)證提供的exclusive
和autoDelete
標(biāo)志是否與原始聲明一致(如果不一致,操作會(huì)失敗)。其他參數(shù)(如durable
)將不會(huì)影響已存在的隊(duì)列(但是不報(bào)錯(cuò)并不是絕對(duì)的, 這個(gè)需要根據(jù)版本說(shuō)明去判斷, 不能肯定它不報(bào)錯(cuò))。
????????最后,需要注意的是,雖然聲明隊(duì)列本身不會(huì)拋出IOException
,但如果在與RabbitMQ服務(wù)器通信時(shí)發(fā)生網(wǎng)絡(luò)問(wèn)題或其他I/O問(wèn)題,這個(gè)方法可能會(huì)拋出IOException
。因此,在實(shí)際使用中,你應(yīng)該妥善處理這些潛在的異常。
工作隊(duì)列Work Queues?
? ? ? ? 工作隊(duì)列, 主要是避免立即執(zhí)行資源密集型任務(wù), 而不得不等待它完成, 相反我們安裝任務(wù)之后執(zhí)行, 我們把任務(wù)封裝為消息并將其發(fā)送給隊(duì)列, 在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè), 當(dāng)有多個(gè)線程工作時(shí), 這些工作線程講義氣處理這些任務(wù).
????????
????????RabbitMQ的工作隊(duì)列(Work Queue)是一種消息隊(duì)列模式,它允許你將任務(wù)(通常表示為消息)分發(fā)給多個(gè)消費(fèi)者(工作進(jìn)程)進(jìn)行并行處理。這種模式特別適用于那些可以并行處理且不需要按照特定順序完成的任務(wù)。
????????在工作隊(duì)列模式中,生產(chǎn)者發(fā)送消息到隊(duì)列中,一個(gè)或多個(gè)消費(fèi)者從隊(duì)列中接收并處理這些消息。每個(gè)消息都會(huì)被一個(gè)消費(fèi)者處理,并且通常不會(huì)被多個(gè)消費(fèi)者處理(除非有明確的路由或復(fù)制邏輯)。這種模式非常適合用于處理后臺(tái)任務(wù),如批量電子郵件發(fā)送、日志處理、圖像處理等。
RabbitMQ的工作隊(duì)列模式有以下幾個(gè)關(guān)鍵特點(diǎn):
-
任務(wù)分發(fā):生產(chǎn)者將任務(wù)作為消息發(fā)送到隊(duì)列中。RabbitMQ負(fù)責(zé)將消息從隊(duì)列中取出并分發(fā)給一個(gè)或多個(gè)消費(fèi)者。分發(fā)通?;谙⒌南冗M(jìn)先出(FIFO)順序,但也可以通過(guò)其他策略(如優(yōu)先級(jí)隊(duì)列)進(jìn)行定制。
-
并行處理:多個(gè)消費(fèi)者可以同時(shí)從隊(duì)列中接收消息并處理任務(wù)。這使得任務(wù)可以并行執(zhí)行,從而提高了整體的處理速度。
-
消息確認(rèn):為了確保消息的可靠處理,消費(fèi)者通常在處理完消息后會(huì)向RabbitMQ發(fā)送一個(gè)確認(rèn)信號(hào)(ack)。這樣,即使消費(fèi)者在處理消息時(shí)崩潰,RabbitMQ也可以將未確認(rèn)的消息重新放回隊(duì)列中,等待其他消費(fèi)者處理。這種機(jī)制保證了消息的可靠性。
-
持久化:通過(guò)配置隊(duì)列和消息的持久化屬性,可以確保即使在RabbitMQ服務(wù)器重啟后,消息也不會(huì)丟失。這對(duì)于處理重要任務(wù)至關(guān)重要。
-
擴(kuò)展性:工作隊(duì)列模式具有很好的擴(kuò)展性。你可以根據(jù)需要添加更多的消費(fèi)者來(lái)處理更多的任務(wù),從而輕松應(yīng)對(duì)負(fù)載的增加。
使用RabbitMQ的工作隊(duì)列模式,你可以構(gòu)建高效、可靠且可擴(kuò)展的后臺(tái)任務(wù)處理系統(tǒng),以滿足各種應(yīng)用場(chǎng)景的需求。
? ? ? ? 下面我們來(lái)一一列舉出案例來(lái)解析工作隊(duì)列的特性....?
公平分發(fā)
????????RabbitMQ 在默認(rèn)情況下,其分發(fā)機(jī)制是公平的,它試圖將消息平均地分發(fā)給各個(gè)消費(fèi)者,確保每個(gè)消費(fèi)者都有機(jī)會(huì)處理大致相同數(shù)量的消息。這種分發(fā)并不是隨機(jī)的,而是按照一定的順序或規(guī)則進(jìn)行。
? ? ? ? 但是這種分發(fā)模式會(huì)有一個(gè)很大的問(wèn)題, 那么就是如果一個(gè)消費(fèi)者處理消息的速度慢, 一個(gè)快, 那么就會(huì)有一個(gè)消費(fèi)者產(chǎn)生饑餓的情況, 而另外一個(gè)消費(fèi)者非常忙碌, 嚴(yán)重的隊(duì)列會(huì)出現(xiàn)消息積壓的情況. 此時(shí)產(chǎn)生饑餓的消費(fèi)者沒(méi)有完全利用cpu來(lái)消費(fèi)消息, 所以就產(chǎn)生了資源的浪費(fèi), 為了避免這個(gè)情況?
????????在 RabbitMQ 中,如果消息被平均分發(fā)到多個(gè)消費(fèi)者(如消費(fèi)者a和b),但消費(fèi)者的處理速度不同(如a處理速度很快,b處理速度很慢),那么未被消費(fèi)的消息會(huì)繼續(xù)保留在隊(duì)列中,等待消費(fèi)者處理。具體來(lái)說(shuō),當(dāng)消費(fèi)者a迅速處理完自己的消息后,它會(huì)繼續(xù)從隊(duì)列中獲取并處理新的消息(如果有的話)。而消費(fèi)者b由于處理速度慢,它還未消費(fèi)完的消息會(huì)留在隊(duì)列中,等待其逐漸處理。
????????RabbitMQ 本身并沒(méi)有為每個(gè)消費(fèi)者設(shè)置單獨(dú)的緩存來(lái)存儲(chǔ)未處理的消息。消息的處理和存儲(chǔ)都是在隊(duì)列層面進(jìn)行的。隊(duì)列是消息的緩沖區(qū),它負(fù)責(zé)存儲(chǔ)和分發(fā)消息給消費(fèi)者。消費(fèi)者按照自己的速度從隊(duì)列中拉取(或在某些配置下由隊(duì)列推送)消息進(jìn)行處理。
? ? ? ? 為了解決這種情況, 可以使用basicQos(1)方法來(lái)設(shè)置每個(gè)消費(fèi)者同時(shí)只能消費(fèi)一個(gè)消息, 這個(gè)設(shè)置將會(huì)告訴隊(duì)列, 給我發(fā)送的消息, 同時(shí)不能超過(guò)一個(gè) , 或者說(shuō)是"別給我發(fā)送消息, 除非我上一個(gè)消息已經(jīng)處理并應(yīng)答",? 同時(shí), 他會(huì)將第二個(gè)消息發(fā)送給另外一個(gè)空閑的消費(fèi)者來(lái)處理.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
? ? ? ? ?但是如果所有的消費(fèi)者都處于忙碌狀態(tài), 消息無(wú)法即使處理, 那么如果你還有必要維護(hù)這個(gè)隊(duì)列, 那么推薦您多創(chuàng)建幾個(gè)消費(fèi)者去消費(fèi).
輪訓(xùn)分發(fā)
? ? ? ?首先創(chuàng)建兩個(gè)消費(fèi)者, 創(chuàng)建一個(gè)生產(chǎn)者, 看看他們之間的任務(wù)是如何分配的:
消費(fèi)者1 :
package MutiThreadWorkQueue;import Util.RabbitMQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;/*** 工作隊(duì)列1* 也就是消費(fèi)者1*/
public class Worker1 {// 接收消息public static void main(String[] args) throws IOException {Channel channel = RabbitMQUtil.getChannel();// soutSystem.out.println("worker1 : ");// 接收消息channel.basicConsume(RabbitMQUtil.QUEUE_NAME, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println("worker1: " + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("worker1 發(fā)生錯(cuò)誤");}});}
}
?消費(fèi)者2同消費(fèi)者1一樣, 只不過(guò)里面的一些向控制臺(tái)輸出的提示信息發(fā)生了一些修改, 例如:
System.out.println("worker2 : ");
?生產(chǎn)者
package MutiThreadWorkQueue;import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;import java.io.IOException;
import java.util.Scanner;public class Producer {public static void main(String[] args) throws IOException {Channel channel = RabbitMQUtil.getChannel();// 從控制臺(tái)輸入 接收信息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()) {String msg = scanner.next();channel.basicPublish("",RabbitMQUtil.QUEUE_NAME,null,msg.getBytes());System.out.println("發(fā)送消息: " + msg + ",發(fā)送完畢");}}
}
?RabbitMQUtil :
package Util;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitMQUtil {public static final String QUEUE_NAME = "hello";public static Channel getChannel() {ConnectionFactory factory = new ConnectionFactory();// 工廠的ip, 鏈接rabbit隊(duì)列factory.setHost("106.14.165.91");factory.setUsername("admin");factory.setPassword("123");// 建立連接Connection connection = null;try {connection = factory.newConnection();return connection.createChannel();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}// 獲取信道}}
? ? ? ? 查看rabbitmq是否已經(jīng)存在RabbitMQUtil.QUEUE_NAME這個(gè)字符串對(duì)應(yīng)的隊(duì)列:
? ? ? ? 已經(jīng)存在, 直接啟動(dòng)生產(chǎn)者和消費(fèi)者, 然后在生產(chǎn)者中多次輸入信息:
? ? ? ? 查看消費(fèi)者1和消費(fèi)者2 :
? ? ? ? ?如果你多次重復(fù)的去實(shí)驗(yàn)?zāi)憔蜁?huì)發(fā)現(xiàn), 總是奇數(shù)的在woker1或者是woker2.?
? ? ? ? 為什么??
? ? ? ? 我們首先看看消費(fèi)者的消費(fèi)的代碼:
channel.basicConsume(RabbitMQUtil.QUEUE_NAME, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println("worker1: " + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("worker1 發(fā)生錯(cuò)誤");}});
? ? ? ? 然后根據(jù)上面的參數(shù)解析, 可以發(fā)現(xiàn), 其實(shí)他是使用的這個(gè)構(gòu)造方法:
? ? ? ? 這個(gè)構(gòu)造方法有什么特別之處? 那就是它沒(méi)有指定exclusive, 也就是沒(méi)有指定它是否是排他的, 但是不設(shè)置就不代表沒(méi)有隱式設(shè)置.
? ? ? ? 官方文檔給出的這個(gè)構(gòu)造方法的描述是:
? ? ? ? 也就是non-sxclusive, 我們知道exclusive是排他的意思, 那么non-exclusive就是不拍他的, 也就是說(shuō), 消費(fèi)的時(shí)候允許其他消費(fèi)者一起共享處理. 但是每一個(gè)任務(wù)只能分發(fā)給一個(gè)消費(fèi)者.?
消息應(yīng)答
? ? ? ? ?首先你得確認(rèn)一個(gè)東西, 那就是, 消息從生產(chǎn)者這里發(fā)送出去, 就可以不管了嗎, 隊(duì)列將消息分配給消費(fèi)者, 就可以不管了嗎, 當(dāng)然不是, 還需要使用一種應(yīng)答機(jī)制, 你可以將它和TCP協(xié)議的應(yīng)答報(bào)文機(jī)制和超時(shí)重傳進(jìn)行一個(gè)對(duì)比.
????????RabbitMQ 的消息應(yīng)答機(jī)制是一個(gè)確保消息在發(fā)送和接收過(guò)程中可靠性的重要手段。這種機(jī)制主要用于處理消費(fèi)者在處理消息時(shí)可能出現(xiàn)的異常情況,如消費(fèi)者在處理消息過(guò)程中宕機(jī),導(dǎo)致消息丟失。
????????RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,通常會(huì)將該消息標(biāo)記為已發(fā)送。然而,如果消費(fèi)者在處理消息的過(guò)程中發(fā)生宕機(jī),未處理的消息可能會(huì)丟失。為了保證消息在發(fā)送過(guò)程中不丟失,RabbitMQ 引入了消息應(yīng)答機(jī)制。
????????消息應(yīng)答機(jī)制的工作原理是:消費(fèi)者在接收到消息并且處理完該消息之后,會(huì)向 RabbitMQ 發(fā)送一個(gè)確認(rèn)信號(hào),告訴 RabbitMQ 它已經(jīng)處理了該消息,此時(shí) RabbitMQ 可以安全地將該消息從隊(duì)列中刪除。
RabbitMQ 提供了兩種消息應(yīng)答模式:
- 自動(dòng)應(yīng)答(Auto Acknowledgment):在這種模式下,一旦消息被消費(fèi)者接收,RabbitMQ 會(huì)立即將消息標(biāo)記為已被消費(fèi),而不需要消費(fèi)者明確地向 RabbitMQ 發(fā)送確認(rèn)。這種模式對(duì)消息的處理時(shí)機(jī)和可靠性要求不高,可以容忍一定程度的消息丟失。但是,如果消費(fèi)者在處理消息的過(guò)程中發(fā)生錯(cuò)誤,消息仍然會(huì)從隊(duì)列中刪除,這可能導(dǎo)致消息丟失。
- 手動(dòng)應(yīng)答(Manual Acknowledgment):在手動(dòng)應(yīng)答模式下,消費(fèi)者在處理完消息之后,需要向 RabbitMQ 發(fā)送明確的確認(rèn)信號(hào)。這種模式下,消費(fèi)者可以更精確地控制消息的刪除時(shí)機(jī),只有在確認(rèn)消息已經(jīng)成功處理后才通知 RabbitMQ 刪除消息。這有助于防止因消費(fèi)者處理錯(cuò)誤或宕機(jī)而導(dǎo)致的消息丟失。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
????????在實(shí)際使用中,可以根據(jù)應(yīng)用的需求和消息的重要性來(lái)選擇適合的消息應(yīng)答模式。對(duì)于要求消息可靠傳遞的場(chǎng)景,建議使用手動(dòng)應(yīng)答模式;而對(duì)于對(duì)消息丟失容忍度較高的場(chǎng)景,可以選擇自動(dòng)應(yīng)答模式以提高處理效率。
? ? ? ? ?對(duì)于自動(dòng)應(yīng)答, 你想想,你有沒(méi)有在什么地方見過(guò)?? 那肯定見過(guò)呀, 就在我們的api解析中, basicConsume的構(gòu)造方法中, 存在一個(gè)參數(shù), 名為AutoAck, 也就是Auto acknowledgement, 自動(dòng)確認(rèn). 此時(shí), 如果你設(shè)置為true, 那么就表明他是自動(dòng)確認(rèn), 當(dāng)隊(duì)列將消息發(fā)送給消費(fèi)者, 只要消費(fèi)者接收到消息之后, RabbitMQ會(huì)立即將消息標(biāo)記為已消費(fèi), 然后刪除.????????
? ? ? ? 但是這樣不安全, 得換一個(gè)更安全的方法 : 手動(dòng)應(yīng)答.
? ? ? ? 開啟消息應(yīng)答(手動(dòng)) 你首先需要設(shè)置消費(fèi)者的消費(fèi)autoack為false? :
? ? ? ? 官方有這樣一句話描述basicAck:
? ? ? ? ?簡(jiǎn)而言之就是, 如果你沒(méi)有進(jìn)行手動(dòng)應(yīng)答, 雖然是一個(gè)很容易犯的錯(cuò)誤, 但是他會(huì)造成嚴(yán)重的后果, 也就是當(dāng)你的客戶端退出的時(shí)候消息會(huì)被重新推送(就像一些消息被無(wú)規(guī)則的推送), 但是RabbitMQ將會(huì)占用越來(lái)越多的內(nèi)存, 這是因?yàn)檫@些消息沒(méi)有得到正確的處理.
? ? ? ? 接下來(lái)我們看看basicAck這個(gè)方法的聲明:
// Acknowledge one or several received messages.
basicAck(long deliveryTag, boolean multiple)
參數(shù)解析:
- deliveryTag: 這是一個(gè)長(zhǎng)整型(
long
)參數(shù),代表要確認(rèn)的消息的投遞標(biāo)簽(delivery tag)。投遞標(biāo)簽是 RabbitMQ 在發(fā)送消息給消費(fèi)者時(shí)附帶的,用于唯一標(biāo)識(shí)這個(gè)消息。通過(guò)確認(rèn)特定的投遞標(biāo)簽,消費(fèi)者可以告訴 RabbitMQ 它已經(jīng)處理了哪個(gè)消息。 - multiple :?這是一個(gè)布爾型(
boolean
)參數(shù),指示是否確認(rèn)一個(gè)投遞標(biāo)簽范圍內(nèi)的多個(gè)消息。如果?multiple
?設(shè)置為?true
,則 RabbitMQ 會(huì)將投遞標(biāo)簽小于或等于指定?deliveryTag
?的所有未確認(rèn)消息標(biāo)記為已確認(rèn)。如果?multiple
?設(shè)置為?false
,則僅確認(rèn)具有指定?deliveryTag
?的單個(gè)消息。
????????那么, 這個(gè)deliveryTag和multiple從哪里來(lái)? 還記得處理接口DeliverCallback 嗎, 每次消息隊(duì)列向這個(gè)消費(fèi)者發(fā)送消息, 消費(fèi)者就會(huì)調(diào)用這個(gè)接口.
? DeliverCallback
?在 RabbitMQ 的 Java 客戶端中是一個(gè)回調(diào)接口,用于處理從 RabbitMQ 隊(duì)列接收到的消息。當(dāng) RabbitMQ 服務(wù)器向消費(fèi)者發(fā)送消息時(shí),它會(huì)調(diào)用這個(gè)回調(diào)接口,并將消息作為參數(shù)傳遞給?DeliverCallback
?的實(shí)現(xiàn)方法。
????????具體來(lái)說(shuō),DeliverCallback
?的實(shí)現(xiàn)方法接收兩個(gè)參數(shù):
consumerTag
:這是一個(gè)唯一標(biāo)識(shí)消費(fèi)者的標(biāo)簽,用于在多個(gè)消費(fèi)者之間區(qū)分不同的消費(fèi)實(shí)例, 是隊(duì)列發(fā)送給消費(fèi)者的時(shí)候自動(dòng)為消費(fèi)者分配的。delivery
:這是一個(gè)?Delivery
?對(duì)象,它包含了從 RabbitMQ 接收到的消息的內(nèi)容以及其他相關(guān)信息,如消息的包體(body)、消息的頭部(headers)、消息的投遞標(biāo)簽(delivery tag)等。
? ?DeliverCallback
?不是一個(gè)緩存。它僅僅是一個(gè)回調(diào)函數(shù),用于實(shí)時(shí)處理從 RabbitMQ 服務(wù)器接收到的消息。每當(dāng)有新消息到達(dá)時(shí),RabbitMQ 就會(huì)調(diào)用這個(gè)回調(diào)函數(shù),并將消息傳遞給它。因此,你的消費(fèi)者代碼需要在?DeliverCallback
?的實(shí)現(xiàn)中編寫處理消息的邏輯。
? ? ? ?例如,在上面的代碼示例中,當(dāng)接收到消息時(shí),DeliverCallback
?的實(shí)現(xiàn)會(huì)打印出消息內(nèi)容,模擬一些處理過(guò)程(在這個(gè)例子中是等待兩秒),然后發(fā)送一個(gè)確認(rèn)信號(hào)給 RabbitMQ,告訴它消息已經(jīng)被成功處理。
? ? ? ?需要注意的是,DeliverCallback
?的實(shí)現(xiàn)應(yīng)該盡可能快地處理消息并發(fā)送確認(rèn)信號(hào),以避免消息在隊(duì)列中堆積。如果處理消息的過(guò)程非常耗時(shí),或者有可能失敗,你可能需要考慮使用更復(fù)雜的錯(cuò)誤處理機(jī)制,比如重試邏輯、死信隊(duì)列等。
channel.basicConsume(TASK_QUEUEN_NAME, false, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("c1 消費(fèi)者接收到 取消接口消費(fèi)回調(diào)邏輯");}});
? ? ? ? 至于multiple, 批量應(yīng)答以減少網(wǎng)絡(luò)擁堵:?
處理消息:
????????如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或TCP連接丟失),導(dǎo)致消息未發(fā)送ACK確認(rèn),RabbitMQ將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確保不會(huì)丟失任何消息
發(fā)布確認(rèn)?
? ? ? ? 為了保證消息被安全的發(fā)送給broker, 也就是RabbitMQ隊(duì)列, 你應(yīng)該需要使用到一些策略, 來(lái)讓發(fā)布確認(rèn)生效.
? ? ? ? 開啟發(fā)布確認(rèn)模式
Channel channel = connection.createChannel();
channel.confirmSelect();
? ? ? ? 發(fā)布確認(rèn)模式是, AMQP 0.9.1 協(xié)議對(duì)于RabbitMQ的擴(kuò)展, 所以發(fā)布確認(rèn)模式不是默認(rèn)啟動(dòng)的, 發(fā)布確認(rèn)需要再channel?頻道開啟, 使用上述的confirmSelect()方法來(lái)開啟發(fā)布確認(rèn).
? ? ? ? 開啟發(fā)布確認(rèn)之后, producer每次發(fā)送消息之后, 都會(huì)遵循相應(yīng)的確認(rèn)策略, 可以單個(gè)確認(rèn), 也可以批量確認(rèn), 下面是發(fā)布確認(rèn)的一些常用確認(rèn)方法:
- waitForConfirms()
這個(gè)方法會(huì)阻塞當(dāng)前線程,直到自上次調(diào)用此方法以來(lái)發(fā)布的所有消息都被Broker確認(rèn)(ack)或拒絕(nack)。如果沒(méi)有設(shè)置超時(shí)時(shí)間,它可能會(huì)無(wú)限期地等待,直到所有消息都被處理。
返回類型是boolean
,但在大多數(shù)情況下,此方法可能會(huì)因?yàn)樽枞环祷厝魏沃?。?shí)際上,其返回值的意義可能取決于具體的RabbitMQ客戶端庫(kù)實(shí)現(xiàn),但通常這種同步等待方法不會(huì)使用其返回值來(lái)進(jìn)行流控制或錯(cuò)誤處理。
2.?waitForConfirms(long timeout)
與上一個(gè)方法類似,這個(gè)方法也會(huì)阻塞當(dāng)前線程,等待Broker對(duì)消息的確認(rèn)或拒絕。但是,它接受一個(gè)超時(shí)參數(shù)timeout
,表示等待的最大時(shí)間(以毫秒為單位)。如果在指定的超時(shí)時(shí)間內(nèi)Broker沒(méi)有對(duì)所有消息進(jìn)行確認(rèn)或拒絕,那么該方法將停止等待并返回。
返回類型是boolean
,但同樣,返回值的意義可能取決于具體的RabbitMQ客戶端庫(kù)實(shí)現(xiàn)。通常,如果所有消息都在超時(shí)前得到了確認(rèn),則返回true
;如果超時(shí)了,則返回false
。
3.?waitForConfirmsOrDie()
這個(gè)方法的行為與waitForConfirms()
類似,也會(huì)阻塞當(dāng)前線程,等待Broker對(duì)所有消息進(jìn)行確認(rèn)或拒絕。但是,如果Broker沒(méi)有對(duì)所有消息進(jìn)行確認(rèn)或拒絕,那么這個(gè)方法不會(huì)返回,而是會(huì)拋出異常(通常是運(yùn)行時(shí)異常),導(dǎo)致當(dāng)前線程終止。
由于這個(gè)方法可能導(dǎo)致線程終止,因此它通常用于那些對(duì)消息確認(rèn)有嚴(yán)格要求的場(chǎng)景,并且愿意在消息未得到確認(rèn)時(shí)讓整個(gè)程序失敗。
4.?waitForConfirmsOrDie(long timeout)
這個(gè)方法結(jié)合了waitForConfirms(long timeout)
和waitForConfirmsOrDie()
的特點(diǎn)。它會(huì)在指定的超時(shí)時(shí)間內(nèi)等待Broker對(duì)所有消息進(jìn)行確認(rèn)或拒絕。如果超時(shí)時(shí)間到了,而Broker還沒(méi)有對(duì)所有消息進(jìn)行確認(rèn)或拒絕,那么這個(gè)方法會(huì)拋出異常,導(dǎo)致當(dāng)前線程終止。
這種方法在需要確保消息被處理但又不想無(wú)限期等待的情況下非常有用。它允許設(shè)置一個(gè)合理的超時(shí)時(shí)間,以便在消息處理失敗時(shí)能夠及時(shí)地采取其他措施。
單獨(dú)的發(fā)送消息
while (thereAreMessagesToPublish()) {byte[] body = ...;BasicProperties properties = ...;channel.basicPublish(exchange, queue, properties, body);// uses a 5 second timeoutchannel.waitForConfirmsOrDie(5_000);
}
? ? ? ? ?上面的例子中我們發(fā)送了一個(gè)消息, 然后等待他的確認(rèn)(waitForConfirmsOrDie(5_000)), 這個(gè)方法將會(huì)再消息得到隊(duì)列的確認(rèn)之后返回, 如果消息沒(méi)有在指定time內(nèi)確認(rèn), 或者是由于某些原因隊(duì)列無(wú)法返回確認(rèn)消息(比如網(wǎng)絡(luò)原因) , 那么該方法就會(huì)拋出異常, 這種異常的處理一般是記錄日志, 或者重新將消息發(fā)送.
? ? ? ? 不同的客戶端庫(kù)擁有不同的方法區(qū)同步處理發(fā)布者確認(rèn)模式, 所以確保仔細(xì)閱讀你所使用的客戶端的文件.
? ? ? ? 缺點(diǎn):?
? ? ? ? 這種方法雖然是很簡(jiǎn)便的, 但是也有一些主要的缺點(diǎn), 它大大降低了發(fā)布者發(fā)布的效率, 因?yàn)橐粋€(gè)消息的確認(rèn), 阻止了發(fā)布隨后將要發(fā)布的所有消息.這種方法, 將不能提供每秒發(fā)送幾百條消息的吞吐量, 但是這種方法對(duì)于某一些應(yīng)用來(lái)說(shuō), 還是很不錯(cuò)的, 足夠支持一個(gè)應(yīng)用了.
批量發(fā)布?
int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {byte[] body = ...;BasicProperties properties = ...;channel.basicPublish(exchange, queue, properties, body);outstandingMessageCount++;if (outstandingMessageCount == batchSize) {channel.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}
}
if (outstandingMessageCount > 0) {channel.waitForConfirmsOrDie(5_000);
}
? ? ? ? 對(duì)比上面的一次性發(fā)布確認(rèn), 可以看到這個(gè)java代碼有很大的不同, 首先, 他不并不是每一次循環(huán)都進(jìn)行一個(gè)等待確認(rèn), 而是當(dāng)?outstandingMessageCount == batchSize 這個(gè)條件成立再進(jìn)行確認(rèn).?
? ? ? ? 等待批量發(fā)送的消息被確認(rèn), 這個(gè)提高了吞吐量(對(duì)比于單獨(dú)確認(rèn)), 差不多時(shí)單獨(dú)確認(rèn)的20 ~ 30倍的效率提升, 但是他的一個(gè)缺點(diǎn)就是, 我們不能明確知道在失敗的情況中, 是什么原因造成這種失敗. 所以我們需要讓整個(gè)批量發(fā)送維護(hù)在內(nèi)存中來(lái)記錄一些有用的東西, 或者重新發(fā)送該消息, 并且這種方法依然是同步的, 也就是在等待確認(rèn)的時(shí)候, 會(huì)阻塞當(dāng)前線程, 也就會(huì)阻止當(dāng)前線程繼續(xù)publish消息.
異步確認(rèn)?
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {// code when message is confirmed
}, (sequenceNumber, multiple) -> {// code when message is nack-ed
});
? ? ? ? ?broker異步的確認(rèn)發(fā)送過(guò)來(lái)的消息, 僅僅只需要在客戶端上注冊(cè)一個(gè)回調(diào)函數(shù), 來(lái)監(jiān)視這些確認(rèn)信息.
? ? ? ? 這里有兩個(gè)回調(diào), 一個(gè)是已經(jīng)確認(rèn)的消息, 一個(gè)是被拒絕的消息(你可以理解為被RabbitMQ丟棄的消息), 每一次回調(diào)都有兩個(gè)參數(shù):
- sequenceNumber: 序列號(hào), 這個(gè)序列號(hào)碼用來(lái)標(biāo)記被確認(rèn)或者被拒絕的消息,?
- multiple : boolean類型的數(shù)據(jù), 如果為false, 那么僅僅是一個(gè)消息被確認(rèn)或者拒絕. 如果為true, 所有的小于等于sequenceNumber的消息都會(huì)被確認(rèn)或者拒絕.
? ? ? ? 每個(gè)消息在發(fā)布之前, 你可以通過(guò)下面的方法來(lái)獲取到序列號(hào):
int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);
? ? ? ? ?你可以使用這個(gè)序列號(hào)來(lái)找到對(duì)應(yīng)的被拒絕或者是被確認(rèn)的消息, 然后做出相關(guān)的處理操作. 但是在此之前, 你應(yīng)該首先維護(hù)一個(gè) key -value 的map, 以便記錄sequenceNumber和對(duì)應(yīng)消息的關(guān)聯(lián).
? ? ? ? 下面是一些代碼案例:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());
? ? ? ? ?如何使用這個(gè)ConcurrentSkipListMap?
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {if (multiple) {ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);confirmed.clear();} else {outstandingConfirms.remove(sequenceNumber);}
};channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {String body = outstandingConfirms.get(sequenceNumber);System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",body, sequenceNumber, multiple);cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code
相關(guān)的實(shí)體類說(shuō)明
Delivery消息體?
?源碼:
public class Delivery {private final Envelope _envelope;private final AMQP.BasicProperties _properties;private final byte[] _body;public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {this._envelope = envelope;this._properties = properties;this._body = body;}public Envelope getEnvelope() {return this._envelope;}public AMQP.BasicProperties getProperties() {return this._properties;}public byte[] getBody() {return this._body;}
}
解析:
這個(gè)Delivery
類是在RabbitMQ的Java客戶端中使用的,用于封裝從RabbitMQ服務(wù)器接收到的消息。下面我將詳細(xì)解釋類中的參數(shù)和方法的作用:
參數(shù):
Envelope envelope
:- 作用:這個(gè)參數(shù)包含了消息的元數(shù)據(jù),比如消息的
deliveryTag
(投遞標(biāo)簽)、exchange
(交換機(jī))名稱、routingKey
(路由鍵)等。 - 重要字段:
deliveryTag
是一個(gè)唯一的標(biāo)識(shí)符,用于確認(rèn)(ack)或拒絕(nack)特定的消息。
- 作用:這個(gè)參數(shù)包含了消息的元數(shù)據(jù),比如消息的
AMQP.BasicProperties properties
:- 作用:這個(gè)參數(shù)包含了消息的附加屬性,比如消息的內(nèi)容類型、消息頭部、消息的優(yōu)先級(jí)、消息的發(fā)布和過(guò)期時(shí)間等。
- 重要字段:
contentType
表示消息的內(nèi)容類型(例如,text/plain
或application/json
),headers
可以包含自定義的鍵值對(duì),用于傳遞額外的信息。
byte[] body
:- 作用:這個(gè)參數(shù)包含了消息的實(shí)際內(nèi)容。在RabbitMQ中,消息的內(nèi)容被表示為字節(jié)數(shù)組,這意味著你可以發(fā)送任何類型的數(shù)據(jù),只要你能將其轉(zhuǎn)換為字節(jié)。
- 處理方式:通常,你需要根據(jù)
properties
中的contentType
字段來(lái)確定如何解析這個(gè)字節(jié)數(shù)組。例如,如果contentType
是text/plain
,你可能需要將其轉(zhuǎn)換為字符串;如果是application/json
,你可能需要將其解析為JSON對(duì)象。
方法:
public Envelope getEnvelope()
:- 作用:這個(gè)方法返回消息的元數(shù)據(jù)(
Envelope
對(duì)象)。通過(guò)這個(gè)方法,你可以獲取到消息的deliveryTag
,進(jìn)而在處理完消息后進(jìn)行確認(rèn)或拒絕操作。
- 作用:這個(gè)方法返回消息的元數(shù)據(jù)(
public AMQP.BasicProperties getProperties()
:- 作用:這個(gè)方法返回消息的附加屬性(
AMQP.BasicProperties
對(duì)象)。你可以使用這個(gè)方法獲取到消息的contentType
、headers
等字段,以便正確地解析和處理消息內(nèi)容。
- 作用:這個(gè)方法返回消息的附加屬性(
public byte[] getBody()
:- 作用:這個(gè)方法返回消息的實(shí)際內(nèi)容(字節(jié)數(shù)組)。你需要根據(jù)
getProperties()
返回的屬性來(lái)確定如何解析這個(gè)字節(jié)數(shù)組。
- 作用:這個(gè)方法返回消息的實(shí)際內(nèi)容(字節(jié)數(shù)組)。你需要根據(jù)
使用場(chǎng)景:
當(dāng)你在RabbitMQ的Java客戶端中消費(fèi)消息時(shí),RabbitMQ服務(wù)器會(huì)將消息封裝為一個(gè)Delivery
對(duì)象,并通過(guò)DeliverCallback
回調(diào)給你。你可以在回調(diào)中處理這個(gè)消息,例如解析消息內(nèi)容、執(zhí)行業(yè)務(wù)邏輯,并在處理完后通過(guò)channel.basicAck
方法發(fā)送確認(rèn)。
總之,Delivery
類及其參數(shù)和方法在RabbitMQ的Java客戶端中起到了封裝和傳遞消息的作用,使得開發(fā)者能夠方便地獲取和處理從RabbitMQ服務(wù)器接收到的消息。
Envelope元數(shù)據(jù)
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//package com.rabbitmq.client;public class Envelope {private final long _deliveryTag;private final boolean _redeliver;private final String _exchange;private final String _routingKey;public Envelope(long deliveryTag, boolean redeliver, String exchange, String routingKey) {this._deliveryTag = deliveryTag;this._redeliver = redeliver;this._exchange = exchange;this._routingKey = routingKey;}public long getDeliveryTag() {return this._deliveryTag;}public boolean isRedeliver() {return this._redeliver;}public String getExchange() {return this._exchange;}public String getRoutingKey() {return this._routingKey;}public String toString() {StringBuilder sb = new StringBuilder();sb.append("Envelope(deliveryTag=").append(this._deliveryTag);sb.append(", redeliver=").append(this._redeliver);sb.append(", exchange=").append(this._exchange);sb.append(", routingKey=").append(this._routingKey);sb.append(")");return sb.toString();}
}
Envelope
?類是 RabbitMQ Java 客戶端庫(kù)中的一個(gè)類,它用于封裝從 RabbitMQ 服務(wù)器接收到的消息的元數(shù)據(jù)。這個(gè)類包含了關(guān)于消息的一些重要信息,比如投遞標(biāo)簽(deliveryTag
)、是否重新投遞(redeliver
)、交換機(jī)名稱(exchange
)和路由鍵(routingKey
)。
下面是?Envelope
?類中每個(gè)字段和方法的詳細(xì)解釋:
字段:
_deliveryTag
:- 類型:
long
- 描述:這是 RabbitMQ 為每條消息分配的唯一標(biāo)識(shí)符。當(dāng)消費(fèi)者處理完消息后,需要使用此標(biāo)簽來(lái)確認(rèn)(ack)或拒絕(nack)消息。
- 類型:
_redeliver
:- 類型:
boolean
- 描述:這個(gè)字段表示消息是否被重新投遞。如果消息之前被投遞過(guò)但因?yàn)槟承┰?#xff08;例如消費(fèi)者未正確確認(rèn))而被 RabbitMQ 重新放入隊(duì)列,這個(gè)字段就會(huì)是?
true
。
- 類型:
_exchange
:- 類型:
String
- 描述:這個(gè)字段表示消息最初被發(fā)送到的交換機(jī)名稱。交換機(jī)是 RabbitMQ 中用于路由消息的關(guān)鍵組件。
- 類型:
_routingKey
:- 類型:
String
- 描述:這個(gè)字段表示消息在發(fā)送時(shí)使用的路由鍵。路由鍵用于確定消息應(yīng)該被路由到哪個(gè)隊(duì)列。
- 類型:
方法:
getDeliveryTag()
:- 返回值:
long
- 描述:這個(gè)方法返回消息的投遞標(biāo)簽。
- 返回值:
isRedeliver()
:- 返回值:
boolean
- 描述:這個(gè)方法返回一個(gè)布爾值,表示消息是否被重新投遞。
- 返回值:
getExchange()
:- 返回值:
String
- 描述:這個(gè)方法返回消息最初被發(fā)送到的交換機(jī)名稱。
- 返回值:
getRoutingKey()
:- 返回值:
String
- 描述:這個(gè)方法返回消息在發(fā)送時(shí)使用的路由鍵。
- 返回值:
toString()
:- 返回值:
String
- 描述:這個(gè)方法覆蓋了?
Object
?類中的?toString
?方法,用于返回?Envelope
?對(duì)象的字符串表示形式,方便調(diào)試和日志記錄。
- 返回值:
使用場(chǎng)景:
當(dāng)消費(fèi)者從 RabbitMQ 接收消息時(shí),每條消息都會(huì)附帶一個(gè)?Envelope
?對(duì)象。消費(fèi)者可以使用?Envelope
?對(duì)象中的方法來(lái)獲取消息的元數(shù)據(jù),并根據(jù)這些信息來(lái)決定如何處理消息。例如,消費(fèi)者可以使用?getDeliveryTag
?方法獲取投遞標(biāo)簽,以便在處理完消息后發(fā)送確認(rèn)。
持久化?
? ? ? ? 我們上面已經(jīng)了解如何保證任務(wù)不會(huì)丟失, 即使消費(fèi)者連接丟失. 但是我們的任務(wù)依然會(huì)有丟失的風(fēng)險(xiǎn), 例如RabbitMQ服務(wù)器崩掉.
? ? ? ? 當(dāng)RabbitMQ服務(wù)器退出或者崩潰的時(shí)候, 他將會(huì)清除隊(duì)列和消息, ?除非你指定它不清除. 我們需要做兩件事情, 來(lái)保證即使是服務(wù)器崩潰也不會(huì)丟失數(shù)據(jù).
? ? ? ? 首先我們需要確保隊(duì)列會(huì)在RabbitMQ節(jié)點(diǎn)重啟之后存活, 要想做到這樣, 就需要聲明這個(gè)隊(duì)列為持久化模式
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
? ? ? ? 但是前面我們講到, 我們應(yīng)該避免對(duì)一個(gè)已經(jīng)存在的隊(duì)列重新定義, 因?yàn)樗粫?huì)生效, RabbitMQ是不允許使用不同的參數(shù)(durable, autoDelete,exclusive等)重新定義一個(gè)已經(jīng)存在的queue的. 即使這個(gè)語(yǔ)句本身是正確的. 如果你這樣做將會(huì)返回一個(gè)錯(cuò)誤信息.
? ? ? ? 你可以聲明一個(gè)不同名稱的queue:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
? ? ? ? 設(shè)置了以上的信息之后, 就可以保證此時(shí)這個(gè)隊(duì)列將不會(huì)在RabbitMQ重啟的時(shí)候丟失了, 但是這并不意味著RabbitMQ重啟之后, 消息不會(huì)丟失, 因?yàn)槟銉H僅只是持久化了queue, 而不是消息, 現(xiàn)在我們需要將我們的消息同時(shí)也標(biāo)記為持久化模式.
? ? ? ? ?如何將消息體設(shè)置為durable? 我們思考一下, 首先消息是從producer那邊publish過(guò)來(lái)的, 那么我們可不可以從basicPublish這個(gè)方法中找線索?? 還真被你找到了, 如下:?
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
? ? ? ? ?我們?cè)谕扑拖⒌臅r(shí)候, 可以設(shè)置一個(gè)屬性AMQP.BasicProperties props, 這個(gè)屬性定義如下:
public static class BasicProperties extends AMQBasicProperties {private String contentType;private String contentEncoding;private Map<String, Object> headers;private Integer deliveryMode;private Integer priority;private String correlationId;private String replyTo;private String expiration;private String messageId;private Date timestamp;private String type;private String userId;private String appId;private String clusterId;// 方法體 ... 省略}
這個(gè)BasicProperties
類繼承自AMQBasicProperties
,它擴(kuò)展了AMQP協(xié)議中消息屬性的基礎(chǔ)定義。AMQP(高級(jí)消息隊(duì)列協(xié)議)是一個(gè)開放、可靠、面向消息的中間件協(xié)議,它支持多種消息傳遞模式,包括發(fā)布/訂閱、點(diǎn)對(duì)點(diǎn)、請(qǐng)求/響應(yīng)等。下面是對(duì)該類中一些屬性和方法的基本解釋:
屬性:
- contentType:
- 用途:表示消息體的MIME類型,例如
text/plain
或application/json
。這有助于接收方知道如何解析消息內(nèi)容。
- 用途:表示消息體的MIME類型,例如
- contentEncoding:
- 用途:表示消息內(nèi)容使用的字符編碼,如
UTF-8
。
- 用途:表示消息內(nèi)容使用的字符編碼,如
- headers:
- 用途:一個(gè)自定義的鍵值對(duì)集合,允許發(fā)送方和接收方傳遞額外的信息。
- deliveryMode:
- 用途:定義消息的持久性。通常有兩個(gè)值:1表示非持久(消息不存儲(chǔ)在服務(wù)器上),2表示持久(消息存儲(chǔ)在服務(wù)器上,直到被消費(fèi))。
- priority:
- 用途:消息的優(yōu)先級(jí),用于在多個(gè)消息等待消費(fèi)時(shí)決定先處理哪個(gè)消息。
- correlationId:
- 用途:用于將回復(fù)與請(qǐng)求關(guān)聯(lián)起來(lái),通常用于RPC(遠(yuǎn)程過(guò)程調(diào)用)模式。
- replyTo:
- 用途:用于指定一個(gè)隊(duì)列名,用于接收對(duì)這條消息的回復(fù)。這在RPC場(chǎng)景中特別有用。
- expiration:
- 用途:定義消息的生存時(shí)間(TTL,Time-To-Live)。如果在這段時(shí)間內(nèi)消息沒(méi)有被消費(fèi),它將被丟棄。
- messageId:
- 用途:為消息提供一個(gè)全局唯一的標(biāo)識(shí)符。
- timestamp:
- 用途:表示消息創(chuàng)建或發(fā)送的時(shí)間。
- type:
- 用途:表示消息的類型或名稱,用于在多個(gè)不同類型的消息中進(jìn)行區(qū)分。
- userId:
- 用途:創(chuàng)建或發(fā)送消息的用戶ID。
- appId:
- 用途:標(biāo)識(shí)創(chuàng)建消息的應(yīng)用程序的名稱。
- clusterId:
- 用途:表示消息來(lái)自的RabbitMQ集群的ID。
方法:
通常,該類還會(huì)包含一些用于獲取和設(shè)置這些屬性的getter和setter方法,以及可能的其他方法用于序列化、反序列化或比較屬性等。具體的方法實(shí)現(xiàn)取決于這個(gè)類的完整源代碼。
使用場(chǎng)景:
這些屬性通常用于確保消息的正確路由、處理和持久化。例如,發(fā)送方可能會(huì)設(shè)置replyTo
和correlationId
以接收RPC回復(fù);或者設(shè)置priority
來(lái)確保某些關(guān)鍵消息優(yōu)先被處理。接收方則會(huì)使用這些屬性來(lái)正確地處理或路由消息。
? ? ? ? 其中有一個(gè)deliveryMode, 這個(gè)表示消息的持久化?.
? ? ? ? 所以我們第一個(gè)想到的就是通過(guò)構(gòu)建一個(gè)BasicProperties對(duì)象,然后設(shè)置里面的屬性,然后傳入給basicPublish, 如下:
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map; // ... 其他代碼 ... // 創(chuàng)建消息屬性
Map<String, Object> headers = new HashMap<>();
headers.put("custom-header", "some-value"); BasicProperties properties = new BasicProperties.Builder() .contentType("text/plain") .contentEncoding("UTF-8") .headers(headers) .deliveryMode(2) // 設(shè)置為持久化消息 .priority(1) .correlationId("my-correlation-id") .replyTo("my-reply-queue") .expiration("60000") // 消息將在60秒后過(guò)期 .messageId("my-message-id") .timestamp(new java.util.Date()) .type("my-message-type") .userId("my-user-id") .appId("my-app-id") .clusterId("my-cluster-id") .build(); // 獲取RabbitMQ的Channel
Channel channel = connection.createChannel(); // 發(fā)布消息到指定的交換機(jī)和路由鍵,并帶上屬性
String exchange = "my-exchange";
String routingKey = "my.routing.key";
String messageBody = "Hello, RabbitMQ!";
channel.basicPublish(exchange, routingKey, properties, messageBody.getBytes(StandardCharsets.UTF_8)); // ... 其他代碼 ...
? ? ? ?除此之外, 官方還提供了第二種方法, 你可以不用build一個(gè)BasicProperties,而是直接使用封裝好的AMQP.BasicProperties實(shí)例對(duì)象MessageProperties來(lái)直接傳入:
import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
? ? ? ? 下面是MessageProperties的原碼:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//package com.rabbitmq.client;import java.util.Date;
import java.util.Map;public class MessageProperties {public static final AMQP.BasicProperties MINIMAL_BASIC = new AMQP.BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public static final AMQP.BasicProperties MINIMAL_PERSISTENT_BASIC = new AMQP.BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public static final AMQP.BasicProperties BASIC = new AMQP.BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public static final AMQP.BasicProperties PERSISTENT_BASIC = new AMQP.BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public static final AMQP.BasicProperties TEXT_PLAIN = new AMQP.BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public static final AMQP.BasicProperties PERSISTENT_TEXT_PLAIN = new AMQP.BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);public MessageProperties() {}
}
? ? ? ? 此處的消息持久化為最后一個(gè)PERSISTENT_TEXT_PLAIN, 使用的構(gòu)造方法為:
public BasicProperties(String contentType, String contentEncoding, Map<String, Object> headers, Integer deliveryMode, Integer priority, String correlationId, String replyTo, String expiration, String messageId, Date timestamp, String type, String userId, String appId, String clusterId) {this.contentType = contentType;this.contentEncoding = contentEncoding;this.headers = headers == null ? null : Collections.unmodifiableMap(new HashMap(headers));this.deliveryMode = deliveryMode;this.priority = priority;this.correlationId = correlationId;this.replyTo = replyTo;this.expiration = expiration;this.messageId = messageId;this.timestamp = timestamp;this.type = type;this.userId = userId;this.appId = appId;this.clusterId = clusterId;}
? ? ? ? 關(guān)于消息持久化的說(shuō)明:
? ? ? ? 將一個(gè)消息設(shè)置為持久化, 并不能完全保證消息不會(huì)丟失. 盡管它告訴RabbitMQ將message保存在硬硬盤, 當(dāng)RabbitMQ已經(jīng)接收一個(gè)消息, 但是還沒(méi)有被保存的時(shí)候, 仍然會(huì)有一段很短的時(shí)間窗口, 這段時(shí)間窗口如果發(fā)生事故, 也可能導(dǎo)致消息丟失.?
? ? ? ? 如果你想保證message的durable的正確性和有效性, 你可以參考Publish and confirm模式 :?Consumer Acknowledgements and Publisher Confirms | RabbitMQ
?發(fā)布訂閱
? ? ? ? 這個(gè)部分我們將做一些完全不一樣的事情 -- 我們將會(huì)把一個(gè)消息發(fā)送給多個(gè)消費(fèi)者, 這個(gè)模式就被稱為發(fā)布訂閱模式.?????????
? ? ? ? 為了用圖解壽命這個(gè)模式, 我們將會(huì)建立一個(gè)簡(jiǎn)單的日志系統(tǒng), 他將會(huì)包含兩個(gè)項(xiàng)目, 第一個(gè)會(huì)發(fā)送日志消息, 第二個(gè)會(huì)接受然后打印這些消息.
? ? ? ? 在這個(gè)日志系統(tǒng)中, 每一個(gè)接受者的副本都會(huì)收到消息, 因此我們可以啟動(dòng)一個(gè)接受者, 也可以稱為消費(fèi)者, and將這些log消息導(dǎo)向硬盤, 與此同時(shí), 我們將會(huì)跑起另外一個(gè)消費(fèi)者并且看到這些日志打印到屏幕上.
交換機(jī)
? ? ? ? 其實(shí)一個(gè)消息并不是直接傳遞給隊(duì)列的, 而是指定交換機(jī), 然后由交換機(jī)傳遞給對(duì)應(yīng)的隊(duì)列.
? ? ? ? 我們之前所構(gòu)造的例子中, 包含這三個(gè)部分:
- ?一個(gè)生產(chǎn)者來(lái)生產(chǎn)消息, 然后發(fā)給隊(duì)列
- 一個(gè)隊(duì)列, 這個(gè)隊(duì)列來(lái)轉(zhuǎn)發(fā)消息給消費(fèi)者
- 一個(gè)消費(fèi)者, 消費(fèi)者接受并處理來(lái)自隊(duì)列的消息
? ? ? ? RabbitMQ的核心消息模式, 是生產(chǎn)者永遠(yuǎn)都不會(huì)直接給隊(duì)列發(fā)送任何消息, 事實(shí)上大多數(shù)情況下, 生產(chǎn)者會(huì)并不知道它生產(chǎn)的消息將會(huì)被發(fā)送到哪個(gè)隊(duì)列.
? ? ? ? 相反, 生產(chǎn)者僅僅只能發(fā)送消息給交換機(jī), 一個(gè)交換機(jī)是一個(gè)很簡(jiǎn)單的實(shí)現(xiàn), 一方面它接受來(lái)自生產(chǎn)者的消息, 另外一方面,它將這些消息轉(zhuǎn)發(fā)給隊(duì)列. 交換機(jī)必定確切的知道它收到消息之后, 這個(gè)消息將會(huì)被發(fā)送到哪個(gè)隊(duì)列. 比如說(shuō)它是否會(huì)被添加到一個(gè)指定的隊(duì)列, 或者是是其他的隊(duì)列. 亦或是將其丟棄. 不管是哪種, 這些規(guī)則都是由交換機(jī)的類型決定
? ? ? ? 首先創(chuàng)建一個(gè)交換機(jī):?
? ? ? ? 然后給這個(gè)交換機(jī)綁定一個(gè)隊(duì)列, 如下:
? ? ? ? 可以看到這個(gè)test交換機(jī)綁定了一個(gè)test隊(duì)列, 綁定之后指定routingKey, 后期producer發(fā)送消息的時(shí)候可以通過(guò)exchangeName來(lái)指定交換機(jī), 然后通過(guò)routingKey來(lái)指定要傳入哪個(gè)隊(duì)列.
? ? ? ? ?那我可以將兩個(gè)交換機(jī)綁定的隊(duì)列, 并且將其指定的routingKey的值設(shè)置為一樣的嗎?
????????一個(gè)交換機(jī)確實(shí)可以綁定兩個(gè)隊(duì)列,并且這兩個(gè)綁定隊(duì)列的routingKey可以設(shè)置為一樣。但是,這樣做的話,當(dāng)消息使用這個(gè)特定的routingKey發(fā)送到交換機(jī)時(shí),交換機(jī)會(huì)將消息路由到這兩個(gè)隊(duì)列中,實(shí)現(xiàn)消息的廣播效果。
????????在實(shí)際應(yīng)用中,是否使用相同的routingKey取決于你的業(yè)務(wù)需求。如果你希望消息被發(fā)送到多個(gè)隊(duì)列進(jìn)行處理,那么可以設(shè)置相同的routingKey。但如果你希望根據(jù)不同的routingKey將消息路由到不同的隊(duì)列,以實(shí)現(xiàn)更細(xì)粒度的控制,那么就應(yīng)該為每個(gè)隊(duì)列設(shè)置不同的routingKey。
????????此外,需要注意的是,routingKey的匹配規(guī)則還受到交換機(jī)類型的影響。例如,在Direct Exchange中,routingKey必須與隊(duì)列的綁定鍵完全匹配;而在Topic Exchange中,routingKey可以與綁定鍵進(jìn)行模式匹配。因此,在設(shè)置routingKey時(shí),還需要考慮你使用的交換機(jī)類型。
? ? ? ? ?交換機(jī)的類型:
- Direct Exchange(直連交換機(jī)):
- 特點(diǎn):消息會(huì)傳送給綁定鍵(BindingKey)與消息的路由鍵(RoutingKey)完全匹配的那個(gè)隊(duì)列。
- 工作原理:在發(fā)送消息時(shí),需要指定一個(gè)RoutingKey。當(dāng)消息到達(dá)交換機(jī)時(shí),交換機(jī)會(huì)查找與這個(gè)RoutingKey完全匹配的BindingKey,并將消息轉(zhuǎn)發(fā)給對(duì)應(yīng)的隊(duì)列。如果找不到匹配的隊(duì)列,消息則會(huì)被丟棄。
- 應(yīng)用場(chǎng)景:適用于需要精確匹配RoutingKey的場(chǎng)景,如簡(jiǎn)單的請(qǐng)求-響應(yīng)模型或者路由到特定服務(wù)或處理流程的隊(duì)列。
?????????在這個(gè)設(shè)置中,我們可以看到direct exchange X綁定了兩個(gè)隊(duì)列。第一個(gè)隊(duì)列用綁定密鑰橙色綁定,第二個(gè)隊(duì)列有兩個(gè)綁定,一個(gè)綁定密鑰黑色,另一個(gè)綁定密鑰綠色。
????????在這樣的設(shè)置中,發(fā)布到交換機(jī)的帶有路由關(guān)鍵字橙色的消息將被路由到隊(duì)列Q1。路由關(guān)鍵字為黑色或綠色的消息將發(fā)送到Q2。所有其他消息都將被丟棄。
? ? ? ? 多次綁定:
? ? ? ? 使用同一個(gè)RoutingKey綁定多個(gè)隊(duì)列是完全合法的, 例如上圖, 我們可以給c1和c2這兩個(gè)隊(duì)列綁定同一個(gè)direct類型的交換機(jī), 并且使用同一個(gè)RoutingKey : black. 上圖這個(gè)案例中, 這個(gè)direct交換機(jī)的作用就類似于一個(gè)fanout交換機(jī).?
- Topic Exchange(主題交換機(jī)):
- 特點(diǎn):與Direct類型的交換器類似,也是將消息路由到RoutingKey與BindingKey匹配的隊(duì)列中,但它支持模糊匹配。
- 工作原理:BindingKey可以包含通配符(如
.
和*
),使得RoutingKey可以與多個(gè)BindingKey匹配。這樣,一個(gè)消息可以被路由到多個(gè)隊(duì)列中。 - 應(yīng)用場(chǎng)景:適用于需要將消息發(fā)送到一組相關(guān)的隊(duì)列的場(chǎng)景,如基于主題或模式的消息發(fā)布和訂閱。
- 使用: topic類型的交換機(jī), 其RoutingKey不能亂寫, 必須滿足一定的要求, 它必須要求是一個(gè)單詞列表, 單詞之間誰(shuí)用 點(diǎn)號(hào)分開. 例如 stock.usr.notice
- routingKey的匹配規(guī)則: * 可以代表一個(gè)單詞, 使用#代表多個(gè)單詞, 例如 *.test可以匹配 a.test和b.test, test.#可以匹配test.a.b和test.a.c
- Headers Exchange(頭交換機(jī)):
- 特點(diǎn):不依賴于路由鍵的匹配規(guī)則來(lái)路由消息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配。
- 工作原理:在綁定隊(duì)列和交換器時(shí),會(huì)制定一組鍵值對(duì)。當(dāng)發(fā)送消息到交換器時(shí),RabbitMQ會(huì)獲取到該消息的headers,并對(duì)比其中的鍵值對(duì)是否完全匹配隊(duì)列和交換器綁定時(shí)指定的鍵值對(duì)。如果匹配,則消息會(huì)被路由到該隊(duì)列中。
- 應(yīng)用場(chǎng)景:適用于需要根據(jù)消息內(nèi)容中的特定屬性進(jìn)行路由的場(chǎng)景,提供了更靈活的消息路由機(jī)制。
- Fanout Exchange(扇型交換機(jī)):
- 特點(diǎn):發(fā)布/訂閱的廣播模式,它會(huì)將發(fā)送到該交換機(jī)的消息發(fā)送到所有與該交換機(jī)綁定的隊(duì)列中。
- 工作原理:當(dāng)一個(gè)消息發(fā)送到扇形交換機(jī)時(shí),交換機(jī)會(huì)將消息分別發(fā)送給所有綁定到該交換機(jī)上的隊(duì)列,無(wú)論它們的RoutingKey或BindingKey是什么。
- 應(yīng)用場(chǎng)景:適用于需要將消息廣播到多個(gè)隊(duì)列的場(chǎng)景,如通知系統(tǒng)或需要多個(gè)服務(wù)或組件同時(shí)處理同一消息的情況。?
????????

? ? ? ? 創(chuàng)建交換機(jī), 可以通過(guò)RabbitMQ提供的web插件來(lái)生成 :
? ? ? ? 可以通過(guò)java client來(lái)生成:
channel.exchangeDeclare("logs", "fanout");
? ? ? ? 其聲明如下:
? ? ? ? ?現(xiàn)在我們就可以通過(guò)這個(gè)交換機(jī)來(lái)推送消息:
channel.basicPublish( "logs", "", null, message.getBytes());
? ? ? ? 但是有人可能會(huì)想起來(lái), 這和我們之前寫的不一樣, 我們之前沒(méi)有指定這個(gè)交換機(jī)name啊, 或者是指定了一個(gè)空字符串, 如下:
channel.basicPublish("", "hello", null, message.getBytes());
? ? ? ? 為什么它還是能夠指定到hello這個(gè)隊(duì)列??
? ? ? ? 那是因?yàn)?
????????The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
? ? ? ? 你指定的空串代表它的交換機(jī)為默認(rèn)交換機(jī), 默認(rèn)交換機(jī)是隊(duì)列在創(chuàng)建的時(shí)候, 已經(jīng)和隊(duì)列進(jìn)行綁定了, 這樣保證每個(gè)隊(duì)列能有一個(gè)初始化的默認(rèn)的交換機(jī). 如果你指定的是默認(rèn)交換機(jī), 那么這個(gè)routingKey就為你指定的隊(duì)列名字了.
? ? ? ? 并且你不能顯示的去讓隊(duì)列綁定默認(rèn)交換機(jī), 也不能讓隊(duì)列和默認(rèn)交換機(jī)解綁, 當(dāng)然, 默認(rèn)交換機(jī)也不能被刪除.
案例?
? ? ? ? 實(shí)現(xiàn)一個(gè)fanout交換機(jī), 實(shí)現(xiàn)一個(gè)生產(chǎn)者,? 兩個(gè)隊(duì)列, 兩個(gè)隊(duì)列bind到這個(gè)fanout交換機(jī), 創(chuàng)建兩個(gè)消費(fèi)者, 分別接受兩個(gè)隊(duì)列的消息.
實(shí)現(xiàn)一個(gè)生產(chǎn)者, 可以不斷地輸入數(shù)據(jù) :?
package fanoutExchangeTest;import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {Channel channel = RabbitMQUtil.getChannel();// declaring an exchange named logs and its type is fanoutchannel.exchangeDeclare("logs","fanout");// bind queuechannel.queueBind("queue1","logs", "logsToQueue1");channel.queueBind("queue2","logs", "logsToQueue2");// declaring two queues the one named queue1 and the other one named queue2
// channel.queueDeclare("queue1",true,true,false,null);
// channel.queueDeclare("queue2",true,true,false,null);// manage messageConcurrentNavigableMap<Long, String> map = new ConcurrentSkipListMap<>();// publish and confirmchannel.confirmSelect();// callback : successConfirmCallback success = (sequenceNumber,multiple) -> {if (multiple) {ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(sequenceNumber,true);longStringConcurrentNavigableMap.clear();} else {map.remove(sequenceNumber);}};// callback : failConfirmCallback fail = (sequenceNumber,multiple) -> {String body = map.get(sequenceNumber);System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",body, sequenceNumber, multiple);success.handle(sequenceNumber,multiple);};// add non - sycn listenerchannel.addConfirmListener(success,fail);// publis codeScanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String msg = scanner.next();channel.basicPublish("logs", "testFanout",null, msg.getBytes());channel.waitForConfirmsOrDie(3000L);}}
}
創(chuàng)建兩個(gè)消費(fèi)者:
package fanoutExchangeTest;import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;import java.io.IOException;public class Consumer1 {public static void main(String[] args) throws IOException {Channel channel = RabbitMQUtil.getChannel();channel.queueDeclare("queue1",false,false,false,null);channel.basicConsume("queue1", false, (s, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [Consumer1] Received '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}, s -> {System.out.println("nothing");});}
}
package fanoutExchangeTest;import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;import java.io.IOException;public class Consumer2 {public static void main(String[] args) throws IOException {Channel channel = RabbitMQUtil.getChannel();channel.queueDeclare("queue2",false,false,false,null);channel.basicConsume("queue2", false, (s, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [Consumer2] Received '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}, s -> {System.out.println("nothing");});}
}
? ? ? ? 首先啟動(dòng)兩個(gè)消費(fèi)者, 然后啟動(dòng)生產(chǎn)者, 隨后輸入數(shù)據(jù), 輸出:
?
臨時(shí)隊(duì)列?
? ? ? ? ?有時(shí)候我們需要一些流動(dòng)性, 變化性很強(qiáng)的數(shù)據(jù), 就可以創(chuàng)建臨時(shí)隊(duì)列, 他有如下特性:
- 匿名性:臨時(shí)隊(duì)列通常沒(méi)有明確的名稱,而是由RabbitMQ服務(wù)器在創(chuàng)建時(shí)自動(dòng)分配一個(gè)唯一的名稱。這使得它們非常適合于一次性使用或短暫存在的場(chǎng)景。
- 自動(dòng)刪除:當(dāng)最后一個(gè)消費(fèi)者斷開連接時(shí),臨時(shí)隊(duì)列會(huì)自動(dòng)被刪除。這種特性使得隊(duì)列的管理變得簡(jiǎn)單,因?yàn)槟恍枰謩?dòng)跟蹤和刪除不再使用的隊(duì)列。
- 非持久化:臨時(shí)隊(duì)列通常也是非持久化的,這意味著它們不會(huì)存儲(chǔ)在磁盤上,因此當(dāng)RabbitMQ服務(wù)器重啟時(shí),這些隊(duì)列及其內(nèi)容會(huì)丟失。
- 使用場(chǎng)景:臨時(shí)隊(duì)列在RPC(遠(yuǎn)程過(guò)程調(diào)用)場(chǎng)景中特別有用,其中客戶端發(fā)送一個(gè)請(qǐng)求并等待一個(gè)響應(yīng)。在這種情況下,客戶端可以創(chuàng)建一個(gè)臨時(shí)隊(duì)列來(lái)接收響應(yīng),一旦響應(yīng)被接收,隊(duì)列就可以被自動(dòng)刪除。
- 創(chuàng)建方式:在代碼中,您可以使用RabbitMQ的客戶端庫(kù)來(lái)創(chuàng)建臨時(shí)隊(duì)列。例如,在RabbitMQ的Java客戶端中,您可以通過(guò)不指定隊(duì)列名稱,并設(shè)置某些參數(shù)來(lái)創(chuàng)建一個(gè)臨時(shí)隊(duì)列。當(dāng)您聲明一個(gè)隊(duì)列但不提供名稱時(shí),RabbitMQ會(huì)自動(dòng)為您生成一個(gè)唯一的名稱。
- 注意事項(xiàng):雖然臨時(shí)隊(duì)列提供了便利性和簡(jiǎn)化管理的好處,但您也應(yīng)該意識(shí)到它們的局限性。由于它們是非持久化的,并且會(huì)在最后一個(gè)消費(fèi)者斷開連接時(shí)自動(dòng)刪除,因此不適合用于需要長(zhǎng)期保存數(shù)據(jù)或需要在多個(gè)會(huì)話之間共享數(shù)據(jù)的場(chǎng)景。
下面是如何進(jìn)行獲取一個(gè)臨時(shí)隊(duì)列:
String queueName = channel.queueDeclare().getQueue();
死信隊(duì)列
????????死信隊(duì)列(Dead-Letter Queue,DLQ)是一種特殊的隊(duì)列,用于存放無(wú)法被正常處理的消息。這些消息可能由于各種原因,如消息被拒絕、消息過(guò)期、隊(duì)列達(dá)到最大長(zhǎng)度、消息格式錯(cuò)誤或處理過(guò)程中拋出異常等,無(wú)法被消費(fèi)者正常消費(fèi)。通過(guò)將這些無(wú)法處理的消息放入死信隊(duì)列,可以防止它們阻塞正常的消息處理流程,同時(shí)也方便進(jìn)行后續(xù)的問(wèn)題排查和處理。
????????死信隊(duì)列在消息中間件中是一個(gè)重要的概念,它增強(qiáng)了消息的可靠性,有效避免了因消息處理失敗而引起的數(shù)據(jù)丟失和系統(tǒng)異常。此外,死信隊(duì)列中的消息可以進(jìn)行特殊處理,如記錄日志、統(tǒng)計(jì)失敗次數(shù)、發(fā)送告警通知等,有助于監(jiān)控系統(tǒng)的健康狀況,并對(duì)處理失敗的消息進(jìn)行進(jìn)一步的分析和處理。
????????值得注意的是,死信隊(duì)列通常需要手動(dòng)維護(hù),而不是自動(dòng)清空,因?yàn)樗佬畔⑼枰斯し治龊吞幚?。在?shí)際應(yīng)用中,可以通過(guò)查詢、導(dǎo)出和重新發(fā)送進(jìn)入死信隊(duì)列的死信消息,按需管理死信消息,避免消息漏處理。
?
? ?消費(fèi)者C1代碼:
package DeadQueue;import Util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.sql.SQLOutput;
import java.util.HashMap;
import java.util.Map;/*** 消費(fèi)者1*/
public class Consumer1 {// 有兩個(gè)交換機(jī)public static final String normal_exchange = "normal_exchange";// 死信交換機(jī)public static final String dead_exchange = "dead_exchange";// 普通隊(duì)列public static final String normal_queue = "normal_queue";// 死信隊(duì)列public static final String dead_queue = "dead_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();// 聲明兩個(gè)交換機(jī): 死信交換機(jī)和普通交換機(jī)channel.exchangeDeclare(normal_exchange, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(dead_exchange,BuiltinExchangeType.DIRECT);Map<String, Object> map = new HashMap<>();// 過(guò)期時(shí)間map.put("x-message-ttl",10000);// 正常隊(duì)列設(shè)置死信交換機(jī)map.put("x-dead-letter-exchange",dead_exchange);// 設(shè)置死信消息的RoutingKeymap.put("x-dead-letter-routing-key", "lisi");// 聲明兩個(gè)隊(duì)列channel.queueDeclare(normal_queue,false,false,false,map); // 聲明將死信發(fā)送給死信交換機(jī)channel.queueDeclare(dead_queue,false,false,false,null);// 綁定交換機(jī)和隊(duì)列// 綁定普通隊(duì)列和消費(fèi)者1channel.queueBind(normal_queue,normal_exchange,"zhangsan");channel.queueBind(dead_queue,dead_exchange,"lisi");DeliverCallback deliverCallback = (tag,msg) -> {System.out.println("consumer1接收到消息: " + new String(msg.getBody(),"UTF-8"));};channel.basicConsume(normal_queue,true, deliverCallback, tag-> {});}
}
?消費(fèi)者C2代碼:
package DeadQueue;import Util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;/*** 消費(fèi)者1*/
public class Consumer2 {// 死信隊(duì)列public static final String dead_queue = "dead_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();DeliverCallback deliverCallback = (tag,msg) -> {System.out.println("consumer2接收到消息: " + new String(msg.getBody(),"UTF-8"));};channel.basicConsume(dead_queue,true, deliverCallback, tag-> {});}
}
?生產(chǎn)者代碼:
package DeadQueue;import Util.RabbitMQUtil;
import com.rabbitmq.client.Channel;public class Producer {// 定義一個(gè)普通交換機(jī)即可public static final String normal_exchange = "normal_exchange";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();// 死信消息for (int i = 0; i < 10; i++) {String msg = "info" + i;channel.basicPublish(normal_exchange,"zhangsan", null/* 這里消息的過(guò)期時(shí)間已經(jīng)在隊(duì)列聲明的時(shí)候設(shè)置*/, msg.getBytes());}}
}
首先啟動(dòng)消費(fèi)者C1, 讓其創(chuàng)建相關(guān)隊(duì)列和交換機(jī), 隨后關(guān)閉消費(fèi)者C1模擬其崩潰, 然后開啟生產(chǎn)者, 發(fā)現(xiàn)normal隊(duì)列里面產(chǎn)生了10條無(wú)法被消費(fèi)消息;?
?隨后開啟消費(fèi)者C2, 來(lái)消費(fèi)死信隊(duì)列的消息:
?
? ? ? ? ?當(dāng)然, 一個(gè)消息被放入死信隊(duì)列當(dāng)然不止 設(shè)置過(guò)期時(shí)間這一種, 還可以設(shè)置隊(duì)列最大長(zhǎng)度, 當(dāng)普通隊(duì)列的長(zhǎng)度到達(dá)最大值的時(shí)候, 這個(gè)時(shí)候額外的消息會(huì)被放入死信隊(duì)列
Map<String, Object> props = new HashMap<>();// 過(guò)期時(shí)間// props.put("x-message-ttl",10000);// 設(shè)置最大長(zhǎng)度為6props.put("x-max-length",6);
? ? ? ? ?當(dāng)然你也可以主動(dòng)拒絕消息, 而不是被動(dòng)的觸發(fā)轉(zhuǎn)發(fā)給死信隊(duì)列.
如何設(shè)置主動(dòng)拒絕?
// 其他代碼DeliverCallback deliverCallback = (tag,msg) -> {String getMsg = new String(msg.getBody(), StandardCharsets.UTF_8);if (getMsg.equals("info")) {System.out.println("消息:" + getMsg + " 被拒絕");// 拒絕策略需要開啟手動(dòng)應(yīng)答// 第二個(gè)參數(shù)設(shè)置為false表示 不會(huì)重新將此消息返回原來(lái)的隊(duì)列.channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);} else {System.out.println("consumer1接收到消息: " + getMsg);}};channel.basicConsume(normal_queue,false, deliverCallback, tag-> {});