00:01
到这里为止,我们已经了解了link当中转换计算的基本转换和聚合转换,那这两种方式我们会发现啊,它们其实是有共同的特点的,首先我们说它都属于data stream API调用的时候呢,在代码当中都是我们先得到一个data stream,然后接下来去调用一个相应的方法就可以了,比方说诶,点map或者。KBY之后点reduce,或者说点Fla map.filter啊,这些都是对应的一个方法调用,对于这些方法调用里边传入的参数其实也是有共同的特点的,比如说像前面我们说的啊,点map这里真正要传的就是一个map function式,那filter的话就是一个filter方式,后面我们讲到的reduce的话就是一个reduce方式,甚至我们还可以扩展到之前讲到的原算子啊,Source这里。要传入的其实就是一个S方式啊,这是之前我们也已经说过,所以这里我们会发现啊,它的命名规则其实就是当前的某一个转换操作加上一个方式表示我们当前要实现的一个具体的接口,而对应的这个接口呢,其实他们有一个共同特点,我们看到里边它都只有一个单一的抽象方法。
01:24
当前的这个reduce function,它为什么叫做function呢?我们看到后边它其实是继承自function这样一个接口,那这个接口本身就是空的,这就是我们所说的函数接口。它之所以是空的,主要就是为了方便我们扩展成为所谓的单一抽象方法的接口,就像这里的map function或者reduce function一样,里边只有一个唯一的抽象方法啊。其实我们知道啊,对于SC这样的语言,或者像Java java8之后也有了类似的一些特性,那就是针对单一抽象方法接口,或者说我们有一个简称叫做Sam。
02:04
Sam接口,那么就可以利用一个拉姆达表达式。拉姆达表达式来实现对应的功能啊,那这样的话,我们在具体代码里边就会特别特别的简单啊,因为标准的这种定义方式呢,我们真正的写法,那就应该是要自己去实现这样的接口,把对应的类定义出来,然后里边具体去实现相应的抽象方法。这种方法比较符合我们在Java代码当中的一般的习惯,那但是呢,在我们整个定义flink的转换计算流程的过程当中,就显得比较繁琐了啊,所以我们更希望的就是说在这个处理流程里边,直接一目了然的把我们每一步要做的转换计算展示出来啊,那这个时候使用拉表达式就是一个很好的方法。所以我们看到啊,在flink的代码里边,使用scla进行编程的话,主要就可以有这样的两种方式去实现我们中间的转换计算,一种就是实现对应的接口,诶,我们用一个类来进行实现,另外一种就是直接传入一个拉姆达表达式,哎,所以这两种方法其实我们看到在本质上它都是一样的,最终都是实现了我们用户自定义的一个类,然后实现了相应的方式接口啊,所以有时候呢,我们就把这种方式叫做flink当中的函数类啊,或者说我们把这个叫做用户自定义函数udf。
03:39
不管是我们传入了安表达式,还是说我们真正意义上的实现了接口,其实最终底层都是使用一个函数类作为参数,传给了我们当前的转换算法。那接下来呢,我们可以对这几种不同的编程方法再做一个梳理和总结啊,那就是对于这个所谓的udf的实现啊,其实可以分成这样的几大类啊,前面我们说了主要就是两大类,一类就是老老实实的把对应的这个接口实现,另外一种方式呢,就是直接传入一个拉姆达表达式,那此外还有另外一种方式,我们知道啊,在这里实现接口的时候,我们也可以不把这个类单独定义出来,而使用匿名类的方式来实现接口。
04:23
直接传入一个匿名类,这种方式也是可行的,所以接下来我们可以在代码当中来测试一下udf到底有哪些具体的使用方式。我们还是去new一个SC的object。当前我们测试的是。主要还是针对当前的转换是udf test。这里我们需要去强调的一点就是主要这里我们讲到的udf函数类都是针对转换计算的,因为我们知道转换计算往往我们要指定的就是一个操作嘛,这个操作用一个函数是最方便去表达的,而像之前我们所说的这个SS任务呢?哎,那SS任务这里边我们要传的这个s function其实是有所不同的,S function里边它并不是单一抽象方法,就像我们前面实现的一样,里边核心一个wrong方法,一个cancel方法,那显然它就不能简写成一个long表达式了。所以我们注意啊,只有。
05:21
单一抽象方法接口,它的这种实现可以用拉姆达表达式来进行实现,因为我们知道你这有两个抽象方法的话,那拉姆达表达式难道传入两个吗?我们当前的参数只有一个呀,所以这种方式我们就没有办法去简写了。所以其实拉姆达表达式我们也可以看成是scla或者是Java吧,里边给我们提供的一种语法糖啊,它其实实现的底层就是单一抽象方法接口,那接下来呢,我们就还是做一个测试类方法,那前面的部分呢,我们还是做一个照抄,直接把前面我们实现的过程做一个引入。把reduce这里前面引入数据和获取执行环境的部分全部copy过来。
06:05
那上面同样,我们还是下划线引入后边进行影视转换。好,那接下来我们就要去测试。函数类自定义函数类udf的用法。那首先这里我们测试最基本的经典的这种用法,那就是所谓的实现一个自定义的函数类,我们这里就用最简单的吧,用之前我们所说的那个filter方式来做一个举例吧,我们想要测试的功能是。去筛选。筛选URL中。包含某个关键字,比方说我们筛选这个后。的event事件。诶,所以这里边我们的逻辑其实就是判断每一个数据来了之后,它的URL里边是否含有后关键字啊,那这里的第一种方法的实现其实非常简单,我们这里直接stream.filter然后里边哎,那当然了,我们可以直接去你一个自定义的函数类,那我们就定义一个MY。
07:13
Filter function。那下面我们就是需要去实现了。实现自定义的。Filter function。Class my function那边我们实现的是function。当然了,对应的泛型是event。里边有一个唯一的抽象方法,就叫做filter啊,那所以这里面我们所要去提取的当然就是URL里边是否包含了,那么我们调一个contains方法。后。诶,这就是我们具体的一个实现得到之后,我们可以直接做一个打印。这个是一。
08:00
因为最后要执行起来,我们可以直接测试一下,看看能否得到对应的数据。当然我们看到了这里得到的数据就只有第一条Mary的第一秒钟的点击事件,它是点击了后这个页面,那这里面如果说我们想要去提取的是比方说带有prod商品的详情页的信息的话,哎,那当然了,在这里边我们直接运行一下就会发现得到的,那就是后边Mary的三条点击事件啊。哦,这里我们有一个小的拼写错误,我们可以改,改正过来。My function。那除了这种方式之外,当然我们还可以用第二种方式。那就是使用匿名类。来实现filter function式接口啊,那这种方式其实我们知道在Java里边也是通用的啊,那直接filter里边要实现filter function这样的接口,我们可以不定义出声明出对应的自定义的类来,我们直接拗一个filter function就可以了,好了,里边当然也是要实现对应的filter方法,这里边的逻辑呢。
09:10
完全一样,那就是。t.URL contains pro来只要做这样的一个实现就可以了,那接下来我们同样可以把它做一个打印,这个是二我们运行的话会会发现得到的结果。跟前面的一输出应该是完全一样的,都可以把当前商品的详情的访问事件全部提取出来。啊,那这两种方式如果要比较的话,我们会发现啊,使用匿名类的好处就在于我们省去了对于自定义类的声明,那反过来,如果说我们使用的是自定义的类,它有额外的好处就是我们可以给它再去添加一些属性,比如说我们在这里像要去判断的某个关键字,这个proudd,或者说后,哎,我们这里边是写死在里边的,那假如说我们每次要更改关键字的时候呢,显然呢,就要实现不同的类了啊,或者说我们在这里就要修改底层代码了。
10:09
如果说我们想要通用性更好,完全可以把这个关键字作为一个属性提取出来啊,那所以我们前面就可以怎么做呢,比方说在这里边做一个传入。传入一个pro,那么现在我们在my future function对应的定义里边,我们就应该要加上它的一个属性参数了,比方说我们就叫做keyword。String类型啊,那传一个pro进来,这就是我们要去提取的关键字,那这里边做筛选的时候把keyword传进来就可以了,好,这里我们可以直接运行一下,看看得到的结果是否还是一样。完全没有问题,跟我们之前的效果还是一样啊,所以这样的话就是呃,各有各的方便,使用匿名类可能省去了定义类的麻烦,那如果我们自己把这个类声明出来的话,可能就会有很多扩展的功能可以去实现。
11:06
当然了,我们会发现啊,即使是使用匿名类,这种方式也不是足够简洁的啊,因为我们还有另外一种更加方便的写法,那就是第三种。使用匿名函数拉姆达表达式。那这种写法的话,我们应该现在也非常的熟悉了,Filter里边可以直接定义好按照什么样的条件去筛选当前的数据,那这个逻辑当然跟之前还是完全一样的,只要这样写好就行。当然了,这里还有简写形式,我们用下划线代替只出现一次的参数,这样的话就是当前每一个数据按照它的URL是否包含pro这样的关键字来进行一个提取,非常的简单。啊,所以我们会发现在实际应用的过程当中啊,往往我们使用的方式呢,是拉姆达表达式,或者自定义一个函数类啊,那中间匿名类的方式可能会使用的比较少一点,而且其实我们发现啊,拉姆达表达式直接这么一写的话。
12:08
从代码的角度来看,跟我们这里边作为属性传递进去一个关键字没有什么区别啊,所以其实这个拉姆表达式应该是之后我们在编写flink代码过程当中用的最多的一种方式啊,这就是函数类的用法。
我来说两句