00:00
接下来我们来讲一下第四章啊,咱们叫转换,那么这个转换呢,跟咱们之前学RD换是很类似的啊,就是我们RD呢,可以通过一些转换的子呢,把旧的RDD变成新的RDD对吧?那咱们的stream呢,也可以这样啊,通过一些我们的方法啊,将我们旧的stream呢转换成新的stream,其实含义上是一样的,那么这里面比较特殊,比较特殊在哪儿呢?它会有一个状态的概念,就是我们想转换没问题,但是会有个状态的概念,那这个状态是什么意思呢?举个例子。在生活当中我们经常会碰见呢,比方说我要登录,那么你每一次访问网站都登录就太麻烦了,所以一般呢,在登录的时候呢,它会提醒你啊,比方说两周之内是不需要登录的,对不对啊,无需登录,为什么?因为啊在你这一回登录的过程当中,它会把你的登录信息呢保存下来,这样的话呢,在下一回你访问的时候,它自动完成了这个登录的操作,你就不用管了,所以这个就是一种状态,他把你登录当前的状态呢保存下来,那么你下一回呢,自动根据这个状态来判断你的用户的操作,所以这种状态就是需要临时的保存下来,那我们这儿也是一样的,咱们在之前给大家讲解的过程当中,大家还记得吗?我们每一次的统计结果,其实他们之间是没有关系的,比方说咱们举个例子,咱们这里呢,假设有一个嗨啊,咱们有一个嗨啊是这样,然后呢,这里呢,给它一个蓝色,然后呢,这个地方我们来啊,把这个变成我们的嗨,然后呢,我们再蓝啊,变成hello。
01:34
然后呢,这里呢,变成我们的嗨。然后呢,这个地方我们来给它画上我们的蓝色啊,就意味着在我的第一个采集周期当中,你统计的hello这个单词呀,只会出现一次,然后呢,当我们的时间推移,诶,向后采集的时候,你会发现嗨喽,就出现了两次,这个一和二是独立的,为什么?因为他们没有状态保留下来,诶就这个意思啊,就是无状态的一种转换操作,你想做world count没有问题,那么我们把单词呢,汇总之后求得它的数量,可是你上一个采集周期跟这个采周期就一点关系都没有,那所以你没有保存任何的信息下来的话,那肯定就是一种无状态操作,像咱们之前给大家演示的那些操作,比方说map呀,Fla map呀,包括reduce这些方法呢,其实都是无状态的,他们只是针对于当前这个采集周期来做统计分析,跟别的没有任何关系,就是一种无状态操作啊,但是在某些情况下会有点问题啊,什么意思呢?举个例子,同学们。
02:39
们比方说我现在想统计啊,从我们的什么,比方说从九点钟到12点之间的一个人流量,那这个时候啊,每时每刻都会有人,比方说进入到公园当中啊,或者说人流过来啊,那我都要统计数据啊,那你不能说我只统计三秒钟的这个人流量,对不对,那就不行了,所以在这种情况下,我们需要的是从九点到12点之间的所有的人流量,我全要保存下来,所以这就会保一种状态,那比方说咱们这儿吧,那么在最开始的时候,大家可以看到我们这个采集周期呢,有个嗨,那好了,那我现在需要干嘛呢?把它保存起来。
03:14
这个呢,我们给它来一个黄色啊黄色然后呢,在这里面呢,干嘛呢,我们要保存我们的嗨,这个嗨呢,就放到这儿了,为什么呢?因为它出现了一次嘛,诶就是这样啊,它出现了一次,那比方说咱们这个白色的date,那date出现了五次嘛,所以把这个呢,咱们放过来啊,所以来拷贝,拷贝过来以后,这个是个白色啊,咱们是个白色。然后呢,这里呢,我们的date,然后给它写上一个五啊,就是这样好了,那这个时候没有任何问题啊,然后呢,我们的箭头大家可以看到,就意味着这个采集周期呢,会把这个状态呢保存下来,就是这个采集周期的状态,那好了,那你保存下来以后,当我们的时间推移,它到了下一个采集周期的时候,这个时候呢,又会有相同的key了,比方说hello,你会发现这就有两个,那么两个的话,它应该跟什么,应该跟我们当前的这个黄色的这块区域呢,做一个汇总,所谓的汇总呢,就是更新呢,你要把这个HELLO1更新成我们新的一个数量,那么这是两个一加二,不就应该是三嘛,对不对,它应该更新成三啊,是这样的,同样道理,这边是白色的四个date,那么这边的data不应该是我们的九嘛,所以这样的话呢,我们就不断的去对这个黄色的区域呢,做一些数据的更新,所以呢,所谓的状态和无,就是无状态和有状态呢,其实就是看你是不是保存我们某一个采集周期的数据,如果保存就是。
04:35
有状态,如果不保存就是没有状态对吧,那好了,那么我们这个无状态咱们就不说了,咱们之前给大家演示的那些方法呢,其实都是无状态操作,那如果你想要有状态怎么办?这时候我们要用到什么呢?一些其他的方法,比方说同学们看往下走。完了之后有个叫这个东西叫update by key,这个呢,我们给大家演示一下啊,咱们来演示一下,那我现在呢,把这个呢,我们去掉啊,咱们去掉,咱们来拷贝,拷贝以后呢,我写上一个我们的五啊,这个咱们称之为叫state,嗯,好了,然后呢,我们点击OK,我们放过来啊。
05:11
好了,放过来以后,我把这些东西我们先全去掉啊,咱们怎么简单怎么来,所以呢,我们写上SSC,点我们叫socket,咱们叫socket啊,Tax stream,我们写上叫local host,然后写上四个九,OK,然后接下来我们写上啊,咱们就叫做就叫做数据吧,咱们叫dates,对吧,那也就叫数据了,然后呢,你的这个数据呢,我们date是我们点我们叫做flight map,其实啊,我们这个也不用走那么复杂,咱们直接来个map就行了,括号,然后下划线,逗号一,然后呢,接下来我们这边写上啊,咱们就叫做word to one OK,接下来呢,我们直接word to one点我们叫reduce by key,这个by key下划线加下划线,然后在这里呢,咱们叫word to,咱们的这个叫做count吧,嗯,好,哎,就这么写就行了,然后呢,Word to count点我们的print,诶就是这样好,我现在呢,把咱们最开始写的word count呢给大家演示了一下啊,就是这个操作。
06:11
那现在呢,我们在这里来,我们写上咱们叫做嗯,Net有一个杠LP啊,在我们Windows下面,咱们用这个叫杠LP,如果在我们的Linux下面就用那个杠LK就行了啊,咱们四个9OK回车,回车以后,现在呢,我们开始准备来运行。那我这里呢,咱们就可以稍微看一看了,来好,时间戳已经出来了,同学们看,那我现在干嘛呢?我就直接写啊,咱们多写几个,多写好了,多写以后大家有没有发现我有这么多的A,但是你会发现它会分成两个我们的采集周期来采集,一个是A3,一个是A6,那么总共应该是九个吧,对不对,所以呢,大家看一下,咱们123456789没问题,总数没问题,但是它是分成了两个采集周期的,就跟我刚才的这个图形是完全一样的,他没有这个状态,对不对,他没有这个状态啊,同学们这个是需要清楚的啊,当前是没有这个状态的,那好了,那现在没有状态的话,那肯定不行啊,所以咱们先把它停掉啊,所以啊,咱们这里说一下来,咱们说一下这个就叫做什么呢?叫做无状态啊,咱们的数据操作,那么它只对啊,咱们说一下,它只对我们叫当前的采集我们的周期内,周期内的数据呢,进行我们的处理啊,咱们叫。
07:31
进行来处理,就是这个意思,在某些啊,在我们某些我们场合下啊,场合下我们需要咱们来来需要我们叫保留,嗯,保留我们数据的统计状态啊,统计的结果其实就是一种状态实现啊,实现我们的数据的聚合啊,数据的汇总吧,咱们就别叫聚合了,咱们叫汇总,那这个该怎么办呢?其实啊,Map倒是无所谓,为什么呢?因为我们真正的map就是一种转换结构,对不对,咱们为了使用reduce by key,但其实我们的reduce key啊,这个就有问题,因为它直接出结果了。
08:05
他直接就把结果得到了,可是我想要的是跟那个缓冲区的那个数据呢,做一个汇总,对不对,所以这个方法就不能用了,记住啊,这个蕊就是bey就不能用了,那你不能用的话用什么呢?所以大家咱们拷贝啊,点点了以后它有一个叫update。咱们叫update state by key,那么这个update state其实就是更新状态的意思,它根据什么更新,根据那个key来更新,所以大家会发现在咱们这里面就有个hello,有个这就是那个key,根据key来对我的数据做更新,如果没有这个key会增加,如果有这个key就会update,就是这个意思。那么它里面是什么样的一个逻辑呢?这个逻辑大家会发现其实很简单,它表述的就是一个叫s seq in,再加上个option,所以啊,咱们这里呢,给它来啊,咱们这里来括号我们写上叫做s seq,然后写上一个叫做option,啊,然后呢,再返回一个option,所以返回一个option啊,所以这就是我们的一个逻辑,那你的这个我们啥意思呢?我们说过了,你前面的这个地方叫reduce key,你直接就聚合数据了,那么这不是我想要的,我想要的聚合呢,其实应该跟什么?诶跟那个缓冲区相关,就是我们之前给大家看到黄色的区域相关,对不对。
09:23
所以这个时候呢,我们就不能直接做聚合,我们应该想办法把你当前的数据拿到,把缓冲区拿到,所以啊,这里咱们就说一下来。Update by key啊它干嘛呢?就是我们写上啊来根据我们的key对我们的什么我们数据的状态进行更新啊进行更新,所以这时候呢,我们需要什么呢?更新嘛,那我们就需要获取两个东西啊,第一个呃,那么我们传递的这个参数中啊含有我们的两个值,那这两个值什么意思呢?第一个啊,第一个值它表示我们相同K的数据啊的value数据。
10:03
因为怎么聚合你还没告诉我,所以我就把这个value数据先拿到啊,这第一个值,第二个值,第二个值它表示的就是缓冲区的数据。啊,缓冲区相同K的数据啊,咱们叫相同K的那个value数据。那大家看以到第一个值就是你相同的那个K啊,它的value诶给他拿到了,拿到了以后,第二个值呢,表示的是缓冲区当中相同K的那个VALUE6值就是这个三呢,这个九啊就被拿到了,为什么呢?相同T嘛,相同的就把这个三拿到了,而你这个一和一就变成了二,所以啊,那么也就意味着我们这里呢,其实应该写上一个咱们叫我们写上啊,咱们冒号咱们叫SEQ,然后写上一个我们的咱们这个in,它就因为是什么呢?是因为是它,我们说把那个相同的T的value渠道吗?这个value不就是int嘛,对不对,然后这个是什么东西呢?这个叫option,这个option呢,其实也是我们的特啊,也是它,然后你返回它就够了,可是你返回它你什么都没有变,诶老师,那为什么这里呢,它表示的这种缓冲区的感觉呢,因为缓冲区是有可能有值,有可能没值的,对不对,还记得吗。
11:13
咱们的缓冲区,你第一回访问是不是就没值,你以后访问是不是就有值,所以它有可能有这个key,也可能没有这个,所以我们的option恰恰就体现了一个有值无值的概念,所以它其实就是那个缓冲区,那我叫上buffer可能会更好一些,对吧?那好了,那么我们现在要想统计一个结果,我们该怎么办?那么肯定是把我当前传进来的值跟我的buffer的值做一个什么我们的计算对不对?所以呢,我们来,我们写上啊,来咱们叫做new,咱们的count,它等于咱们的buffer点。点了以后叫get or else,然后呢,给他一个零,就是看你能不能取到,如果能取到的情况下我就取,取不到给个默认值,然后再加上SEQ,咱们叫点咱们的sum,诶这么写就可以了啊,好,你写完之后,那么你这个新的值我应该给它放到缓冲区当中,所以啊,我们可以这样来,咱们叫option,给他一个叫new count就可以了啊。
12:16
好了,你这么写了以后,大家会发现呢,我们其实就把我们的那个状态给它更新好了,所以来我们写上叫state,嗯,放过来放过来以后这个state呢,我们可以呢,来print一下。好了,那OK,那我这个方法呢,给大家稍微的演示完了,那我们现在呢,来跑一跑啊,那这个呢,我们给它重新来,嗯,OK,然后启动,启动以后我们开始运行,这个时候你来观察一下会出现什么情况,因为之前呢,它的这个数据很多,但是呢,你的这个状态没有的话,它是分着采集周期来做统计的,但是呢,当你用了我们的状态之后,你发现它不对,它哪不对呢?你往上看,同学们它的这个地方怎么说的呢。这能明白吗?叫checkpoint directory has not been set,那这句话什么意思?他说的很简单,而且很直接,就是告诉你有一个检查点的目录是没有被设定的,诶,怎么会有检查点的目录呢?这个咱们在之前给大家讲那个Spark括的时候就提到过检查点的概念,对不对?他会将数据呢,给它保存到分布式存储当中,这样的话更安全一些,对不对?那同样道理,大家想一想,咱们当前的这个地方不有一个黄色的区域吗?这个黄色的区域我们称之为叫缓冲区,对不对?可是你叫缓冲区,那你存哪儿啊?
13:38
的内存当中不安全,所以呢,它这里呢,干嘛呀,就可以放在我们的缓冲区当中,那这个缓冲区的路径有没有设定,所以呢,我们这里就说一下咱们SSCDR,咱们叫做什么呢?咱们叫checkpoint啊。然后写上咱们叫CP就可以了,在咱们的这个位置,大家可以看到我们这里呢,现在没有对吧,所以我们现在呢,可以来操作一下啊,它就有一个CP,好,那么我们重新执行一下,刚才报错了嘛,那么我们在这里呢,也说一下吧啊,咱们叫做使用啊,咱们有状态啊,有状态操作时。
14:13
啊,需要设定我们的检查点路径啊,这个是比较明确的了啊好,那我们现在来运行。运行以后啊,如果不再出现问题的话,我们左边的这个位置会出现一个CP的目录,然后呢,我们这边会不断的去打印一些信息啊,咱们来看一看。好了,现在已经可以了,可以了之后呢,接下来我就跟刚才一样,大家看一下啊,我们把这个先清空,清空以后我在这里呢,来我们写上aaaaaaaaaaa好了。大家可以看到我写完以后,这个A2是不是我们前一个采集周期的对不对,但是你会发现后面是不是就A11了,那我来确定一下啊,来一二三四五六七八九十十一吧,所以总共是11个,但是呢,你前面叫A2,你后面怎么了?
15:05
他会按理说应该九个吧,但是不是九个,他把那个二和九做了一个累加,累加之后更新起来,更新到检查点当中,而且检查点这边是没有关闭的,没有关闭的情况下,检查点不丢失,那么你会往下发现,每一次是不是都会出现这个A11啊,每一回都会出现,所以这就等于把状态保留下来,过一段时间诶更新一回,过一段时间更新一回,不就这个意思吗?这就叫有状态操作,好吧,同学们这个能不能理解啊,我相信还可以吧,对不对,就跟我们画图一样,你在这边给他了一个黄色的内容,那么它就是一个缓冲区,不断的把它里面的值做更新,不就是这个意思嘛,啊所以呢,我们这个呢,把这个代码呢,稍微的看一看,能够看明白就可以了,这个就是相同key的那个value的集合,这个就是相同key缓冲区当中的value,你把它们做个聚合,再给它返回,或者说更新到缓冲区当中,这。
16:05
就够了啊好了。
我来说两句