Background
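When Flink consumes messages from Kafka, we often rely on FlinkKafkaConsumer to emit watermarks. This article reads the source code to trace how periodic watermarks, configured through FlinkKafkaConsumer.assignTimestampsAndWatermarks, are emitted.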
FlinkKafkaConsumer watermark emission
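1. Start with the Fetcher class. When a Fetcher is created with a watermark generator and a positive auto-watermark interval, it builds a periodic watermark emitter and starts it: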
```java
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
    PeriodicWatermarkEmitter<T, KPH> periodicEmitter =
            new PeriodicWatermarkEmitter<>(
                    checkpointLock,
                    subscribedPartitionStates,
                    watermarkOutputMultiplexer,
                    processingTimeProvider,
                    autoWatermarkInterval);

    periodicEmitter.start();
}
```
2. Next, PeriodicWatermarkEmitter registers a processing-time timer and re-registers it every time it fires, so the emission runs periodically:
```java
public void start() {
    timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
}

@Override
public void onProcessingTime(long timestamp) {
    synchronized (checkpointLock) {
        for (KafkaTopicPartitionState<?, ?> state : allPartitions) {
            // Each Kafka partition consumed by this subtask records its own watermark
            state.onPeriodicEmit();
        }

        // The subtask takes the minimum of the watermarks of all Kafka partitions it consumes
        // and emits that as its own watermark; note that this happens per subtask
        watermarkOutputMultiplexer.onPeriodicEmit();
    }

    // schedule the next watermark
    timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
}
```
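The self-rescheduling pattern can be shown with a small plain-Java simulation. This is not Flink code: `ManualTimerService`, `TimerCallback` and `PeriodicEmitter` are hypothetical names. The clock is advanced by hand instead of following wall-clock time, which makes the firing times deterministic. Like `PeriodicWatermarkEmitter`, the callback does its work under a lock and then registers the next timer at `now + interval`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical simulation of the "re-register on every fire" pattern; not Flink's ProcessingTimeService.
public class PeriodicTimerSketch {

    interface TimerCallback {
        void onProcessingTime(long timestamp);
    }

    static final class Timer {
        final long time;
        final TimerCallback callback;

        Timer(long time, TimerCallback callback) {
            this.time = time;
            this.callback = callback;
        }
    }

    // Timer service whose clock only moves when advanceTo() is called.
    static final class ManualTimerService {
        private long now = 0L;
        private final PriorityQueue<Timer> timers =
                new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));

        long getCurrentProcessingTime() {
            return now;
        }

        void registerTimer(long time, TimerCallback callback) {
            timers.add(new Timer(time, callback));
        }

        // Move the clock to newTime, firing (in order) every timer that is due.
        void advanceTo(long newTime) {
            while (!timers.isEmpty() && timers.peek().time <= newTime) {
                Timer t = timers.poll();
                now = t.time;
                t.callback.onProcessingTime(t.time);
            }
            now = newTime;
        }
    }

    static final class PeriodicEmitter implements TimerCallback {
        private final Object lock = new Object();
        private final ManualTimerService timerService;
        private final long interval;
        final List<Long> firedAt = new ArrayList<>();

        PeriodicEmitter(ManualTimerService timerService, long interval) {
            this.timerService = timerService;
            this.interval = interval;
        }

        void start() {
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }

        @Override
        public void onProcessingTime(long timestamp) {
            synchronized (lock) {
                // In Flink, partition states and the multiplexer emit here; we just record the time.
                firedAt.add(timestamp);
            }
            // schedule the next firing
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }
    }

    public static void main(String[] args) {
        ManualTimerService timers = new ManualTimerService();
        PeriodicEmitter emitter = new PeriodicEmitter(timers, 200L);
        emitter.start();
        timers.advanceTo(1000L);
        System.out.println(emitter.firedAt); // [200, 400, 600, 800, 1000]
    }
}
```

Each firing schedules only the next one, so at most one timer per emitter is pending at any time.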
3. The method behind state.onPeriodicEmit(), which records the watermark of each Kafka partition:
```java
@Override
public void onPeriodicEmit(WatermarkOutput output) {
    final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();
    if (next != null) {
        output.emitWatermark(new Watermark(next.getTimestamp()));
    }
}
```
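The adapter above forwards a watermark only when the wrapped assigner returns a non-null one. The sketch below mimics that behaviour outside Flink. `LegacyPeriodicAssigner`, `WatermarkSink`, `MaxMinusDelayAssigner` and `Adapter` are hypothetical names. The "highest timestamp seen minus a fixed delay" formula is an assumption chosen for illustration; the assigner returns null until it has seen an event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the null-check-then-forward adapter; none of these types are Flink's.
public class WatermarkAdapterSketch {

    // Stand-in for a legacy periodic assigner; may return null when it has no watermark yet.
    interface LegacyPeriodicAssigner {
        void onEvent(long eventTimestamp);

        Long getCurrentWatermark();
    }

    // Stand-in for the output the adapter writes to.
    interface WatermarkSink {
        void emitWatermark(long timestamp);
    }

    // Assumed rule: watermark = highest timestamp seen so far minus a fixed delay.
    static final class MaxMinusDelayAssigner implements LegacyPeriodicAssigner {
        private final long delay;
        private Long maxTimestamp = null;

        MaxMinusDelayAssigner(long delay) {
            this.delay = delay;
        }

        @Override
        public void onEvent(long eventTimestamp) {
            maxTimestamp = (maxTimestamp == null) ? eventTimestamp : Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public Long getCurrentWatermark() {
            return maxTimestamp == null ? null : maxTimestamp - delay;
        }
    }

    // Same shape as onPeriodicEmit above: read the current watermark, forward it only if non-null.
    static final class Adapter {
        private final LegacyPeriodicAssigner wms;

        Adapter(LegacyPeriodicAssigner wms) {
            this.wms = wms;
        }

        void onPeriodicEmit(WatermarkSink output) {
            Long next = wms.getCurrentWatermark();
            if (next != null) {
                output.emitWatermark(next);
            }
        }
    }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        MaxMinusDelayAssigner assigner = new MaxMinusDelayAssigner(5L);
        Adapter adapter = new Adapter(assigner);

        adapter.onPeriodicEmit(emitted::add); // no event yet: nothing is emitted
        assigner.onEvent(100L);
        assigner.onEvent(90L);                // a late event does not lower the maximum
        adapter.onPeriodicEmit(emitted::add); // emits 100 - 5 = 95
        System.out.println(emitted);          // [95]
    }
}
```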
Here, the call output.emitWatermark(new Watermark(next.getTimestamp())) on the WatermarkOutput is implemented as follows:

```java
public DeferredOutput(OutputState state) {
    this.state = state;
}

@Override
public void emitWatermark(Watermark watermark) {
    state.setWatermark(watermark.getTimestamp());
}
```
So the net effect is only that a watermark is set on the corresponding state, that is, on one Kafka partition (note that a single subtask may consume several Kafka partitions):

```java
/**
 * Returns true if the watermark was advanced, that is if the new watermark is larger than
 * the previous one.
 *
 * <p>Setting a watermark will clear the idleness flag.
 */
public boolean setWatermark(long watermark) {
    this.idle = false;
    final boolean updated = watermark > this.watermark;
    // This also shows that even if user code emits a smaller watermark, the watermark never moves backwards
    this.watermark = Math.max(watermark, this.watermark);
    return updated;
}
```
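The following is a minimal sketch of the per-partition state. As in the excerpt, it assumes the state keeps a single `long` watermark plus an idle flag. The `Long.MIN_VALUE` starting value and the `setIdle` helper are assumptions added for the demo. The sketch checks the two properties noted above: a smaller watermark never moves the stored value backwards, and any call to `setWatermark` clears idleness.

```java
// Hypothetical mirror of the per-partition OutputState; not the Flink class itself.
public class OutputStateSketch {

    static final class OutputState {
        private long watermark = Long.MIN_VALUE; // assumed initial value
        private boolean idle = false;

        // Returns true only if the watermark advanced; the stored value never decreases.
        boolean setWatermark(long newWatermark) {
            this.idle = false; // any watermark clears idleness
            final boolean updated = newWatermark > this.watermark;
            this.watermark = Math.max(newWatermark, this.watermark);
            return updated;
        }

        void setIdle(boolean idle) {
            this.idle = idle;
        }

        long getWatermark() {
            return watermark;
        }

        boolean isIdle() {
            return idle;
        }
    }

    public static void main(String[] args) {
        OutputState partition0 = new OutputState();
        System.out.println(partition0.setWatermark(100L)); // true
        System.out.println(partition0.setWatermark(80L));  // false: the smaller value is ignored
        System.out.println(partition0.getWatermark());     // 100
        partition0.setIdle(true);
        partition0.setWatermark(90L);                      // ignored, but still clears idleness
        System.out.println(partition0.isIdle());           // false
    }
}
```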
4. The method with which a subtask combines the watermarks of all the partitions it consumes:
```java
private void updateCombinedWatermark() {
    long minimumOverAllOutputs = Long.MAX_VALUE;

    boolean hasOutputs = false;
    boolean allIdle = true;
    for (OutputState outputState : watermarkOutputs) {
        if (!outputState.isIdle()) {
            minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());
            allIdle = false;
        }
        hasOutputs = true;
    }

    // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
    // at its initial Long.MAX_VALUE state and we must not emit that
    // If the subtask consumes no partitions, it emits no watermark at all. Is this why the
    // number of Kafka consumers should not exceed the number of partitions of the topic?
    if (!hasOutputs) {
        return;
    }

    if (allIdle) {
        // If every partition of this subtask is idle, mark the subtask idle
        // so that downstream operators can keep advancing their watermarks
        underlyingOutput.markIdle();
    } else if (minimumOverAllOutputs > combinedWatermark) {
        combinedWatermark = minimumOverAllOutputs;
        underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));
    }
}
```
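The combining rule can be exercised without Flink. This is a hypothetical sketch: `PartitionState`, `Multiplexer` and the string log `downstream` are illustrative names, not Flink API, and the `Long.MIN_VALUE` starting value for the combined watermark is an assumption. The multiplexer follows the same steps as the excerpt:

- It takes the minimum over non-idle partitions and emits only when that minimum moves forward.
- It reports idleness when every partition is idle.
- It emits nothing when it owns no partitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-implementation of the min-over-non-idle-partitions rule, for illustration only.
public class CombinedWatermarkSketch {

    static final class PartitionState {
        long watermark;
        boolean idle;

        PartitionState(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    static final class Multiplexer {
        final List<PartitionState> partitions = new ArrayList<>();
        final List<String> downstream = new ArrayList<>(); // records what would be sent downstream
        private long combinedWatermark = Long.MIN_VALUE;    // assumed initial value

        void updateCombinedWatermark() {
            long minimum = Long.MAX_VALUE;
            boolean hasOutputs = false;
            boolean allIdle = true;
            for (PartitionState p : partitions) {
                if (!p.idle) {
                    minimum = Math.min(minimum, p.watermark);
                    allIdle = false;
                }
                hasOutputs = true;
            }
            if (!hasOutputs) {
                return; // no partitions: Long.MAX_VALUE is not a valid watermark, send nothing
            }
            if (allIdle) {
                downstream.add("idle");
            } else if (minimum > combinedWatermark) {
                combinedWatermark = minimum;
                downstream.add("watermark " + minimum);
            }
        }
    }

    public static void main(String[] args) {
        Multiplexer m = new Multiplexer();
        m.updateCombinedWatermark(); // no partitions yet: nothing is sent

        PartitionState p0 = new PartitionState(120L, false);
        PartitionState p1 = new PartitionState(100L, false);
        m.partitions.add(p0);
        m.partitions.add(p1);
        m.updateCombinedWatermark(); // min(120, 100) = 100

        p1.idle = true;
        m.updateCombinedWatermark(); // idle p1 is ignored: 120

        p0.idle = true;
        m.updateCombinedWatermark(); // every partition idle
        System.out.println(m.downstream); // [watermark 100, watermark 120, idle]
    }
}
```

The second emission shows why idleness matters. Once the lagging partition is marked idle, it no longer holds back the minimum, so the combined watermark moves from 100 to 120.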