
A Look at High Availability for the Flink JobManager

code4it
Published 2019-02-16 12:21:24
Collected in the column: 码匠的流水账

This article looks at how Flink implements high availability for the JobManager.

Configuration

flink-conf.yaml

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: file:///share
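  • high-availability accepts NONE, zookeeper, or the fully qualified name of a HighAvailabilityServicesFactory class
  • high-availability.zookeeper.quorum lists the ZooKeeper peers
  • high-availability.zookeeper.path.root sets the root node under which Flink stores its entries in ZooKeeper
  • high-availability.cluster-id names the node for the current cluster; this cluster node sits below the root node
  • high-availability.storageDir sets the file system path where the JobManager metadata is persisted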
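To make the relationship between the root node and the cluster node concrete, here is a minimal sketch of how the two configured values could be combined into one znode path. The class HaPathSketch and its method clusterPath are hypothetical helpers written for this article, not part of Flink's API, and the slash handling is an assumption about what a reasonable join looks like.

```java
// Hypothetical helper (not Flink's API): joins the configured ZooKeeper root
// and the cluster-id into a single znode path.
class HaPathSketch {

    /** Joins the two parts with exactly one slash between them. */
    static String clusterPath(String root, String clusterId) {
        StringBuilder sb = new StringBuilder();
        for (String part : new String[] {root, clusterId}) {
            String p = stripSlashes(part);
            if (!p.isEmpty()) {
                sb.append('/').append(p);
            }
        }
        // If both parts are empty, fall back to the ZooKeeper root.
        return sb.length() == 0 ? "/" : sb.toString();
    }

    /** Removes leading and trailing slashes from one path part. */
    private static String stripSlashes(String s) {
        int start = 0;
        int end = s.length();
        while (start < end && s.charAt(start) == '/') {
            start++;
        }
        while (end > start && s.charAt(end - 1) == '/') {
            end--;
        }
        return s.substring(start, end);
    }

    public static void main(String[] args) {
        // Values taken from the flink-conf.yaml shown above.
        System.out.println(clusterPath("/flink", "/cluster_one"));
    }
}
```

With the values from the configuration above, the cluster node ends up at /flink/cluster_one, which is why each cluster sharing one ZooKeeper ensemble should get its own cluster-id.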

The masters file

localhost:8081
localhost:8082
  • The masters file lists the JobManager hosts, one per line, each followed by the port of its web UI
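The host:port format of the masters file can be illustrated with a small parser. This is only a sketch: MastersFileSketch and its Master type are hypothetical names, and it assumes every non-blank line carries an explicit port, which is stricter than what Flink's own start-up scripts require.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical parser for lines of the form host:port (not Flink code).
class MastersFileSketch {

    /** One entry of the masters file. */
    static final class Master {
        final String host;
        final int webUiPort;

        Master(String host, int webUiPort) {
            this.host = host;
            this.webUiPort = webUiPort;
        }
    }

    /** Parses each non-blank line; assumes the port is always present. */
    static List<Master> parse(List<String> lines) {
        List<Master> masters = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue;
            }
            int idx = line.lastIndexOf(':');
            if (idx <= 0 || idx == line.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + line);
            }
            masters.add(new Master(line.substring(0, idx), Integer.parseInt(line.substring(idx + 1))));
        }
        return masters;
    }

    public static void main(String[] args) {
        // The two entries from the masters file shown above.
        for (Master m : parse(Arrays.asList("localhost:8081", "localhost:8082"))) {
            System.out.println(m.host + " -> " + m.webUiPort);
        }
    }
}
```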

HighAvailabilityMode

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java

public enum HighAvailabilityMode {
    NONE(false),
    ZOOKEEPER(true),
    FACTORY_CLASS(true);
​
    private final boolean haActive;
​
    HighAvailabilityMode(boolean haActive) {
        this.haActive = haActive;
    }
​
    /**
     * Return the configured {@link HighAvailabilityMode}.
     *
     * @param config The config to parse
     * @return Configured recovery mode or {@link HighAvailabilityMode#NONE} if not
     * configured.
     */
    public static HighAvailabilityMode fromConfig(Configuration config) {
        String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);
​
        if (haMode == null) {
            return HighAvailabilityMode.NONE;
        } else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
            // Map old default to new default
            return HighAvailabilityMode.NONE;
        } else {
            try {
                return HighAvailabilityMode.valueOf(haMode.toUpperCase());
            } catch (IllegalArgumentException e) {
                return FACTORY_CLASS;
            }
        }
    }
​
    /**
     * Returns true if the defined recovery mode supports high availability.
     *
     * @param configuration Configuration which contains the recovery mode
     * @return true if high availability is supported by the recovery mode, otherwise false
     */
    public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
        HighAvailabilityMode mode = fromConfig(configuration);
        return mode.haActive;
    }
}
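  • HighAvailabilityMode has three constants: NONE, ZOOKEEPER, and FACTORY_CLASS; each carries an haActive flag that says whether the mode provides high availability
  • fromConfig returns NONE when the option is unset or equals the legacy default, and falls back to FACTORY_CLASS for any value that is not the name of an enum constant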
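The mapping performed by fromConfig is easier to see with plain strings. The sketch below reproduces the same branching without any Flink dependency; HaModeSketch and fromValue are hypothetical names, and the value "standalone" for the legacy default is an assumption standing in for ConfigConstants.DEFAULT_RECOVERY_MODE.

```java
import java.util.Locale;

// Dependency-free sketch of the branching in HighAvailabilityMode.fromConfig.
class HaModeSketch {

    enum Mode { NONE, ZOOKEEPER, FACTORY_CLASS }

    // Assumption: stand-in for ConfigConstants.DEFAULT_RECOVERY_MODE.
    static final String OLD_DEFAULT = "standalone";

    static Mode fromValue(String haMode) {
        if (haMode == null || haMode.equalsIgnoreCase(OLD_DEFAULT)) {
            // Unset, or the old default: no high availability.
            return Mode.NONE;
        }
        try {
            // Case-insensitive match against the enum names.
            return Mode.valueOf(haMode.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Anything else is treated as the name of a factory class.
            return Mode.FACTORY_CLASS;
        }
    }

    public static void main(String[] args) {
        System.out.println(fromValue("zookeeper"));
        System.out.println(fromValue("org.example.MyHaServicesFactory"));
    }
}
```

One consequence worth noting: a misspelled mode such as "zookeper" is not rejected at this stage. It is classified as FACTORY_CLASS and only fails later, when the class cannot be loaded.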

HighAvailabilityOptions

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/HighAvailabilityOptions.java

@PublicEvolving
@ConfigGroups(groups = {
    @ConfigGroup(name = "HighAvailabilityZookeeper", keyPrefix = "high-availability.zookeeper")
})
public class HighAvailabilityOptions {
​
    // ------------------------------------------------------------------------
    //  Required High Availability Options
    // ------------------------------------------------------------------------
​
    /**
     * Defines high-availability mode used for the cluster execution.
     * A value of "NONE" signals no highly available setup.
     * To enable high-availability, set this mode to "ZOOKEEPER".
     * Can also be set to FQN of HighAvailability factory class.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
    public static final ConfigOption<String> HA_MODE =
            key("high-availability")
            .defaultValue("NONE")
            .withDeprecatedKeys("recovery.mode")
            .withDescription("Defines high-availability mode used for the cluster execution." +
                " To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");
​
    /**
     * The ID of the Flink cluster, used to separate multiple Flink clusters
     * Needs to be set for standalone clusters, is automatically inferred in YARN and Mesos.
     */
    public static final ConfigOption<String> HA_CLUSTER_ID =
            key("high-availability.cluster-id")
            .defaultValue("/default")
            .withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace")
            .withDescription("The ID of the Flink cluster, used to separate multiple Flink clusters from each other." +
                " Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.");
​
    /**
     * File system path (URI) where Flink persists metadata in high-availability setups.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
    public static final ConfigOption<String> HA_STORAGE_PATH =
            key("high-availability.storageDir")
            .noDefaultValue()
            .withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
            .withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");
​
    // ------------------------------------------------------------------------
    //  Recovery Options
    // ------------------------------------------------------------------------
​
    /**
     * Optional port (range) used by the job manager in high-availability mode.
     */
    public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
            key("high-availability.jobmanager.port")
            .defaultValue("0")
            .withDeprecatedKeys("recovery.jobmanager.port")
            .withDescription("Optional port (range) used by the job manager in high-availability mode.");
​
    /**
     * The time before a JobManager after a fail over recovers the current jobs.
     */
    public static final ConfigOption<String> HA_JOB_DELAY =
            key("high-availability.job.delay")
            .noDefaultValue()
            .withDeprecatedKeys("recovery.job.delay")
            .withDescription("The time before a JobManager after a fail over recovers the current jobs.");
​
    // ------------------------------------------------------------------------
    //  ZooKeeper Options
    // ------------------------------------------------------------------------
​
    /**
     * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
     */
    public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
            key("high-availability.zookeeper.quorum")
            .noDefaultValue()
            .withDeprecatedKeys("recovery.zookeeper.quorum")
            .withDescription("The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.");
​
    /**
     * The root path under which Flink stores its entries in ZooKeeper.
     */
    public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
            key("high-availability.zookeeper.path.root")
            .defaultValue("/flink")
            .withDeprecatedKeys("recovery.zookeeper.path.root")
            .withDescription("The root path under which Flink stores its entries in ZooKeeper.");
​
    public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
            key("high-availability.zookeeper.path.latch")
            .defaultValue("/leaderlatch")
            .withDeprecatedKeys("recovery.zookeeper.path.latch")
            .withDescription("Defines the znode of the leader latch which is used to elect the leader.");
​
    /** ZooKeeper root path (ZNode) for job graphs. */
    public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
            key("high-availability.zookeeper.path.jobgraphs")
            .defaultValue("/jobgraphs")
            .withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
            .withDescription("ZooKeeper root path (ZNode) for job graphs");
​
    public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
            key("high-availability.zookeeper.path.leader")
            .defaultValue("/leader")
            .withDeprecatedKeys("recovery.zookeeper.path.leader")
            .withDescription("Defines the znode of the leader which contains the URL to the leader and the current" +
                " leader session ID.");
​
    /** ZooKeeper root path (ZNode) for completed checkpoints. */
    public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
            key("high-availability.zookeeper.path.checkpoints")
            .defaultValue("/checkpoints")
            .withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
            .withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");
​
    /** ZooKeeper root path (ZNode) for checkpoint counters. */
    public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
            key("high-availability.zookeeper.path.checkpoint-counter")
            .defaultValue("/checkpoint-counter")
            .withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
            .withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");
​
    /** ZooKeeper root path (ZNode) for Mesos workers. */
    @PublicEvolving
    public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
            key("high-availability.zookeeper.path.mesos-workers")
            .defaultValue("/mesos-workers")
            .withDeprecatedKeys("recovery.zookeeper.path.mesos-workers")
            .withDescription(Description.builder()
                .text("The ZooKeeper root path for persisting the Mesos worker information.")
                .build());
​
    // ------------------------------------------------------------------------
    //  ZooKeeper Client Settings
    // ------------------------------------------------------------------------
​
    public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT =
            key("high-availability.zookeeper.client.session-timeout")
            .defaultValue(60000)
            .withDeprecatedKeys("recovery.zookeeper.client.session-timeout")
            .withDescription("Defines the session timeout for the ZooKeeper session in ms.");
​
    public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
            key("high-availability.zookeeper.client.connection-timeout")
            .defaultValue(15000)
            .withDeprecatedKeys("recovery.zookeeper.client.connection-timeout")
            .withDescription("Defines the connection timeout for ZooKeeper in ms.");
​
    public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT =
            key("high-availability.zookeeper.client.retry-wait")
            .defaultValue(5000)
            .withDeprecatedKeys("recovery.zookeeper.client.retry-wait")
            .withDescription("Defines the pause between consecutive retries in ms.");
​
    public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
            key("high-availability.zookeeper.client.max-retry-attempts")
            .defaultValue(3)
            .withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts")
            .withDescription("Defines the number of connection retries before the client gives up.");
​
    public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH =
            key("high-availability.zookeeper.path.running-registry")
            .defaultValue("/running_job_registry/");
​
    public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
            key("high-availability.zookeeper.client.acl")
            .defaultValue("open")
            .withDescription("Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be" +
                " set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use" +
                " SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).");
​
    // ------------------------------------------------------------------------
​
    /** Not intended to be instantiated. */
    private HighAvailabilityOptions() {}
}
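  • HighAvailabilityOptions defines the high-availability.* options: the required ones (mode, cluster-id, storageDir), the recovery options, and the ZooKeeper path and client settings under the high-availability.zookeeper prefix; most of them keep the older recovery.* names as deprecated keys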
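The combination of key, deprecated keys, and default value suggests a simple lookup order. The sketch below models a configuration as a plain Map to show that order; DeprecatedKeySketch and lookup are hypothetical names, and the precedence (current key first, then deprecated keys in declaration order, then the default) is an assumption about how such options are typically resolved, not a copy of Flink's Configuration class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical lookup with deprecated-key fallback over a plain Map.
class DeprecatedKeySketch {

    static String lookup(
            Map<String, String> conf,
            String key,
            List<String> deprecatedKeys,
            String defaultValue) {
        // 1. The current key wins if it is set.
        if (conf.containsKey(key)) {
            return conf.get(key);
        }
        // 2. Otherwise try the deprecated keys in declaration order.
        for (String oldKey : deprecatedKeys) {
            if (conf.containsKey(oldKey)) {
                return conf.get(oldKey);
            }
        }
        // 3. Fall back to the default.
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("recovery.mode", "zookeeper"); // legacy key only
        System.out.println(
            lookup(conf, "high-availability", Arrays.asList("recovery.mode"), "NONE"));
    }
}
```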

HighAvailabilityServicesUtils

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java

public class HighAvailabilityServicesUtils {
​
    public static HighAvailabilityServices createAvailableOrEmbeddedServices(
        Configuration config,
        Executor executor) throws Exception {
        HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);
​
        switch (highAvailabilityMode) {
            case NONE:
                return new EmbeddedHaServices(executor);
​
            case ZOOKEEPER:
                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
​
                return new ZooKeeperHaServices(
                    ZooKeeperUtils.startCuratorFramework(config),
                    executor,
                    config,
                    blobStoreService);
​
            case FACTORY_CLASS:
                return createCustomHAServices(config, executor);
​
            default:
                throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
        }
    }
​
    public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution) throws Exception {
​
        HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
​
        switch (highAvailabilityMode) {
            case NONE:
                final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
​
                final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    JobMaster.JOB_MANAGER_NAME,
                    addressResolution,
                    configuration);
                final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    ResourceManager.RESOURCE_MANAGER_NAME,
                    addressResolution,
                    configuration);
                final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    Dispatcher.DISPATCHER_NAME,
                    addressResolution,
                    configuration);
​
                final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
                    "%s must be set",
                    RestOptions.ADDRESS.key());
                final int port = configuration.getInteger(RestOptions.PORT);
                final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
                final String protocol = enableSSL ? "https://" : "http://";
​
                return new StandaloneHaServices(
                    resourceManagerRpcUrl,
                    dispatcherRpcUrl,
                    jobManagerRpcUrl,
                    String.format("%s%s:%s", protocol, address, port));
            case ZOOKEEPER:
                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
​
                return new ZooKeeperHaServices(
                    ZooKeeperUtils.startCuratorFramework(configuration),
                    executor,
                    configuration,
                    blobStoreService);
​
            case FACTORY_CLASS:
                return createCustomHAServices(configuration, executor);
​
            default:
                throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
        }
    }
​
    /**
     * Returns the JobManager's hostname and port extracted from the given
     * {@link Configuration}.
     *
     * @param configuration Configuration to extract the JobManager's address from
     * @return The JobManager's hostname and port
     * @throws ConfigurationException if the JobManager's address cannot be extracted from the configuration
     */
    public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {
​
        final String hostname = configuration.getString(JobManagerOptions.ADDRESS);
        final int port = configuration.getInteger(JobManagerOptions.PORT);
​
        if (hostname == null) {
            throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS +
                "' is missing (hostname/address of JobManager to connect to).");
        }
​
        if (port <= 0 || port >= 65536) {
            throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
                "' (port of the JobManager actor system) : " + port +
                ".  it must be greater than 0 and less than 65536.");
        }
​
        return Tuple2.of(hostname, port);
    }
​
    private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);
​
        final HighAvailabilityServicesFactory highAvailabilityServicesFactory;
​
        try {
            highAvailabilityServicesFactory = InstantiationUtil.instantiate(
                haServicesClassName,
                HighAvailabilityServicesFactory.class,
                classLoader);
        } catch (Exception e) {
            throw new FlinkException(
                String.format(
                    "Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
                    haServicesClassName),
                e);
        }
​
        try {
            return highAvailabilityServicesFactory.createHAServices(config, executor);
        } catch (Exception e) {
            throw new FlinkException(
                String.format(
                    "Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
                    haServicesClassName),
                e);
        }
    }
​
    /**
     * Enum specifying whether address resolution should be tried or not when creating the
     * {@link HighAvailabilityServices}.
     */
    public enum AddressResolution {
        TRY_ADDRESS_RESOLUTION,
        NO_ADDRESS_RESOLUTION
    }
}
  • HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices, and the private helper createCustomHAServices
  • createAvailableOrEmbeddedServices is mainly used by FlinkMiniCluster; createHighAvailabilityServices is mainly used by ClusterEntrypoint. The latter creates StandaloneHaServices when highAvailabilityMode is NONE, ZooKeeperHaServices when it is ZOOKEEPER, and delegates to createCustomHAServices when it is FACTORY_CLASS
  • HighAvailabilityServicesUtils also provides the static method getJobManagerAddress, which reads the JobManager hostname and port from the configuration and validates them
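The FACTORY_CLASS branch boils down to loading a class by name and calling its factory method. The following sketch shows that pattern with plain reflection. HaFactory and DemoFactory are hypothetical stand-ins for HighAvailabilityServicesFactory and a user implementation, the factory returns a String instead of real services, and the sketch throws an unchecked IllegalStateException where the original wraps failures in FlinkException.

```java
// Reflection-based factory loading, as a dependency-free sketch.
class CustomFactorySketch {

    /** Hypothetical stand-in for HighAvailabilityServicesFactory. */
    interface HaFactory {
        String createHaServices();
    }

    /** Demo implementation; reflective instantiation needs a no-arg constructor. */
    public static class DemoFactory implements HaFactory {
        public DemoFactory() {}

        @Override
        public String createHaServices() {
            return "demo-ha-services";
        }
    }

    /** Loads the named class, checks its type, and instantiates it. */
    static HaFactory instantiate(String className, ClassLoader classLoader) {
        try {
            Class<? extends HaFactory> clazz =
                Class.forName(className, true, classLoader).asSubclass(HaFactory.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException(
                "Could not instantiate factory '" + className + "'. Is it on the class path?", e);
        }
    }

    public static void main(String[] args) {
        // In Flink the class name would come from the high-availability option.
        String name = DemoFactory.class.getName();
        HaFactory factory = instantiate(name, CustomFactorySketch.class.getClassLoader());
        System.out.println(factory.createHaServices());
    }
}
```

Because the class name comes straight from the high-availability option, any value that is neither NONE nor ZOOKEEPER ends up in this code path, and a typo surfaces as a class loading failure rather than as a configuration error.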

HighAvailabilityServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

/**
 * The HighAvailabilityServices give access to all services needed for a highly-available
 * setup. In particular, the services provide access to highly available storage and
 * registries, as well as distributed counters and leader election.
 * 
 * <ul>
 *     <li>ResourceManager leader election and leader retrieval</li>
 *     <li>JobManager leader election and leader retrieval</li>
 *     <li>Persistence for checkpoint metadata</li>
 *     <li>Registering the latest completed checkpoint(s)</li>
 *     <li>Persistence for the BLOB store</li>
 *     <li>Registry that marks a job's status</li>
 *     <li>Naming of RPC endpoints</li>
 * </ul>
 */
public interface HighAvailabilityServices extends AutoCloseable {
​
    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------
​
    /**
     * This UUID should be used when no proper leader election happens, but a simple
     * pre-configured leader is used. That is for example the case in non-highly-available
     * standalone setups.
     */
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);
​
    /**
     * This JobID should be used to identify the old JobManager when using the
     * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
     * distinct JobID assigned.
     */
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);
​
    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------
​
    /**
     * Gets the leader retriever for the cluster's resource manager.
     */
    LeaderRetrievalService getResourceManagerLeaderRetriever();
​
    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service
     * is not always accessible.
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();
​
    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @return Leader retrieval service to retrieve the job manager for the given job
     * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
​
    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by
     *                              a static leader retrieval service.
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
​
    LeaderRetrievalService getWebMonitorLeaderRetriever();
​
    /**
     * Gets the leader election service for the cluster's resource manager.
     *
     * @return Leader election service for the resource manager leader election
     */
    LeaderElectionService getResourceManagerLeaderElectionService();
​
    /**
     * Gets the leader election service for the cluster's dispatcher.
     *
     * @return Leader election service for the dispatcher leader election
     */
    LeaderElectionService getDispatcherLeaderElectionService();
​
    /**
     * Gets the leader election service for the given job.
     *
     * @param jobID The identifier of the job running the election.
     * @return Leader election service for the job manager leader election
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
​
    LeaderElectionService getWebMonitorLeaderElectionService();
​
    /**
     * Gets the checkpoint recovery factory for the job manager
     *
     * @return Checkpoint recovery factory
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();
​
    /**
     * Gets the submitted job graph store for the job manager
     *
     * @return Submitted job graph store
     * @throws Exception if the submitted job graph store could not be created
     */
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
​
    /**
     * Gets the registry that holds information about whether jobs are currently running.
     *
     * @return Running job registry to retrieve running jobs
     */
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;
​
    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     * @throws IOException if the blob store could not be created
     */
    BlobStore createBlobStore() throws IOException;
​
    // ------------------------------------------------------------------------
    //  Shutdown and Cleanup
    // ------------------------------------------------------------------------
​
    /**
     * Closes the high availability services, releasing all resources.
     * 
     * <p>This method <b>does not delete or clean up</b> any data stored in external stores
     * (file systems, ZooKeeper, etc). Another instance of the high availability
     * services will be able to recover the job.
     * 
     * <p>If an exception occurs during closing services, this method will attempt to
     * continue closing other services and report exceptions only after all services
     * have been attempted to be closed.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services.
     */
    @Override
    void close() throws Exception;
​
    /**
     * Closes the high availability services (releasing all resources) and deletes
     * all data stored by these services in external stores.
     * 
     * <p>After this method was called, the any job or session that was managed by
     * these high availability services will be unrecoverable.
     * 
     * <p>If an exception occurs during cleanup, this method will attempt to
     * continue the cleanup and report exceptions only after all cleanup steps have
     * been attempted.
     * 
     * @throws Exception Thrown, if an exception occurred while closing these services
     *                   or cleaning up data stored by them.
     */
    void closeAndCleanupAllData() throws Exception;
}
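  • HighAvailabilityServices declares the getters for the services a highly available setup needs: leader retrieval and leader election for the ResourceManager, Dispatcher, JobManager, and web monitor, plus the checkpoint recovery factory, the submitted job graph store, the running jobs registry, and the BLOB store
  • It also declares close, which releases resources but keeps the stored data, and closeAndCleanupAllData, which additionally deletes the data and makes the jobs unrecoverable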
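The Javadoc of DEFAULT_LEADER_ID describes setups where no real election happens and a pre-configured leader is used. A minimal sketch of that idea is a retrieval service that hands a fixed address and the all-zero UUID to its listener as soon as it starts. LeaderListener and StaticLeaderRetriever are hypothetical stand-ins written for this article, not Flink's LeaderRetrievalService API, and the address in main is only an example value.

```java
import java.util.UUID;

// Sketch of leader retrieval with a fixed, pre-configured leader.
class StaticLeaderSketch {

    /** Same value as DEFAULT_LEADER_ID in the interface above. */
    static final UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    /** Hypothetical stand-in for a leader retrieval listener. */
    interface LeaderListener {
        void notifyLeader(String leaderAddress, UUID leaderSessionId);
    }

    /** Hypothetical retrieval service that always reports the same leader. */
    static final class StaticLeaderRetriever {
        private final String leaderAddress;

        StaticLeaderRetriever(String leaderAddress) {
            this.leaderAddress = leaderAddress;
        }

        /** No election takes place: the listener is notified immediately. */
        void start(LeaderListener listener) {
            listener.notifyLeader(leaderAddress, DEFAULT_LEADER_ID);
        }
    }

    public static void main(String[] args) {
        // Example address only; a real one comes from the cluster configuration.
        new StaticLeaderRetriever("akka.tcp://flink@localhost:6123/user/resourcemanager")
            .start((address, id) -> System.out.println(address + " / " + id));
    }
}
```

A ZooKeeper-backed retriever would expose the same listener contract, but it would call the listener every time the leader znode changes instead of once.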

ZooKeeperHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java

/**
 * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
 * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
 * 
 * <pre>
 * /flink
 *      +/cluster_id_1/resource_manager_lock
 *      |            |
 *      |            +/job-id-1/job_manager_lock
 *      |            |         /checkpoints/latest
 *      |            |                     /latest-1
 *      |            |                     /latest-2
 *      |            |
 *      |            +/job-id-2/job_manager_lock
 *      |      
 *      +/cluster_id_2/resource_manager_lock
 *                   |
 *                   +/job-id-1/job_manager_lock
 *                            |/checkpoints/latest
 *                            |            /latest-1
 *                            |/persisted_job_graph
 * </pre>
 * 
 * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
 * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
 * accommodate specific permission.
 * 
 * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster". 
 * This "cluster" can be either a standalone or containerized Flink cluster, or it can be job
 * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
 * 
 * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
 * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
 * 
 * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
 * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
 * cluster and participate in the execution of the same set of jobs.
 */
public class ZooKeeperHaServices implements HighAvailabilityServices {
​
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHaServices.class);
​
    private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
​
    private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
​
    private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
​
    private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";
​
    // ------------------------------------------------------------------------
    
    
    /** The ZooKeeper client to use */
    private final CuratorFramework client;
​
    /** The executor to run ZooKeeper callbacks on */
    private final Executor executor;
​
    /** The runtime configuration */
    private final Configuration configuration;
​
    /** The zookeeper based running jobs registry */
    private final RunningJobsRegistry runningJobsRegistry;
​
    /** Store for arbitrary blobs */
    private final BlobStoreService blobStoreService;
​
    public ZooKeeperHaServices(
            CuratorFramework client,
            Executor executor,
            Configuration configuration,
            BlobStoreService blobStoreService) {
        this.client = checkNotNull(client);
        this.executor = checkNotNull(executor);
        this.configuration = checkNotNull(configuration);
        this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);
​
        this.blobStoreService = checkNotNull(blobStoreService);
    }
​
    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------
​
    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
    }
​
    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
    }
​
    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
    }
​
    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        return getJobManagerLeaderRetriever(jobID);
    }
​
    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
    }
​
    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
    }
​
    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
    }
​
    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
    }
​
    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
    }
​
    @Override
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
        return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
    }
​
    @Override
    public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
        return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
    }
​
    @Override
    public RunningJobsRegistry getRunningJobsRegistry() {
        return runningJobsRegistry;
    }
​
    @Override
    public BlobStore createBlobStore() throws IOException {
        return blobStoreService;
    }
​
    // ------------------------------------------------------------------------
    //  Shutdown
    // ------------------------------------------------------------------------
​
    @Override
    public void close() throws Exception {
        Throwable exception = null;
​
        try {
            blobStoreService.close();
        } catch (Throwable t) {
            exception = t;
        }
​
        internalClose();
​
        if (exception != null) {
            ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices.");
        }
    }
​
    @Override
    public void closeAndCleanupAllData() throws Exception {
        LOG.info("Close and clean up all data for ZooKeeperHaServices.");
​
        Throwable exception = null;
​
        try {
            blobStoreService.closeAndCleanupAllData();
        } catch (Throwable t) {
            exception = t;
        }
​
        internalClose();
​
        if (exception != null) {
            ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices.");
        }
    }
​
    /**
     * Closes components which don't distinguish between close and closeAndCleanupAllData
     */
    private void internalClose() {
        client.close();
    }
​
    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------
​
    private static String getPathForJobManager(final JobID jobID) {
        return "/" + jobID + JOB_MANAGER_LEADER_PATH;
    }
}
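  • ZooKeeperHaServices implements the HighAvailabilityServices interface. It creates the services it needs through the various create methods of ZooKeeperUtils, such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService, and ZooKeeperUtils.createSubmittedJobGraphs. Note that the retrieval and election services for a given component are created with the same leader path, and that the JobManager path is derived per job by getPathForJobManager.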
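
To make the shared-path idea concrete, here is a minimal sketch in which an in-memory map stands in for ZooKeeper. It is not how Flink talks to ZooKeeper; the class LeaderPathSketch, its methods, and the value "/job_manager_lock" for JOB_MANAGER_LEADER_PATH are assumptions made for illustration. Only the path concatenation mirrors getPathForJobManager from the listing above.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: an in-memory map stands in for ZooKeeper znodes. */
class LeaderPathSketch {

    // Assumed value; the real constant is defined in ZooKeeperHaServices.
    static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

    private final Map<String, String> znodes = new ConcurrentHashMap<>();

    /** Same concatenation as getPathForJobManager in the listing above. */
    static String pathForJobManager(String jobId) {
        return "/" + jobId + JOB_MANAGER_LEADER_PATH;
    }

    /** Election side: the first contender to claim the path becomes leader. */
    boolean tryBecomeLeader(String path, String address) {
        return znodes.putIfAbsent(path, address) == null;
    }

    /** Retrieval side: look up the current leader under the same path. */
    Optional<String> currentLeader(String path) {
        return Optional.ofNullable(znodes.get(path));
    }

    public static void main(String[] args) {
        LeaderPathSketch zk = new LeaderPathSketch();
        String path = pathForJobManager("job-42");
        System.out.println(zk.tryBecomeLeader(path, "localhost:8081"));
        System.out.println(zk.tryBecomeLeader(path, "localhost:8082"));
        System.out.println(zk.currentLeader(path).orElse("none"));
    }
}
```

Because both sides agree on the path, a client holding only the job ID can find whichever JobManager won the election for that job.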

JobClient.submitJob

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/client/JobClient.java

public class JobClient {
​
    private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);
​
    //......
​
    /**
     * Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be
     * passed to {@code awaitJobResult} to get the result of the submission.
     * @return JobListeningContext which may be used to retrieve the JobExecutionResult via
     *          {@code awaitJobResult(JobListeningContext context)}.
     */
    public static JobListeningContext submitJob(
            ActorSystem actorSystem,
            Configuration config,
            HighAvailabilityServices highAvailabilityServices,
            JobGraph jobGraph,
            FiniteDuration timeout,
            boolean sysoutLogUpdates,
            ClassLoader classLoader) {
​
        checkNotNull(actorSystem, "The actorSystem must not be null.");
        checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
        checkNotNull(jobGraph, "The jobGraph must not be null.");
        checkNotNull(timeout, "The timeout must not be null.");
​
        // for this job, we create a proxy JobClientActor that deals with all communication with
        // the JobManager. It forwards the job submission, checks the success/failure responses, logs
        // update messages, watches for disconnect between client and JobManager, ...
​
        Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
            highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
            timeout,
            sysoutLogUpdates,
            config);
​
        ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
​
        Future<Object> submissionFuture = Patterns.ask(
                jobClientActor,
                new JobClientMessages.SubmitJobAndWait(jobGraph),
                new Timeout(AkkaUtils.INF_TIMEOUT()));
​
        return new JobListeningContext(
            jobGraph.getJobID(),
            submissionFuture,
            jobClientActor,
            timeout,
            classLoader,
            highAvailabilityServices);
    }
​
    //......
}
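  • JobClient.submitJob is one consumer of these services: it calls highAvailabilityServices.getJobManagerLeaderRetriever to obtain a LeaderRetrievalService and hands it to JobSubmissionClientActor, which uses it to find the address of the leading JobManager and submit the job there.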
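
The retrieve-then-submit flow can be sketched with plain Java. The two interfaces below are simplified stand-ins for Flink's LeaderRetrievalService and LeaderRetrievalListener, not the real types, and fixedLeader, submit, and the address string are hypothetical names chosen for this example.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class LeaderLookupSketch {

    /** Simplified stand-in for Flink's LeaderRetrievalListener. */
    interface LeaderListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
    }

    /** Simplified stand-in for Flink's LeaderRetrievalService. */
    interface LeaderRetriever {
        void start(LeaderListener listener);
    }

    /** Hypothetical retriever that always reports one fixed leader. */
    static LeaderRetriever fixedLeader(String address) {
        return listener -> listener.notifyLeaderAddress(address, UUID.randomUUID());
    }

    /** Waits for the leader address, then "submits" by naming the target. */
    static String submit(LeaderRetriever retriever, String jobName) {
        CompletableFuture<String> leader = new CompletableFuture<>();
        retriever.start((address, sessionId) -> leader.complete(address));
        // Give up if no leader is announced within five seconds.
        String address = leader.orTimeout(5, TimeUnit.SECONDS).join();
        return jobName + " -> " + address;
    }

    public static void main(String[] args) {
        LeaderRetriever retriever = fixedLeader("akka.tcp://flink@localhost:6123/user/jobmanager");
        System.out.println(submit(retriever, "wordcount"));
    }
}
```

The client never hard-codes a JobManager address; it only depends on whatever the retriever reports, which is what allows a standby JobManager to take over transparently.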

Summary

  • HighAvailabilityMode has three enum constants: NONE, ZOOKEEPER, and FACTORY_CLASS. Each carries an haActive flag that indicates whether the mode provides high availability. HighAvailabilityOptions defines the high-availability configuration options, including those prefixed with high-availability.zookeeper.
  • HighAvailabilityServicesUtils provides static factory methods for HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices, and createCustomHAServices. createAvailableOrEmbeddedServices is used mainly by FlinkMiniCluster, and createHighAvailabilityServices mainly by ClusterEntrypoint. The latter creates StandaloneHaServices when highAvailabilityMode is NONE, creates ZooKeeperHaServices when it is ZOOKEEPER, and delegates to createCustomHAServices when it is FACTORY_CLASS.
  • HighAvailabilityServices defines the getters for the services that a highly available setup needs. ZooKeeperHaServices implements this interface and creates the services through the create methods of ZooKeeperUtils, such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService, and ZooKeeperUtils.createSubmittedJobGraphs. JobClient.submitJob, for example, uses highAvailabilityServices.getJobManagerLeaderRetriever to locate the leading JobManager before submitting a job.
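
The mode-to-implementation mapping summarized above can be sketched as follows. This is a simplified illustration, not Flink's code: HaModeSketch, servicesFor, and the Map-based config are hypothetical, the legacy default handling in fromConfig is omitted, and treating an unrecognized value as FACTORY_CLASS is an assumption.

```java
import java.util.Locale;
import java.util.Map;

class HaModeSketch {

    enum Mode {
        NONE(false), ZOOKEEPER(true), FACTORY_CLASS(true);

        final boolean haActive;

        Mode(boolean haActive) {
            this.haActive = haActive;
        }
    }

    /** Simplified parse of the "high-availability" key; a missing key means NONE. */
    static Mode fromConfig(Map<String, String> config) {
        String value = config.get("high-availability");
        if (value == null) {
            return Mode.NONE;
        }
        try {
            return Mode.valueOf(value.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Assumption: any other value is read as a custom factory class name.
            return Mode.FACTORY_CLASS;
        }
    }

    /** Names the implementation that the summary associates with each mode. */
    static String servicesFor(Mode mode) {
        switch (mode) {
            case NONE:
                return "StandaloneHaServices";
            case ZOOKEEPER:
                return "ZooKeeperHaServices";
            default:
                return "custom services via createCustomHAServices";
        }
    }

    public static void main(String[] args) {
        Mode mode = fromConfig(Map.of("high-availability", "zookeeper"));
        System.out.println(mode + " haActive=" + mode.haActive + " -> " + servicesFor(mode));
    }
}
```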

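Original content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.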
