前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink UDF--Table Functions&Aggregation Functions

Flink UDF--Table Functions&Aggregation Functions

作者头像
小勇DW3
发布2020-01-14 15:01:28
7250
发布2020-01-14 15:01:28
举报
文章被收录于专栏:小勇DW3小勇DW3

1.Table Functions 表函数

   与标量函数相似之处是输入可以0,1,或者多个参数,但是不同之处可以输出任意数目的行数。返回的行也可以包含一个或者多个列。

   为了自定义表函数,需要继承TableFunction,实现一个或者多个evaluation方法。表函数的行为定义在这些evaluation方法内部,函数名为eval并且必须是public。TableFunction可以重载多个eval方法。Evaluation方法的输入参数类型,决定着表函数的输入类型。Evaluation方法也支持变参,例如:eval(String... strs)。返回表的类型取决于TableFunction的基本类型。Evaluation方法使用collect(T)发射输出rows。

   在Table API中,表函数在scala语言中使用方法如下:.join(Expression) 或者 .leftOuterJoin(Expression),在java语言中使用方法如下:.join(String) 或者.leftOuterJoin(String)。

  • Join操作算子会使用表函数(操作算子右边的表)产生的所有行进行(cross) join 外部表(操作算子左边的表)的每一行。
  • leftOuterJoin操作算子会使用表函数(操作算子右边的表)产生的所有行进行(cross) join 外部表(操作算子左边的表)的每一行,并且在表函数返回一个空表的情况下会保留所有的outer rows。

在sql语法中稍微有点区别:

  • cross join用法是LATERAL TABLE(<TableFunction>)。
  • LEFT JOIN用法是在join条件中加入ON TRUE。

下面的例子讲的是如何使用表值函数。

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).

public class Split extends TableFunction<Tuple2<String, Integer>> {

  private String separator = " ";
  public Split(String separator) {
      this.separator = separator;
  }
  public void eval(String str) {
      for (String s : str.split(separator)) {
          // use collect(...) to emit a row
          collect(new Tuple2<String, Integer>(s, s.length()));
      }
  }
}
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ...         // table schema: [a: String]
// Register the function.
tableEnv.registerFunction("split", new Split("#"));
// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join("split(a) as (word, length)").select("a, word, length");

myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");

   需要注意的是PROJO类型不需要一个确定的字段顺序。意味着你不能使用as修改表函数返回的pojo的字段的名字。

   默认情况下TableFunction返回值类型是由flink类型抽取工具决定。对于基础类型及简单的POJOS是足够的,但是更复杂的类型,自定义类型,组合类型,会报错。这种情况下,返回值类型的TypeInformation,需要手动指定,方法是重载TableFunction#getResultType()。

下面的例子,我们通过复写TableFunction#getResultType()方法使得表返回类型是RowTypeInfo(String, Integer)。

public class CustomTypeSplit extends TableFunction<Row> {
  public void eval(String str) {
      for (String s : str.split(" ")) {
          Row row = new Row(2);
          row.setField(0, s);
          row.setField(1, s.length);
          collect(row);
      }
  }
  @Override
  public TypeInformation<Row> getResultType() {
      return Types.ROW(Types.STRING(), Types.INT());
  }
}

2.Aggregation Functions 聚合函数

   用户自定义聚合函数聚合一张表(一行或者多行,一行有一个或者多个属性)为一个标量的值。

   聚合函数需要继承AggregateFunction。聚合函数工作方式如下:

  • 首先,需要一个accumulator,这个是保存聚合中间结果的数据结构。调用AggregateFunction函数的createAccumulator()方法来创建一个空accumulator.
  • 随后,每个输入行都会调用accumulate()方法来更新accumulator。一旦所有的行被处理了,getValue()方法就会被调用,计算和返回最终的结果。

对于每个AggregateFunction,下面三个方法都是比不可少的:

createAccumulator()

accumulate()

getValue()

   flink的类型抽取机制不能识别复杂的数据类型,比如,数据类型不是基础类型或者简单的pojos类型。所以,类似于ScalarFunction 和TableFunction,AggregateFunction提供了方法去指定返回结果类型的TypeInformation,用的是AggregateFunction#getResultType()。Accumulator类型用的是AggregateFunction#getAccumulatorType()。

   除了上面的方法,还有一些可选的方法。有些方法是让系统更加高效的执行查询,另外的一些在特定的场景下是必须的。例如,merge()方法在会话组窗口(session group window)上下文中是必须的。当一行数据是被视为跟两个回话窗口相关的时候,两个会话窗口的accumulators需要被join。

AggregateFunction的下面几个方法,根据使用场景的不同需要被实现:

  • retract():在bounded OVER窗口的聚合方法中是需要实现的。
  • merge():在很多batch 聚合和会话窗口聚合是必须的。
  • resetAccumulator(): 在大多数batch聚合是必须的。

AggregateFunction的所有方法都是需要被声明为public,而不是static。定义聚合函数需要实现org.apache.flink.table.functions.AggregateFunction同时需要实现一个或者多个accumulate方法。该方法可以被重载为不同的数据类型,并且支持变参。

   为了计算加权平均值,累加器需要存储已累积的所有数据的加权和及计数。在栗子中定义一个WeightedAvgAccum类作为accumulator。尽管,retract(), merge(), 和resetAccumulator()方法在很多聚合类型是不需要的,这里也给出了栗子。

/**
* Accumulator for WeightedAvg.
*/
public static class WeightedAvgAccum {
  public long sum = 0;
  public int count = 0;
}
/**
* Weighted Average user-defined aggregate function.
*/
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
  @Override
  public WeightedAvgAccum createAccumulator() {
    return new WeightedAvgAccum();
  }
  @Override
  public Long getValue(WeightedAvgAccum acc) {
      if (acc.count == 0) {
          return null;
      } else {
          return acc.sum / acc.count;
      }
  }
  public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
      acc.sum += iValue * iWeight;
      acc.count += iWeight;
  }
  public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
      acc.sum -= iValue * iWeight;
      acc.count -= iWeight;
  }
  public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
      Iterator<WeightedAvgAccum> iter = it.iterator();
      while (iter.hasNext()) {
          WeightedAvgAccum a = iter.next();
          acc.count += a.count;
          acc.sum += a.sum;
      }
  }
  public void resetAccumulator(WeightedAvgAccum acc) {
      acc.count = 0;
      acc.sum = 0L;
  }
}
// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());
// use function

tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");

3.udf的最佳实践经验

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-01-06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.Table Functions 表函数
  • 2.Aggregation Functions 聚合函数
  • 3.udf的最佳实践经验
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档