A Look at Flink's StateTtlConfig

code4it
Published 2018-12-28 11:55:25

This article takes a look at Flink's StateTtlConfig.

Example

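import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// State expires 1 second after its last-access timestamp
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    // refresh the timestamp only on creation and on writes (also the builder default)
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    // never hand back a value that has already expired (also the builder default)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
// attach the TTL configuration to the descriptor before the state is obtained from it
stateDescriptor.enableTimeToLive(ttlConfig);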
  • Here the builder creates a StateTtlConfig, which is then attached to the state through the StateDescriptor's enableTimeToLive method.
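For intuition about what this config means at runtime, the following plain-Java sketch (no Flink dependency) checks expiry for a value written at time 0 with a 1-second TTL. The class TtlExpirySketch and its two helper methods are hypothetical. The rule that a value counts as expired once lastAccess + ttl <= now, and the saturating addition that keeps a very large TTL from overflowing into the past, are assumptions made for illustration, not code taken from Flink.

```java
/**
 * Hypothetical sketch of the expiry check behind the example config
 * (TTL = Time.seconds(1), OnCreateAndWrite, NeverReturnExpired). Not Flink code.
 */
public class TtlExpirySketch {

    /** Expiration time = last-access timestamp + TTL, saturating at Long.MAX_VALUE instead of overflowing. */
    static long expirationTimestamp(long lastAccess, long ttlMillis) {
        // ttlMillis is assumed positive, as StateTtlConfig's constructor requires
        return lastAccess > Long.MAX_VALUE - ttlMillis ? Long.MAX_VALUE : lastAccess + ttlMillis;
    }

    /** Assumed rule: a value is expired once the current time reaches its expiration time. */
    static boolean expired(long lastAccess, long ttlMillis, long now) {
        return expirationTimestamp(lastAccess, ttlMillis) <= now;
    }

    public static void main(String[] args) {
        long ttl = 1000;     // Time.seconds(1) in milliseconds
        long writtenAt = 0;  // OnCreateAndWrite: the timestamp is set when the value is written
        System.out.println(expired(writtenAt, ttl, 999));   // false: value is still returned
        System.out.println(expired(writtenAt, ttl, 1000));  // true: NeverReturnExpired hides it
        // A TTL as large as Long.MAX_VALUE ms does not wrap around to a past timestamp
        System.out.println(expired(writtenAt, Long.MAX_VALUE, Long.MAX_VALUE - 1)); // false
    }
}
```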

StateTtlConfig

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateTtlConfig.java

/**
 * Configuration of state TTL logic.
 *
 * <p>Note: The map state with TTL currently supports {@code null} user values
 * only if the user value serializer can handle {@code null} values.
 * If the serializer does not support {@code null} values,
 * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
 * at the cost of an extra byte in the serialized form.
 */
public class StateTtlConfig implements Serializable {

    private static final long serialVersionUID = -7592693245044289793L;

    public static final StateTtlConfig DISABLED =
        newBuilder(Time.milliseconds(Long.MAX_VALUE)).setUpdateType(UpdateType.Disabled).build();

    /**
     * This option value configures when to update last access timestamp which prolongs state TTL.
     */
    public enum UpdateType {
        /** TTL is disabled. State does not expire. */
        Disabled,
        /** Last access timestamp is initialised when state is created and updated on every write operation. */
        OnCreateAndWrite,
        /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
        OnReadAndWrite
    }

    /**
     * This option configures whether expired user value can be returned or not.
     */
    public enum StateVisibility {
        /** Return expired user value if it is not cleaned up yet. */
        ReturnExpiredIfNotCleanedUp,
        /** Never return expired user value. */
        NeverReturnExpired
    }

    /**
     * This option configures time scale to use for ttl.
     */
    public enum TimeCharacteristic {
        /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
        ProcessingTime
    }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;
    private final Time ttl;
    private final CleanupStrategies cleanupStrategies;

    private StateTtlConfig(
        UpdateType updateType,
        StateVisibility stateVisibility,
        TimeCharacteristic timeCharacteristic,
        Time ttl,
        CleanupStrategies cleanupStrategies) {
        this.updateType = Preconditions.checkNotNull(updateType);
        this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
        this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
        this.ttl = Preconditions.checkNotNull(ttl);
        this.cleanupStrategies = cleanupStrategies;
        Preconditions.checkArgument(ttl.toMilliseconds() > 0,
            "TTL is expected to be positive");
    }

    @Nonnull
    public UpdateType getUpdateType() {
        return updateType;
    }

    @Nonnull
    public StateVisibility getStateVisibility() {
        return stateVisibility;
    }

    @Nonnull
    public Time getTtl() {
        return ttl;
    }

    @Nonnull
    public TimeCharacteristic getTimeCharacteristic() {
        return timeCharacteristic;
    }

    public boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    @Nonnull
    public CleanupStrategies getCleanupStrategies() {
        return cleanupStrategies;
    }

    @Override
    public String toString() {
        return "StateTtlConfig{" +
            "updateType=" + updateType +
            ", stateVisibility=" + stateVisibility +
            ", timeCharacteristic=" + timeCharacteristic +
            ", ttl=" + ttl +
            '}';
    }

    @Nonnull
    public static Builder newBuilder(@Nonnull Time ttl) {
        return new Builder(ttl);
    }

    /**
     * Builder for the {@link StateTtlConfig}.
     */
    public static class Builder {

        private UpdateType updateType = OnCreateAndWrite;
        private StateVisibility stateVisibility = NeverReturnExpired;
        private TimeCharacteristic timeCharacteristic = ProcessingTime;
        private Time ttl;
        private CleanupStrategies cleanupStrategies = new CleanupStrategies();

        public Builder(@Nonnull Time ttl) {
            this.ttl = ttl;
        }

        /**
         * Sets the ttl update type.
         *
         * @param updateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
         */
        @Nonnull
        public Builder setUpdateType(UpdateType updateType) {
            this.updateType = updateType;
            return this;
        }

        @Nonnull
        public Builder updateTtlOnCreateAndWrite() {
            return setUpdateType(UpdateType.OnCreateAndWrite);
        }

        @Nonnull
        public Builder updateTtlOnReadAndWrite() {
            return setUpdateType(UpdateType.OnReadAndWrite);
        }

        /**
         * Sets the state visibility.
         *
         * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
         */
        @Nonnull
        public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
            this.stateVisibility = stateVisibility;
            return this;
        }

        @Nonnull
        public Builder returnExpiredIfNotCleanedUp() {
            return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
        }

        @Nonnull
        public Builder neverReturnExpired() {
            return setStateVisibility(StateVisibility.NeverReturnExpired);
        }

        /**
         * Sets the time characteristic.
         *
         * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
         */
        @Nonnull
        public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
            return this;
        }

        @Nonnull
        public Builder useProcessingTime() {
            return setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        }

        /** Cleanup expired state in full snapshot on checkpoint. */
        @Nonnull
        public Builder cleanupFullSnapshot() {
            cleanupStrategies.strategies.put(
                CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
                new CleanupStrategies.CleanupStrategy() {  });
            return this;
        }

        /**
         * Sets the ttl time.
         * @param ttl The ttl time.
         */
        @Nonnull
        public Builder setTtl(@Nonnull Time ttl) {
            this.ttl = ttl;
            return this;
        }

        @Nonnull
        public StateTtlConfig build() {
            return new StateTtlConfig(
                updateType,
                stateVisibility,
                timeCharacteristic,
                ttl,
                cleanupStrategies);
        }
    }

    /**
     * TTL cleanup strategies.
     *
     * <p>This class configures when to cleanup expired state with TTL.
     * By default, state is always cleaned up on explicit read access if found expired.
     * Currently cleanup of state full snapshot can be additionally activated.
     */
    public static class CleanupStrategies implements Serializable {
        private static final long serialVersionUID = -1617740467277313524L;

        /** Fixed strategies ordinals in {@code strategies} config field. */
        enum Strategies {
            FULL_STATE_SCAN_SNAPSHOT
        }

        /** Base interface for cleanup strategies configurations. */
        interface CleanupStrategy extends Serializable {

        }

        final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

        public boolean inFullSnapshot() {
            return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
        }
    }
}
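  • StateTtlConfig configures the TTL properties of a state. It defines three enums: UpdateType (Disabled, OnCreateAndWrite, OnReadAndWrite), StateVisibility (ReturnExpiredIfNotCleanedUp, NeverReturnExpired), and TimeCharacteristic (ProcessingTime, the only option in this version).
  • StateTtlConfig also defines CleanupStrategies, the cleanup strategies for TTL state. By default, expired state is cleaned up when it is explicitly read; in addition, the FULL_STATE_SCAN_SNAPSHOT strategy, enabled through the builder's cleanupFullSnapshot(), cleans up expired state in the full snapshot taken at checkpoint time.
  • StateTtlConfig also provides a Builder for quickly setting UpdateType, StateVisibility, TimeCharacteristic, the TTL Time, and CleanupStrategies. Unless overridden, the builder uses OnCreateAndWrite, NeverReturnExpired, and ProcessingTime, and the constructor rejects a TTL that is not positive.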
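The difference between the UpdateType and StateVisibility options is easiest to see on a single value. The sketch below is a hypothetical, simplified model; the class TtlCell and its methods are not Flink APIs. Writes always reset the last-access timestamp, a successful read resets it only under OnReadAndWrite, and a read that finds an expired value clears it and, under ReturnExpiredIfNotCleanedUp, still returns that stale value once. That cleanup-on-read detail is an assumption based on the default strategy described above, and the LongSupplier clock stands in for processing time.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified model of the StateTtlConfig options for one value.
 * Not Flink code: names and the cleanup-on-read behavior are assumptions.
 */
public class TtlCell<V> {

    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }

    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final StateVisibility visibility;
    private final LongSupplier clock; // stands in for a processing-time provider

    private V value;         // user value, null when absent
    private long lastAccess; // timestamp the TTL is measured from

    TtlCell(long ttlMillis, UpdateType updateType, StateVisibility visibility, LongSupplier clock) {
        if (ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL is expected to be positive");
        }
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
        this.clock = clock;
    }

    /** Writes always (re)initialise the last-access timestamp, under both update types. */
    void update(V newValue) {
        value = newValue;
        lastAccess = clock.getAsLong();
    }

    /** Reads check expiry first; only OnReadAndWrite refreshes the timestamp on a successful read. */
    V value() {
        if (value == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (lastAccess + ttlMillis <= now) {
            V expired = value;
            value = null; // the read that finds an expired value also cleans it up
            return visibility == StateVisibility.ReturnExpiredIfNotCleanedUp ? expired : null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            lastAccess = now;
        }
        return value;
    }

    public static void main(String[] args) {
        long[] now = {0};
        TtlCell<String> onWrite = new TtlCell<>(1000, UpdateType.OnCreateAndWrite,
                StateVisibility.NeverReturnExpired, () -> now[0]);
        TtlCell<String> onRead = new TtlCell<>(1000, UpdateType.OnReadAndWrite,
                StateVisibility.NeverReturnExpired, () -> now[0]);
        onWrite.update("a");
        onRead.update("a");
        now[0] = 800;
        onWrite.value(); // no refresh: still expires at 1000
        onRead.value();  // refresh: now expires at 1800
        now[0] = 1500;
        System.out.println(onWrite.value()); // null
        System.out.println(onRead.value());  // a
    }
}
```

Running main prints null for the OnCreateAndWrite value, which expired at 1000 ms, and a for the OnReadAndWrite value, because the read at 800 ms pushed its expiry out.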

AbstractKeyedStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java

    /**
     * @see KeyedStateBackend
     */
    @Override
    @SuppressWarnings("unchecked")
    public <N, S extends State, V> S getOrCreateKeyedState(
            final TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, V> stateDescriptor) throws Exception {
        checkNotNull(namespaceSerializer, "Namespace serializer");
        checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
                "This operation cannot use partitioned state.");

        InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(executionConfig);
            }
            kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
            keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        return (S) kvState;
    }
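  • AbstractKeyedStateBackend's getOrCreateKeyedState method first looks up the state by descriptor name in keyValueStatesByName. Only if none exists does it initialize the serializer if needed, call TtlStateFactory.createStateAndWrapWithTtlIfEnabled to create the InternalKvState, and cache the result under that name.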
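The get-or-create logic above is a small caching pattern: state objects are keyed by descriptor name, so repeated calls with the same descriptor return the same (possibly TTL-wrapped) instance and the factory runs only once. A minimal sketch of that pattern, using a hypothetical StateRegistrySketch class and a plain Supplier in place of TtlStateFactory:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the get-or-create pattern used by getOrCreateKeyedState.
 * Not Flink code: the Supplier stands in for TtlStateFactory.createStateAndWrapWithTtlIfEnabled.
 */
public class StateRegistrySketch {

    // Mirrors keyValueStatesByName: one state object per descriptor name
    private final Map<String, Object> statesByName = new HashMap<>();

    // Counts how many times a factory was actually invoked
    private int creations = 0;

    @SuppressWarnings("unchecked")
    public <S> S getOrCreate(String name, Supplier<S> factory) {
        Object state = statesByName.get(name);
        if (state == null) {
            // First access for this name: create the state and cache it
            state = factory.get();
            creations++;
            statesByName.put(name, state);
        }
        // Later accesses return the cached instance without calling the factory
        return (S) state;
    }

    public int creations() {
        return creations;
    }

    public static void main(String[] args) {
        StateRegistrySketch registry = new StateRegistrySketch();
        StringBuilder first = registry.getOrCreate("text state", StringBuilder::new);
        StringBuilder second = registry.getOrCreate("text state", StringBuilder::new);
        System.out.println(first == second);      // true
        System.out.println(registry.creations()); // 1
    }
}
```

As in the excerpt, the final cast is unchecked: the cache trusts that a given name is always used with the same state type.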

TtlStateFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlStateFactory.java

/**
 * This state factory wraps state objects, produced by backends, with TTL logic.
 */
public class TtlStateFactory<N, SV, S extends State, IS extends S> {
    public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) throws Exception {
        Preconditions.checkNotNull(namespaceSerializer);
        Preconditions.checkNotNull(stateDesc);
        Preconditions.checkNotNull(originalStateFactory);
        Preconditions.checkNotNull(timeProvider);
        return  stateDesc.getTtlConfig().isEnabled() ?
            new TtlStateFactory<N, SV, S, IS>(
                namespaceSerializer, stateDesc, originalStateFactory, timeProvider)
                .createState() :
            originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
    }

    private final Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories;

    private final TypeSerializer<N> namespaceSerializer;
    private final StateDescriptor<S, SV> stateDesc;
    private final KeyedStateFactory originalStateFactory;
    private final StateTtlConfig ttlConfig;
    private final TtlTimeProvider timeProvider;
    private final long ttl;

    private TtlStateFactory(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) {
        this.namespaceSerializer = namespaceSerializer;
        this.stateDesc = stateDesc;
        this.originalStateFactory = originalStateFactory;
        this.ttlConfig = stateDesc.getTtlConfig();
        this.timeProvider = timeProvider;
        this.ttl = ttlConfig.getTtl().toMilliseconds();
        this.stateFactories = createStateFactories();
    }

    private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
        return Stream.of(
            Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
            Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
            Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
            Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
            Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
            Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
        ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    }

    private IS createState() throws Exception {
        SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s",
                stateDesc.getClass(), TtlStateFactory.class);
            throw new FlinkRuntimeException(message);
        }
        return stateFactory.get();
    }

    @SuppressWarnings("unchecked")
    private IS createValueState() throws Exception {
        ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlValueState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <T> IS createListState() throws Exception {
        ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
        ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
        return (IS) new TtlListState<>(
            originalStateFactory.createInternalState(
                namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, listStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <UK, UV> IS createMapState() throws Exception {
        MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
        MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
            stateDesc.getName(),
            mapStateDesc.getKeySerializer(),
            new TtlSerializer<>(mapStateDesc.getValueSerializer()));
        return (IS) new TtlMapState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, mapStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private IS createReducingState() throws Exception {
        ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
        ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
            stateDesc.getName(),
            new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlReducingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <IN, OUT> IS createAggregatingState() throws Exception {
        AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
            (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
        TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
            aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
        AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
            stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlAggregatingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    }

    @SuppressWarnings({"deprecation", "unchecked"})
    private <T> IS createFoldingState() throws Exception {
        FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
        SV initAcc = stateDesc.getDefaultValue();
        TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
        FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
            stateDesc.getName(),
            ttlInitAcc,
            new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlFoldingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    //......
}
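  • TtlStateFactory's createStateAndWrapWithTtlIfEnabled method decides how to create the state based on stateDesc.getTtlConfig().isEnabled(): if TTL is enabled, it calls new TtlStateFactory<N, SV, S, IS>(namespaceSerializer, stateDesc, originalStateFactory, timeProvider).createState(); otherwise it calls originalStateFactory.createInternalState(namespaceSerializer, stateDesc) directly.
  • createStateFactories builds a map from each StateDescriptor class to its creation method. createState looks up the SupplierWithException registered for the descriptor's class and invokes it, which avoids a chain of if/else checks; a descriptor class with no entry results in a FlinkRuntimeException.
  • ValueStateDescriptor maps to createValueState, which creates a TtlValueState; ListStateDescriptor maps to createListState (TtlListState); MapStateDescriptor maps to createMapState (TtlMapState); ReducingStateDescriptor maps to createReducingState (TtlReducingState); AggregatingStateDescriptor maps to createAggregatingState (TtlAggregatingState); FoldingStateDescriptor maps to createFoldingState (TtlFoldingState). Each method wraps the user serializer in a TtlSerializer so values are stored as TtlValue, and wraps the state created by the original factory in the corresponding Ttl* class.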
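The two ideas in TtlStateFactory, wrapping only when TTL is enabled and dispatching on the descriptor class through a map instead of an if/else chain, can be reduced to the plain-Java sketch below. All types are hypothetical stand-ins (Desc, ValueDesc, ListDesc, CustomValueDesc), the created "states" are just strings that show how the TTL wrapper decorates the backend state, and IllegalStateException stands in for Flink's FlinkRuntimeException.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of TtlStateFactory's structure; not Flink code.
 * "States" are strings so the wrapping is visible, e.g. "TtlValueState(Heap(v))".
 */
public class TtlDispatchSketch {

    // Stand-ins for StateDescriptor and a few of its subclasses
    static class Desc {
        final String name;
        final boolean ttlEnabled;

        Desc(String name, boolean ttlEnabled) {
            this.name = name;
            this.ttlEnabled = ttlEnabled;
        }
    }

    static class ValueDesc extends Desc {
        ValueDesc(String name, boolean ttlEnabled) { super(name, ttlEnabled); }
    }

    static class ListDesc extends Desc {
        ListDesc(String name, boolean ttlEnabled) { super(name, ttlEnabled); }
    }

    // A subclass of a supported descriptor, used to show the exact-class lookup
    static class CustomValueDesc extends ValueDesc {
        CustomValueDesc(String name, boolean ttlEnabled) { super(name, ttlEnabled); }
    }

    /** Mirrors createStateAndWrapWithTtlIfEnabled: without TTL, the backend state is returned as-is. */
    static String createStateAndWrapWithTtlIfEnabled(Desc desc) {
        return desc.ttlEnabled ? new TtlDispatchSketch(desc).createState() : backendState(desc);
    }

    // Stand-in for originalStateFactory.createInternalState(...)
    private static String backendState(Desc desc) {
        return "Heap(" + desc.name + ")";
    }

    private final Desc desc;
    private final Map<Class<? extends Desc>, Supplier<String>> stateFactories;

    private TtlDispatchSketch(Desc desc) {
        this.desc = desc;
        this.stateFactories = createStateFactories();
    }

    /** One creation method per supported descriptor class, replacing an if/else chain. */
    private Map<Class<? extends Desc>, Supplier<String>> createStateFactories() {
        Map<Class<? extends Desc>, Supplier<String>> factories = new HashMap<>();
        factories.put(ValueDesc.class, this::createValueState);
        factories.put(ListDesc.class, this::createListState);
        return Collections.unmodifiableMap(factories);
    }

    private String createState() {
        // Exact-class lookup: subclasses of a registered descriptor class are not matched
        Supplier<String> factory = stateFactories.get(desc.getClass());
        if (factory == null) {
            throw new IllegalStateException(
                    String.format("State %s is not supported", desc.getClass().getSimpleName()));
        }
        return factory.get();
    }

    // Each TTL state decorates the state created by the backend
    private String createValueState() {
        return "TtlValueState(" + backendState(desc) + ")";
    }

    private String createListState() {
        return "TtlListState(" + backendState(desc) + ")";
    }
}
```

In this sketch, as with the excerpt's stateFactories.get(stateDesc.getClass()), the lookup uses the exact class, so a subclass of a supported descriptor (CustomValueDesc here) finds no entry and fails instead of falling back to its parent's factory.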

Summary

  • StateTtlConfig configures the TTL properties of a state, mainly UpdateType, StateVisibility, TimeCharacteristic, the TTL Time, and CleanupStrategies.
  • AbstractKeyedStateBackend's getOrCreateKeyedState method uses TtlStateFactory.createStateAndWrapWithTtlIfEnabled to create the InternalKvState.
  • TtlStateFactory's createStateAndWrapWithTtlIfEnabled method checks stateDesc.getTtlConfig().isEnabled() to decide whether to wrap the state with TTL logic; TtlStateFactory's createState then creates the TTL state type that matches the StateDescriptor type.

doc

  • State Time-To-Live (TTL)
Originally published 2018-12-24 on the WeChat official account 码匠的流水账.
