做網(wǎng)站時(shí)分類標(biāo)題和分類描述搜索引擎排名谷歌
文章目錄
- 1. 消息丟失的情況
- 2. 生產(chǎn)者的可靠性
- 2.1 生產(chǎn)者重連
- 2.2 生產(chǎn)者確認(rèn)
- 2.3 生產(chǎn)者確認(rèn)機(jī)制的代碼實(shí)現(xiàn)
- 2.4 如何看待和處理生產(chǎn)者的確認(rèn)信息
- 3. 消息代理(RabbitMQ)的可靠性
- 3.1 數(shù)據(jù)持久化
- 3.2 LazyQueue( 3.12 版本后所有隊(duì)列都是 Lazy Queue 模式)
- 4. 消費(fèi)者的可靠性
- 4.1 消費(fèi)者確認(rèn)機(jī)制
- 4.2 失敗重試機(jī)制
- 4.3 失敗消息的處理策略
- 4.4 業(yè)務(wù)冪等性
- 4.4.1 方案一:為每條消息設(shè)置一個(gè)唯一的 id
- 4.4.2 方案二:結(jié)合業(yè)務(wù)判斷
- 4.5 兜底的解決方案
- 5. 延遲消息
- 5.1 什么是延遲消息
- 5.2 死信交換機(jī)
- 5.3 延遲消息插件(推薦使用)
- 5.3.1 下載并安裝延遲插件
- 5.3.2 安裝插件時(shí)可能遇到的問題
- 5.3.3 在 Java 代碼中發(fā)送延遲消息
- 5.3.4 延遲消息的原理和缺點(diǎn)
- 5.4 取消超時(shí)訂單
- 5.5 發(fā)送延遲檢測訂單的消息
對 RabbitMQ 不是很了解的同學(xué),可以看一下我的另一篇博文:RabbitMQ快速入門(MQ的概念、安裝RabbitMQ、在 SpringBoot 項(xiàng)目中集成 RabbitMQ )
1. 消息丟失的情況
消息丟失的情況主要有以下三種:
- 生產(chǎn)者向消息代理傳遞消息的過程中,消息丟失了
- 消息代理( RabbitMQ )把消息弄丟了
- 消費(fèi)者把消息弄丟了
那怎么保證消息的可靠性呢,我們可以從消息丟失的情況入手——從生產(chǎn)者、消息代理( RabbitMQ )、消費(fèi)者三個(gè)方面來保證消息的可靠性
2. 生產(chǎn)者的可靠性
2.1 生產(chǎn)者重連
由于網(wǎng)絡(luò)問題,可能會(huì)出現(xiàn)客戶端連接 RabbitMQ 失敗的情況,我們可以通過配置開啟連接 RabbitMQ 失敗后的重連機(jī)制
application.yml(將 host 更改為部署 RabbitMQ 的服務(wù)器的地址)
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /blogusername: CaiXuKunpassword: T1rhFXMGXIOYCoyiconnection-timeout: 1s # 連接超時(shí)時(shí)間template:retry:enabled: true # 開啟連接超時(shí)重試機(jī)制initial-interval: 1000ms # 連接失敗后的初始等待時(shí)間multiplier: 1 # 連接失敗后的等待時(shí)長倍數(shù),下次等待時(shí)長 = (initial-interval) * multipliermax-attempts: 3 # 最大重試次數(shù)
填寫完配置信息后,我們手動(dòng)停止 RabbitMQ ,模擬生產(chǎn)者連接 RabbitMQ 失敗的情況
sudo docker stop rabbitmq
啟動(dòng)測試類
@Test
void testSendMessageToQueue() {String queueName = "simple.queue";String msg = "Hello, SpringAMQP!";rabbitTemplate.convertAndSend(queueName, msg);
}
可以在控制臺(tái)看到,總共有三次重新連接 RabbitMQ 的記錄,三次連接都失敗后,就直接拋異常了
注意事項(xiàng):
- 當(dāng)網(wǎng)絡(luò)不穩(wěn)定的時(shí)候,利用重試機(jī)制可以有效提高消息發(fā)送的成功率,但 SpringAMOP 提供的重試機(jī)制是阻塞式的重試,也就是說多次重試等待的過程中,線程會(huì)被阻塞,影響業(yè)務(wù)性能
- 如果對于業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。如果一定要使用,請合理配置等待時(shí)長(比如 200 ms)和重試次數(shù),也
可以考慮使用異步線程來執(zhí)行發(fā)送消息的代碼
2.2 生產(chǎn)者確認(rèn)
RabbitMQ 提供了 Publisher Confirm
和 Publisher Return
兩種確認(rèn)機(jī)制。開啟確機(jī)制認(rèn)后,如果 MQ 成功收到消息后,會(huì)返回確認(rèn)消息給生產(chǎn)者,返回的結(jié)果有以下幾種情況:
- 消息投遞到了 MQ,但是路由失敗,此時(shí)會(huì)通過 PublisherReturn 機(jī)制返回路由異常的原因,然后返回 ACK,告知生產(chǎn)者消息投遞成功
- 臨時(shí)消息投遞到了 MQ,并且入隊(duì)成功,返回 ACK,告知生產(chǎn)者消息投遞成功
- 持久消息投遞到了MQ,并且入隊(duì)完成持久化,返回 ACK,告知生產(chǎn)者消息投遞成功
- 其它情況都會(huì)返回 NACK,告知生產(chǎn)者消息投遞失敗
2.3 生產(chǎn)者確認(rèn)機(jī)制的代碼實(shí)現(xiàn)
在 publisher 服務(wù)中編寫與生產(chǎn)者確認(rèn)機(jī)制有關(guān)的配置信息( application.yml 文件)
spring:rabbitmq:publisher-returns: truepublisher-confirm-type: correlated
publisher-confirm-type 有三種模式:
- none:關(guān)閉 confirm 機(jī)制
- simple:以同步阻塞等待的方式返回 MQ 的回執(zhí)消息
- correlated:以異步回調(diào)方式的方式返回 MQ 的回執(zhí)消息
每個(gè) RabbitTemplate 只能配置一個(gè) ReturnCallback
在 publisher 模塊新增一個(gè)名為 RabbitMQConfig
的配置類,并讓該類實(shí)現(xiàn) ApplicationContextAware
接口
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置回調(diào)rabbitTemplate.setReturnsCallback((returnedMessage) -> {System.out.println("收到消息的return callback, " +"exchange = " + returnedMessage.getExchange() + ", " +"routingKey = " + returnedMessage.getRoutingKey() + ", " +"replyCode = " + returnedMessage.getReplyCode() + ", " +"replyText = " + returnedMessage.getReplyText() + ", " +"message = " + returnedMessage.getMessage());});}}
測試前先運(yùn)行 RabbitMQ
sudo docker start rabbitmq
在 publisher 模塊添加一個(gè)測試類,測試 ReturnCallback 的效果
@Test
void testConfirmCallback() throws InterruptedException {CorrelationData correlationData = new CorrelationData();correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {if (confirm.isAck()) {// 消息發(fā)送成功System.out.println("消息發(fā)送成功,收到ack");} else {// 消息發(fā)送失敗System.err.println("消息發(fā)送失敗,收到nack,原因是" + confirm.getReason());}if (throwable != null) {// 消息回調(diào)失敗System.err.println("消息回調(diào)失敗");}});rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);// 測試方法執(zhí)行結(jié)束后程序就結(jié)束了,所以這里需要阻塞線程,否則程序看不到回調(diào)結(jié)果Thread.sleep(2000);
}
發(fā)送成功后可以看到消息發(fā)送成功的回調(diào)信息
如果交換機(jī)不存在會(huì)怎么樣呢,我們故意使用一個(gè)不存在的交換機(jī),觀察控制臺(tái)的輸出結(jié)果
如果 routingKey 不存在會(huì)怎么樣呢,我們故意使用一個(gè)不存在的 routingKey ,觀察控制臺(tái)的輸出結(jié)果
可以看到,confirmCallback 和 ReturnCallback 都返回了回調(diào)信息(deliveryTag
為 0
表示消息無法路由到隊(duì)列)
2.4 如何看待和處理生產(chǎn)者的確認(rèn)信息
- 生產(chǎn)者確認(rèn)需要額外的網(wǎng)絡(luò)開銷和系統(tǒng)資源開銷,盡量不要使用
- 如果一定要使用,無需開啟 Publisher-Return 機(jī)制,因?yàn)槁酚墒∫话闶菢I(yè)務(wù)出了問題
- 對于返回 nack 的消息,可以嘗試重新投遞,如果依然失敗,則記錄異常消息
3. 消息代理(RabbitMQ)的可靠性
在默認(rèn)情況下,RabbitMQ 會(huì)將接收到的信息保存在內(nèi)存中以降低消息收發(fā)的延遲,這樣會(huì)導(dǎo)致兩個(gè)問題:
- 一旦 RabbitMQ 宕機(jī),內(nèi)存中的消息會(huì)丟失
- 內(nèi)存空間是有限的,當(dāng)消費(fèi)者處理過慢或者消費(fèi)者出現(xiàn)故障或時(shí),會(huì)導(dǎo)致消息積壓,引發(fā) MQ 阻塞( Paged Out 現(xiàn)象)
怎么理解 MQ 阻塞呢,當(dāng)隊(duì)列的空間被消息占滿了之后,RabbitMQ 會(huì)先把老舊的信息存到磁盤,為新消息騰出空間,在這個(gè)過程中,整個(gè) MQ 是被阻塞的,也就是說,在 MQ 完成這一系列工作之前,無法處理已有的消息和接收新的消息
我們來測試一下消息丟失的情況,在 RabbitMQ 的控制臺(tái)中向 simple.queue 隊(duì)列發(fā)送一條信息,發(fā)送后重啟 RabbitMQ ,模擬 RabbitMQ 宕機(jī)后重啟的情況
測試前,記得先把監(jiān)聽 simple.queue 隊(duì)列的代碼注釋掉
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消費(fèi)者收到了simple.queue的消息:【" + message + "】");
}
第一步:先發(fā)送一條消息
第二步:查看消息的情況
第三步:重啟 RabbitMQ ,模擬 RabbitMQ 宕機(jī)后重啟的情況
sudo docker restart rabbitmq
第四步:查看消息的情況(可以看到,RabbitMQ 重啟后,消息丟失了)
3.1 數(shù)據(jù)持久化
RabbitMQ 實(shí)現(xiàn)數(shù)據(jù)持久化包括 3 個(gè)方面:
- 交換機(jī)持久化
- 隊(duì)列持久化
- 消息持久化
注意事項(xiàng):
- 利用 SpringAMQP 創(chuàng)建的交換機(jī)、隊(duì)列、消息,默認(rèn)都是持久化的
- 在 RabbitMQ 控制臺(tái)創(chuàng)建的交換機(jī)、隊(duì)列默認(rèn)是持久化的,而消息默認(rèn)是存在內(nèi)存中( 3.12 版本之前默認(rèn)存放在內(nèi)存,3.12 版本及之后默認(rèn)先存放在磁盤,消費(fèi)者處理消息時(shí)才會(huì)將消息取出來放到內(nèi)存中)
我們來演示一下 RabbitMQ 發(fā)生 Paged Out 現(xiàn)象(也就是隊(duì)列的空間被消息占滿了之后,將老舊消息移到磁盤,為新消息騰出空間的情況)
我們編寫一個(gè)測試類,向 simple.queue 一次性發(fā)送一百萬條消息
在發(fā)送消息之前,先把生產(chǎn)者確認(rèn)機(jī)制關(guān)閉,提高消息發(fā)送的速度
spring:rabbitmq:publisher-returns: falsepublisher-confirm-type: none
先測試發(fā)送非持久化信息
@Test
void testPagedOut() {Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();for (int i = 0; i < 1; i++) {rabbitTemplate.convertAndSend("simple.queue", message);}
}
測試結(jié)果
再測試發(fā)送持久化信息
@Test
void testPagedOut() {Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();for (int i = 0; i < 1; i++) {rabbitTemplate.convertAndSend("simple.queue", message);}
}
3.2 LazyQueue( 3.12 版本后所有隊(duì)列都是 Lazy Queue 模式)
從 RabbitMQ 的 3.6.0
版本開始,增加了 Lazy Queue 的概念,也就是惰性隊(duì)列,惰性隊(duì)列的特征如下:
- 接收到消息后直接存入磁盤而非內(nèi)存(內(nèi)存中只保留最近的消息,默認(rèn) 2048條 )
- 消費(fèi)者要處理消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存
- 支持?jǐn)?shù)百萬條的消息存儲(chǔ),在 3.12 版本后,所有隊(duì)列都是 Lazy Queue 模式,無法更改
開啟持久化和生產(chǎn)者確認(rèn)時(shí),RabbitMQ 只有在消息持久化完成后才會(huì)給生產(chǎn)者返回 ACK 回執(zhí)
在 RabbitMQ 的控制臺(tái)可以看到 RabbitMQ 的版本
在 RabbitMQ 控制臺(tái)中,要?jiǎng)?chuàng)建一個(gè)惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定 x-queue-mode 屬性為 lazy 即可
x-queue-mode
在 Java 代碼中,要?jiǎng)?chuàng)建一個(gè)惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定 x-queue-mode 屬性為 lazy 即可
編程式創(chuàng)建
@Bean
public org.springframework.amqp.core.Queue lazeQueue() {return QueueBuilder.durable("lazy.queue1").lazy().build();
}
注解式創(chuàng)建
@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(name = "lazy.queue2",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazeQueue(String message) {System.out.println("消費(fèi)者收到了 laze.queue2的消息: " + message);
}
4. 消費(fèi)者的可靠性
4.1 消費(fèi)者確認(rèn)機(jī)制
為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ 提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement)。處理消息后,消費(fèi)者應(yīng)該向 RabbitMQ 發(fā)送一個(gè)回執(zhí),告知 RabbitMQ 消息的處理狀態(tài),回執(zhí)有三種可選值:
- ack:成功處理消息,RabbitMQ 從隊(duì)列中刪除該消息
- nack:消息處理失敗,RabbitMQ 需要再次投遞消息
- reject:消息處理失敗并拒絕該消息,RabbitMQ 從隊(duì)列中刪除該消息
SpringAMQP 已經(jīng)實(shí)現(xiàn)了消息確認(rèn)功能,并允許我們通過配置文件選擇 ACK 的處理方式,有三種方式:
- none:不處理,即消息投遞給消費(fèi)者后立刻 ack,消息會(huì)會(huì)立刻從 MQ 中刪除,非常不安全,不建議使用
- manual:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用 api,發(fā)送 ack 或 reject ,存在業(yè)務(wù)入侵,但更靈活
- auto:自動(dòng)模式,SpringAMQP 利用 AOP 對我們的消息處理邏輯做了環(huán)繞增強(qiáng),當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回 ack,當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),會(huì)根據(jù)異常的類型返回不同結(jié)果:
- 如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回 nack
- 如果是消息處理或校驗(yàn)異常,自動(dòng)返回 reject
開啟消息確認(rèn)機(jī)制,需要在 application.yml
文件中編寫相關(guān)的配置
spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none
先測試處理模式為 none 的情況,向 simple.queue 隊(duì)列發(fā)送一條消息,同時(shí)監(jiān)聽 simple.queue 隊(duì)列的消息,監(jiān)聽到隊(duì)列中的消息后手動(dòng)拋出一個(gè)異常
publisher 服務(wù)
@Test
void testSendMessageToQueue() {String queueName = "simple.queue";String msg = "Hello, SpringAMQP!";rabbitTemplate.convertAndSend(queueName, msg);
}
consumer 服務(wù)
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消費(fèi)者收到了simple.queue的消息:【" + message + "】");throw new RuntimeException("故意拋出異常");
}
不出意外,程序報(bào)錯(cuò)了
但在 RabbitMQ 的控制臺(tái)可以看到,消息也丟失了
再測試處理模式為 none 的情況
可以看到,控制臺(tái)一直在報(bào)錯(cuò),報(bào)錯(cuò)之后一直在嘗試重新發(fā)送消息
在 RabbitMQ 的控制臺(tái)可以看到,simple.queue 一直在收發(fā)消息,速率達(dá)到了 97 次每秒(狀態(tài)為 running ,消息的狀態(tài)為 Unacked )
此時(shí),我們手動(dòng)關(guān)閉 consumer 服務(wù),查看 RabbitMQ 的控制臺(tái),可以看到消息恢復(fù)到正常的狀態(tài)了
再來測試異常類型為 MessageConversionException 的情況
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消費(fèi)者收到了simple.queue的消息:【" + message + "】");throw new MessageConversionException("故意拋出異常");
}
在控制臺(tái)可以看到,消息被拒絕了,而且消息也沒有重新發(fā)送
查看 RabbitMQ 的控制臺(tái),可以發(fā)現(xiàn)消息已經(jīng)從隊(duì)列中移除了
4.2 失敗重試機(jī)制
當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷重新入隊(duì),重新發(fā)送給消費(fèi)者,然后再次發(fā)生異常,再次 requeue(重新入隊(duì)),陷入 無限循環(huán),給 RabbitMQ 帶來不必要的壓力
我們可以利用 Spring 提供的 retry 機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無限制地重新入隊(duì)
在 application.yml 配置文件中開啟失敗重試機(jī)制
spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: autoretry:enabled: true # 開啟消息消費(fèi)失敗重試機(jī)制initial-interval: 1000ms # 消息消費(fèi)失敗后的初始等待時(shí)間multiplier: 1 # 消息消費(fèi)失敗后的等待時(shí)長倍數(shù),下次等待時(shí)長 = (initial-interval) * multipliermax-attempts: 3 # 最大重試次數(shù)stateless: true # true表示無狀態(tài),false表示有狀態(tài),如果業(yè)務(wù)中包含事務(wù),需要設(shè)置為false
我們將拋出的異常類型改回 RuntimeException
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消費(fèi)者收到了simple.queue的消息:【" + message + "】");throw new RuntimeException("故意拋出異常");
}
在控制臺(tái)可以看出,消息的重新發(fā)送次數(shù)已經(jīng)耗盡了
查看 RabbitMQ 的控制臺(tái),發(fā)現(xiàn)消息也丟失了
正常情況下,消息丟失都不是我們想看到的,該怎么解決這個(gè)問題呢
4.3 失敗消息的處理策略
開啟重試模式后,如果重試次數(shù)耗盡后消息依然處理失敗,則需要由 MessageRecoverer 接口來處理, MessageRecoverer 有三個(gè)實(shí)現(xiàn)類:
RejectAndDontRequeueRecoverer
:重試次數(shù)耗盡后,直接 reject,丟棄消息,默認(rèn)就是這種方式ImmediateRequeueMessageRecoverer
:重試次數(shù)耗盡后,返回 nack,消息重新入隊(duì)RepublishMessageRecoverer
:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
我們來演示一下使用 RepublishMessageRecoverer 類的情況
第一步:定義一個(gè)名為 blog.error 的交換機(jī)、一個(gè)名為 error.queue 的隊(duì)列,并將隊(duì)列和交換機(jī)進(jìn)行綁定
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct", true, false);}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true, false, false);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}}
第二步:將失敗處理策略改為 RepublishMessageRecoverer (開起了消費(fèi)者重試機(jī)制才會(huì)生效)
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
在控制臺(tái)中可以看到,消息的重試次數(shù)耗盡后,消息被放入了 error.queue 隊(duì)列
在 RabbitMQ 的控制塔也可以看到, error.direct 交換機(jī) 和 error.queue 隊(duì)列成功創(chuàng)建,消息也成功放入了 error.queue 隊(duì)列
總結(jié):消費(fèi)者如何保證消息一定被消費(fèi)?
- 開啟消費(fèi)者確認(rèn)機(jī)制為 auto ,由 Spring 幫我們確認(rèn),消息處理成功后返回 ack,異常時(shí)返回 nack
- 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置
MessageRecoverer
,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理
4.4 業(yè)務(wù)冪等性
冪等是一個(gè)數(shù)學(xué)概念,用函數(shù)表達(dá)式來描述是這樣的:f(x) = f(f(x)),絕對值函數(shù)具有冪等性
在程序開發(fā)中,冪等是指同一個(gè)業(yè)務(wù),執(zhí)行一次或多次對業(yè)務(wù)狀態(tài)的影響是一致的
那么有什么方法能夠確保業(yè)務(wù)的冪等性呢
4.4.1 方案一:為每條消息設(shè)置一個(gè)唯一的 id
給每個(gè)消息都設(shè)置一個(gè)唯一的 id,利用 id 區(qū)分是否是重復(fù)消息:
- 為每條消息都生成一個(gè)唯一的 id,與消息一起投遞給消費(fèi)者
- 消費(fèi)者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息 id 保存到數(shù)據(jù)庫
- 如果消費(fèi)者下次又收到相同消息,先去數(shù)據(jù)庫查詢該消息對應(yīng)的 id 是否存在,如果存在則為重復(fù)消息,放棄處理
可以在指定 MessageConverter 的具體類型時(shí),同時(shí)為 MessageConverter 設(shè)置自動(dòng)創(chuàng)建一個(gè) messageId
@Bean
public MessageConverter jacksonMessageConvertor() {Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
發(fā)送消息后,在 RabbitMQ 的控制臺(tái)可以看到,消息的 properties 屬性附帶了 messageId 信息
但這種方式對業(yè)務(wù)有一定的侵入性
4.4.2 方案二:結(jié)合業(yè)務(wù)判斷
結(jié)合業(yè)務(wù)邏輯,基于業(yè)務(wù)本身做判斷。以支付業(yè)務(wù)為例:我們要在支付后修改訂單狀態(tài)為已支付,應(yīng)該在修改訂單狀態(tài)前先查詢訂單狀態(tài),判斷狀態(tài)是否是未支付,只有未支付訂單才需要修改,其它狀態(tài)的訂單不做處理
總結(jié):如何保證支付服務(wù)與交易服務(wù)之間的訂單狀態(tài)一致性?
- 首先,支付服務(wù)會(huì)正在用戶支付成功以后利用 MQ 發(fā)送消息通知交易服務(wù),完成訂單狀態(tài)同步
- 其次,為了保證 MQ 消息的可靠性,我們采用了生產(chǎn)者確認(rèn)機(jī)制、消費(fèi)者確認(rèn)、消費(fèi)者失敗重試等策略,確保消息投遞和處理的可靠性,同時(shí)也開啟了MQ的持久化,避免因服務(wù)宕機(jī)導(dǎo)致消息丟失
- 最后,我們還在交易服務(wù)更新訂單狀態(tài)時(shí)做了業(yè)務(wù)幕等判斷,避免因消息重復(fù)消費(fèi)導(dǎo)致訂單狀態(tài)異常
4.5 兜底的解決方案
如果交易服務(wù)消息處理失敗,支付服務(wù)和交易服務(wù)出現(xiàn)了數(shù)據(jù)不一致的情況,有沒有什么兜底的解決方案?
我們可以在交易服務(wù)設(shè)置定時(shí)任務(wù),定期查詢訂單支付狀態(tài),這樣即便 MQ 通知失敗,還可以利用定時(shí)任務(wù)作為兜底方案,確保訂單支付狀態(tài)的最終一致性
5. 延遲消息
5.1 什么是延遲消息
延遲消息:生產(chǎn)者發(fā)送消息時(shí)指定一個(gè)時(shí)間,消費(fèi)者不會(huì)立刻收到消息,而是在指定時(shí)間之后才會(huì)收到消息
延遲任務(wù):一定時(shí)間之后才會(huì)執(zhí)行的任務(wù)
5.2 死信交換機(jī)
當(dāng)一個(gè)隊(duì)列中的某條消息滿足下列情況之一時(shí),就會(huì)成為死信(dead letter):
- 消費(fèi)者使用 basic.reject 或 basic.nack 聲明消費(fèi)失敗,并且消息的 requeue 參數(shù)設(shè)置為 false
- 過期消息(達(dá)到了隊(duì)列或消息本身設(shè)置的過期時(shí)間),消息超時(shí)后無人消費(fèi)
- 要投遞的隊(duì)列消息堆積滿了,最早的消息可能成為死信
如果隊(duì)列通過 dead-letter-exchange 屬性指定了一個(gè)交換機(jī),那么該隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡稱DLX)
利用死信交換機(jī)的特點(diǎn),可以實(shí)現(xiàn)發(fā)送延遲消息的功能
5.3 延遲消息插件(推薦使用)
5.3.1 下載并安裝延遲插件
RabbitMQ 的官方推出了一個(gè)插件,原生支持延遲消息功能。該插件的原理是設(shè)計(jì)了一種支持延遲消息功能的交換機(jī),當(dāng)消息投遞到交換機(jī)后,可以將消息暫存一段時(shí)間,時(shí)間到了之后再將消息投遞到隊(duì)列中
插件的下載地址:rabbitmq-delayed-message-exchange
下載完插件后,運(yùn)行以下指令,在輸出信息中找到 Mounts ,再找到 RabbitMQ 的插件的安裝目錄
sudo docker inspect rabbitmq
然后進(jìn)入 RabbitMQ 的插件的安裝目錄,將剛才下載的插件上傳到該目錄下
一般與 docker 相關(guān)的目錄只有 root 用戶才有權(quán)限訪問,所以我們需要先打開 docker 目錄的部分權(quán)限(耗時(shí)可能較長)
sudo chmod +rx -R /var/lib/docker
接著打開/var/lib/docker/volumes/rabbitmq-plugins/_data
目錄的寫權(quán)限(如果修改權(quán)限不生效,請切換到 root 用戶執(zhí)行指令)
sudo chmod 777 /var/lib/docker/volumes/rabbitmq-plugins/_data
將剛才下載的插件上傳到/var/lib/docker/volumes/rabbitmq-plugins/_data
目錄
上傳成功后將/var/lib/docker/volumes/rabbitmq-plugins/_data
目錄的權(quán)限復(fù)原
sudo chmod 755 /var/lib/docker/volumes/rabbitmq-plugins/_data
最后進(jìn)入容器內(nèi)部,運(yùn)行指令安裝插件,安裝完成后退出容器內(nèi)部
sudo docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
看到以下信息,說明插件安裝成功了
5.3.2 安裝插件時(shí)可能遇到的問題
如果你遇到了以下錯(cuò)誤,在執(zhí)行rabbitmq-plugins enable rabbitmq_delayed_message_exchange
指令前先執(zhí)行以下指令
chmod 400 /var/lib/rabbitmq/.erlang.cookie
5.3.3 在 Java 代碼中發(fā)送延遲消息
聲明延遲交換機(jī)
@Bean
public DirectExchange delayExchange() {return ExchangeBuilder.directExchange("delay.direct").delayed().build();
}
聲明隊(duì)列和延遲交換機(jī),并將隊(duì)列和延遲交換機(jī)綁定在一起
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue"),exchange = @Exchange(name = "delay.direct", delayed = "true", type = ExchangeTypes.DIRECT),key = "delay"
))
public void listenDelayQueue(String message) {SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("消費(fèi)者收到了 delay.queue的消息: " + message + ",時(shí)間:" + simpleDateFormat.format(System.currentTimeMillis()));
}
編寫測試方法,測試發(fā)送延遲消息
@Test
void testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000); // 毫秒return message;}});SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("發(fā)送消息成功!發(fā)送時(shí)間:" + simpleDateFormat.format(System.currentTimeMillis()));
}
發(fā)送延遲消息的本質(zhì)是在消息頭屬性中添加 x-delay 屬性
5.3.4 延遲消息的原理和缺點(diǎn)
RabbitMQ 的延遲消息是怎么實(shí)現(xiàn)的呢?RabbitMQ 會(huì)自動(dòng)維護(hù)一個(gè)時(shí)鐘,這個(gè)時(shí)鐘每隔一秒就跳動(dòng)一次,如果對時(shí)鐘的精度要求比較高的,可能還要精確到毫秒,甚至納秒
RabbitMQ 會(huì)為發(fā)送到交換機(jī)的每一條延遲消息創(chuàng)建一個(gè)時(shí)鐘,時(shí)鐘運(yùn)行的過程中需要 CPU 不斷地進(jìn)行計(jì)算。發(fā)送到交換機(jī)的延遲消息數(shù)越多,RabbitMQ 需要維護(hù)的時(shí)鐘就越多,對 CPU 的占用率就越高(Spring 提供的定時(shí)任務(wù)的原理也是類似)
定時(shí)任務(wù)屬于 CPU 密集型任務(wù),中間涉及到的計(jì)算過程對 CPU 來說壓力是很大的,所以說,采用延遲消息會(huì)給服務(wù)器的 CPU 帶來更大的壓力。當(dāng)交換機(jī)中有非常多的延遲消息時(shí),對 CPU 的壓力就會(huì)特別大
所以說,延遲消息適用于延遲時(shí)間較短的場景
5.4 取消超時(shí)訂單
設(shè)置 30 分鐘后檢測訂單支付狀態(tài)實(shí)現(xiàn)起來非常簡單,但是存在兩個(gè)問題:
- 如果并發(fā)較高,30分鐘可能堆積消息過多,對 MQ 壓力很大
- 大多數(shù)訂單在下單后 1 分鐘內(nèi)就會(huì)支付,但消息需要在 MQ 中等待30分鐘,浪費(fèi)資源
5.5 發(fā)送延遲檢測訂單的消息
我們定義一個(gè)實(shí)體類,用于記錄延遲消息的內(nèi)容和延遲消息的延遲時(shí)間列表(該實(shí)體類也是延遲消息的類型)
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class MultipleDelayMessage<T> {private T data;private List<Long> delayMillis;public MultipleDelayMessage() {}public MultipleDelayMessage(T data, Long... delayMillis) {this.data = data;this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));}public MultipleDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));}public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {return new MultipleDelayMessage<>(data, delayMillis);}public boolean hasNextDelay() {return !delayMillis.isEmpty();}public Long removeNextDelay() {return delayMillis.remove(0);}public T getData() {return data;}public void setData(T data) {this.data = data;}public List<Long> getDelayMillis() {return delayMillis;}public void setDelayMillis(List<Long> delayMillis) {this.delayMillis = delayMillis;}@Overridepublic String toString() {return "MultipleDelayMessage{" +"data=" + data +", delayMillis=" + delayMillis +'}';}}
我們再定義一個(gè)發(fā)送延遲消息的消息處理器,供所有服務(wù)使用
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;public class DelayMessagePostProcessor implements MessagePostProcessor {private final Integer delay;public DelayMessagePostProcessor(Integer delay) {this.delay = delay;}@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}}
改造后的發(fā)送延遲消息的測試方法
@Test
void testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000));SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("發(fā)送消息成功!發(fā)送時(shí)間:" + simpleDateFormat.format(System.currentTimeMillis()));
}