00:00
那么上午呢,我们已经完成到了这一步,对吧?完成到哪一步呢?就是我们的这一个Spark worker,对,它可以向master完成一个注册。那么服务器这边呢,可以把这个master,呃,这个worker呢,呃,保存到它的map之中,好,这是第一个功能,我们是上午讲完的,那下面呢,我们来完成它的第二一个功能啊第二个功能呢,就是worker,诶worker。Worker干什么呢?定时的发送心跳。呃,因为这个worker在工作的过程中呢,就是我们上午画的这个图啊,就是他在这个工作的过程中,呃,也有可能因为网络的原因,或者是因为呃,Worker本身它运行的原因导致。瘫痪或者宕机,那么我们作为服务器这边的这个管理者呢,应该知道哪一台worker被宕机了啊,甚至我们也可以把这个呃,宕机的这个模块进行再次的管理,那我这里呢,就实现这么一个效果什么呢?
01:07
呃,Worker,哎,这个worker它可以干什么呢?它可以定时的发送心跳给master,那master接收到以后呢,要更新worker上一次的心跳时间。那我们来画一个示意图,还是在这里写就可以了。怎么做的呢?诶,同学们想一想,应该就在这里加逻辑。加一个逻辑,加什么逻辑呢,就是我们这边。啊,就是我们这个worker这边呢,它会,诶就是相当于说定时定时发送心跳。啊,定时发送心跳,那在这里肯定有一个发送,诶干嘛发送这个心跳。的这个信息啊,发送信息,发送心跳信息,那每隔多长时间呢?假如我们确定为三秒。
02:02
但这个时间呢,是咱们可以设置的。对,那他发送这个信息的时间呢,仍然哈,根据我们前期的这个业务逻辑,那么他这个发送信息呢,呃,发送心跳的信息呢,也是通过我们这个网络传递过去的,比如说这根线。啊,比如说这根线代表发送一个心跳信息。那么它在发送这个心跳过后呢,我们这边肯定就能得到,所以说这里面呢,应该有一个处理啊,就是如果我们这这个地方干什么呢?比如说接收到了啊,啊处理这个心跳。心跳这个时间的一个更新任务,那干什么呢?他这里接收到了,对接收到心跳。心跳的这个信息过后,我们要做什么事情呢?我们就取出,取出对该这个worker。
03:00
取出这个艾克从哪里呢?从我们的这个map中啊,就是从。从map中从这个呃哈希map中取出,取出这个worker。干什么呢,更新。对,要更新。更新他的这个最近心跳时间。更新worker的最近就是上一次吧,上一次啊,那个心跳时间。心跳时间。好,这是我们要要完成的这两件事情,也做了一些简单分析,那显然我们这边肯定要去定义新的这个什么呢?叫做协议,我们这里面肯定要定新的这个协议,这是自然的一个想法,对不对?好,那么我们就来按照这个顺序来走,现在首先要解决的一个问题是如何完成定时的发送心跳,显然这边应该要做一个定时器。对,就这里面我们要去解决一个定时干一件事情的任务,那么这里面就引出在开里面怎么去写一个定时器的问题。
04:09
对吧,定时器。定时器的问题,好,同学们,那么我们就来看一下这个业务逻辑是怎么完成的。打开幻灯片,我们来lawyer。呃,首先看一下这个代码大体的一个流程,这边是我们的worker,到时间呢,我会写这么一个东西,叫这个conduct system,这里面有个k skele,这面有四个参数,待会我解释一下,我先发一个send hard的bit,然后这个消息在这里,在这个地方它会首先一旦这个执行了过后呢,它每隔。你这写的这个三三百三千毫秒,那3000毫秒就等于等于三秒。而三秒干什么呢?去发一个胜的be hardbe这个这个消息,这个消息会触发kiss语句,然后呢,我用master pery发送一个heartbeat给谁呢?这这是两个信息啊,我这定了两个。
05:11
一个是这个。这个是发给服务器的。还有一个send harder bit呢,是触发我们刚才的这个行为的。所以说在这里呢,我需要定义,至少目前定义这么两个啊,一个是send heartbeat,一个是send heartbeat,一个是send heartbe的,一个是heartbe的,我分开的。那么这个send hardb呢,是一个呃样例类,这个是一个object,好我把这两个呢拿过来用一下,那么这边拿到以后呢,这边就进行一个更新就可以了,就拿到这个信息我们判断哦,如果你是什么,直接把它进行一个更新就可以了,好我们来玩一把代码非常的简单啊,首先我们在这个呃,Message里面我们增加两个。
06:02
啊,两个这个协议,这个协议是是什么呢?是这个worker。Worker每隔每隔一定时间时间,一定时间由由什么呢?由这个定时器,定时器发给发给自己的自己的一个消息。哎,我就这样定义的,因为只是表示一个出发,所以说用的是object,那下面这个是干什么的呢?这个是worker,哎,Worker发送就是每隔隔一定时间有定时器,定时器。定时器触发,而像什么呢?像服务器,就是我们的像像什么呢?像我这个master。哎,向master发送的,发送的一个协议消息。
07:03
这个消息要干什么呢?要带上自己的ID号,大家看到要带上自己的ID号,那么讲到这儿呢,大家想想这个work in for呢,我们要做一个改进了。因为到目前为止,我们这个working for里面是没有包含。记录上次心跳的时间的,因此呢,我要在这里加一个加一个信息啊,我写一个VR,那么就是一个last,比如说我写个last,上一次的heart heartbeat啊heartbeat就上一次的这个心跳的一个时间,那么我给他来一个闹这个类型。啊,让这个类型默认给他一个零吧,啊,默认给他一个零,当然你也可以这样写啊,啊,你要说不愿意给零的话,给一个默认值,或者是直接用我们这个,呃,当前的时间来做,对当前时间,那么要获取到当前时间也很简单,大家看这里有一个方法叫system current time,这个就可以拿到。
08:08
啊,这个就已经拿到,好,我们这也可以初始化,就用system.current这个啊初始化那后面就不停的更新,好这个思路有了过后呢,找到我们的worker就加这段代码就可以了。好,我们来看看在哪里加呢?好,刚才我们已经讲了,触发这个行为最好是在注册完毕以后。啊,就来做这个工作,这是最合理的,对吧,所以我这个注册成功了嘛,啊就是一些当注册成功后。注册成功,成功后就干什么呢?就触发啊,就就定义一个定时器。R定时器。定时器,然后呢,每隔啊,就是每隔一定时间啊,然后发送我们刚才写的那个叫sound hardbe。
09:05
这个消息给自己,Harder,这个beat消息给自己。啊给自己啊给自己好,那这个就简单了,刚才呢,我们这一句话呢,已经有了,我就这一块呢,我就直接拿来用一下就可以了啊拿来用一下,过过拿来用一下就行了,我然后呢,我做一个注释好,我这写一写,嗯,这边要需要引入一个包啊引个包。Import of scan。谁开了点哪一个呢。呃,是哪一个呢,叫做current啊,Conre点。啊,During点里面有我们时间单位说这样用了它就没问题了,这个地方呢,要引入对,要引入好,引完了过后,大家这句话呢,我要做一个解释啊同学们,下面我做一点简单的说明,大家看第一个参数。第一个参数这个零表示什么呢?代表立即执行这一个定时器,如果你这假如写了一一啊,一页1000,就代表一秒以后再触发,然后斜着零呢,就是立即出发。
10:13
啊,就是不用延时,不延时不延时啊这个。不延时啊,立即执行立即啊立即执行什么呢?我们这个定时器。第17第二个同学们可以看到这还有一个3000毫秒,这个表示什么意思呢?好,这个表示每隔每隔三秒啊,三秒执行一次,执行一次,所以每隔三秒发一次这个信息。由系统定时器来完成的,那么第三一个呢,同学们看到这个self代表发送给谁,代表发送给我自己。啊,就是就说自己话表示发给自己。啊发给自己,那么第四一个。
11:03
三的B的就是发送的内容啊,发送的内容没问题,好有了这样一个信息过后呢,下面大家都知道应该怎么写了,那我就如果我得到了这个信息对吧,如果我得到了这个信息,那我干什么事情呢?诶这个地方我一旦得到了过后我就向我的这一个啊服务器。发送那个这个协议,这个协议刚才我们已经讲了,是heartbeat heartbeat,就是ert heartbeat,这里面要带上自己的ID号。啊,因为我我你这边要知道是谁来进行更新,好这个就简单了,那我这就写了啊,那简单的很,就什么意思呢?Master。走。啊,发什么呢?就是刚才写的heart beat beat beat beat。
12:00
好,写到这里,那么这边我们要引入刚才写的这个,诶诶,这问题是不是写错了啊。叫hearterert少写了一个T,好,这样就对了,把这个拿过来,诶把这个拿过来,拿过来过后呢,我这写上写上把我的ID传进去,ID显然就是我们事先已经拿到这个ID啊,刚开始就有的,然后引入啊,这个东西就搞定啊搞定那这边呢,大家看这边还有一个问题,Master这写错了啊,Master proxy啊这就可以了。那么每这个时候呢,就每隔一定时间就发消息过去了,好,我们这边可以打一个信息啊,打一个信息就是worker,谁谁谁发送了信息,这样写吧,Worker。等于加上它的ID,写一句话吧,加上他说啊发送给谁呢?给服务器,给服务器发送。
13:00
啊,这个给吧,给master发。发送什么呢?发送这个心跳。心跳好这边拿到以后,我们显然就要处理这这一头了,那么这边他接收到这个心跳更新的,这个心跳更新这个怎么办呢?好办,那这个时候我们打开master,现在呢,我们要这里做一个行为,那就做处理了case。Case,如果我拿到了你的heart at heartbeat这个信息,当然这边带了ID啊,那么我一旦拿到了我怎么办呢?好的,我就更新,我就更新对应的这个worker的心跳时间。好,心跳时间,那首先我们既然要更新新跳时间,首先我们要拿到当前时间线段,第一步取出,从哪里取出呢?从这个master,诶大家看啊,从我们这一个哈希麦里边把它先取出来。
14:05
肝脏基本理论搜索已经在。啊格取出我们这个,呃,这个work in for。Walker inform。好,这么怎么取呢?非常简单,那就worker点。对,我们怎么从map里面取一个东西出来啊。是不是以前最简单的方式直接写就可以了是吧?哎,IDVR我取出来就work for肯定能取的出来。那一旦取出来过后呢,我们现在再获取一下当前时间啊,把当前时间拿出来给他附进去就可以了,怎么写呢,Work in for点。啊,Last等于我们当前的这个系统的时间点就是current meaning就可以了,然后呢,这就更新了嘛。这就更新了,那更新我们这输出一句话。啊,就说服务器或者master更新了,拿了谁谁谁的这个心跳master更新了,更新了谁呢,这个把它ID打出来。
15:10
啊,把ID打出来,就是叫这个ID的心跳时间。心跳。啊,心跳时间哦了。好,我们看看这个逻辑对不对啊,大家再想一想,首先我们看这头,呃,它一旦注册成功,一旦注册成功,我们就触发了一个定时器,每隔三秒让他去发一个heartbeat。Heartbeat,那么heartbeat在master这边拿到过后呢,我如果拿到过后,我就把这个对应的。这个work for取出来,然后把它最上一次心跳时间进行一个更新,并说出了一句话。啊,好的同学们,我们现在呢,来玩一把。看看这段代码能不能正确的运行,先运行我们的这一个master。
16:00
先运行我们的master。好,运行一下。好的。那么运行这个,这时呢,我们发现它已经提示了服务器已经启动,没有问题,现在呢,我们来启动一个worker。再启动一个worker,糟。好,启动以后呢,这边他说我发送了一个心跳,这是一次。好的,第二次。第二次。好,你看现在每隔三秒呢,它就发送一次心跳给master,那么我们master这边有没有得到呢?诶,我们发现master它在对应的时间也更新了这个心跳。啊,说明这个这条任务就已经打通了啊,这条任务咱们就已经打通了,什么打通了就是呃,刚才说的这个定时发送心跳这个任务就打通了,好,我把这个代码呢给大家整理一下啊好呃,先让他心跳一会儿吧,我们这。
17:01
把这个代码给它简单的整理一下,打开笔记。那下面呢,这个心跳就有就有用了哈,往下走。好,我们把第二一个功能给大家。啊,证明一下,这是第一个功能,咱们说完了。第二个功能就是发送心跳。把对应的概代码呢给大家整理一下,这来好,刚才我们完成的第二个功能就是worker worker定时的发送心跳。好,这是刚才的一个执行的流程,给大家来一个标题三。标题三。好,那么这边是标题上,我们这边做的一个简单的说明,看看有没有,诶简单说明有没有把这个简单说明也说一下,就是这这样一个任务。好的。给他来两个标号啊,这是一个说明,这是一个思路的分析,那么思路分析呢,然后就是代码实现,把这个思路分析我们以图形的方式给大家画出来了。
18:11
好的,就截取这一段,同学们就截取这段,哪一段呢?就这里,它定时发送心跳,这边拿到消息过后,从map里面取出worker,更新上一次的心跳时间,这是一个对应关系。好,我把它呢放到这里,那具体代码来说,我们改改到哪个地方呢?好我们看一下,首先我们找到,首先我们找到这个。协议这个位置我们先做了一个改进,就是我们加了两条。两个协议同时呢,对它做了一个改进啊,这样同学们可以看得到的好,可以看得到的好,我把这个呢给大家拉过来,第一个地方。对,第一个地方我们是这样子啊,修改了或者在哪里呢,在message。
19:02
好在message,然后呢,Protocol protocol.scan文件中中我们增加了啊,增加了对应的协议是不是好,这是没问题的,紧接着我们又做了什么工作呢?我们在master这边又继续做了工作了,好。在这里我们继续写啊,就是我们说的第二部分。啊,第二部分我把这个代码拿过来是哪里呢?是worker worker,我们准确的讲就是动了这点代码。对吧,就是动了这一点代码,其他没有改进,因此呢,我把这一小节拿过来就是更新了,更新到哪个地方呢?就是我们的Spark。我靠。worker.scan好,具体来说,我们在这里增加了这么一段代码,诶,我就不截全了,这是我们更新的代码,另外呢,增加了一个偏函数啊,增加了一个偏函数,对它进行一个处理。
20:04
好的,呃,这个做完以后呢,我们动了动了这个master这边,Master这边准确的讲,其实就是增加了一个处理的机制,就是你的心跳来了,我得更新。那是哪里呢?啊,我们把这个也给它整理到了这里,我们这是更新了啊,更新了,更新了什么呢?就是我们的这个Spark。master.scan好的代码,具体来说就这么一段代码,好,同学们,关于我们第二个功能,我们就先给大家讲到这里。
我来说两句