醫(yī)院網(wǎng)站建設原理知乎關鍵詞搜索
🔥博客主頁:?【小扳_-CSDN博客】
?感謝大家點贊👍收藏?評論?
文章目錄
? ? ? ? 1.0 RabbitMQ 初識
????????1.1 RabbitMQ 安裝
? ? ? ? 2.0 數(shù)據(jù)隔離
????????2.1 用戶管理
????????2.2 virtual host 虛擬主機
? ? ? ? 3.0 SpringAMQP
? ? ? ? 3.1 RabbitMQ 配置
? ? ? ? 3.2 發(fā)送消息
? ? ? ? 3.3 接收消息
? ? ? ? 3.4 WorkQueues 模式
? ? ? ? 4.0 交換機類型
? ? ? ? 4.1 Fanout 交換機
? ? ? ? 4.2 Direct 交換機
? ? ? ? 4.3 Topic 交換機
? ? ? ? 5.0 聲明隊列和交換機
? ? ? ? 5.1 基本的 API
? ? ? ? 5.2 fanout 示例
? ? ? ? 5.3 direct 示例
? ? ? ? 5.4 基于注解聲明
? ? ? ? 5.4.1 Fanout 模式的交換機
? ? ? ? 5.4.2 Direct 模式的交換機
? ? ? ? 6.0 消息轉換器
? ? ? ? 6.1 配置 JSON 轉化器
? ? ? ? 6.2 實現(xiàn)業(yè)務冪等性
? ? ? ? 1.0 RabbitMQ 初識
? ? ? ? RabbitMQ 是基于 Erlang 語言開發(fā)的開源消息通信中間件,官網(wǎng)地址:https://www.rabbitmq.com/
????????RabbitMQ 中間件的使用目的,基于消息通知實現(xiàn)異步調用,一般包含三個角色:
? ? ? ? 1)消息發(fā)送者:投遞消息的人,就是原來的調用方。
? ? ? ? 2)消息 Broker:管理、暫存、轉發(fā)消息,可以理解為容器。
? ? ? ? 3)消息接收者:接收和處理消息的人,服務提供方。
? ? ? ? 在異步調用中,發(fā)送者不再直接同步調用接收者的業(yè)務接口,而是發(fā)送一條消息投遞給消息 Broker,然后接收者根據(jù)自己的需求從消息 Broker 那里訂閱消息。每當發(fā)送方發(fā)送消息后,接收者都能接收消息并處理,這樣發(fā)送消息的人與接收消息的人完全解耦了。
RabbitMQ 對應的框架:
其中包含的幾個概念:
? ? ? ? 1)Publish:生產者,也就是發(fā)送消息的一方。
? ? ? ? 2)consumer:消費者,也就是消費消息的一方。
? ? ? ? 3)queue:隊列,存儲消息。生產者投遞的消息會暫存在消息隊列中,等待消費者處理。
? ? ? ? 4)exchange:交換機,負責消息路由。生產者發(fā)送的消息由交換機決定投遞到哪個隊列。
? ? ? ? 5)virtual host:虛擬主機,起到數(shù)據(jù)隔離的作用。每個虛擬主機相互獨立,有各自的 exchange、queue
????????1.1 RabbitMQ 安裝
? ? ? ? 1)基于 Docker 來安裝 RabbitMQ,使用下面命令即可:
docker run \-e RABBITMQ_DEFAULT_USER=itheima \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
????????2)如果部署在云服務器中,則還需要添加管理員,命令如下:
1. 進入 RabbitMQ Docker 容器:
? ? ? ? 首先找到正在運行的 RabbitMQ 容器的 ID 或名稱:
docker ps
2. 然后使用 docker exec 命令進入容器:
docker exec -it mq /bin/bash
3. 添加用戶:
rabbitmqctl add_user admin admin_password
4. 設置用戶權限:
? ? ? ? 將用戶設置為超級管理員:
rabbitmqctl set_user_tags admin administrator
5. 配置用戶虛擬主機權限:
? ? ? ? 如果 RabbitMQ 使用虛擬主機 vhost,需要為用戶設置相應的權限,假設虛擬主機為 “/”
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
6. 退出容器:
exit
? ? ? ? 可以看到安裝命令中有兩個映射的端口:
? ? ? ? 15672:RabbitMQ 提供的管理控制臺的端口。
? ? ? ? 5672:RabbitMQ 的消息發(fā)送處理接口。
? ? ? ? 安裝完成后,訪問自己的 "IP:15672" 即可看到管理控制臺。
? ? ? ? 如果是首次訪問需要登錄,默認的用戶名和密碼就是在設置超級管理員的時候所對應的用戶名和密碼。
? ? ? ? 2.0 數(shù)據(jù)隔離
????????2.1 用戶管理
? ? ? ? 點擊 admin 選項卡,首先看到 RabbitMQ 控制臺的用戶管理界面:
? ? ? ? Name:用戶名。
? ? ? ? Tages:administrator,說明該用戶是超級管理員,擁有所有權限。
? ? ? ? Can access virtual host:/,可以訪問的 virtual host,這里的 / 是默認的 virtual host 。
添加用戶:
? ? ? ? xbs 用戶已經(jīng)擁有了超級管理員的權限,但是沒有屬于自己的虛擬主機。?
? ? ? ? 因此,可以通過添加用戶,且創(chuàng)建另一個 virtual host,實現(xiàn)數(shù)據(jù)隔離。
????????2.2 virtual host 虛擬主機
? ? ? ? 先退出原先的用戶,切換到剛剛創(chuàng)建的 xbs 用戶登錄,然后點擊 Virtual Host 菜單,進入 virtual hsot 管理頁:
創(chuàng)建虛擬主機:
? ? ? ? 不同的虛擬主機之間,數(shù)據(jù)是隔離的,相互不會受到影響。簡單理解成 MySQL 中的數(shù)據(jù)庫,數(shù)據(jù)庫與數(shù)據(jù)庫之間的數(shù)據(jù)不會受到影響。
? ? ? ? 3.0 SpringAMQP
? ? ? ? 由于 RabbitMQ 采用了 AMQP 協(xié)議,因此具備跨語言的特性。任何語言只要遵循 AMQP 協(xié)議收發(fā)消息,都可以與 RabbitMQ 交互。并且 RabbitMQ 官方也提供了各種不同語言的客戶端。但是,RabbitMQ 官方提供的 Java 客戶端編碼相對復雜,一般生產環(huán)境下更多結合 Spring 來使用,而 Spring 的官方剛好基于 RabbitMQ 提供了這樣一套消息收發(fā)的模板工具:SpringAMQP,并且還基于 SpringBoot 對其實現(xiàn)了自動裝配,使用起來非常方便。
? ? ? ? SpringAMQP 官方地址:Spring AMQP
? ? ? ? SpringAMQP 提供了三個功能:
? ? ? ? 1)自動聲明隊列、交換機及其綁定關系。
? ? ? ? 2)基于注解的監(jiān)聽器模式,異步接收消息。
? ? ? ? 3)封裝了 RabbitTemplate 工具,用于發(fā)送消息。
? ? ? ? 3.1 RabbitMQ 配置
? ? ? ? 1)依賴引入:
<!--AMQP依賴,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
? ? ? ? 2)application.yml 添加配置:
spring:rabbitmq:host: 113.45.166.112 # 你的虛擬機IPport: 5672 # 端口virtual-host: /tt # 虛擬主機username: xbs # 用戶名password: ******** # 密碼
? ? ? ? 3.2 發(fā)送消息
? ? ? ? 在發(fā)送消息之前,需要先根據(jù)圖形化界面創(chuàng)建一個 queue1 隊列:
成功創(chuàng)建 queue1 隊列:
? ? ? ? 接著在測試類 Test 中編寫,并利用 RabbitTemplate 實現(xiàn)消息發(fā)送:
代碼如下:
@Testvoid contextLoads() {//發(fā)送消息sendMessage("hello world");}// 首先引入依賴@Autowiredprivate RabbitTemplate rabbitTemplate;private void sendMessage(String massage){//需要明確發(fā)送的隊列名稱,以及發(fā)送的消息rabbitTemplate.convertAndSend("queue1",massage );}
測試結果:
? ? ? ? 可以看到 queue1 隊列中接收到消息了:
? ? ? ? 3.3 接收消息
? ? ? ? 剛剛發(fā)送的消息,已經(jīng)到 queue1 隊列中,但是沒有消費者進行消費,所以現(xiàn)在創(chuàng)建消費者進行處理消息。
? ? ? ? 在 SpringAMQP 中,是使用了監(jiān)聽器來進行對綁定的隊列進行監(jiān)聽,將來一旦監(jiān)聽的隊列中有了消息,就會推送給當前服務,調用當前方法,處理消息。
代碼如下:
@Slf4j @Component public class RabbitMQ {//接收消息//監(jiān)聽queue1隊列@RabbitListener(queues = "queue1")private void receiveMessage(String massage) {//用什么類型發(fā)送,就接收什么類型log.info("接收到的消息為: " + massage);} }
? ? ? ? 注意:需要加上 @Component 注解,成為 IOC 容器中的 Bean 對象。?
執(zhí)行結果:
? ? ? ? 3.4 WorkQueues 模式
? ? ? ? Work queues,任務模型,簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息,如果直接將消息發(fā)送到隊列中,則隊列每一個消息只能被處理一次,每一個消息都不能被多個消費者同時消費。
? ? ? ? 而在 WorkQueues 模式中,就是將消息直接發(fā)送到隊列中,且多個消費者綁定同一個隊列。
WorkQueues 應用場景:
? ? ? ? 當消息處理比較耗時時的時候,可能產生消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。
? ? ? ? 此時就可以使用 work 模型,多個消費者共同處理消息處理,消息處理的速度就能大大提高了。
????????默認情況下,消息是平均分配每個消費者,并沒有考慮到消費者的處理能力,沒有充分利用每一個消費者的能力,這樣顯然是有問題的。在 Spring 中有一個簡單的配置,可以解決這個問題,通過修改 application.yml 文件,添加配置:
spring:rabbitmq:host: 113.45.166.112 # 你的虛擬機IPport: 5672 # 端口virtual-host: /tt # 虛擬主機username: xbs # 用戶名password: ****** # 密碼listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息
? ? ? ? 能者多勞,只要完成處理了消息,就能從隊列獲取一條消息。
測試:
? ? ? ? 1)定義了兩個消費者進行對隊列中的消息進行消費:
@Slf4j @Component public class RabbitMQ {//接收消息//監(jiān)聽queue1隊列@RabbitListener(queues = "queue1")private void receiveMessage1(String massage) throws InterruptedException {String coloredMessage = String.format("\u001B[32m%s\u001B[0m", massage); // 綠色字體//用什么類型發(fā)送,就接收什么類型log.info("receiveMessage1接收到的消息為: " + coloredMessage);//手動阻塞,模擬消息處理中Thread.sleep(200);}//接收消息//監(jiān)聽queue1隊列@RabbitListener(queues = "queue1")private void receiveMessage2(String massage) throws InterruptedException {String coloredMessage = String.format("\\u001B[0m", massage); // 黃色字體//用什么類型發(fā)送,就接收什么類型log.info("receiveMessage2接收到的消息為: " + massage);//手動阻塞,模擬消息處理中Thread.sleep(800);} }
? ? ? ? 2)發(fā)送多條消息到隊列中:
@Testvoid contextLoads() {//發(fā)送多條消息到queue1隊列中setRabbitTemplate();}// 首先引入依賴@Autowiredprivate RabbitTemplate rabbitTemplate;private void setRabbitTemplate(){for (int i = 1; i <= 100; i++) {rabbitTemplate.convertAndSend("queue1","接收到第"+i+"條消息");}}
測試結果:
? ? ? ? 很明顯看得出來,消費能力強的消費者處理隊列中的消息越多。正所謂能者多勞,這樣充分利用每一個消費者的處理能力,可以有效避免消息積壓問題。
? ? ? ? 4.0 交換機類型
? ? ? ? 在之前沒有交換機,都是生產者直接發(fā)送消息給隊列,而一旦引入交換機,消息發(fā)送的模式會有很大變化:
? ? ? ? Exchange 交換機,只負責轉發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失。
? ? ? ? 而如果將消息直接發(fā)送到隊列中,那么隊列可以將消息進行存儲,不會丟失。
交換機的類型有四種:
? ? ? ? 1)Fanout:廣播,將消息交給所有綁定到交換機的隊列。
? ? ? ? 2)Direct:訂閱,基于 RoutingKey(路由 key)發(fā)送給訂閱了消息的隊列。
? ? ? ? 3)Topic:通配符訂閱,與 Direct 類型,只不過 RoutingKey 可以使用通配符。
? ? ? ? 4)Headers:頭匹配,基于 MQ 的消息匹配,用的較少。
? ? ? ? 4.1 Fanout 交換機
????????在廣播模式下,消息發(fā)送流程是這樣的:
-
1) 可以有多個隊列
-
2) 每個隊列都要綁定到 Exchange(交換機)
-
3) 生產者發(fā)送的消息,只能發(fā)送到交換機
-
4) 交換機把消息發(fā)送給綁定過的所有隊列
-
5) 訂閱隊列的消費者都能拿到消息
廣播模式演示:
? ? ? ? 先創(chuàng)建 Fanout 類型的交換機:
? ? ? ? 再創(chuàng)建兩個隊列 queue1、queue2:
? ? ? ? 接著 xbs.fanout 交換需要綁定 queue1、queue2 隊列:
? ? ? ? 再接著用代碼實現(xiàn)發(fā)送消息、接收消息:
代碼如下:
? ? ? ? 1)接收消息:
@Slf4j @Component public class RabbitMQ {//接收消息//監(jiān)聽queue1隊列@RabbitListener(queues = "queue1")private void receiveMessage1(String massage) throws InterruptedException {String coloredMessage = String.format("\u001B[32m%s\u001B[0m", massage); // 綠色字體//用什么類型發(fā)送,就接收什么類型log.info("receiveMessage1接收到的消息為: " + coloredMessage);}//接收消息//監(jiān)聽queue2隊列@RabbitListener(queues = "queue2")private void receiveMessage2(String massage) throws InterruptedException {//用什么類型發(fā)送,就接收什么類型log.info("receiveMessage2接收到的消息為: " + massage);} }
? ? ? ? 2)發(fā)送消息:
@Testvoid contextLoads() {//發(fā)送多條消息到queue1隊列中sendMessageByExchange("廣播通知:狼來啦!!!!!!");}// 首先引入依賴@Autowiredprivate RabbitTemplate rabbitTemplate;//發(fā)送消息到交換機private void sendMessageByExchange(String msg){rabbitTemplate.convertAndSend("xbs.fanout","",msg);}
執(zhí)行結果:
? ? ? ? 4.2 Direct 交換機
????????在 Fanout 模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到 Direct 類型的 Exchange 。
在Direct模型下:
? ? ? ? 1)隊列與交換機的綁定,不能是任意綁定了,而是要指定一個 RoutingKey(路由key)
? ? ? ? 2)消息的發(fā)送方在 向 Exchange 發(fā)送消息時,也必須指定消息的 RoutingKey。
? ? ? ? 3)Exchange 不再把消息交給每一個綁定的隊列,而是根據(jù)消息的 RoutingKey?進行判斷,只有隊列的 RoutingKey 與消息的 RoutingKey?完全一致,才會接收到消息。
Direct 交換機演示:
????????1)首先創(chuàng)建 xbs.direct 交換機:
? ? ? ? 2)再創(chuàng)建兩個隊列 queue3、queue4:
? ? ? ? 3)xbs.direct 交換機綁定兩個隊列:
? ? ? ? 4)通過 RoutingKey 路由關鍵字來指定。對于 queue4 隊列也是同樣的道理:
? ? ? ? 接著使用代碼來實現(xiàn)發(fā)送消息、接收消息:
代碼實現(xiàn):
? ? ? ? 1)接收消息:
@Slf4j @Component public class RabbitMQ {//接收消息//監(jiān)聽queue1隊列@RabbitListener(queues = "queue3")private void receiveMessage1(String massage) throws InterruptedException {String coloredMessage = String.format("\u001B[32m%s\u001B[0m", massage); // 綠色字體//用什么類型發(fā)送,就接收什么類型log.info("receiveMessage1接收到的消息為: " + coloredMessage);}//接收消息//監(jiān)聽queue2隊列@RabbitListener(queues = "queue4")private void receiveMessage2(String massage) throws InterruptedException {//用什么類型發(fā)送,就接收什么類型log.info("receiveMessage2接收到的消息為: " + massage);} }
? ? ? ? 2)發(fā)送消息:
@Testvoid contextLoads() {sendMessageByDirect1();}@Testpublic void sendMessageByDirect1(){rabbitTemplate.convertAndSend("xbs.direct","red","指定 red 路由進行發(fā)送消息");}@Testpublic void sendMessageByDirect2(){rabbitTemplate.convertAndSend("xbs.direct","blue","指定 blue 路由進行發(fā)送消息");}
執(zhí)行結果:
? ? ? ? 當指定 red 的路由關鍵字,那么只有隊列 queue1 才能接收得到,因此對應的消費者才能進行處理。
? ? ? ? 同理,當指定 blue 的路由關鍵字,那么只有隊列 queue2 才能接收得到,因此對應的消費者才能進行處理。
? ? ? ? 4.3 Topic 交換機
? ? ? ? Topic 類型的 Exchange 與 Direct 相比,都是可以根據(jù) RoutingKey 吧消息路由到不同的隊列。只不過 Topic 類型 Exhchange 可以讓隊列在綁定 BindingKey 的時候使用通配符。
通配符規(guī)則:
? ? ? ? 1)#:匹配一個或多個詞。
? ? ? ? 2)*:只匹配一個詞。
Topic 交換機演示:
? ? ? ? 1)創(chuàng)建 xbs.topic 交換機:
? ? ? ? 2)創(chuàng)建兩個隊列 queue5、queue5:
????????3)將該兩個隊列進行綁定到 xbs.topic 交換機上,并且指定對應通配符的?RoutingKey 關鍵字:
? ? ? ? 4)使用代碼實現(xiàn)發(fā)送消息、接收消息:
代碼如下:
? ? ? ? 發(fā)送消息:
@Testvoid contextLoads() {//兩個隊列綁定同一個交換機sendMessageByTopic1();}// 首先引入依賴@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendMessageByTopic1(){rabbitTemplate.convertAndSend("xbs.topic","class.student","該消息通知學生以及老師:狼來啦!!");}@Testpublic void sendMessageByTopic2(){rabbitTemplate.convertAndSend("xbs.topic","class.teacher","該消息不會通知學生:狼來啦!!");}
? ? ? ? 接收消息:
@Slf4j @Component public class RabbitMQ {//接收消息//監(jiān)聽queue1隊列@RabbitListener(queues = "queue5")private void receiveMessage1(String massage) throws InterruptedException {String coloredMessage = String.format("\u001B[32m%s\u001B[0m", massage); // 綠色字體//用什么類型發(fā)送,就接收什么類型log.info("receiveMessage1接收到的消息為: " + coloredMessage);}//接收消息//監(jiān)聽queue2隊列@RabbitListener(queues = "queue6")private void receiveMessage2(String massage) throws InterruptedException {//用什么類型發(fā)送,就接收什么類型log.info("receiveMessage2接收到的消息為: " + massage);} }
測試結果:
? ? ? ? 當發(fā)送的消息的關鍵字為 class.student 時,則會將該消息發(fā)送到 queue5、queue6 隊列中,所以對應的兩個消費者都能進行消費:
? ? ? ? 當發(fā)送的消息的關鍵字為 class.teacher 時,則該消息只會發(fā)送到 queue6 隊列中:
? ? ? ? 5.0 聲明隊列和交換機
? ? ? ? 在之前都是基于 RabbitMQ 控制臺來創(chuàng)建隊列、交換機。但是在實際開發(fā)時,隊列和交換機是程序員定義的,將來項目上線,又要交給運維去創(chuàng)建,那么程序員就需要把程序中運行的所有隊列和交換機都寫下來,交給運維,在這個過程中是是很容易出錯的。
????????因此推薦的做法是由程序員啟動時檢查隊列和交換機是否存在,如果不存在則自動創(chuàng)建。
? ? ? ? 5.1 基本的 API
? ? ? ? 1)SpringAMQP 提供了一個 Queue 類,用來創(chuàng)建隊列:
? ? ? ? 2)SpringAMQP 還提供了一個 Exchange 接口,來表示所有不同類型的交換機:
????????我們可以自己創(chuàng)建隊列和交換機,不過 SpringAMQP 還提供了 ExchangeBuilder 來簡化這個過程:
? ? ? ? 3)而在綁定隊列和交換機時,則需要使用 BindingBuilder 來創(chuàng)建 Binding 對象:
? ? ? ? 5.2 fanout 示例
????????在 FanoutExchangeCom 中創(chuàng)建一個類,聲明兩個隊列和一個交換機:
@Configuration public class FanoutExchangeCom {/*** 聲明交換機* @return Fanout類型交換機*/@Beanpublic FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange("tt.fanout").build();}/*** 聲明隊列* @return Queue*/@Beanpublic Queue queue1(){return new Queue("tt.queue1");}/*** 聲明隊列* @return Queue*/@Beanpublic Queue queue2(){return new Queue("tt.queue2");}/*** 綁定第一個隊列和交換機*/@Beanpublic Binding bindingQueue1(Queue queue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(queue1).to(fanoutExchange);}/*** 綁定第二個隊列和交換機*/@Beanpublic Binding bindingQueue2(Queue queue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(queue2).to(fanoutExchange);} }
? ? ? ? 當代碼運行起來,則會自動創(chuàng)建交換機和隊列:
? ? ? ? 5.3 direct 示例
????????在 DirectExchangeCom 中創(chuàng)建一個類,聲明兩個隊列和一個交換機:
@Configuration public class DirectExchangeCom {/*** 創(chuàng)建一個direct交換機* @return DirectExchange*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("tt.direct").build();}/*** 創(chuàng)建一個隊列* @return Queue*/@Beanpublic Queue queue3(){return new Queue("tt.queue3");}/*** 創(chuàng)建一個隊列* @return Queue*/@Beanpublic Queue queue4(){return new Queue("tt.queue4");}/*** 綁定隊列到交換機* @param queue1* @param directExchange* @return 且指定路由鍵 xbs*/@Beanpublic Binding binding1(Queue queue3,DirectExchange directExchange){return BindingBuilder.bind(queue3).to(directExchange).with("xbs");}/*** 綁定隊列到交換機* @param queue2* @param directExchange* @return 且指定路由鍵 tt*/@Beanpublic Binding binding2(Queue queue4,DirectExchange directExchange){return BindingBuilder.bind(queue4).to(directExchange).with("tt");}}
程序執(zhí)行結果:
? ? ? ? 自動創(chuàng)建隊列:
? ? ? ? 自動創(chuàng)建交換機:
? ? ? ? 5.4 基于注解聲明
? ? ? ? 基于注解 @Bean 的方式聲明隊列和交換機比價麻煩,Spring 還提供了基于注解方式來聲明。在監(jiān)聽隊列 @RabbitListener 注解上進行添加相關的注解來聲明交換機或者隊列。
? ? ? ? 5.4.1 Fanout 模式的交換機
代碼如下:
//基于注解聲明交換機和隊列@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "tt.queue5"),exchange = @Exchange(name = "tt.fanoutExchange", type = ExchangeTypes.FANOUT)))private void sendMessage(String massage) throws InterruptedException {log.info("sendMessage發(fā)送的消息為: " + massage);}
? ? ? ? 通過注解 @Queue 聲明隊列,@Exchange 聲明交換機,可以指定類型,默認的類型為 Direct 類型的交換機。最后再通過 @QueueBinding 注解進行綁定隊列到交換機中。
當程序運行起來:
? ? ? ? tt.fanoutExchange 交換機:
? ? ? ?tt.queue5 隊列:
//基于注解聲明交換機和隊列@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "tt.queue6"),exchange = @Exchange(name = "tt.directExchange",type = ExchangeTypes.DIRECT),key = "tt.xbs"))private void listenMessageByDirect(String massage) throws InterruptedException {log.info("sendMessageByFanout發(fā)送的消息為: " + massage);}
? ? ? ? 5.4.2 Direct 模式的交換機
代碼如下:
//基于注解聲明交換機和隊列@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "tt.queue6"),exchange = @Exchange(name = "tt.directExchange",type = ExchangeTypes.DIRECT),key = "tt.xbs"))private void listenMessageByDirect(String massage) throws InterruptedException {log.info("sendMessageByFanout發(fā)送的消息為: " + massage);}
? ? ? ? 通過 @Queue 注解聲明隊列,@Exchange 注解聲明交換機,且指定交換機類型。還通過 @QueueBinding 注解將隊列綁定到交換機中,通過 key 指定路由關鍵字。
程序執(zhí)行結果:
? ? ? ? tt.directExchange 交換機:
? ? ? ? tt.queue6 隊列:
? ? ? ? 6.0 消息轉換器
? ? ? ? Spring 的消息發(fā)送代碼接收的消息體是一個 Object:
// 首先引入依賴@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() {//發(fā)送Object類型的消息textObject();}@Testpublic void textObject(){User user = new User(1,"xbs","123456",null,null);rabbitTemplate.convertAndSend("text",user);}
? ? ? ? 此外,還需要確保 User 類實現(xiàn)?Serializable 接口,這是因為 SimpleMessageConverter 只支持 String、byte[] 和 Serializable 類型的消息負載。
此時隊列中的消息:
? ? ? ? 在數(shù)據(jù)傳輸時,SimpleMessageConverter 會把發(fā)送的消息序列化為字節(jié)發(fā)送給 MQ,接收消息的時候,還會把字節(jié)反序列化為 Java 對象。
? ? ? ? 只不過,默認情況下 Spring 采用的序列化方式是 JDK 序列化。從所周知,JDK 序列化存在以下問題:
? ? ? ? 1)數(shù)據(jù)體積過大。
? ? ? ? 2)有安全漏洞。
? ? ? ? 3)可讀性差。
? ? ? ? 6.1 配置 JSON 轉化器
? ? ? ? 顯然,JDK 序列化方式并不合適,期望消息體的體積更小、可讀性更高,因此可以使用 JSON 方式來做序列化和反序列化。
? ? ? ? 1)引入依賴:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version> </dependency>
? ? ? ? 注意,如果項目中引入了 Spring-boot-start-wed 依賴,則無需再次引入 Jackson 依賴。
? ? ? ? 2)添加新 MessageConverter 類型
@Beanpublic MessageConverter messageConverter(){//1.定義消息轉化器Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();//2.配置自定創(chuàng)建消息id,用于識別不同消息,也可以在業(yè)務中基于ID判斷是否是重復消息converter.setCreateMessageIds(true);return converter;}
????????消息轉換器中添加的 messageId 可以便于我們將來做冪等性判斷。
配置好了 Json 消息轉化器之后,進行測試:
? ? ? ? 發(fā)送的消息是 User 類型,且該 User 類不需要再實現(xiàn)?SimpleMessageConverter 接口。
// 首先引入依賴@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() {//發(fā)送Object類型的消息textObject();}@Testpublic void textObject(){User user = new User(1,"xbs","123456",null,null);rabbitTemplate.convertAndSend("text",user);}
執(zhí)行結果:
? ? ? ? 可以很容易看到消息體中的內容,且字節(jié)只需要 81 個。
? ? ? ? 6.2 實現(xiàn)業(yè)務冪等性
? ? ? ? 冪等是一個數(shù)學概念,用函數(shù)表達式來描述:f(x)=f(f(x)) 。在程序開發(fā)中,則是指同一個業(yè)務,執(zhí)行一次或多次對業(yè)務狀態(tài)的影響是一致的。
? ? ? ? 實現(xiàn)業(yè)務冪等可以通過唯一消息 id,是給每個消息都設置一個唯一 ID,利用 id 區(qū)分是否是重復消息:
? ? ? ? 1)每一條消息都生成一個唯一的 id,與消息一起投遞給消費者。
? ? ? ? 2)消費者接收到消息后處理自己的業(yè)務,業(yè)務處理成功后將消息 ID 保存到數(shù)據(jù)庫。
? ? ? ? 3)如果下次又收到相同消息,去數(shù)據(jù)庫查詢判斷是否存在,存在則為重復消息放棄處理。
? ? ? ? 那么通過配置?JSON 轉化器中,設置?setCreateMessageIds(true) 方法,接收者都可以在消息中獲取到唯一 id 。
代碼如下:
? ? ? ? 1)發(fā)送消息:
// 首先引入依賴@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() {//發(fā)送Object類型的消息textObject();}@Testpublic void textObject(){User user = new User(1,"xbs","123456",null,null);rabbitTemplate.convertAndSend("textExchange", "xbs", user, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setMessageId("123456");return message;}});}
? ? ? ? ?通過匿名類 MessagePostProcessor 重寫?postProcessMessage 方法,將消息發(fā)送之前對消息的屬性進行修改,比如說配置 id 屬性、配置消息頭等等,而對于?postProcessMessage 方法不能攜帶消息本體,只能在?convertAndSend() 方法中屬性進行配置,比如說發(fā)送 user 實體類。
? ? ? ? 2)接收消息:
@AutowiredMessageConverter messageConverter;@RabbitListener(queues = "text")private void listenMessage(Message massage) throws InterruptedException {log.info("sendMessage發(fā)送的消息為: " + massage.getMessageProperties().getMessageId());User user = (User) messageConverter.fromMessage(massage);log.info("sendMessage發(fā)送的消息為: " + user);}
? ? ? ? 監(jiān)聽 text 隊列中的消息,接收的參數(shù)不再是 user,而是 Message 類型,通過 massage.getMessageProperties().getMessageId() 獲取唯一 id,再通過 MessageConverter 消息轉化器將 massage 的本體消息轉化為 User 類型。
接收消息的結果: