00:00
接下来我们再讲第二种udf函数,这种函数呢叫做表函数,它就叫做table function啊,那这个听起来看起来这个操作是比较呃,比较一般化是吧,Table方式,但其实不是的啊,它主要的特点是什么呢?它是同样也是以这个多个标量值作为一个输入参数,但是它输出的结果跟我们之前的那个标量函数不一样,它是什么呢?可以返回任意数量的行作为输出,而不是单个的值。所以它返回的是任意数量的行,大家想想这是什么含义,这就是一张表啊,对吧?多个行,这不就是一张表吗?所以它其实是可以直接把我们的一行这个数据输入啊,一条数据,一行数据直接扩展成一个表的这样的一个函数啊,大家想一想,提到这个概念能想到什么东西呢?大家是不是就直接想起了,之前我们在应该讲过这个爆炸函数对吧?啊,应该讲过这个测写函数其实是类似的一个概念啊,所以大家可以结合起来看,如果说之前我们那个标量函数function是就是输入一行数据,然后得到一个值一对一的关系,类似于一个map操作的话,那这里边的表函数就相当于变成了一个一对多行对吧?一行对多行,输入一行数据,得到的变成了一张表,所以我们把它叫做表函。
01:32
数table播方式啊,那这种一对多的输出有点类似于flat map,也非常的类似于之前之前大家非常熟悉的这个测写函数啊,所以这个大家可以整个就是结合起来看啊,那具体实现呢,跟skill方阵一样,也是你得实现一个抽象类对吧,这个抽象类就叫做cable function啊,然后必须要实现它里边的一个求职方法,也叫做evil,这个都一样,对吧,必须得是public的,另外呢,必须命名为evil,然后注意啊,如果是table的话,这里的evil不能有返回值,必须是unit类型的返,呃,这个返回值那大家就会想到了,那你这里边我们的,呃,那个多行数据怎么返回呢?这张表怎么返回呢?啊,因为你是多行数据一张表吧,确实是直接定义这个返回值,不好返回对吧?哎,那大家还记得我们之前那个Fla map怎么返回数据吗?怎么样发出数据吗?哈,用了一个clap。
02:32
就用了一个collector对吧?调用那个collector的collect的方法就是告诉我们,哎,当前的这个数据相当于发送出去,写入到那个输出缓冲区里边,这里边的方式几乎是一样,对吧?大家看这里边也是调用一个,当然我这里边不知道是调的谁的方法,对吧?哎,这就是table方式里边自己有一个对象,就是一个collect,我直接调用它的方法,Collect直接输出,哎,比方说哎,那那这里面我输出的这个类型到哪里去定义呢?输入的类型还是在这儿对吧?呃,你输入哪些字段就在这儿,那输出的类型在哪里呢?在table function本身的这个泛型里面去定义,比方说我这里面定义的是一个二元组,对吧?诶,那这里面我就包装成二元组,每来一个,这里面for each了,大家看啊,直接SP之后for each哈啊打散了之后,然后我们直直接挨个输出直接COLLECT2元组数据啊,所以这个过程就是类似于我们之前这个啊大。
03:32
他熟悉的这个测写函数啊,那具体在呃代码调用的过程当中,如果要是想要去呃用这个呃tableable API,或者说是用这个CQ调我们自己实现的这个table function的话,那同样也是要用到一个就是所谓的这个呃就是就是就是侧向连接对吧?呃draw就是类似于我们的那个侧写,侧写的那个过程啊,做一个侧向连接才能够实现啊,所以接下来我们在代码里边给大家看一看这部分内容怎么样去做好,还是重新在代码里边下边包下边新建一个object,当前这个我们叫做table table function test,好,然后接下来前面的内容我们这个就不需要说了,对吧,这个都是完全一样啊这个环境,然后定读取数据外部成样例类,然后我们定义成表。
04:32
对吧,直接到这里通通的全部引入,同样还是为了后边我们转换方便,把这里的这个影视转换依旧引入,然后接下来我们就要自定义一个table function式了,自定义table function,然后我们这里边定义的这个table方式要做什么,要做什么打散的转换呢?哎,这里边给大家提供一个思路,就是哎,我们这里边不是有这个呃Sen ID嘛,对吧?哎,这个ID里边是大家看下划线分割了两部分的,哎这样我可以做一个拆词,对吧,把这个ID拆开,前面一部分,后面一部分,然后呢,每一部分我都作为一个数据,要统计一下它里边的字符的个数。
05:21
那这样的话,我输出就相当于一行数据来了之后,最后就输出一张表了,对吧?那这张表里边每一行数据都应该有一个,呃,就是当前的,比方说当前的这个这个这个啊是什么?当前的这个词分出来这个词到底是什么?然后呢,里边它的那个字符个数,叉叉的这个count又是什么对吧?呃,就是有这样的两个字段,呃类似于我们做那个count那个感觉一样是吧?然后拆成这样的一个表,以下划线分割,那前后就有不同的数据,当然就有我们这里边的话,应该这张表就是两行数据对吧?啊就前面一个S,后面一个拆开的一个数据啊,所以接下来我们做这样的一个操作啊,那还是定义一个class,那这个class名字的话,我们就叫做呃,就叫做split吧,对吧,就是拆分嘛,我们知道本身这个底层skyla也好,还是说这个Java也好,也都有这个split方法,我。
06:21
组,这里边只是给大家做一个示例啊,就是这个传入一个separateator对吧,一个分割符,这个是一个string类型,然后它必须要去继承一个,实现一个table function,诶我们把它引入这里边同样还是用flink table function下边的table function,接下来里边有泛型对吧?哎,这个泛型指的就是当前你转换之后输出的那张表里边每一行的数据类型,我们要的这里边一个二元组啊,或者说你定义一个肉,然后再去专门指定类型也是可以的,那个有点麻烦,二元组是最最简单的,我们现在要的不就是一个word对吧?然后一个它里边那个的count嘛,对吧,一个就是这样的一个出,然后里边你看这个没有必须要重写的方法,但是呢,这个名字又不能改,对吧,必须得得去把它定义成一个E对吧,EY6这样的一个求职操作,然后我们这里边呢,就是把。
07:21
把当前的字段string传进来,然后就基于这个separateator去做一个,去做一个这个切分操作,然后切分开的呢,每一个字段,把当前的那个呃,就是差的个数啊字符的个数取出来,我们包装成这一个二元组输出,那输出的时候怎么输出呢?诶这当时给大家说过,在这个table function里边,大家看它自己定义了一个protected的一个属性collector,对吧,叫做collect,然后接下来这个里边你会看到啊,它下边有一个方法就叫做collect对吧?诶,这里边啊有一个方法就叫做collect,它就会调用我们这个collect collect的的方法,然后输出当前的一行数据,对吧?哎,你看就是输出这个out的这个肉,对吧,发射出去我们当前的这个输出的这一行。
08:18
啊,所以跟我们之前的那个,呃,Fla map的那个操作啊,或者process function里边输出数据的那个操作是一模一样的,所以它的底层肯定就是process function嘛,啊那这大家基本上就知道了啊,那当然这里面没有返回值,所以我可以直接啊这里啊,就是直接把它这么一写,返回一个优I的类型对吧?里边的实现呢,那当然就是split,直接按照我们当前传进来的这个去做一个分词对吧?分开之呢,哎,我们就用一个这个方法,然后每一个分开之后的每一个都返回一个,都得到一个这个,呃呃,一个word和它的那个那个数量对吧,差的那个数量,但这里面呢,我们不是直接返回,因为for each大家知道,只是做一个操作而已,我们是要怎么样呢?调用那个collector的collect的方法对吧,但是collector本身是一个protected的一个属性,所以我们直接要它公开的那个collect。
09:18
把的方法就完事了,包装成二元组对吧?这里边我们是word以及word的长度lengths,大家就不用再去判断它的那个差的个数了,对吧?直接length返回就完事儿啊,所以这个整体来讲还是比较简单的一个过程对吧?已经把定义出来了,那关键是我们在代码里边怎么样去调用,怎么样去实现它啊,那同样跟之前的这个过程类似啊,Table API,我们先看一下table API里面怎么样写。这里边首先还得是啊,就是先去创建一个uf的实例对吧,SP我们先把它创建出来,你有一个split,这里要传入下划线作为我们的分割对吧,这边你一个U实例,好,然后接下来table API里面调用,大家就看到了啊,我定义这个table就是我们前面说的,它类似于呃,就就是我们的那个测写函数嘛,所以说它再去做处理的时候呢,是需要把原始的这个table跟我们扩展出来,这个table呢,做一个侧向连接,所以大家看这个table里面有一个方法啊,状语大家知道,一般的这个表状影的话,那就是这个内连接嘛,对吧,那如果要要是侧向连接呢的话,有一个方法叫做draw later对吧?呃,做一个侧向连接,类似于我们那个侧写函数的那个那种方式啊,然后里。
10:49
里边呢,这个表达式,那就是直接调用我们定义好的这一个table function里边把我们要去传的参数传进去ID做一个分词对吧,然后接下来呢,啊,还有就是后面你如果想把它分词之后的这个数据要提取出来显示的话,那是不是还得做一个重命名啊对吧,就是你新的这个表里边的这个这个字段啊,到底叫什么名儿,所以这里边我们as给一个名啊,这个就叫做word,同样还是用这个啊single的方式表达式写出来,还有这个lengths对吧,当前它的这个长度啊,这样的话,我就可以把当前的每一条数据,大家知道这个这个draw,这个侧向连接,其实就相当于一个底耳机对吧,我们当前的每一个,呃,当当前的每每一条数据啊,每一行数据都跟这里边的这个数据做一个这个这个连接操作,然后后边呢,哎呃,这就是说都针对自己的那一行数据。
11:49
跟它里边的这个每一行做一个连接操作,对吧?啊就就就就是自己跟自己的那个ID拆出来的做一个连接操作,那还都是自己嘛啊,那接下来我们就是可以去接下来就可去了,得到这个输出结果了,对吧?这里我们这个了一个括号,是不是外面的那个括号少了对吧?接下来select我们当前要的字段,比方说我要ID对吧?TS,另外呃,我还把这个word和LS也做一个输出,这样的话就稍微会好一点,对吧?后面我们就可以这个做打印输出了啊那呃,另外再给大家来来说一个,呃,这个CQ里边的写法啊,CQ里边的写法,其实呃简单来讲,这个就是还是要去做一个侧向连接,对吧?这个可能稍微会麻烦一点,那首先我们还是先做一个注册吧。
12:49
N createary,先把这个表先注册进来,呃,当前的这个sens啊,写进来,然后里边function对吧?诶,当前我们还是把这个注册成split,然后接下来,诶,当前把这个split的这个实例传进来,那接下来就是得到这个写CQ,得到当前的这个result table了,Result CQ table同样还是基于环境,直接CQ query,好,大家看这个写法就稍微的可能会麻烦一点啊,但其实也还好,诶,Select select什么呢?这里边我要的其实就这么几个字段啊,我直接列在这了,Ids对吧,还有这个,还有然后接下来哪张表呢?诶,这里边我这里边可以直接S,然后他要跟自己的。
13:50
啊,就是这就是基于我们这个呃,Table function得到的那个,就是所谓的侧写函数得到的那个侧向扩展的那张表做一个交叉连接,求笛卡尔机对吧?啊,所以求笛卡尔基的话,我们这交叉连接其实可以直接用逗号分割嘛,对吧,这两张表就相当于直接连在一起了,所以这里边你看呃,我们直接调这个lial lial table对吧,这个没有拼错吧,侧向对吧?然后table啊,Table里边我们这里边就要得到,就是调用这个table function得到这样的一个letter table对吧,Split里边穿插,哎,当前的ID传进来,然后另外呢,As要把它做一个重命名对吧?要不然我们这个word和lengths怎么样定义出来呢?啊,这是重命名,但这是一张表,所以还得有一个表名对吧?哎,我们这个管它叫。
14:50
Le ID吧,然后里边就是word和LENGTHS2个字段,大家看只要做一个这样的定义就完事了啊,所以整体来讲这个还是比较简单的啊啊就就直接把它拿到了,那接下来我们来做一个结果的输出,Result table啊,那大家想这个做呃,做这个这个这个就是我们做这个聚合的过程啊,这个不算聚合,就是做这个连接操作的过程,它涉及到更改我们之前的某每每一条数据嘛,不涉及对吧,因为一条数据来了之后,它就扩展开,扩展出一堆对吧,扩展出一个一张表,然后都插进来,再来一条数据,又是扩展一堆表插插进来,其实都是插入,只不过可能会一对多插入多条数据而已,所以这里边我们就直接还是图判stream,然后里边给一个肉类型,把这个引入后边直接print打印输出,当前是。
15:50
同样的也是转换成流stream,然后打印输出啊,这个过程都是一样的啊,然后最后我们把它执行起来,当前这个是table function test,好,接下来我们来运行一下,看看这个效果怎么样,对吧。
16:13
好,大家看一下现在这个运行结果啊,我们得到的这个数据是什么呢?诶你看得到的啊,第一条数据进来之后,他应该是输出了两条对吧?我们第一条数据其实是三一,然后这个35.8这条数据嘛,十十九十九秒给的这条数据嘛,它分开之后是什么呢?诶拆成了两个word和抗和那个lengths,前面是3SIR,然后这是六个字符,后面是一一个字符对吧?哎,所以后面同样的拆,你看这个,呃,这个CQ里面,这得到的结果都是一样的对吧?然后后面这个三六来了之后呢,也是三是六,六是一对吧,那最后不一样的是这个三四十,大家看到三四十,这里边就是三四六十二,对吧,那就是后面这个是两个字符,这就起到了我们这个拆分统计它的这个字符长度的一个作用,那最后输出CQ的方式和table API的方式结果都是一模一。
17:13
啊,这就是关于table API的table function的用法。
我来说两句