中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

wordpress虛擬主機(jī)安裝教程seo如何進(jìn)行優(yōu)化

wordpress虛擬主機(jī)安裝教程,seo如何進(jìn)行優(yōu)化,網(wǎng)站怎么做的支付,杭州百度推廣公司有幾家強烈建議在看本篇博客之前快速瀏覽文章:RabbitMQ基礎(chǔ)有這一篇就夠了 RabbitMQ高級篇 0. 前言1. 發(fā)送者的可靠性1.1 生產(chǎn)者重試機(jī)制1.2 生產(chǎn)者確認(rèn)機(jī)制1.3 實現(xiàn)生產(chǎn)者確認(rèn) 2. MQ的可靠性2.1 MQ持久化2.2 LazyQueue 3. 消費者的可靠性3.1 消費者確認(rèn)機(jī)制3.2 失敗重試策…

強烈建議在看本篇博客之前快速瀏覽文章:RabbitMQ基礎(chǔ)有這一篇就夠了

RabbitMQ高級篇

  • 0. 前言
  • 1. 發(fā)送者的可靠性
    • 1.1 生產(chǎn)者重試機(jī)制
    • 1.2 生產(chǎn)者確認(rèn)機(jī)制
    • 1.3 實現(xiàn)生產(chǎn)者確認(rèn)
  • 2. MQ的可靠性
    • 2.1 MQ持久化
    • 2.2 LazyQueue
  • 3. 消費者的可靠性
    • 3.1 消費者確認(rèn)機(jī)制
    • 3.2 失敗重試策略
    • 3.3 業(yè)務(wù)冪等性
      • 3.3.1 唯一消息ID
      • 3.3.2 業(yè)務(wù)判斷
  • 4. 延遲消息
    • 4.1 死信交換機(jī)
    • 4.2 取消超時訂單案例
  • 常見面試題

0. 前言

在這里插入圖片描述消息從生產(chǎn)者到消費者的每一步都可能導(dǎo)致消息丟失:

  • 發(fā)送消息時丟失
    • 生產(chǎn)者發(fā)送消息時連接MQ失敗
    • 生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange
    • 生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue
    • 消息到達(dá)MQ后,處理消息的進(jìn)程發(fā)生異常
  • MQ導(dǎo)致消息丟失
    • 消息到達(dá)MQ,保存到隊列后,尚未消費就突然宕機(jī)
  • 消費者處理消息時
    • 消息接收后尚未處理突然宕機(jī)
    • 消息接收后處理過程中拋出異常

解決消息丟失問題,保證MQ的可靠性,就必須從3個方面入手:

  • 確保生產(chǎn)者一定把消息發(fā)送到MQ
  • 確保MQ不會將消息弄丟
  • 確保消費者一定要處理消息

1. 發(fā)送者的可靠性

1.1 生產(chǎn)者重試機(jī)制

生產(chǎn)者發(fā)送消息時,出現(xiàn)了網(wǎng)絡(luò)故障,導(dǎo)致與MQ的連接中斷。SpringAMQP提供的消息發(fā)送時的重試機(jī)制。即:當(dāng)RabbitTemplate與MQ連接超時后,多次重試。

修改publisher模塊的application.yaml文件,添加下面的內(nèi)容:

spring:rabbitmq:connection-timeout: 1s # 設(shè)置MQ的連接超時時間template:retry:enabled: true # 開啟超時重試機(jī)制initial-interval: 1000ms # 失敗后的初始等待時間multiplier: 1 # 失敗后下次的等待時長倍數(shù),下次等待時長 = initial-interval * multipliermax-attempts: 3 # 最大重試次數(shù)這段配置代碼是針對 Spring 框架中 RabbitMQ 的設(shè)置,主要用于配置 RabbitMQ 的連接和消息發(fā)送的重試機(jī)制。下面是對每個配置項的詳細(xì)解釋:1. spring.rabbitmq.connection-timeout
- RabbitMQ 連接的超時時間。
- 表示連接超時時間為 1 秒。如果在 1 秒內(nèi)無法建立連接,程序?qū)伋霎惓!?. spring.rabbitmq.template.retry
這個部分配置了消息發(fā)送時的重試策略。a. enabled
- 開啟或關(guān)閉重試機(jī)制。
- 表示開啟超時重試機(jī)制。如果消息發(fā)送失敗,Spring 會自動嘗試重新發(fā)送消息。b. initial-interval
- 設(shè)置首次重試的等待時間。
- 表示首次重試的等待時間為 1 秒。如果第一次發(fā)送失敗,程序?qū)⒃?1 秒后再次嘗試發(fā)送。c. multiplier
- 重試時等待時間的倍數(shù)。
- 表示每次重試的等待時間不增加。計算公式為:  下次等待時長 = initial-interval × multiplier。  在此配置中,所有重試的等待時間都將保持在 1 秒,因為乘以 1 不會改變初始間隔。d. `max-attempts`
- 設(shè)置最大重試次數(shù)。
- 表示最多會嘗試 3 次發(fā)送消息。如果消息在 3 次嘗試后仍然無法發(fā)送成功,則會拋出異常并終止重試。

