企業(yè)電話認(rèn)證百度seo教程視頻
說(shuō)明:在RabbitMQ消息傳遞過(guò)程中,有以下問(wèn)題:
-
消息沒(méi)發(fā)到交換機(jī)
-
消息沒(méi)發(fā)到隊(duì)列
-
MQ宕機(jī),消息在隊(duì)列中丟失
-
消息者接收到消息后,未能正常消費(fèi)(程序報(bào)錯(cuò)),此時(shí)消息已在隊(duì)列中移除
針對(duì)以上問(wèn)題,提供以下解決方案:
-
消息確認(rèn):確認(rèn)消息是否發(fā)送到交換機(jī)、隊(duì)列;
-
消息持久化:持久化消息,以防MQ宕機(jī)造成消息丟失;
-
消費(fèi)者消息確認(rèn):確認(rèn)消費(fèi)者已正確消費(fèi)消息,才把消息從隊(duì)列中刪除;
消息確認(rèn)
可以使用Rabbit MQ提供的publisher confirm機(jī)制來(lái)避免消息發(fā)送到MQ過(guò)程丟失。具體實(shí)現(xiàn)是,publisher-confirm(發(fā)送者確定)、publisher-return(發(fā)送者回執(zhí)),前者判斷消息到交換機(jī)、后者判斷交換機(jī)到隊(duì)列
publisher-confirm(發(fā)送者確定)
-
消息成功投遞到交換機(jī),返回ack;
-
消息未投遞到交換機(jī),返回nack;
publisher-return(發(fā)送者回執(zhí))
- 消息投遞到交換機(jī),但沒(méi)有到隊(duì)列,返回ack,即失敗原因;
在生產(chǎn)者端添加配置
spring:rabbitmq:# rabbitMQ相關(guān)配置host: 118.178.228.175port: 5672username: rootpassword: 123456virtual-host: /# 開(kāi)啟生產(chǎn)者確認(rèn),correlated為異步,simple為同步publisher-confirm-type: correlated# 開(kāi)啟publish-return功能,基于callback機(jī)制publisher-returns: true# 開(kāi)啟消息路由失敗的策略,true是調(diào)用returnCallback方法,false是丟棄消息template:mandatory: true
publisher-return(發(fā)送者回執(zhí))代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** 發(fā)送者回執(zhí)實(shí)現(xiàn)*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 獲取RabbitTemplate對(duì)象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 設(shè)置ReturnCallbackrabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 回執(zhí)信息* @param message 信息對(duì)象* @param replyCode 回執(zhí)碼* @param replyText 回執(zhí)內(nèi)容* @param exchange 交換機(jī)* @param routingKey 路由鍵值*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息發(fā)送隊(duì)列失敗=====replyCode{},replyText{},exchange{},routingKey{},message{}",replyCode,replyText,exchange,routingKey,message);}});}
}
publisher-confirm(發(fā)送者確定)代碼
@Testpublic void sendExceptionMessage() {// 路由鍵值String routingKey = "exception";// 消息String message = "This is a exception message";// 給消息設(shè)置一個(gè)唯一IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 編寫(xiě)confirmCallBack回調(diào)函數(shù)correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {if (confirm.isAck()) {// 消息發(fā)送交換機(jī)成功log.debug("消息送達(dá)至交換機(jī)成功");} else {// 消息發(fā)送交換機(jī)失敗,打印消息log.error("消息未能送達(dá)至交換機(jī),ID{},原因{}", correlationData.getId(), confirm.getReason());}}}, new FailureCallback() {// 消息發(fā)送交換機(jī)異常@Overridepublic void onFailure(Throwable ex) {log.error("消息發(fā)送交換機(jī)異常,ID:{},原因{}", correlationData.getId(), ex.getMessage());}});rabbitTemplate.convertAndSend("amq.direct", routingKey, message, correlationData);}
測(cè)試,設(shè)置一個(gè)不存在的routingKey,被發(fā)送者確認(rèn)(publisher-confirm)捕獲到;
// 路由鍵值
String routingKey = "null";
設(shè)置一個(gè)不存在的路由,被發(fā)送者回執(zhí)(publisher-return)捕獲到;
rabbitTemplate.convertAndSend("null", routingKey, message, correlationData);
消息持久化
消息持久化,是指把消息保存到磁盤中,在RabbitMQ宕機(jī)或者關(guān)機(jī)時(shí),重啟后,消息仍可以保存下來(lái)。消息依賴于交換機(jī)、隊(duì)列,因此持久化消息,同時(shí)也需要持久化交換機(jī)、隊(duì)列。
創(chuàng)建一個(gè)持久化的交換機(jī)、隊(duì)列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息持久化*/
@Configuration
public class DurableConfig {/*** 交換機(jī)持久化* @return*/@Beanpublic DirectExchange directExchange(){// 三個(gè)參數(shù)分別是:交換機(jī)名、是否持久化、沒(méi)有隊(duì)列與之綁定時(shí)是否自動(dòng)刪除return new DirectExchange("durable.direct",true,false);}/*** 隊(duì)列持久化* @return*/@Beanpublic Queue durableQueue(){return QueueBuilder.durable("durable.queue").build();}/*** 交換機(jī)與隊(duì)列綁定* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(durableQueue()).to(directExchange()).with("durable");}}
發(fā)送一個(gè)持久化的消息
/*** 發(fā)送持久化消息*/@Testpublic void sendDurableMessage() {String routingKey = "durable";CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());Message message = MessageBuilder.withBody("This is a durable message".getBytes(StandardCharsets.UTF_8))// 設(shè)置該消息未持久化消息.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend("durable.direct", routingKey, message, correlationData);}
打開(kāi)RabbitMQ管理平臺(tái),可以看到"delivery_mode: 2",表示該消息是持久化消息
(源碼:MessageDeliveryMode類)
實(shí)際上,交換機(jī)、隊(duì)列默認(rèn)就是持久化的(durable: true),所以不用特意設(shè)置;
消費(fèi)者消息確認(rèn)
介紹
消費(fèi)者消息確認(rèn),是為了確保消費(fèi)者已經(jīng)消費(fèi)了消息,才讓MQ把該消息刪除;
可通過(guò)在消費(fèi)者的配置文件中增加下面這行配置實(shí)現(xiàn),備選項(xiàng)有以下三個(gè):
-
none:關(guān)閉ack,表示不做處理,消息發(fā)給消費(fèi)者之后就立即被刪除;
-
auto:自動(dòng)ack,表示由Spring檢測(cè)代碼是否出現(xiàn)異常,出現(xiàn)異常則保留消息,沒(méi)有異常則刪除消息;
-
manual:手動(dòng)ack,可根據(jù)業(yè)務(wù)手動(dòng)編寫(xiě)代碼,返回ack;
spring:rabbitmq:listener:simple:# 設(shè)置消息確認(rèn)模式acknowledge-mode: none
測(cè)試:none
可編寫(xiě)代碼測(cè)試,下面是生產(chǎn)者代碼,發(fā)送消息
/*** 發(fā)送普通消息*/@Testpublic void sendNoneMessage() {String directName = "none.direct";String routingKey = "none";String message = "This is a test message";rabbitTemplate.convertAndSend(directName, routingKey, message);}
消費(fèi)者代碼有問(wèn)題,未能正常消費(fèi)消息
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "none.queue"),exchange = @Exchange(name = "none.direct",type = ExchangeTypes.DIRECT),key = {"none"}))public void getNoneMessage(String normalMessage){System.out.println(1/0);System.out.println("normalMessage = " + normalMessage);}
測(cè)試結(jié)果,程序報(bào)錯(cuò),消息也沒(méi)能保留下來(lái)
測(cè)試:auto
更改設(shè)置為:auto,重試
但是消息未被刪除
這種情況,在實(shí)際開(kāi)發(fā)中是不能允許,可以通過(guò)更改消費(fèi)失敗的重試機(jī)制解決。
消費(fèi)失敗重試機(jī)制
方法一:設(shè)置retry
因?yàn)橄⒈幌M(fèi)失敗,消息會(huì)一直循環(huán)重試,無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力,這種情況可以通過(guò)在消費(fèi)者端添加以下配置,限制失敗重試的條件來(lái)解決:
spring:rabbitmq:listener:simple:retry:# 開(kāi)啟消費(fèi)者失敗重試enabled: true# 初次失敗等待時(shí)長(zhǎng)為1秒initial-interval: 1000# 失敗的等待時(shí)長(zhǎng)倍數(shù),即后一次等待的時(shí)間是前一次等待時(shí)間的多少倍multiplier: 1# 最多重試次數(shù)max-attempts: 3# true 無(wú)狀態(tài) false 有狀態(tài) 如果業(yè)務(wù)中包含事務(wù) 改為falsestateless: true
開(kāi)啟后,控制臺(tái)可以發(fā)現(xiàn),信息不回一直循環(huán)打印,而是打印數(shù)條后停止,日志信息中有提示“Retry Policy Exhausted”(重試策略已用盡)
這種通過(guò)配置的方式,并不會(huì)重試數(shù)次后仍保留消息,而是重試數(shù)次仍失敗,隨即丟棄消息,消息丟失,這在實(shí)際開(kāi)發(fā)中也是不能被允許的。
方法二:路由存儲(chǔ)消息
因此,可以通過(guò)下面這個(gè)方法,把消費(fèi)失敗的消息,通過(guò)交換機(jī)路由到另外的隊(duì)列中存儲(chǔ)起來(lái),等業(yè)務(wù)代碼被修復(fù),再路由回來(lái)消費(fèi)。
代碼如下
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 錯(cuò)誤消息隊(duì)列*/
@Configuration
public class ErrorMessageQueueConfig {/*** 創(chuàng)建一個(gè)交換機(jī),用于路由消費(fèi)失敗的消息* @return*/@Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}/*** 創(chuàng)建一個(gè)隊(duì)列,用于存儲(chǔ)消費(fèi)失敗的消息* @return*/@Beanpublic Queue errorQueue(){return new Queue("error.queue");}/*** 綁定* @return*/@Beanpublic Binding errorBinding(){return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}/*** 路由,當(dāng)消費(fèi)失敗時(shí),把消費(fèi)失敗的消息路由到此隊(duì)列中,路由key為"error"* @param rabbitTemplate* @return*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}
可以看到,消息消費(fèi)失敗后并沒(méi)有被丟失,而是路由到錯(cuò)誤隊(duì)列中存儲(chǔ)了起來(lái)。因?yàn)殄e(cuò)誤隊(duì)列沒(méi)有設(shè)置RabbitListener,所以可以存儲(chǔ)消息,等帶代碼問(wèn)題被排查出來(lái)后,可以再針對(duì)該隊(duì)列設(shè)置監(jiān)聽(tīng)方法,消費(fèi)這部分錯(cuò)誤的消息。
另外,值得一提的是,消費(fèi)者這邊的控制臺(tái)會(huì)報(bào)一個(gè)警告,提示路由密鑰錯(cuò)誤。我們可以理解,在RabbitMQ底層,會(huì)把消費(fèi)失敗了的消息,統(tǒng)一路由到一個(gè)地方去,而我們這種手動(dòng)把消費(fèi)失敗的消息路由到自定義的隊(duì)列中的方式,打破了這種“默認(rèn)的規(guī)則”,所以報(bào)了一個(gè)這樣的警告。這種警告是在可控范圍內(nèi)的。
總結(jié)
RabbitMQ發(fā)送消息,為了確保消息的可靠性,保證消息能被交換機(jī)、隊(duì)列收到,消息能被正常消費(fèi),而不會(huì)因消費(fèi)失敗而丟失,提供了對(duì)應(yīng)的一系列方法,并且最后還提供了兩種消費(fèi)失敗重試方法,優(yōu)化了消費(fèi)過(guò)程,非常Nice。