前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Byzer UDF 函数开发指南

Byzer UDF 函数开发指南

作者头像
用户2936994
发布2022-04-07 15:30:59
9930
发布2022-04-07 15:30:59
举报
文章被收录于专栏:祝威廉祝威廉

Byzer 提供了三种方式让用户自己实现 UDF 从而更好的扩展SQL的能力。

  1. 动态 UDF. 在 Byzer 中使用 Scala/Java 编写 UDF, 随写随用,无需编译打包发布重启
  2. 内置 UDF. 使用 Scala/Java 编写 UDF,然后发布成 Jar, 引入 Jar 包后,需要重启
  3. 使用基于 Hive 开发的 UDF

动态 UDF

动态 UDF的使用最简单,用户可以使用 Byzer 的 register 语句将一段 Scala/Java 代码注册成 UDF.

比如,我们正在开发一个 ETL 脚本,希望获得一个数组的最后一个元素,但发现没有原生内置的函数能够实现这个,这个时候,可以直接用 Byzer Register 语句生成一个 UDF 函数,名称叫 arrayLast, 代码示例如下;

代码语言:javascript
复制
register ScriptUDF.`` as arrayLast 
where lang="scala"
and code='''def apply(a:Seq[String])={
      a.last
}'''
and udfType="udf";

select arrayLast(array("a","b")) as lastChar as output;

写完之后,就可以在后续的 select 语句中使用。 运行结果如下:

在上面的示例中,如果用户使用 Scala 编写,那么 udfType 支持 udf/udaf 。UDAF 也很简单,示例如下:

代码语言:javascript
复制
REGISTER ScriptUDF.`` AS plusFun options
className="SumAggregation"
and code='''

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
class SumAggregation extends UserDefinedAggregateFunction with Serializable{
    def inputSchema: StructType = new StructType().add("a", LongType)
    def bufferSchema: StructType =  new StructType().add("total", LongType)
    def dataType: DataType = LongType
    def deterministic: Boolean = true
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer.update(0, 0l)
    }
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val sum   = buffer.getLong(0)
      val newitem = input.getLong(0)
      buffer.update(0, sum + newitem)
    }
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
    }
    def evaluate(buffer: Row): Any = {
      buffer.getLong(0)
    }
}
'''
and udfType="udaf";

SET data='''
{"a":1}
{"a":1}
{"a":1}
{"a":1}
''';

LOAD jsonStr.`data` AS dataTable;

SELECT a,plusFun(a) AS res FROM dataTable GROUP BY a AS output;

执行结果:

注意: Java 在当前版本只支持 UDF, 不支持 UDAF。

如何构建可复用的 UDF 工具集

对于这些动态编写的 UDF 函数,我们可以将其放在独立的 Byzer notebook 里,然后通过 include 语法引入(注意,该功能需要 Byzer notebook 1.1.0 以及以上版本才支持)

具体做法分成两步。第一步,创建一个 用于存储 UDF 的 Notebook, 比如 Notebook 名字叫 udfs.bznb

然后我们填入 arrayLast 函数的代码。接着,新创建一个 Notebook, 比如叫 job.bznb, 在该 Notebook 里可以通过如下方式引入 arrayLast 函数:

代码语言:javascript
复制
include http.`project.demo.udfs`;
select arrayLast(array("a","b")) as lastChar as output;

结果如下:

在 Byzer Notebook 中,需要在一个 Notebook 里引入另外一个 Notebook,可以通过 Include语法,其中 http 和 project 是固定的。 后面 demo.udfs 则是目录路径,只不过用 . 替换了 /

假设 udfs 里有很多函数,不希望把所有的函数都包含进来,那么可以指定 Cell 的 序号 。 比如只包含第一个 cell, 那么可以这么写:

代码语言:javascript
复制
include http.`project.demo.udfs#1`;
select arrayLast(array("a","b")) as lastChar as output;

期待 Byzer notebook 以后可以支持给 cell 命名

除此之外,还可以将代码放到 git 仓库中,假设用户放到 gitee上,那么可以用如下方式引用:

代码语言:javascript
复制
include lib.`gitee.com/allwefantasy/lib-core`
where alias="libCore";

include local.`libCore.udf.hello`;
select hello() as name as output;

在这里,我们引用了 lib-core 项目里的一个 hello 函数,然后接着就可以在 select 语法中使用。

结果如下:

内置 UDF 函数

新建一个 Java/Scala 混合项目, 里面创建一个 object 对象,比如叫:

代码语言:javascript
复制
package tech.mlsql.udfs.custom
import org.apache.spark.sql.UDFRegistration

object MyFunctions {
}

接着添加一个函数 mkString

代码语言:javascript
复制
package tech.mlsql.udfs.custom
import org.apache.spark.sql.UDFRegistration

