A Look at Flink Table's ScalarFunction

Original article
Author: code4it
Published 2019-02-10 08:34:59
Column: 码匠的流水账

This article takes a look at Flink Table's ScalarFunction.

Example

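import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class HashCode extends ScalarFunction {

    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // read the "hashcode_factor" job parameter;
        // "12" is the default value if the parameter does not exist
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
    }

    // evaluation method, called once per input value
    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);

// register the function
tableEnv.registerFunction("hashCode", new HashCode());

// use the function in the Java Table API
// (myTable is assumed to be an existing Table with a String column named "string")
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL
// (MyTable is assumed to be registered in tableEnv)
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");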
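  • HashCode extends ScalarFunction and defines an eval method; its open method reads the hashcode_factor job parameter (falling back to "12") before any eval call is made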
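
To make the open/eval lifecycle concrete, here is a minimal plain-Java sketch that runs without Flink on the classpath. JobParams and HashCodeSketch are hypothetical stand-ins, not Flink classes: JobParams.get only imitates the default-value lookup of FunctionContext.getJobParameter, and open is assumed to be called once before the first eval.

```java
import java.util.HashMap;
import java.util.Map;

class HashCodeLifecycleDemo {

    // Hypothetical stand-in for FunctionContext: returns a job parameter or a default.
    static class JobParams {
        private final Map<String, String> values = new HashMap<>();

        JobParams set(String key, String value) {
            values.put(key, value);
            return this;
        }

        String get(String key, String defaultValue) {
            return values.getOrDefault(key, defaultValue);
        }
    }

    // Hypothetical model of the HashCode function above; it does not extend ScalarFunction.
    static class HashCodeSketch {
        private int factor = 0;

        // Called once before the first eval, mirroring ScalarFunction#open.
        void open(JobParams params) {
            factor = Integer.parseInt(params.get("hashcode_factor", "12"));
        }

        // Called once per input value.
        int eval(String s) {
            return s.hashCode() * factor;
        }
    }

    public static void main(String[] args) {
        HashCodeSketch configured = new HashCodeSketch();
        configured.open(new JobParams().set("hashcode_factor", "31"));
        System.out.println(configured.eval("a")); // "a".hashCode() is 97, so this prints 3007

        HashCodeSketch defaulted = new HashCodeSketch();
        defaulted.open(new JobParams());
        System.out.println(defaulted.eval("a")); // default factor 12, so this prints 1164
    }
}
```

If eval were called before open, factor would still be 0 and every result would be 0, which is why the parameter lookup belongs in open rather than in the constructor.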

ScalarFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala

abstract class ScalarFunction extends UserDefinedFunction {
​
  /**
    * Creates a call to a [[ScalarFunction]] in Scala Table API.
    *
    * @param params actual parameters of function
    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
    */
  final def apply(params: Expression*): Expression = {
    ScalarFunctionCall(this, params)
  }
​
  // ----------------------------------------------------------------------------------------------
​
  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @param signature signature of the method the return type needs to be determined
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null
​
  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          throw new ValidationException(
            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.")
      }
    }
  }
}
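  • ScalarFunction extends UserDefinedFunction and defines the apply, getResultType and getParameterTypes methods. A ScalarFunction maps zero, one or more scalar values to a new scalar value, and the mapping is implemented by the user-defined public eval methods. It is also generally recommended to declare parameters and result types as primitive types, for example int in place of DATE/TIME and long in place of TIMESTAMP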
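
The primitive-type recommendation can be illustrated with plain Java. The sketch below assumes the internal encodings commonly used by Flink's code generation: a DATE as the number of days since 1970-01-01 (int) and a TIMESTAMP as milliseconds since the epoch (long). The class and method names are hypothetical, and the class does not extend ScalarFunction, so it runs without Flink on the classpath; in a real function both methods would be public instance methods named eval.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

class PrimitiveTimeEvalSketch {

    // Evaluation over a DATE passed as int (assumed: days since 1970-01-01).
    static int evalYearOfDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch).getYear();
    }

    // Evaluation over a TIMESTAMP passed as long (assumed: milliseconds since the epoch, UTC).
    static int evalHourOfTimestamp(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).getHour();
    }

    public static void main(String[] args) {
        int date = (int) LocalDate.of(2019, 2, 10).toEpochDay();
        long timestamp = Instant.parse("2019-02-10T08:34:59Z").toEpochMilli();

        System.out.println(evalYearOfDate(date));           // prints 2019
        System.out.println(evalHourOfTimestamp(timestamp)); // prints 8
    }
}
```

Working on int and long directly avoids allocating a java.sql.Date or java.sql.Timestamp object for every row, which is the motivation behind the recommendation.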

CRowProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowProcessRunner.scala

class CRowProcessRunner(
    name: String,
    code: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[ProcessFunction[Row, Row]]
  with Logging {
​
  private var function: ProcessFunction[Row, Row] = _
  private var cRowWrapper: CRowWrappingCollector = _
​
  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
    LOG.debug("Instantiating ProcessFunction.")
    function = clazz.newInstance()
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(function, parameters)
​
    this.cRowWrapper = new CRowWrappingCollector()
  }
​
  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {
​
    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)
    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }
​
  override def getProducedType: TypeInformation[CRow] = returnType
​
  override def close(): Unit = {
    FunctionUtils.closeFunction(function)
  }
}
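  • CRowProcessRunner's processElement method calls function.processElement, which in turn calls the eval method of the user-defined ScalarFunction. Here function is a ProcessFunction[Row, Row] that open compiles and instantiates from the code string passed to the CRowProcessRunner constructor; that code is generated by DataStreamCalc in its translateToPlan method when it creates the CRowProcessRunner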
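
The delegation in processElement can be sketched in plain Java as follows. All types here are simplified, hypothetical stand-ins: ChangeRow plays the role of CRow (a row plus a change flag), WrappingCollector the role of CRowWrappingCollector, and the inner function is an ordinary lambda instead of a class compiled from generated code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class CRowDelegationSketch {

    // Stand-in for CRow: a payload plus an accumulate (true) or retract (false) flag.
    static final class ChangeRow {
        final String row;
        final boolean change;

        ChangeRow(String row, boolean change) {
            this.row = row;
            this.change = change;
        }
    }

    // Stand-in for CRowWrappingCollector: re-attaches the change flag to each emitted row.
    static final class WrappingCollector implements Consumer<String> {
        Consumer<ChangeRow> out;
        boolean change;

        @Override
        public void accept(String row) {
            out.accept(new ChangeRow(row, change));
        }
    }

    // Stand-in for CRowProcessRunner: unwraps the input and delegates to the inner function.
    static final class Runner {
        private final BiConsumer<String, Consumer<String>> function;
        private final WrappingCollector wrapper = new WrappingCollector();

        Runner(BiConsumer<String, Consumer<String>> function) {
            this.function = function;
        }

        void processElement(ChangeRow in, Consumer<ChangeRow> out) {
            wrapper.out = out;          // forward results to the downstream collector
            wrapper.change = in.change; // remember the flag of the current input
            function.accept(in.row, wrapper);
        }
    }

    public static void main(String[] args) {
        // The inner function plays the role of the generated code that calls eval.
        Runner runner = new Runner((row, out) -> out.accept(row.toUpperCase()));
        List<ChangeRow> results = new ArrayList<>();
        runner.processElement(new ChangeRow("a", true), results::add);
        runner.processElement(new ChangeRow("b", false), results::add);
        for (ChangeRow r : results) {
            System.out.println(r.row + " " + r.change); // prints "A true" then "B false"
        }
    }
}
```

The inner function never sees the change flag; the wrapper carries it from input to output, so the generated code only has to deal with plain rows.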

ProcessFunction

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java

@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
​
    private static final long serialVersionUID = 1L;
​
    /**
     * Process one element from the input stream.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter
     * and also update internal state or set timers using the {@link Context} parameter.
     *
     * @param value The input value.
     * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
     *            a {@link TimerService} for registering timers and querying the time. The
     *            context is only valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
​
    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
     *            querying the {@link TimeDomain} of the firing timer and getting a
     *            {@link TimerService} for registering timers and querying the time.
     *            The context is only valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
​
    /**
     * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
     * or {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class Context {
​
        /**
         * Timestamp of the element currently being processed or timestamp of a firing timer.
         *
         * <p>This might be {@code null}, for example if the time characteristic of your program
         * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
         */
        public abstract Long timestamp();
​
        /**
         * A {@link TimerService} for querying time and registering timers.
         */
        public abstract TimerService timerService();
​
        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
​
    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {
        /**
         * The {@link TimeDomain} of the firing timer.
         */
        public abstract TimeDomain timeDomain();
    }
​
}
  • ProcessFunction extends AbstractRichFunction and defines the abstract method processElement, along with an onTimer method whose default implementation is empty

DataStreamCalc

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala

class DataStreamCalc(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    inputSchema: RowSchema,
    schema: RowSchema,
    calcProgram: RexProgram,
    ruleDescription: String)
  extends Calc(cluster, traitSet, input, calcProgram)
  with CommonCalc
  with DataStreamRel {
​
  //......
​
  override def translateToPlan(
      tableEnv: StreamTableEnvironment,
      queryConfig: StreamQueryConfig): DataStream[CRow] = {
​
    val config = tableEnv.getConfig
​
    val inputDataStream =
      getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
​
    // materialize time attributes in condition
    val condition = if (calcProgram.getCondition != null) {
      val materializedCondition = RelTimeIndicatorConverter.convertExpression(
        calcProgram.expandLocalRef(calcProgram.getCondition),
        inputSchema.relDataType,
        cluster.getRexBuilder)
      Some(materializedCondition)
    } else {
      None
    }
​
    // filter out time attributes
    val projection = calcProgram.getProjectList.asScala
      .map(calcProgram.expandLocalRef)
​
    val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo)
​
    val genFunction = generateFunction(
      generator,
      ruleDescription,
      inputSchema,
      schema,
      projection,
      condition,
      config,
      classOf[ProcessFunction[CRow, CRow]])
​
    val inputParallelism = inputDataStream.getParallelism
​
    val processFunc = new CRowProcessRunner(
      genFunction.name,
      genFunction.code,
      CRowTypeInfo(schema.typeInfo))
​
    inputDataStream
      .process(processFunc)
      .name(calcOpName(calcProgram, getExpressionString))
      // keep parallelism to ensure order of accumulate and retract messages
      .setParallelism(inputParallelism)
  }
}
  • DataStreamCalc's translateToPlan calls CommonCalc's generateFunction method to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, and that method calls the eval method of the user-defined ScalarFunction
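
As a rough illustration of what such generated code does, the plain-Java sketch below hand-writes a function for a query like SELECT string, HASHCODE(string) FROM MyTable WHERE string <> ''. It is only an approximation under stated assumptions: the WHERE clause is added here for illustration, the class names are hypothetical, rows are modelled as Object[], and the code Flink actually generates looks different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class GeneratedCalcSketch {

    // Stand-in for the user-defined HashCode function from the example section.
    static final class HashCodeUdf {
        private final int factor;

        HashCodeUdf(int factor) {
            this.factor = factor;
        }

        int eval(String s) {
            return s.hashCode() * factor;
        }
    }

    // Hand-written approximation of the generated function: condition first, then projection.
    static final class CalcFunction {
        private final HashCodeUdf udf;

        CalcFunction(HashCodeUdf udf) {
            this.udf = udf;
        }

        void processElement(Object[] in, Consumer<Object[]> out) {
            String s = (String) in[0];
            if (!s.isEmpty()) {                             // condition: string <> ''
                out.accept(new Object[] {s, udf.eval(s)});  // projection: string, HASHCODE(string)
            }
        }
    }

    public static void main(String[] args) {
        CalcFunction calc = new CalcFunction(new HashCodeUdf(31));
        List<Object[]> results = new ArrayList<>();
        calc.processElement(new Object[] {"a"}, results::add);
        calc.processElement(new Object[] {""}, results::add); // dropped by the condition
        for (Object[] row : results) {
            System.out.println(row[0] + ", " + row[1]); // prints "a, 3007"
        }
    }
}
```

The point of the sketch is the call path: the filter and projection of the Calc node end up inside one processElement body, and the call to eval is just one expression within the projection.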

Summary

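  • ScalarFunction extends UserDefinedFunction and defines the apply, getResultType and getParameterTypes methods. A ScalarFunction maps zero, one or more scalar values to a new scalar value, and the mapping is implemented by the user-defined public eval methods. It is also generally recommended to declare parameters and result types as primitive types, for example int in place of DATE/TIME and long in place of TIMESTAMP
  • CRowProcessRunner's processElement method calls function.processElement, which in turn calls the eval method of the user-defined ScalarFunction. Here function is a ProcessFunction compiled from the code string passed to the CRowProcessRunner constructor; that code is generated by DataStreamCalc in its translateToPlan method when it creates the CRowProcessRunner. ProcessFunction extends AbstractRichFunction and defines the abstract method processElement
  • DataStreamCalc's translateToPlan calls CommonCalc's generateFunction method to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, and that method calls the eval method of the user-defined ScalarFunction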

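Original-work notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.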