00:00
好,那那个生产者的代码主要的就是这一些啊,但是你要知道的一个点就在于这个地方。就是这里面数据问题吧。伸展环境当中,就是你从某一个地方给他数据读过来,然后封装进来吧,啊不是说在这里边自己随便造的啊,就是这个地方不同,其他的代码都一样啊,其他代码都一样,OK,呃,伸展转呢,主要就是我们所讲的这些点啊,这些点大家一定要切记了,最后要关闭吧,之前没关闭是不是没发送出去对吧,因为今晚已经退出了啊,你没有到一毫秒他就不会发送出去,那这个呢,是我们所说的一个生产者的代码,那接下来呢,我们再聊一聊什么。消费者啊,消费者,那在这来一个什么consumer这个包啊这类吧,还是同样的写一个最简单的一个消费者consumer点叫MY可以吧。确定啊,同样的既然是一个代码的话,有。看方法啊,作为一个程序的入口,程序的入口,那同理之前。
01:05
我们写生产者的时候,是不是扭了一个卡夫卡producer呀,对吧,那那个才能跟去交互吧,跟卡法进行交互,那我们消费者呢。是不是应该也有一个类似的客户端呀,那通利可得他这个客户端应该叫什么。卡夫卡consumer对吧,哎,卡不卡consumer来点啊,这里边呢,这个地方同样的是泛型,那这个泛型还是什么。数据的KV类型。啊,只不过说前面是什么。带发送的这个是你读到的数据吧,啊读到的数据K类型好,那这里面呢,括号B看一下。跟刚才生产者那边一样吗?啊,还是要那些参数,但是有一个问题,就是具体的它参数里边内容一样吗。不一样吧,啊,首先第一步还是来创建什么。消费者的什么配置信息对吧,啊配置信息。
02:03
OK,直接用一个这个。Java优秀包下的加V。得到这个返回值啊好,那接下来应该是给配置信息来干什么,赋值对吧,给。配置信息。副职,哎,副职。复值点,那这个赋值我们就不用去写那个自己写字符串了,对吧,我们知道它有一个东西叫consumer con对吧?啊consumer好,那第一个我们要用的是。Consumer con,哎,在这个地方点。那你看一下。首先能看到的这些东西。反序列话看到没。对吧,啊,第一次来吧,诶这不很合理吗?前面发送的是序列化,你存到里面是一个已经序列化的,那你要读出来,你能正常的能反馈给你,是不是用相应的要用反序变化呀,对吧,要有反序列化啊这个呢,我们等会要添加,那第一个最重要的应该是它。
03:11
叫的。Service对吧,连接的集群信息,哎,这个东西大家都要有,那我们还是写下哈,多宝1028号9092对吧?啊9092第一个,那接下来还有一些我们给他填一下,呃,点put来还是用consumer com对吧?啊,你不用去记里边的具体的参数啊,记里的参数叫什么东西,那这块呢,有一个点在这。Outer。我看一下。自动提交他密的,哎,这个地方这啊自动提交了一个什么。来看一下。
04:05
自动提交的一个延迟,对吧?啊,自动提交一个延迟,它默认呢,它这个自动提交,那这块我是读数据自动提交,提交什么呀。提交谁啊,提交赛对吧,因为我消费到十了,我是不是应该把这个十存到某一个地方啊,啊提交的就这个off s啊默认呢,它是1000这个值啊1000,那我们还用默认值吧,1000是毫秒对吧,那就是什么。一秒啊,一秒钟去提交一次啊,一秒钟去提交一次,这里这个自动提交它默认的也是自动的,我们来找一下这个自动开启,开启自动提交的啊不的,那我们直接点进来,点之后呢,找一个自动提交com对吧,CR f Co。妈妈,Come。哎,这个地方。叫enable对吧,啊叫enable是这个内容啊,那我们来干什么。
05:11
Me?Con,哎,我就说为什么他还是不提示点叫,刚才我们看到那个东西叫什么。叫,诶在哪呢。这属性怎么一下子忘了叫enable是吧,应该这个地方。自动提交好这个呢,要给一个什么处啊,给一个处自动提交,那这样吧,把这个提交前面去是吧?啊再可以先打开这个自动提交,然后呢,写自动提交的一个延迟对吧?啊延迟那每一个呢,我们注释一下,第一个是连接的。集群对吧,啊连接的集群第二个是。开启,自动停掉。啊,开启自动提交。
06:00
然后自动。提交的是吧。延迟啊延迟。啊,就是这意思,那最后呢,我们再讲两个东西,就是KV的什么。反系列化对吧,K。Valued。反序列化。啊,把学的放的类,那这个地方点铺,这里面同样的还是consumer conflict点什么。K,第level对吧?啊第次level OK,那这个之前我们用的是level,这个是第次level,那其实就是跟之前的反过来啊,不是也不是反过来啊,就是全类名还是在同一个类下面,但是呢,它前面要一个string d level能听懂啊,其实我们说一下叫string。DE。这是code level SDR。在这。
07:01
注意看一下包啊,就是你在这里面找到了,不要看到这个东西就直接去拿看一下包对吧,我看一下包走这个里边呢,就有什么。第一,我们直接把它拿过来copy reference对吧,反序列化的一个类啊,反序列化的一类好。反吸的话第嘛啊,第一是的反击的话啊,同样的点对吧,啊还是consumer。它这个点叫Y6低来对吧。啊,这个我是不是可以直接拿过来,这个没问题吧,啊直接拿过来好,那其实KV的反新化就搞定了啊,还有一个东西,还有一个东西。消费者租啊这个东西不要忘了啊,因为你在控制台里边的时候,它默认随机分配一个这块没有人给你分配啊,给分配啊点的。Consumer con,点组跟组相关的,那我们搜一下这个group group ID com,那其实或者你写什么。
08:07
group.id我们在配置文件里面它是不是group ID等于。什么硅谷test什么。这个test叫叫叫cons group对吧,默认值对吧,它是不是用这个直接写这个的,对吧?啊其实你直接写这个是一样的,那我们这这个组呢,呃,我们来一个什么呢?来一个不存在的组,第个data这个组不存在吧,啊不存在好然后这块就搞定了,搞定之后呢,我们往这里放。往放好,那接下来有个问题,你这里面没有写什么。主题发现没?对吧,啊创建消费者,那我们想一下。创建这个主主题。我们是不是有可能在这里面去定义一个。对吧,我们找一下点。
09:01
因为我们的想法,它不是配置信息嘛,对吧,它应该是在这里边负的,那应该就是我们所讲的跟主题相关的,点进来应该跟主题相关的东西呢,叫肯定有topic吧,他不管叫什么名字有topic,我们找一下有没有这种属性啊。这个叫什么?还记得这个参数吗?不包含内部的一个什么topic对吧,啊也这意思,这个昨天之前我们不是配了吗。对吧,啊读为了读取什么?对,为了读取下划线,下划线consumer of这个主题呢,也就是说既然它这个参数在这个代码里面能看见,也就我的代码里边是不是照样能读啊。啊,那我们说一个事情啊,就是你在命令行里面做的事,代码里面一定能做。因为你代码就是命令好,最终还是调用的代码吧,还是一定会走代码的,OK,那这个是这个内容跟我们没关系吧,我们继续走。
10:05
其实已经没了,这下面的都在定义了,又又到上面来了,也就是说这个地方啊,不是在这配的。啊,不是在这配的,而且我们想一下啊,就关于这些东西啊,啊关于这个是动态传进去的呢,其实它也有默认值动态传进去,类似于这个组啊。啊,我们想一下这个组。这一段是不是在consumer配置文件里面配的,但是主题一般,我们在命令行里面在干什么?去写的对吧,啊,在那个配置文件里面不用写限定一个主题,所以呢,主不是在这块进行的。啊,不是在这块,而是像伸展者一样,伸展者的主题是不是在对伸展者这个对象进行操作的时候才要传主题,还有吗?对吧,里面传一个什么producer recorder,是不是在那个里边定义的主题那一样的,你有了生产者之后再聊什么。
11:00
主题的事儿啊,再聊主题的事儿,好,那接下来其实就是我们所说的要订阅。主题了啊点。订阅。啊,订阅主题好,那这个里边呢,它要一个什么。要一个集合,你看那个集合。叫topic对吧,啊叫topic斯,那也就是说当前这一个消费者可以订阅多个主义吗。可以啊,可以订阅多个主题是这个意思啊,测试环境当中呢,我们之前是不是测试都订阅一个呀,啊订阅一个这个呢,其实可以干什么。订阅多个啊,可以订阅多个,OK,那我们点进来,他要一个connection。我在这里边用一个。Every。可谓。对吧,假如说我这个主题呢,只有一个。As。
12:02
呃,思瑞,我们看一下那个方法,我把它变成一个集合。点。Three。他有一个。a.as list对点as list对吧?啊,这个地方呢,可以将一个字符串把它干什么,把它变成一个。就好。啊,变成一个集合OK吧,啊,集合大类下的。那这块你看他啊。这个就提示有优化空间啊,因为有修改的一个写法,因为它这个发现呢,你这个地方只有一个什么。值对吧,哎,这里面只有一个值,所以呢,可以把它变一下。奥,大家回车啊,就是这种黄色的标的呢,你个头把它变一下,如果你有多个值的话,就不会出现这种情况啊,不会出现,那我们看一下,假如说这个里边我们有,我们除了订阅first,我们再订阅一个什么。
13:10
Second可以吧,那这个时候就不会了,对吧?啊刚才呢,是一个叫collections.list的话啊second好,那我们就定义两个主题,First加second啊for加second,不管second有没有,我们先订阅了再说对吧?啊订阅了再说,订阅完主题之后CTR加V,诶发现呢这个东西啊返回值是什么?因为正常的之前我们看到二加V的时候,是不是给我们防御值,这个时候抗到二加V,它提示。Word所以呢,订阅主题这个地方呢,就是简单的给他添加了一个属题,是这种感觉,不是说订阅主题之后,直接给你返回你要的主题里面内容啊,返回主题内容呢,还要我们自己来干什么。获取数据啊考点。
14:05
拉取数据,这里面有一个延迟,我不知道大家还记不记得这个延迟表示什么意思。他如果一直一直拉的话,有可能是空的。就是在那个时间不允许。对,就是我们之前提到一个点卡不卡呢,它是采用拉取的这种模式来获取的,对吧,他一直要维护这个什么。长轮询,这个长轮巡一旦没有数据的时候,是不是等于浪费资源啊,如果说发现有一次没拉取到数据,那我这个延迟时间是不是要长一点,对吧?啊,这个就是给这个延迟时间了啊,就做这个事的啊,做这个事的拉取时间啊,OK,那好。然后呢,我们给他一个毫秒对吧,给一百万一百好ctrl al加V。来看一下它的返回值。呵呵。
15:00
返回值类型叫consumer。生产者那边发送数据叫什么,叫produce对吧?啊叫produce recorder,你要注意啊,生产者我们打开一下啊,随便打开一个这个地方叫produce recorder,然后这个地方呢,叫consumer recorder。说明什么问题啊,是不是一次拉取会拉取多个值啊,啊是批量获取的,你不能一条一条的获取,这个效率太低的吧?啊效率太低了,也就是说这个里面呢,返回值是什么?多个数据,那你要多个数据的话,这不好内容了,所以呢,接下来我们应该干什么解析。并打印。Consumers吧,啊,你是多个啊,便利要便利好,那我们便利consumers点。For对吧,啊for循环好,那你看现在他就把这个什么。
16:01
S没了吧,因为你对SD变类,那它可能就是consumer record,就是你看这两个,这是两个类对吧,那不用想他们两个类的关系也知道啊,实际上就是它那个迭代类型对吧?啊,它那个迭代类型啊,那接下来我们要解析的是不是consumer record里面内容。那就拿着它就调一掉相应的方法呗,对吧,Consumer record点来看。第一个K,第二个分区,第三个topic valueet什么头信息乱七八糟的一堆,对吧?啊,其实我们在那个回调函数里面是不是也能看到类似的啊,而且这一块重点要关注的是这个点。K能获取到,所以说这个东西要不要存到开发集群要吧啊,他都要存下去啊,要存下去不是说它那个K简单的只是为了给我们分区用的,而是要存下去的,你不存下去,你这个地方能获取到吗?
17:00
肯定获取不到的啊,是这个意思,要关注啊,就不光他只因为在控制台里边我们传了K,它打印的是value,因为控制台那行叫什么卡夫卡cons consumer这个东西也是一个具体的类吧,那个类里面写的,那我如果说这块我只写打印它。那不是跟控制台是一样的吗?对吧,能理解这个事儿啊,那我们吧,就这样啊Co,那我们打一下啊,它里边呢这个K。啊k.so对吧,So反应一下这个K,然后再加上我们要的一个什么。Value啊,加上value叫consumer。没考的点。Y啊点Y6,那我们正常的是不是这样去打印啊,打印之后呢,你要关注一下把这个什么。要关闭吗?啊,正常的,我们应该关闭一下吧,我们先给它关闭一下,叫关闭什么连接,是不是关闭连接点啊好,那我们来测一下。
18:09
那大家觉得这个能成吗?我一开始。他就怎么样了?啊,我一开启它就会怎么样啊。我一开启,什么都没有关了。关了吧,对吧,哎,也就是说这个地方注意这个关啊,不是说我们非得调用这个,你不不调用这个它也会关。因为是什么程序走完了就关了。对吧,啊程序,所以这块呢,我们应该给他阻塞起来,让他不停的循环的去调用这个方法吧,因为你拉取一次之后,是不是有新数据过来了之后,你还要重新去拉取这个数据。
19:06
对吧,所以呢,很简单,我们把它写在一个。把这一套东西放在这个里边,既然你这是死循环了,这个代码就。多余了对吧,诶正常情况下是不是这样,你这个消费者一开启的时候,除非你CTRLC给它干掉,或者说你Q杠九给它关闭掉的时候,它才会关掉。对吧,也就是说这个生产者一旦开启就不要去关了,除非你想关闭的时候用Q把它关闭掉,那这个时候我们来看一下点开。干什么阻塞了吧,啊,一直在运行过程当中,一直在运行过程当中,好那这个时候我们只需要去干什么事,开一个对吧,开一个什么一个生产者,那这样啊,我也不开了,我这边有吗。
20:02
我这不有代码吗?这个是不是往first发呀,对吧。深圳的我用这个吧,啊,其实你也可以看到那个内容。点开啊,这边发送完了。是不是消费了?K是什么?那没有K吧,因为没有指定K对吧?啊,当然你可以指定一个K,那我们来指定一个K。那在这是不是加一个K啊啊加一个K逗号K呢,我来一个挨着硅谷吧。硅谷好,然后呢,我们把这个再跑一下。走。还有走完,走完之后呢,我们来看一下。K是不是打印出来了对吧?啊,可以获取到它的一个K啊,是这个意思啊这意思,而且我们发现啊这个second,我们有second这个主题吗。卡不卡逃避斯,然后刚刚。宝哈多宝028号二幺刚刚。
21:02
有三嘛,有三呢是吧?啊,那还不能证明这个问题。来看一下这个地方有什么a data second,没有的吧,对吧,我来一个什么。是。来再启动。启动了对吧,阻塞了吧,生产数据点。生产完了。订阅一个不存在的主题也可以吧,啊也可以啊看而且看不到错吧,其实有一个警告啊,一个警告,你可以把logo附件给它加进来,你能看到那个警告啊,能看到那个警告,有一个警告说那个东西呢,不存在连接不上啊,有个警告,但是不影响我们其他的一个什么。主题的获取啊,也就是说你假如说真的有一个topic写错了,他也不会报错啊,他也不会报错对吧,因为刚才我们看到色肯定没有吧。
22:02
对吧,刚才我们看到了白色的没有啊色的没有绿色。我说了自己创造是什么。生产者、消费者并不会去建造一个。消费者他不会呀。伸展者,如果说它不存在的话。数据有地方放吗?没有地方放嘛,他就会给他默认创建一个,但是消费者你消费了一个不存在的主题,那无非。拉不到数据呗,你就算自己创建了一个新的,你还是一样的道理吗?还不是拉不到数据嘛,对吧,啊所以呢,他就消费者不会去创建,消费者不会创建,生产者他会去创建啊消费者这意思。
我来说两句