00:01
OK啊呃,那我们现在就已经把这个基础的工作啊,就把它这个准备好了。能明白吧,那准备好以后呢,接下来的话,我们就可以正式的进入到我们这个日志数据的这个消费和分流的处理。OK,来,接下来我们就开始写了啊好,那我们回到我们的这个开发工具中,把这个都关掉啊,关掉以后呢,我们下面就回到我们的APP里面,就是我预留好的啊,在这里面写,那我们先去写上一个类,好,这也是一个object啊,行,那这一层的话,我们是属于这个ods层的处理,所以说我就叫这个ods。那行吧,就叫ods啊,然后呢,呃,我们处理的是就是最原始的这个日志数据,那我就叫base log吧。对吧,Base的一个APP就是一个应用。那行吧,这个取名字你可以自己去看啊,到底取什么名字,好来说一下,这就是我们日志数据的这个消费分流。能听懂吧,日志数据的消费分流啊行呃,那我们大体呢,可以先去分析一下,在这个里面我们都要做什么事情,对吧,你还是去想想你这个整体的这个结构啊同学们。
01:12
就我需要什么,通过这个Spark streaming,然后呢,从你的卡不卡中把数据呢消费出来,消费出来以后呢,我去做一些处理,然后呢,最后呢,把分流出来的数据呢,写到这个不同的这个主题中。是不是啊,就大概的这个过程,大家都是能够想的明白的,那我们就要再具体一点,就是我在这个过程中处理的时候,我都要做什么事情。是吧,跟着我来分析一下啊呃,那首先这个最核心的第一件事情就是先去准备你的这个环境了呗,对吧,就是准备我们这个实时处理环境啊,就你的这个是streaming contact这东西你得有吧。对吧,好,那有了以后注意啊,接下来我是不是可以什么先从你的卡夫卡中先把数据先消费出来再说呗。对吧,所以第二步就是什么,从卡夫卡中诶消费数据。
02:05
能理解吧,好,那我消费到数据以后呢,接下来不就是你这个分流的一个处理了吗。对不对啊,那这个分流的话呢,呃,我们不仅仅有分流啊,还会有一些这个额外的一些什么其他处理,所以说呢,我们就整体把它称之为是一个数据的一个什么处理数据吧,对吧,处理数据。好,那这个处理数据的话呢,我们暂时能够想到的就是有什么有这个分流操作。是不是其实你在分流之前呢,你想想哈,我现在从卡夫卡中拿到的数据是什么格式啊,是不是一个完整的一个什么再生字符串啊,同学们对吧,就是我们上午做过测试的啊,就这个玩意儿啊,我看还有没有啊。好像看不着了是吧,哎,就有啊,就上午我们做过测试的,你拿到的是整体的这样一条数据,那你说这个数据就是这个数据,你想去做分流,你怎么分流啊。你解析这个字符串,你好解析吗?你不好解析,但是我们好在是什么,我们知道它是一个摘格式的,那如果你是一个摘格式的话,我就可以考虑把它转换成一个什么具体的对象。
03:08
对不对,比如说加成对象也好,或者说呢,转换成什么呀,转换成这个,你单独给他去封装一个对象也好,那你转成对象也好,我就可以非常方便的从你的对象里面去提取你的每个字段。能不能理解我的意思对吧,我就可以提取你的每个字段了。OK吧,所以说呢,诶,我们这个地方在分流之前的话呢,你需要什么,先去做一个就是结构转换,转换结构啊,转换这个数据结构。能听懂吧,然后呢,转换完成以后呢,你再去做这个分流操作。OK,那这个分流完成以后的话,我们这个第四步吧,就是呃,再把这个分流好的数据呢,再去写出到我们的卡夫卡啊来,写出到DW层。能理解吧,同学们这样的话你看一下啊,你完整的去对比一下我们的这个流程,呃,这个图这样哈。我老去看他,那我就干脆盯个图得了。
04:02
对吧,放的小一点,然后呢,我就干脆什么放到这个地方,我们方便去查看对不对,你现在对比一下吧,是不是来先从它里面呢,先准备好环境,准备好环境以后呢。来准备好环境,准备好环境以后,从他那边拿数据,好拿到数据以后呢,我们在这个环节中呢,去处理你的数据,处理完成以后呢,再把数据呢,写到这个卡不卡中,就是你的这个DWD层。理解吧,同学们好,那我们接下来我们就可以开始去写了啊好呃,那首先我们先写一个漫画法,然后呢,第一步就是准备我们的环境了啊,准备这个实施环境啊。好,呃,那这个实施环境的话,就是创建这个streaming contacts啊,那我们来去用上一个啊,这个大家应该都还记得吧,Stream contracts呃,用的时候呢,这里面是需要传上一个配置对象的啊,叫这个。叫Spark。对不对,当然了,还得再传一个什么这个Du啊,这个就是你的采集周期啊,因为你这个实时处理嘛,那你这个采集这个多久计算一次呢?对吧,它默认的这个计算周期就是你的这个采集周期,能听到吧,默认的计算周期就是你的采集周期啊,所以说我们就按照这个规划,比如说三秒钟五秒钟啊,自己去看啊,好,它需要一个SPA com对象,那我们先用上一个Spark com对象。
05:26
好,呃,这个对象的话,我们该设置什么东西呢?该设置一下你的APP name,比如说我就叫ods,然后呢,贝斯log的一个APP,然后呢,再设置一下这个master。对吧,就是你要分配几个这个并行度啊,那这个的话,我们之前都是写的这个东西吧,Local什么什么星。能不理解同学们?啊,你都写的这个东西啊,行,呃,但这一次的话我就不写这么多了,因为你在生产环境中,你肯定不会这么去写的啊同学们,你的生长环境的话呢,将来这个地方呃,就不写这个local了啊,将来你这个任务的话,肯定是分配到这个什么集群中去跑的,对吧?但是呢,我们目前的话呢,我们就不再去考虑那个集群了啊,我们就这个本地去测试就OK了,这样比较方便一点,呃,但是本地测试的话呢,我也不希望把这个所有的这个资源都分给你啊,我就给上你这个几个,比如说给上你四个吧。
06:17
啊,当然这个给四个其实也没有什么讲究啊,因为毕竟我们都是一个测试的环境啊,这个没有什么讲究啊,那这个地方我给四个的话呢,呃,我是有一个原因的,什么原因呢啊。嗯。大家应该都知道哈,这个你给个四,就代表着我将来有四个并行度。是吧,我有四个并行度,然后如果说你们还记得的话,如果说你的这个Spark streaming的数据源呢,是你的卡夫卡对吧,那我们要求是最好能够保证就最好啊,就是你的并行度能够跟卡不卡的分区个数,就是那个topic的那个分区个数保持一致。
07:00
就比如说我的卡不卡有假设我的卡不卡有四个分区对吧,零号区,一号区,二号区,三号区,那现在我开启这个四个并行度,那么正好他将来就会什么一一对应,一一对应,一一对应一对应。能明白我的意思吧,啊,那也就意味着,如果说我这个地方给四个并行度的话,其实我们就尽可能要保证你的卡不卡的,每个主题是四个分区。当然如果你给两个的话,那你尽可能保证它是两个分区。能听懂我的意思吧,这个至于是两个还是四个,就是我们目前的这个情况里面没有任何影响,就是没有什么区别的,因为你都是一个测试环境。能听懂吧,好行,呃,那我既然写了四了,那我就需要把另外一个事儿给大家去说一说啊,就大家其实上午应该也发现了哈来。我让你看个东西啊,我们写了一个脚本叫KC.kf.SH然后我叫什么。Describe,然后我写上一个叫做ods,然后呢贝斯log,然后呢1018对吧,我看一下它的这个详情啊。
08:04
呃,诶。没有这个主题吗?我是不是给他删了呀,后来。我我好像后来给删了是吧,好没关系啊这样,呃,那我先做一个消费啊。对吧,我先做一个消费啊。只要我一消费,他就会给你创建这个主题啊。呃。这应该是创建好了吧,来创建好我停掉,停掉以后呢,再来看啊,再来看这个啊。好,大家看一下,就是我这个主题啊,我没有自己手动去创建,我是交给这个卡不卡帮我去创建的,那么你会发现它创建出来以后呢,就会有四个分区。能不看明白?啊,那这个是怎么做到的呢?啊,给大家去说去说一下,呃,默认情况下,如果说你交给卡夫卡自动帮你创建主题的话,它默认只给你一个区。听懂了吧,他只跟一个区,然后呢一个副本。啊,那如果说你希望指定,就比如说诶,你卡不卡自动帮我创建这个topic,然后呢,你要按照多少个区来去创建的话,那这个是需要你去改配置文件的,而我的配置文件我是做过更改的啊来给大家去说一下。
09:10
呃,那我们就回到这个随便找个地方吧,来到这个卡夫卡的这个安装目录下面啊,然后来到卡菲里面,里面有一个叫做sod properties,来sod properties,注意进来以后你看一下啊,它里面有一个配置叫numberd partitions,能不能看得懂?对吧,这个配置它默认原来是一啊,然后呢,我是已经把它改成四了,那么这就表示我们将来在创建你这个topic的时候,它默认的这个分区的个数就是四个了。听明白我的意思了吧,啊,它就是四个了啊,说这个大家下去以后啊,如果说你这个里面也写这个四的话啊,那你就什么把那个配置也改一改,就是让这个卡不卡呢,帮你默认创建的时候就是什么四个分区啊,当然呢,你不改也行,那你就什么提前手动的去创建分区,然后你在手动创建分区的时候,你是可以什么通过那个杠杠叫做什么,呃,叫做什么杠杠什么partitions对不对,你可以什么去指定一下这个分区是四个啊这样也可以。
10:11
好吧,多种情况啊,你自己看。但是你一定要知道哈,我刚刚说的这个事情是什么?明白了吧,呃,因为这个,呃,就算我说了啊,接下来还会有同学问我。说老师为什么我的分区不是四个?为什么你的就是四个?对吧?啊,就算我说了还会有同学问我啊,他就是没有好好听啊,行,来这个事情给大家这个说明白以后啊,对,还是再强调一下啊,就是你改的时候,你改了这个配置文件以后,一定要去做分发,去做同步啊,因为我们的卡不卡是一个集群,你的102有,你的103也有,你的104也有,那你把这个配置改了以后,比如说你在102改的,那你要把这个整个配置文件发给103,发给104,这样的话你整个集群才能够同步。OK吧,行,这个事儿我就说完了啊,那说完以后我们就可以回到我们这个代码中了。
11:04
好吧,回到代码中啊,来叫SPA康,我这个强调一下吧,就是要注意什么呀,注意啊并行度啊,并行度啊与什么呀,与这个卡不卡呃,这个卡不卡中啊topic的这个分区个数啊,分区个数的这个对应关系啊,对吧?啊,我把这个注意的东西给大家这个说一下。OK吧,就将来你看代码看到这个地方的时候,哎,你就知道了啊行,那把这个创建出来以后,我就可以把这个Spark com给它传进来,传进来以后呢,后面是我们的采集周期,呃,这个采集周期的话呢,我就我们就给五秒钟吧,对吧,就是五秒钟一个周期啊,五秒钟一个周期。理解吧,五秒一个周期啊行呃,然后呢,接下来就是启动好,然后呢。这种代码我就不解释了。
12:02
基本代码。OK吧,行好,那我们把这个。呃,实时环境准备好以后,下一步就是从你的卡夫卡中去消费数据来吧,那我们就来去消费一把啊,第二步从卡夫卡中。好消费数据,那你怎么消费呢?它里面消费数据不就是什么读成一个什么STEM吗?对吧?好,那这个东西我们还用再去手写吗?啊不是还用你再去这个从头从头去写吗?不需要了,因为我们刚刚诶花了一定的时间,我们去写过工具类了,所以说你看了哈,我直接这么去写,叫做MY,卡不卡you跳点叫create,一个不对,应该叫什么get啊get卡卡stream。这里面你给人家把该传的东西传上不就完了吗?对吧,那我们需要有一个topic,需要有个global ID,那我就先提前定一下topic name对吧?那你说我这一次从哪个主题消费数据啊,同学们。
13:03
我是不是从你卡不卡的这个统一topic里面消费的吧,所以说这个地方你写的主题名字一定是你啊,哪里的呀,一定是你,就是生成器中你所配的这个主题,因为你的生成器就你的生成器把数据呢,发送到这个卡夫卡,那我叫Spark streaming,就要从你的卡夫卡中把数据给他什么消费出来。他们两个的主题一定要对应起来,对吧,所以这里面你写的是什么?OK,那我的代码中我就要写什么来强调一下,这个是对应什么呀?对应生成器配置中的主题名啊。理解了吧,好,然后下面是我们的主啊,主的话就比较简单了啊,就叫什么给法ID吧,好给ID啊,那我们就等于什么呀,就等于呃。就OS随便写一个了啊ods来我大写吧ods,然后呢,贝斯log的一个什么group对吧,当然如果你想加的话,把这个1018也加上。
14:08
这样的话我就可以就你们加不加其实无所谓啊,因为我是讲过很多班啊,说是这个,呃,我我我这个卡不卡内部啊什么的,可能会有重名的这个问题啊,所以说我加上一个后缀呢,就尽可能去避免这个重名。理解吧,啊,大家这个加不加都无所谓啊,行,那这个有了以后,你看一下我们的SSC也有了,那我就SC传过去,然后呢,把你的topic name传过去,对吧,把你的这个GOID传过去接收回来,这不就是我的卡夫卡。Re吗?对不对啊,你花时间,然后呢做了封装,然后呢,你再去真正用的时候,你就感受到这个好处了啊,这个时间其实花的还是很值的。理解了吧,好,那我们就拿到数据了啊行,拿到数据以后呢,接下来就是我们数据的一个处理了啊呃,那我们最好在处理之前呢,你先去看一下我到底能不能够去消费到数据。
15:04
但是你现在看的话呢,会有个问题啊,大家可能想的那还不简单吗?你看啊,我给你走一下卡不卡stream,我打印一下不就完了吗。对不对,我打印一下不就完了吗?我看看你这个流里面的数据不就好了吗。对吧,来是这么个道理啊,我们把它跑起来。运行你就知道问题了啊。呃。你看到这一堆东西的时候,我就想把它停掉了,这什么东西啊,同学们?这个是卡夫卡帮我们打的日志,是不是对不对,他帮我们打的日志啊,呃,但其实这个正常情况下,我们做测试的话,其实我不想看到这么多这个信息啊,这个太乱了啊,所以说呢,我们再这样吧,我们再加上一个配置啊,就在这个下面加上一个配置。我们把那个日志的这个配置文件给他加进来啊。叫log for,能理解吧,这里面写什么东西呢?你不要手动写,你到这个文档里面去找啊,我们有一个添加配置文件,这里面有啊,把它复制过来。
16:15
这个你们之前都呃,你们之前应该都加过的哈,我就不再多去解释好吧,啊都加过的来把这个加上以后,呃,我们把这个日志的级别呢,改成这个error,那么这样的话呢,像你这个什么info呀,什么warning啊。对吧,什么debug呀,都不会打,只有这个错误的信息才会往出打啊行,这个时候我们再来重新启动。啊,大家看看,他就不会再给你打日志了啊。OK,等一会儿吧,因为我们是有打印的啊,它会出现那个就是诶打印时间戳。能看到吧,同学们,行,那现在我们就可以做测试了,怎么测试呢?那你要从我的这个卡不卡中消费数据,那我不得把数据给你生产到这个卡不卡中吗?所以说你看啊,接下来我要去生产数据了啊。
17:07
直接按照我的脚本log.sh,我给你生产一波数据时间我就呃不给了吧,就默认生产一波数据就行了啊好,当我生产数据,那我的数据呢,就会发到卡不卡,那么他就能够从他这边拿数据,拿到数据,我们做一个打印好这个时候。你看我的代码。报错了。对吧,这个报错是什么原因呢?啊,你要去学会分析啊,同学们看一下,他说有一个这个错叫做object notable。啥意思?对象不能够进行。序列化。A,你在整个Spark的使用过程中,只要你看到这个错了,那你就要想到我现在涉及到对象的一个传递了,有可能从driver传给exq,有可能是从ex,我再拉回到driver。
18:00
对吧,只要你有这个对象的一个传递,那我就要涉及到序列化,好,那你涉及到序列化,你就得保证你当前所传递的这个对象,它是支持序列化的,如果不支持,就会给你报出这么一个错来。你在使用Spark的时候,你就得这么敏感,好那现在你看一下吧,他说谁不支持呀,这很明显,告诉你说,诶你的卡夫卡的什么consumer record这个玩意儿是不支持做序列化的,好那我们来看一看当前我拿到的数据里面是不是就是consumer record,这是我们的消费者记录吧,对吧,卡不卡顿消费出来的什么消费者记录,好诶这个玩意儿不支持吗?诶你点过来看看。支持吗?同学们有实现什么什么so那些接口吗?并没有,如果没有,那我告诉你,这个东西不支持。对吧?诶,那有同学想了,那不对呀,我们在卡夫卡中,我们的消息你说他最终要写到你的磁盘中,那你不得做序列化吗?为什么它就可以呢?
19:02
啊,同学们,为什么他就可以啊啊。这个涉及到一个什么点呢?听我说啊,卡夫卡中人家有自己的序列化的过程,你能明白我的意思吧,它有自己的序列化的过程,那你的这个对象过去以后呢,他会通过什么自己的序列化过程,或者什么反序列化的过程,然后呢,帮你做处理,人家是不要求你这个对象实现什么序列化接口的。人家自己写过这个序列化过程的,你们应该都还记得那个消息发送流程吧?对吧,消息发送流程就会涉及到啊,我的消息呢,先经过什么呀,先经过你的。什么叫什么序列化器啊,还有什么这个叫什么,呃,什么分区器,还有一个叫什么来着,过滤器对吧?诶拦截器啊过滤器啊,拦截器啊拦截器对吧?走拦截器走什么序列化器,走什么分机器啊,还是这个这个这个顺序我有点记不住了啊,这个先走序列化,先走这个拦截器啊,我给忘了,反正人家是有这个序列化器的。人家自己内部实现了。
20:00
但是你到了这个地方,人家可就不管你了呀。对吧,所以这个对象你不支持序列化,那你就会有问题。OK吧,所以你还看不了。啊,那如果你非要想看的话,你可以这么去看,我们先把第三步做一下,你看啊,他不是要转换这个数据结构吗?那我给你转一下。来处理数据,处理数据的3.1步,我们就什么转换数据结构,我只要转成一个支持序列化的结构不就完了吗?好,那这个时候我们就回到这个转化这个数据结构这段去说了啊,你说我要转数据结构,我转成什么结构啊。对吧,来这个是有讲究的啊同学们,我们一般是两种情况,第一种情况叫做专用结构,专用结构一种是通用结构。这个什么叫专用结构呢?专用结构就是你要单独去封装并对象。对吧,就自定义定义一个病对象,比如说诶,我这个数据里面有多少个字段对吧,那我单独的把它定义成一个什么病对象,比如说什么某某某病啊,某某某病对吧,然后你就把这个加项数据呢,转成一个什么病对象。
21:10
那么这种病对象就叫专用结构,它只限于在你当前的项目中用,比如说你把它拿到什么,另外一个项目中,他认识吗?他可能不认识你是个啥。对吧。那什么叫通用结构呢?通用结构就是比如说我转成一个map对吧,或者说转成一个什么加object。这些东西你放到哪个项目中他不认识啊?是不是都认识啊同学们对吧,说句看情况你是想转专用的还是想转这个通用的。OK吧,那这个地方的话,我们就没必要去转这个专用的,因为我们接下来马上要把你的数据给它拆开做分流了,是不是,所以说你说你封装这么个对象,好像就是中间过程用一下也没啥太大的必要啊,所以我们就不转它了,我们就什么转一个JA object吧,因为反正我们知道我们当前生成的这个数据,它就是一个摘数据来再来看一下。对吧,那么既然你是一个JA数据,那我就可以转成什么摘object OK,来我们转一下,怎么转呢?转结构的话,那不就是相当于把你的这个流拿过来卡不卡stream,好,我要做转结构的转化了,我问你用什么方法?
22:16
或者用什么算计?这个大家应该能够想明白吧,用什么呀?你就是把你流中的每一个数据呢,作为一个转换,那转换的话呢,按照你们之前学习的经验来讲,首先想到的应该就是map吧。对不对,Map不就是转的嘛,对吧,把你的保护对象,然后转成另外一个另外一个类型呗。是不是说这个我就不纠结了啊,直接什么map了啊好,那map的话呢。他给我传过来的是一个什么,是一个consumer record,不就是你这个流中的一个什么具体的对象吗?对吧,那他是一个consumer record来好consumer,然后呢,呃,我接下来要把它转成一个什么东西呢?那我要把它转成一个什么东西啊。
23:09
是不是一个战胜对象啊,那我就需要什么呀,需要先把你这个消息中的value拿出来,因为value才是我们具体的消息,前面这个是K,我们其实没有K的啊,现在。对吧,我们在生成你生成数据的时候,没有给你生成key啊,它只有一个什么消息啊,只有一个value,所以说我就什么先把这个消息拿出来啊,就是获取什么呀,获取record啊好record中的这个value。明白了吧,啊,Value就是就是我们这个就是我们的志数据啊,就是日志数据好怎么获取呢?那不就是consumer record.value这不就拿到了吗?对吧,这就是我们的一个log。好,那你拿到这个日志数据以后,我们要什么转成什么,转成转换成什么这个摘对象对吧,怎么转呢?那不就是通过我们的JA,这个你们都用过吧,就是阿里巴巴的啊,然后它里面有个什么叫做pass。
24:04
诶,没打包啊。好,等一下包啊,这什么pass一个object OK,我把log放进去,那我就转成你的一个,就是这个再生object。理解吧,那我转完以后呢,因为你传过来一个对象,那你转完以后,你得给人返回回去,所以说我把这个object再给他返回回去。对吧,这个是返回好返回OK来返回回去以后我们把这个接收回来,好接收回来以后,这就是我们的摘生object一个stream了啊就相当于我把你的礼呢,诶从你原来里面放的是一个consumer record,现在转换成了一个诶摘生object。理解吧,来转完以后,现在你再去做打印就OK了,为啥呢?因为这个东西它是有这个序列化的。好吧,来,下面我们就可以做测试了啊,我们把这个重新启动起来。
25:13
好看一下吧,其实现在就已经看到这个结果了啊,因为我们刚刚是生成过一波数据的,但只不过呢,他消费到以后呢,在做打印的时候呢,这个报了错了对吧,那你看一下这个不就把数据给他消费到了吗。看明白了吧,好,那我能够这个消费掉数据,那也就表示我们当前走到目前为止是没有什么问题的。就是我能够正常的从你的卡不卡中把数据拿过来,然后接下来就是可以做你的分流处理了。OK吧,行,先停一下。
我来说两句