00:00
啊,然后接下来大家会看到给大家简单的说一下,在这个flink里边到底支持哪些数据类型,前面我们已经讲了这么多了,对吧,在flink里边到底支持哪些数据类型呢?呃,其实简单来说就是flink里边。Java和skyla里边的所有的常见数据类型,还有他们的这个Java类,Skyla类啊,所有的这些类型都是支持的,比如说啊,大家大家看一下这个这个例子啊,支持所有的Java sc基础类型,之前其实我们也已经看了一个就是enna,我们这个创建流的时候,是不是可以直接from elements啊,From elements的时候,大家看这里我举的例子是都是这个长整形类类型,前面我们在举例子的时候,是不是这里给整形也可以,给string也可以,对吧?给浮点给double都可以啊,所以这个它根本没有要求啊,基本的数据类型都支持,然后另外还支持什么呢?哎,原组类型都支持啊,Java和Java和scla的原组类型都是支持的,比方说我们可以from elements,直接传什么。
01:08
诶,大家看直接给一个元组对吧,你里边任意去定义这些数据类型都可以,所以是非常非常灵活的啊,另外还有就是我们已经用过了skyla样例类,是不是这个也可以啊啊,我们s reading就是一个这样的一个样例类啊,另外还可以。Java的简单对象啊,也就是Java类对不对,Po啊,这个也是可以的,那大家可能会想到,诶,你这个东西,那这里边得是一个Java代码啊呃,这个不重不要紧啊,不重要大家可能知道我们这个skyva跟Java代码本来就可以混合编译嘛,对吧?所以只要我们把这个Java类定义在外边,定义一个单独的Java类,然后在这个scale代码里边,我去引入去new一个这个啊,Java的这个对象是不是就可以了啊,所以就可以直接把它引进来了,这个其实都是没问题的啊呃,另外呢,就还有一些常规的这些特殊的类型,比方说release,对吧,哈希map,或者说我们的这个媒体类型,所有的这些类型都是支持的,本身link对于这个skyla和Java的支持。
02:17
数据类型的这个支持度都是非常非常高的,而且大家可能会发现flink的源码里边,它其实现在的这个状态是什么呢?就是有一部分源码是这个抓码代码对吧,有一部分源码是这个加代码,像我们之前给大家讲的这个,呃,这个SS这部分啊,自定义这个s function的时候,这是不是大家看就调用的,这是一个Java interface啊,Java接口对不对啊,然后平常我们用的一些这些类啊什么的,有可能这是呃,这个scla的一个类,Scla类对吧?啊,所以它的这个实现本来就是一个比较混合的一个状态,大家可能也得适应这个flink里边的一些编程,编程风格啊,就它的支持度很好,灵活,灵活度很高,那同时你要去处理的事情也就会更多。
03:10
讲完了这个transform,而且讲过了这个知识的数据类型,接下来给大家讲一下,在flink里边,我们可以自定义实现这个udf函数,它主要的目的就是要去实现更细力度的一个控制,对我们的数据流可以做更细力度的控制。那首先我们要讲这个函数类,函数类就是所谓的这个function classes啊,函数类是个什么东西呢?哎,简单来说,我们前面做那个,呃,这个source API调用的时候,我们不是有一个ADD source方法吗?里面我们传的那个s function大家还记得对吧?哎,那就是一个函数类。在flink里边,它其实是暴露了所有的udf接口的方法,言下之意就是说什么呢?呃,就是说呃,比方说map这个操作对应的里边我们就可以传一个map function。
04:06
Filter操作里边对应的我们可以传一个filter方式。啊,这个另外就是process可以传process方式,这就是我们说的那个底层呃API了,对吧,最底层的调用,所以这些统一来讲都是函数内。那比方说我们这里面给大家举一个具体的例子吧,呃,前面这里边我们data stream已经有了是吧,这里把这些先都住掉啊,接下来我们做一个函数类。呃,比方说我们的那个data stream是不是可以直接后面做filter啊,大家习惯的这个filter怎么去filter。对,是不是啊,大家看,可以传一个函数,然后返回一个波尔类型的这样的一个函数,对不对啊,就是除false对吧,然后判断,根据这个条件来判断它,呃,这个到底是不是符合这个过滤的条件,那另外它还可以传什么呢?大家看,首先你可以传这样的一个函数,这是大家比较习习惯的这个写法,另外还可以传一个filter function。
05:18
这种实现方式就是我们所谓的这个函数类的这样的一种接口实现,那现在我们就来实现一下,所以这里边filter的时候可以直接对new一个my future,这就是一个函数类啊,对吧,自定义一个类嘛,那当然了啊,你直接在这儿写出来这个还不知道它是什么东西,那我们这里边当然要定义一下my future。他要实现一个什么东西呢?对,实现一个filter function,好,这是flink API。提供的这样一个类型,然后这里边大家会想到它里面的数据类型是什么呢?是不是还得是我们现在处理的流的数据类型啊,哎,所以这里边还是sensor。
06:11
好,给大家看,写完这个之后上面不报错了对吧?啊,那接下来这里还报错,为什么啊,你肯定得有必须要复写的函数嘛,对吧?实现的函数大家看必须要实现的,就是要实现一个filter方法对吧?所以大家看这个filter它是不是最后返回的就是一个不尔类型啊,这是不是跟我们直接写一个那个,呃,这个返回一个布类型的函数是一样的这个表达啊啊,所以本质都是一样的,只不过就是flink的编程习惯,就是它暴露了所有的这种函数类的接口,你可以都把它包装成一个自定义的函数类啊,所以这里面这个filter,比方说我们用一个什么条件去筛选吧。呃,比方说大家看我这里边筛选是不是就是筛选这个当前输入数据符合某种条件对吧,当前的输入数据是不是就是value,比方说我这个value啊,Value这样吧,它的ID我要以某个值开始啊,那比方说是不是可以start with啊,Start with,比方说SENSOR1,那么以这个SENSOR1开头的所有的这个值筛选出来,大家想一想对应到我们这里的话。
07:27
应该是哪些?对,是不是一肯定要筛筛出来对吧?哎,是不是十也能筛出来啊,哎,所以我们这个条件应该是把它们都能筛出来,所以大家可以看一看,诶,这个start with直接返回的就是一个波尔类型,对不对啊,所以这个我们就直接这样就可以了,这就是我们自定义的一个filter function,一个函数类的实现,那我们把这个print一下,看看这个效果怎么样吧。好,大家看到我们已经得到了输出的结果,这个非常符合我们预期,是不是一跟十的所有数据都被我们筛出来了,哎,这就是我们做到了这样一个自定义的feature方式,是不是很简单啊,当然了,就是大家如果想做的话,你也可以在这个map的时候,比方说前面我们这里边不是,呃,啊,这里没有啊,这里不是有map吗?那大家会想到这里map是不是相当于也可以传一个,是不是也可以传一个map function啊,对吧?那这是同样的,你就去实现一个这个map function就好,那这个map function里边必须得去实现什么东西呢?Interface对吧?是不是必须得去实现一个map方法啊,所以这个其实就跟我们直观意义上的那个map是一样的,我们map是不是也是要传入一个map方法啊,对吧,一个函数嘛,啊,所以这个其实是一样的啊,我们就不详细说了。
08:54
啊,然后呃,大家看到这个文档里边还有一些例子,大家简单的看一看吧,呃,比方说这里边我们实现这个函数类是不是也可以contain什么样的字段啊,比方说contain一个flink,呃,另外大家会看到还可以把它这个这里边的这个函数呢,实现成一个匿名类,什么叫匿名类呢?就是我们前面在这里边直接去定义的时候,是把这个这个类单独在外面定义出来了,对吧?然后这个类就相当于是可复用的一个类了,对不对啊,这里边就相当于是new了一个,呃,当前的这个一个类的一个对象,一个实例,那这里边还可以怎么样呢?是不是可以不给这个类名啊,直接就去扭,对啊,这样就是一个匿名类啊,这样也是可以的,另外还有什么啊,大家会看到就是还有一些就是如果你创建类的话,有一个好处,我们是不是可以给他传一些参数啊,就相当于当成它的内部的这个私有私有变量了,对吧,私有成员了。
09:54
所以假如说我们把这个flink当成参数传进去,在这个new filter的时候给它传进来,那么本身我们实现的这个class,它是不是就有就有了这样的一个keyword呀,你传什么我就根据这个来去筛什么,对吧?啊,这样它的这个复用度是不是就会更高啊,所以这些都是可以做的啊好,这个是函数类function啊,Function class,然后接下来还有这个匿名函数,匿名函数就更简单了,那这个是不是就是说我们可以直接在这个里边传一个函数啊,传一个匿名函数,那刚才我们这种实现方式是不是就相当于直接,呃,不是temperature啊,是不是直接id.start with,然后什么3ENS1是不是直接这样也是一样的啊。
10:44
那大家知道这个下划线这个代表什么含义呢。他是不是就代表我们这里的一个data,然后。返回一个get,他点这个啊,对吧?哎,所以这其实就是一个匿名函数对吧?啊,所以大家如果熟悉这些表达的话,这个都是一样的操作啊,我把这个还是改成我们自定义的这个函数类,好,那接下来再给大家讲一个概念,叫负函数,就是reach方式,就之前大家在有一些这个呃,代码里边其实可能见到过这个对吧?啊,它是一个rich什么什么function,那什么叫做rich function呢?
11:23
既然它是负函数,所以它应该,那那大家会想到它是函数吗?它叫负函数,但其实不是函数,它还是函数类啊,它最后就还是一个函数类,那么它跟函数类比起来,它到底有什么区别呢?哎,它它就会富一些,对不对啊,它就会富有一些,那它富有什么样的东西呢?对,它会有更多更强大的功能,比方说它可以获取到运行环境的上下文。比方说他可以拥有一些生命周期方法,所以它能实现的东西就更加多,更加复杂了,啊,那可以说啊,就是flink里边所有的flink函数类都有它对应的rich版本。
12:13
比方说我们前面不是说有这个map function吗?对吧,你可以传一个map function,那对应的就有一个rich map function,那有这个filter function吗?对应就有rich filter function啊,那那所以几乎所有的这个函数类啊,都有它的这个rich版本,它的特点就是说里边有生命周期,可以获取这个运行环境上下本,我们给大家简单的这个,这个我就不详细在在上面调用了,就给大家看一眼就好了,比方说我在自己定义一个,比方说my map function啊my map吧。呃,大家会想到我可以去继承一个map方式,对吧,另外还可以继承什么?对,还可以继承rich那方啊,当然这里边。
13:09
诶,这个这个可能会有点儿问题啊,怎么没有办法去引入它。好,我们把这个rich map function引入,然后大家会看到它里边的这个。它里边的类型有两个,诶,为什么得有两个呢?因为大家知道做了map转换操作之后,是不是数据类型有可能改变啊,哎,所以它的类型是两个,有一个in,有一个outt,这个就不光是reach是这样,就普通的是不是也是这样啊,也得一个印一个out啊,所以这里面大家看它下边必须要实现的一个是不是也是map啊啊所以我们看一眼啊,Rich map function这里面的类型我们本身输入的是sensor reading输出啊,我们随便定义啊,比方说我直接给一个string是不是也可以啊,对吧,这个都没问题。
14:00
必须要实现的方法map对吧?呃,那比方说我这里边就直接输出一个呃,Flink没关系,对不对啊,直接每一个元素来了之后,是不是就相当于把这个SRA直接呃这个转换成一个flink了啊,那大家会看到在这个rich慢方式里边,它还有哪些可以调用实现的方法呢?比方说它可以。Get wrongtime contacts是不是可以获取运行是上下,呃上下文啊,上下文环境对吧,另外还可以set对吧,还可以set这个环境,另外它还有一些生命周期,比方说open close对吧?啊,这些东西它都可以去做这些操作,你如果想去用的话啊,我可以直接在这里边把它引入进来就可以了,大家可以会想到就是一般情况这个在这个open啊,什么这里边去干什么事情呢?我们在这个文档里边有一个例子,大家可以看一看。啊,这里面是用了一个rich flat map方式,这个其实差不多的,对吧,大家知道意思就好啊,大家看它这里就定义了一个open,那这里面他干了什么事情呢。
15:09
它通过这个上下文环境获取了一个index of this sub sub tasks,就是我们当前子任务的那个索引值,就当前的那个任务,子任务的那个编号,他获取到了。呃,然后大家可能就会想到,他有可能就会用这个,呃,获取到这个编号,可能就会做一些做一些工作,对吧?有可能根据自己当前的那个编号,比方说作为随机数种子去生成一些啊随机数啊,或者说怎么样啊,这就按照你自己的业务逻辑去定义了,他可以从上下文里边拿到这些东西,另外这个open里边还可以干什么事情呢?比方说可以做一些初始化的工作,比方说我们要跟那个h emms做连接了,那这里边是不是可以先建立连接啊,对吧,我要跟去做连接了,在这里边可以先连接起来,这就是生命周期里面的一个作用,在创建的时候它就可以去连接,而不用等到数据来的时候再去做,对吧?这是这部分内容,那当然了,这个代码大家看它最关键的实现的就是这个Fla map嘛,所以它这个最后输出的,大家看它的输出用了什么,就是out.collect就表示它的输出啊,这个之前大家也有类似的那个,呃。
16:25
就是在s function里边,它是不是也是用上下文的collect方法去把它输出了呀?啊这里边它是用了这个out,这个collector它的这个方法把它输出了啊,它是输出了一个什么呢?它是如果当前的这个输入的数据啊,它200%分号二这个表示什么?对二取对不对,对吧?啊也就是看它是奇数还是偶数啊,得到的是零还是一,如果说这个刚好等于当前的那个呃,子任务的那个编号的话,就把它输出啊,是它这个特定的一些呃,一些筛选的条件,对吧?大家大概知道就可以,那后面对应的有open就有close close的时候是不是就可以做一些清理工作啊,可以把我们这个呃,HDFS的连接断开啊或者怎么样?呃,给大家举一个例子,大家知道就好了。
17:16
好,这就是这个udf这一部分内容。
我来说两句