00:02
来吧,那我们就一起来这个分析一下啊,这个问题应该怎么去解决啊,呃,那你还是一样的啊,先从这里面去提取一个关键点。就是我这个问题的原因啊,就是问题的那个点在哪里啊。问题的一个点就是,我们只能控制消息的发送。对吧,但是我们控制不了他什么时候把消息真正的给我写到这个磁盘中。就目前的情况来讲,是不是你只能控制你调一个三的方法,那么他确实会给会给你发消息,但这个消息呢,到底是给你发送到这个bitch里面了,还是说已经给你写到这个磁盘中了,这个我们是感知不到的,我们是控制不了的。就目前来讲,我们是控制不了的吧,那么这就是我们这个问题的一个什么关键点啊,找到这个问题的原因了,那你找到问题的原因以后呢,那我们就可以想一想,你说我能不能控制一下,就是我要保证我的每条消息呢。都能够什么真正的写入到我这个磁盘中。
01:01
这样不就可以了吗?只要我做一个发送,那我就一定要保证你的消息是写入到这个磁盘中的。这样是不是就会好一点了呀?对吧,那你说这个事情我们应该怎么控制呢。啊来我们就有这么几种方案了啊,首先第一种方案大家可能也想到了哈,那我就直接什么使用这个。同步发送呗,对吧,我采用这个同步发送,因为什么叫同步发送呢?同步发送就是呃,我们每发一条消息,对吧,你就看这个地方啊,同步发送就是我每发送一条消息。那我就要等着这条消息呢,完全什么呀,发送完成,然后呢,把A给我响应回来以后,我才可以考虑发送第二条消息。如果说我的这条消息我都没有这个发送成功,那我是不会什么发送第二条消息的。对吧,这样的话,你想想是不是我的消息发过来以后,他就什么会立马把消息往你的这个缓冲区里面去写。呃,我往你的这个什么词法里面去写,因为你你的leader呀,还有什么你的这个follow呀,对吧,你都收到消息以后啊,这个就看你的这个A的级别是设置了什么啊好,那你假如说我们设置的比较严格,我们设置是那个什么二,那就说白了,你的leader和你的什么follower对吧,你的每个分区的什么leader和follower都收到消息以后,我才会给你响应AC。
02:16
对吧,这样的话呢,我就会什么按照你这个消息的一个什么发送结果来决定我到底要不要去提交这个。能明白我的意思吧,同学们好,我们可以把它改成这个同步发售。这没毛病。肯定得解决一个问,解决这个问题,但如果说你要这么去做的话呢,那就会带来一个更严重的问题,就是你的消息呢,是一条一条写给这个blocker的,这个的性能是非常非常不好的。明白吧,因为本身这个卡不卡的这个吞吐量是什么非常OK的,那如果说你把它改成这个同步发送以后呢,相当于我发一条我就要等一个结果,我发一条我就要等一个结果,我发一条我就要等一个结果。那就说白了,你的这个bit啊,其实已经就用不上了,因为你消息你过来一条以后呢,虽然说我这里面还能放东西,但是呢,我前面我不发了呀。
03:04
对吧,我只有等着你的这个消息真正的进入到你这个磁盘里面以后,你给我响应回来A以后,我才会考虑发送下一条消息。那就相当于我就根本用不上这个批次的这个处理了,我只能是一条一条的处理,你发一条我等一条,你发一条我等一条。能理解我的意思吧,好,那如果说你的这个,呃,就是你在这个实时处理过程中啊,你的这个数据的什么,这个数据呢,不是特别的快啊,就你这个数据消息来的这个数据呢,不是特别的快,那你这么做我觉得还OK吧,但如果说我的这个数据的什么,就是这个发送的这个数据是特别特别快的。那如果你这么做的话呢,就有可能在卡不卡层面,然后呢,导致你这个数据的一个延迟,因为你把它调成了一条一条写。对吧,那它的这个吞吐量可能就下去了,那这个时候你就要去看一下你能不能够接受,因为你把它改成了同步发送,导致你整个数据的处理过程变得更慢。对吧,你就要去考虑这个问题了。好吧,当然一般情况下我们都是接受不了的,因为毕竟我们搞的是实时呀,是不是啊,所以说我一般我是不会允许说,诶,你的某个环节呢,这个我为了解决某个问题,然后呢,我明知道你会拖慢我的数组,但是我还这么去做的啊,那这种情况下一般我不会采用,除非说逼不得已,那如果说我们有更好的方案,那我肯定就怎么放弃这种方案。
04:20
对吧,所以这种方案我们一般是不推荐的,你能理解吧,一般是不推荐的啊好。行呃,那这种方案我们不推荐,以后我们还有另外一种,另外一种方案,第二种方案就是什么呀,其实很简单,大家想想哈,我们现在问题的原因不就是你的数据写到这个缓冲区里面以后。对吧,还没有及时的写到你的这个分区中啊,还没有及时的写到你这个磁盘中,我就把给提交了。对吧,那你说我能不能这样去做哈,呃,你该发送的你还是发送,那我能不能在它们中间,我再去加上一个操作,什么操作呢?就是我强制你把缓冲区的数据呢,刷到这个磁盘中,然后呢,我再次去提交这个outside能不能。
05:07
这样是不是会更好一点啊,就是我调用发送的时候,你就这么正常去发,如果说你已经写到磁盘了,那就更OK了,如果说你的数据呢,还待到这个缓冲区中,好,那我在这个提交opposite之前呢,我就强制性的把你这个缓冲区的数据呢做一个刷写。就要求你把数据赶紧给我刷到什么,刷到这个磁盘中。那我有了这个动作以后呢,诶,我再次去提交这个offset,是不是更加的保险了呀。对吧?当然哈,我说的是更加的保险了,你说这种情况下你就敢绝对保证数据就不丢失了吗?不敢保证,因为你只是告诉他把数据呢给我刷到这个磁盘中,但是他能不能够真正的把数据刷到磁盘中,能不能把数据真正的写成功,这是另外一回事了,当然这个问题我们就控制不了了,因为这个是卡不卡它内部的机制了,你就控制不了了,但是大概率情况下它是不会出问题的。因为他本身就是就是来做这个事儿的,你怎么可能说,诶我刷刷个数据,我觉得这个刷出问题了,不会的,要出问题也是你整个这个卡不卡集群就出现问题了,那这种问题的话呢,我们是避免不了的。
06:10
你就没法去搞,你你哪能想得到啥时候这个卡不卡机群给你出个问题啊。对吧,所以说我们尽可能是要去做到什么,做到能够帮助他啊,然后呢,呃,把这个数据呢,能够更加安全的,或者什么更加这个靠谱的啊,写到我们的目的地。啊,我们尽可能去做到这个事情,但是呢,你想保证这个100%哦,那这个还是比较困难的。对吧?啊,但是呢,像我们刚刚现在现在分析这个问题,就是他能不能真正的把数据从你的缓冲区,然后呢,刷到这个磁盘中,这个事情我们只要把这个动作触发了,让他去写,那基本上是都可以把数据写到你的这个磁盘中的。所以说我们就什么可以在这个位置,对吧,在你每一次要去什么,要去提交这个opposite之前干一个什么事呢。刷写一下你的卡不卡的什么缓冲区啊,刷新一下卡巴缓冲区。
07:01
理解了吧,所以你看了哈,我们的方案就是这样子的,我们可以什么呀,在手动提交offset之前,然后呢,强制将缓冲区的数据呢,Flash到这个block。这个我们只要调一个flash方法就可以了,能明白吧?啊,而且我们的生产者对象呢,是提供了这个flash方法的,我们直接去调就可以了。好吧,那接下来我们就把这个问题呢,给他解决一下来,这个问题怎么解决呢?就很简单哈,你过来,那我首先这样吧,我先在我这个卡不卡这个U里面的话,我也把这个功能给它加上啊,你看因为我们发送消息,我们是不是都在使用这个produce对象啊,对吧,那我就干脆给它加一个方法,就是什么方法呢?叫这个什么刷写对吧,就什么将缓冲区的数据,然后呢,刷写到这个这个磁盘。对吧,这个怎么写呢?那我们就写一个什么,就那我也叫这个flash吧,可以吧,然后我就什么呀,直接什么这个producer好producer讲什么讲这个flash这就完事了。
08:01
就是我的生产者,然后呢,我强制的去做一个刷新啊,这个功能其实非常的简单。好,那关键是你这个功能是有了,功能有了以后呢,来我们再回到我们这个主程序中,我在什么地方去调用它呢。对吧,你从这个整个代码的这个流程中来看的话,我们是希望在你的这个中间位置,就是在它这个中间位置,然后呢,我去调用一下这个flash,然后呢,你再去做你这个。的一个提交。对吧,这是从整个流程角度来讲的,好,那如果说你放到我们这个代码中的话,那你想想我应该在什么地方去刷新。好,那我现在给你的还是三个答案吧,ABC,来,大家这个再来选一下。在什么地方?
09:00
啊,同学们选一下啊ABC如果是你的话,你会选择哪一个?有选A的。有选B的,有选C的啊,你看这一回这个意见就不统一了吧。对吧,意见就不统一了啊,我看看选什么的,选A的,目前选A的是比较多的哈,目前选A的比较多。然后还有还有选D的。还有选D的是吧,D是什么呀,那你跟我说一下D是什么,我现在没有给你D啊。啊,那你的意思就是ABC你都不同意是吧,ABC你都你都不太满意啊,你想的说能不能选D是吧?是这个意思哈,OK,可以啊行来吧,分析一下吧,啊同学们分析一下,首先还是一样的啊,我们把这个这还是我的佛的理念,现在要干什么事情呢?现在我要去什么,去这个刷写这个卡不卡的缓冲区啊,就是刷血。
10:08
卡不卡这个缓冲。对吧,啊环城区啊,OK,呃,那在这个地方合不合适呢?那你就分析一下呗,首先如果说你要在这个在这个风区里面写的话呢,那你还要明白的就是它是excu站执行,对吧,我把这个还是给你写出来啊,它是在我的这个excu端执行的,并且呢是每条数据呢,我都要执行一次。对吧,我给你强调一下啊,在这里面执行每条数据都执行一次,这个时候你再想一下我用它合不合适。啊,其实选A是不合适的,你能听懂吧,但是选A是可以的啊,但它不合适,为什么不合适呢?你来分析一下啊呃,因为首先啊,我们的代码就是像你看像我们的这个发送卡夫卡啊,发送卡卡这个过程,对吧,这个其实都是写到我们的这个for里面的代码,能理解吧,那就说白了,我整体的这个分流的过程,整体的分流它应该是在你的Q上去执行的,理解吧,啊在exq段执行的,然后呢,我们每拿到一条数据你看了哈,这不相当于每拿到一条数据吗?我就做下面的这个分流操作。
11:20
好,那如果说你是写到这个for里面的,就相当于你每拿到一条数据,你分流完成以后,你就做一个flash,你每拿到一条数据,你分流完成以后就做一个flash。那你这个是不是相当于还是一条一条数据再去做这个flash啊。那就相当于我刚刚把数据你看啊,刚有一条消息我发送过来了,那我就做了一个flash。对吧,我再有一条消息,我这个发送过来了,我又做了一个flash。那你看看这个跟你的同步发送还有什么区别吗?你同步发送不也是一条一条消息去发吗?那你现在不也是相当于我发一条消息我flash一下,我发一条消息我flash一下。你的缓冲去你也没有用上啊,你不也是一个相当于同步发冲了吗,就。
12:03
对吧,所以说你这个选A注意啊,它是不合适的,但他行不行,他一定可以。明白吧,但它不合适啊,这个太不合适了啊,这个是不行的啊,呃,如果你要选它的话呢,呃不太好,为什么不太好呢?就是呃这个这个就是相当于是同步发送了啊。相当于是这个同步发送了,同步发送消息。对吧,这个肯定是不合适的啊,我们是不选它的。OK吧,行,那我们再来这个分析一下B啊,也有同学选B啊,其实B和C啊。其实B和C啊,现在是一个道理,他们都不行啊,他们都不行。为什么不行呢?我一分析你就知道了啊。就大家现在呢,其实对这个对这个Spark这个东西啊,理解的还不是很到位。啊,为什么我敢这么说呢?因为这里面有一个非常关键的点,大家总是考虑不到什么点呢。
13:08
就是执行的位置,Java和就你代码执行的位置,这个东西你总是考虑不到。我给你分析一下啊。呃,如果说我在我的for r DD里面,For的外面,我要去刷写这个卡不卡的缓冲区,好,那我告诉你这个地方,我们是这个地方,我们是在这个Java端执行的。对吧,它是一个批次执行一次,那就相当于什么呢?听好了同学们。来我们在这个位置去执行,就是相当于什么一个批次,就整个这是一个批次的数据,对不对,你RDD对吧,整个是一个批次的数据,好那我们把这一个批次,比如说啊,把这一个批次的数据我整个都分流完成以后,那我现在要干嘛呢?我要去刷写这个卡不卡缓冲区了,对吧,这个是在Java上执行的,你能听懂。这个是在Java上执行的,而你真正的数据的分流呢,是在你的ex上去执行的,对不对,那就相当于我们将来你你里面的这种代码,比如说我们用到这个麦卡卡了,我在我的每个ex里面,比如说我有四个并行度,对不对,其实我的每个里面我都会执行这一份代码的,那就说白了,我的每个并行度里面,我都会单独创建一个生产的对象,就是这个produce对象,Produce对象,Produce对象,因为你只要一调这个方法,那你看一下我调用这个S的时候,我它里面你注意了啊,我调用这个S方法的时候,我是不是首先。
14:35
得保证你有这个对象啊,那你这个对象是怎么来的呢?这个对象是我定义出来的,他是不是要去创建一下的呀,那就说白了。如果说你是在这个driver端去在这个driver端,然后呢,去做了这个刷写,而我的分流呢,是在我的每个exq里面去做了这个分流,我这里面的produce对象,我这里面的这里面的这里面的。跟你端的是一个东西吗,同学们?
15:02
啊,我这个produce对象,他把数据呢,写到自己的那个什么缓冲区里面了,他写到自己的缓冲区里面了,他写到自己的缓冲区,它写到自己的缓冲区,然后你最后呢,说我在这个位置呢,我去做一个刷写。你觉得你能刷得到吗?啊,这个对象啥事都没干,都没写数据,然后你做一个刷写,你刷的是它,你能刷到这几个对象吗。难不难?根本是不行的吧。对吧,你你这玩意儿不就是什么,相当省,相当于一个刻舟求剑是吧。啊,你根本就不在同一个位置。你你说你在那个那那个地方,你能把那个键给它捞了起来。是吧?所以说这种问题啊,就是你们在这个使用这个Spark是需要这个经常要考虑到的,你的代码的执行的位置在哪里。那我将来做这个事情,我要对应你的位置去做这个事情。听懂了吧,好,所以说你在driver端写肯定是不行的,那既然我这个B这个位置,它是在这个driver端的。
16:07
那肯定就不行了,对吧,它虽然说是这个一个批次执行一次,但是呢,这里面的问题是什么呀?就是我们的分流,分流是在什么ex方案完成明白了吧,那你如果说你在这个driver端去做这个刷写的话,Driver端做刷写。做什么做这个刷鞋啊用的刷的不是一个对象啊刷的。不是同一个对象的这个缓冲区。明白我这意思了吧,那你这个他不行的话,那这个也是一样道理,他肯定也是不行了,这就什么,这就叫什么叫这个刷鞋。卡不卡的这个什么缓冲区对吧,他可能也是不行的了,一个道理啊,对吧,你分流呢,是在这个excu里面完成的,然后呢,你要去在你的这个driverva那做刷写,那你刷的是同一个对象吗?肯定也不是的。
17:07
所以说你这么考虑下来的话呢,其实选A的还相对会靠谱一点,它毕竟它是可以的,只不过呢,它导致了什么呀,你的性能都会变得更低了。理解我的意思了吧?好,那这个时候我们就要想着说,那我能不能有一个比较好的地方啊,就是我既可以做到不是同步发的,就是既既可以做到不是一条一条的刷写,然后呢,还能够在我的execute端去做刷写。有没有啊啊。那这个时候你就要从你的这个算子角度,然后呢去入手了,你就想一下有没有一个算子,它是在这个ex里面去执行的。对吧,并且呢,它还能够做到类似于什么呀,就是批次的这个效果啊,就是将来什么这个批批处理的这个效果有没有。
18:01
是有的。啊,它是有的,来看一下。我们现在调用的这个算子叫什么呀,叫做。A叫做for。那么其实我们在这个RDD里面,我们有这个东西叫for partition。对吧,诶大家都想到了啊,这个这几位同学还是不错的啊,这个李康还有什么这个呃,俊俊。俊鹏是吧,还有什么声明对吧,不错啊,想到了。那你看一下啊,如果我用这个for partition的话,那你给我的你你给我分到这个某个ex里面的数据,对吧,那我是按照分区来进行执行的,比如说你有四个区对不对,那我是不是可以在你的这个区的数据处理完成以后,我做一次刷写,你这个区处理完成以后,我做一个刷写,你这个去处理完成以后做一个刷,是不是正好它会对应什么呀?它它是不是会对应我这个卡不卡的那个四个区啊。明白吧,因为你看for each part什么,它里面的这个for each part,就是它里面的这个分区,它分几个区,这个是不是跟你那个什么并行度也是有关系的呀,同学们。
19:08
对吧,正好就会跟你这个卡不卡的这个取暖是吧,对应起来。能理解了吧,所以就相当于诶,我把你的这个数据呢,分成了这个四个区来进行处理,你这个区呢,将来是一个并行度,比如说诶在这个某个ex,你这个区呢,在某个并行度,在某个excu,这个也是在某个X对吧,这个也是在某个X里面,那我一个区我执行一次,然后呢,我做一个什么。Flash,这不就更好一点了吗?对吧,所以说我们要把这个for呢,给它优化成for partition,注意以后当你想到使用for的时候,你就考虑一下我可不可以换成for partition,因为它是可以怎么做的呀,它是可以每批次每分区执行一次的。而你这个是每批次每条数据要执行一次。听懂我的意思了吧,所以说你看我给你写一下啊,这个for partition,如果说我要这么去做的话,他给我传过来的就是你的,诶一个分区的一个什么迭代器了啊,那我就叫这个JA object什么it对吧,我给你的是个迭代器,那我这个迭代器里面注意了啊,我这个迭代器里面我是不是就可以对你的这个数据呢,做一个什么,做一个迭代,那我迭代出来以后是不是也是每一条数据啊。
20:26
能不理解对吧,那这里面就是你的每一条数据的处理,但是我的循环的外面这个位置,或者说呢,这个位就是你的for的外面,或者是这个位置来,我写到这这个位置是不是就是我们的叫什么叫for each partition里面,对吧?你的for part里面它是在哪执行的呢?它是在。端执行对吧,它的执行的周执行的这个频率什么呀,是每分区啊,应该是叫什么每批次每分区执行一次的。
21:02
能理解吗?同学们,所以说接下来啊,我们就可以把这个代码作为一个优化,你看啊,这个for的代码,For的代码你看啊,我这里面是不是拿到你的每个对象就开始做分流了吧,所以说我就从这开始,你看好了啊,从这啊,从这开始来,一直往下走。它到这结束了,对不对,那我就把这个代码,完整的这个代码,呃,我应该是这样啊,从这开始把这个注释就留到这啊,把这个完整的代码一直往上走,走走走到什么地方就走到这个位置啊,就走到这个位置,把这个拿出来。对吧,这是我们之前写的那个for的代码,这个代码我现在就不用了,你能听懂吧,我要把我刚刚那个整个代码呢,放到什么,放到这个里面,放到for循环里面,是不是也是我拿到了每一个战胜object以后,我接下来做的处理啊,那我就整体放到这个里面就可以了。理解吧,那么这样的话,你看一下我的这个for这个代码我就不要了,把它注释掉了,好注释掉。对吧,注释掉,那我把这个,呃,我看看啊,这个注释掉来,这个是我的,呃,For r DD里面啊,那我还是这个提交oppositeset,这没问题,然后你看一下这个时候我的for partition是不是也有了呀,那你既然这个地方有了以后呢,那你想想我是不是就可以在这个位置去刷写什么呀,是不是可以什么刷写这个卡卡了呀,对吧,那咱们写的就是买卡不卡有条件,什么叫做flash。
22:30
那你看一下我刷写完成以后,我在后续是不是提交了你的呀。对吧,同学们来整体看一下啊。我把这个for循环给它折起来,你看一下我是不是在你的一个分区的数据来了以后,我这里面是迭代你一个分区的数据,那这里面我就完成了你整个分区数据的一个分流工作,那我完成以后来我刷写一下。看到了吧,同学们,就相当于我现在是一个分区的数据,我整体的做一次刷新。OK吧,诶这个效果就会做的比较好。
23:03
好吧,来这就是我们这个最终的一个,诶解决方案,就把它这个解决掉了,行吧,这个下去以后,你这个好好的去分析分析啊就是。呃,我们的这个不同的这个方法对吧,或者什么不同这个算子啊,它这个执行的一个情况是什么样子的,你这个一定要很清楚才行。你在用的时候一定要想清楚,我用它合适不合适,我能不能做优化。OK吧,就现在大家对这个Spark的这个使用来讲啊,我觉得你要是调用一个什么基本的算子,这个算子干嘛的,你应该都很清楚。对不对,但是如果你再往深了去考虑考虑,就是它的这个什么执行的位置啊,它的一个什么执行的频率啊。啊,然后呢,结合上我们这个实际的一个什么业务啊,实际的一个需求啊,你去这个斟酌的话呢,大家可能这方面还是稍微欠缺一点。啊,以后你再去使用这个Spark的时候,你是优先要考虑,就是我用了这个方法,用了这个算子,对吧,或者说我写的这个代码,它是在哪里执行的,我希望他在什么地方执行,我希望他执行的频率是什么。
24:05
对吧,那你再结合上你对每一个算子的一个什么理解,那你不就知道用哪一个了吗。好吧,来这个我就不多说了啊,行,那这个问题呢,我们就解决完了啊。OK吧,就解决完了啊,就是这里面也有代码啊,大家可以自己去参考。OK吧,行,停一下。
我来说两句