00:00
来,我们再来给大家总结一下flink的编程风格,那就是udf啊,这种函数类的实现方式,其实前面我们在代码过程当中也已经感受到了啊,像我们之前一开始上来之后给大家讲了一个自定义的S,大家还记得吧,当时这个自定义S的时候,我们是不是就ADD source,然后里边传入的这个东西呢?就实现了一个s function接口,哎,那这里边的这个s function,这是这是什么东西呢?这其实就是所谓的U,就是我们这里边对于每一步操作里边本身要传的都是一个函数,对吧?嗯,那这种所有的接口呢,它其实都有两种实现方式,你要不就是直接传一个UTF函数,我们看到里边这里边呃,点进去之后有一种实现方式,就是直接写拉姆达表达式,对吧,这里边这个呃,参数你直接传一个函数就行,或者你自己自己定义。
01:00
一个函数传进来就完事了,那或者另外一种写法,另外一种方式就是flink帮我们包装成了函数类对吧?那这里边就是它本身有这样的一个s function接口,我们现在要做的事情就是你实现这个接口,自己写一个类出来,实现它传进来就完事了,那我们知道在它的底层,其实跟这里边你传一个function进来其实是一样的,要做的这个东西它只是给我们包装了一下,更加方便我们去做,呃,就是呃,序列化对吧,反序列化,然后更方便去做这些任务的解析代码的可读性也会更强,所以这是flink。代码风格的一个重要的特点,一个特色啊,那至于这个udf函数到底有哪些呢?其实就是说几乎所有的所有的操作,所有的算子都有它对应的那个utf函数类的接口,对吧?比方说map map对应的里边我们可以传一个函数进去,那或者呢,就实现一个map function式,那filter filter就实现一个filter方式,对吧?啊,那或者后面还有我们讲到的,你如果要是最底层的操作的话,那就是一个process function对吧?我们说那个最底层的API叫做process function API嘛,啊,其实它它是怎么怎么去用的呢?其实就是有一个有一个data three API的这个算子啊,一个调用方法叫做点process啊,这个点process你想它就是它就是处理嘛,好像也没说要干啥是吧?啊,所以它就是最底层,最一般化,你里边所有的东西都自己去定义就可以了啊,就是理论上这个process function,你可以实现一个map的功能。
02:41
可以实现一个filter的功能,对吧,或者可以实现一个reduce,呃,其他的一些聚合的功能,这些都可以实现,所有的一切你自己来定。这是就是函数类的一个定义,那当然这里边我们也可以给大家简单的在这里再实现一个啊,我们就不给大家写那个比较复杂的东西了,我们自定一个啊,自定义一个呃,函数类啊,比方说我们在这里边就给大家定义一个呃,一个这个filter吧,对吧。
03:16
比方说这个叫my filter,如果要定义一个自己想要去在在这个filter里边去调用,那肯定是得是一个filter function了,所以它实现的这个接口就是filter function,大家看到又是一个Java的interface对吧?啊,或者还有另外一种方式是什么呢?叫做reach filter方式啊,这个后面我们再说什么叫rich呢?其实前面我们也见过啊,啊,那这里边它也有泛型,也就是要处理的这个当前data stream的数据类型是什么,这里就写什么对吧?比方说我还是要这个s reading,然后大家看如果要想实现这个接口,必须重写一个方法,叫做filter filter啊,那家想它这个filter就是传入sensor reading数据,返回一个布尔类型,这跟我们在filter里边传一个那个拉姆达表达式对吧?或者自己定义一个函数,效果不是完全一样吗?啊,所以这个其实就是非常简单的这种操作啊,那比方说这里边我们直接定义当前的value,我要以什么作为filter标准呢?
04:16
比方说我ID啊ID。然后我要求他,比方说start with sensor啊,比方说SENOR1啊,那大家其实能想到,如果我要求他是这个三色一的话,后面的这个效果应该是什么呢?哎,那是不是这里边就是。就是341是可以的,然后三四十应该也可以,对不对,应该只是按照这个字符去过过滤嘛,字符串去过滤,所以说这个拿出来的就是这样一个效果,那里边如果你要是调用它的话,怎么去调用这个,呃,My filter呢?我们这个调用过程也非常简单,就前面这里边不是定义了这个有这个data stream吗?对吧?假如说你想要做一次过滤的话啊,不是KBY啊,那我就直接filter里边可以去传一个filter function,所以我直接new一个my filter这样就可以了,对吧?调用方式非常非常简单。
05:13
好,这个我就不给大家测了啊,就给大家就可以了。啊,那当然了,在这个文档里边,大家还看到这里边你可以做另外的一些尝试,可以怎么样呢?就是还可以把这个实现成一个匿名类啊,就是我这里边直接点filter的时候,比方说这里边我直接去new一个对吧?啊,大家知道这个你直接new,然后后边跟上这个,呃,一个接口,一个interface,或者是一个呃,一个treatit啊,或者是一个抽象类的时候,那其实并不是直接new它的一个对象,而是要对这个。呃,对这个抽象类或者接口进行一个实现,然后把这个匿名类的对象,然后啊传传给这里边的这个filter作为参数,对吧?啊,所以大家看这里边你用匿名类的方式也是完全可以的啊,这其实就是SC基本语法嘛,大家知道这个所有可以用的方式都是可以可以拿来做的,然后这个你定义这个函数类还有一个什么好处呢?啊,大家看到就是我这里面filter啊,比方说我这里面filter是什么,Filter什么呢?我要去把这个value里边包含了contain flink的这些当前的这些数据都把它。
06:26
过滤出来对吧,只处理这个有有flink的这些数据,那我们在ETL的时候经常会有这样的需求,那这里边你像这个flink呢,这是写死在代码里了,那有时候我们这个应该是可以配置的,对吧?而你当前这个类它可以我们可以就是泛泛的把它定义出来,然后有时候呢,还会复用,你传不同的这个字符串进去,然后做一个过滤,这是完全有可能的,那这种场景怎么办呢?啊,其实非常简单,因为它是类嘛,你这里面定义的是类,所以说我们可以把它当成类的构造器里边的参数给它传进去,对吧?哎,你比方说这里边我定义一个类啊,这个什么keyword filter,对吧,然后传一个keyword进去,然后接下来我们在new这个类的时候呢,把flink当成参数构造器参数传进来,这不就完了吗?对吧,你在里边实现filter方法的时候,直接用我构造器方法参数里边的这个keyword,然后去做过滤就完事了啊,所以这个其实整体来讲还是。
07:26
呃,比较简单的一种实现啊,呃,那大家会看到这个在弗林。调用这个方法的时候啊,每一步操作它的这个编程风格两种方式,要不就是上面的这个函数类,要不就是你传一个函数对吧?啊,那这个函数一般情况我们也不会单独去定义,那就是一般直接写一个匿名函数就好了,比方说你像这个前面这种大家看啊,这种简单需求,你其实没有必要写这么多,你可以怎么样呢?是不是直接用一个匿名函数啊,大家看这个下划线就表示我们那个匿名函数的缩写简写嘛,对吧,如果那个参数只出现一次的话,我们用下划线表示,然后直接点contains flink就完事了吧。
08:09
这就表示当前我们是要以它是否包含flink字段作为一个过滤筛选的条件啊,这个这种方式大家可能会更熟悉一点,但是我们会发现,而且写起来更简单是吧,这个更更直白,更简洁,但是对于一些复杂的需求,可能你就不能简简单单这么写了,对吧?啊,你如果要是说复杂需求,你写成了就是类似于这样的一个,呃,这这里我们是匿名类啊,你如果写成匿名函数,那展开可能也会这么复杂。那可能我们就需要把它包成一个自定义的函数类,看起来会更加的直观一些。所以后边啊,就包括这个flink源码里面,大家会看到更多的实现风格是用的函数类这种风格。那另外还需要给大家说一句的就是flink里边呢,不仅仅有这个函数类,另外还有一个什么呢?刚才我们在给大家写那个filter function的时候,大家也看到了啊,除了filter function,另外还有一个rich filter function,哎,那什么叫rich filter function呢?Rich我们说是富有富裕的意思,对吧?所以reach就是,呃,Reach方式所说的就是所谓的负函数,哎,就是这里边它是一个富有的函数,富余的函数,那它又是个什么呢?啊,它其实也是一个函数类的这样的一套接口,在这个,呃,我们这个弗link盖LA的底层实现里边的这个负函数,啊,这里边的这个负函数一般情况都是一个,都是一个rich class,呃,都是一个abstract class,就都是一个抽象类。
09:48
而对应的这个filter方式呢,啊,大部分这个就都是一个Java的interface啊,所以说这里边的实现可能这个略有不同,大家调用的时候稍微注意一下,引包要引对就可以了。
10:01
然后这里面几乎所有的。函数类这个所有的这个function类啊,都有它的rich版本,也就是说map function,那就有rich map function filter function就有rich filter function对吧?啊那那同样呃,就是所有的这个都有对应的实现,那我们说它是一个负函数富有,它到底负有在哪里了呢?啊,它最大的特点啊,这里给大家可以定义一下,比方说这一下我们定义一个,呃,My rich map啊,定义一个富有的这个map类啊,Map map function,所以这里边我要去实现的就是一个,呃,Rich map function,对吧?啊,大家看到这里边它需要map嘛,它需要的这个类型,这里边大家看到这是一个抽象类,对吧?然后这里边大家看到它的类型有两个,一个是in,一个是out,也就是map。可以做数据类型的转换嘛,所以他要的就是你输入什么类型,然后输出什么类型,都给我定义好,所以最后你要实现的一个map方法,大家看这不就是来一个输入硬类型的value,输出一个out类型的数据嘛,啊所以非常简单啊,本身啊,所以这里边我们的输入,比方说我要一个sensor reading对吧?啊,那输出啊,这个随便吧,比方说我输出直接就给一个这个这个string。
11:23
这完全没没毛病对吧?啊,这个是完全可以的,那具体的实现当然就是一个map函数啊,那么这个map map方法啊,Map函数它里边做的操作就是把这个sensor reading里边你想要提取出来的字段转成string输出就完事了啊,那比方说这里边我们想想输出的就是拿它的那个ID对吧,大家知道这里边有这个ID啊,那比方说我再加上一个,比方说加上一个这个呃,三四几啊这样的一个temperature传感器,对吧?啊,我就加一个这样的字段把它返回,这完全是可以的。
12:02
那大家想一下当前的这个rich,它rich在哪了呢?这看起来跟这个之前我们这个一般的这种函数类没什么区别啊。如果说大家在这里啊,你看一下它现在所有可以去重写的方法的话,你就会发现它比简单的这个函数类要多了很多啊,这里边同时我给大家把那个为了做对比啊。把这个一般的这个也定义出来,这里边我们不要叫rich map,叫my map map对吧?然后这里边我们定义一个这个map方式,然后大家知道这里边是一个Java的interface对吧,这是一个本身是一个接口,然后里边同样必须要实现的也是一个map对吧?哎,那大家可能想你这两个这有什么区别呢?哦,如果要是你就这么写的话,那是没区别,它俩一模一样对吧?里边的实践不就是要调这个map方法吗?呃,一模一样,那这里边它的区别在于,如果我这里边你看一下它的重写方法,它是不是就只能重写。
13:11
抓va的那个object底层的那些哈西,Code呀,To string啊对吧,Clone啊,Finalize啊,只能重写这些方法对吧?哎,这是非常一般化的,一般情况我们也不会去重写,所以一般的这个。这里边我们这个函数类啊,就是简单的这个函数类,相当于就跟你写一个匿名函数,直接做这个操作是一模一样的啊,只是把它包起来,你可以给它传参对吧,可以传一些这个,呃,构造器的时候传一些参数而已,那这里边如果要是这个负函数的话,它的特点就是。负函数啊,可以获取到运行时上下文,它附在这儿对吧,另外还有一些还有一些生命周期方法,哎,那这生命周期方法又是什么呢?哎,这个给大家稍微看一下,大家就知道了,它这里边因为我们本身它是一个reach function,它是个抽象类嘛,然后它又继承自什么呢?继承自一个叫做abstract reach function这样一个抽象类,那这个类里边它有什么方法呢?大家看到这里边就有这个了,大家看它可以get wrongtime context。
14:28
也可以set runtime context,可以对当前的运行是上下文去做读写操作,可以获取运行上下文,还可以去对它进行设置set啊,所以这个能做的事情就更多了,对吧,你有运行上下文嘛,然后还可以有什么方法呢?还可以有open和close啊,所以这两个open和close,这个就叫做生命周期方法。那一般情况下我们这个open close用来干什么呢?啊,这个其实也比较简单,就是一般情况下我们会在这儿比方说啊,假如说我这个map的过程当中,我想干什么呢?我想去从某一个数据库里边拿一个当前的那个数据,那个值读一下对吧,拿到那个值,然后我根据那个值的。
15:14
当前的那个值到底是什么,我这里边可能做一个if else判断对吧?呃,就是如果那个值是一的话,我当前map成一个什么,如果是二的话,我再map不成另外一种数据啊,所以对于这种场景,那怎么办呢?哎,大家可能想到那非常简单,你建立数据库的连接,然后去,然后去拿不就完了吗?对吧,你把那个数据库对应的那些包访问他的那些,呃,依赖的那些包都都引入,然后我们直接调用方法去访问就完了嘛,但这里面有一个问题就是那你是大家注意啊,这个map是什么,这个map定义好了之后,我们其实是每来一条数据都要执行一次这里的map对不对?哎,那你假如说你的那个数据库连接,你直接放在这个map里边的话,那就相当于每来一条数据,我就重新建立一次连接,然后再去拿这个数,每来一条数据就建立一次连接,那你想这显然是不必要的,对吧?而且这个你连接数越来越多,你不释放这个,这个有问题的呀,啊,那或者有同学想到,那我就每来一次,呃,先建立连接,后面再把它close,对吧?先建立连接,后面再close close,那你说这个太复杂了是吧?你何必那么麻烦呢?我们数据量大的时候,你那个性能影响非常大呀,所以这里边的一种比较好的实现方式就是我可以直接在open生命周期里边去做一些初始化的操作,对吧?这里边一般就是做一些初始化操作,比如数据库的连接,就一般情况,我们会在这里面去做。
16:46
哦啊,那那当然了,这个大家会想到,那你要这么说的话,那open那就不是所有的数据来了之后都都去调一次了啊,当然了,当然不是了,它是当前这个函数类的生命周期,所以这个open其实是什么时候呢?什么时候调用呢?在当前这个类的对象创建的时候,这有点像初始化一样,对吧?啊,这有点有点像我们那个构造器一样,但是它本身不是构造器,这个类的构造方法大家知道是你在这里边直接加括号,这样去主构造方法是这样去调的,对吧?然后里边你可以去设置那个this啊,去给它添加那个辅助构造方法,那它是在构造方法调完了之后,然后已经加入到我们当前flink运行的运行时环境里边来之后,然后去调的一个初始化,就是还没数据来对吧,但是我的任务都已经设置好了,配置好了,然后我可以去执行一个open这样的一个操作。
17:38
所以在这个里边,我是真正你现在在这个里边去get当前的wrongtime context啊,获取当前的这个运行上下文,这个是比真正有用的,就是你如果在外面去获取的话,它其实还拿不到,你在这个里边再去获取,这就有用了。呃,那呃,当然了,这个有有这个初始化,那就有收尾对吧,那close这里边一般就去就去做收尾了,对吧?那这里边一般一般做收尾工作。
18:12
啊,那比如关闭连接数据库连接对吧?啊,或者清空状态,哎,那后面我们讲到这个状态编程的时候,会给大家再再来做一个这个详细的讲解啊,就是状态编程里边,那我们需要去干什么呢?那其实就是get状态context,从运营上下文里边,因为大家知道你定义了状态之后,它并不是当前这个数据带着的东西,对吧?哎,我得去之前已经保存好的状态,我得到上下文里边去找,所以说我是先get runtime contacts,然后大家看到可以get各种状态对吧?我这里边比方说这里边可以get list state,对吧,Get一个map state啊,或者我就直接get state,这都是可以的。所以后面我们讲到这个,呃,状态编程的时候,还会再给大家来说一说这个。
19:04
负函数里边的get runtime context对吧?获取上下获取上下文的这个操作啊,那所以我们现在就可以有一个概念啊,就是负函数主要附在哪里,有生命周期,还可以获取运行上下文,生命周期一般情况用来干什么?哎,就是做一些初始化和收尾的工作,对吧?比如说数据库连接,或者说运行上下文里边获取可以获取当前的状态,进行状态编程。啊,所以这些都是我们后面要给大家再展开讲的内容,大家先做一个整体了解。
我来说两句