00:00
好,那接下来我们再来详细的给大家讲解一下link里边编程风格里边非常明显的一个特色,就是所谓的udf函数类啊,大家其实前面我们在实现的过程当中,大家看到了啊,往往我们定义一个算子啊,就调这样一个方法,Data stream API这个接口的时候,里边是不是传的其实都是一个。哎,都是一个一个什么什么function这样的一个实现这样一个接口啊,诶,所以这其实是flink编程的一个特色啊,它里边可以说flink是暴露了所有的算子里边都有对应的一个udf函数类的接口啊,那所以呃,它的这个实现方式其实就是一个接口或者一个抽象类啊,那我们之前看到的应该都是一个接口对吧?啊,关于抽象类后面我们再说啊,那所所谓的这个接口,那就是假如说我在data stream上调了一个map方法,那我里边其实就直接可以自定义一个一个类,一个udf函数类,对吧?我实现的是一个map function接口啊,那如果要是一个filter的话,我就实现一个filter function接口,对吧?呃,那另外还有一个特殊的,大家看是一个叫process function,这是我们后边要提到的。
01:17
所谓的三层API里边的那个最底层的那个process function API,对吧?啊,它其实也是一个udf函数类的这样的一个接口啊,但是它比较特殊一点,它不是接口,它是一个抽象类啊,所以这个我们放到后边再说,所以现在大家直观的来看的话,就是任何的一个操作,任何的一个方法里边是不是都可以传一个自定义实现的udf函数类啊,对吧?实现这样一个对应的接口就可以了,不光是这个基本转换啊,前面这个map filter Fla map可以传这个对应的map function filter function,呃,那后面这个reduce大家回忆一下是不是也可以传啊。Reduce是不是可以传一个reduce function啊,所以这个其实都是都是一样的啊,啊对于这个弗林可而言,几乎所有的API都可以里边的这个参数啊,都可以传一个udf函数类啊,那当然你像这个简单滚动聚合的话,这个特殊一点,你像这个sum,我们没有说里边要传一个sum function对吧?因为大家知道sum是不是相当于底层都已经给我们实现好它的机制了,你不需要再去指定,因为大家想你要传一个什么什么function。
02:27
这是不是相当于,呃,就是你要传一个具体的操作方法啊,那你说你像这个sum max命它的具体操作方法都已经确定了嘛,就是求和,就是求最大最小值,那这个就不用指定了,对吧?啊,所以它调用就会简单一点啊,所以其实这个自定义函数类的这种方式,这是flink的一个通用接口实现,对吧?啊,就是它不是具体的某一个特定的,呃,我要实现的功能,比方说求和,比方说最大最小,而是说反正我就放在这儿了,你想实现什么功能,你自己去实现这个接口就完事,对吧?啊,所以这其实也是flink data streamam API的一个特色啊,就是它是接口都放在这儿放好了。
03:11
它并不给我们直接实现特定的功能啊,那所以大家可以看一下这个例子啊,这个例子也非常的明确,我们看一眼就知道怎么回事,就是这里边我们是大家看有一个有一个推推,这个大家知道是推特对吧,比方说这里边这个具体的例子就是我们可以从这个,呃,比方说像这个一个网站啊,我可以去获取当前的哎,对应相关的一些推测信息,对吧?啊就是我们类似于我们国内大家取那个微博上面的一些信息,对吧,我就当成一个日志啊,这些信息就来了啊,然后直接灌到卡夫卡里面,然后接下来我读取数据是不是得类似于就是有这样的一个string字符串的一个数据流啊啊,我就把这个叫做TWS啊,那然后接下来我要干什么呢?我可能想要去是不是想要去从里面提取一些关键字啊,啊对吧,所以那接下来我就可以直接怎么样,是不是直接做一个filter就可以了,比方说我现在要去提取跟flink相关的一些推特推文,诶那你。
04:11
看接下来是不是就是直接一个filter里边去实现一个自定义的。Filter function对吧?啊,所以这个实现起来其实非常的简单啊,啊,那这里边具体来做的话,这这是不是就相当于它需要实现这个filter function接口里边我们要实现的是一个filter方法对吧?它是把当前的每一条数据作为参数传入,然后返回一个布尔类型的值对吧?所以这里边比方说我们要求你必须包含flink,那是不是我直接return value.contains flink就可以了啊,这样的话就是只要有flink的内容我全部筛选出来对吧?所以这个整体来讲是非常简单的啊。啊,那当然了,就是除了这种实现方式之外,之前我们也也写过了,这是在外边直接做了这个类的定义,然后我们这里边new它的一个对象对吧?哎,那有时候大家可能会想到我这是一个特定的功能,你单独把这个类声明出来好像也没什么用,哎,就是我这个类可能就在这儿用一次对吧?呃,直接就定义出来就完事了嘛,所以我们还可以实现成什么?对匿名类,之前我们做过对吧?呃,就是有你直接可以在这儿去声明一个自定义类的地方,都可以去实现成一个匿名类啊,所以就是直接new filter function string,对吧?大家看这直接就是这个接口啊,没有我们自定义的类的名称,然后你直接实现这个接口就完事了。
05:35
啊,这个实现和上面这个结果是完全一样的。然后除了这些方法之外,那大家想你你前面直接定义这样的一个这个,呃,自己单独定义这样一个类,这就很麻烦嘛,那它有什么好处呢。诶对它家它的好处是不是就是通用性更强啊,你是不是可以在不同的地方去调用啊,那有同学可能想不同的地方,难道我这个filter,这个flink我还要给它filter两次吗?那肯定我filter一次就完事了吗?啊当然首先是你这个类定义出来之后,我在不同的代码里边,不同的类里边可以去调,对吧?呃,这是首先一个这个呃代码的封装啊,封装特性,另外还有一个就是我既然可以把它就是实现这样的一个单独的类,我是不是可以它里边有自己特定的这个属性啊,所以我接下来大家看我现在这个flink,这是作为这个hard code啊,直接把这个关键词是不是写死在里边的,那我现在是不是可以把这个flink当成一个,就是我当前构造方法的一个参数传进去啊,啊,所以你看这就具体在工作当中啊,项目实际。
06:41
创建的时候,这个方式啊,实现的方式就多种多样,啥都可以了,你比方说我现在啊,我先这个把这个推文拿到对吧,退拿到,然后接下来呢,我当前的这个类,我直接定义成一个叫做keyword filter,这是一个filter function对吧,我我里边要把这个flink作为一个关键词keyword直接传进去,所以我里边是不是应该有一个呃,这个keyword这样一个属性啊,然后构造方法,这里边是不是应该把当前传进来的参数付给这个属性,然后后边我实现必须实现的这个filter方法,是不是直接用这个keyword就可以了。
07:18
对吧,诶这个就不需要我再去单独做这样的一个操作了,对吧?啊,所以整体来讲的话,这个还是比较简单的啊啊大家可以看到这个各种各样的实现方式,呃,这是关于这个自定义函数类的这种这种做法啊呃,然后接下来还有另外一种写法。我们在代码里边,其实大家也发现了,就是随便的任何一个接口,一个算子啊,我们在定义的时候,里边的参数可以传一个自定义的函数类,另外还有一种传参的方式,可以传什么。诶,之前其实大家看到就是关于我们这里边的所有的这个函数类啊。呃,就比方说下边。
08:03
哦,我们这里边这是这是给了一个key select是吧,我们这里是这个max啊,我们看那个reduce reduce这里面我们不是实现了一个reduce function吗。大家看这个reduce function,这是一种实现方式对吧?那另外我们知道它其实是继承自这个function接口对吧?那么function接口接口它里边其实是空的,然后只实现了一个这个s liable对吧?只继承了这个several Liz,所以它其实在JAVA8里边是不是给我们相当于预留了可以用这个拉姆达表达式匿名函数来实现的这样一种方式啊,对吧?啊,所以当时我们在做这个reduce function的时候,大家看第二种写法是不是就是直接传一个对拉姆达表达式就可以了呀?啊,所以我们后面讲的这个就是匿名函数的写法,你像前面我们这个filter function的这个写法可以直接简写成什么?大家看前面那么一大堆,我如果写成这个拉姆DA表达式是不是非常简洁,哎,直接当前这个数据来了之后,是不是直接contains flink就可以了啊,当然你如果用这种方法的话,那是不是就不能把它作为一个keyword直接传参传进来了,对吧?你这个就必须写死了啊,但是这个语义其实也非常的明确,所以在有一些场景下,我们用这个匿名函数拉姆达表达式,明显代码会更简洁,对吧,你就不需要实现那么一大段了啊,还自定义那个函数类,这个看大家这个具体的需求啊,那我们前面也提到了,就是说这个拉表达是在有些场景下有些麻烦。
09:35
比方说我们做那个map Fla map的时候,大家还记得就是做word count的时候,如果你直接想把它包装成一个二元组类型,但是这个元组类型这又不是这个Java底层直接就支持的,这是flink单独包装出的这样一个对象,对吧?啊,所以。它里边其实有泛行的啊,就是TEMP2啊,里边是有具体每一个字段类型,它是有泛行的,所以在这种场景下,你如果直接写拉姆达表达式的话,弗Li弗link就会做这个泛型,就是泛型擦除,它就解析不出里边我们具体原原组里边每一个字段是什么类型了,对吧?啊所以在这种场景下,你后边还得单独再去专门指定它的那个对应的类型,这个就比较麻烦啊,那大家如果要是觉得那个不太好做的话,那是不是直接在那种场景下,是不是直接实现这个啊,UGF函数类就可以了啊,这种场景是肯定没问题的啊啊这就是关于我们这个基本的UGF函数类和这个匿名函数的使用,那接下来我们重点是要给大家说一下,Flink里边还有另外一个很有特色的东西,叫做。
10:42
负函数啊,就是reach functions,这又是个什么东西呢?从字面上理解的话,负函数reach吗?它是一个富有富裕的版本,那它是针对什么比较,呃,比较富有呢?就是我们前面所说的啊,就是本身已经给我们提供的这种基本的udf函数类,对吧?那所以这个udf函数类,每一个udf函数类啊,在电stream API里边都有一个对应的rich版本,那它到底这个负是附在哪里了呢?
11:19
它可以获取到我们平常的这些函数类,这些接口里边获取不到的一些东西,比如说运行时上下文。呃,就是有一个contact对吧,呃,就是runtime context,然后在这个运行上下文里边就能干很多比较复杂的事情了,比如说在运行上下文里边可以获取当前。当前的状态,这样的话我就可以,呃,大家想我之前的这个所有的算子里边有没有状态呢。哎,其实这个大家看这个有没有状态,好像我自己控制不了对吧,你比方说像map flat map这个里面有没有状态。
12:00
好像没有对吧,那来一个处理一个嘛,来一个处理一个,他并不依赖任何之前的数据和之前的所谓的这个状态处理结果,哎,所以有些好像就没有状态,那有些呢,你像reduce,像some,它是不是又有状态啊,但是这个状态你像reduce,我们是可以这个呃,单独指定这个状态到底怎么去聚合,但是你想这个状态我们可以单独定义吗?也不能定义,对吧,你在调reduce的时候,是不是这个状态是什么类型就已经放在那儿了呀,对吧,它就只能是对应的那个类型,你像那个sum,它就只能是一个我们求和之后的那个结果,对吧,那就这个类型其实我们都已经完全定死了,那如果说我们获取了运行是上下文的话。可以做的事情就更多了,我可以在里边相当于自定义一些状态,然后去做一些更加复杂的对于状态的操作和处理啊,这个我们在后边讲到状态编程的时候再给大家详细展开啊,大家先有这样一个概念,呃,然后另外这个运营上下文里面还能获取到其他的一些信息,比方说我可以获取到诶,当前正在运行的,我当前的这个就是相当于分区的编号,就是我当前这个并行子任务的那个编号,对吧?到底是第几个线程啊,在在运行着的,就像我们最后print打印出来的前面那个小标一样,这个是可以获取到的啊。另外在这个运行上下文里边还可以就是调用其他的一些生命周期啊,就不是运行上下文啊,是我们当前副函数里边可以调用其他一些生命周期方法。
13:38
那所谓的生命周期方法是什么呢?那就是大家看就是open close对吧,类似于这样的一些方法,那open是干什么呀?但是open是开始打开嘛,对,所以它其实就是一个初始化的过程。它的特点就在于是当前大家想我当前不是一个函数类嘛,那函数类我具体在在生成我我当前这个函数类不是定义在这个处理的流程里边,是一个dag里边的一环嘛,对吧,中间的一个任务嘛,那我生成对应的那个任务的时候,有那个执行图了,然后我最后task manager要执行的时候,是不是应该创建一个对应的这个类类的对象啊,那所以这个类的对象创建的时候要干什么基本的初始化操作呢?这些我们都可以放在open里边去定义。
14:28
啊,所以这这个里边就往往都是做一些初始化操作的,对吧,然后另外还有就是close close大家知道就是最后收尾对吧,这个类要去销毁的时候,关闭的时候,我去做一些收尾的工作,一般是做清理啊,那那所以接下来给大家看一看这个具体的一个实现啊,我们在代码里面把这个简单的还是给大家做一下。下边去new一个class,现在是。TEST5接下来是做一个这个rich function的一个测试啊。
15:07
啊,那前面的话这个过程其实都大同小异,这个我们就不详细说了,对吧,我直接copy一下这个reduce这里吧。前面呃,创建环境,然后从文件读取数据,呃后边把它map转换成一个sensor reading,我们想要的pole类型啊,类似于抓va病这样一个类型,然后最后不要忘记还有那个execute执行起来对吧?呃,然后中间我们要去做这个处理的时候,我就可以直接做一个,比方说我定义一个data streamam啊呃,这这里面就随便了啊,比方说我想得到一个,呃,你像之前我们不是想要把它转换成一个这个二元组输出嘛,对吧?哎,那所以这里边比方说我直接定义一个这个二元组temple,然后里边比方说我要这个当前的这个34ID,另外还需要呃,我我随便来一个对应的那个就是具具体的一个信息吧,比方说我从运行上下文里边获取到当前它的那个就是。
16:11
之前我们不是看那个多线程运行的时候,可以看到它的那个前面的一个小标嘛,对吧,我看到它当前到底是在哪个分区,哪个并行子任务上去去执行的,那所以这里边我可以来一个英体制啊,获取一下当前那个并行子任务的编号,哎,这是完全可以的,对吧?啊,我我定义的这个有点奇怪啊,只是给大家做一个测试而已,Result stream定义一个这个,那我可以基于前面的this stream。大家看我直接做一个map转换就行,对不对,诶那这里面你有一个比方说这个我叫my map对吧。My map。后边做一个这个result stream的一个打印输出,这是整体的处理流程啊,那前面我们都已经讲过了,假如是一个简单的实现的话,那是不是public static class,我这里边可以把这个my map实现一个,诶,实现一个这个map function是不是就可以了,这大家都知道对吧?诶,那所以它的输入数据应该是一个sensor reading对吧?呃,然后输出是一个TEMPLE2对吧?二元组类型这里边我指定了,里边的泛型是对string和inte,然后里边必须要实现的是一个map方法对吧?然后大家会发现这里边map方法我是不是就呃,然后你除了这个map方法,你看它别的能实现的方法是不是就只有那个Java object里边那些方法了,对吧?啊哈希code,什么ES to string啊,只有那些了,所以他没有别的生命周期,那这里边我能做的事情是不是就只能拿到这样一个当前数据啊,然后另外。
17:51
大家要注意这个map方法是什么时候调用的。是不是就是每一个数据来了之后都会调用这样一个方法呀,对吧?呃,所以接下来这个integer,那大家会想到我这里面就只能拿到这个这个value,那其实我也干不了什么事,对吧?哎,所以这里边比方说我要的这个它的ID,另外这个in体,那那也就只能自己去定义了,对吧,比方说我我要的是当前这个ID的这个Les对吧,我直接把这个输出,这是可以的,你从当前的这个数据里边给是可以的,或者你自己不依赖数据,自己单独去,呃,自己去生成也是可以的啊,但是你就拿不到当前运行上下文里面的东西,对吧?啊,所以我把这个比方说我把这个my map叫做my map0,对吧,这是这个自定义的map方式啊,那这里边给大家看一下,实现自定义的复函数类啊。
18:47
对比public static class这个my map。诶,那家看,呃,大家可能会想到,那接下来这个是不是应该叫reach map function啊,就是这样一个东西,然后大家看它同样也是输入输出,那跟我们刚才的这个定义是一模一样的,对吧,我把这个直接copy过来。
19:08
同样输出得到一个啊,这样一个二元组这样类型啊,然后大家看必须要实现的也是一个map方法对吧?那但是大家看下面上面这个还在还在报错,呃,这里大家要注意啊,Reach m function,它现在其实已经不是一个接口了。因为啊,大家看对,它是一个抽象类对吧?啊,就是它是一个瑞士方嘛,它是继承自一个抽象类,抽象reach方,抽象负函数这样的一个结,这样一个复函数啊呃,这样一个抽象类,然后呢,这个抽象负函数,这个抽象类它继承自rich function,实现了rich function接口,那同同时呢,这个rich map function还实现了map function接口,对吧?所以大家看它就是两两两个方向嘛,一个是实现ma function这个接口,另外是不是实现这个rich function接口啊啊所以接下来它本身是一个抽象类,那这里边就不能implement,而是需要用extend对吧?啊,所以是这样去做啊,那这里边你你看这里边如果我要去new这个二元组的时候,同样还是拿到当前它的ID,另外还可以拿什么呢?哎,现在我们这个就比较丰富一点啊,我大家看。
20:26
还可以get runtime context,获取运行时上下文,然后接下来,诶,大家看我可以get到很多东西对不对,我可以获取到这个sub task的很多信息,比方说我可以获取它的index of this sub task task,那这就当前这个子任务的一个序号,对不对,那这就是执行的那个对应的那个那个编号,对对吧?啊,就是我们那个并行输出的时候啊,你看到打印出来的前面那个小数小的数字,这就是关于它的这个,呃,调用的过程当中啊,可以获取运行上下文的这些信息。
21:04
啊,那另外关于这个运行上下文啊,也可以给大家多写一点,大家看它还有什么什么方法呢?我可以get list state,你看可以get aggregating state可以,当然还有get直接get state对吧,可以获取各种各样的状态。这就是我们后面说的这个所谓的这个状态编程里边啊,可以在这儿自定义状态,然后去获取状态做处理。啊,当然现在我们还不详细讲啊,就是能做的事情很多,另外还有一些生命周期方法,大家看现在的这个生命周期方法,是不是就有这个rich function里边的一些接口,呃,对应的那些方法了,对吧?所以大家看有open,有close,我们这里边包括那个get runtime context是不是也是这里边的呀,对吧,也都是reach function里边的啊,所以这里边open,那这里边是不是一般情况就是做一些,呃,就是大家会想到就是初始化对吧,初始化。
22:05
初始化工作一般一般用来干什么呢?啊,一般其其实是给呃,一般是定义状态对吧,或者或者干什么呢。啊,或者大家想,如果我要跟外部一些数据库做连接的话,是不是可以在这儿统一先跟数据库连接一次啊,这样是不是要避免你像我们前面这里,你是不是只能在map这里边做一个连接啊,那你map里边做连接的话,是不是来一个数据要连接一次,来一个就连接一次,这个就效率上很很低下对吧?所以我们这里边是不是就可以在一开始创建这个类的对象的时候,直接连接一次数据库就可以了啊,只连一次对吧?后面数据来了之后,你就来一个直接掉,这就完事了啊,就不需要重复建连接了啊,或者建立数据库。连接。啊,这是一般一般的一些常规操作啊,啊,那所以这里边我们就不详细写了,给大家打印一条数据就可以,然后另外还有就是close对吧,Close这里边一般是呃,就是一般是。
23:16
对,关闭连接和清空状态。对吧,清理状态的操收尾操作。啊,那这里边我们同样还是给大家打一条这个叫close,那现在我们可以测试一下啊,大家可以看一下当前这个输出的是什么啊,为了要完整一点,我们把这个并行度还是调大一点。大家可以看一下呃,首先是就是这里边我们输出的时候应该是掉一次,掉一次map应该就会有一次输出,对不对啊,就是后面这啊调一次map方法,这里面就应该有一个输出,然后另外这个close应该输出几次呢?大家想一下close输出几次啊。
24:06
好,大家注意一下close,要大家看open close是不是输出各输出了四次啊,为什么是四次?对,因为大家注意这里我是不是就是就是对有四个分区对吧?四个分区的话,那是不是应该每一个分区对应都有一个,诶,都有一个这个类的实例啊,所以是不是这个实例创建的时候都要open一次,然后最后关闭的时候都要close一次啊,诶所以大家看接下来我的这个数据是分到了12344个不同的分区里边,每来一个数据都会有对应的一次输出,对吧?Map都会有一次转换,然后这里边你看它的这个输出的这个编号是0123对不对,它是从从零开始的,不是从一开始的,对吧?我们前面这里是1234啊,而这里边这个s task index,这是0123,呃,然后这里边这个close和open的话,这是各输出一次对吧,每个分区输出一次啊,这就是当。
25:07
目前我们这个负函数里边生命周期的一些用法。
我来说两句