00:00
我们已经了解了udf当中标量函数的用法,那这种函数呢,其实是最为简单的一种一对一的转换啊,我们说它其实就是输入一个或者多个参数,然后得到一个值,每一条数据都对应着一个输出啊,那所以在表里边进行转换的时候,它是最为简单的一种方式,接下来呢,我们要介绍的是另外一种函数,表函数啊,那跟标量函数一样啊,表函数的输入参数呢,也可以是零个,一个或多个标量值,也就是说我们在进行操作的时候,当前我们对于表函数的一次调用,它是针对一行数据作为输入的啊,那这一行数据里边有可能我们又要提取出里边很多个字段啊,作为当前表函数的输入。那得到的结果呢?那就不一样了,它得到的结果经过转换之后,可以得到多条数据的输出,也就是说一行数据进来,我们可以直接输出多行数据,那所谓的多行数据,我们知道这不就是一张表吗?哎,所以表函数之所以叫这个名字,就是因为它可以返回一张表,这是一个一对多的关系。
01:10
像之前我们介绍过这个窗口TF啊,窗口的这种表值函数的计算,其实本质上就是一种表函数。跟前面我们讲到的标量函数非常类似啊,当时我们需要去实现的对应的那个抽象类是sc function,那现在呢,我们要实现自定义的表函数,也要实现一个抽象类,那就是table function,这个table function呢,里边也必须要有一个名字叫做E的求值方法,这里边就指定了我们当前处理转换的核心逻辑。那跟标量函数不一样啊,标量函数是一对一的转换嘛,所以本身的skill function这里是没有任何的泛型的,哎,那关键就定义我们当前输入什么数据,然后返回什么值就完事了,现在的表函数呢?呃,表函数我们说它是可以生成一组新的数据,哎,所以这个时候的话,我们就得定义出来你返回生成的这个新的数据到底是什么类型,什么样子,哎,所以它是需要有一个泛行参数指定返回数据的类型。
02:14
而evil方法呢,E方法没有任何的返回类型,因为evil并不能只返回一个值,它有多次输出,所以它里边是通过调用了collect的方法来发送我们想要输出的好数据啊,就每一次调用collect就相当于发出了T类型的一行数据啊,那所以这个跟我们之前讲过的这vpi里边的Fla map function或者是process里边啊,想要发出数据的时候,调用那个奥点CLA的方法是完全一样。那所以直观来看的话,当前我们对于这个表函数的使用整体来讲是差不多的啊,定义是差不多的,那接下来呢,关键还有一个问题在于,我们在代码当中注册了之后,怎么样去调用呢?怎么样在CQ当中去调用它呢?
03:04
哎,那对于一行数据,那使用了表函数之后,就可以得到一张表,我们联想的话就会发现这跟have里面的UDTF非常相似嘛,哎,那所以对于原先我们输入的整张表来讲。一行数据就可以扩出一张表,一行数据就可以扩出一张表。那对于之前的整张表来讲,这么一扩充了之后,接下来要做的事情,那应该是要扩成一张大表。这个操作怎么去扩呢?一个简单的想法就是让每一行都跟它扩展出来的这张小表做一个状语操作,哎,那这样的话就相当于一行扩展成了多行啊,然后把扩展出来的很多个每一行扩展出来的结果再拼成一张完整的大表。哎,那其实我们知道在have的CQ语法里面啊,就提供了一种叫做侧向视图啊,Let view这样的一个功能,它可以将表当中的一行数据直接拆分成多行,所以现在我们要做的这个事情其实也是差不多的啊。
04:09
Li CQ当中也是有类似的功能,哎,所以在CQ当中,如果我们想要去使用表函数的话,那就可以使用一个letter table啊,侧向表这样一种语法来进行一个实现。所以具体的调用方式我们可以看一下到底怎么调啊,呃,首先我们应该是先把这个类先实现出来,接下来呢,那就要在表环境当中去注册一个系统函数,接下来哎,我们要去调用的时候,那就是将原始数据表我们的这个my table和使用了表函数之后生成的这个所谓的侧向表table。它俩之间要做一个交叉连接啊,所以如果说我们使用交叉连接的话就知道啊,就是直接from,然后把这两张表逗号分割就可以了,Cross draw相当于就是针对里边每一行数据做一个交叉连接,就把原来的那个数据啊,针对表函数处理之后做了一个扩展。
05:09
然后除了交叉连接这种方式呢,其实也可以使用一般的照应方式啊,那比如说我们这里可以使用一个带安处条件的左连接啊,那就是直接使用我们原来这张表,Left draw left table啊,那同样这个调用还是啊,把当前我们定义好的表函数传进来,哎,那里边它的参数该传什么传什么,后边加一个onto判断条件。啊,那另外呢,对于我们当前测向表里边得到的结果扩展出来的那些新的字段,还可以去做一个重命名啊,就假如说啊,我们当前是希望把对应的这个字段要提取出来的话,哎,那这里边可以做一个重命名,就是left draw let table来对应的这个数据做了处理,然后ST,然后里边加上这个字段,就是我们重命名之后的字段啊呃,接下来如果想要去提取的话,就可以直接把这个字段提出来了。
06:05
啊,那至于这个表函数里边核心的处理逻辑呢?哎,我们看到就都是在这个E里边去实现的,那这里边E它要输出什么样的结果呢?输出的数据类型就是扩展出的那张表里边的数据内容啊,我们是以这里的泛型来指定的啊,在这儿我们可以指定一个肉类型啊,一般化的一个行类型,那当然了也可以是其他类型啊。那这里面呢,就是针对比方说啊,我们做一个SP做一个拆分,把某一个字段传进来之后呢,按照空格拆分开,每一个字段都做一个输出,哎,这样的话就是一个一对多嘛,我们这里调用的就直接调用collect的方法。生成发出的数据当然就是肉类型了。啊,所以接下来我们可以在代码当中把这一部分内容也做一个测试实现,所以接下来我们要测试的是创建一个SC的object还是udf的测试。
07:01
测试的是table function表函数。美方法先写出来,哎,那整个的处理流程其实跟之前是完全一样的啊,我们先把这个完整流程都copy过来。哎,那这里面主要就是我们要注册的就不是标量函数了,而是一个表函数啊,那下面我们首先得把这个函数实现出来实现。自定义的表函数,诶,那我们实现具体的功能是什么呢?啊,其实就按刚才我们在文档当中看到的啊,想做一个字符串的拆分,那我们到底拆什么东西呢?现在输入的这个数据里边,诶,这个URL这个字段其实是还可以做进一步的拆分的啊,因为现在我们已经把它包装成event了嘛,已经有三个字段了,而针对这里的URL字段呢,我们知道它里边可能包含了很多信息,比如说如果我们访问的是商品详情页的话,那这里问号前边的可能指的是这个商品的类别,然后后边呢,有可能就是具体的商品ID了,哎,所以我们希望把这些信息做一个拆分,然后扩展进行输出,哎,那所以这里我们的分隔符就变成了一个问号,所以在这里啊,按照。
08:13
问号分割URL字段。哎,那得到的结果分别输出前后的内容,我们直接把它都输出一遍。好,接下来我们又可以做这样的一个实现了,Class。我们把它叫做my sweet吧。我自己定义的一个分割的函数啊,Extend table方式。好,那同样这里我们需要把table function啊,还是在flink table functions下边的table function要做一个引入。注意这里我们是有泛型的,那泛型就是输出的,扩展出的数据结果,我们现在扩展出什么样的东西呢?这个分割之后啊,其实我们想到啊,把URL分割之后,其实就是两段了,这两段都是一个字符串,那我们输出呢,也不要这么简单了,我们把这个字符串以及它的长度分别做一个输出吧。
09:04
所以这样的话,简单起见,我们就可以直接把这个输出类型定义成一个二元组类型啊,那前面是一个词句,一个字符串,后面那就是一个int类型的长度嘛,接下来里边。同样我们看到啊,这样一个抽象类,它也是啊,里边没有必须要实现的那个抽象方法的,只有这个对于类型的一些判断和处理,所以接下来我们就还是得自己把这个e Val e方法做一个实现。这个方法的输入参数,哎,当然就是一个string类型的URL嘛。所以我们就是把这个字符串类型输入返回值的话,哎,那其实根本不需要返回任何的结果啊,我们是通过C的方法去收集数据去进行发送的,所以这里的返回类型就是unit啊,那这里可以直接不写,或者说默认股权就可以。然后接下来里边具体的处理方式,那我们就是针对str这个数据啊,当前的URL要调用SP做一个分割,现在我们的分割符是问号。
10:08
啊,那当然了,问号本身是正则里边的一个关键字啊,所以我们得做一个转义,两个杠转移过来,然后经过切分之后呢?哎,那后边我们可以做一个for。拆出来的每一个字段,诶,都把它包装成一行数据输出出去,所以这里面就直接调用我们看到啊,可以直接调用C方法。这个方法本身就是在table function里边已经给我们提供了的,我们看到就是这个final方法collect,它底层调用的呢,呃,就是内部的这个collect的collect啊,所以这跟我们之前所使用的那个al.collect是完全一致的。接下来呢,我们这里就collect里边当然就是一个二元组的数据了,那前面的第一个字段就是当前拆出来的这个string,后边呢,还要跟一个int类型的长度,哎,所以这里两个地方都用到了当前的这个数据,所以我们这个拉姆达表达式还是完整的写出来吧,哎,所以里边应该是一个string啊。
11:08
不要跟外面重名,我们就叫S吧。Collect的时候,前面的第一个字段就是S,那第二个字段的是它的less,是它的长度,这样就得到了我们想要输出的信息。有了这一个table function定义之后,接下来我们就可以在表环境当中去注册一下刚才的函数了,那当然这个我们不要叫做my hash了,我们叫做myli。后边class传入的也是my split。这样的话就注册好了,然后接下来我们就可以进行查询转换在CQ当中去使用了。那我们这里想要去筛选的信息呢?诶,当然可以加上当前的U的信息,UID,还有URL的信息,诶完整的URL到底是什么?另外呢,扩展出的字段呢,我们又包含了两个属性啊,那一个是当前到底是什么样的字符串,另外呢,就是它的长度,哎,那所以这两个呢,我们可以单独的做一个重命名,我们可以把它叫做一个word,一个lengths啊,那这样的话,所要提取的信息,现在我们可以更改一下,那就是u I Du URL。
12:16
以及当前的一个word和一个less,我们就叫len吧,那接下来后边from,哎,当前如果要做这个表函数的使用的话,那是要针对之前的数据表和扩展出来的侧向表做一个交叉连接的啊,那所以我们这里就是even table它要去啊,我们可以做这个left draw啊,也可以直接交叉连接,所以直接逗号分割。这里扩展出来的是一个侧向表,那是let。Table里边直接去调用我们定义好的表函数MYL传入的参数,哎,那是当前的URL,这样的话就可以得到扩展出来的对应的那些数据了,然后做了这样一个交叉连接之后,就得到了完整的这张大表啊,那当然了,这里我们还应该做一个字段的重命名啊,那就是S当前的T。
13:12
Word。Less啊,这样的话就可以把对应的字段也完整的提取出来了。当然了,如果这里我们看的不是特别的舒服的话,也可以把这里的关键字都做一个大写啊,Select,然后后面我们可以把这里做一个空行,后面我们来一个from。然后接下来这里。侧向表的一个交叉连接let table。把这些都大写出来之后,我们看的就非常的明显了。这就是整个调用的过程,然后接下来呢,得到的结果。就可以转换成流做一个打印输出。这里我们还要去思考的一点就是到底是to data stream,还是需要to change log stream。啊,那关键点还是在于啊,我们进行查询转换的时候,有没有做更新操作,是不是更新查询?哎,那我们直观上去看的话,现在其实就是原始的这个even table里边每一行数据扩展出了两条嘛,多了一个word和lengths这样的一个字段,哎,那接下来每来一个数据就扩展出两条,每来一个数据就扩展出两条,那所有的数据来了之后,很显然都是在后边追加两条就完了,只有追加查询,没有更新操作,哎,所以现在我们直接to data stream转换成流打印输出是完全没有问题的。
14:30
整个的处理逻辑我们就都已经完成了,哎,那如果说我们直接想要去运行,然后看一看测试结果的话,那会发现会报错,问题在哪里呢?主要其实是在于这里自定义的表函数,我们指定泛型的时候,直接给了一个二元组类型。这里如果直接给元组类型的话,哎,那会带来底层进行类型解析的时候,没有办法直接解析正确,所以在这儿的话,最好我们不要直接这样去定义。
15:01
那这里我们定义成什么呢?诶,那就是我们在文档里边提到的,可以直接把它定义成肉类型嘛,啊,一个行类型,其实我们知道行本身就是在表里边通用的一种数据类型,可以认为就是不同字段组成的一个多元组,哎,那所以当前如果我们直接把它定义成肉的话,哎,那在上边再追加一个function hint啊,就当前我们函数的一个标注,单独的指明它的数据类型是什么,我们看到指明的是肉,然后里边有两个字段,这里还可以给它名称啊,一个叫做word string,另外一个叫做length in。这里需要注意的是,在这个肉类型里边,它的每一个字段必须得是包装类型,诶,所以对于这个string这是没有问题的,那对于int呢,Int本身是一个基本类型嘛,所以我们还要做一个包装int.box得做一个这个操作啊,这个看起来稍微有点麻烦啊,我们可以照着这个重新实现一下。上面这里改成肉,我们需要去引入flink types点肉。
16:04
然后接下来这里的collect的话,就要把它变成一个肉点,Of里边输入的第一个字段是S啊,那第二个字段呢,要做一个int的包装,Box里边传入的是s.length这样的话就可以。那上面呢,还得加一条标注,指明当前的类型是什么,这个稍微有点麻烦啊,就里边的这个写法都是固定的,就是output到底等于什么?这里边呢,又了一个data type paint在里边以字符串的形式指明了我们当前输出的数据类型。所以在这我们可以直接把它复制在这里,这里的type hi需要去做一个引入,另外function hi也需要去做一个引入。啊,那这里名字如果我们跟上面一致的话,直接把这个叫做len就可以了。这样的话,整个处理流程就已经通畅了,我们可以运行一下,看一看得到结果到底是什么样。
17:02
我们现在还是直接读取这个文件里边的数据啊,现在看到输出的数据比之前似乎就会多一点了啊,我们看到啊,Mary这条数据,哎,那一条输入就一个输出,Bob这条数据,一条输入一个输出,接下来又来了一个爱丽丝的访问数据,我们看到他访问的是一个商品的页面,这里面有问号,所以呢,就把它拆分成了两条数据输出了。这里面我们看到输出的这个word,那是前面一部分点杠product prod,然后它的字符长度是七,后边呢,在这个问号后边呢是ID等于一,它的字符长度是四啊,这里边我们没有特别实际的意义啊,但是明显看得出来表函数的作用能够把一行打散了之后拆分出多行,对于表进行一个扩展啊,这就是表函数的使用。
我来说两句