00:00
好,那我们呢,还是一样的,把前面所讲的内容呢,做一个快速的回顾啊呃,那首先呢,我们头一天啊讲的是dim层。对吧,写了一些代码,但是呢,还并没有去写完,还差最后一步,我们已经将数据过滤出来了,对吧?嗯,但是呢,还差最后一步将数据写到。Phoenix里边啊,这步还没做啊,那我们等会儿要去写的,呃,那前面我们做了什么事呢?我们整个的dim层,我们的目标。是将业务数据就根据维度建模理论对吧,抽取出这个业务表。啊,因为在我们业务库里边儿有实时表,有这个维度表。对吧,那有两种,那我们要把这个维度表呢,放到。Phoenix或者说是h base里边,因为我们之前分层的时候就聊过了,对于DM层它的一个使用方式,未来是事实表数据来了。
01:06
我们要根据实时表当中的。外键啊,我们可以叫外建对吧,然后呢去查询。维度表,或者说呢,拿着外键去关联,去join我们的维表。对吧,我们去做这个draw。啊。好,呃,那很明显,它的用法呢,是你未来只要来了实时表数据,就有可能会用上我的维度表数据,那么它就需要我们做永久存储。而且呢,最好还可以做join,或者说做。查询,根据主件查一行数据明细,对吧?我们要的是这样的一个功能,那么明显呢,我们当时所聊的卡夫卡呢,它就不太适合这个事儿了,最早聊分层的时候,对吧?所以当时呢,我们还分析了有很多框架,比如说这个,呃,H base hdfs啊,Have对吧?呃,My circle ES house red啊等等,都对比了一下我们之前所学习过的这种存储的框架。
02:15
对吧,啊,综合分析呢,更优一些啊,所以我们选用,所以才会有刚才我们所说的还差一步将数据写到H啊,当然呢。我们用了一个Phoenix。对吧,因为它可以通过S来做这个增加奶茶比较方便啊好,那这是我们的目标,也就是说我们要做的事情就是读取。Topic DB这个主题的数据,因为我们所有的业务表都在这一个主题里边,对吧,因为我们把前面过滤条件干掉了啊呃,那第二步呢,就是过滤出来我们想要的维表数据。啊,第二核心的一步,过滤出来我们想要的维表数据,第三步将数据写到。
03:01
Phoenix。对吧,这是第三步啊,那我们要做的就是这么三件事,第一件事很简单,消费数据我们已经写过非常多的,对吧,只不过说呢,我们在做这个的时候跟大家说了。之前大家所用的这个simple string STEM。它里边。如果消费到一个能值,那么它就会怎么样。空指针。啊,他就会报这个控制帧异常,因为他要求你那个数据不能为no,对吧,所以呢,我们并没有用simple string scal啊,我们自定义了一个SC对吧?核心的我们判断如果它没nu,我们把数据转换转成个空,对吧?啊,给它写出去,它这样就不会报错了,因为未来有可能会出现这种nu值进来呀,对吧?啊,你不要因为这个整个任务挂掉了,那就不好啊,所以呢,我们做一个处理,这是第一步消费卡不卡的数据,第二步过滤,我们就做了一个很长时间的讨论,然后真正实施起来确实也比较麻烦。
04:02
对吧,因为我们当时说了过滤有个很简单的方法。直接在里面写死,我要十张维表,那我把这个维表信息呢,写在一个集合里边。然后你数据过来之后,我拿着表明去我集合里面去查,我看有没有对吧,如果在集合里边确实存在。哎,那这个数据呢,我们就保留。那如果它不存在。那这个数据呢,我就不要,我就过滤掉,对吧,这是我们最原始最根本的这种想法,但是呢,我们说它不好。它不好的点在于什么呢?主要在于生长环境当中,我们的业务表啊,它会不断的去增加。那业务表增加了,我们其中的维度表也有可能会被增加到呀。对吧,好,那一旦你增加了尾表,而且你在。实时数仓里边需要用到这个为表的时候。
05:03
我们就只能修改代码。重新编译打包。对吧,上传到集群,把老的任务停掉,把新的任务重启起来。对吧,咱们要这样去做。这就很麻烦呀,你未来要做这个事情,你是不是麻烦太麻烦了,对吧?好,那咱们怎么办呢。我们就想着这里边有两件事儿,第一修改代码,第二重启对吧?嗯,那我们当时说的这有两个问题,一个呢是改代码啊,一个呢是重启啊,他们两个我们要解决问题,可以从三个角度解决对吧,解决它这个呢,先不解决啊,那这个不解决,单独解决它,以及两个都解决掉啊,我们如果单从分类上来说。就三种,但是有一种情况是不存在的,哪一种啊。就这个。嗯,就就哪个应该第二个对吧,第二个。
06:04
第二个。啊,解决修改代码问题啊,不解决修改代码问题,但是解决重启问题,什么意思呢?就是我代码正常的修改,但是我不重启的,这个是不可能做到的,对吧,你修改代码了,你肯定要重启啊。对吧,你修改代码了,你你把代码改了太大风险。对吧,所以呢,我们就变成两套方案,哎,那我呢,看想一想我能不能。不修改代码对吧,我代码不改了,但是呢,我我我可能需要重启,起码比刚才两个都要操作要好一点吧,那终极目标是不是两个都不做呀。对吧,我连重启都不想重启了,如果你不做重启代码肯定没改,对吧?啊,那这就涉及到动态的,我们从外面外部去影响已经。运行的程序很明显对吧,你如果说你外部也不去干扰,不去做这个干涉,你就让这个代码正常运行,那怎么可能呢。
07:06
对吧?啊,所以呢,第一步我们想着能不能做到不修改代码啊,不重新去打包,就去编译打包,然后上传,对吧,我只重启,诶我把任务停掉,然后重启一下,对吧,我只做这个事儿。有没有办法做到,诶可以呀,对吧,啊,那咱们呢,就是。用配置文件。啊,最早的时候我们说了用配置文件的方式。对吧,诶让他呢去读取这个配置文件。那未来你要增加一张表,那我在这个配置文件里边增加一个信息就好了。然后重启,重新去读这个配置文件,对吧,那通过这种方式呢,我们就解决了。第一个问题。我做到只重启,不改代码。对吧,啊,但是呢,离我们终极目标还差一点,我们终极目标呢,是连重启我都不重启了啊,那怎么做呢?我就想那第二步里边它为什么要重启,我们就从这个角度来说的,对吧?诶它重启的原因在于这个配置文件,它只在启动的时候加载这么一次。
08:17
啊,它只在启动的时候加载一次,那问题不在这儿吗。对吧,所以我们当时就想到了,哎,那可以。用定时任务,每隔一段时间去加载一次,或者说呢,我对于你这个配置信息,我就做实时监控。你一变化了,我就给你读过来。两种方案。对吧,那我们写的是第二种方案啊,咱写的是第二套方案。啊好,那第一个方案呢,我们之前说了,呃,在我get上面有代码,那如果说你把这个搞定了啊,当时呢,没给大家说对吧,在哪是哪个班的啊,现在可以告诉大家,因为大家已经把第二种敲过了嘛,对吧,省的两种方案呢,混了啊,起码你把第一种先敲过了,然后掌握了,咱们呢再告诉你第二种啊,那这个在哪呢?在我的给的意义上面啊啊如果说你想要这个,只是说你想要去了解的,你去看一下啊,你要说这个东西呢,不想掌握了,无所谓啊,那你也可以看了啊,这个无所谓的啊,不是说呃,非得大家都去看一下啊,在仓库里边。
09:26
然后是。嗯,我给大家找一下啊。比较早了啊这个。这个。啊,如果说你想要去了解的话,你就代码在这儿,你可以当下来在哪呢?还是在这个real time啊real time,然后呢,Src我给大家看一下啊,然后呢,这个man对吧。还给股,然后APP。
10:00
Functions。嗯,当时呢,我叫这个名字叫DB spli process function点开啊,定时任务写在哪呢?写在open方法里边,我们跟大家说了对吧,这个呢,读取配置信息,然后呢,定时任务在那。每隔一段时间去加载一次。啊,每隔一段时间去加载一次啊,但是呢,读取配置信息呢,封装成的一个方法啊,然后呢,这个是定时任务啊,这个定时任务如果你想要了解的,哎,你可以把代码放下去自己去看一看啊,我就不多聊了啊,就不多聊了,OK了,这个是我们之前给大家说的,诶,那这是。第一种方案。对吧。这个底线任务我说过,之前我都写过啊,之前都写过OK吧,这是第一个,第二个呢,我们说可以实时的去监控。啊,可以实时的去监控这个内容。啊,核酸回来了是吧?行呃,我们实时监控,那监控有两种方式,第一个呢,我们就正常写一个文件,我们可以用附众监控,第二个呢,就把这个配置信息写到MYS里边,用flink CDC去监控,当然第一个会更好一点,因为它的数据流程会短一点,对吧?因为你配置信息写完了,我用弗Li CDC直接监控lo就完了,那你这边还要经过弗卡夫卡,再经过弗link,对吧,你这个呢,经过的流会更长一点。
11:19
啊,那这种可不可以呢?可以对吧,那我们就不需要用到这么多组件了,因为我们已经讲过弗CDC了。对吧,现在我们直接用弗利CDC来解决当前这个问题,那就好了。OK吧,啊是这个意思,好,这是我们整个的一个思路。啊,整个的一个思路我们就回顾一下啊好,那前面呢,我们是聊到了他的一个思路,那接下来呢,我们就聊一下具体的一个实现啊,具体时间呢,我们这边是有这个PPT的,对吧,我们就直接通过这个PPT来做一个。回顾啊,做一个讲解啊。好,那咱们的内容呢,应该在。
12:02
这块对吧?啊,当然这里面呢,有一个小小的问题,我们统一的来看一下啊,啊有同学跟我提了啊,那我就现场在这边来改一下,省得再有同学跟我说这个事儿啊,因为我当时说了这个文档当中呢,可能还有一些笔误或者什么地方,对吧,那我就在这儿给大家去。改一下啊,那其他同学看到这个问题就不用跟我说了啊,已经有同学说过了,OK吧,好,那我们来看一下这个内容。首先,第一步。消费ods叫topic DB主题的数据对吧,然后呢,咱们做一个过滤啊,判断是否为杰森啊,我们不光做了这个过滤,我们还做了其他的。对吧,我们还做了其他的,还做什么。还记得吗?我们把这个删除操作给他干掉。对吧?呃,然后呢,我们考虑到未来会做这个初始化,所以呢,初始化里边它一头一尾也会有两条脏数据啊,那我们把这个是不是也干掉了,因为它里边肯定不会有这个信息。
13:03
啊,不会有我们要的这个数据嘛,对吧,所以把这个干掉了,那我们就只保留着。Insert。啊,呃,然后这个。Update新增修改,还有这个photoshp的音色的对吧,它这个做全量的这个阶段保留这么三个数据对吧,这是我们这儿也同时做了一个过滤,同时呢,把这种非阶算格式给它干掉了啊,那非阶段格式呢,就包括我们之前所写的那个,所以哈。对吧,我们把这个数据变成了双引号啊,那这种如果说你前面来了浪值,那么到这块也会被干掉。对吧,而且呢,当时我们写代码的时候,我们说这个地方未来用JS肯定会更方便,所以呢,我们要做两件事情,第一就是刚才我们所说的过滤,第二转化为杰森对象,对吧?呃,那如果说我们用filter来做过滤,那我们就后面还得用一个map。
14:00
对吧,来做一个什么。调整结构。啊,那实际上呢,用process或者说用这个map都可以。一个方法来解决这个问题啊好,这是这个内容,当然如果说你像这种脏数据你要保留下来的话,你就可以用process更好一点,用不用来的map,因为process呢,有这个测输出流,对吧,我们可以把这个脏数据啊写到测输出流里边,诶那呃,你可以保留下来。你可以把这个数据保留下来,未来可以看一下,诶脏数据到底有多少对吧?啊,如果过多你可以跟前面去沟通,但是业务数据其实还好啊,主要是行为数据那边可能有脏数据,但是呢,行为数据大家呢,在。From里边是不是写了一个ETL拦截器?大家记得吧,对吧,我们写了有ETL拦截器,所以呢,呃,把那个非监测表示数据呢,也已经过滤掉了。啊,烟过滤掉了,好,那后面写到。
15:02
这个日志数据的时候,我再跟大家说,如果我想把那个脏数据也保留下来,那怎么做啊,到时候再聊,现在呢,我们不聊这个事了,大家知道一下我们未来会有这个点啊啊,那如果正常的数据呢,我们就转化为JS格式之后写到主流。对吧,那主流呢,就形成了一个杰森object对吧,而且是过滤后的,过滤后的这里面呢,只有我们要的维表数据,以及有一部分事实表的数据在里边,注意这时候还有事实表,我们还有第二步最核心的过滤,对吧?啊,那最核心的过滤呢,我们要利用。另外的一个配置信息啊,那我们是说用第二套方案,用实时监控的方式,所以呢,我们用弗林格CDC读取配置信息表,创建一个流,对吧,创建流以后呢,我们。把它做成一个广播流,至于为什么要用广播流,我们之前就分析过了,为了不丢数据,对吧,它有两种方案可选,第一两个流呢,都做这个keep。
16:02
按照表明做key办对吧,这是第一种方案啊,按照表明做key啊,那么第二种方案呢,就是说我们说的广播。广播对吧,我们的目的就很简单,让我们数据。主流的数据能一定能找到它对应的。配置信息。对吧,那KBY,我能保证两个表明相同的进同一个温度啊,广播那就不用聊了。我配置信息会发送到下游所有的笔录里面。啊,会发送到下游所有的平行度里边。对吧,啊,那当然可以读到我们要的这个配置信息啊,所以呢,我们最终选用广播流,因为我们配置信息数据量它很小很小啊,它俩的区别我们说了K呢。它可能会造成树倾斜。啊,那如果说是广播流。
17:01
它缺点在于数据是冗余的,每个病毒有完整的一份。缺点在于,如果数据量特别特别大,那他就不擅长了。对吧,是这个意思,好,那这个搞定搞定好以后呢,你接下来我们就正常的去连接。啊,连接流,接下来我们就处理这个连接流,处理连接流呢,它里边有两个方法。对吧,第一。处理广播流数据,第二处理主流数据对吧?哎,我是换了一个位置才这样啊,如果不换位置,它应该是,诶,上面是process element,然后下面是process broadcast element,对吧?广播流好,那我们还记得这两个方法里边我们分别都做了这么三件事儿啊,比如说第一个广播流,我们做了哪三件事儿,想一想。回想一下。广播流里边我们做了什么事?123。来,我们一块来回想一下啊,想一想大家呢,可以,嗯,敲到这个公屏上。
18:10
一数据解析啊,不错,就是解析成table process对象。对吧,啊,但是呢,在做这个任何数据处理的时候,我们都需要注意一下。我们处理的这个数据的格式是什么样子啊,第二步啊,校验见表没问题。啊,就是说我们判断这个表是否存在,如果存在了我们就不创建,对吧,如果不存在我们就去创建,当然这个存不存在呢。很简单对吧,我们就用not exist就好了啊,第三步将状态写入啊,把这个数据写到状态里边并广播对吧?并广播啊,那这是我们广播流的三件事儿啊,大家一定要知道,因为你只有。非常详细,了解我们的流程以后,才能够做到写代码的时候更舒服一点,对吧,那还有另外一个主流。
19:06
主流里边同样的,我们也做了这么三件事,哪三件事儿。主流。告诉我做了哪三件事?啊,只有方总比较给力,一个人在这儿回答,是吧。其他同学都。不愿意。敲一下啊。是不是已经睡着了?啊,到周一的早上。
20:11
啊,还是方总对吧,加第一个获取广播流数据。啊,第二过滤字段啊,第三,哎,云总补充了一下说这个。Think table字段啊,周一的早上容易犯困是吧?是不是周一早上容易犯困啊,对吧,所以这个时候更应该大家都一起。动脑,然后呢,瞧一瞧,要不然你更容易困,你想想看,你脑子在那停滞着,不想就两眼呆滞对吧,盯着屏幕。啊,人家在那敲呢,你也不管啊,你这样不更容易捆绑是不是对吧?啊所以呢,还是一样的要敲一敲啊好,那这是我们做的三件事,那这里边呢,其实更重要的是大家的一个Java的一个功底,对吧?啊,就是里边自传拼接啊,乱七八糟的这种事情还挺复杂的啊呃,好,这是我们。
21:05
看到这样的一个事情啊,呃,那这个处理完之后呢,我们就可以把数据。写到流里边对吧,写到流里边以后呢,差最后一步,但是这一步呢,我们马上就要来做了,对吧?这是我们前面所有的一个流程啊,这是我们前面的东西啊,我们已经把代码写到这个位置了啊,这个呢都已经打印了,而且我们测试了对吧?动态的可以加一张为表,也可以写出来,也可以在这儿打印,那能在这儿打印,未来我就可以想办法把它写到。Phoenix对吧。
我来说两句