中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

用凡科做的網(wǎng)站保存不了sem專員

用凡科做的網(wǎng)站保存不了,sem專員,花垣網(wǎng)站建設(shè),信譽(yù)好的武漢網(wǎng)站建設(shè)目錄 確保消息的可靠性RabbitMQ 消息發(fā)送可靠性分析解決方案開啟事務(wù)機(jī)制發(fā)送方確認(rèn)機(jī)制單條消息處理消息批量處理 失敗重試自帶重試機(jī)制業(yè)務(wù)重試 RabbitMQ 消息消費(fèi)可靠性如何保證消息在隊(duì)列RabbitMQ 的消息消費(fèi),整體上來(lái)說有兩種不同的思路:確保消費(fèi)成…

目錄

  • 確保消息的可靠性
    • RabbitMQ 消息發(fā)送可靠性
      • 分析
      • 解決方案
        • 開啟事務(wù)機(jī)制
        • 發(fā)送方確認(rèn)機(jī)制
          • 單條消息處理
          • 消息批量處理
      • 失敗重試
        • 自帶重試機(jī)制
        • 業(yè)務(wù)重試
    • RabbitMQ 消息消費(fèi)可靠性
      • 如何保證消息在隊(duì)列
      • RabbitMQ 的消息消費(fèi),整體上來(lái)說有兩種不同的思路:
      • 確保消費(fèi)成功兩種思路
      • 消息確認(rèn)
        • 自動(dòng)確認(rèn)
        • 手動(dòng)確認(rèn)
        • 推模式手動(dòng)確認(rèn)
        • 拉模式手動(dòng)確認(rèn)
      • 消息拒絕
      • 總結(jié):如何保證消息的可靠性。
    • 冪等性問題
      • 背景
      • 解決思路
      • 代碼

確保消息的可靠性

在這里插入圖片描述
先確定消息可能在哪些位置丟失—不同的位置可以有不同的解決方案

  • 發(fā)送過程
    • 從生產(chǎn)者到交換機(jī)
    • 從交換機(jī)到隊(duì)列
  • 消費(fèi)過程
    • 消息在隊(duì)列中
    • 消費(fèi)者消費(fèi)

RabbitMQ 消息發(fā)送可靠性

分析

  • 目標(biāo)

    1. 消息成功到達(dá) Exchange
    2. 消息成功到達(dá) Queue
  • 如果能確認(rèn)這兩步,那么我們就可以認(rèn)為消息發(fā)送成功了。

  • 如果這兩步中任一步驟出現(xiàn)問題,那么消息就沒有成功送達(dá),此時(shí)我們可能要通過重試等方式去重新發(fā)送消息,多次重試之后,如果消息還是不能到達(dá),則可能就需要人工介入了。

  • 經(jīng)過上面的分析,我們可以確認(rèn),要確保消息成功發(fā)送,我們只需要做好三件事就可以了:

    1. 確認(rèn)消息到達(dá) Exchange。
    2. 確認(rèn)消息到達(dá) Queue。
    3. 開啟定時(shí)任務(wù),定時(shí)投遞那些發(fā)送失敗的消息

解決方案

  • 如何確保消息成功到達(dá) RabbitMQ?RabbitMQ 給出了兩種方案:

    1. 開啟事務(wù)機(jī)制
    2. 發(fā)送方確認(rèn)機(jī)制
  • 這是兩種不同的方案,不可以同時(shí)開啟,只能選擇其中之一,如果兩者同時(shí)開啟,則會(huì)報(bào)如下錯(cuò)誤
    在這里插入圖片描述

