中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

域名查詢whois臨沂網(wǎng)站建設(shè)優(yōu)化

域名查詢whois,臨沂網(wǎng)站建設(shè)優(yōu)化,阿里巴巴武漢網(wǎng)站建設(shè),北京優(yōu)化詞網(wǎng)站RabbitMQ消息確認(rèn) SpringBoot與RabbitMQ整合后,對(duì)RabbitClient的“確認(rèn)”進(jìn)行了封裝、使用方式與RabbitMQ官網(wǎng)不一致; 消息發(fā)布確認(rèn) 生產(chǎn)者給交換機(jī)發(fā)送消息后、若是不管了,則會(huì)出現(xiàn)消息丟失; 解決方案1: 交換機(jī)接受…

RabbitMQ消息確認(rèn)

SpringBoot與RabbitMQ整合后,對(duì)RabbitClient的“確認(rèn)”進(jìn)行了封裝、使用方式與RabbitMQ官網(wǎng)不一致;

消息發(fā)布確認(rèn)

  • 生產(chǎn)者給交換機(jī)發(fā)送消息后、若是不管了,則會(huì)出現(xiàn)消息丟失;
    • 解決方案1: 交換機(jī)接受到消息、給生產(chǎn)者一個(gè)答復(fù)ack, 若生產(chǎn)者沒有收到ack, 可能出現(xiàn)消息丟失,因此重新發(fā)送消息;
  • 解決方案1隱藏問題:若是交換機(jī)發(fā)送了ack, 出現(xiàn)網(wǎng)絡(luò)延遲,則生產(chǎn)者沒有收到ack, 就會(huì)出現(xiàn)消息重復(fù)發(fā)送問題, 進(jìn)而衍生冪等性問題;
    • 隱藏問題解決方案1:在數(shù)據(jù)庫(kù)中增加一張去重表,設(shè)置唯一索引; 生產(chǎn)者在消息內(nèi)容中,翻入唯一ID,消費(fèi)者消費(fèi)時(shí)、先從數(shù)據(jù)庫(kù)查詢是否存在,存在則不處理該消息;
      • 適用于并發(fā)低、業(yè)務(wù)嚴(yán)謹(jǐn)?shù)膱?chǎng)景
    • 隱藏問題解決方案2:利用Redis的String的setnx,若key存在,則不處理、若key存在,則執(zhí)行業(yè)務(wù);
      • 適用于短時(shí)間處理大量消息,且 key不會(huì)重復(fù);
        -這就是大名鼎鼎的冪等性問題,賊討厭這些專有名詞;

業(yè)務(wù)開發(fā)中的冪等性

  1. 前端保存數(shù)據(jù)時(shí)、點(diǎn)擊多次保存按鈕,插入多條數(shù)據(jù);
    • 解決方案 :前端限制按鈕點(diǎn)擊、 數(shù)據(jù)庫(kù)設(shè)置業(yè)務(wù)唯一索引;
  2. 消息推送中,可能出現(xiàn)多條內(nèi)容一樣的消息,又不可以重復(fù)處理 ,需要冪等性處理;
    上家公司中,后臺(tái)給app客戶端推送系統(tǒng)消息時(shí)、配置給所有用戶推送消息, 其他服務(wù)給我的應(yīng)用消息服務(wù)推送 RabbitMQ消息, 正常來說, 每次推送的消息, 設(shè)備ID和用戶ID合起來唯一的,結(jié)果其他服務(wù)業(yè)務(wù)數(shù)據(jù)存在問題,有些舊數(shù)據(jù)沒有清除, 導(dǎo)致相通的設(shè)備ID,用戶ID, 一次給設(shè)備用戶推送了十幾條,安卓客戶端當(dāng)當(dāng)當(dāng)?shù)捻?#xff0c; 直接驚動(dòng)了產(chǎn)品經(jīng)理; 經(jīng)過排查上,是上游數(shù)據(jù)有問題,代碼又很老,其他服務(wù)負(fù)責(zé)人排查了好幾天, 把問題數(shù)據(jù)清楚了, 結(jié)果后面又產(chǎn)生了問題數(shù)據(jù);
    • 解決方案:由于會(huì)一次性處理幾萬條推送消息,因此對(duì)業(yè)務(wù)要求速度高,因此利用Redis的String的setNx, 以taskId + mobileDevId + userId + tenantId 組成了唯一key,若是存在,則不處理; key有限時(shí)間為60分鐘, 就成功處理了該問題;
    • 吐槽:上游的業(yè)務(wù)問題,讓下游服務(wù)做業(yè)務(wù)保證,屬實(shí)離譜;