特別注意:當(dāng)網(wǎng)絡(luò)不穩(wěn)定的時候,利用重試機(jī)制可以有效提高消息發(fā)送的成功率。不過SpringAMQP提供的重試機(jī)制是阻塞式的重試,也就是說多次重試等待的過程中,當(dāng)前線程是被阻塞的。
如果對于業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。如果一定要使用,請合理配置等待時長和重試次數(shù),當(dāng)然也可以考慮使用異步線程來執(zhí)行發(fā)送消息的代碼。

1.2 生產(chǎn)者確認(rèn)機(jī)制

只要生產(chǎn)者與MQ之間的網(wǎng)路連接順暢,基本不會出現(xiàn)發(fā)送消息丟失的情況,因此大多數(shù)情況下我們無需考慮這種問題。不過,在少數(shù)情況下,也會出現(xiàn)消息發(fā)送到MQ之后丟失的現(xiàn)象,比如:

  • MQ內(nèi)部處理消息的進(jìn)程發(fā)生了異常
  • 生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange(程序員書寫問題)
  • 生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue,因此無法路由(程序員書寫問題)

在這里插入圖片描述

針對上述情況,RabbitMQ提供了生產(chǎn)者消息確認(rèn)機(jī)制,包括Publisher ConfirmPublisher Return兩種。在開啟確認(rèn)機(jī)制的情況下,當(dāng)生產(chǎn)者發(fā)送消息給MQ后,MQ會根據(jù)消息處理的情況返回不同的回執(zhí)。返回的結(jié)果有以下幾種情況:

  • 消息投遞到了MQ,但是路由失敗。此時會通過Publisher Return返回路由異常原因,然后返回ACK,告知投遞成功**(例如:成功送到交換機(jī)但是沒有綁定隊列,一般是程序員忘記添加)**
    • 路由失敗情況1:發(fā)送者成功發(fā)送到MQ,但是exchange沒有綁定一個queue,一般是程序員書寫問題;
    • 路由失敗情況2:發(fā)送者成功發(fā)送到MQ,但是RountingKey 和 BindingKey都不一致,一般是程序員書寫問題;
  • 臨時消息投遞到了MQ,并且入隊成功,返回ACK,告知投遞成功
  • 持久消息投遞到了MQ,并且入隊完成持久化,返回ACK ,告知投遞成功
  • 其它情況都會返回NACK,告知投遞失敗(比如持久化消息沒有寫到磁盤等情況)

其中acknack屬于Publisher Confirm機(jī)制,ack是投遞成功;nack是投遞失敗。而return則屬于Publisher Return機(jī)制。默認(rèn)兩種機(jī)制都是關(guān)閉狀態(tài),需要通過配置文件來開啟。

1.3 實現(xiàn)生產(chǎn)者確認(rèn)

publisher模塊的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 開啟publisher confirm機(jī)制,并設(shè)置confirm類型publisher-returns: true # 開啟publisher return機(jī)制

這里publisher-confirm-type有三種模式可選:

  • none:關(guān)閉confirm機(jī)制
  • simple:同步阻塞等待MQ的回執(zhí)
  • correlated:MQ異步回調(diào)返回回執(zhí)

測試案例

每個RabbitTemplate只能配置一個ReturnCallback,因此我們可以在配置類中統(tǒng)一設(shè)置。在publisher模塊定義一個配置類:

