前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >全网最详细4W字Flink入门笔记(下)

全网最详细4W字Flink入门笔记(下)

原创
作者头像
BookSea
发布2023-07-21 10:15:18
4820
发布2023-07-21 10:15:18
举报
文章被收录于专栏:Java随想录Java随想录

本文已收录至Github,推荐阅读 👉 Java随想录

接前面中篇,此为下篇

Table API & Flink SQL

在Spark中有DataFrame这样的关系型编程接口,因其强大且灵活的表达能力,能够让用户通过非常丰富的接口对数据进行处理,有效降低了用户的使用成本。Flink也提供了关系型编程接口Table API以及基于Table API的SQL API,让用户能够通过使用结构化编程接口高效地构建Flink应用。同时Table API以及SQL能够统一处理批量和实时计算业务,无须切换修改任何应用代码就能够基于同一套API编写流式应用和批量应用,从而达到真正意义的批流统一

在 Flink 1.8 架构里,如果用户需要同时流计算、批处理的场景下,用户需要维护两套业务代码,开发人员也要维护两套技术栈,非常不方便。 Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个特例,从而实现流批统一,阿里巴巴的 Blink 团队在这方面做了大量的工作,已经实现了 Table API & SQL 层的流批统一。阿里巴巴已经将 Blink 开源回馈给 Flink 社区。

开发环境构建

在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能,取名叫: Blink Planner。在使用Table API和SQL开发Flink应用之前,通过添加Maven的依赖配置到项目中,在本地工程中引入相应的依赖库,库中包含了Table API和SQL接口。

代码语言:html
复制
	<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>

Table Environment

和DataStream API一样,Table API和SQL中具有相同的基本编程模型。首先需要构建对应的TableEnviroment创建关系型编程环境,才能够在程序中使用Table API和SQL来编写应用程序,另外Table API和SQL接口可以在应用中同时使用,Flink SQL基于Apache Calcite框架实现了SQL标准协议,是构建在Table API之上的更高级接口。

首先需要在环境中创建TableEnvironment对象,TableEnvironment中提供了注册内部表、执行Flink SQL语句、注册自定义函数等功能。根据应用类型的不同,TableEnvironment创建方式也有所不同,但是都是通过调用create()方法创建

流计算环境下创建TableEnviroment:

代码语言:scala
复制
//创建流式计算的上下文环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建Table API的上下文环境
val tableEvn =StreamTableEnvironment.create(env)

Table API

Table API 顾名思义,就是基于“表”(Table)的一套 API,专门为处理表而设计的,它提供了关系型编程模型,可以用来处理结构化数据,支持表和视图的概念。在此基础上,Flink 还基于 Apache Calcite 实现了对 SQL 的支持。这样一来,我们就可以在 Flink 程序中直接写 SQL 来实现处理需求了,非常实用。

下面是一个简单的例子,它使用Java编写了一个Flink程序,该程序使用Table API从CSV文件中读取数据,然后执行简单的查询并将结果写入到另一个CSV文件中。

首先我们需要导入maven依赖:

代码语言:html
复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
</dependency>

代码示例如下:

代码语言:Java
复制
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

public class TableAPIExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        DataSet<Tuple2<String, Integer>> data = env.readCsvFile("input.csv")
                .includeFields("11")
                .types(String.class, Integer.class);

        Table table = tableEnv.fromDataSet(data, "name, age");
        tableEnv.createTemporaryView("people", table);

        Table result = tableEnv.sqlQuery("SELECT name, age FROM people WHERE age > 30");

        DataSet<Tuple2<String, Integer>> output = tableEnv.toDataSet(result, Tuple2.class);
        output.writeAsCsv("output.csv");

        env.execute();
    }
}

在这个例子中,使用readCsvFile方法从CSV文件中读取数据,并使用includeFieldstypes方法指定要包含的字段和字段类型。接下来,使用fromDataSet方法将数据集转换为表,并使用createTemporaryView方法创建一个临时视图。然后,使用sqlQuery方法执行SQL查询,并使用toDataSet方法将结果转换为数据集。最后,使用writeAsCsv方法将结果写入到CSV文件中,并使用execute方法启动执行。

除了上面这种写法外,我们还有下面2种写法:

代码语言:Java
复制
//这里每个方法的参数都是一个“表达式”(Expression),用方法调用的形式直观地说明
//“$”符号用来指定表中的一个字段。代码和直接执行SQL是等效的。
Table maryClickTable = eventTable.where($("user").isEqual("Alice")).select($("url"),$("user"))
  
