Kafka Study Notes, Part 6: Delayed Produce Operations

路行的亚洲
Published 2020-11-03 15:58:01
Collected in the column: 后端技术学习

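Three questions to keep in mind while reading: when is the delayed-operation component used, in which scenarios does it come into play, and why is it needed at all?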

In KafkaApis we can see that the concrete request-handling logic lives here:

case ApiKeys.PRODUCE => handleProduceRequest(request)

Taking the handling of a produce request as an example, let's walk through the flow:

/**
 * Handle a produce request
 */
def handleProduceRequest(request: RequestChannel.Request) {
  // Extract the body of the producer's request
  val produceRequest = request.body[ProduceRequest]
  // Total request size in bytes (header plus body), used below for quota accounting
  val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes

  // the callback for sending a produce response
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {

    // Note: this excerpt omits the definitions of unauthorizedTopicResponses and nonExistingTopicResponses,
    // and the code that sets errorInResponse when a partition reports an error
    val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
    var errorInResponse = false

    // Sends the produce response once the bandwidth throttle time is known
    def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
      if (produceRequest.acks == 0) { // acks = 0: the producer expects no response
        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
        // the request, since no response is expected by the producer, the server will close socket server so that
        // the producer client will know that some error has happened and will refresh its metadata
        if (errorInResponse) {
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> status.error.exceptionName
          }.mkString(", ")
          info(
            s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
              s"from client id ${request.header.clientId} with ack=0\n" +
              s"Topic and partition to exceptions: $exceptionsSummary"
          )
          // Close the connection so that the client notices the failure
          closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
        } else {
          // Nothing to send back: complete the request with a no-op response that is exempt from throttling
          sendNoOpResponseExemptThrottle(request)
        }
      } else { // acks = 1 or -1: send the response, possibly throttled
        sendResponseMaybeThrottle(request, requestThrottleMs =>
          new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs))
      }
    }

    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeNanos = time.nanoseconds

    quotas.produce.maybeRecordAndThrottle(
      request.session.sanitizedUser,
      request.header.clientId,
      numBytesAppended,
      produceResponseCallback)
  }

  // Callback that records per-partition record-processing statistics
  def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = {
    processingStats.foreach { case (tp, info) =>
      updateRecordsProcessingStats(request, tp, info)
    }
  }

  // call the replica manager to append messages to the replicas
  // (this is the key step of the produce path)
  replicaManager.appendRecords(
    timeout = produceRequest.timeout.toLong,
    requiredAcks = produceRequest.acks,
    internalTopicsAllowed = internalTopicsAllowed,
    isFromClient = true,
    entriesPerPartition = authorizedRequestInfo,
    responseCallback = sendResponseCallback,
    processingStatsCallback = processingStatsCallback)

  // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
  // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
  produceRequest.clearPartitionRecords()
}

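Three cases matter here, distinguished by the value of acks: 0, 1, and -1. With acks=0 the producer expects no response, so the broker sends nothing back and closes the connection only if an error occurred. With acks=1 the broker responds as soon as the records have been appended to the leader's local log. We focus on acks=-1, because that is the case that involves the delayed-operation component.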
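
The three cases can be summarized in a small sketch. This is only an illustration of the decision described above: `AcksSketch`, `ResponseAction`, and `actionFor` are hypothetical names and are not part of Kafka.

```scala
// Hypothetical model of how the broker reacts to each acks value; not Kafka's own API.
object AcksSketch {
  sealed trait ResponseAction
  case object NoResponse extends ResponseAction                 // acks = 0, no error
  case object CloseConnection extends ResponseAction            // acks = 0, but an error occurred
  case object RespondAfterLocalAppend extends ResponseAction    // acks = 1
  case object RespondAfterIsrReplication extends ResponseAction // acks = -1, may be delayed
  case object RejectInvalidAcks extends ResponseAction          // any other value

  def actionFor(acks: Short, errorInResponse: Boolean): ResponseAction = acks.toInt match {
    case 0  => if (errorInResponse) CloseConnection else NoResponse
    case 1  => RespondAfterLocalAppend
    case -1 => RespondAfterIsrReplication
    case _  => RejectInvalidAcks
  }
}
```

Only the `RespondAfterIsrReplication` branch can end up waiting in the purgatory; the other branches are answered (or deliberately not answered) right away.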

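// Records that a user/client-id changed some throttled metric (bytes produced/consumed, request processing time, etc.).
// If the quota is violated, the callback is invoked after a delay; otherwise it is invoked immediately.
// Subclasses may override the throttle-time calculation.
def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int = {
  if (quotasEnabled) {
    // Get or create the quota sensors for this user/client-id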
    val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
    recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
  } else {
    // Don't record any metrics if quotas are not enabled at any level
    val throttleTimeMs = 0
    callback(throttleTimeMs)
    throttleTimeMs
  }
}
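
To make the "call back now or call back later" behaviour concrete, here is a minimal sketch. It assumes the delay is chosen so that the observed rate, averaged over the window plus the delay, falls back to the quota. `ThrottleSketch`, `throttleTimeMs`, `maybeThrottle`, and the `schedule` parameter are hypothetical names for illustration; the real quota manager works with metrics sensors and a delay queue.

```scala
// Simplified illustration of quota throttling; not the actual ClientQuotaManager.
object ThrottleSketch {
  // Delay (ms) that brings an observed rate back to the quota over a window:
  // solve observed * window / (window + delay) = quota for delay.
  def throttleTimeMs(observedRate: Double, quota: Double, windowMs: Long): Long =
    if (observedRate <= quota) 0L
    else math.round((observedRate - quota) / quota * windowMs)

  // Hypothetical stand-in for maybeRecordAndThrottle. `schedule` runs a task after a delay.
  def maybeThrottle(quotasEnabled: Boolean, observedRate: Double, quota: Double, windowMs: Long,
                    schedule: (Long, () => Unit) => Unit)(callback: Int => Unit): Int = {
    val delay = if (quotasEnabled) throttleTimeMs(observedRate, quota, windowMs).toInt else 0
    if (delay > 0) schedule(delay.toLong, () => callback(delay)) // quota violated: respond later
    else callback(delay)                                         // within quota: respond now
    delay
  }
}
```

With a quota of 1000 bytes/s, an observed rate of 2000 bytes/s, and a 1000 ms window, the sketch yields a 1000 ms delay before the callback fires.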

Next, appendRecords performs the actual append:

 /**
  * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
  * the callback function will be triggered either when timeout or the required acks are satisfied;
  * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
  */
 def appendRecords(timeout: Long,
                   requiredAcks: Short,
                   internalTopicsAllowed: Boolean,
                   isFromClient: Boolean,
                   entriesPerPartition: Map[TopicPartition, MemoryRecords],
                   responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                   delayedProduceLock: Option[Lock] = None,
                   processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
   if (isValidRequiredAcks(requiredAcks)) {
     val sTime = time.milliseconds
     // Append the records to the local log
     val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
       isFromClient = isFromClient, entriesPerPartition, requiredAcks)
     debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

     val produceStatus = localProduceResults.map { case (topicPartition, result) =>
       topicPartition ->
               ProducePartitionStatus(
                 result.info.lastOffset + 1, // required offset
                 new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // respon
     }

     // Report per-partition record-processing statistics
     processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))

     // A delayed produce is needed only if acks = -1, there is data to append,
     // and the local append succeeded for at least one partition
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
       // create delayed produce operation
       val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
       val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

       // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
       val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

       // try to complete the request immediately, otherwise put it into the purgatory
       // this is because while the delayed produce operation is being created, new
       // requests may arrive and hence make this operation completable.
       delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

     } else {
       // we can respond immediately
       val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
       responseCallback(produceResponseStatus)
     }
   } else {
     // If required.acks is outside accepted range, something is wrong with the client
     // Just return an error and don't handle the request at all
     val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
       topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
         LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
     }
     // Invoke the response callback with the error
     responseCallback(responseStatus)
   }
 }  
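
The branch on delayedProduceRequestRequired decides whether the request waits in the purgatory. The following sketch shows one plausible reading of that check, using simplified stand-in types: `DelayedProduceCheck`, `LocalAppendResult`, and the string partition keys are assumptions made for illustration, not Kafka's real types.

```scala
// Simplified illustration of when a produce request has to be delayed.
object DelayedProduceCheck {
  // Hypothetical stand-in for the per-partition result of the local append.
  final case class LocalAppendResult(lastOffset: Long, error: Option[Throwable])

  // A delayed produce is needed only when all three conditions hold.
  def delayedProduceRequired(requiredAcks: Short,
                             results: Map[String, LocalAppendResult]): Boolean =
    requiredAcks.toInt == -1 &&              // the producer waits for the in-sync replicas
      results.nonEmpty &&                    // there is data to replicate
      results.values.exists(_.error.isEmpty) // at least one partition was appended locally

  // Offset that replication must reach before the partition counts as acknowledged,
  // matching `result.info.lastOffset + 1` in the excerpt above.
  def requiredOffset(result: LocalAppendResult): Long = result.lastOffset + 1
}
```

If every partition failed locally there is nothing to wait for, so the error response can be sent immediately even with acks=-1.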

The class to focus on is DelayedProduce; see the figure below.

The figure shows that delayed produce is one part of the delayed-operation component.

class DelayedProduce(delayMs: Long,
                     produceMetadata: ProduceMetadata,
                     replicaManager: ReplicaManager,
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     lockOpt: Option[Lock] = None)
  extends DelayedOperation(delayMs, lockOpt)

DelayedProduce extends DelayedOperation, so it inherits all of DelayedOperation's methods.

DelayedOperation call flow

Expiration of delayed operations is detected with a timing wheel.
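
As a rough illustration of the timing-wheel idea, here is a minimal single-level wheel. It is a sketch under simplifying assumptions: Kafka's own timer is hierarchical and handles overflow by cascading into coarser wheels, whereas this `SimpleTimingWheel` (a hypothetical class) simply rejects tasks beyond its range, and it fires tasks at tick granularity.

```scala
import scala.collection.mutable

// Single-level timing wheel for illustration only; not Kafka's implementation.
final class SimpleTimingWheel(tickMs: Long, wheelSize: Int, startMs: Long) {
  private val buckets = Array.fill(wheelSize)(mutable.ListBuffer.empty[(Long, () => Unit)])
  private var currentTime = startMs - (startMs % tickMs) // rounded down to a tick boundary

  // Returns false when the task is already due or lies beyond the wheel's range.
  def add(expirationMs: Long, task: () => Unit): Boolean = {
    if (expirationMs < currentTime + tickMs) false                   // already due
    else if (expirationMs >= currentTime + tickMs * wheelSize) false // beyond this wheel
    else {
      val virtualId = expirationMs / tickMs
      buckets((virtualId % wheelSize).toInt) += ((expirationMs, task))
      true
    }
  }

  // Moves the clock forward one tick at a time and runs the tasks in each bucket that comes due.
  def advanceClock(nowMs: Long): Unit = {
    while (currentTime + tickMs <= nowMs) {
      currentTime += tickMs
      val bucket = buckets(((currentTime / tickMs) % wheelSize).toInt)
      val due = bucket.toList
      bucket.clear()
      due.foreach { case (_, task) => task() }
    }
  }
}
```

Adding a task and advancing the clock both touch a single bucket, which is why a wheel suits a broker that holds a large number of pending timeouts.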

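So DelayedProduce helps the replica manager carry out delayed operations. The replica manager itself is mainly responsible for writing the messages sent by producers to the leader replica, managing synchronization between follower replicas and the leader replica, and handling replica role transitions. In the produce path above, writing to the leader replica relies on DelayedProduce. When a produce request has acks=-1, the broker must wait until all in-sync replicas (ISR) of the partition have caught up with the leader before answering the producer, so the request goes through the delayed-operation path.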

In other words, the role of DelayedProduce is to help the replica manager, when acks=-1, defer the responseCallback that answers the producer.

DelayedProduce reacts to the following conditions, each with its own handling:

The write fails with an exception: the partition's ProduceResponsePartitionStatus.PartitionResponse.errorCode is updated and acksPending is set to false.

The partition leader migrates: the partition's produce status is updated and acksPending is set to false.

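The ISR replicas have caught up, meaning the leader replica's high watermark (HW) has reached requiredOffset: after the check in Partition.checkEnoughReplicasReachOffset, the default value that DelayedProduce assigned to PartitionResponse.errorCode at initialization is overwritten.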
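
The three conditions above can be sketched as a per-partition check. The types here are simplified assumptions: `TryCompleteSketch`, `PartitionStatus`, `LeaderView`, the string partition keys, and the error names are hypothetical and only mirror the behaviour described in the text.

```scala
// Simplified illustration of how a delayed produce decides that it can complete.
object TryCompleteSketch {
  // Hypothetical per-partition state; a failed local write starts with acksPending = false.
  final class PartitionStatus(val requiredOffset: Long, appendError: Option[String] = None) {
    var acksPending: Boolean = appendError.isEmpty
    // Default error for pending partitions, overwritten once the outcome is known.
    var error: Option[String] = appendError.orElse(Some("REQUEST_TIMED_OUT"))
  }

  // What this broker knows about a partition it leads (None = it is no longer the leader).
  final case class LeaderView(highWatermark: Long)

  // Returns true once no partition is still waiting for acknowledgements.
  def tryComplete(statuses: Map[String, PartitionStatus],
                  leaderViewOf: String => Option[LeaderView]): Boolean = {
    statuses.foreach { case (partition, status) =>
      if (status.acksPending) {
        leaderViewOf(partition) match {
          case None => // leader moved away: finish this partition with an error
            status.acksPending = false
            status.error = Some("NOT_LEADER_FOR_PARTITION")
          case Some(view) if view.highWatermark >= status.requiredOffset =>
            status.acksPending = false // ISR caught up: finish successfully
            status.error = None
          case Some(_) => () // keep waiting
        }
      }
    }
    !statuses.values.exists(_.acksPending)
  }
}
```

A partition that never satisfies any condition keeps the default error, which is what the producer sees if the operation expires.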

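Put differently, a delayed operation is first checked with tryComplete, which tests the completion conditions. If they hold, forceComplete is executed, which runs onComplete, and the response is then sent.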
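
The tryComplete, forceComplete, onComplete sequence can be modeled as below. This is a minimal sketch, assuming that an atomic flag guarantees onComplete runs only once; `DelayedOperationSketch` and `CountdownOperation` are hypothetical classes, and Kafka's DelayedOperation additionally integrates with the timer and with locking.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Simplified model of the delayed-operation lifecycle; not Kafka's DelayedOperation.
abstract class DelayedOperationSketch {
  private val completed = new AtomicBoolean(false)

  def isCompleted: Boolean = completed.get

  // Subclasses check their completion condition and call forceComplete() when it holds.
  def tryComplete(): Boolean

  // Subclasses define what completion means, for example sending the response.
  def onComplete(): Unit

  // Runs onComplete() exactly once, however many callers race to complete the operation.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false
}

// Toy operation that completes once `remaining` drops to zero.
final class CountdownOperation(var remaining: Int) extends DelayedOperationSketch {
  var responsesSent = 0
  def tryComplete(): Boolean = if (remaining <= 0) forceComplete() else false
  def onComplete(): Unit = responsesSent += 1
}
```

Because forceComplete is guarded by compareAndSet, a timeout that arrives after a successful tryComplete does not send a second response.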

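The Kafka developers named this component the produce "purgatory", and the name fits: from the producer's point of view the wait is an ordeal, because the response arrives only after the follower replicas have synchronized with the leader.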

Originally published 2020-11-02 on the WeChat official account 后端技术学习.
