前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊flink的AbstractTtlState

聊聊flink的AbstractTtlState

作者头像
code4it
发布2019-01-02 14:37:31
4380
发布2019-01-02 14:37:31
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下flink的AbstractTtlState

InternalKvState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/internal/InternalKvState.java

代码语言:javascript
复制
/**
 * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
 * {@link State} being the root of the public API state hierarchy.
 * 
 * <p>The internal state classes give access to the namespace getters and setters and access to
 * additional functionality, like raw value access or state merging.
 * 
 * <p>The public API state hierarchy is intended to be programmed against by Flink applications.
 * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not
 * intended to be used by user applications. These internal methods are considered of limited use to users and
 * only confusing, and are usually not regarded as stable across releases.
 * 
 * <p>Each specific type in the internal state hierarchy extends the type from the public
 * state hierarchy:
 * 
 * <pre>
 *             State
 *               |
 *               +-------------------InternalKvState
 *               |                         |
 *          MergingState                   |
 *               |                         |
 *               +-----------------InternalMergingState
 *               |                         |
 *      +--------+------+                  |
 *      |               |                  |
 * ReducingState    ListState        +-----+-----------------+
 *      |               |            |                       |
 *      +-----------+   +-----------   -----------------InternalListState
 *                  |                |
 *                  +---------InternalReducingState
 * </pre>
 *
 * @param <K> The type of key the state is associated to
 * @param <N> The type of the namespace
 * @param <V> The type of values kept internally in state
 */
public interface InternalKvState<K, N, V> extends State {

    TypeSerializer<K> getKeySerializer();

    TypeSerializer<N> getNamespaceSerializer();

    TypeSerializer<V> getValueSerializer();

    void setCurrentNamespace(N namespace);

    byte[] getSerializedValue(
            final byte[] serializedKeyAndNamespace,
            final TypeSerializer<K> safeKeySerializer,
            final TypeSerializer<N> safeNamespaceSerializer,
            final TypeSerializer<V> safeValueSerializer) throws Exception;
}
  • InternalKvState接口定义内部的kvState要实现的方法,这里主要是getKeySerializer、getNamespaceSerializer、getValueSerializer、setCurrentNamespace、getSerializedValue

AbstractTtlState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlState.java

代码语言:javascript
复制
/**
 * Base class for TTL logic wrappers of state objects.
 *
 * @param <K> The type of key the state is associated to
 * @param <N> The type of the namespace
 * @param <SV> The type of values kept internally in state without TTL
 * @param <TTLSV> The type of values kept internally in state with TTL
 * @param <S> Type of originally wrapped state object
 */
abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N, TTLSV>>
    extends AbstractTtlDecorator<S>
    implements InternalKvState<K, N, SV> {
    private final TypeSerializer<SV> valueSerializer;

    AbstractTtlState(S original, StateTtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
        super(original, config, timeProvider);
        this.valueSerializer = valueSerializer;
    }

    <SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<T>, SE> getter,
        ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE {
        return getWithTtlCheckAndUpdate(getter, updater, original::clear);
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return original.getKeySerializer();
    }

    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return original.getNamespaceSerializer();
    }

    @Override
    public TypeSerializer<SV> getValueSerializer() {
        return valueSerializer;
    }

    @Override
    public void setCurrentNamespace(N namespace) {
        original.setCurrentNamespace(namespace);
    }

    @Override
    public byte[] getSerializedValue(
        byte[] serializedKeyAndNamespace,
        TypeSerializer<K> safeKeySerializer,
        TypeSerializer<N> safeNamespaceSerializer,
        TypeSerializer<SV> safeValueSerializer) {
        throw new FlinkRuntimeException("Queryable state is not currently supported with TTL.");
    }

    @Override
    public void clear() {
        original.clear();
    }
}
  • AbstractTtlState实现了InternalKvState接口的方法,同时继承了AbstractTtlDecorator;它提供了getWithTtlCheckAndUpdate方法,该方法主要是调用AbstractTtlDecorator的getWithTtlCheckAndUpdate来实现TTL逻辑

AbstractTtlDecorator

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java

代码语言:javascript
复制
/**
 * Base class for TTL logic wrappers.
 *
 * @param <T> Type of originally wrapped object
 */
abstract class AbstractTtlDecorator<T> {
    /** Wrapped original state handler. */
    final T original;

    final StateTtlConfig config;

    final TtlTimeProvider timeProvider;

    /** Whether to renew expiration timestamp on state read access. */
    final boolean updateTsOnRead;

    /** Whether to renew expiration timestamp on state read access. */
    final boolean returnExpired;

    /** State value time to live in milliseconds. */
    final long ttl;

