00:00
我们前面讲到的是标量函数,那接下来呢,再给大家讲另外一类udf,这个其实用起来可能没有标量函数那么广泛,但是这个其实一讲大家就知道它是它是什么情况了啊,知道是怎么回事了,这就是所谓的表函数,表函数叫推波function式,那么它的含义指的是把零个一个或多个标量值作为输入参数,这个表述跟之前skill方式是一样的,对吧?呃,还是啊,零个一个或多个标量值,那大家想这是不是就相当于是一条数据对应的字段要提取出来啊啊,这个还是同样的啊,然后接下来是跟标量函数不一样的是呢,它返回的数量,标量函数是只返回一个标量值,而现在表函数可以返回任意数量的行作为输出。那大家直观上理解的话,这个有点像什么呀?诶大家想啊,对,这这个类似于我们之前给就是在have里边大家学过的,像这个,呃,像这个爆炸函数对吧?呃,像这种,其实这应该是我们后面讲到的这个,呃测写侧写函数对吧?呃,类似于这个测写视图这样的一些东西啊,所以这里面的表函数,它类似于也就是把一行的数据可以直接拆成多行直接输出,对吧?那如果我们要是在把这个思路再稍微的扩展一下的话,联系到之前我们讲到的电stream API,大家能想到什么?
01:34
大家感觉这个过程scale function标量函数是不是就是是不是就是一对一啊,来一条走一条,来一条走一条对吧,那中间做一个转换就完事了,那现在是不是就相当于一对多,来一条之后可以拆成多个输出,大家联想联想起来,对,是不是就相当于skill function,相当于之前我们讲的map,现在表函数是不是就相当于我们的flat map呀?哎,所以大家看到它的这个过程其实是相通的啊,是类似的,我们现在所谓的呃,你像所谓的这个爆炸函数啊,所谓的这个测写函数什么的,其实也就是起到了一个flat map这样的一个作用,就是来一条数据进来之后,我们可以把它做一个拆分,可以做多条输出。
02:20
啊,那么同样我们要定义这个表函数的时候呢,大家要去实现继承的是,呃,这个table function这样一个类,对吧,这样一个抽象类,那么它里边跟我们前面那个skill function也是完全类似,必须有一个evil方法,这是我们必须要声明成public,然后去做对应的定义的,然后大家自然就想到了,既然它是输出多行嘛。那它里边是不是就不应该有一个返回值啊,因为有一个返回值的话,那大家想它是不是就只能一个,只能return一次对吧?那现在我们想它是类似于这个flat map那样的一个东西要输出多次嘛,那它怎么输出呢?
03:01
诶对,大家看这里边就是直接collect对吧,它用了这种方法去输出,所以接下来我们在代码里边也是把这个来实现一下啊呃,然后我们当前实现的需求就跟这个类似,我们要实现一个什么需求呢?大家看是类似于一个切分split这样的一个功能,那我到底要切分什么呢?现在我们不是有这个。有这个ID吗?呃,其实大家会想到在有一些场景下啊,我们对于ID的设计是有是有一定的设计的标准的,就比方说ID里边可能是用连字符连接起来的好几个字段里边,可能前面的字段表示,哎,比方说呃,你像这个传感器啊,有可能前面表示哎当前它所处的大区域对吧,后面那个表示它的一个当前的一个小范围区域啊在后面有可能是一个一个一个一个这个厂的一个编号对吧?在后面一个设备编号有可能是这样的一串串下来的,所以大家想有些场景下,我是不是应该要对这里边的字段做一个解析啊,那大家想我要解析的话,是不是解析出来之后应该是一条数据就对应着多个字段,然后解析出多条啊,哎,那所以就可以用这样的一个表函数来做一个实现,比方说这里边我最简单的一个啊,就是我们那个34ID不是有下划线连接吗?前面是一个三四这个名称对吧,后面是一个编号啊一或者346347,那所以我简单。
04:23
他们做一个操作,就是按下划线把它分开,分开之后前面后边每一个每一部分都统计一下它的。那个词的当前这个string啊,字符的个数对吧?啊,就相当于我最后包装成一个二元组,输出一个当前的string和它的那个长度,Inte的一个长度,接下来我们在代码里边实现一下这一部分。呃,那前面我们已经讲过这个skill function的定义和使用的流程,现在大家就知道差不多对吧,这个整体来讲都差不多,所以。Udf test2,我们这个table function先创建出测试的这个类来,前面的流程是不是就就应该跟标量函数几乎完全一样啊啊,所以这个我就干脆直接把这个main方法直接抄下来了啊,Copy过来。
05:18
全部先copy过来,然后接下来我们的不同其实就是,呃,就应该是这里边我们这个函数就不一样了,对吧?呃,这里边当然就是我要实现的就是一个表函数啊,Table方式,那么我们要实现的是。将ID拆分并输出,就是拆分开之后的那个word和lengths对吧,每一个词以及它当前这个词的字符串长度,那呃,后边这当然就不是哈扣的啊,我们把这个就先删掉了,然后后面还要注册对吧?这个我们也空下,呃,然后下边我们去做对应的这个输出,做做一个计算,呃,那首先接下来我们就是要自定义这样的一个表函数了,实现自定义。
06:15
Table方式。呃,Public static class,呃,当前我们就直接把它叫成SP吧,还是叫成这个切分这样一个操作,然后CE,当前这个是table function对吧,大家看就是叫做table function,注意它是不是后边还有泛型啊。哎,那大家想一下这个table function他要的泛型是什么呢?大家注意,这里边就是它定义的。Output的每一行的对应的那个类型,对吧?大家想一下为什么是output呢?因为输入的类型它是不是要提取我当前这个每一行里边的字段啊,对吧?你到底要提取哪个字段?所以输入的类型我们是可以直接在参数里边指定的,但是大家注意,现在既然是类似于Fla map,它是不是没有输出的类型啊,这里边不定义对吧?它要靠collect去输出,那是是我们就必须在泛型里面指定啊,要不然我怎么知道这里面collect你到底输出什么呢?啊,所以我们必须在外边。
07:22
Table function定义的时候给一个泛型,当前我们要的是words和lengths的这这样一个二元组,所以我包装成一个temple two这样一个二元组,用Java的这个元组类型里边就应该是一个word,一个string,对吧?哎,那另外还有它的长度,Length的话那是个TE。这就是我们实现的过程,对吧?当然了,下边大家看它没有任何可以实现的方法,对吧?你这里边呃,能实现的就是这个,呃,User,呃就是function DeFinition里边的,呃,Get requirements啊什么的,这些东西都没什么用,我们说最需要的啊,必须的那个其实是一个一个evil方法,对不对啊,必须实现一个evil方法。
08:13
注意没有返回值对不对,没有返回值,所以这里边还是必须这么写,Public VO e,然后里边的参数,我们现在既然是要针对那个ID去做一个拆分嘛。那所以这里面我应该是有一个string对吧,然后里边的具体的这个写法的话,那当然就是我对当前的这个string按照,哎,我们当前是要按照下划线去做拆分,大家会想到这个分割符是不是也可以作为一个属性传进来啊,诶,所以当前我们更通用化一些的话,我们可以定义属性对吧,私有属性分隔符。所以这里边private private,呃,定义一个string,我把它叫做separateator对吧?分割符啊,比方说这里边我默认是逗号,但是后边我传的时候调构造方法,是不是可以把对应的这个传进来啊,我当前应该传的是下划线对吧?啊这样的话就可以了,然后接下来我当然是要是不是要string,要split这个separateator啊,然后得到一个string类型的数组,然后是不是得遍历这个数组,把每一行都统计它的那个长度,然后输出啊诶,那这里边大家会想到其实就是一个for循环嘛,遍历嘛,对吧?所以我直接就写在这里边就完事了啊,直接每一个在这个string做过str,做过split操作之后啊,用当前的这个separateator啊,做过拆分操作之后,得到的结果是不是都要来一个输出啊,呃,都要输出。
09:56
我们这个二元组对吧?诶那这里边这个问题就在于我这里边输出的时候怎么调那个click方法呢?诶对大家可以看一下,就是当前这个table function里边啊。
10:08
其实大家想到它里边一定是得有一个类似于我们之前那个out.collect大家知道out是个什么吧,Out是个是不是个collector啊,所以大家看它里边是不是默认底层也有一个collector啊,那我们最终是不是应该调这个collector的collect的方法啊,但是大家看到collector是一个。Protected的一个属性对吧,哎,所以下边它其实直接就有一个公开的方法,大家看到就叫做。Collect,然后collect调的其实是不是就是就是,呃,当然这个不是公开的方法啊,是protected的方法对吧?啊就是我们这里边当然protected,你既然是继承关系嘛,可以调对吧?呃,那接下来这个class里边,它其实调的就是。Collector点对吧?啊,所以我们直接调这个方法就完事了。所以接下来在这里边。
11:01
是不是直接collect啊,直接调大家看啊,你也可以直接用这个collector对吧?啊,我们这里边直接就调这个collect的方法就可以了,里边要去你一个二元组里边当前的这个是不是就是把S传进来,另外s.lengths传进来是不是就完事了,哎,这就是我们想要得到的这个结果啊,一对多每一每一个是不是都调C的方法,就会输出一个啊啊输出这样的一个二元组的一个结果。这是我们定义的这个table function,那大家想一下这个table function应该怎么样去使用呢?首先还是应该在这个我们先创建一个实例对吧?呃,首先这个定义一个split split啊,New一个split里边我需要传入当前下划线作为这个分隔符,对吧?先把它定义出来,然后接下来我们是不是还是在环境里面要去做一个注册呀,对吧?Split啊,我把它定义注册成就叫做split注册进来,然后接下来我要去做这个操作的时候,大家想一下这个表函数这个要怎么使用呢?
12:14
这是不是接下来我得用一个这个就是侧写连接的这种方式啊,对吧?啊大家知道这个呃,MYSQL啊,就是CQL本身,呃,源语里边,它其实支持这样的一个侧写连接的啊,侧向连接对吧?或者叫就是所谓的这个呃,Draw这样的一个方法的啊,那在那个大家之前熟悉的过程里面也是有这个let let view对吧?啊其实差不多的啊,就是我们可以基于当前的表,然后跟经过表函数扩展出来的这个,这相当于另外一张表了,对吧,我做一个侧写连接,连接起来之后,是不是就相当于在之前的那个表基础上扩充了字段啊,一行相当于就扩充成好几行了,对不对,然后又加上了追加上了新的字段啊,所以接下来我们做的操作其实就是一个。
13:07
这就不是一个简单的select了,接下来应该是首先是不是要做一个不是draw,是draw later对吧,做一个侧向连接,然后里边当然就是要传我们当前的这一个。这个table function了对吧,类似于我们的这个测写函数一样啊split啊,大家看我是直接就这样写啊split,然后传对应的参数,我们传是不是传ID啊。我当年是不是就传一个string进来,然后就拆分了啊,所以跟之前那个调用方法一样啊,Spli ID,然后注意后边你如果还想要这里的字段的话,那是不是应该给一个别名啊,哎,所以这里边我来一个as,呃,我叫做word和lets,这样把它定义好,下边就可以做select做一个提取了,比方说我现在要的是idts啊,另外我还要当前的word和length。
14:06
这就是我们整个定义这一个,呃,所谓的table function表函数,然后做侧写连接时候提取它里面的内容的过程。啊,这就是这样的一个写法,好,这个是table API的做法啊,那CQ怎么写呢?CQ当然还是我们先把这个表三四先注册进来,对吧,这个过程是一样的,然后接下来就是q query啊,CQ query这里边写的时候,这个大家要注意就是涉及到一个这个,呃,就首先还是ID啊,TS,然后这里边就不是哈西code的ID了,我们要提的是word和lengths,对吧,是要的这几个字段,那接下来有一个问题,就是我from的时候,这个from什么呢?那是不是就要sensr和哎,我们的另外的一个这个这个表函数要做一个得到那张表要做一个连接,对吧?哎,那在这里边我可以就是当前的这个flink CQ支持的这个语法是我可以直接让它做一个交叉连接,大家知道逗号分割的两个表是不是直接做交叉连接啊,但是我把后边这个定义成一个lateral lateral table。
15:18
对吧,我直接定义成这样一个侧写表,这就有点像我们那个letter view一样,对吧,跟那个letter view是非常类似的定义啊啊,然后这里边,呃,我们把后边的这个写在里边,Split ID,对吧?把它写在这里边,然后后边呢,As我可以定义一个别名,注意这个别名的话,首先得是这个table的一个别名对吧,得有一个表名啊,所以我比方说我把它叫做split ID,然后接下来里边应该有两个字段,对吧,Word lengths,大家看是这样的一个写法,类似于之前大家熟悉的这个呃,Letter view的那种用法。好,那所以接下来我们看一下这个结果到底长什么样吧,运行一下。
16:05
哦,大家看到这里边我们报错,这明显是里边有有地方这个写的表达式不对,对吧,大家看这里边其实就是这里边写的这个拼错了,是不是这这个sweet啊拼写拼错了,但是后面这个无所谓,但是我们也还是改过来吧。哎哎,这个不对,诶这个是对的的是吧,上面写错了是不是下边这个是对的啊,就是split对吧,我们当哦这个是split是对的对吧?这儿写错了啊,Split之前我们这个函数就拼错了啊,所以都改过来吧。Split。好把这个都改过来,然后下边把这个名称我们也改一下。好,这个应该就没有问题了,对吧把,诶这里边还在报错,我们看一眼。
17:05
哦,这里边这个还是定义错了啊sweet对吧,好这样就没问题了,接下来我们来运行一下。大家会想到最终我们应该得到的是不是应该大家想是不是就是那边的一条输入数据,这里面应该有多条输出啊,对吧,应该是这样啊,我们看一下结果。这里其实我们只要看一个result或者CQ就可以了,都是一样的,大家看一条数据,199这个数据输入之后,它是不是输出了两条输出啊,然后输出的是,哎,大家看拆开是34611,这是什么意思呢?这明明不是3ENOR1的一条数据嘛,啊,大家注意这个六指的是对sensor的字符长度对不对对吧?这这字符串我们不是要统计它的那个长度嘛,所以你看三四,第一个词是三四,然后长度是六,这个后第二个词是一,对吧,长度是一是这个意思啊,所以你看后边这个如果是346数据来了之后,是不是也是34661啊啊,这是一样的啊,那后面你看这个三四十来的时候,是不是就是三四六十二对吧?哎,这其实这个应该叫一零啊,这我们统计的本来是那个字符串拆分开之后。
18:19
统计了一下它的长度,所以每一条数据来了之后都会拆成这么两行,但是如果前面你对于这个ID啊,定义的时候是下划线连接的好几个字段的话,大家想是不是我就可以拆出好几条数据啊?就非常类似于之前我们讲的这个,呃,对应的这个flat map操作,所以scale function是一个一对一,是类似于map。而这个。呃,Table function就类似于一个flat map1对多的一个关系啊,这就是一个具体的实现。
我来说两句