中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

汝州市住房和城鄉(xiāng)規(guī)劃建設(shè)局網(wǎng)站淘寶關(guān)鍵詞排名查詢

汝州市住房和城鄉(xiāng)規(guī)劃建設(shè)局網(wǎng)站,淘寶關(guān)鍵詞排名查詢,國內(nèi)知名的網(wǎng)站設(shè)計公司,網(wǎng)站備案法律法規(guī)背景 FlinkKafkaConsumer支持當(dāng)收到某個kafka分區(qū)中的某條記錄時發(fā)送水位線,比如這條特殊的記錄代表一個完整記錄的結(jié)束等,本文就來解析下發(fā)送punctuated水位線的源碼 punctuated 水位線發(fā)送源碼解析 1.首先KafkaFetcher中的runFetchLoop方法 public…

背景

FlinkKafkaConsumer支持當(dāng)收到某個kafka分區(qū)中的某條記錄時發(fā)送水位線,比如這條特殊的記錄代表一個完整記錄的結(jié)束等,本文就來解析下發(fā)送punctuated水位線的源碼

punctuated 水位線發(fā)送源碼解析

1.首先KafkaFetcher中的runFetchLoop方法

public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition :subscribedPartitionStates()) {List<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());
// 算子任務(wù)消費(fèi)的每個分區(qū)都調(diào)用這個方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}

2.查看partitionConsumerRecordsHandler方法處理當(dāng)前算子任務(wù)對應(yīng)的每個分區(qū)的水位線

    protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);// 發(fā)送kafka記錄到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 處理分區(qū)的水位線,記錄這個分區(qū)的水位線,并在滿足條件時更新整個算子任務(wù)的水位線partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}```3.處理每個分區(qū)的水位線```javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next =wms.checkAndGetNextWatermark(event, eventTimestamp);if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));對應(yīng)方法如下public void emitWatermark(Watermark watermark) {long timestamp = watermark.getTimestamp();// 更新每個分區(qū)對應(yīng)的水位線,并且更新boolean wasUpdated = state.setWatermark(timestamp);// if it's higher than the max watermark so far we might have to update the// combined watermark 這個表明這個算子任務(wù)的最低水位線,也就是算子任務(wù)級別的水位線,而不是分區(qū)級別的了if (wasUpdated && timestamp > combinedWatermark) {updateCombinedWatermark();}}//每個分區(qū)水位線的更新如下public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;this.watermark = Math.max(watermark, this.watermark);return updated;}       

4.最后是發(fā)送算子任務(wù)級別的水位線的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}

你可以看這個流程,是不是意味著如果使用Punctuated的方式,是不支持Idle空閑時間的?–答案是的

http://www.risenshineclean.com/news/3368.html

相關(guān)文章:

  • 興義網(wǎng)站seo廣東疫情動態(tài)人民日報
  • 網(wǎng)站推廣鏈接seo研究學(xué)院
  • 做網(wǎng)站 寫腳本是什么百度官方下載
  • 淺談一下網(wǎng)絡(luò)營銷的幾個誤區(qū)東莞seo公司
  • wordpress 文章閱讀數(shù)杭州seo網(wǎng)站建設(shè)
  • 易企秀怎么做網(wǎng)站發(fā)布友情鏈接
  • 同城型網(wǎng)站開發(fā)app優(yōu)化推廣
  • 簡單的cms源碼對網(wǎng)站外部的搜索引擎優(yōu)化
  • 自己做的網(wǎng)站如何連接入數(shù)據(jù)庫產(chǎn)品免費(fèi)推廣網(wǎng)站有哪些
  • 網(wǎng)站建設(shè)相關(guān)的博客有哪些網(wǎng)站推廣途徑
  • 湖南郴州市seo是做什么工作的
  • 智達(dá)世通建設(shè)集團(tuán)有限公司網(wǎng)站廣州市新聞發(fā)布
  • 開發(fā)做網(wǎng)站公司itme收錄優(yōu)美圖片官網(wǎng)
  • 做視頻發(fā)哪個網(wǎng)站賺錢百度數(shù)據(jù)庫
  • 西湖專業(yè)網(wǎng)站設(shè)計公司網(wǎng)站策劃書的撰寫流程
  • 網(wǎng)站定制文章列表項(xiàng)怎么做盤古搜索
  • shopify可以用來做B2B網(wǎng)站嗎百度網(wǎng)頁版電腦版入口
  • 做門窗投標(biāo)網(wǎng)站seo課程在哪培訓(xùn)好
  • 公司名詞解釋百度關(guān)鍵詞排名優(yōu)化
  • web網(wǎng)站開發(fā)完整教程線下推廣公司
  • wordpress安裝在vps百度seo公司興田德潤
  • 平面設(shè)計主要學(xué)什么哪些軟件seo產(chǎn)品推廣
  • 網(wǎng)站設(shè)計與制作教程1百度搜索引擎優(yōu)化怎么做
  • 網(wǎng)站logo如何做鏈接免費(fèi)b2b網(wǎng)站推廣有哪些
  • 做國際網(wǎng)站要多少錢友情鏈接你會回來感謝我
  • 做pc端網(wǎng)站什么開頭必應(yīng)站長平臺
  • 網(wǎng)站出現(xiàn)的的問題搜索引擎優(yōu)化的完整過程
  • 做ppt好的網(wǎng)站有哪些方面汕頭網(wǎng)站建設(shè)推廣
  • 有哪些國外網(wǎng)站做的好的效果圖培訓(xùn)網(wǎng)站有哪些
  • 渭南建網(wǎng)站seo首頁關(guān)鍵詞優(yōu)化