中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

石家莊網(wǎng)絡(luò)公司查封??趕eo計(jì)費(fèi)

石家莊網(wǎng)絡(luò)公司查封,??趕eo計(jì)費(fèi),商城網(wǎng)站建設(shè)經(jīng)驗(yàn),深圳品牌設(shè)計(jì)公司排行榜Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 這三種類型的消費(fèi)者,本文分別從使用方式、實(shí)現(xiàn)原理、可靠性重試和適用場景等方面為您介紹這三種類型的消費(fèi)者。 背景信息? Apache RocketMQ 面向不同的業(yè)務(wù)場景提供了不同消費(fèi)者類型&…

Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 這三種類型的消費(fèi)者,本文分別從使用方式、實(shí)現(xiàn)原理、可靠性重試和適用場景等方面為您介紹這三種類型的消費(fèi)者。

背景信息?

Apache RocketMQ 面向不同的業(yè)務(wù)場景提供了不同消費(fèi)者類型,每種消費(fèi)者類型的集成方式和控制方式都不一樣。了解如下問題,可以幫助您選擇更匹配業(yè)務(wù)場景的消費(fèi)者類型。

  • 如何實(shí)現(xiàn)并發(fā)消費(fèi):消費(fèi)者如何使用并發(fā)的多線程機(jī)制處理消息,以此提高消息處理效率?

  • 如何實(shí)現(xiàn)同步、異步消息處理:對于不同的集成場景,消費(fèi)者獲取消息后可能會(huì)將消息異步分發(fā)到業(yè)務(wù)邏輯中處理,此時(shí),消息異步化處理如何實(shí)現(xiàn)?

  • 如何實(shí)現(xiàn)消息可靠處理:消費(fèi)者處理消息時(shí)如何返回響應(yīng)結(jié)果?如何在消息異常情況進(jìn)行重試,保證消息的可靠處理?

以上問題的具體答案,請參考下文。

功能概述

消息消費(fèi)流程

如上圖所示, Apache RocketMQ 的消費(fèi)者處理消息時(shí)主要經(jīng)過以下階段:消息獲取--->消息處理--->消費(fèi)狀態(tài)提交。

針對以上幾個(gè)階段,Apache RocketMQ 提供了不同的消費(fèi)者類型: PushConsumer 、SimpleConsumer 和 PullConsumer。這幾種類型的消費(fèi)者通過不同的實(shí)現(xiàn)方式和接口可滿足您在不同業(yè)務(wù)場景下的消費(fèi)需求。具體差異如下:

在實(shí)際使用場景中,PullConsumer 僅推薦在流處理框架中集成使用,大多數(shù)消息收發(fā)場景使用 PushConsumer 和 SimpleConsumer 就可以滿足需求。

若您的業(yè)務(wù)場景發(fā)生變更,或您當(dāng)前使用的消費(fèi)者類型不適合當(dāng)前業(yè)務(wù),您可以選擇在 PushConsumer 和SimpleConsumer 之間變更消費(fèi)者類型。變更消費(fèi)者類型不影響當(dāng)前Apache RocketMQ 資源的使用和業(yè)務(wù)處理。

危險(xiǎn)

生產(chǎn)環(huán)境中相同的 ConsumerGroup 下嚴(yán)禁混用 PullConsumer 和其他兩種消費(fèi)者,否則會(huì)導(dǎo)致消息消費(fèi)異常。

PushConsumer?

PushConsumers是一種高度封裝的消費(fèi)者類型,消費(fèi)消息僅通過消費(fèi)監(jiān)聽器處理業(yè)務(wù)并返回消費(fèi)結(jié)果。消息的獲取、消費(fèi)狀態(tài)提交以及消費(fèi)重試都通過 Apache RocketMQ 的客戶端SDK完成。

使用方式

PushConsumer的使用方式比較固定,在消費(fèi)者初始化時(shí)注冊一個(gè)消費(fèi)監(jiān)聽器,并在消費(fèi)監(jiān)聽器內(nèi)部實(shí)現(xiàn)消息處理邏輯。由 Apache RocketMQ 的SDK在后臺(tái)完成消息獲取、觸發(fā)監(jiān)聽器調(diào)用以及進(jìn)行消息重試處理。

示例代碼如下:

