00:01
好,那我们上午呢,是测试了三种不同的这个CDC。对吧,啊,测试了三种不同的这个CDC来做的这个事情,呃,而且呢,也对比了一下,最后呢,大家一定要把这个结论给他记下来,对吧,而且呢,我等会儿把这个聊完之后,会给一点时间大家去自己做测试,并且把我们的ods层。搞好对吧,好,那现在呢,我们只是测完了这个CDC,接下来呢,我们要把ods层做完对吧,那我们就搭建这个模块,呃,在这边。我们创建一个新的模块叫j more go real time啊注意啊,没下一步,然后接下来呢,我们这边叫G末go real time。实时模块对吧。下一步把这个呢粘连起来完成。完成好以后呢,这边我们要去导出依赖,并建立好我们相应的包啊,要把包这个的建立一下,那这里边呢,我们主要建立这几个包,后面再有其他的包,我们再说,那目前呢,我们建一个APP对吧?APP启动类啊,启动项相关的,然后呢,并招聘所放的目录common公共啊,一些常量类啊,或者什么东西放在这最后一个优工具类。
01:26
对吧,啊做这个事情行,那把这个四个包呢,给它建好。在realtime当中。创建四个包,看点爱的硅谷点包。嗯,然后呢,我们还有并包对吧,在创建一个啊,看我们点爱的硅谷。点并包,呃,公共包。Come。还有一个优工具类所存放的包U包,OK吧,那后面再有其他的东西,我们再说OK吧,好,那除了这个之外,我们增加配置信息啊,那这个配置信息呢,就会多一些了,我把它整个拿过来,我们一起来看一下啊。
02:15
我把这个配置文件全部拿过来,然后呢,放到我们的real time这个文件当中。好,那接下来。前面我们是定义了一个pro啊,因为后面呢,我们关于flink的加包很多地方都会写01:12点零,所以呢,把这样的东西我们可以给它提取出来,对吧?啊,那Java版本1.8,呃,然后呢。加编译版本啊,都是这个1.8啊,然后Li版本01:12scla02:12哈度版本3.1.3啊,那下面呢,都是我们直接Dollar的方式去引用就好了。对吧,这样的话,如果说我们要修改版本,只需要修改上面这一个位置就够了,不用到每一个依赖当中都去改啊,好,那第一个是弗Li Java,第二个弗Li streaming Java没什么好说的,对吧?然后呢,跟卡夫卡的连接器,因为我们未来写代码是要跟卡夫卡连接的啊,然后呢,弗Li客户端,因为我们要在本地运行啊,做测试,所以呢需要有弗Li客户端帮接下来CP啊,就是后面呢,有一个需求当中我们会用到这个CP啊,所以把CP的依赖导进来,弗link杰son啊,因为最后呢,我们要用那个弗Li circle里边有杰son fast杰森,对吧?然后哈杜的一个客户端,因为HDFS我们要保存简查里啊,往往要保存检查点,然后买S驱动包,买S的连接器,对吧?之后呢,打印日志相关的啊,打印日志,其实这些依赖呢,我们之前都见到过。
03:52
都见到过对吧,好,那之后呢,这是一个打包依赖。啊,这是一个打包依赖。
04:01
那这个呢,我们就搞定了,接下来呢,我们到at包里边,我们就写我们的代码,那这个呢,我们叫ods层点flink CDC。这是读取我们MY数据对吧,做这些事情好,那这个东西呢,我们都写过了啊,我们都写过了啊,所以呢,我就不写了,我就直接把之前的代码呢拿过来就行了,然后接下来呢,在APP包底下我再创建一个啊。好,叫AB点方程。放这个。函数的,我们有可能会自定义很多的函数,所以我给他提出来,稍微复杂一点的我就提出来,对吧?啊,但是都属于这里边的,那我们要提的东西呢,比方说弗林格CPC当中,我们肯定要把自定义的反虚量化器这个东西拿过来,对吧?Pro c,哎,拿到我们的方程。啊,拿到方程当中对吧,先给它放在这,接下来呢,我们的ods层的代码,Ods的代码,那就用弗林格CDC么意思这个内容就好了,对吧,用它。
05:12
但是呢,我们有的东西需要修改一下Li CDC在,诶不是它啊,是这个OS。把这个拿过来啊,我没敲麦函数啊。然后呢,给它粘进来啊,这个包呢,我们倒一下啊。啊,这个异常我们就处理一下,然后卡打一下包。好,ABB方下面的一个包,OK啊。这个就搞定了,那接下来呢,有一个地方我们需要做一个修改,第一我们既然要监控这个业务库,所以呢,他不要了。他不要了,我们要监控整个库,对吧?第二这个初始化呢,我们一般的生产环境当中,你什么时候做这个内容,那你就从这一刻开始算历史数据呢,就不要管了,所以呢,我们改成latest啊,这第二块我们修改了,那么第三块我们并不能打印数据,打印数据呢只是做测试,并将数据将写入卡夫卡。
06:16
对吧,我们要写到卡夫卡里边,OK,那这个时候呢,呃,大家想一下卡夫卡这个东西我们应该怎么去写了。我们应该怎么写啊?卡卡还记得吗?还记得怎么写卡不卡吗?卡夫卡生产者think,哎,对了。Think啊,那也就说这块呢,我们将要用stream s.I think。
07:01
然后又一个叫flink卡夫卡的一个producer对吧,然后在这里面呢,传相应的参数,比方说我们要传的是broke list。啊,然后呢,主题接下来呢,是我们的civilization的一个scan序列化。内容,那我们用simple scam是不是就好了,对吧?啊simple sc好,那我们想啊,现在呢,我们是读取买circle数据写到ods,那后面之前我们说过了啊,接下来呢,应该用fli,再消费ods数据写到DWD,然后呢又消费DW数据写到。DWM,又DWS,也就是说要写卡不卡的话,好些地方都要写对吧,那你想想看,每一个地方六是不是不太好了,所以我们可以把这个代码给它干什么。提出去啊,给他提出去放到工具类里边吧,以后要用的时候,我们就不要每一次都去溜了,直接从工具类当中提取出来更为方便一点,对不对?好,那这个地方呢,我们就不要这样写了啊,直接到这边我们先写我们的工具类。
08:14
啊,到这边来几个工具类叫麦卡不卡。啊,这麦卡卡6OK。嗯,那这里边呢,我们肯定要写一个代码了啊,写什么样的代码呢?来我把这个others啊,然后再把它打开。刚才我们看到这里面呢,如果说正常来说呢,我们要有一个卡不卡的一个producer。对吧,那没有它,我们这里边肯定要有一个。方法返回值为那个类型,对吧?Public static返回值类型啊。这里叫。Li卡的一个producer,好,那get producer。啊,Get的开发或六好,那么刚才我们看到这个东西要传什么参数进来啊,那我们想什么东西该在外面写?
09:11
这样我们当前这个项目呢,肯定是连的同一个卡夫卡集群,所以呢,在这边我们用这个卡a list在外在里边写就行了,不用在外面传,那其实在外面传进来的是不是就是一个主题呀。对吧,我们在外面要传一个主题进来,这个呢,也是大家所熟悉的叫simple string scanal,对吧?啊,那这个呢,也在里边写就好了啊,所以呢,这个地方我们只需要准备一个string。S topic。好,那它呢,我们叫ods。DB。对吧,好,那这边呢,我们将用这个地方呢,就很简单了,Topic。
10:01
啊,Topic对吧,那将来呢,在这边我们直接return又一个flink卡卡的一个producer啊第一个位置要一个brook啊要一个brookli好,那这个brookli呢,我们可以在这写一下啊杜甫102冒号9092逗号啊杜甫103冒号9092。多吧,还有一个哈杜104对吧,括号9092,好,这是第一个参数搞定,第二个它有一个主题,那我们主题呢,就叫topic啊,还有第三个参数,第三个参数呢,看一下他要一个civilization的一个scal,所以呢,我们直接用一个叫simple string STEM就好了。对吧,那以后呢,我们就不需要每一次都去创建这个内容了啊,那这边呢,去用的时候也不要去扭这个对象了,直接拿着叫MY卡普卡YouTube.get卡卡的一个producer,把这个think topic进去。
11:03
就好了吧。没有问题吧,啊,就这样的一个方式就搞定了。对吧,那后面呢,我们可以复用了,就简单一点,只要给一个主题就够了,不需要写那些东西了啊,但是有的人说你这个代码也不多啊,但是你要学会这种方式啊,就是说这个代码明显会被复用嘛,啊,要被复用的呢,我们就给他提出去对吧,提出去啊。这个这个东西我改一下叫他啊,这个没用。行,那这个是将数据呢,呃,我这行那就不要了,之前我们也说过了这个事情,对吧,不要不要了啊。我们监控这个库,然后呢从latest,然后用自定义的这个反虚化器来做的,对吧。
我来说两句