Kafka源码分析-网络层-2

  • 许久不写字,发现写作水平严重退步啊~~~
  • 以前也是个文艺青年,现在也要写出诗意的代码啊~
  • 没找到以前写的诗,咱们还是一起撸代码吧...

Kafka网络层一哥:SocketServer类

  • 所在文件: core/src/main/scala/kafka/network/SocketServer.scala;
  • 统筹组织所有的网络层组件;
  • startup()方法: (1) 根据配置的若干endpoint创建相应的Acceptor及相关联的一组Processor线程;
for (i <- processorBeginIndex until processorEndIndex) {
          processors(i) = new Processor(i,
            time,
            maxRequestSize,
            requestChannel,
            connectionQuotas,
            connectionsMaxIdleMs,
            protocol,
            config.values,
            metrics
          )
        }
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)

(2) 创建Acceptor运行的线程并启动;

Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, 
                          endpoint.port), acceptor, false).start()
acceptor.awaitStartup()

Kafka网络层头号马仔:Acceptor类

  • 所在文件: core/src/main/scala/kafka/network/SocketServer.scala
  • Acceptor类对象创建: (1) 创建监听ServerSocket:
val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket().setReceiveBufferSize(recvBufferSize)
    try {
      serverChannel.socket.bind(socketAddress)
      info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort))
    } catch {
      case e: SocketException =>
        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
    }

(2) 开启分配到的若干Processor:

this.synchronized {
    processors.foreach { processor =>
      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
    }
  }

(3) run()-利用NIO的selector来接收网络连接:

var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }

这里面最主要的就是accept(key, processors(currentProcessor)) (4) accept: 设置新连接socket的参数后交由Processor处理:

socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
socketChannel.socket().setSendBufferSize(sendBufferSize)
processor.accept(socketChannel)

Kafka网络层堂口扛把子:Processor类

  • 所在文件:core/src/main/scala/kafka/network/SocketServer.scala;
  • 从单个连接进来的request都由它处理;
  • 每个Processor对象会创建自己的nio selector;
  • 每个连接有唯一标识ConnectionId:$localHost:$localPort-$remoteHost:$remotePort,这个非常重要!!!
  • accept(socketChannel::SocketChannel):将新连接的SocketChannel保存到并发队列Q1中;
  • run()是核心, 包裹在一个循环里,直接线程退出,while(isRunning) {},里面依次调用如下函数: (1) configureNewConnections():从并发队列Q1里取出SocketChannel,添加到自身的nio selector中,监听读事件; (2) processNewResponses():处理当前所有处理完成的request相应的response, 这些response都是从RequestChannel获得(requestChannel.receiveResponse),根据request的类型来决定从当前连接的nio selector中暂时删除读事件监听/添加写事件/关闭当前连接; (3) selector.poll(300): 这个就不用解释了, 这个selector是对nio selector的又一封装,我们后面一章会讲到,它完成具体的数据接收和发送; (4) selector.completedReceives.asScala.foreach: 处理当前所有的从selector返回的完整request,将其put到RequestChannel的一个阻塞队列里,供应用层获取并处理;同时会暂时删除些连接上的读事件监听:selector.mute(receive.source); (5) selector.completedSends.asScala.foreach: 处理当前所有的从selector返回的写操作,重新将读事件添加到连接的selector监听中selector.unmute(send.destination); (6) selector.disconnected.asScala.foreach: 处理当前所有将关闭的连接;

Kafka网络层中间人:RequestChannel类

  • 所在文件: core/src/main/scala/kafka/network/RequestChannel.scala;
  • 保存所有从网络层拿到的完整request和需要发送的response;
  • 一般是RequestHandler会周期性从RequestChannel获取request,并将response保存回RequestChannel;
  • processNewResponses() 处理RequestChannel中所有的response;

Kafka网络层看门小弟:ConnectionQuotas类

  • 所在文件:core/src/main/scala/kafka/network/SocketServer.scala;
  • 当某个IP到kafka的连接数过多时,将抛出TooManyConnectionsException异常;
  • 实现就是通过加锁的加减计数;

SocketServer 图解:

SocketServer.png

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏思考的代码世界

Python网络数据采集之读取文档|第05天

19130
来自专栏玄魂工作室

Msfvenom后门重新学习与分析-windows篇

Msfvenom 是msf框架配套的攻击载荷生成器。 什么是攻击荷载: Payload:目标系统上渗透成功后执行的代码 msfvenom命令行选项如下: ? ?...

41280
来自专栏逸鹏说道

我这么玩Web Api(一)

帮助页面或用户手册(Microsoft and Swashbuckle Help Page) 前言   你需要为客户编写Api调用手册?你需要测试你的Api接口...

32250
来自专栏运维

安装rhel6系统分区指导

系统分区指导 1,Unless you have a reason for doing otherwise, we recommend that you cr...

19530
来自专栏Kubernetes

深度解析Kubernetes Local Persistent Volume(二)

摘要:上一篇博客”深度解析Kubernetes Local Persistent Volume(一)“对local volume的基本原理和注意事项进行了分析,...

1.8K30
来自专栏纯洁的微笑

springboot(十一):Spring boot中mongodb的使用

mongodb是最早热门非关系数据库的之一,使用也比较普遍,一般会用做离线数据分析来使用,放到内网的居多。由于很多公司使用了云服务,服务器默认都开放了外网地址,...

39060
来自专栏马洪彪

Java生成条码二维码

一、概述 可用barcode4j或zxing等第三方库,推荐zxing。 barcode4j资料链接:http://barcode4j.sourceforge....

52480
来自专栏微信公众号:Java团长

Java NIO:NIO概述

在上一篇博文中讲述了几种IO模型,现在我们开始进入Java NIO编程主题。NIO是Java 4里面提供的新的API,目的是用来解决传统IO的问题。本文下面分别...

10310
来自专栏熊二哥

JavaNIO快速入门

NIO是Jdk中非常重要的一个组成部分,基于它的Netty开源框架可以很方便的开发高性能、高可靠性的网络服务器和客户端程序。本文将就其核心基础类型Channel...

1.1K90
来自专栏美团技术团队

Android Hook技术防范漫谈

背景 当下,数据就像水、电、空气一样无处不在,说它是“21世纪的生产资料”一点都不夸张,由此带来的是,各行业对于数据的争夺热火朝天。随着互联网和数据的思维深入人...

64070

扫码关注云+社区

领取腾讯云代金券