中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

做網(wǎng)站時(shí)分類標(biāo)題和分類描述搜索引擎排名谷歌

做網(wǎng)站時(shí)分類標(biāo)題和分類描述,搜索引擎排名谷歌,秦皇島百度網(wǎng)站排名,三屏網(wǎng)站建設(shè)文章目錄 1. 消息丟失的情況2. 生產(chǎn)者的可靠性2.1 生產(chǎn)者重連2.2 生產(chǎn)者確認(rèn)2.3 生產(chǎn)者確認(rèn)機(jī)制的代碼實(shí)現(xiàn)2.4 如何看待和處理生產(chǎn)者的確認(rèn)信息 3. 消息代理(RabbitMQ)的可靠性3.1 數(shù)據(jù)持久化3.2 LazyQueue( 3.12 版本后所有隊(duì)列都是 Lazy Qu…

文章目錄

  • 1. 消息丟失的情況
  • 2. 生產(chǎn)者的可靠性
    • 2.1 生產(chǎn)者重連
    • 2.2 生產(chǎn)者確認(rèn)
    • 2.3 生產(chǎn)者確認(rèn)機(jī)制的代碼實(shí)現(xiàn)
    • 2.4 如何看待和處理生產(chǎn)者的確認(rèn)信息
  • 3. 消息代理(RabbitMQ)的可靠性
    • 3.1 數(shù)據(jù)持久化
    • 3.2 LazyQueue( 3.12 版本后所有隊(duì)列都是 Lazy Queue 模式)
  • 4. 消費(fèi)者的可靠性
    • 4.1 消費(fèi)者確認(rèn)機(jī)制
    • 4.2 失敗重試機(jī)制
    • 4.3 失敗消息的處理策略
    • 4.4 業(yè)務(wù)冪等性
      • 4.4.1 方案一:為每條消息設(shè)置一個(gè)唯一的 id
      • 4.4.2 方案二:結(jié)合業(yè)務(wù)判斷
    • 4.5 兜底的解決方案
  • 5. 延遲消息
    • 5.1 什么是延遲消息
    • 5.2 死信交換機(jī)
    • 5.3 延遲消息插件(推薦使用)
      • 5.3.1 下載并安裝延遲插件
      • 5.3.2 安裝插件時(shí)可能遇到的問題
      • 5.3.3 在 Java 代碼中發(fā)送延遲消息
      • 5.3.4 延遲消息的原理和缺點(diǎn)
    • 5.4 取消超時(shí)訂單
    • 5.5 發(fā)送延遲檢測訂單的消息

對 RabbitMQ 不是很了解的同學(xué),可以看一下我的另一篇博文:RabbitMQ快速入門(MQ的概念、安裝RabbitMQ、在 SpringBoot 項(xiàng)目中集成 RabbitMQ )

1. 消息丟失的情況

消息丟失的情況主要有以下三種:

  1. 生產(chǎn)者向消息代理傳遞消息的過程中,消息丟失了
  2. 消息代理( RabbitMQ )把消息弄丟了
  3. 消費(fèi)者把消息弄丟了

在這里插入圖片描述

那怎么保證消息的可靠性呢,我們可以從消息丟失的情況入手——從生產(chǎn)者、消息代理( RabbitMQ )、消費(fèi)者三個(gè)方面來保證消息的可靠性

2. 生產(chǎn)者的可靠性

2.1 生產(chǎn)者重連

由于網(wǎng)絡(luò)問題,可能會(huì)出現(xiàn)客戶端連接 RabbitMQ 失敗的情況,我們可以通過配置開啟連接 RabbitMQ 失敗后的重連機(jī)制

application.yml(將 host 更改為部署 RabbitMQ 的服務(wù)器的地址)

spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /blogusername: CaiXuKunpassword: T1rhFXMGXIOYCoyiconnection-timeout: 1s # 連接超時(shí)時(shí)間template:retry:enabled: true # 開啟連接超時(shí)重試機(jī)制initial-interval: 1000ms # 連接失敗后的初始等待時(shí)間multiplier: 1 # 連接失敗后的等待時(shí)長倍數(shù),下次等待時(shí)長 = (initial-interval) * multipliermax-attempts: 3 # 最大重試次數(shù)

填寫完配置信息后,我們手動(dòng)停止 RabbitMQ ,模擬生產(chǎn)者連接 RabbitMQ 失敗的情況

sudo docker stop rabbitmq

啟動(dòng)測試類

@Test
void testSendMessageToQueue() {String queueName = "simple.queue";String msg = "Hello, SpringAMQP!";rabbitTemplate.convertAndSend(queueName, msg);
}

可以在控制臺(tái)看到,總共有三次重新連接 RabbitMQ 的記錄,三次連接都失敗后,就直接拋異常了

在這里插入圖片描述

注意事項(xiàng):

  1. 當(dāng)網(wǎng)絡(luò)不穩(wěn)定的時(shí)候,利用重試機(jī)制可以有效提高消息發(fā)送的成功率,但 SpringAMOP 提供的重試機(jī)制是阻塞式的重試,也就是說多次重試等待的過程中,線程會(huì)被阻塞,影響業(yè)務(wù)性能
  2. 如果對于業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。如果一定要使用,請合理配置等待時(shí)長(比如 200 ms)和重試次數(shù),也
    可以考慮使用異步線程來執(zhí)行發(fā)送消息的代碼

2.2 生產(chǎn)者確認(rèn)

RabbitMQ 提供了 Publisher ConfirmPublisher Return 兩種確認(rèn)機(jī)制。開啟確機(jī)制認(rèn)后,如果 MQ 成功收到消息后,會(huì)返回確認(rèn)消息給生產(chǎn)者,返回的結(jié)果有以下幾種情況:

  1. 消息投遞到了 MQ,但是路由失敗,此時(shí)會(huì)通過 PublisherReturn 機(jī)制返回路由異常的原因,然后返回 ACK,告知生產(chǎn)者消息投遞成功
  2. 臨時(shí)消息投遞到了 MQ,并且入隊(duì)成功,返回 ACK,告知生產(chǎn)者消息投遞成功
  3. 持久消息投遞到了MQ,并且入隊(duì)完成持久化,返回 ACK,告知生產(chǎn)者消息投遞成功
  4. 其它情況都會(huì)返回 NACK,告知生產(chǎn)者消息投遞失敗

在這里插入圖片描述

2.3 生產(chǎn)者確認(rèn)機(jī)制的代碼實(shí)現(xiàn)

在 publisher 服務(wù)中編寫與生產(chǎn)者確認(rèn)機(jī)制有關(guān)的配置信息( application.yml 文件)

spring:rabbitmq:publisher-returns: truepublisher-confirm-type: correlated

publisher-confirm-type 有三種模式:

  1. none:關(guān)閉 confirm 機(jī)制
  2. simple:以同步阻塞等待的方式返回 MQ 的回執(zhí)消息
  3. correlated:以異步回調(diào)方式的方式返回 MQ 的回執(zhí)消息

每個(gè) RabbitTemplate 只能配置一個(gè) ReturnCallback

在 publisher 模塊新增一個(gè)名為 RabbitMQConfig 的配置類,并讓該類實(shí)現(xiàn) ApplicationContextAware 接口

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置回調(diào)rabbitTemplate.setReturnsCallback((returnedMessage) -> {System.out.println("收到消息的return callback, " +"exchange = " + returnedMessage.getExchange() + ", " +"routingKey = " + returnedMessage.getRoutingKey() + ", " +"replyCode = " + returnedMessage.getReplyCode() + ", " +"replyText = " + returnedMessage.getReplyText() + ", " +"message = " + returnedMessage.getMessage());});}}

