做報名網(wǎng)站中國國家培訓網(wǎng)是真的嗎
背景
公司目前需要將Flink實時作業(yè)云化,構建多租戶實時計算平臺。目前考慮為了資源高效利用,并不打算為每個租戶部署一套獨立的Kubernetes集群。也就意味著多個租戶的作業(yè)可能會運行在同一套kubernets集群中。此時實時作業(yè)的任務就變的很危險,因為網(wǎng)絡可能是通的,就會存在危險的REST API暴露出去,被一些不壞好意的人利用,從而影響其他租戶的作業(yè)。鑒于此考慮給Flink的作業(yè)添加一個認證方式,可以是Kerberos或者是Http 用戶名密碼Baisc認證。各種搜索和詢問,最終發(fā)現(xiàn)了一些線索FLIP-181: Custom netty HTTP request inbound/outbound handlers 這里描述了為何flink官方否定這個訴求。當然不要著急,筆者在flink-basic-auth-handler上找到了方案,并且成功將方案遷移到了flink-1.17.2版本中。
改造步驟
Flink 的JobManager/SQLGateway是基于Netty實現(xiàn)的一套輕量級的web服務接口,這些接口都實現(xiàn)了RestServerEndpoint抽象類。因此我們可以看看這個類start方法中可以看到在啟動的代碼中可以看到InboundChannelHandlerFactory這個東西,通過改Factory創(chuàng)建一個Inbound的hander。
public final void start() throws Exception {synchronized (lock) {Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");log.info("Starting rest endpoint.");final Router router = new Router();final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();handlers = initializeHandlers(restAddressFuture);/* sort the handlers such that they are ordered the following:* /jobs* /jobs/overview* /jobs/:jobid* /jobs/:jobid/config* /:**/Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);checkAllEndpointsAndHandlersAreUnique(handlers);handlers.forEach(handler -> registerHandler(router, handler, log));ChannelInitializer<SocketChannel> initializer =new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws ConfigurationException {RouterHandler handler = new RouterHandler(router, responseHeaders);// SSL should be the first handler in the pipelineif (isHttpsEnabled()) {ch.pipeline().addLast("ssl",new RedirectingSslHandler(restAddress,restAddressFuture,sslHandlerFactory));}ch.pipeline().addLast(new HttpServerCodec()).addLast(new FileUploadHandler(uploadDir)).addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders));for (InboundChannelHandlerFactory factory :inboundChannelHandlerFactories) {Optional<ChannelHandler> channelHandler =factory.createHandler(configuration, responseHeaders);if (channelHandler.isPresent()) {ch.pipeline().addLast(channelHandler.get());}}ch.pipeline().addLast(new ChunkedWriteHandler()).addLast(handler.getName(), handler).addLast(new PipelineErrorHandler(log, responseHeaders));}};NioEventLoopGroup bossGroup =new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));NioEventLoopGroup workerGroup =new NioEventLoopGroup(0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(initializer);Iterator<Integer> portsIterator;try {portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);} catch (IllegalConfigurationException e) {throw e;} catch (Exception e) {throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);}int chosenPort = 0;while (portsIterator.hasNext()) {try {chosenPort = portsIterator.next();final ChannelFuture channel;if (restBindAddress == null) {channel = bootstrap.bind(chosenPort);} else {channel = bootstrap.bind(restBindAddress, chosenPort);}serverChannel = channel.syncUninterruptibly().channel();break;} catch (final Exception e) {// syncUninterruptibly() throws checked exceptions via Unsafe// continue if the exception is due to the port being in use, fail early// otherwiseif (!(e instanceof java.net.BindException)) {throw e;}}}if (serverChannel == null) {throw new BindException("Could not start rest endpoint on any port in port range "+ restBindPortRange);}log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();final String advertisedAddress;if (bindAddress.getAddress().isAnyLocalAddress()) {advertisedAddress = this.restAddress;} else {advertisedAddress = bindAddress.getAddress().getHostAddress();}port = bindAddress.getPort();log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();restAddressFuture.complete(restBaseUrl);state = State.RUNNING;startInternal();}}
然后在構造函數(shù)中可以發(fā)現(xiàn)inboundChannelHandlerFactories對象是通過SPI方案加載進來的。
public RestServerEndpoint(Configuration configuration)throws IOException, ConfigurationException {Preconditions.checkNotNull(configuration);RestServerEndpointConfiguration restConfiguration =RestServerEndpointConfiguration.fromConfiguration(configuration);Preconditions.checkNotNull(restConfiguration);this.configuration = configuration;this.restAddress = restConfiguration.getRestAddress();this.restBindAddress = restConfiguration.getRestBindAddress();this.restBindPortRange = restConfiguration.getRestBindPortRange();this.sslHandlerFactory = restConfiguration.getSslHandlerFactory();this.uploadDir = restConfiguration.getUploadDir();