前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka基础组件和辅助类库简介

Kafka基础组件和辅助类库简介

作者头像
扫帚的影子
发布2018-09-05 17:14:44
6840
发布2018-09-05 17:14:44
举报
  • 在正式开始扒代码之前, 先来个开胃菜,简单介绍一下kafka的基础组件和一些代码实现中用到的基础类库

Kafka基础组件概述

  • KafkaServer是整个Kafka的核心组件,里面包含了kafka对外提供功能的所有角色;
  • 一图顶千言:

kafkaserver1.png

Kafka辅助类库简介

KafkaScheduler

  • 所在文件: core/src/main/scala/kafka/utils/KafkaScheduler.scala
  • 功能: 接收需周期性执行的任务和延迟作务的添加, 使用一组thread pool来执行具体的任务;
  • 实现: 封装了 java.util.concurrent.ScheduledThreadPoolExecutor;
  • 接口(原有注释已经很清晰):
代码语言:javascript
复制
/**
   * Initialize this scheduler so it is ready to accept scheduling of tasks
   */
  def startup()
  
  /**
   * Shutdown this scheduler. When this method is complete no more executions of background tasks will occur. 
   * This includes tasks scheduled with a delayed execution.
   */
  def shutdown()
  
  /**
   * Check if the scheduler has been started
   */
  def isStarted: Boolean
  
  /**
   * Schedule a task
   * @param name The name of this task
   * @param delay The amount of time to wait before the first execution
   * @param period The period with which to execute the task. If < 0 the task will execute only once.
   * @param unit The unit for the preceding times.
   */
  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)

ZkUtils

  • 所在文件: core/scr/main/scala/kafka/utils/ZkUtils.scala
  • 功能: 封装了可能用到的对zk上节点的创建,读,写,解析(主要是json)操作;
  • 实现: 使用了一个小众的类库 I0Itec 来操作zk;
  • 涉及到以下zk节点:
代码语言:javascript
复制
  val ConsumersPath = "/consumers"
  val BrokerIdsPath = "/brokers/ids"
  val BrokerTopicsPath = "/brokers/topics"
  val ControllerPath = "/controller"
  val ControllerEpochPath = "/controller_epoch"
  val ReassignPartitionsPath = "/admin/reassign_partitions"
  val DeleteTopicsPath = "/admin/delete_topics"
  val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
  val BrokerSequenceIdPath = "/brokers/seqid"
  val IsrChangeNotificationPath = "/isr_change_notification"
  val EntityConfigPath = "/config"
  val EntityConfigChangesPath = "/config/changes"

Pool

  • 所在文件: core/src/main/scala/kafka/utils/Pool.scala
  • 功能: 简单的并发对象池;
  • 实现: 对ConcurrentHashMap的封裝;
  • getAndMaybePut实现小技巧, 使用了double check技术, 在有值的情况下降低锁的开销;
代码语言:javascript
复制
def getAndMaybePut(key: K) = {
    if (valueFactory.isEmpty)
      throw new KafkaException("Empty value factory in pool.")
    val curr = pool.get(key)
    if (curr == null) {
      createLock synchronized {
        val curr = pool.get(key)
        if (curr == null)
          pool.put(key, valueFactory.get(key))
        pool.get(key)
      }
    }
    else
      curr
  }

Logging

  • 所在文件: core/src/main/scala/kafka/utils/Logging.scala
  • 功能: 定义了trait Logging 供其他类继承,方便写日志;
  • 实现: 对org.apache.log4j.Logger的封装;

FileLock

  • 所在文件: core/src/main/scala/kafka/utils/FileLock.scala
  • 功能: 文件锁, 相当于linux的/usr/bin/lockf;
  • 实现: 使用java.nio.channels.FileLock实现;

ByteBounderBlockingQueue

  • 所在文件: core/src/main/scala/kafkak/utils/ByteBoundedBlockingQueue.scala;
  • 功能: 阻塞队列, 队列满的衡量标准有两条: 队列内元素个数达到了上限, 队列内所有元素的size之各达到了上限;
  • 实现: 使用java.util.concurrent.LinkedBlockingQueue实现, 加上了对队列内已有元素size大小的check;
  • 接口:
代码语言:javascript
复制
def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean
def offer(e: E): Boolean
def put(e: E): Boolean
def poll(timeout: Long, unit: TimeUnit)
def poll()
def take(): E
...

DelayedItem

  • 所在文件: core/src/main/scala/kafaka/utils/DelayedItem.scala
  • 功能: 定义了可以放入到DelayQueue队列的对象;
  • 实现: 实现了Delayed接口;
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016.12.26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka基础组件概述
  • Kafka辅助类库简介
    • KafkaScheduler
      • ZkUtils
        • Pool
        • Logging
        • FileLock
        • ByteBounderBlockingQueue
        • DelayedItem
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档