測試前先運(yùn)行 RabbitMQ

sudo docker start rabbitmq

在 publisher 模塊添加一個(gè)測試類,測試 ReturnCallback 的效果

@Test
void testConfirmCallback() throws InterruptedException {CorrelationData correlationData = new CorrelationData();correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {if (confirm.isAck()) {// 消息發(fā)送成功System.out.println("消息發(fā)送成功,收到ack");} else {// 消息發(fā)送失敗System.err.println("消息發(fā)送失敗,收到nack,原因是" + confirm.getReason());}if (throwable != null) {// 消息回調(diào)失敗System.err.println("消息回調(diào)失敗");}});rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);// 測試方法執(zhí)行結(jié)束后程序就結(jié)束了,所以這里需要阻塞線程,否則程序看不到回調(diào)結(jié)果Thread.sleep(2000);
}

發(fā)送成功后可以看到消息發(fā)送成功的回調(diào)信息


如果交換機(jī)不存在會(huì)怎么樣呢,我們故意使用一個(gè)不存在的交換機(jī),觀察控制臺(tái)的輸出結(jié)果

在這里插入圖片描述


如果 routingKey 不存在會(huì)怎么樣呢,我們故意使用一個(gè)不存在的 routingKey ,觀察控制臺(tái)的輸出結(jié)果

在這里插入圖片描述

可以看到,confirmCallback 和 ReturnCallback 都返回了回調(diào)信息(deliveryTag0 表示消息無法路由到隊(duì)列)

2.4 如何看待和處理生產(chǎn)者的確認(rèn)信息

  1. 生產(chǎn)者確認(rèn)需要額外的網(wǎng)絡(luò)開銷和系統(tǒng)資源開銷,盡量不要使用
  2. 如果一定要使用,無需開啟 Publisher-Return 機(jī)制,因?yàn)槁酚墒∫话闶菢I(yè)務(wù)出了問題
  3. 對于返回 nack 的消息,可以嘗試重新投遞,如果依然失敗,則記錄異常消息

3. 消息代理(RabbitMQ)的可靠性

在默認(rèn)情況下,RabbitMQ 會(huì)將接收到的信息保存在內(nèi)存中以降低消息收發(fā)的延遲,這樣會(huì)導(dǎo)致兩個(gè)問題:

  1. 一旦 RabbitMQ 宕機(jī),內(nèi)存中的消息會(huì)丟失
  2. 內(nèi)存空間是有限的,當(dāng)消費(fèi)者處理過慢或者消費(fèi)者出現(xiàn)故障或時(shí),會(huì)導(dǎo)致消息積壓,引發(fā) MQ 阻塞( Paged Out 現(xiàn)象)

怎么理解 MQ 阻塞呢,當(dāng)隊(duì)列的空間被消息占滿了之后,RabbitMQ 會(huì)先把老舊的信息存到磁盤,為新消息騰出空間,在這個(gè)過程中,整個(gè) MQ 是被阻塞的,也就是說,在 MQ 完成這一系列工作之前,無法處理已有的消息和接收新的消息

在這里插入圖片描述

在這里插入圖片描述


我們來測試一下消息丟失的情況,在 RabbitMQ 的控制臺(tái)中向 simple.queue 隊(duì)列發(fā)送一條信息,發(fā)送后重啟 RabbitMQ ,模擬 RabbitMQ 宕機(jī)后重啟的情況

測試前,記得先把監(jiān)聽 simple.queue 隊(duì)列的代碼注釋掉

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消費(fèi)者收到了simple.queue的消息:【" + message + "】");
}