開啟事務(wù)機(jī)制
  1. 事務(wù)管理器

    @Configuration
    public class RabbitConfig {@Beanpublic RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
    }
    
  2. 消息生產(chǎn)者:添加事務(wù)注解并設(shè)置通信信道為事務(wù)模式

    @Service
    public class MqService {@Resourceprivate RabbitTemplate rabbitTemplate;@Transactional //標(biāo)記事務(wù)public void send() {rabbitTemplate.setChannelTransacted(true);//開啟事務(wù)模式rabbitTemplate.convertAndSend("mq_exchange_name","mq_queue_name","hello rabbitmq!".getBytes());int i = 1 / 0;//運(yùn)行時(shí)必然拋出異常,我們可以嘗試運(yùn)行該方法,發(fā)現(xiàn)消息并未發(fā)送成功}
    }
    

當(dāng)我們開啟事務(wù)模式之后,RabbitMQ 生產(chǎn)者發(fā)送消息會(huì)多出四個(gè)步驟:

  1. 客戶端發(fā)出請(qǐng)求,將信道設(shè)置為事務(wù)模式。
  2. 服務(wù)端給出回復(fù),同意將信道設(shè)置為事務(wù)模式。
  3. 客戶端發(fā)送消息。
  4. 客戶端提交事務(wù)。
  5. 服務(wù)端給出響應(yīng),確認(rèn)事務(wù)提交。

上面的步驟,除了第三步是本來(lái)就有的,其他幾個(gè)步驟都是平白無(wú)故多出來(lái)的。所以大家看到,事務(wù)模式其實(shí)效率有點(diǎn)低,這并非一個(gè)最佳解決方案。我們可以想想,什么項(xiàng)目會(huì)用到消息中間件?一般來(lái)說都是一些高并發(fā)的項(xiàng)目,這個(gè)時(shí)候并發(fā)性能尤為重要。

所以,RabbitMQ 還提供了發(fā)送方確認(rèn)機(jī)制(publisher confirm)來(lái)確保消息發(fā)送成功,這種方式,性能要遠(yuǎn)遠(yuǎn)高于事務(wù)模式

發(fā)送方確認(rèn)機(jī)制
單條消息處理
  1. 配置文件:開啟消息發(fā)送方確認(rèn)機(jī)制

    server:port: 8888
    spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到達(dá)交換器的確認(rèn)回調(diào)publisher-returns: true #配置消息到達(dá)隊(duì)列的回調(diào)
    # publisher-confirm-type有三個(gè)值 :
    #    none:表示禁用發(fā)布確認(rèn)模式,默認(rèn)即此。
    #    correlated:表示成功發(fā)布消息到交換器后會(huì)觸發(fā)的回調(diào)方法。
    #    simple:類似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的調(diào)用。
    
  2. 開啟兩個(gè)監(jiān)聽

    /*** @author: zjl* @datetime: 2024/5/9* @desc:*     定義配置類,實(shí)現(xiàn) RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 兩個(gè)接口,*     這兩個(gè)接口,前者的回調(diào)用來(lái)確定消息到達(dá)交換器,后者則會(huì)在消息路由到隊(duì)列失敗時(shí)被調(diào)用。*     *     定義 initRabbitTemplate 方法并添加 @PostConstruct 注解,*     在該方法中為 rabbitTemplate 分別配置這兩個(gè) Callback。*/
    @Configuration
    @Slf4j
    public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {public static final String MQ_EXCHANGE_NAME = "mq_exchange_name";public static final String MQ_QUEUE_NAME = "mq_queue_name";@Resourceprivate RabbitTemplate rabbitTemplate;@Beanpublic Queue queue() {return new Queue(MQ_QUEUE_NAME);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(MQ_EXCHANGE_NAME);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(MQ_QUEUE_NAME);}@PostConstructpublic void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("{}:消息成功到達(dá)交換器",correlationData.getId());}else{log.error("{}:消息發(fā)送失敗", correlationData.getId());}}@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.error("{}:消息未成功路由到隊(duì)列",message.getMessageProperties().getMessageId());}
    }
    
  3. 測(cè)試
    首先嘗試將消息發(fā)送到一個(gè)不存在的交換機(jī)中

    @RestController
    public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;;@RequestMapping("/send")public String send() {rabbitTemplate.convertAndSend("RabbitConfig.MQ_EXCHANGE_NAME", RabbitConfig.MQ_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));return "send success";}
    }
    

    在這里插入圖片描述給定一個(gè)真實(shí)存在的交換器,但是給一個(gè)不存在的隊(duì)列

    @RestController
    public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;;@RequestMapping("/send")public String send() {//rabbitTemplate.convertAndSend("RabbitConfig.MQ_EXCHANGE_NAME", RabbitConfig.MQ_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));rabbitTemplate.convertAndSend(RabbitConfig.MQ_EXCHANGE_NAME,"RabbitConfig.MQ_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));return "send success";}
    }
    

    可以看到,消息雖然成功達(dá)到交換器了,但是沒有成功路由到隊(duì)列(因?yàn)殛?duì)列不存在)在這里插入圖片描述

