00:00
标量函数是flink CQ提供的udf类型当中最为简单的一个,我们可以把它理解成简单的MAP1对一的转换。输入的动态表里边每一行数据到来之后,经过标量函数的转换计算都会得到。一个新的一行数据的输出插入到当前的结果动态表里,这是标量函数。那接下来我们再来介绍第二类,就是所谓的表函数K轴方式。那其实它跟标量函数有类似的地方,就是他们的输入都是零个,一个或多个标量值,也就是说当前我们想要调用的时候呢,都是针对一行数据去调用当前的表函数的。不同的是,当我们得到结果动态表的时候呢?之前的标量函数是只得到一行输出,而当前的表函数它可以得到任意多行输出。
01:02
那其实我们想到了任意多行输出的话,这某种意义上来讲不就是一张表吗?所以我们可以把这个表函数看成是返回了一张表的函数,它是一个一对多的关系,有点类似之前data stream API里的。Flat map。因为每一行数据可以把它扩展成多行进行输出啊,那之前我们介绍的这个窗口TF啊,进行这个表格扩展,扩展对应的这个窗口属性字段,本质上呢,它也是一种表函数的一个应用。那我们说如果想要实现自定义的标量函数,我们要实现的是一个抽象类,那同样想要实现自定义的表函数的话,要实现的是一个自定义的。Table function抽象类。也完全一样的是,它的内部也必须要实现一个叫做evil的求职方法,也就是说之前我们那个skill方里边不是要输入一行数据对应的某几个字段,然后得到一个值返回输出吗?诶,那当前我们这个想要实现这个表函数的时候呢,里边也是传入一行数据里边的多个字段,然后调一个E方法,得到对应的结果做输出,哎,那不同的是之前我们只返回一行数据,很显然就可以直接一次调用就完了啊,那直接有一个返回值做返回就可以,那现在如果说我们要返回多行数据的话,很明显那就不是一个返回值能够搞定的了。
02:40
所以当前这个table呢,它还是有区别的。到源码里边具体去看一下table function。那么这个贴波方式,我们看到它跟之前skill方式就不一样,它是有一个泛型的,所以这个泛型本质上其实就是它要返回的数据类型啊,那所以这里边它的返回其实是什么呢?就是我们想要通过计算扩展出的对应的那些属性字段啊,那这个类型就是你想扩展一列的话,那就是一个基本类型,如果扩展两列三列更多的话,那怎么办呢?我们可以把它包装成原组,也可以使用tableable API里边的肉行这样的一个数据类型进行一个包装,这都是可以的。
03:30
那另外一个就是我们当前既然有这样的一个返回类型,在当前的类里边直接以泛行的形式定义了,那后边呢,Evil方法调用的时候,它就不会直接返回具体的。结果了,而是要通过一个我们看到里面有一个collector。Collect collector它的泛型就是T,所以当前的这个collector很明显就是会有一个,我们看这是对应的还是实现了这个,呃,Collect接口啊,我们可以直接调用collector的collect的方法去进行一个输出,这个我们非常熟悉了,Data three vpi里面Fla map还有process function,他们不都是,诶,当前我要实现的那个Fla map方法和process element方法里面都没有对应的返回值吗?我们也都是调用alt.collect进行。
04:25
呃,输出数据的一个发送的,所以这里边的实现就真的是跟我们之前熟悉的flat map是非常的相似,它是一个一对多的转换。那接下来还没完,因为我们想到当前这个一国方法,相当于只是得到了我们想要扩展的那一个或者多个字段,那这一个或者多个字段怎么样跟之前原始的这个数据进行一个连接,进行一个啊,进行对应的一个真正意义上表的一个扩展呢?那我们自然就想到了当前我可以把每一行数据扩展出的这些行。
05:06
可以让他们做一个表的draw嘛,按照每一行对应的这个关系进行一个join,然后拼成一个完整的大表,这不就相当于把之前的这张表做了一个扩展吗?诶,那所以这个功能我们之前应该也都是非常的熟悉,这就相当于是。在汉里边我们实现了一个UDTF,然后接下来可以使用侧向视图啊let这样一个功能,就可以把一行数据拆分成多行,那同样CQ里边呢,也提供了类似的功能,就是我们在CQ里边只不过调用的不是view,是lateral c这样一个语法,那这个语法其实是标准CQ里边的用法啊,在很多呃一些。关系型数据库里边是提供类似的用法的,具体的使用过程我们可以看一个具体的例子啊,我们可以看到当前首先定义了一个自定义的table function啊,那我们返回的类型是什么呢?诶,这里大家看到返回的类型是肉。
06:12
也就是当前可能有多个返回字段,所以说我们把它扩展成了一个行,一个肉,那这里这个肉就得单独的定义一下它的类型了,因为我并不知道里边每一个字段具体是什么类型啊,所以我们看到上面是加了一个function hint,然后给了一个data type hint,这里面at这个标注指定的就是当前类型的一个标签啊,类型的扩展出来的一个类型的指定,我们现在要的是一个word这样这样一个字段,它是string类型,另外还有一个lengths,它的长度是一个int类型。相当于是一个二元组了。然后我们看到里边的E方法呢,没有任何的返回值,传入一个string类型的参数,把某一个string类型的字段传传进来,接下来呢,哎,是把它做了一个拆分啊,拆分然后返回了对应的每一个字段,以及它的长度,我们看到返回的时候直接调用collect方法啊,因为这里面我们在源码里边啊,也可以看到c function当中,我们可以看到就是有一个collect方法的。
07:20
啊,就是一个final方法,那这个方法呢,直接调用的就是当前自己定义的这个collect的啊,那所以直接调用就可以了,所以这样的话,我们就相当于每来一行数据都会把对应的这个string做一个拆分,拆分成多行,然后进行输出,那输出出来之后的这多行在原始的那个表里边怎么样去扩展呢?呃,我们首先同样还是先把它注册进来,Create temporary方式,把它注册到系统表环境里,然后接下来呢,哎,我们可以使用交叉连接的方式。也就是直接啊FROM2张表,逗号分割,这就是一个cross啊,一个完整的卡机的一个匹配,那这里面呢,就是原始的数据表,然后。
08:11
这个侧向表,侧向表里边是什么呢?诶,就是调用我们的表函数,传入对应参数啊,然后得到结果,把这张侧向表跟原始数据表做一个交叉连接就可以了。那当然了,我们知道对于这种呃,侧向视图的使用,或者说侧向表的使用呢,可以用交叉连接也可以,它完全等价于带on数条件的左连接啊,所以另外一种方法就是直接做一个my table和。这个table啊,这两张表的left draw后面加一个on数条件,当然了,这个就相对来讲更语法更复杂,更多一点,所以往往我们直接用交叉连接这种方式就可以了。另外需要多说的是,还可以对这个测向表里边的字段进行一个。
09:02
重命名啊,就比如说我们在这里啊,在上面我们要做提取的时候,它是直接就把对应的这个字段,我们就按照它的名称啊,Word length直接就提取出来了,这个名称是在哪里定义的呢?哎,是在这里我们有肉对应的这个名称和类型的定义吗?那假如说这里我们没有对应的定义,这里又该怎么提取呢?哎,我们可以给它做一个重命名,加一个S。然后加上一个T,里边就是我们对应的力,使用这个表函数啊,得到的扩展出来的新字段的名称,我们可以把它叫做new word和newle,那么在上面如果要select的话,就可以直接select对应的字段出来了。这就是表函数的一个应用,我们看可以在具体代码当中再来做一个实现和测试。我们新建一个Java类。
10:01
这个同样还是udf的测试。只不过我们现在是table。那前面的内容跟function就是完全类似,所以我们可以直接照抄。干脆把这个没方法都直接抄过来吧。整体的实现是差不多的,关键就在于接下来我们首先要注册一个自定义的表函数,然后调用udf进行查询转换,最后转换成打印输出,过程都是完全一样啊。关键就在于我们要实现一个。自定义的表函数啊,那我们这里想要实现的功能也就简单一点,跟文档里边想要做的操作是一致的,就是直接做一个词缀类型字段的拆分,我们当前的这个数据里边。我们干脆按照这个问号做一个拆分吧,啊,因为这里面有可能有这个访问某一个页面商品页面的数据,我们干脆呢,就把这个问号做一个拆分,拆分出来前边的这一部分,以及后边ID啊,等于多少的这一部分,那这一部分我们就还是啊,就直接把对应的这个LAS做一个打印啊,看它这个长度到底有多少就可以了。
11:17
尽管没有太多的实际意义,但是我们可以看到对应测试出来的效果。所以接下来public static class。这里我们定义一个my split,其实主要就是做了对应的一个拆分。Extend当前这个抽象抽象类叫做table function,它是有的啊,那我们可以在这里直接把它定义成肉,那在上面就要加一个type hint了,我们这里呢,也可以简单起见,直接给它定义成二元组就可以了,好呢,我们这里就定义一个Java的二元组类型,里边定义词,这就是。这里需要是二啊,然后是string和。
12:05
需要把这个二元组的Java类型引入,接下来啊,同样这里我们看到并没有默认必须要实现的抽象方法,那我们呢?呃,这里面必须去写一个方法。尽管没有要求,但是我们必须这么写。传入一个令类型的str啊,那这里边我们可以首先对它做一个SP啊,做一个拆分,我们这里要做的是基于问号去做拆分啊,那当然了,这里我们需要去把它做一个转译了啊,因为问号本身是。里边的匹配符啊,那所以这里边我们把它拿到之后,这个就叫做。提取出来的fields。一些一些属性字段啊,那接下来呢,我们就把所有的这些fields每一个进行一个处理。每一个field拿出来啊,从S里面接下来直接调用方法。
13:07
里边当然是要创建一个新的二元组进行输出了。二点,然后里边我们就是当前的field以及这个点,这样直接输出可以。后面我们少了一个括号,这样的话整个语法就是正确。然后接下来就是在上边需要去注册自定义的表函数了,调用create temper。Temporary方法,这里我们注册的可以叫做my split。那当然了,这里直接把这个就要过来。用当前定义好的,接下来我们这个CQ在使用这个udf进行查询转换的时候,这个语法就会稍微的复杂一点啊,那我们这里边可能需要的是。User啊,然后呢,我们把URL也列出来,那另外呢,我们就是针对这个URL里边的问号要做一个拆分嘛,啊,那后边还应该有当前的word以及当前的。
14:13
在这里可以换一行,看的更清晰,From接下来要做一个交叉连接,原始的click table以及一个侧向表啊,那这个侧向表就是。Table。我们这里是传入了一个my split。定义好的表函数啊,里边它的参数的话是URL,我们要针对问号进行切分。外边我们可以把它做一个重命名,因为我们当前并没有type去指定它的字段名称和类型嘛,我们这里做一个重命名,当前叫做word和。L。这样的话,就把当前想要提取的字段全部都定义好,最后可以把这个result做一个打印输出,诶,那同样我们会想到当前直接to date可以吗?诶,肯定是没有问题的,因为同样都是一行数据来了之后就直接转换成,哎,我们当前能想到啊,就一个问号,那应该就是两行转换成两行输出肯定是一个追加,每来一行转换成两行。
15:24
添加在结果动态表的后面就可以了,所以直接完全没有问题,接下来我们可以做一个测试运行,看看结果怎么样。我们可以看到当前输出了对应的结果,每一个用户和他点击的URL,另外呢,后边有。按照问号分割之后,前段和后段的当前的字符串以及对应的长度啊,那当然了,前面这个home cart它都没有问号,那当然就是原封不动的输出了啊,那至于后边如果有问号的话,我们看来一条数据就会拆分成两行输出啊,那前面是点。
16:10
后边是ID等于100啊,那当然了,它们两个结合起来的话,这个长度是不一样的,前面是七,后面是六啊,那同样后边car home这些都是都是没有问号,所以来一条数据只有一行输出,而如果要是来了一个product ID等于十的话,就会有对应的两行输出。所有的数据都是加I,都是追加到结果动态表后边。这就是表函数的使用。
我来说两句