第一步:先發(fā)送一條消息

在這里插入圖片描述

第二步:查看消息的情況

在這里插入圖片描述

在這里插入圖片描述

第三步:重啟 RabbitMQ ,模擬 RabbitMQ 宕機(jī)后重啟的情況

sudo docker restart rabbitmq

第四步:查看消息的情況(可以看到,RabbitMQ 重啟后,消息丟失了)

在這里插入圖片描述

3.1 數(shù)據(jù)持久化

RabbitMQ 實(shí)現(xiàn)數(shù)據(jù)持久化包括 3 個(gè)方面:

  1. 交換機(jī)持久化
  2. 隊(duì)列持久化
  3. 消息持久化

注意事項(xiàng):

  1. 利用 SpringAMQP 創(chuàng)建的交換機(jī)、隊(duì)列、消息,默認(rèn)都是持久化的
  2. 在 RabbitMQ 控制臺(tái)創(chuàng)建的交換機(jī)、隊(duì)列默認(rèn)是持久化的,而消息默認(rèn)是存在內(nèi)存中( 3.12 版本之前默認(rèn)存放在內(nèi)存,3.12 版本及之后默認(rèn)先存放在磁盤,消費(fèi)者處理消息時(shí)才會(huì)將消息取出來放到內(nèi)存中)

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

我們來演示一下 RabbitMQ 發(fā)生 Paged Out 現(xiàn)象(也就是隊(duì)列的空間被消息占滿了之后,將老舊消息移到磁盤,為新消息騰出空間的情況)

我們編寫一個(gè)測試類,向 simple.queue 一次性發(fā)送一百萬條消息

在發(fā)送消息之前,先把生產(chǎn)者確認(rèn)機(jī)制關(guān)閉,提高消息發(fā)送的速度

spring:rabbitmq:publisher-returns: falsepublisher-confirm-type: none

先測試發(fā)送非持久化信息

@Test
void testPagedOut() {Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();for (int i = 0; i < 1; i++) {rabbitTemplate.convertAndSend("simple.queue", message);}
}

測試結(jié)果

在這里插入圖片描述


再測試發(fā)送持久化信息

@Test
void testPagedOut() {Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();for (int i = 0; i < 1; i++) {rabbitTemplate.convertAndSend("simple.queue", message);}
}

在這里插入圖片描述

3.2 LazyQueue( 3.12 版本后所有隊(duì)列都是 Lazy Queue 模式)

從 RabbitMQ 的 3.6.0 版本開始,增加了 Lazy Queue 的概念,也就是惰性隊(duì)列,惰性隊(duì)列的特征如下:

  1. 接收到消息后直接存入磁盤而非內(nèi)存(內(nèi)存中只保留最近的消息,默認(rèn) 2048條 )
  2. 消費(fèi)者要處理消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存
  3. 支持?jǐn)?shù)百萬條的消息存儲(chǔ),在 3.12 版本后,所有隊(duì)列都是 Lazy Queue 模式,無法更改

開啟持久化和生產(chǎn)者確認(rèn)時(shí),RabbitMQ 只有在消息持久化完成后才會(huì)給生產(chǎn)者返回 ACK 回執(zhí)


在 RabbitMQ 的控制臺(tái)可以看到 RabbitMQ 的版本

在這里插入圖片描述

在 RabbitMQ 控制臺(tái)中,要?jiǎng)?chuàng)建一個(gè)惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定 x-queue-mode 屬性為 lazy 即可

在這里插入圖片描述

x-queue-mode

在 Java 代碼中,要?jiǎng)?chuàng)建一個(gè)惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定 x-queue-mode 屬性為 lazy 即可

編程式創(chuàng)建

@Bean
public org.springframework.amqp.core.Queue lazeQueue() {return QueueBuilder.durable("lazy.queue1").lazy().build();
}

注解式創(chuàng)建

@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(name = "lazy.queue2",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazeQueue(String message) {System.out.println("消費(fèi)者收到了 laze.queue2的消息: " + message);
}

4. 消費(fèi)者的可靠性

4.1 消費(fèi)者確認(rèn)機(jī)制

為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ 提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement)。處理消息后,消費(fèi)者應(yīng)該向 RabbitMQ 發(fā)送一個(gè)回執(zhí),告知 RabbitMQ 消息的處理狀態(tài),回執(zhí)有三種可選值:

  1. ack:成功處理消息,RabbitMQ 從隊(duì)列中刪除該消息
  2. nack:消息處理失敗,RabbitMQ 需要再次投遞消息
  3. reject:消息處理失敗并拒絕該消息,RabbitMQ 從隊(duì)列中刪除該消息

SpringAMQP 已經(jīng)實(shí)現(xiàn)了消息確認(rèn)功能,并允許我們通過配置文件選擇 ACK 的處理方式,有三種方式:

  1. none:不處理,即消息投遞給消費(fèi)者后立刻 ack,消息會(huì)會(huì)立刻從 MQ 中刪除,非常不安全,不建議使用
  2. manual:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用 api,發(fā)送 ack 或 reject ,存在業(yè)務(wù)入侵,但更靈活
  3. auto:自動(dòng)模式,SpringAMQP 利用 AOP 對我們的消息處理邏輯做了環(huán)繞增強(qiáng),當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回 ack,當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),會(huì)根據(jù)異常的類型返回不同結(jié)果:
    • 如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回 nack
    • 如果是消息處理或校驗(yàn)異常,自動(dòng)返回 reject

開啟消息確認(rèn)機(jī)制,需要在 application.yml 文件中編寫相關(guān)的配置

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none

先測試處理模式為 none 的情況,向 simple.queue 隊(duì)列發(fā)送一條消息,同時(shí)監(jiān)聽 simple.queue 隊(duì)列的消息,監(jiān)聽到隊(duì)列中的消息后手動(dòng)拋出一個(gè)異常

publisher 服務(wù)

@Test
void testSendMessageToQueue() {String queueName = "simple.queue";String msg = "Hello, SpringAMQP!";rabbitTemplate.convertAndSend(queueName, msg);
}

consumer 服務(wù)

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消費(fèi)者收到了simple.queue的消息:【" + message + "】");throw new RuntimeException("故意拋出異常");
}

不出意外,程序報(bào)錯(cuò)了

在這里插入圖片描述

但在 RabbitMQ 的控制臺(tái)可以看到,消息也丟失了

在這里插入圖片描述

再測試處理模式為 none 的情況

可以看到,控制臺(tái)一直在報(bào)錯(cuò),報(bào)錯(cuò)之后一直在嘗試重新發(fā)送消息

在 RabbitMQ 的控制臺(tái)可以看到,simple.queue 一直在收發(fā)消息,速率達(dá)到了 97 次每秒(狀態(tài)為 running ,消息的狀態(tài)為 Unacked )

在這里插入圖片描述

此時(shí),我們手動(dòng)關(guān)閉 consumer 服務(wù),查看 RabbitMQ 的控制臺(tái),可以看到消息恢復(fù)到正常的狀態(tài)了

在這里插入圖片描述

