00:00
OK,那MYSQ表的全量同步我们就同步完了,那全量同步完事,我们还有一些表需要进行增量同步的呀,那增量同步我们看看增量同步的数据通道,哎,是什么样的啊,先来看一看。那增量同步我们要通过my circleq,然后呢,把my servel的数据发送到卡普卡,发送到卡卡之后我呀再通过哎这个某些工具给它同步到HDFS啊my circle到卡夫卡,哎,增量到卡夫卡这个过程我们是不是已经实现完了,我们用的是谁呢?我们用的是max ma,哎用的是max,然后呢,你看啊,那后面我们通过一个flu给它发送到HDFS,那OK,那接下来我们看看啊,MYSQ这有一个表,那最终发送到HDFS是怎么样呢?哎,也是啊路径最终呢,包含一个表名,哎,这个后缀就是INC了,然后再包含一层日期啊,那再来啊这个表啊,这个路径,那这个表。
01:07
这路径啊,这路径那最终啊,在卡布卡当中,我们这些表它存储在哪一个topic呀,它存储的这个topic的名字叫做topic_DB。哎,存储在这个topic当中啊,之前我们这个topic是topic啊,下划线log log,那现在啊,它是一个DB了,然后通过flu我给你同步过来。好,那这个事我们应该怎么干呢?哎,这个环节我们是不是做完了。那做完了,接下来我们是不是就只剩下这个了啊,那又是写flu,那来吧,我们看看这个flu我们应该怎么写呢。好啊,先暂停一下,OK,那接下来我们要分析一下这个flu我们的选型应该是选什么样的啊,啊,那过程还是跟之前一样,哎,还是要把卡夫卡的数据发送到HDFS,然后我们在这个地方需要一个动态的日期,那既然需要一个动态的这个日期,我们就要考虑到数据漂移,那考虑到这个数据零点漂移的问题,我们就需要的有这么一个呃,Source拦截器啊,需要用到拦截器,那用到拦截器我们就必须得有S,所以说这个啊,我们还是得用s China和think这个结构,那最终我们给它定为哎,这个啊,卡瓦萨斯。
02:34
以及HD FS think,哎,这个我们之前写过啊,那我们看一看吧,怎么写啊卡这回监控的是谁呀?监控的topic是topic_DB了,然后我们要来一个拦截器,哎,那这个拦截器我们呀,在这个里边就要做两个事儿了,之前这一个拦截器我们做的是哪个事儿,是不是就只有一个时间戳啊?哎,只给它附上一个时间戳,为了解决数据漂移的问题,好,但是现在我们还要有一个字段了,叫做table name,为什么呢?因为我们是根据表名来进行什么呢?
03:15
哎,来进行落盘的,好,那根据表名来进行落盘,既然时间它能自定义啊,你看时间是可以自定义的,那对应的我们这个表名它也可以自定义,那它怎么自定义呢?哎,依然是用百分号啊,用百分号加上大括号来一个table name获取我们这个自定义的hier的的名字啊,Hi得的K,通过这个就能获取hier对应的K,它所对应的Y流值了,那么我们在这个拦截器当中是不是就要做两个事儿啊,一个是。时间戳,那另外一个呢,就是我们的table name了,好,那来吧,我们看一看这个拦截器怎么写啊,先不看拦截器啊,先看的配置啊,OK,那我们看看啊,这呢也给我们了一个例子来打开双击。
04:05
我们在卡夫卡的topic当中啊,数据是哪来的呢?是数据是Maxwell导入过来的,那数据的格式是这样的,它是一个摘森字符串啊,你看就是表明啊类型是一色的,然后TS事件,好,那然后呢,我们通过给它发送到HDFS,我们看看啊,这是卡S,那卡瓦卡S当中我们就要怎么的在hi点头当中把数据包里的这个table标明,哎,我们是不是有一个table字段呢?那把这个字段放到head当中,好,那对应的我们是不是也要把这个时间字段。这个时间是我们数据写入到MYS表的时间吧,哎,把这个字段给我们复制到这个time stamp上,哎,要做这么两个事的。啊,那来吧,我们看看这事应该怎么做呢?那其实这个time STEM这我们在写的时候啊,还是有点,哎,这个地方还是要有点注意事项的,那好我们遇到问题的时候,我们再说往下翻啊看这个配置,这配置呢,我就直接复制过来了,跟之前是完全一样的。
05:14
啊,CTRLC,拿过来,拿过来,我们简单的看一眼。CTRLV粘贴啊,语言下看啊,那首先的配置文件,我们第一个是要定义组件,哎,South China think啊,Re,那south我用的是谁呀?用的是卡夫卡source,哎,然后呢,By size duration millions,哎,一个是控制P的条数的,那另外一个呢,是通过时间来控制,我这一批什么时候往下游写,那再有我要读卡夫卡,我得设置一个卡普卡的boot server吧,连接到卡卡集群呢,哎,然后我还要知道我读取哪一个topic的数据呢?诶,我读取的是topic下划线地利啊OK,然后对应的我们在这还设置了一个group ID,消费者组的ID啊这个呢,你自己定义啊,还是尽量要定义一个带业务名字的啊,那我们呢,就把这个topic的名字给它拿过来就OK了啊,这个呢,就是带业务意义的好。
06:19
那接下来,哎,你看这还有一个参数啊,这个参数我们是不是没见过呀?啊,Set topic hier topic header这两个,那这两个参数什么意义的,其实这两个参数在我们这是没什么义的,只不过我们给它拿出来了,哎告诉大家这两个参数也可用,那它默认的是什么呢?这个默认的是false false,那如果说false的话,这个参数就不生效了。啊,这个参数就不能要了,那好我们来官网上看一看。来到官方网站上。官网,Document user guide control Apple、卡夫卡、source。
07:00
啊往下找,嗯,那卡这里就有这么一个字段叫做set topic啊还是啊,它默认还是处呢,啊行吧,默认是处,那行,那我这个地方再给他拿回来啊,默认是处,那这个是什么意思呢?默认处,当我们给它set成处的时候,它会存储这个topic的名字啊,把这个topic的信息,也就是topic的名字存储在我们的hi当中。啊,啥意思,你看如果说我们把这个参数设置成处了,哎,它呀会把我们这呢,我们不是从topic-DB这topic当中获取的数据吗?那么它会把这个topic给我们存储在。的hier头当中啊,这个是hier头,这是hier存储在hier头当中,那存储它需要一个KV呀,哎,VALUE6很显然是topic-DB了,那K呢,哎,是我们下边设置的这个topic还ER,哎,名字就叫做topic,哎,它是这么存的啊,啊topic,然后下划线ad啊这这个名字叫做topic value呢叫做topic_DB啊那如果说我们不需要它的话,其实这个参数就可以给它设置成false了,那么这个参数一给它设置成false,哎,这个呀,它就不生效了,那这个参数我们可以给它干掉啊,其实没有必要啊,那你们呢,想给它放上也无所谓,行,那接下来我们这还需要一个拦截器啊,哎,那这个拦截器叫做time stamp and table name intercept,哎,在这个拦截器当中,我们处理了一个时间戳,又。
08:49
处理了一个表名啊好。OK,那这个拦截器呢,我先放到这,然后接下来file啊,这个channel我们用的是file channel checkpoint DR是备份,注意这个备份的目录啊,不要和之前的这个目录一样,之前叫什么呀,Op pd model takepoint behavior一看一下在这里104CD杠,OT model。
09:18
然后take point,点这里我们已经有一个BEHAVIOR1了啊,如果说你再来一个BEHAVIOR1,他俩就存储在一个文件当中了,哎,不要这么存啊,就存储在一个目录了,不要这么存啊,不好啊。啊,再有。Date DR啥意思啊,数据存储的这么一个目录啊,同样这个目录我们也要给它区分开啊,之前都叫做behavior cb,点点,我们再来到date。电子目录看一看啊,这里也有个BEHAVIOR1,现在我们要都给它改成二了啊,OK,第一个存储在一当中,第二个flu的数据存储在二当中,OK,那接下来我们这还有一个啊,Max file size文件最大存储的数量啊,Cap啊,这个five参Le的一个条数的限制啊,容量啊是100万条啊Q,什么意思呢?我等待几秒,我再次执行一次,我这个事五啊,如果说再没执行成功,我才回滚啊,那第一次执行失败,第一次这个事务没有成功,先不回滚,先等待你这个keep alive的时间,OK,那接下来我们看看h d FS think,那首先HDFS啊,Type类型是HDFS,然后pass文件路径,哎,我们指定的是origin date gl,然后DBDB下边啊,用文件的啊,不是文件名,是表的名字,拼上一个INC,叫做增量表。
10:46
那增量表我们后边还要拼接上一个日期啊,好,OK,那这file的perfect文件名的前缀之前叫log,现在我们叫DB了啊,现在都叫DB了,好,那接下来round呢,我们可以做一个更细力度的这个文件滚动啊,OK,那下边这三个不用多说了吧,哎,处理小文件的啊,这个呢是基于时间滚动啊,这个呢是基于大小滚动,这个是基于条数滚动,哎,我们在生产环境当中啊,它设置成零就完事了,不让它生效,设置成零的意思代表它不生效啊,OK啊,这个为什么设置成这么短啊,十秒,因为我们现在要看效果,那你设置成一个小时。
11:28
啊,设成一个小时不合适,我们呢,可能得一一个小时之后才能看到结果,那这个呢,哎,我们是给它设置成128兆,哎,默认的呀。我们的哈杜的文件块大小就是128兆,诶,这个是我们最想要的啊,OK,那接下来这个是文件压缩方式,我们指定了一个comp stream啊文啊,文件类型我们指定了一个comp stream,叫做压缩流,那如果说我们选择的是压缩流的话,那么这个参数我们就必须配,选择其他的不需要配啊,它还有两个,一个叫做呃,Data stream,也就是这个文本文本文件,那还有一个叫做sequence file啊,这个二进制啊好,那这个压缩编码我们给它指定的是GZOK,那接下来是拼装,哎,把这个S发送到China think,从这个channel当中读出去啊,OK,那么这个就是我们的。
12:26
的配置好,那我把这个的配置拿过来,CTRLC。然后放到我们的项目当中,注意我要来到104,来到104的这个啊里边那里边来到这个job,那LL,然后我要新建一个VM,一个卡夫卡图HDFS,注意啊,现在不是log了,叫做下划线db.com OK啊,完事之后我在这右键粘贴,诶把这个拿回来啊,冒号WQ保存,OK,那么这个的配证件,哎,我们就写完了啊,OK,那的配置完事了,接下来我们还要编写一个拦截器,那拦截器我们依然就用之前的这个拦截器的项目来写就行啊,我们要给它起一个名字,名字叫做这的啊。
13:23
啊,就这个time stamp and table name intercept,拿过来,拿过来依然在这包下面啊,Intercept右键new一个Java class ctrl v,诶拿过来好,这有点长啊,有点长,但是它代表业务含义嘛,那怎么办呢?我们实现一个。那我们要实现一个form的intercept啊,Intercept实现完之后啊,重写四个方法,四个方法初始化,关闭单event多event,然后呢,哎,立即的把这个静态内部类给它写出来啊PU public staic class builder b啊继承一个啊不是继承实现一个inter。
14:13
Intercept里面的点build好,然后给一个大括号,重写两个方法。那这个方法我们还不能到,我们要new一个我们自己的,哎,这个拦截器啊,注意不要拗错了啊,New time stamp and table name intercept。OK,那完事之后,那来吧,开始实现这个方法吧,那这个方法我们最终要实现一个什么功能呢?我们呢,要把body当中的数据拿出来,放到handle当中吧,诶拿拿哪个哪两个字段了啊,现在是哪两个字段了,一个是TS,一个是table啊表明两个,那第一步我们应该获取拍当中的数据。
15:02
Head和body当中的数据好获取,那来吧,Event用它来获取啊,CTRLC点一个。Get header,先把header获取出来,就叫header,然后even点一个get body,再把body也拿出来啊,Body拿出来之后是一个字节数组,还不行,我们用一个string啊,用一个string,然后给他一个body啊,给完body之后,我们在是啊standard给他一个编码叉赛UTF8啊给他一个编码格式,OK,那完事之后我们再给他一个返回值,返回值叫做log啊,OK,那这个log拿出来之后,Log呀,它现在是一个摘森字符串啊,这个里边呢,有两个K,一个K啊有有两个字段,一个叫做。
16:00
TS一个叫做table,我们看一看啊,在这上面。啊,这里边现在有两个字段,一个叫做PS,一个叫做table,我们要把这两个拿出来啊,那这两个拿出来,那第二步。解析log当中的TS和table name和table字段,好,那解析我们之前是不是也用过一个方法呀?哎,在这个阿里巴巴的fast啊,Fast里边有一个object这么一个类啊,这个类里边诶有一个方法叫做price object,它呀,就可以把我们的这个Jason类型,Jason字符串给它转化成一个Jason object类型,那这个JA object类型,它也是一个KV结构的这么一个数据结构啊,然后我们可以通过啊这个方式诶。
17:08
到哪去了这了啊,通过再object,第一个get string啊给它获取出来,获取出来之后你看我们干嘛呢,我们呀。获取的这个K是哪一个K呢?首先table表名,哎,我们先把表获取出来,获取出来之后我呢,给他一个table啊,获取完之后我们还得来一个,现在获取谁呀,还要获取一个TS啊,获取两个好,获取完了,获取完之后我们最后一步第三步。把和table name和table放到hi当中的table name。
18:08
大写的name table name和这个time time stamp字段stamp OK,放到这两字当中,行,那来吧,我们操作吧,开始先来table name啊,嗯,Table name hier,点一个put,那put一个K叫什么呢?哎,就叫做table name,我看是table name吗?啊,来到这啊。在哪呢?获取的是谁?哎,就是它啊CTRLC拿过来,拿过来之后放到这啊,没有问题,那它的value是谁呢?Value就是我们刚刚获出来获取出来的table,好,那接下来我们这还有一个呀,啊还有一个叫做TS,那TS我们也得放进来呀,那TS那这个呢,叫做TSOK,那完事了吗?还没用啊,这还有一个一问返回,那这个就完事了吗?哎,也还没用啊,其实这我们有一个要注意的地方。
19:07
哎,什么呢?来到问答当中。看看这儿啊。你看我们数据当中的这个包当中的TS,它是一个几位的呀,它是一个一二三四五六七八九十啊,我们body当中它是一个十位的数据,也就是说Maxwell它会把我们的这个数据给转化成一个。十位的时间戳,那十位的时间戳它是一个精确到秒的,但是我们之前说过呀。我们的拦截器它需要的这个时间戳是一个13位的,也就是精确到毫秒的啊,那他俩我还不能直接就这么放进去呢啊,我要把你这个秒给你改成毫秒,那怎么改呢?啊,无非就是差三个零呢,你看我现在是十位的,那十位的我在这往里写的时候,你不TS吗?我给你加上一个三个零字不串,哎,加上之后我现在这是什么了。
20:16
哎,这是不是就变成一个从十位的,哎,我再加上一个三个零,那就变成一个13位的,哎,毫秒值了啊,那但是我们文档上啊,文档上不是这么做的,那文档上是怎么做呢?我们看看。文档上做的要比较麻烦一些,但是我觉得没有必要啊,往下翻我们看拦截器,哎,这。嗯,这个地方啊,你看文档上啊,它获取的是get了一个long啊,获取这的时候他没有get get一个string,在这get了一个浪,那get一个浪之后它干嘛呢?通过。string.value off再给它转化成string啊,那把谁转化成string呢?把这个TS你不是一个浪吗?我给你乘以一个1000,哎,是不是也是在当前这个十倍的后边乘以个1000嘛,也是在乘以一个三个零嘛,啊一个意思啊啊乘以完之后我给你转化成一个啊,时间戳毫秒值的时间戳那转完之后。
21:24
你看它干嘛呢,把这个参数放到这了,放到time Sam啊,我这写错了啊,是time Sam不是TSOK啊,这个呢得拿回来CTRLC好,那你用这个方式也行,那你呢,用我这个方式也可以啊,你想用谁都行啊,这两个你用哪个都没有问题啊,这个地方注意啊,是time STEM不是TS啊啊这个它解析不了TS字段,OK。那这个单event我们写完了,接下来我们看看多event,那多event跟我们之前写的是一样的呀,List呗。
22:00
List第二一个for for循环那event,然后调用我们的这个intercept把event传进去,传进去之后我最终啊把list啊返回出去,OK,那么到这我们这个拦截器啊就写完了,嗯,写完了之后我得给他打个包啊克里。好,可令一下。可练完之后拍打包。好。OK啊,打完了打完了,嗯,我们来到这个target目录,在这右键open in X explore,好,依然啊,是这个炸包,那这个炸包我们给它拿到我们的项目当中,好拿到104啊,注意CD1点点啊进来,然后CD到力目录啊进来,进来之后我们先把之前的这个from intercept给它干掉,干掉之后我呢把它拿进来。
23:02
诶把这个拿进来,OK,拿进来之后,我们这个完事了,然后配置是不是也写完了呀,CD点点啊,然后CD到照目录我们看一看vim_DB啊,这会看到OK,那这个我们是不是也写完了,那写完了之后啊,注意啊,我们为了避免这个出错啊,为了避免拦截器出错,我呀直接在这儿给他copy一下。Builder,右键copy。全里面啊copy reference,我呢自己重新搞一下给。我不用它这个我重新搞一下,为了避免它出错,好三角。OK,删掉之后右键粘贴啊,然后呢,把这个点啊给它换成一个到了符啊,OK,那这个拦截器的名字啊,肯定是没有问题的,我呢直接自己复制过来的,哎,完事冒号WQ保存,那完事之后,接下来我们是不是就可以进行一个测试了,好,那到这我先暂停一下那测试首先我要把这个flu给启动起来吧,哎,把这个flow给请起来啊。
24:10
啊。那我们在这呢,也有一个。的命令啊,我们先把这命令拿过来。在哪呢?哎,这不有单命令啊,这呢,我们把这个form的启动命令拿回来,放到这CD点点,然后右键粘贴,哎,那光启动这些flu没用啊,我们得把整个的链路都起来吧,你看我们现在启动的进程都有谁呢?XCJPS。哎,我只启动了一个卡豆吧,啊还不行,我还得把卡卡起来,那启动卡不卡,我要先启动ZKSTT啊,把JK启动。哎,这个测试啊,就是我们的一个全链路了啊,从头到尾啊,从买SQL到卡不卡,然后一直到HDF啊,那它启动完了,我们看看状态s sta OK,启动完了再来Kf.ST好。
25:09
那卡夫卡启动完了还没完事,我们还得把Maxwell启动起来,MXw.SHTRT好啊,它也启动啊,那把它启动完了之后,那接下来就剩下我们的这个了,哎,右键粘贴启动没事。OK,那到这整个的这个链路我们是不是就打通了,看一下啊,看看这个数据通道,先启动了一个HDFS,然后呢,启动了一个CK和卡夫卡,启动完之后再启动一个Maxwell,最后我启动了一个这个,那么当我往my circleq表当中写数据的时候。对应的Maxwell会采集到卡夫卡,卡夫卡会给我们采集到HDFS的,哎,每一张表的每一个目录当中,好了,那接下来。
26:02
我就生成呗,开始吧啊,我在102上,我CD到o p model,然后DB下划线绕我们这是不是有一个炸包啊,我在这里执行一个Java杠炸进卖了啊回。看看行不行呢?嗯,行不行,OK,数据已经生成了啊,我们等待的这个Maxwell,哎,不是Maxwell等待的这个104的发生变化,诶104的已经开始变化了,你看这个地方是不是已经生成了,开始创建文件了啊啊,这个是command in。哎,评论表啊,评论详情表的一个INC增量表,哎,你看这是一个日期,然后呢,这边还有一个啥呀,哎,这是不是还有一个。这个啊,文件呢,DB点一个什么什么啊,这是我们的前缀啊,点time,现在还是一个临时目录,OK,那接下来我们来到HD上看一看吧,嗯,卡布吧。
27:05
1029870,哎来到这打开打开date gl DB啊,你看这个是全量表的,那然后我们找找,哎,你看INC的是不是也有了呀,哎,那关于增量表的呀,我们就也给它都同步过来了啊OK。你看已经有INC这个存在了啊。IOK,呃,那到这儿啊,我们这个其实整个通道啊就已经打通了,行,哎,但是你们有没有发现一个问题,你看啊,我们是不是已经做过数据漂移处理了。数据漂移处理我们已经错过了吧,我们在拦截器当中已经把这个时间抽给添加进去了呀。啊,这时间串我们已经给添加进去了。那为什么他还是解决不了数据漂移的问题呢?他写到哪了?他还是用我服务器的时间来给我进行落盘呢?
28:05
这不行啊,那这万年期失效了啊,好,那这个问题我们下节课再说。好,那增量同步的这个写完之后啊,我们也要给他一个启停脚本,这个启停脚本我就不自己写了,我们给他拿过来看一下,哎,这个启定脚本跟我们之前写过的flu启定脚本完全一样啊啊井号叹号B当BA这是一个S脚本,然后通过case选择器判断第一个参数是start呢还是stop呢,如果说你是start,我要SSH的哈,坐标零四执行启动的命令,那如果说你是停止啊,我SSH到104来执行一个停止的命令啊,依然是通过Q来来进行停止,好,那我把这个脚本给拿到我们的项目当中,CRC。来大会查查L那102的并目录进来之后啊,我vim一个f3.sh好啊,完事我把它粘贴进来,冒号WQ保存,那保存完了我们测试一下呗,啊先给他一个权限啊ch mod777f3。
29:18
点啊。好,那我们先来到1042看一看啊,GPS啊,现在没有room进程,那我们启动一下吧,F3STT发。好业务数据消费啊启动了,那启动完之后我们过来看看GPS。好啊,启动的这个进程啊,已经有了啊,或者进程已经启动了,那完事之后我们再测试一下stop呗,来停止一下,OK,再来到104看一看GPS。好啊,那这个芙蓉进程啊,它就被停止掉了啊啊,那这会暂停一下。
30:00
那我们接着刚刚内容,那刚刚啊,我们测试了一下,哎,我们这个增量同步的flow,那增量同步flu这个地方啊,有这么一个小问题啊,我已经做了数据漂移的,哎,这个解决了,那但解决了它并没有给我解决掉啊,你看本身我这个数据,哎,我包的当中的数据,我在生成的时候,我已经指定了,你看我们买S表当中啊。来到my circlel表看一看吧,My circle啊,我们呢,找一个嗯,Command in吧,看能不能找到这个command in,你看这边,那这表里边我们看看啊,我们刚刚生成的时间是什么时间呢?评论的时间,哎,评论的时间那肯定是6月14号的呀,2020年6月14号,但是。我们依然写到了今天的啊,这个文件夹下面5月20号,也就是用了服务器的时间啊,我这个没有生效,那什么原因呢?啊,那这是我们分析一下,好这啊这么一个图往下看。
31:04
啊,往下看。看这个Maxwell配置,看这个啊,嗯,Maxwell这呢,它有一个时间串的问题,你看啊,我们现在这个链路是要把my circle的数据哎通过呀,这个Maxwell给它发送到卡夫卡,然后form把卡夫卡的数据拿出来,再发送到HDS啊,那我们往买SQL表里边写数据,写到的是哪呢?写到的是这个表当中啊,啊写了一个2020年6月14号的数据啊,那写了这个数据之后,他是不是会把这个数据记录在哪呢?记录在我们的my circle的bin log日志当中吧。啊,他会把这个呃,相关的内容记录到B日志当中,那同时呢,它还会记录一个TS时间戳,好了,那现在我们要注意的就是这个问题,那这个TS时间戳它记录的是什么时间呢。
32:05
它记录的是我们生成的数据的这个时间吗?哎,不是,它记录的是我们买circle表的变更时间。我的数据虽然是2020年6月14号,那我们的变更时间是什么时间呢?是不是今天呢,6月20号,那我们假设啊是五月啊,啊5月20号,那我假设啊520,那我假设是5月18号,那么我5月18号执行的,那这个b log日志,它最终是哪个时间的呀。我们看一看这个时间是不是五月。18呀,那在b log日志当中的TS时间戳,它是5月18号好了,然后Maxwell。那他要同步b log呀,那它肯定就会同步5月18号,那这个日志啊,哎,记住那Maxwell要同步过来肯定就是5月18号了,那么这个地方就不是我们的6月14号了啊好,然后我们把这个TS以及一些相关的数据发送到卡夫卡,发送到卡卡之后,鲁姆把卡夫卡的数据同步过来,诶在这儿啊,我们还觉得啊,我们很聪明,做了一个时间抽的转化,那做了一个这个数据漂移问题的解决,我们把body当中的这个时间啊,卡夫卡S读出来,Body是不是就这个呀,5月20号吧,那5月20号,那五月二啊,5月18号,对今5月18号,那5月18号的数据我再给你放到嗨点头当中有什么意义吗?是不是没什么意义啊,他当然是今天的时间了,那最终落盘落到哪一天呢?那很显然落到今天呀。
33:52
那SDS think它默认是基于hier头当中的time stamp呀。哎,基于hi的头当中的time sta盘来落盘的,好了,那这个问题怎么解决呢?那我们先想一想,在这个环节它能不能解决啊。
34:09
啊,肯定是没法解决了,因为这个包的呀,它已经是5月18号了,哎,我们根本就不知道数据生成的时间是多少,哎,那Maxwell这能不能解决呢?哎,讲道理来说呢,Maxwell它其实是可以获取到这个时间的啊,那我们呢,就在Maxwell这来做了一个手脚,我们看看Maxwell这是怎么做呢。那怎么解决,依然是这个过程啊,往这写写的是5月18号,那还记不记得我们在安装mala的时候,有两个版本,一个叫做官方版,一个叫做教学版,那教学版它有一个什么好处呢?那安装官方版本它这个数据是什么样的,是2020年5月18号的吧。是不是5月18号,那安装教学版之后,我们在教学版本当中呢,给你多增加一个参数,注意啊,这个是我们自己增加的,教学版本多增加了一个参数,在config properties,也就是Maxwell的conf properties的这个配置文件里,我们增加了一个Mo data参数。
35:16
啊,那我让这个Mo data这个参数和我们生成数据的这个时间保持一致啊,那我们这个时间是在哪控制的呀,是不是在炸包里啊,在炸包的这个a.pro这个参数啊,哎,在这里控制的这个时间吧,也有一个Mo,那这两个Mo date的时间我让你是一致的。那么他会干嘛呢?这个呀,他就会强制的把我们TS的时间戳给改变了,你看啊,我之前不是5月18号吗?同步过来5月18号,那同步过来之后,它会强制的把我们的TS给换成6月14号,那接下来数据到卡夫卡是不是6月14号啊。
36:02
那然后我这边拦截器再通过数据漂移的这么一个方式一解决,那么现在是不是就6月14号了,哎,那它呢,也变成了6月14号了啊,那基于hi头来落盘吗?哎,Time落盘OK。那这个问题的解决方案呢,就是用我们macel,哎,教学版就能解决,然后增加一个参数,那这个地方大家要注意,教学版是我们学习的时候用的,哎,我们呢是为了看到这个,看到什么呢?我们是为了看到这个结果,然后呢,我们在这做了一个改动啊,那在生产环境当中不要用它,生产环境当中不会有这个问题,生产环当中你看啊,往往买SQL表的变更,那肯定是今天的呀。对吧,然后我6月14号的数据,他肯定会写,他这个B的记录肯定是6月14号。OK啊,这个地方是没问题的啊,那由于我们现在是测试环境,所以说这会出现一个时间差的问题,那我们呢,就给他把这个时间差的问题给解决一下,OK,那我呢,部署的刚好就是教学版,那接下来我就要增加一个这个参数叫做Mo,好,那来吧。
37:14
解决一下啊,在这呢,把这个参数拿过来,拿过来之后。来到哈杜102啊,CD-OPT model Maxwell,哎,这里有一个con.proper进来,进来之后我在这增加一个参数啊,右键粘贴粘贴过来,然后冒号WQ保存啊Mo it参数好。诶冒号WQ保存保存完之后,注意MYWELL1定要重启啊,你不重启这个参数不成效,然后MX w rest。OK,重启完了,那重启完了,接下来我得这么的啊,嗯,来到这儿,来到这儿之后这样吧,还是这个我给它删掉啊,给它删掉,删掉之后我就删掉这一个目录啊,那这一个目录他如果说要是生效了,那其他的是不是也就生效了,好我就以这一个为例啊好,那来吧,那接下来我再执行一下Java杠杆,Java CD-OPT model。
38:26
DB下往前走,我执行一个Java杠,这记没有好回去。好,它现在已经开始执行了,那开始执行了,那接下来我们看等到等着这个104这它变化。哎,等这里变化,这个变化了,它再往下写好完事了,那完事之后我们来到HDFS上看一看,OK,那接下来我一刷新,你看,诶现在这个问题关于时间串的问题,我们是不是就解决了啊,OK,那这就是我们教学版啊,啊这么一个Mo data参数的么一个变动,好,那这会暂停一下,那增量数据同步到这我们就完事了,那增量数据我们同步的是my circle业务数据库表的,哎,这么一个数据啊,那my circleq业务数据库表它很有可能是已经运行了一段时间之后了,我们再做增量同步,那既然是运行了一段时间之后,它可能就会有一些历史数据啊,那增量同步历史数据我们是不是同步不过来呀,我们只能同步,当我们这个Maxwell上线之后,然后我们再往买口写数据,那么Maxwell可以把这个数据给它同过来。那之前的这些历史数据,它。
39:46
同步不过来,那怎么办呢?啊,那有同学说,哎,我可以在增量之前,我做一次全量同步啊,对啊,没问题,确实我们要做一个全量同步,那这个全量同步我们用哪一个工具来做呢?啊,那之前有同学说了,那全量同步我们刚做完呀,来用这个叉来做呗。
40:08
对叉,我们把历史数据全量的同步一次啊,那行不行呢?用data叉行不行呢?啊行,但是data叉它不是最好的选择啊,为什么它不是最好的选择呢?我们分析一下啊,Date差它同步之后到HDFS的数据格式是什么样的呀?是不是以一个特定字符分割的这么一个文件呢?哎,我们用的是反斜杠T啊,那每一个字段和字段之间我们用的是反斜杠T分割的好,那增量数据同步我们是用max啊,那Maxwell它是一个什么格式啊,它是一个zon呢?那既然是on的话啊,我用两个数据来做这个增量同步,是不是数据的格式它不一致啊,那虽然数据格式不一致,我们是不是可以给它转化一下啊,我们依然可以进行统计,那能做,但是这样呢它不好,那用谁来做比较好呢?怎这个首日的全量同步啊,哎,我们在学Maxwell的时候,是不是学习过一个max的boot功能啊,那max的boot功能它就可以来做全量,那来呗,那接下来我们就用macel的boot功能来做首日的全量同步啊,这个脚本呢,我给它拿出来。
41:30
啊。拿出来复制看TRC来到这,那过来看一看吧,啊,它是一个SS脚本啊,井号叹号定等match,那SL脚本我们先来看case选择器啊,那这个地方它接收的是一个一个的表明啊啊第一个参数啊,找到了这表明之后啊,我们调用了一个importt date方法,然后把这个表明传上来,那来吧,我们主要就看看这个importt data这函数它是怎么做的啊在import data这里啊,它就调用了Maxwell加目录下面啊你看我们在这定义了Maxwell的加目录下面的变目录下的不啊Maxwell boot这个脚本,那这个脚本呢,就是用来做全量同步的啊然后呢,杠杠database指定一个库名,杠杠table指定一个表名,那这个表名就是我们在下面传递过来的啊啊完事,杠杠con,我们指定一个ma。
42:33
大的,哎,配置啊,那通过这个我们就可以做一次全量同步了,那我们有很多个表啊,要做这个手热的全量,那怎么办呢?我们在下面又给了一个参数二把我们想要做增量同步这些表,哎,通通的给你做一次全量同步,OK,好,那这个脚本呢,我们给它拿到我们的集群当中啊,CTRLC拿过来,拿过来之后来到好都L02啊,我CD到加目录的闭目录,那进来之后我vim一个my circle。
43:10
下线啊,图下角线HDFS下角线负。三条线in腻点SH啊,初始化就是首日的全量同步,那进来之后我给他粘贴过来啊,粘贴完事冒号WQ保存,那完事之后啊,我给他一个权限啊,这个复制。Ch ch mod777有碱粘贴好,那完事之后我们呢测试一下吧,嗯,那测试它之前呢,我们啊先来到HDFS上,我呢先把这个origin date啊给它删掉啊,我们呢是为了演示这个结果啊,我先给它删掉,那如果说它又重新生成了一个origin date目录,那就证明我们这个首日全量就过来了呗,好,那给它删掉之后,我们再看看我们集群的数据通道XCLGPS啊首先我们这个是买circleq的数据啊,我们通过Maxwell给读出来,读出来之后我们通过哈杜OP104上的一个flu来再给它采集到HDFS,那哈杜OP104上的这个flu,我们用的是f3.sh,那我呢给它启动一下,好,那启动完了之后我们呢。
44:40
开始啊,启动我们的这个脚本了啊,我们刚刚的这个脚本把它启动一下。复制右键粘贴,那粘贴之后我呀,给他一个奥参数,给他一个奥参数,它就是代表全部启动啊好OK回车。
45:00
啊,第一个第二个啊,第三个完事了,哎,第四个好,那接下来我们来到HDFS上看看它有没有生成这个目录呢,我刷新一下,哎,你看date已经生成了,然后gmail DB下面的啊INC嘛,就是增量表啊,增量表啊,它生成的我的首日的一个全量同步,那它呀,就会把我们当前所要增量同步的这些表啊,全都给我们全量的同步一次啊首日全量同步一次,好,那首日全量同步它没有问题了啊,用Maxwell可以做,那同学问了,那这个Maxwell它既能做增量同步,也能做全量同步,那我们还为什么要用date差来做全量同步呢?哎,这地方我们就要分析了啊,Maxwell啊,它虽然能做增量又能做全量,但是它擅长的不是全量啊,那全量同步啊,它不擅长那全量同步我们为什么要用data叉来做呢?还记不记得我们学data叉的时候,它支持一些流控的功能啊,并且它支持更多的数据源啊,同时呢,他也支持并发的读写,那这叉呢,他在做全量同步的时候,他要更擅长一些啊,那macel呢,他就不太擅长全量同步啊,所以说这个地方我们只是用macel首日执行一次,在增量同步买SQL表之前,我首日的全量同步一次就完事了,那所以说呢,它也不会耗费什么很多的性能啊,那这个叉呢,我们是每天都要执行的,那如果说我们每天都用Maxwell来执行全量表的话,那么这个性能可能就要比较低了,OK,那这个地方我暂停一下。
我来说两句