00:00
那数据采集通道我们知道了啊,得用一个flow把卡夫卡的数据啊给我们发送到HDFS,那好了问题来了,那这个flu我们应该如何来选型呢?我选择什么样的source,什么样的channel,以及什么样的S。啊啊,想想用什么来合适呢?好,那我们打开哎,这个画图板。诶,那我们之前呢,是不是学过这么三个组件啊,啊,学过这么一个卡卡China能啊,那我们上游的flow用的是这个结构来做的,诶是不是这样他就已经把数据发送到卡布卡了呀。那好了,那下游的我们是不是要从卡夫卡当中读数据呀,发送到HDFS,哎,也就是用HD FS sit写出去,那上面这个过程是不是就刚刚好,你看这个卡卡,它就可以直接把卡普卡的数据读出来,发送到哪儿呢?
01:07
发送到我们的这个,嗯,这叫什么了。发送到我们的HDFS网,好,那我们这在传统的结构上是不是还有一个这么个结构啊,哎,我们把它拿上来啊,把它拿上,那这个应该是什么呢?啊,给它改一下啊,哎,除了这个之外,除了卡瓦柴,我们是不是还可以用卡普卡S卡法s file才能,然后HDFS性啊,这是HDFSHDFS性,然后写出到HDFS上,这地方就不画了,那这个从哪读数据啊,是不是从卡法当中读数据啊,好了,那既然是从卡法当中读数据,我们看看啊,他俩用谁好呢?左边这个电路。卡巴,从卡巴当中读数去。
02:00
一个啊,这么一个数据传输,然后两个最终写出去,它需要三步吧,它需要传输三步,那我们再看看右边这个呢。好,右边这个从卡帕读数据一步。写入到file才两步。然后我再写到hdfs think3步,Hdfs think,再把数据写到HDFS,用了四步。啊,那现在我们就能知道在数据传输上谁更有优势啊。是不是我直接用卡夫卡拆它更有优势?那同学说,哎,那没问题啊,那我们就选择它呗,啊,他在传输上没问题,但是我们还要考虑这么一个事啊,我们刚刚分析了这个数据采集通道,我们要通过一个时间来进行落盘。那么这个时间它是在哪获取的数据啊,哎,它是在我们的hier头当中获取的数据,那么这个时候啊,我们呢,就需要保留hier头,那我们看看,那既然我们分析哎,它在数据传输上有优势,那它能不能用我们的hier头呢?
03:17
在上游的时候,我们是不是设置了这么一个参数啊,False意思是我们不以from方式来进行存储。啊,那卡卡channel写入到卡普卡的数据是不是就只有一个body了,他肯定是没有了。他肯定是没有handle的,那好了,那我们的这个这叫什么呢?你看啊,那如果说他再从卡夫卡读数据的话。哎,他会不会把上游的hier拿过来呀?啊,他会不会把上游的这hier拿过来呢?他肯定不会把上游的hier拿过来了。哎,而是呢,在卡瓦菜诺这,它封装了一个自己的header header加上body,哎,Header加body它是这么干的,然后发送到h d f s think,最终写出去,那好,那现在有这么一个问题啊,你说我们上游它我们想设置一个时间,那能设置吗?是不是设置不了,那同学说那你上游设置不了了,那我在下游设置呗。
04:21
啊,我在这个地方,我给你设置一个拦截器行不行呢?啊,我自定义一个拦截器,不好意思。它不支持,为什么?因为我们自定义的拦截器只有在SS的情况下,有SS的情况下,SOC有S的情况下,它才能设置到,我现在没有S,那么自定义拦截器你设置不了,设置完了啊,他报错,你想给它放到China,这不行他报错。那么很显然这个结构啊,我们呢不能用,这个结构是不能用的啊,因为它无法设置自定义的拦截器,那这个结构我想用,我什么时候可以用呢?当上游的这个卡普卡参考,哎,这个参数改成处的时候,那他就可以用了。
05:13
为什么说把它改成处它就可以用呢?因为把它改成处,我们是不是可以在这个S这设置拦截器啊。而且写入到这的时候,是以hier加上body的方式来进行写入的,然后下游这个flu啊,这个卡普菜,他把卡普卡的数据读出来,它会这么干,你看啊,卡普菜它依然会给我们封装成一个hier和一个body啊,卡把China它依然会封装一个hier加body,那它怎么封装呢?那我这个地方写进来的时候,它已经是一个hier加body了啊,已经是一个hea一样hier。加上一个包围了。那我下游这个卡瓦参它如何封装呢?
06:02
啊,下游这卡瓦参它如何封装啊,它是这么干的啊,如果说我们这个里边是hier加body的形式存储,那么它会把我这个hier。给我解析出来,放到我这个event当中的这里。然后把body解析出来啊,放到body的这个地方,哎,它是这么封装的,那如果说我们这个地方设置的是false的话,它这是不是就只有一个body了,哎,就只有一个body了,那只有一个body的情况下,它是怎么封装的呢?是这样,他依然要封装一个event,只不过呀,在这个event里,Hier是他自己封装的啊,那body呢,给他拿回来。啊,是这样,所以说我们把它设置成处,这两个结构,它俩可以搭配使用啊,把它设置成醋,它俩才可以搭配使用啊,这就设置成处,那如果说设置成false,它俩搭配的时候,你没有自定义拦截器,那行,那你有自定义拦截器,那么不好意思,你用不了,所以说对于我们现在的这个情况,它是不是就用不了啊,我们现在设置的是false,而且我们还需要拦截器,那怎么办?我们只能用这个结构,那拦截器在哪写呢?我可以给它写在这儿。
07:30
OK,那么接下来我们就要用这个flu啊,注意啊,这个地方不是写入到卡普卡了啊,是写到HDFS,我这没有改啊,啊你们就知道是这么回事,那接下来我们就要实现这个flu用什么组件啊,卡夫卡S,从卡卡读数据,然后发最终啊h d think。我们把数据发送出去啊,实现这么一个路啊,来吧,我看看啊文档。
08:00
啊,文档这。的选型卡夫卡source,然后呢,哎,订阅哪1TOPIC啊,这个topic我们定义的是定阅的是topic_log啊,用户行为数据吗?哎,Topic log,然后我们这需要一个自定义的拦截器,叫做timemp intercept,那这个time intercept它具体有什么用,稍后我们再说啊啊啊,然后接下来HD think。我们再把数据写出去啊,写到HDFS,那中间呢用file channel,那用file channel还是channel跟什么相关呢?哎,跟这个数据的重要性小哥相关,那如果说我们这个数据是跟钱相关的,那必须用file才能,那如果说这个数据它就是一个普通的日志,我丢个百十来条啊,丢个三条五条啊,没有没所谓,诶那我们呢,就可以用麦采总啊好,那我们在这儿就用反参照来做,OK,那这个地方我暂停一下,那配置flow,第一个事儿我先给他一个圆啊。
09:07
配置flow,第一个是井号啊,然后叹号。啊,不是井号叫什么了,定义组件第二个配置source,配置source啊,第三个配置China,第四个配置think,第五个。组装,哎,这回south China think它是不是就都有了呀?OK啊好,那我们首先先把组件给它定义上。啊,依然是组件的名字,A1.1个source等于一个RAE点一个猜是等于一个C1啊,A1.1个think是等于一个。
10:12
K,好,那来吧,我们先配置SS,那配置S怎么配置呢?我们呢把的官网打开啊。Blue来到document啊,用户指南CTRLF,找到谁啊,找到我们的。第一个S叫做卡夫卡s soce好过来诶找到了,那卡巴S我们需要都配置哪些呢?啊先看谁啊,先看黑色字体啊,那首先它配置了一个拆斯啊,那来吧,配置拆斯肯定是配置到组装这啊啊A1.s,然后点一个R1啊R点一个,那它往哪一个channel发送数据呢?往C1这发送数据啊好再来第二个配置谁呀,配置一个type,那它的类型呢?哎,搁这儿呢,卡夫卡S的一个权利名啊。
11:18
TY p type等于一个卡夫卡萨斯的全利名啊,再来第二参数卡普卡的不达stop s啊,我要往卡普卡从卡卡读出去,我得连接到我的卡瓦集群呢啊卡图卡102冒号9092啊啊卡多卡103冒号9092。OK,再来。那我要从卡夫卡读数据,我得知道我要从哪一个topic读数据吧,好,那接下来呢,我们在这儿还要设置一个topic,你看这有两个,一个叫做topic,一个叫做topics,什么意思呢?我们可以设置一个正则啊,正则你比方说我们有这个topic toic topic-DB,还有topic-log啊,那其实啊,我们就可以配置一个topic杠芯,哎,那通过正则的方式,它就可以匹配到我这两个topic啊,但是呢,我们不行,这两个我们一个一个来啊,我们一个一个来,哎。
12:31
嗯,把它拿回来,TOPIC4CTRLC,那既然是TOPIC4的话,它啥意思啊,哎,其实可以从多个topic读数据啊,那我们在这呢,就从这个一个topic读数据啊toic topic_log那如果说想配置多个topic怎么办?来逗号啊topic啊,其他的啊,OK诶,那我们只有一个topic。
13:02
这两个呢,选择一个就可以啊,这俩有一个就行,OK,那接下来你看这还有一个consumer group ID,我们可以配置一个消费者组的ID啊,注意那这个消费者组的ID呢,其实我建议建议大家去配置一个,哎,去配置一个。这个呀,消费者主的ID啊,建议呢,大家配置跟业务是一样的啊,那其实我们就把这个topic啊给他拿过来就完事了,你看配置成跟业务是一样的,那如果说你这个业务不一样啊,假设啊假设什么呢?我现在有两支业务。但是我都用默认的,我没有配置你这个消费者组的ID,那他们会默认的都用这个,哎,那消费者组ID都一样,什么意思啊,他俩是不是属于同一个消费者了。那巧了,他俩都消费topic,下回先唠,那会出现一个问题,你消费到了,你消费不到。
14:05
啊,我他俩都需要这一个topic啊,那但是他俩属于同一个组了,他消费到了,他还能消费到吗?不能了吧,啊,所以说消费者组的ID啊,建议大家设置成一个跟业务相关的唯一的啊这么一个ID好。再来,嗯,还有什么参数呢?诶,就是这么两个参数,By side啊,还有一个bit delicious millions,这两个什么意思呢?By size啊,我这个SOUTH1批发送多少数据,哎,这个得给他个前缀啊CR。这也得给一个by side1批发送多少数据,那这个地方给他设置多少合适呢?哎,其实我们给他生产环境当中啊,我们给他来个2000左右啊就可以了,好了,那如果说。我这个一批他迟迟的没有攒到,你看啊,我这一批你不是默认啊,我给你设置2000条吗?那我这2000条他迟迟的没有到。
15:10
啊,我这只有1000条,那么我这1000条我就不往下发送了吗?哎,不是的啊,那我应该怎么办呢?再有一个时间来控制在这呢,That duration millions,通过它好给他拿回来,拿回来之后把这个呢。抗日位啊粘贴回来,那这个给它设置多少合适呢?啊,毫秒值啊,啊在生产环境当中啊,这两个参数它俩是配合使用的啊,那怎么设置合适这个呀,就要根据我们的流量来设置了啊,就要根据流量来设置了,那你比方说啊。我们在生产环境当中,这流量啊,我们假设这2000条数据,那这时候我们就要看一看啊,这2000条数据它有多大,哎,那然后呢,我们再看看我多长时间大概能生成两千点,哎,能产生这2000条数据啊。
16:08
哎,这个要根据一个时间来进行配置,你这个时间你给他设置的太大太小都不好,你看你给它设置的太小,哎,我给设置成一毫秒,那我说是来一条就写,来一条就写,你这个2000根本就没生效,但是那我说我把你设置大点呗,我给你设置成一天。那你设置成一天,那我还是会有一部分数据卡在这儿啊,我就只有1000条,那么这1000条数据它会等一天才发送出去啊,那对于数据的时效性还不好,这个参数你给他设置成太大了,数据的时效性不好,设置的太小,哎,那它呢,这个批次啊还太少,我们这个be size不生效啊,啊不太好,那怎么办呢?设置成一个合理的值,这个合理的值怎么设置,假设啊,我们在这个地方设置成2000条,那我们就要看一看了,我们多长时间会生成2000条的数据呢?那如果说我们五秒钟啊,5000毫秒生成2000条的数据,那我们就可以给它在这儿设置成五秒,哎,五秒左右,哎,OK啊,这动态的设置条,好,那我在这呢,为了我们项目的演示,我给它设置成1000毫秒啊,也就是一秒钟的时间呗。
17:23
OK,看看啊,还有什么参数。嗯,往下往下。嗯,应该还有一个参数啊。啊,好没了,行,那这么多。嗯,OK,那接下来我们配置拆,那拆我们用的是什么呀。ae.t是点一个C1啊,T的TYT,我们用的是file channel吧,好,那我们搜索一下file channel file fileel channel,那file channel我们看看啊,首先type类型,哎,就是file啊,没问题,那接下来你看这还有一个什么呢?Checkpoint DR。
18:16
还记不记得我们说发要拆呢,哎,我们有这么一个索引的机制啊,是维护在内存的,那这个内存挂了怎么办呢?它呀会把这个索引给我们备份到内存当中,那如果说这个,哎,那这个备份备份的位置就是这个目录这个胖点。那如果说我觉得你这个备份它也不安全,它也有可能挂,怎么办?我们可以再来一个二次备份,好,那这个是一次备份,那二次备份呢,默认是关闭的,哎,这一次备份默认它就是开启的,我们必须得配置啊,那二次备份呢,它默认是关闭的,那如果说我们给它改成错了。这这个就是我们backup你看啊,备份我们的take point电啊,再来一个二次备份,那这个目录对应的我们就要给它填上啊,啊,那它给它拿回来CTRLCCTRLB。
19:11
好,马老师,那这个目录我来到文档当中来去粘贴一下。们的配置。嗯。哪呢,这儿呢。诶,垂个判的点这个CTRLC好放到这,那加油,那这个参数呢,我也拿过来啊。CTRCCTRV这个参数啊,它默认的是false啊,那我们呢,就给它设置成false,就告诉大家有这么一个参数,那如果说给他开启张处,我们还要配置一个backup take point DR,好啊,再有再往下看。Date d,这是什么意思呢?还记不记得我们讲哈杜的时候,有一个磁盘的多目录存储啊,那么这个D啊,这个date d是的多目录存储啊,我们把数据存储在一个服务器的多个磁盘上啊,哎,但我们现在只有一个磁盘啊,我们呢,只能哎配置一个磁盘啊。
20:25
哎,跟呢,跟跟这个哈,它是一个意思啊,Take点。存储在这数据目录。OK。啊啊,这配置下完事,OK,那再来你看啊,还有两个参数。Max什么意思呢?我们file它这个文件呢,哎,它是写入到文件的呀,那我这个文件你最大,我能容纳你写多大呢?哎,是两个G啊,这是一个默认的参数,这是默认的参数。
21:06
看C拿过来。等于个谁啊,默认是它啊,两个GOK,嗯,我们再往下看啊,还有参数叫做keep alive,看看它。还有一个cap city。拿过来啊,那这是什么意思呢?Type我们之前说过,它是file channel的一个条数的限制,默认呢是100万条啊,这是100万条,OK,那这cable na是什么意思呢?这个参数我们没讲过,我们看一看。Key它的意思啊是你看啊,我们这一波source要往China写数据吧,哎,往China写数据,我这一次PUT15,那如果说我这个里边已经达到100万了啊,满了,那满了之后我这一次PUT1万条,我是不是put不进去啊。
22:16
那破不进去,我这一次是不是就要回滚了啊,他就回滚了,那回滚了之后怎么办?我重新从文件或者是从卡夫卡再重新读一次,那这种是不是有点浪费性能啊,你看并不是说我这个flu挂了,我导致写不进去,而是我这边没有内存啊,没有地方写了,那你说有没有这么一种可能啊,我在这等你一会。我等你。这个think think,我等你think消费一会儿,那有可能哎,就给我余出来这1万条空间了,那我这一次PUT45,我等你一会儿之后,我再来一次,是不是就能哎写进去了呀,OK,那么keep alive它的作用就是等你一会,哎等你一会之后呢,我再put它一次,哎,再次put它一次,那么这次它就能写进去了啊。
23:11
那如果说等一会儿之后呢,他再写进去,哎,那他依然会回滚掉啊OK。好,那这是我们的file channel啊,File channel,那接下来我们看think吧,哎,Think,那think呢,这个地方我就不自己配了,我呀直接把文档当中的。哎,直接给他拿过来就完事了啊h DFS think,我们看一看。CTRLC拿过来,还有这个组装啊,组装呢,我给你拿过来吧,OK。好,那我们看看think think的类型,Hdfs think,然后呢,你看pass pass怎么写的,Recent gmail log啊,Topic-log,百分号Y100分200M,百分号D啊OK,然后接下来这有一个什么呢?文件的前缀啊,是log点,哎,然后呢,Round是什么意思呢?哎,这个参数我们是不是没讲过呀,我们看看啊,Round是什么意思呢?它这个给他一个放啊来到文档当中。
24:20
嗯,搜索一下啊HDFS。好,往下翻啊,你看这有这么一个参数叫做run,默认是false,呃,我们的这个你看啊,文件落盘,我们在这儿可以来一个百分2Y,百分2M,百分2D,那如果说我再有一个需求,我再来一个百分2H,是不是也没问题啊,这是一个小时,哎,我基于小时来进行落盘,那如果说我想基于六个小时或者12小时来落盘怎么办呢?那我在这儿来一个乘六,哎,能不能这么做呢?它不行,它不支持,那如何能支持这个其他单位的这个时间落盘呢?啊,那round的我们就要给它变成醋了。
25:12
Round变成处之后,下面这两个就生效了啊,一个叫做round unit什么意思呢?单位啊,那单位有这么几个参数。Second秒,Minute分钟,这是R啊,那这次呢,所对应的值啊,那如果说我们想进行六个小时一个落盘的话,那这个参数我们就要给它改成hour ho u r,好,Hour,那这个参数呢,我们给它改成六,那么接下来小时这个地方,它就可以基于六个小时哎,来进行一个落盘了啊,OK。好,那这个地方啊,这是round,那我们在这儿呢,不需要,我们只需要按天来进行落盘就完事了啊,所以说这个地方呢,我们来写到这儿就完事了,那只是我们可以支持一个,哎,其他力度的这么一个配置啊,在这儿呢,跟大家说一声好,那接下来诶,这三个参数啊,那他仨在项目当中有什么作用呢?我呀先给它注释掉。
26:11
啊,我在这先给它注释掉啊,稍后遇到问题我们再说,那在south这呢,我们来到文档上啊,文档上。哎,文档上呢,SS这还有一个拦截器,哎还有个拦截器,这个拦截器呢,我们也给它拿回来,哎,那拿过来我们拦截系数还没写呀,我也给它先注释掉,哎先注释掉,我代表它现在没有啊,那我们现在到这就配置完了,那稍后呢,等我们遇到问题的时候,哎,我们就知道拦截器以及这三个参数它的一个作用了,OK,那的配置啊,我们就先配置到这啊这个写错了啊,是9092好。好,那这个地方呢,我先暂停一下啊。好,那呢,我们刚刚配置完了,那接下来我们测试一下,哎,这个它能不能用呢?啊,数据通道能不能打通啊,你看那我们这个flu呢,首先啊,这有一个拦截器,我们先给它租掉了,然后下边think这啊还有三个参数我们也给它租掉了啊,稍后我们再看看这三个啊,这几个参数它都有什么用呢?那我们这个弗鲁姆,它是把我们卡夫卡的数据读出来,给它写到哪啊,来给我们写到HDFS这个路径啊,你看路径的名字log叫做topic下游线log,那如果说我们稍后还会同步这个。
27:36
业务数据啊,这个topic的名字叫做topic下划线啊,DB啊,OK,那这个我们怎么办呢?啊,给它拿过来CTRLC,那这个flu我们拿到哪呢?啊,102103上是不是都有一个了,我们给它拿到104上啊,啊,我们来到104的op pd model,那进来之后来到赵录进来,进来之后我们先把之前的这个删掉啊,删掉删掉之后我们VI'm一个什么呢?哎,现在这个业务是注意啊,这个名字一定要带业务的啊,尽量是要带这个业务含义的,不然你不知道它是什么,卡不卡下划线to谁啊,To HD FS,那我叫to h d fs.com就行了吗?哎,还不行,我们现在只是业务啊,我们现在是用户行为数据的这个同步啊,那我们稍后是不是还有业务数据啊,那我们现在叫做卡不卡to h DS下。
28:37
发现杠log,那稍后我们再同步业务数据的时候,叫做下划线DB啊进来进来之后我在这。啊,右键粘贴OK,粘贴完了冒号WQ保存,那保存完了,我是不是要启动我这个啊,那在启动之前它是要往哪写啊,往HDFS写啊,往HDFS写,我看看啊,我的HDFS还没有启动啊,我先启动一下HDHDp.SSHTT啊启动那它启动着我们呢,把这个也给它启动了啊来到文档当中,在文档当中呢,我们的用户行为数据同步,哎,这有一个测试啊,测试这呢。
29:26
就给我们提供了一个启动命令,拿过来,CTRLC好。拿到我的1104在这CD点点啊,然后右键暂停回事,OK,你看它现在就正在运行了,那它正在运行了,我们,哎,这个哈多应该已经启动完了,那启动完了我们接下来看一看啊,来到这卡多吧,1029092进来,那进来之后来到这啊,看看我们的数据,那现在只有两个,一个是系统的,一个是临时的,那我们怎么能看看这个flu生没生效呢?我们是不是有一个Lg.SH啊,我一执行Lg.SH数据是不是就会写到这个日志文件当中啊,写到日志文件,哎,然后呢,我们还应该通过一个flu帮我们采集到卡夫卡吧,Kaf KA对吧,这有一个flu,同时我们刚刚在104上启动一个。
30:34
他帮我们给它采集到HDFS,那现在我们还需要这么一个事,哎,把这个flow也给它启动起来啊,那这个flow我们是不是有一个脚本叫做F1加SH,好,ST,记住。然后GPS。啊,已经启动起来了,启动完之后我执行一个Lg.SH我看看对应的哪啊对应的这个HDFS上,它会不会写进来数据。
31:07
啊,看他会不会写进来数据,好,那稍等一会啊。稍等一会。发行,哎,你看a date。进没有log log下面topic_log进来,进来之后2022,哎,5月18号,诶是不是刚好今天呢?哎,然后我们再进来。这是什么呀?你看对应的这个文件的数据是不是就进来了啊,你看我们点进来一个看看,哎,OK啊没有问题,好啊,这个其实也可以看得到啊,这看不到啊,十二进制行不管它OK,那么数据现在写进来了,哎,那证明我们整个的这么一个数据通道啊,再来这图看一看。这个数据通道是不是就完全打通了,一直到我们的HDFS,哎,整个的这个数据通道啊,就完全打通了,OK,那这个地方我暂停一下,数据通道打通了,你们有没有发现点什么问题啊啊第一个问题。
32:15
你看啊。呃,这叫什么呢?你看我们这每一个文件,它是多大呀,文件的尺寸。这多大,你看200多个兆啊,你看最大的才是六啊,不是还还不是兆呢,是B呀,600多个币,这是最大的,最小的呢,是280B啊,那我们这每一个块大小是多大呀,是128兆,那这个叫什么?这妥妥的小文件了啊,这妥妥的是小文件了啊,那这个问题我们应该怎么解决呢?那首先我们要知道每一个文件我们尽量让它控制在多大呢?是不是让它跟块大小是一致的呀,哎,128兆啊,尽量控制在跟块大小是一致的,哎,这样是比较好呢,那这个问题我们应该如何来解决呢?啊啊,那说到小文件,我们要知道小文件有哪些危害啊。
33:15
啊,小文件的危害,我们在计算map任务的时候,每一个小文件它是不是每一个文件吧,先说每一个文件它是不是都会对应的生成一个task task好了,那我现在有多少个小文件呢?啊,有这么多,每一个它都会生成一个m task,那假设我现在这里有多少个,我看看啊,这一页是呃,25个。啊,这一页是25个吧,啊一到25,那有多少页呢。这是有17页啊,25乘以,假设是20吧,啊,二五一十,哎,500页,哎呀,这一共是500个,有500个小文件左右啊500个小文件,那我们对应的就有500个map task,然后每一个map task我们还要有一个内存啊,有内存的设定,那么这500个map task我们是不是会占用大量的内存呢?好,那这是第一个影响,那第二个影响呢?你看啊,呃,我们每一个数据块,这是我们的DNDNDN啊,这是NN。
34:28
啊,我们在DN上的每一个数据块啊,啊,每一个数据块它是不是都会去内部note进行一个上报啊,然后在name not上有一个存储,存储什么呢?存储DN的上的这个数据块了,原数据信息啊,每一个数据块它都要进行一个上报。啊,这些也都上报啊,好存储它这个数据块的原数据信息,那这个原数据信息是干嘛的呢?当我们想查询这个数据的时候,哎,我来到DN啊,找到我要查询的数据在哪一个节点上呢?哎,我找到了你在这儿,然后我来这儿查询,那这个在原数据上它占多大存储空间呢?
35:17
每一个这个数据块它占用150个字节啊,每一个数据块都占150个字节,那我们现在小文件是不是非常多呀,那是不是占用的这个字节数就非常多了,那它会影响我们name node的使用寿命啊,小文件它会影响我们内node的使用寿命,那这个是小文件的两个危害好了。那这是第一个问题啊,小文件,那接下来我们再来看看第二个问题。注意看这这个时间是什么时候呢。呃,5月28号啊,5月18号,2022年的5月18号,诶,那我们的数据是什么时候呢。
36:03
我们的数据啊,我们的数据还记不记得我们模拟的数据啊,Lg.SH这个回头看一看啊,CD杠,OPT model,哎,不是room long进来,进来之后我们的VM一个点YML啊,我们模拟的数据是2020年6月14号,那现在有这么一个事啊,2020年6月14号的数据我们落到哪一个文件夹了,落到2022年的5月18号的文件夹了啊,那有同学说你这个问题是什么时候产生呢?哎,它是我这个测试环境产生的呀,那如果说我模拟的是5月18号的数据,那是不是就会落到今天这个文件夹了呢?对,没有问题,它确实可以落到今天的这个文件夹,但是。
37:02
我们只现在看到的呢,只是大部分的数据,那你说有没有这么一种可能啊,啊,其中有一批有这么一小批数据,它依然会落到不是今天这个文件夹,我现在不是十五五月多少号啊,5月18号啊,5月18号的数据,你说他有没有可能落到5月19号呢。那这种问题出现了,你看啊,我们5月19号,我要计算5月18号的数据啊,然后我漏的过来,漏的过来之后,但有一部分写到这儿了,那这部分数据是不是没有计算来呀?好,那这个问题叫什么呢?在我们生产环境当中叫零点漂移问题,哎,叫数据漂移,那我们看看这个数据漂移问题是如何产生的啊,来到文档。Hundred。啊,在文档当中。啊,往下翻啊,有这么一个图,好,我们看一看,那我们看看数据漂移问题它是如何产生的啊,首先我们这个过程是要把文件当中的数据通过卡普卡啊写到卡普卡,然后呢,再通过这个,也就是我们刚刚写的这个卡普卡SHDFS的这个flu发送到HDFS,那我们现在要把6月14号的数据发送到哪呢?发送到HDFS6月14号这个文件夹吧,好。
38:26
那我们现在就有这么一条数据,是6月14号23:59:59的发送到卡夫卡,是不是6月14号23:59:59啊,然后我们这个flu读出来,那读出来。他是不是要把这条数据给封装成一个event呀,那event分为hier和body啊,那我们会把6月14号23:59:59这个信息给它放到哪呢?是不是给它放在body当中啊,好,那对应到我们这个hi里边,是不是我们可以给它存储一些东西啊啊好,你看啊。
39:09
我们这个数据啊,从文件当中过来啊,发送到卡不卡,哎,发送到这它得经过一段时间吧。那经过多久呢?我们假设它经过三秒。那经过三秒的话,这个time stamp时间戳它在封装。它就有可能经过多久啊。哎,经过三秒我再给你封装的话,那是不是很有可能啊,就变跨天了呀,你这个是6月14号嘛,他也跨天变成多少了,变成6月15号了。啊,他这么给你封装了,封装成6月15号了,那好了,那接下来我们这个地方啊,China发送给China了,哎,才过来,然后呢,发送给我们的这个h DFS think,那我们再看看啊,那h Dis think,我们这有一个基于动态的这个文件落盘呢,哎,Pass,它可以基于时间来落盘,那它基于谁来落盘呢?它就是基于T当中的这个time stamp时间出来落盘的。
40:14
那他怎么落的呢?我们看一看啊,它落盘的规则,你看是基于谁啊,它是基于handle当中来落盘的呀,那它会落到哪一个文件呢。他是不是会落到。2020年6月15号这个文件文件夹下面啊啊hdfs think默认是基于event当中的time stamp实现出来罗盘。所以说它会落到6月15号,那这种数据叫什么呢?这个就叫做。啊,数据的啊,零点漂移问题,好,那这个问题怎么解决呢?往下看。如何解决零点漂移问题?啊,如何解决,那还是啊这个过程我们要把这个6月14号23:59:59的这些数据发送到哪啊,发送到HDFS啊怎么解决呢?啊。
41:14
哎,我们可以在SS这增加一个拦截器呀,对吧,我增加一个拦截器,还记得我们学过拦截器吧,啊拦截器它是属于SS的,那拦截器我们做什么事呢。拦截器当中,我们可以把body获取出来,同时也可以把hier获取出来,然后干嘛呢?我这个数据当中是不是有一个时间呢,就是2020年6月十四二十三点五十九分59秒,然后我通过拦截器,你看啊,我这个还得当中之前是6月15号吧,那我通过拦截器把你6月15号的这个数据变成6月14号,哎,跟我数据的这个时间保持一致。
42:01
啊,那接下来我再发送到hdfs think的时候,那这个hier头当中的time stamp它变成谁了?它是不是就变成6月14号了,那接下来我再落盘呢。那再落盘,它是不是就会落到6月14号这个文件夹了,OK,那这个就是数据漂移问题的,哎,这么这么一个解决方案啊,OK,好。OK,那接下来我们就解决一下小文件以及数据漂移的问题,那小文件的问题怎么解决呢?哎。呃,HDFS落盘,那肯定是在HD FS think这个地方,那这我之前注掉了三个配置啊,啊可以告诉大家把这三个配置打开了,那么小文件的问题你们就解决了啊,把这些打开了你就解决了,那为什么这么神奇,把这三个配置打开就解决了呢?好,那这三个参数啊,我们再回顾一下啊,它都是什么意思,首先肉肉size肉com,那来到文档当中我们看一看呗。
43:12
嗯,文档在哪呢?呃,往下往下诶啊在这呢,肉size肉count,肉into,肉into是什么呢?啊,它是基于这个实际啊,它是基于这个秒数啊,来给我们进行一个滚动的啊,我多少秒滚动一个文件,那如果说给它设置成零,这个参数不生效啊,那这个呢,Roseet基于文件的大小来进行滚动,那这个单位是什么呢?Bet,那如果说给它设置成零,它不成效,这个呢,基于文件的条数啊,也就是even的数量来进行滚动,那我们这个even啊,多少条呢?啊,我进行滚动一次默认是十条,那如果说把它设置成零。
44:00
那么这个参数不生效,那好了,那这三个参数在生产环境当中我们怎么配置呢?看一看,首先我们看看肉塞子,诶,这个是不是我们最想要看到的呀,我们HDFS的块大小是128兆,那我是不是也希望你这一个文件是128兆啊,那这个怎么控制呢?你看我这刚好有一个文件大小进行滚动啊。哎,刚好有一个文件大小进行滚动啊,那把它设置成128兆就完事了呗,啊那刚刚好,那有同学说,哎,我把它设置成128,然后这两个呢,都是零,那就完了呗,我只让这一个参数生效,那是不是刚刚好啊,所有的文件肯定都是128兆,那好了,那我们想这么一个问题,那如果说我们把这个给它设置成128兆了,OK,那么接下来它是怎么滚动的呢?那他在写数据的时候,在这它会开启一个文件,那开启一个文件往里写,注意这个文件它并不是叫做什么log,一个什么什么点GZ啊,最开始它不是叫点GZ,它是叫点temp tmp临时文件,那临时文件有一个什么问题啊。
45:15
临时文件我们在想查询数据的时候,你读不到,哎,临时文件我们是读不到的,那么这个临时文件什么时候滚动呢?达到我们这三个参数了,肉size,肉into,肉count,那么这个临时文件就会变成了一个点GZ的文件了,好,那现在有这么一个事啊,我们只让Rose set生效,那也就是128兆呗,那我现在数据只有64兆啊,啊,只有64兆,未来的十年我都没有数据了,那都没有数据了,十年之内它都是临时文件,那都是临时文件,它是不是就不滚动了,那么这64兆的数据我们永远都不会读得到啊,所以说这个是我们最想要的,但是我光用它控制不行,它影响我们数据的时效性啊。
46:07
啊,时效性十年了我们还没读到,那怎么办呢?哎,我们呢光通过一个批次还不行,哎,我们呢还要通过时间来进行滚动。那也就是说,那我们已经啊很久了都没有达到我们这个128兆啊,那我们呢,再通过时间哎也给他来进行一个控制,那这个我们呢,在这给他设置的是十秒,哎在生产环境当中,那设置多少呢?这个哎生产环境当中这个肯定是设置128了,那它呢,啊,同样我们就要进行一个这个判断嘛,你看啊,我们是多长时间能生成128兆的数据呢?哎,这个就需要我们进行一个计算了,看看我们每天有多少有多少数据,然后我们再通过一个平均值来算算我多久能生成128兆的数据,假设我一分钟能生成128兆的数据,那么这个地方我们就给它设置成60秒左右,这样是最合理的,那上下呢,这个文件大小也不会相差多少啊,也不会差多少对吧,那如果说你给它设置成十秒,你一分钟128兆啊,然后你给它设置成十秒,它差了六倍呀,那它除以。
47:18
六就变成20多兆了,相对来说这个文件也不是很大啊,好,OK,那接下来我们再看这肉count这个参数,生产环境当中就给它设置成零,那我们为什么不基于条数滚动呢?原因是这样的啊,你看我们有的数据可能是1KB,那有的数据2KB,有的数据啊0.5KB,有的数据0.1KB,那好了,你说有没有这么一种可能啊,我在这给你设置成一个肉抗啊,我给你设置成1000条,然后这1000条数据都是0.1KB的啊,都是0.1KB的,本身呢,1000条1KB的数据,我们假设它就能达到128兆。
48:02
那么现在这1000条它都是0.1KB,相对于我这1KB来说少了多少啊,是不是少了十倍呀。少了十倍呀。那这样的话,通过roll count呀,为什么把它设置成零呢?就是因为用文件的条数它不太好控制啊这个啊文件的大小啊,通过文件的条数通过啊,不是通过这个,呃,通过这个数据的条数,它不太好控制我这个文件的大小,那么这个参数我倒不如不让你生效了啊,通过这两个啊,它就可以控制文件的滚动好了,那这个地方我们假设啊,在生产环境当中,我们是,呃,多久呢?我们是五分钟吧啊五分钟,然后才能滚动到128啊,这个是秒啊,它的单位是秒啊,我给你设成五分钟,那你说我们现在是学习,那这个参数我给你设置成五分钟吗?
49:00
我们首先啊,首先我们这一批数据肯定没有128兆,我们这一批数据也就是几十个KB,就几十KB,那如果说我在这设置五分钟,那我们就等吧,写完数据五分钟之后我们才能看到结果,哎,那肯定不行,那么这个参数啊,我们要想立即看到效果怎么办呢?我们给它设置短一点,设置成十秒钟嘛,对吧,现在我们是测试啊,哎,给它设置成十秒钟就OK了,好,那这三个参数呢,就是啊小文件的控制好,那这个地方我暂停一下。OK,那么小文件的问题解决了,我们还有一个问题啊,叫做数据漂移,那数据漂移的问题我们再回头看一下怎么解决了,哎,是通过拦截器啊,把数据的时间,哎,给它放到hier头当中啊来吧,那接下来我们又要实现一个拦截器了啊,再实现一个拦截器,那这个拦截器的名字我们叫做time STEM啊intercept好,嗯,来到我们的idea打开。
50:10
诶。打开好,打开之后我们呢,点开之前写拦截器的这个包啊,然后在这又建一个。Class啊,叫什么呢?Time stemmp time stamp intercept啊回事。OK,那创建完了之后,我们在这啊实现一个intercept c pq2好,那实现flu下面的这个intercept啊,不要搞错了,完事之后重写四个方法啊。那重写完了不要忘了啊,重写完之后立即把这个静态内部类给它写上,不然的话你们一定会忘,重写完之后直接就搞静态内部类,你别说哎,我写完之后我再给他补上,那你就忘了啊。
51:05
Public static class c as class build bus build,然后实现一个英特尔啊,实现一个。啊,Intercept里面的点builder.build。好,那完事之后给他一个括号啊,然后呢,这个里边重写两个方法,第一个build,我们要用一个我们自己的time STEM啊intercept啊,那这个呢,我们不用管啊,这是配置相关的,OK,那接下来我们就主要实现这个拦截器呗,啊两个方法,一个是单event,一个是多event,我们先搞单。那我们要知道这个逻辑是干嘛呢?我们要把body当中的数据拿出来放到hier当中嘛,好了,那第一个事。
52:06
我们是不是叫获取啊,获取he DR hier和body当中的数据吧,那都得获取出来,哎,都得获取出来才可以,那怎么获取呢?是不是这有一个even呀,用它我用它点一个啊,点一个谁呀,先get一个hier,你看我能把hier获取出来,那获取完之后我还可以用它点一个什么呢?Get body啊,获取出来,获取出来那body它现在是一个二进制的啊,字节数组啊,那怎么办呢?我给你来一个new string,通过这个new string,注意啊,New Java long下面的,哎,完了把body传进来,传进来之后我们呢,再来一个编码啊,Standard的叉,Set utf杠八,完事了,Log OK,那到这hi。
53:07
和body,我们是不是都获取出来了呀?那获取出来了之后我们应该怎么办呢?我们的这个log当中是不是有一个TS时间戳啊,TS时间戳还记得吗?看一看啊,这个log现在是我们的用户行为数据啊,来到用户行为数据当中看一看呗。用户行为数据的格式啊在哪呢,这呢。我们这个格式是一个Jason嘛,啊,它是一个Jason的格式,那Jason格式里面我们看看往下翻,往下翻,诶,这有一个TS啊时间戳啊,那跳入页面的时间是不是就代表我们当前这个。啊,用户的这么一个,这叫什么呀?哎,他的动作的这个时间吧,当前哎这个日志的时间了,也就是我们当前这个事件的时间吧,啊事件时间好,那我们是不是把这个数据给它放到我们的hi当中就完事了,OK,那第二步。
54:12
解析解析谁呢?解析这个body。DY啊,解析body的TS时间戳字段啊,解析吧,怎么解析我们这个log啊,不叫body吧,叫log吧啊,哎呀,解析log的TS时间头字段,那这个log它现在是一个什么类型啊,它现在是一个Jason的格式吧,它现在是一个JA森,那这个杰森我们怎么解析呢?啊,还记不记得我们导入了一个工具类啊,这个类叫做fast JA,它呀就可以解析我们的JA,好,那怎么做呢?我看看啊,那fast JA里边有这么一个类叫做摘森。
55:06
Object这一类我们昨天也用过啊,用过这里边的什么方法呀,有一个方法叫做price object,那它呀就可以把我们的这个。这叫什么呢?把我们的Jason啊,你看这个log,把我们的Jason Jason字符串转化成一个Jason object类型,那么这个JA object类型跟谁有点相似呢?它跟我们的map有点相似,它也是一个KV的结构,那我们想获取这个KV结构当中的谁呀?是不是想获取一个TS啊,那我们get成一个string啊,改成一个string,获取一个string类型的K啊,叫做TS,我们想把TS给它获取出来,好,那这样呢,它就能把TS获取出来,获取出来之后我们要干嘛呀?
56:01
是不是要给它放到hier当中呢?啊,第三步,第三步。把解析出来的TS啊,Set put到put到哪儿啊。Put到我们的her hier头当中,OK,那来吧,我们给它put进来,那put一个谁呀,那用谁put呢?用hi头put,把它put到哪啊,把它put到我们hi头当中的。什么字段?Time STEM time stepmp字段吗?头当中的time step,好,Time,注意这个地方不能错啊,你给它搞错了,你put进去之后也没有用啊,它呢,也无法是解决我们数据漂移的问题,OK,然后把TS给它拿回来,OK,那么你看看啊,接下来每一条数据经过这个方法是不是都可以把数据当中的ts put到我们的time stepmp字段呢?
57:18
OK,好,那最终这个地方我们给它返回一个event就完事了,OK,好了,单event完事了,那接下来我们看看多event,那多event怎么做呢?我们来一个list list点一个谁呀for啊,注意这个地方就不用迭代器了,它用for循环就可以了,为什么?因为我们没有三数据啊。啊,没有蕊木啊,我们没有蕊木,我们只是干嘛呢?我们只是在历史的集合当中,把所有的event拿出来,然后把每条数据进行一个这样的改造就完事了呗,哎,你看把每条数据干嘛呢?拿出来之后调用这个intercept方法,把谁传过去呀,把这个一问传过去,你看。
58:06
每一条数据都调用它啊,都调用一个这个方法完事了啊,那每条数据是不是都改造完了,改造完了之后,我把list的集合,哎,刚刚的这些list的集合当中所有的数据我再给你return回去,那么这个flu啊,拦截器啊,我们就写完了,OK,好,那写完了之后我得让它生效啊,那怎么生效啊,注意啊,这个你看啊,别忘了我们要在这右键copy这个builder的权利名,哎,在这copy。Copy reference,然后来到我们的的配置文件当中,哎,把它放开,把它放开啊,注意啊,这个权利名呢,我们自己重新写一下啊,万一不太一样呢,对吧,CTRL粘贴过来,粘贴过来之后在这个地方给它改成什么呢?给它改成一个哎,Dollar符号,好,哎,不是不是百分号是Dollar符,嗯,OK,那这个拦截器配置完了,然后呢,数据,哎,解决小文件的这个问题呢,这些参数我们呢,也已经配置完了,那接下来我们把这两个配置拿到我们的项目当中,哎,我们看看它到底生不生效呢,给他拿回来,拿回来之后啊,我们来到这儿啊。
59:23
哎,来到104啊,来到104啊,先把它停掉,诶,怎么这么小了。啊,停掉停掉之后我们vim一个照目录下面的卡夫卡啊to hdfslo进来,进来之后我们先把这三个参数给它放开,把这个给它放开,然后呢,我们再把flu这个来给它填上啊,我先给它删了,删了之后我右键粘贴我复制过来的,OK,那粘贴进来了,那完事冒号WQ冒号。WQ,保存啊,没错吧,没改别的东西吧。
60:04
啊,这组装组装了两个删掉OK好,然后冒号WQ保存,保存完了之后我们呢,把这个再次给它启动,那我们先GPS一下看看啊,先GPS下看看啊GPS。啊,这伏膜现在已经关闭状态了啊,关闭状态我们再给它启动起来,OK,那启动起来往上翻一翻,你看啊,它是是报错了,他报了一个什么错呢,他说我们这个class not found什么意思呢。我们刚刚配置了一个拦截器,但是这个拦截器包找不到,为什么找不到,因为我们这个炸包是不是还没有上传进来啊,哎,所以说这个炸包我们要打一个包啊,可。给他上传过来。Package。OK,好,这给他停掉。啊,停掉,停掉之后我们CD到力路。
61:03
好,那注意啊,我们这个力目录当中,我们先LS一下谁呢?From-intercept这个是不是有啊,那它有我们要先给它删掉,如果说你要是不给他删掉的话,那我们新上传过来的这个包啊,他的这个名字啊,他不会给他覆盖掉,我们新上传过来的这个包它不会覆盖掉,那它呢,会在后面加一个点零啊,如果说你不删掉的话。你上传过来没有用啊,必须要先给它删掉,我们要先把它RM掉啊,RM掉删掉之后我们,哎打完了,打完之后来到这。Open。In X explore好,完事把它上传到我们的项目当中,注意啊,一定要先把之前的删掉啊,不删掉你们可以尝试一下,它是不好使的,那完事之后我们CD点点再次启动一下我们的flu。
62:00
嗯。OK,那到这它现在就启动成功了,那启动成功了,那这么的,我们先把HDFS上的这个目录,我先给它删掉啊,删掉啊origin date删掉delete好删完了,删完了之后我们怎么办呢?来到这啊,来到102,来到这我们再次执行一下LG。点,我们看看这回小文件的问题和数据漂移的问题,它是否能解决呢?好,我们再次来到我们的HDFS刷新刷新啊,刷新还没过来啊,十秒,我们配置的是十秒钟啊。放心,诶,你看过来了,那过来之后我们看看见证奇迹的时刻,Log topic log,首先数据漂移的问题是不是解决了?所有的数据它都能落到2020年6月14号这个目录了,那再来。
63:05
诶,我们看看现在小文件的问题有没有解决呢?啊,表面上看。文件是不是没有那么多了,但有同学说那不对呀,你这还是几KB啊,那相对于我们块大小来说是不是也也小很多呀?哎,那这个问题是因为什么原因导致的呢?我们刚刚也说过,是因为我们在这儿设置的这个时间啊,我们仅仅设置了十秒,那在生产环境当中,你可以给它放的更大一点,OK,所以说我们现在为了这个学习嘛,看效果啊,所以说哎,它出现这个文件大小是非常正常的,OK,那么到这儿小文件还有数据漂移的问题,那我们也就解决完了啊好,那我暂停一下,那到这儿呢,我们的数据啊,哎,你们看是不是就已经发送到我们的HDFS了呀,啊,那发送我们是怎么发送呢?在哈杜104这我们是不是启动了一个进程了啊,启动了一个这么啊进程,那么这个进程我们要启动的话,是不是也要通过命令来到这个机器上启动啊,哎,那我们在这呢,也给。
64:11
给他写一个脚本啊,写一个脚本之后,通过脚本啊就可以启动了啊,不然启动这个命令你们也记不住,那怎么办?哎,我们通过脚本来做这个事儿,那这个脚本跟我们之前写这个f1.sh啊是一样的啊,那这个脚本呢,我们就不再重新编写了,我直接在文档当中啊,把这个脚本呢拿过来我们看一看啊。在哪儿呢?这呢啊弗起行脚本看一看CTRLC,诶拿过来来到这啊新建于CTRLV过来,OK,那我们看一看井号叹号并杠BI,这是一个cell脚本,然后呢,我们通过case选择器啊判断我们的第一个参数,我们脚本的第一个参数是start呢还是stop呢?那如果说是start的话,我们怎么办?哎,我们通过SSH啊SSH到哈104。
65:12
诶,这个只需要到104就完事了,因为我们的这个是部署在104上的啊,OK,我们在104上启动这个消费我们用户行为数据的IZ,那启动的命令找到加下面的哎,并录里边的杠NG,然后启动一个agent agent的名字杠N啊A1-C,找到count啊,杠F呢,找到我们要启动的哎,这个的job啊,卡普卡推DFS下线log.com,然后标准输入啊,标准输出追加到第一位now,然后标准错误输出呢,给它重定向到哪啊,重定向到标准输出啊,最终给一个后台运行,然后前面再来一个noha啊,给它阻塞到我们服务器的后台,那如果说是stop呢?
66:06
哎,我们依然SSH到哈104就完事了啊,这个地方呢,它就不需要放循环了,因为只有一个机器好,然后杠EF gra过滤出我们当前的哎这个的job啊,OK,然后通过gra-V,再来一个gra,把grape进程过滤掉,那么最终啊,它过滤出来一行数据,然后在一行数据当中,我们还要提炼出来一个进程号啊,通过aw k来进行一个切分啊,最终通过XX-N1啊反向穿参,然后通过Q把当前的这个进程,哎,前面的这个进程,哎,给我杀掉啊,OK,那我把这个配置拿过来CRC。拿过来之后到我们的哈杜102啊,然后CD到加下面的并录进来,我vim一个f2.sh在这右键粘贴,粘贴之后冒号WQ保存好了,那保存完之后啊,我给他一个权限ch mod777好F2回常。
67:17
好了,那完事之后我们看一看这个进程,现在有没有GPS呢?104上FK啊,它没有FK,那我们先看看f2.shstartstart先启动一下。启动完,我们再来看看GPS。哎,OK,你看这个进程它已经给我们启动起来了啊啊,现在是一个五位数的进程,OK,那接下来我们再给它停止一下s top啊,Stop,好,然后我们再来看看GPS,好,那这个进程呢,它就给我们停止掉了,OK,那么这个就是我们用户行为数据的这个,哎,下游的flu啊,这个这么一个启停脚本啊好,我们暂停一下。
我来说两句