再來測試異常類型為 MessageConversionException 的情況

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消費(fèi)者收到了simple.queue的消息:【" + message + "】");throw new MessageConversionException("故意拋出異常");
}

在控制臺(tái)可以看到,消息被拒絕了,而且消息也沒有重新發(fā)送

在這里插入圖片描述

查看 RabbitMQ 的控制臺(tái),可以發(fā)現(xiàn)消息已經(jīng)從隊(duì)列中移除了

在這里插入圖片描述

4.2 失敗重試機(jī)制

當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷重新入隊(duì),重新發(fā)送給消費(fèi)者,然后再次發(fā)生異常,再次 requeue(重新入隊(duì)),陷入 無限循環(huán),給 RabbitMQ 帶來不必要的壓力

我們可以利用 Spring 提供的 retry 機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無限制地重新入隊(duì)

在 application.yml 配置文件中開啟失敗重試機(jī)制

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: autoretry:enabled: true # 開啟消息消費(fèi)失敗重試機(jī)制initial-interval: 1000ms # 消息消費(fèi)失敗后的初始等待時(shí)間multiplier: 1 # 消息消費(fèi)失敗后的等待時(shí)長倍數(shù),下次等待時(shí)長 = (initial-interval) * multipliermax-attempts: 3 # 最大重試次數(shù)stateless: true # true表示無狀態(tài),false表示有狀態(tài),如果業(yè)務(wù)中包含事務(wù),需要設(shè)置為false

我們將拋出的異常類型改回 RuntimeException

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {System.out.println("消費(fèi)者收到了simple.queue的消息:【" + message + "】");throw new RuntimeException("故意拋出異常");
}

在控制臺(tái)可以看出,消息的重新發(fā)送次數(shù)已經(jīng)耗盡了

在這里插入圖片描述

查看 RabbitMQ 的控制臺(tái),發(fā)現(xiàn)消息也丟失了

正常情況下,消息丟失都不是我們想看到的,該怎么解決這個(gè)問題呢

4.3 失敗消息的處理策略

開啟重試模式后,如果重試次數(shù)耗盡后消息依然處理失敗,則需要由 MessageRecoverer 接口來處理, MessageRecoverer 有三個(gè)實(shí)現(xiàn)類:

  1. RejectAndDontRequeueRecoverer:重試次數(shù)耗盡后,直接 reject,丟棄消息,默認(rèn)就是這種方式
  2. ImmediateRequeueMessageRecoverer:重試次數(shù)耗盡后,返回 nack,消息重新入隊(duì)
  3. RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)

在這里插入圖片描述

我們來演示一下使用 RepublishMessageRecoverer 類的情況

第一步:定義一個(gè)名為 blog.error 的交換機(jī)、一個(gè)名為 error.queue 的隊(duì)列,并將隊(duì)列和交換機(jī)進(jìn)行綁定

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct", true, false);}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true, false, false);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}}

第二步:將失敗處理策略改為 RepublishMessageRecoverer (開起了消費(fèi)者重試機(jī)制才會(huì)生效)

@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

在控制臺(tái)中可以看到,消息的重試次數(shù)耗盡后,消息被放入了 error.queue 隊(duì)列

在這里插入圖片描述

在 RabbitMQ 的控制塔也可以看到, error.direct 交換機(jī) 和 error.queue 隊(duì)列成功創(chuàng)建,消息也成功放入了 error.queue 隊(duì)列

在這里插入圖片描述

總結(jié):消費(fèi)者如何保證消息一定被消費(fèi)?

  1. 開啟消費(fèi)者確認(rèn)機(jī)制為 auto ,由 Spring 幫我們確認(rèn),消息處理成功后返回 ack,異常時(shí)返回 nack
  2. 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置 MessageRecoverer ,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理

4.4 業(yè)務(wù)冪等性

冪等是一個(gè)數(shù)學(xué)概念,用函數(shù)表達(dá)式來描述是這樣的:f(x) = f(f(x)),絕對值函數(shù)具有冪等性

在程序開發(fā)中,冪等是指同一個(gè)業(yè)務(wù),執(zhí)行一次或多次對業(yè)務(wù)狀態(tài)的影響是一致的

在這里插入圖片描述

那么有什么方法能夠確保業(yè)務(wù)的冪等性呢

4.4.1 方案一:為每條消息設(shè)置一個(gè)唯一的 id

給每個(gè)消息都設(shè)置一個(gè)唯一的 id,利用 id 區(qū)分是否是重復(fù)消息:

  1. 為每條消息都生成一個(gè)唯一的 id,與消息一起投遞給消費(fèi)者
  2. 消費(fèi)者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息 id 保存到數(shù)據(jù)庫
  3. 如果消費(fèi)者下次又收到相同消息,先去數(shù)據(jù)庫查詢該消息對應(yīng)的 id 是否存在,如果存在則為重復(fù)消息,放棄處理

