00:00
接下来我们来学习第二章的入门啊,就是看一看咱们的封装的数据模型stream该如何进行操作啊。呃,那么首先还是做一个world count的一个案例吧,那么这个案例他说了,使用net cat工具向我们四个九的端口不断的发送数据,然后由咱们SPA streaming呢读取这个端口的数据,并且统计不同单次出现的次数啊,那么首先我们第一个先先要增加依赖关系来吧。回过头把咱们这个泡文件咱们打开啊,咱们打开,打开以后来把这个课件中的依赖啊,大家可以看到,其实啊,就是在Spark的后面增加了一个streaming OK,在我们这个位置给它拿过来啊,放过来,放过来以后如果依赖关系不出现问题的情况下,那我们现在呢,就可以呢去创建我们新的程序了,咱们叫stream work count啊,这个呢,我们来创建,在这里点击new创建一个package啊,我们就叫做streaming,嗯,好,然后在这里我们点击new创建一个它,咱们叫做SPARK01,咱们叫做streaming啊,咱们叫做streaming,诶,我们叫做Spark streaming01吧,好不好,然后下划线,咱们就叫word count OK,然后在这里呢,我们写上一个men,那行,那现在呢,其实大家会发现我们又开始要全新的一套操作了,那么这里呢,我们写上todo,咱们要创建环境的对象啊,环境的对象环境。
01:29
对象OK,然后呢,我们写上todo啊,咱们写上咱们叫做关闭。关闭咱们的这个环境啊,关闭环境对象,关闭环境吧,然后中间呢,是我们的逻辑处理啊嘟do逻辑处理啊。好了,那我们现在呢,就一块儿来来看看我们该如何去完成这个操作,首先第一个要创建环境对象,那么我们回过头来,咱们先看一看咱们之前的图形,咱们的图形当中大家会发现这边有一个叫streaming contest。在我们SPA circle当中有一个叫做SPA session,是对我们RDD的那个环境做了一个封装,同样道理,这个叫streaming contest是对我们的SPA streaming当中那个RDD做了封装,所以咱们这里呢,来同学们看,咱们这里呢叫SC啊,因为呢,我们这里面涉及到一个streaming,留了一个操作,所以new咱们叫streaming contest啊,Streaming咱们叫contest,那么这么写完了以后,你会发现在小括号的下面是不是有红色波浪线,这种情况下一般是参数是有问题呢,所以鼠标放上去,放上去以后啊,这个我们提示一下,得了提示提示以后,大家会发现在咱们当前里面会有一些参数,其中有一个咱们比较熟悉这个东西,为什么说它比较熟悉呢?是因为咱们这里有个叫Spark contact,咱们也比较熟,但它需要两步操作,它得需要一个康复,再加上一个对象,而我们这里呢,大家可以发现,我们是不是只需要一个康复就够了,对不对,还有一个呢,叫做be。
03:00
Duration这什么意思呢?这个BAT就是批处理,Duration就是持续时间,说的简单点,就是你的批处理持续多长时间,那我们可以换一种说法叫采集周期,所以啊,咱们这里来啊,我们写上。咱们的streaming contest啊,这个我们创建时啊,创建时它需要啊,需要传递我们两个参数,那么我们的第一个参数,它表示环境配置,所以我们这里呢,来写上一个咱们叫做Spark com,它等于new new Spark,咱们的com,然后点一下啊这个地方都倒一下。然后呢,点咱们叫set,嗯,我们的master,然后呢,给它来一个我们的local,嗯,再加上一个星,然后点我们叫做set APP name,呃,给它写上一个咱们就叫Spark streaming就可以了啊好,写完之后这是我们的第一个,OK,来,然后再写第二个,第二个是什么东西呢?第二个咱们说一下第二个参数,它表示的是什么呢?是我们的批处理啊,批量处理。
04:09
的周期啊,咱们叫做duration,就是持续时间嘛,咱们称之为叫周期就行了啊,那这个怎么理解,什么叫批量处理的这个周期啊,咱们之前画图的时候,我们不说了嘛,你没有办法说啊,把咱们的这个时间变得很长,所以呢,我们会有一个V批次的概念,这个V体现的是时间,就是我们的时间范围短的意思啊,那么我们这里三秒钟诶,采集或数据,三秒钟采集回数据,这个三秒就是我们批量数据处理的一个什么周期,所以啊,我们有另外一个称呼,咱们称之为叫做什么呢?叫采集周期,咱们叫采集周期,就是说在三秒当中把数据采集过来,再过三秒又采集,再过三秒又采集啊,它会有个采集周期的概念,那这个咱们提示一下,提示一下他要传的参数是什么呢?它的传的参数呢叫duration,那这个duration怎么用啊,不知道,所以我点一下,点完以后呢,咱们看有叫duration点点完以后它是一个什么,它是一个样例类,那么你把这个duration给它拿。
05:10
拿过来之后,大家会发现它可以传一些参数对不对,哎,传一些参数进去,但是你这个传上啊,它的单位是毫秒,那么毫秒的话你还得计算对不对,有的时候不是很方便,所以在这种情况下,大家往下看它。同时它提供了这样的一些东西,叫做second minute,它把我们的那个秒啊分钟啊给它创建出来,然后在这里面呢,自动帮你构建duration,所以啊,这个东西咱们拷贝,拷贝以后拿过来,比方说我看我前面说了三秒钟,那我就写成个三。啊,写那个三就行了,你的这个地方你可千万注意了,同学们,你可别写三乘1000啊,你这样的话,那时间可太长了,因为你的这个呃,名称已经发生变化了嘛,之前叫duration,它是以毫秒为单位,现在你是second,已经以秒为单位了,那么你这就不能写错了,对不对?诶就这意思,好,那我现在的这个SSC现在已经有了,那么到最后逻辑处理现在还没有逻辑呢,那我们是不是直接关掉呢?所以咱们要关闭这个环境,所以SSC点我们叫做stop,咱们就这么写就可以了啊。
06:17
好,写完之后,别的先不管它,同学们,我们现在呢,先给它运行运行看结果吧。好,同学们,现在呢,结果已经完成了,对不对,哎,就是这样啊,那好,那么现在这块没有任何的问题了,那接下来我们该干嘛呢?按照咱们之前的分析啊,他就这么说的,他说了咱们现在要做的是这么一个事情,我们要监控啊,我们的端口数据啊,并统计单词出现的次数,那么现在我们就来了端口,那我写上一下啊,来,咱们写上叫获取端口数据,这啥意思呀?诶告诉大家这个端口啊,Socket它可以从客户端源源不断的发送数据,那我们可以监听到来获取数据对不对,所以在这种情况下,咱们SSC点它里面有一个叫socket tax stream啊,有一个叫做我们的网络的一个文本流对不对,那么这个socket test stream,那么里面需要传一个参数,这个参数呢,大家看到其实啊,就是我的主机名称和那个主机地址,那我现在呢,就在本机啊,咱们的local host,还有一个咱们叫四个九嘛。
07:26
好了,你写完以后点一下叫VAR回车,这个时候你拿到的就是我们端口当中不断传输的数据,这个数据它是一行一行的字符串,跟咱们前面差不多,所以我写上一个叫做lines啊就可以了,那么如果你是一行一行的这个字符串的话,那你可能会有多个单词呀,所以我们的Les呢,我要想把它扁平化成多个单词,所以我们点它会有一个什么呢?叫flat map。这个Fla map下划线点split,给它来一项个空格,那么这个时候呢,我们写上叫worse啊单词,那么你单词的话,咱们要想做聚合的话,咱们知道需要给他什么呢?哎,做一个我们的呃,格式的转换,让他带着K的键值,对会更方便一些,对不对?所以我们的was点我们叫map,然后写上一个括号啊,咱们写个下划线逗号一,那么这个时候呢,我们就写上了咱们叫做what啊,To咱们的one。
08:28
哎,就是这个意思,好了,那这么写完以后,那我们现在就想做聚合了呗,那所以word to one,那么有一个叫做什么呢?叫点,我们叫reduce by key,那这个时候下划线加下划线好点一下VR,这个时候就是word to咱们的count了。好,同学们,那我现在呢,就把它写完了,写完了以后,这不就是我们的word count吗?所以我们来写上点,我们叫做print来打印,把它打印在控制台上,就这么回事好了,那你现在这么写完以后,你会发现这个代码好像很熟悉呀。
09:04
我们很熟悉这个,咱们一会儿再说这个事情,现在咱们先演示一下咱们现在写的这个代码,能不能统计出我们的结果好不好,同学们好,那我们现在干嘛呢?按照它的要求,咱们要通过工具向这个端口不断的发送数据,那这里呢,我们来,然后呢,我写上咱们叫做我们的他,然后呢,来。咱们写上四个九,OK,回车,回车以后,那我现在呢,服务器的端口已经被什么占用了,准备要提供服务了,那我现在呢,在这边就准备给它启动啊,连接这四个九,你的四个九等于被我们的net k占用了,我就可以向它发送数据,那我这里呢,就可以接受数据,就是这种感觉啊。诶,大家会发现我现在这个地方呀,我打开了个socket,但是你会发现这边什么都没有做,直接就停了,有没有发现,诶这什么意思呀。
10:00
这说明有问题,什么问题呢?我现在一条数据我还没发呢,我还没往端口发呢,你这边关了那能行吗?那不行,所以啊,咱们前面的这个地方图形当中有一句话叫做什么呢?长期运行的任务就说明啊,你的接收器在启动之后是不能关的,那这个不能关体现在哪呢?体现在环境上,所以咱们这个stop其实是不能关的啊,所以我们这里说一下来。哎,我们说一下,由于。我们的Spark啊,Streaming它的那个采集器,它是咱们叫长期执行的任务,所以呢,它不能够直接关闭,别上来就把它关了,那你这个关闭就出了问题了,对吧?诶这是这个还有一个,如果你要是不能直接关的话,但有个问题,什么问题呢?咱们这个main方法执行完了,这个应用程序是不是就结束了呢?所以这也是有问题的,所以我们说一下啊来,如果。
11:01
如果咱们的main方法它执行完毕,那咱们的应用,咱们的应用,咱们的应用程序它也会啊自动结束,所以在这种情况下,我们是不能够让main方法执行完毕的,那所以它不能够啊,不能让我们main方法执行完毕,那这个我们该如何去做呢?对吧?我们不能让main方法执行完毕,那我们该如何去完成呢?首先我们这里要分两步来做啊,啊那么我们分哪两步呢?第一步咱们叫关闭,环境就不合适了,就不能关闭了。首先我们这里要做第一点,第一点是什么呢?我们要启动采集器,叫启动采集器,第二个叫等待啊,咱们叫等待采集器的行,也就意味着或叫等待它的这个关闭吧,什么意思呢?第一个我先让采集器给它执行了,所以它里面其实是有个叫start。那你star的话就会把采集器的任务呢,开始执行,这第一点第二点呢,你等待采集器的关闭,就意味着如果采集器不关,你不能停,就这意思,那这个我们怎么能做到呢?大家看它有一个SSCDR,我们叫做await啊,就是它awa termination啊,就是这个东西,那等待它的结束嘛,所以你把这两个方法给它加上之后,那么咱们这就不会停止了,咱们在这边就可以什么呢接收它的消息了,好,我们再来试一试啊,这个比较特殊了,所以咱们单独的再来操作一下运行,运行以后看结果,同学们。
12:37
好,同学们,在我们控制台上,大家会发现我们现在是不是已经有一些信息打印出来了,这个信息首先不说别的,有没有发现它,这是9000,然后呢,2000,然后再往下5000。你会发现每次是不是都差三个3000吧,3000毫秒不就是三秒钟吗?为什么?因为咱们这里告诉你了,采集周期是三秒,那么就意味着每三秒他会把数据采集过来做统计分析,但是我们现在啊,因为是没有任何数据过来嘛,但是呢,他还会照样采集,所以呢,我们现在给他发点东西啊,咱们来看一看,嗯,啊,我刚才是关掉了吗?
13:19
哦,刚刚关掉了是吧,那重新来啊。我们现在呢,给它运行,记住了,同学们,我现在给它来运行咱们的程序,每三秒它会计算一次,每三秒会计算一次,这要采集周期的概念,那现在呢,我们把这个打开,打开以后一旦这边出现了那个打印的信息,我们这边就可以呢去输入内容了,比方说咱们叫hello,哎,咱们的word好回车。回车以后,咱们观察一下,大家有没有发现你的hello word已经被得到了,那好我们再来,我们为了简单起见吧,咱们叫aaaaaaa,好了,回撤以后大家有没有发现你的这个A它输入了五次,五次的时候他会把这个时间段之内的数据全都给它汇总在一块儿做统计,但是你会发现到了这儿就不一样了,为什么?因为这个A跟上面的A它正好跨越了两个时间周期,对不对?所以它并不会在一块儿来统计,它是分着来统计的,所以啊,这就是我们给大家演示的一个最基本的流式数据处理当中的一个word count,大家体会一下,我是在咱们socket的位置啊,源源不断的输入数据,那咱们的这个控制台就会不断的去打印咱们的统计结果,对不对?诶,这就是我们最基本的一个word count。
14:36
好,同学们啊,这个咱们先说到这里。
我来说两句