
Flink State Time-to-Live (State TTL) Settings

Author: yiduwangkai
Published 2021-10-15 15:22:02
Column: 大数据进阶 (Advancing in Big Data)

Why state needs to be cleaned up

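  1. State does not need to be kept forever.
  2. From a business perspective, state is often valid only for a limited time and must be reset once that time has passed.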

Enabling state TTL:

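```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Entries expire 1 second after they were created or last written;
// expired entries are never returned to the caller.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

// TTL is enabled on the descriptor, so state obtained from it
// (e.g. via getRuntimeContext().getState(stateDescriptor)) is TTL-aware.
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
```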

TTL configuration options

1. Validity period: UpdateType

```java
/**
 * This option value configures when to update last access timestamp which prolongs state TTL.
 */
public enum UpdateType {
    /** TTL is disabled. State does not expire. */
    Disabled,
    /**
     * Last access timestamp is initialised when state is created and updated on every write
     * operation.
     */
    OnCreateAndWrite,
    /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
    OnReadAndWrite
}
```

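There are three options:

  1. Disabled: TTL is disabled. The timestamp is never updated and the state never expires.
  2. OnCreateAndWrite: the timestamp is set when the state is created and refreshed on every write.
  3. OnReadAndWrite: the timestamp is refreshed when the state is created, on every write, and on every read.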

An entry is considered expired once: last access timestamp + TTL <= current time.
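To make the three update types and the expiry rule concrete, here is a toy, single-threaded model of one TTL-protected value. It is a sketch under stated assumptions, not Flink's implementation. `TtlValueModel` and its methods are hypothetical. The nested `UpdateType` only reuses Flink's constant names so the example stays self-contained. The clock is an injected `LongSupplier`, which stands in for processing time.

```java
import java.util.function.LongSupplier;

/** Toy, single-threaded model of one TTL-protected value (hypothetical; not Flink's code). */
class TtlValueModel<T> {
    /** Same constant names as Flink's StateTtlConfig.UpdateType, redeclared to stay self-contained. */
    enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final LongSupplier clock; // processing-time clock, injectable for tests
    private T value;                  // null means "no value"
    private long lastAccessTs;

    TtlValueModel(long ttlMillis, UpdateType updateType, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.clock = clock;
    }

    /** Expired once lastAccessTs + ttl <= now; the sum saturates instead of overflowing. */
    static boolean isExpired(long lastAccessTs, long ttlMillis, long now) {
        long expiresAt = lastAccessTs > Long.MAX_VALUE - ttlMillis
                ? Long.MAX_VALUE
                : lastAccessTs + ttlMillis;
        return expiresAt <= now;
    }

    void write(T newValue) {
        value = newValue;
        // OnCreateAndWrite and OnReadAndWrite both refresh the timestamp on create/write
        if (updateType != UpdateType.Disabled) {
            lastAccessTs = clock.getAsLong();
        }
    }

    /** Returns the value, or null once it has expired (the expired value is dropped). */
    T read() {
        if (value == null || updateType == UpdateType.Disabled) {
            return value; // Disabled: the value never expires
        }
        long now = clock.getAsLong();
        if (isExpired(lastAccessTs, ttlMillis, now)) {
            value = null;
            return null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            lastAccessTs = now; // only this mode lets reads extend the lifetime
        }
        return value;
    }
}
```

Consider a 1000 ms TTL with OnCreateAndWrite. A value written at t=0 is still readable at t=999 and gone at t=1000. With OnReadAndWrite, a read at t=999 moves the deadline to t=1999.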

2. State visibility

```java
/** This option configures whether expired user value can be returned or not. */
public enum StateVisibility {
    /** Return expired user value if it is not cleaned up yet. */
    ReturnExpiredIfNotCleanedUp,
    /** Never return expired user value. */
    NeverReturnExpired
}
```

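  1. ReturnExpiredIfNotCleanedUp: an entry's timestamp may show that it has expired, but the entry is still returned to the caller as long as it has not been physically cleaned up yet.
  2. NeverReturnExpired: once an entry has expired, it is never returned to the caller. The caller gets an empty result instead (for example, null from ValueState#value()), so expired data cannot interfere.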
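The difference between the two visibility modes comes down to a single check at read time. The toy sketch below models it. `VisibilityModel`, `Cell` and `visibleValue` are hypothetical names, not Flink API. A `null` cell stands for an entry that a cleanup strategy has already removed.

```java
/** Toy model of StateVisibility (hypothetical; not Flink's implementation). */
class VisibilityModel {
    /** Same constant names as Flink's StateTtlConfig.StateVisibility. */
    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

    /** A stored entry: the user value plus the last-access timestamp kept next to it. */
    static final class Cell {
        final String value;
        final long lastAccessTs;

        Cell(String value, long lastAccessTs) {
            this.value = value;
            this.lastAccessTs = lastAccessTs;
        }
    }

    /** cell == null means the entry has already been physically removed by a cleanup strategy. */
    static String visibleValue(Cell cell, long ttlMillis, long now, StateVisibility visibility) {
        if (cell == null) {
            return null;
        }
        boolean expired = cell.lastAccessTs + ttlMillis <= now;
        if (expired && visibility == StateVisibility.NeverReturnExpired) {
            return null; // hide expired data even though it is still stored
        }
        return cell.value; // fresh, or expired but tolerated until cleanup
    }
}
```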

3. Cleanup strategies

```java
// Declared inside StateTtlConfig.CleanupStrategies
/** Fixed strategies ordinals in {@code strategies} config field. */
enum Strategies {
    FULL_STATE_SCAN_SNAPSHOT,
    INCREMENTAL_CLEANUP,
    ROCKSDB_COMPACTION_FILTER
}
```

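  1. FULL_STATE_SCAN_SNAPSHOT: cleanup in full snapshots. It maps to the EmptyCleanupStrategy class, which means expired state is not actively removed from the local state. When a full snapshot (savepoint/checkpoint) is taken, expired entries are left out, so the snapshot file is smaller, but the local state does not shrink. The local state only shrinks once the job restarts and restores from that snapshot, so this strategy may not relieve memory pressure. It also does not apply to incremental checkpoints in the RocksDB state backend.
  2. INCREMENTAL_CLEANUP: incremental cleanup for the heap (in-memory) state backend. On state access, a bounded number of entries is checked and the expired ones are removed.
  3. ROCKSDB_COMPACTION_FILTER: incremental cleanup for the RocksDB state backend. Expired entries are filtered out while RocksDB runs compaction.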
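The contrast between the first two strategies can be sketched in plain Java. Full-snapshot cleanup only filters the copy that is written out. Incremental cleanup removes a few expired entries from the live state on each call and resumes where it stopped. This is a hypothetical toy (`CleanupModel` is not a Flink class), and it keeps only key to timestamp pairs. The RocksDB compaction filter is not modelled, because it runs inside RocksDB's native compaction.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Toy contrast of snapshot-time filtering vs. incremental cleanup (hypothetical; not Flink's code). */
class CleanupModel {
    private final TreeMap<String, Long> local = new TreeMap<>(); // key -> last access timestamp
    private final long ttlMillis;
    private String cursor = null; // last key inspected by incremental cleanup; null = start over

    CleanupModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(String key, long lastAccessTs) {
        local.put(key, lastAccessTs);
    }

    int localSize() {
        return local.size();
    }

    private boolean expired(long lastAccessTs, long now) {
        return lastAccessTs + ttlMillis <= now;
    }

    /** Full-snapshot style: expired entries are skipped in the copy; local state is untouched. */
    Map<String, Long> snapshot(long now) {
        Map<String, Long> snap = new TreeMap<>();
        for (Map.Entry<String, Long> e : local.entrySet()) {
            if (!expired(e.getValue(), now)) {
                snap.put(e.getKey(), e.getValue());
            }
        }
        return snap;
    }

    /** Incremental style: inspect at most batchSize entries, resuming after the previous cursor. */
    void incrementalCleanup(long now, int batchSize) {
        Map<String, Long> view = (cursor == null) ? local : local.tailMap(cursor, false);
        Iterator<Map.Entry<String, Long>> it = view.entrySet().iterator();
        int checked = 0;
        while (checked < batchSize && it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            cursor = e.getKey();
            if (expired(e.getValue(), now)) {
                it.remove(); // physically removes the entry from local state
            }
            checked++;
        }
        if (!it.hasNext()) {
            cursor = null; // reached the end: wrap around on the next call
        }
    }
}
```

Take two expired entries and one live entry. A snapshot contains only the live one while the local map still holds all three. Each one-entry incremental pass then removes one expired key.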

4. TTL time characteristic

```java
/** This option configures time scale to use for ttl. */
public enum TtlTimeCharacteristic {
    /**
     * Processing time, see also <code>
     * org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime</code>.
     */
    ProcessingTime
}
```

Currently only processing time (the system clock) is supported.
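Because TTL follows processing time, a test cannot steer expiry through event timestamps. It has to control the clock instead. The demo below swaps in MonotonicTTLTimeProvider and wraps each update in doWithFrozenTime. The toy sketch here shows only the freezing idea. `FreezableClock` is a hypothetical, single-threaded class, not Flink's time provider.

```java
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

/** Hypothetical clock that can be frozen while a callback runs (single-threaded sketch). */
class FreezableClock implements LongSupplier {
    private final LongSupplier source; // e.g. System::currentTimeMillis in production
    private Long frozen = null;        // non-null only while a callback runs

    FreezableClock(LongSupplier source) {
        this.source = source;
    }

    @Override
    public long getAsLong() {
        return frozen != null ? frozen : source.getAsLong();
    }

    /** Reads the source once and serves that value to every TTL check inside the callback. */
    <T> T withFrozenTime(LongFunction<T> callback) {
        long ts = source.getAsLong();
        frozen = ts;
        try {
            return callback.apply(ts);
        } finally {
            frozen = null; // unfreeze even if the callback throws
        }
    }
}
```

Freezing time means a read, an update and a re-read inside one callback all see the same "now". This keeps the expected TTL outcome deterministic.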

Demo (taken from Flink's state TTL end-to-end test)

```java
// Custom function from Flink's state TTL end-to-end test. TtlStateUpdate, TtlStateVerifier,
// TtlVerificationContext, TtlUpdateContext, ValueWithTs and MonotonicTTLTimeProvider are
// helper classes of that test module, not part of Flink's public API.
class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String>
        implements CheckpointedFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);

    @Nonnull private final StateTtlConfig ttlConfig;
    private final UpdateStat stat;

    // One TTL-enabled state per verifier (i.e. per state type under test)
    private transient Map<String, State> states;
    // History of previous updates per verifier, kept in plain list state without TTL
    private transient Map<String, ListState<ValueWithTs<?>>> prevUpdatesByVerifierId;

    TtlVerifyUpdateFunction(@Nonnull StateTtlConfig ttlConfig, long reportStatAfterUpdatesNum) {
        this.ttlConfig = ttlConfig;
        this.stat = new UpdateStat(reportStatAfterUpdatesNum);
    }

    @Override
    public void flatMap(TtlStateUpdate updates, Collector<String> out) throws Exception {
        // Apply the update to each TTL state and check the result against the update history
        for (TtlStateVerifier<?, ?> verifier : TtlStateVerifier.VERIFIERS) {
            TtlVerificationContext<?, ?> verificationContext =
                    generateUpdateAndVerificationContext(updates, verifier);
            if (!verifier.verify(verificationContext)) {
                // Please do **NOT** change the prefix, it's used in test_stream_state_ttl.sh for
                // test verifying
                out.collect("TTL verification failed: " + verificationContext.toString());
            }
        }
    }

    private TtlVerificationContext<?, ?> generateUpdateAndVerificationContext(
            TtlStateUpdate updates, TtlStateVerifier<?, ?> verifier) throws Exception {

        List<ValueWithTs<?>> prevUpdates = getPrevUpdates(verifier.getId());
        Object update = updates.getUpdate(verifier.getId());
        TtlUpdateContext<?, ?> updateContext = performUpdate(verifier, update);
        stat.update(prevUpdates.size());
        prevUpdatesByVerifierId.get(verifier.getId()).add(updateContext.getUpdateWithTs());
        return new TtlVerificationContext<>(
                updates.getKey(), verifier.getId(), prevUpdates, updateContext);
    }

    private List<ValueWithTs<?>> getPrevUpdates(String verifierId) throws Exception {
        return StreamSupport.stream(
                        prevUpdatesByVerifierId.get(verifierId).get().spliterator(), false)
                .collect(Collectors.toList());
    }

    private TtlUpdateContext<?, ?> performUpdate(TtlStateVerifier<?, ?> verifier, Object update)
            throws Exception {

        // Freeze the TTL clock so that read, update and re-read all see the same timestamp
        return MonotonicTTLTimeProvider.doWithFrozenTime(
                frozenTimestamp -> {
                    State state = states.get(verifier.getId());
                    Object valueBeforeUpdate = verifier.get(state);
                    verifier.update(state, update);
                    Object updatedValue = verifier.get(state);
                    return new TtlUpdateContext<>(
                            valueBeforeUpdate, update, updatedValue, frozenTimestamp);
                });
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {}

    @Override
    public void initializeState(FunctionInitializationContext context) {
        states =
                TtlStateVerifier.VERIFIERS.stream()
                        .collect(
                                Collectors.toMap(
                                        TtlStateVerifier::getId,
                                        v -> v.createState(context, ttlConfig)));
        prevUpdatesByVerifierId =
                TtlStateVerifier.VERIFIERS.stream()
                        .collect(
                                Collectors.toMap(
                                        TtlStateVerifier::getId,
                                        v -> {
                                            checkNotNull(v);
                                            final TypeSerializer<ValueWithTs<?>> typeSerializer =
                                                    new ValueWithTs.Serializer(
                                                            v.getUpdateSerializer(),
                                                            LongSerializer.INSTANCE);

                                            ListStateDescriptor<ValueWithTs<?>> stateDesc =
                                                    new ListStateDescriptor<>(
                                                            "TtlPrevValueState_" + v.getId(),
                                                            typeSerializer);
                                            KeyedStateStore store = context.getKeyedStateStore();
                                            return store.getListState(stateDesc);
                                        }));
    }

    // Periodically logs the average length of the per-key update history
    private static class UpdateStat implements Serializable {
        private static final long serialVersionUID = -4557720969995878873L;

        final long reportStatAfterUpdatesNum;
        long updates = 0;
        long prevUpdatesNum = 0;

        UpdateStat(long reportStatAfterUpdatesNum) {
            this.reportStatAfterUpdatesNum = reportStatAfterUpdatesNum;
        }

        void update(long prevUpdatesSize) {
            updates++;
            prevUpdatesNum += prevUpdatesSize;
            if (updates % reportStatAfterUpdatesNum == 0) {
                LOG.info(String.format("Avg update chain length: %d", prevUpdatesNum / updates));
            }
        }
    }
}
```
```java
// Entry point of the test job. setupEnvironment(...), TtlTestConfig, TtlStateUpdateSource and
// StubStateBackend also come from Flink's end-to-end test code.
public class DataStreamStateTTLTestProgram {
    public static void main(String[] args) throws Exception {
        final ParameterTool pt = ParameterTool.fromArgs(args);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        setupEnvironment(env, pt);

        // Let the test control the clock that TTL checks use
        setBackendWithCustomTTLTimeProvider(env);

        TtlTestConfig config = TtlTestConfig.fromArgs(pt);
        // TTL under test, with expired entries dropped from full snapshots
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(config.ttl).cleanupFullSnapshot().build();

        env.addSource(
                        new TtlStateUpdateSource(
                                config.keySpace, config.sleepAfterElements, config.sleepTime))
                .name("TtlStateUpdateSource")
                .keyBy(TtlStateUpdate::getKey)
                .flatMap(new TtlVerifyUpdateFunction(ttlConfig, config.reportStatAfterUpdatesNum))
                .name("TtlVerifyUpdateFunction")
                // Only failed verifications are emitted, and they are printed
                .addSink(new PrintSinkFunction<>())
                .name("PrintFailedVerifications");

        env.execute("State TTL test job");
    }

    /**
     * Sets the state backend to a new {@link StubStateBackend} which has a {@link
     * MonotonicTTLTimeProvider}.
     *
     * @param env The {@link StreamExecutionEnvironment} of the job.
     */
    private static void setBackendWithCustomTTLTimeProvider(StreamExecutionEnvironment env) {
        final MonotonicTTLTimeProvider ttlTimeProvider = new MonotonicTTLTimeProvider();

        final StateBackend configuredBackend = env.getStateBackend();
        final StateBackend stubBackend = new StubStateBackend(configuredBackend, ttlTimeProvider);
        env.setStateBackend(stubBackend);
    }
}
```
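This article is part of the Tencent Cloud Self-Media Sharing Program and is shared from the author's personal site/blog.
For infringement concerns, contact cloudcommunity@tencent.com for removal.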