可以在指定 MessageConverter 的具體類型時(shí),同時(shí)為 MessageConverter 設(shè)置自動(dòng)創(chuàng)建一個(gè) messageId

@Bean
public MessageConverter jacksonMessageConvertor() {Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

發(fā)送消息后,在 RabbitMQ 的控制臺(tái)可以看到,消息的 properties 屬性附帶了 messageId 信息

在這里插入圖片描述

但這種方式對業(yè)務(wù)有一定的侵入性

4.4.2 方案二:結(jié)合業(yè)務(wù)判斷

結(jié)合業(yè)務(wù)邏輯,基于業(yè)務(wù)本身做判斷。以支付業(yè)務(wù)為例:我們要在支付后修改訂單狀態(tài)為已支付,應(yīng)該在修改訂單狀態(tài)前先查詢訂單狀態(tài),判斷狀態(tài)是否是未支付,只有未支付訂單才需要修改,其它狀態(tài)的訂單不做處理

在這里插入圖片描述

總結(jié):如何保證支付服務(wù)與交易服務(wù)之間的訂單狀態(tài)一致性?

  1. 首先,支付服務(wù)會(huì)正在用戶支付成功以后利用 MQ 發(fā)送消息通知交易服務(wù),完成訂單狀態(tài)同步
  2. 其次,為了保證 MQ 消息的可靠性,我們采用了生產(chǎn)者確認(rèn)機(jī)制、消費(fèi)者確認(rèn)、消費(fèi)者失敗重試等策略,確保消息投遞和處理的可靠性,同時(shí)也開啟了MQ的持久化,避免因服務(wù)宕機(jī)導(dǎo)致消息丟失
  3. 最后,我們還在交易服務(wù)更新訂單狀態(tài)時(shí)做了業(yè)務(wù)幕等判斷,避免因消息重復(fù)消費(fèi)導(dǎo)致訂單狀態(tài)異常

4.5 兜底的解決方案

如果交易服務(wù)消息處理失敗,支付服務(wù)和交易服務(wù)出現(xiàn)了數(shù)據(jù)不一致的情況,有沒有什么兜底的解決方案?

我們可以在交易服務(wù)設(shè)置定時(shí)任務(wù),定期查詢訂單支付狀態(tài),這樣即便 MQ 通知失敗,還可以利用定時(shí)任務(wù)作為兜底方案,確保訂單支付狀態(tài)的最終一致性

5. 延遲消息

5.1 什么是延遲消息

延遲消息:生產(chǎn)者發(fā)送消息時(shí)指定一個(gè)時(shí)間,消費(fèi)者不會(huì)立刻收到消息,而是在指定時(shí)間之后才會(huì)收到消息

延遲任務(wù):一定時(shí)間之后才會(huì)執(zhí)行的任務(wù)

在這里插入圖片描述

5.2 死信交換機(jī)

當(dāng)一個(gè)隊(duì)列中的某條消息滿足下列情況之一時(shí),就會(huì)成為死信(dead letter):

  1. 消費(fèi)者使用 basic.reject 或 basic.nack 聲明消費(fèi)失敗,并且消息的 requeue 參數(shù)設(shè)置為 false
  2. 過期消息(達(dá)到了隊(duì)列或消息本身設(shè)置的過期時(shí)間),消息超時(shí)后無人消費(fèi)
  3. 要投遞的隊(duì)列消息堆積滿了,最早的消息可能成為死信

如果隊(duì)列通過 dead-letter-exchange 屬性指定了一個(gè)交換機(jī),那么該隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡稱DLX)

在這里插入圖片描述

利用死信交換機(jī)的特點(diǎn),可以實(shí)現(xiàn)發(fā)送延遲消息的功能

5.3 延遲消息插件(推薦使用)

5.3.1 下載并安裝延遲插件

RabbitMQ 的官方推出了一個(gè)插件,原生支持延遲消息功能。該插件的原理是設(shè)計(jì)了一種支持延遲消息功能的交換機(jī),當(dāng)消息投遞到交換機(jī)后,可以將消息暫存一段時(shí)間,時(shí)間到了之后再將消息投遞到隊(duì)列中

插件的下載地址:rabbitmq-delayed-message-exchange

在這里插入圖片描述


下載完插件后,運(yùn)行以下指令,在輸出信息中找到 Mounts ,再找到 RabbitMQ 的插件的安裝目錄

sudo docker inspect rabbitmq

