A Look at Flink's ListCheckpointed

This article takes a look at Flink's ListCheckpointed.

Example

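import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /** current offset for exactly once semantics; starts at 0 so run() works without a prior restore */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic with respect to checkpoints
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        // the counter is a single, indivisible unit of state
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        // normally one entry per subtask; if several arrive (e.g. after scaling down),
        // this loop keeps only the last one
        for (Long s : state) {
            offset = s;
        }
    }
}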
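  • CounterSource is a stateful RichParallelSourceFunction that implements the ListCheckpointed interface. Its snapshotState method returns the current offset, and its restoreState method restores the local offset from the state passed in. Note that to get exactly-once semantics across failure and recovery, the offset update (together with the emit) must be synchronized on the lock returned by SourceContext.getCheckpointLock.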
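The checkpoint lock is easier to picture with a plain-Java sketch that has no Flink dependency. `LockedCounter` and its methods are hypothetical stand-ins that imitate `run`, `snapshotState` and `restoreState`. Emitting a value and advancing the offset happen in one `synchronized` block on a shared lock, and the snapshot takes the same lock. A snapshot therefore never sees a value that was emitted but not yet counted. This illustrates the idea only. It does not reproduce Flink's actual interplay between the source thread and the checkpoint thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for CounterSource; no Flink classes are used.
class LockedCounter {
    // Plays the role of ctx.getCheckpointLock().
    private final Object lock = new Object();
    // Starts at 0 so the first emit works without a restore.
    private long offset = 0L;
    // Records emitted values, standing in for what ctx.collect would forward downstream.
    private final List<Long> emitted = new ArrayList<>();

    // One iteration of the run() loop: emit and advance atomically.
    void emitOne() {
        synchronized (lock) {
            emitted.add(offset);
            offset += 1;
        }
    }

    // Mirrors snapshotState: read the offset under the same lock.
    List<Long> snapshot() {
        synchronized (lock) {
            return Collections.singletonList(offset);
        }
    }

    // Mirrors restoreState: adopt the restored offset (last entry wins).
    void restore(List<Long> state) {
        synchronized (lock) {
            for (Long s : state) {
                offset = s;
            }
        }
    }

    // Copy of everything emitted so far.
    List<Long> emitted() {
        synchronized (lock) {
            return new ArrayList<>(emitted);
        }
    }
}
```

For example, emitting three values (0, 1, 2) and then taking a snapshot yields `[3]`. A fresh instance restored from that snapshot continues with 3 rather than starting again from 0.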

ListCheckpointed

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java

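import java.io.Serializable;
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface ListCheckpointed<T extends Serializable> {
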
    /**
     * Gets the current state of the function. The state must reflect the result of all prior
     * invocations to this function.
     *
     * <p>The returned list should contain one entry for redistributable unit of state. See
     * the {@link ListCheckpointed class docs} for an illustration how list-style state
     * redistribution works.
     *
     * <p>As special case, the returned list may be null or empty (if the operator has no state)
     * or it may contain a single element (if the operator state is indivisible).
     *
     * @param checkpointId The ID of the checkpoint - a unique and monotonously increasing value.
     * @param timestamp The wall clock timestamp when the checkpoint was triggered by the master.
     *
     * @return The operator state in a list of redistributable, atomic sub-states.
     *         Should not return null, but empty list instead.
     *
     * @throws Exception Thrown if the creation of the state object failed. This causes the
     *                   checkpoint to fail. The system may decide to fail the operation (and trigger
     *                   recovery), or to discard this checkpoint attempt and to continue running
     *                   and to try again with the next checkpoint attempt.
     */
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    /**
     * Restores the state of the function or operator to that of a previous checkpoint.
     * This method is invoked when the function is executed after a failure recovery.
     * The state list may be empty if no state is to be recovered by the particular parallel instance
     * of the function.
     *
     * <p>The given state list will contain all the <i>sub states</i> that this parallel
     * instance of the function needs to handle. Refer to the  {@link ListCheckpointed class docs}
     * for an illustration how list-style state redistribution works.
     *
     * <p><b>Important:</b> When implementing this interface together with {@link RichFunction},
     * then the {@code restoreState()} method is called before {@link RichFunction#open(Configuration)}.
     *
     * @param state The state to be restored as a list of atomic sub-states.
     *
     * @throws Exception Throwing an exception in this method causes the recovery to fail.
     *                   The exact consequence depends on the configured failure handling strategy,
     *                   but typically the system will re-attempt the recovery, or try recovering
     *                   from a different checkpoint.
     */
    void restoreState(List<T> state) throws Exception;
}
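  • ListCheckpointed declares two methods: snapshotState and restoreState.
  • snapshotState takes a checkpointId, which is a unique and monotonically increasing number, and a timestamp, which is the wall-clock time at which the master triggered the checkpoint. It returns the current state as a List. According to the javadoc, it should return an empty list rather than null when there is no state.
  • restoreState is called during failure recovery with the state as a List, and it restores that state into the function's local fields. When the function is also a RichFunction, restoreState is called before open().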
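CounterSource keeps a single indivisible value. When the state really is divisible, the javadoc rules work differently. snapshotState should return one entry per redistributable unit, and an empty list rather than null when there is nothing to save. restoreState may also receive several entries. The plain-Java sketch below follows those rules under stated assumptions. `ListCheckpointedSketch` is a hypothetical interface that copies the two signatures so the code compiles without Flink. `PendingBuffer` is a made-up function that buffers strings until they are flushed. In a real job you would implement Flink's `ListCheckpointed<String>` instead.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of Flink's ListCheckpointed signatures (no Flink dependency).
interface ListCheckpointedSketch<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

// Hypothetical function that buffers records until they are flushed.
class PendingBuffer implements ListCheckpointedSketch<String> {
    private final List<String> pending = new ArrayList<>();

    void add(String record) {
        pending.add(record);
    }

    // Returns and clears everything buffered so far.
    List<String> flush() {
        List<String> out = new ArrayList<>(pending);
        pending.clear();
        return out;
    }

    @Override
    public List<String> snapshotState(long checkpointId, long timestamp) {
        // One entry per buffered record, so each record can move to another
        // subtask independently on rescale; an empty buffer yields an empty list.
        return new ArrayList<>(pending);
    }

    @Override
    public void restoreState(List<String> state) {
        // After rescaling, this subtask may receive entries written by several
        // old subtasks, so append all of them instead of keeping only one.
        pending.addAll(state);
    }
}
```

Because each buffered record is its own list entry, the even-split scheme can spread records across a different number of subtasks. The `addAll` in `restoreState` is what keeps nothing from being dropped when one subtask receives several entries.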

Summary

  • A stateful function can use managed operator state through either the CheckpointedFunction interface or the ListCheckpointed interface. Managed operator state is currently supported only in list style. The state must be a List of serializable objects so that it can be redistributed when the job is rescaled. There are two redistribution schemes. With even-split redistribution, each operator instance receives only a sublist of the whole state on restore/redistribution. With union redistribution, each operator instance receives the complete list.
  • ListCheckpointed is a more limited variant of CheckpointedFunction. It supports only list-style state with even-split redistribution.
  • ListCheckpointed defines two methods, snapshotState and restoreState. snapshotState is called when the master triggers a checkpoint and must return the current state. restoreState is called during failure recovery with the state as a List, and restores it into the function's local fields.
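The two redistribution schemes can be simulated on plain lists. This sketch follows the Flink documentation's description. For even-split, the whole state is logically the concatenation of every subtask's list, and it is divided evenly into as many sublists as there are new parallel instances. For union, every instance gets the full concatenation. `Redistribution`, `evenSplit` and `union` are hypothetical names. The code is not Flink's internal repartitioner, and the exact order in which Flink assigns entries may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two schemes; not Flink's internal code.
final class Redistribution {
    private Redistribution() {}

    // Concatenates the lists that the old subtasks returned from snapshotState.
    private static <T> List<T> concat(List<List<T>> oldStates) {
        List<T> all = new ArrayList<>();
        for (List<T> s : oldStates) {
            all.addAll(s);
        }
        return all;
    }

    // Even-split: cut the concatenation into newParallelism contiguous
    // sublists whose sizes differ by at most one.
    static <T> List<List<T>> evenSplit(List<List<T>> oldStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<T> all = concat(oldStates);
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every new subtask receives the complete concatenated list.
    static <T> List<List<T>> union(List<List<T>> oldStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<T> all = concat(oldStates);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }
}
```

Take two old subtasks holding `[1, 2, 3]` and `[4, 5]`. Rescaling to three with `evenSplit` gives `[[1, 2], [3, 4], [5]]`. Rescaling to two with `union` gives both subtasks `[1, 2, 3, 4, 5]`. ListCheckpointed always gets the even-split behavior.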


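Original content: this article is published on the Tencent Cloud+ Community with the author's authorization; reproduction without permission is prohibited.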
