Flink Dynamic CEP Demo

sixjo
Published 2023-05-18 01:27:00
This article is included in the column: Flink动态CEP

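Demo1 Processing data with dynamic CEP

Process data with dynamic CEP by passing in an event stream and a configuration stream. Each configuration dynamically generates a Pattern, and the DynamicOperator builds a state machine from it that waits for incoming events and matches them.

The default return type is DynamicMatchData<Map<String, List<IN>>>.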

Code language: java
// Event stream: Event(id, name, price) test elements.
DataStream<Event> input =
		env.fromElements(
				new Event(1, "barfoo", 1.0),
				new Event(2, "start", 2.0),
				new Event(3, "foobar", 3.0),
				new SubEvent(4, "foo", 4.0, 1.0),
				new Event(5, "middle", 5.0),
				new SubEvent(6, "middle", 6.0, 2.0),
				new SubEvent(7, "bar", 3.0, 3.0),
				new Event(42, "42", 42.0),
				new Event(8, "end", 1.0));

// Configuration stream: a single rule.
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
		env.fromElements(
				DynamicConfigurationDemo.dynamicConfiguration2()
		);
DataStream<DynamicMatchData<Map<String, List<Event>>>> returns = DynamicCEP.patternStream(input, dynamicConfigurationDataStream)
		.inProcessingTime()
		.defaultProcess();

returns.print("dynamic");
List<DynamicMatchData<Map<String, List<Event>>>> resultList = new ArrayList<>();

// Collect the results and check that exactly two matches were produced.
DataStreamUtils.collect(returns).forEachRemaining(resultList::add);
Assert.assertEquals(2, resultList.size());

The printed output is shown below and matches the expected result: the matches are the events with id (2, 5, 8) and id (2, 6, 8).

Code language: shell
dynamic:2> DynamicMatchData(id=1, version=1, match={start=[Event(2, start, 2.0)], middle=[Event(5, middle, 5.0)], end=[Event(8, end, 1.0)]})
dynamic:3> DynamicMatchData(id=1, version=1, match={start=[Event(2, start, 2.0)], middle=[SubEvent(6, middle, 6.0, 2.0)], end=[Event(8, end, 1.0)]})

Demo2 Processing a KeyedStream with dynamic CEP

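When the event stream is a KeyedStream, events are ordered per key after keyBy. The configuration input contains two rules, and each rule is handled separately.

minPatternInitialized(2) means that the DynamicCepOperator starts processing the event stream normally only after at least 2 Patterns have been initialized.

The default is minPatternInitialized=1.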
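
The gating behaviour described above can be pictured with a small, self-contained sketch. It is not the DynamicCepOperator implementation: `PatternGate` and its methods are hypothetical names, and buffering (rather than dropping) events that arrive before enough patterns are initialized is an assumption, since the article does not say which of the two the operator does.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of a "minimum patterns initialized" gate.
// Not the library's code; buffering early events is an assumption.
class PatternGate<E> {
    private final int minPatternInitialized;
    private final Set<Long> initializedPatternIds = new HashSet<>();
    private final List<E> buffered = new ArrayList<>();
    private final List<E> processed = new ArrayList<>();

    PatternGate(int minPatternInitialized) {
        this.minPatternInitialized = minPatternInitialized;
    }

    // Called when a rule arrives on the configuration stream and its pattern is ready.
    void onPatternInitialized(long patternId) {
        initializedPatternIds.add(patternId);
        if (isOpen()) {
            // Release everything that arrived while the gate was closed.
            processed.addAll(buffered);
            buffered.clear();
        }
    }

    // Called for every element of the event stream.
    void onEvent(E event) {
        if (isOpen()) {
            processed.add(event);
        } else {
            buffered.add(event);
        }
    }

    boolean isOpen() {
        return initializedPatternIds.size() >= minPatternInitialized;
    }

    List<E> processedEvents() {
        return processed;
    }

    public static void main(String[] args) {
        PatternGate<String> gate = new PatternGate<>(2);
        gate.onPatternInitialized(1L);
        gate.onEvent("start");                      // held back: only one pattern is ready
        System.out.println(gate.processedEvents()); // prints []
        gate.onPatternInitialized(10086L);          // second pattern opens the gate
        gate.onEvent("end");
        System.out.println(gate.processedEvents()); // prints [start, end]
    }
}
```

With a threshold of 1 (the documented default) the gate in this sketch opens as soon as the first rule arrives.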

Code language: java
DataStream<Event> input =
		env.fromElements(
				new Event(1, "barfoo", 1.0),
				new Event(1, "start", 2.0),
				new Event(1, "foobar", 3.0),
				new SubEvent(1, "foo", 4.0, 1.0),
				new Event(2, "middle", 9.0),
				new SubEvent(1, "middle", 1.0, 2.0),
				new SubEvent(1, "bar", 5.0, 3.0),
				new Event(1, "42", 6.0),
				new Event(1, "end", 7.0));

// Two rules built from the same pattern; the second one differs only in its id.
DynamicConfiguration dynamicConfiguration = DynamicConfigurationDemo.dynamicConfiguration2();
DynamicConfiguration dynamicConfiguration2 = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration2.setId(10086L);
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
		env.fromElements(
				dynamicConfiguration, dynamicConfiguration2
		);
// Key the event stream by event id.
KeyedStream<Event, Integer> eventIdKeyedStream = input.keyBy((KeySelector<Event, Integer>) Event::getId);
DataStream<DynamicMatchData<Map<String, List<Event>>>> returns = DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationDataStream)
		.inProcessingTime()
		// Start processing events only after both patterns are initialized.
		.minPatternInitialized(2)
		.defaultProcess();

returns.print("dynamic");
List<DynamicMatchData<Map<String, List<Event>>>> resultList = new ArrayList<>();

The printed output is shown below and matches the expected result: rule 1 matches id=1 with price (2.0, 1.0, 7.0), and rule 10086 matches id=1 with price (2.0, 1.0, 7.0).

Code language: shell
dynamic:3> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
dynamic:3> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
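
One reading of the output above is that Event(2, "middle", 9.0) never shows up in a match because keyBy(Event::getId) places it under a different key from the start and end events. The following sketch only illustrates that logical grouping; `KeyedEvent` and `KeyByDemo` are hypothetical stand-ins, not the article's Event class or anything from Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the article's Event type (id, name, price).
class KeyedEvent {
    final int id;
    final String name;
    final double price;

    KeyedEvent(int id, String name, double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }
}

class KeyByDemo {
    // Groups event names by id while preserving arrival order inside each key.
    static Map<Integer, List<String>> namesByKey(List<KeyedEvent> events) {
        Map<Integer, List<String>> byKey = new LinkedHashMap<>();
        for (KeyedEvent e : events) {
            byKey.computeIfAbsent(e.id, k -> new ArrayList<>()).add(e.name);
        }
        return byKey;
    }

    // The Demo2 input, reduced to (id, name, price).
    static List<KeyedEvent> demo2Events() {
        return Arrays.asList(
                new KeyedEvent(1, "barfoo", 1.0),
                new KeyedEvent(1, "start", 2.0),
                new KeyedEvent(1, "foobar", 3.0),
                new KeyedEvent(1, "foo", 4.0),
                new KeyedEvent(2, "middle", 9.0),
                new KeyedEvent(1, "middle", 1.0),
                new KeyedEvent(1, "bar", 5.0),
                new KeyedEvent(1, "42", 6.0),
                new KeyedEvent(1, "end", 7.0));
    }

    public static void main(String[] args) {
        // prints {1=[barfoo, start, foobar, foo, middle, bar, 42, end], 2=[middle]}
        System.out.println(namesByKey(demo2Events()));
    }
}
```

Key 2 holds a single "middle" event with no "start" or "end", so under this reading no match can complete for it.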

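Demo3 Processing with dynamic CEP, returning the result as Object

The return type is DynamicMatchData<Object>. This is the recommended option: it can carry data of any type, which simplifies downstream processing and avoids type conversion errors.

ObjectDynamicPatternProcessFunction and DefaultDynamicPatternProcessFunction return the same data.

You can also extend DynamicPatternProcessFunction with your own implementation.

Code language: java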
DataStream<Event> input =
		env.fromElements(
				new Event(1, "barfoo", 1.0),
				new Event(1, "start", 2.0),
				new Event(1, "foobar", 3.0),
				new SubEvent(1, "foo", 4.0, 1.0),
				new Event(2, "middle", 9.0),
				new SubEvent(1, "middle", 1.0, 2.0),
				new SubEvent(1, "bar", 5.0, 3.0),
				new Event(1, "42", 6.0),
				new Event(1, "end", 7.0));
DynamicConfiguration dynamicConfiguration = DynamicConfigurationDemo.dynamicConfiguration2();
// Rule 1 uses a custom process function; rule 10086 keeps the default one.
dynamicConfiguration.setDynamicPatternProcessFunctionClassName(EventPatternProcessFunction.class.getName());
DynamicConfiguration dynamicConfiguration2 = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration2.setId(10086L);
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
		env.fromElements(
				dynamicConfiguration, dynamicConfiguration2
		);
KeyedStream<Event, Integer> eventIdKeyedStream = input.keyBy((KeySelector<Event, Integer>) Event::getId);
// objectProcess() yields DynamicMatchData<Object>, so each rule may return a different match type.
DataStream<DynamicMatchData<Object>> returns = DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationDataStream)
		.inProcessingTime()
		.minPatternInitialized(2)
		.objectProcess();

returns.print("dynamic");
List<DynamicMatchData<Object>> resultList = new ArrayList<>();

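The printed output is shown below and matches the expected result.

For rule 1, match in the returned data is a String; the matched events are id=1 with price (2.0, 1.0, 7.0).

For rule 10086, match in the returned data is a Map<String, List<IN>>, i.e. the data is a DynamicMatchData<Map<String, List<IN>>>; the matched events are id=1 with price (2.0, 1.0, 7.0).

Code language: shell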
dynamic:3> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
dynamic:3> DynamicMatchData(id=1, version=1, match=这是EventPatternProcessFunction matchs{start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
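
Because match can be a String for one rule and a Map for another, downstream code has to check the runtime type before using it. The sketch below shows one way to do that. `MatchEnvelope` is a hypothetical stand-in for DynamicMatchData<Object> whose fields are modelled on the printed output (id, version, match); it is not the library class, and `ObjectMatchHandler` is likewise an invented name.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for DynamicMatchData<Object>; fields follow the printed output.
class MatchEnvelope {
    final long id;
    final int version;
    final Object match;

    MatchEnvelope(long id, int version, Object match) {
        this.id = id;
        this.version = version;
        this.match = match;
    }
}

class ObjectMatchHandler {
    // Dispatches on the runtime type of the payload instead of casting blindly.
    static String describe(MatchEnvelope data) {
        Object match = data.match;
        if (match instanceof String) {
            return "rule " + data.id + " -> text: " + match;
        }
        if (match instanceof Map) {
            Map<?, ?> byStage = (Map<?, ?>) match;
            return "rule " + data.id + " -> stages: " + byStage.keySet();
        }
        String type = (match == null) ? "null" : match.getClass().getName();
        return "rule " + data.id + " -> unsupported payload: " + type;
    }

    public static void main(String[] args) {
        Map<String, List<String>> stages =
                Collections.singletonMap("start", Collections.singletonList("Event(1, start, 2.0)"));
        System.out.println(describe(new MatchEnvelope(10086L, 1, stages)));        // rule 10086 -> stages: [start]
        System.out.println(describe(new MatchEnvelope(1L, 1, "custom text")));     // rule 1 -> text: custom text
    }
}
```

Keeping an explicit fallback branch means an unexpected payload type is reported rather than causing a ClassCastException.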

Code of EventPatternProcessFunction

Code language: java
public class EventPatternProcessFunction extends DynamicPatternProcessFunction<Event,String> {
    @Override
    public DynamicMatchData<String> transform(Map<String, List<Event>> match, Context ctx) {
        DynamicMatchData<String> objectDynamicMatchData = new DynamicMatchData<>();
        objectDynamicMatchData.setMatch("这是EventPatternProcessFunction matchs"+match.toString());
        return objectDynamicMatchData;
    }
}

Demo4 Configured ClassName that does not extend DynamicPatternProcessFunction

The class named in dynamicPatternProcessFunctionClassName must extend DynamicPatternProcessFunction.

If it does not, the default process function DefaultDynamicPatternProcessFunction is used instead.
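
A fallback of this kind is commonly written with reflection: load the class by name, check that it is a subtype of the required base class, and otherwise return the default. The sketch below illustrates that idea only. `BaseFunction`, `DefaultFunction`, `CustomFunction`, `UnrelatedFunction` and `FunctionResolver` are all hypothetical names, and treating a missing class the same way as a wrong type is an assumption; the article does not show how the library resolves the class name.

```java
// Hypothetical base type, standing in for DynamicPatternProcessFunction.
abstract class BaseFunction {
    abstract String describe();
}

// Hypothetical default, standing in for DefaultDynamicPatternProcessFunction.
class DefaultFunction extends BaseFunction {
    @Override
    String describe() {
        return "default";
    }
}

class CustomFunction extends BaseFunction {
    @Override
    String describe() {
        return "custom";
    }
}

// Does NOT extend BaseFunction, like EventPatternProcessFunction2 in this demo.
class UnrelatedFunction {
}

class FunctionResolver {
    // Returns an instance of the named class if it extends BaseFunction, else the default.
    static BaseFunction resolve(String className) {
        if (className == null) {
            return new DefaultFunction();
        }
        try {
            Class<?> candidate = Class.forName(className);
            if (BaseFunction.class.isAssignableFrom(candidate)) {
                return (BaseFunction) candidate.getDeclaredConstructor().newInstance();
            }
        } catch (ReflectiveOperationException e) {
            // Assumption: an unloadable or non-instantiable class also falls back to the default.
        }
        return new DefaultFunction();
    }

    public static void main(String[] args) {
        System.out.println(resolve(CustomFunction.class.getName()).describe());    // custom
        System.out.println(resolve(UnrelatedFunction.class.getName()).describe()); // default
    }
}
```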

Code language: java
DataStream<Event> input =
		env.fromElements(
				new Event(1, "barfoo", 1.0),
				new Event(1, "start", 2.0),
				new Event(1, "foobar", 3.0),
				new SubEvent(1, "foo", 4.0, 1.0),
				new Event(2, "middle", 9.0),
				new SubEvent(1, "middle", 1.0, 2.0),
				new SubEvent(1, "bar", 5.0, 3.0),
				new Event(1, "42", 6.0),
				new Event(1, "end", 7.0));
DynamicConfiguration dynamicConfiguration = DynamicConfigurationDemo.dynamicConfiguration2();
// EventPatternProcessFunction2 does not extend DynamicPatternProcessFunction, so rule 1 falls back to the default.
dynamicConfiguration.setDynamicPatternProcessFunctionClassName(EventPatternProcessFunction2.class.getName());
DynamicConfiguration dynamicConfiguration2 = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration2.setId(10086L);
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
		env.fromElements(
				dynamicConfiguration, dynamicConfiguration2
		);
KeyedStream<Event, Integer> eventIdKeyedStream = input.keyBy((KeySelector<Event, Integer>) Event::getId);
DataStream<DynamicMatchData<Object>> returns = DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationDataStream)
		.inProcessingTime()
		.minPatternInitialized(2)
		.objectProcess();

returns.print("dynamic");
List<DynamicMatchData<Object>> resultList = new ArrayList<>();

// Collect the results and check that each of the two rules produced one match.
DataStreamUtils.collect(returns).forEachRemaining(resultList::add);
Assert.assertEquals(2, resultList.size());

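The printed output matches the expected result.

Rule 1 matches id=1 with price (2.0, 1.0, 7.0), and rule 10086 matches id=1 with price (2.0, 1.0, 7.0).

For both rule 1 and rule 10086, the returned data type is DynamicMatchData<Map<String, List<IN>>>.

Code language: shell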
dynamic:3> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
dynamic:3> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})

Code of EventPatternProcessFunction2

Code language: java
public class EventPatternProcessFunction2 extends PatternProcessFunction<Event, DynamicMatchData<String>> {
    public DynamicMatchData<String> transform(Map<String, List<Event>> match, Context ctx) {
        DynamicMatchData<String> objectDynamicMatchData = new DynamicMatchData<>();
        objectDynamicMatchData.setMatch("这是EventPatternProcessFunction2 matchs"+match.toString());
        return objectDynamicMatchData;
    }

    @Override
    public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<DynamicMatchData<String>> out) throws Exception {
        DynamicMatchData<String> dynamicMatchData = transform(match,ctx);
        if(ctx instanceof DynamicCepOperator.ContextFunctionImpl){
            DynamicCepOperator.ContextFunctionImpl context = (DynamicCepOperator.ContextFunctionImpl) ctx;
            String id = context.getPatternProcessor().getId();
            int version = context.getPatternProcessor().getVersion();
            dynamicMatchData.setId(id);
            dynamicMatchData.setVersion(version);
        }
        out.collect(dynamicMatchData);
    }
}

Pseudocode of the dynamicConfiguration2 matching rule

Code language: java
Pattern.begin("start").where("name = 'start'")
    .followedByAny("middle").where("name = 'middle'")
    .followedByAny("end").where("name = 'end'")
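
To see why this rule yields the two matches reported in Demo1, the following sketch enumerates every start, middle, end combination that appears in order, with any number of other events in between. It is a brute-force illustration of the non-deterministic relaxed contiguity that followedByAny expresses, not how Flink CEP evaluates patterns, and it ignores time windows and skip strategies. `FollowedByAnyDemo` and `findMatches` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class FollowedByAnyDemo {
    // Returns "startId-middleId-endId" for every in-order combination of
    // a "start", a later "middle" and a later "end" event.
    static List<String> findMatches(int[] ids, String[] names) {
        List<String> matches = new ArrayList<>();
        for (int s = 0; s < names.length; s++) {
            if (!"start".equals(names[s])) {
                continue;
            }
            for (int m = s + 1; m < names.length; m++) {
                if (!"middle".equals(names[m])) {
                    continue;
                }
                for (int e = m + 1; e < names.length; e++) {
                    if ("end".equals(names[e])) {
                        matches.add(ids[s] + "-" + ids[m] + "-" + ids[e]);
                    }
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Ids and names of the Demo1 input, in arrival order.
        int[] ids = {1, 2, 3, 4, 5, 6, 7, 42, 8};
        String[] names = {"barfoo", "start", "foobar", "foo", "middle", "middle", "bar", "42", "end"};
        System.out.println(findMatches(ids, names)); // prints [2-5-8, 2-6-8]
    }
}
```

Both "middle" events (id 5 and id 6) pair with the single start and end, which gives the id triples (2, 5, 8) and (2, 6, 8) stated in Demo1.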

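Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.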
