00:00
来我们开启另外一部分内容,那就真正的要大量的上手做练习码代码啊,我们要调用flink里边的data STEM API去做流处理计算了啊,那对于这个flink流处理API呢,主要我们先做一个划分,做一个分类,那前面其实在讲解这个执行图的时候,我们已经知道了,Flink代码里边主要分成三大部分,对吧?啊,就是里边的操作,所有的这个算子操作分成三大部分,首先是source,然后是transfer,最后是S,就是读取数据输入,然后做转换计算,最后再做输出标准的三部曲。那在这个之前呢,有一个前提就是我们先得创建执行环境,对吧?啊,那所以一开始我们还是给大家先把这个执行环境再来完整的介绍一下,那这个执行环境大家会会想到,这还不简单吗?之前我们不就是直接啊引入这个execution environment,或者是流失处理的话,对吧,Stream execution environment。
01:00
然后直接点get就就完事了吗?那确实实际使用的时候这个是非常简单的,而我们最常见的这种方式呢,也就是直接如果是批处理的话,那用execution environment.get对吧?那如果是理由处理的话,用这个stream execution environment.get就可以了,这就是表示获取当前执行程序的上下文。这里大家要注意啊,这里边其实是涉及到不同的这个具体实现的底层实现的,为什么呢?因为大家想到其实我们在在这个你写这个代码在运行的过程当中,你说你的这个执行程序的上下文,在idea里边,我们的开发环境去执行,和你提交部署到我们真正的生产集群环境里面执行的时候,这个上下文能一样吗?肯定是不一样的,对吧,那这里边我们直接get它真正的这个底层get是做了一个什么操作呢。啊,这个大家注意啊,它其实是会自动帮我们判断你到底是本地环境还是一个远程的集群环境,对吧,生产环境,然后帮我们调用底层分别调用的是什么?一个是create local environment,如果我判断出来现在是这个开发环境的话,调这个。
02:16
里边传一个参数,这个参数就是默认的并行度对吧?啊,就是当然你也可以可以不给,大家知道,如果不给的话,默认就用现在的这个CPU核心数量啊,呃,然后的话,如果说你是集群开发环境的话,默认命名度那应该就不是这个CPU数量了,是要以这个配置文件里边的para,呃,paraism.default默认并行度为准,对吧?啊,那那所以说如果是集群生产环境的话,提交的时候我们这个执行执行环境其实应该是create了一个remote environment。啊,所以这个过程呢,大家看它其实里边要传什么参数,你得指定当前我们这个执行环境啊,它本身指定的应该是当前job manager的这个啊。
03:06
定位到job manager上对吧?当前的这个job就已经定位好了,所以我要传入job manager的host name,以及它的RPC通信的端口号,6123,对吧,大家还记得吧,默认的那个RPC的端口号,另外还有一个我们要运行的抓包,我也得指定好在这里边直接获取它当前的这个执行环境。啊,所以说我们平常调用的时候,大家会想,你如果要是真的直接这么写的话,这个就很麻烦了,难道说我们写一段代码,你在idea里边做测试的时候,你写这个,然后你要做部署提交啊,在集群环境里边运行的时候,你还要改一下代码吗?难道你这个改代码测试完了之后再改代码去部署,这个是非常呃,非常不靠谱或者说不推荐的一个行为,对吧?所以说我们当然应该就是flink帮我们把这个东西包起来,它会自动帮我们去做判断,我们只要统统一的做一个执行环境的get就可以了。
04:06
啊,它会根据我们当前的这个运行的方式啊,决定返回什么样的执行环境,这个就是最简单的方式,大家需要呃,稍微的了解一下吧。呃,那这个比较简单,我们就大概的这么一说,大家知道就可以了,然后接下来我们就给大家讲这个S了,那就是从集合里边要读取数据,读取这个原数据啊,那接下来我们还是直接在代码里边给大家做一个实现吧。呃,这里我就直接在skyla下边去新建一个object,呃,我接下来带上这个包名啊,com.at硅谷当前这个包名我们是测API,所以是API test,然后当前的这个名字我要测这个S啊,我直接叫source test,把它先创建出来,那方法定义出来哦,然后接下来首先我们是创建执行环境,对吧?诶,刚好就再过一遍啊。
05:03
创建执行环境,我们接下来就不给大家再测这个批处理了,所以之后我们一切都是流处理的这个data STEM的API的测试,那创建执行环境呢,当然创建的也是流处理的stream execution environment,然后点get,把它get进,然后接下来我们要做这个source的测试啊,啊,那这里边我们就有一个应用场景了,这个应用场景呢,是一个非常经典的例子,就是对于我们这个弗林而言,我们说它有一个快速、灵巧,实时性高的这个特点,那它有一个典型的应用场景,就是用在呃,工业,呃这个互联网,或者说我们叫这个物联网领域。采集传感器数据对吧?哎,那这里边我们就定义一个,比方说定义一个温度传感器。那我们在外面直接定义一个样例类对吧?这个大家呃,之前讲这个scla的时候肯定是已经学习过的啊,样例类的好处就在于定义非常简单,然后它的构造器,我们直接把这个参数传进去,就就里边就有了对应的这些属性了,而且呢啊,它会自动给我们生成半生对象apply啊apply方法对吧,你可以做模式匹配,非常的方便,所以这里边我们直接把它定义出来,定义样例类啊,这主要就是传感器对吧,温温度传感器。
06:27
那一个温度传感器,大家想一想,它需要有什么样的数据呢?哎,我们把这个大家知道传感器叫sensor对吧?呃,我们把它的这个读数叫sensor reading啊,这跟文档里面的实现是一样的,那它里边首先我们得知道你是你是哪个温度传感器对吧?它到底在哪,你得先明确它的位置,那这个位置一般我们不好写,那你给它定义一个编号嘛,定一个ID不就完了吗?诶,所以这里边首先我们要有一个ID,这个我们给一个string类型的ID放在这里。
07:00
然后另外呢,你采集的这个温度需要有温度对吧,另外还需要有一个采集的时间,因为这个传感器是在不停的采集数据,然后不停的更新,你如果要是只有一个温度值的话,我们收集到之后不知道它到底什么时候发生的呀,所以里边要有一个时间戳time stamp长整型啊,然后另外还有一个temperature temperature double型温度值,所以我们先把这个样例类定义出来。然后后边接下来那就是从不同的地方哎去读取数据源了,对吧?呃,第一种情况我们给大家讲的是从集合中读取数据啊,这是第一种情况啊啊那啊大家可能会想到就是说这个从集合里边读取数据,那相当于怎么去读呢?那就是直接把那个集合写死放在这里对吧?呃,就是比方说我定义一个list,然后我直接把这个呃集合写子放在这儿就完事了,那比方说我可以定义一个呃,就是data list啊那那这个list我直接用skyla里边的集合类型啊,这个列表类似的类型里边呢,我直接包装成想要的这个样衣类类型sensor reading里边的数据,数据我就自己定义了,对吧?啊比方说这个SENSOR1啊,然后后边这个给一个时间戳啊,然后给一个这个温度值,具体我就。
08:30
全部详细写了啊,这大家可以直接参考一下文档里边的这个定义,比方说你看三四一六七十,然后给了这么几行数据,我直接把它copy过来就完事了。Copy过来啊。诶,这个没有考比上。Copy一下。好呃,先把它拿过来,然后接下来我们后边再去读取数据的时候呢,那有一个方法,大家知道数据源这步操作一定是基于环境去读的,对吧?之前我们这个呃,从ET文本流里面读的时候也是env点去调用,然后我们现在呢,也是基于它去调用,然后这里我们就直接env,然后有一个方法叫from collection,对吧,从集合里边去读取啊,那这里边直接就把这个当前大家看它需要传的是一个,呃这个迭代器的类型,类型或者是一个序列啊,一个S类型,我们的这个这个是比较上层的,呃,SKY里边的集合类型,对吧,大家知道集合SKY里边集合类型这个呃,S序列,然后set和map map嘛,这样去划分的,那list当然它就是一个S了,那所以我直接把它传进去,这个是没有问题的啊。
09:46
然后大家看这里边我们可以呃写写错了啊,Data list对吧,直接把这个传进去,这样就可以啊,把数据读读进去,然后我们直接把它做一个打印输出,看看它的这个数据,哎,是不是真的读进去了是吧?啊那另外大家不要忘记流式处理程序必须要执行对吧?不要忘记这一这一条啊,这里边我们是这个source test,好,那接下来我们可以简单的执行一下,大家看看效果。
10:19
注意啊,这里边我们必须后边如果你做了这个,呃,调用这个data stream API做一些操作的话,尽管这里边我没有涉及到transform的转换啊,那你至少是把它读进来,然后还打印了啊,所以这里边必须要引入一个type information的饮食转换,那这个定义呢,我们当时给大家说过,必须要把这个下划线引入对吧?啊,所以你看这个引入进来之后,它本来是亮着的啊,然后再执行一下。我们把这个代码已经执行起来,大家看一看它的执行结果是什么样,诶大家看这里边就执行全部读进来了,对吧?然后大家看到,因为我们当前是个什么呀,是个有界数据对不对?呃,就是我们说这是个有界流,那大家想有借流是不是相当于批处理了呀,有点类似于批处理对不对,它是用流处理的方式,相当于处理了一个有介集,然后处理完成之后,它没有一直挂起,在这里等待数据输入,直接就退出了,因为你当前数据都已经处理完成了嘛,我当前要的数据就这么些,所以直接就就完成就退出了。
11:23
然后这里边还有一个特点,大家会发现我们读取数据的时候,按道理应该是一六七十依次输入对吧,但是我们会发现这里边输出的时候,十六七一这个顺序完全乱的,为什么是乱的呢?啊,这个也很好理解,因为我们现在有并行对吧,我这边没设并行度,开发环境里边默认是CPU核心数是四,所以现在我四个数据当然就是按照不同的这个,呃,就是并行度啊,按照不同的这个呃,任务出去直接读入,然后去直接输出了,所以最后你看到它的这个顺序是不一样的,有可能会出现乱序的情况啊这就是关于我们这个从集合里边读取数据的方法,当然刚才在这个给大家调这个env的from的这个方法的时候,大家会发现啊,还有一个from element对不对。
12:17
啊,那这个from elements这个就就更简单了啊啊,这个大家可以认为就是你做测试的时候,这个几乎就是一个大招了,为什么呢?因为这里边的数据类型不限啊,因为大家知道就是在scla里边所有的数据类型,就是你你随便给一个值类型,那都是一个any value对吧?啊,就是最最上层副类都是一个any类型,所以说这里边你给什么类型都行啊,那这里边比方说我给一个这个1.0对吧?呃,然后后边给一个给一个35,然后后边我再给一个hello,那这个是完全没问题的,你可以直接这个from elements去把它读出来,然后你给一个stream就等于它对吧,然后你把它打印输出,你就会发现,就依次把它打印输出就完了啊,所以有时候做测试的时候也会用到这个,呃,Elements啊,但是会比较少对吧,你为什么要那么膈应的非要定义我们的数据是不同的数据类型呢?一般情况我们进来的数据都是同样的类型,对吧?甚至我们都是做过ETL之后的那个那个效果了。
13:17
啊,所以一般我们用的是这个from collection啊呃,然后另外还有一个比较类似的,我们给大家讲这个第二个是从文件中读取数据啊,那那这个其实就跟这个集合定义好的集合非常类似,只不过现在是不是写死在集合里边,而是写死在文件里了,那首先我们需要去创建一个文件喽啊,你也可以用之前的那个哈,直接测啊,我们现在因为后边有可能会经常用到这个文件,所以我把当前的这个sensor reading这个数据给大家提取出来,我定义一个叫做sensor.txt这样的一个文件啊,那同样我把这个直接copy过来啊,这数据copy过来,然后copy过来之后就不要有这个,就是奇奇怪怪的这些包装的样例类的类型啊,对吧,大家知道文件里边这个数据当然就只有它的这个字段,比方说我们这个是,其实这就相当于一个CSV文件啊,只以逗号分割。
14:17
然后有三个字段分别表示它的ID,后边是时间戳,还有一个是当前的温度值,哎,我就直接把它放在这里。然后接下来我如果要测的话,读取读取数据对吧?哎,那我首先还是定义一下我当前的input pass,当前的这个路径copy一下copy pass。好,然后接下来STREAM2,那当然就是env,哎,有另外一个方法,大家就会想到,肯定就是直接读文件对不对,Read text file,所以接下来这里边传的就是一个string类型的一个路径了,Input pass传进去完事了啊,所以我们也可以看一下,就是把这个STREAM2打印输出又是一个什么样的效果。
15:05
好,运行一下。好,大家看到现在已经执行结束啊,你最后会看到这里边我们输出的结果,哎,是。还是这几条顺序还是乱的,对吧,跟刚才的顺序又不一样了哈,这个顺序又不一样了,而且呢,我们发现前面这个也不是1234完全按照这个来了,那家说这又是为什么呢。哎,这是因为你之前是这个三四一三四六三四七三四十,呃就是按照这个我们分别去给它做这个数据的传输,对吧?做做这个呃轮群的输出,而我们这里边读取这个文件数据的时候呢,诶这个就没准了,呃就就有可能未必是按照我们这里边完整的这个顺序去做的这个读取了啊,它这里边有这个并行读取的话,就会出现这种两个两行数据有可能是在同一个到上去执行的。啊,这这是有可能会出现这种情形的啊好,所以这是关于我们从文件里边读取数据的这种情形,我们就看到了,它其实都是一个不管是从集合还是从文件里面读,都是一个有界流,最后执行完了之后就直接退出了。
16:17
这是这种场景。
我来说两句