
A look at Flink's TableFunction

This article takes a look at Flink's TableFunction.

Example

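import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
    private String separator = " ";

    public Split(String separator) {
        this.separator = separator;
    }

    // Called once per input value. Note that String.split treats the separator as a regular expression.
    public void eval(String str) {
        for (String s : str.split(separator)) {
            // use collect(...) to emit a row
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ...         // table schema: [a: String]

// Register the function.
tableEnv.registerFunction("split", new Split("#"));
// Register the input table so that the SQL queries below can refer to it as "MyTable".
tableEnv.registerTable("MyTable", myTable);

// Use the table function in the Java Table API. "as" specifies the field names of the table.
Table joined = myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");
Table leftJoined = myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
Table sqlJoined = tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
Table sqlLeftJoined = tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");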
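  • This example defines a custom Split function, registers it with TableEnvironment.registerFunction, and then uses it either through the Table API or through TableEnvironment.sqlQuery. Split defines a public eval method, which emits rows by calling collect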
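To make the difference between join (CROSS JOIN LATERAL) and leftOuterJoin (LEFT JOIN LATERAL ... ON TRUE) concrete, here is a plain-Java simulation with no Flink dependency. It reuses the Split logic above and represents rows as lists; LateralJoinSketch and its methods are hypothetical and only mimic the join semantics, not Flink's implementation. Because String.split drops trailing empty strings, "#".split("#") yields no tokens, so Split emits nothing for the input "#".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LateralJoinSketch {
    // Same logic as Split.eval: one [word, length] row per token.
    static List<List<Object>> split(String str, String separator) {
        List<List<Object>> rows = new ArrayList<>();
        for (String s : str.split(separator)) {
            rows.add(Arrays.asList(s, s.length()));
        }
        return rows;
    }

    // join / CROSS JOIN LATERAL: an input row for which split emits nothing is dropped.
    static List<List<Object>> join(List<String> input, String separator) {
        return correlate(input, separator, false);
    }

    // leftOuterJoin / LEFT JOIN LATERAL ... ON TRUE: such a row is kept and padded with nulls.
    static List<List<Object>> leftOuterJoin(List<String> input, String separator) {
        return correlate(input, separator, true);
    }

    private static List<List<Object>> correlate(List<String> input, String separator,
                                                boolean keepUnmatched) {
        List<List<Object>> out = new ArrayList<>();
        for (String a : input) {
            List<List<Object>> emitted = split(a, separator);
            for (List<Object> r : emitted) {
                out.add(Arrays.asList(a, r.get(0), r.get(1)));   // (a, word, length)
            }
            if (emitted.isEmpty() && keepUnmatched) {
                out.add(Arrays.asList(a, null, null));
            }
        }
        return out;
    }
}
```

With the input ["x#yy", "#"], join returns only the two rows for "x#yy", while leftOuterJoin additionally keeps ("#", null, null).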

UserDefinedFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala

abstract class UserDefinedFunction extends Serializable {
  /**
    * Setup method for user-defined function. It can be used for initialization work.
    *
    * By default, this method does nothing.
    */
  @throws(classOf[Exception])
  def open(context: FunctionContext): Unit = {}

  /**
    * Tear-down method for user-defined function. It can be used for clean up work.
    *
    * By default, this method does nothing.
    */
  @throws(classOf[Exception])
  def close(): Unit = {}

  /**
    * @return true if and only if a call to this function is guaranteed to always return
    *         the same result given the same parameters; true is assumed by default
    *         if user's function is not pure functional, like random(), date(), now()...
    *         isDeterministic must return false
    */
  def isDeterministic: Boolean = true

  final def functionIdentifier: String = {
    val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
    getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
  }

  /**
    * Returns the name of the UDF that is used for plan explain and logging.
    */
  override def toString: String = getClass.getSimpleName

}
  • UserDefinedFunction defines the open and close lifecycle methods, isDeterministic, and functionIdentifier, and overrides toString
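functionIdentifier combines the canonical class name (with '.' replaced by '$') and an MD5 hex digest of the serialized function object, so two instances with different state (for example Split("#") and Split(",")) get different identifiers. The following is a rough plain-Java approximation of that scheme. It assumes that encodeObjectToString means Java serialization followed by URL-safe, unpadded Base64; Flink's EncodingUtils may differ in such details, and FunctionIdSketch and SplitFn are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

class FunctionIdSketch {
    // Same shape as UserDefinedFunction.functionIdentifier: canonical name with '.' -> '$',
    // then "$", then the MD5 hex of the encoded instance.
    // Assumption: Java serialization + URL-safe unpadded Base64 stands in for encodeObjectToString.
    static String functionIdentifier(Serializable fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(fn);
            }
            String encoded = Base64.getUrlEncoder().withoutPadding()
                    .encodeToString(bytes.toByteArray());
            byte[] md5 = MessageDigest.getInstance("MD5")
                    .digest(encoded.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : md5) {
                hex.append(String.format("%02x", b));   // two lowercase hex digits per byte
            }
            return fn.getClass().getCanonicalName().replace('.', '$') + "$" + hex;
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Hypothetical serializable function; its field value is part of the serialized form.
class SplitFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String separator;

    SplitFn(String separator) {
        this.separator = separator;
    }
}
```

Hashing the serialized state rather than just the class name is what lets two differently configured instances of the same class coexist in one query.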

TableFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala

abstract class TableFunction[T] extends UserDefinedFunction {

  // ----------------------------------------------------------------------------------------------

  /**
    * Emit an output row.
    *
    * @param row the output row
    */
  protected def collect(row: T): Unit = {
    collector.collect(row)
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * The code generated collector used to emit row.
    */
  private var collector: Collector[T] = _

  /**
    * Internal use. Sets the current collector.
    */
  private[flink] final def setCollector(collector: Collector[T]): Unit = {
    this.collector = collector
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType: TypeInformation[T] = null

  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          throw new ValidationException(
            s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.")
      }
    }
  }

}
  • TableFunction extends UserDefinedFunction and defines the collect, setCollector, getResultType, and getParameterTypes methods
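The collect/setCollector pair means a TableFunction never returns its rows: the runtime installs a collector (in Flink, a code-generated one) through setCollector before eval runs, and every collect(row) call forwards to it. Below is a plain-Java imitation of that handoff. SketchTableFunction, SketchSplit, and CollectorHandoffSketch are hypothetical stand-ins rather than Flink classes, and the rows are simplified to "word:length" strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for TableFunction[T]: eval returns nothing,
// rows are pushed into whatever collector the runtime installed.
abstract class SketchTableFunction<T> {
    private Consumer<T> collector;

    // Counterpart of the internal setCollector: called by the "runtime", not by user code.
    final void setCollector(Consumer<T> collector) {
        this.collector = collector;
    }

    // Counterpart of collect(row): forwards one output row to the current collector.
    protected void collect(T row) {
        collector.accept(row);
    }
}

class SketchSplit extends SketchTableFunction<String> {
    private final String separator;

    SketchSplit(String separator) {
        this.separator = separator;
    }

    public void eval(String str) {
        for (String s : str.split(separator)) {
            collect(s + ":" + s.length());
        }
    }
}

class CollectorHandoffSketch {
    // Drives the function the way a runtime would: install a collector, then call eval.
    static List<String> run(SketchSplit fn, String input) {
        List<String> out = new ArrayList<>();
        fn.setCollector(out::add);
        fn.eval(input);
        return out;
    }
}
```

Running SketchSplit("#") on "ab#c" this way collects "ab:2" and "c:1".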

ProcessOperator

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java

@Internal
public class ProcessOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    private transient TimestampedCollector<OUT> collector;

    private transient ContextImpl context;

    /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
    private long currentWatermark = Long.MIN_VALUE;

    public ProcessOperator(ProcessFunction<IN, OUT> function) {
        super(function);

        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);

        context = new ContextImpl(userFunction, getProcessingTimeService());
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    //......
}
  • ProcessOperator's processElement method calls userFunction.processElement; when a table function is joined in a streaming job, userFunction is a CRowCorrelateProcessRunner
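Before calling the user function, ProcessOperator.processElement runs collector.setTimestamp(element), so every record the function emits for that input carries the input record's timestamp. A plain-Java sketch of that pattern follows; Stamped, StampedCollector, and ProcessOperatorSketch are hypothetical simplifications of StreamRecord, TimestampedCollector, and the operator loop, and omit the context, watermarks, and timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical record: a value plus the timestamp it carries.
final class Stamped<T> {
    final T value;
    final long timestamp;

    Stamped(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Simplified analogue of TimestampedCollector: remembers the current input's
// timestamp and attaches it to every value collected until it is changed.
final class StampedCollector<T> implements Consumer<T> {
    private final List<Stamped<T>> output;
    private long timestamp;

    StampedCollector(List<Stamped<T>> output) {
        this.output = output;
    }

    void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public void accept(T value) {
        output.add(new Stamped<>(value, timestamp));
    }
}

final class ProcessOperatorSketch {
    // Mirrors processElement: set the timestamp first, then hand the value and
    // the collector to the user function.
    static <IN, OUT> List<Stamped<OUT>> run(List<Stamped<IN>> input,
                                             BiConsumer<IN, Consumer<OUT>> userFunction) {
        List<Stamped<OUT>> output = new ArrayList<>();
        StampedCollector<OUT> collector = new StampedCollector<>(output);
        for (Stamped<IN> element : input) {
            collector.setTimestamp(element.timestamp);
            userFunction.accept(element.value, collector);
        }
        return output;
    }
}
```

Splitting "a#b" (timestamp 10) and "c" (timestamp 20) this way yields "a" and "b" stamped 10 and "c" stamped 20.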

CRowCorrelateProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala

class CRowCorrelateProcessRunner(
    processName: String,
    processCode: String,
    collectorName: String,
    collectorCode: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[Any]
  with Logging {

  private var function: ProcessFunction[Row, Row] = _
  private var collector: TableFunctionCollector[_] = _
  private var cRowWrapper: CRowWrappingCollector = _

  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
    LOG.debug("Instantiating TableFunctionCollector.")
    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
    this.cRowWrapper = new CRowWrappingCollector()

    LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
    val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
    val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
    LOG.debug("Instantiating ProcessFunction.")
    function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
    FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(collector, parameters)
    FunctionUtils.openFunction(function, parameters)
  }

  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {

    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)

    collector.setCollector(cRowWrapper)
    collector.setInput(in.row)
    collector.reset()

    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  override def close(): Unit = {
    FunctionUtils.closeFunction(collector)
    FunctionUtils.closeFunction(function)
  }
}
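  • CRowCorrelateProcessRunner's processElement method calls function.processElement; function is the code-generated ProcessFunction compiled from processCode in open(), and it in turn calls Split's eval method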
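Because processElement calls cRowWrapper.setChange(in.change) before running the generated function, every row produced for an input CRow inherits that input's change flag (true for an accumulate message, false for a retraction). The sketch below imitates that wrapping in plain Java; CRowSketch, WrappingCollectorSketch, and CorrelateRunnerSketch are hypothetical simplifications of CRow, CRowWrappingCollector, and the runner, and the BiConsumer stands in for the generated code that calls eval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical simplification of CRow: a row plus a change flag
// (true = accumulate, false = retract).
final class CRowSketch {
    final List<Object> row;
    final boolean change;

    CRowSketch(List<Object> row, boolean change) {
        this.row = row;
        this.change = change;
    }
}

// Simplified analogue of CRowWrappingCollector: wraps each emitted row
// with the change flag of the input currently being processed.
final class WrappingCollectorSketch implements Consumer<List<Object>> {
    List<CRowSketch> out;
    private boolean change;

    void setChange(boolean change) {
        this.change = change;
    }

    @Override
    public void accept(List<Object> row) {
        out.add(new CRowSketch(row, change));
    }
}

final class CorrelateRunnerSketch {
    private final WrappingCollectorSketch wrapper = new WrappingCollectorSketch();
    // Stands in for the code-generated function that calls the table function's eval.
    private final BiConsumer<List<Object>, Consumer<List<Object>>> function;

    CorrelateRunnerSketch(BiConsumer<List<Object>, Consumer<List<Object>>> function) {
        this.function = function;
    }

    // Mirrors processElement: point the wrapper at the downstream output, copy the
    // input's change flag, then run the function on the plain row.
    void processElement(CRowSketch in, List<CRowSketch> out) {
        wrapper.out = out;
        wrapper.setChange(in.change);
        function.accept(in.row, wrapper);
    }
}
```

This is why a retraction of an input row turns into retractions of all rows that the table function produced for it, without the table function itself knowing about change flags.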

Summary

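  • TableFunction extends UserDefinedFunction and defines the collect, setCollector, getResultType, and getParameterTypes methods; UserDefinedFunction defines the open, close, and functionIdentifier methods
  • To write a custom TableFunction, besides extending the TableFunction class you also need to define a public eval method. Its parameter types depend on how the function is called: in this example split is called with the table's field a, which is a String, so eval takes a String parameter
  • ProcessOperator's processElement method calls userFunction.processElement, where userFunction is a CRowCorrelateProcessRunner; CRowCorrelateProcessRunner's processElement method calls function.processElement, and function in turn calls Split's eval method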
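Since TableFunction declares no abstract eval, Flink discovers eval methods reflectively by name and parameter types, which is why eval's signature simply has to match the arguments used at the call site; the Flink documentation also allows overloading eval for different argument types. The sketch below shows a much-simplified reflective lookup. MultiSplit and EvalResolverSketch are hypothetical, and Flink's actual resolution handles many more cases (varargs, primitives, type widening, nulls).

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical table function with two overloaded eval methods.
class MultiSplit {
    final List<String> emitted = new ArrayList<>();

    public void eval(String str) {
        for (String s : str.split("#")) {
            emitted.add(s);
        }
    }

    public void eval(Integer n) {
        for (int i = 0; i < n; i++) {
            emitted.add("item" + i);
        }
    }
}

class EvalResolverSketch {
    // Picks the first public, non-static method named "eval" whose parameter count matches
    // and whose parameter types accept the given (non-null) arguments, then invokes it.
    static void invokeEval(Object fn, Object... args) {
        for (Method m : fn.getClass().getMethods()) {
            if (!m.getName().equals("eval") || Modifier.isStatic(m.getModifiers())
                    || m.getParameterCount() != args.length) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                if (!params[i].isInstance(args[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                try {
                    m.invoke(fn, args);
                    return;
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalArgumentException("no matching eval method");
    }
}
```

Calling invokeEval(fn, "a#b") selects eval(String), invokeEval(fn, 2) selects eval(Integer), and an argument type with no matching eval (for example a Double) is rejected.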


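Original content notice: this article is published on the Tencent Cloud+ Community with the author's authorization and may not be reproduced without permission.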