在這里插入圖片描述

然后進(jìn)入 RabbitMQ 的插件的安裝目錄,將剛才下載的插件上傳到該目錄下

一般與 docker 相關(guān)的目錄只有 root 用戶才有權(quán)限訪問,所以我們需要先打開 docker 目錄的部分權(quán)限(耗時(shí)可能較長)

sudo chmod +rx -R /var/lib/docker

接著打開/var/lib/docker/volumes/rabbitmq-plugins/_data目錄的寫權(quán)限(如果修改權(quán)限不生效,請切換到 root 用戶執(zhí)行指令

sudo chmod 777 /var/lib/docker/volumes/rabbitmq-plugins/_data

將剛才下載的插件上傳到/var/lib/docker/volumes/rabbitmq-plugins/_data目錄

上傳成功后將/var/lib/docker/volumes/rabbitmq-plugins/_data目錄的權(quán)限復(fù)原

sudo chmod 755 /var/lib/docker/volumes/rabbitmq-plugins/_data

最后進(jìn)入容器內(nèi)部,運(yùn)行指令安裝插件,安裝完成后退出容器內(nèi)部

sudo docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit

看到以下信息,說明插件安裝成功了

在這里插入圖片描述

5.3.2 安裝插件時(shí)可能遇到的問題

如果你遇到了以下錯(cuò)誤,在執(zhí)行rabbitmq-plugins enable rabbitmq_delayed_message_exchange指令前先執(zhí)行以下指令

在這里插入圖片描述

chmod 400 /var/lib/rabbitmq/.erlang.cookie

5.3.3 在 Java 代碼中發(fā)送延遲消息

聲明延遲交換機(jī)

@Bean
public DirectExchange delayExchange() {return ExchangeBuilder.directExchange("delay.direct").delayed().build();
}

聲明隊(duì)列和延遲交換機(jī),并將隊(duì)列和延遲交換機(jī)綁定在一起

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue"),exchange = @Exchange(name = "delay.direct", delayed = "true", type = ExchangeTypes.DIRECT),key = "delay"
))
public void listenDelayQueue(String message) {SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("消費(fèi)者收到了 delay.queue的消息: " + message + ",時(shí)間:" + simpleDateFormat.format(System.currentTimeMillis()));
}

編寫測試方法,測試發(fā)送延遲消息

@Test
void testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000); // 毫秒return message;}});SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("發(fā)送消息成功!發(fā)送時(shí)間:" + simpleDateFormat.format(System.currentTimeMillis()));
}

發(fā)送延遲消息的本質(zhì)是在消息頭屬性中添加 x-delay 屬性

在這里插入圖片描述

5.3.4 延遲消息的原理和缺點(diǎn)

RabbitMQ 的延遲消息是怎么實(shí)現(xiàn)的呢?RabbitMQ 會(huì)自動(dòng)維護(hù)一個(gè)時(shí)鐘,這個(gè)時(shí)鐘每隔一秒就跳動(dòng)一次,如果對時(shí)鐘的精度要求比較高的,可能還要精確到毫秒,甚至納秒

RabbitMQ 會(huì)為發(fā)送到交換機(jī)的每一條延遲消息創(chuàng)建一個(gè)時(shí)鐘,時(shí)鐘運(yùn)行的過程中需要 CPU 不斷地進(jìn)行計(jì)算。發(fā)送到交換機(jī)的延遲消息數(shù)越多,RabbitMQ 需要維護(hù)的時(shí)鐘就越多,對 CPU 的占用率就越高(Spring 提供的定時(shí)任務(wù)的原理也是類似)

定時(shí)任務(wù)屬于 CPU 密集型任務(wù),中間涉及到的計(jì)算過程對 CPU 來說壓力是很大的,所以說,采用延遲消息會(huì)給服務(wù)器的 CPU 帶來更大的壓力。當(dāng)交換機(jī)中有非常多的延遲消息時(shí),對 CPU 的壓力就會(huì)特別大

所以說,延遲消息適用于延遲時(shí)間較短的場景

5.4 取消超時(shí)訂單

設(shè)置 30 分鐘后檢測訂單支付狀態(tài)實(shí)現(xiàn)起來非常簡單,但是存在兩個(gè)問題:

  1. 如果并發(fā)較高,30分鐘可能堆積消息過多,對 MQ 壓力很大
  2. 大多數(shù)訂單在下單后 1 分鐘內(nèi)就會(huì)支付,但消息需要在 MQ 中等待30分鐘,浪費(fèi)資源

在這里插入圖片描述

在這里插入圖片描述

5.5 發(fā)送延遲檢測訂單的消息