package com.itheima.publisher.config;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("觸發(fā)return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

定義ConfirmCallback:

由于每個消息發(fā)送時的處理邏輯不一定相同,因此ConfirmCallback需要在每次發(fā)消息時定義。具體來說,是在調(diào)用RabbitTemplate中的convertAndSend方法時,多傳遞一個參數(shù):

在這里插入圖片描述這里的CorrelationData中包含兩個核心的東西:

  • id:消息的唯一標(biāo)示,MQ對不同的消息的回執(zhí)以此做判斷,避免混淆
  • SettableListenableFuture:回執(zhí)結(jié)果的Future對象

將來MQ的回執(zhí)就會通過這個Future來返回,我們可以提前給CorrelationData中的Future添加回調(diào)函數(shù)來處理消息回執(zhí):

在這里插入圖片描述

新建一個測試,向系統(tǒng)自帶的交換機(jī)發(fā)送消息,并且添加ConfirmCallback

@Test
void testPublisherConfirm() {// 1.創(chuàng)建CorrelationDataCorrelationData cd = new CorrelationData();// 2.給Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future發(fā)生異常時的處理邏輯,基本不會觸發(fā)log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回執(zhí)的處理邏輯,參數(shù)中的result就是回執(zhí)內(nèi)容if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執(zhí),false 代表 nack回執(zhí)log.debug("發(fā)送消息成功,收到 ack!");}else{ // result.getReason(),String類型,返回nack時的異常描述log.error("發(fā)送消息失敗,收到 nack, reason : {}", result.getReason());}}});// 3.發(fā)送消息,參數(shù)依次為:交換機(jī)名稱、RountingKey、需要發(fā)送的消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

執(zhí)行結(jié)果如下:

在這里插入圖片描述

由于傳遞的RoutingKey是錯誤的,路由失敗后,觸發(fā)了return callback,同時也收到了ack。當(dāng)我們修改為正確的RoutingKey以后,就不會觸發(fā)return callback了,只收到ack。而如果連交換機(jī)都是錯誤的,則只會收到nack。

2. MQ的可靠性

消息到達(dá)MQ以后,如果MQ不能及時保存,也會導(dǎo)致消息丟失,所以MQ的可靠性也非常重要。

2.1 MQ持久化

為了提升性能,默認(rèn)情況下MQ的數(shù)據(jù)都是在內(nèi)存存儲的臨時數(shù)據(jù),重啟后就會消失。為了保證數(shù)據(jù)的可靠性,必須配置數(shù)據(jù)持久化,包括:

  • 交換機(jī)持久化
  • 隊列持久化
  • 消息持久化

交換機(jī)持久化:在控制臺的Exchanges頁面,添加交換機(jī)時可以配置交換機(jī)的Durability參數(shù),設(shè)置為Durable就是持久化模式,Transient就是臨時模式。

在這里插入圖片描述

隊列持久化:在控制臺的Queues頁面,添加隊列時,同樣可以配置隊列的Durability參數(shù)。

在這里插入圖片描述

消息持久化:在控制臺發(fā)送消息的時候,可以添加很多參數(shù),而消息的持久化是要配置一個properties。

在這里插入圖片描述

注意:在開啟持久化機(jī)制以后,如果同時還開啟了生產(chǎn)者確認(rèn),那么MQ會在消息持久化以后才發(fā)送ACK回執(zhí),進(jìn)一步確保消息的可靠性。不過出于性能考慮,為了減少IO次數(shù),發(fā)送到MQ的消息并不是逐條持久化到數(shù)據(jù)庫的,而是每隔一段時間批量持久化。一般間隔在100毫秒左右,這就會導(dǎo)致ACK有一定的延遲,因此建議生產(chǎn)者確認(rèn)全部采用異步方式。

2.2 LazyQueue

在默認(rèn)情況下,RabbitMQ會將接收到的信息保存在內(nèi)存中以降低消息收發(fā)的延遲。但在某些特殊情況下,這會導(dǎo)致消息積壓,比如:

  • 消費者宕機(jī)或出現(xiàn)網(wǎng)絡(luò)故障
  • 消息發(fā)送量激增,超過了消費者處理速度
  • 消費者處理業(yè)務(wù)發(fā)生阻塞

為什么要提出LazyQueue的原因,PageOut阻塞=================

一旦出現(xiàn)消息堆積問題,RabbitMQ的內(nèi)存占用就會越來越高,直到觸發(fā)內(nèi)存預(yù)警上限。此時RabbitMQ會將內(nèi)存消息刷到磁盤上,這個行為成為PageOut. PageOut會耗費一段時間,并且會阻塞隊列進(jìn)程。因此在這個過程中RabbitMQ不會再處理新的消息,生產(chǎn)者的所有請求都會被阻塞。

為了解決這個問題,從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的模式,也就是惰性隊列。惰性隊列的特征如下:

  • 接收到消息后直接存入磁盤而非內(nèi)存;
  • 消費者要消費消息時才會從磁盤中讀取并加載到內(nèi)存(也就是懶加載);
  • 支持?jǐn)?shù)百萬條的消息存儲;

