专栏首页后端技术学习kafka学习六-生产延迟操作

kafka学习六-生产延迟操作

这里思考问题,什么时候会用到延迟组件,同时哪些时候会用到延迟组件,同时为什么要用延迟组件?

从kafkaApi中,我们可以知道具体的逻辑实现都是在这里实现的:

case ApiKeys.PRODUCE => handleProduceRequest(request)

这里以处理生产请求为例,看其操作流程:

/**
 * Handle a produce request
  * 处理生产请求
 */
def handleProduceRequest(request: RequestChannel.Request) {
  //拿到生产者请求的内容
  val produceRequest = request.body[ProduceRequest]
  //追加大小
  val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes

  // the callback for sending a produce response
  //发送生产响应的回调
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {

    val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
    var errorInResponse = false

    //生产响应回调
    def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
      //生产请求的ack = 0的请求
      /**
        * 如果生产者request.required.acks = 0,则不需要任何操作; 但是,如果在处理请求时出现任何错误,
        * 由于生产者不希望响应,则服务器将关闭套接字服务器,以便生产者客户端将知道发生了一些错误并刷新其元数据
        */
      if (produceRequest.acks == 0) { //acks = 0 即不需要acks,没啥需要特别做的
        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
        // the request, since no response is expected by the producer, the server will close socket server so that
        // the producer client will know that some error has happened and will refresh its metadata
        if (errorInResponse) {
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> status.error.exceptionName
          }.mkString(", ")
          info(
            s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
              s"from client id ${request.header.clientId} with ack=0\n" +
              s"Topic and partition to exceptions: $exceptionsSummary"
          )
          //关闭连接
          closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
        } else {
          //发送没有操作响应的操作
          //发送NoOp响应豁免节流阀
          sendNoOpResponseExemptThrottle(request)
        }
      } else { //ack =1 //发送响应
        sendResponseMaybeThrottle(request, requestThrottleMs =>
          new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs))
      }
    }
    //acks =-1的情况
    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeNanos = time.nanoseconds

    quotas.produce.maybeRecordAndThrottle(
      request.session.sanitizedUser,
      request.header.clientId,
      numBytesAppended,
      produceResponseCallback)
  }

  //处理统计回调
  def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = {
    processingStats.foreach { case (tp, info) =>
      updateRecordsProcessingStats(request, tp, info)
    }
  }

    // call the replica manager to append messages to the replicas
    //副本管理进行追加消息调用  重要
    replicaManager.appendRecords(
      timeout = produceRequest.timeout.toLong,
      requiredAcks = produceRequest.acks,
      internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = true,
      entriesPerPartition = authorizedRequestInfo,
      responseCallback = sendResponseCallback,
      processingStatsCallback = processingStatsCallback)

    // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
    // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
    /**
      * 如果将请求放入炼狱延迟组件,它将具有保留的引用,因此无法进行垃圾收集;
      * 因此,我们在这里清除其数据是为了让GC回收其内存,因为它已被附加到日志中
      */
    produceRequest.clearPartitionRecords()
  }
}

这里主要关注三个情况,对acks的操作,这里acks=0,1,-1的情况,其中0表示不需要响应,此时不做任何操作,也即免录,等于1的时候,立即进行响应。这里我们重点关注-1的情况,因为此时会涉及到延迟组件操作。

//记录用户/客户ID更改了一些被限制的指标(产生/消耗的字节,请求处理时间等)如果违反配额,
//则在延迟后调用回调,否则立即调用回调。节流时间计算可能被覆盖 子类。
def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int = {
  if (quotasEnabled) {
      //获取或创建QuotaSensors
    val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
    recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
  } else {
    // Don't record any metrics if quotas are not enabled at any level
    val throttleTimeMs = 0
    callback(throttleTimeMs)
    throttleTimeMs
  }
}

执行appendRecords追加消息操作

 /**
  * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
  * the callback function will be triggered either when timeout or the required acks are satisfied;
  * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
   * 将消息追加到分区的leader副本,然后等待它们被复制到其他副本; 当超时或所需的acks满足时,将触发回调函数;
   * 如果回调函数本身已经在某个对象上同步,则传递此对象以避免死锁。
  */
 def appendRecords(timeout: Long,
                   requiredAcks: Short,
                   internalTopicsAllowed: Boolean,
                   isFromClient: Boolean,
                   entriesPerPartition: Map[TopicPartition, MemoryRecords],
                   responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                   delayedProduceLock: Option[Lock] = None,
                   processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
   if (isValidRequiredAcks(requiredAcks)) {
     val sTime = time.milliseconds
     //追加到本地log
     val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
       isFromClient = isFromClient, entriesPerPartition, requiredAcks)
     debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

     val produceStatus = localProduceResults.map { case (topicPartition, result) =>
       topicPartition ->
               ProducePartitionStatus(
                 result.info.lastOffset + 1, // required offset
                 new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // respon
     }

     //处理统计回调函数
     processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))

     //延迟生产请求要求
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
       // create delayed produce operation
       //创建延迟生产操作
       val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
       //创建新的延迟生产
       val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

       // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
       //创建(主题,分区)对的列表,以用作此延迟的生产操作的键
       val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

       // try to complete the request immediately, otherwise put it into the purgatory
       // this is because while the delayed produce operation is being created, new
       // requests may arrive and hence make this operation completable.
       //尝试立即完成请求,否则将其放入炼狱,这是因为在创建延迟的生产操作时,新的请求可能会到达并因此使该操作可完成。
       delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

     } else {
       // we can respond immediately
       val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
       responseCallback(produceResponseStatus)
     }
   } else {
     // If required.acks is outside accepted range, something is wrong with the client
     // Just return an error and don't handle the request at all
     val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
       topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
         LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
     }
     //响应回调
     responseCallback(responseStatus)
   }
 }  

