00:00
那就想到了,那除了这个系统内置函数之外,嗯,那系统给我们已经提供的函数是这些,而且我们又说现在table API和弗CQ又不完善,很多功能还没实现,那假如说我们的这个需求它就没有给我们提供,那怎么办呢?啊,那没什么好说的,自己去定义对吧?Udf函数,所以用户自定义函数啊,这个user DeFine functions,这是一个非常重要的特性,在很多地方都有啊,大家在之前学习这个呃,CQ的时候,对吧?或者说学这个have的时候,大家肯定都是做过这样的一个自定义函数的定呃实现的啊,它其实会显著的扩展我们做查询的表达能力啊,那所以我们一般这个使用方式是什么样呢?呃,大多数情况下,我们定义的函数呢,需要先在环境里边注册,然后接下来才能在查询里边CQL查询里边去做使用,那怎么样去注册呢?有一个方法叫做register function,对吧?啊,这就是注册一个函数,就像我们。
01:00
啊,创建一个表,创建一个这个view视图一样,在在环境里边要把它注册进去,要把它定义出来,函数同样也是这样,你定义出来之后呢,还要先注册进去啊,那所以在注册的时候,其实本质就是插入到了这个表环境里边,它那个catallo里边,对吧?有那个函数目录,接下来我们的table API或者CQ的解析器就可以识别出来,正确的解释它了啊那接下来我们看一看到底有哪些类型的udf函数呢?首先最简单的就是标量函数啊,标量函数是这个名字叫做叫做scale functions啊啊,这个有点像啊,但是这是对吧,这个呃是标量的意思,什么叫标量呢?标量就是一个数值嘛,对吧,算出来就是一个值,所以说所谓的标量函数,那就是可以把零个一个或多个,也就是说这个参数是不定的啊,你可以传。
02:00
多个映射到一个新的标量值,简单来讲大家可以认为这是干什么,这不就是一个map操作嘛,对吧,我做一个map得到一个新的值就完事了啊,所以具体来使用的过程当中呢,就是我们需要要想定义这个标量函数,需要去扩展一个skill function啊,就是用这个flink table functions里边的这个鸡类skill function,然后实现它里边的一个求职方法,E value叫做evil,对吧,它这里边叫做evil,大家知道这是e value求职的意思,这里边有点奇怪,大家看这个并不是重写的方法对吧?它并不是当前我们的这个鸡肋,或者说这个接口里边已经给我们定义好的,哎,所以这个呢,是鞋,就是底层的这个扣子里边写死了,你必须实现这个方法,必须叫这个名儿,没办法,对吧,但是呢,它又不是重写的,你必须是自己手动的去定义出来,所以这里边大家会看到还是有一些不完善的地方啊,但是我们只要。
03:00
知道他怎么用就可以了,呃,这里边大家需要注意一下就是,呃,首先就是这个名字必须叫evil啊,另外还有一个就是必须是公开的,对吧,你不能定义成private,那就没办法再在外部去访问了,那所以这里面大家注意啊,这个evil传入的这个参数,这是个什么东西呢?传入的参数就是我们这里边输入的那些标量值,就是你当前的这个函数,比方说你到底要应用到哪个字段上对吧?哪个字段要调这个函数啊,所以你传进来的字段就是我们当前这个参数,这个s string,对吧?大家看,这是我要求一个哈希扣的这个这样一个函数啊,你可以把它传进来,然后呢,返回的数据类型是什么呢?就是当前e value evil的这个返回值的类型,比方说你做哈s code的求值嘛,直接把这个int类型返回对吧?呃,这就是所谓的这个标量函数,传入一个值或者多个值,得到一个数值一对一啊类似于这个。
04:00
呃,一对一,或者我们这里边说的一是一个一行数据对吧,你可以一行数据里边的多个字段提出来去传入,但还是一条数据里边的东西提出来去做转换,得到一个具体的值,标量值,类似于map对吧?啊,那接下来我们在代码里边把这一个一个。好,我们还是在当前的包下边,因为这个udf可能稍微会多一点,而且又不一样,对吧?我们单独的定义一个包叫做udf test,然后接下来我们定义一个叫function test,好把它先创建出来main方法定义出来,对吧?呃,然后接下来我们这个其实整体的流程跟之前应该是差不多的,对吧?哎,我先把这个想定义的这个先定义出来吧,我就直接在外面定义了啊,大家看这里面我写一个自定义标量函数,但呃,这个定义大家看,这种定义的方式就跟我们前面data stream API里面的那个函数类定义一样,对吧,看起来也是定义一个函数类,一个class嘛,啊,所以大家看看这个具体的写法啊,比方说我们就定义这个哈希code,我传入一个string,然后呢,呃,按照我自己定义的哈希算法。
05:23
把这个规则返回一个哈希code,一个int类型的哈希code对吧?那这里边我定义一个哈希code的这个类,然后里边可以传参这个,这个参数应该是一个因子对吧,或者说是一个随机数种子这样的一个东西啊,就是我外部去把这个东西传进来,那我这个叫做一个因子吧,Factor,一个int类型的因子,然后extend,它需要实现的是一个scale scale function这样一个接口,对吧?大家看这里边我们引入的呢,注意不要引入那个,呃,里边的那个啊,我们引入的是link table functions下边的这个function啊,这是这是我们说的这个抽象机类,对吧?啊,当前的这个抽象类继承这个抽象类,但是这个抽象类里边你会发现啊,它本身这里边的这个方法呢,好像没有正经这个求值的方法对吧,都是get这个get那个get什么type啊。
06:23
啊,都是这些啊,所以说这里边我们就只好自己去定义,到底里边怎么样去求值了,这个求职方法呢,必须是public的,而且必须叫做evil啊,那这里边参数是什么呢?呃,传一个string进来对吧,我们就跟文档里面写的一样啊,然后呢,返回的数据类型在这里给一个特类型返回,呃,那那当前的这个定义出来,最后里边要返回的值呢,我就直接算了啊,我们就不用详细的再挨个字符串一个一个拿了,对吧,我就直接大家知道可以调这个底层的哈希扣的方法对吧?本身是给我们提供了这个方法的啊,那我这里边呢,是基于这个哈西扣的,再去乘一个factor对吧,方说我这样,然后后边呢,再去减去1万,比方说做一个这样的一个操作,那相当于就是基于本身的哈s code又做了一个调整,对吧,根据传入的faster不一样,最后得到的结果。
07:23
也会有所不同啊,这就是我们自定义的一个标量函数,自定义的一个哈希函数啊,那接下来我们看看在代码里边到底怎么用它呢?关键是看怎么用啊,首先我们还是把前面该定义的这些呃东西都定义出来,这个我就不详细去写了,我们直接抄吧,对吧,前面的这个环境流失处理的环境,还有这个表环境对吧?啊,那后边还有这个本身这个数据的读取之后转换成样例类啊,这个我就直接全抄过来啊,甚至最后到当前的这个三这这。
08:01
好,该引入的所有的东西都引入,注意要把这个啊,为了后边我们做做这个转换啊,食转换要引入,把这两个引入好,然后接下来我们要得到的这一个调用啊,自定义哈希函数进行啊,就是对呃,我们这里面的string好像就只有ID对吧,对ID行运算,那我们一在这个的调用啊,Table API里边要调用它的话,这个其实比较简单,因为table API大家知道都是内嵌在Java和代码里面的嘛,你现在这就是一个scla里面的类呀,对吧,你这就直接定义好,就是一个类,那我不就是直接把它弄出来,这就是一个函数类,我就可以调了吗?哎,所以利用这。
09:02
思路,那这里边我们直接它的一个实例对吧?哈,这里首先首先拗一个UDF的实例。好,那这里边我们直接去new啊,然后里边传参,随便传一个,比方说我传一个23对吧,这当成我们当前的这个因子去做一个乘法操作,然后接下来大家看怎么样去做具体的运算呢?得到一个基于这个sensor table,直接去做转换select,这是最简单的一个,大家可以认为类似于map的一个操作,对吧?直接做转换,这个转换的过程当中,我要这个ID,然后比方说我还要当前的这个TS啊,直间我知道是哪一条数据,然后后边呢,直接调用哈希code对吧?然后里边需要把当前参数某一个字段作为参数传进来,那当前只有ID是这个字符串,对吧?我把ID传进去,作为参数传进去,这样就OK了,大家看就就这么简单,诶,就这样就可以把它做做这个完整的调用了,好。
10:19
啊,这是这个在当前这个table API里面的调用,那大家可能会想到,那如果要是在CQ里面调用呢,CQ里面调用怎么办?那就必须要需要在环境环境中注册udf对吧?呃,因为之前我们这个table API不用注册,是因为我们这里边本来就不依赖环境嘛,你只要有这个类对吧,我直直接能调这个这个函数,那就没毛病啊,我这个底层我知道这个skill function,它底层就是调它的定义的那个EE方法嘛,这跟我们实现一个map function对吧?Flat map function,这个process function,这不是差不多的这种调用方式嘛,其实是类似的啊,那所以接下来我们就真正看看在这个CQ里面调用的时候怎么做,那首先我们还是得把那个表先注册进去,对吧,我们表还没有呢,Create temporary view,定义出那个S来。
11:19
Sensor table还是这个流程啊,然接下来呢,要把这个当前的UD也注册进去啊,然后这里边你看传的参数是什么,那就是一个是string,就是当前function的name对吧,另外传一个哎,当前的这个skill function,当前的这个函数的类型对吧?啊那所以这里边我给一个当前这个就叫哈s code,然后呢,呃,把这个哈code直接传进去,大家看这里边我传的就是这个当前的这个函数实例对吧?UDF的这个函数类的一个实例,直接把它传进去就完事了,那后边呢,做这个调用的时候,Table,这里边就直接基于table,因为调用当前的CQ query,对吧,里边我们写一行这个CQ,呃,Select,我们想要的是ID。
12:20
啊,另外还有哈希code的,你看直接就在这个CQ里边直接掉就完事了ID对吧,就跟好像是呃,这个CQ里面已经给我们提供的一个系统内置函数一样,直接用就完事了啊啊那当然这里面大家会发现你我们前面这个table API里面,你不能像系统内置函数一样,直接就是字段点去调用对吧?呃,那个我们是没有没有去定义好的,但是可以用这种就是传参的方式去把它调用进去啊,那这里边from sensor好,这就是两种方式都做了一个实现,那最后呢,还是把它转换成流打印输出对吧?那这个过程显然没有经过聚合,不,呃,没有这个更改的数据对吧?直接按照类型直接把它做一个打印就完事了,肉啊,然后当前这个是下面。
13:20
啊,Table toend stream肉print,这个是大家看一看这个结果是不是正确对吧?最后不要忘记把这个执行起来,当前是scale function test啊,这就是完整的一个流程啊好,那我们来运行一下,看看结果效果怎么样,现在已经运行成功了,大家可以看到这里边输出的结果是什么呢?哎,就是当前的当前的这个ID,然后当前的时间TSTS为什么是这个样子了呢?哎,因为我们转成time了嘛,对吧,就是当前已经定义了时间属性之后,它是一个time stamp,然后接下来呢?呃,还有一个就是这个哈希,哈希你看我们这里边一层全是这个负数,为什么全是负数呢?啊,因为这里边大家知道,就是我们这里边要乘,就是求了那个哈希扣之后,这里边还要乘一个factor,对吧,而且我们乘一个二十三一下。
14:20
就超过了这个正数的范围了啊,所以这里边有可能就得到了一个负数对吧?啊,这个其实是可以理解的,所以这就是我们利用这个自定义的一个标量函数实现了一个求取哈希值的过程啊,其实实现调用是非常简单的,对吧,就是一个值的转换而已。
我来说两句