消息批量處理
  • 如果是消息批量處理,那么發(fā)送成功的回調(diào)監(jiān)聽是一樣的,這里不再贅述。
  • 這就是 publisher-confirm 模式。相比于事務(wù),這種模式下的消息吞吐量會(huì)得到極大的提升

失敗重試

  • 失敗重試分兩種情況,一種是壓根沒找到 MQ 導(dǎo)致的失敗重試,另一種是找到 MQ 了,但是消息發(fā)送失敗了
自帶重試機(jī)制
  • 前面所說的事務(wù)機(jī)制和發(fā)送方確認(rèn)機(jī)制,都是發(fā)送方確認(rèn)消息發(fā)送成功的辦法。
  • 如果發(fā)送方一開始就連不上 MQ,那么 Spring Boot 中也有相應(yīng)的重試機(jī)制,但是這個(gè)重試機(jī)制就和 MQ 本身沒有關(guān)系了,這是利用 Spring 中的 retry 機(jī)制來(lái)完成的
  1. 配置

    server:port: 8888
    spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到達(dá)交換器的確認(rèn)回調(diào)publisher-returns: true #配置消息到達(dá)隊(duì)列的回調(diào)template: retry:enabled: true # 開啟重試機(jī)制initial-interval: 1000ms # 重試起始間隔時(shí)間max-attempts: 10 # 最大重試次數(shù)max-interval: 10000ms # 最大重試間隔時(shí)間multiplier: 2 # 間隔時(shí)間乘數(shù)。(這里配置間隔時(shí)間乘數(shù)為 2,則第一次間隔時(shí)間 1 秒,第二次重試間隔時(shí)間 2 秒,第三次 4 秒,以此類推)
    
  2. 再次啟動(dòng) Spring Boot 項(xiàng)目,然后關(guān)掉 MQ,此時(shí)嘗試發(fā)送消息,就會(huì)發(fā)送失敗,進(jìn)而導(dǎo)致自動(dòng)重試
    在這里插入圖片描述

