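I have a simple program that uses the Flink CEP library to detect multiple failed logins in a log file. My application uses event time, and I do a keyBy on the login's 'user'.
When I set the StreamExecutionEnvironment parallelism to 1, the program works fine. With any other parallelism it fails to detect the pattern, and I don't understand why.
I can see that all records for a given user go to the same thread, so why does this happen? I can also see that in many cases the records do not arrive in event-time order (I'm not sure whether that is a problem), but I couldn't find anything in the API that lets me sort records by event time within a window.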
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.setParallelism(1); // tried with 1 & 4
.....
DataStream<LogEvent> inputLogEventStream = env
    .readFile(format, FILEPATH, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
    .map(new MapToLogEvents())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LogEvent>(Time.seconds(0)) {
        @Override
        public long extractTimestamp(LogEvent element) {
            return element.getTimeLong();
        }
    })
    .keyBy(new KeySelector<LogEvent, String>() {
        @Override
        public String getKey(LogEvent le) throws Exception {
            return le.getUser();
        }
    });
inputLogEventStream.print();

Pattern<LogEvent, ?> mflPattern = Pattern.<LogEvent>begin("mfl")
    .subtype(LogEvent.class).where(
        new SimpleCondition<LogEvent>() {
            @Override
            public boolean filter(LogEvent logEvent) {
                // match only failed login attempts
                return logEvent.getResult().equalsIgnoreCase("failed");
            }
        })
    .timesOrMore(3).within(Time.seconds(60));

PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream, mflPattern);
DataStream<Threat> outputMflStream = mflPatternStream.select(
    new PatternSelectFunction<LogEvent, Threat>() {
        @Override
        public Threat select(Map<String, List<LogEvent>> logEventsMap) {
            return new Threat("MULTIPLE FAILED LOGINS detected!");
        }
    });
outputMflStream.print();
```
The printed output for both cases is reproduced below:
Parallelism = 1 (pattern detected successfully):
```
04/03/2018 12:03:53 Source: Custom File Source(1/1) switched to RUNNING
04/03/2018 12:03:53 SelectCepOperator -> Sink: Unnamed(1/1) switched to RUNNING
04/03/2018 12:03:53 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(1/1) switched to RUNNING
04/03/2018 12:03:53 Sink: Unnamed(1/1) switched to RUNNING
LogEvent [recordType=base18, eventCategory=login, user=paul, machine=laptop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:08Z, timeLong=1522103408000]
LogEvent [recordType=base19, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:03Z, timeLong=1522103403000]
LogEvent [recordType=base20, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:05Z, timeLong=1522103405000]
LogEvent [recordType=base21, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:06Z, timeLong=1522103406000]
**THREAT** ==> MULTIPLE FAILED LOGINS detected!
```
Parallelism = 4 (pattern not detected):
```
04/03/2018 12:05:33 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(3/4) switched to RUNNING
04/03/2018 12:05:33 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(2/4) switched to RUNNING
04/03/2018 12:05:33 Sink: Unnamed(2/4) switched to RUNNING
04/03/2018 12:05:33 SelectCepOperator -> Sink: Unnamed(2/4) switched to RUNNING
04/03/2018 12:05:33 Sink: Unnamed(3/4) switched to RUNNING
04/03/2018 12:05:33 SelectCepOperator -> Sink: Unnamed(3/4) switched to RUNNING
2> LogEvent [recordType=base18, eventCategory=login, user=paul, machine=laptop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:08Z, timeLong=1522103408000]
3> LogEvent [recordType=base21, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:06Z, timeLong=1522103406000]
3> LogEvent [recordType=base20, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:05Z, timeLong=1522103405000]
3> LogEvent [recordType=base19, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:03Z, timeLong=1522103403000]
```
Posted on 2018-04-24 19:48:07
I think that when you use CEP with .keyBy(), it matters a great deal which partitions these events end up in.
Your code:
```java
PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream, mflPattern);
```
I think it should be:
```java
PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream.keyBy("eventCategory", "user"), mflPattern);
```
You may want to look at https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
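The sketch below is plain Java with no Flink dependency. It roughly models what the keyed pattern is expected to match.

- It groups failed events by the composite key (eventCategory, user) that the answer suggests.
- It sorts each group by timeLong, since the question notes that events can arrive out of event-time order.
- It reports a key when at least three failures span less than 60 seconds.

Some assumptions to keep in mind:

- The `FailedLoginSketch` class and its reduced `LogEvent` stand-in are hypothetical.
- The strict "< 60 s" bound is an assumption.
- It ignores watermarks, late events, and the multiple overlapping matches Flink CEP can emit, so it is a simplified model, not how Flink CEP works internally.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FailedLoginSketch {

    // Hypothetical stand-in for the question's LogEvent, reduced to the fields used here.
    static final class LogEvent {
        final String eventCategory;
        final String user;
        final String result;
        final long timeLong;

        LogEvent(String eventCategory, String user, String result, long timeLong) {
            this.eventCategory = eventCategory;
            this.user = user;
            this.result = result;
            this.timeLong = timeLong;
        }
    }

    // Assumed to mirror timesOrMore(3).within(Time.seconds(60)); the strict bound is an assumption.
    static final int MIN_FAILURES = 3;
    static final long WINDOW_MS = 60_000L;

    // Returns "eventCategory|user" keys that have MIN_FAILURES failed events spanning < WINDOW_MS.
    static List<String> detect(List<LogEvent> events) {
        // Group failed events by the composite key, like keyBy("eventCategory", "user").
        Map<String, List<LogEvent>> byKey = new TreeMap<>();
        for (LogEvent e : events) {
            if (e.result.equalsIgnoreCase("failed")) {
                byKey.computeIfAbsent(e.eventCategory + "|" + e.user, k -> new ArrayList<>()).add(e);
            }
        }
        List<String> threats = new ArrayList<>();
        for (Map.Entry<String, List<LogEvent>> entry : byKey.entrySet()) {
            List<LogEvent> failed = entry.getValue();
            // Sort by event time, because arrival order may differ from event-time order.
            failed.sort(Comparator.comparingLong(e -> e.timeLong));
            // Slide over runs of MIN_FAILURES consecutive failures and check their time span.
            for (int i = 0; i + MIN_FAILURES - 1 < failed.size(); i++) {
                long span = failed.get(i + MIN_FAILURES - 1).timeLong - failed.get(i).timeLong;
                if (span < WINDOW_MS) {
                    threats.add(entry.getKey());
                    break;
                }
            }
        }
        return threats;
    }

    public static void main(String[] args) {
        // Timestamps copied from the parallelism = 4 output above, in the order they were printed.
        List<LogEvent> events = List.of(
            new LogEvent("login", "paul", "failed", 1522103408000L),
            new LogEvent("login", "deb", "failed", 1522103406000L),
            new LogEvent("login", "deb", "failed", 1522103405000L),
            new LogEvent("login", "deb", "failed", 1522103403000L));
        System.out.println(detect(events)); // prints [login|deb]
    }
}
```

Sorting each key's events before matching is the per-key equivalent of what the question asks for. In Flink, that ordering is driven by watermarks rather than an explicit sort.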
https://stackoverflow.com/questions/49624021