00:00
好,然后呢,我们带着去写一个代码,然后呢,你回去之后呢,可以,呃,看一看其他代码其实很相似,呃,无论是我们讲的生产者代码,消费者代码也很类似,也很类似,大体都是一样的,首先呢,我们导入依赖,对吧,这个依赖呢啊这样吧。嗯,这个是打开了卡夫卡的一个,这个东西不要我还是打开我们的什么。那个PROJECT0408对吧,啊,当前窗口。打开一下,嗯,然后之后呢,这些东西我们都给它干什么,关掉了啊没用,然后接下来我再建一个什么。再建一个model对吧,啊建一个model这卡不卡。杠1MO对吧,啊卡卡。确定。然后呢,我们同样呢,要给它添加依赖,等它它变蓝吧,等待它变蓝色啊,它还在加载的过程当中啊,稍等一下。
01:06
因为越学越多,每次加载的时候会变慢了。好,变成蓝色了吧,啊变成蓝色了,好这个时候我们就可以去添加什么依赖了啊,然后呢,是depend这里面呢,我们要添加的是卡夫卡的一个客户端。啊,添加这个就行了啊,添加卡卡的客户端后面你要对Spark的时候还添加其他的一些内容啊,这个里边我们用的是0.11.0.0没问题吧,啊跟那个我们集群保持一致啊,集群保持一致放在这,然后最好呢,如果说你不放心的话,你再看一眼吧,到这个卡卡DEMO依赖。没问题吧,啊进来了,如果说没进来是干什么,刷新一下对吧?啊刷新一下这个大家应该都会了,好这个呢,我们可以关掉,关之后呢在这写代码,嗯。在这呢有一个class啊,这个呢叫com点。爱的硅谷。Producer for里边那个什么my producer可以吧,然后第用CD producer。
02:06
好,那这个里面呢,我们要写慢方法一定要有吧,PSVM啊要运行的好,那我们想一下,就是我们先不看这个文档提示啊呃,正常情况下我们现在是不是要写一个内容,要去往卡发卡里边塞东西啊。那你是不是得有一个类似于客户端的一个对象啊。你像你之前学的HDFS等等那些东西,是不是创建了一个本地文件系统,File system文件系统,然后去操作的是一样的道理,那你现在操作卡夫卡,你是不是应该有一个什么卡夫卡的一个客户端啊。对吧,啊,那我们正常情况下应该没一个卡夫卡或者叫卡夫卡的一个类似于这种吧,我们找一下叫卡夫卡KAFKA。呃,还真有。但是它是一个什么接口啊,卡客户端,但是除了这个东西,你还看到了什么,是不是一个produce,一个是consumer啊,其实我们要用的是谁啊。
03:08
Produce,而且呢,他还是一个什么泛型,在泛型的。对吧,这个对象在泛行的啊,那而且KV这个KV你说定义的就是什么。你生产者输入的这个KV类型啊,这个发型OK,那我们来一个这个里边呢,我们先写什么。Three three可以吧,好,这块它报错,报错说明他没有空餐的构造方法。它没有空参的,所以会报错能听懂对吧,所以我们看到CTRLP看一下它带的参数,这里面呢,可以用map,可以用什么。Problem看见了吧?啊,里边有个problem,也就是说先第一步,我们还不能说直接去创建对象,先要创建什么卡夫卡生产者的。
04:00
配置什么信息那一样的,你在控制台写这个生产者的时候。是不是也杠杠什么Su cable啊,不是杠杠brook list,杠杠topic等等这些东西啊,那你写代码是不是也要这些参数啊,一样的道理吧,啊,一样道理OK,那创建它这个参数,它是不是pop和这个map都可以啊,那还是习惯上用这个promise,因为promise也是什么。K。对吧,啊也是K啊,用一个problems。加把报销的吧,不都包了。加一个分号,然后ctrl alt加V得到我们什么?Pro,然后接下来往那个pro往这里边一扔,你看它就不报错了,对吧?Ctrl out加V得到我们什么producer的一个对象,但是这个东西就能连到我们卡法集群吗?是不是应该在这个地方添加配置信息啊,对吧?啊配置信息,那我们想一下这个配置信息应该要什么内容,首先卡卡齐群是不是要对吧?啊,最重要的就是它。
05:07
最重要的是他来卡不卡那个地址点是不是库对吧,KV形式还记得那个东西吗。对吧,这个东西呢,我们来拿一下,就这。不,哎,这块代码当中又不一样,命令行里边叫对吧,代码里边叫什么。好,我们把它拿过来吧,还是写一个。在这给拿来。啊,在这给他拿过来,那这样吧,我把相关的参数都给这一块呢,都给它拿进来。还有这个。啊,这一套给它拿进来,然后我们来看一下,其实有的东西呢,可以不配,因为它有什么默认值,然后我们把它改一下吧,R。
06:05
Pro。DS替换成什么?对吧,啊all。好,没有了这个习惯了,那好第一个这个参数干什么用的。指定连接的。连接的好不好集群对吧?啊,你要指定那个内容好,他这个东西你看。是不是A应答级别,哎,叫应答级别。好,那这个地方第四个叫微。重试的一个什么次数,那我们假如说发送失败,让他多试几次,对吧,保证它成功好接下来第五个叫ba.s批次的大小,一次发送多少大小的数据,这个是字节,这个16K。
07:03
16K啊,你懂啊,16K好,第六个叫。等待的时间默认一毫秒。那这个什么意思呢?因为这不是批次太小吗?假如说我现在就发送一条数据,难道就发送不进去了吗?因为我一条达不到16K吧,所以还有另外一个条件来控制。就是说要不然你到了16K我会发送,或者说你到了一毫秒我也会发送两个东西来复制的,要不然如果说你只按这个P3,我就发送一条数据写不进去了吗?有问题吧,如果说没有时间控制对吧?啊,它一毫秒之后他也会去提交啊,也会提交,还有第七个这个东西叫什么。Call a culator。这个是32兆,33554432,这个东西熟吗?不熟吗?你们在那个呃哈多宝提交流程里面不是看到这个数字吗?本地的模式里面不是32兆吗,一个块。
08:05
334455333554432这个值这是32兆啊,32兆好,这个指的是8.memory也是一个什么缓冲区的大小,那问题来了。你这边不是16K发送一次吗,怎么这。这个不是说的一个那什么对吧,这个是我到了这么大小,我写到这个里边。对吧,而这个里面最大能存这么多吧,啊存这么多讲这个事啊,这个都是默认值,其实这几个东西呢,可以干什么都不配啊,都不配,都有默认值啊,就是放在这儿,你要知道一下这些东西都可以调,然后这个两个点。这个什么序列化方面的,对吧?因为我们是不是KV都要序列化呀,他传进去是序列化好的,所以要指定序列化类,如果你不是你是double inel,是不是要用相应的一个类型来做这样的一个序列化,对吧?因为我们当前是不是要用的是。
09:08
病啊啊,所以呢,这个是我们所说的key。Value的序列化类。好,那问题来了,之前大家一直所想的一个东西。主席都没有。对吧。主题都没有,是不是这个问题啊,啊主题的没有,来看一下继续这个呢,是第九步,我们写一下叫创建什么。生产者对象对吧?创建生产者对象好,那理论上说我们发送出去,不管有没有主题,假如说有主题我们在添加对吧?好,那正常情况下接下来应该是发送。数据了,对不对,对吧,那发送数据是不是应该拿着这个producer去调用某一些方法呀,那我们看一下producer点。
10:03
调用哪个方法呀。Send吧,你要发送出去,你肯定调用send吧,好,那你send有两个重载的,第一个就是普通的发送一个什么。Produce里边的一个行记录对吧,一个记录,然后接下来第二个。带回调函数的啊,就根据一个反馈是消息,就是这个意思啊,自己给他打印出来啊,可以告诉你这个消息,呃,你当前这个这条数据发送进去了,在我卡夫卡集群里边哪个主题哪个分区啊,Offet是多少可以给你带回来。能听懂,因为你发数据的时候。有吗?就是你发送数据的时候,是你生产者和指定的吗?不是,是卡普卡里边按照位置一个一个的,你进来一下,我给你一个对吧,累加的吧,啊不是你这个,那回调函数call back不是回调吗?回调函数它就可以把你的什么。信息这条数据所有的在卡普塔里边,所有的信息给你带回来啊,做这个事的,那我们现在没必要搞这么复杂,就先用第一个吧,好,第一个好,那这里边他刚才靠着B看到他要一个producer record的一个对象,对吧,那我们就只能干什么,又一个。
11:15
Producer record,对象走好。CRP来这个东西熟吗?是不是上午看到过对吧?啊就是从这截到了啊,就是这截的,那我们之前一直想到topic在哪啊。在这儿吧,啊,在这个在每一行数据里边可以单独指定的啊,单独指定的OK,那这块呢,我们来写一下第一个参数,我们写到。First可以吧,啊,写到first里边第一个参数。好呃,然后第二个必须要一个value,他现在还报错对吧?啊,我们现在也不按K,我现在只来一个value叫那个。这啊,因为发送出去,你发送一条是不太少了,来个什么来个一条啊,那个十条吧,因为好看一点。
12:05
因为100条去消费,咔咔一堆东西对吧。好,这块呢,我们来一个,呃,这爱的归读可以吧,加。哎,这个艾特硅谷应该干什么?加一个可以吧。加个I,那就是挨硅谷零挨的硅谷一,一直到A的硅谷九吧,啊做这个事的,那加一个那个刚刚嘛,啊这样做一个分隔符啊,省能挨在一块,这个里边灰色的说明它可以干什么。不要啊,可以不要啊,是这个意思,好,那这个东西我们发送完了,发送时间数据吧,发送十条数据之后要记得干什么。关闭。四月啊,因为你是一个连接,是不是你一个生产者是不是跟集群做了一个连接啊,啊最后呢,你要把这个连接干什么。啊,关闭掉啊,一定要做关闭掉,好,那我们来看一下这个数据啊,来跑一下,第一我们是不是应该开启一个。
13:06
消费者呢?有消费者没有是吧。这个什么。这个当时的是我开启的消费者是吧。啊。那不管了,我们现在要开启一个什么,呃的一个消费者是不是。到这个卡不卡,里边B卡不卡。看刚刚如不说都可以吧,好多吧号刚刚。First。少了一个什么,呃,对个空格吧。这个就好了,然后呢,我们来看什么。发送数据啊,然后你看一下这个数据,大家想的应该是爱德硅谷零爱德硅谷一,爱硅谷二,爱硅谷三这种方式吧。
14:01
来看一下,当时我们建的是两个分区吗?还是一个分区?那不管了。这是两个分区,两个分区啊,这个地方两个分区,因为我们说了生产者那边是批量发送的。对吧,轮询的方式,零进到一个分区,一进到一个分区,也就是02468跟13579肯定分到两个区就没问题了。它是02468跟13579呢,同时写进去了。就五条数据一次写进去好,一次写进去之后,消费者那边是不是轮询轮许看到这个数据,先看数据,诶有数据了,他消费的时候注意。他有可能先消费零号分区,有可能先消费一号,正好轮询到每个恰好数据来了,对吧,好一轮询到零号分区,他要把零号分区数据都给他干什么拿出来,然后再去拿什么一号分居,所以你看到的消费的结果是什么?
15:09
02468跟13579,当然你自己做测试的时候,可能是13579 02468,这个也有可能,因为恰好假如说这前面发送的零分,这个发送到一号分局。刚才我们所说的轮询的时候恰好。看到这个是不是下一次恰好看到一也有可能啊,啊这也有可能就是你要注意一下啊,它呢是批量发送这块也能证明他批量发送的个问题了,不是一条一条的,如果一条一条的发送,那这边应该是按顺序0123456789对吧?啊批量发送的,因为批量的写到你一个分区里边,然后我消费的时候是不是也是批量消费啊,我不会一条一条的去消费,那就太慢了啊,效率太低了啊这块内容啊,这个呢,我们所说的是他这个批量问题。接下来我们要看一下这个点。一毫秒,这个是。
16:00
一毫秒的一个事,来来,注意我把这个。这个close方法呀,它其实如果说你没到一毫秒。或者说没到16K,最后你准备关闭资源的时候,他会把内存当中的数据给它清掉。会做这个事儿。来,我继续发送。啊,大家觉得这个循环十次要多久啊,代码执行。多少非常快,非常快来怎么证明呢?非常。发现怎么了,我这边代码走完了。我运行了来再运行一下对吧。走完了吧,没有报错吧,这有数据吗?没有数据吧,因为那个瞬间根本就没到。
17:05
就十次。你这边不掉十次吗?没到一毫秒。没到一毫秒,那么之前是怎么能翻呢?是因为你调用了close方法。克的方法呢,它会去调用一些内容,这个是什么?这发送的是什么东西?那不管了,他接收消息接收失败是吧,那我不管了,这个close方法呢,就像我们之前所说的那块,你要用P,不要用杠九,它里面有勾子程序一样,他会把内存的东西干什么给他清掉。能听懂,那这块我们来看一下怎么证明这个问题呢,其实你只要在这块干什么。不是一毫秒就够了,是100毫秒就够了,对吧,来走一下。
18:03
再发了一次吧,就刚才执行的,因为过了什么一毫秒啊,他去发送的,所以呢,你这块呢,切记要你这种不靠谱吧。哪种客服关闭吧啊,他会去做里面资源的一个回收啊,所以呢,资源之前所说的什么爱物流啊,这些东西该关的一定要去关吧啊,一定要记住这个资源一定要关啊,一定要关,因为它里边还要做很多事情。好接下来还有一个问题就是大家有同学可能在想,你这里面这么多参数我怎么记得。对吧,而且还有你看到的这个东西。只是冰山一角,只是冰山一角,它这里面参数。特别多啊,那这个东西呢,不需要大家记,因为你记了会抄错,但是你要记什么东西呢,记一个类。有一个类你要记住,来,我把它替换一下。
19:06
叫produce con。这个里边定义了什么?Produce,所有的参数点,这里面就有我们要的什么。叫的这是不是看大写很不爽吗?因为它是里边的,这是一个常量类,它里边常量,常量不是定义人大写的吗?对吧点。好,我们点进去再点。啊,他还有一个公共的客户端的一个配置信息,对吧,也就是说生产者和消费者都要的。对吧,消费者是不是也要连集群啊,点开你看。跟你写一个字符站是一样的吧,啊,但是你不用记这个字符,还有同样道理,它下面还有一个文档,就是对这个东西呢,做了一个什么解释啊,这个干什么用的,然后呢,你可以他说你可以怎么培养。
20:01
这逗号分开配多个呀,对吧?啊逗号分开配多个,我们这也说了,生产环境当中我们就要配什么多个啊,就要配多个是这个意思啊,也就是说这些参数我们都可以给它替换掉,那这个东西也可以替换掉,对吧,那假如说替换叫producer点。A,来点进来这块它并不是公共的,因为消费者那边我们有提到ad嘛。没有吧,它是生产者独有的,所以呢,其实这里面有三个常量类,一个叫。Producer config,还有一个是我们刚才看到的这个内容。叫公共的客户端。也就生产者和消费者都有的,还有一个你猜一下叫什么名字。对,叫。消费者他也有一个什么。这个内容啊,也有这种,所以呢,你要记得就是这个类吧,还有要记得就大概要配什么东西对吧,配什么东西好,我们看一下这个地方。
21:03
啊,看这个这个解释,他说这个数字表示了一个返回值,跟这个伸伸产者跟谁啊,跟leader之间要接受这个返回值的一个内容吗?那看一下它有三个参数啊,第一个。如果设置为零,那么表示这个生产者将不会等待任何的什么。应答对吧,好,这第一个,第二个我们所说的等于什么。一来找一下等于一的,有人看到吗?先提的,哎,这边是一吗。对吧?啊,这个好,他说,这将意味着leader写入这个record一行数据之后。对吧,写到他本地的什么。Log。但是。不等待所有的什么。Follow落的一个防位置消息对吧,哎,只是干什么。
22:01
离啊,只是干离,OK,那这个时候呢,它去返回消息啊,它就返回消息,这个就是我们所说的仅仅是leader,那接下来还有一个我们当时说的是负一。二是一样的,你看这说了等于二吧,你看先看最后一句。等同于负一吧,啊,所以我当时在文档里面写的是不是负一,后面写了一个O啊啊,你可以啊两个随便随便,那这个表示什么意思,你看一下。他说leader将要等待所有的谁,是不是我们所说的isr队列啊,我们所说的之前提到的副本是不是SR里面的副本啊,哎,他不是说等待真正等待所有的副本,而是等待什么SR啊,SR里边的啊,这块呢,所有的一个解释,你在这一类里边可以找到所有参数的一个解释,因为它这里面定义的变量都是这样定义的。一个什么内容,然后一个什么do,一个什么内容,一个do看见没对吧,一个内容一个do啊,这个do不是给你调的,是给你看的。
23:07
啊,给你看到其实他完全没必要这样做,它可以把这个do放在哪,放在注释里边也可以,对吧?啊但是呢,它是定义人的一个do啊,一个do,因为它有一个什么好处呢,就是说你有时候写代码的时候没写这个内容,或者说。写错了,配置一个错的,他要打印这里面一行数据信息给你告诉你报动信息里面给你卡打印一堆,对吧,说你要配这个内容是干什么事干什么事的,那你又把它定义成变量是不是更好啊,那你要定义成那个注释,你怎么调动能有啊,是不是要自己要写一份就很麻烦嘛啊所以呢,它定义成那正成变量啊变量,所以呢,这个是一个最简单的一个删产整,删产整啊里边所有的东西呢,你都可以替换掉啊,所以呢,我们要记得就是它。要记得就是它其他东西呢,你可以不要记,不用记。
我来说两句