00:00
好,那第三步呢,保存出去,这个就不聊了啊,那接下来呢,我们就要写这个代码啊,写这个代码对吧?好,那我们到这边来,终于开始写这个代码了啊聊了。1000多啊。好,那在APP包里边在dim对吧,我们叫Di可以吧,就叫这个名字啊,当然我这个命名啊,有可能跟文档当中变量命名啊,或者类的命名有所区别,这个大家应该没问题吧啊,看的时候不要告诉我诶,你怎么跟文档写的不一样,因为我不会去记一个什么变量名啊,那有时候就。自己发挥了啊,反正尽量做到建民之意就好了,OK吧,这个我就强调一下啊,不聊这个事儿了啊,行,那我们就叫DB,然后呢,我把思路写一下,就刚才我们所说的对吧,这里边先PSM好,那么第一步。好,每次代码呢,我们整理完思路之后,我都把这个注释先写好,然后根据注释来写我们的代码,对吧。
01:03
第一步那当然是获取执行环境啊,当然跟我们的流程图可能稍微有一点点区别,对吧?好,那第二个。读取卡夫卡哪个主题叫BB?主题数据创建流对吧,这个创建主流啊,我叫主流。好,那第三步。第三。我们要。过滤掉非杰森数据对吧,但实际上我们说了,你在业务数据里边这一步呢,你也可以不做,因为业务数据呢,它是没有这种脏数据的,你要以防万一,是不是也可以做一次过滤啊,好,那我们把它加上吧啊。
02:01
过滤掉C。杰森数据啊,对吧,好,那更重要的呢,不是过滤掉非杰森数据以及什么非杰森以及。保留只保留,就是监测数据里边,我们也只保留什么。啊,如果说你确实是一个结算数据,对吧,这个是属于。脏数据吧,它是用不了的数据啊,那我们只保留什么新增变化初始化的数据,对吧,只保留这个新增。谁要新增变化?以及这个初始化数据对吧,像delete,还有这个in startto complete,我们都不要,这种是都属于脏数据对吧?初始挖的好,那这是一个点啊,当然这一步你可以不做。
03:04
你可以不做对吧,你写上也行,不写也可以啊,因为是业务数据对吧,好,那这个呢,就搞定啊,这是我们的第三步,那这样呢,形成了我们的。主流吧,待加工的主流,那接下来第四步。第四步。啊,咱们做什么事呢?嗯,使用CDC读取MYS配置信息表,创建配置流。啊,读取MY配置。信息表创建配置对吧?诶第二个流有了咱们的主流,咱们的配置流都有了,好那接下来第五步。将配置流做成一个广播流,对吧,将配置流处理为广播流,OK吧?啊处理成这个广播流,那么第六步啊,你主流有了,广播流有了,那就连接。
04:20
主流。与广播流对吧,就完全按照流程图来的,只不过说呢,我们在流程图里边把细节加上了什么直行环境啊,这些东西都加上了,对吧?好,那得到了一个连接流吧,那然后接下来第七步。处理。连接流。处理连接流,那这个处理连接流里边核心的点就是根据配置信息处理主流数据,对吧,那我的点呢,将配置数据写到状态。
05:03
主流无状态。对吧,啊好,那这个搞定以后,那这个就已经将我们数据过滤好了,我们只要的数据对吧,然后呢,将数据。好,接下来我们的流了,对吧,将数据写出到Phoenix Phoenix。对吧,好,最后一步。这个不能忘了,我们是一个流的程序,对吧,咱们得启动这个任务,好好,那下来我们就来写这个内容啊呃,那第一个在这个位置,我们要获取这个执行环境,那就是。点。Get对吧,Y加V,那我们就得到了一个因为啊,然后将来咱们设置变度,Said,好,这个地方呢,我设置为一啊,那是因为。
06:02
我的主题都是一个零度啊,那这个在生产环境当中,我们写一下生产环境怎么设置,大家告诉我生产环境当中设置为多少。有没有同学告诉我?就是关于这个并行度,在生产环境里边我们应该设置为多少。集群数量。啊,其他同学呢。合数啊,继续。还有同学呢?对了啊,就是设置为卡夫卡主题的分区数呀。
07:05
怎么集群数量合数CPU都出来了,你未来整个集群就跑这一个任务吗?啊。你未来整个机型就跑这一个任务啊。你有很多任务,你集群。整个CPU有5000个,你直接设置为5000吗?对吧,那肯定是根据卡夫卡的分区数啊,这个要知道啊,因为我卡不卡呢,都是一个分区啊,因为我是让他自动创建的,我没有手动创建过主题啊,偷个懒对吧,它自动创建的默认的都是一个分区,我也没改那个配置文件啊,但是呢,呃,我都测过那个啊,测试集群呢,我都用过在这个当中。我设置是四,就是多个冰度是没有问题的。啊,多个拼都是没有问题的,OK吧,啊,这个点好啊,那接下来呢,我们设置完这个内容之后,我们还要做一个事情,在这边我们写上。
08:05
一点好。开启拆个。啊,开启开个方啊,这个你在生产文件当中是一定要做到啊,因为点。Enable checkpoint。对吧,然后呢,给一个时间啊,生态环境当中呢,一般来说我们设置三到五分钟一次,那比如说我在这边啊写一个。五。乘以。对吧,啊,可以做这个五分钟一次啊,然后呢,你后面当然也可以加一个参数ex个event对吧?哎,开启个方的好,那开启完这个包之后呢,你还要做一个事情,设置状态后端对吧,还要设置这个状态后端啊相关的参数。
09:05
啊好,那在这边呢,我们设置状态后端,那就是因为点。Set stay back对吧,那咱们呢,现在。卡西啊,这是本地的一个内存级别的,然后呢,因为点get.set storage,然后这里边我们可以放一个HDF路径,对吧,HDFS冒号杜102。然后看,比如说我放到什么211126对吧,这个目录里边开point好吧,啊放到这个位置。啊,然后如果说你们这样做了,你要在本地运行,你本地的用户一般不是爱特硅谷对吧,我们集群上。名字都是爱硅谷,所以这个时候呢,还要同时设置一下叫system.set啊,然后这边呢,他杜叫user。
10:01
下划线啊,User name,对吧,加一个I硅谷,如果你想在本地运行要做这个事,好这是最基本的,那还有一些关于checkpoint是不是还有很多参数啊,对吧?啊,我们也把常见的在生产环境当中你可能要用到的一些东西呢给大家写上,因为点。啊,Get checkpoint conflict get set,你看这里面东西。Sorry呢,我们已经写过了,对吧,存储的空间啊,就是外部存储的,然后呢,Checkpoint out。啊,超时对吧,超时呢,咱们给一个十分钟啊。给个十分钟,好好,那这是超时,一般呢,我们也要设置一下点,继续said,看还有什么东西啊,呃,这个是max con checkpoint。就是最多同时可以有几个checkpoint,因为你看啊,我五分钟开启一次,超时时间是十分钟,那有没有可能我一次checkpoint做了。
11:07
八分钟。好,那这儿你开始诶,到八分钟以后再结束,那到五分钟的时候,我是不是又开启了一个新的拆方,对吧,这个时间是五分钟,这个时间呢是三分钟那。这这一段时间。就是由两个差方向的共存,如果你设置为一啊,假如说你这设置为一。那么你看啊,第一个拆point的,比如说你做了八分钟,这是八分钟,理论上来说在五分钟的时候,他要新起一个,但如果你这设置为一这个,这个就起不了。啊,这个就起不了对吧,这是con,哎,库存的有几个checkpoint。对吧,啊,像这样的东西啊,包括也没点啊。还有set对吧,呃,Mode你可以改,然后拆发in可以改这两个参数这两个。
12:01
你可以不用写了。因为这已经给了。啊,那这个就对应的是下面一个,这对应的是它对吧,你可以直接写啊好,那还有。看一下最小的间隔时间。对呀,啊,暂停时间这个啊,这个五分钟指的是头跟头,而这个是头跟尾啊头跟尾对吧?好,那这些东西呢,你都可以去设置,包括还有一个东西。重启,假如说遇到问题了,我要从checkpoint自动恢复,对吧?诶可以配置这个重启策略,呃,一般来说呢,我们可以用那个rest street。嗯。哎。
13:00
好,我刚才调用错了是吧,啊art我看一下啊。他的一个策略。他有一个。诶,我看一下这个地方。他放的内容啊,叫他是吧,啊这个哎,怎么调不出来呢。Strange对吧点啊,咱们呢,可以用这个失败率重启,也可以用这个固定频率重启对吧?啊,那固定频率重启呢,一般我们可以这样用,那怎么做呢?姐啊,我们看一下它的参数。好,什么叫固定频率重启呢?看啊这个呢,Restart叫重启尝试次数。这个呢,后面的参数一个long值,或者说给个time对吧,这是什么东西呢?啊,浪值的话就是毫秒,默认就毫秒啊呃,那叫delay between。就是说在。
14:02
两多次尝试之间的一个什么延迟,就是说这个假如给三次,这个给五秒,就是我总共会尝试三次重启啊,然后呢,每隔五秒钟尝试一次。就这个意思叫固定频率统计对吧?好,那比如说这给三啊,再给五秒。啊,是这样的东西。OK吧,好,那这个这块东西啊,注意在伸展环境当中,你一定要去把它配上好,但是咱们呢,现在是做测试嘛,因为checkpoint这个东西呢,大家之前也都。用过对吧?啊,如果说你这个东西都加上你测试的时候,所有的测试你都必须得开这个开开这个HDFS,但这样的话集群要多开东西,对吧?啊,压力会比较大,所以呢,这个东西我每一个代码里边都会去写,但是我会把它注释掉,注意啊,我会把它。
15:00
注释掉,每一个我都会写,但是我会把它注释掉,写上是为了告诉大家。我们生产环境当中,你一定要做这个事儿。啊,注释调呢,是为了测试方便,我们就不用每一次都去开HDS对吧,有时候是不需要有的测试不需要开HD,我们少开一些东西啊,但是呢,我每一个都会把这个粘过来,就告诉大家。我们是生长环境当中,你不要漏了,不要忘了,OK吧,好,那这个呢,咱们就搞定。
我来说两句