合肥網(wǎng)站建站報(bào)廣告代理在線之家
????????
目錄
1、生產(chǎn)者發(fā)送消息的 2 種方式
2、生產(chǎn)者訪問主題???????的 4 種模式
3、消息壓縮
4、消息批量發(fā)送
5、消息分塊發(fā)送
????????生產(chǎn)者(producer)是附加主題(topic)并把消息(messages)發(fā)送到 Pulsar broker 的程序。Pulsar broker 會處理接收到的消息(messages)。
1、生產(chǎn)者???????發(fā)送消息的 2 種方式
????????生產(chǎn)者(Producers)發(fā)送消息(messages)到?brokers 可以是同步的(sync),也可以是異步的(async) // 發(fā)送方式分為同步和異步兩種方式
Mode | Description |
---|---|
Sync send 同步發(fā)送 | The producer waits for an acknowledgement from the broker after sending every message. If the acknowledgment is not received, the producer treats the sending operation as a failure. // 生產(chǎn)者發(fā)送的每一條消息都需要等待broker的確認(rèn),如果沒有收到確認(rèn),生產(chǎn)者認(rèn)為此次消息發(fā)送失敗 |
Async send 異步發(fā)送 | The producer puts a message in a blocking queue and returns immediately. The client library sends the message to the broker in the background. If the queue is full (you can?configure?the maximum size), the producer is blocked or fails immediately when calling the API, depending on arguments passed to the producer. // 生產(chǎn)者將消息放入阻塞隊(duì)列并立即返回。客戶端在后臺將消息發(fā)送給代理。如果隊(duì)列已滿(可以配置大小),則根據(jù)傳遞給生產(chǎn)者的參數(shù),生產(chǎn)者會被阻止或在調(diào)用 API 時(shí)立即失敗。 |
2、生產(chǎn)者???????訪問主題???????的 4 種模式
????????對于主題,生產(chǎn)者有以下不同類型的訪問模式:
Access mode | Description |
---|---|
| Multiple producers can publish on a topic. // 默認(rèn)配置 |
| Only one producer can publish on a topic. // 當(dāng)一個(gè)生產(chǎn)者因?yàn)榫W(wǎng)絡(luò)中斷連接,broker 會選擇一個(gè)新的 producer 成為專用生產(chǎn)者 |
| Only one producer can publish on a topic. // 如果已經(jīng)有生產(chǎn)者連接,其他生產(chǎn)者將被移除并且立即失效 |
等待獨(dú)占 | If there is already a producer connected, the producer creation is pending (rather than timing out) until the producer gets the? // 成功獲取獨(dú)占訪問權(quán)的生產(chǎn)者將被視為一個(gè)leader,因此,如果你想為應(yīng)用程序?qū)崿F(xiàn)一個(gè)leader選舉機(jī)制,可以選用該訪問模式。需要注意的是,leader機(jī)制涉及到Pulsar的WAL日志,leader會把”決策“寫入到主題。錯(cuò)誤情況,當(dāng)leader寫入消息失敗,會得到broker的通知,通知該生產(chǎn)者將不再是一個(gè)leader。 |
注意事項(xiàng)
? ? ? ? ??一旦應(yīng)用程序成功創(chuàng)建了?Exclusive(獨(dú)占)?或?WaitForExclusive(等待獨(dú)占)訪問模式的生產(chǎn)者,那么此生產(chǎn)者需要保證是主題的唯一訪問者。任何其他試圖訪問此主題的生產(chǎn)者都會立即出錯(cuò)或者必須等待,直到他們獲得主題的獨(dú)占訪問權(quán)。// 獨(dú)占模式具有排他性
3、消息壓縮
????????可以壓縮生產(chǎn)者在傳輸過程中發(fā)布的消息。Pulsar 目前支持以下類型的壓縮:
- LZ4:LZ算法系列的一種,號稱是目前最快的壓縮算法之一
- ZLIB:zlib是用于數(shù)據(jù)壓縮的一個(gè)簡單的庫,僅支持LZ77的變種算法
- ZSTD:Facebook開源的新無損壓縮算法,優(yōu)點(diǎn)是壓縮率和壓縮/解壓縮性能都很突出
- SNAPPY:提供高速壓縮速度和合理的壓縮率。Snappy?比 zlib 更快,但文件相對要大 20% 到 100%。
????????詳情信息,請點(diǎn)擊這里。
????????壓縮的原理:假如當(dāng)前位置的一個(gè)字符串序列,在以前的歷史數(shù)據(jù)中也出現(xiàn)過,那么現(xiàn)在用一種特殊的格式或者特殊的小序列來表示它,就可以起到壓縮的效果,因?yàn)樘厥飧袷交蛘咛厥庑⌒蛄型ǔ6急仍镜淖址蛄懈 ?/p>
4、消息批量發(fā)送
????????啟用批量處理后,生產(chǎn)者(producer)在單個(gè)請求中累積并發(fā)送一批消息(messages)。批量大小由最大消息數(shù)和最大發(fā)布延遲決定。因此,backlog 大小表示批量的大小,而不是消息的大小。// 等消息積攢到一定數(shù)量再一起發(fā)送
????????在 Pulsar 中,批量消息作為單個(gè)單元而不是單個(gè)消息進(jìn)行跟蹤和存儲。消費(fèi)者需要將批量消息拆分為單個(gè)消息進(jìn)行處理。但是,即使啟用了批處理,預(yù)先計(jì)劃的 messages(通過 deliverAt 或 deliverAfter 參數(shù)配置)也始終只作為單獨(dú)的消息發(fā)送。
????????通常,當(dāng)批量中的所有消息都被消費(fèi)者確認(rèn)時(shí),該批量也會被確認(rèn)。但是,當(dāng)批量中有消息沒有被確認(rèn),或者出現(xiàn)意外的失敗,否定的確認(rèn)以及確認(rèn)超時(shí)會導(dǎo)致批量中的所有消息都重新的傳遞。
????????為了避免將批量中已確認(rèn)的消息重新發(fā)送給消費(fèi)者,Pulsar 從 Pulsar 2.6.0 開始引入批量索引確認(rèn)機(jī)制。啟用批量索引確認(rèn)機(jī)制后,消費(fèi)者會過濾出已經(jīng)確認(rèn)過的批量索引,并將批量索引確認(rèn)請求發(fā)送給代理(broker)。代理(broker)會維護(hù)和跟蹤每個(gè)批量索引的確認(rèn)狀態(tài),并避免向消費(fèi)者(consumer)發(fā)送已確認(rèn)過的消息。當(dāng)批量中的所有消息都被確認(rèn)后,該批量將會被刪除。// 批量索引會帶來內(nèi)存開銷,但是可以避免數(shù)據(jù)重復(fù)消費(fèi)
????????默認(rèn)情況下,禁用批量索引確認(rèn)機(jī)制(AcknowledgementAtBatchIndexLevelEnabled=false)。但是可以通過在代理端(broker)將 AcknowledgementAtBatchIndexLevelEnabled 參數(shù)設(shè)置為 true 來啟用批量索引確認(rèn)機(jī)制。啟用批量索引確認(rèn)機(jī)制會導(dǎo)致更多內(nèi)存開銷。
5、消息分塊發(fā)送
????????消息分塊,使 Pulsa 的生產(chǎn)者和消費(fèi)者都能處理大型有效負(fù)載消息(在生產(chǎn)者端將消息分塊,在消費(fèi)端將消息聚合)。
????????啟用消息分塊后,當(dāng)消息(messages)大小超過允許的最大負(fù)載大小( broker 的 maxMessageSize 參數(shù))時(shí),消息的工作流如下所示:
- 生產(chǎn)者(producer)將原始消息拆分為分塊消息,并將它們與分塊元數(shù)據(jù)一起按順序單獨(dú)發(fā)布到代理。
- 代理(broker)以與普通消息相同的方式將分塊消息存儲在一個(gè) managed-ledger(托管賬本)中,并使用?chunkedMessageRate?參數(shù)記錄該主題上分塊消息的速率。
- 消費(fèi)者(consumer)會緩存分塊的消息,并在接收到消息的所有分塊時(shí)將其聚合到接收隊(duì)列中。
- 客戶端(client)消費(fèi)接收隊(duì)列中的聚合消息。
消息分塊的限制:
- 只能用于持久化主題
- 只能用于獨(dú)占(exclusive)訪問和容錯(cuò)訂閱(failover subscription)類型。
- 不能和批處理同時(shí)使用
處理連續(xù)的分塊消息
????????下圖展示了處理連續(xù)分塊消息的過程。圖中,生產(chǎn)者依次往一個(gè)主題中發(fā)布分塊消息(大型消息)和非分塊消息(常規(guī)消息 M3 \ M4)。生產(chǎn)者(producer)在發(fā)布 M1 消息時(shí),把 M1 分成了 M1-C1、M1-C2 和 M1-C3 三個(gè)塊消息。broker 端會存儲所有的分塊消息(放在 managed-ledger 中),并把他們按照相同的順序發(fā)送給消費(fèi)者。消費(fèi)者(consumer)會在內(nèi)存中緩存接收到的分塊消息,直到全部接收,然后把它們聚合為原始的消息 M1,最后將原始的消息 M1 移交給客戶端(client)。
處理不連續(xù)的分塊消息
????????當(dāng)多個(gè)生產(chǎn)者將分塊消息發(fā)布到單個(gè)主題中時(shí),代理(broker)將來自不同生產(chǎn)者的所有分塊消息都存儲在同一個(gè)托管賬本(managed-ledger)中。托管賬本中的分塊消息相互交錯(cuò),如下所示,生產(chǎn)者-1將消息 M1 分為三個(gè)數(shù)據(jù)塊 M1-C1、M1-C2 和 M1-C3 進(jìn)行發(fā)布。生產(chǎn)者-2將消息? M2 也分為三個(gè)塊 M2-C1、M2-C2 和 M2-C3 進(jìn)行發(fā)布。但是特定消息的分塊消息仍處于有序狀態(tài),雖然他們在托管分類中可能不是連續(xù)的。// 分塊通過消費(fèi)者進(jìn)行合并處理后再發(fā)送發(fā)給客戶端
注意事項(xiàng)
????????在這種情況下,交錯(cuò)的分塊消息可能會給消費(fèi)者帶來一些內(nèi)存壓力,因?yàn)橄M(fèi)者為每個(gè)大型消息都保留了單獨(dú)的緩沖區(qū),以便將其所有的分塊消息聚合成一條消息。通過配置maxPendingChunkedMessage參數(shù),可以限制消費(fèi)者并發(fā)維護(hù)的最大分塊消息數(shù)量。當(dāng)維護(hù)量達(dá)到數(shù)量閾值時(shí),消費(fèi)者會暫時(shí)丟棄這些消息,隨后不發(fā)送消息確認(rèn),或者要求代理重傳來進(jìn)行消息補(bǔ)償,從而優(yōu)化內(nèi)存利用率。
啟用消息分塊
????????前提條件:通過將?enableBatching?參數(shù)設(shè)置為 false 來禁用批量處理。
????????默認(rèn)情況下,消息分塊功能處于關(guān)閉狀態(tài)。要啟用消息分塊,請?jiān)趧?chuàng)建生產(chǎn)者時(shí)將?chunkingEnabled?參數(shù)設(shè)置為 true。
注意事項(xiàng)
????????如果消費(fèi)者未能在指定的時(shí)間段內(nèi)收到消息的所有分塊,那么不完整的分塊消息將過期。過期的時(shí)間默認(rèn)值為1分鐘。有關(guān)?ExpireTimeofCompletechUnkedMessage?參數(shù)的更多信息,請參閱 org。
點(diǎn)擊回到首頁