00:00
我们给大家讲了在flink CQ和table API里边系统内置的一些函数啊,这个更多的大家看这个文档的介绍,然后用到的时候去调用就可以啊,那我们这里边重点要给大家说的是,那假如说呃,所有系统内置的函数以后可能会继续扩展,可能会用到更多啊,但是。我们总会有一些特殊的需求,奇奇怪怪的需求,他满足不了啊,没有提供啊,我有些就是要去自己去描述,去定义出来,我的这个需求,定义一个这样的一个方法,一个函数可不可以呢?啊,当然也是可以的,这就像这个CQ里边我们可以写这个udf一样,Flink CQ或者说table API也可以去做这个用户自定义函数udf。这是这个也是table API和CQ里边一个非常重要的特性啊,啊,因为有了这个特性,这就相当于我们之后的这个功能函数就可以不停的扩展了,对吧?啊,就相当于接下来的这个开发,你也可以基于这个自己自己写这个udf去开发嘛,啊所以这里边其实它就是显著的扩展了。
01:08
当前这个table API查询的表达能力。啊,在大多数的情况下,我们这里边要调的这个函数udf啊,必须是先注册在。当前的这个环境里边,然后才能在CQ查询里边使用啊,那当然就是说,如果说你不注册的话也可以用,那就得怎么样,就局限在只能是在这个table API里边,对吧?Table API里边你只要你一个自己定义的那个类的实例啊,那么接下来其实就可以用这个udf函数了,你如果要在CQ里边直接用的话,那必须得先注册啊,这个大家还是对应着之前我们表的那个用法啊,现在是函数。啊,那怎么样去注册这个函数呢?那非常简单,就是register function啊,就像之前我们说已经弃用的那个register table一样啊,当前env,呃,可以调另外一个方法,这个没被弃用,现在还是一样的register。注册一个函。
02:02
那如果说这个已经被注册之后呢,它就会被直接插入到当前这个环境的catalog里边,有对应的那个函数目录,对吧,可以把这个放到里面去,这样的话,接下来如果我们用这个table API或者呃,Flink CQ啊,写这个对应的查询语句的时候,解析器啊,或者说我们那个explainer就可以正确的识别。把它解释出来。啊,那接下来就给大家看看这个不同类型的用户自定义函数udf,呃,首先最简单的一个是所谓的标量函数,就是functions啊,这个是那个标量的意思啊,就是大家知道,就是算出来是一个数对吧,一个一个简单的一个数据,这个就叫标量函数,所以它的特点是。我们定义一个标量函数啊,就可以把这个输入一个呃或者几个参数,或者不输入参数都可以啊,然后呢,得到一个新的标量值,也就是说最后返回的就是一个数,返回一个结果,输入一个或者几个数,得到一个数啊,这就是一个简单的函数的定义,那大家想这不就是最标准的函数吗?对吧,比方说我一个sum啊,那那不就是相当于。
03:18
把这个呃,一个sum,如果是我对两数求和的话,那我就把A和B传进去对吧,然后最后返回一个A加B,哎,这这不就是非常简单的一个呃这样的一个操作吗?啊,那当然对于这个我们表操作而言,Sum可能往往是做聚合了,但你如果只想定义两数求和的这个方法的话,那它也是一个标量函数对吧。啊,那这里边如果要定义标量函数的话,我们就必须当然是必须去实现一个接口,或者说扩展一个一个鸡肋了,这里边。Table API给我们提供的鸡肋叫,呃叫做skill function,就叫做这个名字function,它是在这个flink table functions里边,呃体现提供的啊,这里边我们实现的时候啊,得去实现什么方法呢?大家到时候我们在代码里边一试就会看到啊,它其实并没有这个就是对应的呃必须要实现的那个方法。
04:14
而是怎么样呢?我们要想得到它的结果,里边必须自己写一个evil这样的一个方法,求职方法,大家知道这个是那个,呃,Evaluate对吧,就是相当于估值,呃,这个计算对吧,类似于这样的一个,所以它就是要求值用这个方法。那这里边大家看本身这个function并没有类型定义。啊,所以说所有你返回值的类型全是靠这个E,它这里面的这个类型定义出来的。你这里面返回的是int,那我要是调这个这个函数啊,这个哈希code的这个函数的时候,就是传入一个int值,返回一个int值,就相当于是这样对吧。啊,所以呃,这注意啊,这这里不是不是传入一个int值啊,这里边我们是类里边传入了一个factor,这是我们可能在做哈希的时候调整的一个因子,对吧?呃,真正调这个函数传的时候传的是什么呢?传的是evil的这个参数,传一个string类型进来啊,然后呢,结合我们这个类的对应的那个参数啊,就是用这个类的时候定义的参数,然后呢,最后计算得到结果是一个int数。
05:24
一个哈希的值对吧?啊,这是一个例子,好,我们在代码里边给大家实现一下。这个整体还是比较简单的。我们可以先在这个。当前table test下边新建一个object,但是因为接下来都是这个udf,所以我定义一个udf test这个包吧,然后下边呃,我们定义一个scale function test。里边main函数啊,先把这个写出来,然后这个整体main函数里边这个代码应该跟之前差不多对吧?啊,我们前面就不想再做复杂的这个转换了,我干脆就直接抄之前window里边的内容,哎,我就定义环境,然后创建这个表的执行环境,读取数据,文件里面读取数据后边就卖不成样一类,然后直接转换成表,哎,直接这么干完事儿啊,反正现在好像也不涉及到这个时间相关的东西,我就用这个事件时间定义好了,也不影响嘛。
06:25
直接copy过来啊。这个倒简单啊,但是大家要搞清楚现在到底做了哪些事情,我现在是先读出读成读出数据,读成一个流,Data stream map成样例类,分配时间,Auto mark,然后后边转成了表,对吧,转成了一个table。好,那这里边我们看一下哪里这个有问题啊哦,这里边有那个影视转换的问题,所以我们要把这个对应的这个引入。下划线。哎,这里边的table apila也下划线引入,诶这样就不报错了,对吧,然后接下来我们就想写一个这个类似于呃,自定义的一个哈扣的函数,呃,那所以在定义这个类之前呢。
07:05
我们会想到在外面如果要用的话,那就得有这个类的一个实例,注册成我们的一个表,然后后边才能够直接调用,所以在外边我们最起码是要先实现这个类,对吧?哎,我们自定义一个。球。哈希code的。呃,这个标量函数。好,那这里边class,我就把它叫做哈希code。里边要传一个值,哎,我这个管它叫因子对吧,就是在做那个哈希调整的时候,给一个乘的一个数,然后它需要需要实现的是这个function。呃,它没有,它没有对应的那个类型,对吧,我把这个引入这里,我们用这个flink table functions function。啊,然后这里边大家看到啊,就是点进来之后,这是个抽象类嘛,里边它实现的这些方法好像没有必须要override的东西,对吧,大家看一下这里边啊,一个get kind,一个get parameter types,一个get result types,就好像都是跟这个类型啊,或者说这些东西呃相关,那这里面其实好像没有必须要实现的求值到底怎么求的方法。
08:21
啊,所以说这里边比较特殊啊,这就是现在这个呃,Table API里边,它的udf函数写的一个让人觉得不太舒服的地方,这这应该也是现在调整过程当中,就是没有没有统一没有做好的一个地方啊,就是这个东西是相当于在底层他写死了,就要判断这里边必须得有一个方法叫做evil对吧?啊,所以就是说这里边我必须去这么定义,但是呢,他又没有写在我们的那个抽象类或者是接口里边啊,这个就是就告诉你,你在里边没有,但是你必须把这个实现啊,要不然的话你就算不出值来。这个就是确实有点奇怪啊,这个大家也不用太在意,以后可能这个会调整的啊啊那当然这里面evil,我就是你要算什么嘛,算谁的哈希值哎,我传一个value进来对吧,这可能一般就是一个string了。
09:13
然后得到的类型,那应该是一个int对吧,我算成一个整形值,把它返回。啊,在里边具体的计算啊,这个我们就不用具体去算了啊,大家可能知道你要做一个这个哈希转换的时候,那可能比较好的方式是什么呢?就把它这这不是string嘛,对吧,把里边每一位都拿出来对吧,做一个转换,做一个包装,然后再再重复的不停的去调用啊那个稍微麻烦一点,因为我们知道在Java里边本身是有这个,就SKY里边本身是有这个,呃,哈希函数这个调用的啊,所以这里边我就偷懒了啊。这就好像没有什么特别,呃,就是没有什么特别的意义啊,本身人家已经给我们实现了,我这里边直接又调一下啊,直接把这个哈希code我们。
10:01
放放出来,然后乘以这个factor啊,这里面就是你传进来的这个因子,我给它稍微的调整一下,对吧?啊这样做一个。哈希的求职啊。我们主要是看它怎么用啊,这个已经定义出来了,那接下来这个这个玩意儿到底怎么用呢?啊,那我们使用这样啊。使用自定义的。哈希函数,呃,我们求谁的哈希值吧,这里边我们string好像只有ID对吧?我们就是求ID的哈希哈希值。好,那接下来我们这个就是先要去做一个基本的创建一个这个当前这个类的一个实例,然后才能用啊,要不然的话,你这个函数,那那你这里面本来是类啊,对吧,怎么用函数的方式调用呢?啊,它可不像我们呃,当然这里面大家可以认为这还是一个函数类对吧?啊这个跟呃,Flink底层啊,Data streamam API process API process方式,API这个调用的方式是类似的,但是我们这里边要调函数是得在类似于CQ的那个写法里边去调用啊,啊那你就不能直接用这个类了嘛,所以我们这里边这样啊,先定义一个变量,就叫当前的这个哈西扣。
11:23
啊,那我去拗一个啊。你有一个哈code对吧,里边还要传一个呃,Int类型的一个参数啊,那比方说我这里边。呃,或者说这个参数可大可小对吧,我这个给个double类型吧,这个可能看起来稍微靠谱一点啊。乘以这个double啊,当然这个类型就就得转了啊,最后我得到的这个M值哈希值必须是一个int类型吧,我再tot把它转换过来。呃呃,不是成double啊,成factor对吧,成factor,最后我再to把它转换回来,这样的话就可以了,那这里边我可以随便传一个原,传一个1.23对吧?啊,这是一个double类型,然后接下来定义我们的这个。
12:06
结果表。这里边的使用基于之前的这个sensor table啊,呃,那那现在我们其实没有什么聚合,只是相当于对里边的某个字段做一个调整,这就相当于一个map一样,是吧,Map里边的某一个字段一样啊,这里边其实就是直接select就完事了,Select的时候我们得到每一个的ID啊,比方说我还想得到当前的那个是TS,我们已经给了这个别名了吧,对,叫TS,呃,TS也拿出来,然后后边啊,我们再来一个当前这个ID的哈希code,那直接。用函数的这个写法对吧,ID的哈希code这样写就可以了。最后我们把它转换成流打印输出。这个转换。成流打印输出就是给大家看一看这个测试的结果啊,Result table。
13:07
呃,这个当然是to a pen就完事对吧,A pen stream啊,大家会发现其实有些流是如果做了更改啊,做了那个调整之后,它是不能to a pen stream。那那如果说本身他没有做过更改的啊,只是在后面追加的那些表,其实to retract stream也可以,对不对,所以retract stream其实是通用的一种输出方式啊,啊就是你不管什么来了之后直接to retra stream啊,大不了就是前面那个都是触码,都是新增的,这个是完全可以的,那stream呢,那那这个你就得分析好了,只有他在没有做过那个聚合更新过操作的那种情况下,才可以涂a pan stream。啊,在这里面类型我们还是直接给一个肉就完了啊,还是把这个flink types肉把这个引入,然后直接打印输出当前的result。好,那我们来看一看这个,呃,或者我们直接把另外一种方式也给大家都写完,一起来看结果吧,对吧?啊,这个比较简单,那这种方式是table API的。
14:11
这个啊,Table API。调用方式。然后接下来给大家讲一下这个。CQ调用方式啊,那同样这里边我们要做这个CQ的转换的时候,大家注意啊,现在我们是表也没有,就环境里边啊,这转换过来,这是从流转换过来的嘛,对吧,这现在是表也没有,是函数也没有啊,函数也没有注册过呀,所以接下来是首先要。首先要注册表和函数自定义函数udf啊,那所以用的就是table env啊,但是如果要想register这个那个table的话,想用这个漆用的这个方法也可以对吧,但是我们知道现在不推荐用了,用这个create temporary啊,这里边我们注册一个SS。
15:07
然后接下来是sensor。把这个放进来啊,那同样下面我们要注册一个register,一个function,诶这里边这个function名我直接就叫哈希code啊,但是这个跟前面定义的是一样的啊,你定义不一样的也是一样,呃也也是可以对吧,把上面定义好的那个哈希code传进来。啊,这样的话在CQ里边也可以直接调用了啊呃,我们定义一个结果。表CQ CQ table同样也是一个表啊,呃,当然现在是要用那个table env.CQ query去写这个具体的CQ了。啊,这里边非常简单select啊,其实我都不需要这个换行的,对吧,Select ID。和呃,这个TS哈希扣的。
16:01
ID。From,呃,当前的这个sensor,这这个表对吧,这样的话就可以把它得到了,最后我们打印输出table to a pen stream。同样还是给一个肉,最后print给一个不同的名称,我们这个叫CQ啊。大家来看一看这个效果到底怎么样啊,当然这个最后不要忘记,还有这个执行起来,当前我们是一个。Udf对吧,Function的一个测试啊,Test draw。好,运行一下看看。看看这个运行的结果。好,大家看到两条流都正常输出了对吧,我们看一下现在我们做的输入的还是这一条一条数据啊,大家看输入一个341,这result和CQ都输出了对吧?啊对应的这个时间戳,这是19秒的这个,呃,然后后边大家看这个经过这个我我们做的这个哈希转换之后,大家看3EN41变成了这个有点像时间戳一样是吧,1513这样的一个东西。
17:09
啊,然后后边呃,我们做那个三四十六,大家看进来之后啊,这个是221这个时间点来的,那对应的就是201了,对吧?呃,这个数据呢,它是,呃,经过那个哈希转换之后是1520,呃,那后面这个3471521,诶这看起来跟这个346非常接近啊,说明我们这个哈希做的不太好对吧?这个如果要是这个非常接近的话,这个其实是不是很好的,但是我们看到这个SEN40做了哈希之后,这个变化就大了,对吧,这个是5791啊这个结果就是这样,那后边如果再来341的话,他得到的这个1513跟前面还是一样,对吧?因为是哈希值嘛,所以说这个后面三四十一得到的都是一样的东西。正是我们这个计算结果,而且这个写CQ和用调用这个table API得到的结果是完全一致。
18:02
大家下来之后可以把这个练一练啊。
我来说两句