具有營銷型網(wǎng)站的公司網(wǎng)站域名在哪里查詢
目錄
- 一、為什么需要帶有 subscribe 的 group.id
- 二、我們需要使用commitSync手動提交偏移量嗎?
- 三、如果我想手動提交偏移量,該怎么做?
一、為什么需要帶有 subscribe 的 group.id
- 消費概念:
Kafka 使用消費者組的概念來實現(xiàn)主題的并行消費 - 每條消息都將在每個消費者組中傳遞一次,無論該組中實際有多少個消費者。所以 group 參數(shù)是強(qiáng)制性的,如果沒有組,Kafka 將不知道如何對待訂閱同一主題的其他消費者。 - 偏移量:
每當(dāng)我們啟動一個消費者時,它都會加入一個消費者組,然后根據(jù)該消費者組中的其他消費者數(shù)量,為其分配要讀取的分區(qū)。對于這些分區(qū),它會檢查列表讀取偏移量是否已知,如果找到,它將從這一點開始讀取消息。如果沒有找到偏移量,則參數(shù) auto.offset.reset 控制是從分區(qū)中最早的消息還是從最新的消息開始讀取。
二、我們需要使用commitSync手動提交偏移量嗎?
-
是否需要手動提交偏移?
是否需要提交偏移量取決于作為參數(shù) enable.auto.commit 選擇的值。默認(rèn)情況下,此設(shè)置為 true,這意味著消費者將定期自動提交其偏移量(由auto.commit.interval.ms 決定提交的頻率)。如果將其設(shè)置為 false,那么將需要自己提交偏移量。這種默認(rèn)行為可能也是導(dǎo)致很多發(fā)現(xiàn) kafka 總是從最新的開始消費的原因,由于偏移量是自動提交的,因此它將使用該偏移量。 -
有沒有辦法從頭開始重播消息?
如果想每次都從頭開始讀取,可以調(diào)用seekToBeginning,如果不帶參數(shù)調(diào)用,它將重置為所有訂閱分區(qū)中的第一條消息,或者僅重置您傳入的那些分區(qū)。 -
seekToBeginning
查找每個給定分區(qū)的第一個偏移量。poll(long) 該函數(shù)延遲計算,僅在調(diào)用或時才查找所有分區(qū)中的第一個偏移量position(TopicPartition)。如果未提供分區(qū),則查找所有當(dāng)前分配的分區(qū)的第一個偏移量。public class MyListener implements ConsumerSeekAware {...@Overridepublic void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {callback.seekToBeginning(assignments.keySet());}}
-
有沒有辦法從最后開始重播消息?
有的,可以使用 seekToEnd() 查找所有分配的分區(qū)到最后?;蛘呤褂?seekToTimestamp(long time)- 查找所有分配的分區(qū)到該時間戳表示的偏移量。public class MyListener extends AbstractConsumerSeekAware {@KafkaListener(...)void listn(...) {...} }public class SomeOtherBean {MyListener listener;...void someMethod() {this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);}}
三、如果我想手動提交偏移量,該怎么做?
-
1、禁用自動提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
提交方法
對于手動提交,KafkaConsumers提供了兩種方法,即 commitSync() 和 commitAsync()。commitSync()是一個阻塞調(diào)用,在偏移量成功提交后返回,commitAsync()則立即返回。如果想知道提交是否成功,可以為回調(diào)處理程序 ( OffsetCommitCallback) 提供一個方法參數(shù)。請注意,在兩次提交調(diào)用中,消費者都會提交最新poll()調(diào)用的偏移量。
舉個例子:假設(shè)一個分區(qū)主題有一個消費者并且最后一次調(diào)用poll()返回偏移量為 4、5、6 的消息。提交時,偏移量 6 將被提交,因為這是消費者客戶端跟蹤的最新偏移量。
同時,commitSync() 和 commitAsync() 都允許更多地控制我們想要提交的偏移量:如果你使用允許你指定的相應(yīng)重載,那么Map<TopicPartition, OffsetAndMetadata>消費者將僅提交指定的偏移量(即,映射可以包含分配的分區(qū)的任何子集) ,并且指定的偏移量可以為任意值)。 -
同步提交:
阻塞線程,直到提交成功或遇到不可恢復(fù)的錯誤(在這種情況下,它被拋出給調(diào)用者)while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());consumer.commitSync();} }
對于 for 循環(huán)中的每次迭代,只有在consumer.commitSync()成功返回或因拋出異常而中斷后,代碼才會移至下一次迭代。
-
異步提交:
是一種非阻塞方法。調(diào)用它不會阻塞線程。相反,它將繼續(xù)處理以下指令,無論最終是成功還是失敗。while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());consumer.commitAsync(callback);} }
對于 for 循環(huán)中的每次迭代,無論consumer.commitAsync()最終會發(fā)生什么,代碼都會移至下一次迭代。并且,提交的結(jié)果將由定義的回調(diào)函數(shù)處理。
-
權(quán)衡:延遲與數(shù)據(jù)一致性
1、如果必須確保數(shù)據(jù)一致性,請選擇commitSync(),因為它將確保在執(zhí)行任何進(jìn)一步操作之前,你將知道偏移量提交是成功還是失敗。但由于它是同步和阻塞的,你將花費更多的時間來等待提交完成,這會導(dǎo)致高延遲。
2、如果可以接受某些數(shù)據(jù)不一致并希望具有低延遲,請選擇commitAsync(),因為它不會等待完成。相反,它只會發(fā)出提交請求并稍后處理來自 Kafka 的響應(yīng)(成功或失敗),同時代碼將繼續(xù)執(zhí)行。