Background

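The company needs to move its Flink real-time jobs to the cloud and build a multi-tenant real-time computing platform. To use resources efficiently, we do not plan to deploy a separate Kubernetes cluster for each tenant, which means that jobs from several tenants may run in the same Kubernetes cluster. This makes the real-time jobs risky: the network between tenants may be reachable, so each job's REST API is exposed and could be exploited by someone with bad intentions to interfere with other tenants' jobs. For this reason we considered adding an authentication mechanism to Flink jobs, either Kerberos or HTTP Basic authentication with a username and password. After a lot of searching and asking around, I finally found a lead, FLIP-181: Custom netty HTTP request inbound/outbound handlers, which describes why the Flink project turned down this request. There is no need to worry, though: I found a solution in flink-basic-auth-handler and successfully ported it to flink-1.17.2.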

Modification steps

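Flink's JobManager and SQL Gateway expose a lightweight set of web service interfaces built on Netty, and these endpoints all extend the abstract class RestServerEndpoint. Looking at the start method of this class, we can see that the startup code uses InboundChannelHandlerFactory: each such factory is asked to create an inbound handler, which is then added to the channel pipeline.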

public final void start() throws Exception {
    synchronized (lock) {
        Preconditions.checkState(
                state == State.CREATED, "The RestServerEndpoint cannot be restarted.");
        log.info("Starting rest endpoint.");
        final Router router = new Router();
        final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
        handlers = initializeHandlers(restAddressFuture);
        /* sort the handlers such that they are ordered the following:
         * /jobs
         * /jobs/overview
         * /jobs/:jobid
         * /jobs/:jobid/config
         * /:*
         */
        Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
        checkAllEndpointsAndHandlersAreUnique(handlers);
        handlers.forEach(handler -> registerHandler(router, handler, log));
        ChannelInitializer<SocketChannel> initializer =
                new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws ConfigurationException {
                        RouterHandler handler = new RouterHandler(router, responseHeaders);
                        // SSL should be the first handler in the pipeline
                        if (isHttpsEnabled()) {
                            ch.pipeline()
                                    .addLast(
                                            "ssl",
                                            new RedirectingSslHandler(
                                                    restAddress,
                                                    restAddressFuture,
                                                    sslHandlerFactory));
                        }
                        ch.pipeline()
                                .addLast(new HttpServerCodec())
                                .addLast(new FileUploadHandler(uploadDir))
                                .addLast(
                                        new FlinkHttpObjectAggregator(
                                                maxContentLength, responseHeaders));
                        // Handlers created by the SPI-loaded factories are inserted here,
                        // before the router handler that dispatches the REST requests
                        for (InboundChannelHandlerFactory factory :
                                inboundChannelHandlerFactories) {
                            Optional<ChannelHandler> channelHandler =
                                    factory.createHandler(configuration, responseHeaders);
                            if (channelHandler.isPresent()) {
                                ch.pipeline().addLast(channelHandler.get());
                            }
                        }
                        ch.pipeline()
                                .addLast(new ChunkedWriteHandler())
                                .addLast(handler.getName(), handler)
                                .addLast(new PipelineErrorHandler(log, responseHeaders));
                    }
                };
        NioEventLoopGroup bossGroup =
                new NioEventLoopGroup(
                        1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
        NioEventLoopGroup workerGroup =
                new NioEventLoopGroup(
                        0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
        bootstrap = new ServerBootstrap();
        bootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(initializer);
        Iterator<Integer> portsIterator;
        try {
            portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
        } catch (IllegalConfigurationException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalArgumentException(
                    "Invalid port range definition: " + restBindPortRange);
        }
        int chosenPort = 0;
        while (portsIterator.hasNext()) {
            try {
                chosenPort = portsIterator.next();
                final ChannelFuture channel;
                if (restBindAddress == null) {
                    channel = bootstrap.bind(chosenPort);
                } else {
                    channel = bootstrap.bind(restBindAddress, chosenPort);
                }
                serverChannel = channel.syncUninterruptibly().channel();
                break;
            } catch (final Exception e) {
                // syncUninterruptibly() throws checked exceptions via Unsafe
                // continue if the exception is due to the port being in use, fail early
                // otherwise
                if (!(e instanceof java.net.BindException)) {
                    throw e;
                }
            }
        }
        if (serverChannel == null) {
            throw new BindException(
                    "Could not start rest endpoint on any port in port range "
                            + restBindPortRange);
        }
        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);
        final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
        final String advertisedAddress;
        if (bindAddress.getAddress().isAnyLocalAddress()) {
            advertisedAddress = this.restAddress;
        } else {
            advertisedAddress = bindAddress.getAddress().getHostAddress();
        }
        port = bindAddress.getPort();
        log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
        restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
        restAddressFuture.complete(restBaseUrl);
        state = State.RUNNING;
        startInternal();
    }
}
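
Since the goal is HTTP Basic authentication, the core check that a handler created by such a factory would need to perform can be sketched with the JDK alone. This is only a minimal illustration, not the code from flink-basic-auth-handler: the class name BasicAuthCheck and its methods are hypothetical, and the Netty wiring (reading the Authorization header from the request and answering 401 when the check fails) is deliberately left out.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

/** Hypothetical helper: validates the value of an HTTP "Authorization: Basic ..." header. */
final class BasicAuthCheck {
    private static final String PREFIX = "Basic ";

    // Expected credentials in the "user:password" form used by HTTP Basic authentication
    private final byte[] expected;

    BasicAuthCheck(String user, String password) {
        this.expected = (user + ":" + password).getBytes(StandardCharsets.UTF_8);
    }

    /** Returns true only if the header carries the expected Base64-encoded credentials. */
    boolean isAuthorized(String authorizationHeader) {
        if (authorizationHeader == null
                || !authorizationHeader.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
            return false; // header missing or not the Basic scheme
        }
        byte[] decoded;
        try {
            String token = authorizationHeader.substring(PREFIX.length()).trim();
            decoded = Base64.getDecoder().decode(token);
        } catch (IllegalArgumentException e) {
            return false; // token is not valid Base64
        }
        // MessageDigest.isEqual compares in constant time, which avoids leaking
        // how many leading bytes matched
        return MessageDigest.isEqual(expected, decoded);
    }

    /** Builds the header value a client would send for the given credentials. */
    static String headerFor(String user, String password) {
        byte[] raw = (user + ":" + password).getBytes(StandardCharsets.UTF_8);
        return PREFIX + Base64.getEncoder().encodeToString(raw);
    }
}
```

With this sketch, new BasicAuthCheck("admin", "secret").isAuthorized(BasicAuthCheck.headerFor("admin", "secret")) returns true, while a missing header, a different scheme, or a wrong password returns false. Because Basic credentials are only Base64-encoded and not encrypted, a check like this would normally be combined with HTTPS.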

Next, in the constructor we can see that the inboundChannelHandlerFactories object is loaded through the SPI (Service Provider Interface) mechanism.

public RestServerEndpoint(Configuration configuration)
        throws IOException, ConfigurationException {
    Preconditions.checkNotNull(configuration);
    RestServerEndpointConfiguration restConfiguration =
            RestServerEndpointConfiguration.fromConfiguration(configuration);
    Preconditions.checkNotNull(restConfiguration);
    this.configuration = configuration;
    this.restAddress = restConfiguration.getRestAddress();
    this.restBindAddress = restConfiguration.getRestBindAddress();
    this.restBindPortRange = restConfiguration.getRestBindPortRange();
    this.sslHandlerFactory = restConfiguration.getSslHandlerFactory();
    this.uploadDir = restConfiguration.getUploadDir();
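
As a rough illustration of what loading through SPI means, the JDK's java.util.ServiceLoader discovers implementations of an interface that are registered on the classpath. The sketch below is not Flink's constructor code: HandlerFactory is a hypothetical stand-in for InboundChannelHandlerFactory, and SpiLoader is a made-up helper name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/** Hypothetical stand-in for a handler-factory SPI interface. */
interface HandlerFactory {
    String name();
}

/** Hypothetical helper that gathers all registered providers of a service type. */
final class SpiLoader {
    private SpiLoader() {}

    /** Returns every implementation of the service that ServiceLoader can discover. */
    static <T> List<T> loadAll(Class<T> service) {
        List<T> found = new ArrayList<>();
        // ServiceLoader reads the provider registrations under META-INF/services
        // and instantiates each listed class lazily while iterating
        for (T provider : ServiceLoader.load(service)) {
            found.add(provider);
        }
        return found;
    }
}
```

An implementation is picked up only if its jar contains a file under META-INF/services named after the fully qualified name of the interface, with the implementation class name as its content. This is why a custom authentication factory can be added by dropping a jar onto the classpath, without changing RestServerEndpoint itself. When no provider is registered, loadAll simply returns an empty list.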