前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink cep 案例之机架温度监控报警

flink cep 案例之机架温度监控报警

作者头像
大数据技术与应用实战
发布2020-09-15 14:24:19
9360
发布2020-09-15 14:24:19
举报

FlinkCEP是在Flink之上实现的复杂事件处理库。它提供了丰富的API,允许您在不停止的事件流中检测事件模式,并对复杂事件做相应处理。模式匹配是复杂事件处理的一个有力的保障,应用场景包括受一系列事件驱动的各种业务流程,例如在正常的网略行为中侦测异常行为;在金融应用中查找价格、交易量和其他行为的模式。

特点:

  • 复杂性:多个流join,窗口聚合,事件序列或patterns检测
  • 低延迟:秒或毫秒级别,比如做信用卡盗刷检测,或攻击检测
  • 高吞吐:每秒上万条消息

在这篇博客中,我们将通过一个案例来讲解flink CEP的使用。 案例来源于官网博客:https://flink.apache.org/news/2016/04/06/cep-monitoring.html

输入事件流由来自一组机架的温度和功率事件组成。目标是检测 当机架过热时我们需要发出警告和报警。

我们通过自定义的source来模拟生成机架的温度,然后定义以下的规则来生成警告和报警

  • 警告:某机架在10秒内连续两次上报的温度超过阈值;
  • 报警:某机架在20秒内连续两次匹配警告;

首先我们定义一个监控事件

注意要重写里面的hashcode方法和equal方法

来自官网:The events in the DataStream to which you want to apply pattern matching must implement proper equals() and hashCode() methods because FlinkCEP uses them for comparing and matching events.

代码语言:javascript
复制
public abstract class MonitoringEvent {
    private int rackID;

    public MonitoringEvent(int rackID) {
        this.rackID = rackID;
    }

    public int getRackID() {
        return rackID;
    }

    public void setRackID(int rackID) {
        this.rackID = rackID;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof MonitoringEvent) {
            MonitoringEvent monitoringEvent = (MonitoringEvent) obj;
            return monitoringEvent.canEquals(this) && rackID == monitoringEvent.rackID;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return rackID;
    }

    public boolean canEquals(Object obj) {
        return obj instanceof MonitoringEvent;
    }
}




public class TemperatureEvent extends MonitoringEvent {
    private double temperature;
    ...
}

public class PowerEvent extends MonitoringEvent {
    private double voltage;
    ...
}

我们通过自定义的source来模拟生成MonitoringEvent数据。

代码语言:javascript
复制


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Input stream of monitoring events
        DataStream<MonitoringEvent> inputEventStream = env
                .addSource(new MonitoringEventSource(
                        MAX_RACK_ID,
                        PAUSE,
                        TEMPERATURE_RATIO,
                        POWER_STD,
                        POWER_MEAN,
                        TEMP_STD,
                        TEMP_MEAN))
                .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

接下来定义模式,在10秒钟之内连续两个event的温度超过阈值

代码语言:javascript
复制

       // Warning pattern: Two consecutive temperature events whose temperature is higher than the given threshold
        // appearing within a time interval of 10 seconds
        Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first")
                .subtype(TemperatureEvent.class)
                .where(new IterativeCondition<TemperatureEvent>() {
                    private static final long serialVersionUID = -6301755149429716724L;

                    @Override
                    public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                        return value.getTemperature() >= TEMPERATURE_THRESHOLD;
                    }
                })
                .next("second")  //紧接着上一个事件
                
                
                .subtype(TemperatureEvent.class)
                .where(new IterativeCondition<TemperatureEvent>() {
                    private static final long serialVersionUID = 2392863109523984059L;

                    @Override
                    public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                        return value.getTemperature() >= TEMPERATURE_THRESHOLD;
                    }
                })
                .within(Time.seconds(10));
                

使用报警模式和输入流生成模式流

代码语言:javascript
复制
     // Create a pattern stream from our warning pattern
        PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
                inputEventStream.keyBy("rackID"),
                warningPattern);

使用select方法为每个匹配的报警模式生成相应的报警。其中返回值是一个map,key是我们定义的模式,value是匹配的事件列表。

代码语言:javascript
复制

    // Generate temperature warnings for each matched warning pattern
        DataStream<TemperatureWarning> warnings = tempPatternStream.select(
                (Map<String, List<MonitoringEvent>> pattern) -> {
                    TemperatureEvent first = (TemperatureEvent) pattern.get("first").get(0);
                    TemperatureEvent second = (TemperatureEvent) pattern.get("second").get(0);

                    return new TemperatureWarning(first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2);
                }
        );

以上我们最后生成了相应的用于警告的DataStream类型的数据流warnings,接下来我们使用这个警告流来生成我们的报警流,即在20秒内连续两次发生警告。

代码语言:javascript
复制

   // Alert pattern: Two consecutive temperature warnings appearing within a time interval of 20 seconds
        Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
                .next("second")
                .within(Time.seconds(20));

然后通过上面的报警模式alertPattern和警告流warnings生成我们的报警流alertPatternStream。

代码语言:javascript
复制

   // Create a pattern stream from our alert pattern
        PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
                warnings.keyBy("rackID"),
                alertPattern);

最后当收集到的两次警告中,第一次警告的平均温度小于第二次的时候,生成报警,封装TemperatureAlert信息返回。

代码语言:javascript
复制

  // Generate a temperature alert only if the second temperature warning's average temperature is higher than
        // first warning's temperature
        DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
                (Map<String, List<TemperatureWarning>> pattern, Collector<TemperatureAlert> out) -> {
                    TemperatureWarning first = pattern.get("first").get(0);
                    TemperatureWarning second = pattern.get("second").get(0);

                    if (first.getAverageTemperature() < second.getAverageTemperature()) {
                        out.collect(new TemperatureAlert(first.getRackID()));
                    }
                },
                TypeInformation.of(TemperatureAlert.class));

最后我们将报警流和警告流输出,当然我们也可以对这两个流做其他的操作,比如发到报警系统等。

代码语言:javascript
复制
   // Print the warning and alert events to stdout
        warnings.print();
        alerts.print();

参考: [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/cep.html [2] https://flink.apache.org/news/2016/04/06/cep-monitoring.html

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与应用实战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档