专栏首页Spark学习技巧Kafka源码系列之Broker的IO服务及业务处理

Kafka源码系列之Broker的IO服务及业务处理

Kafka源码系列之Broker的IO服务及业务处理

一,kafka角色

Kafka源码系列主要是以kafka 0.8.2.2源码为例。以看spark等源码的经验总结除了一个重要的看源码的思路:先了解部件角色和功能角色,然后逐个功能请求序列画图分析,最后再汇总。那么,下面再啰嗦一下,kafka的角色。kafka在生产中的使用,如下图。

从图中可以看到其主要角色:

1,Zookeeper:Broker需要通过ZooKeeper记录集群的所有Broker,controller等信息,记录Consumer的消费消息的偏移量等信息。

2,Broker:主要负责管理数据,处理数据的生产、消费请求及副本的同步等信息。

3,Topic:标识一个类别的消息。

4,Partition:针对topic进行了进一步细分,增加并发度。牵涉到副本及leader选举。

5,Producer:主要与Broker进行交互,来生产消息到broker。

6,Consumer:主要是从Broker上获取消息,将自己的消费偏移等信息记录与zookeeper。

从各个角色的功能来看,我们整个数据服务请求的中心就是Broker,自然也是由Broker来负责各种事件处理及应答各个部件的。

二,Broker请求及应答机制的实现

在JAVA的网络IO模型彻底讲解的那篇文章里,已经彻底讲解了Java的各种网络IO实现的机制及优缺点。其实,kafka的Broker就是通过JAVA的NIO来实现监听和请求处理及应答的。

主要牵涉到的类:

1),KafkaServer

该类代表了一个kafka Broker的生命周期,处理kafka启动或者停止所需要的所有功能。

2),SocketServer

一个NIO 服务中心。线程模型是

1个Acceptor线程,用来处理新的链接请求

N个加工Processor线程。每个线程拥有一个他们自己的selector,主要负责IO请求及应答。

3),KafkaRequestHandler

实际会在KafkaRequestHandlerPool中创建多个对象,负责加工处理request线程。

会创建M个处理Handler线程。负责处理request请求,将responses重新写会加工线程Processor,以便于其写回给客户端。

4),RequestChannel

该类主要是封装了requestQueue,responseQueues,responseListeners,便于个各类中同时引用并作出自己的处理。

5),KafkaApis

Kafka多样请求的逻辑处理程序。

具体如图:

下面讲解1,2,3,4,5,具体含义:

1,SocketServer.startup(),会启动一个后台线程,该线程会持有一个acceptor,负责接收新的链接请求,并轮训所有的Processor,将新的链接请求加入Processor对象的成员变量ConcurrentLinkedQueue里,Processor会在其run方法里面处理。

// start accepting connections
this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
acceptor.awaitStartup

Processor池的初始化

for(i <- 0 until numProcessorThreads) {
 processors(i) = new Processor(i, 
 time, 
 maxRequestSize, 
 aggregateIdleMeter,
 newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
 numProcessorThreads, 
 requestChannel,
 quotas,
 connectionsMaxIdleMs)
  Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
}

accepttor轮训Processor

