首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

How to create a topic in kafka(2)

前言

上一篇How to create a topic in kafka(1)中的介绍了怎样创建一个topic以及对应的replica-assignment参数的一些使用细节,本文继续来讲述一下自动分配方案的具体算法实现,包括未指定机架的分配策略和指定机架的分配策略。

承接

如果在创建topic的时候并没有指定replica-assignment参数,那么就需要采用kafka默认的分区副本分配策略来创建topic。主要的是以下这6行代码:

第一行的作用就是验证一下执行kafka-topics.sh时参数列表中是否包含有partitions和replication-factor这两个参数,如果没有包含则报出:Missing required argument "[partitions]"或者Missing required argument "[replication-factor]",并给出参数的提示信息列表。

第2-5行的作用是获取paritions、replication-factor参数所对应的值以及验证是否包含disable-rack-aware这个参数。从0.10.x版本开始,kafka可以支持指定broker的机架信息,如果指定了机架信息则在副本分配时会尽可能地让分区的副本分不到不同的机架上。指定机架信息是通过kafka的配置文件config/server.properties中的broker.rack参数来配置的,比如配置当前broker所在的机架为“RACK1”:

最后一行通过AdminUtils.createTopic方法来继续创建,至此代码流程又进入到下一个无底洞,不过暂时不用担心,下面是这个方法的详细内容,看上去只有几行而已:

总共只有3行,最后一行还是见过的,在使用replica-assignment参数解析验证之后调用的,主要用来在/brokers/topics路径下写入相应的节点。回过头来看第一句,它是用来获取集群中每个broker的brokerId和机架信息(Option[String]类型)信息的列表,为下面的 AdminUtils.assignReplicasToBrokers()方法做分区副本分配前的准备工作。AdminUtils.assignReplicasToBrokers()首先是做一些简单的验证工作:分区个数partitions不能小于等于0、副本个数replicationFactor不能小于等于0以及副本个数replicationFactor不能大于broker的节点个数,其后的步骤就是方法最重要的两大核心:assignReplicasToBrokersRackUnaware和assignReplicasToBrokersRackAware,看这个名字也应该猜出个一二来,前者用来针对不指定机架信息的情况,而后者是用来针对指定机架信息的情况,后者更加复杂一点。

未指定机架的分配策略

为了能够循序渐进的说明问题,这里先来讲解assignReplicasToBrokersRackUnaware,对应的代码如下:

主构造函数参数列表中的fixedStartIndex和startPartitionId的值是从上游AdminUtils.assignReplicasToBrokers()方法调用传下来,都是-1,分别表示第一个副本分配的位置和起始分区编号。assignReplicasToBrokers这个方法的核心是遍历每个分区partition然后从brokerArray(brokerId的列表)中选取replicationFactor个brokerId分配给这个partition。

方法首先创建一个可变的Map用来存放本方法将要返回的结果,即分区partition和分配副本的映射关系。由于fixedStartIndex为-1,所以startIndex是一个随机数,用来计算一个起始分配的brokerId,同时由于startPartitionId为-1,所以currentPartitionId的值为0,可见默认创建topic时总是从编号为0的分区依次轮询进行分配。nextReplicaShift表示下一次副本分配相对于前一次分配的位移量,这个字面上理解有点绕,不如举个例子:假设集群中有3个broker节点,即代码中的brokerArray,创建某topic有3个副本和6个分区,那么首先从partitionId(partition的编号)为0的分区开始进行分配,假设第一次计算(由rand.nextInt(brokerArray.length)随机)到nextReplicaShift为1,第一次随机到的startIndex为2,那么partitionId为0的第一个副本的位置(这里指的是brokerArray的数组下标)firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length = (0+2)%3 = 2,第二个副本的位置为replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length) = replicaIndex(2, nextReplicaShift+1,0, 3)=?,这里引入了一个新的方法replicaIndex,不过这个方法很简单,具体如下:

继续计算 replicaIndex(2, nextReplicaShift+1,0, 3) = replicaIndex(2, 2,0, 3)= (2+(1+(2+0)%(3-1)))%3=0。继续计算下一个副本的位置replicaIndex(2, 2,1, 3) = (2+(1+(2+1)%(3-1)))%3=1。所以partitionId为0的副本分配位置列表为[2,0,1],如果brokerArray正好是从0开始编号,也正好是顺序不间断的,即brokerArray为[0,1,2]的话,那么当前partitionId为0的副本分配策略为[2,0,1]。如果brokerId不是从零开始,也不是顺序的(有可能之前集群的其中broker几个下线了),最终的brokerArray为[2,5,8],那么partitionId为0的分区的副本分配策略为[8,2,5]。为了便于说明问题,可以简单的假设brokerArray就是[0,1,2]。

同样计算下一个分区,即partitionId为1的副本分配策略。此时nextReplicaShift还是为2,没有满足自增的条件。这个分区的firstReplicaIndex = (1+2)%3=0。第二个副本的位置replicaIndex(0,2,0,3) = (0+(1+(2+0)%(3-1)))%3 = 1,第三个副本的位置replicaIndex(0,2,1,3) = 2,最终partitionId为2的分区分配策略为[0,1,2]。

以此类推,更多的分配细节可以参考下面的demo,topic-test4的分区分配策略和上面陈述的一致:

我们无法预先获知startIndex和nextReplicaShift的值,因为都是随机产生的。startIndex和nextReplicaShift的值可以通过最终的分区分配方案来反推,比如上面的topic-test4,第一个分区(即partitionId=0的分区)的第一个副本为2,那么可由2 = (0+startIndex)%3推断出startIndex为2。之所以startIndex随机是因为这样可以在多个topic的情况下尽可能的均匀分布分区副本,如果这里固定为一个特定值,那么每次的第一个副本都是在这个broker上,进而就会导致少数几个broker所分配到的分区副本过多而其余broker分配到的过少,最终导致负载不均衡。尤其是某些topic的副本数和分区数都比较少,甚至都为1的情况下,所有的副本都落到了那个指定的broker上。与此同时,在分配时位移量nextReplicaShift也可以更好的使得分区副本分配的更加均匀。

指定机架的分配策略

下面我们再来看一下指定机架信息的副本分配情况,即方法assignReplicasToBrokersRackAware,注意assignReplicasToBrokersRackUnaware的执行前提是所有的broker都没有配置机架信息,而assignReplicasToBrokersRackAware的执行前提是所有的broker都配置了机架信息,如果出现部分broker配置了机架信息而另一部分没有配置的话,则会抛出AdminOperationException的异常,如果还想要顺利创建topic的话,此时需加上“--disable-rack-aware”,详细demo如下:

assignReplicasToBrokersRackAware方法的详细内容如下,这段代码内容偏多,仅供参考,看得辣眼睛的小伙伴可以习惯性的忽略,后面会做详细的文字介绍。

第一步获得brokerId和rack信息的映射关系列表brokerRackMap ,之后调用getRackAlternatedBrokerList()方法对brokerRackMap做进一步的处理生成一个brokerId的列表,这么解释比较拗口,不如举个demo。假设目前有3个机架rack1、rack2和rack3,以及9个broker,分别对应关系如下:

那么经过getRackAlternatedBrokerList()方法处理过后就变成了[0, 3, 6, 1, 4, 7, 2, 5, 8]这样一个列表,显而易见的这是轮询各个机架上的broker而产生的,之后你可以简单的将这个列表看成是brokerId的列表,对应assignReplicasToBrokersRackUnaware()方法中的brokerArray,但是其中包含了简单的机架分配信息。之后的步骤也和未指定机架信息的算法类似,同样包含startIndex、currentPartiionId, nextReplicaShift的概念,循环为每一个分区分配副本。分配副本时处理第一个副本之外,其余的也调用replicaIndex方法来获得一个broker,但是这里和assignReplicasToBrokersRackUnaware()不同的是,这里不是简单的将这个broker添加到当前分区的副本列表之中,还要经过一层的筛选,满足以下任意一个条件的broker不能被添加到当前分区的副本列表之中:

如果此broker所在的机架中已经存在一个broker拥有该分区的副本,并且还有其他的机架中没有任何一个broker拥有该分区的副本。对应代码中的(!racksWithReplicas.contains(rack) racksWithReplicas.size == numRacks)

如果此broker中已经拥有该分区的副本,并且还有其他broker中没有该分区的副本。对应代码中的(!brokersWithReplicas.contains(broker) brokersWithReplicas.size == numBrokers))

无论是带机架信息的策略还是不带机架信息的策略,上层调用方法AdminUtils.assignReplicasToBrokers()最后都是获得一个[Int, Seq[Int]]类型的副本分配列表,其最后作为kafka zookeeper节点/brokers/topics/节点数据。至此kafka的topic创建就讲解完了,有些同学会感到很疑问,全文通篇(包括上一篇)都是在讲述如何分配副本,最后得到的也不过是个分配的方案,并没有真正创建这些副本的环节,其实这个观点没有任何问题,对于通过kafka提供的kafka-topics.sh脚本创建topic的方法来说,它只是提供一个副本的分配方案,并在kafka zookeeper中创建相应的节点而已。kafka broker的服务会注册监听/brokers/topics/目录下是否有节点变化,如果有新节点创建就会监听到,然后根据其节点中的数据(即topic的分区副本分配方案)来创建对应的副本,具体的细节笔者会在后面的副本管理中有详细介绍。

既然整个kafka-topics.sh脚本的作用就只是创建一个zookeeper的节点,并且写上一些分配的方案数据而已,那么我们直接创建一个zookeeper节点来创建一个topic可不可以呢?答案是可以的。在开启的kafka broker的情况下(如果未开启kafka服务的情况下创建zk节点的话,待kafka启动之后是不会再创建实际副本的,只有watch到当前通知才可以),通过zkCli创建一个与topic-test1副本分配方案相同的topic-test6,详细如下:

这里再来进一步check下topic-test1和topic-test6是否完全相同:

答案显而易见。前面的篇幅也提到了通过kafka-topics.sh脚本的创建方式会对副本的分配有大堆的合格性的校验,但是直接创建zk节点的方式没有这些校验,比如创建一个topic-test7,这个topic节点的数据为:{"version":1,"partitions":{"2":[0,1],"1":[1],"3":[1,0],"0":[0,1]}},可以看出paritionId为1的分区只有一个副本,我们来检测下是否创建成功:

我们上篇文章中知道kafka-topics.sh内部就是调用了一下kafka.admin.TopicCommand而已,那么我们也调用一下这个可不可以?Of course,下面举一个简单的demo,创建一个副本数为2,分区数为4的topic-test8:

可以看到这种方式和kafka-topics.sh的方式如出一辙,可以用这种方式继承到自动化系统中以创建topic,当然对于topic的删、改、查等都可以通过这种方法来实现,具体的篇幅限制就不一一细表了。

后语

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180207G064OJ00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券