00:01
好,我们上课了啊。还是先把昨天讲的东西做一个简单回顾哈,嗯,昨天的话我们主要完成了就是日志数据的采集和分流,对吧,把它这个分流工作做完了,然后呢,还解决了它里面的几个问题。啊,然后最后的话,我们又把这个,呃,业务数据的这个采集啊,这个做完了,这个分流还没有做。是吧,我们把这部分内容再来这个简单看一下啊。这个日志数据的这个采集和分流啊,嗯,主要还是围绕着这个图来展开吧,对吧,其实我们在这个写代码的时候呢,也是一直在呃围绕这个图来去展开的,嗯,就是你先去搞定这个采集的这个工作,就是能够把数据呢采到这个卡不卡的一个topic里面,然后呢,再通过我们这个Spark streaming呢,去做这个分流操作啊,最后呢,把数据呢这个分到对应的这个主题中啊,这是我们这个整体的一个处理流程啊。好,呃,这个基本的这个分流工作应该都没有什么难度啊,就是你把数据拿出来,然后呢,做一个解析,做一个拆分啊,然后呢,通过这个卡不卡的生产者把这个消息呢给大家什么再发送到你的对应的主题中啊就完事了,好吧,就这部分功能我就不再去说了啊呃,那我们重点还是去说一下这个解决了两个问题吧。
01:19
第一个是我们想要去做的一个精确一次消费啊,这样的一个效果。对吧,就是最终我们肯定是要达到这个精确一次的啊,因为你这个处理数据嘛,你不能说,呃,同一条数据你这个处理了多次啊,或者说呢,有的有的数据呢,你没有这个没有这个进行处理,嗯,漏掉了,或者是你这个数据丢了,那你最后这个计算的这个结果肯定也不准确。是吧,所以说我们想追求的就是在处理数据的时候,我们的每一条数据呢,都能够,诶不多不少刚刚被处理过一次,那也就是所谓的什么精确一次。对吧,这是我们最终要去追求的啊呃,但是昨天我们在讲的时候,我们在呃,当前的这个环节中啊,我们还是比较难去做到这个精确一次的。
02:02
啊,虽然说也能做啊,但是那种方案我们不给他pass掉了,就是这个事物的这个方案大家应该都有印象啊。呃,来吧,那我整体说一下啊,首先呃,现在我们代码中存在的一个问题呢,就是它可能会丢数据啊,也有可能会怎么造成这个数据的重复。对吧,就是我们在这个消费的时候呢,出现了什么呀,这个漏消费啊,还有就是这个重复消费。好,那怎么就漏消费了呢?那很明很明显就是你先把这个oppositeset做了提交,然后呢,我后面的话再把这个数据呢,做一个写出操作,那假如说我这个写出失败了。对吧,这个数据就漏掉了,因为你的你已经提交这个offset了,这个时候卡夫卡呢,就认定,诶我刚才发给你的这一波数据,你已经成功处理过了。他就不会再给你发了。那么这种情况下,你的这一部分数据你就没有消费到,相当于是因为你没有写到成功的写到下游。对吧,说他是什么漏掉了啊OK,呃,那这个重复消费又是什么情况呢。
03:04
它就相当于是这样子的,我们先把这个数据呢,往这个下游去写出去。好,接下来我再去提交这个offset,那假设我这个写成功了,但是呢,我提交失败了。那这个时候卡不卡认定我刚刚发给你的消息呢,你没有成功处理,它会重新发送。那重新发送过来以后呢,又会经过我们的数据的写出。对吧,就可能会导致你的同一波数据,然后呢,被重复的多次进行处理,那么这样的话就会导致我们这个重复消费,就相当于你这个相同的数据,你这个写到这个下游以后呢,写了很多次。对吧,这是我们这个目前代码中的存在的这个问题。好,呃,那如果你要想解决这个问题的话,那就是解决这个数据丢失和这个数据重复的问题。对不对啊,那怎么解决呢?呃,我们想到的策略的话有什么呀,有这个事物的方案对吧,把他们两个就绑定成这个事物啊,做成这个原子,做成这个原子绑定啊,让他们俩什么做到这个同生共死,就是如果说我成功你也成功,那我们就都成功,对吧?只要有一个是失败的,那我们就都失败。
04:06
就相当于把之前这哥们做的事给他做一个回滚,那么至于这两个谁在前谁在后,呃,其实就无所谓了。能听到吧,其实是无所谓的啊。反正你们都是在一起的嘛,反正后面这个失败了,你前面的就要去做这个回顾,对吧,你可以做成事务方案啊,但这个事务方案的话呢,在我们这个目前的这个项目中来讲的话呢,它这个局限性比较大。啊,局限性比较大,就是对我们这个限制是比较多的啊,我们昨天也这个分析过了啊,比如像这个,呃,你必须得采用这个支持事物的这个数据库来进行这个数据的存储。对吧,这是一个点啊,然后再一个点呢,事物本身的性能呢,也不是很好啊,然后还有一个就是我们现在这个Spark STEM,它是怎么做这个分布式的一个计算的。那你的数据呢,是在这个不同的AQ上面,这个这个XQ上面去做做这个计算的。那你在这个上面做计算,那你的事物你怎么去去做这个整体控制呢。
05:01
对吧,这里面你会涉及到一个这个分布式事务。那这个带来的这个什么更复杂了,对吧,管理的这个复杂性啊,这个就什么更麻烦了。是吧,所以说我们就不考虑去使用它啊,呃,当然的话,我们也说过啊,如果说呃,你这个处理完的数据,比如说你在这个每个。Ex,上面啊,你处理完的数据已经什么变得很小了,对吧,这个时候我可以考虑什么呀,把他们都拉回到这个Java端,那你数据如果说再放到同一个篮子里面以后,对吧,这个时候你再去做这个事物的处理啊,就不用考虑这个分布式的事物了。对吧,这样也行。啊,但是呢,前提就要求你的数据量一定要什么足够小了啊,不能很大,但是呢,我们的场景呢,其实我们在处理的时候,并不是把数据做了聚合,而是把数据做了拆分。对吧,你的数据量呢,完全没有降低啊,所以说呢,我们这么去做还是不合适的啊,所以最后的话呢,呃,我们就没有什么选择这个事物这种方案。啊,那如果说你不选择这个事物这种方案的话呢,也就表示目前来讲,我们是很难把他们两个。
06:07
放到一起来去做,就是保证这个同时成功,或者什么同时失败。对吧,那我们就是什么,另外找一种这个策略啊,什么策略呢。就第二种策略了,我们采用这个后置提交偏移量,再加上这个密的这个处理。啊,这样就可以了,当然现在我们只能做到的是这个后置提交偏移量,然后像这个密等处理的话,你只能是依靠这个后续的一些什么呀,技术啊,或者什么后续的一些什么这个主件,然后呢帮你去做啊,当然哈,我也给大家去预告,预告过了,我们后面要讲这个ESES呢,它是什么支持这个幂等写入的。啊,所以说这个事情我们是可以去做的,那因此的话呢,现在我们只要保证我能够后置提交偏移量。对吧,为什么后置提交偏移量呢?因为后置提交偏移量就能够保证你的数据呢不丢失。对吧,比如说我先把数据呢写出去,然后呢,我再次怎么去提交偏移量。
07:02
这叫什么所谓的后置啊,就说白了啊,你一定要保证是在这个写出数据之后呢,再去提交你这个偏移量的。OK吧,好行,那本来的话,你这个后置提交偏移量这个事情应该很好做,我们只要手动的去什么呀,在。合适的位置去提交一下这个opposite就可以了。而且我们之前在这个学卡不卡的时候也讲过这个手动提交这个偏移量对吧。但是呢,放到我们现在这个场景吧,他又不太好做了。啊,本来它是很简单的一个事儿,但是放到我们这个场景里面就不太好做。因为我们对卡不卡数据的消费呢,并不是你自己搞的这个消费者,对吧,而是什么通过这个Spark stream去做的。就说白了,我们手里面没有消费者对象,而是人家什么做了这个封装的,那这个事就比较痛苦了,对吧?呃,不过人家这个Spark的话也想到这个问题了啊,他也帮我们提供了一种方案。对吧,就是你可以通过呃,你拿到这个什么stream,然后呢,做一个什么呃,做一个什么类型转换,然后呢,再去做这个提交就可以了。
08:07
对吧,他也提供了这种方案,但是我们分析了一下啊,这种方案的话呢,我们还用不了。为什么用不了呢?因为他要求哈,如果你要做这个事,那你的STEM的类型必须是你的这种类型的啊,必须这种类型的。但很明显,我们将来在做做这个数据处理的时候,我肯定不能保证我的类型是这种类型的,因为我要对你的数据呢,做各种各样的转换,对吧,做各种各样的处理,那等我把这个数据完全写完以后呢,那我就不一定诶变成什么类型了。这个时候你再去用它操作,那就操作不了了。对吧,说它也是对我们有一定的这个限制啊,所以说最后的话,我们就说,哎哟这个也用不了,那怎么办呢。那就只能什么自己去。写一套管理这个这个方案了,对吧,我们最后是选择了使用这个red,然后呢,自己去管理这个opposite。
09:00
好,那大概的思想是什么样子,是这样子的啊,看这个流程。就是呃,我们想方设法把你这个就是每一次哈,你读到的这个数据。对吧,你从卡夫卡中这个消费到数据以后,我先从你的这个数据里面呢,把这个opposite的这个结束点呢,给它提取出来。对吧,提出来以后呢,那你就什么该处理数据处理数据,该写数据写数据,最后呢,等你把这个数据都写完以后呢,我们提交一下这个oppositeset,当然我们说的是提交,实际上就是把你的opposite呢,诶维护到这个RA中。啊,让这个red呢,帮我们去做一个管理,做一个维护。好,等这个下一次你再到这个卡夫卡去读数据的时候,那我就什么从这个中呢,把这个offet里给大什么读出来。对吧,读出来以后呢,我带上这个,然后呢,从你的卡发卡中去消费数据。能理解吧,啊相当于就是我们自己去管理的这个oppositeset,每次读的时候呢,我告诉你从哪个位置开始读,对吧,那你读了一波数据以后呢,我看一下你读到了什么地方,那你所读到了这个地方,就是我们要去存到里面的那个位置,也就是我们下一次要到你里面去读的时候的那个位置。
10:13
对吧?啊,那么这样的话呢,我们就可以保证啊呃,这个opposite呢,是可以什么正常的去做维护的,而且呢,我们也能够什么手动的去控制这个事儿,比如说诶你在什么地方去提交这个opposite,我们都可以什么手动的去控制。对吧,诶这样的话呢,这一套方案我们就可以,呃,通过这个代码来把它这个实现出来啊,就可以满足我们想要实现的这个效果。对吧,这是这个昨天我们花了很长时间啊,然后去做了一个事儿。好吧,这个大家知道了啊,行呃,那至于这个密等星的这个操作的话,我们现在就不多说了啊,这个等我们讲完这个后续的一些主将以后啊,自然而然你就知道他怎么去做了。OK吧,这个事情不多说了啊。行啊,然后这个第二个问题呢,是卡不卡这个消息发送的一个问题啊,其实主要就是这个缓冲区问题。
11:04
对吧,昨天我们这个分析过了啊。就是你这个卡不卡啊,他发消息呢,它是异步的发送啊,一般我们都采用这个异步发送啊,那么他先把这个消息呢,发送到一个缓冲区里面。对不对,然后呢,呃,这个有两个条件的去控制啊,一个是那个批次大小,一个是那个时间,对吧,那么当这两个条件满足以后呢,他才会把消息呢,真正的写到你的这个磁盘中。是不是真正的写到那个分区内部啊,写到那个磁盘上面。好,那我们就在怀疑一个问题啊,如果说。我在我的代码层面啊,我调用了一个什么卡不卡的这个生产者,我把消息呢发走了,发走以后呢,紧接着呢,我就什么提交了这个opposite对吧,那你从代码层面来讲,你看这个流程肯定没问题是吧,但是有可能是这样子的,你调用了这个圣诞者以后,他把消息呢,诶发到这个缓冲区里面,就目前呢,还没有真正的写到你这个分区中。
12:00
是不是然后然后呢,我这个offset也提交成功了。那么这个时候,假如说我的卡不卡故障了。那数据呢,在你这个缓冲里面一下就什么都没有了,那都没有以后呢,你看看你的最终卡不卡中,你没有这个数据,但是你的opposite呢,又做了一个提交。那你的数据还是丢了?对吧,数据呢,还是丢了。好,那这个怎么办呢?啊,我们也有很多种这个解决的方案啊,比如说你最直接能想到的就是。那我就发一条对吧,我先保证每一条数据都能够什么成功的写到你这个分区里面,这不就OK了吗。对不对?我发一条,我就要保证这条数据写成功,我发一条,我就要保证这条数据写成功。是不是,那我就什么可以把它搞成一个什么同步发送。能明白吧,搞成一个同步发送啊,那这个同步发送呢,是能解决这个问题,但是呢,它会带来。性能的一个影响。明白吧,因为本身这个卡夫卡这个吞吐量是很高的,但如果说你把它搞成这个同步发送以后呢,那你的吞吐量肯定会降低,因为你是发一条,然后呢,等对吧,等这一条,哎,人家给你响应结果了,说这个发送成功了,那你才会什么去发送下一条对吧,就以此类推,你发一条等发一条等发一条等发一条等,那这个肯定就吞吐量就降低了。
13:17
对吧,啊,所以这种方案的话,我们一般是不会选择的啊,就不到这个逼不得已啊,一般是不会去选择这种方案的。OK吧,行,呃,那我们还有一种方案就是,呃,我们可以这么去做,你现在的问题不就是你把消息呢,是先发送到一个缓冲区里面,对不对,然后呢,再把消息呢,发送到这个分区里面,就是真正的这个磁盘里面。好,那我可不可以这么去考虑啊,就是我在调用完你这个生产者以后,我接下来不是要调用我这个提交了吗?对不对,好,那我能不能在他们两个的这个中间啊,我强制性的让这个缓冲区的数据。往出刷写对吧,强制他来把数据给我写到这个磁盘中,就是相当于我在这个地方,我多一步操作啊,多一个动作,这个动作呢,就是告诉卡夫卡你赶紧的啊,把这个缓冲句的数据给我写到这个分区中。
14:13
对吧,这样的话呢,其实就能够大概率啊,去保证你的数据是能够写到这个分区里面的,不会一直在这个什么缓冲里面待着。能明白吧,但是昨天我也说过了啊,这个并不能够百分100%保证数据绝对不会丢,因为你这个地方只是告诉他把数据给他刷走了。对吧,但是他能不能真正的去刷到这个分区里面,真正的写成功,这个其实还是有待研究的。啊,但是我觉得像这种事儿,就基本上这个数据往磁盘中写,数据往磁盘中写,只要你不出故障对吧,只要你不出故障,那我的数据一般都是能够正常写过去的。是吧,啊,所以这个会更加的靠谱一点。能理解吧,啊,会更加靠谱一点啊,就是我让你的数据呢,不要长时间在这里面待着,你就赶紧把数据给我什么写走。
15:00
理解了吧,啊,赶紧把数据写走啊,行。所以说呢,我们就可以怎么调用一下这个,诶生产者的一个这个flash方法啊,让他把这个数据呢,给我什么刷到这个block里面啊,就刷到这个磁盘里面。听清楚了吧,啊,这就是我们的,呃,第二种策略啊,那我们最后采用的也是这一种策略。OK行呃,然后在这种策略里面的话呢,我们代码倒是很好写的啊,但是这里面我们有一个有一个点啊,我这个D没打开啊,打开一下啊,有一个什么点呢,就是你什么在什么时机去做这个flash操作。明白吧,啊,在什么时机去做这个flash操作对不对。啊,我们问了大家这么一个问题,你可以什么呀,从。我们之前讲过的,比如说我们用到几个算子啊,什么for r DD的外面,什么for r DD里面什么里面。
16:02
对吧,哎,有各种各样的位置啊,那你就要去想明白,我们想刷写你这个缓冲区对吧,那我应该在什么地方去刷写。对不对,来我们一起来这个分析一下了啊,再来看一下这个代码,呃,这个其实我们讲完以后,大家也能够想明白,它其实不难。对吧,他其实不难,但是呢,你自己能不能把这个事儿给他琢磨清楚啊,当时能不能自已把这个事儿给他想出来啊,那我觉得还是挺关键的啊,好,一起来看。我们总共有这么几个位置。呃,For r DD的外面,For r DD的里面,还有一个是for的里面,这是我们一开始写的代码啊,同学们一开始写的代码,然后呢,我问大家这几个哪个合适。对吧,最后发现A呢,嗯,能用对吧,但其实也不是很合适这个B,这个B跟C的话根本就不能用,你知道吧。啊,为什么呢?还是那句话啊,因为我们的这种代码啊,大家注意,我们这个就是发送卡不卡的这种代码,这个代码全部都是在我们的这个for里面执行的,就一开始我们写的代码啊,那就说白了。
17:08
你将来这个真正写卡夫卡的这个动作呢,是在你的里面去执行的。能听懂吧,那比如说我们有四个并行度啊,假设有四个并行度,那我们应该就是有四个。Pass吧,对吧,你就假设有四个ex,好吧,能听懂,那就说白了,我在我的每一个ex里面,我都会去调用这个写卡普卡的动作,那如果说你都要去调用写卡卡的动作,那么它都会在你的每个Q里面创建一个produce对象。对吧,都会有一个什么produce对象,好,那你说他负责了这个写操作,他也负责的写操作,就说白了啊,他把数据呢,写到自己的缓冲区,他把自己的数据呢,写到自己的缓冲区。对不对啊,以此类推啊,以此类推,那你这个动作你写完以后,你想的说那我要去flash了,然后呢,你在这个B跟C里面flash,你B跟C呢,是在driver这执行的。
18:00
那就说白了,你在Java端呢,你也搞了一个produce对象,你要去做flash了,你就想吧,他flash跟人家有啥关系吗?没关系吧?啊,跟人家是没有任何关系的啊,同学们所以说。大家注意了,这几个B和C肯定是不行的啊,那现在我们就迫切的想着说,诶,那我就一定要什么在你的ex里面呢,去做这个flash操作,而且呢,我还不想着一条数据做一次,因为你看啊,如果说你放到它里面,那就是相当于什么一条数据做一次,那就相当于是同步发送。对不对,这个不行。好吧,所以说这个时候我们就换了一个方法,我们叫for partition,它是什么意思呢?它就是每批次每分区。就说白了啊,你的每批次每分区,它的每一个分区。它怎么分区的呀,同学们,它怎么分区的呀,它是不是跟你的卡夫卡的分区是做对应的呀,同学们,因为我们不是给了你什么呀,给了你这个几个并行度是不是啊,那你的每批次每分区就像说白了啊,你可以假设我在每个ex里面我处理一个分区对不对啊,每个X我搞一个分区。
19:11
是这样的吧,那你看啊,我的每一个分区的数据,我写完以后,我整体做一个flash,这不挺好吗。对吧,所以最后我们就换成了这个for。OK吧啊,所以说呢,呃,这个代码倒是不难啊,但是你能不能想得到我们应该在什么地方写会更加的合适啊,这个就比较关键了。这个对于你们的。就是一些什么扇子的这个使用啊,还有是对这个Spark的一个这个理解啊,还是有一定的要求的啊。行吧,不过这个我们讲完以后,大家应该都能够想的明白了啊,我就不再多说了啊。好,来,这是我们这个日志数据的采集和分流啊,然后接下来就是我们的业务数据的采集和分流了啊,嗯,还是啊,先把这个图啊。记到你的脑子里面。对吧,整个这个流程是什么样子的啊,这个是非常非常关键的,就现在还好,我们只是什么把这个日志数据呢,做了一个分流,然后接下来我们做这个业务数据的分流,那么这两个分流完事以后呢,我们还有什么进行这个DWD层的一个处理,我们还会再把这个数据呢给它调出来,然后再去做一个什么处理。
20:16
啊,就是你这个写的多了以后呢,你整个流程就会变得比较复杂,那你可能就会什么搞乱了。所以说呢,你一定要很清楚的知道我的每一层,我的数据是怎么流动的。对吧,我做了什么什么处理啊,这个是非要什么一定要什么清楚的记住的啊,说这个图呢,一定要是把它记住了,就知道我的这一层,我到底做了什么事。明白吧,啊,做了什么事啊?行,那这个目前的话,我们完成的是这一部分啊,就是已经把这个分流工作啊,错了啊,已经把这个采集工作给他完成了。就是我们能够什么通过这个maxwelll对吧,把数据呢,从你的mal中,然后呢,采集到我这个卡不卡的某一个topic中。这个都是我们测试过的,那接下来我们就可以什么呀,开始什么通过这个Spark streaming,然后呢去做这个数据的消费,然后呢做这个分流,那分流的话呢,设实数据进卡不卡维度数据进你的RA,这是我们昨天这个已经讲过的。
21:09
对吧,已经分析过的OK吧,那么稍后的话,我们就可以就按照这个什么流程,然后呢,把数据呢,给他做一个处理。好行啊,这是我们这个整体的一个架构啊,然后细节的话,这里面,嗯,这个mal怎么用啊,我不想再说了啊,你们之前已经用过一次了啊,然后昨天的话我又给你们讲了一遍。啊,基本上是。没啥大问题的啊,大家这个自己去配一配啊,就是如果说你这个采集搞不通的,那肯定就是你在配的时候呢,某个环节中啊,你出现问题了,你没有给人配好。听明白了吧,没有给他配好啊,这里面就是你首先Maxwell它安装好以后,主要是改一个它的配置文件,叫那个conflict.proper这个里面的东西一定要配对的,对吧,你要往卡布卡发,那你卡不卡的地址在哪里,你要从买烧烤那去监控,那你怎么去连买烧烤。
22:01
对吧,MYSO在哪里,你的用户名和密码在哪里,对吧?用户名和密码是什么,你都给他配好了。对不对,然后这个MYSQL层面的话就是,呃,给他什么该创建账号的创建账号,该分权限的分权限。能理解吧,还有什么MYSO,什么去开什么冰log啊,你把这些东西一套都搞定以后,你就整体去测试,一般都是可以通的。OK吧,啊,这个东西大家要自己去这个慢慢去做一做啊行,那昨天这个有个任务,就是让你们把这个采集工作呢,给它打通啊,这个基本上应该都是可以打得通的,如果说你现在有问题的,呃,自己搞了很长时间都解决不了的同学,那你这个下课以后,呃,这个这个找老师啊,找我也好,或者找咱们助教老师也好,然后给你调试一下。明白了吧,啊,赶紧把它搞通啊行,搞通以后呢,接下来我们就可以开始做采集了。对吧,啊什么做彩做这个分流了。OK吧,行,就这样啊。好,那我们就回顾这么多吧。
我来说两句