{
“taskId” :“xxxx”;
“mobileDevId” : “xxxx”;
“userId”:“xxx”;
“tenantId” : “xxx”;
“其他字段”: “…”
}

RabbitMQ發(fā)布確認(rèn)與返回

SpringBoot發(fā)布確認(rèn)與返回

配置:
第二個(gè)參數(shù)因?yàn)檫^時(shí),所以要配置第三個(gè)參數(shù)為correlated,表示用來確認(rèn)消息;

#生產(chǎn)者
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated

生產(chǎn)者:

  1. 通過RabbitTempldate#setConfirmCallback設(shè)置確認(rèn)回調(diào), 即交換器發(fā)送給ack給生產(chǎn)者,生產(chǎn)者調(diào)用ConfirmCallback回調(diào), 若出現(xiàn)異常cause,則可重新推送;
    通過RabbitTempldate#setReturnCallback設(shè)置返回回調(diào);
  2. 通過template#waitForConfirms(xxx)表示等待xxx毫秒后確認(rèn),超時(shí)返回false;
    • 若返回false, 則進(jìn)行業(yè)務(wù)補(bǔ)救處理;
public class ConfirmProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate template;@Autowiredprivate DirectExchange confirmExchange;AtomicInteger index = new AtomicInteger(0);AtomicInteger count = new AtomicInteger(0);private final String[] keys = {"sms", "mail"};@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() throws IOException {//短信String sms = "{userName: xxx; phone:xxx}";HashMap<String, Object> map = new HashMap<>();map.put("userName", "hanxin");map.put("phone", index.getAndIncrement());template.setMandatory(true);template.setConfirmCallback(this);template.setReturnCallback(this);template.convertAndSend(confirmExchange.getName(), "confirm", map);System.out.println("send sms confirm");}@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send2() throws IOException {template.invoke((operations) -> {//短信String sms = "{userName: xxx; phone:xxx}";HashMap<String, Object> map = new HashMap<>();map.put("userName", "hanxin");map.put("phone", index.getAndIncrement());//必須設(shè)置template.setMandatory(true);template.convertAndSend(confirmExchange.getName(), "confirm", map);System.out.println("send sms confirm");return template.waitForConfirms(1000);});}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("receive confirm callback, ack = " + ack);}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("receive return call : message:" + message.getBody());System.out.println("receive return call : replyCode:" + replyCode);System.out.println("receive return call : replyText:" + replyText);System.out.println("receive return call : exchange:" + exchange);System.out.println("receive return call : routingKey:" + routingKey);}
}

業(yè)務(wù)開發(fā)中,非嚴(yán)謹(jǐn),追求性能高業(yè)務(wù)建議使用send,這個(gè)過程是異步確認(rèn)的;
嚴(yán)謹(jǐn)業(yè)務(wù)建議使用send2, 同步等待相應(yīng),出現(xiàn)問題好確認(rèn);

交換機(jī):

@Configuration
public class ConfirmConfig {public final static String CONFIRM_QUEUE_NAME = "confirmQueue";public final static String CONFIRM_EXCHANGE_NAME = "confirmExchange";public final static String CONFIRM_ROUTING_NAME = "confirm";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Beanpublic ConfirmProducer confirmProducer() {return new ConfirmProducer();}
}

運(yùn)行結(jié)果:

receive status : true

RabbitMQ發(fā)布確認(rèn)

若是使用原生Rabbit MQ客戶端API,則有三種方式:

  1. 聲明channel是需要交換機(jī)確認(rèn)的
channel.confirmSelect();
  1. 發(fā)布單條消息
for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);channel.basicPublish("", queue, null, body.getBytes());channel.waitForConfirmsOrDie(5_000);
}

channel.waitForConfirmsOrDie(5_000);這個(gè)方法就會(huì)在channel端等待RabbitMQ給出一個(gè)響應(yīng),用來表明這個(gè)消息已經(jīng)正確發(fā)送到了RabbitMQ服務(wù)端。但是要注意,這個(gè)方法會(huì)同步阻塞channel,在等待確認(rèn)期間,channel將不能再繼續(xù)發(fā)送消息,也就是說會(huì)明顯降低集群的發(fā)送速度即吞吐量。

官方說明了,其實(shí)channel底層是異步工作的,會(huì)將channel阻塞住,然后異步等待服務(wù)端發(fā)送一個(gè)確認(rèn)消息,才解除阻塞。但是我們?cè)谑褂脮r(shí),可以把他當(dāng)作一個(gè)同步工具來看待。利用一個(gè)異步轉(zhuǎn)同步功能,可利用JUC實(shí)現(xiàn);

