00:00
好,接下来呢,我们来看一下第三章卡卡的生产者,那在这一章当中我们要做什么事呢?还是大家回忆一下卡卡呢,有三部分组成,由生产者、博科和对应的消费者。那在第三章,我们要做的事就是将生产者的数据如何发送到合法集群。那在这个发送过程当中发生哪些爱恨情仇是吧?哎,就做这件事儿,那首先呢,我们来看一下卡卡生产者的一个发送原理。双击PPT打开。那首先摆在我面前的问题是什么呢?希望啊,将外部接收过来的数据传送到卡布集群,就干这个事儿。相当于把100G的还有日文如何搬到宋红老师家里面?那怎么办呢?那他首先呢,创建一个微线程。因为他要搬送的话,是不是得有人呢?哎,那他创建一个对象,就是他发producer。有了这个客户端对象,他后续呢,哎,就可以把这数据发送到合法集群,那他怎么办的呢?哎,他首先调用的是一个叫散的方法。
01:09
把这批数据诶,通过S的方法进行往出发,好,那接下来经过的是一个拦截器。那这波数据到拦截器相当于什么呢?哎,相当于宋老师每天晚上把你拦下来,哎,送给你点海之门,说白了可以对你这个数据啊进行一些加工和一些操作。那当然你说这个海哥,我不希望他被他拦下来行不行呢?当然可以,没问题,这个拦截器呢,是一可选项,比如说你用到拦截器,那你就增加这个功能,如果不用那也没关系,其实在生产环境当中用的并不是特别多啊,那一般我们用谁的拦截器呢?还记得我们之前学一个flu吗?Flu框架当中有south China和S,而且这个位置是不是有个拦截器啊,哎,通常情况下呢,我们愿意用flu的拦截器,哎,因为它的这个配置使用起来更简单一些啊。好,那经过拦截期之后,数据继续往下传。
02:03
经过的就是序列化气。那这个序列化卡布尔呢,有自己的序列化器,那他为什么不用Java的序列化叫呢?原因是因为Java的这个序列化,是不是传输的这个数据太重了,比如说这是有效数据,其他的这些啊,是一些辅助保证你安全传输的一些内容。但是你想大数据场景下,每批传过来数据量都非常非常庞大。那你每个数据都占用这么多,那严重的浪费时间,他希望做的是,诶,这么多都是有效的信息,我简单的一个校验就OK了。哎,所以通常情况下,大数据我们学到框架,像oop哎,以及后面学的这个Spark flink哎等等,都会自己去对这个数据进行去化。所以这个要知道啊,那这里面呢,卡普卡也是用的这个去热化器啊,进行去化。然后再往下传,下边走到了分区器。那你过来这批数据我要往哪一个分区里面进行存储,因为我们要把一个海量数据切割成几块嘛,对吧?哎,分区就是一块一块的。
03:08
那你切割成,比如说我这里切割成三块。那我过来一条数据,这条数据到底应该发送零号分区,一号分区还是二号分区呢?由谁来决定呢?那这里面就有这个分区器来帮我们判断这条数据应该发生到哪一个分区,那这里面有对应的规则。那假如说哎,我们已经确定好了,这批数据呢,就发送到每一个队列里面,记住啊,是一个分区会创建一个队列啊。方便数据的一个管理,那好。那这个整个这块,记住它都是在内存里面完成的。内存里面创建多个队列,那这个内存大小是多大呢?它的总大小默认值是32兆。记住是32兆这个值啊,非常重要啊。那下面那我们往这里面传,每一批次的大小又是多少呢?一批次大小是16K。
04:04
16K1P16K1P这样的一个进行存储好往下走,那么下边呢,这个有个三根线程,它又来干什么呢?它是来啊帮我们把这个缓冲队列里面的数据读出来之后,发往对应的卡法集群,他来干这事的,你前面这主线程,那我这里呢,是专门用来发送的线程。那发送的时候它怎么发呢?那首先来了解第一个概念叫batch size。就是我们刚才说的这个值,哎,Producer。那每批次大小呢是16K,那我三根线程什么时候来拉这里面数据。记住每批次的数据满了之后,那我就可以把数据发过来,你只要一满我就装,哎,不是说你这里面,比如说你装了只装了一半。这里面放了一点,那我就来拉,那不合适,那这个效率有点太低了,知道吧,哎,我是等你这里面全部装满之后,我才开始调用三能线程,这是第一个条件。
05:02
那第二条件。零,为啥有个0S呢?大家想哈。如果我这里面就一条数据,那你是不是也得能帮我去进行处理啊。哎,那这里面引入了一个时间,比如说虽然说你这个bit size啊没有达到16K,但是呢,哎,我这里有另外一个参数叫link Ms。它的值到了啊,比如说这里面有我是可以设置五毫秒。那五毫秒内,你这里面没有达到这个16K,那我把这批数据也可以拉上啊,是这个意思啊,那只不过呢,这个02S啊,默认这个值呢是零毫秒。那零毫秒什么含义啊,就是数据过来一条,我就发送一条,讲究的是时效性,那在生产环境当中啊,这两个值呢,都需要进行一个灵活的调整。那你看你要是这边设置为零的话,那这个16K这个参数是不是就没有用了,那你就得过来一条数据发生一条,那其实这个效率也比较低,其实你每秒钟能够传送的这个数据量就要少一些,后面我们有专门一节啊来讲解对应的这个如何调整卡卡的这个存储量。
06:04
总之啊,我们这里面两个条件啊,再总结一下,一个呢是这个批次大小达到16K,或者呢LMS这里面时间达到零毫秒,那我们就可以从这里面取出来我们想要的数据,然后准备发往卡帕机群。好,那接下来呢,三根线程开始拉取数据,那它怎么拉的呢?是这样啊,你这里面不是一个一个分区的数据吗?未来要发送到不同的卡法集群节点。那它怎么发呢?它是这样,它以这个每一个节点为K,比如说博一它为K,哎博二为K,然后后边跟上对应的一波波请求,叭,如说这里面请求的,哎,我拉的是这个分级的数据,这个分级的数据那它会放到一个队列里面,哎形成这么一个请求,哎这样来发送,那这样发送的话,大家注意,假如说我第一个请求,诶拉这个数据发送到博客一。那么,如果博客一没有及时的应答,那我允不允许发送第二个请求呢?
07:05
哎,他是允许的,最多,那我能拉取多少呢?哎,最多有五个请求,如果都没有收到对方的应答,哎,那这就是极限了。啊,如果你再多的话,那我就不会再给你发送请求了,比如说到第六个请求的时候,我还给你发吗?哎,不允许发了,直到第一个请求说有对方应答了,那你可以再发第六个,比如说最多可以缓存五个啊这个呢,大家知道,呃,是为了我们后面讲乱序的时候给大家再详细的讲解啊。那下面呢,这个呢是select,那select是干嘛的呢?你想我现在啊,是把这里面的数据发送到对应的卡法奇群,那首先从底层的链路上,我是不是得打通啊。爱让这个,比如说这是IO流的输流,这边呢是IO流的输出流啊,相当于是一条高速公路,那这个请求呢,相当于高速公路上跑的一个汽车啊。那链路上打通之后,那这边就开始发送数据,发送到卡法集群之后,那卡法集群呢,有一个副本的一个同步的机制,发送过去发送过去。
08:08
那么如果卡集群收到数据之后,它涉及到一个应答机制,那么卡巴的应答级别啊,有这么三种,零,一和负一,分别什么含义?如果你设置的应答级别是零,就是生产者发送过来数据之后,你就不用管了,他继续发下一波数据,那这里呢,就是零这种方式。那下一个呢,就是ack等于一这种方式,一的方式呢,是你这边数据发送过来之后,Leader收到,Leader收到,哎,收到兵落盘之后,那他就可以进行应答了,那你即使你这个副本没有同步完成,那也没有关系啊,是这种方式啊。那另一种方式呢,是AC等于负一或者二,这是一个概念啊,它俩相等,那么就要求leader和isr对列里面所有的节点收齐,那这个isr呢,大家现在呢还不太理解,那你就把它当为leader和所有的佛都收齐之后进行应答,比如说大家都收到了,都已经落盘了,那给他一个应答信号,说哥们儿,我已经收到了,哎,是这个意思啊,好,那后面呢,会详细的给大家讲解对应的应答机制。
09:13
那如果说反馈回来的信息是成功,那我生产者这块会怎么做呢?首先我要把这个请求诶给它清掉,同时呢,我要会清理掉对应的每一个分区的数据,那这边我清理掉完之后,哎,这是成功的一个机制,那万一没成功呢?哎,天有不测风云,失败了,那失败怎么办?哎,失败了我可以进行一个重试。那这个重试的次数默认是多少啊,哎,默认是int的最大值,说白了就是死磕,哎,直到这个重试成功为止,那当然了,这个re踹啊,它是可以进行一个修改的哈,啊那那他重试怎么重试啊,哎,重试的时候啊,他直接从这个请求里面,哎再进行一个发送,然后这面再应答,直到它成功为止啊这样一套流程哈,好,那下面呢,我们来呃给大家详细的回顾一下整个这个发送流程哈。
10:07
首先呢,摆在我面前的是这是生产者数据,这边呢是集群,我们希望把这里的数据导入到这儿,那导入到这儿的过程当中,首先呢,我们是在main线程当中创建了一个pro对象。然后呢,我们会调用一个散的方法来发送数据,那发送数据过程当中有可能会遇到拦截器,根据你生产环境的业务需求,是否要增加拦截器啊。那这是可选项,一般建议呢啊不用就OK了,那下面呢,是通过序列化器对这个数据呢进行序列化,因为你这是跨节点通讯嘛,这个节点和这个节点之间,那很显然啊,你需要序列化,那序列化呢,一般都是用它自己自带的序列化啊来进行处理,不用Java的S。那下面呢,你要想把数据发往到不同的分区,那就得有分区器来规定每个数据发送到哪些分区。
11:00
那发生到分区啊,其实呢,它是发生到一个缓存队列里面啊,缓存队列这个队列大小呢,是32兆,32兆。之后呢,这里面每批次大小是16K行,那这里面稍微给大家介绍一下啊,稍微扩展一点,其实这个队列呢,叫双端队列。它这里面除了有这些队列之外,还有个叫内存池。内存池,这个内存池有啥作用呢?你这边往这边发送批次数据的时候,会创建批次大小对吧,那创建的时候它会从内存池里面取出内存。那后续啊,这些数据发到这个卡法基之后,这个数据会不会释放,哎,释放的时候会把这个内存再释放到内存池当中,所以说一边呢是创建获取内存,一边呢是释放内存归还到内存值,哎,是这样一个啊,叫双端队列啊。稍微了解一下啊,想达数据进到这里面之后,接下来三线程来主动拉取数据,那拉取数据呢,有两个条件,一个条件呢是BAT size,就是达到16K大小,那我就可以发,或者呢,就是这个ler Ms。
12:06
这个延迟时间到了,那我就主动发送,那这两个条件是货的关系,任何一个满足要求就能够发送数据啊。那下面呢,我们在开始发送数据的时候要注意了,哎,发送数据的时候呢,我们是把这里面每一个分级的数据,哎,通过以节点的方式,每一个节点来一个这么一个队列来往出进行一个发送,好。那这里面要注意哎,你这个发送的时候,发送过去之候,它这边如果没有及时应答,那我最多呢,可以缓存五个对应的一个请求啊就可以了啊好,那我们下周下面呢,我们是通过这个select,诶把这个底层链路啊给它进行一个打通,这是呢类似于输入流,这边呢类似于输出流啊整个通道打通之后开始发送数据,发送数据集群收到之后进行一个副本的同步,同步完毕之后呢,进行一个应答,那应答呢有三种级别,零一和负一,零呢就是生产者发送过来之后就不管了,那一呢,就是生产者发送过来之后,Leader收到数据落盘,那就可以进行一个应答,那么如果A等于负一或者二,那就是生产者发送过来数据之后,Leader你暂时可以认为和所有的follow都收集之后进行一个应答啊。
13:18
啊,那这边呢,应答成功,如果成功,那就清掉对应的这个请求啊,同时呢,要清理掉分区的数据,那万一没有成功,失败了,失败了就是重重试啊,那重试呢,这个默认的重试的次数呢,是int的最大值,哎不断的发送,直到它成功为止,哎这样呢,就是整个这个生产者的一个发送流程,大家呢,先从整体上有这么一个印象,那后续呢,我们会针对每一个核心模块,哎详细的给大家去啊阐述啊。
我来说两句