00:00
我们接着讲这个流处理API,这里要跟大家说一句的是,呃,大家可能会回忆起来,在我们讲word count的时候,不是用了两种方式吗?一个是批处理的方式,一个是流处理的方式,流处理的方式当时我们用的那个API是data stream的API,对不对?那批处理的方式其实跟它不是一套,我们当时那个叫什么data set a对API啊,但是data set跟data stream大家会看到它的层级是一样的。下边的很多方法算子其实也类似,对不对?只是可能稍有差别,而且大家会发现这个如果我们了解了这个流处理stream之后,Data set就非常非常简单了,而且在flink的应用场景里边,一般情况下我们更多的用的也是data stream,对吧?所以data set我们在这里就不专门去讲了,重点就都放在这个流处理上面,弗link本来就是大数据的流处理框架嘛啊,所以我们重点放在data stream上面。好,首先我们还是回顾一下。
01:01
Flink的一个程序代码里边分成哪几部分,哎,是不是最主要的就是三部分啊,哎,当然前面我们看还应该再包括一部分,就是什么创建执行环境对吧?啊最后不要忘记,因为它是这个呃,懒加载的这种模式,我们最后还要有一个执行对吧?呃,这个execute把它执行起来,所以大家会看到,其实大致来分的话,那代码里边就是这样的四框。前面一块是这个执行环境,后边呢,就是我们所说的代码主体三大块,Source,然后transform,中间的转换后,Think对吧,前面是这个数据源的读取输入,然后中间的转换操作,做计算做分析,最后做输出,诶就是这样的三个步骤。所以今天呢,我们就按照的这个这几大模块给大家逐一来讲解啊,大家看就是environment,然后source transform,呃,最后是think,在这个之前呢,再给大家插播,呃,这个插播的不是广告,这个还是挺基础的东西啊,就是就给大家讲一下我们在flink里边支持的数据类型啊,这当然就是大家有大概有一个概念就可以啊,然后后面这一部分还挺重要的,这一部分插在这里就是让大家体会一下flink里边的一些特点了,就是我们可以去自定义一些udf函数啊,那这一块其实更加体现这个flink的一些代码的风格,所以到时候讲到的时候,大家其实这个是放在这里,是专门把这个概念提出来,整个讲解的过程当中其实都贯穿始终啊,大家就是一边讲一边去体会这个flink的编程风格就好了。好,那首先。
02:51
我们来看这个environment environment,这个很简单,我们之前代码里边都已经做了,对吧?第一步上来之后,我们那个这不就干的就是这个创建执行环境吗?对吧?我们直接如果是流处理的话,我们data stream API的话,是不是要用stream execution environment,然后去get它的execution environment,对吧?这个大家应该是已经比较熟悉了,这里我们要给大家讲的是啊,就是这个东西我们比较熟悉,但其实呢,呃,就是它并不是最底层的这种,呃,创建执行环境的方式,最底层的方式是什么呢?
03:30
大家看是这两个。一个叫做create local environment,一个叫做create remote environment,从名字上大家能知道这两个是分别是干什么事情吗?对,一个是本地的执行环境,一个是remote,对吧,远程的这样的一个执行环境,也就是说我们真正在生产环境里面集群的那个执行环境。所以大家看。这个是代表什么呢?是不是我们如果要在本地开发环境里边,IDE里边去配的时候。
04:05
大家想我应该是用哪一个用对应该直接你用这个local environment就可以了,它这里面有一个参数,就是你直接指定并行度就可以了。然后呢,如果说我们要把它做部署要提交的时候,那应该怎么办呢?诶,那对这个环境就不一样了,大家会想到我在本地这个开发环境里边,我的执行环境当然就是本本地local,那如果我提交到这个集群里边去,那可就不是了,对吧?那就跟我提交的这个集群环境有关系了,所以在这里边是不是就得把我对应的那个hostname,还有那个端口是不是都得传进去啊,最后还得指定我们要运行的那个炸包,对吧?啊,所以大家看这个相当于就是把我们提交任务的那个所有的那些参数那个环境都放在这里了,这里大家注意一下这个端口给的是多少。
05:00
6123,哎,为什么6123的大家还记得吗?还有印象这个623在哪里出现过吗?对ya某那个配置文件里边是不是有一个job manager的RPC的端口啊,而job manager它跟别的机器去做通信的时候,RPC通信的时候的那个端口,哎,是这里我们要给的端口对不对?哎,所以这里边大家就看,如果你在压缩文件里边改了这个的话,这里边就不一样了。啊,所以这个是大家需要注意的一个,那这里边给的host nameme,当然是我们job manager host name对吧?啊,这个不要给成task manager啊,啊那大家看这两种方式它比较底层,但是为什么我们平常不用呢。就是因为这个就太烦了,对吧,如果说要用这种方式的话,我们得得怎么办?在开发环境,我们自己做测试,在运行的时候,我得用这个create local environment,然后呢,我最后要打包,要编译提交,编译打包提交到这个集群上的时候呢,我是不是还得改一下这个源码啊,我得把它再改成这个remote的环境,然后才能提交,这个是不是就非常的不爽,对吧?对于我们一个程序员来讲,你这个已经测试完成之后,再去改源码,再提交,这个其实是一个一个比较大的忌讳,其实不应该这么干,你所有的这些东西其实都是我们相当于环境配置相关的一些东西,你应该在外部去把这些东西做完,把这些东西传进去,而不应该去动源码,诶,所以大家就会看到fli给我们提供了一个get execution environment的方法,它相当于是什么呢?它会自动的去查询。
06:48
当前运行的方式,然后决定告诉我们返回的是local的环境还是remote的环境,然后底层再去调这个对吧,下面的这样两种方法,哎,所以这其实就是我们最常用的一种创建执行环境的方法了啊,这个就是大家了解一下就可以啊,当然了,就是在这个创建执行环境的时候,往往呃有一个什么呢?就是我们可以在这个创建完执行环境里之后去做一些配置,对不对?比方说如果我要禁用这个,呃,任务量operator chain,呃,我可以在这里把它禁用,或者说我在这里是不是可以去设置全局的并行度啊啊对吧,大家看到还可以设置一些其他的东西,我们到后面讲到的时候再给大家说啊,所以就是一开始往往环境这一步,我们就是做这些事情,全局的一些东西设置。
07:44
好,那呃,接下来我们就真正的要做这个API的测试了啊,这个就没没什么没没什么好测的,对吧?接下来我们啊,首先我们来看SS吧,SS我们说了这是数据源对不对?诶这里边我们会讲好几种不同的读取数据源,就是创建数据源的这种方式啊,我们这里边单独给大家再去新建一个。
08:11
我们新建啊,我我直接去新建一个那个object啊,这个包我定义成at,硅谷定义另外一个包,对吧,我叫API test,然后下边我叫test。呃,单独给一个这个东西去做我们source这个算子的一个测试,SAPI的一个测试,然后我们把这个内函数写出来啊,Word count就就不要不要用word count做例子啊,我们这里边还是单独去做定义吧,我们这个用什么做例子呢?大家看到在文档里边给了大家一个例子,就是大家看这个S是什么意思啊。Sens是传感器,对,这个其实就是在真正的这个工业互联网里边应用很广的一个一个应应用场景,就是大家知道有很多很多的传感器,对吧?它的数据采集实时性要求非常的高,这个如果数据量又很大,实时性又很高,要求延迟又很很很低,对吧?然后这个正确性还得保证,这个过程当中我们往往用flink用的比较多,好那我们就以这个作为一个例子,呃,那这里面大家会看到把把它是包成了一个样例类,对不对啊,这里边我们先定义这个样例类吧。
09:30
呃,Case class,我我的这个样一类叫做S。就是传感器的数据读数。传感器。毒素样例类。啊,那当然这是一个,这是一个温度传感器啊,温度传感器。大家想一想,如果要是一个温度传感器的话,返回的这个数据,我们收集起来的这个数据日志里边啊,应该是包含什么东西呢?应该得有一个温度对吧?啊,那当然温度传感器嘛,得有一个温度,还应该带什么数据啊对,是不是还应该带时间出啊对吧?你是什么时候采集到的这个温度是什么样的,要不然的话,你这个温度它在不停的变嘛,你这个就没有没有意义了,另外大家想还还应该有什么。
10:24
哦,大家可能会想可能有地点对吧,你你哪个地方的这个传感器,但这个地点可能这是外部配置的一个参数了,对吧?对于传感器而言,我们是不是大家想一想这个物联网的这个状态,万物互联,那是不是应该每一个传感器得有自己的一个ID呀?哎,我们只要知道这个ID是哪个传感器,是不是就可以知道定位到它是哪个位置了啊,这个我们再去查就可以了,所以大家看这里边我们给给它给定的这个,呃里边的内容啊,就是一个ID,呃,这是一个string类型,因因为它里边可能我们给一些特殊的字段啊,所以给一个string,然后可能要有时间,对吧,Time stamp,呃,这是一个law类型的一个时间戳,然后最后还有一个真正要用的那个读数temperature对吧,呃,这个温度给个double类型吧,有小数点。
11:21
所以大家会看到就是,呃,这是我们先把这个。本身的数据读进来的数据,先要把它包装成这样的一个样例类,对吧?啊,这也是往往我们在项目当中常见的一种方式,就是数据读的时候,有可能我们从日志文件里面读对吧?有可能从这个比方说卡夫卡来的数据啊,有可能我们从其他的一些源来的数据来了之后,我们往往都是包成一个我们想要来统一处理的样例类,对不对?哎,这个比较好理解一些,好,有了这个样例类之后,我们就在这个main函数里边可以去对它做一些处理了,首先大家不要忘记我们第一步是干什么来着,是不是要创建那个执行环境啊,对,这个还是不能少啊,Execution environment get对吧?呃,这里大家把这个还是直接。
12:14
改成这个下划线,然后方便我们的那个太平information的影视转换啊好呃,然后接下来我们就真正的要开始从不同的原里边去读数据了。首先我们要给大家介绍的第一种SS是从自定义的集合中。读取数据,好,那那大家会想到这里边我定义的这个叫stream one吧。呃,那他是不是可以env直接调对应的一些方法啊,呃,大家其实之前知道就是我们,我们如果要是说从文件我们是不是。曾经从文件里面读过,另外是不是还曾经从那个soet,那个呃套贴字里边直接去读过,对吧?啊,这个soet我们现在要从一个自定义的集合里边去读,那是怎么去读呢?大家看它有一个from collection这样的一个方法啊,这个方法就是里边大家看可以直接传一个S,对不对?所以这里边我直接给一个list,然后把我想传的东西包装成一个list传进去就完事了。
13:26
那当然我们现在要传的东西就是都应该包成sensor reading对不对啊,这个数据直接在这里写死就好啊,这里边我就不详细去给大家一个一个写了,我们就直接照着文档里边啊,大家看这个这个小数点后面位数还挺挺挺多啊,说明这个传感器温度精度有点太高了是吧?啊这这个不重要啊,大家就是知道这是一个数据就可以,我把这个copy过来,大家看他给的这个ID,就是一个S下划线,然后带一个数字对不对,这是他的ID,后边这个时间戳,大家知道这是这是秒做单位还是毫秒做单位吗?啊对,这个是秒做单位的对吧,因为呃,这这个当然了,就是也有可能它是毫秒做单位,如果是毫秒做单位,那是不是相当于这个时间就非常非常小了,对吧,就是按正常来讲,我们现在的时间啊,一般情况来讲,如果大家看到是这个十位的这个状态的话,一般哎,可能就代表这是一个一个一个秒做单位的。
14:27
如果要是13位后面再跟上几位的话,那可能就是一个毫秒,对不对,这个我们讲到后面时间的时候再给大家详细说啊,呃,这里是我们的一种这个数据读读读取的方式,那我们已经读出来了,那看一看效果吧,那我们在这里边是不是可以直接print出来啊,大家看这个print里边还可以给一个string,这个string是什么意思呢?哎,我们先把这个给进去啊,大家看它是think identifier对吧,它是把这个print是不是就当成了一个think啊,尽管我们没有讲到thinkk,但其实这里边的printnk就是一种think,因为它控制台输出是不是也是输出啊,所以在link程序里边大家看。
15:13
所有程序是不是都是这样的一个流程,South transform,然后最后是think,这个是一定要跑完的这个流程,好,我们给一个这个东西吧,呃,然后最后这个不影响我们的正确性,可以直接给他一个并行度是一对不对,我们看一下效果就可以了,这个可以直接跑了吗?哎,对,大家不要忘记啊,是不是差了一个对执行啊,对吧,这里边我们执行这个叫做。呃,Test对吧,好。现在我们可以跑下了。看一眼啊。看看这个效果怎么样?好,大家可以看到我们当前的这个状态啊,直接是不是就都读取出来,然后直接输出了,那刚才我们给的那个呃,这个这个字符串,这个stream one,这这STREAM1,它到底是出现在了哪里呢?而且大家看是不是在前边就会有这样对应的一个字符串,然后后边有一个大于号,这个这个监括号,对吧,然后后边是我们真正输出的那个那个状态啊,所以这个其实就是方便我们看你打印的这个到底是哪一哪一条流,或者说哪一个呃输出对不对,方便我们去看。
16:31
有些同学可能会想到,那之前我们不是在在这个监控号前面的是什么来着。是那个我们分配的slot的那个那个编号对不对,或者是我们那个现成的编号对吧?诶那这里边他把这个替代了,那那怎么办呢?因为我们这里边是不是并行度是一啊,这个没关系对吧,那大可能会想我如果要并行度不是一,那怎么办呢。我这里给个六,他会怎么样呢?
17:01
哦,其实这个就是我们试一下,大家会想到是不是肯定会叠加在一起啊,不可能把它覆盖对不对,呃,所以这个大家试一下看看这个效果。大家就看到最后叠加起来的效果是什么呢?就是我们要打印的这个字符串,就是前面的这个identify对吧,这个标识是在前面,然后加一个冒号,后边是我们跟着的这个呃,并行的这个slot的编号对吧?啊是这样的一个一个过程啊,我们一般情况呢,这里边就是说呃,方方便大家去看的话,我们一般给定行度是一就就OK了,对吧?啊这个大家知道就行,好,除了这个自定义的集合呢,其实我们还讲了很多啊啊第二种这个我们就简单写一下就是。呃,我们还讲过那个从文件里边读取对不对,呃,还讲过那个从socket里边,呃流逝的直接去读取socket,这个不太常用,这个就只有是测试的时候才能么用了,对吧,而且就是只有是这种你本地起一个最简单的这种,呃,这个so的这个server端口起起来之后才会用到,所以我们这里就不专门列出来了,但是从一个文件里边读取。
18:13
这种情况其实还是有的,大家可能会想到,假如说哎,我们要做一个类似于批处理的那种方式,一个有限流,那是不是应该他都在一个文件里边,都应该有这个数据啊,呃,Log文件一组数据都已经来了,我们直接读,直接去处理就可以了,或者有时候我们可以就相当于哎,把之前呃,所有已经处理过的数据再重新处理一遍,其实也是从一个文件里面把它读出来的,好,所以我们这个重新定义一下啊,第二个方式叫从文件中读取数据,呃,这个就简单写一下吧,大家都很熟了啊,用什么是不是可以read text file啊啊,当然这里边这个text text file本身的类型可以多种多样啊,就是我们这里边是都从这个text里边去读取,如果我们还是这个sensor的话,大家可以从那个word count里边读,这里边如果想自己再定义一个,我们再定义一个sensor吧,方便。
19:13
后面做测试啊呃,那这里边的这个数据是不是我得把这个改一下呀,大家会想到这个粘过来之后不能直接用,对不对,就像这些引号是不是就不需要了。这括号不需要对吧。这个数据是不是就不应该包装成3READING啊,要不然你读出来之后这还得解析啊。我们正常的话,是不是就应该是这样的,一行一行数据读进来就完事了,哎,所以这是我们正常的这个文件的读取方式啊,那。呃,这个读进来之后,当然我们大家会想到我这里边如果要是print这个STREAM2的话,它会是一个什么效果。他还会跟我们之前STEM1一样吗?输出的是3READING吗?那STEM2如果要直接去跑的话,大家看输出应该是什么,它包成sensor reading了吗?大家想一下是不是直接走,诶这里边啊,路径没写啊,这个。
20:21
这个大家写自己的路径就好,大家其实会看到我们在这个过程当中是。前边我们是直接就在这个list的定义的时候,就已经把它包成sensor reading了,那在这里边我们是不是只是从文件里边把它做了一个读取,根本没有包装啊,那所以读出来是一行,是不是这里边还是原封不动一行输出啊,诶,所以大家看到是这个这个效果啊好呃,这这一部分就是都很简单,大家大概有一个了解就好了。另外还有一个,其实前面大家可能在这里边就是我们点这个from的时候,大已经看到了,除了from还有一个from element对吧?啊,这个其实直观的理解大家知道这个from elements相当于是什么呢?
21:09
是不是就是从不同的元素里边去创建一个元啊,所以这个就相当于是什么,相当于更是一个大招了,就是什么我我随便可以给给不同的元素,对吧,大家看是不是这样啊,对吧,我直接给一个string,直接这么去读都可以。然后你如果这里边想去把它print出来啊,我们看一眼print出来,这个大家可以知道,这print出来是不是就是原封不动把它打印出来就完事了啊,所以其实flink里边对数据的读取方式其实是多种多样的,而且灵活度是很高的啊,这里边我们只是以这个from啊,大家看就是这样对吧?啊,这里边没有设置那个并行度,所以说是按照我默认的并行度是四对不对啊,我这四核的机器啊,好,这个我就给大家注掉了。
我来说两句