然后如果到了超時(shí)時(shí)間,還沒有收到服務(wù)端的確認(rèn)機(jī)制,那就會(huì)拋出異常。然后通常處理這個(gè)異常的方式是記錄錯(cuò)誤日志或者嘗試重發(fā)消息,但是嘗試重發(fā)時(shí)一定要注意不要使程序陷入死循環(huán)。

  1. 發(fā)送批量消息
    條確認(rèn)的機(jī)制會(huì)對(duì)系統(tǒng)的吞吐量造成很大的影響,所以稍微中和一點(diǎn)的方式就是發(fā)送一批消息后,再一起確認(rèn)
   int batchSize = 100;int outstandingMessageCount = 0;long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);}

存在隱藏問題:若是500條消息處理太久,超時(shí)了,則響應(yīng)失敗,消息重新入隊(duì)、出現(xiàn)重新消費(fèi)問題;

  1. 異步確認(rèn)消息
    實(shí)現(xiàn)的方式也比較簡(jiǎn)單,Producer在channel中注冊(cè)監(jiān)聽器來對(duì)消息進(jìn)行確認(rèn)。核心代碼就是一個(gè):
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);

這三種確認(rèn)機(jī)制都能夠提升Producer發(fā)送消息的安全性。通常情況下,第三種異步確認(rèn)機(jī)制的性能是最好的。第一種安全性最高。

消費(fèi)者確認(rèn)

當(dāng)交換機(jī)接受消息后,就要轉(zhuǎn)發(fā)給消費(fèi)者;如何保證消息不丟失?重復(fù)消費(fèi)?

SpringBoot消費(fèi)確認(rèn)

自動(dòng)確認(rèn)
  • 若是業(yè)務(wù)中,一些消息發(fā)送給消費(fèi)者,若是消息出現(xiàn)異常,消費(fèi)者返回通知交換機(jī)消息出現(xiàn)了異常,交換機(jī)會(huì)將消息重新入隊(duì);

  • 若是沒有確認(rèn)消息,交換機(jī)沒有收到消息,會(huì)將消息會(huì)重新放入隊(duì)列中,每次消費(fèi)者啟動(dòng)都會(huì)把以前消費(fèi)的消息重新消費(fèi);

  • SpringBoot整合RabbitMQ后, 設(shè)置參數(shù)max-attempts為最大重試次數(shù)、retry.enabled為開啟重試機(jī)制;

spring.rabbitmq.listener.direct.retry.max-attempts=5
spring.rabbitmq.listener.direct.retry.enabled=true

  • 為什么要開啟重試?
    不開啟重試、消費(fèi)者處理消息發(fā)生異常后, RabbitMQ會(huì)丟棄該消息, 通常業(yè)務(wù)開發(fā)中,是不允許的。

  • 為什么設(shè)置重試次數(shù)?
    不開啟重試次數(shù),則消息會(huì)一直重新入隊(duì),占用內(nèi)存,若是錯(cuò)誤消息過多,RabbitMQ內(nèi)存爆了;

手動(dòng)確認(rèn)

在手動(dòng)確認(rèn)的模式下,不管是消費(fèi)成功還是消費(fèi)失敗,一定要記得確認(rèn)消息,不然消息會(huì)一直處于unack狀態(tài),直到消費(fèi)者進(jìn)程重啟或者停止。
設(shè)置參數(shù):

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消費(fèi)者:
通過使用channel#basicAck, basicNack, basicReject完成;

@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public class ConfirmConsumer {//    @RabbitHandler : 標(biāo)記的方法只能有一個(gè)參數(shù),類型為String ,若是傳Map參數(shù)、則需要傳入map參數(shù)// @RabbitListener:標(biāo)記的方法可以傳入Channel, Message參數(shù)@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void listenObjectQueue(Channel channel, Message message, Map<String, Object> msg) throws IOException {System.out.println("接收到object.queue的消息" + msg);System.out.println("消息ID : " + message.getMessageProperties().getDeliveryTag());try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (IOException exception) {//拒絕確認(rèn)消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//拒絕消息
//            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}@RabbitHandlerpublic void listenObjectQueue2(Map<String, Object> msg) throws IOException {System.out.println("接收到object.queue的消息" + msg);}}
  • 確認(rèn)收到一個(gè)或多個(gè)消息

    • void basicAck(long deliveryTag, boolean multiple) throws IOException;
    • deliveryTag :消息傳遞標(biāo)識(shí)
    • multiple:是否批量確認(rèn),為true,則確認(rèn)后,其他消息deliveryTag小于當(dāng)前消息的deliveryTag的消息全部變?yōu)榇_認(rèn);(慎重)
  • 拒絕一個(gè)或多個(gè)消息:

    • void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    • deliveryTag:消息的傳遞標(biāo)識(shí)。
    • multiple: 如果為true,則拒絕所有consumer獲得的小于deliveryTag的消息。(慎重)
    • requeue: 設(shè)置為true 會(huì)把消費(fèi)失敗的消息從新添加到隊(duì)列的尾端,設(shè)置為false不會(huì)重新回到隊(duì)列。
  • 拒絕一個(gè)消息:

    • void basicReject(long deliveryTag, boolean requeue) throws IOException;
    • deliveryTag:消息的傳遞標(biāo)識(shí)。
    • requeue: 設(shè)置為false 表示不再重新入隊(duì),如果配置了死信隊(duì)列則進(jìn)入死信隊(duì)列。
  • channel.basicNack 與 channel.basicReject 的區(qū)別在于basicNack可以批量拒絕多條消息,而basicReject一次只能拒絕一條消息。

