为什么状态需要被清理
开启状态清理（为状态配置 TTL）：
// State entries expire 1 second after their last-access timestamp was last refreshed.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        // Refresh the last-access timestamp when the state is created and on every write
        // (reads do not refresh it; that would be UpdateType.OnReadAndWrite).
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        // Never hand expired values back to the caller, even if they are not cleaned up yet.
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
ValueStateDescriptor<String> stateDescriptor =
        new ValueStateDescriptor<>("text state", String.class);
// Attach the TTL configuration to the descriptor of the state that should expire.
stateDescriptor.enableTimeToLive(ttlConfig);
TTL 相关的配置
1.更新类型（UpdateType）：决定何时刷新“上次访问时间戳”，从而延长状态的有效期
/**
 * Controls when the last-access timestamp of a state entry is refreshed. Every refresh pushes the
 * entry's expiration further into the future.
 */
public enum UpdateType {

    /** TTL is switched off; the state never expires. */
    Disabled,

    /**
     * The timestamp is set when the state is created and refreshed by every write operation.
     */
    OnCreateAndWrite,

    /** Behaves like {@code OnCreateAndWrite}, but reads refresh the timestamp as well. */
    OnReadAndWrite;
}
UpdateType 有三种取值：Disabled、OnCreateAndWrite、OnReadAndWrite。
过期判定：过期时间 = 上次访问的时间戳 + TTL；当过期时间 <= 当前时间时，状态即视为已过期。
2.状态可见性（StateVisibility）：决定已过期但尚未被清理的值是否还能被读到
/**
 * Decides whether a user value whose TTL has already elapsed may still be handed back to the
 * caller.
 */
public enum StateVisibility {

    /** An expired value is still returned as long as it has not been cleaned up. */
    ReturnExpiredIfNotCleanedUp,

    /** An expired value is never returned. */
    NeverReturnExpired;
}
3.清理策略（Strategies）：全量快照时清理、增量清理、RocksDB 压缩过滤器清理
/**
 * Cleanup strategies. The ordinal of each constant is fixed because it is used as a position in
 * the {@code strategies} config field, so the declaration order must not change.
 */
enum Strategies {
    FULL_STATE_SCAN_SNAPSHOT,
    INCREMENTAL_CLEANUP,
    ROCKSDB_COMPACTION_FILTER;
}
4.TTL 时间语义（TtlTimeCharacteristic）：TTL 使用哪种时间来计算
/** Selects the notion of time against which TTL is measured. */
public enum TtlTimeCharacteristic {

    /**
     * Processing time; compare <code>
     * org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime</code>.
     */
    ProcessingTime;
}
目前只支持处理时间（ProcessingTime，即系统时间）。
示例（State TTL 测试作业）
//自定义function
/**
 * Keyed flat-map that applies every incoming {@link TtlStateUpdate} to a set of TTL-enabled
 * states, one per {@link TtlStateVerifier}, and emits a message for each failed verification.
 *
 * <p>For every verifier, the history of previous updates (value plus timestamp) is kept in a
 * separate keyed {@link ListState}. That history is handed to the verifier together with the
 * current update so it can check the observed state value.
 */
