00:00
大在已经知道flink到底是什么了,有哪些特点,那接下来呢,就要真正的在代码里边来感受一下flink到底怎么样去啊,怎么样去编程啊,怎么样去写link代码风格的程序,那呃,我们这里边要给大家做的代码的实现呢,首先用到一些工具,首先IDEIDE我们还是用idea啊,大家比较熟悉的这个idea啊,然后我们会搭建一个maven项目啊,这个也是大家比较熟悉的啊,我们用maven作为当前的包管理工具,那我们用到的语言呢,是skyva啊,就对于这个弗link而言,我们说它底层其实是Java,然后它给我们提供了Java和SKY拉,非常方便的同时提供了这样两套这个API,那我们在开发的过程当中呢。你可以用Java,也可以用skyla,那用skyla整体来讲代码风格可能会更加的简洁,可读性更强,所以比较推荐大家用skyla啊,那当前我们这个项目就是用scla去写一个微稳项目啊,用用idea去作为当前的这个编辑工具,IDE工具,所以接下来我们就是在当前,哎,我们首先打开idea啊,去新建一个微问项,当前我给一下当前的这个groupd啊,叫做抗点at硅谷啊,然后当前的I的ID我就叫做flink。
01:22
啊,Tutorial教程。好,先先把这个先定义出来,Tutorial。边士。然后接下来大家会想到,既然是微问项目,那我们本身进去之后啊,首先要去写的就是一个POM文件了。好,这里我们已经启动了啊,那么提起来之后,大家可以把这个呃,Autoport可以enable打开,我们首先看看这个pop文件啊,那这个pop文件里边我们主要是要添加一些当前项目需要的依赖和插件,对吧,Dependency和plug,那所以我们可以看一看这个po文件啊,这个在文档里边已经给大家列在这里了,我们主要想要引入的依赖是什么呢?既然要写。
02:13
Flink的代码,那当然你要引入flink的依赖,我们这里边现在给大家使用的版本,Flink的版本是一点十点一啊,是这个当前的比较新的这个版本啊,然后接下来另外还有一个大家看到还需要定义scla的版本啊,这里边我们用到的skyla版本是2.12,所以我们引入的这个依赖呢,叫做flink scale2.12,它自己本身的版本是一点十点一,它的group ID是阿帕奇弗link啊,这是我们首先要引悟的内容。然后接下来我们是这个基于scla去做的这个开发嘛,对吧,所以接下来我们还需要呃呃呃,就是接下来我们首先要做这个流式处理的开发嘛,呃,这里边大家看到这个基于scla做开发,所以我们有这个scla版本在后边啊,然后后边我们还需要引入一个依赖,叫做flink streaming scale 2.12。
03:11
啊,所以这个大家看到做流式处理的话,这个依赖也是必须要引入的啊,那后边这里边同样版本还是一点十点一啊,大家把这两个依赖引入,接下来我们就可以基于flink去做代码的书写了啊,那除了这个基本的依赖之外呢,我先把这个先直接copy过来啊。Copy过来,这是我们需要引入的dependency,然后另外我们可能还需要引入一些插件啊,这里我们需要的插件主要就是用来做编译和打包了啊,大家之前可能并不太关心关于这个插件啊,对吧,甚至大家可能连po文件里边这些内容都呃都不关注,直接就copy过去,直接能跑起来就可以了,那希望大家还是要就是至少把引入的一个依赖和插件啊,就每一个组件至少要看清楚,因为实际在项目当中。
04:06
因为引入的依赖啊,或者说插件的问题导致的冲突,导致我们整个这个项目跑不起来,这种还是挺常见的啊,大家一定要细心一点啊啊,那后面我们引入的这个插件主要有两个啊,大家看到在build的下边对吧,引入plugins,然后有两个plug in,第一个plug in呢,从这里啊到这里啊。叫做scla maven puy啊,那这个插件主要用来干什么呢?啊,它主要用来做编译,就是在maven项目里边,我们把这个skyla源文件编译成自解码文件啊,主要是做这样的一个事情,但是大家知道就是如果说我们本身在呃项目当中啊,引入了这个scla的框架支持的话,其实它自己是可以给我们做编译的,你直接去运行是没有问题的啊,但是如果说我们想要在生命整个这个组件啊,这个项目的生命周期里边去做更加灵活的管理的话,那引入一个这个专门的好用的这个插件还是比较有必要的啊,这个就是skyla plug in啊,我们这里边用的是最新的版本是4.4.0,好,然后这里边可能大家引入之后,会有一些跟当前你自己ma问版本可能会有不匹配的情况啊,那到时候你可能需要去调整一下,升级一下自己的ma问版本或者说。
05:28
你改一个这里边插件的一个低版本也也是可以的啊,以一切以我们能够正确的运行程序为为准,大家可以下去之后自己把这个做一个调整,那这里边大家看我这里边声明了绑定到compel阶段对吧?啊,所以这是一个编译的插件,然后下边还有一个插件叫做maven assembly plug,那这个插件是用来干什么呢?那大家知道assembly嘛,组装呃,它是用来打包的对吧?所以这里边我们引入这个插件之后,就可以按照我们的需求去定义把我们呃最后生成的这个项目啊,打包成一个转包啊,那这里边还给了一个定义了一个后缀,大家看script rif啊,定义了一个叫做jar with dependence啊,就是我们加上这个后缀,表示我们当前打包的时候,把那个dependency啊,把这些依赖全打进去了啊,这个就是看大家具体的这个需求了,有时候你也可以不打依赖进去啊。当前我。
06:28
我们用的这个也是比较新的版本,3.3.0啊,这是我们引入的两个。插件我把这个也是copy过来。这个大家就是需要。就是本身了解它到底是用来干什么的,对吧,就是至于里边的一些细节,你可以不去做太多的深入啊,只要知道它是干什么就可以了,引入之后我们看一眼,在这边main projects下边啊,我们刷新一下。可以看到dependency下边flink scale和flink steming scale这两个我们都已经引入了,然后在当前的plugin下边,你可以看到这里有一个scalela这里啊,大家看到有一个这个skyla plug in,对吧?这是我们引入的这个编译插件,下边它有这个编译的环节,我们主要是在这个环节去用它,另外还有一个assembly这个插件,这是我们引入的ma assembly plug in,对吧?这主要是在打包环节要要去做这个操作,那后边我们做操作的时候呢,你直接用这里边life cycle里边的这些操作就可以了,对吧,我要编译点comp,要去打包点package啊,这是我们整个项目的一个前提啊,大家把这个先梳理好,然后接下来啊,当然就是说,如果说大家习惯于就是在这里边添加这个框架支持的话,那还是啊呃,你可以在这里边,假如说没有添加这个框架支持,你在这里把这个勾选上框架支持先引入,要不然的话,下面你没有办法去创建这个对应的SC文件嘛。
07:57
好,那接下来我们的主体文件。
08:00
原文件当然是要放在source may下边,那下边我们看到本身默认的这个source目录是Java对吧,然后有一个resource resources这个目录,那我们现在要写的是scla文件啊啊,所以说我可以去重新创建一个叫做scla的文件。需要把它在Mac director as source route,把它标记成当前的原文件目录,然后在下边就可以创建对应的scla class了,啊,这个大家其实都非常熟悉啊,啊,然后关于这个skyla大家知道一上来之后,我们如果要是创建一个可执行的程序的话,那其实我们不需要去创建类,对吧,我只要创建一个。一个单例对象对吧,一个object就可以了,所以这里边我们直接选SC的object把它创建出来,在这里我们带上包名啊,要com.at硅谷,然后我们当前要首先写一个world count程序,那所以这里边我们加上这个包名叫做WC吧,大家不要想歪想想歪啊,WC world count的缩写啊,World count好把它先创建出来,然后大家看到这个非常熟悉的skyla代码的风格啊,我们现在要写的是一个批处理的。
09:24
Word count这样的一个程序,然后在加代码里面,那首先进来之后肯定就是。命令方法对吧?哎,直接在里边去写我们想要定义的这些流程,那如果说这里边是一个批处理的话,那我们又会想到我们这里没数据呀,因为你如果要是批处理的话,数据不是源源不断来的,你应该是提前已经准备好对吧,那就数据都已经放在那儿,然后我统一把它这个加载进来,然后去做处理就完事了。所以在这个批处理的过程当中,我应该还需要有一个。
10:01
哎,是不是相当于数据文件啊啊所以这里边比方说我随便定义一个这个数据文件啊,我就叫做这个hello吧,hello.text定义一个文本文件啊,然后里边我随便写几行啊,啊,Hello word啊,大家知道一开始上来之后我们都要打招呼吗?Hello word hello flink啊,大家还可以这个hellola对吧?啊,这个随便写啊,啊也不光是hello,我们还可以how are you啊,还可以这个fine thankyou啊那那大家知道还有and you是吧?啊这个I'm fine to,大家随随便写啊,相信大家这个英文都很过关啊,这些肯定可以写一堆,呃,我们接下来就是要统计这个文件里边每一个词出现的次数,对吧?Word countt出现出现的这个频率,这个应用非常的经典,这也是大数据处理过程当中,大家可以认为这就跟hello word一样啊,上来之后我们首先就得讲这个过程啊,那我们看看大家可能已经。
11:02
就是在学习呃,这个拉的时候,已经写过类似于我的程序啊,对吧,用一些集合操作直接去做这个事情,然后我们在学习这个Spark的时候啊,啊,大家这个分布式系统,分布式架构怎么样去做处理调查里边的一些算子怎么样去实现,也已经有过这样的经验了,那在这些基础上大家会发现。Link里边做workout非常简单,真的非常简单啊,简单到什么程度啊,接下来大家看一遍就知道了啊,那首先我们这里边第一步,第一步首先要干什么呢?呃,就是要创建一个执行环境,因为我们现在,呃,大家回想一下Spark,呃,为什么我们要创建这个执行环境呢?因为你当前是分布式架构处理啊。对吧,这不像我们那个,你直接写一个单机程序,你之前学习scla的时候,我这里边,哎,就是当前单机这个程序,你直接把那个数据读进来,呃,读成一个这个list对吧,或者呃,读成一个这个,呃,其他的一个数据结构集合,集合类型,然后直接挨个去做这个转换,For for each啊对吧,一个for循环,或者说直接点这个for each,或者说直接用那个集合里边的点map操作啊,直接去做转换就完事了,现在不一样啊,现在是分布式环境,所以说我们必须要先创建一个,哎,类似于在这个Spark里边的上下文,对吧?呃,像这个SC一样的一个东西,在flink里边呢,就叫做执行环境啊,这里边创建一个批处理的执行环境,大家可以认为这一步就类似于SPA里边创建那个star context是一样的啊啊,那这里边我要定义定义一个这个执行环境就就叫做env啊。
12:50
Environment对吧?哎,我直接把它定义出来,怎么样定义呢?我需要引入一个叫做execution,大家看environment这样的一个东西。
13:01
啊,我引入这个对象,大家看它是这个scla里边的一个object对吧,一个呃,某个类的一个伴生对象啊,然后接下来我们调它的一个方法叫做get execution environment。大家注意引入的时候,他要的是阿帕奇f API scla下边的execution environment啊,因为这个大家一定要注意啊,就是在flink编程的时候,它有一个特点,因为它同时提供了Java和SKYLA2套API嘛,而且这个很不方便的一点,就是说我们在使用的过程当中呢,呃,就是你要要用这个skyla API的时候,又必须要把这个,呃,Java里边的那些东西都引入,因为它底层会调到Java那边去,那那在这个使用的过程当中,有很多类,它的名称其实是一样的。所以你在引入的时候一定要小心,诶,到底是用用的是那个Java代码里边的还是scla代码里的,对吧?我们这里边用到都是scla啊,所以大家注意上面是flink API scalela.execution environment,然后这里边需要给大家说的一个是还需要做一个什么样的操作呢?啊,就是我需要引入一个or RG点阿帕奇,点link API scalela下划线,我需要把这个引入啊,这个主要是把这个呃,Skyla这个包下边的对应的那些引定义的那些影视转换要引入。
14:30
你如果要是啊,不把这个引入的话,后边我们执行就会报错啊,我们先把这个放在这里啊,然后接下来啊,既然已经有了执行环境了,那我们需要啊,去定义当前的各种转换操作,首先先得读出数据来,对吧?啊,所以接下来我们是从文件中读取数据。啊,那我先首先定义一下当前的这个路径吧,对吧,Into the pass啊,这个非常简单啊,我把这个哈,直接copy一下当前它的这个全路径,因为我们做测试嘛,这个无所谓,对吧,我直接用当前的这个绝对路径放在这里,然后接下来那读取数据用什么呢?我直接定义一个当前是P处理,所以我定义一个这个data set,对吧?啊,那基于环境啊,就像我们在SPA处理的时候基于这个SC一样啊,我们现在基于当前的执行环境调一个方法叫做read。
15:31
Text file对吧,我们现在要读文件嘛,当前这个文本文件读text file里边大家看到传什么参数呢?诶它需要传的可以只传一个参数,就是一个string类型的file pass,那那不就把这个直接传进进去完事了吗?对吧?Input pass直接一传完事。这样就得到了我们想要的一个data set,哎,在这里大家看就是直观写的话,这个写的比较简单,我可以给大家,为了方便大家看清楚啊,呃,这个skyla方便的一点就是说,因为它有这个自动类型推断嘛,所以说你你有些地方你根本不用关心它这个到底类型是什么,但是一开始我们写的时候还是希望大家能搞清楚它类型到底是什么,对吧?所以这里边我可以把这个类型写出来,呃,它的类型本身是一个execution environment对吧?啊,就是当前我们这里边引入了这个execution environment啊,然后下边这里边这个就不用说了,当然是一个string了啊。
16:31
然后下边得到的这个input data set又是什么呢?它,呃,这个就是大家可以点到源码里边,你看一下啊,这里边大家看,如果要是你没有当前的这部分源码的话,你可以点击这个download source,把对应的这个源码下下来,对吧?或者说你也可以单独的把源码下下来之后,你去做一个关联啊,做一个attach,这个都是可以的,那我们这里边点进来之后,你可以看到就是当前的这个批处理执行环境,它的这个text file,最后返回的是一个data set string,对吧?啊,这是它的泛型,然后我们定义的整个的这个数据结构,大家看这类似于一个集合类型了是吧?呃,一个数据集嘛,Data set啊,然后里边的数据是一个string类型,所以当然我们这里边,你如果要写它的类型的话,应该也是。
17:25
也是这样的一个数据类型,对吧?啊,这就是当前的这个操作啊,然后接下来你既然已经读进来了,那当然就是基于data set,我们不是说批处理有data set API吗?你就基于data set API去做转换操作处理就完事了啊,所以接下来我们要做的操作就是。啊。对数据进行转换处理统计对吧,我们的具体操作其实就是先分词对吧,大家知道要把这个每一行的数据是不是要先打散开啊啊,那我们知道这显然是一个flat map嘛,然后打散之后我们诶在。
18:11
呃,按照这个word进行分组对吧,然后最后进行聚合统计,其实整整体的这个过程就是这样啊,跟之前大家熟悉的那个操作基本上一样,对吧?啊,那这里边我们快速的定义出来,这叫做result data set。啊,那当然得到的还是一个data set。这个data set,我们希望一个什么样的数据结构呢?我们希望最后得到应该是word count嘛,那就应该是一个word的一个count值,一个word一个countt值,所以我们希望得到一个二元组,对吧?啊,大家知道这个skyla里边的这个元组类型,所以我们直接在这里边定义出来当前的data赛的泛型就是一个scla的二元组类型,然后接下来就是基于input data set去做转换了,哎,那首先是一个Fla map,大家看有这个操作对吧?哎,那当年flag map里边传什么,大家看传一个方式嘛,那这跟我们之前这个已经了解到的这种这种方式几乎是一样的,那我们这里边的这个这个写法当然可以跟之前一模一样,对吧?按照空格去做一个SP,做一个分割。
19:29
当然现在还报错,为什么?因为你还没转换成二元组嘛,我这里边为了方便后边做统计,可以直接就把当前的这个每一个word啊,打伞分开的每一个word就先map成一个二元组,那这个二元组怎么去定义呢?就是前面的第一个元素,就是当前的word这个数。然后后边呢,直接默认给一个一。也就是说来一个当前的这个word,我就相当于统计它的值,现在是一对不对,那大家想后边我怎么样做,是不是根据第一个去做分组,然后根据第二个去做一个一个sum求和就完事了,好,所以接下来哎,大家自然想到,哎,那你接下来是不是group by key呢?哎,大家发现了啊,在这个data data set API里边没有group by k,但是它直接有一个group by对吧?啊,那这个group by里边又传什么呢?
20:25
大家看到这里边是要给一个,哎,大家看可以给一个int类型的表示字段位置的一个数字。所以说你如果给一个数字的话,这就表示什么?就是按照当前这个数字作为,呃,你当前这个元素数据的那个下标对吧,下标的第几个元素,然后根据它以它为K去做分组统计,那我们当前是以谁为K呢?当然是根据第一个元素了。根据第一个元素做分组统计,那我这里边直接给一个GROUP0,大家注意啊,GROUP0,因为我们的下边是按照零作为起始位置统计的,对吧,所以这里是以。
21:10
乙。D一个元素作为K进行分组啊,那后边分组之后怎么统计呢?那当然就是针对第二个元素进行求和统计对吧?那这里面有一个非常简单的求和操作,当然是sum了,那some大家看里边同样也是可以传一个field,这个field呢,可以传这个string类型,它的那个字段名称也可以直接传一个,这里边上上下文里边啊在这对吧。这里边也可以直接传一个int类型,表示它的那个下标的位置,所以这里边我们some应该some什么呢?SOME1对吧?啊这个大家要注意啊,有同学可能想哦,萨一,那就是把后面那个一都加起来嘛,不是这个意思啊,我们这SUM1的意思是对所有数据的,就当前分组的所有数据的对吧的第二个元素求和是做这个聚合操作。
22:19
啊,这就是一个完整的操作啊,啊,那最后我们可以把它做一个打印输出。Result data set,哎,这里边大家看它有一个方法直接就叫做print,那我们直接把它打印出来就完事了。好,那接下来我们来运行一下这段代码啊,看看效果怎么样。好,大家看到现在已经代码执行完毕,我们看看这个运行结果是什么样的,哎,这个非常符合我们的预期,对吧,大家大家看我们要的不就是这样的二元组嘛,然后我们看到统计出来当前这个数据里边,呃,这个哈比较多对吧,我们看到哈是三个啊,然后还有这个U比较多,因为U我们下边这里边好几个吧,有三个,然后别的呢,别的都只出现了一次,所以我们一个word一个count值,一个word一个count值,最后把它们每一个都统计出来,诶直接输出在当前的这个控制台了。
23:16
这就是批处理做这个word com的一个完整的过程啊,那当然大家看到了,这里边我们最后不需要做关闭操作,对吧?哎,直接这个运行完就运行完了,因为我们这里面大家想到这是一个有界级嘛,对吧?啊就是类似于我们处理的这个过程呢,其实就是你把这个数据一个一个都读,读取完,然后处理完,处理完就完了,就已经没没任何的操作了,所以当前任务就可以结束,直接退出了。所以当前是这样的一个场景啊,那大家看到这个对于批处理这个其实跟我们之前做Spark的这个操作是非常非常相似的啊,非常接近。
我来说两句