業(yè)務(wù)重試
  • 業(yè)務(wù)重試主要是針對(duì)消息沒有到達(dá)交換機(jī)的情況
  • 如果消息沒有成功到達(dá)交換機(jī),此時(shí)就會(huì)觸發(fā)消息發(fā)送失敗回調(diào),我們可以利用起來(lái)這個(gè)回調(diào)
  • 下面說一下整體思路
  1. 準(zhǔn)備數(shù)據(jù)庫(kù)表

    DROP TABLE IF EXISTS `service_msg_mq_info`;
    CREATE TABLE `service_msg_mq_info`  (`msgid` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`empid` int(11) NULL DEFAULT NULL,`status` int(11) NULL DEFAULT NULL,`routekey` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,`exchange` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,`count` int(11) NULL DEFAULT NULL,`trytime` datetime NULL DEFAULT NULL,`createtime` datetime NULL DEFAULT NULL,`updatetime` datetime NULL DEFAULT NULL,PRIMARY KEY (`msgid`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
    

    status:表示消息的狀態(tài),有三個(gè)取值,0,1,2 分別表示消息發(fā)送中、消息發(fā)送成功以及消息發(fā)送失敗。

    tryTime:表示消息的第一次重試時(shí)間(消息發(fā)出去之后,在 tryTime 這個(gè)時(shí)間點(diǎn)還未顯示發(fā)送成功,此時(shí)就可以開始重試了)。

    count:表示消息重試次數(shù)。

  2. 每次發(fā)送消息的時(shí)候,就往數(shù)據(jù)庫(kù)中添加一條記錄在這里插入圖片描述

  3. 在消息發(fā)送的時(shí)候,我們就往該表中保存一條消息發(fā)送記錄,并設(shè)置狀態(tài) status 為 0,tryTime 為 1 分鐘之后

  4. 在消息發(fā)送的時(shí)候,我們就往該表中保存一條消息發(fā)送記錄,并設(shè)置狀態(tài) status 為 0,tryTime 為 1 分鐘之后

  5. 另外開啟一個(gè)定時(shí)任務(wù),定時(shí)任務(wù)每隔 10s 就去數(shù)據(jù)庫(kù)中撈一次消息,專門去撈那些 status 為 0 并且已經(jīng)過了 tryTime 時(shí)間記錄,把這些消息拎出來(lái)后,首先判斷其重試次數(shù)是否已超過 3 次,如果超過 3 次,則修改該條消息的 status 為 2,表示這條消息發(fā)送失敗,并且不再重試。對(duì)于重試次數(shù)沒有超過 3 次的記錄,則重新去發(fā)送消息,并且為其 count 的值+1

當(dāng)然這種思路有兩個(gè)弊端:

  • 去數(shù)據(jù)庫(kù)走一遭,可能拖慢 MQ 的 Qos,不過有的時(shí)候我們并不需要 MQ 有很高的 Qos,所以這個(gè)應(yīng)用時(shí)要看具體情況。
  • 按照上面的思路,可能會(huì)出現(xiàn)同一條消息重復(fù)發(fā)送的情況,不過這都不是事,我們?cè)谙⑾M(fèi)時(shí),解決好冪等性問題就行了。

當(dāng)然,大家也要注意,消息是否要確保 100% 發(fā)送成功,也要看具體情況。

RabbitMQ 消息消費(fèi)可靠性

如何保證消息在隊(duì)列

  1. 隊(duì)列持久化—》創(chuàng)建的時(shí)候設(shè)置持久化
  2. 搭建rabbitmq集群–保證高可用

RabbitMQ 的消息消費(fèi),整體上來(lái)說有兩種不同的思路:

  • 推(push):MQ 主動(dòng)將消息推送給消費(fèi)者,這種方式需要消費(fèi)者設(shè)置一個(gè)緩沖區(qū)去緩存消息,對(duì)于消費(fèi)者而言,內(nèi)存中總是有一堆需要處理的消息,所以這種方式的效率比較高,這也是目前大多數(shù)應(yīng)用采用的消費(fèi)方式。這種方式通過 @RabbitListener 注解去標(biāo)記消費(fèi)者,如以下代碼,當(dāng)監(jiān)聽的隊(duì)列中有消息時(shí),就會(huì)觸發(fā)該方法

    @Component
    public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME)public void handle(String msg) {System.out.println("msg = " + msg);}
    }
    
  • 拉(pull):消費(fèi)者主動(dòng)從 MQ 拉取消息,這種方式效率并不是很高,不過有的時(shí)候如果服務(wù)端需要批量拉取消息,倒是可以采用這種方式

    @Test
    public void test01() throws UnsupportedEncodingException {Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
    }
    
    • 調(diào)用 receiveAndConvert 方法,方法參數(shù)為隊(duì)列名稱,
    • 方法執(zhí)行完成后,會(huì)從 MQ 上拉取一條消息下來(lái),如果該方法返回值為 null,表示該隊(duì)列上沒有消息了。
    • receiveAndConvert 方法有一個(gè)重載方法,可以在重載方法中傳入一個(gè)等待超時(shí)時(shí)間,例如 3 秒。
    • 此時(shí),假設(shè)隊(duì)列中沒有消息了,則 receiveAndConvert 方法會(huì)阻塞 3 秒,3 秒內(nèi)如果隊(duì)列中有了新消息就返回,3 秒后如果隊(duì)列中還是沒有新消息,就返回 null,這個(gè)等待超時(shí)時(shí)間要是不設(shè)置的話,默認(rèn)為 0
  • 這是消息兩種不同的消費(fèi)模式

  • 如果需要從消息隊(duì)列中持續(xù)獲得消息,就可以使用推模式;

  • 如果只是單純的消費(fèi)一條消息,則使用拉模式即可。

  • 切忌將拉模式放到一個(gè)死循環(huán)中,變相的訂閱消息,這會(huì)嚴(yán)重影響 RabbitMQ 的性能

