00:01
好呃,那我们就接着这个昨天的啊,继续往后推进啊呃,昨天我们已经就是全部都测试通了啊,我能够正常的把数据呢采集到我的这个卡不卡中了,那接下来我们就来做这个业务数据的。消费和这个分流操作OK吧,嗯,行,那我们回到这个代码中啊,回到我们的这个开发工具中啊,我们再去写一个APP啊过来,呃,过来以后就在这个上面啊,我们再来写一个。好,又上一个盖类啊,这个叫OS base DB APP对吧,上一个我们叫什么baselo啊,这个我们叫什么base DB啊。好,呃,然后呢,先写注释啊。这个是我们业务数据的分流操作啊,业务数据消费分流吧,分流对吧,我把你的业务数据消费出来,然后呢,去做一个分流啊,然后啊,我们结合上我们这个就是base log的这个经验啊,我们是不是可以稍稍的把这个你要做什么事给他这个梳理一下的呀,对吧,其实像我们这个base logg的这个,我们应该要重新梳理一下的,因为我们后面是不是加了很多东西呀。
01:19
对吧,这个大家自己去梳理一下啊来,那我们这个base DB里面我们需要做什么事,大家一起来跟着我分析一下。首先第一件事情啊,毋庸置疑,肯定还是什么准备你的环境了吧,对吧,准备这个实施环境啊,实施环境好,然后呢,你准备好这个实施环境以后,接下来。你再想好啊,来,我把这个图拿过来啊,今天还得什么反复去说这个图啊,那我干脆早一点拿过来吧。对吧,把它这个定个图啊,我就放到这,放到这以后我这个好操作啊,来大家来跟着我看啊,那你来看我做这个消费分流的话,我是不是也是从你的这个卡夫卡中来去读数据的呀。
02:03
对吧?那么既然你也是从卡夫卡读数据的,那我们就会涉及到你的重复消费或者是漏消费这个问题。对不对,好,那你既然有这个问题的话,诶,那是不是又得去把我们在。日志数据、消费分流中所研究过的那一套东西,再来研究一下。对吧,比如说你怎么就能够保证。不漏数据了,怎么就能够保证不重复数据了,那最后这个问题我怎么去解决,我怎么做到精确一次。大家想想啊,这个过程跟那个日志数据的那个过程是不是一个道理啊。对吧,它是一个道理的啊,那如果说你能够把这个事情想明白的话,那你看一下我们在之前已经写了一套这个偏移量管理方案了,那我是不是可以直接照搬到你的这个环节中啊。反正你都是从我的卡夫卡里面去消费数据,对吧,最后的数据也是写到卡夫卡啊,其写到哪里无所谓啊,主要是你这个从卡夫卡消费数据,我们主要是说的消费的一个过程,那你就看看吧,我是不是可以把你这个之前那一套平价的管理方案直接照搬过来就可以去用了吧。
03:05
对不对啊,所以说啊,大家注意来这一套就是我们在写这个贝斯DB的时候就省事多了,因为我们该做的东西之前都已经差不多都做好了啊,就直接用来,你看第二步就干嘛呀,那我们就要什么通过之前的这个偏移量的管理方案来去写,那首先我们想要从你的。卡不卡,拿数据的前提就是我得带上你的opposite,那如果你想带上opposite,你就得从你的面去读,对不对,所以说我们要干嘛呀,来从。中读取什么呀,便宜量。好,第三个读到偏移量以后呢,来我们从什么从卡布卡中啊,从卡不卡中。啊,消费数据对吧,那你消费到数据以后啊,注意下一步,那你是不是要干嘛呀,从啊应该是叫什么叫这个提取偏音量对不对,提取偏音量结束点就是你每消费到一波数据以后,我一定要把这个偏移量的结束点给它提出来,好提出来以后,然后接下来第五步就是我们的数据处理。
04:06
对吧,数据处理,那你这个数据处理的话呢,应该大概率情况下还会有这个什么结构的转换对不对,哎,转换数据结构,转换数据结构对不对啊,那你肯定关键的不就是分流了吗?啊分流是人流诶。诶,啊,分流是吧。好。能看到吧,同学们啊,你大概率情况下就是你一定要去做这个分流的啊呃,那这个分流的话,我们怎么分呢?这里面有两个原则,一个是看一下你的事实数据,诶分到什么地方呀,事实数据进你的卡不卡,然后呢,你的维度数据对吧,进你的。这是我们已经什么规划好的。对吧,然后这个细节的话呢,我们再分流的时候哦,再来具体说好吧,行,那你等把这个全部都处理好以后啊,注意因为我们数据之后呢,也是写到这个卡夫卡中的是不是,所以说我们是不是尽可能还得什么去做一个什么事情啊,叫刷写叫什么呀,这个flash啊叫flash卡夫卡啊,Flash这个缓冲区啊,卡不卡的这个缓冲区啊。
05:23
对吧,然后最后呢,还有一个更关键的,你是不是得去提交你的offset了呀。对吧,提交你的这个偏移量。明白这个意思了吧,同学们啊,提交这个偏移量啊,OK,行,那么这就是我们在整个这个诶业务数据消费分流中,我们应该要去完成的工作,好,那整体来看一下吧,这几步。非常简单吧,这几步我们都做过了吧,所以说你看啊,整个这个消费分流,其实核心的还是在你的第五步好,所以说我们这个迅速的把前面的先把它做出来啊来,那我们就写了啊过来过来以后呢。
06:01
呃,这个改成object对吧。好,来,那就开始写吧。呃,写个注释啊,然后第一步呢,就是准备这个环境了哈,准备实施环境好,那我们就什么直接扭吧,我们需要一个Spark对象。对吧,然后呢,Set这个APP name,比如说我们叫做什么贝斯ods吧,然后呢,贝斯DB的一个APP,好set这个master,比如说我还是写上这个四个并行度啊。好,这时候的SPA com对象,然后再去用上一个streaming contact,对吧,把这个Spark对象给它传进去,然后呢,接收回来,这是我们的SSC啊。好,后面还需要一个采集周期啊。哎,对着呢啊采集周期啊1SECOND。
07:02
呃,就这个五秒钟吧,对吧,好,然后接下来就是SSC,点这个start,然后呢,S sc.a好,那就写好了吧。对吧,这个环境应该很简单啊,行好,然后接下来第二步,第二步干嘛来着,是不是从你的red中读取偏移量了呀,来从red中读取偏移量,好咱们读来着,直接什么my opposite you2种read你的opposite,那这里面需要传什么呀?传你的topic和你的group ID,好,那我们来传一下。那我们定义出来吧,好吧,定义出来这个是topic name。好,这个我们用什么套贝呀,同学们。啊,用手套这个。你是不是要读卡不卡的哪个topic吧,消费哪个topic,你要去写哪个topic,那你看你消费哪个topic,是不是你max把数据写到了哪个topic呀。
08:02
对吧,所以这个一定要跟你Maxwell中配置的是保持一致的啊来我们找一下哎,Maxwell啊,像呃,Con properties这里面有啊,我们找一下那个呃,叫topic是吧。就在这对吧,我们写的这个名字吧。是不是我就把它复制过来了啊好,放到这行,那还得有一个主对吧,Group ID啊行,那我就叫这个名字对吧,Ods什么什么杯子DB,然后呢,加一个什么group吧。啊,这个主的名字的话,你就写,随便写一个得了哈,OK吧,行来,那就把它写好了啊好,写好以后你看了就把这个topic name传进去,诶topic name传进去,把这个group ID传进去,好接收回来,好,这就是我们的呃,Opposite对吧?好,那这个有了以后,那接下来第三步就是从你的卡不卡中消费数据,那消费的话呢,我们就要带上你的便移量了啊,但这个东西不一定有,特别是你第一次的时候,它肯定是没有的,说你要做判断啊,如果说你的opposite是不等于空的,对吧?并且呢,你的opposite是什么?Now emptyed对吧,就是飞空的,就它那面一定是有东西的,这种情况下,我就可以通过叫什么叫get,卡不卡stream,然后SSC传进去来看什么还要传你的topic name,传你的group ID,再把你的opposite给它传进去。
09:37
对吧,就是你有偏移量,那我就可以什么拿上去读了。好,来把这个提出来。放到这啊OK,把这个改成V2好,然后再给它呢,赋一个值就可以了,对吧,那如果说我没有这个便宜量,那怎么读呀,那不就是把它拿过来往这一放,然后呢,这里面不管它了呗。
10:07
对吧,这就独到了。好。然后这个拿到数据以后,接下来第四步就干嘛呀。是不是提取偏移量结束点啊,对吧,那这个怎么提呢?我们用的是卡夫卡stream,然后呢,做一个transform,呃,然后呢,他给我传过来的是一个RDD。对不对,他给我传的是一个RDD,那我把RDD拿过来,然后呢,这里面我们就RDD讲as instance of转成一个,就是可以提取偏移量的一个类型啊,Has opposite ranges,然后它里面有个opposite ranges拿出来,对吧,这就是我的opposite,好,这是我最终想提的啊,那我把它统一拿到外面,因为我要在外面去使用的啊,把它改成VR。好,这个是等于空,再给大负值啊。
11:01
行,那我把它提到了,提到以后啊,因为这个我们只是在去提取偏移的结束点,我们并没有对你的RDD呢做任何的处理,所以说呢,我们最后还是要把最后还要把这个RDD呢价格大家给返馈回去啊,对吧?来接着回来,好,这个是我们的就叫offset range斯的这个吧。This stream OK,这不就好了吗?对不对啊呃,那么到目前为止的话,你看一下啊,我们就已经把这个该做的工作做完了,那接下来第五步呢,其实就是我们的数据处理了啊来,那我们这个做一个简单处理啊,第五步处理数据啊呃,处理数据的这个第一步的话就是转换结构,转换这个数据结构啊,那你说我这个转换的什么结构啊同学们。啊,转换什么结构,你看啊,现在我拿到的是不是还是一个消息啊,就是一个consumer code,那我们知道的是,呃,我们重点关心的就是你这个消息的一个什么value,像你这个key的话,其实我是不关心的,我只是关心这个value就完事了,对吧,我只关心这个value啊,OK,呃,那我们就可以把这个value取出来,取出来以后呢,呃,它还是一个JAJA什么呀,Jason格式的,因为我们知道这个Maxwell啊。
12:19
就这哥们啊,他把这个数据库里面数据采集到以后呢,它是什么,通过什么摘格式的数据呢,发送给什么这个卡不卡的,所以说我这个地方拿到的数据还是一个再审格式的,那我是不是可以同样把它转换成一个摘审的一个什么对象就可以了吧。对吧,来我们先转成加入加入对象啊,这样我是可以看得到你这个效果的啊OK,那我们做一个转换吧,做一个map操作,呃,Map的时候呢,他给我传过来的是一个纯门尔靠的啊,OK,那我要给人家返回什么东西啊同学们。啊,是不是consumer record,讲这个value拿出来,这是我的一个叫data吧,就是我的这个战数据。
13:01
好,然后呢,我再什么通过这个摘讲什么呀,叫先导包啊。好的包包啊叫什么叫pass object对不对,然后呢,把这个data呢搁进去,然后呢接收回来,这就是我的战object。是吧,然后最后呢,我把这个什么加上object给他什么。返回返回回去。能看到吧,这来这个我们介绍回来啊好,这是我们的Jason object的一个stream。OK,行,那我们拿到这个以后呢,诶其实我们现在就可以什么做一个打印了啊,就刚才我们不敢打印啊,为什么不敢打印啊,啊为什么不敢打印啊同学们,因为你的这个是不支持序列化的,如果你要打印就会出问题,但是呢,我这个是支持序列化的,我就可以什么进行这个打印,好吧,来我们这个打印一下啊,行,那就相当于我们先看一下我能不能把数据呢?诶在我这个代码层面给他这个消费过来。
14:06
能理解吧,哎,在代码层面把它消费过来啊,行呃,那如果你要运行的话,这里面你看啊,我们用到了Maxwell,用到了什么,卡不卡呃,用到了什么。那这些东西你不得都把它启动起来吗?对不对,所以接下来啊,我们先把这个该启的东西启一下啊嗯。好回到我们这里面,然后呢,先把这个主K起一下。啊,先把环境提起来啊。然后呢,卡不卡。好呃,Red对吧,我们red-server,然后呢,指定一下OT model买RED1018下面的red.com啊。OK,然后再查一下啊,就是他正常要取成功的啊。
15:02
没问题吧,好还有一个Maxwell,呃这个B就在这啊对吧B然后呢,呃,Maxwell然后呢,刚刚con conig点啊杠刚DEMO是吧,呃这个这最好写个脚本是不是比较好一点了。对吧,大家把它包成一个脚本吧,啊这我我就不包了吧,因为这个就这么行,代码我就写写也行啊行,如果你要嫌麻烦,你就给他什么搞成一个脚本啊。好,那现在应该都起了吧,这个这边看一下啊。Mal也有了对吧?行,那这个都齐了以后呢,接下来我们就可以正常的起我们的程序了啊,把这个程序跑起来。好来运行一下。好,稍等啊。
16:00
呃,等这个程序跑起来以后呢,我们去做一个数据的生成啊。我没有打印啊,等他这个打印那个时间的哈。好,第一次读不到哈,第一次读不到。好。看到了吧,开始打印时间了啊,开始打印时间了,OK,行,那接下来我们就可以去生成数据了啊,回到这个地方来生成一下数据,呃,那这个生成数据的话,我们好像没有准备脚本对吧。对不对,就是是没有写这个脚本对不对,好,来这个我们写一个吧,就方便一点啊,把这个脚本给大家补一下啊,补一下这个脚本啊,这个怎么补呢?呃,那就非常简单了呀。这还咋补啊?我们之前不都写过一个脚本了吗?把它复制过来不就可以了?看一下啊。
17:00
把它拿过来。这是第四个第五个对吧?好这是什么呀,放到这啊,第五个我把这个脚本补一下啊,这样会方便一点啊,生成什么呀,业务数据脚本好还是一样的,呃,我们要干嘛呢?就是还是判断你的这个参数个数,前面那个都不用动啊,要改的应该是这个地方,就是我们不叫DB log了,我们叫的是我们不叫a log,我们叫DB,下方叫log啊,看我这个路径啊。对吧,我叫什么DB下滑的log,然后呢,文件的名字呢,我叫application。把它一改对吧,这是改日期的,然后下面就是CD到你的OBD model,然后呢,DB下回的log。好,然后Java钢栅,然后呢,运行的这个炸包,把它复制过来。行。好,那这脚本不就写好了吗?对吧,那你直接把它复制出来,复制出来以后呢,你就放到一个脚本里面啊,呃,其实我是有一个的啊,但是我如果贸然用的话,你们你们也不知道我为啥用它对吧?来呃,来到病里面啊嗯。
18:10
我删了啊,我叫DBS啊,我把它删掉来好,那现在就相当于我们要重新创建一个脚本了啊,我就叫DBs.SH啊好过来以后呢,我把这个刚刚复制的这一堆拿过来,好把它放进去,这就搞定了。然后呢吧,好搞定以后来保存退出。啊,给权限啊,全值mod,然后呢,U加X,然后DBs.SHOK,那这个有了以后呢,接下来我们以后再去生成数据的时候啊,比如说先拿到这啊OT Mo dlo你看一下啊,当前它里面是呃,它里面那个日期啊mo.data在这是20223月12号对不对,对吧,杠零三杠幺二啊然后接下来我们干嘛呢?就是DBS,你看看啊,我直接什么写个2022杠零三杠。二二好,这就开始生成数据了,然后呢,其实你看一下啊,那个文件里面的日期呢,也就发生了改变了。
19:06
看到了吧,就一个道理啊,跟我们以前写的是一个道理的啊,行好,他开始生成数据了,那我们回到这个里面,应该就能看到这个数据了吧。往下翻啊。对吧,是不是看到这个数据了呀。这就是你这个Maxwell帮你这个采过来的这个数据。能看明白吧,啊,Maxwell帮你采过来的数据啊。这个没毛病吧,同学们。那也就意味着到目前为止。啊,我们已经什么能够成功的把数据呢?诶通过我的Spark stream把它消费过来了,那消费过来以后下一步的操作就是做分流操作了,OK吧,行,先停一下。
我来说两句