00:00
你好,我是杨思正,今天我们开始看一下kafka producer的相关内容。卡普卡自定义了一套网络协议,我们可以使用任意语言来实现这一套协议,实现向卡夫卡集群push message以及从卡夫卡集群拉取消息的效果。在卡卡2.8.0版本的源码中,Clas模块是官方默认提供了Java版本的producer consumer的实现,我们本课时重点关注其中的producer部分。按照国际惯例,我们首先来写一个DEMO,带领同学们了解一下kafka producer的基本使用,具体的示例代码已经放到了文稿之中。这里我们直接在卡夫卡的源码中写一下这个DEMO。首先我们创建一个慢方法,在其中创建一个proper对象,然后添加卡夫卡brokeer集群的地址配置,具体的配置名称是bootstrap.service指定的是本地源码启动的这个地址,然后添加AX这个配置,这个配置指定了卡夫卡集群响应之前。
01:05
需要有多少副本成功复制了发送过去的message?All表示的是isr集合中全部的副本都已经成功复制了这条消息才会给producer返回相应的响应。同学们如果想知道这些配置的其他配置选项的含义,可以参考官方的文档,我们这里找到卡夫卡官网的doc,然后找到它的producer conflicts,在其中搜索X,我们就可以看到它相关的配置值,以及这些配置值的具体含义。例如,我们前面配置的all就是下面这个含义,Leader将等待全部的isr集合中的副本复制了这条消息才会返回响应。接下来指定message k和value的序列化器,这两个序列化器负责将KV序列化成字节数组。指定完相关的配置之后,接下来创建卡夫卡R对象,传入上面创建的对象。接下来写一个for循环,这个循环会执行十次,在每一次循环中都会创建一个producer record对象。
02:09
其中记录了该message的目标topic以及key和value值。接下来调用前面创建的kaka producer.s的方法将消息发送到KA普卡集群。我们注意send的方法,第一个参数是前面创建的producer record对象。第二个参数传入的是一个匿名的combat对象,当producer接受到卡夫卡集群发来的A确认消息之后,会调用其oncomletion方法完成回调。我们再来看send方法的返回值。它返回的是个future对象,由此我们可以知道send的方法是异步发送。如果需要同步发送的话,我们直接调用返回future的get方法即可。这个get方法返回的是record data,其中包含了该message落到了哪个partition上,以及这条message被broker分配的offset是多少。看完producer DEMO的代码之后,首先来执行kaca council council.she命令,启动一个命令行consumer。
03:15
启动命令行consumer完成之后,我们再来执行producer DEMO。我们在控制台中可以看到如下的输出,这些输出就是卡普卡broker返回的record date信息。我们来到控制台consumer,可以看到producer发送来的信息。了解了卡夫卡producer的基本使用之后,我们开始深入producer的架构进行分析。千言万语都不及一张图,下图就展示了producer的核心架构,这里描述一下这张图中涉及到的核心组件。在这里涉及到两个线程,一个是我们的业务线程,也就是图中的主线程。
04:05
另一个是sender线程,我们一个一个来说,首先是主线程的逻辑,主线程会调用卡卡producer.send方法发送数据,在send方法中,首先会通过producer intercepts对message进行过滤或者是修改。然后通过序列化器对message的key和value进行序列化,然后通过partitioner,根据一定的策略为message选择合适的partition,将封装成produce record写入到record accumulator中进行暂存。在record accumulator对象中维护了多个队列,我们可以将它看作是message的缓冲区,它用来实现的批量发送。接下来我们再来看ther线程的逻辑。sender线程首先从record accumulator中批量获取message数据,构造成client request,然后将client request对象交给network client客户端进行发送,Network client客户端会将请求放入卡普卡channel的缓存中。
05:08
最后执行网络IO将请求发送出去,在network client收到响应之后,调用client request回调函数,最终会触发每个message上注册的call back函数。介绍完卡卡producer的核心架构和流程之后。我们开始深入kaka producer点的方法,也就是主线程的核心逻辑还是开局一张图,后面的都好说,这里描述一下这张图的核心流程。主线程首先会调用producer intercepts.on soon的方法对message进行拦截或者修改,也就是这一条线,然后通过wait on me date方法更新卡夫卡集群的信息,其底层实际上是通过唤醒S线程来更新ma date信息的,Ma data中保存的是卡普卡的原数据信息。接下来分别调用kerializer和valueerializer。对K和value进行序列化,序列化得到K和value对应的字节数组,接下来调用partition方法,确定此次message要发送的目标partition,接下来调用send read only方法,添加read only的头,接下来预估一下此次发送的message大小,然后调用aend方法将message写入到record accumulator中,最后调用wakeup方法唤醒S线程。
06:29
后续就由线程从record acculator中批量发送message到卡夫卡集群了。了解了卡普卡producer点方法的核心流程之后,我们来看一下它的具体实现。首先这里调用了produce intercepts的的方法。然后调用了do的方法。在do的方法中。会调用wait on的方法。获取卡夫卡集群动员数据。然后调用keyizer和value sizer2个序列化器,分别序列化K和value,然后调用partitioner获取目标partition编号。
07:09
然后预估此次发送的message大小。接下来尝试向record accumul中追加message,如果追加失败,还会再次尝试一次,最后唤醒car线程来批量发送message。接下来我们来看producers,其中维护了一个producer intercept集合。R intercepts中的UN的方法,UN acknowledgement方法、UN error方法实际上是循环调用了其中维护的producer intercepts集合的方法。我们可以通过实现producer intercept接口的方法来拦截或修改待发送的message,也可以通过实现unment方法先与用户的callba对卡夫卡集群的响应进行预处理。在我们通过卡卡producer发送message的时候,我们只是明确指定了message要发送到哪个topic中。
08:06
并没有明确指定要写入到哪个partition中。但是,同一个topic的不同partition可能位于卡普卡集群中的不同broke上。所以producer需要明确的知道该topic下所有partition的原信息,也就是这些part所在broker的IP端口的信息,这样才能与partition所在的broker建立网络连接并发送。在卡卡producer中,使用node topic info3个类来记录卡普卡的原数据。首先,Node表示的是卡普卡集群中的note节点,其中维护了这个brokeer节点的host IP port等基础信息。Topic partition用来抽象topic中的一个partition。其中维护了topic的名称以及partition的编号等信息。接下来是partition,它用来抽象一个partition的具体信息。其中leader字段记录了leader副本所在的节点,ID re字段记录了全部副本所在的节点信息。
09:11
INC repl car字段记录了isr集合中所有副本所在的节点信息。卡夫卡producer中会将三个维度的基础信息封装成cluster对象。下面是cluster对象包含的核心数据。首先,Isbot cons字段标识了当前原数据信息,是producer初始化的配置信息,还是启动之后从卡夫卡集群中拉取到的?Note字段记录了卡普卡集群中全部的note集合。下面的这三个字段记录了topic的相关信息,这三个字段按照topic的属性进行了分类,Controller字段记录了卡不卡集群中的controller所在的节点。Partitions by topic partition可以按照partition查询partition的具体信息。
10:02
Petitions by topic可以根据topic查询旗下的partition信息。Available partitions by topic可以根据topic查询旗下全部可用的partition信息。接下来是partition by node,使用这个集合,我们可以根据node ID查询落到其上的partition信息。接下来是no by ID,从名称我们就可以看出,这个集合是用来根据node ID查询node对象。最后两个字段记录了一些唯一标识,第一个是cluster resource记录的是卡夫卡集群的唯一标识topic ID记录了topic以及对应的唯一标识。了解了cluster的数据结构之后,我们再上一层,Lu对象会被维护到ma data中,Ma data同时还维护了cluster的版本号、过期时间、监听器。等基础信息。其核心的数据结构如下图所示。下面我们来看一下data中的核心数据结构。首先,Refresh back off Ms,它用来记录两次更新数据请求的最小时间差。
11:10
默认情况下是100毫秒。为了防止更新操作过于频繁造成卡卡broke的压力,一般涉及到重试操作的时候,都会添加一个这种退避时间对下游系统进行保护。method expel Ms用于记录原数据的失效信息。也就是需要更新原数据的时间间隔,默认情况下是五分钟。update version和request version是原数据的两个版本号。lastfresh Ms记录了最一次尝试更新数据的时间戳,Last successful refresh Ms记录了最近一次成功更新原数据的时间戳。接下来的接下来的这个exception字段记录了更新原数据失败的相关信息。CA字段指向了catch对象。
12:00
MY中储存了更新到的原数据信息。我们可以看到cat中有一个cluster instance字段,它指向的就是我们更新得到的luster对象,下面有两个布尔类型的字段,这两个布尔类型的字段用来标识当前的原数据,是需要全部更新还是需要部分更新?这里还定义了一个监听器,当原数据发生变更的时候,会触发我们自定义的cluster resource listener监听器。我们自定义的这些cluster resource listener监听器就会被记录到这个字段中。最后的这个last c集合是为每个partition记录一个对应的计元,当发生leader副本切换的时候,对应的计元就会增加,此时我们就需要更新原数据。经过上面的分析,我们可以得到下面的这张关系对应图。我们可以看到,Cluster和nod topic partition in four之间的对应关系都是一对多。
13:02
Me catch和cluster之间的对应关系是一对一和me catch。之间的对应关系是一对一。静态数据结构分析完之后,我们再来看kafka producer.on data方法是如何工作的。在我们的业务线程调用went on。Me data方法的时候,首先会从data catch中获取当前缓存的class对象,也就是这里的data.fe方法。我们进入点fe方法就可以看到它请求的是。method.cluster方法。其中读取的就是其中的cluster instance字段。接下来更新date,在producer这个子类中维护了topics这个集合。如果发现新写入的的话,会将topic添加到producer的。
14:01
Topics和new topics结合中。同时还会更新last refresh Ms和need update这两个字段。这两个字段设置之后,会尽快触发原数据的更新。同时还会递增request version版本号。如果是以前存在的topic,这里只会更新它的时间戳。接下来我们调用cluster的partition by topic集合,获取目标topic partition数量。要是目标topic的原数据存在,则直接返回cluster and waittime对象。无需后续的更新操作。如果目标topic原数据不存在。则开始执行下面的循环。首先更新producer ma缓存。然后更新当前update version,并设置相应的标,尽快的触发原数据的更新。
15:01
我们进到点request update for topic这个方法中。我们可以看到,如果是新的topic。这里直接会更新last refresh Ms和need update。同时会增加request version,返回的是update version。如果是已经存在的topic,但是原数据不存在,则会设置need for update字段,然后也是返回update version这个字段。设置完上述的标记之后,这里会唤醒sender线程,由send的线程去更新原数据,接下来就是调用method data.a update方法。阻塞,等待原数据更新。停止阻塞的条件是更新后的update version大于当前的update。如果是超时的话,这里也会直接抛出异常。我们看一下method data.wait object方法,这里会直接调用time.weight object方法,注意第一个参数传入是this,也就是producer这个对象。
16:09
在这里直接对producer对象进行加速,然后调用它的wait进行等待,这里也指定了超时时间。注意wait object方法的第二个参数。也就是condition,每次循环的时候都会检测这个condition条件是否满足,满足的话直接退出循环。这里的contention比较的就是update version。当前线程阻塞完成之后,会再次调用mat.fat方法获取最新的cluster。接下来就是一些统计的信息,计算更新原数据的耗时,然后获取目标topic对应的partition数目。如果原数据依旧不合法,会再次进行循环。
17:01
如果原数据合法,则会将cluster以及相关的耗时封装成luster and waittime对象返回。我们这里的业务线程只是阻塞,等待原数据的更新,具体原数据是如何更新的,我们将在sender线程工作的流程中进行详细的分析。在了解了weight on data方法的核心逻辑之后,我们再来看序列化器的相关内容。分布式系统中各个节点相互通信,必然涉及到内存对象与字节流之间的转化,也就是序列化与反序列化。卡夫卡中的序列化器接口是ializer。他负责将对象转换成字节数组。反序列化器就是DR,负责将字节数组转化成内存中的对象。下面展示了ializer和izer接口的时间类。
18:03
从这张图中我们可以看出,卡普卡自带了常用Java类型的izer实现和disizer实现。当然,我们可以自定义izer和disizer的实现来处理复杂的类型。下面我们以stringializer实现为例,简单说明一下ializer的核心实现。首先是conflict方法,它指定stringizer按照何种编码格式来序列化字符串。使用方法,用来将string转换成字节数组。在wait on data方法拿到最新的原数据之后,就可以开始确定待发送的message要发送到哪个partition。如果我们明确指定了目标。则卡夫卡producer会以用户指定的为准。但是一般情况下,业务并不关心需要写入到topic的哪个,此时就会通过partitioner结合原数据进行计算,得到一个目标partition。
19:04
下图展示了partitioner接口的全部实现,从名字也可以看出default partitioner是默认的实现。其中的partition方法首先会检查message是否有K,如果有K的话,则会取K的哈,然后与partition总数取模,得到目标的partition编号,这样就可以保证同一个K的message进入同一个partition。如果message没有K,则通过sticky catch.partition方法计算目标ition。这里解释一下sticky partition的功能。我们前面介绍整个kafka producer的流程的时候说过,Record accumulator是一个缓冲区,主线程发送的message会先写入到这个record accumulator中。然后sender线程在攒够message的时候会进行批量的发送。触发sender线程批量发送message的条件主要有下面两个方面,第一是message的延迟时间到了,也就是说我们的业务场景对message发送的延迟是有要求的,Message不能一直堆积在produce端。
20:06
我们可以通过linger.ms配降低message的发送延迟。另一个触发条件就是message堆积的足够多,达到了一定的阈值才适合批量的发送,这样批量发送的有效负载才是最高的。我们可以通过调节bat.size这个配置来修改批量发送的大小,默认情况下半点size为16K sticky positionition catch主要实现的是粘性选择,就是尽可能的先往一个positionition发送message。让发往这个partition的缓冲区快速填满,这样的话就可以降低message的发送延迟。我们不用担心出现partition数据量不均衡的情况,因为只要业务运行的时间足够长,Message还是会均匀的发送到每个partition上的。这个效果卡夫卡的开发人员已经验证过了,我们不用担心。下面来看stickition catch的实现,其中维护了一个concurrent map,其中的K是topic value就是当前年度的partition编号。在partition方法中,Sticky partition catch会首先从index catch中获取年柱的partition编号,如果没有,则调用ex partition方法向index catch中写入一个在ne partition方法中。
21:21
会首先获取目标topic中可用的,并从中随机选择一个写入到index catch。最后同学们可能会问,什么时候更新年柱的编号呢?我们来看一下卡卡producer.s的方法中有这么一个代码片段。这里首先像record accumulator中追加,如果当前年度的ition已经没有空间继续写入了,则会进入这个if代码块,更换一个目标partition,再次进行尝试,这里就会调用partitioner.on new batch方法。
22:00
底层就会调用sticky partition catch.next partition,更换年度的partition编号。更新完成之后,就会再次调用record accumulator点的方法写入message。Uniform stickyitioner这个partitioner的底层实现也是依赖sticky partition实现粘性发送的,这里就不再展开介绍了。再来看round Robin partitioner实现。从名字就可以看出,它是按照轮询策略计算目标partition,其中也维护了一个concurrent map,其中的K是topic value是一个递增的atomic integer。在round Ruby partition.partition方法中。首先会查找目标topic的partition数,然后自增对应的a topic in值,然后与partition总数进行取模,这样就可以得到目标partition的编号了。本课时,我们首先介绍了卡夫卡producer的基础使用。
23:03
然后介绍了KA卡producer的核心架构,最后我们深入介绍了KA卡producer点的方法,这也是我们业务线程的核心操作。下一课时,我们将开始介绍kafka producer中record accumulator相关的内容。本课时相关的文章和视频也会放到我的微信公众号以及B站中。感谢大家的观看,我们下节课再见。
我来说两句