00:00
我们已经了解了UDF在代码当中整体调用的流程,那接下来呢,我们就分门别类,分别来介绍一下udf到底怎么样去使用。首先就是最简单的一种类型,标量函数sc function,前面我们提到了啊,标量函数呢,可以认为就是一个一对一的转换,这里所说的一对一转换呢,主要说的是针对输入的一条数据啊,那流里边的一条数据对应在表里边就是一行数据,经过这一个标量函数转换之后呢,对应的输出还是一行数据,还是一条数据,哎,那所以它就是一个一对一,类似于map转换。而对于具体的标量函数呢,它所实现的功能那是有可能哎,它需要去输入一个或者多个参数的啊,那也有可能不输入啊,所以是零个一个或者多个标量值作为输入参数,这些参数当然都是从当前这一条数据里边提取出来的,可能是不同的字段,然后经过转换计算之后呢,得到的就是一个唯一的标量值,这就是标量函数的概念。
01:09
那如果说想要实现自定义的标量函数呢,那我们就需要去自定义一个类,这个类呢,需要去实现啊,继承抽象类scale function本身这个类,这就是Li当中给我们实现的标量函数的底层接口。那在这个类里边,我们需要实现一个叫做E的求值方法,所以本质上来讲,标量函数的行为就取决于这个evil方法的定义啊,那这个方法呢,必须是公有的,必须是public的,而且名字必须就叫做evil。这里需要注意的是,这个方法可以重载多次啊,就是它的传入的数据类型,包括这个数据的参数的个数都可以单独去指定。所以接下来我们就可以用一个简单的案例啊,在代码当中去做一个测试了,我们可以新建一个测试的object。
02:04
我们现在要测的是udf test。现在测的是标量函数function。没方法先写出来,哎,那前面的流程其实整体都是差不多的啊,所以我们就直接找一个来copy一下吧,比方说我们找这个top这里边啊,创建表执行环境,然后接下来呢,创建一张表,从文件里边读取数据,先把这些都拿过来。然后接下来创建表,这是我们的第一步操作。然后接下来当然就是要针对输入数据去进行查询转换了啊,那我们这里的查询转换呢,需要使用自定义的函数,一个标量函数,所以这里面我们需要定义出来之后,还要注册嘛,先得在系统当中注册,所以注册。标量函数,我们先把要做的步骤都列出来。那后边呢,当然就是调用函数。
03:00
进行。查询转换,最后我们可以把得到的结果表打印输出,当然了,最后应该是转换成流打印输出,最后我们再加上一个envecu,因为我们是转换成流进行处理的嘛,这就是一个完整的流程啊,当然这里的关键啊,在这个注册标量函数之前,我们得有对应的标量函数啊,哎,那就是要去实现一个skill function,所以接下来我们去实现自定义的标量函数,那这里我们可以想一个具体的场景啊,只是做测试的话,我们简单想一个这样的转换函数,就把一个字符串的字段作为输入,然后呢,诶,我们就计算一个它的哈希,我们自定义一个哈希函数,返回它的哈希值啊,那这就是一个非常简单的一对一的转换啊,标量转换,所以我们自定义一个。哈希函数。
04:02
Class我们可以把它叫做呃,麦哈希。然后接下来extend,现在要实现的是scale function,我们需要引入啊,这里引入的当然是flink table functions下边的function。然后我们会发现啊,这样一个抽象类,它本身没有泛起啊,所以我这里边我们不需要有对应的这个类型参数啊,然后啊,它本身继承自user方式udf啊,那自然其他的那些类型的接口啊,也需要去继承自user DeFine方式,这里边是其中的一种,然后我们可以看到在这个里边啊,它里边所提供的方法啊,一个get,看哎,这里边是有具体对应的实现的。另外还有一个是get type influence,相当于只是跟当前的一些类型相关的方法,而且它有对应的实现,那我们知道这是一个抽象类啊,那我们必须要实现的定义它行为的那个抽象方法到底在哪里呢?之前我们说过那个方法就叫做evil evaal啊,但是呢,呃,这里边它没有给我们直接定义出来,声明出来啊,所以这其实是CQ啊,不够完善的一个地方,我们就只能自己去写这样的一个evil方法。
05:19
D evaal,注意,这个名字必须叫做evil,那对于他来讲啊,我们输入的参数当然就是提取一个string类型的字段了,我们就叫做str。String啊,那至于它计算出来的哈希返回值,哎,我们可以定义直接一个int或者是长长形了,都是可以的,具体的实现过程啊,这个我们可以自己单独的定义一个哈希算法啊,但是没有必要,我们这里只是想做一个测试,这个具体的流程并不重要,所以我们干脆就直接返回啊,其实我们知道STEM这个对象啊,本身就有一个哈扣的方法,我们直接把它哈扣的返回不就完事了吗?啊,所以这样我们一个自定义的标量函数就实现了。
06:03
接下来我们就是需要在表环境当中去注册一个标量函数A,注册函数的时候是table env去create,注意现在我们要创建的是一个temporary啊,可以是function,也可以是一个system function,我们说一般情况直接指定一个system function就可以了。好,那里边呢,需要传入两个参数,一个是一个词string类型的当前注册的函数的名称,我们把这个就叫做my test。另外还有一个就是我们定义出来的udf类,它对应的class信息了啊,那所以这里我们可以直接调一个class啊,那class of传入my hush。这样的话就注册好这个标量函数,接下来就可以调用这个函数进行查询转换了,啊,这个查询转换的过程呢,Table CQ query。里面写一个CQL啊,我们随便吧,比方说直接select u ID啊,User拿出来,另外呢,就直接调用my hash,针对当前的UID做一个转换计算啊,那from。
07:11
当前这张表叫做event table,非常的简单,直接转换出来输出就可以了啊,那当然了,得到的这张表我们可以叫做result table。接下来结果需要去做一个打印输出,因为我们知道这是一个一对一类似于map的转换嘛,显然它是一个追加查询,不涉及到更新,所以我们可以直接调用table en的to data stream方法,然后把result table传进来,做一个打印输出就可以了。好,接下来我们可以运行一下,看一看得到的效果到底是什么样的。这里我们的数据比较少,只在这个文件里边有几条数据,哎,我们可以看到啊,当前的数据输入进来之后,如果是Mary的数据的话,那么就输出的是Mary他的这个名字对应的一个哈希值,如果是Bob的话,哎,那就跟他不一样了,爱丽丝不一样了,我们看到如果同样是Mary的点击事件的话,当然输出的哈希时就是一样的,那同样Bob的点击事件输出哈希时也都是一样。
08:16
这就是标量函数的具体实现。
我来说两句