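Background
FlinkKafkaConsumer can emit a watermark when it receives a particular record from a Kafka partition, for example a special record that marks the end of a complete logical record. This article walks through the source code that emits these punctuated watermarks.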
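To make the idea concrete before reading the source, here is a minimal, dependency-free sketch of a punctuated assigner. It only mirrors the shape of the `checkAndGetNextWatermark(event, timestamp)` callback that appears in the walkthrough. `Event`, `END_MARKER`, and `PunctuatedAssignerSketch` are hypothetical names for illustration, and returning a `Long` (null meaning "no watermark") instead of a `Watermark` object is a simplification, not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical record type: a payload plus an event-time timestamp. */
final class Event {
    final String payload;
    final long timestamp;

    Event(String payload, long timestamp) {
        this.payload = payload;
        this.timestamp = timestamp;
    }
}

/** Simplified stand-in for a punctuated watermark assigner (not Flink's class). */
final class PunctuatedAssignerSketch {
    /** Hypothetical payload that marks the end of a complete logical record. */
    static final String END_MARKER = "END";

    /** Returns a watermark timestamp for marker records, or null for ordinary records. */
    Long checkAndGetNextWatermark(Event event, long extractedTimestamp) {
        return END_MARKER.equals(event.payload) ? extractedTimestamp : null;
    }

    /** Runs the assigner over a sequence of events and collects the emitted watermarks. */
    static List<Long> watermarksFor(List<Event> events) {
        PunctuatedAssignerSketch assigner = new PunctuatedAssignerSketch();
        List<Long> emitted = new ArrayList<>();
        for (Event e : events) {
            Long next = assigner.checkAndGetNextWatermark(e, e.timestamp);
            if (next != null) {
                emitted.add(next);
            }
        }
        return emitted;
    }
}
```

Only the marker records produce a watermark; ordinary records pass through without advancing event time, which is the defining property of the punctuated style.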
Source walkthrough: emitting punctuated watermarks
1. Start with the runFetchLoop method in KafkaFetcher
```java
public void runFetchLoop() throws Exception {
    try {
        // kick off the actual Kafka consumer
        consumerThread.start();

        while (running) {
            // this blocks until we get the next records
            // it automatically re-throws exceptions encountered in the consumer thread
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            // get the records for each topic partition
            for (KafkaTopicPartitionState<T, TopicPartition> partition :
                    subscribedPartitionStates()) {
                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle());

                // this method is called for every partition consumed by the operator subtask
                partitionConsumerRecordsHandler(partitionRecords, partition);
            }
        }
    } finally {
        // this signals the consumer thread that no more work is to be done
        consumerThread.shutdown();
    }
}
```
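2. Next, partitionConsumerRecordsHandler processes the records of each partition assigned to the current operator subtask; it hands them to emitRecordsWithTimestamps, which also takes care of the partition's watermark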
```java
protected void emitRecordsWithTimestamps(
        Queue<T> records,
        KafkaTopicPartitionState<T, KPH> partitionState,
        long offset,
        long kafkaEventTimestamp) {
    // emit the records, using the checkpoint lock to guarantee
    // atomicity of record emission and offset state update
    synchronized (checkpointLock) {
        T record;
        while ((record = records.poll()) != null) {
            long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
            // emit the Kafka record to the downstream operator
            sourceContext.collectWithTimestamp(record, timestamp);

            // this might emit a watermark, so do it after emitting the record
            // handle this partition's watermark: record it and, when the conditions
            // are met, update the watermark of the whole operator subtask
            partitionState.onEvent(record, timestamp);
        }
        partitionState.setOffset(offset);
    }
}
```
3. Handling each partition's watermark
```java
// partition state: forwards the event to the watermark generator
public void onEvent(T event, long timestamp) {
    watermarkGenerator.onEvent(event, timestamp, immediateOutput);
}

// punctuated adapter: asks the user-defined assigner whether this event yields a watermark
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
    final org.apache.flink.streaming.api.watermark.Watermark next =
            wms.checkAndGetNextWatermark(event, eventTimestamp);
    if (next != null) {
        output.emitWatermark(new Watermark(next.getTimestamp()));
    }
}
```
The call `output.emitWatermark(new Watermark(next.getTimestamp()));` resolves to the following method:
```java
public void emitWatermark(Watermark watermark) {
    long timestamp = watermark.getTimestamp();
    // update the watermark of this partition
    boolean wasUpdated = state.setWatermark(timestamp);

    // if it's higher than the max watermark so far we might have to update the
    // combined watermark. The combined watermark is the minimum watermark across
    // the subtask's partitions, i.e. the operator-subtask-level watermark rather
    // than a partition-level one
    if (wasUpdated && timestamp > combinedWatermark) {
        updateCombinedWatermark();
    }
}

// each partition's watermark is updated as follows
public boolean setWatermark(long watermark) {
    this.idle = false;
    final boolean updated = watermark > this.watermark;
    this.watermark = Math.max(watermark, this.watermark);
    return updated;
}
```
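The per-partition update rule can be checked in isolation. The following is a small standalone model of the `setWatermark` logic quoted above, assuming an initial watermark of `Long.MIN_VALUE`; the class name `PartitionWatermarkStateSketch` is made up for this illustration and is not a Flink class.

```java
/** Simplified model of the per-partition output state (illustrative name). */
final class PartitionWatermarkStateSketch {
    // assumption: a partition starts with the lowest possible watermark
    private long watermark = Long.MIN_VALUE;
    private boolean idle = false;

    /** Records a candidate watermark; returns true only if the watermark advanced. */
    boolean setWatermark(long candidate) {
        idle = false; // any watermark activity marks the partition as active
        boolean updated = candidate > watermark;
        watermark = Math.max(candidate, watermark);
        return updated;
    }

    long getWatermark() {
        return watermark;
    }

    boolean isIdle() {
        return idle;
    }
}
```

Two properties follow from this rule: the partition watermark never moves backwards, and the combined watermark is only recomputed when the returned flag is true, so late or duplicate punctuation records are effectively ignored.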
4. Finally, the method that emits the operator-subtask-level watermark
```java
private void updateCombinedWatermark() {
    long minimumOverAllOutputs = Long.MAX_VALUE;

    boolean hasOutputs = false;
    boolean allIdle = true;
    for (OutputState outputState : watermarkOutputs) {
        if (!outputState.isIdle()) {
            minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());
            allIdle = false;
        }
        hasOutputs = true;
    }

    // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
    // at its initial Long.MAX_VALUE state and we must not emit that
    if (!hasOutputs) {
        return;
    }

    if (allIdle) {
        underlyingOutput.markIdle();
    } else if (minimumOverAllOutputs > combinedWatermark) {
        combinedWatermark = minimumOverAllOutputs;
        underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));
    }
}
```
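Looking at this flow, does it mean that the punctuated approach does not support idleness? The answer is yes. A partition's watermark only moves when a record arrives and checkAndGetNextWatermark returns a non-null watermark, setWatermark resets idle to false, and nothing in the code path shown here ever marks a partition as idle. A partition that stops receiving records therefore holds back the combined watermark of the whole operator subtask.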
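The stall can be reproduced with a small simulation. This is a sketch under simplifying assumptions, not Flink's implementation: `CombinedWatermarkSketch` and `Output` are hypothetical names, watermarks are plain `long` values, and the `markIdle` method exists here only to show what would release the stall.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of combining per-partition watermarks into one subtask watermark. */
final class CombinedWatermarkSketch {
    /** Per-partition state; starts active with the lowest possible watermark. */
    static final class Output {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    private final List<Output> outputs = new ArrayList<>();
    private long combined = Long.MIN_VALUE;

    Output register() {
        Output o = new Output();
        outputs.add(o);
        return o;
    }

    /** Called when a punctuation record yields a watermark for one partition. */
    void onPartitionWatermark(Output o, long timestamp) {
        o.idle = false;
        if (timestamp > o.watermark) {
            o.watermark = timestamp;
            if (timestamp > combined) {
                recombine();
            }
        }
    }

    /** Hypothetical hook: in the punctuated path described above, nothing triggers this. */
    void markIdle(Output o) {
        o.idle = true;
        recombine();
    }

    /** The combined watermark is the minimum over all non-idle partitions. */
    private void recombine() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Output o : outputs) {
            if (!o.idle) {
                min = Math.min(min, o.watermark);
                anyActive = true;
            }
        }
        if (anyActive && min > combined) {
            combined = min;
        }
    }

    long getCombined() {
        return combined;
    }
}
```

With two registered partitions where only the first ever receives punctuation records, the combined watermark stays at `Long.MIN_VALUE` no matter how far the first partition advances. It only catches up once the silent partition is explicitly marked idle, which is the step the punctuated path never performs.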