val ready = selector.select(500)
if(ready > 0) { 
 val keys = selector.selectedKeys()  
 val iter = keys.iterator()
 while(iter.hasNext && isRunning) { 
    var key: SelectionKey = null 
    try {      
      key = iter.next    
        iter.remove()     
         if(key.isAcceptable)       
           accept(key, processors(currentProcessor))  
               else      
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")    
                    // round robin to the next processor thread  
                        currentProcessor = (currentProcessor + 1) % processors.length    }
                         catch {      
                         case e: Throwable => error("Error while accepting connection", e) 
                            }

2,Processor的run方法里面,会针对可读事件调用read方法里将request请求信息通过requestChannel.sendRequest(req)添加到RequestChannel的成员变量里面。

requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)

3,在KafkaServer的startup方法里面构建KafkaRequestHandlerPool对象的时候,会构建若干handler线程。

for(i <- 0 until numThreads) {
 runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
 threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
 threads(i).start()
}

在KafakRequestHandler的方法里面会对我们的request进行处理

req = requestChannel.receiveRequest(300) apis.handle(req)

实际上,是通过KafkaApis对象的handle方法进行各种逻辑的处理的。

def handle(request: RequestChannel.Request) {
 try{
    trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
    request.requestId match {
 case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
 case RequestKeys.FetchKey => handleFetchRequest(request)
 case RequestKeys.OffsetsKey => handleOffsetRequest(request)
 case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
 case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
 case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
 case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
 case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
 case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
 case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
 case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
 case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
 case e: Throwable =>
      request.requestObj.handleError(e, requestChannel, request)
      error("error when handling request %s".format(request.requestObj), e)
  } finally
 request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}

4,在每一种请求处理结束之后会产生对应的response

requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))

并将response存储到RequestChannel的responseQueues存储。

5,最终,由我们的Processor在其run方法里面,取出RequestChannel的responseQueues存储的时间,匹配到写事件,然后通过其write方法对具体的request进行应答。

else if(key.isWritable)
  write(key)

三,总结

这是一个典型的Reactor多线程模型,并且实现了IO线程和业务线程进行隔离。这样做的优点有以下几种:

1,充分利用资源

可以充分利用CPU资源,增加并发度,使业务响应速度加快。

2,故障隔离:

业务处理线程,无论是处理耗时,还是发生阻塞,都不会影响IO请求线程。保证服务器能在某些业务线程出故障的情况下,正常进行IO请求应答。

3,可维护性

职责单一,可维护性高,方便定位问题。

此处再次建议大家仔细阅读,浪尖关于JAVA的网络IO模型彻底讲解那篇文章,彻底领会其意境。

本文分享自微信公众号 - Spark学习技巧(bigdatatip),作者:浪尖

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-06-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 基于Spark Mllib的文本分类

    基于Spark Mllib的文本分类 文本分类是一个典型的机器学习问题,其主要目标是通过对已有语料库文本数据训练得到分类模型,进而对新文本进行类别标签的预测。这...

    Spark学习技巧
  • 面试|再次讲解Threadlocal使用及其内存溢出

    浪尖整理本文主要是想帮助大家完全消化面试中常见的ThreadLocal问题。希望读懂此文以后大家可以掌握(没耐心的可以直接阅读底部总结):

    Spark学习技巧
  • 美团面试题:JVM堆内存溢出后,其他线程是否可继续工作?

    最近网上出现一个美团面试题:“一个线程OOM后,其他线程还能运行吗?”。我看网上出现了很多不靠谱的答案。这道题其实很有难度,涉及的知识点有jvm内存分配、作用域...

    Spark学习技巧
  • form表单提交与ajax消息传递

    后台: def index(request): if request.method == 'POST': print(request.POST) # 普通...

    小小咸鱼YwY
  • Kafka初始化流程与请求处理

    扫帚的影子
  • Flask之请求和响应

    人生不如戏
  • 什么是线程死锁?如何避免死锁?

    多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。

    崔笑颜
  • IEEE 公布 282 位新增Fellow,华人占比28%,37位来自中国

    刚刚,国际电气和电子工程师协会(IEEE)公布了 2020年新晋 Fellow 名单。

    AI科技评论
  • R语言提取PDF文件中的文本内容

    综上步骤,我们便可以随便获取任意章节的任意内容。那么接下来就是对这些文字的应用,各位集思广益吧。

    一粒沙
  • Nokia Booklet 3G使用指南

    在购买联通3G上网卡套餐之前,为了避免买来的usim卡不能使用,我特地先去nokia的booklet支持网站查了查帖子。结果发现有个捷克的兄弟从美国的Bestb...

    py3study

扫码关注云+社区

领取腾讯云代金券