中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

天津非常好的網(wǎng)站建設(shè)seo變現(xiàn)培訓(xùn)

天津非常好的網(wǎng)站建設(shè),seo變現(xiàn)培訓(xùn),如何快速建網(wǎng)站,朝陽網(wǎng)絡(luò)推廣版本:kafka-clients-2.0.1.jar 之前想寫個插件修改 kafkaConsumer 消費者的邏輯,根據(jù) header 過濾一些消息。于是需要了解一下 kafkaConsumer 具體是如何拉取消費消息的,確認(rèn)在消費之前過濾掉消息是否會有影響。 下面是相關(guān)的源碼&#xff0…

版本:kafka-clients-2.0.1.jar

之前想寫個插件修改 kafkaConsumer 消費者的邏輯,根據(jù) header 過濾一些消息。于是需要了解一下 kafkaConsumer 具體是如何拉取消費消息的,確認(rèn)在消費之前過濾掉消息是否會有影響。
下面是相關(guān)的源碼,并通過注釋的方式進(jìn)行說明。

先結(jié)論:kafkaConsumer 拉取消息的 offset 是存本地的,根據(jù) offset 拉取消息。開啟自動提交時,會自動提交 offset 到 broker(在一些場景下會手動檢查是否需要提交),防止重啟或reblance時 offset 丟失。而本地保存的 offset 是本地拉取到消息時就更新的,所以自動提交的場景下,在消費前過濾掉消息沒有影響。

拉取消息

KafkaConsumer#poll

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {// note: 獲取輕鎖同時檢查非多線程環(huán)境,并檢查 consumer 開啟狀態(tài) (可以close的)acquireAndEnsureOpen();try {if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");// note: subscriptions:SubscriptionState  維護(hù)了當(dāng)前消費者訂閱的主題列表的狀態(tài)信息(組、offset等)//   方法判斷是否未訂閱或未分配分區(qū)if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}// poll for new data until the timeout expireslong elapsedTime = 0L;do {// note: 是否觸發(fā)了喚醒操作 (調(diào)用了當(dāng)前對象的 wakeup 方法) 通過拋異常的方式退出當(dāng)前方法,(這里是while循環(huán),可能一直在拉取消息,(無新消息時))client.maybeTriggerWakeup();final long metadataEnd;if (includeMetadataInTimeout) {final long metadataStart = time.milliseconds();// note: 更新分區(qū)分配元數(shù)據(jù)以及offset, remain是用來算剩余時間的// 內(nèi)部邏輯://  1 協(xié)調(diào)器 ConsumerCoordinator.poll 拉取協(xié)調(diào)器事件(期間會發(fā)送心跳、自動提交)//  2 updateFetchPositions 更新positions,(但本地有positions數(shù)據(jù)就不更新,更新完pos后,如果還有缺的,就先使用reset策略,最后異步設(shè)置pos)if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {return ConsumerRecords.empty();}metadataEnd = time.milliseconds();elapsedTime += metadataEnd - metadataStart;} else {while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {log.warn("Still waiting for metadata");}metadataEnd = time.milliseconds();}//note: 這里終于開始拉取消息了,下面單獨講一下final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));if (!records.isEmpty()) {//note: 翻譯:返回之前,發(fā)送下一個拉取的請求避免阻塞response// before returning the fetched records, we can send off the next round of fetches// and avoid block waiting for their responses to enable pipelining while the user// is handling the fetched records.//// NOTE: since the consumed position has already been updated, we must not allow// wakeups or any other errors to be triggered prior to returning the fetched records.if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {client.pollNoWakeup();}//note:  這里使用攔截器攔截一下,這里可以對消息進(jìn)行修改或過濾,但需要注意commit的問題return this.interceptors.onConsume(new ConsumerRecords<>(records));}final long fetchEnd = time.milliseconds();elapsedTime += fetchEnd - metadataEnd;} while (elapsedTime < timeoutMs);return ConsumerRecords.empty();} finally {release();}
}

關(guān)于 pollForFetches 的邏輯