object MyFunctions {
   def mkString(uDFRegistration: UDFRegistration) = {
    uDFRegistration.register("mkString", (sep: String, co: WrappedArray[String]) => {
      co.mkString(sep)
    })
  }
}

该函数接受一个 UDFRegistration 对象, 然后使用该对象注册真实的 UDF 函数。 register 方法的第一个参数是 UDF 在 SQL 中使用的名字,第二个参数则是一个普通的 Scala 函数。

如果想具体的业务逻辑使用 Java 开发,那么需要单独再写一个 Java 类,在里面实现具体的逻辑,然后在 Scala 函数中调用。

开发完成后,打包这个项目,生成 Jar 包,为了能够让 Byzer 识别到这些 UDF, 需要做三件事:

  1. 把 Jar 包丢到 Byzer 项目的 jars 目录里去
  2. 启动时,在启动脚本中添加一个参数 -streaming.udf.clzznames "tech.mlsql.udfs.custom.MyFunctions" 如果有多个,用逗号分割即可。
  3. 然后重启 Byzer。

目前内置的很多内置的 UDF 函数就是利用这种方式开发的。 参看 streaming.core.compositor.spark.udf.Functions

如何把 Jar 包放到正确的目录里很重要,对于不同的 Byzer 发行版,目录可能有差异。具体如下;

  1. 分布式 Yarn based 版本,将 Jar 包放到 ${SPARK_HOME}/jars 目录即可。 如果是已经运行了,你需要重启 Byzer。
  2. 分布式 K8s base 版本, 现阶段可能需要重新构建镜像。参考 Byzer build 项目。
  3. Sandbox 版本,启动容器后,进入容器 /work 目录,然后将 Jar 包放到 /work/${SPARK_HOME}/jars 目录即可. 需要重启容器。
  4. 桌面版本,以 Mac 为例, 将 Jar 包放到 ~/.vscode/extensions/allwefantasy.mlsql-0.0.7/dist/mlsql-lang/spark 目录下即可,然后重启 VSCode 即可。
  5. 命令行版本,则是在发行版根目录下的 libs/ 目录里。

使用基于 Hive 开发的 UDF

首先,按照前面内置函数中说的方式,将基于 Hive 规范的 UDF 函数的 Jar 包放到指定的目录中。

其次,你需要执行特定的指令动态注册:

代码语言:javascript
复制
CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs';

考虑到该指令重启后会失效,用户可以将这些指令放到一个单独的 Notebook里,然后采用 动态 UDF 中介绍的,通过 include 语法在需要使用的地方进行引用即可。

为啥 UDF 不支持 Python 呢

为啥 UDF 不支持 Python 呢? 因为我们有更好更高效的方式 [Byzer-python](虎年知识年货之 Byzer-python 一站式教学指南来啦!),比如下面的两段代码:

代码语言:javascript
复制
load binaryFile.`/tmp/cifar10/cifar/train/*.png` as cifar10;

上面这段代码把对象存储里的五万张图片都加载成表。接着我希望把每张图片缩放成 28*28 像素,这个时候用 Python 其实会方便些,因为 Python里有很多成熟的库,比如 OpenCV。 用户可以按如下方式处理:

代码语言:javascript
复制
#%python
#%input=raw_cifar10_table
#%output=cifar10_resize
#%cache=true
#%schema=st(field(content,binary),field(path,string))
#%dataMode=data
#%env=source /opt/miniconda3/bin/activate ray1.8.0

import io,cv2,numpy as np
from pyjava.api.mlsql import RayContext

ray_context = RayContext.connect(globals(),"127.0.0.1:10001")
ray_context.to_dataset().to_dask()
def resize_image(row):
    new_row = {}
    image_bin = row["content"]    
    oriimg = cv2.imdecode(np.frombuffer(io.BytesIO(image_bin).getbuffer(),np.uint8),1)
    newimage = cv2.resize(oriimg,(28,28))
    is_success, buffer = cv2.imencode(".png", newimage)
    io_buf = io.BytesIO(buffer)
    new_row["content"]=io_buf.getvalue()
    new_row["path"]= row["path"]    
    return new_row

ray_context.foreach(resize_image)

这里我们可以看到,我们只要在 Notebook 里写一个 Python 回调函数,然后传递给 ray_context 对象就可以。

最后,把 Python 处理的结果保存回文件系统:

代码语言:javascript
复制
select arrayLast(split(path,"/")) as fileName,content  
from cifar10_resize 
as final_dataset;

save overwrite final_dataset as image.`/tmp/size-28x28` 
where imageColumn="content" 
and fileName="fileName";

SQL 和 Python之间实现了无缝衔接。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 动态 UDF
    • 如何构建可复用的 UDF 工具集
    • 内置 UDF 函数
    • 使用基于 Hive 开发的 UDF
      • 为啥 UDF 不支持 Python 呢
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档