A Look at Flink's MemCheckpointStreamFactory

This article takes a look at Flink's MemCheckpointStreamFactory.

CheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java

/**
 * A factory for checkpoint output streams, which are used to persist data for checkpoints.
 *
 * <p>Stream factories can be created from the {@link CheckpointStorage} through
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;

    abstract class CheckpointStateOutputStream extends FSDataOutputStream {

        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        @Override
        public abstract void close() throws IOException;
    }
}
  • CheckpointStreamFactory is the factory for checkpoint output streams, which are used to persist checkpoint data. It defines createCheckpointStateOutputStream, which returns a CheckpointStateOutputStream. CheckpointStateOutputStream extends FSDataOutputStream and declares two abstract methods, closeAndGetHandle and close.
  • Two implementations of CheckpointStreamFactory carry "factory" in their names: MemCheckpointStreamFactory (with two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation) and FsCheckpointStreamFactory (with one subclass, FsCheckpointStorageLocation).
  • The CheckpointStorageLocation interface extends CheckpointStreamFactory and has three implementations: NonPersistentMetadataCheckpointStorageLocation, PersistentMetadataCheckpointStorageLocation, and FsCheckpointStorageLocation.
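
To make the contract concrete, here is a minimal sketch of the factory and stream relationship using only the JDK. The types StreamFactory, StateOutputStream, BytesHandle, and HeapStreamFactory are hypothetical stand-ins invented for this illustration; they are not the Flink classes and leave out CheckpointedStateScope entirely.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Simplified stand-ins for the contract described above; these are NOT the Flink types. */
final class StreamFactorySketch {

    /** Hypothetical stand-in for StreamStateHandle: it just wraps the persisted bytes. */
    static final class BytesHandle {
        final byte[] data;

        BytesHandle(byte[] data) {
            this.data = data;
        }
    }

    /** Stand-in for CheckpointStateOutputStream: an OutputStream with two ways to finish. */
    abstract static class StateOutputStream extends OutputStream {

        /** Finishes the stream and returns a handle, or null if nothing was written. */
        abstract BytesHandle closeAndGetHandle() throws IOException;

        /** Closes the stream and discards whatever was written. */
        @Override
        public abstract void close() throws IOException;
    }

    /** Stand-in for CheckpointStreamFactory. */
    interface StreamFactory {
        StateOutputStream createStateOutputStream() throws IOException;
    }

    /** A factory whose streams buffer everything on the heap. */
    static final class HeapStreamFactory implements StreamFactory {
        @Override
        public StateOutputStream createStateOutputStream() {
            return new StateOutputStream() {
                private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

                @Override
                public void write(int b) {
                    buffer.write(b);
                }

                @Override
                BytesHandle closeAndGetHandle() {
                    return buffer.size() == 0 ? null : new BytesHandle(buffer.toByteArray());
                }

                @Override
                public void close() {
                    buffer.reset();
                }
            };
        }
    }

    public static void main(String[] args) throws IOException {
        StreamFactory factory = new HeapStreamFactory();
        StateOutputStream out = factory.createStateOutputStream();
        out.write("state".getBytes(StandardCharsets.UTF_8));
        BytesHandle handle = out.closeAndGetHandle();
        System.out.println(handle.data.length + " bytes persisted");
    }
}
```

The point of the two finishing methods is that a caller either commits the data and receives a handle, or abandons the stream with close and receives nothing.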

FSDataOutputStream

flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java

@Public
public abstract class FSDataOutputStream extends OutputStream {

    public abstract long getPos() throws IOException;

    public abstract void flush() throws IOException;

    public abstract void sync() throws IOException;

    public abstract void close() throws IOException;
}
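  • FSDataOutputStream extends Java's OutputStream. It adds the abstract methods getPos and sync, and redeclares flush and close as abstract, so every concrete subclass has to implement all four.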
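
As an illustration of what getPos adds on top of a plain OutputStream, the sketch below tracks the write offset of a heap buffer. PositionedOutputStream and HeapPositionedStream are hypothetical names made up for this example, and the assumption that the position equals the number of bytes written holds only for this append-only sketch.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical stand-in for a position-aware stream; not the Flink FSDataOutputStream. */
final class PositionedStreamSketch {

    abstract static class PositionedOutputStream extends OutputStream {

        /** Returns the current write offset. */
        abstract long getPos() throws IOException;

        /** Forces buffered data down to the underlying storage. */
        abstract void sync() throws IOException;
    }

    /** Append-only heap implementation: the position is simply the byte count so far. */
    static final class HeapPositionedStream extends PositionedOutputStream {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        @Override
        public void write(int b) {
            buffer.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) {
            buffer.write(b, off, len);
        }

        @Override
        long getPos() {
            return buffer.size();
        }

        @Override
        void sync() {
            // Nothing to sync: the data never leaves the heap.
        }
    }

    public static void main(String[] args) throws IOException {
        HeapPositionedStream out = new HeapPositionedStream();
        long start = out.getPos();
        out.write(new byte[] {1, 2, 3, 4});
        long end = out.getPos();
        // A caller can record [start, end) to locate this chunk inside the stream later.
        System.out.println("wrote bytes [" + start + ", " + end + ")");
    }
}
```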

CheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java

/**
 * A storage location for one particular checkpoint, offering data persistent, metadata persistence,
 * and lifecycle/cleanup methods.
 *
 * <p>CheckpointStorageLocations are typically created and initialized via
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

    CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

    void disposeOnFailure() throws IOException;

    CheckpointStorageLocationReference getLocationReference();
}
  • CheckpointStorageLocation extends the CheckpointStreamFactory interface. It is typically created and initialized by CheckpointStorage, and it offers data persistence, metadata persistence, and lifecycle/cleanup methods. createMetadataOutputStream creates a CheckpointMetadataOutputStream; disposeOnFailure disposes of the checkpoint location when the checkpoint fails; getLocationReference returns the CheckpointStorageLocationReference.

MemCheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java

/**
 * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays.
 */
public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
​
    /** The maximal size that the snapshotted memory state may have */
    private final int maxStateSize;
​
    /**
     * Creates a new in-memory stream factory that accepts states whose serialized forms are
     * up to the given number of bytes.
     *
     * @param maxStateSize The maximal size of the serialized state
     */
    public MemCheckpointStreamFactory(int maxStateSize) {
        this.maxStateSize = maxStateSize;
    }
​
    @Override
    public CheckpointStateOutputStream createCheckpointStateOutputStream(
            CheckpointedStateScope scope) throws IOException
    {
        return new MemoryCheckpointOutputStream(maxStateSize);
    }
​
    @Override
    public String toString() {
        return "In-Memory Stream Factory";
    }
​
    static void checkSize(int size, int maxSize) throws IOException {
        if (size > maxSize) {
            throw new IOException(
                    "Size of the state is larger than the maximum permitted memory-backed state. Size="
                            + size + " , maxSize=" + maxSize
                            + " . Consider using a different state backend, like the File System State backend.");
        }
    }
​
​
​
    /**
     * A {@code CheckpointStateOutputStream} that writes into a byte array.
     */
    public static class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {
​
        private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();
​
        private final int maxSize;
​
        private AtomicBoolean closed;
​
        boolean isEmpty = true;
​
        public MemoryCheckpointOutputStream(int maxSize) {
            this.maxSize = maxSize;
            this.closed = new AtomicBoolean(false);
        }
​
        @Override
        public void write(int b) throws IOException {
            os.write(b);
            isEmpty = false;
        }
​
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            os.write(b, off, len);
            isEmpty = false;
        }
​
        @Override
        public void flush() throws IOException {
            os.flush();
        }
​
        @Override
        public void sync() throws IOException { }
​
        // --------------------------------------------------------------------
​
        @Override
        public void close() {
            if (closed.compareAndSet(false, true)) {
                closeInternal();
            }
        }
​
        @Nullable
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
            if (isEmpty) {
                return null;
            }
            return new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), closeAndGetBytes());
        }
​
        @Override
        public long getPos() throws IOException {
            return os.getPosition();
        }
​
        public boolean isClosed() {
            return closed.get();
        }
​
        /**
         * Closes the stream and returns the byte array containing the stream's data.
         * @return The byte array containing the stream's data.
         * @throws IOException Thrown if the size of the data exceeds the maximal
         */
        public byte[] closeAndGetBytes() throws IOException {
            if (closed.compareAndSet(false, true)) {
                checkSize(os.size(), maxSize);
                byte[] bytes = os.toByteArray();
                closeInternal();
                return bytes;
            } else {
                throw new IOException("stream has already been closed");
            }
        }
​
        private void closeInternal() {
            os.reset();
        }
    }
}
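  • MemCheckpointStreamFactory implements the CheckpointStreamFactory interface; its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream.
  • MemoryCheckpointOutputStream extends CheckpointStateOutputStream and buffers its data in a ByteArrayOutputStreamWithPos. The write methods do not check the size. closeAndGetHandle returns null if nothing was written; otherwise it calls closeAndGetBytes, which runs checkSize and throws an IOException if the buffered size exceeds maxSize.
  • MemCheckpointStreamFactory has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation, both of which implement the CheckpointStorageLocation interface.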
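
The close-time size check can be sketched in isolation as follows. BoundedStream is a hypothetical, JDK-only imitation of the behaviour read from the listing above (null for an empty stream, limit enforced only on close, a second close-and-get rejected); it is not the Flink class, and it returns the raw bytes instead of wrapping them in a state handle.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/** JDK-only imitation of the close-time size check; not the Flink implementation. */
final class BoundedMemoryStreamSketch {

    static final class BoundedStream {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final int maxSize;
        private boolean empty = true;

        BoundedStream(int maxSize) {
            this.maxSize = maxSize;
        }

        /** Writes are always accepted; the limit is not consulted here. */
        void write(byte[] data) {
            buffer.write(data, 0, data.length);
            empty = false;
        }

        /** Returns the buffered bytes, or null if nothing was ever written. */
        byte[] closeAndGetBytesOrNull() throws IOException {
            if (empty) {
                return null;
            }
            if (!closed.compareAndSet(false, true)) {
                throw new IOException("stream has already been closed");
            }
            // The limit is enforced only now, after the stream is already marked closed.
            if (buffer.size() > maxSize) {
                throw new IOException("Size=" + buffer.size() + " exceeds maxSize=" + maxSize);
            }
            byte[] bytes = buffer.toByteArray();
            buffer.reset();
            return bytes;
        }
    }

    public static void main(String[] args) throws IOException {
        BoundedStream small = new BoundedStream(4);
        small.write(new byte[] {1, 2, 3});
        System.out.println("accepted " + small.closeAndGetBytesOrNull().length + " bytes");

        BoundedStream large = new BoundedStream(4);
        large.write(new byte[8]); // the oversized write itself succeeds
        try {
            large.closeAndGetBytesOrNull();
        } catch (IOException e) {
            System.out.println("rejected on close: " + e.getMessage());
        }
    }
}
```

One consequence of checking only on close is that an oversized state is fully serialized into memory before it is rejected, which is why the error message in the original code suggests switching to a different state backend.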

NonPersistentMetadataCheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java

/**
 * A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence
 * for metadata has been configured.
 */
public class NonPersistentMetadataCheckpointStorageLocation
        extends MemCheckpointStreamFactory
        implements CheckpointStorageLocation {
​
    /** The external pointer returned for checkpoints that are not externally addressable. */
    public static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";
​
    public NonPersistentMetadataCheckpointStorageLocation(int maxStateSize) {
        super(maxStateSize);
    }
​
    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new MetadataOutputStream();
    }
​
    @Override
    public void disposeOnFailure() {}
​
    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return CheckpointStorageLocationReference.getDefault();
    }
​
    // ------------------------------------------------------------------------
    //  CompletedCheckpointStorageLocation
    // ------------------------------------------------------------------------
​
    /**
     * A {@link CompletedCheckpointStorageLocation} that is not persistent and only holds the
     * metadata in an internal byte array.
     */
    private static class NonPersistentCompletedCheckpointStorageLocation implements CompletedCheckpointStorageLocation {
​
        private static final long serialVersionUID = 1L;
​
        private final ByteStreamStateHandle metaDataHandle;
​
        NonPersistentCompletedCheckpointStorageLocation(ByteStreamStateHandle metaDataHandle) {
            this.metaDataHandle = metaDataHandle;
        }
​
        @Override
        public String getExternalPointer() {
            return EXTERNAL_POINTER;
        }
​
        @Override
        public StreamStateHandle getMetadataHandle() {
            return metaDataHandle;
        }
​
        @Override
        public void disposeStorageLocation() {}
    }
​
    // ------------------------------------------------------------------------
    //  CheckpointMetadataOutputStream
    // ------------------------------------------------------------------------
​
    private static class MetadataOutputStream extends CheckpointMetadataOutputStream {
​
        private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();
​
        private boolean closed;
​
        @Override
        public void write(int b) throws IOException {
            os.write(b);
        }
​
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            os.write(b, off, len);
        }
​
        @Override
        public void flush() throws IOException {
            os.flush();
        }
​
        @Override
        public long getPos() throws IOException {
            return os.getPosition();
        }
​
        @Override
        public void sync() throws IOException { }
​
        @Override
        public CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException {
            synchronized (this) {
                if (!closed) {
                    closed = true;
​
                    byte[] bytes = os.toByteArray();
                    ByteStreamStateHandle handle = new ByteStreamStateHandle(UUID.randomUUID().toString(), bytes);
                    return new NonPersistentCompletedCheckpointStorageLocation(handle);
                } else {
                    throw new IOException("Already closed");
                }
            }
        }
​
        @Override
        public void close() {
            if (!closed) {
                closed = true;
                os.reset();
            }
        }
    }
}
  • MemoryBackendCheckpointStorage creates a NonPersistentMetadataCheckpointStorageLocation when no checkpointsDirectory is configured; its createMetadataOutputStream method creates a MetadataOutputStream.
  • MetadataOutputStream extends CheckpointMetadataOutputStream and is likewise backed by a ByteArrayOutputStreamWithPos; its closeAndFinalizeCheckpoint method returns a NonPersistentCompletedCheckpointStorageLocation.
  • NonPersistentCompletedCheckpointStorageLocation implements the CompletedCheckpointStorageLocation interface; its getMetadataHandle method returns a ByteStreamStateHandle.
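
The finalize-once behaviour of the metadata stream can be sketched like this. MetadataStream and CompletedLocation are hypothetical simplifications written for this article; only the EXTERNAL_POINTER value and the "Already closed" message are taken from the listing above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/** JDK-only imitation of the non-persistent metadata stream; not the Flink classes. */
final class NonPersistentMetadataSketch {

    static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";

    /** Hypothetical completed location: the metadata lives only in this byte array. */
    static final class CompletedLocation {
        private final byte[] metadata;

        CompletedLocation(byte[] metadata) {
            this.metadata = metadata;
        }

        String getExternalPointer() {
            return EXTERNAL_POINTER;
        }

        byte[] getMetadata() {
            return metadata;
        }
    }

    static final class MetadataStream {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private boolean closed;

        void write(byte[] data) {
            buffer.write(data, 0, data.length);
        }

        /** Finalizes the checkpoint exactly once; any later attempt fails. */
        synchronized CompletedLocation closeAndFinalizeCheckpoint() throws IOException {
            if (closed) {
                throw new IOException("Already closed");
            }
            closed = true;
            return new CompletedLocation(buffer.toByteArray());
        }

        /** Abandons the stream and drops the buffered metadata. */
        void close() {
            if (!closed) {
                closed = true;
                buffer.reset();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        MetadataStream out = new MetadataStream();
        out.write(new byte[] {42});
        CompletedLocation location = out.closeAndFinalizeCheckpoint();
        System.out.println(location.getExternalPointer());
        System.out.println(location.getMetadata().length + " metadata byte(s) held on the heap");

        MetadataStream abandoned = new MetadataStream();
        abandoned.write(new byte[] {1});
        abandoned.close();
        try {
            abandoned.closeAndFinalizeCheckpoint();
        } catch (IOException e) {
            System.out.println("cannot finalize: " + e.getMessage());
        }
    }
}
```

Because the metadata is never written anywhere durable, the external pointer is only a placeholder string rather than a path that could be used to restore from the checkpoint.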

PersistentMetadataCheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java

/**
 * A checkpoint storage location for the {@link MemoryStateBackend} when it durably
 * persists the metadata in a file system.
 */
public class PersistentMetadataCheckpointStorageLocation
        extends MemCheckpointStreamFactory
        implements CheckpointStorageLocation {
​
    private final FileSystem fileSystem;
​
    private final Path checkpointDirectory;
​
    private final Path metadataFilePath;
​
    /**
     * Creates a checkpoint storage persists metadata to a file system and stores state
     * in line in state handles with the metadata.
     *
     * @param fileSystem The file system to which the metadata will be written.
     * @param checkpointDir The directory where the checkpoint metadata will be written.
     */
    public PersistentMetadataCheckpointStorageLocation(
            FileSystem fileSystem,
            Path checkpointDir,
            int maxStateSize) {
​
        super(maxStateSize);
​
        this.fileSystem = checkNotNull(fileSystem);
        this.checkpointDirectory = checkNotNull(checkpointDir);
        this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
    }
​
    // ------------------------------------------------------------------------
​
    @Override
    public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
        return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory);
    }
​
    @Override
    public void disposeOnFailure() throws IOException {
        // on a failure, no chunk in the checkpoint directory needs to be saved, so
        // we can drop it as a whole
        fileSystem.delete(checkpointDirectory, true);
    }
​
    @Override
    public CheckpointStorageLocationReference getLocationReference() {
        return CheckpointStorageLocationReference.getDefault();
    }
}
  • MemoryBackendCheckpointStorage creates a PersistentMetadataCheckpointStorageLocation when a checkpointsDirectory is configured; its createMetadataOutputStream method creates an FsCheckpointMetadataOutputStream. The FsCheckpointMetadataOutputStream constructor takes three arguments: fileSystem, metadataFilePath, and exclusiveCheckpointDir. fileSystem is used to create an FSDataOutputStream for metadataFilePath, and exclusiveCheckpointDir is used when the FsCompletedCheckpointStorageLocation is returned.
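
The persistent variant boils down to two file-system operations: write a metadata file into the checkpoint directory, and delete that directory recursively on failure. The sketch below reproduces them with java.nio.file instead of Flink's FileSystem abstraction. The file name "_metadata" is assumed to match AbstractFsCheckpointStorage.METADATA_FILE_NAME, and the directory name chk-1 is made up for the demo.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** java.nio.file imitation of the persistent metadata location; not the Flink classes. */
final class PersistentMetadataSketch {

    // Assumption: mirrors AbstractFsCheckpointStorage.METADATA_FILE_NAME.
    static final String METADATA_FILE_NAME = "_metadata";

    /** Writes the metadata file into the checkpoint directory and returns its path. */
    static Path writeMetadata(Path checkpointDir, byte[] metadata) throws IOException {
        Files.createDirectories(checkpointDir);
        return Files.write(checkpointDir.resolve(METADATA_FILE_NAME), metadata);
    }

    /** Drops the whole checkpoint directory, like a recursive delete. */
    static void disposeOnFailure(Path checkpointDir) throws IOException {
        if (!Files.exists(checkpointDir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(checkpointDir)) {
            // Reverse order puts children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.delete(path);
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("checkpoints");
        Path checkpointDir = root.resolve("chk-1"); // hypothetical directory name
        Path metadataFile = writeMetadata(checkpointDir, new byte[] {1, 2, 3});
        System.out.println("metadata written: " + Files.exists(metadataFile));

        disposeOnFailure(checkpointDir);
        System.out.println("directory left after dispose: " + Files.exists(checkpointDir));
        Files.delete(root);
    }
}
```

Note that only the metadata goes to the file system in this variant; the state itself still travels inside the in-memory handles produced by the inherited MemCheckpointStreamFactory.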

Summary

  • MemoryBackendCheckpointStorage creates a NonPersistentMetadataCheckpointStorageLocation when no checkpointsDirectory is configured, and a PersistentMetadataCheckpointStorageLocation when one is configured.
  • NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation both extend MemCheckpointStreamFactory and implement the CheckpointStorageLocation interface. Their createMetadataOutputStream methods return different CheckpointMetadataOutputStream types: MetadataOutputStream and FsCheckpointMetadataOutputStream, respectively.
  • MemCheckpointStreamFactory implements the CheckpointStreamFactory interface, and its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream. CheckpointStorageLocation extends CheckpointStreamFactory; it is typically created and initialized by CheckpointStorage, and it offers data persistence, metadata persistence, and lifecycle/cleanup methods.

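Original-content notice: this article is published on the Tencent Cloud+ Community (云+社区) with the author's authorization and may not be reproduced without permission.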