我們定義一個(gè)實(shí)體類,用于記錄延遲消息的內(nèi)容和延遲消息的延遲時(shí)間列表(該實(shí)體類也是延遲消息的類型)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class MultipleDelayMessage<T> {private T data;private List<Long> delayMillis;public MultipleDelayMessage() {}public MultipleDelayMessage(T data, Long... delayMillis) {this.data = data;this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));}public MultipleDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));}public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {return new MultipleDelayMessage<>(data, delayMillis);}public boolean hasNextDelay() {return !delayMillis.isEmpty();}public Long removeNextDelay() {return delayMillis.remove(0);}public T getData() {return data;}public void setData(T data) {this.data = data;}public List<Long> getDelayMillis() {return delayMillis;}public void setDelayMillis(List<Long> delayMillis) {this.delayMillis = delayMillis;}@Overridepublic String toString() {return "MultipleDelayMessage{" +"data=" + data +", delayMillis=" + delayMillis +'}';}}

我們再定義一個(gè)發(fā)送延遲消息的消息處理器,供所有服務(wù)使用

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;public class DelayMessagePostProcessor implements MessagePostProcessor {private final Integer delay;public DelayMessagePostProcessor(Integer delay) {this.delay = delay;}@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}}

改造后的發(fā)送延遲消息的測試方法

@Test
void testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000));SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("發(fā)送消息成功!發(fā)送時(shí)間:" + simpleDateFormat.format(System.currentTimeMillis()));
}
http://www.risenshineclean.com/news/2892.html

相關(guān)文章:

  • 網(wǎng)站怎樣做鏈接微信怎么推廣
  • 個(gè)人怎么做課程網(wǎng)站seo權(quán)重是什么意思
  • 合肥做網(wǎng)站域名的公司怎么制作小程序
  • 鄂州手機(jī)網(wǎng)站建設(shè)百度收錄軟件
  • wordpress 寫php頁面跳轉(zhuǎn)seo代碼優(yōu)化包括哪些
  • 廣州做網(wǎng)站 漢獅網(wǎng)絡(luò)抖音seo源碼搭建
  • 肇慶企業(yè)網(wǎng)站關(guān)鍵詞優(yōu)化教程哪里有網(wǎng)頁設(shè)計(jì)公司
  • 建工網(wǎng)校官網(wǎng)app百度推廣優(yōu)化師培訓(xùn)
  • 網(wǎng)站制作的重要流程圖百度排行榜前十名
  • 網(wǎng)站首頁優(yōu)化營銷型網(wǎng)站外包
  • 哈爾濱 房產(chǎn)網(wǎng)站建設(shè)成都seo專家
  • 軟件公司 網(wǎng)站模板網(wǎng)站外鏈平臺(tái)
  • 自己黑自己做的網(wǎng)站找相似圖片 識(shí)別
  • 石家莊網(wǎng)站建設(shè)找哪家百度推廣關(guān)鍵詞規(guī)劃師
  • 提供網(wǎng)站制作公司地址常用的關(guān)鍵詞挖掘工具
  • 舟山做網(wǎng)站seo關(guān)鍵詞排名優(yōu)化品牌
  • 網(wǎng)站 建設(shè) 標(biāo)準(zhǔn)方案網(wǎng)站平臺(tái)都有哪些
  • 網(wǎng)站橫幅js代碼如何策劃一個(gè)營銷方案
  • 做網(wǎng)站群發(fā)外鏈平臺(tái)
  • 濟(jì)南專業(yè)做網(wǎng)站的公司哪家好信息流優(yōu)化師證書
  • 怎么用flash做網(wǎng)站網(wǎng)絡(luò)營銷手段有哪些方式
  • 網(wǎng)站設(shè)計(jì)方法制作公司網(wǎng)站的公司
  • 做網(wǎng)站的哪里好站長工具seo綜合查詢columbu cat
  • 推廣做任務(wù) 有哪些網(wǎng)站南寧網(wǎng)站推廣營銷
  • 自己編輯網(wǎng)站怎么做的域名備案
  • 網(wǎng)站群建設(shè)目標(biāo)網(wǎng)絡(luò)營銷經(jīng)典案例
  • php可以做視頻網(wǎng)站如何做好網(wǎng)絡(luò)營銷工作
  • 數(shù)據(jù)庫網(wǎng)站開發(fā)外文翻譯競價(jià)托管代運(yùn)營
  • 做網(wǎng)站需要租服務(wù)器嗎線上推廣平臺(tái)都有哪些
  • 自己做的網(wǎng)站怎么綁域名杭州seo網(wǎng)絡(luò)公司