A look at Flink's FsCheckpointStorage

This article examines Flink's FsCheckpointStorage.

CheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java

/**
 * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
 * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
 * created by this class.
 */
public interface CheckpointStorage {

    boolean supportsHighlyAvailableStorage();

    boolean hasDefaultSavepointLocation();

    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;

    CheckpointStorageLocation initializeLocationForSavepoint(
            long checkpointId,
            @Nullable String externalLocationPointer) throws IOException;

    CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException;

    CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams. supportsHighlyAvailableStorage returns whether the backend supports highly available storage. hasDefaultSavepointLocation returns whether a default savepoint location is configured. resolveCheckpoint resolves an external checkpoint pointer into a CompletedCheckpointStorageLocation. initializeLocationForCheckpoint initializes the storage location for the given checkpointId. initializeLocationForSavepoint initializes the storage location for a savepoint from the checkpointId and an optional externalLocationPointer. resolveCheckpointStorageLocation resolves a CheckpointStorageLocationReference into a CheckpointStreamFactory. createTaskOwnedStateStream opens a stream for persisting checkpoint state that is owned by the tasks.

AbstractFsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
​
    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------
​
    /** The prefix of the directory containing the data exclusive to a checkpoint. */
    public static final String CHECKPOINT_DIR_PREFIX = "chk-";
​
    /** The name of the directory for shared checkpoint state. */
    public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";
​
    /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
    public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";
​
    /** The name of the metadata files in checkpoints / savepoints. */
    public static final String METADATA_FILE_NAME = "_metadata";
​
    /** The magic number that is put in front of any reference. */
    private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };
​
    // ------------------------------------------------------------------------
    //  Fields and properties
    // ------------------------------------------------------------------------
​
    /** The jobId, written into the generated savepoint directories. */
    private final JobID jobId;
​
    /** The default location for savepoints. Null, if none is configured. */
    @Nullable
    private final Path defaultSavepointDirectory;
​
    @Override
    public boolean hasDefaultSavepointLocation() {
        return defaultSavepointDirectory != null;
    }
​
    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
        return resolveCheckpointPointer(checkpointPointer);
    }
​
    /**
     * Creates a file system based storage location for a savepoint.
     *
     * <p>This methods implements the logic that decides which location to use (given optional
     * parameters for a configured location and a location passed for this specific savepoint)
     * and how to name and initialize the savepoint directory.
     *
     * @param externalLocationPointer    The target location pointer for the savepoint.
     *                                   Must be a valid URI. Null, if not supplied.
     * @param checkpointId               The checkpoint ID of the savepoint.
     *
     * @return The checkpoint storage location for the savepoint.
     *
     * @throws IOException Thrown if the target directory could not be created.
     */
    @Override
    public CheckpointStorageLocation initializeLocationForSavepoint(
            @SuppressWarnings("unused") long checkpointId,
            @Nullable String externalLocationPointer) throws IOException {
​
        // determine where to write the savepoint to
​
        final Path savepointBasePath;
        if (externalLocationPointer != null) {
            savepointBasePath = new Path(externalLocationPointer);
        }
        else if (defaultSavepointDirectory != null) {
            savepointBasePath = defaultSavepointDirectory;
        }
        else {
            throw new IllegalArgumentException("No savepoint location given and no default location configured.");
        }
​
        // generate the savepoint directory
​
        final FileSystem fs = savepointBasePath.getFileSystem();
        final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';
​
        Exception latestException = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));
​
            try {
                if (fs.mkdirs(path)) {
                    // we make the path qualified, to make it independent of default schemes and authorities
                    final Path qp = path.makeQualified(fs);
​
                    return createSavepointLocation(fs, qp);
                }
            } catch (Exception e) {
                latestException = e;
            }
        }
​
        throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
    }
​
    protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;
​
    //......
}
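  • AbstractFsCheckpointStorage implements the hasDefaultSavepointLocation, resolveCheckpoint, and initializeLocationForSavepoint methods of the CheckpointStorage interface.
  • resolveCheckpoint delegates to resolveCheckpointPointer (elided from the excerpt above), which does two things: it resolves the checkpoint/savepoint path, and it resolves the metadata path of that checkpoint/savepoint. It obtains the FileStatus of each and then creates an FsCompletedCheckpointStorageLocation.
  • initializeLocationForSavepoint creates a CheckpointStorageLocation for a savepoint. It uses externalLocationPointer as the base path if one is given, falls back to defaultSavepointDirectory if that value is null, and throws an IllegalArgumentException if neither is available. It then makes up to 10 attempts to create a directory named "savepoint-" plus the first 6 characters of the jobId plus a random suffix, and passes the qualified path to the abstract createSavepointLocation method, which subclasses implement.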
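
The base-path decision and the retry loop described above can be sketched without any Flink dependency. This is only an illustration under stated assumptions: the class name SavepointLocationSketch and its methods are hypothetical, java.nio.file stands in for Flink's FileSystem and Path, and a UUID fragment stands in for FileUtils.getRandomFilename.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

// Hypothetical, Flink-free sketch of the logic in initializeLocationForSavepoint.
public class SavepointLocationSketch {

    /** Picks the base path: the explicit pointer first, then the configured default. */
    public static Path chooseBasePath(String externalLocationPointer, Path defaultSavepointDirectory) {
        if (externalLocationPointer != null) {
            return Paths.get(externalLocationPointer);
        } else if (defaultSavepointDirectory != null) {
            return defaultSavepointDirectory;
        } else {
            throw new IllegalArgumentException(
                    "No savepoint location given and no default location configured.");
        }
    }

    /** Creates base/savepoint-<first 6 chars of jobId>-<random>, trying up to 10 names. */
    public static Path createSavepointDirectory(Path base, String jobId) throws IOException {
        final String prefix = "savepoint-" + jobId.substring(0, 6) + '-';
        Files.createDirectories(base);

        Exception latestException = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            // Assumption: a UUID fragment replaces FileUtils.getRandomFilename(prefix).
            final String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 12);
            final Path candidate = base.resolve(prefix + suffix);
            try {
                // Unlike the mkdirs call in the excerpt, createDirectory fails
                // if the candidate already exists, which triggers a retry here.
                return Files.createDirectory(candidate).toAbsolutePath();
            } catch (IOException e) {
                latestException = e;
            }
        }
        throw new IOException("Failed to create savepoint directory at " + base, latestException);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("savepoints");
        Path dir = createSavepointDirectory(chooseBasePath(null, base), "0123456789abcdef");
        System.out.println(dir);
    }
}
```

The random suffix is what lets several savepoints of the same job share one base directory without colliding.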

FsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
​
    private final FileSystem fileSystem;
​
    private final Path checkpointsDirectory;
​
    private final Path sharedStateDirectory;
​
    private final Path taskOwnedStateDirectory;
​
    private final int fileSizeThreshold;
​
    public FsCheckpointStorage(
            Path checkpointBaseDirectory,
            @Nullable Path defaultSavepointDirectory,
            JobID jobId,
            int fileSizeThreshold) throws IOException {
​
        this(checkpointBaseDirectory.getFileSystem(),
                checkpointBaseDirectory,
                defaultSavepointDirectory,
                jobId,
                fileSizeThreshold);
    }
​
    public FsCheckpointStorage(
            FileSystem fs,
            Path checkpointBaseDirectory,
            @Nullable Path defaultSavepointDirectory,
            JobID jobId,
            int fileSizeThreshold) throws IOException {
​
        super(jobId, defaultSavepointDirectory);
​
        checkArgument(fileSizeThreshold >= 0);
​
        this.fileSystem = checkNotNull(fs);
        this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
        this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
        this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
        this.fileSizeThreshold = fileSizeThreshold;
​
        // initialize the dedicated directories
        fileSystem.mkdirs(checkpointsDirectory);
        fileSystem.mkdirs(sharedStateDirectory);
        fileSystem.mkdirs(taskOwnedStateDirectory);
    }
​
    // ------------------------------------------------------------------------
​
    public Path getCheckpointsDirectory() {
        return checkpointsDirectory;
    }
​
    // ------------------------------------------------------------------------
    //  CheckpointStorage implementation
    // ------------------------------------------------------------------------
​
    @Override
    public boolean supportsHighlyAvailableStorage() {
        return true;
    }
​
    @Override
    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
        checkArgument(checkpointId >= 0);
​
        // prepare all the paths needed for the checkpoints
        final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);
​
        // create the checkpoint exclusive directory
        fileSystem.mkdirs(checkpointDir);
​
        return new FsCheckpointStorageLocation(
                fileSystem,
                checkpointDir,
                sharedStateDirectory,
                taskOwnedStateDirectory,
                CheckpointStorageLocationReference.getDefault(),
                fileSizeThreshold);
    }
​
    @Override
    public CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException {
​
        if (reference.isDefaultReference()) {
            // default reference, construct the default location for that particular checkpoint
            final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);
​
            return new FsCheckpointStorageLocation(
                    fileSystem,
                    checkpointDir,
                    sharedStateDirectory,
                    taskOwnedStateDirectory,
                    reference,
                    fileSizeThreshold);
        }
        else {
            // location encoded in the reference
            final Path path = decodePathFromReference(reference);
​
            return new FsCheckpointStorageLocation(
                    path.getFileSystem(),
                    path,
                    path,
                    path,
                    reference,
                    fileSizeThreshold);
        }
    }
​
    @Override
    public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
        return new FsCheckpointStateOutputStream(
                taskOwnedStateDirectory,
                fileSystem,
                FsCheckpointStreamFactory.DEFAULT_WRITE_BUFFER_SIZE,
                fileSizeThreshold);
    }
​
    @Override
    protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
        final CheckpointStorageLocationReference reference = encodePathAsReference(location);
        return new FsCheckpointStorageLocation(fs, location, location, location, reference, fileSizeThreshold);
    }
}
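  • FsCheckpointStorage extends AbstractFsCheckpointStorage and implements its abstract createSavepointLocation method, which returns an FsCheckpointStorageLocation that uses the savepoint location for all three directories (exclusive, shared, and task-owned).
  • FsCheckpointStorage also implements the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createTaskOwnedStateStream.
  • supportsHighlyAvailableStorage simply returns true. initializeLocationForCheckpoint creates the checkpoint's exclusive directory and returns an FsCheckpointStorageLocation with the default reference. resolveCheckpointStorageLocation also returns an FsCheckpointStorageLocation: for a default reference it builds the location from the checkpoint directory plus the shared and task-owned directories, otherwise it decodes the path from the reference and uses that path for all three. createTaskOwnedStateStream creates an FsCheckpointStateOutputStream that writes to the task-owned state directory.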
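
The two branches of resolveCheckpointStorageLocation can be sketched with plain strings. The following rests on assumptions about helpers that the excerpts do not show: that getCheckpointDirectoryForJob appends the job ID to the base directory, that createCheckpointDirectory appends "chk-" plus the checkpoint ID, and that encodePathAsReference stores the magic number followed by the UTF-8 bytes of the path. The class FsLocationSketch and the use of a null array for the default reference are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, Flink-free sketch; paths are plain strings.
public class FsLocationSketch {

    // Same bytes as REFERENCE_MAGIC_NUMBER in AbstractFsCheckpointStorage.
    private static final byte[] MAGIC = {0x05, 0x5F, 0x3F, 0x18};

    /** Assumed encoding: magic number followed by the UTF-8 bytes of the path. */
    public static byte[] encodePathAsReference(String path) {
        byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
        byte[] ref = Arrays.copyOf(MAGIC, MAGIC.length + pathBytes.length);
        System.arraycopy(pathBytes, 0, ref, MAGIC.length, pathBytes.length);
        return ref;
    }

    /** Reverses encodePathAsReference after checking the magic number. */
    public static String decodePathFromReference(byte[] ref) {
        if (ref.length <= MAGIC.length
                || !Arrays.equals(Arrays.copyOf(ref, MAGIC.length), MAGIC)) {
            throw new IllegalArgumentException("Reference does not start with the magic number");
        }
        return new String(ref, MAGIC.length, ref.length - MAGIC.length, StandardCharsets.UTF_8);
    }

    /** Returns the {exclusive, shared, task-owned} directories of a location. */
    public static String[] resolveDirectories(
            String baseDir, String jobId, long checkpointId, byte[] reference) {
        if (reference == null) { // stands in for reference.isDefaultReference()
            String jobDir = baseDir + "/" + jobId;
            return new String[] {
                jobDir + "/chk-" + checkpointId, jobDir + "/shared", jobDir + "/taskowned"
            };
        }
        // Location encoded in the reference: one path serves all three roles.
        String path = decodePathFromReference(reference);
        return new String[] {path, path, path};
    }

    public static void main(String[] args) {
        String[] checkpoint = resolveDirectories("hdfs:///flink/checkpoints", "job1", 7L, null);
        System.out.println(String.join(", ", checkpoint));

        byte[] ref = encodePathAsReference("file:/tmp/savepoint-abc");
        String[] savepoint = resolveDirectories("hdfs:///flink/checkpoints", "job1", 7L, ref);
        System.out.println(String.join(", ", savepoint));
    }
}
```

Under these assumptions, a regular checkpoint spreads its state across a per-checkpoint directory and two directories shared by the whole job, while a savepoint keeps everything under a single self-contained directory.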

Summary

  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams. AbstractFsCheckpointStorage implements its hasDefaultSavepointLocation, resolveCheckpoint, and initializeLocationForSavepoint methods and declares the abstract method createSavepointLocation.
  • FsCheckpointStorage extends AbstractFsCheckpointStorage, implements createSavepointLocation, and also implements the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createTaskOwnedStateStream.
  • In FsCheckpointStorage, supportsHighlyAvailableStorage simply returns true; initializeLocationForCheckpoint and resolveCheckpointStorageLocation both create an FsCheckpointStorageLocation; and createTaskOwnedStateStream creates an FsCheckpointStateOutputStream.