而在3.12版本之后,LazyQueue已經(jīng)成為所有隊列的默認(rèn)格式。因此官方推薦升級MQ為3.12版本或者所有隊列都設(shè)置為LazyQueue模式。

在這里插入圖片描述

配置Lazy模式1:控制臺配置Lazy模式

在添加隊列的時候,添加x-queue-mod=lazy參數(shù)即可設(shè)置隊列為Lazy模式:在這里插入圖片描述

配置Lazy模式2:代碼配置Lazy模式

在利用SpringAMQP聲明隊列的時候,添加x-queue-mod=lazy參數(shù)也可設(shè)置隊列為Lazy模式:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy()  // 開啟Lazy模式.build();
}

也可以基于注解來聲明隊列并設(shè)置為Lazy模式:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}

3. 消費者的可靠性

3.1 消費者確認(rèn)機(jī)制

問題提出:當(dāng)RabbitMQ向消費者投遞消息以后,需要知道消費者的處理狀態(tài)如何。因為消息投遞給消費者并不代表就一定被正確消費了,可能出現(xiàn)的故障有很多,比如:

  • 消息投遞的過程中出現(xiàn)了網(wǎng)絡(luò)故障
  • 消費者接收到消息后突然宕機(jī)
  • 消費者接收到消息后,因處理不當(dāng)導(dǎo)致異常

為了確認(rèn)消費者是否成功處理消息,RabbitMQ提供了消費者確認(rèn)機(jī)制(Consumer Acknowledgement)。即:當(dāng)消費者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個回執(zhí),告知RabbitMQ自己消息處理狀態(tài)。回執(zhí)有三種可選值:

  • ack:成功處理消息,RabbitMQ從隊列中刪除該消息
  • nack:消息處理失敗,RabbitMQ需要再次投遞消息
    • 網(wǎng)絡(luò)延遲,或者自身問題處理失敗
  • reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息
    • 例如發(fā)送過來的json格式存在問題,再次重試還是存在問題的,所以直接決絕

在這里插入圖片描述

由于消息回執(zhí)的處理代碼比較統(tǒng)一,因此SpringAMQP幫我們實現(xiàn)了消息確認(rèn)。并允許我們通過配置文件設(shè)置ACK處理方式,有三種模式

  • none:不處理。即消息投遞給消費者后立刻ack,消息會立刻從MQ刪除。非常不安全,不建議使用
  • manual:手動模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ack或reject,存在業(yè)務(wù)入侵,但更靈活
  • auto:自動模式。SpringAMQP利用AOP對我們的消息處理邏輯做了環(huán)繞增強,當(dāng)業(yè)務(wù)正常執(zhí)行時則自動返回ack. 當(dāng)業(yè)務(wù)出現(xiàn)異常時,根據(jù)異常判斷返回不同結(jié)果:
  • 如果是業(yè)務(wù)異常,會自動返回nack;
  • 如果是消息處理或校驗異常,自動返回reject;

通過下面的配置可以修改SpringAMQPACK處理方式(消費者端):

spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做處理acknowledge-mode: auto # 自動ack

3.2 失敗重試策略

當(dāng)消費者出現(xiàn)異常后,消息會不斷requeue(重入隊)到隊列,再重新發(fā)送給消費者。如果消費者再次執(zhí)行依然出錯,消息會再次requeue到隊列,再次投遞,直到消息處理成功為止。
極端情況就是消費者一直無法執(zhí)行成功,那么消息requeue就會無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:

為了應(yīng)對上述情況Spring又提供了消費者失敗重試機(jī)制:在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列。

spring:rabbitmq:listener:simple:retry:enabled: true # 開啟消費者失敗重試initial-interval: 1000ms # 初識的失敗等待時長為1秒multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-intervalmax-attempts: 3 # 最大重試次數(shù)stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false

重啟consumer服務(wù),重復(fù)之前的測試。可以發(fā)現(xiàn):

  • 消費者在失敗后消息沒有重新回到MQ無限重新投遞,而是在本地重試了3次
  • 本地重試3次以后,拋出了AmqpRejectAndDontRequeueException異常。查看RabbitMQ控制臺,發(fā)現(xiàn)消息被刪除了,說明最后SpringAMQP返回的是reject;

