用凡科做的網(wǎng)站保存不了sem專員
目錄
- 確保消息的可靠性
- RabbitMQ 消息發(fā)送可靠性
- 分析
- 解決方案
- 開啟事務(wù)機(jī)制
- 發(fā)送方確認(rèn)機(jī)制
- 單條消息處理
- 消息批量處理
- 失敗重試
- 自帶重試機(jī)制
- 業(yè)務(wù)重試
- RabbitMQ 消息消費(fèi)可靠性
- 如何保證消息在隊(duì)列
- RabbitMQ 的消息消費(fèi),整體上來(lái)說有兩種不同的思路:
- 確保消費(fèi)成功兩種思路
- 消息確認(rèn)
- 自動(dòng)確認(rèn)
- 手動(dòng)確認(rèn)
- 推模式手動(dòng)確認(rèn)
- 拉模式手動(dòng)確認(rèn)
- 消息拒絕
- 總結(jié):如何保證消息的可靠性。
- 冪等性問題
- 背景
- 解決思路
- 代碼
確保消息的可靠性
先確定消息可能在哪些位置丟失—不同的位置可以有不同的解決方案
- 發(fā)送過程
- 從生產(chǎn)者到交換機(jī)
- 從交換機(jī)到隊(duì)列
- 消費(fèi)過程
- 消息在隊(duì)列中
- 消費(fèi)者消費(fèi)
RabbitMQ 消息發(fā)送可靠性
分析
-
目標(biāo)
- 消息成功到達(dá) Exchange
- 消息成功到達(dá) Queue
-
如果能確認(rèn)這兩步,那么我們就可以認(rèn)為消息發(fā)送成功了。
-
如果這兩步中任一步驟出現(xiàn)問題,那么消息就沒有成功送達(dá),此時(shí)我們可能要通過重試等方式去重新發(fā)送消息,多次重試之后,如果消息還是不能到達(dá),則可能就需要人工介入了。
-
經(jīng)過上面的分析,我們可以確認(rèn),要確保消息成功發(fā)送,我們只需要做好三件事就可以了:
- 確認(rèn)消息到達(dá) Exchange。
- 確認(rèn)消息到達(dá) Queue。
- 開啟定時(shí)任務(wù),定時(shí)投遞那些發(fā)送失敗的消息
解決方案
-
如何確保消息成功到達(dá) RabbitMQ?RabbitMQ 給出了兩種方案:
- 開啟事務(wù)機(jī)制
- 發(fā)送方確認(rèn)機(jī)制
-
這是兩種不同的方案,不可以同時(shí)開啟,只能選擇其中之一,如果兩者同時(shí)開啟,則會(huì)報(bào)如下錯(cuò)誤
開啟事務(wù)機(jī)制
-
事務(wù)管理器
@Configuration public class RabbitConfig {@Beanpublic RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);} }
-
消息生產(chǎn)者:添加事務(wù)注解并設(shè)置通信信道為事務(wù)模式
@Service public class MqService {@Resourceprivate RabbitTemplate rabbitTemplate;@Transactional //標(biāo)記事務(wù)public void send() {rabbitTemplate.setChannelTransacted(true);//開啟事務(wù)模式rabbitTemplate.convertAndSend("mq_exchange_name","mq_queue_name","hello rabbitmq!".getBytes());int i = 1 / 0;//運(yùn)行時(shí)必然拋出異常,我們可以嘗試運(yùn)行該方法,發(fā)現(xiàn)消息并未發(fā)送成功} }
當(dāng)我們開啟事務(wù)模式之后,RabbitMQ 生產(chǎn)者發(fā)送消息會(huì)多出四個(gè)步驟:
- 客戶端發(fā)出請(qǐng)求,將信道設(shè)置為事務(wù)模式。
- 服務(wù)端給出回復(fù),同意將信道設(shè)置為事務(wù)模式。
- 客戶端發(fā)送消息。
- 客戶端提交事務(wù)。
- 服務(wù)端給出響應(yīng),確認(rèn)事務(wù)提交。
上面的步驟,除了第三步是本來(lái)就有的,其他幾個(gè)步驟都是平白無(wú)故多出來(lái)的。所以大家看到,事務(wù)模式其實(shí)效率有點(diǎn)低,這并非一個(gè)最佳解決方案。我們可以想想,什么項(xiàng)目會(huì)用到消息中間件?一般來(lái)說都是一些高并發(fā)的項(xiàng)目,這個(gè)時(shí)候并發(fā)性能尤為重要。
所以,RabbitMQ 還提供了發(fā)送方確認(rèn)機(jī)制(publisher confirm)來(lái)確保消息發(fā)送成功,這種方式,性能要遠(yuǎn)遠(yuǎn)高于事務(wù)模式
發(fā)送方確認(rèn)機(jī)制
單條消息處理
-
配置文件:開啟消息發(fā)送方確認(rèn)機(jī)制
server:port: 8888 spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到達(dá)交換器的確認(rèn)回調(diào)publisher-returns: true #配置消息到達(dá)隊(duì)列的回調(diào) # publisher-confirm-type有三個(gè)值 : # none:表示禁用發(fā)布確認(rèn)模式,默認(rèn)即此。 # correlated:表示成功發(fā)布消息到交換器后會(huì)觸發(fā)的回調(diào)方法。 # simple:類似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的調(diào)用。
-
開啟兩個(gè)監(jiān)聽
/*** @author: zjl* @datetime: 2024/5/9* @desc:* 定義配置類,實(shí)現(xiàn) RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 兩個(gè)接口,* 這兩個(gè)接口,前者的回調(diào)用來(lái)確定消息到達(dá)交換器,后者則會(huì)在消息路由到隊(duì)列失敗時(shí)被調(diào)用。* * 定義 initRabbitTemplate 方法并添加 @PostConstruct 注解,* 在該方法中為 rabbitTemplate 分別配置這兩個(gè) Callback。*/ @Configuration @Slf4j public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {public static final String MQ_EXCHANGE_NAME = "mq_exchange_name";public static final String MQ_QUEUE_NAME = "mq_queue_name";@Resourceprivate RabbitTemplate rabbitTemplate;@Beanpublic Queue queue() {return new Queue(MQ_QUEUE_NAME);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(MQ_EXCHANGE_NAME);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(MQ_QUEUE_NAME);}@PostConstructpublic void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("{}:消息成功到達(dá)交換器",correlationData.getId());}else{log.error("{}:消息發(fā)送失敗", correlationData.getId());}}@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.error("{}:消息未成功路由到隊(duì)列",message.getMessageProperties().getMessageId());} }
-
測(cè)試
首先嘗試將消息發(fā)送到一個(gè)不存在的交換機(jī)中@RestController public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;;@RequestMapping("/send")public String send() {rabbitTemplate.convertAndSend("RabbitConfig.MQ_EXCHANGE_NAME", RabbitConfig.MQ_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));return "send success";} }
給定一個(gè)真實(shí)存在的交換器,但是給一個(gè)不存在的隊(duì)列
@RestController public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;;@RequestMapping("/send")public String send() {//rabbitTemplate.convertAndSend("RabbitConfig.MQ_EXCHANGE_NAME", RabbitConfig.MQ_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));rabbitTemplate.convertAndSend(RabbitConfig.MQ_EXCHANGE_NAME,"RabbitConfig.MQ_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));return "send success";} }
可以看到,消息雖然成功達(dá)到交換器了,但是沒有成功路由到隊(duì)列(因?yàn)殛?duì)列不存在)
消息批量處理
- 如果是消息批量處理,那么發(fā)送成功的回調(diào)監(jiān)聽是一樣的,這里不再贅述。
- 這就是 publisher-confirm 模式。相比于事務(wù),這種模式下的消息吞吐量會(huì)得到極大的提升
失敗重試
- 失敗重試分兩種情況,一種是壓根沒找到 MQ 導(dǎo)致的失敗重試,另一種是找到 MQ 了,但是消息發(fā)送失敗了
自帶重試機(jī)制
- 前面所說的事務(wù)機(jī)制和發(fā)送方確認(rèn)機(jī)制,都是發(fā)送方確認(rèn)消息發(fā)送成功的辦法。
- 如果發(fā)送方一開始就連不上 MQ,那么 Spring Boot 中也有相應(yīng)的重試機(jī)制,但是這個(gè)重試機(jī)制就和 MQ 本身沒有關(guān)系了,這是利用 Spring 中的 retry 機(jī)制來(lái)完成的
-
配置
server:port: 8888 spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到達(dá)交換器的確認(rèn)回調(diào)publisher-returns: true #配置消息到達(dá)隊(duì)列的回調(diào)template: retry:enabled: true # 開啟重試機(jī)制initial-interval: 1000ms # 重試起始間隔時(shí)間max-attempts: 10 # 最大重試次數(shù)max-interval: 10000ms # 最大重試間隔時(shí)間multiplier: 2 # 間隔時(shí)間乘數(shù)。(這里配置間隔時(shí)間乘數(shù)為 2,則第一次間隔時(shí)間 1 秒,第二次重試間隔時(shí)間 2 秒,第三次 4 秒,以此類推)
-
再次啟動(dòng) Spring Boot 項(xiàng)目,然后關(guān)掉 MQ,此時(shí)嘗試發(fā)送消息,就會(huì)發(fā)送失敗,進(jìn)而導(dǎo)致自動(dòng)重試
業(yè)務(wù)重試
- 業(yè)務(wù)重試主要是針對(duì)消息沒有到達(dá)交換機(jī)的情況
- 如果消息沒有成功到達(dá)交換機(jī),此時(shí)就會(huì)觸發(fā)消息發(fā)送失敗回調(diào),我們可以利用起來(lái)這個(gè)回調(diào)
- 下面說一下整體思路
-
準(zhǔn)備數(shù)據(jù)庫(kù)表
DROP TABLE IF EXISTS `service_msg_mq_info`; CREATE TABLE `service_msg_mq_info` (`msgid` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`empid` int(11) NULL DEFAULT NULL,`status` int(11) NULL DEFAULT NULL,`routekey` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,`exchange` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,`count` int(11) NULL DEFAULT NULL,`trytime` datetime NULL DEFAULT NULL,`createtime` datetime NULL DEFAULT NULL,`updatetime` datetime NULL DEFAULT NULL,PRIMARY KEY (`msgid`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
status:表示消息的狀態(tài),有三個(gè)取值,0,1,2 分別表示消息發(fā)送中、消息發(fā)送成功以及消息發(fā)送失敗。
tryTime:表示消息的第一次重試時(shí)間(消息發(fā)出去之后,在 tryTime 這個(gè)時(shí)間點(diǎn)還未顯示發(fā)送成功,此時(shí)就可以開始重試了)。
count:表示消息重試次數(shù)。
-
每次發(fā)送消息的時(shí)候,就往數(shù)據(jù)庫(kù)中添加一條記錄
-
在消息發(fā)送的時(shí)候,我們就往該表中保存一條消息發(fā)送記錄,并設(shè)置狀態(tài) status 為 0,tryTime 為 1 分鐘之后
-
在消息發(fā)送的時(shí)候,我們就往該表中保存一條消息發(fā)送記錄,并設(shè)置狀態(tài) status 為 0,tryTime 為 1 分鐘之后
-
另外開啟一個(gè)定時(shí)任務(wù),定時(shí)任務(wù)每隔 10s 就去數(shù)據(jù)庫(kù)中撈一次消息,專門去撈那些 status 為 0 并且已經(jīng)過了 tryTime 時(shí)間記錄,把這些消息拎出來(lái)后,首先判斷其重試次數(shù)是否已超過 3 次,如果超過 3 次,則修改該條消息的 status 為 2,表示這條消息發(fā)送失敗,并且不再重試。對(duì)于重試次數(shù)沒有超過 3 次的記錄,則重新去發(fā)送消息,并且為其 count 的值+1
當(dāng)然這種思路有兩個(gè)弊端:
- 去數(shù)據(jù)庫(kù)走一遭,可能拖慢 MQ 的 Qos,不過有的時(shí)候我們并不需要 MQ 有很高的 Qos,所以這個(gè)應(yīng)用時(shí)要看具體情況。
- 按照上面的思路,可能會(huì)出現(xiàn)同一條消息重復(fù)發(fā)送的情況,不過這都不是事,我們?cè)谙⑾M(fèi)時(shí),解決好冪等性問題就行了。
當(dāng)然,大家也要注意,消息是否要確保 100% 發(fā)送成功,也要看具體情況。
RabbitMQ 消息消費(fèi)可靠性
如何保證消息在隊(duì)列
- 隊(duì)列持久化—》創(chuàng)建的時(shí)候設(shè)置持久化
- 搭建rabbitmq集群–保證高可用
RabbitMQ 的消息消費(fèi),整體上來(lái)說有兩種不同的思路:
-
推(push):MQ 主動(dòng)將消息推送給消費(fèi)者,這種方式需要消費(fèi)者設(shè)置一個(gè)緩沖區(qū)去緩存消息,對(duì)于消費(fèi)者而言,內(nèi)存中總是有一堆需要處理的消息,所以這種方式的效率比較高,這也是目前大多數(shù)應(yīng)用采用的消費(fèi)方式。這種方式通過 @RabbitListener 注解去標(biāo)記消費(fèi)者,如以下代碼,當(dāng)監(jiān)聽的隊(duì)列中有消息時(shí),就會(huì)觸發(fā)該方法
@Component public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME)public void handle(String msg) {System.out.println("msg = " + msg);} }
-
拉(pull):消費(fèi)者主動(dòng)從 MQ 拉取消息,這種方式效率并不是很高,不過有的時(shí)候如果服務(wù)端需要批量拉取消息,倒是可以采用這種方式
@Test public void test01() throws UnsupportedEncodingException {Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);System.out.println("o = " + new String(((byte[]) o),"UTF-8")); }
- 調(diào)用 receiveAndConvert 方法,方法參數(shù)為隊(duì)列名稱,
- 方法執(zhí)行完成后,會(huì)從 MQ 上拉取一條消息下來(lái),如果該方法返回值為 null,表示該隊(duì)列上沒有消息了。
- receiveAndConvert 方法有一個(gè)重載方法,可以在重載方法中傳入一個(gè)等待超時(shí)時(shí)間,例如 3 秒。
- 此時(shí),假設(shè)隊(duì)列中沒有消息了,則 receiveAndConvert 方法會(huì)阻塞 3 秒,3 秒內(nèi)如果隊(duì)列中有了新消息就返回,3 秒后如果隊(duì)列中還是沒有新消息,就返回 null,這個(gè)等待超時(shí)時(shí)間要是不設(shè)置的話,默認(rèn)為 0
-
這是消息兩種不同的消費(fèi)模式
-
如果需要從消息隊(duì)列中持續(xù)獲得消息,就可以使用推模式;
-
如果只是單純的消費(fèi)一條消息,則使用拉模式即可。
-
切忌將拉模式放到一個(gè)死循環(huán)中,變相的訂閱消息,這會(huì)嚴(yán)重影響 RabbitMQ 的性能
確保消費(fèi)成功兩種思路
- 為了保證消息能夠可靠的到達(dá)消息消費(fèi)者,RabbitMQ 中提供了消息消費(fèi)確認(rèn)機(jī)制。
- 當(dāng)消費(fèi)者去消費(fèi)消息的時(shí)候,可以通過指定 autoAck 參數(shù)來(lái)表示消息消費(fèi)的確認(rèn)方式
- 當(dāng) autoAck 為 false 的時(shí)候,此時(shí)即使消費(fèi)者已經(jīng)收到消息了,RabbitMQ 也不會(huì)立馬將消息移除,而是等待消費(fèi)者顯式的回復(fù)確認(rèn)信號(hào)后,才會(huì)將消息打上刪除標(biāo)記,然后再刪除。
- 當(dāng) autoAck 為 true 的時(shí)候,此時(shí)消息消費(fèi)者就會(huì)自動(dòng)把發(fā)送出去的消息設(shè)置為確認(rèn),然后將消息移除(從內(nèi)存或者磁盤中),即使這些消息并沒有到達(dá)消費(fèi)者。
- 屬性解釋
- Ready 表示待消費(fèi)的消息數(shù)量。
- Unacked 表示已經(jīng)發(fā)送給消費(fèi)者但是還沒收到消費(fèi)者 ack 的消息數(shù)量。
- 當(dāng)我們將 autoAck 設(shè)置為 false 的時(shí)候,對(duì)于 RabbitMQ 而言,消費(fèi)分成了兩個(gè)部分:
- 待消費(fèi)的消息
- 已經(jīng)投遞給消費(fèi)者,但是還沒有被消費(fèi)者確認(rèn)的消息
- 換句話說,當(dāng)設(shè)置 autoAck 為 false 的時(shí)候,消費(fèi)者就變得非常從容了,它將有足夠的時(shí)間去處理這條消息,當(dāng)消息正常處理完成后,再手動(dòng) ack,此時(shí) RabbitMQ 才會(huì)認(rèn)為這條消息消費(fèi)成功了。
- 如果 RabbitMQ 一直沒有收到客戶端的反饋,并且此時(shí)客戶端也已經(jīng)斷開連接了,那么 RabbitMQ 就會(huì)將剛剛的消息重新放回隊(duì)列中,等待下一次被消費(fèi)。
綜上所述,確保消息被成功消費(fèi),無(wú)非就是手動(dòng) Ack 或者自動(dòng) Ack,無(wú)他。當(dāng)然,無(wú)論這兩種中的哪一種,最終都有可能導(dǎo)致消息被重復(fù)消費(fèi),所以一般來(lái)說我們還需要在處理消息時(shí),解決冪等性問題。
消息確認(rèn)
自動(dòng)確認(rèn)
- 在 Spring Boot 中,默認(rèn)情況下,消息消費(fèi)就是自動(dòng)確認(rèn)的
- 通過 @Componet 注解將當(dāng)前類注入到 Spring 容器中,然后通過 @RabbitListener 注解來(lái)標(biāo)記一個(gè)消息消費(fèi)方法
- 默認(rèn)情況下,消息消費(fèi)方法自帶事務(wù),即如果該方法在執(zhí)行過程中拋出異常,那么被消費(fèi)的消息會(huì)重新回到隊(duì)列中等待下一次被消費(fèi)
- 如果該方法正常執(zhí)行完沒有拋出異常,則這條消息就算是被消費(fèi)了
@Component public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME)public void receive1(String msg) {System.out.println("msg = " + msg);int i = 1 / 0;} }
手動(dòng)確認(rèn)
-
配置:修改為手動(dòng)確認(rèn)模式
server:port: 8888 spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到達(dá)交換器的確認(rèn)回調(diào)publisher-returns: true #配置消息到達(dá)隊(duì)列的回調(diào)template:retry:enabled: trueinitial-interval: 1000msmax-attempts: 10max-interval: 10000msmultiplier: 2listener:simple:acknowledge-mode: manual
推模式手動(dòng)確認(rèn)
- 將消費(fèi)者要做的事情放到一個(gè) try…catch 代碼塊中。
- 如果消息正常消費(fèi)成功,則執(zhí)行 basicAck 完成確認(rèn)。
- 如果消息消費(fèi)失敗,則執(zhí)行 basicNack 方法,告訴 RabbitMQ 消息消費(fèi)失敗。
@RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME) public void receive1(Message message,Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消息消費(fèi)的代碼寫到這里String s = new String(message.getBody());System.out.println("s = " + s);//消費(fèi)完成后,手動(dòng) ackchannel.basicAck(deliveryTag, false);} catch (Exception e) {//手動(dòng) nacktry {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}} }
- 這里涉及到兩個(gè)方法:
- basicAck:這個(gè)是手動(dòng)確認(rèn)消息已經(jīng)成功消費(fèi),該方法有兩個(gè)參數(shù):
- 第一個(gè)參數(shù)表示消息的 id;
- 第二個(gè)參數(shù) multiple 如果為 false,表示僅確認(rèn)當(dāng)前消息消費(fèi)成功,如果為 true,則表示當(dāng)前消息之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息都消費(fèi)成功。
- basicNack:這個(gè)是告訴 RabbitMQ 當(dāng)前消息未被成功消費(fèi),該方法有三個(gè)參數(shù):
- 第一個(gè)參數(shù)表示消息的 id;
- 第二個(gè)參數(shù) multiple 如果為 false,表示僅拒絕當(dāng)前消息的消費(fèi),如果為 true,則表示拒絕當(dāng)前消息之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息;
- 第三個(gè)參數(shù) requeue 含義和前面所說的一樣,被拒絕的消息是否重新入隊(duì)。
- 當(dāng) basicNack 中最后一個(gè)參數(shù)設(shè)置為 false 的時(shí)候,還涉及到一個(gè)死信隊(duì)列的問題
- basicAck:這個(gè)是手動(dòng)確認(rèn)消息已經(jīng)成功消費(fèi),該方法有兩個(gè)參數(shù):
拉模式手動(dòng)確認(rèn)
- 拉模式手動(dòng) ack 比較麻煩一些,在 Spring 中封裝的 RabbitTemplate 中并未找到對(duì)應(yīng)的方法,所以我們得用原生的辦法
- 這里涉及到的 basicAck 和 basicNack 方法跟前面的一樣
public void receive2() {Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);long deliveryTag = 0L;try {GetResponse getResponse = channel.basicGet(RabbitConfig.MQ_QUEUE_NAME, false);deliveryTag = getResponse.getEnvelope().getDeliveryTag();System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));channel.basicAck(deliveryTag, false);} catch (IOException e) {try {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}}
}
消息拒絕
- 當(dāng)客戶端收到消息時(shí),可以選擇消費(fèi)這條消息,也可以選擇拒絕這條消息
@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)public void receive2(Channel channel, Message message) {//獲取消息編號(hào)long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//拒絕消息channel.basicReject(deliveryTag, true);} catch (IOException e) {e.printStackTrace();}}
}
-
消費(fèi)者收到消息之后,可以選擇拒絕消費(fèi)該條消息,拒絕的步驟分兩步
- 獲取消息編號(hào) deliveryTag。
- 調(diào)用 basicReject 方法拒絕消息。
-
調(diào)用 basicReject 方法時(shí),第二個(gè)參數(shù)是 requeue,即是否重新入隊(duì)。
-
如果第二個(gè)參數(shù)為 true,則這條被拒絕的消息會(huì)重新進(jìn)入到消息隊(duì)列中,等待下一次被消費(fèi);
-
如果第二個(gè)參數(shù)為 false,則這條被拒絕的消息就會(huì)被丟掉,不會(huì)有新的消費(fèi)者去消費(fèi)它了。
-
需要注意的是,basicReject 方法一次只能拒絕一條消息
總結(jié):如何保證消息的可靠性。
- 設(shè)置confirm和returning機(jī)制
- 設(shè)置隊(duì)列和交互機(jī)的持久化
- 搭建rabbitMQ服務(wù)集群
- 消費(fèi)者改為手動(dòng)確認(rèn)機(jī)制。
冪等性問題
背景
- 消費(fèi)者在消費(fèi)完一條消息后,向 RabbitMQ 發(fā)送一個(gè) ack 確認(rèn),
- 此時(shí)由于網(wǎng)絡(luò)斷開或者其他原因?qū)е?RabbitMQ 并沒有收到這個(gè) ack,
- 那么此時(shí) RabbitMQ 并不會(huì)將該條消息刪除
- 當(dāng)重新建立起連接后,消費(fèi)者還是會(huì)再次收到該條消息,這就造成了消息的重復(fù)消費(fèi)。
- 同時(shí),由于類似的原因,消息在發(fā)送的時(shí)候,同一條消息也可能會(huì)發(fā)送兩次
解決思路
- 采用 Redis,在消費(fèi)者消費(fèi)消息之前,現(xiàn)將消息的 id 放到 Redis 中,存儲(chǔ)方式如下:
- id-0(正在執(zhí)行業(yè)務(wù))
- id-1(執(zhí)行業(yè)務(wù)成功)
- 如果 ack 失敗,在 RabbitMQ 將消息交給其他的消費(fèi)者時(shí),先執(zhí)行 setnx,如果 key 已經(jīng)存在(說明之前有人消費(fèi)過該消息),獲取他的值,如果是 0,當(dāng)前消費(fèi)者就什么都不做,如果是 1,直接 ack。
- 極端情況:第一個(gè)消費(fèi)者在執(zhí)行業(yè)務(wù)時(shí),出現(xiàn)了死鎖,在 setnx 的基礎(chǔ)上,再給 key 設(shè)置一個(gè)生存時(shí)間。生產(chǎn)者,發(fā)送消息時(shí),指定 messageId
代碼
-
添加redis依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency>]
-
添加redis配置
redis:host: localhostport: 6379password: 123456timeout: 3000msdatabase: 0
-
配置類
@Configuration @Slf4j public class RabbitConfig{public final static String DIRECTNAME = "mq-direct";@Beanpublic Queue queue() {return new Queue("hello-queue");}@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECTNAME, true, false);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with("direct");} }
-
生產(chǎn)者
@RestController public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;;@RequestMapping("/send")public String send() {//攜帶信息發(fā)送CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.DIRECTNAME,"direct","message",messageId);return "send success";} }
-
消費(fèi)者
package cn.smbms.consumer;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component;import javax.annotation.Resource; import java.io.IOException; import java.util.concurrent.TimeUnit;/*** @author: zjl* @datetime: 2024/5/9* @desc: */ @Component public class DirectReceiver {@Resourceprivate StringRedisTemplate stringRedisTemplate;@RabbitListener(queues = "hello-queue")public void getMassage(String msg, Channel channel, Message message) throws IOException {//1、獲取messageIDString messageID = message.getMessageProperties().getHeader("spring_returned_message_correlation");//2、用redis的setnx()方法放入值 放入成功返回true 放入失敗返回falseif (stringRedisTemplate.opsForValue().setIfAbsent(messageID, "0", 10, TimeUnit.SECONDS)) {//3、消費(fèi)消息System.out.println("接收到消息:" + msg);//4、設(shè)置value值為1stringRedisTemplate.opsForValue().set(messageID, "1",10,TimeUnit.SECONDS);//5、手動(dòng)ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} else {//6、如果放入值失敗 獲取messageID對(duì)應(yīng)的valueString s = stringRedisTemplate.opsForValue().get(messageID);//7、value=0 什么都不做if ("0".equalsIgnoreCase(s)) {return;//8、value=1 手動(dòng)ack} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}} }