
Contents

  • I. Invoking StreamTask to Execute the Checkpoint
    • 1. Overall Checkpoint code flow
      • 1.1. StreamTask.checkpointState()
      • 1.2. executeCheckpointing
      • 1.3. Wrapping an operator's state snapshot in OperatorSnapshotFutures
      • 1.4. Snapshotting operator state
      • 1.5. Persisting state snapshots
  • II. Checkpoint Management in CheckpointCoordinator
    • 1. Acknowledgment after a Checkpoint finishes
    • 2. Triggering and completing the Checkpoint
    • 3. Notifying TaskExecutor of CheckpointComplete
  • III. Summary of State Management

The previous article covered the alignment of CheckpointBarriers. Once alignment completes, the notifyCheckpoint() method triggers the Checkpoint operation on the StreamTask node.

I. Invoking StreamTask to Execute the Checkpoint

As the code below shows, notifyCheckpoint() performs the following steps.

> 1. Check that toNotifyOnCheckpoint is not null.
> 2. Create CheckpointMetaData and CheckpointMetrics instances. CheckpointMetaData stores the
> Checkpoint's meta information; CheckpointMetrics records the Checkpoint's monitoring metrics.
> 3. Trigger the Checkpoint operation on the operators in the StreamTask.
protected void notifyCheckpoint(
        CheckpointBarrier checkpointBarrier,
        long bufferedBytes,
        long alignmentDurationNanos) throws Exception {
    if (toNotifyOnCheckpoint != null) {
        // Create the CheckpointMetaData object that stores the meta information
        CheckpointMetaData checkpointMetaData =
            new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
        // Create the CheckpointMetrics object that records the monitoring metrics
        CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
            .setBytesBufferedInAlignment(bufferedBytes)
            .setAlignmentDurationNanos(alignmentDurationNanos);
        // Call toNotifyOnCheckpoint.triggerCheckpointOnBarrier() to trigger the Checkpoint
        toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
            checkpointMetaData,
            checkpointBarrier.getCheckpointOptions(),
            checkpointMetrics);
    }
}

Note: StreamTask is the only subclass that implements the Checkpoint methods, so only a StreamTask can trigger the Checkpoint operation in the current Task instance.

Next, let's look at how the Checkpoint is actually executed.

1. Overall Checkpoint code flow

A Checkpoint is triggered in one of two ways. In the first, CheckpointCoordinator periodically triggers the Checkpoint on the source nodes. In the second, a downstream operator triggers the Checkpoint of its own node once it has aligned the CheckpointBarrier events.

Whichever way the Checkpoint is triggered, the call ends up in StreamTask.performCheckpoint(), which persists the state data of the StreamTask instance.

StreamTask.performCheckpoint() first checks whether the current Task is running normally and then runs the Checkpoint through actionExecutor. The actual steps are as follows.

  1. Preparation before the Checkpoint: every Operator in the OperatorChain performs its pre-barrier work.
  2. Send the CheckpointBarrier event to the downstream nodes.
  3. Snapshot the operator state.

The checkpointState() method snapshots the state of all operators in the StreamTask's OperatorChain. This step is asynchronous and non-blocking, so it does not interrupt normal record processing. Once it has run, true is returned to the CheckpointInputGate.

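  4. Handling a Task that is no longer running:
  • If isRunning is false, the Task is not in the running state. In that case a CancelCheckpointMarker has to be sent to all downstream operators, which is done by broadcasting the event with recordWriter.broadcastEvent(message).
  • A downstream operator that receives the CancelCheckpointMarker cancels the Checkpoint immediately, but only if the operators in its OperatorChain have not yet finished that Checkpoint.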
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {
    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
    final long checkpointId = checkpointMetaData.getCheckpointId();
    if (isRunning) {
        // Run the Checkpoint logic through actionExecutor
        actionExecutor.runThrowing(() -> {
            if (checkpointOptions.getCheckpointType().isSynchronous()) {
                setSynchronousSavepointId(checkpointId);
                if (advanceToEndOfTime) {
                    advanceToEndOfEventTime();
                }
            }
            // Preparation work for the Checkpoint
            operatorChain.prepareSnapshotPreBarrier(checkpointId);
            // Send the checkpoint barrier to the downstream streams
            operatorChain.broadcastCheckpointBarrier(
                checkpointId,
                checkpointMetaData.getTimestamp(),
                checkpointOptions);
            // Snapshot the operator state. This step is asynchronous and does not
            // affect normal data processing in the streaming topology
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
        });
        return true;
    } else {
        // If the Task is in any other state, broadcast a CancelCheckpointMarker downstream
        actionExecutor.runThrowing(() -> {
            final CancelCheckpointMarker message =
                new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            recordWriter.broadcastEvent(message);
        });
        return false;
    }
}
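
To make the branch structure easier to see, here is a minimal, self-contained sketch of the same control flow. It is not Flink code: MiniTask and the event strings are hypothetical stand-ins, and each real step is reduced to appending a label to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a task; this is NOT Flink's StreamTask.
class MiniTask {
    private final List<String> events = new ArrayList<>();
    private volatile boolean running = true;

    void setRunning(boolean running) {
        this.running = running;
    }

    List<String> events() {
        return events;
    }

    // Mirrors the two branches described above: a running task prepares,
    // forwards the barrier and snapshots; any other task forwards a cancel marker.
    boolean performCheckpoint(long checkpointId) {
        if (running) {
            events.add("prepareSnapshotPreBarrier:" + checkpointId);
            events.add("broadcastBarrier:" + checkpointId);
            events.add("checkpointState:" + checkpointId);
            return true;
        }
        events.add("broadcastCancelMarker:" + checkpointId);
        return false;
    }
}
```

Note the ordering in the running branch: the barrier is forwarded before the state is snapshotted, so downstream tasks can start their own alignment while this task is still writing its state.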

1.1. StreamTask.checkpointState()

Next, let's look at the implementation of StreamTask.checkpointState(), shown in the code below.

  1. Create the CheckpointStreamFactory instance (storage), which is later used to create CheckpointStateOutputStream instances. The stream has two main implementations:
    • FsCheckpointStateOutputStream: writes to a file system.
    • MemoryCheckpointOutputStream: writes to an in-memory stream.
  2. Create the CheckpointingOperation instance. CheckpointingOperation wraps the concrete Checkpoint procedure together with the context it needs, such as checkpointMetaData, checkpointOptions, storage, and checkpointMetrics.
  3. Call CheckpointingOperation.executeCheckpointing() to run the Checkpoint.
private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {
    // Create the CheckpointStreamFactory instance
    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
        checkpointMetaData.getCheckpointId(),
        checkpointOptions.getTargetLocation());
    // Create the CheckpointingOperation instance
    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
        this,
        checkpointMetaData,
        checkpointOptions,
        storage,
        checkpointMetrics);
    // Run the Checkpoint
    checkpointingOperation.executeCheckpointing();
}

1.2. executeCheckpointing

As the code shows, CheckpointingOperation.executeCheckpointing() performs the following steps.

  1. Iterate over all StreamOperators and call checkpointStreamOperator() to create an OperatorSnapshotFutures object for each one. This step collects the snapshot operations of all operators as OperatorSnapshotFutures.
  2. Store each OperatorSnapshotFutures in the operatorSnapshotsInProgress map, where the key is the OperatorID and the value is the OperatorSnapshotFutures object for that operator's state snapshot.
  3. Create the AsyncCheckpointRunnable object, which holds the OperatorSnapshotFutures created above.
  4. Run asyncCheckpointRunnable on the StreamTask.asyncOperationsThreadPool thread pool to execute the asynchronous snapshot operations in operatorSnapshotsInProgress.
public void executeCheckpointing() throws Exception {
    // Create the OperatorSnapshotFutures that carry out the snapshot for each operator
    for (StreamOperator<?> op : allOperators) {
        checkpointStreamOperator(op);
    }
    // Some code omitted here
    startAsyncPartNano = System.nanoTime();
    checkpointMetrics.setSyncDurationMillis(
        (startAsyncPartNano - startSyncPartNano) / 1_000_000);
    AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
        owner,
        operatorSnapshotsInProgress,
        checkpointMetaData,
        checkpointMetrics,
        startAsyncPartNano);
    // Register the runnable as a Closeable
    owner.cancelables.registerCloseable(asyncCheckpointRunnable);
    // Execute asyncCheckpointRunnable
    owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
}
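
The split between a synchronous phase that only creates futures and an asynchronous phase that runs them can be sketched with plain java.util.concurrent classes. This is an illustrative model only: TwoPhaseSnapshotSketch, prepare, and runAsync are hypothetical names, an Integer stands in for operator state, and a String stands in for a state handle.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "create futures synchronously, run them asynchronously".
class TwoPhaseSnapshotSketch {

    // Synchronous part: capture each operator's current value and wrap the
    // remaining work in a future that has not been started yet.
    static Map<String, FutureTask<String>> prepare(Map<String, Integer> operatorState) {
        Map<String, FutureTask<String>> inProgress = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : operatorState.entrySet()) {
            final String operatorId = e.getKey();
            final int frozen = e.getValue(); // value as of the synchronous phase
            inProgress.put(operatorId, new FutureTask<String>(() -> operatorId + "=" + frozen));
        }
        return inProgress;
    }

    // Asynchronous part: one task on a separate thread drives every future to
    // completion and collects the results.
    static Map<String, String> runAsync(Map<String, FutureTask<String>> inProgress) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Map<String, String>> asyncPart = () -> {
                Map<String, String> handles = new LinkedHashMap<>();
                for (Map.Entry<String, FutureTask<String>> e : inProgress.entrySet()) {
                    FutureTask<String> future = e.getValue();
                    if (!future.isDone()) {
                        future.run(); // run the snapshot work if nobody has yet
                    }
                    handles.put(e.getKey(), future.get());
                }
                return handles;
            };
            return pool.submit(asyncPart).get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("async snapshot failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the value is captured in prepare(), later changes to the operator state do not leak into the snapshot, which is the property that lets the expensive part run off the processing thread.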

1.3. Wrapping an operator's state snapshot in OperatorSnapshotFutures

As the code below shows, AbstractStreamOperator.snapshotState() wraps the state snapshot of the current operator in an OperatorSnapshotFutures object. All OperatorSnapshotFutures are then triggered asynchronously on the asyncOperationsThreadPool thread pool. The main steps are as follows.

  1. Create the OperatorSnapshotFutures object that wraps the state snapshot operations of the current operator.
  2. Create the snapshotContext object, which holds the context needed during the snapshot, and call snapshotState() to perform the snapshot.

snapshotState() is implemented by StreamOperator subclasses. For example, AbstractUdfStreamOperator calls StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction) to snapshot the state held by the user function.

  3. Set the KeyedStateRawFuture and OperatorStateRawFuture on snapshotInProgress. These handle the snapshot of raw state.
  • If operatorStateBackend is not null, the result of operatorStateBackend.snapshot() is set as the OperatorStateManagedFuture and registered in snapshotInProgress to await execution.
  • If keyedStateBackend is not null, the result of keyedStateBackend.snapshot() is set as the KeyedStateManagedFuture and registered in snapshotInProgress to await execution.
  4. Return the snapshotInProgress object, which wraps every snapshot operation the current operator has to run.
public final OperatorSnapshotFutures snapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory) throws Exception {
    // Get the KeyGroupRange
    KeyGroupRange keyGroupRange = null != keyedStateBackend ?
        keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    // Create the OperatorSnapshotFutures object
    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
    // Create the snapshotContext object
    StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
        checkpointId,
        timestamp,
        factory,
        keyGroupRange,
        getContainingTask().getCancelables());
    try {
        snapshotState(snapshotContext);
        // Set the KeyedStateRawFuture and OperatorStateRawFuture
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
        // If operatorStateBackend is not null, set the OperatorStateManagedFuture
        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
        // If keyedStateBackend is not null, set the KeyedStateManagedFuture
        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception snapshotException) {
        // Some code omitted here
    }
    return snapshotInProgress;
}

As the code shows, raw state and managed state use different RunnableFuture objects.

  • Raw state is snapshotted through the RawFuture objects obtained from snapshotContext.
  • Managed state is handled by operatorStateBackend and keyedStateBackend, which write the state data to memory or to an external file system depending on the StateBackend implementation.

1.4. Snapshotting operator state

As we have seen, all state snapshot operations are wrapped in OperatorSnapshotFutures objects and are finally executed by the AsyncCheckpointRunnable thread.

Let's look at how AsyncCheckpointRunnable is defined. As the code shows, AsyncCheckpointRunnable.run() works as follows.

  1. Call FileSystemSafetyNet.initializeSafetyNetForThread() to initialize the file system safety net for the current thread, so that data can be written safely.
  2. Create the TaskStateSnapshot instances.

Two TaskStateSnapshot instances are created: jobManagerTaskOperatorSubtaskStates and localTaskOperatorSubtaskStates. The former stores the Checkpoint data that is sent to the JobManager; the latter stores the state data kept locally on the TaskExecutor.

  3. Run all state snapshot operations.

The method iterates over operatorSnapshotsInProgress, takes each OperatorSnapshotFutures, and creates an OperatorSnapshotFinalizer for it, which runs all of its snapshot operations. OperatorSnapshotFinalizer calls FutureUtils.runIfNotDoneAndGet() to execute the KeyedState and OperatorState snapshots.

  4. Get the JobManagerOwnedState and TaskLocalState from finalizedSnapshots and store them in jobManagerTaskOperatorSubtaskStates and localTaskOperatorSubtaskStates respectively.
  5. Record the Checkpoint duration on the checkpointMetrics object so that it is reported to the metrics system.
  6. If the AsyncCheckpointState can be moved to COMPLETED, call reportCompletedSnapshotStates() to report the Checkpoint result to the JobManager.
  7. If an exception occurs, call handleExecutionException() to handle it.
public void run() {
    FileSystemSafetyNet.initializeSafetyNetForThread();
    try {
        // Create the TaskStateSnapshots
        TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
            new TaskStateSnapshot(operatorSnapshotsInProgress.size());
        TaskStateSnapshot localTaskOperatorSubtaskStates =
            new TaskStateSnapshot(operatorSnapshotsInProgress.size());
        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry :
                operatorSnapshotsInProgress.entrySet()) {
            OperatorID operatorID = entry.getKey();
            OperatorSnapshotFutures snapshotInProgress = entry.getValue();
            // Create the OperatorSnapshotFinalizer object
            OperatorSnapshotFinalizer finalizedSnapshots =
                new OperatorSnapshotFinalizer(snapshotInProgress);
            jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getJobManagerOwnedState());
            localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getTaskLocalState());
        }
        final long asyncEndNanos = System.nanoTime();
        final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
        checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
        if (asyncCheckpointState.compareAndSet(
                CheckpointingOperation.AsyncCheckpointState.RUNNING,
                CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
            reportCompletedSnapshotStates(
                jobManagerTaskOperatorSubtaskStates,
                localTaskOperatorSubtaskStates,
                asyncDurationMillis);
        } else {
            LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                owner.getName(),
                checkpointMetaData.getCheckpointId());
        }
    } catch (Exception e) {
        handleExecutionException(e);
    } finally {
        owner.cancelables.unregisterCloseable(this);
        FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
    }
}
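
The compareAndSet call in run() is what decides whether the result is reported or silently dropped when the runnable has been closed in the meantime. A minimal sketch of that race, using a hypothetical AsyncSnapshotStateSketch class and an assumed three-value state enum, could look like this:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the state guard; the enum values are an assumption
// made for this sketch, not a copy of Flink's class.
class AsyncSnapshotStateSketch {
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    // Called by the async thread once every snapshot future has finished.
    // Only the caller that wins the transition may report the result.
    boolean tryComplete() {
        return state.compareAndSet(State.RUNNING, State.COMPLETED);
    }

    // Called when the runnable is closed, for example on task cancellation.
    // Only the caller that wins the transition has to clean up.
    boolean close() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    State current() {
        return state.get();
    }
}
```

Both transitions start from RUNNING, so exactly one of tryComplete() and close() can succeed for a given instance, regardless of which thread gets there first.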

At this point the logic for snapshotting operator state is essentially complete. Managed state in operators is handled mainly by KeyedStateBackend and OperatorStateBackend.

Both KeyedStateBackend and OperatorStateBackend implement the SnapshotStrategy interface, which provides the state snapshot method. Depending on the storage backend, there are two main SnapshotStrategy types: HeapSnapshotStrategy and RocksDBSnapshotStrategy.

1.5. Persisting state snapshots

Here we take HeapSnapshotStrategy as an example to show how a StateBackend persists a snapshot of its state data. As the code shows, HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates() defines how KeyedState and OperatorState are processed.

  1. Iterate over every StateSnapshotRestore.
  2. Call StateSnapshotRestore.stateSnapshot(), which creates a StateSnapshot object.
  3. Add the meta information of the StateSnapshot to metaInfoSnapshots and the StateSnapshot itself to cowStateStableSnapshots. This completes the snapshot of heap-based KvState.
private void processSnapshotMetaInfoForAllStates(
        List<StateMetaInfoSnapshot> metaInfoSnapshots,
        Map<StateUID, StateSnapshot> cowStateStableSnapshots,
        Map<StateUID, Integer> stateNamesToId,
        Map<String, ? extends StateSnapshotRestore> registeredStates,
        StateMetaInfoSnapshot.BackendStateType stateType) {
    for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
            registeredStates.entrySet()) {
        final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
        // Assign the next free id to this state
        stateNamesToId.put(stateUid, stateNamesToId.size());
        StateSnapshotRestore state = kvState.getValue();
        if (null != state) {
            final StateSnapshot stateSnapshot = state.stateSnapshot();
            metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
            cowStateStableSnapshots.put(stateUid, stateSnapshot);
        }
    }
}
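
The bookkeeping in this method (assign an id per state, record its meta information, keep a stable view of its contents) can be illustrated with ordinary maps. The sketch below is an assumption-laden simplification: HeapSnapshotSketch is a hypothetical class, a Map<String, Integer> stands in for a state table, the state name stands in for its meta information, and the stable view is produced by an eager copy rather than by the copy-on-write structure the real heap backend relies on.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the per-state bookkeeping done at snapshot time.
class HeapSnapshotSketch {
    final List<String> metaInfo = new ArrayList<>();
    final Map<String, Map<String, Integer>> stableSnapshots = new LinkedHashMap<>();
    final Map<String, Integer> stateNamesToId = new HashMap<>();

    void process(Map<String, Map<String, Integer>> registeredStates) {
        for (Map.Entry<String, Map<String, Integer>> kv : registeredStates.entrySet()) {
            String name = kv.getKey();
            // The next free id is simply the number of ids handed out so far.
            stateNamesToId.put(name, stateNamesToId.size());
            Map<String, Integer> table = kv.getValue();
            if (table != null) {
                metaInfo.add(name);
                // Eager copy (assumption of this sketch): later writes to the
                // live table must not change what the snapshot sees.
                stableSnapshots.put(name, new HashMap<>(table));
            }
        }
    }
}
```

An eager copy is the simplest way to get a stable view, but it costs time proportional to the state size on the processing thread; avoiding that cost is the usual motivation for copy-on-write designs.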

II. Checkpoint Management in CheckpointCoordinator

1. Acknowledgment after a Checkpoint finishes

Once all operators in a StreamTask have finished snapshotting their state, the Task instance immediately sends the TaskStateSnapshot to the CheckpointCoordinator on the management node, where the remaining steps take place. The acknowledgment process after a Checkpoint finishes is as follows.

  1. Call StreamTask.reportCompletedSnapshotStates()

After all operators in the StreamTask have finished their snapshots, StreamTask.reportCompletedSnapshotStates() sends the Ack message, including the TaskStateSnapshot, to the TaskStateManager. TaskStateManager wraps a CheckpointCoordinatorGateway, so it can communicate with the CheckpointCoordinator directly over RPC.

  2. Message passing
  • Passing the message to CheckpointCoordinatorGateway
    TaskStateManager calls CheckpointResponder.acknowledgeCheckpoint() to pass the acknowledgedTaskStateSnapshot to the implementer of the CheckpointCoordinatorGateway interface, which is in fact the JobMaster RPC service.
  • Passing the message to CheckpointCoordinator
    When JobMaster receives the Ack message sent by RpcCheckpointResponder, it calls SchedulerNG.acknowledgeCheckpoint() to hand the message to the scheduler. The scheduler wraps the Ack in an AcknowledgeCheckpoint and passes it to the CheckpointCoordinator for further processing.

  3. Managing the PendingCheckpoint

When CheckpointCoordinator receives the AcknowledgeCheckpoint, it looks up the matching PendingCheckpoint in the pendingCheckpoints collection and checks whether Acks have now arrived from every Task instance that has to acknowledge this Checkpoint.
If notYetAcknowledgedTasks is empty, it calls completePendingCheckpoint() to complete the PendingCheckpoint and removes it from the pendingCheckpoints collection.

  4. Adding the CompletedCheckpoint

The PendingCheckpoint is then converted into a CompletedCheckpoint, and CheckpointCoordinator adds the CompletedCheckpoint to completedCheckpointStore.

  5. Announcing the end of the Checkpoint

CheckpointCoordinator iterates over the ExecutionVertex nodes in tasksToCommitTo and gets the Execution object of each. Through the Execution it sends a CheckpointComplete message to the TaskManagerGateway, telling all Task instances that this Checkpoint has finished.

  6. Propagating the notification

When TaskExecutor receives the CheckpointComplete message, it looks up the corresponding Task instance in the TaskSlotTable and forwards the message to it. Every component or operator that implements the CheckpointListener interface receives the notification and then carries out its own follow-up work.
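
The acknowledgment bookkeeping in step 3 can be reduced to a set of task ids per pending checkpoint. The following sketch is a toy model under that assumption; MiniCoordinator and its method names are hypothetical and leave out RPC, timeouts, and failure handling entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of ack tracking; not Flink's CheckpointCoordinator.
class MiniCoordinator {
    // checkpoint id -> tasks that have not acknowledged yet
    private final Map<Long, Set<String>> notYetAcknowledged = new HashMap<>();
    private final List<Long> completed = new ArrayList<>();

    void trigger(long checkpointId, Set<String> tasksToAck) {
        notYetAcknowledged.put(checkpointId, new HashSet<>(tasksToAck));
    }

    // Returns true only for the ack that completes the checkpoint.
    boolean acknowledge(long checkpointId, String taskId) {
        Set<String> waiting = notYetAcknowledged.get(checkpointId);
        if (waiting == null || !waiting.remove(taskId)) {
            return false; // unknown checkpoint, or duplicate/unexpected ack
        }
        if (waiting.isEmpty()) {
            // Last ack: the pending checkpoint becomes a completed one.
            notYetAcknowledged.remove(checkpointId);
            completed.add(checkpointId);
            return true;
        }
        return false;
    }

    boolean isPending(long checkpointId) {
        return notYetAcknowledged.containsKey(checkpointId);
    }

    List<Long> completedCheckpoints() {
        return completed;
    }
}
```

A checkpoint with two participating tasks stays pending after the first ack and completes on the second; a repeated ack from the same task changes nothing.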

2. Triggering and completing the Checkpoint

After CheckpointCoordinator has received the Ack messages from the Task instances, which signal that their snapshots are done, it completes the Checkpoint. The implementation of CheckpointCoordinator.completePendingCheckpoint(), which in turn calls PendingCheckpoint.finalizeCheckpoint(), is shown below.

1) Register the operatorStates in sharedStateRegistry.
2) Finalize the Checkpoint in pendingCheckpoint and produce a CompletedCheckpoint.
3) Add the completedCheckpoint to completedCheckpointStore.
4) Remove the PendingCheckpoint for this checkpointId from pendingCheckpoints
and trigger the queued Checkpoint requests.
5) Send a CheckpointComplete message to all ExecutionVertex nodes
to tell the Task instances that this Checkpoint is complete.

private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;
    // First register the operatorStates in sharedStateRegistry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());
    // Finalize the Checkpoint in pendingCheckpoint and produce the CompletedCheckpoint
    try {
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
        }
        catch (Exception e1) {
            // On failure, abort and throw a CheckpointException
            if (!pendingCheckpoint.isDiscarded()) {
                failPendingCheckpoint(pendingCheckpoint,
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }
            throw new CheckpointException(
                "Could not finalize the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
        }
        // After finalization the PendingCheckpoint must have been discarded
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
        // Add the completedCheckpoint to completedCheckpointStore
        try {
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // If storing the completed checkpoint fails, clean it up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.",
                            completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });
            throw new CheckpointException(
                "Could not complete the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
        }
    } finally {
        // Finally remove the PendingCheckpoint for this checkpointId from pendingCheckpoints
        pendingCheckpoints.remove(checkpointId);
        // Trigger the queued Checkpoint requests
        triggerQueuedRequests();
    }
    // Remember the checkpointId
    rememberRecentCheckpointId(checkpointId);
    // Drop the Checkpoints that came before this one
    dropSubsumedCheckpoints(checkpointId);
    // Record the completion time, used to enforce the minimum pause between Checkpoints
    lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
        completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
    // Notify all ExecutionVertex nodes that the Checkpoint is complete
    final long timestamp = completedCheckpoint.getTimestamp();
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
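
One detail worth isolating is the "drop subsumed checkpoints" step: once a checkpoint completes, older ones that are still pending have been superseded. The sketch below models only that idea with a sorted map. SubsumeSketch and its methods are hypothetical, and the real coordinator applies further conditions that this model ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of completing a checkpoint and dropping older pending ones.
class SubsumeSketch {
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<Long> completedStore = new ArrayList<>();

    void addPending(long checkpointId) {
        pending.put(checkpointId, "pending-" + checkpointId);
    }

    // Completes the given checkpoint and returns the ids of the older pending
    // checkpoints that were dropped because they are now superseded.
    List<Long> complete(long checkpointId) {
        List<Long> dropped = new ArrayList<>();
        if (pending.remove(checkpointId) == null) {
            return dropped; // unknown id: nothing to complete
        }
        completedStore.add(checkpointId);
        // headMap(id) is a live view of all entries with a strictly smaller key.
        dropped.addAll(pending.headMap(checkpointId).keySet());
        pending.headMap(checkpointId).clear();
        return dropped;
    }

    List<Long> pendingIds() {
        return new ArrayList<>(pending.keySet());
    }

    List<Long> completedIds() {
        return completedStore;
    }
}
```

With checkpoints 1 to 4 pending, completing checkpoint 3 stores it, drops 1 and 2, and leaves 4 pending.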

3. Notifying TaskExecutor of CheckpointComplete

When TaskExecutor receives the CheckpointComplete message from CheckpointCoordinator, it calls Task.notifyCheckpointComplete() to pass the message to the target Task instance. The Task thread then forwards the CheckpointComplete message to the operators in the StreamTask.

The code is as follows.

/**
 * Wraps notifyCheckpointComplete() in a RunnableWithException and submits it to the Mailbox,
 * where it is fetched and executed with the highest priority in the MailboxExecutor model.
 * notifyCheckpointComplete() is ultimately run by the MailboxProcessor.
 */
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
    return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
        () -> notifyCheckpointComplete(checkpointId),
        "checkpoint %d complete", checkpointId);
}
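
The effect of submitting the notification with a higher priority than ordinary work can be shown with a small priority queue drained by a single thread. This is a generic illustration and does not reproduce the semantics of Flink's mailbox; MiniMailbox, HIGH_PRIORITY, and NORMAL_PRIORITY are hypothetical names chosen for the sketch.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-consumer mailbox: lower priority number runs first,
// and mails of equal priority run in submission order.
class MiniMailbox {
    static final int HIGH_PRIORITY = 0;
    static final int NORMAL_PRIORITY = 1;

    private static final class Mail implements Comparable<Mail> {
        final int priority;
        final long seq;
        final Runnable action;

        Mail(int priority, long seq, Runnable action) {
            this.priority = priority;
            this.seq = seq;
            this.action = action;
        }

        @Override
        public int compareTo(Mail other) {
            int byPriority = Integer.compare(priority, other.priority);
            return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
        }
    }

    private final PriorityBlockingQueue<Mail> queue = new PriorityBlockingQueue<>();
    private final AtomicLong nextSeq = new AtomicLong();

    // May be called from any thread.
    void submit(int priority, Runnable action) {
        queue.add(new Mail(priority, nextSeq.getAndIncrement(), action));
    }

    // Runs all queued mails on the calling thread, which stands in for the
    // task's single processing thread.
    void drain() {
        Mail mail;
        while ((mail = queue.poll()) != null) {
            mail.action.run();
        }
    }
}
```

Because every mail runs on the one draining thread, the notification handler never executes concurrently with record processing, which is why no extra locking is needed inside the handlers.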

Next, let's look at StreamTask.notifyCheckpointComplete() in detail, shown in the code below.

1) Get the operators in the current Task's operator chain and send each one the Checkpoint-complete notification.
2) Get the TaskStateManager and notify it that the Checkpoint is complete. This mainly calls
TaskLocalStateStore to clean up local Checkpoint data that is no longer needed.
3) If the current Checkpoint is a synchronous Savepoint, finish and terminate the current Task instance directly, then call
resetSynchronousSavepointId() to reset syncSavepointId to empty.

private void notifyCheckpointComplete(long checkpointId) {
    try {
        boolean success = actionExecutor.call(() -> {
            if (isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());
                // Get every Operator in the current Task's operatorChain and
                // notify each one that the Checkpoint succeeded
                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator != null) {
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
                return true;
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
                return true;
            }
        });
        // Get the TaskStateManager and notify it that the Checkpoint is complete
        getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);
        // If this is a synchronous Savepoint, finish the current Task directly
        if (success && isSynchronousSavepointId(checkpointId)) {
            finishTask();
            // Reset to "notify" the internal synchronous savepoint mailbox loop.
            resetSynchronousSavepointId();
        }
    } catch (Exception e) {
        handleException(new RuntimeException("Error while confirming checkpoint", e));
    }
}

After an operator receives the Checkpoint-complete notification, it does whatever follow-up work it needs. By default, the base implementation in AbstractStreamOperator notifies keyedStateBackend so that it can do its own follow-up work.

An AbstractUdfStreamOperator instance checks whether the current userFunction implements CheckpointListener. If it does, the operator notifies the user function that the Checkpoint is complete.

For example, FlinkKafkaConsumerBase uses this notification to commit offsets to the Kafka cluster, which ensures that the consumed data has been fully processed. For details, see FlinkKafkaConsumerBase.notifyCheckpointComplete().

public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}
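
The "commit offsets only after the checkpoint is confirmed" pattern mentioned for the Kafka consumer can be sketched without any Kafka or Flink dependency. Everything here is a hypothetical stand-in: OffsetCommittingSourceSketch is not the connector's class, partitions are plain ints, and "committing" just copies values into a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical source that defers offset commits until a checkpoint is confirmed.
class OffsetCommittingSourceSketch {
    private final Map<Integer, Long> currentOffsets = new HashMap<>();
    // checkpoint id -> offsets as they were when that checkpoint was taken
    private final TreeMap<Long, Map<Integer, Long>> pendingOffsetsToCommit = new TreeMap<>();
    private final Map<Integer, Long> committed = new HashMap<>();

    void advance(int partition, long offset) {
        currentOffsets.put(partition, offset);
    }

    // Snapshot time: remember the offsets, but do not commit them yet.
    void snapshotState(long checkpointId) {
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    // Confirmation time: commit the offsets of this checkpoint and forget
    // this and all older pending entries.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return; // nothing recorded for this checkpoint
        }
        committed.putAll(offsets); // stand-in for a commit to the external system
        pendingOffsetsToCommit.headMap(checkpointId, true).clear();
    }

    Map<Integer, Long> committedOffsets() {
        return committed;
    }

    int pendingCount() {
        return pendingOffsetsToCommit.size();
    }
}
```

The committed position therefore always trails the read position and only moves forward to offsets that are covered by a completed checkpoint.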

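III. Summary of State Management

Having studied the state management source code, we can revisit the following questions. By now it may feel a little like the proverbial master butcher who knows exactly where every joint lies.

Why does state exist in Flink, and which scenarios involve it?

  1. Real-time aggregation: for example, computing the average sales over the past hour. Here Flink state is needed to store the sales data of the past hour.
  2. Window operations: Flink SQL supports tumbling, sliding, and session windows, among others. All of them need Flink state to store the data that falls within the window.
  3. State persistence and job recovery: after a real-time job fails, savepoints and checkpoints make it possible to recover quickly from the last saved point.
  4. Multi-stream joins: Flink has to store the data of at least one stream so that it can be matched against new records as they arrive.

Studying the Flink state management source code also gives a deeper understanding of how state is handled in detail, which lays the groundwork for solving more complex problems:

  1. Understanding in depth how the state of each operator flows while a job is running.
  2. Locating problems quickly: when a real problem occurs, you can tell quickly which part of the logic is at fault.
  3. Handling failures: state management is tied to Flink's fault tolerance, so you learn how Flink keeps state consistent and recoverable when a failure occurs.
  4. Custom development: you can write a custom state backend, or extend and optimize an existing one such as the RocksDB state backend.
  5. Performance tuning: once you understand how Flink processes and manages state efficiently, you can tune job performance and reduce resource consumption.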

Reference: Zhang Libing, 《Flink设计与实现:核心原理与源码解析》 (Flink Design and Implementation: Core Principles and Source Code Analysis)
