域名查詢whois臨沂網(wǎng)站建設(shè)優(yōu)化
RabbitMQ消息確認(rèn)
SpringBoot與RabbitMQ整合后,對(duì)RabbitClient的“確認(rèn)”進(jìn)行了封裝、使用方式與RabbitMQ官網(wǎng)不一致;
消息發(fā)布確認(rèn)
- 生產(chǎn)者給交換機(jī)發(fā)送消息后、若是不管了,則會(huì)出現(xiàn)消息丟失;
- 解決方案1: 交換機(jī)接受到消息、給生產(chǎn)者一個(gè)答復(fù)ack, 若生產(chǎn)者沒有收到ack, 可能出現(xiàn)消息丟失,因此重新發(fā)送消息;
- 解決方案1隱藏問題:若是交換機(jī)發(fā)送了ack, 出現(xiàn)網(wǎng)絡(luò)延遲,則生產(chǎn)者沒有收到ack, 就會(huì)出現(xiàn)消息重復(fù)發(fā)送問題, 進(jìn)而衍生冪等性問題;
- 隱藏問題解決方案1:在數(shù)據(jù)庫(kù)中增加一張去重表,設(shè)置唯一索引; 生產(chǎn)者在消息內(nèi)容中,翻入唯一ID,消費(fèi)者消費(fèi)時(shí)、先從數(shù)據(jù)庫(kù)查詢是否存在,存在則不處理該消息;
- 適用于并發(fā)低、業(yè)務(wù)嚴(yán)謹(jǐn)?shù)膱?chǎng)景
- 隱藏問題解決方案2:利用Redis的String的setnx,若key存在,則不處理、若key存在,則執(zhí)行業(yè)務(wù);
- 適用于短時(shí)間處理大量消息,且 key不會(huì)重復(fù);
-這就是大名鼎鼎的冪等性問題,賊討厭這些專有名詞;
- 適用于短時(shí)間處理大量消息,且 key不會(huì)重復(fù);
- 隱藏問題解決方案1:在數(shù)據(jù)庫(kù)中增加一張去重表,設(shè)置唯一索引; 生產(chǎn)者在消息內(nèi)容中,翻入唯一ID,消費(fèi)者消費(fèi)時(shí)、先從數(shù)據(jù)庫(kù)查詢是否存在,存在則不處理該消息;
業(yè)務(wù)開發(fā)中的冪等性
- 前端保存數(shù)據(jù)時(shí)、點(diǎn)擊多次保存按鈕,插入多條數(shù)據(jù);
- 解決方案 :前端限制按鈕點(diǎn)擊、 數(shù)據(jù)庫(kù)設(shè)置業(yè)務(wù)唯一索引;
- 消息推送中,可能出現(xiàn)多條內(nèi)容一樣的消息,又不可以重復(fù)處理 ,需要冪等性處理;
上家公司中,后臺(tái)給app客戶端推送系統(tǒng)消息時(shí)、配置給所有用戶推送消息, 其他服務(wù)給我的應(yīng)用消息服務(wù)推送 RabbitMQ消息, 正常來說, 每次推送的消息, 設(shè)備ID和用戶ID合起來唯一的,結(jié)果其他服務(wù)業(yè)務(wù)數(shù)據(jù)存在問題,有些舊數(shù)據(jù)沒有清除, 導(dǎo)致相通的設(shè)備ID,用戶ID, 一次給設(shè)備用戶推送了十幾條,安卓客戶端當(dāng)當(dāng)當(dāng)?shù)捻?#xff0c; 直接驚動(dòng)了產(chǎn)品經(jīng)理; 經(jīng)過排查上,是上游數(shù)據(jù)有問題,代碼又很老,其他服務(wù)負(fù)責(zé)人排查了好幾天, 把問題數(shù)據(jù)清楚了, 結(jié)果后面又產(chǎn)生了問題數(shù)據(jù);- 解決方案:由于會(huì)一次性處理幾萬條推送消息,因此對(duì)業(yè)務(wù)要求速度高,因此利用Redis的String的setNx, 以taskId + mobileDevId + userId + tenantId 組成了唯一key,若是存在,則不處理; key有限時(shí)間為60分鐘, 就成功處理了該問題;
- 吐槽:上游的業(yè)務(wù)問題,讓下游服務(wù)做業(yè)務(wù)保證,屬實(shí)離譜;
{
“taskId” :“xxxx”;
“mobileDevId” : “xxxx”;
“userId”:“xxx”;
“tenantId” : “xxx”;
“其他字段”: “…”
}
RabbitMQ發(fā)布確認(rèn)與返回
SpringBoot發(fā)布確認(rèn)與返回
配置:
第二個(gè)參數(shù)因?yàn)檫^時(shí),所以要配置第三個(gè)參數(shù)為correlated,表示用來確認(rèn)消息;
#生產(chǎn)者
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
生產(chǎn)者:
- 通過RabbitTempldate#setConfirmCallback設(shè)置確認(rèn)回調(diào), 即交換器發(fā)送給ack給生產(chǎn)者,生產(chǎn)者調(diào)用ConfirmCallback回調(diào), 若出現(xiàn)異常cause,則可重新推送;
通過RabbitTempldate#setReturnCallback設(shè)置返回回調(diào); - 通過template#waitForConfirms(xxx)表示等待xxx毫秒后確認(rèn),超時(shí)返回false;
- 若返回false, 則進(jìn)行業(yè)務(wù)補(bǔ)救處理;
public class ConfirmProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate template;@Autowiredprivate DirectExchange confirmExchange;AtomicInteger index = new AtomicInteger(0);AtomicInteger count = new AtomicInteger(0);private final String[] keys = {"sms", "mail"};@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() throws IOException {//短信String sms = "{userName: xxx; phone:xxx}";HashMap<String, Object> map = new HashMap<>();map.put("userName", "hanxin");map.put("phone", index.getAndIncrement());template.setMandatory(true);template.setConfirmCallback(this);template.setReturnCallback(this);template.convertAndSend(confirmExchange.getName(), "confirm", map);System.out.println("send sms confirm");}@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send2() throws IOException {template.invoke((operations) -> {//短信String sms = "{userName: xxx; phone:xxx}";HashMap<String, Object> map = new HashMap<>();map.put("userName", "hanxin");map.put("phone", index.getAndIncrement());//必須設(shè)置template.setMandatory(true);template.convertAndSend(confirmExchange.getName(), "confirm", map);System.out.println("send sms confirm");return template.waitForConfirms(1000);});}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("receive confirm callback, ack = " + ack);}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("receive return call : message:" + message.getBody());System.out.println("receive return call : replyCode:" + replyCode);System.out.println("receive return call : replyText:" + replyText);System.out.println("receive return call : exchange:" + exchange);System.out.println("receive return call : routingKey:" + routingKey);}
}
業(yè)務(wù)開發(fā)中,非嚴(yán)謹(jǐn),追求性能高業(yè)務(wù)建議使用send,這個(gè)過程是異步確認(rèn)的;
嚴(yán)謹(jǐn)業(yè)務(wù)建議使用send2, 同步等待相應(yīng),出現(xiàn)問題好確認(rèn);
交換機(jī):
@Configuration
public class ConfirmConfig {public final static String CONFIRM_QUEUE_NAME = "confirmQueue";public final static String CONFIRM_EXCHANGE_NAME = "confirmExchange";public final static String CONFIRM_ROUTING_NAME = "confirm";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Beanpublic ConfirmProducer confirmProducer() {return new ConfirmProducer();}
}
運(yùn)行結(jié)果:
receive status : true
RabbitMQ發(fā)布確認(rèn)
若是使用原生Rabbit MQ客戶端API,則有三種方式:
- 聲明channel是需要交換機(jī)確認(rèn)的
channel.confirmSelect();
- 發(fā)布單條消息
for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);channel.basicPublish("", queue, null, body.getBytes());channel.waitForConfirmsOrDie(5_000);
}
channel.waitForConfirmsOrDie(5_000);這個(gè)方法就會(huì)在channel端等待RabbitMQ給出一個(gè)響應(yīng),用來表明這個(gè)消息已經(jīng)正確發(fā)送到了RabbitMQ服務(wù)端。但是要注意,這個(gè)方法會(huì)同步阻塞channel,在等待確認(rèn)期間,channel將不能再繼續(xù)發(fā)送消息,也就是說會(huì)明顯降低集群的發(fā)送速度即吞吐量。
官方說明了,其實(shí)channel底層是異步工作的,會(huì)將channel阻塞住,然后異步等待服務(wù)端發(fā)送一個(gè)確認(rèn)消息,才解除阻塞。但是我們?cè)谑褂脮r(shí),可以把他當(dāng)作一個(gè)同步工具來看待。利用一個(gè)異步轉(zhuǎn)同步功能,可利用JUC實(shí)現(xiàn);
然后如果到了超時(shí)時(shí)間,還沒有收到服務(wù)端的確認(rèn)機(jī)制,那就會(huì)拋出異常。然后通常處理這個(gè)異常的方式是記錄錯(cuò)誤日志或者嘗試重發(fā)消息,但是嘗試重發(fā)時(shí)一定要注意不要使程序陷入死循環(huán)。
- 發(fā)送批量消息
條確認(rèn)的機(jī)制會(huì)對(duì)系統(tǒng)的吞吐量造成很大的影響,所以稍微中和一點(diǎn)的方式就是發(fā)送一批消息后,再一起確認(rèn)
int batchSize = 100;int outstandingMessageCount = 0;long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);}
存在隱藏問題:若是500條消息處理太久,超時(shí)了,則響應(yīng)失敗,消息重新入隊(duì)、出現(xiàn)重新消費(fèi)問題;
- 異步確認(rèn)消息
實(shí)現(xiàn)的方式也比較簡(jiǎn)單,Producer在channel中注冊(cè)監(jiān)聽器來對(duì)消息進(jìn)行確認(rèn)。核心代碼就是一個(gè):
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);
這三種確認(rèn)機(jī)制都能夠提升Producer發(fā)送消息的安全性。通常情況下,第三種異步確認(rèn)機(jī)制的性能是最好的。第一種安全性最高。
消費(fèi)者確認(rèn)
當(dāng)交換機(jī)接受消息后,就要轉(zhuǎn)發(fā)給消費(fèi)者;如何保證消息不丟失?重復(fù)消費(fèi)?
SpringBoot消費(fèi)確認(rèn)
自動(dòng)確認(rèn)
-
若是業(yè)務(wù)中,一些消息發(fā)送給消費(fèi)者,若是消息出現(xiàn)異常,消費(fèi)者返回通知交換機(jī)消息出現(xiàn)了異常,交換機(jī)會(huì)將消息重新入隊(duì);
-
若是沒有確認(rèn)消息,交換機(jī)沒有收到消息,會(huì)將消息會(huì)重新放入隊(duì)列中,每次消費(fèi)者啟動(dòng)都會(huì)把以前消費(fèi)的消息重新消費(fèi);
-
SpringBoot整合RabbitMQ后, 設(shè)置參數(shù)max-attempts為最大重試次數(shù)、retry.enabled為開啟重試機(jī)制;
spring.rabbitmq.listener.direct.retry.max-attempts=5
spring.rabbitmq.listener.direct.retry.enabled=true
-
為什么要開啟重試?
不開啟重試、消費(fèi)者處理消息發(fā)生異常后, RabbitMQ會(huì)丟棄該消息, 通常業(yè)務(wù)開發(fā)中,是不允許的。 -
為什么設(shè)置重試次數(shù)?
不開啟重試次數(shù),則消息會(huì)一直重新入隊(duì),占用內(nèi)存,若是錯(cuò)誤消息過多,RabbitMQ內(nèi)存爆了;
手動(dòng)確認(rèn)
在手動(dòng)確認(rèn)的模式下,不管是消費(fèi)成功還是消費(fèi)失敗,一定要記得確認(rèn)消息,不然消息會(huì)一直處于unack狀態(tài),直到消費(fèi)者進(jìn)程重啟或者停止。
設(shè)置參數(shù):
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消費(fèi)者:
通過使用channel#basicAck, basicNack, basicReject完成;
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public class ConfirmConsumer {// @RabbitHandler : 標(biāo)記的方法只能有一個(gè)參數(shù),類型為String ,若是傳Map參數(shù)、則需要傳入map參數(shù)// @RabbitListener:標(biāo)記的方法可以傳入Channel, Message參數(shù)@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void listenObjectQueue(Channel channel, Message message, Map<String, Object> msg) throws IOException {System.out.println("接收到object.queue的消息" + msg);System.out.println("消息ID : " + message.getMessageProperties().getDeliveryTag());try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (IOException exception) {//拒絕確認(rèn)消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//拒絕消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}@RabbitHandlerpublic void listenObjectQueue2(Map<String, Object> msg) throws IOException {System.out.println("接收到object.queue的消息" + msg);}}
-
確認(rèn)收到一個(gè)或多個(gè)消息
- void basicAck(long deliveryTag, boolean multiple) throws IOException;
- deliveryTag :消息傳遞標(biāo)識(shí)
- multiple:是否批量確認(rèn),為true,則確認(rèn)后,其他消息deliveryTag小于當(dāng)前消息的deliveryTag的消息全部變?yōu)榇_認(rèn);(慎重)
-
拒絕一個(gè)或多個(gè)消息:
- void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
- deliveryTag:消息的傳遞標(biāo)識(shí)。
- multiple: 如果為true,則拒絕所有consumer獲得的小于deliveryTag的消息。(慎重)
- requeue: 設(shè)置為true 會(huì)把消費(fèi)失敗的消息從新添加到隊(duì)列的尾端,設(shè)置為false不會(huì)重新回到隊(duì)列。
-
拒絕一個(gè)消息:
- void basicReject(long deliveryTag, boolean requeue) throws IOException;
- deliveryTag:消息的傳遞標(biāo)識(shí)。
- requeue: 設(shè)置為false 表示不再重新入隊(duì),如果配置了死信隊(duì)列則進(jìn)入死信隊(duì)列。
-
channel.basicNack 與 channel.basicReject 的區(qū)別在于basicNack可以批量拒絕多條消息,而basicReject一次只能拒絕一條消息。
個(gè)人還是推薦手動(dòng)確認(rèn),可控性更高;
SpringBoot消費(fèi)者手動(dòng)確認(rèn)調(diào)用的API與RabbitMQClient原生API一致,都是通過這三個(gè)方法完成確認(rèn)操作;
業(yè)務(wù)開發(fā)中,@RabbitHandler注解用的少,因?yàn)樽⒔鈽?biāo)記的方法只能傳入 消息內(nèi)容參數(shù), 無法傳Channel, Message, 獲取到的消息有限, 而@RabbitListener則相反;