回收網(wǎng)站怎么做百度電視劇風(fēng)云榜
Flink CDC 3.0 Starrocks建表失敗會導(dǎo)致任務(wù)卡主!
現(xiàn)象
StarRocks建表失敗,然后任務(wù)自動重啟,重啟完畢后數(shù)據(jù)回放,jobMaster打印下面日志后,整個任務(wù)會卡主
There are already processing requests. Wait for processing
原因分析
前提概要:可以先閱讀CDC表變更處理流程然后再讀下面會更加清晰
涉及類包括SchemaRegistry
、SchemaOperator
和StarRocksMetadataApplier
類
SchemaRegistry->handleEventFromOperator
方法執(zhí)行建表失敗后會導(dǎo)致任務(wù)重啟,但是jobMaster不會重啟,因此SchemaRegistry.requestHandler.pendingSchemaChanges
無法刪除導(dǎo)致任務(wù)卡主!
public void flushSuccess(TableId tableId, int sinkSubtask) {flushedSinkWriters.add(sinkSubtask);if (flushedSinkWriters.equals(activeSinkWriters)) {LOG.info("All sink subtask have flushed for table {}. Start to apply schema change.",tableId.toString());PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);//執(zhí)行表結(jié)構(gòu)變更操作!applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {//異常會跳過刪除pendingSchame!startNextSchemaChangeRequest();}}
}
//刪除pendingSchemaChanges中已經(jīng)完成的pendingSchame
private void startNextSchemaChangeRequest() {this.pendingSchemaChanges.remove(0);this.flushedSinkWriters.clear();...
}public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(SchemaChangeRequest request) {//歷史pendingSchame未刪除導(dǎo)致,卡主if (pendingSchemaChanges.isEmpty()) {LOG.info("Received schema change event request from table {}. Start to buffer requests for others.",request.getTableId().toString());if (request.getSchemaChangeEvent() instanceof CreateTableEvent&& schemaManager.schemaExists(request.getTableId())) {return CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(false)));}CompletableFuture<CoordinationResponse> response =CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true)));schemaManager.applySchemaChange(request.getSchemaChangeEvent());pendingSchemaChanges.add(new PendingSchemaChange(request, response));pendingSchemaChanges.get(0).startToWaitForReleaseRequest();return response;} else {LOG.info("There are already processing requests. Wait for processing.");CompletableFuture<CoordinationResponse> response = new CompletableFuture<>();pendingSchemaChanges.add(new PendingSchemaChange(request, response));return response;}
}
解決辦法
- 讓建表執(zhí)行成功
- catch住異常,將schame刪除后再異常重啟(未驗證)