pollForFetches

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {final long startMs = time.milliseconds();long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);// note: 先獲取已經(jīng)拉取了的消息,存在就直接返回//  fetcher 內(nèi)部有一個 completedFetches 暫存預(yù)拉取的請求,可解析出 nextLineRecords 用于暫存預(yù)拉取的消息//    從 nextLineRecords 獲取消息時,先判斷一下狀態(tài)(如assigned、paused、position),//      然后獲取到消息后,再更新 subscriptions 中的 position 位置(值為下一個的offset), 注意這個時候還沒commit// if data is available already, return it immediatelyfinal Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty()) {return records;}// note: 沒有預(yù)拉取的消息,發(fā)送拉取請求(實際沒發(fā)) //  先找到partition的leader,檢查可用,檢查沒有待處理的請求,然后從 subscriptions 獲取 position,構(gòu)建ClientRequest暫存//  以及設(shè)置listener (成功則處理結(jié)果入隊列completedFetches)// send any new fetches (won't resend pending fetches)fetcher.sendFetches();// We do not want to be stuck blocking in poll if we are missing some positions// since the offset lookup may be backing off after a failure// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call// updateAssignmentMetadataIfNeeded before this method.if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {pollTimeout = retryBackoffMs;}// note: 輪詢等待,詳見下文client.poll(pollTimeout, startMs, () -> {// since a fetch might be completed by the background thread, we need this poll condition// to ensure that we do not block unnecessarily in poll()return !fetcher.hasCompletedFetches();});// after the long poll, we should check whether the group needs to rebalance// prior to returning data so that the group can stabilize fasterif (coordinator.rejoinNeededOrPending()) {return Collections.emptyMap();}return fetcher.fetchedRecords();
}

ConsumerNetworkClient#poll

/*** Poll for any network IO.* @param timeout timeout in milliseconds* @param now current time in milliseconds* @param disableWakeup If TRUE disable triggering wake-ups*/
public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {// note: 觸發(fā)已完成的請求的回調(diào)處理器  (有一個pendingCompletion的隊列)// there may be handlers which need to be invoked if we woke up the previous call to pollfirePendingCompletedRequests();lock.lock();try {// note: 處理斷開的連接 (pendingDisconnects隊列)// Handle async disconnects prior to attempting any sendshandlePendingDisconnects();// note: 實際上這里才真正發(fā)出請求。。 前面那個feature只是構(gòu)建request//  前面準(zhǔn)備的 ClientRequest 放在一個 UnsentRequests (內(nèi)部map, key:Node,val: requests)中//  這里面取出來進(jìn)行發(fā)送, kafkaClient.ready -> send// send all the requests we can send nowlong pollDelayMs = trySend(now);timeout = Math.min(timeout, pollDelayMs);// note: 這里主要是判斷是否需要阻塞 poll (timeout是否為0) 如果沒有待完成且判斷應(yīng)該阻塞(completedFetches為空)則阻塞//  poll 里面是從 sockets 里面讀寫數(shù)據(jù)// check whether the poll is still needed by the caller. Note that if the expected completion// condition becomes satisfied after the call to shouldBlock() (because of a fired completion// handler), the client will be woken up.if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {// if there are no requests in flight, do not block longer than the retry backoffif (client.inFlightRequestCount() == 0)timeout = Math.min(timeout, retryBackoffMs);client.poll(Math.min(maxPollTimeoutMs, timeout), now);now = time.milliseconds();} else {client.poll(0, now);}// note: 檢查斷開的鏈接,判斷node連接是否斷開,是則從unset中取出對應(yīng)requests,構(gòu)建response加到completedFetches中// handle any disconnects by failing the active requests. note that disconnects must// be checked immediately following poll since any subsequent call to client.ready()// will reset the disconnect statuscheckDisconnects(now);if (!disableWakeup) {// trigger wakeups after checking for disconnects so that the callbacks will be ready// to be fired on the next call to poll()maybeTriggerWakeup();}// throw InterruptException if this thread is interruptedmaybeThrowInterruptException();// note: 再發(fā)一次請求,推測是可能部分 node 的連接在第一次沒有ready (沒ready會進(jìn)行初始化,并返回false)// try again to send requests since buffer space may have been// cleared or a connect finished in the polltrySend(now);// fail requests that couldn't be sent if they have expiredfailExpiredRequests(now);// clean unsent requests collection to keep the map from growing indefinitelyunsent.clean();} finally {lock.unlock();}// called without the lock to avoid deadlock potential if handlers need to acquire locksfirePendingCompletedRequests();
}