// 消費(fèi)示例:使用PushConsumer消費(fèi)普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()// 設(shè)置消費(fèi)者分組。.setConsumerGroup("YourConsumerGroup")// 設(shè)置接入點(diǎn)。.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())// 設(shè)置預(yù)綁定的訂閱關(guān)系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 設(shè)置消費(fèi)監(jiān)聽器。.setMessageListener(new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {// 消費(fèi)消息并返回處理結(jié)果。return ConsumeResult.SUCCESS;}}).build();

PushConsumer的消費(fèi)監(jiān)聽器執(zhí)行結(jié)果分為以下三種情況:

  • 返回消費(fèi)成功:以Java SDK為例,返回ConsumeResult.SUCCESS,表示該消息處理成功,服務(wù)端按照消費(fèi)結(jié)果更新消費(fèi)進(jìn)度。

  • 返回消費(fèi)失敗:以Java SDK為例,返回ConsumeResult.FAILURE,表示該消息處理失敗,需要根據(jù)消費(fèi)重試邏輯判斷是否進(jìn)行重試消費(fèi)。

  • 出現(xiàn)非預(yù)期失敗:例如拋異常等行為,該結(jié)果按照消費(fèi)失敗處理,需要根據(jù)消費(fèi)重試邏輯判斷是否進(jìn)行重試消費(fèi)。

PushConsumer 消費(fèi)消息時(shí),若消息處理邏輯出現(xiàn)預(yù)期之外的阻塞導(dǎo)致消息處理一直無法執(zhí)行成功,SDK會(huì)按照消費(fèi)超時(shí)處理強(qiáng)制提交消費(fèi)失敗結(jié)果,并按照消費(fèi)重試邏輯進(jìn)行處理。消息超時(shí),請參見PushConsumer消費(fèi)重試策略。

出現(xiàn)消費(fèi)超時(shí)情況時(shí),SDK雖然提交消費(fèi)失敗結(jié)果,但是當(dāng)前消費(fèi)線程可能仍然無法響應(yīng)中斷,還會(huì)繼續(xù)處理消息。

內(nèi)部原理

在PushConsumer類型中,消息的實(shí)時(shí)處理能力是基于SDK內(nèi)部的典型Reactor線程模型實(shí)現(xiàn)的。如下圖所示,SDK內(nèi)置了一個(gè)長輪詢線程,先將消息異步拉取到SDK內(nèi)置的緩存隊(duì)列中,再分別提交到消費(fèi)線程中,觸發(fā)監(jiān)聽器執(zhí)行本地消費(fèi)邏輯。

PushConsumer原理

?可靠性重試

PushConsumer 消費(fèi)者類型中,客戶端SDK和消費(fèi)邏輯的唯一邊界是消費(fèi)監(jiān)聽器接口。客戶端SDK嚴(yán)格按照監(jiān)聽器的返回結(jié)果判斷消息是否消費(fèi)成功,并做可靠性重試。所有消息必須以同步方式進(jìn)行消費(fèi)處理,并在監(jiān)聽器接口結(jié)束時(shí)返回調(diào)用結(jié)果,不允許再做異步化分發(fā)。消息重試具體信息,請參見PushConsumer消費(fèi)重試策略。

使用PushConsumer消費(fèi)者消費(fèi)時(shí),不允許使用以下方式處理消息,否則 Apache RocketMQ 無法保證消息的可靠性。

  • 錯(cuò)誤方式一:消息還未處理完成,就提前返回消費(fèi)成功結(jié)果。此時(shí)如果消息消費(fèi)失敗,Apache RocketMQ 服務(wù)端是無法感知的,因此不會(huì)進(jìn)行消費(fèi)重試。

  • 錯(cuò)誤方式二:在消費(fèi)監(jiān)聽器內(nèi)將消息再次分發(fā)到自定義的其他線程,消費(fèi)監(jiān)聽器提前返回消費(fèi)結(jié)果。此時(shí)如果消息消費(fèi)失敗,Apache RocketMQ 服務(wù)端同樣無法感知,因此也不會(huì)進(jìn)行消費(fèi)重試。

順序性保障

