由于测试案例数据量较小,为了防止不同task之间的watermark影响,导致迟迟不出数据,暂时将并行度设置为1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStateBackend(new RocksDBStateBackend("file:///rocksdb/"));
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
DataStreamSource<String> socketTextStream2 = env.socketTextStream("localhost", 6666);
SingleOutputStreamOperator<EventWithTime> eventStream = socketTextStream.flatMap(new FlatMapFunction<String, EventWithTime>() {
@Override
public void flatMap(String s, Collector<EventWithTime> collector) throws Exception {
String[] split = s.split(",");
if (split.length == 3) {
Long timestamp = Long.valueOf(split[2]);
EventWithTime event = new EventWithTime(split[0], split[1], timestamp);
collector.collect(event);
}
}
});
SingleOutputStreamOperator<EventWithTime> eventWithWatermark = eventStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.<EventWithTime>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((SerializableTimestampAssigner<EventWithTime>) (eventWithTime, l) -> eventWithTime.getTime())
);
SingleOutputStreamOperator<DynamicConfiguration> dynamicConfigurationStream = socketTextStream2.flatMap((FlatMapFunction<String, DynamicConfiguration>) (s, collector) -> {
ObjectMapper objectMapper = new ObjectMapper();
try {
DynamicConfiguration dynamicConfiguration = objectMapper.readValue(s, DynamicConfiguration.class);
if(null != dynamicConfiguration){
collector.collect(dynamicConfiguration);
}
} catch (Exception e) {
}
}, TypeInformation.of(DynamicConfiguration.class));
KeyedStream<EventWithTime, String> eventIdKeyedStream = eventWithWatermark.keyBy(EventWithTime::getUser);
DataStream<DynamicMatchData<Object>> dynamicMatchDataDataStream =
DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationStream)
.minPatternInitialized(1)
.objectProcess();
dynamicMatchDataDataStream.print("dynamic");
env.execute();
规则流
[root@sixjo ~]# nc -lk 6666
{"id":1,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
事件流
[root@sixjo ~]# nc -lk 8888
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003
此时,尚未输出任何结果,因为Watermark机制,尚未触发计算,事件流继续输入,有匹配结果输出
2,end,4003
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
2,end,5003
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
2,end,6003
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
2,end,7003
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
基于上方运行中的demo继续测试
规则流输入规则1,version为3,更新规则1,within 5s
这个时候,清空规则1的状态,包括watermark至为初始状态-9223372036854775808,假如有很早的数据先进来,则也是可以正常处理的,但是触发计算是任何级别的,并不会因为单个pattern的watermark提前,单个pattern就提前触发计算
{"id":1,"version":3,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":{\"within\":[\"SECONDS\",5]}}]"}
事件流输入数据测试,至此,未有任何匹配结果输出
2,end,-1001
2,middle,-1002
2,start,-1003
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003
2,end,4003
2,end,5003
2,end,6003
2,end,7003
2,middle,7002
2,start,7001
事件流继续输入,测试流程如下
2,end,8003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=-1001)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=5003)]})
2,end,9003
2,end,10003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=7003)]})
2,end,11003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
2,end,12003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
2,end,13003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
2,end,14003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
2,end,15003
2,end,16003
2,end,17003
至此,基于事件时间的Flink动态CEP测试已基本结束
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。