前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink动态CEP之流式获取规则以及动态更新规则

Flink动态CEP之流式获取规则以及动态更新规则

原创
作者头像
sixjo
发布2023-05-22 00:38:28
1.3K2
发布2023-05-22 00:38:28
举报
文章被收录于专栏:Flink动态CEP

Flink任务代码

创建Flink任务,从socket中获取事件及规则,从localhost:8888中获取事件,从localhost:6666中获取规则

其中,如果需要动态更新规则,状态后端必须使用RocksDB

代码语言:java
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//如需动态更新规则,状态后端必须使用RocksDB
env.setStateBackend(new RocksDBStateBackend("file:///rocksdb/"));
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
DataStreamSource<String> socketTextStream2 = env.socketTextStream("localhost", 6666);
SingleOutputStreamOperator<Event> eventStream = socketTextStream.flatMap((FlatMapFunction<String, Event>) (s, collector) -> {
	String[] split = s.split(",");
	if (split.length == 3) {
		Integer id = Integer.valueOf(split[0]);
		Double price = Double.valueOf(split[2]);

		Event event = new Event(id, split[1], price);
		collector.collect(event);
	}
},TypeInformation.of(Event.class));

SingleOutputStreamOperator<DynamicConfiguration> dynamicConfigurationStream = socketTextStream2.flatMap((FlatMapFunction<String, DynamicConfiguration>) (s, collector) -> {
	ObjectMapper objectMapper = new ObjectMapper();
	try {
		DynamicConfiguration dynamicConfiguration = objectMapper.readValue(s, DynamicConfiguration.class);
		if(null != dynamicConfiguration){
			collector.collect(dynamicConfiguration);
		}
	} catch (Exception e) {

	}

}, TypeInformation.of(DynamicConfiguration.class));

KeyedStream<Event, Integer> eventIdKeyedStream = eventStream.keyBy(event -> event.getId());
DataStream<DynamicMatchData<Object>> dynamicMatchDataDataStream =
		DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationStream)
				.inProcessingTime()
				.minPatternInitialized(1)
				.objectProcess();
dynamicMatchDataDataStream.print("dynamic");

env.execute();

测试过程

1.初始化

事件流中输入如下内容,此时,控制台并无任何输出,实际上数据也还没有处理,因为规则尚未初始化

代码语言:shell
复制
[root@sixjo ~]# nc -lk 8888
1,start,2.0
1,foobar,3.0
1,middle,9.0
2,middle,2.0
1,end,7.0

这时候,在规则流中输入规则json,此时,达到配置中的最少初始化一个pattern,出发计算,计算尚未处理的数据

代码语言:shell
复制
[root@sixjo ~]# nc -lk 6666
{"id":1,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}

计算结果输出如下

代码语言:shell
复制
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
2.增加规则

在规则流中增加规则10086,这时候,并不会有输出,因为之前的数据已经处理过了

代码语言:shell
复制
{"id":10086,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.processor.ObjectDynamicPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}

此时,在事件流中输入end事件,只有规则1有匹配结果,规则10086是从该事件开始计算,尚未有匹配结果

代码语言:shell
复制
1,end,7.0
代码语言:shell
复制
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})

此时重新输入整个匹配的事件,其中规则1理应有3个新的匹配结果输出,规则10086理应有1个匹配结果输出

代码语言:shell
复制
1,start,2.0
1,foobar,3.0
1,middle,9.0
1,end,7.0
代码语言:shell
复制
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
3.动态更新规则

此时我们规则流重新输入规则1,version为2,更新规则1,此时会清空原有规则1的状态,重新初始化规则1

代码语言:shell
复制
{"id":1,"version":2,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.dynamic.EventPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}

此时我们事件流重新输入end事件,只有规则10086有匹配结果输出

代码语言:shell
复制
1,end,7.0
代码语言:shell
复制
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})

此时我们事件流重新输入完整匹配事件,其中规则1理应有1个新的匹配结果输出,规则10086理应有3个匹配结果输出

代码语言:shell
复制
1,start,2.0
1,middle,9.0
1,end,7.0
代码语言:shell
复制
dynamic:14> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})

END

至此,Flink CEP动态新增/更新规则已完成

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink任务代码
  • 测试过程
    • 1.初始化
      • 2.增加规则
        • 3.动态更新规则
        • END
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档