基于 Apache RocketMQ?順序消息的定義,如果消費(fèi)者分組設(shè)置了順序消費(fèi)模式,則PushConsumer在觸發(fā)消費(fèi)監(jiān)聽器時(shí),嚴(yán)格遵循消息的先后順序。業(yè)務(wù)處理邏輯無感知即可保證消息的消費(fèi)順序。

適用場景

PushConsumer嚴(yán)格限制了消息同步處理及每條消息的處理超時(shí)時(shí)間,適用于以下場景:

  • 消息處理時(shí)間可預(yù)估:如果不確定消息處理耗時(shí),經(jīng)常有預(yù)期之外的長時(shí)間耗時(shí)的消息,PushConsumer的可靠性保證會(huì)頻繁觸發(fā)消息重試機(jī)制造成大量重復(fù)消息。

  • 無異步化、高級定制場景:PushConsumer限制了消費(fèi)邏輯的線程模型,由客戶端SDK內(nèi)部按最大吞吐量觸發(fā)消息處理。該模型開發(fā)邏輯簡單,但是不允許使用異步化和自定義處理流程。

SimpleConsumer

SimpleConsumer 是一種接口原子型的消費(fèi)者類型,消息的獲取、消費(fèi)狀態(tài)提交以及消費(fèi)重試都是通過消費(fèi)者業(yè)務(wù)邏輯主動(dòng)發(fā)起調(diào)用完成

使用方式

SimpleConsumer 的使用涉及多個(gè)接口調(diào)用,由業(yè)務(wù)邏輯按需調(diào)用接口獲取消息,然后分發(fā)給業(yè)務(wù)線程處理消息,最后按照處理的結(jié)果調(diào)用提交接口,返回服務(wù)端當(dāng)前消息的處理結(jié)果。示例如下:

// 消費(fèi)示例:使用 SimpleConsumer 消費(fèi)普通消息,主動(dòng)獲取消息處理并提交。 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()// 設(shè)置消費(fèi)者分組。.setConsumerGroup("YourConsumerGroup")// 設(shè)置接入點(diǎn)。.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())// 設(shè)置預(yù)綁定的訂閱關(guān)系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 設(shè)置從服務(wù)端接受消息的最大等待時(shí)間.setAwaitDuration(Duration.ofSeconds(1)).build();
try {// SimpleConsumer 需要主動(dòng)獲取消息,并處理。List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));messageViewList.forEach(messageView -> {System.out.println(messageView);// 消費(fèi)處理完成后,需要主動(dòng)調(diào)用 ACK 提交消費(fèi)結(jié)果。try {simpleConsumer.ack(messageView);} catch (ClientException e) {logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);}});
} catch (ClientException e) {// 如果遇到系統(tǒng)流控等原因造成拉取失敗,需要重新發(fā)起獲取消息請求。logger.error("Failed to receive message", e);
}

SimpleConsumer主要涉及以下幾個(gè)接口行為:

?可靠性重試

SimpleConsumer消費(fèi)者類型中,客戶端SDK和服務(wù)端通過ReceiveMessageAckMessage接口通信??蛻舳薙DK如果處理消息成功則調(diào)用AckMessage接口;如果處理失敗只需要不回復(fù)ACK響應(yīng),即可在定義的消費(fèi)不可見時(shí)間到達(dá)后觸發(fā)消費(fèi)重試流程。更多信息,請參見SimpleConsumer消費(fèi)重試策略。

順序性保障

基于 Apache RocketMQ?順序消息的定義,SimpleConsumer在處理順序消息時(shí),會(huì)按照消息存儲(chǔ)的先后順序獲取消息。即需要保持順序的一組消息中,如果前面的消息未處理完成,則無法獲取到后面的消息。

適用場景

SimpleConsumer提供原子接口,用于消息獲取和提交消費(fèi)結(jié)果,相對于PushConsumer方式更加靈活。SimpleConsumer適用于以下場景:

  • 消息處理時(shí)長不可控:如果消息處理時(shí)長無法預(yù)估,經(jīng)常有長時(shí)間耗時(shí)的消息處理情況。建議使用SimpleConsumer消費(fèi)類型,可以在消費(fèi)時(shí)自定義消息的預(yù)估處理時(shí)長,若實(shí)際業(yè)務(wù)中預(yù)估的消息處理時(shí)長不符合預(yù)期,也可以通過接口提前修改。

  • 需要異步化、批量消費(fèi)等高級定制場景:SimpleConsumer在SDK內(nèi)部沒有復(fù)雜的線程封裝,完全由業(yè)務(wù)邏輯自由定制,可以實(shí)現(xiàn)異步分發(fā)、批量消費(fèi)等高級定制場景。

  • 需要自定義消費(fèi)速率:SimpleConsumer是由業(yè)務(wù)邏輯主動(dòng)調(diào)用接口獲取消息,因此可以自由調(diào)整獲取消息的頻率,自定義控制消費(fèi)速率。