class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String>
        implements CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);

    /** TTL configuration enabled on every verified state. */
    @Nonnull private final StateTtlConfig ttlConfig;

    /** Running statistics about the length of the stored update history. */
    private final UpdateStat stat;

    /** Verified states by verifier id; created in {@link #initializeState}. */
    private transient Map<String, State> states;

    /** Update history by verifier id; created in {@link #initializeState}. */
    private transient Map<String, ListState<ValueWithTs<?>>> prevUpdatesByVerifierId;

    /**
     * Creates the function.
     *
     * @param ttlConfig TTL configuration to enable on every verified state
     * @param reportStatAfterUpdatesNum log the average update-chain length every this many
     *     updates; {@code 0} disables the report
     */
    TtlVerifyUpdateFunction(@Nonnull StateTtlConfig ttlConfig, long reportStatAfterUpdatesNum) {
        this.ttlConfig = ttlConfig;
        this.stat = new UpdateStat(reportStatAfterUpdatesNum);
    }

    /**
     * Runs every verifier against the given update and emits one message per failed verification.
     *
     * @param updates update values for all verifiers, for the current key
     * @param out receives a message for each failed verification
     */
    @Override
    public void flatMap(TtlStateUpdate updates, Collector<String> out) throws Exception {
        for (TtlStateVerifier<?, ?> verifier : TtlStateVerifier.VERIFIERS) {
            TtlVerificationContext<?, ?> verificationContext =
                    generateUpdateAndVerificationContext(updates, verifier);
            if (!verifier.verify(verificationContext)) {
                // Please do **NOT** change the prefix, it's used in test_stream_state_ttl.sh for
                // test verifying
                out.collect("TTL verification failed: " + verificationContext.toString());
            }
        }
    }

    /**
     * Applies the verifier's part of {@code updates} to its state and builds the context to verify.
     *
     * <p>The history placed into the context is read BEFORE the new update is appended to it, so
     * it contains only the previous updates.
     */
    private TtlVerificationContext<?, ?> generateUpdateAndVerificationContext(
            TtlStateUpdate updates, TtlStateVerifier<?, ?> verifier) throws Exception {
        List<ValueWithTs<?>> prevUpdates = getPrevUpdates(verifier.getId());
        Object update = updates.getUpdate(verifier.getId());
        TtlUpdateContext<?, ?> updateContext = performUpdate(verifier, update);
        stat.update(prevUpdates.size());
        prevUpdatesByVerifierId.get(verifier.getId()).add(updateContext.getUpdateWithTs());
        return new TtlVerificationContext<>(
                updates.getKey(), verifier.getId(), prevUpdates, updateContext);
    }

    /** Copies the stored update history of the given verifier into a list. */
    private List<ValueWithTs<?>> getPrevUpdates(String verifierId) throws Exception {
        return StreamSupport.stream(
                        prevUpdatesByVerifierId.get(verifierId).get().spliterator(), false)
                .collect(Collectors.toList());
    }

    /**
     * Reads the state, applies {@code update}, and reads it again. All three steps run inside
     * {@link MonotonicTTLTimeProvider#doWithFrozenTime}, and the timestamp it supplies is recorded
     * in the returned context.
     */
    private TtlUpdateContext<?, ?> performUpdate(TtlStateVerifier<?, ?> verifier, Object update)
            throws Exception {
        return MonotonicTTLTimeProvider.doWithFrozenTime(
                frozenTimestamp -> {
                    State state = states.get(verifier.getId());
                    Object valueBeforeUpdate = verifier.get(state);
                    verifier.update(state, update);
                    Object updatedValue = verifier.get(state);
                    return new TtlUpdateContext<>(
                            valueBeforeUpdate, update, updatedValue, frozenTimestamp);
                });
    }

    /** No explicit snapshot: all data lives in keyed state obtained in initializeState. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) {}

    /**
     * Creates, for every verifier, its TTL-enabled state and the list state holding its update
     * history (named {@code "TtlPrevValueState_" + verifierId}).
     */
    @Override
    public void initializeState(FunctionInitializationContext context) {
        states =
                TtlStateVerifier.VERIFIERS.stream()
                        .collect(
                                Collectors.toMap(
                                        TtlStateVerifier::getId,
                                        v -> v.createState(context, ttlConfig)));

        prevUpdatesByVerifierId =
                TtlStateVerifier.VERIFIERS.stream()
                        .collect(
                                Collectors.toMap(
                                        TtlStateVerifier::getId,
                                        v -> {
                                            checkNotNull(v);
                                            final TypeSerializer<ValueWithTs<?>> typeSerializer =
                                                    new ValueWithTs.Serializer(
                                                            v.getUpdateSerializer(),
                                                            LongSerializer.INSTANCE);

                                            ListStateDescriptor<ValueWithTs<?>> stateDesc =
                                                    new ListStateDescriptor<>(
                                                            "TtlPrevValueState_" + v.getId(),
                                                            typeSerializer);
                                            KeyedStateStore store = context.getKeyedStateStore();
                                            return store.getListState(stateDesc);
                                        }));
    }

    /** Tracks the average number of previous updates seen per update and logs it periodically. */
    private static class UpdateStat implements Serializable {
        private static final long serialVersionUID = -4557720969995878873L;

        /** Report interval in updates; {@code 0} disables reporting. */
        final long reportStatAfterUpdatesNum;

        /** Number of updates recorded so far. */
        long updates = 0;

        /** Sum of the history sizes passed to {@link #update}. */
        long prevUpdatesNum = 0;

        UpdateStat(long reportStatAfterUpdatesNum) {
            this.reportStatAfterUpdatesNum = reportStatAfterUpdatesNum;
        }

        /** Records one update whose history contained {@code prevUpdatesSize} entries. */
        void update(long prevUpdatesSize) {
            updates++;
            prevUpdatesNum += prevUpdatesSize;
            // A zero interval would make the modulo throw ArithmeticException; treat it as "off".
            if (reportStatAfterUpdatesNum != 0 && updates % reportStatAfterUpdatesNum == 0) {
                LOG.info("Avg update chain length: {}", prevUpdatesNum / updates);
            }
        }
    }
}
/**
 * Entry point of the State TTL test job.
 *
 * <p>The job reads generated {@link TtlStateUpdate}s and keys them by their key. It runs them
 * through {@link TtlVerifyUpdateFunction} and prints every failed TTL verification. TTL state is
 * configured with full-snapshot cleanup. The configured state backend is wrapped so it uses a
 * {@link MonotonicTTLTimeProvider}.
 */
