石家莊網(wǎng)絡(luò)公司查封??趕eo計(jì)費(fèi)
Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 這三種類型的消費(fèi)者,本文分別從使用方式、實(shí)現(xiàn)原理、可靠性重試和適用場景等方面為您介紹這三種類型的消費(fèi)者。
背景信息?
Apache RocketMQ 面向不同的業(yè)務(wù)場景提供了不同消費(fèi)者類型,每種消費(fèi)者類型的集成方式和控制方式都不一樣。了解如下問題,可以幫助您選擇更匹配業(yè)務(wù)場景的消費(fèi)者類型。
-
如何實(shí)現(xiàn)并發(fā)消費(fèi):消費(fèi)者如何使用并發(fā)的多線程機(jī)制處理消息,以此提高消息處理效率?
-
如何實(shí)現(xiàn)同步、異步消息處理:對于不同的集成場景,消費(fèi)者獲取消息后可能會(huì)將消息異步分發(fā)到業(yè)務(wù)邏輯中處理,此時(shí),消息異步化處理如何實(shí)現(xiàn)?
-
如何實(shí)現(xiàn)消息可靠處理:消費(fèi)者處理消息時(shí)如何返回響應(yīng)結(jié)果?如何在消息異常情況進(jìn)行重試,保證消息的可靠處理?
以上問題的具體答案,請參考下文。
功能概述
如上圖所示, Apache RocketMQ 的消費(fèi)者處理消息時(shí)主要經(jīng)過以下階段:消息獲取--->消息處理--->消費(fèi)狀態(tài)提交。
針對以上幾個(gè)階段,Apache RocketMQ 提供了不同的消費(fèi)者類型: PushConsumer 、SimpleConsumer 和 PullConsumer。這幾種類型的消費(fèi)者通過不同的實(shí)現(xiàn)方式和接口可滿足您在不同業(yè)務(wù)場景下的消費(fèi)需求。具體差異如下:
在實(shí)際使用場景中,PullConsumer 僅推薦在流處理框架中集成使用,大多數(shù)消息收發(fā)場景使用 PushConsumer 和 SimpleConsumer 就可以滿足需求。
若您的業(yè)務(wù)場景發(fā)生變更,或您當(dāng)前使用的消費(fèi)者類型不適合當(dāng)前業(yè)務(wù),您可以選擇在 PushConsumer 和SimpleConsumer 之間變更消費(fèi)者類型。變更消費(fèi)者類型不影響當(dāng)前Apache RocketMQ 資源的使用和業(yè)務(wù)處理。
危險(xiǎn)
生產(chǎn)環(huán)境中相同的 ConsumerGroup 下嚴(yán)禁混用 PullConsumer 和其他兩種消費(fèi)者,否則會(huì)導(dǎo)致消息消費(fèi)異常。
PushConsumer?
PushConsumers是一種高度封裝的消費(fèi)者類型,消費(fèi)消息僅通過消費(fèi)監(jiān)聽器處理業(yè)務(wù)并返回消費(fèi)結(jié)果。消息的獲取、消費(fèi)狀態(tài)提交以及消費(fèi)重試都通過 Apache RocketMQ 的客戶端SDK完成。
使用方式
PushConsumer的使用方式比較固定,在消費(fèi)者初始化時(shí)注冊一個(gè)消費(fèi)監(jiān)聽器,并在消費(fèi)監(jiān)聽器內(nèi)部實(shí)現(xiàn)消息處理邏輯。由 Apache RocketMQ 的SDK在后臺(tái)完成消息獲取、觸發(fā)監(jiān)聽器調(diào)用以及進(jìn)行消息重試處理。
示例代碼如下:
// 消費(fèi)示例:使用PushConsumer消費(fèi)普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()// 設(shè)置消費(fèi)者分組。.setConsumerGroup("YourConsumerGroup")// 設(shè)置接入點(diǎn)。.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())// 設(shè)置預(yù)綁定的訂閱關(guān)系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 設(shè)置消費(fèi)監(jiān)聽器。.setMessageListener(new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {// 消費(fèi)消息并返回處理結(jié)果。return ConsumeResult.SUCCESS;}}).build();
PushConsumer的消費(fèi)監(jiān)聽器執(zhí)行結(jié)果分為以下三種情況:
-
返回消費(fèi)成功:以Java SDK為例,返回
ConsumeResult.SUCCESS
,表示該消息處理成功,服務(wù)端按照消費(fèi)結(jié)果更新消費(fèi)進(jìn)度。 -
返回消費(fèi)失敗:以Java SDK為例,返回
ConsumeResult.FAILURE
,表示該消息處理失敗,需要根據(jù)消費(fèi)重試邏輯判斷是否進(jìn)行重試消費(fèi)。 -
出現(xiàn)非預(yù)期失敗:例如拋異常等行為,該結(jié)果按照消費(fèi)失敗處理,需要根據(jù)消費(fèi)重試邏輯判斷是否進(jìn)行重試消費(fèi)。
PushConsumer 消費(fèi)消息時(shí),若消息處理邏輯出現(xiàn)預(yù)期之外的阻塞導(dǎo)致消息處理一直無法執(zhí)行成功,SDK會(huì)按照消費(fèi)超時(shí)處理強(qiáng)制提交消費(fèi)失敗結(jié)果,并按照消費(fèi)重試邏輯進(jìn)行處理。消息超時(shí),請參見PushConsumer消費(fèi)重試策略。
出現(xiàn)消費(fèi)超時(shí)情況時(shí),SDK雖然提交消費(fèi)失敗結(jié)果,但是當(dāng)前消費(fèi)線程可能仍然無法響應(yīng)中斷,還會(huì)繼續(xù)處理消息。
內(nèi)部原理
在PushConsumer類型中,消息的實(shí)時(shí)處理能力是基于SDK內(nèi)部的典型Reactor線程模型實(shí)現(xiàn)的。如下圖所示,SDK內(nèi)置了一個(gè)長輪詢線程,先將消息異步拉取到SDK內(nèi)置的緩存隊(duì)列中,再分別提交到消費(fèi)線程中,觸發(fā)監(jiān)聽器執(zhí)行本地消費(fèi)邏輯。
?可靠性重試
PushConsumer 消費(fèi)者類型中,客戶端SDK和消費(fèi)邏輯的唯一邊界是消費(fèi)監(jiān)聽器接口。客戶端SDK嚴(yán)格按照監(jiān)聽器的返回結(jié)果判斷消息是否消費(fèi)成功,并做可靠性重試。所有消息必須以同步方式進(jìn)行消費(fèi)處理,并在監(jiān)聽器接口結(jié)束時(shí)返回調(diào)用結(jié)果,不允許再做異步化分發(fā)。消息重試具體信息,請參見PushConsumer消費(fèi)重試策略。
使用PushConsumer消費(fèi)者消費(fèi)時(shí),不允許使用以下方式處理消息,否則 Apache RocketMQ 無法保證消息的可靠性。
-
錯(cuò)誤方式一:消息還未處理完成,就提前返回消費(fèi)成功結(jié)果。此時(shí)如果消息消費(fèi)失敗,Apache RocketMQ 服務(wù)端是無法感知的,因此不會(huì)進(jìn)行消費(fèi)重試。
-
錯(cuò)誤方式二:在消費(fèi)監(jiān)聽器內(nèi)將消息再次分發(fā)到自定義的其他線程,消費(fèi)監(jiān)聽器提前返回消費(fèi)結(jié)果。此時(shí)如果消息消費(fèi)失敗,Apache RocketMQ 服務(wù)端同樣無法感知,因此也不會(huì)進(jìn)行消費(fèi)重試。
順序性保障
基于 Apache RocketMQ?順序消息的定義,如果消費(fèi)者分組設(shè)置了順序消費(fèi)模式,則PushConsumer在觸發(fā)消費(fèi)監(jiān)聽器時(shí),嚴(yán)格遵循消息的先后順序。業(yè)務(wù)處理邏輯無感知即可保證消息的消費(fèi)順序。
適用場景
PushConsumer嚴(yán)格限制了消息同步處理及每條消息的處理超時(shí)時(shí)間,適用于以下場景:
-
消息處理時(shí)間可預(yù)估:如果不確定消息處理耗時(shí),經(jīng)常有預(yù)期之外的長時(shí)間耗時(shí)的消息,PushConsumer的可靠性保證會(huì)頻繁觸發(fā)消息重試機(jī)制造成大量重復(fù)消息。
-
無異步化、高級定制場景:PushConsumer限制了消費(fèi)邏輯的線程模型,由客戶端SDK內(nèi)部按最大吞吐量觸發(fā)消息處理。該模型開發(fā)邏輯簡單,但是不允許使用異步化和自定義處理流程。
SimpleConsumer
SimpleConsumer 是一種接口原子型的消費(fèi)者類型,消息的獲取、消費(fèi)狀態(tài)提交以及消費(fèi)重試都是通過消費(fèi)者業(yè)務(wù)邏輯主動(dòng)發(fā)起調(diào)用完成
使用方式
SimpleConsumer 的使用涉及多個(gè)接口調(diào)用,由業(yè)務(wù)邏輯按需調(diào)用接口獲取消息,然后分發(fā)給業(yè)務(wù)線程處理消息,最后按照處理的結(jié)果調(diào)用提交接口,返回服務(wù)端當(dāng)前消息的處理結(jié)果。示例如下:
// 消費(fèi)示例:使用 SimpleConsumer 消費(fèi)普通消息,主動(dòng)獲取消息處理并提交。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()// 設(shè)置消費(fèi)者分組。.setConsumerGroup("YourConsumerGroup")// 設(shè)置接入點(diǎn)。.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())// 設(shè)置預(yù)綁定的訂閱關(guān)系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 設(shè)置從服務(wù)端接受消息的最大等待時(shí)間.setAwaitDuration(Duration.ofSeconds(1)).build();
try {// SimpleConsumer 需要主動(dòng)獲取消息,并處理。List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));messageViewList.forEach(messageView -> {System.out.println(messageView);// 消費(fèi)處理完成后,需要主動(dòng)調(diào)用 ACK 提交消費(fèi)結(jié)果。try {simpleConsumer.ack(messageView);} catch (ClientException e) {logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);}});
} catch (ClientException e) {// 如果遇到系統(tǒng)流控等原因造成拉取失敗,需要重新發(fā)起獲取消息請求。logger.error("Failed to receive message", e);
}
SimpleConsumer主要涉及以下幾個(gè)接口行為:
?可靠性重試
SimpleConsumer消費(fèi)者類型中,客戶端SDK和服務(wù)端通過ReceiveMessage
和AckMessage
接口通信??蛻舳薙DK如果處理消息成功則調(diào)用AckMessage
接口;如果處理失敗只需要不回復(fù)ACK響應(yīng),即可在定義的消費(fèi)不可見時(shí)間到達(dá)后觸發(fā)消費(fèi)重試流程。更多信息,請參見SimpleConsumer消費(fèi)重試策略。
順序性保障
基于 Apache RocketMQ?順序消息的定義,SimpleConsumer在處理順序消息時(shí),會(huì)按照消息存儲(chǔ)的先后順序獲取消息。即需要保持順序的一組消息中,如果前面的消息未處理完成,則無法獲取到后面的消息。
適用場景
SimpleConsumer提供原子接口,用于消息獲取和提交消費(fèi)結(jié)果,相對于PushConsumer方式更加靈活。SimpleConsumer適用于以下場景:
-
消息處理時(shí)長不可控:如果消息處理時(shí)長無法預(yù)估,經(jīng)常有長時(shí)間耗時(shí)的消息處理情況。建議使用SimpleConsumer消費(fèi)類型,可以在消費(fèi)時(shí)自定義消息的預(yù)估處理時(shí)長,若實(shí)際業(yè)務(wù)中預(yù)估的消息處理時(shí)長不符合預(yù)期,也可以通過接口提前修改。
-
需要異步化、批量消費(fèi)等高級定制場景:SimpleConsumer在SDK內(nèi)部沒有復(fù)雜的線程封裝,完全由業(yè)務(wù)邏輯自由定制,可以實(shí)現(xiàn)異步分發(fā)、批量消費(fèi)等高級定制場景。
-
需要自定義消費(fèi)速率:SimpleConsumer是由業(yè)務(wù)邏輯主動(dòng)調(diào)用接口獲取消息,因此可以自由調(diào)整獲取消息的頻率,自定義控制消費(fèi)速率。
PullConsumer
使用建議?
PushConsumer合理控制消費(fèi)耗時(shí),避免無限阻塞
對于PushConsumer消費(fèi)類型,需要嚴(yán)格控制消息的消費(fèi)耗時(shí),盡量避免出現(xiàn)消息處理超時(shí)導(dǎo)致消息重復(fù)。如果業(yè)務(wù)經(jīng)常會(huì)出現(xiàn)一些預(yù)期外的長時(shí)間耗時(shí)的消息,建議使用SimpleConsumer,并設(shè)置好消費(fèi)不可見時(shí)間。