00:00
好,接下来呢,我们来看一下时间出拦截器。那这个伏母时间轴连接器,它是解决什么问题的?还记得吗?他是不是来解决零点漂移的问题啊?就说它的核心思想是,比如说将8月7号所有产生的数据都放入到HTFS的一个目录里面去,然后呢,在8月8号的时候,统一对8月7号所有的数据进行一个处理,他不希望有任何一条数据写入到下一个文件夹里面去。那这样呢,你后续还需要对这个文件呢,进行一个整理。是这样一个背景。那为什么会产生说不对,他处理就会把数据写入到下一个文件夹呢?原因是因为这里面有一个HDFS。它这里面默认情况下,它会把数据啊按照什么呢?按照你Linux这个系统它的一个时间,比如说你在开幺零市场,它就会在按照104的本地时间往外去写。
01:01
那你数据已经传到这儿的时候,它已经变成8月8号了,那他就会将这条日志写入到8月8号里面。好啊,那我说之前是会产生这种情况,那他到底会不会产生这种情况呢?我们来看一下对应的官网哈。看一下A,然后往下找走哪呢找这块。哎。看一下这里面的值啊,说这个这里面有一个a hier with the key,那header里面这个key呢,就是这个time。比如说时间戳啥意思啊,这是整个文件头,这是保底。Body里面数据,然后头里面有一个谁呢?有一个time STEM有个它这变量,那这个变量的值取谁呢?哎,这个value我们希望呢,它是按照日志的时间,哎,日志产生的时间进行一个替换,这样吗?哎,那不替换的情况下,那么什么情况,不替换的情况,Unless这个值will be set to,比如说它会被设置成true,那这个值什么意思呢?说use local times time。
02:07
按照本地时间去获取时间。哎,是这个意思啊,比如说你这个值如果不设置的话,你就得需要将这个值设成除。好吧,嗯,那你看一下默认这个值是多少啊,往下找。知道这地方。默认这个值呢是false,比如说用本地时间是false。但是呢,你如果不用这个本地时间,你就得需要给上面谁呢这个地方。就得给这个值进行设置,在哪里设置呢?你可以定义一个时间戳拦截器,在拦截器里面将这个值进行一个修改。啊,这么几种方式好吧。这样呢,在下面呢,我们就来写一下对应的这个拦截器。首先我们来看来到这里面,哎,在这个com硅谷啊intercept包下呢,创建这么一个啊类的一个名称,比如说第一个类。
03:08
到这里。右键。又一个class。啊,这个类就有了啊,然后第一个类之后实现intercept接口啊in t,那这里面这个包呢,不要搞错了哈,导第二个包下的interceptor啊al加灰车。实现四个方法,分别是初始化关闭单event和对应的多event处理。那接下来我们这里面呢,这个初始化和对应的关闭呢,暂时不用处理,只需要来到这个单一文的里面来进行处理,那在这里面干什么事呢。我们要做的事啊,哎,也就说将这条日志拦下来。将日志来下。拦下之后干什么事儿?首先取出对应的里面的。
04:09
是吧,哎,从这里面取出那个K叫time time好,那接下来,那还接下来取出什么呢,取出。Body。里面对应的时间。日志事件。那这个日志时间是什么含义?其实我们要获取的就是在这条日志当中。买点日志格当中,你看这是一条完整的日志对吧?啊,有common,有action,有place往下走,在它最后这块是不是有一个,比如说这条日志生成之后的一个时间是它吧,哎,是有个它的。我们现在呢,就想把这个TS这个值给它取出来。按取出值,然后接下来第三步。将。TS的值。
05:02
复制给谁呀?赋值给对应的这个hi里面这个key。哎,那这个K呢,其实就是他。好吧,来,把它拿过来就给到他。就干这么一件事,那行,那下面我们来看一下,哎,怎么来处理哈。首先第一件事儿,获取还得头。怎么获取啊,那你这里面是不是有这个对应的这个event呀,哎,那就拿它去获取。因为点get。德丝点吧。OK,那现在呢,我们就拿到了对应的这个头信息,那接下来第二步。第二步,获取body。中的TS。你说这个日志对应的时间是这样吗?哎,那怎么获取呢?这样吧。
06:00
even.get对应的这个body,诶点。这样呢,就拿到了这个Bo底知道吗?哎,好,接下来啊,我们将这个字节数组哎转换成对应的使菌,方便我们后续的一个处理哈,另外一个。使劲,那这里面这个包啊,注意一下,导下面这个浪包下,然后将这个包底传进来,那下边呢,我们要控制一下它对应的编码方式,让它呢,哎统一成UTF8,那后面加上哎S。TN啊,UTF8OK。那反过来呢,就是对应的这个log日志。行,那这条日志拿到了,那你现在是实质类型,我如何取出来对应的这个TS呢?那大家思考的问题,你回过头来看一下,这是这条日志当中这个TS,其实啊,本身这条日志它是不是就是一个接身对象嘛,这不就是一个接对象吗。那如果说我把这个string变成接分对象,那我传进来对应的K,那对应的Y流值是不是就取出来了。
07:07
就这么简单啊,那本身它确实是一个JA森,那我就把它变成一个杰森,通过谁呢?我们这里面是不是引入了一个fast杰森这个框架啊。知道吧,那他这里面就有帮我们处理对应的API冲突,那怎么办。接着。健身。我不在乎。阿里巴巴啊,fast.pass object,然后呢,我们在里面将这个log日志传进来。点吧。吧,哎,那就是解析出来的先分对象。那拿到这个对象,我们点get。是的。传进去对应的这个K值,那比如说这个K啊,是对应的这个TS,因为就是它吗。然后就能取出来对应的这个Y流值啊。好,取出来了对应的这个值,那取出来值之后,那接下来我们开始进行第三步操作。
08:05
将。负值。给对应的这个时间窗,那也就是说这个time step。知道吧,哎这样一个操作,那怎么操作呢,我们拿到这个hi德斯,比如说是他。然后。点put。Put谁进来呢?Put这个key,那这个key就是他。放进去。然后对应的Y流值,Y流值就是我们这个。这样就OK了,你看简单吧。这就是这个获取header,获取body,然后已赋值,那后面这个呢,你这个进来的是谁,那我返回的就是谁,那这样呢就能保证对应的工作,比如说单一文我们就已经写完了。那下面呢,我们再来处理一下这个多event,那这个呢就更简单了。首先呢,这是一个集合对吧,啊进来进来之后我还是循环便利。
09:03
Yeah。好吧,哎,循环便利电力之后,我是不是要一个一个的调用这个单event呀,哎,我希望这个单event。你每进来的一个,我都进行一个循环便利。然后取出来对应的这个hier以及body,然后将hier里面的这个time STEM替换成是这样一个操作吧,这样操作。行,那正常情况下呢,其实你就把这个例子集合再一返回就行了,我其实就是变了一遍,根本就没改变里面的值,哎,直接返回就可以了。好,那这是这块,那下面呢,我们还需要哎创建一个builder是这样吧,Public。Static。Class,然后呢,Build build。实现实现谁呢?实现intercept下面的点。OK之后out加回车实现走。
10:01
哎,这样就创建好了,那这块呢,需要创建一个自己的一个对象,比如说new的一个是自己。是他吧,来,把它拿过来。哎,这样呢,整个这个时间戳拉线器啊,我们就写好了啊,非常简单吧,那下面呢,需要对它进行一个打包编译。OK,编译成功之后啊,我们仍然需要将这个拦截器啊,这个夹包再次上传到,哎,辅助的这个立包下,好吧,诶那右键。Open打开。打开之后,哎,放起来。我们来到这个104上啊。来到这里面,然后呢,再进入到这个力包厢。将这个文件直接拖拽过来。
11:00
关闭,关闭之后呢,我们查看一下,看它在不在LS。Grape。你在不?接下来啊,这里面我们就查询出来两个拦截器,其实第一个拦截器呢,这个是我们之前写ETL拦截器的时候,是我打包上传的对应的这个架包啊,诶,那现在呢,下一个呢,这个是我们刚才既包含ETL,同时也包含对应的时间戳拦截器,那怎么办?哎,把下面这个给它删掉啊。I'm。杠2f intercept点零好删掉,哎,这样呢就可以了哈。
我来说两句