public class DataStreamStateTTLTestProgram {

    public static void main(String[] args) throws Exception {
        final ParameterTool parameters = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        setupEnvironment(environment, parameters);
        setBackendWithCustomTTLTimeProvider(environment);

        final TtlTestConfig testConfig = TtlTestConfig.fromArgs(parameters);
        final StateTtlConfig stateTtlConfig = buildFullSnapshotCleanupTtlConfig(testConfig);

        final TtlStateUpdateSource updateSource =
                new TtlStateUpdateSource(
                        testConfig.keySpace, testConfig.sleepAfterElements, testConfig.sleepTime);

        environment
                .addSource(updateSource)
                .name("TtlStateUpdateSource")
                .keyBy(TtlStateUpdate::getKey)
                .flatMap(
                        new TtlVerifyUpdateFunction(
                                stateTtlConfig, testConfig.reportStatAfterUpdatesNum))
                .name("TtlVerifyUpdateFunction")
                .addSink(new PrintSinkFunction<>())
                .name("PrintFailedVerifications");

        environment.execute("State TTL test job");
    }

    /** Builds a TTL config with the test's TTL whose expired entries are dropped on full snapshots. */
    private static StateTtlConfig buildFullSnapshotCleanupTtlConfig(TtlTestConfig testConfig) {
        return StateTtlConfig.newBuilder(testConfig.ttl).cleanupFullSnapshot().build();
    }

    /**
     * Replaces the environment's state backend with a {@link StubStateBackend} that wraps the
     * currently configured backend and uses a fresh {@link MonotonicTTLTimeProvider}.
     *
     * @param env The {@link StreamExecutionEnvironment} of the job.
     */
    private static void setBackendWithCustomTTLTimeProvider(StreamExecutionEnvironment env) {
        final MonotonicTTLTimeProvider timeProvider = new MonotonicTTLTimeProvider();
        env.setStateBackend(new StubStateBackend(env.getStateBackend(), timeProvider));
    }
}