自動提交

提交 offset 是為了防止重啟或 rebalance 后,導(dǎo)致本地 position 丟失無法正常拉取后面的消息。

入口是 ConsumerCoordinator#maybeAutoCommitOffsetsAsync

觸發(fā)邏輯主要是

  • KafkaConsumer#poll 拉消息
  • -> KafkaConsumer#updateAssignmentMetadataIfNeeded
  • -> ConsumerCoordinator#poll -> maybeAutoCommitOffsetsAsync (也是先構(gòu)建請求存 unset 里面,等拉消息的時候再發(fā)出去)
    public void maybeAutoCommitOffsetsAsync(long now) {// 這里用來判斷是否滿足自動提交的間隔if (autoCommitEnabled && now >= nextAutoCommitDeadline) {this.nextAutoCommitDeadline = now + autoCommitIntervalMs;doAutoCommitOffsetsAsync();}}
http://www.risenshineclean.com/news/50691.html

相關(guān)文章:

  • 哪個網(wǎng)站可以做驗證碼兼職軟件開發(fā)平臺
  • 百度 醫(yī)療網(wǎng)站建設(shè)公司做網(wǎng)站需要多少錢
  • php企業(yè)網(wǎng)站demo長沙網(wǎng)站建設(shè)
  • 西安給大學(xué)做網(wǎng)站公司國外瀏覽器搜索引擎入口
  • 錦州 做網(wǎng)站茂名網(wǎng)站建設(shè)制作
  • 昆明網(wǎng)站做谷歌官網(wǎng)下載app
  • 濟(jì)南網(wǎng)站建設(shè)就選搜點網(wǎng)絡(luò)ok線上營銷的方式
  • 公司建設(shè)網(wǎng)站的費用廣告文案經(jīng)典范例200字
  • 開平 做一網(wǎng)站南京seo整站優(yōu)化技術(shù)
  • wordpress4.8內(nèi)存seo自動工具
  • 外貿(mào)客戶管理軟件排名排名優(yōu)化方法
  • 做網(wǎng)站通常用的軟件肇慶網(wǎng)站建設(shè)制作
  • css網(wǎng)站做光暈效果產(chǎn)品推廣方案范例
  • 楊園建設(shè)社區(qū)網(wǎng)站電商seo名詞解釋
  • 多站點網(wǎng)站群的建設(shè)與管理社交媒體營銷
  • 網(wǎng)站開發(fā)的前臺開發(fā)工具武漢seo百度
  • 做垂直網(wǎng)站營銷策劃書格式及范文
  • 簡單自適應(yīng)網(wǎng)站友情鏈接交換平臺有哪些
  • 網(wǎng)站建站費用多少廣州百度關(guān)鍵詞搜索
  • 項目建設(shè)管理福州百度seo排名軟件
  • 大型門戶網(wǎng)站都有抖音關(guān)鍵詞排名優(yōu)化軟件
  • 濟(jì)南最好的網(wǎng)站制作公司哪家好第一設(shè)計
  • wordpress安裝后輸入什么域名電腦系統(tǒng)優(yōu)化軟件排行榜
  • 個人博客網(wǎng)站模板素材百度搜索排名機(jī)制
  • 美食欣賞網(wǎng)站瀏覽器谷歌手機(jī)版下載
  • 網(wǎng)站建站智能系統(tǒng)seo網(wǎng)上課程
  • 微信第三方做網(wǎng)站需要費用嗎如何用google搜索產(chǎn)品關(guān)鍵詞
  • 公司網(wǎng)站上榮譽(yù)墻怎么做西seo優(yōu)化排名
  • 可直接進(jìn)入網(wǎng)站的代碼網(wǎng)絡(luò)營銷策略實施的步驟
  • 寧波專業(yè)網(wǎng)站建設(shè)公司什么是淘寶搜索關(guān)鍵詞