Flink Coordinator and Custom Operators
A recent project required working with Flink's coordinators and custom operators. This article explains what a coordinator is in Flink, how to use one, and how a coordinator and its operators interact.
Coordinator
A coordinator in Flink coordinates operators at runtime. It runs in the JobManager and communicates with the operators through events. For example, the coordinators of Source and Sink operators are used to discover and assign work, or to aggregate and commit metadata.
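Threading model
All coordinator methods are called by the JobManager's main thread (the mailbox thread). These methods must therefore never perform blocking operations (such as I/O, or waiting on locks or futures); doing so runs a high risk of bringing down the entire JobManager.
Coordinators that involve more complex operations should therefore spawn their own threads to handle the I/O work. The methods on OperatorCoordinator.Context are safe to call from a thread other than the one that calls the coordinator's methods.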
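The idea of keeping blocking work off the calling thread can be sketched in plain Java. This is only an illustration under stated assumptions: it uses no Flink classes, and the names NonBlockingCoordinatorSketch, blockingLookup, and lookupAsync are hypothetical. The blocking call runs on a separate I/O executor, and its result is then processed on a single-threaded "coordinator" executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NonBlockingCoordinatorSketch {

    // Hypothetical stand-in for a slow, blocking call (for example a remote lookup).
    static String blockingLookup(String key) throws InterruptedException {
        Thread.sleep(50);
        return "value-for-" + key;
    }

    /**
     * Returns immediately. The blocking call runs on ioExecutor, and the result
     * is post-processed on coordinatorExecutor, never on the caller's thread.
     */
    public static CompletableFuture<String> lookupAsync(
            String key, ExecutorService ioExecutor, ExecutorService coordinatorExecutor) {
        return CompletableFuture
                .supplyAsync(() -> {
                    try {
                        return blockingLookup(key);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                }, ioExecutor)
                // Tag the value with the name of the thread that handles it.
                .thenApplyAsync(
                        value -> Thread.currentThread().getName() + ":" + value,
                        coordinatorExecutor);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        ExecutorService coordinator =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "coordinator"));
        try {
            CompletableFuture<String> result = lookupAsync("split-1", io, coordinator);
            // The caller is free to do other work here; we only wait in order to print.
            System.out.println(result.get(5, TimeUnit.SECONDS));
        } finally {
            io.shutdown();
            coordinator.shutdown();
        }
    }
}
```

In a real coordinator the caller would not wait on the future at all; it would return from the coordinator method and let the callback apply the result.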
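Consistency
Compared with the scheduler's view, the coordinator's view of task execution is highly simplified, but it allows consistent interaction with the operators running on the parallel subtasks. Specifically, the following methods are guaranteed to be called strictly in order:
- executionAttemptReady(int, int, OperatorCoordinator.SubtaskGateway): called once when the subtask attempt is ready. The SubtaskGateway is the gateway used to interact with the subtask. This marks the start of the interaction with the subtask attempt.
- executionAttemptFailed(int, int, Throwable): called for each subtask attempt as soon as the attempt has failed or been cancelled. At this point, interaction with the subtask attempt should stop.
- subtaskReset(int, long) or resetToCheckpoint(long, byte[]): once the scheduler has determined which checkpoint to restore, these methods notify the coordinator. The former is called for a regional failure/recovery (which may affect only a subset of the subtasks), the latter for a global failure/recovery. Use this method to decide which actions to recover, because it tells you which checkpoint to fall back to. The coordinator implementation needs to recover its interactions with the relevant tasks since the restored checkpoint. It is called only after executionAttemptFailed(int, int, Throwable) has been called for all attempts of the subtask.
- executionAttemptReady(int, int, OperatorCoordinator.SubtaskGateway): called again once the recovered tasks (new attempts) are ready. This happens later than subtaskReset(int, long), because the new attempts are scheduled and deployed between those two calls.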
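The call order above can be modelled as a small state tracker. The following is a minimal sketch in plain Java, not Flink code: the class SubtaskLifecycleSketch and its State enum are hypothetical, the method names only mirror the callbacks listed above, and it assumes a single attempt per subtask (concurrent attempts are ignored).

```java
import java.util.HashMap;
import java.util.Map;

public class SubtaskLifecycleSketch {

    enum State { READY, FAILED, RESET }

    // Last known state per subtask index; absent means "never seen".
    private final Map<Integer, State> states = new HashMap<>();

    /** Start of the interaction with a (new) attempt of the subtask. */
    public void executionAttemptReady(int subtask) {
        states.put(subtask, State.READY);
    }

    /** The attempt failed or was cancelled: stop interacting with it. */
    public void executionAttemptFailed(int subtask) {
        states.put(subtask, State.FAILED);
    }

    /** Only expected after the failure of the previous attempt was reported. */
    public void subtaskReset(int subtask) {
        if (states.get(subtask) == State.READY) {
            throw new IllegalStateException(
                    "subtaskReset for subtask " + subtask + " before its attempt failed");
        }
        states.put(subtask, State.RESET);
    }

    /** Events may only be sent between "ready" and "failed". */
    public boolean canSendEvents(int subtask) {
        return states.get(subtask) == State.READY;
    }

    public static void main(String[] args) {
        SubtaskLifecycleSketch tracker = new SubtaskLifecycleSketch();
        tracker.executionAttemptReady(0);
        System.out.println("after ready: " + tracker.canSendEvents(0));
        tracker.executionAttemptFailed(0);
        System.out.println("after failed: " + tracker.canSendEvents(0));
        tracker.subtaskReset(0);
        tracker.executionAttemptReady(0);
        System.out.println("after recovery: " + tracker.canSendEvents(0));
    }
}
```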
Interface methods
A custom coordinator must implement the methods of the OperatorCoordinator interface, which are described below:
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {

    // ------------------------------------------------------------------------

    /**
     * Starts the coordinator. Called once at startup, before any other method.
     * Any exception thrown from this method causes the job to fail.
     */
    void start() throws Exception;

    /**
     * Called when the coordinator is disposed. This method should release the resources it holds.
     * Exceptions thrown from this method do not cause the job to fail.
     */
    @Override
    void close() throws Exception;

    // ------------------------------------------------------------------------

    /**
     * Handles an event from one of the parallel operator instances.
     * Exceptions thrown from this method cause the job to fail and recover.
     */
    void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
            throws Exception;

    // ------------------------------------------------------------------------

    /**
     * Takes a checkpoint of the coordinator by serializing its current state into the checkpoint.
     * On success the implementation must call complete() on the CompletableFuture; on failure it
     * must call completeExceptionally().
     */
    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)
            throws Exception;

    /**
     * We override the method here to remove the checked exception. Please check the Java docs of
     * {@link CheckpointListener#notifyCheckpointComplete(long)} for more detail semantic of the
     * method.
     */
    @Override
    void notifyCheckpointComplete(long checkpointId);

    /**
     * We override the method here to remove the checked exception. Please check the Java docs of
     * {@link CheckpointListener#notifyCheckpointAborted(long)} for more detail semantic of the
     * method.
     */
    @Override
    default void notifyCheckpointAborted(long checkpointId) {}

    /**
     * Resets the coordinator to the given checkpoint.
     */
    void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception;

    // ------------------------------------------------------------------------

    /**
     * Called when a subtask is reset.
     */
    void subtaskReset(int subtask, long checkpointId);

    /**
     * Called when a subtask attempt fails.
     */
    void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason);

    /**
     * Called when a subtask attempt is ready.
     */
    void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway);
}
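The checkpoint contract described in the comments (complete the future on success, complete it exceptionally on failure, restore from the bytes on reset) can be illustrated with a small round trip. This is a sketch under assumptions, not a Flink coordinator: CheckpointRoundTripSketch and its lastAssignedSplit state are hypothetical, and the two methods merely reuse the parameter types of checkpointCoordinator and resetToCheckpoint, which are plain JDK types.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public class CheckpointRoundTripSketch {

    // Hypothetical coordinator state that should survive a failure.
    private String lastAssignedSplit = "";

    public void setLastAssignedSplit(String split) {
        this.lastAssignedSplit = split;
    }

    public String getLastAssignedSplit() {
        return lastAssignedSplit;
    }

    /** Serializes the state and reports success or failure through the future. */
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
        try {
            resultFuture.complete(lastAssignedSplit.getBytes(StandardCharsets.UTF_8));
        } catch (Throwable t) {
            resultFuture.completeExceptionally(t);
        }
    }

    /** Restores the state; null data is treated as "no state in that checkpoint". */
    public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
        lastAssignedSplit =
                checkpointData == null
                        ? ""
                        : new String(checkpointData, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        CheckpointRoundTripSketch coordinator = new CheckpointRoundTripSketch();
        coordinator.setLastAssignedSplit("split-7");

        CompletableFuture<byte[]> snapshot = new CompletableFuture<>();
        coordinator.checkpointCoordinator(1L, snapshot);

        // Progress made after the checkpoint is lost when we fall back to it.
        coordinator.setLastAssignedSplit("split-9");
        coordinator.resetToCheckpoint(1L, snapshot.join());

        System.out.println(coordinator.getLastAssignedSplit()); // prints "split-7"
    }
}
```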
Operator
Operators are what execute the computation in Flink. The user functions passed to map, flatMap, and process in the DataStream API are all eventually wrapped into operators. UDFs cover most development scenarios, but interacting with a coordinator requires a custom operator. Writing one is fairly simple; org.apache.flink.streaming.api.operators.KeyedProcessOperator is a good reference implementation.
A custom operator extends AbstractStreamOperator and implements the OneInputStreamOperator interface.
To support timers, implement the Triggerable interface.
To handle events from the coordinator, implement the OperatorEventHandler interface.
Example
Custom operator
Here we implement a custom operator that processes the data of a KeyedStream. It can receive events from the coordinator and send events to the coordinator.
The implementation of MyKeyedProcessOperator is as follows:
package com.examples.operator;

import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A custom KeyedProcessOperator.
 *
 * @author shirukai
 */
public class MyKeyedProcessOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT>,
                Triggerable<KEY, VoidNamespace>,
                OperatorEventHandler {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessOperator.class);

    private transient TimestampedCollector<OUT> collector;
    private transient TimerService timerService;
    private final OperatorEventGateway operatorEventGateway;

    public MyKeyedProcessOperator(
            ProcessingTimeService processingTimeService,
            OperatorEventGateway operatorEventGateway) {
        this.processingTimeService = processingTimeService;
        this.operatorEventGateway = operatorEventGateway;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        timerService = new SimpleTimerService(internalTimerService);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        LOG.info("processElement: {}", element);
        collector.setTimestamp(element);
        // Register an event-time timer.
        timerService.registerEventTimeTimer(element.getTimestamp() + 10);
        // Register a processing-time timer, relative to the current processing time
        // rather than to the element's event timestamp.
        timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 100);
        // Send a message to the coordinator.
        operatorEventGateway.sendEventToCoordinator(new MyEvent("hello,I'm from operator"));
        // Forward the element downstream without any processing.
        collector.collect((OUT) element.getValue());
    }

    @Override
    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        LOG.info("onEventTime: {}", timer);
    }

    @Override
    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        LOG.info("onProcessingTime: {}", timer);
    }

    @Override
    public void handleOperatorEvent(OperatorEvent evt) {
        LOG.info("handleOperatorEvent: {}", evt);
    }
}
The operator factory class MyKeyedProcessOperatorFactory:
package com.examples.operator;

import com.examples.coordinator.MyKeyedProcessCoordinatorProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;

/**
 * Factory class for the custom operator.
 *
 * @author shirukai
 */
public class MyKeyedProcessOperatorFactory<IN> extends AbstractStreamOperatorFactory<IN>
        implements OneInputStreamOperatorFactory<IN, IN>,
                CoordinatedOperatorFactory<IN>,
                ProcessingTimeServiceAware {

    @Override
    public OperatorCoordinator.Provider getCoordinatorProvider(
            String operatorName, OperatorID operatorID) {
        return new MyKeyedProcessCoordinatorProvider(operatorName, operatorID);
    }

    @Override
    public <T extends StreamOperator<IN>> T createStreamOperator(
            StreamOperatorParameters<IN> parameters) {
        final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
        // Gateway the operator uses to send events to its coordinator.
        final OperatorEventGateway gateway =
                parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
        try {
            final MyKeyedProcessOperator<?, IN, IN> operator =
                    new MyKeyedProcessOperator<>(processingTimeService, gateway);
            operator.setup(
                    parameters.getContainingTask(),
                    parameters.getStreamConfig(),
                    parameters.getOutput());
            // Register the operator as the handler for events sent by the coordinator.
            parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, operator);
            return (T) operator;
        } catch (Exception e) {
            throw new IllegalStateException(
                    "Cannot create operator for "
                            + parameters.getStreamConfig().getOperatorName(),
                    e);
        }
    }

    @Override
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return MyKeyedProcessOperator.class;
    }
}
Custom coordinator
CoordinatorExecutorThreadFactory is the thread factory for the coordinator's executor. The class is generic and can be reused as-is to create coordinator threads.
package com.examples.coordinator;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FatalExitExceptionHandler;

import javax.annotation.Nullable;
import java.util.concurrent.ThreadFactory;

/**
 * A thread factory class that provides some helper methods.
 */
public class CoordinatorExecutorThreadFactory
        implements ThreadFactory, Thread.UncaughtExceptionHandler {

    private final String coordinatorThreadName;
    private final ClassLoader classLoader;
    private final Thread.UncaughtExceptionHandler errorHandler;

    @Nullable
    private Thread thread;

    // TODO discuss if we should fail the job(JM may restart the job later) or directly kill JM
    // process
    // Currently we choose to directly kill JM process
    CoordinatorExecutorThreadFactory(
            final String coordinatorThreadName, final ClassLoader contextClassLoader) {
        this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
    }

    @VisibleForTesting
    CoordinatorExecutorThreadFactory(
            final String coordinatorThreadName,
            final ClassLoader contextClassLoader,
            final Thread.UncaughtExceptionHandler errorHandler) {
        this.coordinatorThreadName = coordinatorThreadName;
        this.classLoader = contextClassLoader;
        this.errorHandler = errorHandler;
    }

    @Override
    public synchronized Thread newThread(Runnable r) {
        thread = new Thread(r, coordinatorThreadName);
        thread.setContextClassLoader(classLoader);
        thread.setUncaughtExceptionHandler(this);
        return thread;
    }

    @Override
    public synchronized void uncaughtException(Thread t, Throwable e) {
        errorHandler.uncaughtException(t, e);
    }

    public String getCoordinatorThreadName() {
        return coordinatorThreadName;
    }

    boolean isCurrentThreadCoordinatorThread() {
        return Thread.currentThread() == thread;
    }
}
CoordinatorContext is the coordinator's context. This class is also generic and can be reused as-is.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.examples.coordinator;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
 * A context class for the {@link OperatorCoordinator}.
 *
 * <p>The context serves a few purposes:
 *
 * <ul>
 *   <li>Thread model enforcement - The context ensures that all the manipulations to the
 *       coordinator state are handled by the same thread.
 * </ul>
 */
@Internal
public class CoordinatorContext implements AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class);

    private final ScheduledExecutorService coordinatorExecutor;
    private final ScheduledExecutorService workerExecutor;
    private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
    private final OperatorCoordinator.Context operatorCoordinatorContext;
    private final Map<Integer, OperatorCoordinator.SubtaskGateway> subtaskGateways;

    public CoordinatorContext(
            CoordinatorExecutorThreadFactory coordinatorThreadFactory,
            OperatorCoordinator.Context operatorCoordinatorContext) {
        this(
                Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
                Executors.newScheduledThreadPool(
                        1,
                        new ExecutorThreadFactory(
                                coordinatorThreadFactory.getCoordinatorThreadName() + "-worker")),
                coordinatorThreadFactory,
                operatorCoordinatorContext);
    }

    public CoordinatorContext(
            ScheduledExecutorService coordinatorExecutor,
            ScheduledExecutorService workerExecutor,
            CoordinatorExecutorThreadFactory coordinatorThreadFactory,
            OperatorCoordinator.Context operatorCoordinatorContext) {
        this.coordinatorExecutor = coordinatorExecutor;
        this.workerExecutor = workerExecutor;
        this.coordinatorThreadFactory = coordinatorThreadFactory;
        this.operatorCoordinatorContext = operatorCoordinatorContext;
        this.subtaskGateways = new HashMap<>(operatorCoordinatorContext.currentParallelism());
    }

    @Override
    public void close() throws InterruptedException {
        // Close quietly so the closing sequence will be executed completely.
        ComponentClosingUtils.shutdownExecutorForcefully(
                workerExecutor, Duration.ofNanos(Long.MAX_VALUE));
        ComponentClosingUtils.shutdownExecutorForcefully(
                coordinatorExecutor, Duration.ofNanos(Long.MAX_VALUE));
    }

    public void runInCoordinatorThread(Runnable runnable) {
        coordinatorExecutor.execute(runnable);
    }

    // --------- Package private methods for the coordinator ------------

    ClassLoader getUserCodeClassloader() {
        return this.operatorCoordinatorContext.getUserCodeClassloader();
    }

    void subtaskReady(OperatorCoordinator.SubtaskGateway gateway) {
        final int subtask = gateway.getSubtask();
        if (subtaskGateways.get(subtask) == null) {
            subtaskGateways.put(subtask, gateway);
        } else {
            throw new IllegalStateException("Already have a subtask gateway for " + subtask);
        }
    }

    void subtaskNotReady(int subtaskIndex) {
        subtaskGateways.put(subtaskIndex, null);
    }

    Set<Integer> getSubtasks() {
        return subtaskGateways.keySet();
    }

    public void sendEventToOperator(int subtaskId, OperatorEvent event) {
        callInCoordinatorThread(
                () -> {
                    final OperatorCoordinator.SubtaskGateway gateway =
                            subtaskGateways.get(subtaskId);
                    if (gateway == null) {
                        LOG.warn(
                                String.format(
                                        "Subtask %d is not ready yet to receive events.",
                                        subtaskId));
                    } else {
                        gateway.sendEvent(event);
                    }
                    return null;
                },
                String.format("Failed to send event %s to subtask %d", event, subtaskId));
    }

    /**
     * Fail the job with the given cause.
     *
     * @param cause the cause of the job failure.
     */
    void failJob(Throwable cause) {
        operatorCoordinatorContext.failJob(cause);
    }

    // ---------------- private helper methods -----------------

    /**
     * A helper method that delegates the callable to the coordinator thread if the current thread
     * is not the coordinator thread, otherwise call the callable right away.
     *
     * @param callable the callable to delegate.
     */
    private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
        // Ensure the callable is executed by the coordinator executor.
        if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()
                && !coordinatorExecutor.isShutdown()) {
            try {
                final Callable<V> guardedCallable =
                        () -> {
                            try {
                                return callable.call();
                            } catch (Throwable t) {
                                LOG.error("Uncaught Exception in Coordinator Executor", t);
                                ExceptionUtils.rethrowException(t);
                                return null;
                            }
                        };
                return coordinatorExecutor.submit(guardedCallable).get();
            } catch (InterruptedException | ExecutionException e) {
                throw new FlinkRuntimeException(errorMessage, e);
            }
        }
        try {
            return callable.call();
        } catch (Throwable t) {
            LOG.error("Uncaught Exception in Coordinator Executor", t);
            throw new FlinkRuntimeException(errorMessage, t);
        }
    }
}
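The reason callInCoordinatorThread checks the current thread before submitting is that a task running on a single-threaded executor would deadlock if it submitted another task to the same executor and waited for it. The pattern can be tried in isolation with the following plain-Java sketch. It makes no use of Flink, and the class CallInCoordinatorThreadSketch and its call method are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallInCoordinatorThreadSketch implements AutoCloseable {

    // Set by the thread factory when the single executor thread is created.
    private volatile Thread coordinatorThread;
    private final ExecutorService coordinatorExecutor;

    public CallInCoordinatorThreadSketch() {
        coordinatorExecutor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "coordinator");
            thread.setDaemon(true);
            coordinatorThread = thread;
            return thread;
        });
    }

    /** Runs inline when already on the coordinator thread, otherwise submits and waits. */
    public <V> V call(Callable<V> callable) {
        try {
            if (Thread.currentThread() == coordinatorThread) {
                // Submitting and waiting here would block the only executor thread forever.
                return callable.call();
            }
            return coordinatorExecutor.submit(callable).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the call", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Call failed on the coordinator thread", e.getCause());
        } catch (Exception e) {
            throw new IllegalStateException("Call failed", e);
        }
    }

    @Override
    public void close() {
        coordinatorExecutor.shutdownNow();
    }

    public static void main(String[] args) {
        try (CallInCoordinatorThreadSketch sketch = new CallInCoordinatorThreadSketch()) {
            Callable<String> inner = () -> "nested call ran inline";
            String outer = sketch.call(() -> Thread.currentThread().getName());
            // The outer callable runs on the coordinator thread, so the inner one runs inline.
            String nested = sketch.call(() -> sketch.call(inner));
            System.out.println(outer);  // prints "coordinator"
            System.out.println(nested); // prints "nested call ran inline"
        }
    }
}
```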
The custom coordinator class MyKeyedProcessCoordinator:
package com.examples.coordinator;

import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;

/**
 * A custom coordinator.
 * It must implement the OperatorCoordinator interface.
 *
 * @author shirukai
 */
public class MyKeyedProcessCoordinator implements OperatorCoordinator {

    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessCoordinator.class);

    /**
     * The name of the operator this coordinator is associated with.
     */
    private final String operatorName;

    private final CoordinatorContext context;
    private boolean started;

    public MyKeyedProcessCoordinator(String operatorName, CoordinatorContext context) {
        this.operatorName = operatorName;
        this.context = context;
    }

    @Override
    public void start() throws Exception {
        LOG.info(
                "Starting Coordinator for {}: {}.",
                this.getClass().getSimpleName(),
                operatorName);
        // we mark this as started first, so that we can later distinguish the cases where 'start()'
        // wasn't called and where 'start()' failed.
        started = true;
        runInEventLoop(
                () -> {
                    LOG.info("Coordinator started.");
                },
                "do something for coordinator.");
    }

    @Override
    public void close() throws Exception {
        // Shut down the executors owned by the context so their threads do not leak.
        context.close();
    }

    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
            throws Exception {
        LOG.info("Received event {} from operator {}.", event, subtask);
    }

    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)
            throws Exception {}

    @Override
    public void notifyCheckpointComplete(long checkpointId) {}

    @Override
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
            throws Exception {}

    @Override
    public void subtaskReset(int subtask, long checkpointId) {
        LOG.info(
                "Recovering subtask {} of operator {} to checkpoint {}.",
                subtask,
                operatorName,
                checkpointId);
        runInEventLoop(() -> {}, "resetting subtask %d", subtask);
    }

    @Override
    public void executionAttemptFailed(
            int subtask, int attemptNumber, @Nullable Throwable reason) {
        runInEventLoop(
                () -> {
                    LOG.info(
                            "Removing itself after failure for subtask {} of operator {}.",
                            subtask,
                            operatorName);
                    context.subtaskNotReady(subtask);
                },
                "handling subtask %d failure",
                subtask);
    }

    @Override
    public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
        assert subtask == gateway.getSubtask();
        LOG.debug("Subtask {} of operator {} is ready.", subtask, operatorName);
        runInEventLoop(
                () -> {
                    context.subtaskReady(gateway);
                    sendEventToOperator(new MyEvent("hello,I'm from coordinator"));
                },
                "making event gateway to subtask %d available",
                subtask);
    }

    // Sends the event to every subtask known to the context.
    private void sendEventToOperator(OperatorEvent event) {
        for (Integer subtask : context.getSubtasks()) {
            try {
                context.sendEventToOperator(subtask, event);
            } catch (Exception e) {
                LOG.error("Failed to send OperatorEvent to operator {}", operatorName, e);
                context.failJob(e);
                return;
            }
        }
    }

    // Runs the action on the coordinator thread and fails the job if it throws.
    private void runInEventLoop(
            final ThrowingRunnable<Throwable> action,
            final String actionName,
            final Object... actionNameFormatParameters) {
        ensureStarted();
        context.runInCoordinatorThread(
                () -> {
                    try {
                        action.run();
                    } catch (Throwable t) {
                        // If we have a JVM critical error, promote it immediately, there is a good
                        // chance the logging or job failing will not succeed any more
                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                        final String actionString =
                                String.format(actionName, actionNameFormatParameters);
                        LOG.error(
                                "Uncaught exception in the coordinator for {} while {}. Triggering job failover.",
                                operatorName,
                                actionString,
                                t);
                        context.failJob(t);
                    }
                });
    }

    private void ensureStarted() {
        if (!started) {
            throw new IllegalStateException("The coordinator has not started yet.");
        }
    }
}
Custom coordinator provider
package com.examples.coordinator;

import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;

/**
 * Provider for the custom coordinator.
 *
 * @author shirukai
 */
public class MyKeyedProcessCoordinatorProvider extends RecreateOnResetOperatorCoordinator.Provider {

    private static final long serialVersionUID = 1L;

    private final String operatorName;

    public MyKeyedProcessCoordinatorProvider(String operatorName, OperatorID operatorID) {
        super(operatorID);
        this.operatorName = operatorName;
    }

    @Override
    protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context)
            throws Exception {
        final String coordinatorThreadName = "MyKeyedProcessCoordinator-" + operatorName;
        CoordinatorExecutorThreadFactory coordinatorThreadFactory =
                new CoordinatorExecutorThreadFactory(
                        coordinatorThreadName, context.getUserCodeClassloader());
        CoordinatorContext coordinatorContext =
                new CoordinatorContext(coordinatorThreadFactory, context);
        return new MyKeyedProcessCoordinator(operatorName, coordinatorContext);
    }
}
Running the test
package com.examples;

import com.examples.operator.MyKeyedProcessOperatorFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author shirukai
 */
public class CoordinatorExamples {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<MyData> source =
                env.fromElements(new MyData(1, 1.0), new MyData(2, 2.0), new MyData(1, 3.0));

        MyKeyedProcessOperatorFactory<MyData> operatorFactory =
                new MyKeyedProcessOperatorFactory<>();

        // Key the stream by id and apply the custom operator through its factory.
        source.keyBy((KeySelector<MyData, Integer>) MyData::getId)
                .transform("MyKeyedProcess", TypeInformation.of(MyData.class), operatorFactory)
                .print();

        env.execute();
    }

    public static class MyData {
        private Integer id;
        private Double value;

        public MyData(Integer id, Double value) {
            this.id = id;
            this.value = value;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public Double getValue() {
            return value;
        }

        public void setValue(Double value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "MyData{" + "id=" + id + ", value=" + value + '}';
        }
    }
}