A look at Flink's jobstore configuration

Author: code4it · Published 2019-03-09 11:23:42 · Column: 码匠的流水账

This article takes a look at Flink's jobstore configuration.

JobManagerOptions

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * The job store cache size in bytes which is used to keep completed
     * jobs in memory.
     */
    public static final ConfigOption<Long> JOB_STORE_CACHE_SIZE =
        key("jobstore.cache-size")
        .defaultValue(50L * 1024L * 1024L)
        .withDescription("The job store cache size in bytes which is used to keep completed jobs in memory.");

    /**
     * The time in seconds after which a completed job expires and is purged from the job store.
     */
    public static final ConfigOption<Long> JOB_STORE_EXPIRATION_TIME =
        key("jobstore.expiration-time")
        .defaultValue(60L * 60L)
        .withDescription("The time in seconds after which a completed job expires and is purged from the job store.");

    //......
}
  • jobstore.cache-size defaults to 50 MB; jobstore.expiration-time defaults to 1 hour (3600 seconds); a sketch of overriding both follows below
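
To make the two options concrete, here is a minimal sketch, assuming Flink 1.7 on the classpath, that overrides both defaults through the Configuration API (the class name and the chosen values are illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

public class JobStoreConfigExample {

    public static void main(String[] args) {
        Configuration configuration = new Configuration();

        // raise the completed-job cache budget from the default 50M to 100M (illustrative value)
        configuration.setLong(JobManagerOptions.JOB_STORE_CACHE_SIZE, 100L * 1024L * 1024L);

        // keep completed jobs for 2 hours instead of the default 1 hour; the unit is seconds (illustrative value)
        configuration.setLong(JobManagerOptions.JOB_STORE_EXPIRATION_TIME, 2L * 60L * 60L);

        // read the values back the same way SessionClusterEntrypoint does below
        System.out.println(configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE));      // 104857600
        System.out.println(configuration.getLong(JobManagerOptions.JOB_STORE_EXPIRATION_TIME)); // 7200
    }
}

The same overrides can be written in flink-conf.yaml as jobstore.cache-size: 104857600 and jobstore.expiration-time: 7200.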

SessionClusterEntrypoint

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java

public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {

    public SessionClusterEntrypoint(Configuration configuration) {
        super(configuration);
    }

    @Override
    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
            Configuration configuration,
            ScheduledExecutor scheduledExecutor) throws IOException {
        final File tmpDir = new File(ConfigurationUtils.parseTempDirectories(configuration)[0]);

        final Time expirationTime = Time.seconds(configuration.getLong(JobManagerOptions.JOB_STORE_EXPIRATION_TIME));
        final long maximumCacheSizeBytes = configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE);

        return new FileArchivedExecutionGraphStore(
            tmpDir,
            expirationTime,
            maximumCacheSizeBytes,
            scheduledExecutor,
            Ticker.systemTicker());
    }
}
  • SessionClusterEntrypoint's createSerializableExecutionGraphStore method reads the JobManagerOptions.JOB_STORE_EXPIRATION_TIME and JobManagerOptions.JOB_STORE_CACHE_SIZE settings and then creates a FileArchivedExecutionGraphStore rooted at the first configured temp directory (see the sketch below)
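
The rootDir handed to FileArchivedExecutionGraphStore is simply the first configured temp directory. A minimal sketch of that lookup, assuming a hypothetical io.tmp.dirs value (when unset, Flink falls back to the system temp directory):

import java.io.File;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;

public class JobStoreTempDirExample {

    public static void main(String[] args) {
        Configuration configuration = new Configuration();

        // hypothetical temp directory for this demo
        configuration.setString(CoreOptions.TMP_DIRS, "/tmp/flink-jobstore-demo");

        // the same call the entrypoint makes; the jobstore uses the first entry
        File tmpDir = new File(ConfigurationUtils.parseTempDirectories(configuration)[0]);

        System.out.println(tmpDir); // /tmp/flink-jobstore-demo
    }
}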

FileArchivedExecutionGraphStore

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java

public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {

    private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);

    private final File storageDir;

    private final Cache<JobID, JobDetails> jobDetailsCache;

    private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;

    private final ScheduledFuture<?> cleanupFuture;

    private final Thread shutdownHook;

    private int numFinishedJobs;

    private int numFailedJobs;

    private int numCanceledJobs;

    public FileArchivedExecutionGraphStore(
            File rootDir,
            Time expirationTime,
            long maximumCacheSizeBytes,
            ScheduledExecutor scheduledExecutor,
            Ticker ticker) throws IOException {

        final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);

        LOG.info(
            "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
            FileArchivedExecutionGraphStore.class.getSimpleName(),
            storageDirectory,
            expirationTime.toMilliseconds(),
            maximumCacheSizeBytes);

        this.storageDir = Preconditions.checkNotNull(storageDirectory);
        Preconditions.checkArgument(
            storageDirectory.exists() && storageDirectory.isDirectory(),
            "The storage directory must exist and be a directory.");
        this.jobDetailsCache = CacheBuilder.newBuilder()
            .expireAfterWrite(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS)
            .removalListener(
                (RemovalListener<JobID, JobDetails>) notification -> deleteExecutionGraphFile(notification.getKey()))
            .ticker(ticker)
            .build();

        this.archivedExecutionGraphCache = CacheBuilder.newBuilder()
            .maximumWeight(maximumCacheSizeBytes)
            .weigher(this::calculateSize)
            .build(new CacheLoader<JobID, ArchivedExecutionGraph>() {
                @Override
                public ArchivedExecutionGraph load(JobID jobId) throws Exception {
                    return loadExecutionGraph(jobId);
                }
            });

        this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(
            jobDetailsCache::cleanUp,
            expirationTime.toMilliseconds(),
            expirationTime.toMilliseconds(),
            TimeUnit.MILLISECONDS);

        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);

        this.numFinishedJobs = 0;
        this.numFailedJobs = 0;
        this.numCanceledJobs = 0;
    }

    @Override
    public int size() {
        return Math.toIntExact(jobDetailsCache.size());
    }

    @Override
    @Nullable
    public ArchivedExecutionGraph get(JobID jobId) {
        try {
            return archivedExecutionGraphCache.get(jobId);
        } catch (ExecutionException e) {
            LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
            return null;
        }
    }

    @Override
    public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
        final JobStatus jobStatus = archivedExecutionGraph.getState();
        final JobID jobId = archivedExecutionGraph.getJobID();
        final String jobName = archivedExecutionGraph.getJobName();

        Preconditions.checkArgument(
            jobStatus.isGloballyTerminalState(),
            "The job " + jobName + '(' + jobId +
                ") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');

        switch (jobStatus) {
            case FINISHED:
                numFinishedJobs++;
                break;
            case CANCELED:
                numCanceledJobs++;
                break;
            case FAILED:
                numFailedJobs++;
                break;
            default:
                throw new IllegalStateException("The job " + jobName + '(' +
                    jobId + ") should have been in a globally terminal state. " +
                    "Instead it was in state " + jobStatus + '.');
        }

        // write the ArchivedExecutionGraph to disk
        storeArchivedExecutionGraph(archivedExecutionGraph);

        final JobDetails detailsForJob = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);

        jobDetailsCache.put(jobId, detailsForJob);
        archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
    }

    @Override
    public JobsOverview getStoredJobsOverview() {
        return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs);
    }

    @Override
    public Collection<JobDetails> getAvailableJobDetails() {
        return jobDetailsCache.asMap().values();
    }

    @Nullable
    @Override
    public JobDetails getAvailableJobDetails(JobID jobId) {
        return jobDetailsCache.getIfPresent(jobId);
    }

    @Override
    public void close() throws IOException {
        cleanupFuture.cancel(false);

        jobDetailsCache.invalidateAll();

        // clean up the storage directory
        FileUtils.deleteFileOrDirectory(storageDir);

        // Remove shutdown hook to prevent resource leaks
        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
    }

    // --------------------------------------------------------------
    // Internal methods
    // --------------------------------------------------------------

    private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);

        if (archivedExecutionGraphFile.exists()) {
            return Math.toIntExact(archivedExecutionGraphFile.length());
        } else {
            LOG.debug("Could not find archived execution graph file for {}. Estimating the size instead.", jobId);
            return serializableExecutionGraph.getAllVertices().size() * 1000 +
                serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
        }
    }

    private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException {
        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);

        if (archivedExecutionGraphFile.exists()) {
            try (FileInputStream fileInputStream = new FileInputStream(archivedExecutionGraphFile)) {
                return InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader());
            }
        } else {
            throw new FileNotFoundException("Could not find file for archived execution graph " + jobId +
                ". This indicates that the file either has been deleted or never written.");
        }
    }

    private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
        final File archivedExecutionGraphFile = getExecutionGraphFile(archivedExecutionGraph.getJobID());

        try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
            InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
        }
    }

    private File getExecutionGraphFile(JobID jobId) {
        return new File(storageDir, jobId.toString());
    }

    private void deleteExecutionGraphFile(JobID jobId) {
        Preconditions.checkNotNull(jobId);

        final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);

        try {
            FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
        } catch (IOException e) {
            LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
        }

        archivedExecutionGraphCache.invalidate(jobId);
        jobDetailsCache.invalidate(jobId);
    }

    private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
        final int maxAttempts = 10;

        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            final File storageDirectory = new File(tmpDir, "executionGraphStore-" + UUID.randomUUID());

            if (storageDirectory.mkdir()) {
                return storageDirectory;
            }
        }

        throw new IOException("Could not create executionGraphStorage directory in " + tmpDir + '.');
    }

    // --------------------------------------------------------------
    // Testing methods
    // --------------------------------------------------------------

    @VisibleForTesting
    File getStorageDir() {
        return storageDir;
    }

    @VisibleForTesting
    LoadingCache<JobID, ArchivedExecutionGraph> getArchivedExecutionGraphCache() {
        return archivedExecutionGraphCache;
    }
}
  • FileArchivedExecutionGraphStore implements the ArchivedExecutionGraphStore interface; its constructor uses Guava caches to create jobDetailsCache and archivedExecutionGraphCache
  • jobDetailsCache's expireAfterWrite uses expirationTime, i.e. the jobstore.expiration-time setting; archivedExecutionGraphCache's maximumWeight uses maximumCacheSizeBytes, i.e. the jobstore.cache-size setting
  • FileArchivedExecutionGraphStore also schedules a periodic task that invokes jobDetailsCache's cleanUp method at a fixed delay of expirationTime to clean up expired entries (a stand-alone sketch of the two eviction patterns follows below)
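
For intuition, here is a self-contained sketch of the same two Guava eviction patterns, time-based expireAfterWrite and weight-based maximumWeight/weigher, with plain strings and byte arrays standing in for JobDetails and ArchivedExecutionGraph; all names are illustrative and an unshaded Guava dependency is assumed:

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.Weigher;

public class JobStoreCachePatternDemo {

    public static void main(String[] args) throws Exception {
        // time-based eviction, like jobDetailsCache: entries expire a fixed
        // duration (jobstore.expiration-time) after they were written
        Cache<String, String> detailsCache = CacheBuilder.newBuilder()
            .expireAfterWrite(3600, TimeUnit.SECONDS)
            .removalListener(
                (RemovalListener<String, String>) notification ->
                    System.out.println("evicted job details: " + notification.getKey()))
            .build();

        // weight-based eviction, like archivedExecutionGraphCache: each entry
        // weighs its byte size, capped at a total budget (jobstore.cache-size)
        LoadingCache<String, byte[]> graphCache = CacheBuilder.newBuilder()
            .maximumWeight(50L * 1024L * 1024L)
            .weigher((Weigher<String, byte[]>) (jobId, bytes) -> bytes.length)
            .build(new CacheLoader<String, byte[]>() {
                @Override
                public byte[] load(String jobId) {
                    // stand-in for loadExecutionGraph, which deserializes the graph file on a cache miss
                    return new byte[1024];
                }
            });

        detailsCache.put("job-1", "details for job-1");
        System.out.println(graphCache.get("job-1").length); // 1024, loaded on demand
    }
}

Note how, in the real class, eviction from jobDetailsCache is what deletes the on-disk file (via the removal listener), while archivedExecutionGraphCache can always reload an entry from disk through its CacheLoader as long as the file still exists.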

Summary

  • Flink's jobstore has two settings: jobstore.cache-size, which defaults to 50 MB, and jobstore.expiration-time, which defaults to 1 hour
  • SessionClusterEntrypoint's createSerializableExecutionGraphStore method reads the JobManagerOptions.JOB_STORE_EXPIRATION_TIME and JobManagerOptions.JOB_STORE_CACHE_SIZE settings and then creates a FileArchivedExecutionGraphStore
  • FileArchivedExecutionGraphStore implements the ArchivedExecutionGraphStore interface; its constructor uses Guava caches to create jobDetailsCache and archivedExecutionGraphCache; jobDetailsCache's expireAfterWrite uses expirationTime, i.e. the jobstore.expiration-time setting, while archivedExecutionGraphCache's maximumWeight uses maximumCacheSizeBytes, i.e. the jobstore.cache-size setting; it also schedules a periodic task that runs jobDetailsCache's cleanUp method at a fixed delay of expirationTime to clean up expired entries

doc

Original statement: This article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, please contact cloudcommunity@tencent.com for removal.