    AbstractTtlDecorator(
        T original,
        StateTtlConfig config,
        TtlTimeProvider timeProvider) {
        Preconditions.checkNotNull(original);
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(timeProvider);
        this.original = original;
        this.config = config;
        this.timeProvider = timeProvider;
        this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite;
        this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
        this.ttl = config.getTtl().toMilliseconds();
    }

    <V> V getUnexpired(TtlValue<V> ttlValue) {
        return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    }

    <V> boolean expired(TtlValue<V> ttlValue) {
        return TtlUtils.expired(ttlValue, ttl, timeProvider);
    }

    <V> TtlValue<V> wrapWithTs(V value) {
        return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp());
    }

    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
        return wrapWithTs(ttlValue.getUserValue());
    }

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
        return ttlValue == null ? null : ttlValue.getUserValue();
    }

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getter.get();
        if (ttlValue == null) {
            return null;
        } else if (expired(ttlValue)) {
            stateClear.run();
            if (!returnExpired) {
                return null;
            }
        } else if (updateTsOnRead) {
            updater.accept(rewrapWithNewTs(ttlValue));
        }
        return ttlValue;
    }
}
  • AbstractTtlDecorator对TTL逻辑进行了封装,其主要的逻辑在getWrappedWithTtlCheckAndUpdate方法,它在每次访问的时候对于非null的value会先判断下是否expired(TtlUtils.expired(ttlValue, ttl, timeProvider)),如果过期了则调用stateClear(ThrowingRunnable类型,这里是original::clear),对于非returnExpired的则直接返回null;对于没有expired的,则判断是否updateTsOnRead,若是则调用updater进行处理,最后返回ttlValue

TtlUtils.expired

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlUtils.java

代码语言:javascript
复制
/** Common functions related to State TTL. */
class TtlUtils {
    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
        return expired(ttlValue, ttl, timeProvider.currentTimestamp());
    }

    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
        return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
    }

    private static boolean expired(long ts, long ttl, long currentTimestamp) {
        return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
    }

    private static long getExpirationTimestamp(long ts, long ttl) {
        long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl;
        return ts + ttlWithoutOverflow;
    }

    //......
}
  • TtlUtils的expired方法主要是通过getExpirationTimestamp获取过期时间,然后跟currentTimestamp进行比较;而getExpirationTimestamp这里是根据ttlValue.getLastAccessTimestamp()及ttl值进行判断,这里利用Long.MAX_VALUE处理了overflow的情况,防止最后的值超出long类型的最大范围

ThrowingRunnable

flink-core-1.7.0-sources.jar!/org/apache/flink/util/function/ThrowingRunnable.java

代码语言:javascript
复制
/**
 * Similar to a {@link Runnable}, this interface is used to capture a block of code
 * to be executed. In contrast to {@code Runnable}, this interface allows throwing
 * checked exceptions.
 */
@PublicEvolving
@FunctionalInterface
public interface ThrowingRunnable<E extends Throwable> {

    /**
     * The work method.
     *
     * @throws E Exceptions may be thrown.
     */
    void run() throws E;

    /**
     * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions
     * as unchecked.
     *
     * @param throwingRunnable to convert into a {@link Runnable}
     * @return {@link Runnable} which throws all checked exceptions as unchecked.
     */
    static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
        return () -> {
            try {
                throwingRunnable.run();
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            }
        };
    }
}
  • stateClear是ThrowingRunnable类型,它与Runnable不同,ThrowingRunnable允许抛出checked exceptions,它提供了一个unchecked的静态方法,用于将非Error及非RuntimeException的转为RuntimeException抛出来,从而将ThrowingRunnable转换为Runnable

小结

  • InternalKvState接口定义内部的kvState要实现的方法,这里主要是getKeySerializer、getNamespaceSerializer、getValueSerializer、setCurrentNamespace、getSerializedValue
  • AbstractTtlState实现了InternalKvState接口的方法,同时继承了AbstractTtlDecorator;它提供了getWithTtlCheckAndUpdate方法,该方法主要是调用AbstractTtlDecorator的getWithTtlCheckAndUpdate来实现TTL逻辑
  • AbstractTtlDecorator的getWrappedWithTtlCheckAndUpdate方法,在每次访问的时候对于非null的value会先判断下是否expired(TtlUtils.expired(ttlValue, ttl, timeProvider)),如果过期了则调用stateClear(ThrowingRunnable类型,这里是original::clear),对于非returnExpired的则直接返回null;对于没有expired的,则判断是否updateTsOnRead,若是则调用updater进行处理,最后返回ttlValue

doc

  • State Time-To-Live (TTL)
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-12-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • InternalKvState
  • AbstractTtlState
  • AbstractTtlDecorator
    • TtlUtils.expired
      • ThrowingRunnable
      • 小结
      • doc
      相关产品与服务
      大数据
      全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档