
Today's Index Project: FlinkCEP Getting-Started Examples

Maynor
Published 2022-05-07 10:32:10

CEP Examples

1. Getting-started example

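Requirement: a business system requires users to log in before they can use it. Filter out the users who fail to log in twice in a row within a few seconds (the reference code below uses a 3-second window).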

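Create the test class cn.itcast.LoginFailDemo in the test source directory.
Development steps:
1. Get the stream execution environment
2. Set the parallelism and use event time
3. Load the data source and extract the event time
4. Define the pattern and set the time window
5. Apply the pattern (keyed by user)
6. Process the matched data
7. Print the results
8. Trigger execution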

Data source:

	Arrays.asList(
        new LoginUser (1, "192.168.0.1", "fail", 1558430842000L),		//2019-05-21 17:27:22
        new LoginUser (1, "192.168.0.2", "fail", 1558430843000L),		//2019-05-21 17:27:23
        new LoginUser (1, "192.168.0.3", "fail", 1558430844000L),		//2019-05-21 17:27:24
        new LoginUser (2, "192.168.10.10", "success", 1558430845000L)	//2019-05-21 17:27:25
)
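The trailing comments give each eventTime as a wall-clock time. One quick way to check them is to convert the epoch-millisecond values with java.time. The sketch below assumes the comments are written in the Asia/Shanghai time zone (UTC+8), which the article does not state explicitly; the class and method names are made up for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class EpochToLocalTime {
    // Assumption: the comments in the data source use China Standard Time (UTC+8).
    private static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZONE);

    // Formats an epoch-millisecond timestamp as a local date-time string.
    static String format(long epochMillis) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(format(1558430842000L)); // 2019-05-21 17:27:22
        System.out.println(format(1558430845000L)); // 2019-05-21 17:27:25
    }
}
```

The four sample events are therefore exactly one second apart, which is why every adjacent pair falls comfortably inside the time window used by the pattern.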

Reference code

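package cn.itcast;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Uses CEP to find users who fail to log in twice in a row within 3 seconds.
 */
public class LoginFailDemo {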

   public static void main(String[] args) throws Exception {
       // 1: Initialize the streaming execution environment
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       // 2: Set the parallelism to 1
       env.setParallelism(1);
       // 3: Process the data by event time
       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
       // 4: Build the data source and extract the event time (no out-of-orderness allowed)
       DataStream<LoginUser> LoginUserStream = env.fromCollection(Arrays.asList(
               new LoginUser(1, "192.168.0.1", "fail", 1558430842000L), // 2019-05-21 17:27:22
               new LoginUser(1, "192.168.0.2", "fail", 1558430843000L), // 2019-05-21 17:27:23
               new LoginUser(1, "192.168.0.3", "fail", 1558430844000L), // 2019-05-21 17:27:24
               new LoginUser(2, "192.168.10.10", "success", 1558430845000L) // 2019-05-21 17:27:25
       )).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginUser>(Time.seconds(0)) {
           @Override
           public long extractTimestamp(LoginUser element) {
               return element.getEventTime();
           }
       });

       // 5.1: Define the pattern
       Pattern<LoginUser, LoginUser> LoginUserPattern = Pattern.<LoginUser>begin("begin")
               .where(new IterativeCondition<LoginUser>() {
                   @Override
                   public boolean filter(LoginUser loginUser, Context<LoginUser> context) throws Exception {
                       return "fail".equals(loginUser.getEventType());
                   }
               }) // First event: a failed login
               .next("next") // next() requires strict contiguity: the second event must directly follow the first
               .where(new IterativeCondition<LoginUser>() {
                   @Override
                   public boolean filter(LoginUser loginUser, Context<LoginUser> context) throws Exception {
                       return "fail".equals(loginUser.getEventType());
                   }
               }) // Second event: another failed login
               .within(Time.seconds(3)); // Time constraint: both events must occur within 3 seconds
       
       // 5.2: Apply the pattern to the stream, keyed by user id
       PatternStream<LoginUser> patternDataStream = CEP.pattern(LoginUserStream.keyBy(LoginUser::getUserId), LoginUserPattern);
       // 5.3: Extract the data that matched the pattern
       /*
        * IN: the input type
        * OUT: the output type
        * (Integer, String, String, Long): (user id, login IP, login status, login time)
        */
       SingleOutputStreamOperator<Tuple4<Integer, String, String, Long>> loginFailDataStream = patternDataStream.select(new PatternSelectFunction<LoginUser, Tuple4<Integer, String, String, Long>>() {
           @Override
           public Tuple4<Integer, String, String, Long> select(Map<String, List<LoginUser>> pattern) throws Exception {
               // Matched events are grouped by pattern name; take the second failure, stored under "next"
               LoginUser loginUser = pattern.get("next").get(0);

               // Return the matched data
               return Tuple4.of(loginUser.getUserId(), loginUser.getIp(), loginUser.getEventType(), loginUser.getEventTime());
           }
       });
       
       // Print the data that matched
       loginFailDataStream.print("Users with two consecutive login failures>>>");
       // Execute the job
       env.execute();

   }

Login object:


    /**
     * Login event bean
     */
    public static class LoginUser implements Serializable {
        public int userId; // user id
        public String ip; // user IP
        public String eventType; // login status ("fail" or "success")
        public Long eventTime; // event time in epoch milliseconds

        public int getUserId() {
            return userId;
        }
    
        public void setUserId(int userId) {
            this.userId = userId;
        }
    
        public String getIp() {
            return ip;
        }
    
        public void setIp(String ip) {
            this.ip = ip;
        }
    
        public String getEventType() {
            return eventType;
        }
    
        public void setEventType(String eventType) {
            this.eventType = eventType;
        }
    
        public Long getEventTime() {
            return eventTime;
        }
    
        public void setEventTime(Long eventTime) {
            this.eventTime = eventTime;
        }
    
        public LoginUser(int userId, String ip, String eventType, Long eventTime) {
            this.userId = userId;
            this.ip = ip;
            this.eventType = eventType;
            this.eventTime = eventTime;
        }
    
        @Override
        public String toString() {
            return "LoginUser{" +
                    "userId=" + userId +
                    ", ip='" + ip + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", eventTime=" + eventTime +
                    '}';
        }
    }

}
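To see which records the job above should report, the matching rule can be mimicked in plain Java without Flink. This is only a rough sketch of the rule "a failure directly followed by another failure of the same user inside the window"; it is not how Flink CEP is implemented and it ignores watermarks. The class LoginFailSketch, its nested Login type, and the helper names are hypothetical, the input is assumed to be sorted by event time, and treating the window bound as exclusive is an assumption that makes no difference for this sample.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LoginFailSketch {
    // Minimal stand-in for the article's LoginUser bean.
    static final class Login {
        final int userId;
        final String ip;
        final String eventType;
        final long eventTime;

        Login(int userId, String ip, String eventType, long eventTime) {
            this.userId = userId;
            this.ip = ip;
            this.eventType = eventType;
            this.eventTime = eventTime;
        }
    }

    // The same four events as the data source above.
    static List<Login> sampleLogins() {
        return Arrays.asList(
                new Login(1, "192.168.0.1", "fail", 1558430842000L),
                new Login(1, "192.168.0.2", "fail", 1558430843000L),
                new Login(1, "192.168.0.3", "fail", 1558430844000L),
                new Login(2, "192.168.10.10", "success", 1558430845000L));
    }

    // Returns the second event of every pair of directly consecutive "fail"
    // events of the same user that lie less than windowMillis apart.
    static List<Login> consecutiveFailures(List<Login> events, long windowMillis) {
        Map<Integer, Login> previousByUser = new HashMap<>();
        List<Login> matches = new ArrayList<>();
        for (Login current : events) {
            Login previous = previousByUser.get(current.userId);
            if (previous != null
                    && "fail".equals(previous.eventType)
                    && "fail".equals(current.eventType)
                    && current.eventTime - previous.eventTime < windowMillis) {
                matches.add(current);
            }
            previousByUser.put(current.userId, current);
        }
        return matches;
    }

    public static void main(String[] args) {
        for (Login match : consecutiveFailures(sampleLogins(), 3000L)) {
            System.out.println(match.userId + " " + match.ip + " " + match.eventTime);
        }
    }
}
```

For the sample data this reports the failures from 192.168.0.2 and 192.168.0.3, both for user 1: the first pair is (17:27:22, 17:27:23) and the second is (17:27:23, 17:27:24). User 2 never appears because the only event is a successful login.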

2. Monitoring market prices

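Requirement: the price bureau and the industry and commerce bureau supervise the selling prices of goods on the market. Prices fluctuate to some degree as market conditions and supply and demand change. As long as a price stays within a specified range, the government does not intervene. If, within a certain period, a price moves outside that range and rises too sharply, the price bureau reports the sensitive data and regulates the market price. Here we assume that an alert is sent whenever a product's selling price exceeds its preset threshold twice in a row within 1 minute.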

Test data

{"goodsId":100001,"goodsPrice":6,"goodsName":"apple","alias":"苹果","orderTime":1558430843000}
{"goodsId":100007,"goodsPrice":0.5,"goodsName":"mask","alias":"口罩","orderTime":1558430844000}
{"goodsId":100002,"goodsPrice":2,"goodsName":"rice","alias":"大米","orderTime":1558430845000}
{"goodsId":100003,"goodsPrice":2,"goodsName":"flour","alias":"面粉","orderTime":1558430846000}
{"goodsId":100004,"goodsPrice":12,"goodsName":"rice","alias":"大米","orderTime":1558430847000}
{"goodsId":100005,"goodsPrice":20,"goodsName":"apple","alias":"苹果","orderTime":1558430848000}
{"goodsId":100006,"goodsPrice":3,"goodsName":"banana","alias":"香蕉","orderTime":1558430849000}
{"goodsId":100007,"goodsPrice":10,"goodsName":"mask","alias":"口罩","orderTime":1558430850000}
{"goodsId":100001,"goodsPrice":16,"goodsName":"apple","alias":"苹果","orderTime":1558430852000}
{"goodsId":100007,"goodsPrice":15,"goodsName":"mask","alias":"口罩","orderTime":1558430853000}
{"goodsId":100002,"goodsPrice":12,"goodsName":"rice","alias":"大米","orderTime":1558430854000}
{"goodsId":100003,"goodsPrice":12,"goodsName":"flour","alias":"面粉","orderTime":1558430855000}
{"goodsId":100004,"goodsPrice":12,"goodsName":"rice","alias":"大米","orderTime":1558430856000}
{"goodsId":100005,"goodsPrice":20,"goodsName":"apple","alias":"苹果","orderTime":1558430857000}
{"goodsId":100006,"goodsPrice":13,"goodsName":"banana","alias":"香蕉","orderTime":1558430858000}
{"goodsId":100007,"goodsPrice":10,"goodsName":"mask","alias":"口罩","orderTime":1558430859000}

Create the Kafka topic

./kafka-topics.sh --create --topic cep --zookeeper node01:2181 --partitions 1 --replication-factor 1 

Produce data

./kafka-console-producer.sh --broker-list node01:9092 --topic cep

Store the price thresholds in Redis

jedisCluster.hset("product", "apple", "10");
jedisCluster.hset("product", "rice", "6");
jedisCluster.hset("product", "flour", "6");
jedisCluster.hset("product", "banana", "8");
jedisCluster.hset("product", "mask", "5");
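These hash entries are the per-product thresholds that the job later reads from the "product" hash. As a rough sketch of the comparison step, the snippet below uses an in-memory HashMap in place of Redis (an assumption made so that it runs without a cluster) and parses each threshold as a double, so that a non-integer limit would also work. PriceThresholds, sampleThresholds, and exceeds are hypothetical names, not part of the project.

```java
import java.util.HashMap;
import java.util.Map;

class PriceThresholds {
    // In-memory stand-in for the Redis hash "product" (field = goods name, value = limit).
    static Map<String, String> sampleThresholds() {
        Map<String, String> thresholds = new HashMap<>();
        thresholds.put("apple", "10");
        thresholds.put("rice", "6");
        thresholds.put("flour", "6");
        thresholds.put("banana", "8");
        thresholds.put("mask", "5");
        return thresholds;
    }

    // True when the price is strictly above the configured limit.
    // Products without a configured limit never raise an alert.
    static boolean exceeds(Map<String, String> thresholds, String goodsName, double price) {
        String limit = thresholds.get(goodsName);
        return limit != null && price > Double.parseDouble(limit);
    }

    public static void main(String[] args) {
        Map<String, String> thresholds = sampleThresholds();
        System.out.println(exceeds(thresholds, "apple", 16)); // true
        System.out.println(exceeds(thresholds, "mask", 0.5)); // false
    }
}
```

Guarding against a missing entry matters because a lookup on an unknown goods name returns null, and passing null to a numeric parser would fail the whole record.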

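Development steps
Create the test class cn.itcast.CepMarkets in the test source directory.
1. Get the stream execution environment
2. Set event time and parallelism
3. Integrate Kafka
4. Convert the data
5. Use process to get the bean, set its status, and assign the event time
6. Define the pattern and set the time window
7. Apply the pattern (keyed by goodsId)
8. Query the alert data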

2.1. Code

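package cn.itcast;

// JSON and JSONObject are assumed to come from fastjson, whose API matches the calls below.
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import redis.clients.jedis.JedisCluster;

import java.util.List;
import java.util.Map;
import java.util.Properties;

// RedisUtil is the project's own helper class and is not shown in this article.
public class CepMarkets {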

    public static void main(String[] args) throws Exception {
       
        // 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 3. Integrate Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node01:9092"); // broker address
        properties.setProperty("group.id", "cep"); // consumer group
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.commit.interval.ms", "5000");
        FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>("cep", new SimpleStringSchema(), properties);
        kafkaConsumer.setStartFromEarliest();
        DataStreamSource<String> source = env.addSource(kafkaConsumer);
    
        // 4. Convert each JSON string into a Product
        SingleOutputStreamOperator<Product> mapData = source.map(new MapFunction<String, Product>() {
            @Override
            public Product map(String value) throws Exception {
                JSONObject json = JSON.parseObject(value);
                Product product = new Product(
                        json.getLong("goodsId"),
                        json.getDouble("goodsPrice"),
                        json.getString("goodsName"),
                        json.getString("alias"),
                        json.getLong("orderTime"),
                        false
                );
                return product;
            }
        });
    
        // 5. Flag records whose price exceeds the threshold, then assign the event time
        SingleOutputStreamOperator<Product> waterData = mapData.keyBy(Product::getGoodsId)
                .process(new KeyedProcessFunction<Long, Product, Product>() {
                    // Price thresholds loaded from the Redis hash "product" (goods name -> limit)
                    private transient Map<String, String> map;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        JedisCluster jedisCluster = RedisUtil.getJedisCluster();
                        map = jedisCluster.hgetAll("product");
                    }

                    @Override
                    public void processElement(Product value, Context ctx, Collector<Product> out) throws Exception {
                        String limit = map.get(value.getGoodsName());
                        // Products without a configured threshold pass through unflagged
                        if (limit != null && value.getGoodsPrice() > Double.parseDouble(limit)) {
                            value.setStatus(true);
                        }
                        out.collect(value);
                    }
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Product>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Product element) {
                        return element.getOrderTime();
                    }
                });
        // 6. Define the pattern and set the time window
        Pattern<Product, Product> pattern = Pattern.<Product>begin("begin")
                .where(new SimpleCondition<Product>() {
                    @Override
                    public boolean filter(Product value) throws Exception {
                        // First event: price above the threshold
                        return Boolean.TRUE.equals(value.getStatus());
                    }
                })
                .next("next")
                .where(new SimpleCondition<Product>() {
                    @Override
                    public boolean filter(Product value) throws Exception {
                        // Second event: the directly following price is also above the threshold
                        return Boolean.TRUE.equals(value.getStatus());
                    }
                })
                .within(Time.seconds(60));
    
        // 7. Apply the pattern to the stream keyed by goodsId
        PatternStream<Product> cep = CEP.pattern(waterData.keyBy(Product::getGoodsId), pattern);

        // 8. Select the alert data
        cep.select(new PatternSelectFunction<Product, List<Product>>() {
            @Override
            public List<Product> select(Map<String, List<Product>> pattern) throws Exception {
                // Return the second over-threshold record of each match
                return pattern.get("next");
            }
        }).print("Alert data:");

        env.execute();
    }

}
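As a sanity check on the test data, the alert rule can be replayed in plain Java. The sketch below is an approximation under stated assumptions: it mirrors only the rule "two directly consecutive over-threshold prices for the same goodsId, less than 60 seconds apart", it uses an in-memory map instead of Redis, it assumes the records arrive in orderTime order, and it does not model Flink's watermark handling. PriceAlertSketch, Quote, and the helper methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PriceAlertSketch {
    // Minimal stand-in for the Product bean (only the fields the rule needs).
    static final class Quote {
        final long goodsId;
        final String goodsName;
        final double price;
        final long orderTime;

        Quote(long goodsId, double price, String goodsName, long orderTime) {
            this.goodsId = goodsId;
            this.price = price;
            this.goodsName = goodsName;
            this.orderTime = orderTime;
        }
    }

    // The thresholds stored in Redis above.
    static Map<String, Double> sampleLimits() {
        Map<String, Double> limits = new HashMap<>();
        limits.put("apple", 10.0);
        limits.put("rice", 6.0);
        limits.put("flour", 6.0);
        limits.put("banana", 8.0);
        limits.put("mask", 5.0);
        return limits;
    }

    // The sixteen test records above, in the same order.
    static List<Quote> sampleQuotes() {
        return Arrays.asList(
                new Quote(100001, 6, "apple", 1558430843000L),
                new Quote(100007, 0.5, "mask", 1558430844000L),
                new Quote(100002, 2, "rice", 1558430845000L),
                new Quote(100003, 2, "flour", 1558430846000L),
                new Quote(100004, 12, "rice", 1558430847000L),
                new Quote(100005, 20, "apple", 1558430848000L),
                new Quote(100006, 3, "banana", 1558430849000L),
                new Quote(100007, 10, "mask", 1558430850000L),
                new Quote(100001, 16, "apple", 1558430852000L),
                new Quote(100007, 15, "mask", 1558430853000L),
                new Quote(100002, 12, "rice", 1558430854000L),
                new Quote(100003, 12, "flour", 1558430855000L),
                new Quote(100004, 12, "rice", 1558430856000L),
                new Quote(100005, 20, "apple", 1558430857000L),
                new Quote(100006, 13, "banana", 1558430858000L),
                new Quote(100007, 10, "mask", 1558430859000L));
    }

    static boolean over(Quote quote, Map<String, Double> limits) {
        Double limit = limits.get(quote.goodsName);
        return limit != null && quote.price > limit;
    }

    // Returns the second record of every consecutive over-threshold pair per goodsId.
    static List<Quote> alerts(List<Quote> quotes, Map<String, Double> limits, long windowMillis) {
        Map<Long, Quote> previousById = new HashMap<>();
        List<Quote> result = new ArrayList<>();
        for (Quote current : quotes) {
            Quote previous = previousById.get(current.goodsId);
            if (previous != null && over(previous, limits) && over(current, limits)
                    && current.orderTime - previous.orderTime < windowMillis) {
                result.add(current);
            }
            previousById.put(current.goodsId, current);
        }
        return result;
    }

    public static void main(String[] args) {
        for (Quote alert : alerts(sampleQuotes(), sampleLimits(), 60_000L)) {
            System.out.println(alert.goodsId + " " + alert.goodsName + " " + alert.price + " " + alert.orderTime);
        }
    }
}
```

Under these assumptions the sample produces four alerts: goodsId 100007 at 1558430853000 and again at 1558430859000, 100004 at 1558430856000, and 100005 at 1558430857000. The other products have a below-threshold price directly before their only over-threshold one, so strict contiguity rules them out.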

2.2. Bean object
Properties: goodsId, goodsPrice, goodsName, alias, orderTime, status
public class Product {
    private Long goodsId;
    private Double goodsPrice;
    private String goodsName;
    private String alias;
    private Long orderTime;
    private Boolean status;

    public Product(Long goodsId, Double goodsPrice, String goodsName, String alias, Long orderTime, Boolean status) {
        this.goodsId = goodsId;
        this.goodsPrice = goodsPrice;
        this.goodsName = goodsName;
        this.alias = alias;
        this.orderTime = orderTime;
        this.status = status;
    }
    
    @Override
    public String toString() {
        return "Product{" +
                "goodsId=" + goodsId +
                ", goodsPrice=" + goodsPrice +
                ", goodsName='" + goodsName + '\'' +
                ", alias='" + alias + '\'' +
                ", orderTime=" + orderTime +
                ", status=" + status +
                '}';
    }
    
    public Long getGoodsId() {
        return goodsId;
    }
    
    public void setGoodsId(Long goodsId) {
        this.goodsId = goodsId;
    }
    
    public Double getGoodsPrice() {
        return goodsPrice;
    }
    
    public void setGoodsPrice(Double goodsPrice) {
        this.goodsPrice = goodsPrice;
    }
    
    public String getGoodsName() {
        return goodsName;
    }
    
    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }
    
    public String getAlias() {
        return alias;
    }
    
    public void setAlias(String alias) {
        this.alias = alias;
    }
    
    public Long getOrderTime() {
        return orderTime;
    }
    
    public void setOrderTime(Long orderTime) {
        this.orderTime = orderTime;
    }
    
    public Boolean getStatus() {
        return status;
    }
    
    public void setStatus(Boolean status) {
        this.status = status;
    }

}
Originally published on the author's personal site/blog on 2022-05-06.