重点关注:DelayedProduce,这里可以看下图

从这张图中,我们可以看到延迟生产是属于延迟操作组件中的一部分。

class DelayedProduce(delayMs: Long,
                     produceMetadata: ProduceMetadata,
                     replicaManager: ReplicaManager,
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     lockOpt: Option[Lock] = None)
  extends DelayedOperation(delayMs, lockOpt)

可以看到延迟生产继承了延迟操作,也即它用于延迟操作中的所有方法。

DelayedOperation调用过程

同时基于时间轮进行时间过期的检测操作。

也即从这里我们可以看到DelayedProduce是协助副本管理器完成相应的延迟操作的,而副本管理器则主要是完成将生产者发送的消息写入到leader副本、管理follwer副本与leader副本之间的同步以及副本角色之间的转换。在上面的生产延迟中,我们可以看到在消息写入leader副本时需要DelayedProdue的协助。同时我们也可以看到:当生产请求的acks=-1时,意味着生产者需要等待该分区的所有副本都与leader副本同步完成之后再向生产者应答,此时必然会经历延迟操作。

也即DelayedProduce的作用则是协助副本管理器在acks=-1时,延迟回调responseCallback向生产者做出响应。

同时可以看到:DelayedProduce能够知晓的条件以及逻辑处理:

写操作发送异常,此时会更新该分区的ProduceResponsePartitionStatus.PartitionResponse.errorCode,同时更新acksPending=false

当分区Leader发生迁移时,此时需要更新该分区的生产分区状态和acksPending=false

ISR副本同步完成,Leader副本的HW高水位已大于requiredOffset。通过Partition.checkEnoughReplicaReachOffset处理后会修改DelayedProduce初始化时对PartitionResponse.errorCode所设置的默认值

也即如果需要调用延迟操作,需要先经过tryComplete,此时会进行相关条件的检查,如果满足,则执行forceComplete,然后完成延迟onComplete操作,然后执行响应。

从老外给生产延迟炼狱可以看到这个过程对于生产者来说是煎熬的,因为它需要等到主从副本同步再进行响应,这个过程确实有炼狱的感觉。

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-11-02

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • kafka学习三-broker的入口

    前面我们通过学习scala知道通常如果想运行scala程序,必然会有一个入口,而这个入口可以通过kafka的启动脚本kafka-server-start.sh可...

    路行的亚洲
  • Scala学习一

    当记录方法返回值的变量被声明为lazy时,方法的执行将被推迟,直到我们首次使用该值时,方法才会执行。类似java中的延迟加载。同时lazy不能修饰var类型的变...

    路行的亚洲
  • Kafka学习五

    前面我们知道其重要的启动方法里面有关的方法:它的注释是非常具有启发性的, 启动API,以启动Kafka服务器的单个实例。实例化LogManager,Socket...

    路行的亚洲
  • PyText简介 - Facebook自然语言处理框架

    自然语言处理(NLP)在现代深度学习生态中越来越常见。从流行的深度学习框架到云端API的支持,例如Google云、Azure、AWS或Bluemix,NLP是深...

    用户1408045
  • kotlin标准库扩展之 let run apply also(二)

    with同样也是标准库的一个扩展,不同的是它不是功能性的扩展,他只是起一个承接的作用方便大家标识和转呈

    大话swift
  • Android自动化之-ATX录放编辑器WEditor的安装使用与异常解决

    ATX录放编辑器WEditor,可以便利查找操作的情况以及各种元素和包名。最近在搞手机群控和安卓自动化用到了这个,记录下

    十四君
  • 十年资深架构师告诉Java程序员成为架构师必须要掌握的知识点一、分布式架构二、工程化专题三、微服务架构四、性能优化五、源码分析六、项目实战

    美的让人心动
  • 同步、异步、堵塞、非堵塞和函数调用及I/O之间的组合概念

            在我们工作和学习中,经常会接触到“同步”、“异步”、“堵塞”和“非堵塞”这些概念,但是并不是每个人都能将它们的关系和区别说清楚。本文将对这些基本...

    方亮
  • hello大皮——微型 python web 框架:Bottle (一)

    Bottle 是一个非常小巧但高效的微型 Python Web 框架,它被设计为仅仅只有一个文件的Python模块,并且除Python标准库外,它不依赖于任何第...

    用户5908113
  • 用于推断开放式工作场所浓度的环境物理系统(CS CY)

    开放式工作区中的核心挑战之一是确保劳动者在执行任务时集中精力。因此,能够推断出劳动者的集中度将使建筑设计师,经理和劳动者能够估计不同的开放式布局将产生什么影响并...

    小童

扫码关注云+社区

领取腾讯云代金券