前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊flink的RestClusterClientConfiguration

聊聊flink的RestClusterClientConfiguration

原创
作者头像
code4it
发布2019-03-08 09:51:12
1.3K0
发布2019-03-08 09:51:12
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下flink的RestClusterClientConfiguration

RestClusterClientConfiguration

flink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java

代码语言:javascript
复制
public final class RestClusterClientConfiguration {
​
    private final RestClientConfiguration restClientConfiguration;
​
    private final long awaitLeaderTimeout;
​
    private final int retryMaxAttempts;
​
    private final long retryDelay;
​
    private RestClusterClientConfiguration(
            final RestClientConfiguration endpointConfiguration,
            final long awaitLeaderTimeout,
            final int retryMaxAttempts,
            final long retryDelay) {
        checkArgument(awaitLeaderTimeout >= 0, "awaitLeaderTimeout must be equal to or greater than 0");
        checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be equal to or greater than 0");
        checkArgument(retryDelay >= 0, "retryDelay must be equal to or greater than 0");
​
        this.restClientConfiguration = Preconditions.checkNotNull(endpointConfiguration);
        this.awaitLeaderTimeout = awaitLeaderTimeout;
        this.retryMaxAttempts = retryMaxAttempts;
        this.retryDelay = retryDelay;
    }
​
    public RestClientConfiguration getRestClientConfiguration() {
        return restClientConfiguration;
    }
​
    /**
     * @see RestOptions#AWAIT_LEADER_TIMEOUT
     */
    public long getAwaitLeaderTimeout() {
        return awaitLeaderTimeout;
    }
​
    /**
     * @see RestOptions#RETRY_MAX_ATTEMPTS
     */
    public int getRetryMaxAttempts() {
        return retryMaxAttempts;
    }
​
    /**
     * @see RestOptions#RETRY_DELAY
     */
    public long getRetryDelay() {
        return retryDelay;
    }
​
    public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
        RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config);
​
        final long awaitLeaderTimeout = config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT);
        final int retryMaxAttempts = config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS);
        final long retryDelay = config.getLong(RestOptions.RETRY_DELAY);
​
        return new RestClusterClientConfiguration(restClientConfiguration, awaitLeaderTimeout, retryMaxAttempts, retryDelay);
    }
}
  • RestClusterClientConfiguration除了RestClientConfiguration外,还有3个属性,分别是awaitLeaderTimeout、retryMaxAttempts、retryDelay;awaitLeaderTimeout读取的是rest.await-leader-timeout配置,默认是30秒;retryMaxAttempts读取的是rest.retry.max-attempts配置,默认是20;retryDelay读取的是rest.retry.delay配置,默认是3秒

RestClusterClient

flink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java

代码语言:javascript
复制
public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient {
​
    private final RestClusterClientConfiguration restClusterClientConfiguration;
​
    private final RestClient restClient;
​
    private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
​
    private final WaitStrategy waitStrategy;
​
    private final T clusterId;
​
    private final LeaderRetrievalService webMonitorRetrievalService;
​
    private final LeaderRetrievalService dispatcherRetrievalService;
​
    private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever();
​
    private final LeaderRetriever dispatcherLeaderRetriever = new LeaderRetriever();
​
    /** ExecutorService to run operations that can be retried on exceptions. */
    private ScheduledExecutorService retryExecutorService;
​
    //......
​
    private <C> CompletableFuture<C> retry(
            CheckedSupplier<CompletableFuture<C>> operation,
            Predicate<Throwable> retryPredicate) {
        return FutureUtils.retryWithDelay(
            CheckedSupplier.unchecked(operation),
            restClusterClientConfiguration.getRetryMaxAttempts(),
            Time.milliseconds(restClusterClientConfiguration.getRetryDelay()),
            retryPredicate,
            new ScheduledExecutorServiceAdapter(retryExecutorService));
    }
​
    @VisibleForTesting
    CompletableFuture<URL> getWebMonitorBaseUrl() {
        return FutureUtils.orTimeout(
                webMonitorLeaderRetriever.getLeaderFuture(),
                restClusterClientConfiguration.getAwaitLeaderTimeout(),
                TimeUnit.MILLISECONDS)
            .thenApplyAsync(leaderAddressSessionId -> {
                final String url = leaderAddressSessionId.f0;
                try {
                    return new URL(url);
                } catch (MalformedURLException e) {
                    throw new IllegalArgumentException("Could not parse URL from " + url, e);
                }
            }, executorService);
    }
​
    //......
}
  • RestClusterClient的构造器会从使用RestClusterClientConfiguration.fromConfiguration(configuration)方法从Configuration构建RestClusterClientConfiguration
  • retry方法内部使用的是FutureUtils.retryWithDelay方法,其retries参数使用的是restClusterClientConfiguration.getRetryMaxAttempts(),retryDelay参数使用的是Time.milliseconds(restClusterClientConfiguration.getRetryDelay())
  • getWebMonitorBaseUrl方法内部使用的是FutureUtils.orTimeout方法,其timeout参数使用的是restClusterClientConfiguration.getAwaitLeaderTimeout()

小结

  • RestClusterClientConfiguration除了RestClientConfiguration外,还有3个属性,分别是awaitLeaderTimeout、retryMaxAttempts、retryDelay
  • awaitLeaderTimeout读取的是rest.await-leader-timeout配置,默认是30秒;retryMaxAttempts读取的是rest.retry.max-attempts配置,默认是20;retryDelay读取的是rest.retry.delay配置,默认是3秒
  • RestClusterClient的etry方法内部使用的是FutureUtils.retryWithDelay方法,其retries参数使用的是restClusterClientConfiguration.getRetryMaxAttempts(),retryDelay参数使用的是Time.milliseconds(restClusterClientConfiguration.getRetryDelay());getWebMonitorBaseUrl方法内部使用的是FutureUtils.orTimeout方法,其timeout参数使用的是restClusterClientConfiguration.getAwaitLeaderTimeout()

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RestClusterClientConfiguration
  • RestClusterClient
  • 小结
  • doc
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档