做網(wǎng)站空間放哪些文件夾網(wǎng)頁模板圖片
文章目錄
- 一、路由模式原理
- 二、多重綁定
- 三、路由模式實戰(zhàn)
- 1、消費者代碼
- 2、生產者代碼
- 3、運行結果分析
本文參考
尚硅谷RabbitMQ教程丨快速掌握MQ消息中間件rabbitmq
RabbitMQ 詳解
Centos7環(huán)境安裝Erlang、RabbitMQ詳細過程(配圖)
一、路由模式原理
使用發(fā)布訂閱模式時,所有消息都會發(fā)送到綁定的隊列中,但很多時候,不是所有消息都要無差別的發(fā)布到所有隊列中。比如,我們希望將日志消息寫入磁盤的程序僅接收嚴重錯誤(errros),而不存儲那些警告(warning)或信息(info)日志消息以避免浪費磁盤空間。Fanout這種交換類型并不能給我們帶來很大的靈活性-它只能進行無意識的廣播,此時就需要使用路由模式 (Routing)完成這一需求。其特點如下:
- 每個隊列綁定路由關鍵字 RoutingKey
- 生產者將帶有 RoutingKey 的消息發(fā)送給交換機,交換機根據(jù) RoutingKey 轉發(fā)到指定隊列。
- 路由模式使用 direct 交換機。
回顧bindings的概念,綁定是交換機和隊列之間的橋梁關系。也可以這么理解:隊列只對它綁定的交換機的消息感興趣。綁定用參數(shù):routingKey
來表示也可稱該參數(shù)為binding key
,創(chuàng)建綁定我們用代碼:channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");
在上圖實例中,可以看到交換機 X 綁定了兩個隊列,綁定類型是direct。隊列Q1綁定鍵為orange,隊列Q2綁定鍵有兩個:一個綁定鍵為black,另一個綁定鍵為green。這說明即使是同一對交換機和隊列也可以有多個routingKey
。
在這種綁定情況下,生產者發(fā)布消息到exchange上,綁定鍵為orange的消息會被發(fā)布到隊列Q1。綁定鍵為black和green和的消息會被發(fā)布到隊列Q2,其他消息類型的消息將被丟棄。
二、多重綁定
如果exchange的綁定類型是direct,但是它綁定的多個隊列的key如果都相同,在這種情況下雖然綁定類型是direct但是它表現(xiàn)的就和fanout有點類似了,就跟廣播差不多,如上圖所示。
三、路由模式實戰(zhàn)
根據(jù)以上對應關系進行代碼編寫,其中包括兩個隊列:disk和console,一個生產者,兩個消費者,一個direct類型的交換,與disk隊列通過error進行綁定,與console隊列通過info和warning兩個routingkey進行綁定。最終進行測試,查看結果。
1、消費者代碼
同樣的,代碼整體邏輯與之前的案例沒什么不同,只是交換機類型變更,以及需要指定routingkey參數(shù)。
消費者01代碼如下:
/*** Description: 路由模式消費者01*/
public class ReceiveLogsDirect01 {//設置要創(chuàng)建的交換機的名稱private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//創(chuàng)建fanout交換機/*** 參數(shù)1:交換機名* 參數(shù)2:交換機類型,本次設置為direct* 參數(shù)3:交換機是否持久化*/channel.exchangeDeclare(EXCHANGE_NAME, "direct", false);channel.queueDeclare("disk", false, false, false, null);//將交換機與隊列進行綁定(binding)/*** 參數(shù)1:隊列名* 參數(shù)2:交換機名* 參數(shù)3:路由關鍵字,發(fā)布訂閱模式寫""空串即可*/channel.queueBind("disk", EXCHANGE_NAME, "error");//接收消息channel.basicConsume("disk", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("ReceiveLogsDirect01控制臺打印接收到的消息: " + message);}});}
}
消費者02代碼如下:
/*** Description: 路由模式消費者02*/
public class ReceiveLogsDirect02 {//設置要創(chuàng)建的交換機的名稱private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct", false);channel.queueDeclare("console", false, false, false, null);//多重綁定channel.queueBind("console", EXCHANGE_NAME, "info");channel.queueBind("console", EXCHANGE_NAME, "warning");//接收消息channel.basicConsume("console", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("ReceiveLogsDirect02控制臺打印接收到的消息: " + message);}});}
}
2、生產者代碼
/*** Description: 路由模式生產者*/
public class EmitLogDirect {//交換機名稱private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//準備3條消息String message1 = "這是一條info信息";String message2 = "這是一條warning信息";String message3 = "這是一條error信息";channel.basicPublish(EXCHANGE_NAME, "info", null, message1.getBytes());channel.basicPublish(EXCHANGE_NAME, "warning", null, message2.getBytes());channel.basicPublish(EXCHANGE_NAME, "error", null, message3.getBytes());//關閉資源channel.close();}
}
3、運行結果分析
先將兩個消費者運行,因為它們負責聲明交換機創(chuàng)建隊列以及綁定關系,再啟動生產者發(fā)送消息,此時會看到message1和message2發(fā)給了ReceiveLogsDirect02
,而message3發(fā)送給了ReceiveLogsDirect01
在此過程中,生產者在發(fā)送消息時指定了EXCHANGE_NAME
,無論是什么消息都先發(fā)送給direct類型的交換機,它的名字設置為了direct_logs
,然后交換機會根據(jù)routingkey的路由規(guī)則決定該消息轉發(fā)給哪個隊列,以及是否要丟棄。假如此時我們發(fā)送一個routingkey為debug的消息,交換機由于找不到轉發(fā)目標會將該消息丟棄。