
Flink Dynamic CEP: Rule Deactivation and Job Recovery

Original
Author: sixjo
Published 2023-05-26 22:21:23
Included in the column: Flink Dynamic CEP

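Earlier posts implemented adding and modifying rules in Flink dynamic CEP. With add and modify in place, delete naturally follows. Once rules can be deleted, however, the number of rules in use may drop below the configured minimum rule count. The job should keep running normally in that situation, so rule deactivation and job recovery are covered together here.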

How it works

Rule deactivation:

Using the rule id, clear that rule's state and everything else associated with it.
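As a rough illustration of clearing everything by rule id, here is a minimal plain-Java sketch. It is not the author's operator code: `RuleRegistry`, `RuleEntry`, and the string-based partial matches are hypothetical stand-ins for the per-rule Flink state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-rule bookkeeping; the names are illustrative, not the author's classes. */
class RuleRegistry {
    /** Everything kept for one rule: its version and its in-flight partial matches. */
    static final class RuleEntry {
        final int version;
        final List<String> partialMatches = new ArrayList<>();

        RuleEntry(int version) {
            this.version = version;
        }
    }

    private final Map<Integer, RuleEntry> rules = new HashMap<>();

    /** Registers a rule with fresh, empty state. */
    void register(int ruleId, int version) {
        rules.put(ruleId, new RuleEntry(version));
    }

    /** Deactivates a rule: dropping its entry discards all state held for that rule. */
    boolean deactivate(int ruleId) {
        return rules.remove(ruleId) != null;
    }

    boolean isActive(int ruleId) {
        return rules.containsKey(ruleId);
    }

    /** Number of rules currently running. */
    int runningCount() {
        return rules.size();
    }
}
```

Keeping all state of a rule behind one key means deactivation is a single removal, and the running count follows directly from the remaining entries.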

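Job recovery:

Add runningCountState to record the number of running rules.

When the job starts without state, runningCountState is empty. Once the number of initialized rules reaches the minimum rule count, canProcessElements1 is set to true, the number of running rules is recorded, and the job starts processing the event stream normally. From then on, whenever the rules change, the running rule count in runningCountState is updated.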
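The stateless-start behaviour can be sketched in plain Java as a small gate. This is a simplified model under stated assumptions, not the author's implementation: `RuleGate` and its methods are hypothetical, the `runningCount` field stands in for runningCountState, and events are plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of the "minimum rules before processing" gate. */
class RuleGate {
    private final int minPatternInitialized;
    private int runningCount;            // stands in for runningCountState
    private boolean canProcessElements;  // stands in for canProcessElements1
    private final Queue<String> buffered = new ArrayDeque<>(); // events seen before the gate opened

    RuleGate(int minPatternInitialized) {
        this.minPatternInitialized = minPatternInitialized;
    }

    /** Call when a rule is added; returns the buffered events to replay if this opens the gate. */
    List<String> onRuleAdded() {
        runningCount++;
        List<String> replay = new ArrayList<>();
        if (!canProcessElements && runningCount >= minPatternInitialized) {
            canProcessElements = true;
            replay.addAll(buffered);
            buffered.clear();
        }
        return replay;
    }

    /** Call when a rule is removed; the gate stays open even if the count drops below the minimum. */
    void onRuleRemoved() {
        if (runningCount > 0) {
            runningCount--;
        }
    }

    /** Returns true if the event may be processed now; otherwise the event is buffered. */
    boolean offer(String event) {
        if (canProcessElements) {
            return true;
        }
        buffered.add(event);
        return false;
    }

    int runningCount() {
        return runningCount;
    }

    boolean canProcessElements() {
        return canProcessElements;
    }
}
```

The point of the design is that the minimum only matters for opening the gate. Once it is open, deleting rules lowers the count but never closes it again.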

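When the job restarts with state, the initializeState method restores the state, including the rule configurations, runningCountState, and the events that have not been processed yet. The rules saved in state are then used to initialize each rule's CEP-related state, such as computationStates, elementQueueState, and partialMatches, and runningCountState is used to decide whether the job may process events.

After that, the open method initializes the state machines from the rule configurations obtained from the checkpoint/savepoint.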
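The restore decision described above can be modelled without Flink as follows. This is only a sketch: `RestoredOperator`, its fields, and the list-based snapshot arguments are assumptions made for illustration, with plain lists standing in for the restored operator state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of what a stateful restart rebuilds. */
class RestoredOperator {
    final Map<Integer, List<String>> partialMatchesByRule = new LinkedHashMap<>();
    final List<String> pendingEvents = new ArrayList<>();
    boolean canProcessElements;

    /**
     * @param ruleIds      rule ids found in the restored rule-configuration state
     * @param runningCount contents of the restored running-count state (empty if never written)
     * @param pending      events buffered before rule initialization completed
     */
    static RestoredOperator restore(List<Integer> ruleIds,
                                    List<Integer> runningCount,
                                    List<String> pending) {
        RestoredOperator op = new RestoredOperator();
        for (int ruleId : ruleIds) {
            // Placeholder for the per-rule CEP state rebuilt from the saved rule.
            op.partialMatchesByRule.put(ruleId, new ArrayList<>());
        }
        op.pendingEvents.addAll(pending);
        // A recorded running count means initialization had already completed before the snapshot.
        op.canProcessElements = !runningCount.isEmpty();
        return op;
    }
}
```

With this rule, a snapshot taken with two of three rules loaded restores into a waiting job, while a snapshot taken after initialization restores into a processing job even if only two rules remain.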

Code language: java
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    this.initializationContext = context;
    // Events that arrived before rule initialization completed.
    this.beforePatternInitOverElements = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>(
                    BEFORE_PATTERN_INIT_OVER_EVENT_QUEUE_STATE_NAME,
                    TypeInformation.<StreamRecord<IN>>of(new TypeHint<StreamRecord<IN>>() {
                    })));
    // Rule configurations saved in the checkpoint/savepoint.
    this.dynamicConfigurationListState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<DynamicConfiguration>(
                    DYNAMIC_CONFIGURATION_LIST_STATE_NAME, DynamicConfiguration.class));
    // ListState#get() returns an Iterable, so copy it (java.util.ArrayList) rather than cast to List.
    List<DynamicConfiguration> dynamicConfigurations = new ArrayList<>();
    dynamicConfigurationListState.get().forEach(dynamicConfigurations::add);
    // Rebuild the CEP-related state of every restored rule.
    for (DynamicConfiguration dynamicConfiguration : dynamicConfigurations) {
        initCEPState(dynamicConfiguration, dynamicConfigurations.size());
    }
    this.runningCountState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<Integer>(RUNNING_COUNT_STATE_NAME, Integer.class));
    // A non-empty runningCountState means initialization had completed before the snapshot,
    // so the job may process events immediately.
    if (runningCountState.get().iterator().hasNext()) {
        canProcessElements1 = Boolean.TRUE;
    }
}

Test code

Code language: java
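// Checkpoint every 60 s so the job can later be restored from a checkpoint.
env.enableCheckpointing(60000L);
env.setStateBackend(new RocksDBStateBackend("file:///rocksdb/"));
// Port 8888 carries events; port 6666 carries rule configurations.
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
DataStreamSource<String> socketTextStream2 = env.socketTextStream("localhost", 6666);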
SingleOutputStreamOperator<EventWithTime> eventStream = socketTextStream.flatMap(new FlatMapFunction<String, EventWithTime>() {
    @Override
    public void flatMap(String s, Collector<EventWithTime> collector) throws Exception {
        String[] split = s.split(",");
        if (split.length == 3) {
            Long timestamp = Long.valueOf(split[2]);
            EventWithTime event = new EventWithTime(split[0], split[1], timestamp);
            collector.collect(event);
        }
    }
});
SingleOutputStreamOperator<EventWithTime> eventWithWatermark = eventStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<EventWithTime>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((SerializableTimestampAssigner<EventWithTime>) (eventWithTime, l) -> eventWithTime.getTime())
);

// Each line on the rule socket is expected to be one DynamicConfiguration as JSON.
SingleOutputStreamOperator<DynamicConfiguration> dynamicConfigurationStream = socketTextStream2.flatMap((FlatMapFunction<String, DynamicConfiguration>) (s, collector) -> {
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        DynamicConfiguration dynamicConfiguration = objectMapper.readValue(s, DynamicConfiguration.class);
        if (null != dynamicConfiguration) {
            collector.collect(dynamicConfiguration);
        }
    } catch (Exception e) {
        // Lines that are not valid rule JSON are skipped silently.
    }
}, TypeInformation.of(DynamicConfiguration.class));

KeyedStream<EventWithTime, String> eventIdKeyedStream = eventWithWatermark.keyBy(EventWithTime::getUser);
DataStream<DynamicMatchData<Object>> dynamicMatchDataDataStream =
        DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationStream)
                .minPatternInitialized(3)
                .objectProcess();
dynamicMatchDataDataStream.print("dynamic");
env.execute();
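One detail of the event parsing above is worth illustrating: Long.valueOf throws NumberFormatException when the third field is not a number, which would surface as a failure in the flatMap. The sketch below shows a tolerant variant in plain Java. `EventLineParser` and its nested `Event` class are hypothetical stand-ins (the real job uses EventWithTime), and trimming the fields is an addition not present in the original code.

```java
import java.util.Optional;

/** Hypothetical parser for "user,name,timestamp" lines such as "2,end,1003". */
class EventLineParser {
    /** Minimal stand-in for EventWithTime with the same three fields. */
    static final class Event {
        final String user;
        final String name;
        final long time;

        Event(String user, String name, long time) {
            this.user = user;
            this.name = name;
            this.time = time;
        }
    }

    /** Returns the parsed event, or empty when the line is malformed. */
    static Optional<Event> parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            long time = Long.parseLong(parts[2].trim());
            return Optional.of(new Event(parts[0].trim(), parts[1].trim(), time));
        } catch (NumberFormatException e) {
            // A non-numeric timestamp is treated like any other malformed line.
            return Optional.empty();
        }
    }
}
```

Inside a flatMap, an empty result would simply mean that nothing is collected for that line.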

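Test procedure

1. Data preparation

The code above requires at least 3 rules to be initialized before the job starts computing, so we first initialize only two rules, rule 1 and rule 10086, and input some events. No match is output at this point: initialization is not complete, so computation has not started.

Code language: shell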
[root@sixjo ~]# nc -lk 6666
{"id":1,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
{"id":10086,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.processor.ObjectDynamicPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
Code language: shell
nc -lk 8888
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003
2,end,4003
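
2. Recovering the job from a checkpoint taken before rule initialization completed

Once the data above has been entered and a checkpoint has completed, stop the job and start it again from that checkpoint. Debugging shows that the data prepared earlier has been restored from the checkpoint.

Code language: java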
Configuration configuration = new Configuration();
configuration.setString("execution.savepoint.path","file:///rocksdba1d31ed55a64005fefb630fe56c2e9601/chk-2");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

We now continue to input events. Computation is still not triggered, because rule initialization is still incomplete.

Code language: shell
2,end,4003

Next we input rule 2. Rule initialization is now complete and the job starts computing normally. As expected, all three rules produce matches.

Code language: shell
{"id":2,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":{\"within\":[\"SECONDS\",5]}}]"}
Code language: shell
2,end,4004
Code language: shell
dynamic> DynamicMatchData(id=10086, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=10086, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
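One way to see why the output above ends at time=2003 rather than 4004 is to replay the watermark arithmetic. The test code uses forBoundedOutOfOrderness with 2 seconds, for which Flink's bounded-out-of-orderness generator computes the watermark as the largest timestamp seen minus the bound minus 1 ms. The plain-Java sketch below assumes the watermark advances after every event (Flink actually emits it periodically) and ignores late-data handling. `WatermarkReplay` is a hypothetical helper, not part of the job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical replay of which event timestamps a bounded-out-of-orderness watermark releases. */
class WatermarkReplay {
    private final long outOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    private final PriorityQueue<Long> pending = new PriorityQueue<>();

    WatermarkReplay(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    /** Feeds one event timestamp and returns the timestamps released by the new watermark, in order. */
    List<Long> onEvent(long timestamp) {
        pending.add(timestamp);
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        long watermark = maxTimestamp - outOfOrdernessMillis - 1;
        List<Long> released = new ArrayList<>();
        // Events are released in timestamp order once the watermark has passed them.
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            released.add(pending.poll());
        }
        return released;
    }
}
```

Feeding the timestamps entered so far (1003, 1002, 1001, 2003, 3003, 4003, 4003, 4004) with a 2000 ms bound leaves the watermark at 2003, so only the events up to 2003 have been evaluated. Each later input moves the watermark forward and releases the next buffered events, which is why every new event in this test prints matches for an earlier end event.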

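Now we delete rule 10086. After the deletion the number of running rules drops to 2, while the initially configured rule count is 3. We expect the job to keep computing normally, with rule 10086 no longer present.

Code language: shell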
{"id":10086,"version":1,"operate":"DELETE"}
Code language: shell
2,end,5004
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
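
3. Recovering from state where rule initialization had completed, but with fewer rules than the minimum

Now we stop the job, restore it from the latest checkpoint, and continue to input events. The number of running rules is still 2, and the job still computes normally.

Code language: java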
configuration.setString("execution.savepoint.path","file:///rocksdb/0e08999f4b06cc4f50c22c6a752e4b4b/chk-6");
Code language: shell
2,end,6004
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
Code language: shell
2,end,7003
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4004)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4004)]})
Code language: shell
2,middle,7002
2,start,7001
2,end,8003
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=5004)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=5004)]})

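Continue to input events. At this point rule 2 has exceeded its 5 s window and no longer matches; only rule 1 produces a match.

Code language: shell
2,end,9003
Code language: shell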
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=6004)]})
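The output above comes from rule 1 only because rule 2 was configured with "within":["SECONDS",5]. A minimal sketch of that check follows, assuming the property maps to a java.util.concurrent.TimeUnit name plus an amount and that a match is kept only while the span from first to last event is strictly shorter than the window. `WithinWindow` is a hypothetical helper; how the author parses the property is not shown in the article.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for evaluating a "within" time constraint. */
class WithinWindow {
    /** Converts a unit name such as "SECONDS" and an amount into milliseconds. */
    static long toMillis(String unit, long amount) {
        return TimeUnit.valueOf(unit).toMillis(amount);
    }

    /** Returns true while the span from the first to the last event is shorter than the window. */
    static boolean fits(long firstTimestamp, long lastTimestamp, long windowMillis) {
        return lastTimestamp - firstTimestamp < windowMillis;
    }
}
```

With start=1001, the end event at 5004 spans 4003 ms and still fits the 5000 ms window, while the end event at 6004 spans 5003 ms and does not, which is consistent with rule 2 dropping out of the output here.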
Code language: shell
2,end,10003
Code language: shell
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=7003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=7003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=7003)]})

Continue to input events. Now both rule 1 and rule 2 produce matches.

Code language: shell
2,end,11003
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=8003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
Code language: shell
2,end,12003
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=9003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
Code language: shell
2,end,13003
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=10003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
Code language: shell
2,end,14003
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=11003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})

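Continue to input events. At this point rule 2 has exceeded its 5 s window and no longer matches; only rule 1 produces matches.

Code language: shell
2,end,15003
Code language: shell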
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=12003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=12003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=12003)]})

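We now input a rule that modifies rule 1. Rule 1's previous state is cleared at this point. We continue to input events, and the matches are as expected.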
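The effect of a rule update on existing state can be sketched as follows. This is a simplified plain-Java model, not the author's code: `VersionedRules` and `RuleState` are hypothetical, and treating any change of version as the trigger for a reset is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-rule state that is reset whenever the rule's version changes. */
class VersionedRules {
    static final class RuleState {
        final int version;
        final List<String> partialMatches = new ArrayList<>();

        RuleState(int version) {
            this.version = version;
        }
    }

    private final Map<Integer, RuleState> states = new HashMap<>();

    /** Returns the rule's state, replacing it with fresh state when the version differs (assumed). */
    RuleState upsert(int ruleId, int version) {
        RuleState current = states.get(ruleId);
        if (current == null || current.version != version) {
            current = new RuleState(version);
            states.put(ruleId, current);
        }
        return current;
    }
}
```

Because the new version starts from empty state, partial matches that began under the old version can no longer complete; only events entered after the update can form matches for the updated rule.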

Code language: shell
{"id":1,"version":2,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.dynamic.EventPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
Code language: shell
2,end,16003
2,middle,16002
2,start,16001
2,end,17003
2,end,18003
2,end,19003
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=16003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=16003)]})
Code language: shell
2,end,20003
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=17003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=17003)]})
Code language: shell
2,end,21003
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=18003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=18003)]})
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=19003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=19003)]})
Code language: shell
2,end,23003
Code language: shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=20003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=20003)]})

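Continue to input events. At this point rule 2 has exceeded its 5 s window and no longer matches; only rule 1 produces a match.

Code language: shell
2,end,25004
Code language: shell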
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=21003)]})
Code language: shell
2,end,25003
Code language: shell
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=22003)]})
END

This completes the test demo for Flink dynamic CEP. The dynamic CEP built by modifying the FlinkCEP source code has now basically taken shape.

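Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.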
