
Flink CEP fails to detect the pattern when parallelism > 1

Stack Overflow user
Asked 2018-04-03 06:57:32
1 answer · 366 views · 0 followers · 0 votes

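I have a simple program that uses the Flink CEP library to detect multiple failed logins in a log file. My application uses event time, and I do a keyBy on the 'user' who logged in.

The program works fine when I set the StreamExecutionEnvironment parallelism to 1. With any other parallelism it fails, and I don't understand why.

I can see that all records for a given user go to the same thread, so why does the problem occur? I can also see that in many cases the records are not in event-time order (I am not sure whether that is a problem), but I could not find anything in the API that lets me sort records by event time within a window.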

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.setParallelism(1); //tried with 1 & 4
.....    
DataStream<LogEvent> inputLogEventStream = env
    .readFile(format, FILEPATH, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
    .map(new MapToLogEvents())              
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LogEvent>(Time.seconds(0)) {
            public long extractTimestamp(LogEvent element) {
                    return element.getTimeLong();
            }
    })
    .keyBy(new KeySelector<LogEvent, String>() {
            public String getKey(LogEvent le) throws Exception {
                    return le.getUser();
            }
    });
inputLogEventStream.print();

Pattern<LogEvent, ?> mflPattern = Pattern.<LogEvent> begin("mfl")
    .subtype(LogEvent.class).where(
            new SimpleCondition<LogEvent>() {
                    public boolean filter(LogEvent logEvent) {
                            if (logEvent.getResult().equalsIgnoreCase("failed")) { return true; }
                            return false;
                    }
    })
    .timesOrMore(3).within(Time.seconds(60));

PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream, mflPattern);

DataStream<Threat> outputMflStream = mflPatternStream.select(
    new PatternSelectFunction<LogEvent, Threat>() {
            public Threat select(Map<String, List<LogEvent>> logEventsMap) {
                    return new Threat("MULTIPLE FAILED LOGINS detected!");
            }
    });
outputMflStream.print();

The printed output for both cases is reproduced below:

Parallelism = 1 (pattern detected successfully)

04/03/2018 12:03:53 Source: Custom File Source(1/1) switched to RUNNING 
04/03/2018 12:03:53 SelectCepOperator -> Sink: Unnamed(1/1) switched to RUNNING 
04/03/2018 12:03:53 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(1/1) switched to RUNNING 
04/03/2018 12:03:53 Sink: Unnamed(1/1) switched to RUNNING 
LogEvent [recordType=base18, eventCategory=login, user=paul, machine=laptop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:08Z, timeLong=1522103408000]
LogEvent [recordType=base19, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:03Z, timeLong=1522103403000]
LogEvent [recordType=base20, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:05Z, timeLong=1522103405000]
LogEvent [recordType=base21, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:06Z, timeLong=1522103406000]
**THREAT** ==> MULTIPLE FAILED LOGINS detected!

Parallelism = 4 (pattern not detected)

04/03/2018 12:05:33 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(3/4) switched to RUNNING 
04/03/2018 12:05:33 Split Reader: Custom File Source -> Map -> Timestamps/Watermarks(2/4) switched to RUNNING 
04/03/2018 12:05:33 Sink: Unnamed(2/4) switched to RUNNING 
04/03/2018 12:05:33 SelectCepOperator -> Sink: Unnamed(2/4) switched to RUNNING 
04/03/2018 12:05:33 Sink: Unnamed(3/4) switched to RUNNING 
04/03/2018 12:05:33 SelectCepOperator -> Sink: Unnamed(3/4) switched to RUNNING 
2> LogEvent [recordType=base18, eventCategory=login, user=paul, machine=laptop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:08Z, timeLong=1522103408000]
3> LogEvent [recordType=base21, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:06Z, timeLong=1522103406000]
3> LogEvent [recordType=base20, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:05Z, timeLong=1522103405000]
3> LogEvent [recordType=base19, eventCategory=login, user=deb, machine=desktop1, result=failed, eventCount=1, dataBytes=100, time=2018-03-26T22:30:03Z, timeLong=1522103403000]

1 Answer

Stack Overflow user

Answered 2018-04-24 19:48:07

I think that when you use CEP together with .keyBy(), it matters a great deal which partitions receive the events.

Your code

PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream, mflPattern);

I think it should be

PatternStream<LogEvent> mflPatternStream = CEP.pattern(inputLogEventStream.keyBy("eventCategory","user"), mflPattern);

You may want to take a look at https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
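
Separately from the keying, the out-of-order records mentioned in the question may be worth a look. In event time, the CEP operator buffers incoming events and only processes them, sorted by timestamp, once its watermark has passed them. An operator with several parallel inputs takes the minimum of their watermarks as its own. The sketch below is a simplified plain-Java model of those two rules, not Flink code: `WatermarkSketch`, `combinedWatermark` and `release` are hypothetical names, and the idea that some of the four Timestamps/Watermarks subtasks never saw a record is an assumption that the logs above do not confirm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model of event-time handling; all names are hypothetical.
class WatermarkSketch {

    // An operator with several input channels advances its event-time
    // clock to the minimum watermark seen across those channels.
    static long combinedWatermark(long[] channelWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    // Releases, in ascending timestamp order, every buffered event
    // whose timestamp is at or below the current watermark.
    static List<Long> release(PriorityQueue<Long> buffer, long watermark) {
        List<Long> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    // Builds a buffer holding deb's three failed logins in the arrival
    // order shown in the parallelism = 4 output.
    static PriorityQueue<Long> debBuffer() {
        PriorityQueue<Long> buffer = new PriorityQueue<>();
        buffer.add(1522103406000L);
        buffer.add(1522103405000L);
        buffer.add(1522103403000L);
        return buffer;
    }

    public static void main(String[] args) {
        // One upstream channel that has seen every record (max timestamp is paul's).
        long single = combinedWatermark(new long[] {1522103408000L});
        // prints [1522103403000, 1522103405000, 1522103406000]
        System.out.println(release(debBuffer(), single));

        // Four upstream channels, three of which (by assumption) saw no records
        // and therefore still report the initial watermark.
        long four = combinedWatermark(new long[] {
            1522103408000L, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE});
        // prints []
        System.out.println(release(debBuffer(), four));
    }
}
```

In this model the arrival order does not matter, because the buffer sorts by timestamp, but nothing is released while any input channel holds the watermark back. One way to test whether that applies here might be to keep the source, map and timestamp assignment at parallelism 1 and raise the parallelism only for the downstream operators.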

Votes: 0

Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/49624021
