序 本文主要研究一下flink的StateTtlConfig 实例 import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueStateDescriptor...; import org.apache.flink.api.common.time.Time; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder...(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility...,之后通过StateDescriptor的enableTimeToLive方法传递该config StateTtlConfig flink-core-1.7.0-sources.jar!.../org/apache/flink/api/common/state/StateTtlConfig.java /** * Configuration of state TTL logic.
序 本文主要研究一下flink的StateTtlConfig images (1).png 实例 import org.apache.flink.api.common.state.StateTtlConfig...org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; StateTtlConfig...ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite...,之后通过StateDescriptor的enableTimeToLive方法传递该config StateTtlConfig flink-core-1.7.0-sources.jar!.../org/apache/flink/api/common/state/StateTtlConfig.java /** * Configuration of state TTL logic.
可以通过 StateTtlConfig 配置关闭后台清理: import org.apache.flink.api.common.state.StateTtlConfig; StateTtlConfig...该策略可以通过 StateTtlConfig 配置进行配置: import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.time.Time...; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .cleanupFullSnapshot...import org.apache.flink.api.common.state.StateTtlConfig; StateTtlConfig ttlConfig = StateTtlConfig...import org.apache.flink.api.common.state.StateTtlConfig; StateTtlConfig ttlConfig = StateTtlConfig
在使用状态 TTL 前,需要先构建一个StateTtlConfig 配置对象。...可以通过 StateTtlConfig 配置关闭后台清理: import org.apache.flink.api.common.state.StateTtlConfig; StateTtlConfig...; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .cleanupFullSnapshot...该特性可以通过 StateTtlConfig 进行配置: import org.apache.flink.api.common.state.StateTtlConfig; StateTtlConfig...该特性可以通过 StateTtlConfig 进行配置: import org.apache.flink.api.common.state.StateTtlConfig; StateTtlConfig
ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite...TTL 刷新策略(默认OnCreateAndWrite) 策略类型 描述 StateTtlConfig.UpdateType.Disabled 禁用TTL,永不过期 StateTtlConfig.UpdateType.OnCreateAndWrite...可以在StateTtlConfig 中配置。...这个特性可以在StateTtlConfig中激活: import org.apache.flink.api.common.state.StateTtlConfig val ttlConfig = StateTtlCon...import org.apache.flink.api.common.state.StateTtlConfig val ttlConfig = StateTtlConfig .newBuilder
ttlConfig = StateTtlConfig .newBuilder(org.apache.flink.api.common.time.Time.hours(25))...)//default .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build...ttlConfig = StateTtlConfig .newBuilder(org.apache.flink.api.common.time.Time.hours(25))...) .cleanupInRocksdbCompactFilter(1000) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite...)//default .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(
ttlConfig = StateTtlConfig .newBuilder(Time.minutes(1)) .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime...) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired...org.apache.flink.api.common.time.Time; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds...如果要在 DataStream 中使用该过期请策略,请参考如下所示代码: import org.apache.flink.api.common.state.StateTtlConfig; StateTtlConfig...如果要在 DataStream 中使用该过期请策略,请参考如下所示代码: import org.apache.flink.api.common.state.StateTtlConfig; StateTtlConfig
; import org.apache.flink.api.common.time.Time; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder...当 StateTtlConfig 对象构造完成后,即可在后续声明的状态描述符(State Descriptor)中启用 State TTL 功能了。...换而言之,如果希望对所有状态都生效,那么就需要对所有用到的状态定义都传入 StateTtlConfig 对象。...StateTtlConfig 的参数说明 TTL:表示状态的过期时间,是一个 org.apache.flink.api.common.time.Time 对象。...State TTL 的实现原理 为了了解 State TTL 功能在 Flink 代码的实现,同样可以从 StateTtlConfig 类开始顺藤摸瓜。
;import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig .newBuilder...(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(...Update类型的配置有以下两种: StateTtlConfig.UpdateType.OnCreateAndWrite :创建和写入StateTtlConfig.UpdateType.OnReadAndWrite...StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp – 如果数据没被删除可以返回。...;StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .cleanupFullSnapshot()
为什么状态需要被清理 状态不需要一次存储 状态有效期有时间限制,超过时间需要重置状态(业务上) 开启状态清理: StateTtlConfig ttlConfig = StateTtlConfig....newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite...) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)...final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class); @Nonnull private final StateTtlConfig...ttlConfig = StateTtlConfig.newBuilder(config.ttl).cleanupFullSnapshot().build();
. */ @Nonnull private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED; /** The default...will expire, become unavailable and be cleaned up in storage * depending on configured {@link StateTtlConfig...* * @param ttlConfig configuration of state TTL */ public void enableTimeToLive(StateTtlConfig...= StateTtlConfig.UpdateType.Disabled && queryableStateName == null, "Queryable...supported with TTL"); this.ttlConfig = ttlConfig; } @Nonnull @Internal public StateTtlConfig
ttlConfig = StateTtlConfig .newBuilder(Time.days(7)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite...) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); ValueStateDescriptor...必须明确启用完全快照的状态删除,如以下示例所示: StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(7))...以下代码示例演示如何启用增量清理: StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(7)) // check...ttlConfig = StateTtlConfig .newBuilder(Time.days(7)) .cleanupInRocksdbCompactFilter() .build
currentDay = ""; @Override public void open(Configuration parameters) throws Exception { StateTtlConfig...ttlConfig = StateTtlConfig .newBuilder(org.apache.flink.api.common.time.Time.hours(25))...//default,不支持 eventTime 1.12.0 .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime...) .cleanupInRocksdbCompactFilter(1000) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite...)//default .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build
实现方法: 1、生成StateTtlConfig配置 2、将StateTtlConfig配置传入StateDescriptor中的enableTimeToLive方法中即可 import org.apache.flink.api.common.state.StateTtlConfig...org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.common.time.Time val ttlConfig = StateTtlConfig....newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)....setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build val stateDescriptor
. */ @Nonnull private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED; /** The default...will expire, become unavailable and be cleaned up in storage * depending on configured {@link StateTtlConfig...* * @param ttlConfig configuration of state TTL */ public void enableTimeToLive(StateTtlConfig...= StateTtlConfig.UpdateType.Disabled && queryableStateName == null, "Queryable...supported with TTL"); this.ttlConfig = ttlConfig; } @Nonnull @Internal public StateTtlConfig
, N, SV> { private final TypeSerializer valueSerializer; AbstractTtlState(S original, StateTtlConfig...AbstractTtlDecorator { /** Wrapped original state handler. */ final T original; final StateTtlConfig...live in milliseconds. */ final long ttl; AbstractTtlDecorator( T original, StateTtlConfig...; this.timeProvider = timeProvider; this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite...; this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp
示例展示了如何创建状态 TTL 配置并将其提供给状态描述符,该描述符将用户的上次登录时间作为 Long 值保存: import org.apache.flink.api.common.state.StateTtlConfig...org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.state.ValueStateDescriptor; StateTtlConfig...ttlConfig = StateTtlConfig .newBuilder(Time.days(7)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite...) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); ValueStateDescriptor...必须明确启用完整快照上的状态驱逐,如以下示例所示: StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(7))
实现方法: 1、生成StateTtlConfig配置 2、将StateTtlConfig配置传入StateDescriptor中的enableTimeToLive方法中即可 import org.apache.flink.api.common.state.StateTtlConfig...org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.common.time.Time val ttlConfig = StateTtlConfig....newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)....setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build val stateDescriptor...ValueStateDescriptor[String]("text state", classOf[String]) stateDescriptor.enableTimeToLive(ttlConfig) StateTtlConfig
K, N, SV> { private final TypeSerializer valueSerializer; AbstractTtlState(S original, StateTtlConfig...AbstractTtlDecorator { /** Wrapped original state handler. */ final T original; final StateTtlConfig...live in milliseconds. */ final long ttl; AbstractTtlDecorator( T original, StateTtlConfig...; this.timeProvider = timeProvider; this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite...; this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp
ttlConfig = StateTtlConfig .newBuilder(Time.seconds(10)) .setUpdateType...(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired...想要使用 TTL,我们需要首先构建一个 StateTtlConfig 配置对象;然后,可以通过传递配置在任何状态描述符中启用 TTL 功能。...StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(10)) .setUpdateType...(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired
领取专属 10元无门槛券
手把手带您无忧上云