00:00
好,那接下来呢,看我们的第一个需求叫用户域用户注册的各窗口汇总表,这个需求呢,是非常简单的一个需求,就是统计一下各个窗口当中每天的。注册用户有多少,那这个呢,我们直接去消费DWD层用户注册表啊,在DWD层呢,我们已经将这张表处理好了啊,在第一个啊用户注册啊,等会儿呢,我们就可以拿取这个主题的数据取消费过来,在这当中呢,我们是只要音色的数据,那只要是新增数据呢,对于我们来说都是注册用户用户表当中啊,所以这个需求呢,我们就明确了我们的思路,来,我们直接看我们的PPT啊,因为这个呢就比较简单了,第一个先消费DW层用户注册主题的数据啊,第二步呢,转化为对象,这个主要是为了方便后续的操作,那第三步提取事件时间,生成我们的waterma,在实施项目当中呢,我们尽量的要使用这个事件时间啊,因为我们最终呢写到克house,比如说我们任务出错了,那么用施件时间的话,在任务重新消费这个数据以后,我们。
01:15
它可以保证两次消费的数据在同一个窗口,那最终呢,写到克里奥数据完全一样的,那我们可以利用克雷奥斯的密等性来保证最终的数据一致性啊,所以呢,大家在生产环境当中也一样,尽量的使用实件时间啊。最后呢,是我们的开窗聚合计算啊,那这边呢,并不需要做分组,因为我们就是计算一个注册的总人数,没有任何的维度啊,最终呢,把数据写到克里house,当然写到克里house呢,我们是给它封装了一个工具类,这边呢比较麻烦,所以等会儿我们写代码的时候分两段,前半段呢我们可以先写,之后呢,这个克号工具类呢,我们要分开去写啊。好,那接下来呢,我们到coding第一个克里浩斯的建表,我们先不用着急,这个建表呢,最后我们测试的时候再去做啊,那第一步呢,先把这个扎va病拿过来,在并包下我们创建一个Java病。
02:14
然后呢,把这个拿过来。啊,那这里边呢,就是四个字段,呃,前面两个呢,是窗口的时间啊,开始跟结束时间,还有一个C,我们注册时间加一个TS,我们的版本信息最终写的house,如果说我们需要做到密等信,我们需要提供一个版本号啊呃,那接下来到这边创建我们的主类。啊,中间的克雷house这个工具类我们先不用管,我们直接到我们的主类。把我们刚才的思路先写好。第一步。获取执行环境。然后那就是读取。
03:02
卡不卡DWD层。用户注册主题数据啊,那第三步将数据转化为。接对象,呃,但实际上这边呢,我们可以直接一步到位啊,转化为招聘对象,因为方便我们后续的处理啊,那这呢,我们就直接写招聘对象,当然你先转化为杰森也可以啊,这个都行的。啊,都是为了方便后续的操作啊好那。第四步。提取失电时间,生成word。好,那。接下来就是开窗。聚合啊,开窗聚合好以后呢,把数据写到house,将数据写出到clearhouse啊,但是这一步呢,我们先不着急去写啊,我们先写到这个开张句合这一步啊,那我们一步一步来,第一步先获取执行环境,那我们再写一遍啊,前面呢,我们已经知道有很多次了,Ex environment啊,这个导包不要导第一个,第一个呢啊,第二个呢是double.get。
04:25
所以有的同学呢,可能在下面,诶写着写着,呃,怎么我这边就开始包错了对吧,那一定要检查一下你的包是不是导错了啊,那接下来呢,就是读取数据点ADD source,呃,用我们之前写好的固定类点get。两个参数,一个主题,一个消费者组,主题呢,我们要到DWD层用户注册这边去取一个啊,直接将这个拿过来,最终我们写出的主题拿过来在这儿。消费者组呢,我们就直接取他。啊,那我们是DW叫用户注册对吧,我们就直接取这么多啊。
05:02
什么温度呢,这个就不要了啊,然后接下来把它。小写。叫user register,好,那这样的话呢,我们就得到了一个卡夫卡的一个流,呃,那我们对这个数据呢,要把它转化为切对象,或者说呢,招聘对象,那在转化为对象的时候,那我们肯定要用map方式啊,那我们是一行数据。呃,首先呢,第一步它是一个。普通的字符串啊,为了好取里边的字段,所以我们转成Jason,最后呢,又一个user并啊,就刚才我们写的这个扎病,那我们看一下这里边的字段啊,刚才呢跟大家说了有四个字段,前面两个呢是我们的窗口信息,现在呢并给不了啊,所以呢就空着,然后呢,这个注册人数来一条数据,我们就加一就好了,最后这个TS,由于下面呢我们要提取实现时间,所以S必须来自于这个数据当中,那我们要去看一眼啊这边的时间,呃,那咱们呢是有一个。
06:05
Korea time。对吧,这边有个创建时间啊,那有同学说这有个TS,这个TS呢是我们前面。CC给我们加的对吧,那这边呢,我们可以直接使用create time转换一下啊呃,那在这边呢,我们。之前写好的一个form。啊,那我们可以直接把它拿过来去使用。啊,当然了,我们也可以这样,我们要的是一个浪类型,我们可以这样来做啊,来看一下。我们还有一种方式,直接接of.get along,把create time给它取过来。那刚才想了你这个。这个点啊,Get到它可以是具体的,比如说你是123456789,哎L这种是可以的,那另外一种呢,就是如果说你是年月日十分秒这种格式的get到,那这个方法也是可以生效的。
07:00
好,那我们就得到了一个。叫的点啊跟你转化为一样,那接下来我们提取时间戳点a and啊,我们一般的给一个乱序程度,这边呢,设置公司当中最大的乱序程度就行了,比如说我这边呢,可以给一个M啊,那这边呢是我们的招聘的类型啊。之后在这提取时间戳,那时间戳呢,刚才我们已经知道了,在这个数据当中呢,就是我们的,就是我们的时间戳啊。好,那这个招聘呢,我给大家取个名字叫use red with world mark DS啊,那最后呢,我们要去开窗聚合,开窗聚合呢,两个目的,第一就是把这个一做一个累加,第二补充窗口信息那。对于一做累加呢,很明显我们可以用增量聚合,但是在增量聚合当中呢,并没有窗口信息,所以呢,我们接下来采用一个增量加全量的方式,点reduce。
08:09
Window or啊,然后又有一个。啊,这边呢,我们先给一个窗口啊,叫汤姆。诶。Tom。点off time,点十秒钟,这个之前我们介绍过了,点,然后reduce啊,正常的你在使用这个reduce function的时候,你会发现一个问题,就是说呢,这边它并没有窗口信息,它只能做到,诶,我返回VALUE1,然后value61.set CT,然后VALUE61点。Ity加上value的。哎,只能做这个累加,这是我们第一件事,第二我们还要把窗口信息补充上,那这个呢,需要我们的窗口函数来做这个事情啊,所以我们结合一下,这边又有一个叫all window的方式啊,那返回值呢,我们只是补充用户信息,对吧,所以呢。
09:13
返回值还是这个账号并在这个当中呢,这边就有一个看window窗口了,呃,同时呢,这边有迭代器,但是呢,我们是用的增量加全量的方式,所以此时这个迭代器当中只有一条数据啊,这个大家注意一下。点,我们直接获取next。下一个啊,不用判断是否有下一个,直接获取下一个就好了,Next啊,那接下来呢,补充信息,next.set叫ST,但是呢,我们是一个string,既然这样的话,我们肯定要给定义我们的年月日十分秒格式写出去,未来查询的时候更方便一点啊,那我们调用我们点to y mdh Ms,年月日十分秒,那看点get start啊,同理,Next点。
10:06
Set叫e dt窗口的结束时间data form点啊,还是一样的,我们用它的end啊,在这边呢,我还是要改一个值set ts,呃,因为这个TS呢,作为我们最终的版本存在的,所以呢,我们用当前的时间来处理啊,最后呢,加工完了之后,我们可以把数据输出哦,这边呢,我们可以用connect点。Connect,把这个next呢进行一个输出,这样的话我们就得到了我们最终的聚合结果叫reduces,在这边呢,我们可以做一个reduce ds.print做一个它的打印啊,当然了,在这我们肯定要把数据写出啊,那我们下一个视频再来聊这个事儿,最后如果你想要打印测试一下,你别忘了这边呢一定要启动我们的任务,也就因为。
11:02
那我一般呢,把这个。类名作为我们的job名称啊,这样在我们的未来在页面当中呢,也可以对各个任务做一个区分啊,那这块呢,我们第一部分代码这个还是很简单的,对吧,好。
我来说两句