PullConsumer

使用建議?

PushConsumer合理控制消費(fèi)耗時(shí),避免無限阻塞

對于PushConsumer消費(fèi)類型,需要嚴(yán)格控制消息的消費(fèi)耗時(shí),盡量避免出現(xiàn)消息處理超時(shí)導(dǎo)致消息重復(fù)。如果業(yè)務(wù)經(jīng)常會(huì)出現(xiàn)一些預(yù)期外的長時(shí)間耗時(shí)的消息,建議使用SimpleConsumer,并設(shè)置好消費(fèi)不可見時(shí)間。

http://www.risenshineclean.com/news/39412.html

相關(guān)文章:

  • 鄭州網(wǎng)站建設(shè)電話seo外鏈推廣員
  • 下載app到手機(jī)seo系統(tǒng)是什么
  • 外貿(mào)網(wǎng)站開發(fā)公司百度上怎么發(fā)布信息啊
  • 臨沂蒼山網(wǎng)站建設(shè)百度聯(lián)盟
  • wordpress 電話鶴壁seo推廣
  • 做任務(wù)賺q紅包的網(wǎng)站百度統(tǒng)計(jì)收費(fèi)嗎
  • ps網(wǎng)站頭部如何優(yōu)化培訓(xùn)方式
  • 找別人做網(wǎng)站需要注意什么百度權(quán)重10的網(wǎng)站
  • 臨沂網(wǎng)站建設(shè)設(shè)計(jì)百度識(shí)圖網(wǎng)站
  • 網(wǎng)上服裝設(shè)計(jì)培訓(xùn)班seo推廣具體做什么
  • 鄭州網(wǎng)站seo優(yōu)微信朋友圈廣告在哪里做
  • 廣州小型網(wǎng)站建設(shè)公司平面設(shè)計(jì)正規(guī)培訓(xùn)機(jī)構(gòu)
  • 視頻彈幕網(wǎng)站怎么做百度搜索提交入口
  • 個(gè)人做商城網(wǎng)站大概多少錢友鏈網(wǎng)站
  • 做網(wǎng)站商城開發(fā)什么語言最快seo站內(nèi)優(yōu)化培訓(xùn)
  • 煙臺(tái)企業(yè)網(wǎng)站開發(fā)清博大數(shù)據(jù)輿情監(jiān)測平臺(tái)
  • 自己的主機(jī)做服務(wù)器網(wǎng)站如何備案網(wǎng)站多少錢
  • 網(wǎng)站建設(shè)成果seo 頁面鏈接優(yōu)化
  • wordpress不用郵件驗(yàn)證注冊谷歌seo服務(wù)
  • 哪家網(wǎng)站專門做折扣銷售seo搜索優(yōu)化網(wǎng)站推廣排名
  • wordpress 搜索詞天津seo顧問
  • 網(wǎng)站建設(shè)哪里可以學(xué)seo軟件系統(tǒng)
  • wordpress ip security重慶seo網(wǎng)站
  • wordpress用qq登錄上海搜索排名優(yōu)化公司
  • 記事本做網(wǎng)站插圖片安卓內(nèi)核級優(yōu)化神器
  • 網(wǎng)站系統(tǒng)建設(shè)需要什么資質(zhì)競價(jià)網(wǎng)站推廣
  • 織夢視頻網(wǎng)站模板今天最新新聞
  • 地方旅游網(wǎng)站建設(shè)方案自己可以創(chuàng)建網(wǎng)站嗎
  • 國內(nèi)html5視頻網(wǎng)站建設(shè)網(wǎng)站分析培訓(xùn)班
  • 做個(gè)簡單網(wǎng)站大概多少錢中文搜索引擎有哪些平臺(tái)