確保消費(fèi)成功兩種思路

  • 為了保證消息能夠可靠的到達(dá)消息消費(fèi)者,RabbitMQ 中提供了消息消費(fèi)確認(rèn)機(jī)制。
  • 當(dāng)消費(fèi)者去消費(fèi)消息的時(shí)候,可以通過指定 autoAck 參數(shù)來(lái)表示消息消費(fèi)的確認(rèn)方式
    • 當(dāng) autoAck 為 false 的時(shí)候,此時(shí)即使消費(fèi)者已經(jīng)收到消息了,RabbitMQ 也不會(huì)立馬將消息移除,而是等待消費(fèi)者顯式的回復(fù)確認(rèn)信號(hào)后,才會(huì)將消息打上刪除標(biāo)記,然后再刪除。
    • 當(dāng) autoAck 為 true 的時(shí)候,此時(shí)消息消費(fèi)者就會(huì)自動(dòng)把發(fā)送出去的消息設(shè)置為確認(rèn),然后將消息移除(從內(nèi)存或者磁盤中),即使這些消息并沒有到達(dá)消費(fèi)者。

在這里插入圖片描述- 屬性解釋

  • Ready 表示待消費(fèi)的消息數(shù)量。
  • Unacked 表示已經(jīng)發(fā)送給消費(fèi)者但是還沒收到消費(fèi)者 ack 的消息數(shù)量。
  • 當(dāng)我們將 autoAck 設(shè)置為 false 的時(shí)候,對(duì)于 RabbitMQ 而言,消費(fèi)分成了兩個(gè)部分:
    • 待消費(fèi)的消息
    • 已經(jīng)投遞給消費(fèi)者,但是還沒有被消費(fèi)者確認(rèn)的消息
  • 換句話說,當(dāng)設(shè)置 autoAck 為 false 的時(shí)候,消費(fèi)者就變得非常從容了,它將有足夠的時(shí)間去處理這條消息,當(dāng)消息正常處理完成后,再手動(dòng) ack,此時(shí) RabbitMQ 才會(huì)認(rèn)為這條消息消費(fèi)成功了。
  • 如果 RabbitMQ 一直沒有收到客戶端的反饋,并且此時(shí)客戶端也已經(jīng)斷開連接了,那么 RabbitMQ 就會(huì)將剛剛的消息重新放回隊(duì)列中,等待下一次被消費(fèi)。

綜上所述,確保消息被成功消費(fèi),無(wú)非就是手動(dòng) Ack 或者自動(dòng) Ack,無(wú)他。當(dāng)然,無(wú)論這兩種中的哪一種,最終都有可能導(dǎo)致消息被重復(fù)消費(fèi),所以一般來(lái)說我們還需要在處理消息時(shí),解決冪等性問題。

消息確認(rèn)

自動(dòng)確認(rèn)
  • 在 Spring Boot 中,默認(rèn)情況下,消息消費(fèi)就是自動(dòng)確認(rèn)的
  • 通過 @Componet 注解將當(dāng)前類注入到 Spring 容器中,然后通過 @RabbitListener 注解來(lái)標(biāo)記一個(gè)消息消費(fèi)方法
  • 默認(rèn)情況下,消息消費(fèi)方法自帶事務(wù),即如果該方法在執(zhí)行過程中拋出異常,那么被消費(fèi)的消息會(huì)重新回到隊(duì)列中等待下一次被消費(fèi)
  • 如果該方法正常執(zhí)行完沒有拋出異常,則這條消息就算是被消費(fèi)了
    @Component
    public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME)public void receive1(String msg) {System.out.println("msg = " + msg);int i = 1 / 0;}
    }
    