//这其实是一种简略的写法,我们将 Table 对象名 eventTable 直接以字符串拼接的形式添加到 SQL 语句中,在解析时会自动注册一个同名的虚拟表到环境中,这样就省略了创建虚拟视图的步骤。
Table clickTable = tableEnvironment.sqlQuery("select url, user from " +eventTable);
Virtual Tables(虚拟表)

在环境中注册之后,我们就可以在 SQL 中直接使用这张表进行查询转换了。

代码语言:Java
复制
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");

得到的 newTable 是一个中间转换结果,如果之后又希望直接使用这个表执行 SQL,又该怎么做呢?由于 newTable 是一个 Table 对象,并没有在表环境中注册;所以我们还需要将这个中间结果表注册到环境中,才能在 SQL 中使用:

代码语言:Java
复制
tableEnv.createTemporaryView("NewTable", newTable);

这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与 SQL 语法中的视图(View)非常类似,所以调用的方法也叫作创建“虚拟视图” (createTemporaryView)。

表流互转
代码语言:Java
复制
// 将表转换成数据流,并打印
tableEnv.toDataStream(aliceVisitTable).print();
// 将数据流转换成表。
// 另外,我们还可以在 fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),$("url"));
动态表和持续查询

在Flink中,动态表(Dynamic Tables)是一种特殊的表,它可以随时间变化。它们通常用于表示无限流数据,例如事件流或服务器日志。与静态表不同,动态表可以在运行时插入、更新和删除行。

动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作持续查询(Continuous Query)。

下面是一个简单的例子,它使用Java编写了一个Flink程序,该程序使用Table API从Kafka主题中读取数据,然后执行持续查询并将结果写入到另一个Kafka主题中。

代码语言:Java
复制
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DynamicTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = 		   EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        tableEnv.executeSql("CREATE TABLE input (" +
                "  name STRING," +
                "  age INT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'input-topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format' = 'json'" +
                ")");

        tableEnv.executeSql("CREATE TABLE output (" +
                "  name STRING," +
                "  age INT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'output-topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format' = 'json'" +
                ")");

        Table result = tableEnv.sqlQuery("SELECT name, age FROM input WHERE age > 30");
        tableEnv.toAppendStream(result, Row.class).print();

        result.executeInsert("output");

        env.execute();
    }
}

在这个例子中,首先创建了一个StreamExecutionEnvironment来设置执行环境,并使用StreamTableEnvironment.create方法创建了一个StreamTableEnvironment。然后,使用executeSql方法创建了两个Kafka表:一个用于读取输入数据,另一个用于写入输出数据。接下来,使用sqlQuery方法执行持续查询,并使用toAppendStream方法将结果转换为数据流。最后,使用executeInsert方法将结果写入到输出表中,并使用execute方法启动执行。

连接到外部系统

在 Table API编写的 Flink 程序中,可以在创建表的时候用 WITH 子句指定连接器(connector),这样就可以连接到外部系统进行数据交互了。

其中最简单的当然就是连接到控制台打印输出:

