Contents
- I. Calling StreamTask to Perform the Checkpoint
  - 1. Overall Code Flow of Checkpoint Execution
    - 1.1. StreamTask.checkpointState()
    - 1.2. executeCheckpointing
    - 1.3. Wrapping Operator State Snapshots in OperatorSnapshotFutures
    - 1.4. Snapshotting Operator State
    - 1.5. Persisting State Snapshots
- II. How CheckpointCoordinator Manages Checkpoints
  - 1. Acknowledgement After a Checkpoint Completes
  - 2. Triggering and Completing the Checkpoint
  - 3. Sending CheckpointComplete to the TaskExecutor
- III. Summary of State Management
The previous article covered CheckpointBarrier alignment. Once the barriers are aligned, the next step is for notifyCheckpoint() to trigger the checkpoint on the StreamTask.
I. Calling StreamTask to Perform the Checkpoint
As the code below shows, notifyCheckpoint() does the following:
> 1. Checks that toNotifyOnCheckpoint is not null.
> 2. Creates CheckpointMetaData and CheckpointMetrics instances: CheckpointMetaData stores the checkpoint's metadata, and CheckpointMetrics records the metrics used to monitor the checkpoint.
> 3. Triggers the checkpoint of the operators in the StreamTask.
```java
protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes, long alignmentDurationNanos) throws Exception {
    if (toNotifyOnCheckpoint != null) {
        // Create a CheckpointMetaData object to hold the metadata
        CheckpointMetaData checkpointMetaData =
                new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
        // Create a CheckpointMetrics object to record the monitoring metrics
        CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                .setBytesBufferedInAlignment(bufferedBytes)
                .setAlignmentDurationNanos(alignmentDurationNanos);
        // Trigger the checkpoint via toNotifyOnCheckpoint.triggerCheckpointOnBarrier()
        toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
                checkpointMetaData,
                checkpointBarrier.getCheckpointOptions(),
                checkpointMetrics);
    }
}
```
Note: StreamTask is the only subclass that implements the checkpoint methods, so only a StreamTask can trigger a checkpoint within the current Task instance.
Next, let's look at the details of checkpoint execution.
1. Overall Code Flow of Checkpoint Execution
A checkpoint is triggered in one of two ways: either the CheckpointCoordinator periodically triggers the checkpoint on the source nodes, or an operator downstream triggers the checkpoint of its own node's operators once it has aligned the CheckpointBarrier events.
Either way, the checkpoint ultimately goes through StreamTask.performCheckpoint(), which persists the state data of the StreamTask instance.
StreamTask.performCheckpoint() first checks whether the current Task is running normally and then executes the checkpoint through actionExecutor. The actual steps are:
- Preparation: every Operator in the OperatorChain performs its pre-barrier work before the checkpoint.
- Send the CheckpointBarrier event to the downstream nodes.
- Snapshot operator state: checkpointState() snapshots the state of every operator in the StreamTask's OperatorChain. This step is asynchronous and non-blocking, so it does not interfere with normal data processing; when it finishes, true is returned to the CheckpointInputGate.
- Handling a task that is not running:
  - If isRunning is false, the Task is not in the running state. A CancelCheckpointMarker then has to be sent on, which is done by broadcasting the event to the downstream operators with recordWriter.broadcastEvent(message).
  - The cancellation only takes effect while the operators have not yet finished the checkpoint: in that case a downstream operator that receives the CancelCheckpointMarker cancels the checkpoint immediately.
```java
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    final long checkpointId = checkpointMetaData.getCheckpointId();

    if (isRunning) {
        // Run the checkpoint logic through actionExecutor
        actionExecutor.runThrowing(() -> {
            if (checkpointOptions.getCheckpointType().isSynchronous()) {
                setSynchronousSavepointId(checkpointId);
                if (advanceToEndOfTime) {
                    advanceToEndOfEventTime();
                }
            }
            // Preparation work for the checkpoint
            operatorChain.prepareSnapshotPreBarrier(checkpointId);
            // Send the checkpoint barrier to the downstream streams
            operatorChain.broadcastCheckpointBarrier(
                    checkpointId,
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);
            // Snapshot the operators' state; this step is asynchronous and
            // does not block normal data processing in the streaming topology
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
        });
        return true;
    } else {
        // If the Task is in any other state, broadcast a CancelCheckpointMarker downstream
        actionExecutor.runThrowing(() -> {
            final CancelCheckpointMarker message =
                    new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            recordWriter.broadcastEvent(message);
        });
        return false;
    }
}
```
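To make the two branches of performCheckpoint() concrete, here is a minimal Java sketch. It is a hypothetical simplification, not Flink code: MiniStreamTask and its string-valued actions list stand in for the real operator chain and recordWriter, and nothing is actually snapshotted. It only models the order of the three steps on a running task and the CancelCheckpointMarker broadcast on a task that is not running.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of StreamTask.performCheckpoint(); not Flink's API.
class MiniStreamTask {
    private final boolean isRunning;
    // Records what the task did, standing in for real side effects.
    final List<String> actions = new ArrayList<>();

    MiniStreamTask(boolean isRunning) {
        this.isRunning = isRunning;
    }

    boolean performCheckpoint(long checkpointId) {
        if (isRunning) {
            // 1. Operators do their pre-barrier work.
            actions.add("prepareSnapshotPreBarrier:" + checkpointId);
            // 2. The barrier is forwarded downstream before the snapshot is taken.
            actions.add("broadcastCheckpointBarrier:" + checkpointId);
            // 3. The state snapshot is started.
            actions.add("checkpointState:" + checkpointId);
            return true;
        }
        // Not running: tell downstream operators to abort this checkpoint.
        actions.add("broadcastEvent:CancelCheckpointMarker(" + checkpointId + ")");
        return false;
    }
}
```

In this ordering the barrier goes downstream before checkpointState() runs, mirroring the order of the calls in performCheckpoint() above.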
1.1. StreamTask.checkpointState()
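Next, let's look at the implementation of StreamTask.checkpointState(), shown in the code below.
- Resolve the checkpoint storage location with checkpointStorage.resolveCheckpointStorageLocation(). This yields a CheckpointStreamFactory (storage) that creates CheckpointStateOutputStream instances, which come in two main implementations:
  - FsCheckpointStateOutputStream: writes to a file system.
  - MemoryCheckpointOutputStream: an in-memory output stream.
- Create a CheckpointingOperation instance. CheckpointingOperation encapsulates the concrete checkpoint steps together with the environment they need while executing: checkpointMetaData, checkpointOptions, storage, and checkpointMetrics.
- Call CheckpointingOperation.executeCheckpointing() to perform the checkpoint.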
```java
private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {
    // Create the CheckpointStreamFactory instance
    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getTargetLocation());
    // Create the CheckpointingOperation instance
    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
            this,
            checkpointMetaData,
            checkpointOptions,
            storage,
            checkpointMetrics);
    // Perform the checkpoint
    checkpointingOperation.executeCheckpointing();
}
```
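The two kinds of checkpoint output stream differ mainly in where the bytes end up. The sketch below uses invented names (StreamFactory, MemoryStreamFactory, BoundedMemoryStream, FileStreamFactory) rather than Flink's classes, and the byte cap on the memory stream is an assumption chosen to illustrate why memory-backed checkpoint data has to stay small.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for CheckpointStreamFactory (not Flink's interface).
interface StreamFactory {
    OutputStream createStream(long checkpointId) throws IOException;
}

// Memory-backed stream: bytes stay on the heap, capped at an assumed maximum size.
class BoundedMemoryStream extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxSize;

    BoundedMemoryStream(int maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public void write(int b) throws IOException {
        if (buffer.size() >= maxSize) {
            throw new IOException("State exceeds the memory limit of " + maxSize + " bytes");
        }
        buffer.write(b);
    }

    byte[] toByteArray() {
        return buffer.toByteArray();
    }
}

class MemoryStreamFactory implements StreamFactory {
    private final int maxSize;

    MemoryStreamFactory(int maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public BoundedMemoryStream createStream(long checkpointId) {
        return new BoundedMemoryStream(maxSize);
    }
}

// File-backed stream: each checkpoint's state goes to its own file under a base directory.
class FileStreamFactory implements StreamFactory {
    private final Path baseDir;

    FileStreamFactory(Path baseDir) {
        this.baseDir = baseDir;
    }

    @Override
    public OutputStream createStream(long checkpointId) throws IOException {
        return Files.newOutputStream(baseDir.resolve("chk-" + checkpointId));
    }
}
```

Callers only see the StreamFactory interface, which is the point of resolving the storage location once and handing the resulting factory to CheckpointingOperation.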
1.2. executeCheckpointing
As the code shows, CheckpointingOperation.executeCheckpointing() mainly does the following:
- Iterates over all StreamOperators and calls checkpointStreamOperator() to create an OperatorSnapshotFutures object for each one. This step collects every operator's snapshot operations as OperatorSnapshotFutures.
- Stores the OperatorSnapshotFutures in the operatorSnapshotsInProgress map, whose key is the OperatorID and whose value is the OperatorSnapshotFutures holding that operator's state snapshot operations.
- Creates an AsyncCheckpointRunnable, which holds the OperatorSnapshotFutures created above.
- Runs asyncCheckpointRunnable on the StreamTask.asyncOperationsThreadPool thread pool, which performs the asynchronous snapshots of the operators in operatorSnapshotsInProgress.
```java
public void executeCheckpointing() throws Exception {
    // Create, for each operator, the OperatorSnapshotFutures that perform its snapshot
    for (StreamOperator<?> op : allOperators) {
        checkpointStreamOperator(op);
    }
    // Some code omitted here
    startAsyncPartNano = System.nanoTime();
    checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
    AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
            owner,
            operatorSnapshotsInProgress,
            checkpointMetaData,
            checkpointMetrics,
            startAsyncPartNano);
    // Register it as a Closeable
    owner.cancelables.registerCloseable(asyncCheckpointRunnable);
    // Run asyncCheckpointRunnable
    owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
}
```
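The split between the synchronous part (creating one future per operator on the task thread) and the asynchronous part (running those futures on a separate pool) can be sketched with plain java.util.concurrent. This is a simplified, hypothetical model rather than Flink's CheckpointingOperation: operator IDs are strings, each "snapshot" is a FutureTask that returns a string, and the ExecutorService passed in stands in for asyncOperationsThreadPool.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical model of CheckpointingOperation.executeCheckpointing(); not Flink's API.
class MiniCheckpointingOperation {
    // Key: operator ID, value: that operator's snapshot, created but not yet run.
    final Map<String, RunnableFuture<String>> operatorSnapshotsInProgress = new LinkedHashMap<>();
    long syncDurationMillis;

    Future<Map<String, String>> executeCheckpointing(
            List<String> operatorIds, long checkpointId, ExecutorService asyncPool) {
        long startSync = System.nanoTime();
        // Synchronous part: only create the futures; nothing runs yet.
        for (String opId : operatorIds) {
            operatorSnapshotsInProgress.put(opId, new FutureTask<String>(() -> opId + "@chk-" + checkpointId));
        }
        syncDurationMillis = (System.nanoTime() - startSync) / 1_000_000;

        // Asynchronous part: run every future on the pool and collect the results.
        Callable<Map<String, String>> asyncPart = () -> {
            Map<String, String> results = new LinkedHashMap<>();
            for (Map.Entry<String, RunnableFuture<String>> e : operatorSnapshotsInProgress.entrySet()) {
                RunnableFuture<String> f = e.getValue();
                f.run(); // FutureTask.run() does nothing if the task already completed
                results.put(e.getKey(), f.get());
            }
            return results;
        };
        return asyncPool.submit(asyncPart);
    }
}
```

The task thread returns as soon as the futures are created and submitted, which is why only the creation time counts toward the synchronous duration.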
1.3. Wrapping Operator State Snapshots in OperatorSnapshotFutures
As the code below shows, AbstractStreamOperator.snapshotState() wraps the current operator's state snapshot operations in an OperatorSnapshotFutures object; all OperatorSnapshotFutures are then triggered asynchronously on the asyncOperationsThreadPool. The main steps are:
- Create an OperatorSnapshotFutures object to hold the current operator's state snapshot operations.
- Create the snapshotContext object, which holds the context needed during the snapshot, and call snapshotState() with it.
  This snapshotState() method is implemented by StreamOperator subclasses; for example, AbstractUdfStreamOperator calls StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction) to snapshot the state held in the user function.
- Set KeyedStateRawFuture and OperatorStateRawFuture on snapshotInProgress; these handle the snapshots of raw state.
- If operatorStateBackend is not null, the result of operatorStateBackend.snapshot() is set as the OperatorStateManagedFuture and registered in snapshotInProgress, where it waits to be executed.
- If keyedStateBackend is not null, the result of keyedStateBackend.snapshot() is set as the KeyedStateManagedFuture and registered in snapshotInProgress, where it waits to be executed.
- Return snapshotInProgress, the asynchronous Future object that encapsulates all the snapshot operations this operator needs to perform.
```java
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp,
        CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception {
    // Get the KeyGroupRange
    KeyGroupRange keyGroupRange = null != keyedStateBackend
            ? keyedStateBackend.getKeyGroupRange()
            : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    // Create the OperatorSnapshotFutures holder
    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
    // Create the snapshotContext object
    StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables());
    try {
        snapshotState(snapshotContext);
        // Set KeyedStateRawFuture and OperatorStateRawFuture
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
        // If operatorStateBackend is not null, set OperatorStateManagedFuture
        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
        // If keyedStateBackend is not null, set KeyedStateManagedFuture
        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception snapshotException) {
        // Some code omitted here
    }
    return snapshotInProgress;
}
```
This shows that raw state and managed state use different RunnableFuture objects:
- RawState: snapshots are driven by the RawFuture objects obtained from snapshotContext.
- ManagedState: state is managed by operatorStateBackend and keyedStateBackend, which write it to memory or to an external file system depending on the StateBackend implementation.
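The four futures held by OperatorSnapshotFutures can be modeled as below. This is a hypothetical simplification, not Flink's class: each future is a plain RunnableFuture<String>, a missing backend is represented by a null future, and runAll() simply runs every present future that is not yet done, loosely in the spirit of FutureUtils.runIfNotDoneAndGet().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical, simplified version of OperatorSnapshotFutures.
class MiniOperatorSnapshotFutures {
    RunnableFuture<String> keyedStateManagedFuture;    // from keyedStateBackend.snapshot(...)
    RunnableFuture<String> operatorStateManagedFuture; // from operatorStateBackend.snapshot(...)
    RunnableFuture<String> keyedStateRawFuture;        // from the snapshot context
    RunnableFuture<String> operatorStateRawFuture;     // from the snapshot context

    static RunnableFuture<String> task(String result) {
        return new FutureTask<String>(() -> result);
    }

    // Mirrors the shape of snapshotState(): raw futures always, managed ones only if a backend exists.
    static MiniOperatorSnapshotFutures snapshotState(boolean hasKeyedBackend, boolean hasOperatorBackend) {
        MiniOperatorSnapshotFutures s = new MiniOperatorSnapshotFutures();
        s.keyedStateRawFuture = task("raw-keyed");
        s.operatorStateRawFuture = task("raw-operator");
        if (hasOperatorBackend) {
            s.operatorStateManagedFuture = task("managed-operator");
        }
        if (hasKeyedBackend) {
            s.keyedStateManagedFuture = task("managed-keyed");
        }
        return s;
    }

    // Runs each present future that is not done yet and collects its result.
    List<String> runAll() throws InterruptedException, ExecutionException {
        List<RunnableFuture<String>> all = new ArrayList<>();
        all.add(keyedStateManagedFuture);
        all.add(operatorStateManagedFuture);
        all.add(keyedStateRawFuture);
        all.add(operatorStateRawFuture);
        List<String> results = new ArrayList<>();
        for (RunnableFuture<String> f : all) {
            if (f == null) {
                continue; // no backend, so nothing to snapshot
            }
            if (!f.isDone()) {
                f.run();
            }
            results.add(f.get());
        }
        return results;
    }
}
```

For example, an operator without keyed state (snapshotState(false, true)) yields only the managed operator-state result plus the two raw results.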
1.4. Snapshotting Operator State
We have seen that all state snapshot operations are wrapped in OperatorSnapshotFutures objects and are ultimately executed by the AsyncCheckpointRunnable.
Now let's look at how AsyncCheckpointRunnable is defined. As the code shows, AsyncCheckpointRunnable.run() mainly does the following:
- Calls FileSystemSafetyNet.initializeSafetyNetForThread() to set up a file-system safety net for the current thread; the finally block closes it with FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(), which also closes any guarded resources the thread left open.
- Creates the TaskStateSnapshot instances: one for jobManagerTaskOperatorSubtaskStates, which stores the checkpoint data reported to the JobManager, and one for localTaskOperatorSubtaskStates, which stores the state data kept locally on the TaskExecutor.
- Runs all state snapshot operations: it iterates over operatorSnapshotsInProgress and, for each OperatorSnapshotFutures, creates an OperatorSnapshotFinalizer, which calls FutureUtils.runIfNotDoneAndGet() to execute the KeyedState and OperatorState snapshots.
- Takes the JobManagerOwnedState and TaskLocalState from finalizedSnapshots and stores them in jobManagerTaskOperatorSubtaskStates and localTaskOperatorSubtaskStates respectively.
- Records the duration of the asynchronous part in checkpointMetrics, which is reported to the metrics system.
- If asyncCheckpointState can be switched from RUNNING to COMPLETED, calls reportCompletedSnapshotStates() to report the checkpoint result to the JobManager; if the checkpoint was already closed, it only logs a debug message.
- If an exception occurs, handles it with handleExecutionException().
```java
public void run() {
    FileSystemSafetyNet.initializeSafetyNetForThread();
    try {
        // Create the TaskStateSnapshot instances
        TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                new TaskStateSnapshot(operatorSnapshotsInProgress.size());
        TaskStateSnapshot localTaskOperatorSubtaskStates =
                new TaskStateSnapshot(operatorSnapshotsInProgress.size());

        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
            OperatorID operatorID = entry.getKey();
            OperatorSnapshotFutures snapshotInProgress = entry.getValue();
            // Create the OperatorSnapshotFinalizer, which runs the snapshot futures
            OperatorSnapshotFinalizer finalizedSnapshots =
                    new OperatorSnapshotFinalizer(snapshotInProgress);
            jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    operatorID,
                    finalizedSnapshots.getJobManagerOwnedState());
            localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    operatorID,
                    finalizedSnapshots.getTaskLocalState());
        }

        final long asyncEndNanos = System.nanoTime();
        final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
        checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

        if (asyncCheckpointState.compareAndSet(
                CheckpointingOperation.AsyncCheckpointState.RUNNING,
                CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
            reportCompletedSnapshotStates(
                    jobManagerTaskOperatorSubtaskStates,
                    localTaskOperatorSubtaskStates,
                    asyncDurationMillis);
        } else {
            LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                    owner.getName(),
                    checkpointMetaData.getCheckpointId());
        }
    } catch (Exception e) {
        handleExecutionException(e);
    } finally {
        owner.cancelables.unregisterCloseable(this);
        FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
    }
}
```
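The compareAndSet from RUNNING to COMPLETED is what prevents a checkpoint from being both reported and cancelled. Here is a minimal sketch of that guard. MiniAsyncCheckpointRunnable, its State enum, its close() method and the Runnable used in place of reportCompletedSnapshotStates() are hypothetical; only AtomicReference.compareAndSet is real JDK API.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the state guard in AsyncCheckpointRunnable; not Flink's class.
class MiniAsyncCheckpointRunnable implements Runnable {
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final Runnable reportCompleted; // stand-in for reportCompletedSnapshotStates()

    MiniAsyncCheckpointRunnable(Runnable reportCompleted) {
        this.reportCompleted = reportCompleted;
    }

    @Override
    public void run() {
        // (The operator snapshots would be finalized here.)
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            reportCompleted.run();
        }
        // Otherwise the runnable was closed first, so nothing is reported.
    }

    // Cancellation path: succeeds only if the snapshot has not completed yet.
    boolean close() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    State state() {
        return state.get();
    }
}
```

Whichever of run() and close() wins the compareAndSet decides the outcome; the other call sees a state other than RUNNING and does nothing, which corresponds to the debug-log branch in run() above.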
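At this point the snapshot logic for operator state is essentially complete. Managed state in an operator is handled mainly by KeyedStateBackend and OperatorStateBackend.
Both KeyedStateBackend and OperatorStateBackend implement the SnapshotStrategy interface, which provides the state snapshot method. Depending on the storage backend, there are two main kinds of SnapshotStrategy: HeapSnapshotStrategy and RocksDBSnapshotStrategy.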
1.5. Persisting State Snapshots
Taking HeapSnapshotStrategy as an example, let's walk through how a StateBackend persists state snapshots. As the code shows, HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates() defines the processing logic for KeyedState and OperatorState:
- Iterate over every registered StateSnapshotRestore.
- Call StateSnapshotRestore.stateSnapshot(), which creates a StateSnapshot object.
- Add the StateSnapshot's meta info to metaInfoSnapshots and the StateSnapshot itself to cowStateStableSnapshots, which completes the snapshot of heap-backed KvState.
```java
private void processSnapshotMetaInfoForAllStates(
        List<StateMetaInfoSnapshot> metaInfoSnapshots,
        Map<StateUID, StateSnapshot> cowStateStableSnapshots,
        Map<StateUID, Integer> stateNamesToId,
        Map<String, ? extends StateSnapshotRestore> registeredStates,
        StateMetaInfoSnapshot.BackendStateType stateType) {

    for (Map.Entry<String, ? extends StateSnapshotRestore> kvState : registeredStates.entrySet()) {
        final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
        // Assign each registered state the next sequential id
        stateNamesToId.put(stateUid, stateNamesToId.size());
        StateSnapshotRestore state = kvState.getValue();
        if (null != state) {
            // Create the state snapshot and record its meta info
            final StateSnapshot stateSnapshot = state.stateSnapshot();
            metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
            cowStateStableSnapshots.put(stateUid, stateSnapshot);
        }
    }
}
```
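One detail that is easy to miss in this loop is that every registered state receives an id, even when its value is null, while only non-null states produce a snapshot. The sketch below replays just that loop with hypothetical stand-in types: each state is a Supplier<String> whose returned string plays the role of the snapshot, and StateUID, StateMetaInfoSnapshot and StateSnapshot are collapsed into plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical replay of the loop in HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates().
// Strings stand in for StateUID, StateMetaInfoSnapshot and StateSnapshot.
class MiniHeapSnapshot {
    final List<String> metaInfoSnapshots = new ArrayList<>();
    final Map<String, String> cowStateStableSnapshots = new LinkedHashMap<>();
    final Map<String, Integer> stateNamesToId = new LinkedHashMap<>();

    void processSnapshotMetaInfoForAllStates(Map<String, Supplier<String>> registeredStates, String stateType) {
        for (Map.Entry<String, Supplier<String>> kvState : registeredStates.entrySet()) {
            // Stand-in for StateUID.of(name, type)
            String stateUid = kvState.getKey() + "/" + stateType;
            // Every registered state gets the next id, even if it has no value
            stateNamesToId.put(stateUid, stateNamesToId.size());
            Supplier<String> state = kvState.getValue();
            if (state != null) {
                // Stand-in for state.stateSnapshot()
                String stateSnapshot = state.get();
                metaInfoSnapshots.add("meta:" + stateUid);
                cowStateStableSnapshots.put(stateUid, stateSnapshot);
            }
        }
    }
}
```

Because stateNamesToId is shared, calling the method once per state type keeps the ids increasing across calls instead of restarting at zero.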
II. How CheckpointCoordinator Manages Checkpoints
1. Acknowledgement After a Checkpoint Completes
Once every operator in the StreamTask has finished snapshotting its state, the Task immediately sends a TaskStateSnapshot message to the CheckpointCoordinator on the management node, where the remaining steps take place. As shown in the figure, the acknowledgement process after a checkpoint completes is as follows.
- Calling StreamTask.reportCompletedSnapshotStates(): when all operators in the StreamTask have completed their snapshots, StreamTask.reportCompletedSnapshotStates() sends the TaskStateSnapshot and the rest of the Ack message to the TaskStateManager. TaskStateManager wraps a CheckpointCoordinatorGateway, so it can communicate with the CheckpointCoordinator component directly over RPC.
- Message passing:
  - To the CheckpointCoordinatorGateway: TaskStateManager passes the acknowledgedTaskStateSnapshot message to the implementer of the CheckpointCoordinatorGateway interface through CheckpointResponder.acknowledgeCheckpoint(); that implementer is in fact the JobMaster RPC service.
  - To the CheckpointCoordinator: when the JobMaster receives the Ack message sent through RpcCheckpointResponder, it calls SchedulerNG.acknowledgeCheckpoint() to hand the message to the scheduler, which wraps it in an AcknowledgeCheckpoint and passes it on to the CheckpointCoordinator for further processing.
- Managing the PendingCheckpoint: when the CheckpointCoordinator receives an AcknowledgeCheckpoint, it fetches the corresponding PendingCheckpoint from pendingCheckpoints and checks whether Ack messages have arrived from every Task instance of this checkpoint. If notYetAcknowledgedTasks is empty, it calls completePendingCheckpoint() to complete the PendingCheckpoint and removes it from pendingCheckpoints.
- Adding the CompletedCheckpoint: the PendingCheckpoint is then converted into a CompletedCheckpoint, which the CheckpointCoordinator adds to completedCheckpointStore.
- Notifying that the checkpoint has finished: the CheckpointCoordinator iterates over the ExecutionVertex nodes in tasksToCommitTo, gets each one's Execution, and through it sends a CheckpointComplete message to the TaskManagerGateway, telling every Task instance that this checkpoint has finished.
- Propagating the notification: when the TaskExecutor receives the CheckpointComplete message, it looks up the matching Task instance in the TaskSlotTable and forwards the message to it. Every component or operator that implements CheckpointListener then learns that the checkpoint has completed and carries out its own follow-up processing.
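The "complete once every task has acknowledged" rule can be sketched as a set of not-yet-acknowledged tasks that shrinks with each Ack. MiniPendingCheckpoint, MiniCoordinator and their methods are hypothetical names for this sketch; real Flink tracks execution attempt IDs rather than strings and does considerably more work when completing a checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of PendingCheckpoint's acknowledgement bookkeeping (task IDs are strings).
class MiniPendingCheckpoint {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN }

    private final Set<String> notYetAcknowledgedTasks;
    private final Set<String> acknowledgedTasks = new HashSet<>();
    final Map<String, String> taskStates = new HashMap<>();

    MiniPendingCheckpoint(Set<String> tasksToAcknowledge) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToAcknowledge);
    }

    AckResult acknowledgeTask(String taskId, String taskStateSnapshot) {
        if (notYetAcknowledgedTasks.remove(taskId)) {
            acknowledgedTasks.add(taskId);
            taskStates.put(taskId, taskStateSnapshot);
            return AckResult.SUCCESS;
        }
        // Either a repeated Ack or a task that is not part of this checkpoint
        return acknowledgedTasks.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledgedTasks.isEmpty();
    }
}

// Hypothetical coordinator: completes a checkpoint once its last Ack arrives.
class MiniCoordinator {
    final Map<Long, MiniPendingCheckpoint> pendingCheckpoints = new HashMap<>();
    final List<Long> completedCheckpointStore = new ArrayList<>();

    void receiveAcknowledgeMessage(long checkpointId, String taskId, String taskStateSnapshot) {
        MiniPendingCheckpoint pending = pendingCheckpoints.get(checkpointId);
        if (pending == null) {
            return; // unknown, aborted or already completed checkpoint
        }
        if (pending.acknowledgeTask(taskId, taskStateSnapshot) == MiniPendingCheckpoint.AckResult.SUCCESS
                && pending.isFullyAcknowledged()) {
            // Stand-in for completePendingCheckpoint(): move it from pending to completed
            pendingCheckpoints.remove(checkpointId);
            completedCheckpointStore.add(checkpointId);
        }
    }
}
```

A duplicate Ack from a task that already reported leaves the checkpoint pending, so only the last distinct task's Ack moves it into the completed store.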
2. Triggering and Completing the Checkpoint
After the CheckpointCoordinator receives the Ack messages from the Task instances (meaning their snapshots are done), it triggers and completes the checkpoint. The code below is completePendingCheckpoint(), which calls PendingCheckpoint.finalizeCheckpoint(); its steps are:
1) Register operatorStates with sharedStateRegistry.
2) Finalize the checkpoint held by pendingCheckpoint and produce a CompletedCheckpoint.
3) Add completedCheckpoint to completedCheckpointStore.
4) Remove the PendingCheckpoint for checkpointId from pendingCheckpoints and trigger any queued checkpoint requests.
5) Send a CheckpointComplete message to every ExecutionVertex node, notifying the Task instances that this checkpoint is complete.

```java
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    // First, register the operatorStates with sharedStateRegistry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());

    // Finalize the checkpoint in pendingCheckpoint and produce a CompletedCheckpoint
    try {
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
        } catch (Exception e1) {
            // On failure, abort the pending checkpoint and throw a CheckpointException
            if (!pendingCheckpoint.isDiscarded()) {
                failPendingCheckpoint(pendingCheckpoint,
                        CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }
            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
        }

        // After finalization, the PendingCheckpoint must have been discarded
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        // Add completedCheckpoint to completedCheckpointStore
        try {
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // If storing the completed checkpoint fails, clean it up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.",
                                completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });
            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
        }
    } finally {
        // Finally, remove the PendingCheckpoint for checkpointId from pendingCheckpoints
        pendingCheckpoints.remove(checkpointId);
        // Trigger the queued checkpoint requests
        triggerQueuedRequests();
    }

    // Remember the checkpointId
    rememberRecentCheckpointId(checkpointId);
    // Drop the earlier checkpoints subsumed by this one
    dropSubsumedCheckpoints(checkpointId);
    // Record the completion time, used for the minimum delay relative to the next checkpoint
    lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();

    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
            completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

    // Notify all ExecutionVertex nodes that the checkpoint has completed
    final long timestamp = completedCheckpoint.getTimestamp();
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
```
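Two side effects of completePendingCheckpoint() are worth modeling: the completed-checkpoint store keeps only a bounded number of recent checkpoints, and dropSubsumedCheckpoints() drops pending checkpoints older than the one that just completed. The sketch below assumes a hypothetical MiniCheckpointBookkeeping class with a fixed retention limit maxRetained; details such as which checkpoints may be subsumed and how retention is configured in Flink are not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeSet;

// Hypothetical bookkeeping model; maxRetained is an assumed retention limit.
class MiniCheckpointBookkeeping {
    private final int maxRetained;
    final TreeSet<Long> pendingCheckpoints = new TreeSet<>();
    final Deque<Long> completedCheckpointStore = new ArrayDeque<>();
    final List<Long> discarded = new ArrayList<>(); // completed checkpoints evicted from the store
    final List<Long> aborted = new ArrayList<>();   // pending checkpoints dropped as subsumed

    MiniCheckpointBookkeeping(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void completePendingCheckpoint(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
        // Stand-in for addCheckpoint(): keep only the most recent maxRetained checkpoints
        completedCheckpointStore.addLast(checkpointId);
        while (completedCheckpointStore.size() > maxRetained) {
            discarded.add(completedCheckpointStore.removeFirst());
        }
        dropSubsumedCheckpoints(checkpointId);
    }

    // Pending checkpoints with a smaller id are subsumed by the one that just completed
    private void dropSubsumedCheckpoints(long checkpointId) {
        aborted.addAll(pendingCheckpoints.headSet(checkpointId));
        pendingCheckpoints.headSet(checkpointId).clear();
    }
}
```

headSet() returns a live view, so clearing it removes the subsumed ids from pendingCheckpoints itself.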
3. Sending CheckpointComplete to the TaskExecutor
When the TaskExecutor receives the CheckpointComplete message from the CheckpointCoordinator, it calls Task.notifyCheckpointComplete() to pass the message to the specified Task instance, and the Task thread then forwards the CheckpointComplete message to the operators in the StreamTask.
As the code below shows:

```java
/**
 * Wraps notifyCheckpointComplete() as a RunnableWithException and submits it to the mailbox,
 * where it has the highest priority to be taken and executed in the MailboxExecutor threading model.
 * notifyCheckpointComplete() therefore ultimately runs in the MailboxProcessor.
 */
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
    return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
            () -> notifyCheckpointComplete(checkpointId),
            "checkpoint %d complete", checkpointId);
}
```
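Why does submitting with TaskMailbox.MAX_PRIORITY matter? A toy mailbox makes the ordering visible. This is not Flink's TaskMailbox: MiniMailbox, its integer priorities and the sequence counter used to keep equal-priority mail in FIFO order are assumptions of this sketch, which models strict highest-priority-first processing on a single thread.

```java
import java.util.PriorityQueue;

// Hypothetical single-threaded mailbox with strict priority ordering; not Flink's TaskMailbox.
class MiniMailbox {
    private static final class Mail {
        final int priority;
        final long seq;
        final Runnable action;

        Mail(int priority, long seq, Runnable action) {
            this.priority = priority;
            this.seq = seq;
            this.action = action;
        }
    }

    private long nextSeq = 0;
    private final PriorityQueue<Mail> queue = new PriorityQueue<Mail>((a, b) ->
            a.priority != b.priority
                    ? Integer.compare(b.priority, a.priority) // higher priority first
                    : Long.compare(a.seq, b.seq));           // FIFO within the same priority

    void submit(int priority, Runnable action) {
        queue.add(new Mail(priority, nextSeq++, action));
    }

    // Runs all queued mail in priority order, like one pass of a mailbox loop.
    void processAll() {
        Mail mail;
        while ((mail = queue.poll()) != null) {
            mail.action.run();
        }
    }
}
```

Because the mailbox thread is also the thread that processes records, a high-priority checkpoint notification does not have to wait behind ordinary queued work.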
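Next, let's look at StreamTask.notifyCheckpointComplete() in detail, shown in the code below:
1) Notify every operator in the current Task's operator chain that the checkpoint has completed.
2) Get the TaskStateManager and notify it that the checkpoint has completed; this mainly lets TaskLocalStateStore clean up local checkpoint data that is no longer needed.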
3) If the current checkpoint is a synchronous savepoint, finish the current Task directly and call resetSynchronousSavepointId() to reset syncSavepointId.

```java
private void notifyCheckpointComplete(long checkpointId) {
    try {
        boolean success = actionExecutor.call(() -> {
            if (isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());
                // Notify every Operator in the operatorChain that the checkpoint succeeded
                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator != null) {
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
                return true;
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
                return true;
            }
        });
        // Notify the TaskStateManager that the checkpoint has completed
        getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);
        // For a synchronous savepoint, finish the current Task directly
        if (success && isSynchronousSavepointId(checkpointId)) {
            finishTask();
            // Reset to "notify" the internal synchronous savepoint mailbox loop.
            resetSynchronousSavepointId();
        }
    } catch (Exception e) {
        handleException(new RuntimeException("Error while confirming checkpoint", e));
    }
}
```
After an operator receives the checkpoint-complete message, it performs whatever follow-up processing it needs. By default, the base implementation in AbstractStreamOperator notifies the keyedStateBackend so it can do its follow-up work.
An AbstractUdfStreamOperator additionally checks whether its userFunction implements CheckpointListener; if it does, the user function is notified that the checkpoint has completed.
For example, FlinkKafkaConsumerBase uses this notification to commit offsets to the Kafka cluster, confirming that the consumed data has been fully processed. See FlinkKafkaConsumerBase.notifyCheckpointComplete() for the details.
```java
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // Forward the notification to the user function if it is a CheckpointListener
    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}
```
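The Kafka example follows a general pattern: remember what to commit when the state is snapshotted, and commit it only once notifyCheckpointComplete() arrives for that checkpoint. Below is a minimal sketch of that pattern, not FlinkKafkaConsumerBase itself. The CheckpointListener interface here is a local stand-in declared in the same file so the example is self-contained (Flink's own interface is not imported), and OffsetCommittingSource, its snapshotState(long) method and the committedOffsets map are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Local stand-in with the same callback shape as Flink's CheckpointListener.
interface CheckpointListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// Hypothetical source that commits offsets only for completed checkpoints.
class OffsetCommittingSource implements CheckpointListener {
    // Current read position per partition
    final Map<Integer, Long> currentOffsets = new HashMap<>();
    // Offsets captured at each checkpoint, waiting for that checkpoint to complete
    private final TreeMap<Long, Map<Integer, Long>> pendingOffsetsToCommit = new TreeMap<>();
    // Stand-in for the offsets committed to the external system
    final Map<Integer, Long> committedOffsets = new HashMap<>();

    void snapshotState(long checkpointId) {
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already subsumed checkpoint
        }
        committedOffsets.putAll(offsets);
        // This checkpoint subsumes all older ones, so drop them together with it
        pendingOffsetsToCommit.headMap(checkpointId, true).clear();
    }
}
```

Committing only on completion means the external system never sees an offset that a restore from the last completed checkpoint would have to rewind past.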
III. Summary of State Management
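Having studied the state management source code, let's revisit a few scenario questions. Doesn't it feel a little like Cook Ding carving the ox, cutting cleanly along the joints?
What is state in Flink for, and which scenarios involve it?
- Real-time aggregation: for example, computing the average sales over the past hour. Here you need Flink state to store all of the sales data from that hour.
- Window operations: Flink SQL supports tumbling, sliding, and session windows, among others. All of these window operations need Flink state to store the data that falls within the window.
- State persistence and job recovery: when a real-time job fails, savepoints and checkpoints let it be restored quickly from the last saved point.
- Multi-stream joins: Flink stores the data of at least one stream so that new records can be matched against it when they arrive.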
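Beyond that, studying Flink's state management source code deepens our understanding of its details and lays a theoretical foundation for solving more complex problems:
- Understanding in depth how each operator's state flows while a job runs;
- Locating problems quickly: when a real problem arises, you can quickly tell which part of the logic is at fault;
- Handling failures: state management is tied to Flink's fault-tolerance mechanism, so you can understand how Flink keeps state consistent and recoverable when failures occur;
- Secondary development: you can implement a custom state backend, or extend and optimize existing ones such as the RocksDB state backend;
- Performance tuning: once you understand how Flink processes and manages state efficiently, you can optimize job performance and reduce resource consumption.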
Reference: Zhang Libing, "Flink Design and Implementation: Core Principles and Source Code Analysis" (《Flink设计与实现:核心原理与源码解析》).