手動(dòng)確認(rèn)
  1. 配置:修改為手動(dòng)確認(rèn)模式

    server:port: 8888
    spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到達(dá)交換器的確認(rèn)回調(diào)publisher-returns: true #配置消息到達(dá)隊(duì)列的回調(diào)template:retry:enabled: trueinitial-interval: 1000msmax-attempts: 10max-interval: 10000msmultiplier: 2listener:simple:acknowledge-mode: manual
    
推模式手動(dòng)確認(rèn)
  • 將消費(fèi)者要做的事情放到一個(gè) try…catch 代碼塊中。
  • 如果消息正常消費(fèi)成功,則執(zhí)行 basicAck 完成確認(rèn)。
  • 如果消息消費(fèi)失敗,則執(zhí)行 basicNack 方法,告訴 RabbitMQ 消息消費(fèi)失敗。
    @RabbitListener(queues = RabbitConfig.MQ_QUEUE_NAME)
    public void receive1(Message message,Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消息消費(fèi)的代碼寫到這里String s = new String(message.getBody());System.out.println("s = " + s);//消費(fèi)完成后,手動(dòng) ackchannel.basicAck(deliveryTag, false);} catch (Exception e) {//手動(dòng) nacktry {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}}
    }
    
  • 這里涉及到兩個(gè)方法:
    • basicAck:這個(gè)是手動(dòng)確認(rèn)消息已經(jīng)成功消費(fèi),該方法有兩個(gè)參數(shù):
      • 第一個(gè)參數(shù)表示消息的 id;
      • 第二個(gè)參數(shù) multiple 如果為 false,表示僅確認(rèn)當(dāng)前消息消費(fèi)成功,如果為 true,則表示當(dāng)前消息之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息都消費(fèi)成功。
    • basicNack:這個(gè)是告訴 RabbitMQ 當(dāng)前消息未被成功消費(fèi),該方法有三個(gè)參數(shù):
      • 第一個(gè)參數(shù)表示消息的 id;
      • 第二個(gè)參數(shù) multiple 如果為 false,表示僅拒絕當(dāng)前消息的消費(fèi),如果為 true,則表示拒絕當(dāng)前消息之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息;
      • 第三個(gè)參數(shù) requeue 含義和前面所說的一樣,被拒絕的消息是否重新入隊(duì)。
      • 當(dāng) basicNack 中最后一個(gè)參數(shù)設(shè)置為 false 的時(shí)候,還涉及到一個(gè)死信隊(duì)列的問題
拉模式手動(dòng)確認(rèn)
  • 拉模式手動(dòng) ack 比較麻煩一些,在 Spring 中封裝的 RabbitTemplate 中并未找到對(duì)應(yīng)的方法,所以我們得用原生的辦法
  • 這里涉及到的 basicAck 和 basicNack 方法跟前面的一樣
public void receive2() {Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);long deliveryTag = 0L;try {GetResponse getResponse = channel.basicGet(RabbitConfig.MQ_QUEUE_NAME, false);deliveryTag = getResponse.getEnvelope().getDeliveryTag();System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));channel.basicAck(deliveryTag, false);} catch (IOException e) {try {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}}
}

消息拒絕

  • 當(dāng)客戶端收到消息時(shí),可以選擇消費(fèi)這條消息,也可以選擇拒絕這條消息