失敗處理策略

本地測試達(dá)到最大重試次數(shù)后,消息會被丟棄。這在某些對于消息可靠性要求較高的業(yè)務(wù)場景下,顯然不太合適了。因此Spring允許**自定義重試次數(shù)耗盡后的消息處理策略**,這個策略是由MessageRecovery接口來定義的,它有3個不同實現(xiàn):

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)

失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續(xù)由人工集中處理。完整代碼如下:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
// 根據(jù)配置文件中的屬性決定是否加載這個配置類。
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}// 關(guān)聯(lián)交換機(jī)和隊列@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

3.3 業(yè)務(wù)冪等性

冪等是一個數(shù)學(xué)概念,用函數(shù)表達(dá)式來描述是這樣的:f(x) = f(f(x)) 。在程序開發(fā)中,則是指同一個業(yè)務(wù),執(zhí)行一次或多次對業(yè)務(wù)狀態(tài)的影響是一致的。

常見的冪等

  • 根據(jù)id刪除數(shù)據(jù)
  • 查詢數(shù)據(jù)
  • 新增數(shù)據(jù)

常見的非冪等

  • 取消訂單,恢復(fù)庫存的業(yè)務(wù)。如果多次恢復(fù)就會出現(xiàn)庫存重復(fù)增加的情況
  • 退款業(yè)務(wù)。重復(fù)退款對商家而言會有經(jīng)濟(jì)損失。

非冪等性實際案例

  • 情況1:消息被重復(fù)消費,如果消費者和mq之間的網(wǎng)絡(luò)連接斷開,消費者的ack未能成功發(fā)送到mq,那么等到連接好了之后,mq又會重新發(fā)送消息,此時消息重復(fù)被消費。如果這個消息是用于扣減庫存的,那么就會出現(xiàn)問題。
  • 情況2:用戶在支付服務(wù)完成支付后,MQ通知交易業(yè)務(wù)標(biāo)記訂單為已支付,交易服務(wù)標(biāo)記成功后返回ack給mq,但是此時交易服務(wù)和MQ之間網(wǎng)絡(luò)故障,ack未被mq收到,mq可能認(rèn)為交易服務(wù)宕機(jī),消息重新入隊,此時用戶又進(jìn)行了退款,交易服務(wù)立馬將訂單狀態(tài)修改為退款中,而mq和交易服務(wù)之間的網(wǎng)絡(luò)又恢復(fù)了,mq又將消息發(fā)送給交易服務(wù),其又將訂單修改為已支付,此時出現(xiàn)問題。

必須想辦法保證消息處理的冪等性。這里給出兩種方案:

  • 唯一消息ID
  • 業(yè)務(wù)狀態(tài)判斷

3.3.1 唯一消息ID

給每個消息都設(shè)置一個唯一id,利用id區(qū)分是否是重復(fù)消息:

  1. 每一條消息都生成一個唯一的id,與消息一起投遞給消費者。
  2. 消費者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息ID保存到數(shù)據(jù)庫
  3. 如果下次又收到相同消息,去數(shù)據(jù)庫查詢判斷是否存在,存在則為重復(fù)消息放棄處理。

SpringAMQPMessageConverter自帶了MessageID的功能,我們只要開啟這個功能即可。以Jackson的消息轉(zhuǎn)換器為例

@Bean
public MessageConverter messageConverter(){// 1.定義消息轉(zhuǎn)換器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自動創(chuàng)建消息id,用于識別不同消息,也可以在業(yè)務(wù)中基于ID判斷是否是重復(fù)消息jjmc.setCreateMessageIds(true);return jjmc;
}

在這里插入圖片描述

在這里插入圖片描述
缺點:業(yè)務(wù)侵入、且有數(shù)據(jù)庫的操作影響業(yè)務(wù)性能

3.3.2 業(yè)務(wù)判斷

業(yè)務(wù)判斷就是基于業(yè)務(wù)本身的邏輯或狀態(tài)來判斷是否是重復(fù)的請求或消息,不同的業(yè)務(wù)場景判斷的思路也不一樣。例如當(dāng)前案例中,處理消息的業(yè)務(wù)邏輯是把訂單狀態(tài)從未支付修改為已支付。因此我們就可以在執(zhí)行業(yè)務(wù)時判斷訂單狀態(tài)是否是未支付,如果不是未支付狀態(tài)則證明訂單已經(jīng)被處理過(已支付,申請退款等狀態(tài))無需重復(fù)處理。相比較而言,消息ID的方案需要改造原有的數(shù)據(jù)庫,所以我更推薦使用業(yè)務(wù)判斷的方案。

在這里插入圖片描述

以支付修改訂單的業(yè)務(wù)為例,修改OrderServiceImpl中的markOrderPaySuccess方法:

@Override
public void markOrderPaySuccess(Long orderId) {// 1.查詢訂單Order old = getById(orderId);// 2.判斷訂單狀態(tài)// 訂單的狀態(tài),1、未付款 2、已付款,未發(fā)貨 3、已發(fā)貨,未確認(rèn) 4、確認(rèn)收貨,交易成功5、交易取消,訂單關(guān)閉6、交易結(jié)束,已評價if (old == null || old.getStatus() != 1) {// 訂單不存在或者訂單狀態(tài)不是1,放棄處理return;}// 3.嘗試更新訂單Order order = new Order();order.setId(orderId);order.setStatus(2);order.setPayTime(LocalDateTime.now());updateById(order);
}// 考慮到線程安全問題,可以進(jìn)行合并
@Override
public void markOrderPaySuccess(Long orderId) {// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();
}

4. 延遲消息

在電商的支付業(yè)務(wù)中,對于一些庫存有限的商品,為了更好的用戶體驗,通常都會在用戶下單時立刻扣減商品庫存。例如電影院購票、高鐵購票,下單后就會鎖定座位資源,其他人無法重復(fù)購買。

但是這樣就存在一個問題,假如用戶下單后一直不付款,就會一直占有庫存資源,導(dǎo)致其他客戶無法正常交易,最終導(dǎo)致商戶利益受損!

因此,電商中通常的做法就是:對于超過一定時間未支付的訂單,應(yīng)該立刻取消訂單并釋放占用的庫存。例如,訂單支付超時時間為30分鐘,則我們應(yīng)該在用戶下單后的第30分鐘檢查訂單支付狀態(tài),如果發(fā)現(xiàn)未支付,應(yīng)該立刻取消訂單,釋放庫存。

如何才能準(zhǔn)確的實現(xiàn)在下單后第30分鐘去檢查支付狀態(tài)呢

像這種在一段時間以后才執(zhí)行的任務(wù),我們稱之為延遲任務(wù),而要實現(xiàn)延遲任務(wù),最簡單的方案就是利用MQ的延遲消息了。

在RabbitMQ中實現(xiàn)延遲消息也有兩種方案:

  • 死信交換機(jī)+TTL
  • 延遲消息插件

4.1 死信交換機(jī)

在這里插入圖片描述
注意:要確保normal.directdlx.directRountingKey一致

4.2 取消超時訂單案例

用戶下單完成后,發(fā)送15分鐘延遲消息,在15分鐘后接收消息,檢查支付狀態(tài):

  • 已支付:更新訂單狀態(tài)為已支付
  • 未支付:更新訂單狀態(tài)為關(guān)閉訂單,恢復(fù)商品庫存

在這里插入圖片描述

在這里插入圖片描述

查詢支付狀態(tài)有兩次

  • 查詢本地訂單狀態(tài),如果已經(jīng)正常通知了,支付和交易服務(wù)的通知正常著,訂單狀態(tài)已經(jīng)修改未為支付了,此時直接結(jié)束即可。
  • 如果本地查詢到的訂單狀態(tài)不是已支付,那么有可能是沒能通知到,此時需要去向支付服務(wù)查詢支付流水狀態(tài),如果是已支付,則修改,如果不是,那么就說明超時了,則取消訂單。

定義常量:無論是消息發(fā)送還是接收都是在交易服務(wù)完成,因此我們在trade-service中定義一個常量類,用于記錄交換機(jī)、隊列、RoutingKey等常量:

在這里插入圖片描述

package com.hmall.trade.constants;public interface MQConstants {String DELAY_EXCHANGE_NAME = "trade.delay.direct";String DELAY_ORDER_QUEUE_NAME = "trade.delay.order.queue";String DELAY_ORDER_KEY = "delay.order.query";
}

配置MQ: 在trade-service模塊的pom.xml中引入amqp的依賴:

  <!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

trade-serviceapplication.yaml中添加MQ的配置:

spring:rabbitmq:host: 192.168.150.101port: 5672virtual-host: /hmallusername: hmallpassword: 123

改造下單業(yè)務(wù),發(fā)送延遲消息: 在下單完成后,發(fā)送延遲消息,查詢支付狀態(tài)。修改trade-service模塊的com.hmall.trade.service.impl.OrderServiceImpl類的createOrder方法,添加消息發(fā)送的代碼:

在這里插入圖片描述

編寫查詢支付狀態(tài)接口:由于MQ消息處理時需要查詢支付狀態(tài),因此要在pay-service模塊定義一個這樣的接口,并提供對應(yīng)的FeignClient。 首先,在hm-api模塊定義三個類:

在這里插入圖片描述

  • PayOrderDTO:支付單的數(shù)據(jù)傳輸實體
  • PayClient:支付系統(tǒng)的Feign客戶端
  • PayClientFallback:支付系統(tǒng)的fallback邏輯

PayOrderDTO代碼如下:

package com.hmall.api.dto;import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import java.time.LocalDateTime;/*** <p>* 支付訂單* </p>*/
@Data
@ApiModel(description = "支付單數(shù)據(jù)傳輸實體")
public class PayOrderDTO {@ApiModelProperty("id")private Long id;@ApiModelProperty("業(yè)務(wù)訂單號")private Long bizOrderNo;@ApiModelProperty("支付單號")private Long payOrderNo;@ApiModelProperty("支付用戶id")private Long bizUserId;@ApiModelProperty("支付渠道編碼")private String payChannelCode;@ApiModelProperty("支付金額,單位分")private Integer amount;@ApiModelProperty("付類型,1:h5,2:小程序,3:公眾號,4:掃碼,5:余額支付")private Integer payType;@ApiModelProperty("付狀態(tài),0:待提交,1:待支付,2:支付超時或取消,3:支付成功")private Integer status;@ApiModelProperty("拓展字段,用于傳遞不同渠道單獨處理的字段")private String expandJson;@ApiModelProperty("第三方返回業(yè)務(wù)碼")private String resultCode;@ApiModelProperty("第三方返回提示信息")private String resultMsg;@ApiModelProperty("支付成功時間")private LocalDateTime paySuccessTime;@ApiModelProperty("支付超時時間")private LocalDateTime payOverTime;@ApiModelProperty("支付二維碼鏈接")private String qrCodeUrl;@ApiModelProperty("創(chuàng)建時間")private LocalDateTime createTime;@ApiModelProperty("更新時間")private LocalDateTime updateTime;
}

PayClient代碼如下:

package com.hmall.api.client;import com.hmall.api.client.fallback.PayClientFallback;
import com.hmall.api.dto.PayOrderDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {/*** 根據(jù)交易訂單id查詢支付單* @param id 業(yè)務(wù)訂單id* @return 支付單信息*/@GetMapping("/pay-orders/biz/{id}")PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}

PayClientFallback代碼如下:

package com.hmall.api.client.fallback;import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {@Overridepublic PayClient create(Throwable cause) {return new PayClient() {@Overridepublic PayOrderDTO queryPayOrderByBizOrderNo(Long id) {return null;}};}
}

最后,在pay-service模塊的PayController中實現(xiàn)該接口:

@ApiOperation("根據(jù)id查詢支付單")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}

監(jiān)聽消息,查詢支付狀態(tài):在trader-service編寫一個監(jiān)聽器,監(jiān)聽延遲消息,查詢訂單支付狀態(tài):

在這里插入圖片描述

package com.hmall.trade.listener;import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import com.hmall.trade.constants.MQConstants;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class OrderDelayMessageListener {private final IOrderService orderService;private final PayClient payClient;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MQConstants.DELAY_ORDER_QUEUE_NAME),exchange = @Exchange(name = MQConstants.DELAY_EXCHANGE_NAME, delayed = "true"),key = MQConstants.DELAY_ORDER_KEY))public void listenOrderDelayMessage(Long orderId){// 1.查詢訂單Order order = orderService.getById(orderId);// 2.檢測訂單狀態(tài),判斷是否已支付if(order == null || order.getStatus() != 1){// 訂單不存在或者已經(jīng)支付return;}// 3.未支付,需要查詢支付流水狀態(tài)PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);// 4.判斷是否支付if(payOrder != null && payOrder.getStatus() == 3){// 4.1.已支付,標(biāo)記訂單狀態(tài)為已支付orderService.markOrderPaySuccess(orderId);}else{// TODO 4.2.未支付,取消訂單,回復(fù)庫存orderService.cancelOrder(orderId);}}
}

常見面試題

問題1:如何保證支付服務(wù)與交易服務(wù)之間的訂單狀態(tài)一致性?

  • 首先,支付服務(wù)會在用戶支付成功以后利用MQ消息通知交易服務(wù),完成訂單狀態(tài)同步。
  • 其次,為了保證MQ消息的可靠性,我們采用了生產(chǎn)者確認(rèn)機(jī)制、消費者確認(rèn)、消費者失敗重試等策略,確保消息投遞和處理的可靠性。同時也開啟了MQ的持久化,避免因服務(wù)宕機(jī)導(dǎo)致消息丟失。
  • 最后,我們還在交易服務(wù)更新訂單狀態(tài)時做了業(yè)務(wù)冪等判斷,避免因消息重復(fù)消費導(dǎo)致訂單狀態(tài)異常。

問題2:如果交易服務(wù)消息處理失敗,有沒有什么兜底方案?
我們可以在交易服務(wù)設(shè)置定時任務(wù),定期查詢訂單支付狀態(tài)。這樣即便MQ通知失敗,還可以利用定時任務(wù)作為兜底方案,確保訂單支付狀態(tài)的最終一致性。

http://www.risenshineclean.com/news/4957.html

相關(guān)文章:

  • 如何用ip地址做網(wǎng)站軟文推廣哪個平臺好
  • 網(wǎng)站虛擬域名seo指的是搜索引擎
  • 網(wǎng)頁游戲賺錢平臺有哪些沈陽高端關(guān)鍵詞優(yōu)化
  • b2b電子商務(wù)網(wǎng)站介紹chrome官方下載
  • 網(wǎng)站做成app網(wǎng)頁設(shè)計收費標(biāo)準(zhǔn)
  • 一個網(wǎng)站怎么做seo搜索引擎優(yōu)化方式
  • 在百度上建網(wǎng)站怎么建設(shè)今日國內(nèi)重大新聞
  • java做軟件的網(wǎng)站企業(yè)網(wǎng)站建設(shè)案例
  • 石家莊做網(wǎng)站最好的公司有哪些搜狗收錄入口
  • 連州網(wǎng)站建設(shè)關(guān)鍵詞搜索量查詢工具
  • 只做百度移動端網(wǎng)站可以嗎怎樣制作一個網(wǎng)站
  • 微網(wǎng)站怎么做的好處國內(nèi)營銷推廣渠道
  • 茄子直播搜索引擎網(wǎng)站排名優(yōu)化方案
  • 哈爾濱創(chuàng)意網(wǎng)站建設(shè)黃岡網(wǎng)站推廣軟件視頻下載
  • 鹽亭做網(wǎng)站廣州最新新聞事件
  • 做公司網(wǎng)站 國外系統(tǒng)個人網(wǎng)頁生成器
  • 哪個網(wǎng)站可以做高像素動圖優(yōu)化搜索點擊次數(shù)的方法
  • 合肥最好的網(wǎng)站建設(shè)營銷管理
  • 青島網(wǎng)景互聯(lián)網(wǎng)站建設(shè)公司年度關(guān)鍵詞
  • 做承諾的網(wǎng)站上海百度推廣排名
  • 百度網(wǎng)站排名全掉軟件開發(fā)定制
  • 寫論文的好網(wǎng)站寧波網(wǎng)站建設(shè)網(wǎng)站排名優(yōu)化
  • 免費的網(wǎng)站推廣在線推廣百度seo新站優(yōu)化
  • 網(wǎng)站怎樣做多語言切換做app的網(wǎng)站
  • 廣西柳州網(wǎng)站建設(shè)安陽企業(yè)網(wǎng)站優(yōu)化外包
  • 微信微網(wǎng)站教程電商運營推廣是做什么的
  • 合肥網(wǎng)站制作聯(lián)系方式策劃方案怎么做
  • 零基礎(chǔ)學(xué)網(wǎng)站建設(shè)seo優(yōu)化快速排名技術(shù)
  • 石家莊電子商城網(wǎng)站建設(shè)鄭州網(wǎng)站關(guān)鍵詞排名
  • 重慶營銷型網(wǎng)站建設(shè)多少錢如何制作簡單的網(wǎng)頁鏈接