A Look at Flink's RestClientConfiguration

Original article by code4it, published 2019-03-07 14:03:11, in the column 码匠的流水账

This article takes a look at Flink's RestClientConfiguration.

RestClientConfiguration

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java

public final class RestClientConfiguration {

    @Nullable
    private final SSLHandlerFactory sslHandlerFactory;

    private final long connectionTimeout;

    private final long idlenessTimeout;

    private final int maxContentLength;

    private RestClientConfiguration(
            @Nullable final SSLHandlerFactory sslHandlerFactory,
            final long connectionTimeout,
            final long idlenessTimeout,
            final int maxContentLength) {
        checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
        this.sslHandlerFactory = sslHandlerFactory;
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    /**
     * Returns the {@link SSLEngine} that the REST client endpoint should use.
     *
     * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
     */
    @Nullable
    public SSLHandlerFactory getSslHandlerFactory() {
        return sslHandlerFactory;
    }

    /**
     * {@see RestOptions#CONNECTION_TIMEOUT}.
     */
    public long getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * {@see RestOptions#IDLENESS_TIMEOUT}.
     */
    public long getIdlenessTimeout() {
        return idlenessTimeout;
    }

    /**
     * Returns the max content length that the REST client endpoint could handle.
     *
     * @return max content length that the REST client endpoint could handle
     */
    public int getMaxContentLength() {
        return maxContentLength;
    }

    /**
     * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
     *
     * @param config configuration from which the REST client endpoint configuration should be created from
     * @return REST client endpoint configuration
     * @throws ConfigurationException if SSL was configured incorrectly
     */

    public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
        Preconditions.checkNotNull(config);

        final SSLHandlerFactory sslHandlerFactory;
        if (SSLUtils.isRestSSLEnabled(config)) {
            try {
                sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
            } catch (Exception e) {
                throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
            }
        } else {
            sslHandlerFactory = null;
        }

        final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);

        final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);

        int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);

        return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
    }
}
  • RestClientConfiguration has four fields: sslHandlerFactory, connectionTimeout, idlenessTimeout, and maxContentLength
  • The fromConfiguration method builds the SSLHandlerFactory from the Configuration. The relevant options are security.ssl.rest.enabled (default false), security.ssl.protocol (default TLSv1.2), security.ssl.algorithms (default TLS_RSA_WITH_AES_128_CBC_SHA), and security.ssl.rest.authentication-enabled (default false). When REST SSL is not enabled, sslHandlerFactory is null
  • connectionTimeout is read from rest.connection-timeout (default 15000 ms); idlenessTimeout is read from rest.idleness-timeout (default 5 minutes); maxContentLength is read from rest.client.max-content-length (default 104_857_600 bytes, i.e. 100 MiB)
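
As a rough illustration of the pattern described above (an immutable holder built by a static factory that reads keys and falls back to defaults), here is a minimal JDK-only sketch. It assumes a plain Map<String, String> in place of Flink's Configuration and leaves SSL out entirely. The class name RestClientSettingsSketch and the method fromMap are hypothetical and not part of Flink. The keys and default values are the ones listed above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical stand-in for RestClientConfiguration (not Flink code).
 * A plain Map replaces Flink's Configuration; SSL handling is omitted.
 */
final class RestClientSettingsSketch {

    // Option keys and defaults as listed in the notes above.
    static final String CONNECTION_TIMEOUT_KEY = "rest.connection-timeout";
    static final String IDLENESS_TIMEOUT_KEY = "rest.idleness-timeout";
    static final String MAX_CONTENT_LENGTH_KEY = "rest.client.max-content-length";

    static final long DEFAULT_CONNECTION_TIMEOUT_MS = 15_000L;
    static final long DEFAULT_IDLENESS_TIMEOUT_MS = 5L * 60L * 1_000L; // 5 minutes
    static final int DEFAULT_MAX_CONTENT_LENGTH = 104_857_600;         // 100 MiB

    final long connectionTimeout;
    final long idlenessTimeout;
    final int maxContentLength;

    private RestClientSettingsSketch(long connectionTimeout, long idlenessTimeout, int maxContentLength) {
        // Same guard as in the listing: reject a non-positive content length early.
        if (maxContentLength <= 0) {
            throw new IllegalArgumentException("maxContentLength must be positive, was: " + maxContentLength);
        }
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    /** Builds the settings from a key/value map, using the defaults for missing keys. */
    static RestClientSettingsSketch fromMap(Map<String, String> config) {
        Objects.requireNonNull(config, "config");
        long connectionTimeout = readLong(config, CONNECTION_TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
        long idlenessTimeout = readLong(config, IDLENESS_TIMEOUT_KEY, DEFAULT_IDLENESS_TIMEOUT_MS);
        int maxContentLength = readInt(config, MAX_CONTENT_LENGTH_KEY, DEFAULT_MAX_CONTENT_LENGTH);
        return new RestClientSettingsSketch(connectionTimeout, idlenessTimeout, maxContentLength);
    }

    private static long readLong(Map<String, String> config, String key, long defaultValue) {
        String raw = config.get(key);
        return raw == null ? defaultValue : Long.parseLong(raw.trim());
    }

    private static int readInt(Map<String, String> config, String key, int defaultValue) {
        String raw = config.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(CONNECTION_TIMEOUT_KEY, "30000"); // override one option only
        RestClientSettingsSketch settings = fromMap(config);
        // prints "30000 300000 104857600"
        System.out.println(settings.connectionTimeout + " " + settings.idlenessTimeout + " " + settings.maxContentLength);
    }
}
```

Only the overridden key changes; the other two values fall back to their defaults, which mirrors how config.getLong and config.getInteger behave with a ConfigOption that carries a default value.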

RestClient

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java

public class RestClient implements AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);

    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

    // used to open connections to a rest server endpoint
    private final Executor executor;

    private final Bootstrap bootstrap;

    private final CompletableFuture<Void> terminationFuture;

    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    public RestClient(RestClientConfiguration configuration, Executor executor) {
        Preconditions.checkNotNull(configuration);
        this.executor = Preconditions.checkNotNull(executor);
        this.terminationFuture = new CompletableFuture<>();

        final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                try {
                    // SSL should be the first handler in the pipeline
                    if (sslHandlerFactory != null) {
                        socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
                    }

                    socketChannel.pipeline()
                        .addLast(new HttpClientCodec())
                        .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                        .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                        .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
                        .addLast(new ClientHandler());
                } catch (Throwable t) {
                    t.printStackTrace();
                    ExceptionUtils.rethrow(t);
                }
            }
        };
        NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

        bootstrap = new Bootstrap();
        bootstrap
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(initializer);

        LOG.info("Rest client endpoint started.");
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return shutdownInternally(Time.seconds(10L));
    }

    public void shutdown(Time timeout) {
        final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);

        try {
            shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        } catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    private CompletableFuture<Void> shutdownInternally(Time timeout) {
        if (isRunning.compareAndSet(true, false)) {
            LOG.info("Shutting down rest endpoint.");

            if (bootstrap != null) {
                if (bootstrap.group() != null) {
                    bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
                        .addListener(finished -> {
                            if (finished.isSuccess()) {
                                terminationFuture.complete(null);
                            } else {
                                terminationFuture.completeExceptionally(finished.cause());
                            }
                        });
                }
            }
        }
        return terminationFuture;
    }

    //......
}
  • The RestClient constructor takes two parameters, a RestClientConfiguration and an Executor, and creates a Netty Bootstrap. ChannelOption.CONNECT_TIMEOUT_MILLIS is set from configuration.getConnectionTimeout(); the readerIdleTime, writerIdleTime, and allIdleTime of IdleStateHandler all use configuration.getIdlenessTimeout(); the maxContentLength of HttpObjectAggregator uses configuration.getMaxContentLength(); and the SSL handler, added first in the pipeline when SSL is enabled, comes from configuration.getSslHandlerFactory()
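
Two details of the listing can be tried out without Netty: the long connection timeout is narrowed with Math.toIntExact because CONNECT_TIMEOUT_MILLIS takes an Integer, and shutdownInternally uses isRunning.compareAndSet so that repeated close calls share one terminationFuture. The following is a minimal JDK-only sketch of both. The class RestClientLifecycleSketch and its methods are hypothetical, and the future is completed immediately here, whereas the real client completes it from the event loop group's shutdown listener.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical JDK-only sketch (not Flink code) of two details in RestClient:
 * the long-to-int timeout conversion and the run-once shutdown guard.
 */
final class RestClientLifecycleSketch {

    private final int connectTimeoutMillis;
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private int shutdownRuns = 0; // how many times the shutdown body actually ran

    RestClientLifecycleSketch(long connectionTimeout) {
        // Math.toIntExact throws ArithmeticException on overflow instead of
        // silently truncating the value the way a plain (int) cast would.
        this.connectTimeoutMillis = Math.toIntExact(connectionTimeout);
    }

    int connectTimeoutMillis() {
        return connectTimeoutMillis;
    }

    int shutdownRuns() {
        return shutdownRuns;
    }

    CompletableFuture<Void> closeAsync() {
        // Only the caller that flips isRunning from true to false runs the
        // shutdown body; every caller gets the same future back.
        if (isRunning.compareAndSet(true, false)) {
            shutdownRuns++;
            // Assumption of this sketch: nothing asynchronous to wait for,
            // so the future is completed right away.
            terminationFuture.complete(null);
        }
        return terminationFuture;
    }

    public static void main(String[] args) {
        RestClientLifecycleSketch client = new RestClientLifecycleSketch(15_000L);
        CompletableFuture<Void> first = client.closeAsync();
        CompletableFuture<Void> second = client.closeAsync();
        System.out.println(client.connectTimeoutMillis()); // prints 15000
        System.out.println(first == second);               // prints true
        System.out.println(client.shutdownRuns());         // prints 1

        try {
            new RestClientLifecycleSketch(Integer.MAX_VALUE + 1L);
        } catch (ArithmeticException e) {
            System.out.println("timeout does not fit into an int");
        }
    }
}
```

A practical consequence of the conversion is that a rest.connection-timeout larger than Integer.MAX_VALUE milliseconds would make the RestClient constructor fail rather than wrap around to a negative timeout.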

Summary

  • RestClientConfiguration has four fields: sslHandlerFactory, connectionTimeout, idlenessTimeout, and maxContentLength. The fromConfiguration method builds the SSLHandlerFactory from the Configuration, based on security.ssl.rest.enabled (default false), security.ssl.protocol (default TLSv1.2), security.ssl.algorithms (default TLS_RSA_WITH_AES_128_CBC_SHA), and security.ssl.rest.authentication-enabled (default false)
  • connectionTimeout is read from rest.connection-timeout (default 15000 ms); idlenessTimeout is read from rest.idleness-timeout (default 5 minutes); maxContentLength is read from rest.client.max-content-length (default 104_857_600)
  • The RestClient constructor takes a RestClientConfiguration and an Executor and creates a Netty Bootstrap: ChannelOption.CONNECT_TIMEOUT_MILLIS uses configuration.getConnectionTimeout(); the readerIdleTime, writerIdleTime, and allIdleTime of IdleStateHandler use configuration.getIdlenessTimeout(); the maxContentLength of HttpObjectAggregator uses configuration.getMaxContentLength(); and the SSL handler comes from configuration.getSslHandlerFactory()

doc

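Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.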