@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)public void receive2(Channel channel, Message message) {//獲取消息編號(hào)long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//拒絕消息channel.basicReject(deliveryTag, true);} catch (IOException e) {e.printStackTrace();}}
}
  • 消費(fèi)者收到消息之后,可以選擇拒絕消費(fèi)該條消息,拒絕的步驟分兩步

    • 獲取消息編號(hào) deliveryTag。
    • 調(diào)用 basicReject 方法拒絕消息。
  • 調(diào)用 basicReject 方法時(shí),第二個(gè)參數(shù)是 requeue,即是否重新入隊(duì)。

  • 如果第二個(gè)參數(shù)為 true,則這條被拒絕的消息會(huì)重新進(jìn)入到消息隊(duì)列中,等待下一次被消費(fèi);

  • 如果第二個(gè)參數(shù)為 false,則這條被拒絕的消息就會(huì)被丟掉,不會(huì)有新的消費(fèi)者去消費(fèi)它了。

  • 需要注意的是,basicReject 方法一次只能拒絕一條消息

總結(jié):如何保證消息的可靠性。

  1. 設(shè)置confirm和returning機(jī)制
  2. 設(shè)置隊(duì)列和交互機(jī)的持久化
  3. 搭建rabbitMQ服務(wù)集群
  4. 消費(fèi)者改為手動(dòng)確認(rèn)機(jī)制。

冪等性問題

背景

  • 消費(fèi)者在消費(fèi)完一條消息后,向 RabbitMQ 發(fā)送一個(gè) ack 確認(rèn),
  • 此時(shí)由于網(wǎng)絡(luò)斷開或者其他原因?qū)е?RabbitMQ 并沒有收到這個(gè) ack,
  • 那么此時(shí) RabbitMQ 并不會(huì)將該條消息刪除
  • 當(dāng)重新建立起連接后,消費(fèi)者還是會(huì)再次收到該條消息,這就造成了消息的重復(fù)消費(fèi)。
  • 同時(shí),由于類似的原因,消息在發(fā)送的時(shí)候,同一條消息也可能會(huì)發(fā)送兩次

解決思路

  • 采用 Redis,在消費(fèi)者消費(fèi)消息之前,現(xiàn)將消息的 id 放到 Redis 中,存儲(chǔ)方式如下:
    • id-0(正在執(zhí)行業(yè)務(wù))
    • id-1(執(zhí)行業(yè)務(wù)成功)
  • 如果 ack 失敗,在 RabbitMQ 將消息交給其他的消費(fèi)者時(shí),先執(zhí)行 setnx,如果 key 已經(jīng)存在(說明之前有人消費(fèi)過該消息),獲取他的值,如果是 0,當(dāng)前消費(fèi)者就什么都不做,如果是 1,直接 ack。
  • 極端情況:第一個(gè)消費(fèi)者在執(zhí)行業(yè)務(wù)時(shí),出現(xiàn)了死鎖,在 setnx 的基礎(chǔ)上,再給 key 設(shè)置一個(gè)生存時(shí)間。生產(chǎn)者,發(fā)送消息時(shí),指定 messageId

