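This example processes data with dynamic CEP. It takes two inputs, an event stream and a configuration stream. Each configuration is turned into a Pattern at runtime, and the DynamicCepOperator builds a state machine from it that waits for incoming events and matches them.
The default return type is DynamicMatchData<Map<String, List<IN>>>.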
DataStream<Event> input =
        env.fromElements(
                new Event(1, "barfoo", 1.0),
                new Event(2, "start", 2.0),
                new Event(3, "foobar", 3.0),
                new SubEvent(4, "foo", 4.0, 1.0),
                new Event(5, "middle", 5.0),
                new SubEvent(6, "middle", 6.0, 2.0),
                new SubEvent(7, "bar", 3.0, 3.0),
                new Event(42, "42", 42.0),
                new Event(8, "end", 1.0));
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
        env.fromElements(
                DynamicConfigurationDemo.dynamicConfiguration2()
        );
DataStream<DynamicMatchData<Map<String, List<Event>>>> returns = DynamicCEP.patternStream(input, dynamicConfigurationDataStream)
        .inProcessingTime()
        .defaultProcess();
returns.print("dynamic");
List<DynamicMatchData> resultList = new ArrayList<>();
DataStreamUtils.collect(returns).forEachRemaining(resultList::add);
Assert.assertTrue(resultList.size()==2);
The printed output is shown below and agrees with the expected result: the matches are the events with ids (2, 5, 8) and (2, 6, 8).
dynamic:2> DynamicMatchData(id=1, version=1, match={start=[Event(2, start, 2.0)], middle=[Event(5, middle, 5.0)], end=[Event(8, end, 1.0)]})
dynamic:3> DynamicMatchData(id=1, version=1, match={start=[Event(2, start, 2.0)], middle=[SubEvent(6, middle, 6.0, 2.0)], end=[Event(8, end, 1.0)]})
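When the event stream is a KeyedStream, events are partitioned by key after keyBy and handled per key. In this example the configuration stream carries two rules, which are processed separately.
minPatternInitialized(2) means that the DynamicCepOperator starts processing the event stream normally only after at least 2 Patterns have been initialized.
The default is minPatternInitialized=1.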
DataStream<Event> input =
        env.fromElements(
                new Event(1, "barfoo", 1.0),
                new Event(1, "start", 2.0),
                new Event(1, "foobar", 3.0),
                new SubEvent(1, "foo", 4.0, 1.0),
                new Event(2, "middle", 9.0),
                new SubEvent(1, "middle", 1.0, 2.0),
                new SubEvent(1, "bar", 5.0, 3.0),
                new Event(1, "42", 6.0),
                new Event(1, "end", 7.0));
DynamicConfiguration dynamicConfiguration = DynamicConfigurationDemo.dynamicConfiguration2();
DynamicConfiguration dynamicConfiguration2 = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration2.setId(10086L);
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
        env.fromElements(
                dynamicConfiguration, dynamicConfiguration2
        );
KeyedStream<Event, Integer> eventIdKeyedStream = input.keyBy((KeySelector<Event, Integer>) Event::getId);
DataStream<DynamicMatchData<Map<String, List<Event>>>> returns = DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationDataStream)
        .inProcessingTime()
        .minPatternInitialized(2)
        .defaultProcess();
returns.print("dynamic");
List<DynamicMatchData> resultList = new ArrayList<>();
The printed output is shown below and agrees with the expected result: rule 1 matches id=1 with prices (2.0, 1.0, 7.0), and rule 10086 matches id=1 with prices (2.0, 1.0, 7.0).
dynamic:3> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
dynamic:3> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
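The effect of minPatternInitialized can be pictured with a small plain-Java sketch. It is not the DynamicCepOperator implementation: PatternGate and its methods are hypothetical names, and the sketch assumes that events arriving before the threshold is reached are buffered rather than dropped, which the article does not state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the gating behaviour; not the real DynamicCepOperator.
class PatternGate {
    private final int minPatternInitialized;
    private final List<String> patternIds = new ArrayList<>();
    // Assumption: events that arrive before the gate opens are buffered, not dropped.
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();

    PatternGate(int minPatternInitialized) {
        this.minPatternInitialized = minPatternInitialized;
    }

    // Called when a configuration arrives and its Pattern has been initialized.
    void onPattern(String patternId) {
        patternIds.add(patternId);
        if (isOpen()) {
            // Release buffered events in arrival order once enough patterns exist.
            while (!pending.isEmpty()) {
                processed.add(pending.poll());
            }
        }
    }

    // Called for every element of the event stream.
    void onEvent(String event) {
        if (isOpen()) {
            processed.add(event);
        } else {
            pending.add(event);
        }
    }

    boolean isOpen() {
        return patternIds.size() >= minPatternInitialized;
    }

    List<String> processed() {
        return processed;
    }
}
```

With new PatternGate(2), an event that arrives after only rule 1 has been registered stays in the buffer; once rule 10086 is registered as well, it is released and later events pass straight through.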
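Here the return type is DynamicMatchData<Object>. This variant is recommended because it can carry any result type, which simplifies downstream processing and avoids type-cast errors.
ObjectDynamicPatternProcessFunction and DefaultDynamicPatternProcessFunction return the same data.
You can also extend DynamicPatternProcessFunction to implement your own processing.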
DataStream<Event> input =
        env.fromElements(
                new Event(1, "barfoo", 1.0),
                new Event(1, "start", 2.0),
                new Event(1, "foobar", 3.0),
                new SubEvent(1, "foo", 4.0, 1.0),
                new Event(2, "middle", 9.0),
                new SubEvent(1, "middle", 1.0, 2.0),
                new SubEvent(1, "bar", 5.0, 3.0),
                new Event(1, "42", 6.0),
                new Event(1, "end", 7.0));
DynamicConfiguration dynamicConfiguration = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration.setDynamicPatternProcessFunctionClassName(EventPatternProcessFunction.class.getName());
DynamicConfiguration dynamicConfiguration2 = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration2.setId(10086L);
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
        env.fromElements(
                dynamicConfiguration, dynamicConfiguration2
        );
KeyedStream<Event, Integer> eventIdKeyedStream = input.keyBy((KeySelector<Event, Integer>) Event::getId);
DataStream<DynamicMatchData<Object>> returns = DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationDataStream)
        .inProcessingTime()
        .minPatternInitialized(2)
        .objectProcess();
returns.print("dynamic");
List<DynamicMatchData<Object>> resultList = new ArrayList<>();
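The printed output is shown below and agrees with the expected result.
For rule 1, the match field of the returned data is a String; the match is id=1 with prices (2.0, 1.0, 7.0).
For rule 10086, the returned data is a DynamicMatchData<Map<String, List<IN>>>; the match is id=1 with prices (2.0, 1.0, 7.0).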
dynamic:3> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
dynamic:3> DynamicMatchData(id=1, version=1, match=这是EventPatternProcessFunction matchs{start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
Code of EventPatternProcessFunction:
public class EventPatternProcessFunction extends DynamicPatternProcessFunction<Event,String> {
    @Override
    public DynamicMatchData<String> transform(Map<String, List<Event>> match, Context ctx) {
        DynamicMatchData<String> objectDynamicMatchData = new DynamicMatchData<>();
        objectDynamicMatchData.setMatch("这是EventPatternProcessFunction matchs"+match.toString());
        return objectDynamicMatchData;
    }
}
The class named by dynamicPatternProcessFunctionClassName must extend DynamicPatternProcessFunction.
If it does not, the default handler DefaultDynamicPatternProcessFunction is used instead.
DataStream<Event> input =
        env.fromElements(
                new Event(1, "barfoo", 1.0),
                new Event(1, "start", 2.0),
                new Event(1, "foobar", 3.0),
                new SubEvent(1, "foo", 4.0, 1.0),
                new Event(2, "middle", 9.0),
                new SubEvent(1, "middle", 1.0, 2.0),
                new SubEvent(1, "bar", 5.0, 3.0),
                new Event(1, "42", 6.0),
                new Event(1, "end", 7.0));
DynamicConfiguration dynamicConfiguration = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration.setDynamicPatternProcessFunctionClassName(EventPatternProcessFunction2.class.getName());
DynamicConfiguration dynamicConfiguration2 = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration2.setId(10086L);
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
        env.fromElements(
                dynamicConfiguration, dynamicConfiguration2
        );
KeyedStream<Event, Integer> eventIdKeyedStream = input.keyBy((KeySelector<Event, Integer>) Event::getId);
DataStream<DynamicMatchData<Object>> returns = DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationDataStream)
        .inProcessingTime()
        .minPatternInitialized(2)
        .objectProcess();
returns.print("dynamic");
List<DynamicMatchData<Object>> resultList = new ArrayList<>();
DataStreamUtils.collect(returns).forEachRemaining(resultList::add);
Assert.assertTrue(resultList.size()==2);
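The printed output is shown below and agrees with the expected result.
Rule 1 matches id=1 with prices (2.0, 1.0, 7.0), and rule 10086 matches id=1 with prices (2.0, 1.0, 7.0).
Rules 1 and 10086 both return data of type DynamicMatchData<Map<String, List<IN>>>.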
dynamic:3> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
dynamic:3> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
Code of EventPatternProcessFunction2, which extends PatternProcessFunction rather than DynamicPatternProcessFunction:
public class EventPatternProcessFunction2 extends PatternProcessFunction<Event, DynamicMatchData<String>> {
    public DynamicMatchData<String> transform(Map<String, List<Event>> match, Context ctx) {
        DynamicMatchData<String> objectDynamicMatchData = new DynamicMatchData<>();
        objectDynamicMatchData.setMatch("这是EventPatternProcessFunction2 matchs"+match.toString());
        return objectDynamicMatchData;
    }
    @Override
    public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<DynamicMatchData<String>> out) throws Exception {
        DynamicMatchData<String> dynamicMatchData = transform(match,ctx);
        if(ctx instanceof DynamicCepOperator.ContextFunctionImpl){
            DynamicCepOperator.ContextFunctionImpl context = (DynamicCepOperator.ContextFunctionImpl) ctx;
            String id = context.getPatternProcessor().getId();
            int version = context.getPatternProcessor().getVersion();
            dynamicMatchData.setId(id);
            dynamicMatchData.setVersion(version);
        }
        out.collect(dynamicMatchData);
    }
}
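The fallback seen in this example (EventPatternProcessFunction2 does not extend DynamicPatternProcessFunction, so the default function handles the match) can be pictured with a small reflection-based sketch. Everything in it is a hypothetical stand-in: BaseFunction plays the role of DynamicPatternProcessFunction, DefaultFunction the role of DefaultDynamicPatternProcessFunction, and ProcessFunctionResolver is an invented helper. How the library actually loads the class is not shown in the article, and falling back when the class cannot be loaded at all is an additional assumption.

```java
// Stand-in for DynamicPatternProcessFunction (hypothetical, simplified).
abstract class BaseFunction {
    abstract String describe();
}

// Stand-in for DefaultDynamicPatternProcessFunction.
class DefaultFunction extends BaseFunction {
    @Override
    String describe() {
        return "default";
    }
}

// A user function that extends the required base class.
class CustomFunction extends BaseFunction {
    @Override
    String describe() {
        return "custom";
    }
}

// A user class that does NOT extend the required base class.
class NotAProcessFunction {
}

class ProcessFunctionResolver {
    // Returns an instance of the named class if it extends BaseFunction,
    // otherwise the default function.
    static BaseFunction resolve(String className) {
        if (className == null) {
            return new DefaultFunction();
        }
        try {
            Class<?> clazz = Class.forName(className);
            if (BaseFunction.class.isAssignableFrom(clazz)) {
                return (BaseFunction) clazz.getDeclaredConstructor().newInstance();
            }
        } catch (ReflectiveOperationException e) {
            // Assumption: a class that cannot be loaded or instantiated also falls back.
        }
        return new DefaultFunction();
    }
}
```

Calling ProcessFunctionResolver.resolve(NotAProcessFunction.class.getName()) yields the default function, which mirrors why rule 1 in the example above returns the same map-typed match as rule 10086.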
Pattern.begin("start").where("name='start'")
        .followedByAny("middle").where("name='middle'")
        .followedByAny("end").where("name='end'")
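To see why a start / middle / end pattern joined by followedByAny produces two matches in the first example, the matching can be approximated by brute-force enumeration in plain Java. This is only a sketch of the matching semantics for single-event stages, not Flink's state-machine implementation, and FollowedByAnySketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FollowedByAnySketch {
    // Returns every id triple (start, middle, end) whose names occur in that
    // order in the input; other events may lie in between (relaxed contiguity).
    static List<List<Integer>> matches(int[] ids, String[] names) {
        List<List<Integer>> result = new ArrayList<>();
        for (int s = 0; s < names.length; s++) {
            if (!"start".equals(names[s])) {
                continue;
            }
            for (int m = s + 1; m < names.length; m++) {
                if (!"middle".equals(names[m])) {
                    continue;
                }
                for (int e = m + 1; e < names.length; e++) {
                    if ("end".equals(names[e])) {
                        result.add(Arrays.asList(ids[s], ids[m], ids[e]));
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Ids and names of the events from the first example, in arrival order.
        int[] ids = {1, 2, 3, 4, 5, 6, 7, 42, 8};
        String[] names = {"barfoo", "start", "foobar", "foo", "middle", "middle", "bar", "42", "end"};
        System.out.println(matches(ids, names)); // prints [[2, 5, 8], [2, 6, 8]]
    }
}
```

Both "middle" events (ids 5 and 6) are paired with the single "start" and the single "end", which is the pair of results printed for the first example.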
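Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, contact cloudcommunity@tencent.com to request removal.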