00:00
刚才我们给大家演示了用Q的方式呢来创建RDD来动态获取数据,之前呢咱们演示了socket,那么其实你会发现采集数据的方式有很多种,可是我们这个霸史之命当中所提供的这个。采集器的种类并不多,那如果你想采集MYSQL的数据,你想采集h base数据,你想采集的数据,那么这种情况下可能就需要你自己来定义,咱们的采集器叫receiver的啊,就是这样,所以啊,这个咱们称之为叫自定义数据源,这个呢,我们给大家稍微的去演示一下啊,来咱们拷贝。我们写上个零三啊,咱们叫DIY是吧啊,自己动手丰衣足食啊,来点击OK,那好,我现在呢,把这个放过来,把这些东西呢,我们全都给它去掉,然后把这个呢,咱们也不要啊,这个我们的文字性描述都去掉吧,好了,那我现在想干嘛呢?我现在想自己写一个啊,咱们写上叫class,叫MY,它里面有要求什么呢?就是你要继承一个叫receiver,所以呢,拿过来咱们写个继承,咱们叫receiver,这个receiver当中,我们在这里呢,给它来啊,咱们导入一下。
01:14
然后呢,这个receiver啊,我们看一看,咱们点一下点点完以后你会发现它是一个抽象类,它里面有一个泛型,所以呢,我们在这里写上啊来,呃,咱们叫自定义,自定义啊,咱们叫自定义数据采集器,然后呢,这个地方写个一,我们应该去继承。咱们应该继承啊,咱们的receiver,然后呢,我们写上叫定义泛型,这个泛型什么意思呢?就意味着你现在的数据是什么样的一种类型,那咱们就是普通的字符串呗,所以写个string就可以了,但你光写string还不够,为什么呢?因为这个receiver是需要传参数的,这个参数啊,它是一个我们继承的复类参数,那么我们应该显示的去传递这个参数,对吧?所以加个括号,那么这个括号当中它有个叫storage level。
02:05
同学们还有没有印象,我们的storage level其实就是一个存储的级别,那么我们这里呢,可以直接来啊,咱们就写上叫我们叫做点,叫做memory only,咱们叫memory only放在内存当中对吧?哎,所以定义泛型,然后呢,我们叫做传递我们的参数,那么我们第二个,那还说啥呢?就是重写方法了呗,哎,重写方法这个重写方法比较简单了,来为什么说简单呢?因为只需要两个方法,一个叫onsar,一个叫onsto,就是当我们启动采集器的时候,它该词执行了,那如果关闭采集器,它就会onsto,对不对,点击OK啊好,那这里我该怎么写呢,对不对?诶咱们拿过来这里啊,首先第一点就是我们这个unsar呢,它表示的是我启动的时候我该怎么怎么办,那我启动的时候,其实对于我来讲的话,我们应该啊,有一个线程独立于我们当前的这个运行。
03:05
的线程,所以呢,我们写上一个new啊,咱们叫做thread ok.start哎,然后呢,干嘛呢,在这里写上一个new,咱们叫roundable啊。放过来以后,有一个run方法需要我来去什么呢?实现,那这个run方法干嘛呀,其实说白了就是不断的在产生数据嘛,那所以我们这里就写上了,咱们叫做well,然后呢,我给他一个true啊,这个处什么意思呢?就是源源不断的采集数据,但这个采集的这个数据啊,我觉得呀,你不能采集太快了是吧,太快的话会很多,所以呢,我们这里来,咱们写上个叫thad.sleep给他一个比方说500吧,500毫秒呢,我们去来暂停一下,然后呢,我们再去生成,再去生成,咱们这么来做。那这个地方我们就开始要生成一个数据了,那这个数据呢,我们就叫啊message吧,叫消息吧,那你生成什么数据啊,其实我们这里呢,可以来,我们叫random啊,咱们叫做嗯,OK,然后拿过来。
04:08
好,嗯,放到这边,然后点我们叫next in,给他一个这个值啊,咱们写个十。写完以后呢,给它来一个,咱们再加上一个,或者说直接来一个to string吧,咱们叫to string OK,很简单啊,就是把一个我们的字符串给它随机生成出来。生成出来之后,然后呢,我们再干嘛呢,把它操作一下,那我这里写上吧,啊来咱们写上咱们采集的数据为,诶这么写啊,那你这么写完,其实就是为了什么呢?演示一下啊,那么每500毫秒呢,会生成一个,每500毫秒会生成一个,就是这意思,可是你生成的数据你采集过来你干什么了。大家看一下你的三秒钟,把咱们的某一个周期之内的数据给它采集过来,你干什么了?你是不是要做封装,你封装以后是不是应该往下传递呀?所以咱们这边会有一个什么呢?封装的概念其实也就是存储的意思,在它的底层来做操作,所以同学们看,当我拿到一个消息之后,咱们这里有一个方法叫做什么呢?叫store,叫存储,你把message给我存进去啊,存进去,你存进去以后,它的底层自动再做封装,封装的级别其实就是这个叫memory only。
05:21
那这样的话不就OK了吗?那我现在呢,已经把这个我们的onsar已经做好了,就意味着我们现在可以完成咱们数据的一个生成了,那这个on stop该怎么办?那你肯定当停止的时候,它就不能再执行,对不对?所以这个处啊,它应该受到控制,所谓的受到控制呢,应该由外部来进行控制,比方说我们写上一个咱们叫做什么呢?叫做flag,叫标记,默认情况下它是个true啊,然后呢,你把这个flag,诶给他放到这里,对吧?然后呢,我们这里可以给他一个我们的,那我这里呢,写上一个咱们叫flag,诶放到这儿,然后呢,放过来干嘛呢?给他一个我们的false,当你等于false的时候,那么我们是不是这就等于false了,那么它根本就不再往里执行了吧,就跳出了run方法,跳出run方法这个线程不就结束了吗?所以啊,这样的话就连在一块儿了啊,这就是我们自定义的数据采集器,可是你自定义的数据采集器你。
06:22
该怎么用呢?同学们看,我们在这里来,我们叫SSC,它里面有一个方法,这个方法的名字叫receive stream啊,有一个这样的东西,然后呢,我们new把咱们的my receiver给他放过来,放过来以后点叫VAR回车,这个时候你拿到的就是我们得到的那个message对吧,就是他咱们叫DS叫stream。那我别的就不要了吧,我们这里直接点我们叫print是不是就可以了呢?好同学们,我们现在呢,自己定义了一个数据采集器,不断的去生成我们的字符串,然后呢,在我们这里不断的把它采集过来,一个我们的这个生成一个我们的采集,最后打印在控制台上,这不就OK了吗?啊好,我们运行一下,看看我们这边能不能够自动完成。
07:15
嗯,好,同学们看我现在的这个地方,是不是我们的数据已经出来了呀,对不对,诶我们的数据它已经出来了,就是这么个意思啊,所以呢,里面的数据呢,在不断的发生变化,不断的发生变化,那我们最终就可以采集过来,就是这个意思,好了啊,咱们就讲到这里啊,自定义的采集器。
我来说两句