怎么快速推廣自己的二維碼北京seo排名收費(fèi)
1?背景
A服務(wù)作為生產(chǎn)者,每天發(fā)送上千萬(wàn)的mq消息,每一個(gè)消息包含500個(gè)用戶ids數(shù)據(jù)。B服務(wù)作為消費(fèi)者,接受MQ消息并通過(guò)http調(diào)用第三方請(qǐng)求進(jìn)行業(yè)務(wù)處理,消費(fèi)組啟用了rabbitmq的多線程消費(fèi)組,一個(gè)實(shí)例并發(fā)40個(gè)mq消費(fèi)者線程,每個(gè)線程一次獲取10個(gè)消息進(jìn)行消費(fèi)。
Mq消費(fèi)者配置如下:
# mq配置rabbitmq:connection-timeout: 15000cache:channel:size: 200# 消息發(fā)送到rabbitmq broker cluster需要回調(diào)publisher-confirms: true# 交換機(jī)將消息投遞至隊(duì)列失敗時(shí)需要回調(diào)publisher-returns: truelistener:# 手動(dòng)確認(rèn)消息已被消費(fèi)simple:acknowledge-mode: manual# consumer的并發(fā)數(shù)concurrency: 40max-concurrency: 50# 每個(gè)消息者每次取10條prefetch: 10
Mq擠壓消息如下
2?排查
2.1 復(fù)制rabbitmq擠壓消息數(shù)據(jù)進(jìn)行模擬復(fù)現(xiàn)
找出rabbitmq擠壓的消息,在本地模擬消費(fèi),找出沒(méi)有進(jìn)行消息確認(rèn)的原因,通過(guò)rabbitmq控制臺(tái)的Get messages功能
復(fù)制payload的消息進(jìn)行base64轉(zhuǎn)碼,轉(zhuǎn)出來(lái)的消息是亂碼不完整的,懷疑
是rabbitmq還結(jié)合了其他加密處理,放棄這種排查思路
2.2?檢查報(bào)錯(cuò)日志
rabbitmq的unack消息擠壓,那就是消費(fèi)者沒(méi)有進(jìn)行ack確認(rèn),懷疑消費(fèi)者代碼有異常導(dǎo)致沒(méi)能執(zhí)行到ack的代碼。
查詢服務(wù)器日志,沒(méi)發(fā)現(xiàn)有報(bào)錯(cuò)的日志,梳理業(yè)務(wù)代碼,消費(fèi)者使用了spring aop around機(jī)制進(jìn)行消息確認(rèn),所以不管代碼有沒(méi)有報(bào)錯(cuò),按理說(shuō)都會(huì)手動(dòng)進(jìn)行mq消息ack確認(rèn)
2.3 檢查服務(wù)是否宕機(jī)
消費(fèi)組實(shí)例數(shù)量符合服務(wù)器大小配置,因此服務(wù)器應(yīng)用沒(méi)有宕機(jī)
2.4?檢查java線程
使用IBM的TMDA工具進(jìn)行分析線程堆棧,工具下載地址
TMDA工具下載地址
TMDA工具簡(jiǎn)介
TMDA分析線程堆棧結(jié)果如下
通過(guò)分析圖,看到大量park線程,確實(shí)是符合現(xiàn)狀,應(yīng)用的線程掛起了
3?分析和解決
通過(guò)stack深度高到底排序,業(yè)務(wù)代碼存在線程等待情況,具體代碼CountDownLatch.await
3.1 結(jié)合業(yè)務(wù)代碼分析
通過(guò)上圖stack提示,找到關(guān)聯(lián)的業(yè)務(wù)代碼
偽代碼如下:
// new一個(gè)CompletableFuture
public CompletableFuture<Integer> httpCall(String tokenData){CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {long time = 3000L;try {Thread.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}return Integer.parseInt(tokenData);});return completableFuture;}httpCall(tokenData).whenCompleteAsync((returnValue, ex)->{// do business// ex.getMessage()// 其中ex對(duì)象為空,使用ex.getMessage()報(bào)了空指針,導(dǎo)致沒(méi)能執(zhí)行如下的countDowncountDownLatch.countDown();})
消費(fèi)者服務(wù)通過(guò)http調(diào)用第三方服務(wù),為了提高并發(fā),使用了多線程,每一組(數(shù)十個(gè)為一組)http請(qǐng)求批量調(diào)用完成后再把請(qǐng)求響應(yīng)結(jié)果異步存入數(shù)據(jù)庫(kù),
主線程使用了countDownLatch.await進(jìn)行等待,
其中whenCompleteAsync方法存在空指針問(wèn)題,導(dǎo)致沒(méi)能執(zhí)行如下的countDown方法。
這里有人會(huì)問(wèn), 上面錯(cuò)誤日志檢查步驟,不是說(shuō)日志沒(méi)有空指針異常嗎?
對(duì),子線程報(bào)了空指針,因?yàn)镃ompletableFuture執(zhí)行每次都是new 一個(gè)新的CompletableFuture對(duì)象,并把結(jié)果作為下一個(gè)CompletableFuture執(zhí)行的入?yún)?#xff0c;
通過(guò)偽代碼可以發(fā)現(xiàn),執(zhí)行whenCompleteAsync后,沒(méi)有新的CompletableFuture方法執(zhí)行,所以異常沒(méi)有拋出來(lái),使得排查變得困難
3.2?解決
因?yàn)榇嬖趙henCompleteAsync報(bào)錯(cuò)的情況,添加多一個(gè)新的異常捕獲處理方法,捕獲異常也進(jìn)行countDown的操作。
代碼如下:
httpCall(tokenData).whenCompleteAsync((returnValue, ex)->{// do business// ex.getMessage()// 其中ex對(duì)象為空,使用ex.getMessage()報(bào)了空指針,導(dǎo)致沒(méi)能執(zhí)行如下的countDowncountDownLatch.countDown();}).exceptionally(e ->{log.info("exceptionally捕獲到異常,tokenData={}, e={}", tokenData, e.getMessage());countDownLatch.countDown();return null;});
4?結(jié)論
-
熟練CompletableFuture的使用,要看源碼的實(shí)現(xiàn)(實(shí)現(xiàn)原理cas + 多個(gè)future采用入stack,每次把前一個(gè)future的結(jié)果作為參數(shù)傳入下一個(gè)future去執(zhí)行)
-
使用多線程需要考慮異常、超時(shí)等情況
-
熟練使用jvm stack分析工具
5?文章參考
CompletableFuture流程圖
CompletableFuture參考文章如下
CompletableFuture 原理淺析