Flink Dynamic CEP with EventTime

Original article
Author: sixjo
Last modified: 2023-05-22 01:59:05
Included in the column: Flink动态CEP

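Code

Because the test dataset is small, the parallelism is temporarily set to 1. Otherwise the watermarks of different tasks can hold each other back, and the output may be delayed indefinitely.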
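As a rough illustration of why a small test can stall at higher parallelism: a downstream operator's event-time clock is the minimum of the latest watermarks received on all of its input channels, so a subtask that has received no data holds everything back. The sketch below models only that rule in plain Java; the class and method names are illustrative and are not part of the Flink API.

```java
import java.util.Arrays;

public class CombinedWatermark {
    // The operator watermark is the minimum over the latest watermark of each input channel.
    static long combine(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Parallelism 1: the single channel alone decides the watermark.
        System.out.println(combine(new long[] {2002L}));
        // Parallelism 2 where one subtask has not produced any watermark yet:
        // the combined watermark stays at Long.MIN_VALUE and nothing fires.
        System.out.println(combine(new long[] {2002L, Long.MIN_VALUE}));
    }
}
```

With only a handful of hand-typed events, some subtasks may never see an event, which is the situation the second call models.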

Code language: java
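// Imports needed by this snippet. EventWithTime, DynamicConfiguration, DynamicMatchData and
// DynamicCEP come from the author's dynamic CEP project and are not shown in this article.
// ObjectMapper is assumed to be the plain Jackson one.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.time.Duration;

// The statements below form the body of main(String[] args) throws Exception.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Parallelism 1 keeps a single watermark source for this small test.
env.setParallelism(1);
env.setStateBackend(new RocksDBStateBackend("file:///rocksdb/"));
// Port 8888 carries events, port 6666 carries rule definitions.
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);

DataStreamSource<String> socketTextStream2 = env.socketTextStream("localhost", 6666);

// Parse "user,name,timestamp" lines; lines with a different field count are dropped.
SingleOutputStreamOperator<EventWithTime> eventStream = socketTextStream.flatMap(new FlatMapFunction<String, EventWithTime>() {
	@Override
	public void flatMap(String s, Collector<EventWithTime> collector) throws Exception {
		String[] split = s.split(",");
		if (split.length == 3) {
			long timestamp = Long.parseLong(split[2].trim());
			EventWithTime event = new EventWithTime(split[0], split[1], timestamp);
			collector.collect(event);
		}
	}
});
// Event time comes from the event itself; events may arrive up to 2 seconds out of order.
SingleOutputStreamOperator<EventWithTime> eventWithWatermark = eventStream.assignTimestampsAndWatermarks(
		WatermarkStrategy
				.<EventWithTime>forBoundedOutOfOrderness(Duration.ofSeconds(2))
				.withTimestampAssigner((SerializableTimestampAssigner<EventWithTime>) (eventWithTime, l) -> eventWithTime.getTime())
);

// Each line on the rule stream is a JSON-encoded DynamicConfiguration.
SingleOutputStreamOperator<DynamicConfiguration> dynamicConfigurationStream = socketTextStream2.flatMap((FlatMapFunction<String, DynamicConfiguration>) (s, collector) -> {
	// Created per record for simplicity.
	ObjectMapper objectMapper = new ObjectMapper();
	try {
		DynamicConfiguration dynamicConfiguration = objectMapper.readValue(s, DynamicConfiguration.class);
		if (null != dynamicConfiguration) {
			collector.collect(dynamicConfiguration);
		}
	} catch (Exception e) {
		// A malformed rule is skipped so that it cannot fail the job, but it is reported.
		System.err.println("Skipping malformed rule: " + s);
	}
}, TypeInformation.of(DynamicConfiguration.class));

// Patterns are matched per user.
KeyedStream<EventWithTime, String> eventIdKeyedStream = eventWithWatermark.keyBy(EventWithTime::getUser);
DataStream<DynamicMatchData<Object>> dynamicMatchDataDataStream =
		DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationStream)
				.minPatternInitialized(1)
				.objectProcess();
dynamicMatchDataDataStream.print("dynamic");

env.execute();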
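The EventWithTime class is not shown in the article. Judging from the constructor call, the getters used in the job and the printed results, it might look roughly like the following. This is a hypothetical reconstruction: the field types, the setters and the parse helper are assumptions, not the author's code.

```java
// Hypothetical reconstruction; the real class lives in the author's project.
public class EventWithTime {
    private String user;
    private String name;
    private Long time;

    // A public no-arg constructor plus getters and setters let Flink treat this as a POJO.
    public EventWithTime() {}

    public EventWithTime(String user, String name, Long time) {
        this.user = user;
        this.name = name;
        this.time = time;
    }

    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Long getTime() { return time; }
    public void setTime(Long time) { this.time = time; }

    // Parses "user,name,timestamp"; returns null when the line does not have three fields,
    // mirroring the length check in the job's flatMap.
    public static EventWithTime parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 3) {
            return null;
        }
        return new EventWithTime(parts[0], parts[1], Long.valueOf(parts[2]));
    }

    @Override
    public String toString() {
        return "EventWithTime(user=" + user + ", name=" + name + ", time=" + time + ")";
    }

    public static void main(String[] args) {
        // One of the input lines used in the tests.
        System.out.println(parse("2,end,1003"));
    }
}
```

The toString format follows the shape of the match output printed by the job, so results can be compared by eye.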

Test procedure

1. FlinkCEP out-of-order handling based on event time

Rule stream

Code language: json
[root@sixjo ~]# nc -lk 6666
{"id":1,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}

Event stream

Code language: shell
[root@sixjo ~]# nc -lk 8888
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003

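At this point nothing has been output, because the watermark has not yet advanced far enough to trigger computation. As more events are sent on the event stream, matches start to appear.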

Input:
2,end,4003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
Input:
2,end,5003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
Input:
2,end,6003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
Input:
2,end,7003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
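The timing of these results can be retraced with a small simulation. It assumes the bounded-out-of-orderness rule used by Flink (watermark = largest timestamp seen, minus the 2-second bound, minus 1 ms) and assumes that buffered events are handed to the pattern in timestamp order once the watermark reaches them. Real Flink emits watermarks periodically rather than per event, so this is a sketch of the behaviour for hand-typed input, not the actual operator code; the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkTrace {
    static final long MAX_OUT_OF_ORDERNESS_MS = 2000L;

    // For each input timestamp, returns the buffered timestamps released by the
    // watermark that this input produces.
    static List<List<Long>> trace(long[] timestamps) {
        PriorityQueue<Long> buffer = new PriorityQueue<>();
        // Start value chosen so that the first subtraction cannot underflow.
        long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS + 1;
        List<List<Long>> released = new ArrayList<>();
        for (long ts : timestamps) {
            buffer.add(ts);
            maxTimestamp = Math.max(maxTimestamp, ts);
            long watermark = maxTimestamp - MAX_OUT_OF_ORDERNESS_MS - 1;
            List<Long> ready = new ArrayList<>();
            while (!buffer.isEmpty() && buffer.peek() <= watermark) {
                ready.add(buffer.poll());
            }
            released.add(ready);
        }
        return released;
    }

    public static void main(String[] args) {
        // Timestamps in the order they were typed into the event stream.
        long[] input = {1003, 1002, 1001, 2003, 3003, 4003, 5003, 6003, 7003};
        List<List<Long>> released = trace(input);
        for (int i = 0; i < input.length; i++) {
            System.out.println("in " + input[i] + " -> released " + released.get(i));
        }
    }
}
```

Under these assumptions, 3003 moves the watermark to 1002, which releases start (1001) and middle (1002) but not end (1003), so nothing matches yet. 4003 moves it to 2002 and releases 1003, which is when the first match is printed; each later input releases exactly one more end event.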
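2. Flink Dynamic CEP window handling

Continue testing on the running demo above.

On the rule stream, send rule 1 again with version 3. This updates rule 1 and adds within 5s.

The update clears the state of rule 1, including resetting its watermark to the initial value -9223372036854775808 (Long.MIN_VALUE). If much older data then arrives first, it can still be processed normally. Computation, however, is triggered at the task level: a single pattern does not fire early just because its own watermark has been moved back.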
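One way to picture this behaviour is a per-rule watermark kept next to the task watermark. The sketch below is a hypothetical model of the description above, not the library's implementation: the class, its fields and its methods are all invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "per-rule watermark reset, task-level triggering".
public class RuleWatermarkSketch {
    private long taskWatermark = Long.MIN_VALUE;
    private final Map<Integer, Long> ruleWatermarks = new HashMap<>();

    // A rule update clears that rule's state, including its watermark.
    void onRuleUpdate(int ruleId) {
        ruleWatermarks.put(ruleId, Long.MIN_VALUE);
    }

    // An event is late for a rule only if it is not newer than that rule's own watermark.
    boolean accepts(int ruleId, long eventTimestamp) {
        return eventTimestamp > ruleWatermarks.getOrDefault(ruleId, taskWatermark);
    }

    // Only a task-level watermark advance triggers evaluation; it also moves every rule forward.
    void onTaskWatermark(long watermark) {
        taskWatermark = Math.max(taskWatermark, watermark);
        ruleWatermarks.replaceAll((id, old) -> Math.max(old, taskWatermark));
    }

    public static void main(String[] args) {
        RuleWatermarkSketch sketch = new RuleWatermarkSketch();
        sketch.onRuleUpdate(1);                         // version 1 of rule 1 arrives
        sketch.onTaskWatermark(5002L);                  // watermark after the first test: 7003 - 2000 - 1
        System.out.println(sketch.accepts(1, -1001L));  // false: older than the rule's watermark
        sketch.onRuleUpdate(1);                         // version 3 arrives and resets the rule
        System.out.println(sketch.accepts(1, -1001L));  // true: accepted again after the reset
    }
}
```

In this model the accepted events simply wait in the buffer until the task watermark next advances, which matches the observation that nothing is printed until a newer event arrives.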

Code language: json
{"id":1,"version":3,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":{\"within\":[\"SECONDS\",5]}}]"}

Send the following test data on the event stream. Up to this point, no match has been output.

Code language: shell
2,end,-1001
2,middle,-1002
2,start,-1003
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003
2,end,4003
2,end,5003
2,end,6003
2,end,7003
2,middle,7002
2,start,7001

Continue sending events on the event stream. The test proceeds as follows.

Input:
2,end,8003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=-1001)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=5003)]})
Input:
2,end,9003
Input:
2,end,10003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=7003)]})
Input:
2,end,11003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
Input:
2,end,12003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
Input:
2,end,13003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
Input:
2,end,14003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
Input:
2,end,15003
Input:
2,end,16003
Input:
2,end,17003
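The printed matches are consistent with a 5-second window measured from the start event to the end event. The check below retraces that arithmetic on timestamps taken from the test. It is a sketch, not the library's code: the strict comparison at exactly 5000 ms is an assumption, since the test data never lands on that boundary.

```java
public class WithinCheck {
    static final long WINDOW_MS = 5000L; // within 5 seconds, as configured in version 3 of the rule

    // A match is kept when the end event falls inside the window opened by the start event.
    // Strict "<" is assumed here; the test data does not exercise the exact boundary.
    static boolean withinWindow(long startTimestamp, long endTimestamp) {
        return endTimestamp - startTimestamp < WINDOW_MS;
    }

    public static void main(String[] args) {
        System.out.println(withinWindow(-1003L, 3003L));  // 4006 ms apart: match was printed
        System.out.println(withinWindow(-1003L, 4003L));  // 5006 ms apart: no such match
        System.out.println(withinWindow(1001L, 5003L));   // 4002 ms apart: match was printed
        System.out.println(withinWindow(1001L, 6003L));   // 5002 ms apart: no such match
        System.out.println(withinWindow(7001L, 11003L));  // 4002 ms apart: match was printed
        System.out.println(withinWindow(7001L, 12003L));  // 5002 ms apart: no such match
    }
}
```

This is why the inputs 15003, 16003 and 17003 produce no output: the end events they release (12003 and later) are more than 5 seconds after the last start event at 7001.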

END

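This largely concludes the event-time test of Flink Dynamic CEP.

Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.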