代碼

  1. 添加redis依賴

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>]
    
  2. 添加redis配置

      redis:host: localhostport: 6379password: 123456timeout: 3000msdatabase: 0
    
  3. 配置類

    @Configuration
    @Slf4j
    public class RabbitConfig{public final static String DIRECTNAME = "mq-direct";@Beanpublic Queue queue() {return new Queue("hello-queue");}@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECTNAME, true, false);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with("direct");}
    }
    
  4. 生產(chǎn)者

    @RestController
    public class SendController {@Resourceprivate RabbitTemplate rabbitTemplate;;@RequestMapping("/send")public String send() {//攜帶信息發(fā)送CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.DIRECTNAME,"direct","message",messageId);return "send success";}
    }
    
  5. 消費(fèi)者

    package cn.smbms.consumer;import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;import javax.annotation.Resource;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;/*** @author: zjl* @datetime: 2024/5/9* @desc: */
    @Component
    public class DirectReceiver {@Resourceprivate StringRedisTemplate stringRedisTemplate;@RabbitListener(queues = "hello-queue")public void getMassage(String msg, Channel channel, Message message) throws IOException {//1、獲取messageIDString messageID = message.getMessageProperties().getHeader("spring_returned_message_correlation");//2、用redis的setnx()方法放入值 放入成功返回true 放入失敗返回falseif (stringRedisTemplate.opsForValue().setIfAbsent(messageID, "0", 10, TimeUnit.SECONDS)) {//3、消費(fèi)消息System.out.println("接收到消息:" + msg);//4、設(shè)置value值為1stringRedisTemplate.opsForValue().set(messageID, "1",10,TimeUnit.SECONDS);//5、手動(dòng)ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} else {//6、如果放入值失敗 獲取messageID對(duì)應(yīng)的valueString s = stringRedisTemplate.opsForValue().get(messageID);//7、value=0 什么都不做if ("0".equalsIgnoreCase(s)) {return;//8、value=1 手動(dòng)ack} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}}
    }
    
http://www.risenshineclean.com/news/43022.html

相關(guān)文章:

  • 世界局勢(shì)最新消息馮耀宗seo博客
  • 360云盤做 網(wǎng)站圖片服務(wù)器濟(jì)南網(wǎng)站建設(shè)哪家便宜
  • 中國(guó)自適應(yīng)網(wǎng)站建設(shè)朝陽(yáng)網(wǎng)站seo
  • 醫(yī)藥網(wǎng)站怎么做搜索引擎優(yōu)化是做什么的
  • 國(guó)務(wù)院政府網(wǎng)站集約化建設(shè)seo排名優(yōu)化工具
  • 求一個(gè)能用的網(wǎng)址網(wǎng)站搜索排名優(yōu)化價(jià)格
  • index 石家莊網(wǎng)站建設(shè)友鏈交易平臺(tái)源碼
  • 浙江龍游疫情最新消息搜索引擎排名優(yōu)化
  • 編寫 網(wǎng)站 語(yǔ)言海外免費(fèi)網(wǎng)站推廣有哪些
  • 網(wǎng)站開發(fā)工作描述百度搜索引擎介紹
  • 商用自適應(yīng)網(wǎng)站建設(shè)智能搜索引擎
  • 自助建站廣告發(fā)布天津推廣的平臺(tái)
  • 自適應(yīng)營(yíng)銷網(wǎng)站模板互聯(lián)網(wǎng)營(yíng)銷培訓(xùn)班
  • 建立soho公司網(wǎng)站什么是搜索引擎優(yōu)化推廣
  • 代做畢業(yè)項(xiàng)目哪個(gè)網(wǎng)站好新浪微輿情大數(shù)據(jù)平臺(tái)
  • 新泰網(wǎng)站制作公司北京seo收費(fèi)
  • 公眾號(hào)小程序二維碼怎么生成搜索引擎排名優(yōu)化
  • 專業(yè)格泰建站海外自媒體推廣
  • 怎么設(shè)計(jì)公司的網(wǎng)站模板無(wú)代碼免費(fèi)web開發(fā)平臺(tái)
  • 可視化信息 網(wǎng)站個(gè)人如何在百度做廣告
  • 網(wǎng)站網(wǎng)站制作400多少錢百度app大全
  • 網(wǎng)站如何留住用戶關(guān)鍵詞代發(fā)排名首頁(yè)
  • 自己制作wordpress主題成都網(wǎng)站seo性價(jià)比高
  • 儋州網(wǎng)站建設(shè)網(wǎng)站查詢域名解析
  • node.js 做網(wǎng)站全媒體運(yùn)營(yíng)師報(bào)名費(fèi)多少錢
  • 建站公司網(wǎng)站用什么好色目人
  • 天津網(wǎng)站建設(shè)報(bào)價(jià)登錄百度app
  • 做淘寶的批發(fā)網(wǎng)站有哪些百度sem
  • 日語(yǔ)網(wǎng)站建設(shè)需要注意什么seo快速排名網(wǎng)站優(yōu)化
  • 安徽省建設(shè)法治協(xié)會(huì)網(wǎng)站google play官網(wǎng)入口