代码语言:Java
复制
CREATE TABLE ResultTable (
  user STRING,
  cnt BIGINT
WITH (
  'connector' = 'print'
);
Kafka

需要导入maven依赖:

代码语言:html
复制
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>

创建一个连接到 Kafka 表,需要在 CREATE TABLE 的 DDL 中在 WITH 子句里指定连接器为 Kafka,并定义必要的配置参数。

代码语言:sql
复制
CREATE TABLE KafkaTable (
 `user` STRING,
 `url` STRING,
 `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
 'connector' = 'kafka',
 'topic' = 'events',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'csv'
)
MySQL
代码语言:html
复制
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>

创建 JDBC 表的方法与前面Kafka 大同小异:

代码语言:sql
复制
-- 创建一张连接到 MySQL 的 表
CREATE TABLE MyTable (
 id BIGINT,
 name STRING,
 age INT,
 status BOOLEAN,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
 'table-name' = 'users'
);
-- 将另一张表 T 的数据写入到 MyTable 表中
INSERT INTO MyTable
SELECT id, name, age, status FROM T;

Table API实战

在Flink中创建一张表有两种方法:

  • 从一个文件中导入表结构(Structure)(常用于批计算)(静态)
  • 从DataStream或者DataSet转换成Table (动态)
1.创建Table

Table API中已经提供了TableSource从外部系统获取数据,例如常见的数据库、文件系统和Kafka消息队列等外部系统。

  1. 从文件中创建Table(静态表)

Flink允许用户从本地或者分布式文件系统中读取和写入数据,在Table API中可以通过CsvTableSource类来创建,只需指定相应的参数即可。但是文件格式必须是CSV格式的。其他文件格式也支持(在Flink还有Connector的来支持其他格式或者自定义TableSource)

代码语言:scala
复制
代码语言:txt
复制
   //创建流式计算的上下文环境
代码语言:txt
复制
   val env = StreamExecutionEnvironment.getExecutionEnvironment
代码语言:txt
复制
   //创建Table API的上下文环境
代码语言:txt
复制
   val tableEvn = StreamTableEnvironment.create(env)
代码语言:txt
复制
   val source = new CsvTableSource("D:\\code\\StudyFlink\\data\\tableexamples"
代码语言:txt
复制
     , Array[String]("id", "name", "score")
代码语言:txt
复制
     , Array(Types.INT, Types.STRING, Types.DOUBLE)
代码语言:txt
复制
   )
代码语言:txt
复制
   //将source注册成一张表  别名:exampleTab
代码语言:txt
复制
   tableEvn.registerTableSource("exampleTab",source)
代码语言:txt
复制
   tableEvn.scan("exampleTab").printSchema()
代码语言:txt
复制

代码最后不需要env.execute(),这并不是一个流式计算任务

  1. 从DataStream中创建Table(动态表)

前面已经知道Table API是构建在DataStream API和DataSet API之上的一层更高级的抽象,因此用户可以灵活地使用Table API将Table转换成DataStream或DataSet数据集,也可以将DataSteam或DataSet数据集转换成Table,这和Spark中的DataFrame和RDD的关系类似

2.修改Table中字段名
代码语言:txt
复制
Flink支持把自定义POJOs类的所有case类的属性名字变成字段名,也可以通过基于字段偏移位置和字段名称两种方式重新修改:
代码语言:scala
复制
    //导入table库中的隐式转换
    import org.apache.flink.table.api.scala._ 
    // 基于位置重新指定字段名称为"field1", "field2", "field3"
    val table = tStreamEnv.fromDataStream(stream, 'field1, 'field2, 'field3)
    // 将DataStream转换成Table,并且将字段名称重新成别名
    val table: Table = tStreamEnv.fromDataStream(stream, 'rowtime as 'newTime, 'id as 'newId,'variable as 'newVariable)

注意:要导入隐式转换。如果使用as 修改字段,必须修改表中所有的字段。

3.查询和过滤

在Table对象上使用select操作符查询需要获取的指定字段,也可以使用filter或where方法过滤字段和检索条件,将需要的数据检索出来。

代码语言:scala
复制
object TableAPITest {

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    //初始化Table API的上下文环境
    val tableEvn =StreamTableEnvironment.create(streamEnv)
    //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._
    val data = streamEnv.socketTextStream("hadoop101",8888)
          .map(line=>{
            var arr =line.split(",")
            new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
          })

    val table: Table = tableEvn.fromDataStream(data)
    //查询
    tableEvn.toAppendStream[Row](
      table.select('sid,'callType as 'type,'callTime,'callOut))
      .print()
    //过滤查询
    tableEvn.toAppendStream[Row](
      table.filter('callType==="success") //filter
        .where('callType==="success"))    //where
      .print()
    tableEvn.execute("sql")
  }

其中toAppendStream函数是吧Table对象转换成DataStream对象。

4.分组聚合
代码语言:txt
复制
举例:我们统计每个基站的日志数量。
代码语言:scala
复制
val table: Table = tableEvn.fromDataStream(data)
    tableEvn.toRetractStream[Row](
      table.groupBy('sid).select('sid, 'sid.count as 'logCount))
      .filter(_._1==true) //返回的如果是true才是Insert的数据
      .print()

在代码中可以看出,使用toAppendStream和toRetractStream方法将Table转换为DataStreamT数据集,T可以是Flink自定义的数据格式类型Row,也可以是用户指定的数据格式类型。在使用toRetractStream方法时,返回的数据类型结果为DataStream(Boolean,T),Boolean类型代表数据更新类型,True对应INSERT操作更新的数据,False对应DELETE操作更新的数据。

5.UDF自定义的函数

用户可以在Table API中自定义函数类,常见的抽象类和接口是:

  • ScalarFunction
  • TableFunction
  • AggregateFunction
  • TableAggregateFunction

案例:使用Table完成基于流的WordCount

代码语言:scala
复制
object TableAPITest2 {

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
    //初始化Table API的上下文环境
    val tableEvn =StreamTableEnvironment.create(streamEnv)
    //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._

    val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101",8888)
    val table: Table = tableEvn.fromDataStream(stream,'words)
    var my_func =new MyFlatMapFunction()//自定义UDF
    val result: Table = table.flatMap(my_func('words)).as('word, 'count)
      .groupBy('word) //分组
      .select('word, 'count.sum as 'c) //聚合
    tableEvn.toRetractStream[Row](result)
      .filter(_._1==true)
      .print()

    tableEvn.execute("table_api")

  }
  //自定义UDF
  class MyFlatMapFunction extends TableFunction[Row]{
    //定义类型
    override def getResultType: TypeInformation[Row] = {
      Types.ROW(Types.STRING, Types.INT)
    }
    //函数主体
    def eval(str:String):Unit ={
      str.trim.split(" ")
        .foreach({word=>{
          var row =new Row(2)
          row.setField(0,word)
          row.setField(1,1)
          collect(row)
        }})
    }
  }
}
6.Window
代码语言:txt
复制
Flink支持ProcessTime、EventTime和IngestionTime三种时间概念,针对每种时间概念,Flink Table API中使用Schema中单独的字段来表示时间属性,当时间字段被指定后,就可以在基于时间的操作算子中使用相应的时间属性。

在Table API中通过使用.rowtime来定义EventTime字段,在ProcessTime时间字段名后使用.proctime后缀来指定ProcessTime时间属性.

案例:统计最近5秒钟,每个基站的呼叫数量

代码语言:scala
复制
object TableAPITest {

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //指定EventTime为时间语义
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.setParallelism(1)
    //初始化Table API的上下文环境
    val tableEvn =StreamTableEnvironment.create(streamEnv)
    //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._

    val data = streamEnv.socketTextStream("hadoop101",8888)
          .map(line=>{
            var arr =line.split(",")
            new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
          })
      .assignTimestampsAndWatermarks( //引入Watermark
        new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(2)){//延迟2秒
          override def extractTimestamp(element: StationLog) = {
            element.callTime
          }
        })

    //设置时间属性
    val table: Table = tableEvn.fromDataStream(data,'sid,'callOut,'callIn,'callType,'callTime.rowtime)
    //滚动Window ,第一种写法
    val result: Table = table.window(Tumble over 5.second on 'callTime as 'window)
    //第二种写法
    val result: Table = table.window(Tumble.over("5.second").on("callTime").as("window"))
      .groupBy('window, 'sid)
      .select('sid, 'window.start, 'window.end, 'window.rowtime, 'sid.count)
    //打印结果
    tableEvn.toRetractStream[Row](result)
      .filter(_._1==true)
      .print()
  

    tableEvn.execute("sql")
  }
}

上面的案例是滚动窗口,如果是滑动窗口也是一样,代码如下:

代码语言:scala
复制
//滑动窗口,窗口大小为:10秒,滑动步长为5秒 :第一种写法
table.window(Slide over 10.second every 5.second on 'callTime as 'window)
//滑动窗口第二种写法 table.window(Slide.over("10.second").every("5.second").on("callTime").as("window"))

Flink SQL

企业中Flink SQL比Table API用的多

Flink SQL 是 Apache Flink 提供的一种使用 SQL 查询和处理数据的方式。它允许用户通过 SQL 语句对数据流或批处理数据进行查询、转换和分析,无需编写复杂的代码。Flink SQL 提供了一种更直观、易于理解和使用的方式来处理数据,同时也可以与 Flink 的其他功能无缝集成。

Flink SQL 支持 ANSI SQL 标准,并提供了许多扩展和优化来适应流式处理和批处理场景。它能够处理无界数据流,具备事件时间和处理时间的语义,支持窗口、聚合、连接等常见的数据操作,还提供了丰富的内置函数和扩展插件机制。

下面是一个简单的 Flink SQL 代码示例,展示了如何使用 Flink SQL 对流式数据进行查询和转换。

代码语言:java
复制
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FlinkSqlExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);  // 设置并行度为1,方便观察输出结果

        // 创建 Kafka 数据源
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-consumer");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), properties);
        DataStream<String> sourceStream = env.addSource(kafkaConsumer);

        // 注册数据源表
        env.createTemporaryView("source_table", sourceStream, "message");

        // 执行 SQL 查询和转换
        String query = "SELECT message, COUNT(*) AS count FROM source_table GROUP BY message";
        DataStream<Result> resultStream = env.sqlQuery(query).map(value -> new Result(value.getField(0), value.getField(1)));

        // 打印结果
        resultStream.print();

        env.execute("Flink SQL Example");
    }

    // 自定义结果类
    public static class Result {
        public String message;
        public Long count;

        public Result() {}

        public Result(String message, Long count) {
            this.message = message;
            this.count = count;
        }

        @Override
        public String toString() {
            return "Result{" +
                    "message='" + message + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

在上述示例中,我们使用 Apache Kafka 作为数据源,并创建了一个消费者从名为 "input-topic" 的 Kafka 主题中读取数据。然后,我们将数据流注册为名为 "source_table" 的临时表。

接下来,我们使用 Flink SQL 执行 SQL 查询和转换。在这个例子中,我们查询 "source_table" 表,对 "message" 字段进行分组并计算每个消息出现的次数。查询结果会映射到自定义的 Result 类,并最终通过 print() 方法打印到标准输出。

最后,我们通过调用 env.execute() 方法来启动 Flink 作业的执行。

Flink的复杂事件处理CEP

复杂事件处理(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。Flink基于DataStrem API提供了FlinkCEP组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。

CEP(Complex Event Processing)就是在无界事件流中检测事件模式,让我们掌握数据中重要的部分。flink CEP是在flink中实现的复杂事件处理库。

CEP相关概念

配置依赖

在使用FlinkCEP组件之前,需要将FlinkCEP的依赖库引入项目工程中。

代码语言:html
复制
 <dependency>  
 	<groupId>org.apache.flink</groupId>  
 	<artifactId>flink-cep-scala_2.11</artifactId>  
 	<version>1.9.1</version>
 </dependency>

 <dependency> 
     <groupId>org.apache.flink</groupId>  
     <artifactId>flink-cep-scala_2.11</artifactId>  
 	<version>1.9.1</version>
</dependency>
事件定义
  • 简单事件:简单事件存在于现实场景中,主要的特点为处理单一事件,事件的定义可以直接观察出来,处理过程中无须关注多个事件之间的关系,能够通过简单的数据处理手段将结果计算出来。
  • 复杂事件:相对于简单事件,复杂事件处理的不仅是单一的事件,也处理由多个事件组成的复合事件。复杂事件处理监测分析事件流(Event Streaming),当特定事件发生时来触发某些动作。
代码语言:txt
复制
复杂事件中事件与事件之间包含多种类型关系,常见的有时序关系、聚合关系、层次关系、依赖关系及因果关系等。

Pattern API

Flink CEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。包含四个步骤:

  1. 输入事件流的创建
  2. Pattern的定义
  3. Pattern应用在事件流上检测
  4. 选取结果
模式定义

定义Pattern可以是单次执行模式,也可以是循环执行模式。单词执行模式一次只接受一个事件,循环执行模式可以接收一个或者多个事件。通常情况下,可以通过指定循环次数将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过where方法进行叠加。每个Pattern都是通过begin方法定义的

代码语言:scala
复制
val start = Pattern.begin[Event]("start_pattern")

下一步通过Pattern.where()方法在Pattern上指定Condition,只有当Condition满足之后,当前的Pattern才会接受事件。

代码语言:scala
复制
start.where(_.getCallType == "success")
设置循环次数

对于已经创建好的Pattern,可以指定循环次数,形成循环执行的Pattern。

  • times:可以通过times指定固定的循环执行次数。
代码语言:scala
复制
//指定循环触发4次
start.times(4);
//可以执行触发次数范围,让循环执行次数在该范围之内
start.times(2, 4);
  • optional:也可以通过optional关键字指定要么不触发要么触发指定的次数。
代码语言:scala
复制
  start.times(4).optional();
  start.times(2, 4).optional();
  • greedy:可以通过greedy将Pattern标记为贪婪模式,在Pattern匹配成功的前提下,会尽可能多地触发。
代码语言:scala
复制
  //触发2、3、4次,尽可能重复执行
  start.times(2, 4).greedy();
  //触发0、2、3、4次,尽可能重复执行
  start.times(2, 4).optional().greedy();
  • oneOrMore:可以通过oneOrMore方法指定触发一次或多次。
代码语言:txt
复制
  // 触发一次或者多次
  start.oneOrMore();
  //触发一次或者多次,尽可能重复执行
  start.oneOrMore().greedy();
  // 触发0次或者多次
  start.oneOrMore().optional();
  // 触发0次或者多次,尽可能重复执行
  start.oneOrMore().optional().greedy();
  • timesOrMore:通过timesOrMore方法可以指定触发固定次数以上,例如执行两次以上。
代码语言:txt
复制
// 触发两次或者多次
  start.timesOrMore(2);
  // 触发两次或者多次,尽可能重复执行
  start.timesOrMore(2).greedy();
  // 不触发或者触发两次以上,尽可能重复执行
  start.timesOrMore(2).optional().greedy();
定义条件

每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在FlinkCFP中通过pattern.where()、pattern.or()及pattern.until()方法来为Pattern指定条件,且Pattern条件有Simple Conditions及Combining Conditions等类型。

  • 简单条件:Simple Condition继承于Iterative Condition类,其主要根据事件中的字段信息进行判断,决定是否接受该事件。
代码语言:txt
复制
  // 把通话成功的事件挑选出来
  start.where(_.getCallType == "success")
  • 组合条件:组合条件是将简单条件进行合并,通常情况下也可以使用where方法进行条件的组合,默认每个条件通过AND逻辑相连。如果需要使用OR逻辑,直接使用or方法连接条件即可。
代码语言:scala
复制
  // 把通话成功,或者通话时长大于10秒的事件挑选出来
  val start = Pattern.begin[StationLog]("start_pattern")
  .where(_.callType=="success")
  .or(_.duration>10)
  • 终止条件:如果程序中使用了oneOrMore或者oneOrMore().optional()方法,则必须指定终止条件,否则模式中的规则会一直循环下去,如下终止条件通过until()方法指定。
代码语言:txt
复制
  pattern.oneOrMore.until(_.callOut.startsWith("186"))
模式序列

将相互独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。

  • 严格邻近:严格邻近条件中,需要所有的事件都按照顺序满足模式条件,不允许忽略任意不满足的模式。

val strict: Pattern[Event] = start.next("middle").where(...)

  • 宽松邻近:在宽松邻近条件下,会忽略没有成功匹配模式条件,并不会像严格邻近要求得那么高,可以简单理解为OR的逻辑关系。

val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)

  • 非确定宽松邻近:和宽松邻近条件相比,非确定宽松邻近条件指在模式匹配过程中可以忽略已经匹配的条件。

val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)

  • 除以上模式序列外,还可以定义“不希望出现某种近邻关系”:
代码语言:txt
复制
	.notNext()  —— 不想让某个事件严格紧邻前一个事件发生。
代码语言:txt
复制
	.notFollowedBy() —— 不想让某个事件在两个事件之间发生。

注意

  1. 所有模式序列必须以 .begin() 开始
  2. 模式序列不能以 .notFollowedBy() 结束
  3. “not” 类型的模式不能被 optional 所修饰
  4. 此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效
代码语言:java
复制

//指定模式在10秒内有效

pattern.within(Time.seconds(10));

代码语言:txt
复制
模式检测

调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream

代码语言:scala
复制
//cep 做模式检测
val patternStream = CEP.pattern[EventLog](dataStream.keyBy(_.id),pattern)
选择结果

得到PatternStream类型的数据集后,接下来数据获取都基于PatternStream进行。该数据集中包含了所有的匹配事件。目前在FlinkCEP中提供select和flatSelect两种方法从PatternStream提取事件结果事件。

通过Select Funciton抽取正常事件

可以通过在PatternStream的Select方法中传入自定义Select Funciton完成对匹配事件的转换与输出。其中Select Funciton的输入参数为Map[String, IterableIN],Map中的key为模式序列中的Pattern名称,Value为对应Pattern所接受的事件集合,格式为输入事件的数据类型。

代码语言:scala
复制
def selectFunction(pattern : Map[String, Iterable[IN]]): OUT = {  
//获取pattern中的startEvent  
val startEvent = pattern.get("start_pattern").get.next    
//获取Pattern中middleEvent    
val middleEvent = pattern.get("middle").get.next    
//返回结果    
OUT(startEvent, middleEvent)}

通过Flat Select Funciton抽取正常事件

代码语言:txt
复制
Flat Select Funciton和Select Function相似,不过Flat Select Funciton在每次调用可以返回任意数量的结果。因为Flat Select Funciton使用Collector作为返回结果的容器,可以将需要输出的事件都放置在Collector中返回。
代码语言:scala
复制
def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {    //获取pattern中startEvent  
val startEvent = pattern.get("start_pattern").get.next    
//获取Pattern中middleEvent  
val middleEvent = pattern.get("middle").get.next    
//并根据startEvent的Value数量进行返回  
for (i <- 0 to startEvent.getValue) {    
	collector.collect(OUT(startEvent, middleEvent))  
}}

通过Select Funciton抽取超时事件

如果模式中有within(time),那么就很有可能有超时的数据存在,通过PatternStream. Select方法分别获取超时事件和正常事件。首先需要创建OutputTag来标记超时事件,然后在PatternStream.select方法中使用OutputTag,就可以将超时事件从PatternStream中抽取出来。

代码语言:scala
复制
// 通过CEP.pattern方法创建
PatternStream  val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)  //创建OutputTag,并命名为timeout-output  
val timeoutTag = OutputTag[String]("timeout-output")  
//调用PatternStream select()并指定timeoutTag  val result: SingleOutputStreamOperator[NormalEvent] =   patternStream.select(timeoutTag){  
//超时事件获取    
(pattern: Map[String, Iterable[Event]], timestamp: Long) =>       
TimeoutEvent()//返回异常事件  
} { 
//正常事件获取
pattern: Map[String, Iterable[Event]] =>      
NormalEvent()
//返回正常事件  
}
//调用getSideOutput方法,并指定timeoutTag将超时事件输出val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(timeoutTag)

Flink内存优化

在大数据领域,大多数开源框架(Hadoop、Spark、Flink)都是基于JVM运行,但是JVM的内存管理机制往往存在着诸多类似OutOfMemoryError的问题,主要是因为创建过多的对象实例而超过JVM的最大堆内存限制,却没有被有效回收掉,这在很大程度上影响了系统的稳定性,尤其对于大数据应用,面对大量的数据对象产生,仅仅靠JVM所提供的各种垃圾回收机制很难解决内存溢出的问题。在开源框架中有很多框架都实现了自己的内存管理,例如Apache Spark的Tungsten项目,在一定程度上减轻了框架对JVM垃圾回收机制的依赖,从而更好地使用JVM来处理大规模数据集。

Flink也基于JVM实现了自己的内存管理,将JVM根据内存区分为Unmanned Heap、Flink Managed Heap、Network Buffers三个区域。在Flink内部对Flink Managed Heap进行管理,在启动集群的过程中直接将堆内存初始化成Memory Pages Pool,也就是将内存全部以二进制数组的方式占用,形成虚拟内存使用空间。新创建的对象都是以序列化成二进制数据的方式存储在内存页面池中,当完成计算后数据对象Flink就会将Page置空,而不是通过JVM进行垃圾回收,保证数据对象的创建永远不会超过JVM堆内存大小,也有效地避免了因为频繁GC导致的系统稳定性问题。

JobManager配置

JobManager在Flink系统中主要承担管理集群资源、接收任务、调度Task、收集任务状态以及管理TaskManager的功能,JobManager本身并不直接参与数据的计算过程中,因此JobManager的内存配置项不是特别多,只要指定JobManager堆内存大小即可。

jobmanager.heap.size:设定JobManager堆内存大小,默认为1024MB。

TaskManager配置

TaskManager作为Flink集群中的工作节点,所有任务的计算逻辑均执行在TaskManager之上,因此对TaskManager内存配置显得尤为重要,可以通过以下参数配置对TaskManager进行优化和调整。

  • taskmanager.heap.size:设定TaskManager堆内存大小,默认值为1024M,如果在Yarn的集群中,TaskManager取决于Yarn分配给TaskManager Container的内存大小,且Yarn环境下一般会减掉一部分内存用于Container的容错。
  • taskmanager.jvm-exit-on-oom:设定TaskManager是否会因为JVM发生内存溢出而停止,默认为false,当TaskManager发生内存溢出时,也不会导致TaskManager停止。
  • taskmanager.memory.size:设定TaskManager内存大小,默认为0,如果不设定该值将会使用taskmanager.memory.fraction作为内存分配依据。
  • taskmanager.memory.fraction:设定TaskManager堆中去除Network Buffers内存后的内存分配比例。该内存主要用于TaskManager任务排序、缓存中间结果等操作。例如,如果设定为0.8,则代表TaskManager保留80%内存用于中间结果数据的缓存,剩下20%的内存用于创建用户定义函数中的数据对象存储。注意,该参数只有在taskmanager.memory.size不设定的情况下才生效。
  • taskmanager.memory.off-heap:设置是否开启堆外内存供Managed Memory或者Network Buffers使用。
  • taskmanager.memory.preallocate:设置是否在启动TaskManager过程中直接分配TaskManager管理内存。
  • taskmanager.numberOfTaskSlots:每个TaskManager分配的slot数量。

Flink的网络缓存优化

Flink将JVM堆内存切分为三个部分,其中一部分为Network Buffers内存。Network Buffers内存是Flink数据交互层的关键内存资源,主要目的是缓存分布式数据处理过程中的输入数据。。通常情况下,比较大的Network Buffers意味着更高的吞吐量。如果系统出现“Insufficient number of network buffers”的错误,一般是因为Network Buffers配置过低导致,因此,在这种情况下需要适当调整TaskManager上Network Buffers的内存大小,以使得系统能够达到相对较高的吞吐量。

目前Flink能够调整Network Buffer内存大小的方式有两种:一种是通过直接指定Network Buffers内存数量的方式,另外一种是通过配置内存比例的方式。

设定Network Buffer内存数量(过时了)

直接设定Nework Buffer数量需要通过如下公式计算得出:

NetworkBuffersNum = total-degree-of-parallelism \* intra-node-parallelism * n

其中total-degree-of-parallelism表示每个TaskManager的总并发数量,intra-node-parallelism表示每个TaskManager输入数据源的并发数量,n表示在预估计算过程中Repar-titioning或Broadcasting操作并行的数量。intra-node-parallelism通常情况下与Task-Manager的所占有的CPU数一致,且Repartitioning和Broadcating一般下不会超过4个并发。可以将计算公式转化如下:

NetworkBuffersNum = <slots-per-TM>^2 \* < TMs>* 4

其中slots-per-TM是每个TaskManager上分配的slots数量,TMs是TaskManager的总数量。对于一个含有20个TaskManager,每个TaskManager含有8个Slot的集群来说,总共需要的Network Buffer数量为8^2*204=5120个,因此集群中配置Network Buffer内存的大小约为160M较为合适。

计算完Network Buffer数量后,可以通过添加如下两个参数对Network Buffer内存进行配置。其中segment-size为每个Network Buffer的内存大小,默认为32KB,一般不需要修改,通过设定numberOfBuffers参数以达到计算出的内存大小要求。

  • taskmanager.network.numberOfBuffers:指定Network堆栈Buffer内存块的数量。
  • taskmanager.memory.segment-size:内存管理器和Network栈使用的内存Buffer大小,默认为32KB。
设定Network内存比例(推荐)

从1.3版本开始,Flink就提供了通过指定内存比例的方式设置Network Buffer内存大小。

  • taskmanager.network.memory.fraction:JVM中用于Network Buffers的内存比例。
  • taskmanager.network.memory.min:最小的Network Buffers内存大小,默认为64MB。
  • taskmanager.network.memory.max:最大的Network Buffers内存大小,默认1GB。
  • taskmanager.memory.segment-size:内存管理器和Network栈使用的Buffer大小,默认为32KB。

本篇文章就到这里,感谢阅读,如果本篇博客有任何错误和建议,欢迎给我留言指正。文章持续更新,可以关注公众号第一时间阅读。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Table API & Flink SQL
    • 开发环境构建
      • Table Environment
        • Table API
          • Virtual Tables(虚拟表)
          • 表流互转
          • 动态表和持续查询
          • 连接到外部系统
        • Table API实战
          • 1.创建Table
          • 2.修改Table中字段名
          • 3.查询和过滤
          • 4.分组聚合
          • 5.UDF自定义的函数
          • 6.Window
        • Flink SQL
        • Flink的复杂事件处理CEP
          • CEP相关概念
            • 配置依赖
            • 事件定义
          • Pattern API
            • 模式定义
            • 设置循环次数
            • 定义条件
            • 模式序列
            • 模式检测
            • 选择结果
        • Flink内存优化
          • JobManager配置
            • TaskManager配置
              • Flink的网络缓存优化
                • 设定Network Buffer内存数量(过时了)
                • 设定Network内存比例(推荐)
            相关产品与服务
            流计算 Oceanus
            流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档