大連網(wǎng)站推廣公司種子搜索神器 bt 下載
MQ服務(wù)異步通信
- MQ服務(wù)異步通信
- a.消息可靠性
- 1) 生產(chǎn)者消息確認(rèn)
- 2) 消息持久化
- 3) 消費(fèi)者消息確認(rèn)
- 4) 消費(fèi)者失敗重試
- 4.a) 本地重試
- 4.b) 失敗策略
- b.死信交換機(jī)
- 1) 初識死信交換機(jī)
- 2) TTL
- 3) 延遲隊(duì)列
- a) 安裝延遲隊(duì)列插件
- b) SpringAMQP使用延遲隊(duì)列插件
- c.惰性隊(duì)列
- 1) 消息堆積問題
- 2) 惰性隊(duì)列
MQ服務(wù)異步通信
MQ的一些常見問題
a.消息可靠性
消息可靠性問題
消息從發(fā)送,到消費(fèi)者接收,會經(jīng)歷多個過程:
其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:
- 發(fā)送時丟失:
- 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
- 消息到達(dá)exchange后未到達(dá)queue
- MQ宕機(jī),queue將消息丟失
- consumer接收到消息后未消費(fèi)就宕機(jī)
1) 生產(chǎn)者消息確認(rèn)
生產(chǎn)者確認(rèn)機(jī)制
RabbitMQ提供了publisher confirm機(jī)制來避免消息發(fā)送到MQ過程中丟失。消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否處理成功。結(jié)果有兩種請求:
- publisher-confirm,發(fā)送者確認(rèn)
- 消息成功投遞到交換機(jī),返回ack
- 消息未投遞到交換機(jī),返回nack
- publisher-return,發(fā)送者回執(zhí)
- 消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。
注意:
SpringAMQP實(shí)現(xiàn)生產(chǎn)者確認(rèn)
1.修改publisher服務(wù)中的application.yml文件,添加下面的內(nèi)容:
spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true
說明:
publish-confirm-type
:開啟publisher-confirm,這里支持兩種類型:simple
:同步等待confirm結(jié)果,直到超時correlated
:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
publish-returns
:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallbacktemplate.mandatory
:定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
2.每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項(xiàng)目啟動過程中配置ApplicationContextAware:
package cn.itcast.mq.config;@Configuration
@Slf4j
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 獲取RabbitTemplate對象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置returnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 記錄日志log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{},失敗原因:{},交換機(jī):{},路由key:{},消息:{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有需要,重發(fā)消息或通知管理員});}
}
3.發(fā)送消息,指定消息ID、消息ConfirmCallback
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.準(zhǔn)備消息String message = "hello, spring amqp!";// 2.準(zhǔn)備correlationData// 2.1.消息IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 2.2.準(zhǔn)備ConfirmCallbackcorrelationData.getFuture().addCallback(result -> {// 判斷結(jié)果if (result.isAck()){// ACKlog.debug("消息成功投遞到交換機(jī)!消息ID:{}", correlationData.getId());}else {// NACKlog.error("消息投遞到交換機(jī)失敗!消息ID:{}", correlationData.getId());// 如果有需要,重發(fā)消息或通知管理員}}, ex -> {// 記錄日志log.error("消息發(fā)送失敗!", ex);// 如果有需要,重發(fā)消息或通知管理員});// 3.發(fā)送消息rabbitTemplate.convertAndSend("amq.topic", "simple.text", message, correlationData);
}
pringAMQP中處理消息確認(rèn)的幾種情況:
- publisher-comfirm:
- 消息成功發(fā)送到exchange,返回ack
- 消息發(fā)送失敗,沒有到達(dá)交換機(jī),返回nack
- 消息發(fā)送過程中出現(xiàn)異常,沒有收到回執(zhí)
- 消息成功發(fā)送到exchange,但沒有路由到queue,調(diào)用ReturnCallback
2) 消息持久化
MQ默認(rèn)是內(nèi)存存儲消息,開啟持久化功能可以確保緩存在MQ中的消息不丟失。8
1.交換機(jī)持久化:
@Bean
public DirectExchange simpleDirect(){// 三個參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時是否自動刪除 return new DirectExchange("simple.direct", true, false);
}
2.隊(duì)列持久化:
@Bean
public Queue simpleQueue(){// 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}
3.消息持久化,SpringAMQP中的的消息默認(rèn)是持久的,可以通過MessageProperties中的DeliveryMode來指定的:
// 1.準(zhǔn)備消息
Message message = MessageBuilder.withBody("hello spring".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
3) 消費(fèi)者消息確認(rèn)
RabbitMQ支持消費(fèi)者確認(rèn)機(jī)制,即:消費(fèi)者處理消息后可以向MQ發(fā)送ack回執(zhí),MQ收到ack回執(zhí)后才會刪除該消息。
SpringAMQP則允許配置三種確認(rèn)模式:
- manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
- auto:自動ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack
- none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會成功處理,因此消息投遞后立即被刪除
配置方式是修改application.yml文件,添加下面配置:
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 關(guān)閉ack
4) 消費(fèi)者失敗重試
4.a) 本地重試
當(dāng)消費(fèi)者出現(xiàn)異常后,消息會不斷requeue(重新入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:
可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊(duì)列。
spring:rabbitmq:listener:simple:retry:enabled: true # 開啟消費(fèi)者失敗重試initial-interval: 1000 # 初識的失敗等待時長為1秒multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-intervalmax-attempts: 3 # 最大重試次數(shù)stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
結(jié)論:
- 開啟本地重試時,消息處理過程中拋出異常,不會requeue到隊(duì)列,而是在消費(fèi)者本地重試
- 重試達(dá)到最大次數(shù)后,Spring會返回ack,消息會被丟棄
4.b) 失敗策略
在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecoverer接口來處理,它包含三種不同的實(shí)現(xiàn):
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
1.首先,定義接收失敗消息的交換機(jī)、隊(duì)列及其綁定關(guān)系:
2.定義RepublishMessageRecoverer的Bean:
@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorMessageBinding(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
如何確保RabbitMQ消息的可靠性?
- 1.開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
- 2.開啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會丟失
- 3.開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
- 4.開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理
b.死信交換機(jī)
1) 初識死信交換機(jī)
當(dāng)一個隊(duì)列中的消息滿足下列情況之一時,可以成為死信(dead letter):
- 消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false
- 消息是一個過期消息,超時無人消費(fèi)
- 要投遞的隊(duì)列消息滿了,無法投遞
如果該隊(duì)列配置了dead-letter-exchange屬性,指定了一個交換機(jī),那么隊(duì)列中的死信就會投遞到這個交換機(jī)中,而這個交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡稱DLX)。
如圖,一個消息被消費(fèi)者拒絕了,變成了死信:
因?yàn)閟imple.queue綁定了死信交換機(jī) dl.direct,因此死信會投遞給這個交換機(jī):
如果這個死信交換機(jī)也綁定了一個隊(duì)列,則消息最終會進(jìn)入這個存放死信的隊(duì)列:
另外,隊(duì)列將死信投遞給死信交換機(jī)時,必須知道兩個信息:
- 死信交換機(jī)名稱
- 死信交換機(jī)與死信隊(duì)列綁定的RoutingKey
這樣才能確保投遞的消息能到達(dá)死信交換機(jī),并且正確的路由到死信隊(duì)列。
2) TTL
TTL,也就是Time-To-Live。如果一個隊(duì)列中的消息TTL結(jié)束仍未消費(fèi),則會變?yōu)樗佬?#xff0c;ttl超時分為兩種情況:
- 消息所在的隊(duì)列設(shè)置了存活時間
- 消息本身設(shè)置了存活時間
1.在消費(fèi)者Listener中,聲明一組死信交換機(jī)和隊(duì)列,基于注解方式:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.queue", durable = "true"),exchange = @Exchange(name = "dl.direct"),key = "dl"
))
public void listenDlQueue(String msg){log.info("消費(fèi)者接收到了dl.queue的延遲消息");
}
2.在config中,要給隊(duì)列設(shè)置超時時間,需要在聲明隊(duì)列時配置x-message-ttl屬性:
package cn.itcast.mq.config;@Configuration
public class TTLMessageConfig {@Beanpublic DirectExchange ttlDirectExchange() {return new DirectExchange("ttl.direct");}@Beanpublic Queue ttlQueue(){return QueueBuilder.durable("ttl.queue").ttl(10000) // 設(shè)置隊(duì)列的超時時間,10秒.deadLetterExchange("dl.direct") // 指定死信交換機(jī).deadLetterRoutingKey("dl") // 指定死信RoutingKey.build();}@Beanpublic Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");}
}
3.發(fā)送消息時,給消息本身設(shè)置超時時間
@Test
public void testTTLMessage(){// 1.準(zhǔn)備消息Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("5000") // 設(shè)置超時時間5秒.build();// 2.發(fā)送消息rabbitTemplate.convertAndSend("ttl.direct","ttl", message);// 3.記錄日志log.info("消息已經(jīng)成功發(fā)送");
}
消息超時的兩種方式是?
- 1.給隊(duì)列設(shè)置ttl屬性,進(jìn)入隊(duì)列后超過ttl時間的消息變?yōu)樗佬?/strong>
- 2.給消息設(shè)置ttl屬性,隊(duì)列接收到消息超過ttl時間后變?yōu)樗佬?/strong>
- 兩者共存時,以時間短的ttl為準(zhǔn)
3) 延遲隊(duì)列
利用TTL結(jié)合死信交換機(jī),我們實(shí)現(xiàn)了消息發(fā)出后,消費(fèi)者延遲收到消息的效果。這種消息模式就稱為**延遲隊(duì)列(Delay Queue)**模式。
延遲隊(duì)列的使用場景包括:
- 延遲發(fā)送短信
- 用戶下單,如果用戶在15 分鐘內(nèi)未支付,則自動取消
- 預(yù)約工作會議,20分鐘后自動通知所有參會人員
a) 安裝延遲隊(duì)列插件
因?yàn)檠舆t隊(duì)列的需求非常多,所以RabbitMQ的官方也推出了一個插件,原生支持延遲隊(duì)列效果。
拉取MQ
docker pull rabbitmq:3.8-management
安裝MQ
docker run \-e RABBITMQ_DEFAULT_USER=root \-e RABBITMQ_DEFAULT_PASS=root \-v mq-plugins:/plugins \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
之前設(shè)定的RabbitMQ的數(shù)據(jù)卷名稱為mq-plugins
,使用下面命令查看數(shù)據(jù)卷:
docker volume inspect mq-plugins
可以得到下面結(jié)果:
可以去對應(yīng)的GitHub頁面下載3.8.9版本的插件,地址為https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9這個對應(yīng)RabbitMQ的3.8.5以上版本。
接下來,將rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
插件上傳到這個目錄即可:
最后就是安裝了,需要進(jìn)入MQ容器內(nèi)部來執(zhí)行安裝。我的容器名為mq
,所以執(zhí)行下面命令:
docker exec -it mq bash
執(zhí)行時,請將其中的 -it
后面的mq
替換為你自己的容器名.
進(jìn)入容器內(nèi)部后,執(zhí)行下面命令開啟插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
結(jié)果如下:
b) SpringAMQP使用延遲隊(duì)列插件
DelayExchange的本質(zhì)還是官方的三種交換機(jī),只是添加了延遲功能。因此使用時只需要聲明一個交換機(jī),交換機(jī)的類型可以是任意類型,然后設(shè)定delayed屬性為true即可。
基于注解方式:
基于java代碼的方式:
向這個delay為true的交換機(jī)中發(fā)送消息,一定要給消息添加一個header:x-delay,值為延遲的時間,單位為毫秒:
c.惰性隊(duì)列
1) 消息堆積問題
當(dāng)生產(chǎn)者發(fā)送消息的速度超過了消費(fèi)者處理消息的速度,就會導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲消息達(dá)到上限。最早接收到的消息,可能就會成為死信,會被丟棄,這就是消息堆積問題。
解決消息堆積有三種種思路:
- 增加更多消費(fèi)者,提高消費(fèi)速度
- 在消費(fèi)者內(nèi)開啟線程池加快消息處理速度
- 擴(kuò)大隊(duì)列容積,提高堆積上限
2) 惰性隊(duì)列
從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。
惰性隊(duì)列的特征如下:
- 接收到消息后直接存入磁盤而非內(nèi)存
- 消費(fèi)者要消費(fèi)消息時才會從磁盤中讀取并加載到內(nèi)存
- 支持?jǐn)?shù)百萬條的消息存儲
要設(shè)置一個隊(duì)列為惰性隊(duì)列,只需要在聲明隊(duì)列時,指定x-queue-mode屬性為lazy即可。可以通過命令行將一個運(yùn)行中的隊(duì)列修改為惰性隊(duì)列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
rabbitmqctl
:RabbitMQ的命令行工具set_policy
:添加一個策略Lazy
:策略名稱,可以自定義"^lazy-queue$"
:用正則表達(dá)式匹配隊(duì)列的名字'{"queue-mode":"lazy"}'
:設(shè)置隊(duì)列模式為lazy模式--apply-to queues
:策略的作用對象,是所有的隊(duì)列
用SpringAMQP聲明惰性隊(duì)列分兩種方式:
1.@Bean的方式
2.注解方式:
消息堆積問題的解決方案?
- 1.隊(duì)列上綁定多個消費(fèi)者,提高消費(fèi)速度
- 2.給消費(fèi)者開啟線程池,提高消費(fèi)速度
- 3.使用惰性隊(duì)列,可以再mq中保存更多消息
惰性隊(duì)列的優(yōu)點(diǎn)有哪些?
- 1.基于磁盤存儲,消息上限高
- 2.沒有間歇性的page-out,性能比較穩(wěn)定
惰性隊(duì)列的缺點(diǎn)有哪些?
- 1.基于磁盤存儲,消息時效性會降低
- 2.性能受限于磁盤的IO