00:00
我们已经了解了flink CQ当中的系统函数啊,那这一部分确实是数量非常的多,功能非常的强大,但是呢,啊,系统函数再多也不可能涵盖我们所有的需求啊,但那如果说我们遇到了一个特殊的需求,现有的系统函数没有办法直接实现,那这个时候怎么办呢?哎,那当然了,这个时候我们还有对应的接口可以去做一个自定义函数的实现,这就是所谓的udf,哎,那事实上现在的这个系统内置函数啊,还在不停的扩充,哎,所以说如果我们认为啊,自己开发出来的udf足够通用,比较有意义的话,诶,那就其实可以直接提出这样的议题,然后请求把自己的UDF添加到系统函数里面,哎,那后面就大家都可以去使用了。啊,那现在弗Li CL当中定义的这个自定义函数的类型啊,其实就是通过不同的接口,以抽象类的形式来定义出来的,那主要有这样的几种类型,一种就是前面我们提到的标量函数,非常简单啊,那就是将输入的一个或多个标量值转换成一个新的标量值,简单来看的话,这个其实就是一条数据输入,里边可能提取各种不同的字段,然后呢,转换成一个新的输出,它是一个一对一的转换。
01:18
然后另外还有一个我们前面比较熟悉的聚合函数,聚合函数every function,那就是多行数据里的数据提取出来,转换成一个固定的值,一个新的标量值,所以它是一个多对一的转换。这两个在系统函数里面我们都已经做过对应的介绍了啊,然后现在呢,还有两个比较特殊的类型,一个叫做表函数tableable function table function,其实前面我们讲到窗口的时候也提到过啊,窗口本质上来讲,我们说它就是一种表值函数啊,那它的做法是什么呢?其实就是可以得到一个新的表啊,具体操作呢,就是针对一条数据,我们可以把它进行转换,拆出多个新的行数据来,然后扩展成一张表。
02:06
啊,所以这个表函数呢,理论上它可以实现一个一对多的转换,反过来了啊,就是之前这个聚合函数是多对一,现在是一对多,它最后的返回结果就是一个table,就是一张表。最后呢,还有一个叫做表聚合函数,表聚合函数的含义有点像是表函数和聚合函数的一个结合啊,就是首先它呢最后的返回值也是一个表,其次呢,在这个转换计算的过程当中,还需要把之前多行数据里的值,然后进行一个聚合统计,哎,它就不是针对像之前这个表函数啊,它是每一行一对多的一个转换,它现在就变成了一个多对多的转换啊,那所以这个表聚合函数可能就会非常的复杂。不管这个哪种类型的函数啊,在使用的过程当中,其实整体的调用流程都是一样的,所以我们先来看一下它的整体调用流程到底是怎么样的。
03:04
如果我们想要在代码当中使用这个自定义函数的话,那首先啊,我们说这个对应给出的这几种类型都有对应的接口,是以抽象类的形式定义出来的,所以呢,我们首先应该要实现对应接口对应的这个抽象类,然后呢,哎,我们要在表环境当中去做一个注册,注册之后接下来就可以在table API和CQ当中去调用这个函数处理数据了。所以整体的这个处理流程啊,应该分成这么几步操作,首先当然是先把那个类先实现出来了,实现出来之后,接下来我们就是要去注册一个函数,注册函数的时候呢,调用的是table env当前表环境的create temporary system function这样一个方法,那一看这个方法,其实我们就想起来之前去注册表的那个方法了啊,注册表的时候其实就是create temporary view嘛,创建一个临时的视图,就可以把一个表对象传入进来,一个table的对象传入进来,注册到表环境里面,接下来就可以用。
04:09
那同样现在也是这样,现在呢,就是要在表环境里边注册一个方式,注意呢,这里注册的时候传入的并不是它的对象,而是传入的是当前我们这个类的class对象,哎,那所以注册完这个函数之后,接下来我们就可以去用它,这里需要注意我们调用的是create temporary system方式,那自然就想到了这注册的是一个系统函数啊啊,那所以这个应该是叫做一个临时的系统函数,也就是说我们所有的那些之前能够调用的系统函数呢,那是内嵌在。我们整个系统里的啊,内置的永久化的系统函数,随处都可以访问,随处都可以调,现在呢啊,现在我们是临时性的注册了一个系统函数,它的作用范围就跟之前的系统函数一样,全局可用。那如果说我们不去注册系统函数,那去注册什么呢?也可以调一个create temporary function方法,那它注册出来的就是一个当前目录下当前数据库内的一个函数,这个函数就叫做catalog方式了,啊呃,就是依赖于当前的数据库和目录,它的完整名称就应该包括当前的catalog名和database名。
05:24
所以一般情况下呢,我们也没有必要单独去指定它的作用范围,只能在当前数据库里面用啊,那所以我们直接调用这个create temporary system方式,注册一个临时的系统函数就可以了。注册好了之后,接下来就可以调用了,调用方式呢,又跟你到底使用table API还是使用CQ有关,之前我们说了啊,如果是使用table API的话,那其实调用的时候是一个对象的方法调用啊,那所以这里边我们要调的话,可以首先获取到当前一张表对应的这个实例它的对象,然后呢调一个select,我们想要去做一个映射的投影,里边要筛选的字段我们可以看到啊,是针对这个my field的这个字段,还要做一个函数的处理转换。
06:14
哎,这里我们定义的这个函数就叫做my function,所以呢,我们看怎么样去针对my field做一个转换呢?My field相当于是参数嘛,所以这里面我们是直接要调一个靠方法直接去做一个调用。这个方法有两个参数,那么第一个参数就是当前注册好的函数名称来function,第二个参数呢,其实是调用这个函数时候要传入的数据参数,哎,那所以其实就是针对这个my field调用my function,得到结果就是我们想要去提取出来的字段。哎,那另外呢,在这个table API里边也可以真的不去注册函数,就前面我们说是必须先得做这样一个注册了,那不注册能不能直接用呢?其实也行啊,就是这个需要我们在靠的时候传入的不是注册好的注明,而是传函数类对应的这个class对象。
07:08
啊,那当然了,这种table API的方式啊,稍微的有一点,呃,违反我们的直觉,而且也比较麻烦,所以一般情况呢,我们还是直接在CQ当中调用就好了,比如说哎,刚才这一个例子,我们不就是想要去使用my function去做一个转换,哎,那针对哪个字段做转换呢?针对my field做转换,所以我们就直接可以写成在CQ里面写select my function括号my field然后from my table就可以。所以这个其实调用就非常的简单,就像调用一个系统函数一样啊,那之前我们针对这个URL做count的时候,不就直接是count,然后括号里边URL吗?哎,所以这个其实是非常简单的啊,跟系统函数是完全一样。所以后边我们所讲解的内容就主要都是以CQ当中的调用为例的。
我来说两句