哪個平臺買東西最便宜seo網(wǎng)站搜索優(yōu)化
RabbitMQ的6種工作模式
官方文檔:
http://www.rabbitmq.com/
https://www.rabbitmq.com/getstarted.html
RabbitMQ 常見的 6 種工作模式:
1、simple簡單模式
1)、消息產(chǎn)生后將消息放入隊列。
2)、消息的消費者監(jiān)聽消息隊列,如果隊列中有消息,就消費掉,消息被拿走后,自動從隊列中刪除。
3)、存在的問題:消息可能沒有被消費者正確處理,已經(jīng)從隊列中消失了,造成消息的丟失。
4)、應(yīng)用場景:聊天(中間有一個過度的服務(wù)器)。
5)、代碼實現(xiàn):
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>rabbitmq-java</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency></dependencies></project>
工具類
package com.example;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {// 連接rabbitmq服務(wù),共享一個工廠對象private static ConnectionFactory factory;static {factory=new ConnectionFactory();//設(shè)置rabbitmq屬性factory.setHost("127.0.0.1");factory.setUsername("zsx242030");factory.setPassword("zsx242030");factory.setVirtualHost("/");factory.setPort(5672);}public static Connection getConnection(){Connection connection=null;try {//獲取連接對象connection = factory.newConnection();} catch (Exception e) {e.printStackTrace();}return connection;}
}
消息提供者
package com.example.simple;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Provider {public static void main(String[] args) {try {//獲取連接對象Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//通過通道創(chuàng)建隊列,后續(xù)所有的操作都是基于channel實現(xiàn)(隊列也可以由消費方創(chuàng)建)channel.queueDeclare("queue1", false, false, false, null);//向隊列中發(fā)送消息channel.basicPublish("", "queue1", null, "Hello RabbitMQ!!!".getBytes());//斷開連接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消息消費者
package com.example.simple;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//監(jiān)聽隊列中的消息(消費的是隊列,而不是交換機(jī))channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者獲得消息為:" + new String(body, "utf-8"));}});//消費方不需要關(guān)閉連接,保持一直監(jiān)聽隊列狀態(tài)channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消費者獲得消息為:Hello RabbitMQ!!!
2、work工作模式(資源的競爭)
1)、消息產(chǎn)生者將消息放入隊列,消費者可以有多個,消費者1,消費者2,同時監(jiān)聽同一個隊列。消息被消費,
C1 和 C2 共同爭搶當(dāng)前的消息隊列內(nèi)容,誰先拿到誰負(fù)責(zé)消費消息。
2)、存在的問題:高并發(fā)情況下,默認(rèn)會產(chǎn)生某一個消息被多個消費者共同使用,可以設(shè)置一個開關(guān)
(synchronized,與同步鎖的性能不一樣),保證一條消息只能被一個消費者使用。
3)、應(yīng)用場景:紅包;大項目中的資源調(diào)度(任務(wù)分配系統(tǒng)不需知道哪一個任務(wù)執(zhí)行系統(tǒng)在空閑,直接將任務(wù)扔到
消息隊列中,空閑的系統(tǒng)自動爭搶);對于任務(wù)過重或任務(wù)較多情況使用工作隊列可以提高任務(wù)處理的速度。
4)、代碼實現(xiàn):
消息提供者
package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Provider {public static void main(String[] args) {try {//獲取連接對象Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//通過通道創(chuàng)建隊列channel.queueDeclare("queue1", false, false, false, null);//向隊列中發(fā)送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("", "queue1", null, ("Hello RabbitMQ!!!" + i).getBytes());}//斷開連接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消息消費者1
package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//監(jiān)聽隊列中的消息channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1獲得消息為:" + new String(body, "utf-8"));}});//消費方不需要關(guān)閉連接,保持一直監(jiān)聽隊列狀態(tài)// channel.close();// connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消息消費者2
package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//監(jiān)聽隊列中的消息channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2獲得消息為:" + new String(body, "utf-8"));}});//消費方不需要關(guān)閉連接,保持一直監(jiān)聽隊列狀態(tài)// channel.close();// connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消費者1獲得消息為:Hello RabbitMQ!!!1
消費者1獲得消息為:Hello RabbitMQ!!!3
消費者1獲得消息為:Hello RabbitMQ!!!5
消費者1獲得消息為:Hello RabbitMQ!!!7
消費者1獲得消息為:Hello RabbitMQ!!!9
消費者2獲得消息為:Hello RabbitMQ!!!2
消費者2獲得消息為:Hello RabbitMQ!!!4
消費者2獲得消息為:Hello RabbitMQ!!!6
消費者2獲得消息為:Hello RabbitMQ!!!8
消費者2獲得消息為:Hello RabbitMQ!!!10
3、publish/subscribe發(fā)布訂閱(共享資源)
1)、X代表交換機(jī),rabbitMQ 內(nèi)部組件,erlang 消息產(chǎn)生者是代碼完成,代碼的執(zhí)行效率不高,消息產(chǎn)生者將消
息放入交換機(jī),交換機(jī)發(fā)布訂閱把消息發(fā)送到所有消息隊列中,對應(yīng)消息隊列的消費者拿到消息進(jìn)行消費。
Exchange 有常見以下 3 種類型:
-
Fanout
:廣播,將消息交給所有綁定到交換機(jī)的隊列。 -
Direct
:定向,把消息交給符合指定 routing key 的隊列。 -
Topic
:通配符,把消息交給符合 routing pattern (路由模式)的隊列。
Exchange
(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者
沒有符合路由規(guī)則的隊列,那么消息會丟失。
2)相關(guān)場景:郵件群發(fā),群聊天,廣播(廣告)。
3)、代碼實現(xiàn):
消息提供者
package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 交換機(jī)和隊列可以在提供方和消費方某一方創(chuàng)建,在兩邊同時創(chuàng)建也可以,只要創(chuàng)建的名稱一致。
// 保證,哪一方先運行則在哪一方創(chuàng)建
public class Provider {public static void main(String[] args) {try {//獲取連接對象Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//創(chuàng)建交換機(jī)(交換機(jī)沒有存儲數(shù)據(jù)的能力,數(shù)據(jù)存儲在隊列上,如果有交換機(jī)沒隊列的情況下,數(shù)據(jù)會丟失)// 1.參數(shù)一:交換機(jī)名稱 參數(shù)二:交換機(jī)類型channel.exchangeDeclare("fanout_exchange", "fanout");//通過通道創(chuàng)建隊列//channel.queueDeclare("queue1",false,false,false,null);//向隊列中發(fā)送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("fanout_exchange", "", null, ("Hello RabbitMQ!!!" + i).getBytes());}//斷開連接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消息消費者1
package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//創(chuàng)建隊列channel.queueDeclare("fanout_queue1", false, false, false, null);//給隊列綁定交換機(jī)channel.queueBind("fanout_queue1", "fanout_exchange", "");//監(jiān)聽隊列中的消息channel.basicConsume("fanout_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1獲得消息為:" + new String(body, "utf-8"));}});//消費方不需要關(guān)閉連接,保持一直監(jiān)聽隊列狀態(tài)// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消息消費者2
package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//創(chuàng)建隊列channel.queueDeclare("fanout_queue2", false, false, false, null);//給隊列綁定交換機(jī)channel.queueBind("fanout_queue2", "fanout_exchange", "");//監(jiān)聽隊列中的消息channel.basicConsume("fanout_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2獲得消息為:" + new String(body, "utf-8"));}});//消費方不需要關(guān)閉連接,保持一直監(jiān)聽隊列狀態(tài)// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消費者1獲得消息為:Hello RabbitMQ!!!1
消費者1獲得消息為:Hello RabbitMQ!!!2
消費者1獲得消息為:Hello RabbitMQ!!!3
消費者1獲得消息為:Hello RabbitMQ!!!4
消費者1獲得消息為:Hello RabbitMQ!!!5
消費者1獲得消息為:Hello RabbitMQ!!!6
消費者1獲得消息為:Hello RabbitMQ!!!7
消費者1獲得消息為:Hello RabbitMQ!!!8
消費者1獲得消息為:Hello RabbitMQ!!!9
消費者1獲得消息為:Hello RabbitMQ!!!10
消費者2獲得消息為:Hello RabbitMQ!!!1
消費者2獲得消息為:Hello RabbitMQ!!!2
消費者2獲得消息為:Hello RabbitMQ!!!3
消費者2獲得消息為:Hello RabbitMQ!!!4
消費者2獲得消息為:Hello RabbitMQ!!!5
消費者2獲得消息為:Hello RabbitMQ!!!6
消費者2獲得消息為:Hello RabbitMQ!!!7
消費者2獲得消息為:Hello RabbitMQ!!!8
消費者2獲得消息為:Hello RabbitMQ!!!9
消費者2獲得消息為:Hello RabbitMQ!!!10
4、routing路由模式
1)、消息生產(chǎn)者將消息發(fā)送給交換機(jī)按照路由判斷,路由是字符串,當(dāng)前產(chǎn)生的消息攜帶路由字符,交換機(jī)根據(jù)路
由的 key,只能匹配上路由 key 對應(yīng)的消息隊列,對應(yīng)的消費者才能消費消息。隊列與交換機(jī)的綁定,不能是任意
綁定了,而是要指定一個 RoutingKey (路由 key)。消息的發(fā)送方在向 Exchange 發(fā)送消息時,也必須指定消息的
RoutingKey 。Exchange 不再把消息交給每一個綁定的隊列,而是根據(jù)消息的 Routing Key 進(jìn)行判斷,只有隊列
的 Routingkey 與消息的 Routing key 完全一致,才會接收到消息。
2)、根據(jù)業(yè)務(wù)功能定義路由字符串。
3)、從系統(tǒng)的代碼邏輯中獲取對應(yīng)的功能字符串,將消息任務(wù)扔到對應(yīng)的隊列中。
4)、業(yè)務(wù)場景:error 通知;EXCEPTION;錯誤通知的功能;傳統(tǒng)意義的錯誤通知;客戶通知;利用key路由,可
以將程序中的錯誤封裝成消息傳入到消息隊列中,開發(fā)者可以自定義消費者,實時接收錯誤。
5)、代碼實現(xiàn):
消息提供者
package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 交換機(jī)和隊列可以在提供方和消費方某一方創(chuàng)建,在兩邊同時創(chuàng)建也可以,只要創(chuàng)建的名稱一致。
// 保證,哪一方先運行則在哪一方創(chuàng)建public class Provider {public static void main(String[] args) {try {//獲取連接對象Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//創(chuàng)建交換機(jī)(交換機(jī)沒有存儲數(shù)據(jù)的能力,數(shù)據(jù)存儲在隊列上,如果有交換機(jī)沒隊列的情況下,數(shù)據(jù)會丟失)// 1.參數(shù)一:交換機(jī)名稱 參數(shù)二:交換機(jī)類型channel.exchangeDeclare("direct_exchange", "direct");//向隊列中發(fā)送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("direct_exchange",//設(shè)置路由鍵,符合路由鍵的隊列,才能拿到消息"insert",null,("Hello RabbitMQ!!!" + i).getBytes());}//斷開連接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消息消費者1
package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//創(chuàng)建隊列channel.queueDeclare("direct_queue1", false, false, false, null);//綁定交換機(jī)(routingKey:路由鍵)channel.queueBind("direct_queue1", "direct_exchange", "select");channel.queueBind("direct_queue1", "direct_exchange", "insert");//監(jiān)聽隊列中的消息channel.basicConsume("direct_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1獲得消息為:" + new String(body, "utf-8"));}});//消費方不需要關(guān)閉連接,保持一直監(jiān)聽隊列狀態(tài)// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消息消費者2
package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//創(chuàng)建隊列channel.queueDeclare("direct_queue2", false, false, false, null);//綁定交換機(jī)(routingKey:路由鍵)channel.queueBind("direct_queue2", "direct_exchange", "delete");channel.queueBind("direct_queue2", "direct_exchange", "select");//監(jiān)聽隊列中的消息channel.basicConsume("direct_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2獲得消息為:" + new String(body, "utf-8"));}});//消費方不需要關(guān)閉連接,保持一直監(jiān)聽隊列狀態(tài)// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消費者1獲得消息為:Hello RabbitMQ!!!1
消費者1獲得消息為:Hello RabbitMQ!!!2
消費者1獲得消息為:Hello RabbitMQ!!!3
消費者1獲得消息為:Hello RabbitMQ!!!4
消費者1獲得消息為:Hello RabbitMQ!!!5
消費者1獲得消息為:Hello RabbitMQ!!!6
消費者1獲得消息為:Hello RabbitMQ!!!7
消費者1獲得消息為:Hello RabbitMQ!!!8
消費者1獲得消息為:Hello RabbitMQ!!!9
消費者1獲得消息為:Hello RabbitMQ!!!10
5、topic 主題模式(路由模式的一種)
1)、Topic 類型與 Direct 相比,都是可以根據(jù) RoutingKey 把消息路由到不同的隊列。只不過 Topic 類型
Exchange 可以讓隊列在綁定 Routing key 的時候使用通配符。
2)、Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以 . 分割,例如:item.insert。
通配符規(guī)則:
#
:匹配一個或多個詞
*
:匹配不多不少恰好1個詞
舉例:
item.#
:能夠匹配item.insert.abc
或者item.insert
item.*
:只能匹配 item.insert
usa.#
,因此凡是以 usa.
開頭的 routing key
都會被匹配到
#.news
,因此凡是以 .news
結(jié)尾的 routing key
都會被匹配
3)、路由功能添加模糊匹配。
4)、消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機(jī)。
5)、交換機(jī)根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊列,由隊列的監(jiān)聽消費者接收消息消費。
6)、代碼實現(xiàn):
消息提供者
package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;//交換機(jī)和隊列可以在提供方和消費方某一方創(chuàng)建,在兩邊同時創(chuàng)建也可以,只要創(chuàng)建的名稱一致。
// 保證,哪一方先運行則在哪一方創(chuàng)建public class Provider {public static void main(String[] args) {try {//獲取連接對象Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//創(chuàng)建交換機(jī)(交換機(jī)沒有存儲數(shù)據(jù)的能力,數(shù)據(jù)存儲在隊列上,如果有交換機(jī)沒隊列的情況下,數(shù)據(jù)會丟失) //1.參數(shù)一:交換機(jī)名稱 參數(shù)二:交換機(jī)類型channel.exchangeDeclare("topic_exchange", "topic");//向隊列中發(fā)送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("topic_exchange",// #:匹配0-n個單詞(之間以.區(qū)分,兩點之間算一個單詞,可以匹配hello world空格的情況) *(匹配一個單詞)"emp.hello world",null,("Hello RabbitMQ!!!" + i).getBytes());}//斷開連接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消息消費者1
package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//創(chuàng)建隊列channel.queueDeclare("topic_queue1", false, false, false, null);//綁定交換機(jī)(routingKey:路由鍵) #:匹配0-n個單詞(之間以.區(qū)分,兩點之間算一個單詞)channel.queueBind("topic_queue1", "topic_exchange", "emp.#");//監(jiān)聽隊列中的消息channel.basicConsume("topic_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者1獲得消息為:" + new String(body, "utf-8"));}});//消費方不需要關(guān)閉連接,保持一直監(jiān)聽隊列狀態(tài)// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消息消費者2
package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//獲取通道對象Channel channel = connection.createChannel();//創(chuàng)建隊列channel.queueDeclare("topic_queue2", false, false, false, null);//綁定交換機(jī)(routingKey:路由鍵) *:匹配1個單詞(之間以.區(qū)分,兩點之間算一個單詞)channel.queueBind("topic_queue2", "topic_exchange", "emp.*");//監(jiān)聽隊列中的消息channel.basicConsume("topic_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消費者2獲得消息為:" + new String(body, "utf-8"));}});//消費方不需要關(guān)閉連接,保持一直監(jiān)聽隊列狀態(tài)// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消費者1獲得消息為:Hello RabbitMQ!!!1
消費者1獲得消息為:Hello RabbitMQ!!!2
消費者1獲得消息為:Hello RabbitMQ!!!3
消費者1獲得消息為:Hello RabbitMQ!!!4
消費者1獲得消息為:Hello RabbitMQ!!!5
消費者1獲得消息為:Hello RabbitMQ!!!6
消費者1獲得消息為:Hello RabbitMQ!!!7
消費者1獲得消息為:Hello RabbitMQ!!!8
消費者1獲得消息為:Hello RabbitMQ!!!9
消費者1獲得消息為:Hello RabbitMQ!!!10
6、RPC
RPC即客戶端遠(yuǎn)程調(diào)用服務(wù)端的方法 ,使用MQ可以實現(xiàn)RPC的異步調(diào)用,基于Direct交換機(jī)實現(xiàn),流程如下:
1)、客戶端即是生產(chǎn)者也是消費者,向RPC請求隊列發(fā)送RPC調(diào)用消息,同時監(jiān)聽RPC響應(yīng)隊列。
2)、服務(wù)端監(jiān)聽RPC請求隊列的消息,收到消息后執(zhí)行服務(wù)端的方法,得到方法返回的結(jié)果。
3)、服務(wù)端將RPC方法 的結(jié)果發(fā)送到RPC響應(yīng)隊列。
4)、客戶端(RPC調(diào)用方)監(jiān)聽RPC響應(yīng)隊列,接收到RPC調(diào)用結(jié)果。
5)、代碼實現(xiàn):
Client端
package com.example.rpc;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class Client {public static void main(String[] argv) throws IOException, InterruptedException {String message = "Hello World!!!";// 建立一個連接和一個通道,并為回調(diào)聲明一個唯一的回調(diào)隊列Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 定義一個臨時變量的接受隊列名String replyQueueName = channel.queueDeclare().getQueue();// 生成一個唯一的字符串作為回調(diào)隊列的編號String corrId = UUID.randomUUID().toString();// 發(fā)送請求消息,消息使用了兩個屬性:replyTo和correlationId// 服務(wù)端根據(jù)replyTo返回結(jié)果,客戶端根據(jù)correlationId判斷響應(yīng)是不是給自己的AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();// 發(fā)布一個消息,rpc_queue路由規(guī)則channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));// 由于我們的消費者交易處理是在單獨的線程中進(jìn)行的,因此我們需要在響應(yīng)到達(dá)之前暫停主線程。// 這里我們創(chuàng)建的容量為1的阻塞隊列ArrayBlockingQueue,因為我們只需要等待一個響應(yīng)。final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);// String basicConsume(String queue, boolean autoAck, Consumer callback)channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {//檢查它的correlationId是否是我們所要找的那個if (properties.getCorrelationId().equals(corrId)) {//如果是,則響應(yīng)BlockingQueueresponse.offer(new String(body, "UTF-8"));}}});System.out.println(" 客戶端請求的結(jié)果:" + response.take());}
}
Server端
package com.example.rpc;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Server {public static void main(String[] args) {Connection connection = null;try {connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("rpc_queue", false, false, false, null);channel.basicQos(1);System.out.println("Awaiting RPC requests:");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String response = "";try {response = new String(body, "UTF-8");System.out.println("response (" + response + ")");} catch (RuntimeException e) {System.out.println("錯誤信息 " + e.toString());} finally {// 返回處理結(jié)果隊列channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));// 確認(rèn)消息,已經(jīng)收到后面參數(shù) multiple:是否批量.true:將一次性確認(rèn)所有小于envelope.getDeliveryTag()的消息。channel.basicAck(envelope.getDeliveryTag(), false);// RabbitMq consumer worker thread notifies the RPC// server owner threadsynchronized (this) {this.notify();}}}};// 取消自動確認(rèn)boolean autoAck = false;channel.basicConsume("rpc_queue", autoAck, consumer);// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (consumer) {try {consumer.wait();} catch (InterruptedException e) {e.printStackTrace();}}}} catch (Exception e) {e.printStackTrace();} finally {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
Awaiting RPC requests:
response (Hello World!!!)
response (Hello World!!!)
response (Hello World!!!)# 客戶端發(fā)起3次請求
客戶端請求的結(jié)果:Hello World!!!
客戶端請求的結(jié)果:Hello World!!!
客戶端請求的結(jié)果:Hello World!!!
7、發(fā)布訂閱模式與工作隊列模式的區(qū)別
1、工作隊列模式不用定義交換機(jī),而發(fā)布/訂閱模式需要定義交換機(jī)。
2、發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊列模式的生產(chǎn)方是面向隊列發(fā)送消息(底層使
用默認(rèn)交換機(jī))。
3、發(fā)布/訂閱模式需要設(shè)置隊列和交換機(jī)的綁定,工作隊列模式不需要設(shè)置,實際上工作隊列模式會將
隊列綁定到默認(rèn)的交換機(jī) 。