00:00
好,上节课我们已经介绍了flink的基本的一些特点,还有我们为什么要用flink,那接下来呢,我们不能仅仅浮于表面,那接下来就要实际上手去做操作,大家可以看到在文档里边,我们第二部分就是一个快速的上手,我们来写一个简单的world count的程序啊,大家在大数据领域,这个world count就跟hello word一样啊,所以我们还是写一个这样的代码啊。好,我们接下来先把idea启动,然后去创建一个微项目。接下来我们就是一个用LA写的ma项目啊。X的。ID com.at硅谷if ID叫flink。把它创建出来,呃,我们用这个来做整个理论学习部分的代码的项目啊,都放在这个项目里边。
01:09
然后大家会想到这个项目创建之后,第一步我们应该干什么啊,对,第一步是不是应该先把这个po文件里面的依赖,该引入的引入啊,呃,这个我们要引入什么依赖呢?那我们直接来看一下,大家知道既然是要用flink吗?里边是不是一定要去用到flink啊相关的一些依赖。那一般这炮文件会去认真的看吗?呃,这个我建议大家还是要还是要看一下啊,就是大家写项目的时候,都是老师把po文件给他直接一抄是吧,一一贴就完事了,这个我建议大家还是至少是过一遍,就是我们不一定非得去手敲这一个一个标签啊,但是至少要过一遍,知道我们引入的每个依赖是什么含义到底是什么,因为有时候其实有时候大家如果要是形成这个贴pop文件的习惯的话。
02:05
后面再做项目的时候,就会出现什么呢?就会出现对你根本不知道用了哪些东西,而且会出现什么呢?反正不管三七二十一能跑就行,所以会大量没用的东西都贴到里面去了,对吧,而且很容易就是出现各种冲突啊,各种各种各样的情况出现,所以大家还是要把这个东西做一些了解去管理起来的啊,大家看一下文档里边给出的依赖,我们主要要用什么呢?当然要用到flink,所以这里边引入的是flink scla2.11,这里边我们用的flink版本是1.7.2,当然这里边的s scla版本是2.11,对吧,就是先把这个flink GALA引入,另外还要引入flink streaming GALA2.11。版本都是一样,只要把这两个依赖顶住就可以了。这里边就直接把这个copy过来啊,Depends。
03:02
然后有了依赖之后,那么大家会想到还需要融入什么呢?哎,除了依赖之外,还有一个很重要的是在build环节里面是不是还应该引入一些插件啊,还有一些plug in,对吧?这里边我们引入两个重要的插件,一个是LA,它是干什么的呢?主要是用来做编译的,对吧?呃,这个把scla代码编译成对应的class文件,另外还有一个插件叫haven assemblyy,这个是啊,大家一看这就知道,这是用来打包的对不对,这是一个对打包打包的工具,把我们的那个相关的依赖可以都打进去,3PLUG。你把这两个也引入里边的,具体的一些配置,这个就不详细说了啊,大家大概知道是。但东西还比较多。好,我们把这个这两个东西堵住。
04:04
好,然后接下来我们就可以真正去把代码了啊,当然了就是在这个框,如果说大家这里面没有那个框架支持的话,还应该把对应的那个框架支持是要引入的,对不对,就是大家如果没有依赖支持的话,在这边需要去ADD freework啊,把那个skyla要引入。然后接下来我们的代码就会放在DOS下边,当然这里面大家看到这里边是这个dramava啊,那我们其实想要的是不是一个scla啊。你重建一个文件夹叫SC啊,那当然如果有同学想直接把它改名的话,也是可以的啊,然后把它指定成。Root对吧,这是我们常见的一些流程,大家把这个先准备工作处理完成,接下来就可以对,就可以写代码了,我们新建了一个这个。
05:09
直接去new一个scla class,呃,这里边我们要新建的是一个做一个这个word com的程序,那其实不需要去创建class,直接去创建一个对一个object单例对象就可以,那这里边我们带上包名com.at硅谷。WC这个包在WC这这个大家知道是什么意思啊,不要不要误解啊,Work count。先把它创建出来。然后接下来我们就可以去写真正的代码了,在在这个写代码之前,我们这个代码先要跑的是什么呢?我们先跑一个批处理的一个代码吧,对吧,先做一个批处理的,我所以。期处理程序。
06:02
如果是批处理的话,其实大家那就想到了,我们还缺一个东西呢,为什么呢。既然是数理,那是不是应该这一批数据应该是现成的,然后我一下全读进来,然后去做处理就可以了,那所以我们是不是还缺数据啊,这里边我们的数据就放在呃,Resource下边,新建一个text文件就可以,比如说这里边我们就叫。到点吧。这里面的数据我们就随便写了啊,Word count,那可hello word。Hellolink。Helloella,这个随便写啊,除了hello之外还可以how are you。Thankyou,是吧?啊,还还有and you是吧,这这个大家英文都学的很好,好,这个大家随便写几句就好了,我们把这个倒过来,然后接下来在word count里边,我们就可以读取这个数据,然后做处理了,接下来大家看一看,首先当然是肯定要有一个main函数,对吧?先把这个ma写出来,那我们要去做批处理,怎么样做呢?
07:23
Link里边程序的这个风格,首先是先要去创建一个执行环境。创建一个执行环境。所以一开始呢,我们先定义一个env,他要去从这个哪里去拿到当前的执行环境呢,大家看用CU对吧,从这里边把它引入,然后get这样把它引入。所以这个是不是大家跟Spark去做类比的话,是不是很接近啊,Spark是不是一上来之后也得去啊,这个拿到这SC对不对,也得拿到啊,就是上下文嘛,所以这里边也是类似的,我们先拿到创建一个执行环境,然后接下来是不是可以从文件中读取数据,对吧。
08:24
那我们的文件路径是什么呢?这里面定义一个input pass吧。把这个哈哈的这个路径我们考一下啊。然后接下来是应该定义一个,大家注意这里既然是P处理的话,那我们应应用的API就不是data stream API了,而是data set API。面我们提到flink里边是用流的思路,同时去做批处理,他在提供data stream API的同时,也提供了一套data set API啊,所以我们这里先给大家简单的用一下data set API啊,所以得到的应该是一个data set啊input data set怎么去读取呢?用en nv,然后可以直接大家看是不是可以read the text file啊,从一个文本文件里边去读取数据,呃,当然这里边我们可以直接传这个pass就可以了啊,把它传进来,这就读取出来了,那接下来就是去做我的处理了。
09:37
这个处理的过程大家也能想到,应该怎么去去count这个词呢?对,是不是应该。对,切分物距得到word。然后,然后是不是可以把它转换成一个可以计数的,呃,这样的一个比方说二元组,然后我们就可以group by,然后去上,呃,就是这样的一个过程啊,其实非常简单,所以然后再按。
10:13
Word做分组聚合。这里定义一个最终我们这个word data set。他就应该是一个什么东西呢?就是在input迭赛的基础上,这就要处理了,那大家有想到这是不是就可以调用各种各样的转换API啊,所以大家看最简单的一种方式,是不是可以先把它打散,呃,大家还记得我们做文看是不是都要把它打散啊?诶大家看Fla map是不是熟悉的东西出来了,直接做一个Fla map,这里边根据什么?诶,这里是不是先得把它做分词之后再打散啊,诶所以这里面我们是要下划线先split按照什么区分词呢。
11:09
当时我们是空格区分对吧?所以这里边我们直接按照空格把它先分开,然后做flat map把它打散,再接下来呢,接下来做什么呢?接下来可能还要做一个格式转换,对不对?因为接下来我们希望是根据当前的那个word要去做一个分组,然后再做聚合的,所以这里边呢,我们把它map成一个二元组,然后是不是就可以方便这个处理啊,二元组的话,那是不是word本身是要保留的,另外是不是可以再去对来一个,就有一个计数一个一,这样是不是就可以去count了,我们到时候是根据前边的这个word去做分组,是不是根据后边的这个做some就可以了。
12:04
接下来。直接可以不了。呃,大家知道在这个Spark里边,那是reduce by key,对不对?在这个呃,Link里边,你如果要用这个的话,那就只有reduce对吧,没有reduce by key,所以这里边我们要用的这个分组的方式是什么呢?是group by。这里面里边不是干什么。An是不是可以直接去传一个int类型啊,所以它可以直接指示前面我们这是一个二元组嘛,所以这里边是不是第一个就传一个T0就可以,大家看可以直接传一个这个整形的数值,对吧,Int类型的数值。那同样我可以5000,然后接下来就可以。点上一是不是就可以了。藏衣是不是就相当于是用第二个这个字段去进行丧啊?
13:06
啊,所以大家看它是这样的一个一个方式啊呃,当然这里面大家如果点进去看这个API的话啊,其实可以看到这个,这里面我们用的是这种方式,这里面传一个ink啊,表示它的一个字段field,还可以传什么呢?还可以传一个string是什么名字,这个字段是什么名字,你也可以传,另外还可以传什么呢?大家会看到是不是还可以传一个函数啊,就是一个K的提取器函数对不对?所以大家如果要是写那个下划线,有些同学会想到我这里边如果写下划线,点下划线的可不可以呢?那他是不是这样也可以啊,所以这个其实也是一样的啊,这里边你如果去直接点它的源码的话,那当然调用的这个位置是不是就调用到这里来了啊,它有不同的实线啊,这里稍微给大家说一说,这个我们这里边就是你直接用那个零也是可以的,接那个下划线方式也是可以的,这个好像更直观一些啊,就是我们就用这个来举例了,呃,这样就做完了,大家看是不是非常简单。
14:15
我这就做完了,我们的第一个link程序就做完了,当然最后如果我们想看结果的话。啊,是不是可以把它print出来啊,Word can't get set print来直接把它输出就可以了,接下来我们跑一下看看这个效果怎么样吧。大家看这里面执行直接报错了,他报了一个什么错呢?转换,诶对这里面涉及到这个影视转换类型的这个信息隐食转换啊,那这里面需要引入一个新的另外的一个包,什么包呢?就是org阿帕奇link API。改了。下划线,大家看这个我引入之后,它是不是直接就没有变灰,直接就引入了,对吧?啊调用这个就可以了。
15:06
同学可能看到你这么麻烦干什么呢?那下面是不是还还有一个这个环境是不是也在这个下面啊对,那我直接是不是下面这个不要了呀,只要有它就可以了,对不对,所以接下来我们再跑一下啊。看到这个已经程序已经跑起来了,但是现在还没有结果输出,我们看一下它的结果输出是什么样。诶,大家可以看到这个输出是什么样的呢。这是不是就是我们最后统计出来每一个词对应的个数统计的com数目啊,呃,那大家会看到这个U和HELLO2个是最多的,我们看一眼数据确实是这样对不对,各自都是出现了三次,这就是我们批处理的一个结果啊,他就是就是所有的数据都攒齐了一下输出,最后每一个词是不是都只有一个输出结果啊,这是这样的一个场景。
我来说两句