wordpress虛擬主機(jī)安裝教程seo如何進(jìn)行優(yōu)化
強烈建議在看本篇博客之前快速瀏覽文章:RabbitMQ基礎(chǔ)有這一篇就夠了
RabbitMQ高級篇
- 0. 前言
- 1. 發(fā)送者的可靠性
- 1.1 生產(chǎn)者重試機(jī)制
- 1.2 生產(chǎn)者確認(rèn)機(jī)制
- 1.3 實現(xiàn)生產(chǎn)者確認(rèn)
- 2. MQ的可靠性
- 2.1 MQ持久化
- 2.2 LazyQueue
- 3. 消費者的可靠性
- 3.1 消費者確認(rèn)機(jī)制
- 3.2 失敗重試策略
- 3.3 業(yè)務(wù)冪等性
- 3.3.1 唯一消息ID
- 3.3.2 業(yè)務(wù)判斷
- 4. 延遲消息
- 4.1 死信交換機(jī)
- 4.2 取消超時訂單案例
- 常見面試題
0. 前言
消息從生產(chǎn)者到消費者的每一步都可能導(dǎo)致消息丟失:
- 發(fā)送消息時丟失:
- 生產(chǎn)者發(fā)送消息時連接MQ失敗
- 生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange
- 生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue
- 消息到達(dá)MQ后,處理消息的進(jìn)程發(fā)生異常
- MQ導(dǎo)致消息丟失:
- 消息到達(dá)MQ,保存到隊列后,尚未消費就突然宕機(jī)
- 消費者處理消息時:
- 消息接收后尚未處理突然宕機(jī)
- 消息接收后處理過程中拋出異常
解決消息丟失問題,保證MQ的可靠性,就必須從3個方面入手:
- 確保生產(chǎn)者一定把消息發(fā)送到MQ
- 確保MQ不會將消息弄丟
- 確保消費者一定要處理消息
1. 發(fā)送者的可靠性
1.1 生產(chǎn)者重試機(jī)制
生產(chǎn)者發(fā)送消息時,出現(xiàn)了網(wǎng)絡(luò)故障,導(dǎo)致與MQ的連接中斷。SpringAMQP
提供的消息發(fā)送時的重試機(jī)制。即:當(dāng)RabbitTemplate
與MQ連接超時后,多次重試。
修改publisher
模塊的application.yaml
文件,添加下面的內(nèi)容:
spring:rabbitmq:connection-timeout: 1s # 設(shè)置MQ的連接超時時間template:retry:enabled: true # 開啟超時重試機(jī)制initial-interval: 1000ms # 失敗后的初始等待時間multiplier: 1 # 失敗后下次的等待時長倍數(shù),下次等待時長 = initial-interval * multipliermax-attempts: 3 # 最大重試次數(shù)這段配置代碼是針對 Spring 框架中 RabbitMQ 的設(shè)置,主要用于配置 RabbitMQ 的連接和消息發(fā)送的重試機(jī)制。下面是對每個配置項的詳細(xì)解釋:1. spring.rabbitmq.connection-timeout
- RabbitMQ 連接的超時時間。
- 表示連接超時時間為 1 秒。如果在 1 秒內(nèi)無法建立連接,程序?qū)伋霎惓!?. spring.rabbitmq.template.retry
這個部分配置了消息發(fā)送時的重試策略。a. enabled
- 開啟或關(guān)閉重試機(jī)制。
- 表示開啟超時重試機(jī)制。如果消息發(fā)送失敗,Spring 會自動嘗試重新發(fā)送消息。b. initial-interval
- 設(shè)置首次重試的等待時間。
- 表示首次重試的等待時間為 1 秒。如果第一次發(fā)送失敗,程序?qū)⒃?1 秒后再次嘗試發(fā)送。c. multiplier
- 重試時等待時間的倍數(shù)。
- 表示每次重試的等待時間不增加。計算公式為: 下次等待時長 = initial-interval × multiplier。 在此配置中,所有重試的等待時間都將保持在 1 秒,因為乘以 1 不會改變初始間隔。d. `max-attempts`
- 設(shè)置最大重試次數(shù)。
- 表示最多會嘗試 3 次發(fā)送消息。如果消息在 3 次嘗試后仍然無法發(fā)送成功,則會拋出異常并終止重試。
特別注意:當(dāng)網(wǎng)絡(luò)不穩(wěn)定的時候,利用重試機(jī)制可以有效提高消息發(fā)送的成功率。不過SpringAMQP提供的重試機(jī)制是阻塞式的重試,也就是說多次重試等待的過程中,當(dāng)前線程是被阻塞的。
如果對于業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。如果一定要使用,請合理配置等待時長和重試次數(shù),當(dāng)然也可以考慮使用異步線程來執(zhí)行發(fā)送消息的代碼。
1.2 生產(chǎn)者確認(rèn)機(jī)制
只要生產(chǎn)者與MQ之間的網(wǎng)路連接順暢,基本不會出現(xiàn)發(fā)送消息丟失的情況,因此大多數(shù)情況下我們無需考慮這種問題。不過,在少數(shù)情況下,也會出現(xiàn)消息發(fā)送到MQ之后丟失的現(xiàn)象,比如:
- MQ內(nèi)部處理消息的進(jìn)程發(fā)生了異常
- 生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange(程序員書寫問題)
- 生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue,因此無法路由(程序員書寫問題)
針對上述情況,RabbitMQ提供了生產(chǎn)者消息確認(rèn)機(jī)制,包括Publisher Confirm和Publisher Return兩種。在開啟確認(rèn)機(jī)制的情況下,當(dāng)生產(chǎn)者發(fā)送消息給MQ后,MQ會根據(jù)消息處理的情況返回不同的回執(zhí)。返回的結(jié)果有以下幾種情況:
- 消息投遞到了MQ,但是路由失敗。此時會通過Publisher Return返回路由異常原因,然后返回ACK,告知投遞成功**(例如:成功送到交換機(jī)但是沒有綁定隊列,一般是程序員忘記添加)**
- 路由失敗情況1:發(fā)送者成功發(fā)送到MQ,但是exchange沒有綁定一個queue,一般是程序員書寫問題;
- 路由失敗情況2:發(fā)送者成功發(fā)送到MQ,但是RountingKey 和 BindingKey都不一致,一般是程序員書寫問題;
- 臨時消息投遞到了MQ,并且入隊成功,返回ACK,告知投遞成功
- 持久消息投遞到了MQ,并且入隊完成持久化,返回ACK ,告知投遞成功
- 其它情況都會返回NACK,告知投遞失敗(比如持久化消息沒有寫到磁盤等情況)
其中ack
和nack
屬于Publisher Confirm
機(jī)制,ack
是投遞成功;nack
是投遞失敗。而return
則屬于Publisher Return
機(jī)制。默認(rèn)兩種機(jī)制都是關(guān)閉狀態(tài),需要通過配置文件來開啟。
1.3 實現(xiàn)生產(chǎn)者確認(rèn)
在publisher
模塊的application.yaml
中添加配置:
spring:rabbitmq:publisher-confirm-type: correlated # 開啟publisher confirm機(jī)制,并設(shè)置confirm類型publisher-returns: true # 開啟publisher return機(jī)制
這里publisher-confirm-type
有三種模式可選:
none
:關(guān)閉confirm機(jī)制simple
:同步阻塞等待MQ的回執(zhí)correlated
:MQ異步回調(diào)返回回執(zhí)
測試案例:
每個RabbitTemplate
只能配置一個ReturnCallback
,因此我們可以在配置類中統(tǒng)一設(shè)置。在publisher
模塊定義一個配置類:
package com.itheima.publisher.config;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("觸發(fā)return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
定義ConfirmCallback:
由于每個消息發(fā)送時的處理邏輯不一定相同,因此ConfirmCallback
需要在每次發(fā)消息時定義。具體來說,是在調(diào)用RabbitTemplate
中的convertAndSend
方法時,多傳遞一個參數(shù):
這里的CorrelationData中包含兩個核心的東西:
- id:消息的唯一標(biāo)示,MQ對不同的消息的回執(zhí)以此做判斷,避免混淆
- SettableListenableFuture:回執(zhí)結(jié)果的Future對象
將來MQ的回執(zhí)就會通過這個Future來返回,我們可以提前給CorrelationData中的Future添加回調(diào)函數(shù)來處理消息回執(zhí):
新建一個測試,向系統(tǒng)自帶的交換機(jī)發(fā)送消息,并且添加ConfirmCallback
:
@Test
void testPublisherConfirm() {// 1.創(chuàng)建CorrelationDataCorrelationData cd = new CorrelationData();// 2.給Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future發(fā)生異常時的處理邏輯,基本不會觸發(fā)log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回執(zhí)的處理邏輯,參數(shù)中的result就是回執(zhí)內(nèi)容if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執(zhí),false 代表 nack回執(zhí)log.debug("發(fā)送消息成功,收到 ack!");}else{ // result.getReason(),String類型,返回nack時的異常描述log.error("發(fā)送消息失敗,收到 nack, reason : {}", result.getReason());}}});// 3.發(fā)送消息,參數(shù)依次為:交換機(jī)名稱、RountingKey、需要發(fā)送的消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
執(zhí)行結(jié)果如下:
由于傳遞的RoutingKey是錯誤的,路由失敗后,觸發(fā)了return callback,同時也收到了ack。當(dāng)我們修改為正確的RoutingKey以后,就不會觸發(fā)return callback了,只收到ack。而如果連交換機(jī)都是錯誤的,則只會收到nack。
2. MQ的可靠性
消息到達(dá)MQ以后,如果MQ不能及時保存,也會導(dǎo)致消息丟失,所以MQ的可靠性也非常重要。
2.1 MQ持久化
為了提升性能,默認(rèn)情況下MQ的數(shù)據(jù)都是在內(nèi)存存儲的臨時數(shù)據(jù),重啟后就會消失。為了保證數(shù)據(jù)的可靠性,必須配置數(shù)據(jù)持久化,包括:
- 交換機(jī)持久化
- 隊列持久化
- 消息持久化
交換機(jī)持久化:在控制臺的Exchanges
頁面,添加交換機(jī)時可以配置交換機(jī)的Durability
參數(shù),設(shè)置為Durable就是持久化模式,Transient就是臨時模式。
隊列持久化:在控制臺的Queues頁面,添加隊列時,同樣可以配置隊列的Durability參數(shù)。
消息持久化:在控制臺發(fā)送消息的時候,可以添加很多參數(shù),而消息的持久化是要配置一個properties。
注意:在開啟持久化機(jī)制以后,如果同時還開啟了生產(chǎn)者確認(rèn),那么MQ會在消息持久化以后才發(fā)送ACK回執(zhí),進(jìn)一步確保消息的可靠性。不過出于性能考慮,為了減少IO次數(shù),發(fā)送到MQ的消息并不是逐條持久化到數(shù)據(jù)庫的,而是每隔一段時間批量持久化。一般間隔在100毫秒左右,這就會導(dǎo)致ACK有一定的延遲,因此建議生產(chǎn)者確認(rèn)全部采用異步方式。
2.2 LazyQueue
在默認(rèn)情況下,RabbitMQ會將接收到的信息保存在內(nèi)存中以降低消息收發(fā)的延遲。但在某些特殊情況下,這會導(dǎo)致消息積壓,比如:
- 消費者宕機(jī)或出現(xiàn)網(wǎng)絡(luò)故障
- 消息發(fā)送量激增,超過了消費者處理速度
- 消費者處理業(yè)務(wù)發(fā)生阻塞
為什么要提出LazyQueue的原因,PageOut阻塞=================
一旦出現(xiàn)消息堆積問題,RabbitMQ的內(nèi)存占用就會越來越高,直到觸發(fā)內(nèi)存預(yù)警上限。此時RabbitMQ會將內(nèi)存消息刷到磁盤上,這個行為成為PageOut. PageOut會耗費一段時間,并且會阻塞隊列進(jìn)程。因此在這個過程中RabbitMQ不會再處理新的消息,生產(chǎn)者的所有請求都會被阻塞。
為了解決這個問題,從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues
的模式,也就是惰性隊列。惰性隊列的特征如下:
- 接收到消息后直接存入磁盤而非內(nèi)存;
- 消費者要消費消息時才會從磁盤中讀取并加載到內(nèi)存(也就是懶加載);
- 支持?jǐn)?shù)百萬條的消息存儲;
而在3.12版本之后,LazyQueue已經(jīng)成為所有隊列的默認(rèn)格式。因此官方推薦升級MQ為3.12版本或者所有隊列都設(shè)置為LazyQueue模式。
配置Lazy模式1:控制臺配置Lazy模式
在添加隊列的時候,添加x-queue-mod=lazy參數(shù)即可設(shè)置隊列為Lazy模式:
配置Lazy模式2:代碼配置Lazy模式
在利用SpringAMQP
聲明隊列的時候,添加x-queue-mod=lazy
參數(shù)也可設(shè)置隊列為Lazy
模式:
@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 開啟Lazy模式.build();
}
也可以基于注解來聲明隊列并設(shè)置為Lazy
模式:
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}
3. 消費者的可靠性
3.1 消費者確認(rèn)機(jī)制
問題提出:當(dāng)RabbitMQ向消費者投遞消息以后,需要知道消費者的處理狀態(tài)如何。因為消息投遞給消費者并不代表就一定被正確消費了,可能出現(xiàn)的故障有很多,比如:
- 消息投遞的過程中出現(xiàn)了網(wǎng)絡(luò)故障
- 消費者接收到消息后突然宕機(jī)
- 消費者接收到消息后,因處理不當(dāng)導(dǎo)致異常
為了確認(rèn)消費者是否成功處理消息,RabbitMQ
提供了消費者確認(rèn)機(jī)制(Consumer Acknowledgement)
。即:當(dāng)消費者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個回執(zhí),告知RabbitMQ
自己消息處理狀態(tài)。回執(zhí)有三種可選值:
- ack:成功處理消息,RabbitMQ從隊列中刪除該消息
- nack:消息處理失敗,RabbitMQ需要再次投遞消息
- 網(wǎng)絡(luò)延遲,或者自身問題處理失敗
- reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息
- 例如發(fā)送過來的json格式存在問題,再次重試還是存在問題的,所以直接決絕
由于消息回執(zhí)的處理代碼比較統(tǒng)一,因此SpringAMQP
幫我們實現(xiàn)了消息確認(rèn)。并允許我們通過配置文件設(shè)置ACK處理方式,有三種模式:
- none:不處理。即消息投遞給消費者后立刻ack,消息會立刻從MQ刪除。非常不安全,不建議使用
- manual:手動模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ack或reject,存在業(yè)務(wù)入侵,但更靈活
- auto:自動模式。SpringAMQP利用AOP對我們的消息處理邏輯做了環(huán)繞增強,當(dāng)業(yè)務(wù)正常執(zhí)行時則自動返回ack. 當(dāng)業(yè)務(wù)出現(xiàn)異常時,根據(jù)異常判斷返回不同結(jié)果:
- 如果是業(yè)務(wù)異常,會自動返回nack;
- 如果是消息處理或校驗異常,自動返回reject;
通過下面的配置可以修改SpringAMQP
的ACK
處理方式(消費者端):
spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做處理acknowledge-mode: auto # 自動ack
3.2 失敗重試策略
當(dāng)消費者出現(xiàn)異常后,消息會不斷requeue(重入隊)到隊列,再重新發(fā)送給消費者。如果消費者再次執(zhí)行依然出錯,消息會再次requeue到隊列,再次投遞,直到消息處理成功為止。
極端情況就是消費者一直無法執(zhí)行成功,那么消息requeue就會無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:
為了應(yīng)對上述情況Spring又提供了消費者失敗重試機(jī)制:在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列。
spring:rabbitmq:listener:simple:retry:enabled: true # 開啟消費者失敗重試initial-interval: 1000ms # 初識的失敗等待時長為1秒multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-intervalmax-attempts: 3 # 最大重試次數(shù)stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
重啟consumer
服務(wù),重復(fù)之前的測試。可以發(fā)現(xiàn):
- 消費者在失敗后消息沒有重新回到
MQ
無限重新投遞,而是在本地重試了3次 - 本地重試3次以后,拋出了
AmqpRejectAndDontRequeueException
異常。查看RabbitMQ
控制臺,發(fā)現(xiàn)消息被刪除了,說明最后SpringAMQP
返回的是reject
;
失敗處理策略:
本地測試達(dá)到最大重試次數(shù)后,消息會被丟棄。這在某些對于消息可靠性要求較高的業(yè)務(wù)場景下,顯然不太合適了。因此Spring
允許**自定義重試次數(shù)耗盡后的消息處理策略**
,這個策略是由MessageRecovery
接口來定義的,它有3個不同實現(xiàn):
RejectAndDontRequeueRecoverer
:重試耗盡后,直接reject
,丟棄消息。默認(rèn)就是這種方式ImmediateRequeueMessageRecoverer
:重試耗盡后,返回nack
,消息重新入隊RepublishMessageRecoverer
:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續(xù)由人工集中處理。完整代碼如下:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
// 根據(jù)配置文件中的屬性決定是否加載這個配置類。
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}// 關(guān)聯(lián)交換機(jī)和隊列@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
3.3 業(yè)務(wù)冪等性
冪等是一個數(shù)學(xué)概念,用函數(shù)表達(dá)式來描述是這樣的:f(x) = f(f(x)) 。在程序開發(fā)中,則是指同一個業(yè)務(wù),執(zhí)行一次或多次對業(yè)務(wù)狀態(tài)的影響是一致的。
常見的冪等
- 根據(jù)id刪除數(shù)據(jù)
- 查詢數(shù)據(jù)
- 新增數(shù)據(jù)
常見的非冪等
- 取消訂單,恢復(fù)庫存的業(yè)務(wù)。如果多次恢復(fù)就會出現(xiàn)庫存重復(fù)增加的情況
- 退款業(yè)務(wù)。重復(fù)退款對商家而言會有經(jīng)濟(jì)損失。
非冪等性實際案例:
- 情況1:消息被重復(fù)消費,如果消費者和mq之間的網(wǎng)絡(luò)連接斷開,消費者的ack未能成功發(fā)送到mq,那么等到連接好了之后,mq又會重新發(fā)送消息,此時消息重復(fù)被消費。如果這個消息是用于扣減庫存的,那么就會出現(xiàn)問題。
- 情況2:用戶在支付服務(wù)完成支付后,MQ通知交易業(yè)務(wù)標(biāo)記訂單為已支付,交易服務(wù)標(biāo)記成功后返回ack給mq,但是此時交易服務(wù)和MQ之間網(wǎng)絡(luò)故障,ack未被mq收到,mq可能認(rèn)為交易服務(wù)宕機(jī),消息重新入隊,此時用戶又進(jìn)行了退款,交易服務(wù)立馬將訂單狀態(tài)修改為退款中,而mq和交易服務(wù)之間的網(wǎng)絡(luò)又恢復(fù)了,mq又將消息發(fā)送給交易服務(wù),其又將訂單修改為已支付,此時出現(xiàn)問題。
必須想辦法保證消息處理的冪等性。這里給出兩種方案:
- 唯一消息ID
- 業(yè)務(wù)狀態(tài)判斷
3.3.1 唯一消息ID
給每個消息都設(shè)置一個唯一id,利用id區(qū)分是否是重復(fù)消息:
- 每一條消息都生成一個唯一的id,與消息一起投遞給消費者。
- 消費者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息ID保存到數(shù)據(jù)庫
- 如果下次又收到相同消息,去數(shù)據(jù)庫查詢判斷是否存在,存在則為重復(fù)消息放棄處理。
SpringAMQP
的MessageConverter
自帶了MessageID
的功能,我們只要開啟這個功能即可。以Jackson
的消息轉(zhuǎn)換器為例
@Bean
public MessageConverter messageConverter(){// 1.定義消息轉(zhuǎn)換器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自動創(chuàng)建消息id,用于識別不同消息,也可以在業(yè)務(wù)中基于ID判斷是否是重復(fù)消息jjmc.setCreateMessageIds(true);return jjmc;
}
缺點:業(yè)務(wù)侵入、且有數(shù)據(jù)庫的操作影響業(yè)務(wù)性能
3.3.2 業(yè)務(wù)判斷
業(yè)務(wù)判斷就是基于業(yè)務(wù)本身的邏輯或狀態(tài)來判斷是否是重復(fù)的請求或消息,不同的業(yè)務(wù)場景判斷的思路也不一樣。例如當(dāng)前案例中,處理消息的業(yè)務(wù)邏輯是把訂單狀態(tài)從未支付修改為已支付。因此我們就可以在執(zhí)行業(yè)務(wù)時判斷訂單狀態(tài)是否是未支付,如果不是未支付狀態(tài)則證明訂單已經(jīng)被處理過(已支付,申請退款等狀態(tài)),無需重復(fù)處理。相比較而言,消息ID的方案需要改造原有的數(shù)據(jù)庫,所以我更推薦使用業(yè)務(wù)判斷的方案。
以支付修改訂單的業(yè)務(wù)為例,修改OrderServiceImpl
中的markOrderPaySuccess
方法:
@Override
public void markOrderPaySuccess(Long orderId) {// 1.查詢訂單Order old = getById(orderId);// 2.判斷訂單狀態(tài)// 訂單的狀態(tài),1、未付款 2、已付款,未發(fā)貨 3、已發(fā)貨,未確認(rèn) 4、確認(rèn)收貨,交易成功5、交易取消,訂單關(guān)閉6、交易結(jié)束,已評價if (old == null || old.getStatus() != 1) {// 訂單不存在或者訂單狀態(tài)不是1,放棄處理return;}// 3.嘗試更新訂單Order order = new Order();order.setId(orderId);order.setStatus(2);order.setPayTime(LocalDateTime.now());updateById(order);
}// 考慮到線程安全問題,可以進(jìn)行合并
@Override
public void markOrderPaySuccess(Long orderId) {// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();
}
4. 延遲消息
在電商的支付業(yè)務(wù)中,對于一些庫存有限的商品,為了更好的用戶體驗,通常都會在用戶下單時立刻扣減商品庫存。例如電影院購票、高鐵購票,下單后就會鎖定座位資源,其他人無法重復(fù)購買。
但是這樣就存在一個問題,假如用戶下單后一直不付款,就會一直占有庫存資源,導(dǎo)致其他客戶無法正常交易,最終導(dǎo)致商戶利益受損!
因此,電商中通常的做法就是:對于超過一定時間未支付的訂單,應(yīng)該立刻取消訂單并釋放占用的庫存。例如,訂單支付超時時間為30分鐘,則我們應(yīng)該在用戶下單后的第30分鐘檢查訂單支付狀態(tài),如果發(fā)現(xiàn)未支付,應(yīng)該立刻取消訂單,釋放庫存。
如何才能準(zhǔn)確的實現(xiàn)在下單后第30分鐘去檢查支付狀態(tài)呢?
像這種在一段時間以后才執(zhí)行的任務(wù),我們稱之為延遲任務(wù),而要實現(xiàn)延遲任務(wù),最簡單的方案就是利用MQ的延遲消息了。
在RabbitMQ中實現(xiàn)延遲消息也有兩種方案:
- 死信交換機(jī)+TTL
- 延遲消息插件
4.1 死信交換機(jī)
注意:要確保normal.direct
和 dlx.direct
的RountingKey
一致
4.2 取消超時訂單案例
用戶下單完成后,發(fā)送15分鐘延遲消息,在15分鐘后接收消息,檢查支付狀態(tài):
- 已支付:更新訂單狀態(tài)為已支付
- 未支付:更新訂單狀態(tài)為關(guān)閉訂單,恢復(fù)商品庫存
查詢支付狀態(tài)有兩次:
- 查詢本地訂單狀態(tài),如果已經(jīng)正常通知了,支付和交易服務(wù)的通知正常著,訂單狀態(tài)已經(jīng)修改未為支付了,此時直接結(jié)束即可。
- 如果本地查詢到的訂單狀態(tài)不是已支付,那么有可能是沒能通知到,此時需要去向支付服務(wù)查詢支付流水狀態(tài),如果是已支付,則修改,如果不是,那么就說明超時了,則取消訂單。
定義常量:無論是消息發(fā)送還是接收都是在交易服務(wù)完成,因此我們在trade-service中定義一個常量類,用于記錄交換機(jī)、隊列、RoutingKey等常量:
package com.hmall.trade.constants;public interface MQConstants {String DELAY_EXCHANGE_NAME = "trade.delay.direct";String DELAY_ORDER_QUEUE_NAME = "trade.delay.order.queue";String DELAY_ORDER_KEY = "delay.order.query";
}
配置MQ: 在trade-service模塊的pom.xml中引入amqp的依賴:
<!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
在trade-service
的application.yaml
中添加MQ的配置:
spring:rabbitmq:host: 192.168.150.101port: 5672virtual-host: /hmallusername: hmallpassword: 123
改造下單業(yè)務(wù),發(fā)送延遲消息: 在下單完成后,發(fā)送延遲消息,查詢支付狀態(tài)。修改trade-service
模塊的com.hmall.trade.service.impl.OrderServiceImpl
類的createOrder
方法,添加消息發(fā)送的代碼:
編寫查詢支付狀態(tài)接口:由于MQ消息處理時需要查詢支付狀態(tài),因此要在pay-service模塊定義一個這樣的接口,并提供對應(yīng)的FeignClient。 首先,在hm-api模塊定義三個類:
PayOrderDTO
:支付單的數(shù)據(jù)傳輸實體PayClient
:支付系統(tǒng)的Feign客戶端PayClientFallback
:支付系統(tǒng)的fallback邏輯
PayOrderDTO
代碼如下:
package com.hmall.api.dto;import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import java.time.LocalDateTime;/*** <p>* 支付訂單* </p>*/
@Data
@ApiModel(description = "支付單數(shù)據(jù)傳輸實體")
public class PayOrderDTO {@ApiModelProperty("id")private Long id;@ApiModelProperty("業(yè)務(wù)訂單號")private Long bizOrderNo;@ApiModelProperty("支付單號")private Long payOrderNo;@ApiModelProperty("支付用戶id")private Long bizUserId;@ApiModelProperty("支付渠道編碼")private String payChannelCode;@ApiModelProperty("支付金額,單位分")private Integer amount;@ApiModelProperty("付類型,1:h5,2:小程序,3:公眾號,4:掃碼,5:余額支付")private Integer payType;@ApiModelProperty("付狀態(tài),0:待提交,1:待支付,2:支付超時或取消,3:支付成功")private Integer status;@ApiModelProperty("拓展字段,用于傳遞不同渠道單獨處理的字段")private String expandJson;@ApiModelProperty("第三方返回業(yè)務(wù)碼")private String resultCode;@ApiModelProperty("第三方返回提示信息")private String resultMsg;@ApiModelProperty("支付成功時間")private LocalDateTime paySuccessTime;@ApiModelProperty("支付超時時間")private LocalDateTime payOverTime;@ApiModelProperty("支付二維碼鏈接")private String qrCodeUrl;@ApiModelProperty("創(chuàng)建時間")private LocalDateTime createTime;@ApiModelProperty("更新時間")private LocalDateTime updateTime;
}
PayClient
代碼如下:
package com.hmall.api.client;import com.hmall.api.client.fallback.PayClientFallback;
import com.hmall.api.dto.PayOrderDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {/*** 根據(jù)交易訂單id查詢支付單* @param id 業(yè)務(wù)訂單id* @return 支付單信息*/@GetMapping("/pay-orders/biz/{id}")PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
PayClientFallback
代碼如下:
package com.hmall.api.client.fallback;import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {@Overridepublic PayClient create(Throwable cause) {return new PayClient() {@Overridepublic PayOrderDTO queryPayOrderByBizOrderNo(Long id) {return null;}};}
}
最后,在pay-service模塊的PayController中實現(xiàn)該接口:
@ApiOperation("根據(jù)id查詢支付單")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}
監(jiān)聽消息,查詢支付狀態(tài):在trader-service編寫一個監(jiān)聽器,監(jiān)聽延遲消息,查詢訂單支付狀態(tài):
package com.hmall.trade.listener;import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import com.hmall.trade.constants.MQConstants;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class OrderDelayMessageListener {private final IOrderService orderService;private final PayClient payClient;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MQConstants.DELAY_ORDER_QUEUE_NAME),exchange = @Exchange(name = MQConstants.DELAY_EXCHANGE_NAME, delayed = "true"),key = MQConstants.DELAY_ORDER_KEY))public void listenOrderDelayMessage(Long orderId){// 1.查詢訂單Order order = orderService.getById(orderId);// 2.檢測訂單狀態(tài),判斷是否已支付if(order == null || order.getStatus() != 1){// 訂單不存在或者已經(jīng)支付return;}// 3.未支付,需要查詢支付流水狀態(tài)PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);// 4.判斷是否支付if(payOrder != null && payOrder.getStatus() == 3){// 4.1.已支付,標(biāo)記訂單狀態(tài)為已支付orderService.markOrderPaySuccess(orderId);}else{// TODO 4.2.未支付,取消訂單,回復(fù)庫存orderService.cancelOrder(orderId);}}
}
常見面試題
問題1:如何保證支付服務(wù)與交易服務(wù)之間的訂單狀態(tài)一致性?
- 首先,支付服務(wù)會在用戶支付成功以后利用MQ消息通知交易服務(wù),完成訂單狀態(tài)同步。
- 其次,為了保證MQ消息的可靠性,我們采用了生產(chǎn)者確認(rèn)機(jī)制、消費者確認(rèn)、消費者失敗重試等策略,確保消息投遞和處理的可靠性。同時也開啟了MQ的持久化,避免因服務(wù)宕機(jī)導(dǎo)致消息丟失。
- 最后,我們還在交易服務(wù)更新訂單狀態(tài)時做了業(yè)務(wù)冪等判斷,避免因消息重復(fù)消費導(dǎo)致訂單狀態(tài)異常。
問題2:如果交易服務(wù)消息處理失敗,有沒有什么兜底方案?
我們可以在交易服務(wù)設(shè)置定時任務(wù),定期查詢訂單支付狀態(tài)。這樣即便MQ通知失敗,還可以利用定時任務(wù)作為兜底方案,確保訂單支付狀態(tài)的最終一致性。