00:00
了解了table API和CQ当中UD的整体调用流程,那接下来我们就来按照不同的类型分别做一个实现和讲解。那首先呢,最简单的当然就是第一类标量函数了,Function,那我们所说的标量函数,它是把多个标量值针对每一行啊,同一行数据的多个标量值转换成一个值输出啊,那所以呢,它的输入可以是零个,一个或多个标量值。然后它的输出呢,当然就是一个新的标量值啊,那所以当前如果说从输入和输出。动态表的这个转换关系来看的话,我们当前其实就是一行数据,经过转换、提取、计算之后得到了一行新的数据,所以它其实是一对一的一个转换关系,类似于我们所说的map。
01:01
啊,这就是一个map的转换关系,那如果说我们想要实现自定义的标量函数呢?Flink CQ给我们提供了底层的一个抽象类,就叫做function。里边有一个叫做evil的求值方法啊,其实就是评估求值的这样一个方法,那标量函数的行为呢,主要就是在这个方法里边去定义的啊,这里需要注意的是,它并不是当前这个抽象类里边定义好的抽象方法。呃,但是呢,它有要求,就是必须它是公有的,而且名字必须叫evil,所以这里边有点有点奇怪啊啊,我们在这个,呃,想要实现这样一个抽象类的时候呢,并不能直接去重写这样一个方法,但是呢,我们又必须自己去做声明,把它声明成public evil这个函数名称也方法名称也不能变啊,这是它的底层框架要求的啊,我们必须这么去做,所以很明显现在的table API和Q。
02:01
整个架构上应该还是有一些值得完善的地方,我们相信在未来的版版本当中会有所改进啊,那呃,当前能够调用的方式呢,我们就只能是这样去直接把它写死了。那。与scale function以及其他所有的udf接口都在一个包下边,就是org阿帕奇Flink.table.functions下边,那接下来我们就可以在代码里边看一个具体的例子了,我们可以来实现一个所谓的自定义求哈希函数的这样一个方式啊,比方说我们就叫做MY哈希方式,我们可以新建一个Java类。叫做啊,我们主要是要进行这个UDF的测试,我们可以叫UDF。Test。后面可以加上,当前是function的测试。
03:02
整体的处理流程跟之前其实还是完全一样的,所以我们可以先把前面的这一部分都copy下来,我们整个这个呃环境的定义以及后边创建表的过程就可以省略了。包括这个密方法,我们可以直接copy过来。为了更加清楚一点,我们在下边直接env execute,整个这个架构先搭起来。因为我们可能涉及到最后得到的结果要转化成流做输出嘛,所以我们这里边还是把这个执行这步调用。啊,那首先我们这是创建出来了,接下来的第二步,那当然就是需要注册。自定义函数。这里边我们用的是标量函数。再往后的第三步。就应该是去调用。自定义的标量函数udf进行。
04:02
查询、转换。应该得到一个结果啊,那最后就可以。输出。转换。成流打印输出。整个的流程是这样啊,那接下来首先我们当然是要把。这个function类要先实现了,所以我们首先要自定义。实现。Scale function。所以public static我们定义这样的一个类啊,Class我们把它叫做my function。Extend。当前要扩展,要继承叫做function的这样一个抽象类。我们这里可以看到啊,点进来之后发现function它是继承自user function啊,那当然这就是udf的。
05:05
本身它最底层的啊,基础的那个类了,那这里边的skill function并没有泛型,它里边我们可以看到。里边所有的方法都只有,比方说get type influence啊,Get kind,只有跟类型相关的一些通用方法,并没有涉及到求职方法,所以说我们这里边并不能直接去override,而是要。自己手动去敲啊,就这里,尽管本身这个抽象类里边没有定义,但是呢,底层又要求我们必须实现这样一个公有的E有方法啊,所以这里边我们就只能这么去写了,它就叫做evil,那这里面evil有没有参数呢?啊,参数就是我们当前要去针对哪个字段进行转换,对应的那个属性字段要传进来啊,那这里的类型我们想要做一个这个哈希,求求取哈希的过程的话,那我们可以传进来一个字符串string。
06:04
然后得到的结果,诶,那当然就可以指定成它是一个int类型了,所以我们要返回一个哈希值嘛,直接这样就可以实现。啊,那里边其实也非常的简单,我们可以自己去做一些哈希转换,这里我们就不详细去实现这个哈希的算法了,我们干脆啊简单一点啊,直接去调用。底层Java里边有对应的哈希扣的方法,我们干脆直接返回就好,关键我们是要测试这个自定义的哈希方法,这个函数到底好用不好用啊,所以只要有了对应的实现,接下来我们看上边就要去做一个注册table。需要create temporary system function里面。我们可以把它定义成这个,可以大写一下啊,就叫MY,然后my has function点把对应的这个类型进去。然后接下来呢,就可以调用这个udf进行查询转换了,我们就把这个MYH这个函数,就像之前用过的count啊,或者说upper对应的这些函数。
07:10
可以当成是一样的去使用啊,那所以这里面我们可以直接。调用table的q query方法写一句。Select,比方说把这个当前我们有这个user,把user以及当前每一个user求一个。My hassh。User字段,求一个MYH啊。直接from当前的。Click table。直接把它出来。而这就是一个非常简单的一对一的转换,每一行数据应该都把它的user提取出对应的哈希,然后打印输出,那当然这里我们需要把这张表再定义出来。我们可以把它叫做result table。
08:03
很显然result table并没有任何的操作啊,都是来一个就转换一个,然后在后面追加就可以了嘛,所以我们可以直接将这张表做一个to data stream转换。接下来直接做一个大输出就可以了。好,那接下来我们可以来运行一下,看一看测试的结果怎么样。我们可以看到。当前所有的都是加I,都是追加操作,而且我们看到后面直接输出了当前每一个用户对应的哈希值,哈希code啊,那如果是相同的用户的话,很显然后边输出的哈希值就是一样的,而且对应我们也可以看到。每一个数据就对应着。一条输出,所以是一对一的转换关系,非常典型的一个标量函数的应用,这就是标量函数定义和使用的过程。
我来说两句