00:00
好,同学们,我们接下来给大家讲一下第六章啊,咱们叫优雅的关闭,那这句话是什么意思,什么叫优雅的关闭呢?对不对?那这里呢,我们首先要提到优雅的概念,这个优雅呢,我们这里说一下,咱们来写上咱们叫优雅,这个其实对应的就是强制啊,咱们就叫强制,所以呢,就意味着有一个叫优雅的关闭,那么这边就有一个叫强制关闭,那这个可能大家不是很明白什么叫强制关闭啊对不对,所以我们回过头来来把这个关掉,关掉以后我们拷贝,拷贝以后呢,我们写上一个零八啊,然后呢,我们写上一个叫close,嗯,咱们叫做close啊好,把这个呢来,然后呢点击OK放过来,首先啊,我们如果假设先不说啊,我们先不给大家去说明我们的这个概念的话,我们先问成这个问题,你们在学Java中的线程的时候,应该学过线程的关闭对不对啊,比方说线程的关闭啊,或者说怎么怎么着,那么我们在这里咱们举个例子吧,比方说在这儿啊来。嗯,在这吧,咱们在这儿啊,来咱们写上咱们叫做什么呢?叫做线程啊,来程的关闭,那么线程的关闭呢,有一个叫new啊thread,然后点startad,那么我们启动了对不对,你启动了以后,同学们看一下啊,我们写上一个叫thread,等于然后呢,这边来我们的thread OK start,那么接下来我们是不是应该thread啊,咱们的thread点我们stop呀,哎,应该这样,可是不对,为什么呢?因为在大家学习那个线程的时候,我们大家学的时候那个关闭呀,其实并不是用stop,是用什么呢?就是跳出run方法,我相信你们学的时候都是这么学的,就是一旦跳出那个run方法,其实线程就自动停止了,对不对。
01:44
所以啊,就这么个意思,那为什么是这样的呢?什么原因呢?为什么我们这里有个stop方法,我们却不用它呢?这个其实告诉大家,它就叫强制关闭,它就叫做强制关闭,那怎么理解呢?就意味着我不管你当前的业务逻辑,包括你的程序执行到什么位置,只要我调用stop方法,整个线程它就会停止。
02:08
那这样的话就会导致数据它有冲突,那数据就会出现安全问题,为什么呢?因为我运行到一半的逻辑呢,被你强制的停止了,我还没运行完,比方说咱们有些操作呢,它是一些啊非原子性的,对吧?诶比方说爱加加,它就是非原子性的,还有我们那个long,我们的那个double,他们是64位的,64位的数据操作呢,它要分两步来操作,这样的话容易中间被打断,打断的话就会出现问题,所以如果我们用这个stop就叫强制关闭,就会导致一些正常的数据出现问题,所以我们是不推荐使用的。但是话又说回来,真正的停止就应该是stop,对不对?哎,只是因为会出问题,我们才想了别的办法,不就是这样吗?那好,这个强制关闭这个事情呢,我们有个大概的了解,回过头来咱们再说我们的这个位置,咱们这个地方来拷贝啊,咱们来拷贝点我们写上一个啊,咱们点叫做print OK,那我现在。
03:08
那比方说我想关闭咱们的这个什么我们的大追命的采集器,你关闭采集器的情况下,那么这个时候呢,我们就可以中断我们数据的采集了。那一般什么情况下咱们才需要中断采集呢?比方说业务更新了,对吧,我们业务发生了改变,程序逻辑发生了改变,你是不是应该把之前的逻辑给它停掉,这个没有问题吧,同学们,这个我相信是没有问题的,那所以啊,当我们的程序的逻辑发生改变的话,我要重新部署程序了,那你肯定要停止嘛,但这个停止你怎么停,那你是不是?呃,我像我们那个进程强制停止呀,比方说Q啊,在我们的机器上Q一下,那这样的话强制停止,强制停止就意味着里面的数据我不管处于什么状态,我都会给他停掉,这样的话就会出现风险。
04:00
那该怎么办?所以这里面就会提到一个优雅的关闭啊,优雅的关闭,所以啊,咱们这里说一下,咱们叫优雅的啊,优雅的关闭,那你优雅的关闭怎么关呢?SSC点我们叫stop,这个stop当中有一个true,这表示的是关闭我们的Spark contest的环境,还有第二个参数就叫优雅的关闭啊,叫优雅的关闭,这就是这意思。所以啊,我们所谓的优雅关闭其实就是我们的第二个参数,那么你说他怎么会理解呢?很简单,就意味着当我关闭的时候,并不是强制关闭,而是让我们的计算节点不再接收新的数据,而是直接把当前的数据处理完毕之后再关闭,它是这么一个概念啊,就意味着我还要继续执行一点,把我当前的数据处理完了,我再给他停掉,这个就不是强制了,对不对,你要强制的话,这个数据可能就出问题了,怎么办?诶,我先等一等,把我的处理完了以后,我再给它关闭去,就这个意思啊,所以所谓的优雅关闭呢,就是说哎,来,呃,我们的计算节点,计算节点啊,它不在。
05:07
接收啊,我们新的数据,而是将我们现有的数据啊,它处理完毕啊,然后关闭,诶就这么个概念,好,那我这么写是不是就可以了呢?不行,因为如果你这么写的话,大家想一想,咱们之前给大家讲呢,这边有个叫启动,启动采集器,那么这边有个叫avi,叫等待,它要等待的话,它会阻塞当前运行的线程,就意味着在这儿执行不到。这个大家能不能不能明白,就意味着这个地方呢,它会block,咱们叫主摄呀,啊咱们称之为叫主摄,对不对,它会把当前的线程主摄下啊,咱们叫主摄,咱们叫闷线程。那你如果你阻塞我们的内线程的话,那这个代码肯定走不到,那你怎么关呢?你关不掉老师,那简单你放前面呗,好,那我放前面去,你把这个你放到前面去,你把它放前面去,不太靠谱,为什么呢?因为你刚刚启动采集器上来就关掉,啥逻辑还没做呢,不合适吧。
06:11
那老师,那我再往前放呢,你再往前放就更不靠谱了,对不对,你连启动都没启动,你上来就关掉,你更不合适了吧,所以在这种情况下,我们该怎么办,我们该怎么办?所以啊,这个时候我们同学们千万注意了啊,咱们千万注意了,我们需要在我们的启动之后,咱们应该创建一个线程去完成它的关闭,也就意味着不是说在当前的主线程中完成,因为你当前主线程中你放哪儿都不合适,所以啊,我们这里给大家说一下啊来,如果啊想要啊关闭我们的采集器,那么我们需要需要什么呢?创建咱们叫新的线程,为什么要创建新的线程呢?就要跟它独立开,所以咱们叫做new,叫做thread,然后呢,我们写上一个点我们的start OK,然后在这里呢,我们写上一个叫做new runable,嗯,OK,好了,那么我们有一个run方法,那么我们在run方法当中就应该干什么呀?诶,给它来放过来。
07:12
好,你放过来,这样的话就是OK的,可是你这个代码写到这儿,它能执行,肯定能执行,而且跟当前的主线程没有任何的关系,这个是对的,但是咱们都知道啊,在我们测试的时候,一旦一个线程启动,基本上它的延迟是没那么长的,就意味着它会马上执行它的run方法,那你马上就关了,是不是好像也没有接收多少条数据啊。可是我们说了,咱们的这个采集器想关闭的话,它应该是什么?在特定场景下关闭,而不是一启动就关闭,这个也不合适,那我该怎么办呀?所以啊,我们需要有一个状态,什么状态呢?就是在我们的主线程当中去启动的时候有个状态,而这个状态呢,我会判断我们需不需要关闭,这个状态不应该在它们两个线程之间,我们应该呢,可以由第三方的存储来做,比方说咱们举个例子,什么例子呢,来。
08:08
咱们比方说我们在那个买circle中啊,有一张table,有个table,这个table当中啊有一个数据啊,有一个肉一行数据,那这行数据呢,如果有数据了,那就说明我要关闭了,对不对,比方说这个table呢,就叫做什么呢?叫stop,咱们就叫stop Spark哎,就叫它,那么如果我添加一行数据,我就知道,哦,原来我要关掉了,那这个时候就有第三方我们来做操作,对不对,这是一种方式,还有呢,我们再来啊,咱们再来干嘛呢?比方说red啊,这个大家可能还没有学,对吧?Red red当中也可以存储我们的数据,比方说在我们的一个特殊的字符串类型啊,或者说什么数据当中啊,这个KV嘛,啊咱们这里KV,那么在这里呢,可以把这个状态呢,给它保存一下,或者呢,还有比方说嗯,我们的主keepper,那主keepper当中会有那个节点呢,比方说我们叫stop,他们的Spark,把这个节点看有没有,如果这个节点存在的情况下,我不断的去轮询,我看一看,哦,原来这个地方我是要删掉的,那我就知道了。
09:09
或者呢,还可以在HDFS上面,比方说我创建个节点,这个节点呢就叫stop Spark也行,反正其实你怎么做的目的都是希望什么呢?我们能够通过第三方的一种处理机制来判断它的状态,那如果你把这个状态放在这里面的话,那么我们还需要在我们的什么处理过程当中要考虑内存的操作,所以这样的话感觉并不是很好啊,我们应该呢,通过第三方,因为你在这里面去有状态的话,那么你如何通过外部把这个内存改了呢?不是很方便啊,所以我们如果想要关闭采集器的话,需要创建新的线程,而且它需要啊,在我们的第三方,我们的啊第三方的程序中啊,我们要增加我们的关闭状态。我刚才说过了,如果你把这个状态你放在这个主线程当中,你怎么去控制它,这个不好控制的对不对?所以呢,我们需要在运行过程当中把这个状态找到,那我们一般都会用第三方的程序,而不是用主线程,也不是用这个线程啊,那我们第三方的程序,比方说my circle呀,Red呀,主per啊,SDS有很多种方案,那我们这里就别等那么复杂了好不好,同学们,我们这里干嘛,我就给它固定写死了,咱们就写上一个什么呢?Well to,哎,咱们就这么,哎,这是不有点麻烦呀?呃,咱们的逻辑是什么样子呢?大家看啊,是这样的逻辑,True,然后呢,我们在这里就是if务判断啊,判断判断什么呢?判断咱们的买circlel,比方说在这里啊,咱们写个if true,如果为true的情况下,就是如果我取到数据的话,那么就关掉,对不对,应该是这样子的,所以啊,我们应该这么来写啊,但是你这么写是就是当然我这么写是告诉大家这么个思啊,就是我们这里要读取买circle吧,Red啊,Per啊,HS应该是这样的啊,所以说我们这里就写了个true表示这个。
10:54
概念,但是其实还有个问题,如果你当前的环境已经关闭了怎么办,对不对,如果已经关闭怎么办?所以啊,除了我们的状态以外,还有它本身的环境的状态,所以啊,咱们在这里写上啊来获取我们叫做Spark streaming啊,咱们叫streaming它的这个状态,哎,它的状态那怎么取呢?SSC点我们就get state点一下VR飞车,这是我们当前的环境状态,如果它处于活动状态的话,那你关闭没问题,它本来都已经关了,对不对?那么你再去做就不是很合适了,所以呢,拿过来我们写上点,这个点咱们叫active,如果你等于active的话,那么你把它关闭,这个是有道理的,对不对?那如果你现在本来就是停止状态,你再关就没有什么意义了嘛。所以啊,我们基本的代码应该这么写,当然我说这不对啊,这个应该是从这边取的。
11:50
所以啊,如果取不到,那取不到呢,还说啥呢,取不到那我就休眠一下呗,对不对,那我过点时间咱们再去取不就完了嘛,比方说五秒钟啊,或者说一分钟,这个都无所谓啊,就你这么去看一看就可以了,好,这个是我们的基础性的代码啊,来把这个注掉,那我们现在练习嘛,就不要这么做了,那练习咱们就演示一下,那怎么演示呢?同学们看啊,我们这么写,我们就写上叫做什么呢?Thread啊,点我们的sleep,然后呢,我们写上,哎,我们写上,咱们比方说五秒钟吧。
12:21
就是我们当运行五秒钟之后呢,我们自动的让它结束不就完事了吗?所以把这个拷贝拷贝过来以后,我放到这就可以了啊,记住这个是我为了测试方便让大家直接感受一下,而这个呢,其实应该是我们的真正的代码啊好,那我现在呢,我们试一试啊,前面的这个地方我们来操作一下。呃,现在呢,我们这里给它运行啊,记住啊,当我们启动五秒钟之后,它这边呢,会判断我们的这个状态,然后给它关闭,咱们看看什么样的一种效果好不好。看看啊,咱们这里诶有print没问题啊,行,稍等一下咱们看一看,嗯。
13:04
好了,现在已经出来了,同学们看一看,观察一下。好同学,有没有发现其实在我们这个位置,它就已经sleep被打断了,而且它会进行关闭,大家看一下这个红钮还有吗?没了,因为这个红钮如果停止的话,就意味着我们当前的这一块呢,就已经什么我们说出了问题了,对不对?哎,就是这样,所以呢,我们出了问题之后,那么你会发现它是不是马上就停止了,不是,他是不是后面还打印了几行我们的内容啊,就说明它并不是马上结束,还是要再稍微等一等,这就叫优雅的关闭,就是这个意思啊,当然了,我们这个代码呢,除了我们现在这种写法以外,还得有再补充一点什么呢,就是你把咱们当前的环境停了,其实呀,应该还有一个,还有个什么东西呢?就是我们的system啊,咱们DR,咱们就退出啊,啊,就是我们的线程它也要停止,其实你这个线程停止啊,只当run方法执行完了,它本身也都会自动停,对不对,那我们说的是在V处的情况下,我们需要把它停掉,所以啊,我们有。
14:09
这么几步操作,这个大家了解一下啊,好,这是我们给大家演示的一个叫优雅的关闭,咱们课件当中呢,给大家写的稍微的有点不同啊,咱们来看看啊,他怎么说的,咱们的流式任务呢,需要七乘24小时执行,但是有的时候呢,会涉及到升级代码,需要主动停止程序,但是分布式程序没有办法做到一个一个进程去杀死,所以配置优雅的关闭就显得至关重要了啊,使用外部文件系统来控制内部程序的关闭啊,就说明我们得需要第三方的程序来控制了,那么这个程序当中啊,咱们来看看他怎么做的啊,来来来,同学们看,呃,在咱们的这个地方当中。在我们这边有一个叫做monitor stop,这个monitor stop干嘛呢?它是一个线程可运行的,然后呢,里里面有个wrong,这个wrong是怎么写的呢?它获取了咱们文件系统,就是我们的HDS,然后呢,从文件系统当中,它取得我们的一个路径,这个路径呢,就叫stop Spark,那就叫stop Spark就是它了。那么这个路径啊,我们判断是否存在,如果存在的情况下,我们在判断它的状态,如果它的状态是活动状态,激活状态,那么咱们去给它关闭,就是这个意思啊,那么然后把我们当前的线程给它停掉,所以啊,这就是我们的最基本的优雅的关闭,跟我们刚才写的呢,其实大同小异,只不过它的细节上用的是HDS,对吧,我说了呀,My circle啊,主keepper这样的一些东西都是可以的,没问题啊。
我来说两句