
A Look at Flink's KvStateRegistryGateway

Original article
Author: code4it
Published 2019-03-19 09:16:18
Included in the column: 码匠的流水账

This article takes a look at Flink's KvStateRegistryGateway.

KvStateRegistryGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java

public interface KvStateRegistryGateway {

    /**
     * Notifies that queryable state has been registered.
     *
     * @param jobId identifying the job for which to register a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group range the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @param kvStateId ID of the registered KvState instance.
     * @param kvStateServerAddress Server address where to find the KvState instance.
     * @return Future acknowledge if the key-value state has been registered
     */
    CompletableFuture<Acknowledge> notifyKvStateRegistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName,
        final KvStateID kvStateId,
        final InetSocketAddress kvStateServerAddress);

    /**
     * Notifies that queryable state has been unregistered.
     *
     * @param jobId identifying the job for which to unregister a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group index the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @return Future acknowledge if the key-value state has been unregistered
     */
    CompletableFuture<Acknowledge> notifyKvStateUnregistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName);
}
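  • The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered, each returning a CompletableFuture<Acknowledge>; JobMaster implements both of them (it implements JobMasterGateway, which extends KvStateRegistryGateway)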
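
Because both methods return a future, a caller only learns asynchronously whether the notification was accepted. The following is a minimal sketch of how a caller might turn that outcome into a status message. It does not use Flink at all: Ack, RegistryGateway and describe are hypothetical stand-ins, and the parameter list is reduced to the registration name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical, simplified stand-ins; these are NOT the Flink types.
class GatewayCallerSketch {

    /** Stand-in for Flink's Acknowledge. */
    enum Ack { INSTANCE }

    /** Reduced gateway: only the registration name is passed (assumption). */
    interface RegistryGateway {
        CompletableFuture<Ack> notifyKvStateRegistered(String registrationName);
    }

    /** Blocks on the future and turns its outcome into a short status string. */
    static String describe(CompletableFuture<Ack> future) {
        return future
            .thenApply(ack -> "registered")
            .exceptionally(t -> {
                // Dependent stages wrap the failure in a CompletionException.
                Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                    ? t.getCause() : t;
                return "failed: " + cause.getMessage();
            })
            .join();
    }

    public static void main(String[] args) {
        RegistryGateway ok = name -> CompletableFuture.completedFuture(Ack.INSTANCE);
        RegistryGateway broken = name -> {
            CompletableFuture<Ack> f = new CompletableFuture<>();
            f.completeExceptionally(new IllegalStateException("name clash: " + name));
            return f;
        };
        System.out.println(describe(ok.notifyKvStateRegistered("word-count")));
        System.out.println(describe(broken.notifyKvStateRegistered("word-count")));
    }
}
```

Running main prints "registered" followed by "failed: name clash: word-count". Real callers would normally chain further stages instead of blocking with join; blocking is used here only to keep the sketch short.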

JobMaster

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {

    /** Default names for Flink's distributed components. */
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";

    // ------------------------------------------------------------------------

    private final JobMasterConfiguration jobMasterConfiguration;

    private final ResourceID resourceId;

    private final JobGraph jobGraph;

    private final Time rpcTimeout;

    private final HighAvailabilityServices highAvailabilityServices;

    private final BlobServer blobServer;

    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;

    private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;

    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

    private final ScheduledExecutorService scheduledExecutorService;

    private final OnCompletionActions jobCompletionActions;

    private final FatalErrorHandler fatalErrorHandler;

    private final ClassLoader userCodeLoader;

    private final SlotPool slotPool;

    private final SlotPoolGateway slotPoolGateway;

    private final RestartStrategy restartStrategy;

    // --------- BackPressure --------

    private final BackPressureStatsTracker backPressureStatsTracker;

    // --------- ResourceManager --------

    private final LeaderRetrievalService resourceManagerLeaderRetriever;

    // --------- TaskManagers --------

    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;

    // -------- Mutable fields ---------

    private ExecutionGraph executionGraph;

    @Nullable
    private JobManagerJobStatusListener jobStatusListener;

    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;

    @Nullable
    private String lastInternalSavepoint;

    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    @Nullable
    private ResourceManagerConnection resourceManagerConnection;

    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    //......

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateRegistered(
            final JobID jobId,
            final JobVertexID jobVertexId,
            final KeyGroupRange keyGroupRange,
            final String registrationName,
            final KvStateID kvStateId,
            final InetSocketAddress kvStateServerAddress) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state registered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
                    jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
            JobID jobId,
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state unregistered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
                    jobVertexId, keyGroupRange, registrationName);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    //......
}
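  • JobMaster's notifyKvStateRegistered first checks that the given jobId matches the ID of its own jobGraph; if it does, the method delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered and returns a completed Acknowledge, otherwise it returns a future completed exceptionally with a FlinkJobNotFoundException. notifyKvStateUnregistered follows the same pattern and delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered. In both methods, an exception thrown by the registry is handed back as an exceptionally completed future rather than thrown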
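
The control flow of the two JobMaster methods (check the job ID, delegate, convert any exception into a failed future) can be sketched without Flink as follows. All names here are hypothetical stand-ins: JobNotFoundException takes the place of FlinkJobNotFoundException, the Registry interface takes the place of KvStateLocationRegistry, and a plain String takes the place of Acknowledge.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-ins that mirror the control flow only; not Flink classes.
class JobGuardSketch {

    /** Stand-in for FlinkJobNotFoundException. */
    static class JobNotFoundException extends Exception {
        JobNotFoundException(String jobId) {
            super("Could not find job " + jobId);
        }
    }

    /** Stand-in for the registry that the JobMaster delegates to. */
    interface Registry {
        void notifyRegistered(String registrationName);
    }

    private final String ownJobId;
    private final Registry registry;

    JobGuardSketch(String ownJobId, Registry registry) {
        this.ownJobId = ownJobId;
        this.registry = registry;
    }

    CompletableFuture<String> notifyKvStateRegistered(String jobId, String registrationName) {
        if (!ownJobId.equals(jobId)) {
            // Notification for a job this master does not run.
            return failed(new JobNotFoundException(jobId));
        }
        try {
            registry.notifyRegistered(registrationName);
            return CompletableFuture.completedFuture("ack");
        } catch (Exception e) {
            // Registry errors are reported through the future, not thrown.
            return failed(e);
        }
    }

    private static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    /** Returns the result, or the simple class name of the failure cause. */
    static String outcome(CompletableFuture<String> future) {
        try {
            return future.join();
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        JobGuardSketch master = new JobGuardSketch("job-1", name -> { });
        System.out.println(outcome(master.notifyKvStateRegistered("job-1", "counts")));
        System.out.println(outcome(master.notifyKvStateRegistered("job-2", "counts")));
    }
}
```

Running main prints "ack" for the matching job ID and "JobNotFoundException" for the unknown one. Reporting failures through the returned future means the remote caller sees the error as the result of its call.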

KvStateLocationRegistry

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java

public class KvStateLocationRegistry {

    /** JobID this coordinator belongs to. */
    private final JobID jobId;

    /** Job vertices for determining parallelism per key. */
    private final Map<JobVertexID, ExecutionJobVertex> jobVertices;

    /**
     * Location info keyed by registration name. The name needs to be unique
     * per JobID, i.e. two operators cannot register KvState with the same
     * name.
     */
    private final Map<String, KvStateLocation> lookupTable = new HashMap<>();

    /**
     * Creates the registry for the job.
     *
     * @param jobId       JobID this coordinator belongs to.
     * @param jobVertices Job vertices map of all vertices of this job.
     */
    public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {
        this.jobId = Preconditions.checkNotNull(jobId, "JobID");
        this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");
    }

    /**
     * Returns the {@link KvStateLocation} for the registered KvState instance
     * or <code>null</code> if no location information is available.
     *
     * @param registrationName Name under which the KvState instance is registered.
     * @return Location information or <code>null</code>.
     */
    public KvStateLocation getKvStateLocation(String registrationName) {
        return lookupTable.get(registrationName);
    }

    /**
     * Notifies the registry about a registered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group range the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @param kvStateId ID of the registered KvState instance
     * @param kvStateServerAddress Server address where to find the KvState instance
     *
     * @throws IllegalArgumentException If JobVertexID does not belong to job
     * @throws IllegalArgumentException If state has been registered with same
     * name by another operator.
     * @throws IndexOutOfBoundsException If key group index is out of bounds.
     */
    public void notifyKvStateRegistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName,
            KvStateID kvStateId,
            InetSocketAddress kvStateServerAddress) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location == null) {
            // First registration for this operator, create the location info
            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);

            if (vertex != null) {
                int parallelism = vertex.getMaxParallelism();
                location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
                lookupTable.put(registrationName, location);
            } else {
                throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);
            }
        }

        // Duplicated name if vertex IDs don't match
        if (!location.getJobVertexId().equals(jobVertexId)) {
            IllegalStateException duplicate = new IllegalStateException(
                    "Registration name clash. KvState with name '" + registrationName +
                            "' has already been registered by another operator (" +
                            location.getJobVertexId() + ").");

            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
            if (vertex != null) {
                vertex.fail(new SuppressRestartsException(duplicate));
            }

            throw duplicate;
        }
        location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
    }

    /**
     * Notifies the registry about an unregistered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group index the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @throws IllegalArgumentException If another operator registered the state instance
     * @throws IllegalArgumentException If the registration name is not known
     */
    public void notifyKvStateUnregistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location != null) {
            // Duplicate name if vertex IDs don't match
            if (!location.getJobVertexId().equals(jobVertexId)) {
                throw new IllegalArgumentException("Another operator (" +
                        location.getJobVertexId() + ") registered the KvState " +
                        "under '" + registrationName + "'.");
            }

            location.unregisterKvState(keyGroupRange);

            if (location.getNumRegisteredKeyGroups() == 0) {
                lookupTable.remove(registrationName);
            }
        } else {
            throw new IllegalArgumentException("Unknown registration name '" +
                    registrationName + "'. " + "Probably registration/unregistration race.");
        }
    }

}
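  • The KvStateLocationRegistry constructor takes a jobId and jobVertices; its lookupTable field maps each registrationName to a KvStateLocation
  • notifyKvStateRegistered creates a KvStateLocation and puts it into lookupTable when none exists yet for the registrationName (throwing IllegalArgumentException if the JobVertexID is unknown); if an existing location belongs to a different JobVertexID, it fails the registering vertex with a SuppressRestartsException and throws IllegalStateException; otherwise it finishes by calling location.registerKvState
  • notifyKvStateUnregistered looks up the KvStateLocation in lookupTable and, if one is found and belongs to the same JobVertexID, calls location.unregisterKvState; the KvStateLocation is removed from lookupTable only once getNumRegisteredKeyGroups() reaches 0. An unknown registrationName or a mismatched JobVertexID results in an IllegalArgumentException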
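
The bookkeeping described above, in particular the fact that an entry leaves the lookup table only after its last key group has been unregistered, can be illustrated with a small model. This is a simplified sketch under stated assumptions, not the Flink implementation: the Location class, the String vertex IDs, and the int key group bounds are hypothetical stand-ins for KvStateLocation, JobVertexID and KeyGroupRange, and the check against the set of known job vertices is omitted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of the lookup-table bookkeeping; all names are hypothetical.
class LocationRegistrySketch {

    /** Stand-in for KvStateLocation: the owning vertex and its registered key groups. */
    static final class Location {
        final String vertexId;
        final Set<Integer> keyGroups = new HashSet<>();

        Location(String vertexId) {
            this.vertexId = vertexId;
        }
    }

    private final Map<String, Location> lookupTable = new HashMap<>();

    void register(String vertexId, int startKeyGroup, int endKeyGroup, String name) {
        // The first registration under this name creates the location entry.
        Location location = lookupTable.computeIfAbsent(name, n -> new Location(vertexId));
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalStateException(
                "Name '" + name + "' already registered by " + location.vertexId);
        }
        for (int kg = startKeyGroup; kg <= endKeyGroup; kg++) {
            location.keyGroups.add(kg);
        }
    }

    void unregister(String vertexId, int startKeyGroup, int endKeyGroup, String name) {
        Location location = lookupTable.get(name);
        if (location == null) {
            throw new IllegalArgumentException("Unknown registration name '" + name + "'");
        }
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalArgumentException(
                "Another operator (" + location.vertexId + ") registered '" + name + "'");
        }
        for (int kg = startKeyGroup; kg <= endKeyGroup; kg++) {
            location.keyGroups.remove(kg);
        }
        // Drop the entry only when no key group is registered any more.
        if (location.keyGroups.isEmpty()) {
            lookupTable.remove(name);
        }
    }

    boolean contains(String name) {
        return lookupTable.containsKey(name);
    }

    public static void main(String[] args) {
        LocationRegistrySketch registry = new LocationRegistrySketch();
        registry.register("v1", 0, 63, "counts");
        registry.register("v1", 64, 127, "counts");
        registry.unregister("v1", 0, 63, "counts");
        System.out.println(registry.contains("counts")); // true: key groups 64..127 remain
        registry.unregister("v1", 64, 127, "counts");
        System.out.println(registry.contains("counts")); // false: last key group is gone
    }
}
```

With two parallel subtasks registering the same name for different key group ranges, the entry survives the first unregistration and disappears after the second.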

Summary

  • The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered; JobMaster implements both of them
  • JobMaster's notifyKvStateRegistered mainly delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered, and notifyKvStateUnregistered mainly delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered; both first check the jobId and return a FlinkJobNotFoundException through the future when it does not match
  • The KvStateLocationRegistry constructor takes a jobId and jobVertices, and its lookupTable field maps each registrationName to a KvStateLocation. notifyKvStateRegistered creates a KvStateLocation and puts it into lookupTable when none is found, then calls location.registerKvState. notifyKvStateUnregistered calls location.unregisterKvState on the KvStateLocation it finds and removes it from lookupTable once no key groups remain registered

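Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.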
