wordpress主題下新建頁面網(wǎng)站seo站外優(yōu)化
SpringBoot教程(十五) | SpringBoot集成RabbitMq(消息丟失、消息重復(fù)、消息順序、消息順序)
- RabbitMQ常見問題解決方案
- 問題一:消息丟失的解決方案
- (1)生成者丟失消息
- 丟失的情景
- 解決方案1:發(fā)送方確認機制(推薦,最常用)
- 解決方案2:事務(wù)(不推薦,因為性能差)
- (2)MQ丟失消息
- 丟失的情景
- 解決方案:開啟RabbitMQ的持久化+開啟鏡像隊列
- (3)消費者丟失消息
- 丟失的情景 1
- 解決方案:無需解決
- 丟失的情景 2
- 擴展:重試機制
- 解決方案:消費者方確認機制(推薦,最常用)
- 問題二:消息重復(fù)的解決方案
- 什么時候會重復(fù)消費
- 如何解決
- 問題三:保證消息順序的解決方案
- 單一隊列和單一消費者模式(RabbitMQ)
- 問題四:消息堆積的解決方案
- 消息堆積原因
- 預(yù)防措施
- 已出事故的解決措施
RabbitMQ常見問題解決方案
問題一:消息丟失的解決方案
首先明確一條消息的傳送流程:生產(chǎn)者->MQ->消費者
所以這三個節(jié)點都可能丟失數(shù)據(jù)
(1)生成者丟失消息
丟失的情景
發(fā)送消息過程中出現(xiàn)網(wǎng)絡(luò)問題:生產(chǎn)者以為發(fā)送成功,但RabbitMQ server沒有收到
解決方案1:發(fā)送方確認機制(推薦,最常用)
發(fā)送方確認機制最大的好處在于它是異步的,等信道返回ark確認的同時繼續(xù)發(fā)送下一條消息(不會堵塞其他消息的發(fā)送)
(一)修改application.properties配置
# 確認消息已發(fā)送到交換機(Exchange)
spring.rabbitmq.publisher-confirms=true #舊版本
spring.rabbitmq.publisher-confirm-type=correlated #新版本
# 確認消息已發(fā)送到隊列(Queue)
spring.rabbitmq.publisher-returns=true
springBoot 2.2.0.RELEASE版本之前 是使用 spring.rabbitmq.publisher-confirms=true
在2.2.0及之后 使用spring.rabbitmq.publisher-confirm-type=correlated 屬性配置代替
(二)新建配置文件RabbitTemplate
對于 發(fā)送確認 寫法有多種方式,以下的是其中的一種方式
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//setMandatory設(shè)置表示:消息在沒有被隊列接收時是否應(yīng)該被退回給生產(chǎn)者(true:退回;false:丟棄)。//通常與yml配置文件中的publisher-returns配合一起使用,若不配置該項,setReutrnCallback將不會有消息返回rabbitTemplate.setMandatory(true);//幫助生產(chǎn)者判斷 確認消息是否成功發(fā)送到RabbitMQ//ack 為true表示已發(fā)送成功 false表示發(fā)送失敗rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {System.out.println("ConfirmCallback: "+"相關(guān)數(shù)據(jù):"+correlationData);System.out.println("ConfirmCallback: "+"確認情況:"+ack);System.out.println("ConfirmCallback: "+"原因:"+cause);});//當(dāng)消息無法 放到隊列里面時 返回的提醒rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("ReturnCallback: "+"消息:"+message);System.out.println("ReturnCallback: "+"回應(yīng)碼:"+replyCode);System.out.println("ReturnCallback: "+"回應(yīng)信息:"+replyText);System.out.println("ReturnCallback: "+"交換機:"+exchange);System.out.println("ReturnCallback: "+"路由鍵:"+routingKey);});return rabbitTemplate;}
}
解決方案2:事務(wù)(不推薦,因為性能差)
RabbitMQ提供的事務(wù)功能,在生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟RabbitMQ事務(wù)
(2)MQ丟失消息
丟失的情景
RabbitMQ服務(wù)端接收到消息后由于服務(wù)器宕機或重啟等原因(消息默認存在內(nèi)存中)導(dǎo)致消息丟失;
解決方案:開啟RabbitMQ的持久化+開啟鏡像隊列
RabbitMQ的持久化分為三個部分:交換器的持久化、隊列的持久化、消息的持久化
三者 都 持久化 才能保證 RabbitMQ服務(wù)重啟之后,消息才能存在且能發(fā)出去
交換機持久化
交換機持久化描述的是當(dāng)這個交換機上沒有注冊隊列時,這個交換機是否刪除。
如果要打開持久化的話也很簡單 (上面列子都是有體現(xiàn)的)
//定義直接交換機
@Bean
public DirectExchange directExchange() {//第一個參數(shù):定義交換機的名稱,第二個參數(shù):是否持久化,第三個參數(shù):是否自動刪除return new DirectExchange("directExchange", true, false);
}
隊列持久化
隊列持久化描述的是當(dāng)這個隊列沒有消費者在監(jiān)聽時,是否進行刪除。
持久化做法:
//定義隊列
@Bean
public Queue directQueue() {//第一個參數(shù):隊列的名稱,第二個參數(shù):是否持久化return new Queue("directQueue", true);
}
消息持久化
關(guān)鍵配置 持久化(MessageDeliveryMode.PERSISTENT)
@Test
public void testDurableMessage() {// 1.準備消息Message message = MessageBuilder.withBody("hello, rabbitmq".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 2.消息ID,封裝到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.發(fā)送消息rabbitTemplate.convertAndSend("simple.queue", message, correlationData);log.info("發(fā)送消息成功");
}
(3)消費者丟失消息
丟失的情景 1
RabbitMQ服務(wù)端向消費者發(fā)送完消息之后,網(wǎng)絡(luò)斷了,消息并沒有到達消費者
解決方案:無需解決
無需解決。因為此情景下服務(wù)端收不到確認消息,會再次發(fā)送的。
丟失的情景 2
啟用了重試機制,重試指定次數(shù)之后,還沒成功,但消息被確認。
擴展:重試機制
重試機制的三大前提
- 重試模式已啟用:通過配置 spring.rabbitmq.listener.simple.retry.enabled=true 來啟用重試模式。
- 拋出了異常:在 @RabbitListener 標注的方法中拋出了異常,通常是 RuntimeException 或 Error。
Spring AMQP 會捕獲這些異常并根據(jù)配置的重試策略來重試消息。- 未達到最大重試次數(shù):消息的重試次數(shù)尚未達到配置的最大值(spring.rabbitmq.listener.simple.retry.maxAttempts)。
配置以下即可實現(xiàn)重試操作
# 是否支持重試
spring.rabbitmq.listener.simple.retry.enabled=true
# 重試次數(shù)(默認3次)
spring.rabbitmq.listener.simple.retry.max-attempts=5
解決方案:消費者方確認機制(推薦,最常用)
改成手動后就 可以實現(xiàn) “先操作業(yè)務(wù)邏輯(數(shù)據(jù)庫操作)后,再手動從隊列上刪除這個消息” 的動作
其中“從隊列上刪除這個消息“這個動作體現(xiàn)就是 使用 channel.basicAck 去完成的。
切記改成手動后,這個channel.basicAck方法一定要寫。
(一)修改application.properties配置
# 設(shè)置消費端手動 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
(二)修改Service接收信息項
當(dāng)消息在進入 emailProcess、smsProcess(被@RabbitListener注解) 方法時就已經(jīng)被視為“接收到了”,但是需要 你 執(zhí)行 channel.basicAck(手動確認)才能讓這個消息從隊列上刪除。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;import java.io.IOException;@Service
public class DirectReceiver {@RabbitHandler@RabbitListener(queues = "emailQueue") //監(jiān)聽的隊列名稱public void emailProcess(Channel channel, Message message) throws IOException {try{System.out.println(new String(message.getBody(),"UTF-8"));//TODO 具體業(yè)務(wù).......//你使用手動消息確認模式時,basicAck 一定要執(zhí)行,不然會導(dǎo)致會保留在隊列中,無法被消費//第1個參數(shù)表示消息投遞序號//第2個參數(shù)false只確認當(dāng)前一個消息收到(大多數(shù)情況下都設(shè)置為false),true確認所有consumer獲得的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {//若是消息沒有成功接收,第二個參數(shù)設(shè)置為true的話,代表重新放回隊列中,false則為丟棄,在此也可以做成放置死信隊列的操作channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}}
確認和拒絕消息:
- basicAck: 這個方法用于確認消息已被成功處理。
第一個參數(shù)是消息的delivery tag(用于標識消息),
第二個參數(shù)指定是否批量確認(false
表示只確認當(dāng)前消息)。- basicReject: 這個方法用于拒絕消息。
第一個參數(shù)同樣是delivery tag,
第二個參數(shù)指定是否將消息重新放回隊列(false
表示不重新放回,即丟棄消息)。
方法解釋:
- emailProcess: 這個方法監(jiān)聽
emailQueue
隊列。
當(dāng)隊列中有消息時,它會打印出消息的內(nèi)容,并嘗試確認消息。
如果處理過程中發(fā)生異常,它會拒絕消息,但不會重新放回隊列(第二個參數(shù)為false
)。
問題二:消息重復(fù)的解決方案
什么時候會重復(fù)消費
1.自動提交模式時
消費者收到消息后,要自動提交,但提交后,網(wǎng)絡(luò)出故障,RabbitMQ服務(wù)器沒收到提交消息,那么此消息會被重新放入隊列,會再次發(fā)給消費者。
2.手動提交模式時
情景1:網(wǎng)絡(luò)故障問題,同上。
情景2:接收到消息并處理結(jié)束了,此時消費者掛了,沒有手動提交消息。
總體來說就是:網(wǎng)絡(luò)不可達、消費端宕機。
如何解決
消費端處理消息的業(yè)務(wù)邏輯保持冪等性
比如你拿個數(shù)據(jù)要寫庫,先根據(jù)主鍵查一下,如果這數(shù)據(jù)有了,就別插入了,update 一下。
比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
問題三:保證消息順序的解決方案
單一隊列和單一消費者模式(RabbitMQ)
在RabbitMQ中,可以確保一個隊列只被一個消費者消費,這樣可以保證消息按照發(fā)送的順序被處理。
因為隊列本身就是一個先進先出的結(jié)構(gòu)。
適用場景:RabbitMQ用戶且對消息順序有嚴格要求的場景。
優(yōu)點:實現(xiàn)簡單,易于管理。
缺點:可能成為性能瓶頸,在處理大量消息時需要考慮擴展性。
問題四:消息堆積的解決方案
消息堆積原因
消息堆積即消息沒及時被消費,是生產(chǎn)者生產(chǎn)消息速度快于消費者消費的速度導(dǎo)致的。
消費者消費慢可能是因為:本身邏輯耗費時間較長、阻塞了。
預(yù)防措施
生產(chǎn)者
1.減少發(fā)布頻率
3.考慮使用隊列最大長度限制
消費者
1.優(yōu)化代碼
已出事故的解決措施
情況1:堆積的消息還需要使用
方案1:簡單修復(fù)
修復(fù) 消費者(consumer)的問題,讓他恢復(fù)消費速度,然后等待幾個小時消費完畢
方案2:復(fù)雜修復(fù)
單隊列消費轉(zhuǎn)變?yōu)槎嚓犃胁⑿邢M
也是需要先 修復(fù) 消費者(consumer)的問題,再進行下面的步驟
步驟 1: 隊列和路由設(shè)置
1.創(chuàng)建新隊列:在RabbitMQ中創(chuàng)建10個新隊列,每個隊列分配一個獨特的名稱。
2. 設(shè)置交換機:定義一個直連型(Direct)交換機。
3. 綁定路由鍵:將每個新隊列通過唯一的路由鍵綁定到直連型交換機上。
偽代碼例子:
// 假設(shè)這是配置類的一部分
@Bean
Queue queue1() { return new Queue("queue1", false);
}
@Bean
Queue queue2() { return new Queue("queue2", false);
}
// 以此類推,為其他9個隊列創(chuàng)建Bean
.........
@Bean
DirectExchange exchange() { return new DirectExchange("myExchange");
}
@Bean
Binding binding1(Queue queue1, DirectExchange exchange) { return BindingBuilder.bind(queue1).to(exchange).with("routingKey1");
}
@Bean
Binding binding2(Queue queue2, DirectExchange exchange) { return BindingBuilder.bind(queue2).to(exchange).with("routingKey2");
}
// 以此類推,為其他隊列和路由鍵創(chuàng)建綁定
......
步驟 2: 消息分發(fā)
1.接收堆積數(shù)據(jù):現(xiàn)有消費者(或分發(fā)者)接收從發(fā)送者處堆積的數(shù)據(jù)。
2.分發(fā)到新隊列:實現(xiàn)分發(fā)邏輯,將接收到的消息根據(jù)路由鍵分發(fā)到相應(yīng)的10個新隊列中。
偽代碼例子:
@RabbitListener(queues = "oldQueue")
public void emailProcess(Message message, Channel channel) throws IOException { try { // 生成1-10之間的順序數(shù) SequentialRandom sequentialRandom = new SequentialRandom()String key = sequentialRandom.getNextSequentialRandom();// 重新發(fā)送消息到交換機,交換機將根據(jù)routingKey將消息路由到正確的隊列 rabbitTemplate.convertAndSend("myExchange", "routingKey"+key, new String(message.getBody(),"UTF-8")); // 確認原始隊列中的消息(如果您想要的話) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 處理異常,可能包括記錄日志、發(fā)送警報等 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); }
}
public class SequentialRandom { private int currentIndex = 1; // 初始索引為1 /** * 獲取下一個順序數(shù)* @return 下一個數(shù)字,從1到10循環(huán) */ public int getNextSequentialRandom() { int next = currentIndex; currentIndex = (currentIndex % 10) + 1; // 使用模運算實現(xiàn)循環(huán),并更新索引 return next; }
}
步驟 3: 并行消費
1.開發(fā)新消費端:編寫新的消費端程序,該程序能夠監(jiān)并處理來自10個新隊列的消息。
2. 部署并啟動:將新消費端程序部署到服務(wù)器,并啟動它以開始并行消費。
偽代碼例子:
@Component
public class ParallelConsumer { @RabbitListener(queues = {"queue1"}) public void receiveMessage1(Message message) { // 處理消息 } @RabbitListener(queues = {"queue2"}) public void receiveMessage2(Message message) { // 處理消息 } // ... @RabbitListener(queues = {"queue10"}) public void receiveMessage3(Message message) { // 處理消息 }
}
情況2:堆積的消息不需要使用
刪除消息即可。(可以在RabbitMQ控制臺刪除,或者使用命令)。