個(gè)人還是推薦手動(dòng)確認(rèn),可控性更高;
SpringBoot消費(fèi)者手動(dòng)確認(rèn)調(diào)用的API與RabbitMQClient原生API一致,都是通過這三個(gè)方法完成確認(rèn)操作;

業(yè)務(wù)開發(fā)中,@RabbitHandler注解用的少,因?yàn)樽⒔鈽?biāo)記的方法只能傳入 消息內(nèi)容參數(shù), 無法傳Channel, Message, 獲取到的消息有限, 而@RabbitListener則相反;

http://www.risenshineclean.com/news/6573.html

相關(guān)文章:

  • 完善企業(yè)網(wǎng)站建設(shè)最經(jīng)典的營(yíng)銷案例
  • 海曙網(wǎng)站制作網(wǎng)絡(luò)熱詞大全
  • 網(wǎng)站設(shè)計(jì)摘要 優(yōu)幫云外鏈?zhǔn)珍浘W(wǎng)站
  • 個(gè)人求職網(wǎng)站如何做官網(wǎng)站內(nèi)推廣內(nèi)容
  • 做外貿(mào)需要哪些網(wǎng)站有哪些成都營(yíng)銷型網(wǎng)站制作
  • 網(wǎng)站banner效果自己建立網(wǎng)站步驟
  • 政府網(wǎng)站建設(shè) 托管西安seo診斷
  • 微網(wǎng)站怎么做微名片seo網(wǎng)站排名
  • 寶安網(wǎng)站建設(shè)哪家好站長(zhǎng)之家工具高清
  • 成華區(qū)建設(shè)局人民防空辦網(wǎng)站百度網(wǎng)站提交
  • Wordpress 建站 軟件文章代寫
  • 設(shè)計(jì)專業(yè)所需網(wǎng)站溫州seo網(wǎng)站建設(shè)
  • 個(gè)人做企業(yè) 網(wǎng)站百度極速版免費(fèi)下載安裝
  • 宿遷做網(wǎng)站哪家公司好官方百度下載安裝
  • 博物館網(wǎng)站建設(shè)目的google play官網(wǎng)入口
  • 做網(wǎng)站最好的公司互聯(lián)網(wǎng)營(yíng)銷師怎么做
  • 網(wǎng)站建設(shè)計(jì)劃方案模板下載淘寶怎么做引流和推廣
  • 網(wǎng)站建設(shè)和優(yōu)化日照seo公司
  • 狀元村建設(shè)官方網(wǎng)站站內(nèi)營(yíng)銷推廣途徑
  • 免費(fèi)電商網(wǎng)站建設(shè)平臺(tái)千鋒教育培訓(xùn)
  • bbin網(wǎng)站開發(fā)服務(wù)器租用
  • 網(wǎng)站文章編輯器游戲推廣話術(shù)技巧
  • 網(wǎng)站有哪些類型收錄優(yōu)美圖片官網(wǎng)
  • 網(wǎng)站建設(shè)主要內(nèi)容包括登封網(wǎng)絡(luò)推廣
  • 金昌八冶建設(shè)集團(tuán)官方網(wǎng)站免費(fèi)推廣app軟件下載
  • 上線了做網(wǎng)站多少錢百度收錄排名
  • 阿里云建網(wǎng)站流程谷歌seo優(yōu)化公司
  • 做動(dòng)漫的游戲 迅雷下載網(wǎng)站專業(yè)seo站長(zhǎng)工具
  • 做外貿(mào)英文網(wǎng)站哪家好深圳剛剛突然宣布
  • 域名訪問網(wǎng)站應(yīng)該怎么做如何創(chuàng)建網(wǎng)站平臺(tái)