
Kafka Producer: Overall Architecture and Source Code Analysis

857技术社区 · 2022-05-17

Overall Architecture

「Threads」

  • The Kafka producer client is coordinated by two threads: the main thread and the Sender thread.
  • In the main thread, KafkaProducer creates messages, which pass through the Interceptor, Serializer, and Partitioner before being buffered in the RecordAccumulator (the message accumulator).
  • The Sender thread fetches messages from the RecordAccumulator and sends them to Kafka; a minimal usage sketch follows this list.
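To make the two-thread split concrete, here is a minimal usage sketch (the bootstrap address, topic, and values are placeholders, not from the original post):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class ProducerThreadDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() runs on the main thread: interceptors -> serializer -> partitioner,
            // after which the record is only buffered in the RecordAccumulator.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                    (metadata, exception) -> {
                        // the callback fires on the Sender (I/O) thread once the broker responds
                        if (exception == null)
                            System.out.println("stored at offset " + metadata.offset());
                    });
        } // close() flushes the accumulator before stopping the Sender thread
    }
}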

「RecordAccumulator」

  • The RecordAccumulator buffers messages so that the Sender thread can send them in batches, cutting down network transfers and improving performance.
  • The accumulator's cache size is set by the producer client parameter buffer.memory, which defaults to 32 MB. If the producer creates messages faster than they can be shipped to the server, the producer runs out of buffer space, and a KafkaProducer.send() call will either block or throw an exception.
  • How long send() may block is set by the max.block.ms parameter, which defaults to 60 seconds; see the configuration sketch below.
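A small configuration sketch for the two parameters just mentioned (the values shown are simply the defaults):

import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

class AccumulatorConfigSketch {
    static Properties accumulatorProps() {
        Properties props = new Properties();
        // 32 MB accumulator, the default; consider raising it when writing to many partitions
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L);
        // when the accumulator is full, send() blocks at most this long (default 60 s)
        // before throwing a TimeoutException
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000L);
        return props;
    }
}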

「ProducerBatch」

  • Messages arriving from the main thread are appended to a Deque (double-ended queue) inside the RecordAccumulator. Internally the RecordAccumulator maintains one Deque per partition, and the elements of each Deque are ProducerBatch objects, i.e. Deque<ProducerBatch>.
  • When a message is written into the cache it is appended at the tail of the Deque; when the Sender reads messages, it reads from the head.
  • A ProducerBatch can hold one or more ProducerRecords (the messages created by the producer), which packs bytes more tightly. Combining several small ProducerRecords into one larger ProducerBatch also reduces the number of network requests and raises overall throughput.
  • If the producer needs to send messages to many partitions, raising buffer.memory appropriately can increase overall throughput.

「BufferPool」

  • Messages travel over the network as bytes, so before sending, a region of memory must be created to hold each message. The Kafka producer client creates and releases these regions through java.nio.ByteBuffer, but frequent allocation and release are costly, so inside the RecordAccumulator there is also a BufferPool, which reuses ByteBuffers to make efficient use of the cache.
  • The BufferPool, however, only manages ByteBuffers of one particular size; ByteBuffers of other sizes never enter the pool. That size is configured through the batch.size parameter, so that messages of different sizes can still be cached; a toy illustration follows this list.
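A toy illustration of that pooling strategy (this is not Kafka's BufferPool, just a sketch of the fixed-size-only reuse it implements):

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class FixedSizePoolSketch {
    private final int poolableSize;                        // plays the role of batch.size
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    FixedSizePoolSketch(int poolableSize) { this.poolableSize = poolableSize; }

    synchronized ByteBuffer allocate(int size) {
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();                       // reuse a pooled buffer
        return ByteBuffer.allocate(size);                  // other sizes are never pooled
    }

    synchronized void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);                          // only batch.size buffers come back
        }                                                  // everything else is left to the GC
    }
}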

「How ProducerBatch relates to batch.size」

  • When a ProducerRecord arrives at the RecordAccumulator, the accumulator first locates the Deque for the message's partition (creating it if absent), then takes the ProducerBatch at the tail of that Deque (creating one if absent) and checks whether this ProducerRecord still fits. If it fits, it is appended; otherwise a new ProducerBatch must be created.
  • When creating a new ProducerBatch, the accumulator checks whether the message exceeds batch.size. If it does not, the ProducerBatch is allocated at exactly batch.size, so that after use its buffer can be returned to the BufferPool and reused. If it does, the ProducerBatch is allocated at the message's own size, and that region of memory is never reused. The rule is condensed in the sketch below.
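Condensed, the sizing rule looks like this (illustrative names; the real logic appears in RecordAccumulator#append, quoted later):

class SizingRuleSketch {
    /** Pick the allocation size for a new batch. */
    static int allocationSize(int batchSize, int estimatedRecordSize) {
        // record fits in batch.size -> allocate exactly batch.size, so the buffer is poolable;
        // record is larger          -> allocate the record's own size; never reused
        return Math.max(batchSize, estimatedRecordSize);
    }
}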

「Sender」

  • After fetching the cached messages from the RecordAccumulator, the Sender converts them from their stored <TopicPartition, Deque<ProducerBatch>> form into a <Node, List<ProducerBatch>> form, where Node is a broker node of the Kafka cluster.
  • From the network's point of view, the producer client connects to specific broker nodes and sends them bytes; it does not care which partition a message belongs to. From the application's point of view, we only care about which messages go to which partitions. Hence a translation is needed from the application-logic layer to the network-I/O layer.
  • After the conversion to <Node, List<ProducerBatch>>, the Sender wraps the batches once more into a <Node, Request> form, Request being the ProduceRequest sent to each Node; a simplified sketch of the regrouping follows this list.
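A simplified sketch of that regrouping step (toy types and names; the real work happens in RecordAccumulator#drain, quoted later):

import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RegroupSketch {
    /** Toy stand-in for the internal ProducerBatch type. */
    static class Batch { }

    /** perPartition: partition -> queued batches; leaderOf: partition -> leader node id. */
    static Map<Integer, List<Batch>> regroup(Map<String, Deque<Batch>> perPartition,
                                             Map<String, Integer> leaderOf) {
        Map<Integer, List<Batch>> byNode = new HashMap<>();
        for (Map.Entry<String, Deque<Batch>> e : perPartition.entrySet()) {
            Integer leader = leaderOf.get(e.getKey());     // the network layer thinks in
            if (leader == null || e.getValue().isEmpty())  // brokers, not partitions
                continue;
            byNode.computeIfAbsent(leader, n -> new ArrayList<>())
                  .add(e.getValue().peekFirst());
        }
        return byNode;
    }
}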

「InFlightRequests」

  • Before requests are sent from the Sender thread to Kafka, they are also saved in InFlightRequests, which stores them as a Map<NodeId, Deque<Request>>. Its main job is to cache requests that have been sent but have not yet received a response. InFlightRequests also offers many management methods, and a configuration parameter caps how many requests may be cached per connection (that is, per client-to-Node connection): max.in.flight.requests.per.connection, default 5. Once the cap is reached, no further requests can be sent over that connection until a cached request receives its response.
  • Comparing a Deque's size against max.in.flight.requests.per.connection tells us whether the corresponding Node has piled up many unanswered requests. A large backlog suggests the node is heavily loaded or its network connection is troubled, and sending it more requests only raises the chance of request timeouts. A toy sketch of this bookkeeping follows the list.
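A toy sketch of that bookkeeping (again, not the real InFlightRequests class, just its shape):

import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class InFlightSketch {
    private final int maxInFlightPerConnection;   // max.in.flight.requests.per.connection, default 5
    private final Map<String, Deque<Object>> unacked = new HashMap<>();

    InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    /** May we send another request to this node? */
    boolean canSendMore(String nodeId) {
        Deque<Object> queue = unacked.get(nodeId);
        // a node whose queue is full of unanswered requests must not receive more
        return queue == null || queue.size() < maxInFlightPerConnection;
    }
}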

Source Code Analysis and Illustrations

RecordAccumulator

Inside RecordAccumulator, the most central field is:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

It is a ConcurrentMap: the key is a TopicPartition, identifying one partition of one topic, and the value is a double-ended queue of ProducerBatch objects waiting for the Sender thread to ship them to the broker.
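The original post illustrates the layout with a diagram; roughly:

batches (ConcurrentMap):
  TopicPartition(topic-A, 0) -> head [ Batch | Batch | Batch ] tail
  TopicPartition(topic-A, 1) -> head [ Batch ] tail
  TopicPartition(topic-B, 0) -> head [ Batch | Batch ] tail

The Sender thread polls batches from the head of each deque, while the main thread appends at the tail.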

「Now let's look at how a record is added to the buffer queue, from the source. The key method is org.apache.kafka.clients.producer.internals.RecordAccumulator#append:」

/**
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
* <p>
*
* @param tp The topic/partition to which this record is being sent
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param headers the Headers for the record
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
*/
public RecordAppendResult append(TopicPartition tp,
                               long timestamp,
                               byte[] key,
                               byte[] value,
                               Header[] headers,
                               Callback callback,
                               long maxTimeToBlock) throws InterruptedException {
  // We keep track of the number of appending thread to make sure we do not miss batches in
  // abortIncompleteBatches().
  appendsInProgress.incrementAndGet();
  ByteBuffer buffer = null;
  if (headers == null) headers = Record.EMPTY_HEADERS;
  try {
      // check if we have an in-progress batch
       // essentially a putIfAbsent-style helper; not analyzed further here
      Deque<ProducerBatch> dq = getOrCreateDeque(tp);
       // batches is thread-safe, but the Deque itself is not
       // first try appending to an in-progress batch
      synchronized (dq) {
          if (closed)
              throw new IllegalStateException("Cannot send after the producer is closed.");
          RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
          if (appendResult != null)
              return appendResult;
      }

      // we don't have an in-progress record batch try to allocate a new batch
       // no usable in-progress batch: create a new ProducerBatch
      byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
       // compute how much memory to allocate
      int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
      log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
       // allocate a buffer, blocking up to maxTimeToBlock if memory is short
      buffer = free.allocate(size, maxTimeToBlock);
      synchronized (dq) {
          // Need to check if producer is closed again after grabbing the dequeue lock.
          if (closed)
              throw new IllegalStateException("Cannot send after the producer is closed.");

           // try appending again: the allocation above was not inside the synchronized block,
           // so another thread may have created a batch meanwhile; finally returns the spare buffer
          RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
          if (appendResult != null) {
               // as the upstream comment below admits, hopefully this is rare: several threads
               // all allocate buffers, only for the extras to be handed back
              // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
              return appendResult;
          }

           // build the new ProducerBatch and append this record to it
          MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
          ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
          FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

          dq.addLast(batch);
           // incomplete is a set of batches not yet completed (sent and acknowledged)
          incomplete.add(batch);

          // Don't deallocate this buffer in the finally block as it's being used in the record batch
          buffer = null;

           // return the append result (future, batch-full flag, new-batch flag)
          return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
      }
  } finally {
       // return the buffer to the pool if it was not attached to a new batch
      if (buffer != null)
          free.deallocate(buffer);
      appendsInProgress.decrementAndGet();
  }
}

The companion tryAppend() method follows; the comments say it all:

  /**
 *  Try to append to a ProducerBatch.
 *
 *  If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
 *  resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
 *  and memory records built) in one of the following cases (whichever comes first): right before send,
 *  if it is expired, or when the producer is closed.
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
    // peek at the most recently added ProducerBatch
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            // the result carries the future, a batch-is-full flag, and a new-batch-created flag
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    // return null if the Deque has no ProducerBatch, or the last one is too full to take this record
    return null;
}

(In the original post, a diagram illustrates the append flow above.)

Sender

The most important method in Sender is run(), and its core is org.apache.kafka.clients.producer.internals.Sender#sendProducerData.

「The pollTimeout deserves a careful read of the comments: it is the longest time to block until at least one of the channels you registered becomes ready for its event. A return value of 0 means: go ahead and send now.」

private long sendProducerData(long now) {
  // fetch the current cluster metadata
  Cluster cluster = metadata.fetch();
  // get the list of partitions with data ready to send
  // ReadyCheckResult carries three fields:
  // 1. Set<Node> readyNodes            - nodes ready to send to
  // 2. long nextReadyCheckDelayMs      - delay before the next readiness check
  // 3. Set<String> unknownLeaderTopics - topics whose leader is unknown
  RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
  // if there are any partitions whose leaders are not known yet, force metadata update
  // if some topics have no known leader, request a metadata update
  if (!result.unknownLeaderTopics.isEmpty()) {
      // The set of topics with unknown leader contains topics with leader election pending as well as
      // topics which may have expired. Add the topic again to metadata to ensure it is included
      // and request metadata update, since there are messages to send to the topic.
      for (String topic : result.unknownLeaderTopics)
          this.metadata.add(topic);
      this.metadata.requestUpdate();
  }

  // remove nodes that are not ready to be sent to
  // remove any nodes we aren't ready to send to
  Iterator<Node> iter = result.readyNodes.iterator();
  long notReadyTimeout = Long.MAX_VALUE;
  while (iter.hasNext()) {
      Node node = iter.next();
      if (!this.client.ready(node, now)) {
          iter.remove();
          notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
      }
  }

  // drain the messages that are about to be sent
  // create produce requests
  Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
          this.maxRequestSize, now);

  // guarantee message ordering by muting the drained partitions
  if (guaranteeMessageOrder) {
      // Mute all the partitions drained
      for (List<ProducerBatch> batchList : batches.values()) {
          for (ProducerBatch batch : batchList)
              this.accumulator.mutePartition(batch.topicPartition);
      }
  }

  // batches that have expired in the accumulator
  List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
  boolean needsTransactionStateReset = false;
  // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
  // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
  // we need to reset the producer id here.
  if (!expiredBatches.isEmpty())
      log.trace("Expired {} batches in accumulator", expiredBatches.size());
  for (ProducerBatch expiredBatch : expiredBatches) {
      failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
      if (transactionManager != null && expiredBatch.inRetry()) {
          needsTransactionStateReset = true;
      }
      this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
  }
  if (needsTransactionStateReset) {
      transactionManager.resetProducerId();
      return 0;
  }
  sensors.updateProduceRequestMetrics(batches);
  // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
  // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
  // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
  // with sendable data that aren't ready to send since they would cause busy looping.
  // pollTimeout: how long the following poll() may block waiting for a channel to become ready
  long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
  if (!result.readyNodes.isEmpty()) {
      log.trace("Nodes with data ready to send: {}", result.readyNodes);
      // if some partitions are already ready to be sent, the select time would be 0;
      // otherwise if some partition already has some data accumulated but not ready yet,
      // the select time will be the time difference between now and its linger expiry time;
      // otherwise the select time will be the time difference between now and the metadata expiry time;
      pollTimeout = 0;
  }

  // send the messages: each node's ProducerBatch list is turned into a ProduceRequest,
  // and NetworkClient writes it out on the network
  sendProduceRequests(batches, now);
  return pollTimeout;
}

A closer look at sendProduceRequests(); the per-node work is done by sendProduceRequest():

 private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
      if (batches.isEmpty())
          return;

      Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
      final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

      // find the minimum magic version used when creating the record sets
      byte minUsedMagic = apiVersions.maxUsableProduceMagic();
      for (ProducerBatch batch : batches) {
          if (batch.magic() < minUsedMagic)
              minUsedMagic = batch.magic();
      }

      for (ProducerBatch batch : batches) {
          TopicPartition tp = batch.topicPartition;
          MemoryRecords records = batch.records();

          // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
          // that the producer starts building the batch and the time that we send the request, and we may have
          // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
          // the new message format, but found that the broker didn't support it, so we need to down-convert on the
          // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
          // not all support the same message format version. For example, if a partition migrates from a broker
          // which is supporting the new magic version to one which doesn't, then we will need to convert.
          if (!records.hasMatchingMagic(minUsedMagic))
              records = batch.records().downConvert(minUsedMagic, 0, time).records();
          produceRecordsByPartition.put(tp, records);
          recordsByPartition.put(tp, batch);
      }

      String transactionalId = null;
      if (transactionManager != null && transactionManager.isTransactional()) {
          transactionalId = transactionManager.transactionalId();
      }
      // turn the per-partition records into a ProduceRequest
      ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
              produceRecordsByPartition, transactionalId);
      RequestCompletionHandler callback = new RequestCompletionHandler() {
          public void onComplete(ClientResponse response) {
              handleProduceResponse(response, recordsByPartition, time.milliseconds());
          }
      };

      String nodeId = Integer.toString(destination);
      // wrap the ProduceRequest in a ClientRequest
      ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
              requestTimeoutMs, callback);
      // NetworkClient writes the request out on the network
      client.send(clientRequest, now);
      log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
  }

It also helps to understand org.apache.kafka.clients.producer.internals.RecordAccumulator#ready; the three key fields of its return type are explained in the comments.

/**
* Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
* partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
* partition batches.
* <p>
* A destination node is ready to send data if:
* <ol>
* <li>There is at least one partition that is not backing off its send
* <li><b>and</b> those partitions are not muted (to prevent reordering if
*   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
*   is set to one)</li>
* <li><b>and <i>any</i></b> of the following are true</li>
* <ul>
*     <li>The record set is full</li>
*     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
*     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
*     are immediately considered ready).</li>
*     <li>The accumulator has been closed</li>
* </ul>
* </ol>
*/
/**
* @return The three fields of ReadyCheckResult:
* 1. Set<Node> readyNodes            - nodes ready to send to
* 2. long nextReadyCheckDelayMs      - delay before the next readiness check
* 3. Set<String> unknownLeaderTopics - topics whose leader is unknown
*
* A node counts as sendable when any of the following holds:
* 1. a batch is full
* 2. a batch is not full but has waited at least lingerMs
* 3. the accumulator has run out of memory (threads are blocked waiting)
* 4. the accumulator has been closed
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
  Set<Node> readyNodes = new HashSet<>();
  long nextReadyCheckDelayMs = Long.MAX_VALUE;
  Set<String> unknownLeaderTopics = new HashSet<>();
  boolean exhausted = this.free.queued() > 0;
  for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
      TopicPartition part = entry.getKey();
      Deque<ProducerBatch> deque = entry.getValue();
      Node leader = cluster.leaderFor(part);
      synchronized (deque) {
          // leader unknown but messages queued: record the topic in unknownLeaderTopics
          if (leader == null && !deque.isEmpty()) {
              // This is a partition for which leader is not known, but messages are available to send.
              // Note that entries are currently not removed from batches when deque is empty.
              unknownLeaderTopics.add(part.topic());

              // proceed only when readyNodes does not already contain the leader and the
              // partition is not muted. Muting ties in with the producer config
              // max.in.flight.requests.per.connection=1, which prevents reordering within a
              // partition by capping unacknowledged requests per broker connection: at 1, no new
              // PRODUCE request is sent to the broker until the previous response arrives
          } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
              ProducerBatch batch = deque.peekFirst();
              if (batch != null) {
                  long waitedTimeMs = batch.waitedTimeMs(nowMs);
                  boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                  // how long this batch must wait before it may be sent
                  long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                  // the batch is full
                  boolean full = deque.size() > 1 || batch.isFull();
                  // the batch has waited long enough
                  boolean expired = waitedTimeMs >= timeToWaitMs;
                  boolean sendable = full || expired || exhausted || closed || flushInProgress();
                  if (sendable && !backingOff) {
                      readyNodes.add(leader);
                  } else {
                      long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                      // Note that this results in a conservative estimate since an un-sendable partition may have
                      // a leader that will later be found to have sendable data. However, this is good enough
                      // since we'll just wake up and then sleep again for the remaining time.
                      // not sendable yet: schedule the next readiness check
                      nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                  }
              }
          }
      }
  }
  return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

One more method matters: org.apache.kafka.clients.producer.internals.RecordAccumulator#drain, which pulls the data to send out of the accumulator buffer, at most max.request.size bytes at a time (a producer configuration parameter):

/**
* Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
* size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
*
* @param cluster The current cluster metadata
* @param nodes The list of node to drain
* @param maxSize The maximum number of bytes to drain
*                maxSize is governed by the producer config max.request.size: the per-request cap
* @param now The current unix time in milliseconds
* @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
*/
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
  if (nodes.isEmpty())
      return Collections.emptyMap();
  Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
  for (Node node : nodes) {
      // collect the batches to send for each node
      List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
      batches.put(node.id(), ready);
  }
  return batches;
}

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
  int size = 0;
  // the partitions whose leader lives on this node
  List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
  List<ProducerBatch> ready = new ArrayList<>();
  /* to make starvation less likely this loop doesn't start at 0 */
  // rotate the start index so draining does not always begin at the same partition
  int start = drainIndex = drainIndex % parts.size();
  do {
      PartitionInfo part = parts.get(drainIndex);
      TopicPartition tp = new TopicPartition(part.topic(), part.partition());
      this.drainIndex = (this.drainIndex + 1) % parts.size();

      // Only proceed if the partition has no in-flight batches.
      if (isMuted(tp, now))
          continue;

      Deque<ProducerBatch> deque = getDeque(tp);
      if (deque == null)
          continue;

      // lock the deque; it is not thread-safe
      synchronized (deque) {
          // invariant: !isMuted(tp,now) && deque != null
          ProducerBatch first = deque.peekFirst();
          if (first == null)
              continue;

          // first != null
          // check whether the batch is still inside its retry backoff window
          boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
          // Only drain the batch if it is not during backoff period.
          if (backoff)
              continue;

          // this batch would exceed maxSize and we already have something to send
          if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
              // there is a rare case that a single batch size is larger than the request size due to
              // compression; in this case we will still eventually send this batch in a single request
              // rare case: a single batch on its own exceeds maxSize (e.g. due to compression);
              // it will still eventually be sent by itself in one request
              break;
          } else {
              if (shouldStopDrainBatchesForPartition(first, tp))
                  break;

              boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false;
              ProducerIdAndEpoch producerIdAndEpoch =
                  transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
              ProducerBatch batch = deque.pollFirst();
              if (producerIdAndEpoch != null && !batch.hasSequence()) {
                  // If the batch already has an assigned sequence, then we should not change the producer id and
                  // sequence number, since this may introduce duplicates. In particular, the previous attempt
                  // may actually have been accepted, and if we change the producer id and sequence here, this
                  // attempt will also be accepted, causing a duplicate.
                  //
                  // Additionally, we update the next sequence number bound for the partition, and also have
                  // the transaction manager track the batch so as to ensure that sequence ordering is maintained
                  // even if we receive out of order responses.
                  batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                  transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                  log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                          "{} being sent to partition {}", producerIdAndEpoch.producerId,
                      producerIdAndEpoch.epoch, batch.baseSequence(), tp);

                  transactionManager.addInFlightBatch(batch);
              }
              // close the batch and add it to the ready list
              batch.close();
              size += batch.records().sizeInBytes();
              ready.add(batch);

              batch.drained(now);
          }
      }
  } while (start != drainIndex);
  return ready;
}

Summary

Taken together, the methods above:

  • read ProducerBatches from the RecordAccumulator, obtain the list of nodes, and associate each ProducerBatch with its node;
  • convert each ProducerBatch into a ProduceRequest, and further into a ClientRequest;
  • call NetworkClient's send method to write the messages out on the network.