A Look at Flink's AbstractNonHaServices

Original article by code4it
Published 2019-02-17 10:49:56
Collected in the column 码匠的流水账

This article takes a look at Flink's AbstractNonHaServices.

HighAvailabilityServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

public interface HighAvailabilityServices extends AutoCloseable {
​
    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------
​
    /**
     * This UUID should be used when no proper leader election happens, but a simple
     * pre-configured leader is used. That is for example the case in non-highly-available
     * standalone setups.
     */
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);
​
    /**
     * This JobID should be used to identify the old JobManager when using the
     * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
     * distinct JobID assigned.
     */
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);
​
    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------
​
    /**
     * Gets the leader retriever for the cluster's resource manager.
     */
    LeaderRetrievalService getResourceManagerLeaderRetriever();
​
    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service
     * is not always accessible.
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();
​
    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @return Leader retrieval service to retrieve the job manager for the given job
     * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
​
    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by
     *                              a static leader retrieval service.
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
​
    LeaderRetrievalService getWebMonitorLeaderRetriever();
​
    /**
     * Gets the leader election service for the cluster's resource manager.
     *
     * @return Leader election service for the resource manager leader election
     */
    LeaderElectionService getResourceManagerLeaderElectionService();
​
    /**
     * Gets the leader election service for the cluster's dispatcher.
     *
     * @return Leader election service for the dispatcher leader election
     */
    LeaderElectionService getDispatcherLeaderElectionService();
​
    /**
     * Gets the leader election service for the given job.
     *
     * @param jobID The identifier of the job running the election.
     * @return Leader election service for the job manager leader election
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
​
    LeaderElectionService getWebMonitorLeaderElectionService();
​
    /**
     * Gets the checkpoint recovery factory for the job manager
     *
     * @return Checkpoint recovery factory
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();
​
    /**
     * Gets the submitted job graph store for the job manager
     *
     * @return Submitted job graph store
     * @throws Exception if the submitted job graph store could not be created
     */
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
​
    /**
     * Gets the registry that holds information about whether jobs are currently running.
     *
     * @return Running job registry to retrieve running jobs
     */
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;
​
    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     * @throws IOException if the blob store could not be created
     */
    BlobStore createBlobStore() throws IOException;
​
    // ------------------------------------------------------------------------
    //  Shutdown and Cleanup
    // ------------------------------------------------------------------------
​
    /**
     * Closes the high availability services, releasing all resources.
     * 
     * <p>This method <b>does not delete or clean up</b> any data stored in external stores
     * (file systems, ZooKeeper, etc). Another instance of the high availability
     * services will be able to recover the job.
     * 
     * <p>If an exception occurs during closing services, this method will attempt to
     * continue closing other services and report exceptions only after all services
     * have been attempted to be closed.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services.
     */
    @Override
    void close() throws Exception;
​
    /**
     * Closes the high availability services (releasing all resources) and deletes
     * all data stored by these services in external stores.
     * 
     * <p>After this method was called, the any job or session that was managed by
     * these high availability services will be unrecoverable.
     * 
     * <p>If an exception occurs during cleanup, this method will attempt to
     * continue the cleanup and report exceptions only after all cleanup steps have
     * been attempted.
     * 
     * @throws Exception Thrown, if an exception occurred while closing these services
     *                   or cleaning up data stored by them.
     */
    void closeAndCleanupAllData() throws Exception;
}
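  • HighAvailabilityServices declares the getters for the services a highly available setup needs: leader retrieval and leader election services for the ResourceManager, Dispatcher, JobManager and web monitor, plus the CheckpointRecoveryFactory, SubmittedJobGraphStore, RunningJobsRegistry and BlobStore. Since it is an interface, it has implementations rather than subclasses; its direct implementations include ZooKeeperHaServices and AbstractNonHaServices.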
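
Because the interface extends AutoCloseable and separates close() from closeAndCleanupAllData(), the difference between the two can be illustrated with a small, self-contained sketch. Nothing here is Flink code: `HaLike`, `InMemoryHa` and `EXTERNAL_STORE` are hypothetical names, and the static map only stands in for an external store such as a file system or ZooKeeper.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the shutdown part of the interface (not Flink code).
interface HaLike extends AutoCloseable {
    @Override
    void close() throws Exception;

    void closeAndCleanupAllData() throws Exception;
}

// Hypothetical implementation; the static map plays the role of an external store.
class InMemoryHa implements HaLike {
    static final Map<String, String> EXTERNAL_STORE = new HashMap<>();
    private boolean closed;

    void put(String key, String value) {
        if (closed) {
            throw new IllegalStateException("services are closed");
        }
        EXTERNAL_STORE.put(key, value);
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        // Releases this instance only; stored data stays recoverable.
        closed = true;
    }

    @Override
    public void closeAndCleanupAllData() {
        // Additionally deletes everything kept in the (simulated) external store.
        EXTERNAL_STORE.clear();
        close();
    }
}

class CloseContractDemo {
    public static void main(String[] args) throws Exception {
        try (InMemoryHa ha = new InMemoryHa()) {
            ha.put("job-1", "graph");
        } // close() runs here automatically
        System.out.println(InMemoryHa.EXTERNAL_STORE.containsKey("job-1"));

        InMemoryHa second = new InMemoryHa();
        second.closeAndCleanupAllData();
        System.out.println(InMemoryHa.EXTERNAL_STORE.isEmpty());
    }
}
```

In this sketch the data written before close() is still present afterwards, while closeAndCleanupAllData() leaves the store empty, which mirrors the contract described in the Javadoc above.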

AbstractNonHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java

public abstract class AbstractNonHaServices implements HighAvailabilityServices {
    protected final Object lock = new Object();
​
    private final RunningJobsRegistry runningJobsRegistry;
​
    private final VoidBlobStore voidBlobStore;
​
    private boolean shutdown;
​
    public AbstractNonHaServices() {
        this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
        this.voidBlobStore = new VoidBlobStore();
​
        shutdown = false;
    }
​
    // ----------------------------------------------------------------------
    // HighAvailabilityServices method implementations
    // ----------------------------------------------------------------------
​
    @Override
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneCheckpointRecoveryFactory();
        }
    }
​
    @Override
    public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneSubmittedJobGraphStore();
        }
    }
​
    @Override
    public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
        synchronized (lock) {
            checkNotShutdown();
​
            return runningJobsRegistry;
        }
    }
​
    @Override
    public BlobStore createBlobStore() throws IOException {
        synchronized (lock) {
            checkNotShutdown();
​
            return voidBlobStore;
        }
    }
​
    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (!shutdown) {
                shutdown = true;
            }
        }
    }
​
    @Override
    public void closeAndCleanupAllData() throws Exception {
        // this stores no data, so this method is the same as 'close()'
        close();
    }
​
    // ----------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------
​
    @GuardedBy("lock")
    protected void checkNotShutdown() {
        checkState(!shutdown, "high availability services are shut down");
    }
​
    protected boolean isShutDown() {
        return shutdown;
    }
}
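  • AbstractNonHaServices implements the getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close and closeAndCleanupAllData methods of HighAvailabilityServices. Every getter takes the lock and calls checkNotShutdown first, and closeAndCleanupAllData simply delegates to close because nothing is stored externally. It has two subclasses, EmbeddedHaServices and StandaloneHaServices.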
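
The lock-plus-shutdown-flag pattern used by the class can be reduced to a few lines. The following is a simplified sketch rather than the Flink implementation: `GuardedServices` and its `registry` field are hypothetical, and a plain `IllegalStateException` replaces the `checkState` helper.

```java
// Hypothetical, simplified analogue of AbstractNonHaServices (not Flink code).
class GuardedServices implements AutoCloseable {
    private final Object lock = new Object();
    // One shared instance, handed out by every call (like runningJobsRegistry above).
    private final Object registry = new Object();
    private boolean shutdown;

    Object getRegistry() {
        synchronized (lock) {
            checkNotShutdown();
            return registry;
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            // Idempotent: closing twice is harmless.
            shutdown = true;
        }
    }

    // Must only be called while holding the lock.
    private void checkNotShutdown() {
        if (shutdown) {
            throw new IllegalStateException("high availability services are shut down");
        }
    }
}

class GuardedServicesDemo {
    public static void main(String[] args) {
        GuardedServices services = new GuardedServices();
        System.out.println(services.getRegistry() == services.getRegistry());
        services.close();
        try {
            services.getRegistry();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Reading and writing the flag under the same lock is what makes a getter racing with close() either succeed fully or fail with a clear exception.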

EmbeddedHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java

public class EmbeddedHaServices extends AbstractNonHaServices {
​
    private final Executor executor;
​
    private final EmbeddedLeaderService resourceManagerLeaderService;
​
    private final EmbeddedLeaderService dispatcherLeaderService;
​
    private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
​
    private final EmbeddedLeaderService webMonitorLeaderService;
​
    public EmbeddedHaServices(Executor executor) {
        this.executor = Preconditions.checkNotNull(executor);
        this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
        this.dispatcherLeaderService = new EmbeddedLeaderService(executor);
        this.jobManagerLeaderServices = new HashMap<>();
        this.webMonitorLeaderService = new EmbeddedLeaderService(executor);
    }
​
    // ------------------------------------------------------------------------
    //  services
    // ------------------------------------------------------------------------
​
    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        return resourceManagerLeaderService.createLeaderRetrievalService();
    }
​
    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        return dispatcherLeaderService.createLeaderRetrievalService();
    }
​
    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        return resourceManagerLeaderService.createLeaderElectionService();
    }
​
    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return dispatcherLeaderService.createLeaderElectionService();
    }
​
    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        checkNotNull(jobID);
​
        synchronized (lock) {
            checkNotShutdown();
            EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
            return service.createLeaderRetrievalService();
        }
    }
​
    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        return getJobManagerLeaderRetriever(jobID);
    }
​
    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        return webMonitorLeaderService.createLeaderRetrievalService();
    }
​
    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        checkNotNull(jobID);
​
        synchronized (lock) {
            checkNotShutdown();
            EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
            return service.createLeaderElectionService();
        }
    }
​
    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        return webMonitorLeaderService.createLeaderElectionService();
    }
​
    // ------------------------------------------------------------------------
    // internal
    // ------------------------------------------------------------------------
​
    @GuardedBy("lock")
    private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
        EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
        if (service == null) {
            service = new EmbeddedLeaderService(executor);
            jobManagerLeaderServices.put(jobID, service);
        }
        return service;
    }
​
    // ------------------------------------------------------------------------
    //  shutdown
    // ------------------------------------------------------------------------
​
    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (!isShutDown()) {
                // stop all job manager leader services
                for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
                    service.shutdown();
                }
                jobManagerLeaderServices.clear();
​
                resourceManagerLeaderService.shutdown();
​
                webMonitorLeaderService.shutdown();
            }
​
            super.close();
        }
    }
}
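  • EmbeddedHaServices extends AbstractNonHaServices. It implements HighAvailabilityServices for the non-high-availability case in which the ResourceManager, JobManagers and TaskManagers all run in the same process: it keeps one EmbeddedLeaderService each for the ResourceManager, Dispatcher and web monitor, and lazily creates one per JobID for the JobManagers. FlinkMiniCluster uses EmbeddedHaServices.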
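
The per-job bookkeeping in getOrCreateJobManagerService and close() boils down to a lazily filled map guarded by a lock. A minimal sketch of that idea follows, under stated assumptions: `LeaderServiceStub` and `PerJobLeaderServices` are hypothetical, and plain strings stand in for Flink's JobID.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for EmbeddedLeaderService (not Flink code).
class LeaderServiceStub {
    private boolean running = true;

    void shutdown() {
        running = false;
    }

    boolean isRunning() {
        return running;
    }
}

// Hypothetical registry mirroring the get-or-create and close() logic shown above.
class PerJobLeaderServices implements AutoCloseable {
    private final Object lock = new Object();
    private final Map<String, LeaderServiceStub> byJob = new HashMap<>();

    LeaderServiceStub getOrCreate(String jobId) {
        synchronized (lock) {
            // Create the service lazily on the first request for this job.
            return byJob.computeIfAbsent(jobId, id -> new LeaderServiceStub());
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            // Stop every per-job service, then forget them.
            for (LeaderServiceStub service : byJob.values()) {
                service.shutdown();
            }
            byJob.clear();
        }
    }
}

class PerJobDemo {
    public static void main(String[] args) {
        PerJobLeaderServices services = new PerJobLeaderServices();
        LeaderServiceStub forRetrieval = services.getOrCreate("job-a");
        LeaderServiceStub forElection = services.getOrCreate("job-a");
        System.out.println(forRetrieval == forElection);
        services.close();
        System.out.println(forRetrieval.isRunning());
    }
}
```

Sharing one service object per job is the point of the design: the retrieval side and the election side for the same job must talk to the same in-process leader service, otherwise a retriever would never see the elected leader.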

StandaloneHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java

public class StandaloneHaServices extends AbstractNonHaServices {
​
    /** The constant name of the ResourceManager RPC endpoint */
    private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
​
    /** The fix address of the ResourceManager */
    private final String resourceManagerAddress;
​
    /** The fix address of the Dispatcher */
    private final String dispatcherAddress;
​
    /** The fix address of the JobManager */
    private final String jobManagerAddress;
​
    private final String webMonitorAddress;
​
    /**
     * Creates a new services class for the fix pre-defined leaders.
     *
     * @param resourceManagerAddress    The fix address of the ResourceManager
     * @param webMonitorAddress
     */
    public StandaloneHaServices(
            String resourceManagerAddress,
            String dispatcherAddress,
            String jobManagerAddress,
            String webMonitorAddress) {
        this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
        this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
        this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
        this.webMonitorAddress = checkNotNull(webMonitorAddress, webMonitorAddress);
    }
​
    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------
​
    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
        }
​
    }
​
    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
        }
    }
​
    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneLeaderElectionService();
        }
    }
​
    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneLeaderElectionService();
        }
    }
​
    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneLeaderRetrievalService(jobManagerAddress, DEFAULT_LEADER_ID);
        }
    }
​
    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneLeaderRetrievalService(defaultJobManagerAddress, DEFAULT_LEADER_ID);
        }
    }
​
    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneLeaderElectionService();
        }
    }
​
    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneLeaderRetrievalService(webMonitorAddress, DEFAULT_LEADER_ID);
        }
    }
​
    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();
​
            return new StandaloneLeaderElectionService();
        }
    }
​
}
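  • StandaloneHaServices extends AbstractNonHaServices. It implements HighAvailabilityServices for the non-high-availability case with fixed, pre-configured leader addresses: every retriever is a StandaloneLeaderRetrievalService that is given a fixed address together with DEFAULT_LEADER_ID, and every election service is a StandaloneLeaderElectionService. ClusterEntrypoint uses StandaloneHaServices when highAvailabilityMode is NONE.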
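
What "retrieval without election" means can be sketched as a service that announces a pre-configured leader under the all-zero UUID as soon as it is started. This is only loosely modeled on the standalone services: `LeaderListener`, `FixedLeaderRetrieval` and the address `rm-host:6123` are hypothetical and chosen for illustration.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical listener, loosely modeled on a leader retrieval callback.
interface LeaderListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical analogue of a standalone leader retrieval service (not Flink code).
class FixedLeaderRetrieval {
    // Same value as DEFAULT_LEADER_ID in the interface shown earlier.
    static final UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    private final String leaderAddress;
    private boolean started;

    FixedLeaderRetrieval(String leaderAddress) {
        this.leaderAddress = Objects.requireNonNull(leaderAddress, "leaderAddress");
    }

    void start(LeaderListener listener) {
        if (started) {
            throw new IllegalStateException("already started");
        }
        started = true;
        // No election takes place: the configured leader is announced right away.
        listener.notifyLeaderAddress(leaderAddress, DEFAULT_LEADER_ID);
    }
}

class FixedLeaderDemo {
    public static void main(String[] args) {
        FixedLeaderRetrieval retrieval = new FixedLeaderRetrieval("rm-host:6123");
        retrieval.start((address, id) -> System.out.println(address + " " + id));
    }
}
```

Because the leader never changes, the listener is called exactly once per started service in this sketch, and the session ID is the constant `00000000-0000-0000-0000-000000000000`.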

Summary

  • HighAvailabilityServices declares the getters for the services a highly available setup needs; its direct implementations include ZooKeeperHaServices and AbstractNonHaServices.
  • AbstractNonHaServices implements getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close and closeAndCleanupAllData; it has two subclasses, EmbeddedHaServices and StandaloneHaServices.
  • EmbeddedHaServices covers the non-high-availability case in which the ResourceManager, JobManagers and TaskManagers run in the same process, and is what FlinkMiniCluster uses. StandaloneHaServices covers the non-high-availability case with fixed leader addresses, and is what ClusterEntrypoint uses when highAvailabilityMode is NONE.

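Original-content notice: the author has authorized the Tencent Cloud Developer Community to publish this article; do not repost it without permission.

In case of infringement, contact cloudcommunity@tencent.com to request removal.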
