00:00
好,我前呃,我们大家前面已经知道了这个标量函数的用法,它的做法其实相当于是把这个对应的字段参数输入之后,直接得到一个标量值,也就是说只是一个求值的函数啊,那大家可能会想到,假如说我想得到的数据并不简简简单单的就是一个值呢,啊,假如说我我这个想得到的这个数据有可能是呃,好几个值,甚至有可能是好几行,是一张表呢。哎,这种情况下,那就就要用到另外一个比较复杂的udf,所谓的表函数table方式啊,大家看那个所有的这些udf好像都可以叫表函数,对吧,但是在table API里边表函数啊,这个table方式是特指这里边的我们这种udf函数,它的特点是。就是把也是啊,跟这个scale function是类似的,它是把这个标量值一个或多个啊作为输入参数,然后呢,它的不同就在于输出不一样,它可以返回任意数量的行作为一个输出,而不是单个的值。
01:09
啊,所以后面我们还是看它到底怎么用啊呃,为了定义这个表函数,我们同样还是要扩展一个,它的鸡肋也是在这个flink table function里边的定义的,这个table function也是一个抽象类啊呃,那么下边我们如果要是想要实现它里边的那个求值转换方方法的话,同样也是要有一个evil函数啊,大家看这个定义方式跟那个skill function是完全一样啊,而且这这个方法必须是public的啊,就不能是private。呃,我们看这个具体的用法其实还是啊,Extend table function诶,这里不一样了,这里边有了类型的定义。啊,有类型参数了,有泛行了,而下面的这个E呢,就没有没有返回值了,那这是为什么呢。这是因为就是当前我们要输出多条数据嘛,所以相当于是这边我们来了的,呃,这个一行数据里边的某几个字段提出来作为这个输入参数,对吧,输入到这个E里边,然后我最后是要直接转换成。
02:14
多行数据,多条数据,那类似的,呃,可以认为它就是一对多的一个转换,对吧?啊,那前面我们那个sc function的话,就有点像是一对一的转换,所以如果要是拿一个例子来对比的话,这就有点像,有点像我们那个sc function是map对吧,做了一个一对一的这个操作,而。Table呢,有点像一个flat map啊,有点像就是你输入一条数据出来之后,打散了好几条啊,变成一个表了,类似于好几行数据,那可不是就变成一个表了嘛啊,所以它就叫做表函数,对吧,它叫做table方式,那接下来给大家做一个具体的示例啊,就是我们这个。在PPT里边给大家实现的这个事例啊,他是做了一个什么呢?做了一个呃,Split。
03:02
它是要什么?就是传进来一个字符串,我们把它按照某个分隔符这个类定义好的某个分隔符。前后切开,然后切开之后呢,我要统计前后每个词的啊,就是当前的这个词是什么,然后它的长度是多少,就占了几几位字节对吧,就统计这个啊,所以这个有时候在哪些地方有用呢?呃,比方说像我们。呃,比方说啊,就是user ID或者说某一些ID,我们定义的时候,它可能是一组名对吧?啊,就是前面某个ID,然后再下划线,呃,然然后来来一组什么什么样的一个ID,然后再下划线,后面又来一组,我们是那个分级的分层次的那个ID的定义,这样的话,我们可以就是给他做一个截取,对吧,截取出来再做分析啊,这就是相当于我把输入的一行数据就要把它拆开,拆成好几行啊,然后接下来就可以相当于。转成了一张表了,那这里面还涉及到一个就是你能把它相当于一行拆成多行啊,能能转换成一个表,那具体要用的时候怎么来用呢。
04:11
哎,这个大家就想到了。之前在这个C课里边应该是讲过有类似的这种,就是所谓的侧向连接,或者叫横向连接这种,呃,Join对吧,Let view的这种用法。它的用法其实就是说我可以基于原先表里边的每一行,然后呢,呃,做一个类似于一一行转多行的这样一个扩展啊,然后接下来可以跟之前的表做一个连接,哎,之后就可以得到更多的信息了。呃,那那大家类似的这个用法就有点像,可能大家对这个explode更熟悉,对吧?啊,有有点像那种用法一样,就是一转多,然后我们再连接起来做操作,如果大家之前用过这个呃,Later view的话,可能对这个理解会更加的清晰啊,所以如果我们在实际使用它的时候,那可能得结合cable API里边给我们提供的一个draw方法,叫做drawing letter啊,或者是用这个左外连接的later,这个侧向连接就是left alter drawing,用这两种方法把它传进去来做一个计算。
05:22
光这么说可能还是有点呃,就是不太理解到底是什么玩意儿是吧?说了这么多理论,我们还是在代码里边来实现一下啊。新建一个,在当前的这个包里边新建一个object。当前这个我们就叫做table function。Test。呃,同样前面这个还是main函数里边,跟之前几乎一模一样,对吧?呃,这个我们该有的东西还是全拿过来,一直到转换成表这里啊呃,时间字段我们也还用那个事件时间,然后接下来上边把这个。该转换的影视转换先引入对吧?啊,这些就把这个前期啊,我们应该做的事情先先做完,然后现在得到的这个表,我们就直接可以做一个这个table API的调用了,那后面如果说我们还想做这个,呃,就是CQ的直接引用的话,那还是一样要注册对吧,注册表,注册函数啊,然后接下来我们是要实现一个就是拆分字符串的转换成多个字符串,而且有它对应的那个字符长度,呃,把它解析出来的这样的一个操作啊,所以这里边我们自定义。
06:35
自定义table function。实现啊,分割字符串。并统计。长度字符串长度对吧的这样的一个功能啊,这个尽管比较简单啊,大家只要看一下它怎么用就可以了,所以我们就实现这样一个sweet。
07:00
呃,Split啊。那同样这里边我们要把那个分割符传进来,就是外部可以传这个,这个叫separate啊,分割符本身也是一个string。当前是一个table。注意后边它就需要有这个泛型了,对吧?这边我们引入的还是flink table function下边的table function,然后这里边它本身这个类型就是一个类型啊,那这个说的是什么?就是我们最后转换出来之后的,我们这不是tableo方阵嘛,相当于一行数据经过这个函数转换出来是一张表吗?啊,那你得定义出来这张表里边的数据结构对吧?啊,所以这里边如果说我们最后要的就是。呃,就是这个一个word一个一个长度啊,这个lengths的话,那我们这里面其实就是一个二元组了,一个string一个int够了。好,先把它定义好。然后接下来还是同样的必须要实现一个E方法对吧?啊做计算这里边我们传入什么参数呢?呃,那应该还是当前的那个string对吧,一个value或者说叫一个string吧,因为我们现在切割它嘛,就叫做string。
08:16
啊,那同样它就不能有这个输出值,呃,因为当前这它的输出是本身在这个外部,这已经定义好的,对吧?呃,所以当前的这个就有点像我们那个flat map那个操作一样,大家还记得map function的话,里边的map方法,这个大家给大家回忆一下啊,我们在之前那个transform data threepi里面。这里如果我们直接做这个map操作的话。里边传这个map function的话,大家看这里边我们直接map function是要返回一个输出类型的,对吧,如果我们输出int类型,这里就返回一个int值,但是如果是这个flat map的话。Flat map传这个flat map function的话里边,诶大家看返回就是void对吧。
09:06
啊,它是靠什么来输出呢?靠它的这个collect,就是我们调的时候是alt.collect直接输出。啊,所以这里边也有点类似啊,这个table function,这个表函数啊,自定义的表函数,在这个使用的过程当中呢,呃,它就是也是这个evil方法,就不再直接输出一个值了,而是通过什么呢?尽管这里边参数我们看不到,对吧,没没有能看到对应的这个参数是什么,它同样也有一个collector,它就叫做。呃,就叫做collector,大家看有这个东西对吧?啊,当然这里面我们不是就是直接用这个collector啊。这里面大家看它本身是一个protected这样的一个类型。但是它里边给我们提供了一个什么呢?提供了一个方法就叫做,诶这写错了啊。提供了一个方法,就叫做collect啊,所以我们可以,呃,当然我们也可以直接调用collector点对吧,因为我现在已经是在这个类里边嘛,所以说你直接调也是可以的,或者你就直接调collect collect方法它本身也调的是collect collector.collect。
10:12
直接把它掉这个输出,输出一行数据就可以了,对吧,这就是output肉啊,发出一条一行的这个输出数据啊,所以接下来我们要做的其实就是调这个CLA的方法输出,那这里边我们得做的是把这个string做一个切分,对吧?啊,那这里边我又偷懒了啊,直接用这个系统已经给我们提供好的SP方法,把separate传进去,做一个拆分,拆分之后的每一个呢?哎,那就是每一行都要转换成一个word和lengths,然后输出到我们的这个输出表里面啊,所以这里边其实是直接做一个for each计算就可以。每一个word。把它做转换,哎,那就直接输出对吧,直接调这个class的方法输出一行数据,我们当前的行就是一个二元组,当前的word和word的length。
11:05
大家看这样就把这个功能实现了啊,其实也是比较简单,因为我们这里边已经用了很多系统实现的东西,我们稍微包装了一下,做了一点调整,给大家来说明它到底是要干什么事儿啊,这就是把一行数据拆开,然后输出了多好对吧?啊,所以这是类似于一个测写的这种这种状态啊呃,那那接下来我们就是再看看在代码里面到底怎么用它了。首先还是要先定义对吧,我们这个,呃,这样啊,Table API调用。呃,这个其实不光是这个,呃,Table API调用的时候,我们要先创建实例,即使是后面注册我们也要创建实例,对吧?啊,所以说这一步其实是通用的啊,我们先创建一个udf。对象好,这里边,呃,我们这个就叫做split吧,就叫一个名字等于new一个split,然后现在我们要按照什么来做分割呢?诶,我们现在字符串好像只有那个ID34ID啊,那我们就拿那个下划线做分割吧,对吧,里边好像就有一个下划线是这样的一个连接符的一个标志啊,所以我们就用这个下划线做一个分隔符啊接下来。
12:24
Table API调用的过程,我们定义这样的一个result table啊,我就不写类型了啊,大家知道都是table sensor table啊,那大家看下面调用的过程当中,就一定要用到一个,大家看一个drawing lateral这样一个方法,对吧?因为你如果要是drawing表的话,大家知道肯定就是呃,传一个table进去就完事了嘛,对吧,那现在我们要用这个侧写函数啊,类似于我们做一个这个表的扩展,那这个draw,怎么draw呢?就得用later draw影的这种方式。Draw letter啊,把它里边传大家看,要传一个又是一个expression,对吧,这个expression是什么呢?其实就是我们这里定义的这个TABLEFU1定要用它,就是这里面要的一个expression啊,所以比方说这里我可以怎么样split直接调啊。
13:16
ID传进去对吧?呃,直接用这个方法把这个我们不是说它本身就是得到的就是一个表嘛,对吧,一行对应的就是一个表嘛,啊然后我用这样的一个侧写的这个方法把这两个表join起来,那相当于不就是每一行都扩展成了好几行嘛,啊对吧,这样的一个little draw,其实就是一个交叉连接对吧,就像相当于一个cross draw一样啊啊,直接就把它每一行都对应的扩展开了。那这里边为了最后我们输出结果比较方便一些,大家看到他得到的这个结果其实直接就是一个table对吧?啊,所以这里边如果说我想把里边的某些字段再单独的拆选出来的话,那这里边我可以对这个。
14:01
做一个重命名对吧,这里边as,我我这里边split之后得到的这个表里面的字字段,我叫什么名啊,比方说这里边我就叫做word。Word和LS对吧。哎,就把这个先定义出来。把把这个做一个重命名对吧,这样啊。这个是呃,侧向连接。然后。应用table function。然后得到的字段还要再做一个重命名,对吧,这个就不写了啊。最后我们做一个select,提取出想要的字段啊,这个就是ID,我们想要对吧,看一下,然后当前的时间戳是哪个,我们拿出来,呃,那对应的我可能还关心当前的word到底是什么,其实大家知道都是这个ID里边的嘛,对吧,一个word一个lengths,按照那个呃,下划线去分割的话,那其实前面所有的第一个词应该都是三四啊,第二个可能是一,可能是十,对吧,可能是六,可能是七啊这个就是看各自的不同了。
15:09
我们看一下分别统计出来的结果是什么啊,那这里边还是后面就是转换输出了啊,Result table to append stream,然后。还是直接定义一个肉类型?呃,这个直接打印输出啊,Print result。前面这个肉。引入进来type肉对吧,后面不要忘记env execute,当前这个是table function。Test job。啊,这就是我们这个完整的一个程序流程。啊,那同样在这个里边,我们还可以把对应的那个CQ的写法也写一遍啊,这里边我呃这这个CQ的方法,其实呃,做这个操作的话啊,就是要写一个这个呃,Letter table这样的一个一个方式啊,我们这里边。
16:04
写一个CQ调用。相当于我们在复习这个CQ了啊,这边不是先定义这个结果表,我们要先把对应的表,呃,当前的那个S啊,输入的表和当前的这个sweet这个函数先要注册在环境里边,所以先调用create temporary还是一样啊,这个sensor。Sensor table注册进来表先注册进来,然后呢,Register function当前就叫做split,我们直接把split传进来,对吧,注册这两个东西,然后接下来resultq。Table。同样的啊,定义一下,用这个table env的q query方法。写一个CQ啊,那这里面我们要筛选的啊,Select呃,这个ID呃,我们还要什么来着TS对吧,然后word还有这个length啊,要这么几个字段啊,然后from啊,这里面要注意from的话。
17:14
其实是什么呢?其实就是sensr这个表跟哎,它的这个侧写表的一个交叉连接,对吧?啊,就是一个laal table的一个交叉连接,所以这里边直接我们直接用逗号啊把它连接,这相当于是cross drawing嘛,啊,那这里边是LA lateral。Table。诶,这里边这个little table里边传什么呢?啊,这里边当然传的就是我们的那个table function了,Split ID对吧?这就是当前我们得到的这个表啊,当然这里还应该再重命名一下,因为后边我们还有这个字段提取,对吧,你不重命名的话,根本不知道这是什么,所以这里边as我们把这叫做sweet ID,这这个表对吧,里边是。
18:02
Word和L。哎,这就是这样的一个定义方式。啊,这样把它提取出来,跟我们前面的这种这个呃,Let join的这个table API的写法是一样的啊,它其实是把这个呃,就是表函数啊,Table function应用在当前的这个表上面,每一行上面得到了一个它的这个侧向的表,对吧,侧写表,然后呢,把当前自己这个表跟他的这个测写表再连接在一起,这相当于就是用一个交叉连接cross。啊,或者是用什么呢,或者另外这里给大家说一句啊。就是除了这个照应这个方法可以调之外,还有一个方法。大家先把上面那一句注掉啊,还有一个方法叫做呃,Left,大家看alter left alter drawing letter,左外连接它的这个侧写表对吧?呃,侧向的左外连接这个其实其实最后的结果也是一个类似于交叉连接,因为大家想左外嘛,对吧,左边我们的这个左左边表不就是当前本身这个表吗?啊,然后所有的这要都要保持,然后跟它扩展出来那个表去做连接,那最后得到的还是一个cross。
19:18
啊,所以这个大家大概知道就行啊,为什么会保留这两个这种写法呢?呃,这就可能还是跟这个之前的API定义啊,调用的时候他们可能用到这些需求就把它都定义出来了,在table API这里啊,可能我们平常只用一种就可以啊,大家不用掌握那么多,大概知道就可以了。然后下面同样还是把这个to paint stream。打印出来看一看啊,肉。Print当前的这个是QQ result啊,大家看一下这两张打打印出来的这个结果是不是一样啊好,运行一下看一下。我们看一下这里的这个输出结果怎么样哦,大家看输出了一堆啊,那这里边我们的做法啊,大家看这个result,这里边输出了两两条,就是三四十一这条数据输入进来之后,大家看19对吧?呃,然后我们看一共输出了四条result,呃,这个流里边两条,然后CQ里边两条。
20:21
他们看起来这个结果都完全一样啊,所以说我们实现的功能确实是没问题的啊,用table API实现了一次,CQ也实现了一次,得到的结果一模一样。那我们来分析一下,这里面这个结果是什么,我们当时要的这个东西是。ID。然后TS啊,这个我们看出来了,341第一个数据对吧,然后这里边是它的时间戳,后边我们要的是当前拆分之后的word和length。所以我我们在这里看到,诶,第一个拆出来的是三四,它的长度六个字母嘛,所以是六,然后第二个呢,是1341嘛,哎,这里它也是一对吧,长度是一啊,所以这个C口输出是一样的啊,第二个我们进来的数据是346的数据,那这里边一拆啊,三还是六,哎,这个不是后面的这个六啊,这是说三四这个字符本身是六对吧,六六个长度啊,然后是六,第二个字段是六,它是一个长度。
21:18
啊,那同样的后边我们这个347,大家看到进来之后也是啊三四它的长度是六七,长度是一,那对应的到这个三四十的时候就不一样了,三四十,哎,它是三四,长度是60,长度是二,对吧,这非常符合我们的预期,得到的就应该是这样的一个结果。啊,所以通过这样的一个表函数,就实现了把一个表每一行都转换成表,然后再做连接,就就实现了这样一个呃,字段扩展,然后去拆分的就是一行转多行的这样的一个功能。
我来说两句