Key concepts covered below: the sender thread, RecordAccumulator, ProducerBatch, BufferPool, the relationship between ProducerBatch and batch.size, Sender, and InFlightRequests.
The most important field in RecordAccumulator is:
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
It is a ConcurrentMap whose key is a TopicPartition (one partition of one topic) and whose value is a double-ended queue of ProducerBatch objects waiting for the Sender thread to ship them to the broker. Here is a diagram of the structure:
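To complement the diagram, here is a minimal sketch of the lazy, putIfAbsent-style creation of a per-partition deque, the same pattern the getOrCreateDeque() comment below refers to. The TopicPartition and ProducerBatch types here are simplified stand-ins for illustration, not the real Kafka classes:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified stand-ins for the real Kafka classes, for illustration only.
record TopicPartition(String topic, int partition) {}
class ProducerBatch {}

class MiniAccumulator {
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new ConcurrentHashMap<>();

    // Same idea as RecordAccumulator#getOrCreateDeque: the deque for a partition is created at most once.
    Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
        Deque<ProducerBatch> d = batches.get(tp);
        if (d != null)
            return d;
        Deque<ProducerBatch> created = new ArrayDeque<>();
        Deque<ProducerBatch> previous = batches.putIfAbsent(tp, created);
        // If another thread won the race, use its deque; otherwise use the one we just created.
        return previous == null ? created : previous;
    }
}

Note that only the map itself is thread-safe; the deques are plain ArrayDeques, which is why append() below synchronizes on dq.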
「Now let's look at the source to see how records are added to the buffer queue, focusing on this method: org.apache.kafka.clients.producer.internals.RecordAccumulator#append:」
/**
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
* <p>
*
* @param tp The topic/partition to which this record is being sent
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param headers the Headers for the record
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
*/
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// check if we have an in-progress batch
// Essentially just a putIfAbsent-style helper; not expanded here
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// batches is thread-safe, but the Deque inside it is not, so synchronize on it
// First try to append to an in-progress batch
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
// No usable in-progress batch: create a new ProducerBatch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
// Work out how much memory to allocate: at least batch.size, or more if a single record is larger
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// Allocate a buffer from the BufferPool; this may block for up to maxTimeToBlock if memory is not available
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
// Try to append again: the memory allocation above happened outside the synchronized block,
// so another thread may have created a batch in the meantime; the finally block will return the allocated buffer
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// As the Kafka authors' own comment below admits, hopefully this doesn't happen often: several threads allocate buffers only to hand them straight back
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
// Create the new ProducerBatch backed by the allocated buffer
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
// incomplete is a Set that holds batches which have not yet been completed
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
// Return the append result
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
// Return the buffer to the pool if it was not used for a new batch
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
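Before moving on, it helps to connect this append path to the producer-facing configuration. Below is a hedged example: the parameter names (batch.size, linger.ms, buffer.memory, max.block.ms) are standard producer configs, while the broker address, topic name and values are placeholders chosen purely for illustration:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchingConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", 16384);        // target ProducerBatch size (this.batchSize above)
        props.put("linger.ms", 5);             // how long a non-full batch may wait before becoming sendable
        props.put("buffer.memory", 33554432);  // total memory managed by the BufferPool (free.allocate above)
        props.put("max.block.ms", 60000);      // bounds how long send() may block when the pool is out of memory

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each send() eventually lands in RecordAccumulator#append for the record's topic-partition.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}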
The tryAppend() method is shown below; there is not much to add beyond the code comments:
/**
* Try to append to a ProducerBatch.
*
* If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
* resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
* and memory records built) in one of the following cases (whichever comes first): right before send,
* if it is expired, or when the producer is closed.
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
// Look at the most recently added ProducerBatch
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
// The append result carries the future, a flag for whether the batch is full, and a flag for whether a new batch was created
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
// Return null if the deque has no ProducerBatch, or the last batch is too full to hold this record
return null;
}
The append()/tryAppend() flow above is illustrated in the following diagram:
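Beyond the diagram, the peek-last-then-append-or-create pattern can be modeled with a tiny self-contained example. ToyBatch and the record sizes below are made up purely for illustration; the real ProducerBatch tracks bytes through a MemoryRecordsBuilder:

import java.util.ArrayDeque;
import java.util.Deque;

// A toy batch with a fixed capacity, mirroring how tryAppend() either appends to the
// last batch in the deque or signals (by returning null) that a new batch is needed.
class ToyBatch {
    private final int capacity;
    private int used;
    ToyBatch(int capacity) { this.capacity = capacity; }
    // Returns false when the record does not fit, like ProducerBatch#tryAppend returning null.
    boolean tryAppend(int recordSize) {
        if (used + recordSize > capacity) return false;
        used += recordSize;
        return true;
    }
}

public class TryAppendDemo {
    public static void main(String[] args) {
        Deque<ToyBatch> deque = new ArrayDeque<>();
        int batchCapacity = 100;
        int[] recordSizes = {60, 30, 40, 90};
        for (int size : recordSizes) {
            ToyBatch last = deque.peekLast();
            if (last == null || !last.tryAppend(size)) {
                // No in-progress batch, or the last one is full: open a new batch,
                // just like RecordAccumulator#append allocating a new ProducerBatch.
                ToyBatch fresh = new ToyBatch(batchCapacity);
                fresh.tryAppend(size);
                deque.addLast(fresh);
            }
        }
        System.out.println("batches created: " + deque.size()); // 60+30 | 40 | 90 -> 3 batches
    }
}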
The most important method in Sender is run(), and at its core is org.apache.kafka.clients.producer.internals.Sender#sendProducerData.
「Read the comments around pollTimeout carefully: it is the longest the subsequent poll may block before at least one of the registered channels becomes ready for its events. A value of 0 means there is data ready to go right now.」
private long sendProducerData(long now) {
// Fetch the current cluster metadata
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
// The returned ReadyCheckResult carries three fields:
// 1. Set<Node> readyNodes            nodes that are ready to be sent to
// 2. long nextReadyCheckDelayMs      delay until the next readiness check
// 3. Set<String> unknownLeaderTopics topics whose leader is currently unknown
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
// If some topics have an unknown leader, request a metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// Remove any nodes we cannot actually send to yet
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// Drain the messages to be sent, grouped by node id
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
// To guarantee message ordering, mute all drained partitions
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// Batches that have expired while sitting in the accumulator
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
boolean needsTransactionStateReset = false;
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
if (transactionManager != null && expiredBatch.inRetry()) {
needsTransactionStateReset = true;
}
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
}
if (needsTransactionStateReset) {
transactionManager.resetProducerId();
return 0;
}
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
// pollTimeout is the longest the subsequent poll may block waiting for a channel to become ready
// when there is nothing else to do; 0 means there is data ready to send right now
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}
// Send the messages: convert each node's ProducerBatches into a ProduceRequest and hand it to
// the NetworkClient, which writes it out to the network
sendProduceRequests(batches, now);
return pollTimeout;
}
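The returned pollTimeout feeds directly into the network poll in the Sender's run loop (roughly pollTimeout = sendProducerData(now) followed by client.poll(pollTimeout, now)). The toy loop below, with a stand-in Client interface, only illustrates that hand-off; it is not the real Sender:

// A toy mirror of how the Sender uses the value returned by sendProducerData():
// if nothing is sendable yet, poll() may block up to pollTimeout; if some node already
// has sendable data, pollTimeout is 0 and the loop comes straight back to send more.
public class SenderLoopSketch {
    interface Client { void poll(long timeoutMs); } // stand-in for NetworkClient.poll

    private final Client client;

    SenderLoopSketch(Client client) { this.client = client; }

    void runOnce(long now) {
        long pollTimeout = sendProducerData(now); // drain the accumulator, enqueue requests
        client.poll(pollTimeout);                 // perform the actual network I/O
    }

    long sendProducerData(long now) {
        // In the real Sender this is the method shown above; here we simply pretend
        // nothing is ready yet and ask the caller to wait up to 100 ms.
        return 100L;
    }

    public static void main(String[] args) {
        SenderLoopSketch sender = new SenderLoopSketch(
                timeoutMs -> System.out.println("poll for up to " + timeoutMs + " ms"));
        sender.runOnce(System.currentTimeMillis());
    }
}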
sendProduceRequests() explained
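Strictly speaking, the method shown next is the per-node helper sendProduceRequest(); the plural sendProduceRequests() is, paraphrasing the source, just a thin loop over the node-to-batches map produced by drain(), along the lines of:

private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    // Build and send one ProduceRequest per destination node from that node's drained batches.
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}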
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
// find the minimum magic version used when creating the record sets
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
// down convert if necessary to the minimum magic used. In general, there can be a delay between the time
// that the producer starts building the batch and the time that we send the request, and we may have
// chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
// the new message format, but found that the broker didn't support it, so we need to down-convert on the
// client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
// Convert the drained ProducerBatches into a ProduceRequest
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
String nodeId = Integer.toString(destination);
// Wrap the ProduceRequest in a ClientRequest addressed to this node
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
// Hand the ClientRequest to the NetworkClient, which writes it out over the network
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
Another method worth understanding is org.apache.kafka.clients.producer.internals.RecordAccumulator#ready. The three key fields of its return type are explained in the comments below.
/**
* Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
* partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
* partition batches.
* <p>
* A destination node is ready to send data if:
* <ol>
* <li>There is at least one partition that is not backing off its send
* <li><b>and</b> those partitions are not muted (to prevent reordering if
* {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
* is set to one)</li>
* <li><b>and <i>any</i></b> of the following are true</li>
* <ul>
* <li>The record set is full</li>
* <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
* <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
* are immediately considered ready).</li>
* <li>The accumulator has been closed</li>
* </ul>
* </ol>
*/
/**
* @return The ReadyCheckResult carries three fields:
* 1. Set<Node> readyNodes            nodes that are ready to be sent to
* 2. long nextReadyCheckDelayMs      delay until the next readiness check
* 3. Set<String> unknownLeaderTopics topics whose leader is currently unknown
*
* A node is considered ready to receive data when any of the following holds for one of its partitions:
* 1. the batch is full
* 2. the batch is not full, but has waited at least linger.ms
* 3. the accumulator is out of memory (threads are blocked waiting for space)
* 4. the accumulator has been closed
* (a flush in progress also makes a batch sendable, as the code below shows)
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<ProducerBatch> deque = entry.getValue();
Node leader = cluster.leaderFor(part);
synchronized (deque) {
// Leader unknown but the queue is non-empty: record the topic so metadata can be refreshed
if (leader == null && !deque.isEmpty()) {
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());
// Only consider this partition if its leader is not already in readyNodes and the partition is not muted.
// Muting is tied to the producer config max.in.flight.requests.per.connection, which limits how many
// unacknowledged requests the producer may have on a single broker connection; setting it to 1 prevents
// reordering within a partition, because no further PRODUCE request is sent to that broker until the
// previous one has been acknowledged.
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
// How long the batch should wait before sending: retry backoff if retrying, otherwise linger.ms
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
// The batch counts as full if a newer batch exists behind it or it has reached its size limit
boolean full = deque.size() > 1 || batch.isFull();
// The batch has waited at least as long as it needs to
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
// Not sendable yet: remember the shortest remaining wait so we know when to check again
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
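To make the readiness check concrete, here is a tiny worked example with made-up numbers (linger.ms = 5, retry.backoff.ms = 100) that reproduces the same boolean logic as the loop above:

public class ReadyCheckDemo {
    public static void main(String[] args) {
        long lingerMs = 5;          // assumed value of the producer config linger.ms
        long retryBackoffMs = 100;  // assumed value of the producer config retry.backoff.ms

        // A batch created 7 ms ago, never retried, not full, with no flush/close/memory pressure:
        long waitedTimeMs = 7;
        int attempts = 0;
        boolean full = false;
        boolean exhausted = false, closed = false, flushInProgress = false;

        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed || flushInProgress;

        // The batch has waited 7 ms >= linger.ms of 5 ms, so its leader is considered ready.
        System.out.println("ready = " + (sendable && !backingOff)); // prints: ready = true
    }
}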
There is also org.apache.kafka.clients.producer.internals.RecordAccumulator#drain, which pulls the data to be sent out of the accumulator buffer, at most max.request.size bytes at a time (this config parameter appears in the list at the top):
/**
* Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
* size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
*
* @param cluster The current cluster metadata
* @param nodes The list of node to drain
* @param maxSize The maximum number of bytes to drain
* maxSize is controlled by the producer config max.request.size, i.e. the maximum amount of data sent in a single request
* @param now The current unix time in milliseconds
* @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
*/
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
// Collect the batches to send for this node
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
int size = 0;
// Partitions whose current leader is this node
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<ProducerBatch> ready = new ArrayList<>();
/* to make starvation less likely this loop doesn't start at 0 */
// Start from a rotating index so draining does not always begin with the same partition (avoid starvation)
int start = drainIndex = drainIndex % parts.size();
do {
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
this.drainIndex = (this.drainIndex + 1) % parts.size();
// Only proceed if the partition has no in-flight batches.
if (isMuted(tp, now))
continue;
Deque<ProducerBatch> deque = getDeque(tp);
if (deque == null)
continue;
// Lock the deque; the Deque itself is not thread-safe
synchronized (deque) {
// invariant: !isMuted(tp,now) && deque != null
ProducerBatch first = deque.peekFirst();
if (first == null)
continue;
// first != null
// Check whether this batch is still within its retry backoff period
boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
// Only drain the batch if it is not during backoff period.
if (backoff)
continue;
// Adding this batch would exceed maxSize and there is already something in ready
if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
// there is a rare case that a single batch size is larger than the request size due to
// compression; in this case we will still eventually send this batch in a single request
// Special case: a single batch may itself be larger than maxSize (e.g. due to compression); if ready were
// empty we would not break here, so such a batch still gets sent on its own in a single request
break;
} else {
if (shouldStopDrainBatchesForPartition(first, tp))
break;
boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false;
ProducerIdAndEpoch producerIdAndEpoch =
transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
ProducerBatch batch = deque.pollFirst();
if (producerIdAndEpoch != null && !batch.hasSequence()) {
// If the batch already has an assigned sequence, then we should not change the producer id and
// sequence number, since this may introduce duplicates. In particular, the previous attempt
// may actually have been accepted, and if we change the producer id and sequence here, this
// attempt will also be accepted, causing a duplicate.
//
// Additionally, we update the next sequence number bound for the partition, and also have
// the transaction manager track the batch so as to ensure that sequence ordering is maintained
// even if we receive out of order responses.
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
"{} being sent to partition {}", producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, batch.baseSequence(), tp);
transactionManager.addInFlightBatch(batch);
}
// Close the batch to further appends and add it to the ready list
batch.close();
size += batch.records().sizeInBytes();
ready.add(batch);
batch.drained(now);
}
}
} while (start != drainIndex);
return ready;
}
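The rotating drainIndex above is what spreads draining across partitions between calls. Here is a minimal standalone illustration of the same round-robin pattern, with a made-up partition list and an artificial per-call limit standing in for the maxSize cut-off:

import java.util.List;

public class RoundRobinDrainDemo {
    private static int drainIndex = 0; // persists across calls, like RecordAccumulator's drainIndex

    // Visit at most 'limit' partitions per call, starting where the previous call stopped.
    static void drainOnce(List<String> partitions, int limit) {
        int start = drainIndex = drainIndex % partitions.size();
        int taken = 0;
        do {
            String part = partitions.get(drainIndex);
            drainIndex = (drainIndex + 1) % partitions.size();
            System.out.print(part + " ");
            if (++taken == limit)
                break; // analogous to stopping once maxSize would be exceeded
        } while (start != drainIndex);
        System.out.println();
    }

    public static void main(String[] args) {
        List<String> parts = List.of("p0", "p1", "p2");
        drainOnce(parts, 2); // prints: p0 p1
        drainOnce(parts, 2); // prints: p2 p0  (the next call resumes where the last one stopped)
    }
}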
Taken together, the methods above mainly do the following: