从源码中我们发现,在Sender的run方法中并没有涉及append追加操作。从下面的源码可以看到:如果追加结果表明当前批次已满(batchIsFull),或者本次追加新创建了一个消息批次(newBatchCreated),就会执行wakeup操作唤醒Sender线程。因此核心代码就是append追加和Sender线程的唤醒,最终将追加结果中的future返回:
//Append the record to the accumulator so it can be sent as part of a batch (key step: append)
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
//Wake the sender when the append result reports a full batch or a newly created batch (key step: wakeup)
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
//Hand the future carried by the append result back to the caller
return result.future;
和消息收集器RecordAccumulator的名称一样,accumulator的英文意思就是"积累":只有当批次已满或者新创建了一个批次时,才会唤醒Sender线程。下面我们来看怎样进行消息追加。
消息追加操作(RecordAccumulator的append方法):
/**
 * Adds a record to the accumulator for the given topic-partition and returns the append result.
 * <p>
 * The record is first offered to the last batch in the partition's deque. If that yields no result,
 * a buffer is allocated from the pool, the append is retried under the deque lock, and only then is
 * a new batch built on the freshly allocated buffer and added to the tail of the deque.
 *
 * @param tp the topic-partition the record is appended to
 * @param timestamp the record timestamp
 * @param key the record key
 * @param value the record value
 * @param headers the record headers; null is replaced by Record.EMPTY_HEADERS
 * @param callback callback handed to the batch together with the record
 * @param maxTimeToBlock passed to the buffer pool's allocate call
 * @throws IllegalStateException if the accumulator is already closed
 * @throws InterruptedException declared by this method; presumably raised while blocked in buffer allocation (confirm against BufferPool)
 */
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
// The counter is decremented again in the finally block below.
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
// Treat null headers as "no headers"
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// check if we have an in-progress batch
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
// Try to append to the batch at the tail of the deque
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
// A non-null result means the record was accepted by an existing batch
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
// Highest produce magic value usable according to the known API versions
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
// Buffer size: the configured batch size, or the record's estimated upper bound if that is larger
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// Allocate from the buffer pool; this happens outside the deque lock
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
// Retry the append: the deque may have changed while the lock was released
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
// Build a records builder on top of the allocated buffer
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
// Create the new batch and append the record to it; a null future is rejected by Utils.notNull
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
// Queue the batch at the tail and track it as incomplete
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
// Third argument is true because a new batch was created in this call
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
// Return an unused buffer to the pool and decrement the in-progress append counter
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
尝试将消息追加到生产者批次(ProducerBatch)中。如果批次已满,则返回null,由调用方创建一个新批次。同时会关闭该批次的记录追加(不再接受新的记录),以释放压缩缓冲区等资源。在以下情况之一(以先到者为准)发生时,批次才会被完全关闭(即写入记录批次头并构建内存记录):即将发送之前、批次过期时,或生产者关闭时。
/**
 * Offers the record to the batch at the tail of {@code deque}.
 *
 * @return the append result when the tail batch accepted the record; {@code null} when the deque
 *         has no tail batch or the tail batch returned no future (in the latter case the tail
 *         batch is closed for further record appends)
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch tail = deque.peekLast();
    if (tail == null) {
        // Nothing queued for this partition yet
        return null;
    }

    FutureRecordMetadata accepted = tail.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
    if (accepted != null) {
        // Flag is set when more than one batch is queued or the tail batch reports full
        boolean queuedOrFull = deque.size() > 1 || tail.isFull();
        return new RecordAppendResult(accepted, queuedOrFull, false);
    }

    // The tail batch did not take the record: stop appending to it
    tail.closeForRecordAppends();
    return null;
}
/**
 * Appends the record to this batch if the records builder has room for it.
 *
 * @param now stored as the time of the last append
 * @return the future for the appended record, or {@code null} when the builder has no room
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers))
        return null;

    Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);

    // Track the largest estimated record size seen by this batch
    int estimatedSize = AbstractRecords.estimateSizeInBytesUpperBound(magic(),
            recordsBuilder.compressionType(), key, value, headers);
    this.maxRecordSize = Math.max(this.maxRecordSize, estimatedSize);
    this.lastAppendTime = now;

    // -1 stands in for the size of a null key or value
    int keySize = key == null ? -1 : key.length;
    int valueSize = value == null ? -1 : value.length;
    FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
            timestamp, checksum, keySize, valueSize);

    // we have to keep every future returned to the users in case the batch needs to be
    // split to several new batches and resent.
    thunks.add(new Thunk(callback, future));
    this.recordCount++;
    return future;
}
/**
 * Validates the arguments and appends a record at the given offset.
 *
 * @return {@code null} for magic values above v1 (default record format), otherwise the value
 *         returned by the legacy record append
 * @throws IllegalArgumentException if the control flag does not match the batch, the offset does
 *         not increase, the timestamp is negative (and not NO_TIMESTAMP), or headers are supplied
 *         for a magic value below v2
 * @throws KafkaException wrapping any IOException raised while writing
 */
private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                              ByteBuffer value, Header[] headers) {
    try {
        // Control records and control batches must match
        if (isControlRecord != isControlBatch) {
            throw new IllegalArgumentException("Control records can only be appended to control batches");
        }

        // Offsets have to be strictly increasing
        if (lastOffset != null && offset <= lastOffset) {
            throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
                    "(Offsets must increase monotonically).", offset, lastOffset));
        }

        // Only NO_TIMESTAMP is accepted as a negative timestamp
        if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) {
            throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
        }

        // Headers require at least magic v2
        if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) {
            throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
        }

        // Remember the timestamp of the first appended record
        if (firstTimestamp == null) {
            firstTimestamp = timestamp;
        }

        if (magic <= RecordBatch.MAGIC_VALUE_V1) {
            return appendLegacyRecord(offset, timestamp, key, value);
        }
        appendDefaultRecord(offset, timestamp, key, value, headers);
        return null;
    } catch (IOException e) {
        throw new KafkaException("I/O exception when writing to the append stream, closing", e);
    }
}
/**
 * Writes one record in the legacy format to the append stream and updates the batch bookkeeping.
 *
 * @return the CRC returned by the legacy record write
 */
private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
    ensureOpenForRecordAppend();

    // With LOG_APPEND_TIME and no compression, the log append time replaces the supplied timestamp
    if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME) {
        timestamp = logAppendTime;
    }

    int recordSize = LegacyRecord.recordSize(magic, key, value);
    AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), recordSize);

    // With LOG_APPEND_TIME the log append time is used regardless of compression
    if (timestampType == TimestampType.LOG_APPEND_TIME) {
        timestamp = logAppendTime;
    }

    long checksum = LegacyRecord.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);

    // Account for the record plus the log overhead
    recordWritten(offset, timestamp, recordSize + Records.LOG_OVERHEAD);
    return checksum;
}
/**
 * Computes the attributes byte and checksum for a record, writes the record to {@code out}
 * and returns the checksum.
 */
public static long write(DataOutputStream out,
                         byte magic,
                         long timestamp,
                         ByteBuffer key,
                         ByteBuffer value,
                         CompressionType compressionType,
                         TimestampType timestampType) throws IOException {
    final byte attrs = computeAttributes(magic, compressionType, timestampType);
    final long checksum = computeChecksum(magic, attrs, timestamp, key, value);
    // Delegate the actual serialization to the private overload
    write(out, magic, checksum, attrs, timestamp, key, value);
    return checksum;
}
/**
 * Serializes one legacy record to {@code out}: crc, magic, attributes, the timestamp (only for
 * magic values above v0), then the size-prefixed key and the size-prefixed value.
 *
 * @throws IllegalArgumentException if magic is neither v0 nor v1, or if the timestamp is negative
 *         and not NO_TIMESTAMP
 */
private static void write(DataOutputStream out,
                          byte magic,
                          long crc,
                          byte attributes,
                          long timestamp,
                          ByteBuffer key,
                          ByteBuffer value) throws IOException {
    boolean legacyMagic = magic == RecordBatch.MAGIC_VALUE_V0 || magic == RecordBatch.MAGIC_VALUE_V1;
    if (!legacyMagic) {
        throw new IllegalArgumentException("Invalid magic value " + magic);
    }
    if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) {
        throw new IllegalArgumentException("Invalid message timestamp " + timestamp);
    }

    // crc is written as its low 32 bits
    out.writeInt((int) (crc & 0xffffffffL));
    out.writeByte(magic);
    out.writeByte(attributes);

    // v0 records carry no timestamp
    if (magic > RecordBatch.MAGIC_VALUE_V0) {
        out.writeLong(timestamp);
    }

    writeSizePrefixed(out, key);
    writeSizePrefixed(out, value);
}

/** Writes -1 for a null buffer, otherwise the remaining byte count followed by those bytes. */
private static void writeSizePrefixed(DataOutputStream out, ByteBuffer buffer) throws IOException {
    if (buffer == null) {
        out.writeInt(-1);
        return;
    }
    int size = buffer.remaining();
    out.writeInt(size);
    Utils.writeTo(out, buffer, size);
}
/**
 * Updates the batch counters after a record has been written: record count, uncompressed size,
 * last offset and, for magic values above v0, the maximum timestamp and its offset.
 *
 * @throws IllegalArgumentException if the record count or the offset delta would exceed Integer.MAX_VALUE
 */
private void recordWritten(long offset, long timestamp, int size) {
    if (numRecords == Integer.MAX_VALUE) {
        throw new IllegalArgumentException("Maximum number of records per batch exceeded, max records: " + Integer.MAX_VALUE);
    }
    if (offset - baseOffset > Integer.MAX_VALUE) {
        throw new IllegalArgumentException("Maximum offset delta exceeded, base offset: " + baseOffset +
                ", last offset: " + offset);
    }

    numRecords += 1;
    uncompressedRecordsSizeInBytes += size;
    lastOffset = offset;

    // v0 has no timestamps, so the max timestamp is only tracked for newer magic values
    if (magic > RecordBatch.MAGIC_VALUE_V0) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
            offsetOfMaxTimestamp = offset;
        }
    }
}
KafkaProducer创建消息后,消息依次经过拦截器、key/value序列化器和分区器,然后被追加到消息收集器中。当满足条件时(批次已满或新创建了批次),会唤醒Sender线程。
从源码中,我们可以看到消息收集器中为每个主题分区(TopicPartition)维护一个双端队列Deque,一个双端队列中可以有若干个消息批次:新的批次追加到队尾,获取批次时则从队首取出。
这个类充当队列,该队列将消息收集到内存消息MemoryRecords实例中,以发送到服务器。消息收集器使用有限数量的内存,并且当该内存耗尽时,append调用将阻塞,除非明确禁用此行为。
// Excerpt of the RecordAccumulator field declarations (the rest of the class is not shown here).
public final class RecordAccumulator {
// Logger
private final Logger log;
// Set once the accumulator is closed; append rejects records when it is true
private volatile boolean closed;
// Counter named for flushes in progress
private final AtomicInteger flushesInProgress;
// Counter of append calls in progress
private final AtomicInteger appendsInProgress;
// Batch size; append uses it as the minimum buffer size to allocate
private final int batchSize;
// Compression type
private final CompressionType compression;
private final long lingerMs;
// Retry backoff in milliseconds
private final long retryBackoffMs;
// Buffer pool that batch buffers are allocated from and returned to
private final BufferPool free;
// Clock
private final Time time;
// API versions
private final ApiVersions apiVersions;
// One deque of batches per topic-partition
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// Incomplete batches, i.e. batches that have not finished sending
private final IncompleteBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Set<TopicPartition> muted;
// NOTE(review): presumably the position at which the next drain resumes; confirm against drain()
private int drainIndex;
// Transaction manager
private final TransactionManager transactionManager;
处理将生产请求发送到Kafka集群的后台线程。该线程发出元数据请求以更新其对群集的信息,然后将生产请求发送到适当的节点。
// Field declarations of the Sender runnable; only the fields are shown in this excerpt.
public class Sender implements Runnable {
// Logger
private final Logger log;
/* the state of each nodes connection */
private final KafkaClient client;
/* the record accumulator that batches records */
private final RecordAccumulator accumulator;
/* the metadata for the client */
private final Metadata metadata;
/* the flag indicating whether the producer should guarantee the message order on the broker or not. */
private final boolean guaranteeMessageOrder;
/* the maximum request size to attempt to send to the server */
private final int maxRequestSize;
/* the number of acknowledgements to request from the server */
private final short acks;
/* the number of times to retry a failed request before giving up */
private final int retries;
/* the clock instance used for getting the time */
private final Time time;
/* true while the sender thread is still running */
private volatile boolean running;
/* true when the caller wants to ignore all unsent/inflight messages and force close. */
private volatile boolean forceClose;
/* metrics */
private final SenderMetrics sensors;
/* the max time to wait for the server to respond to the request*/
private final int requestTimeout;
/* The max time to wait before retrying a request which has failed */
private final long retryBackoffMs;
/* current request API versions supported by the known brokers */
private final ApiVersions apiVersions;
/* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
private final TransactionManager transactionManager;
}
Sender线程被唤醒后会执行下面的run方法:
/**
 * Run a single iteration of sending.
 * <p>
 * When a transaction manager is present, its state is handled first; several of those branches
 * poll the client with the retry backoff and return without sending producer data. Otherwise the
 * iteration sends producer data and polls the client with the timeout that call returns.
 *
 * @param now The current POSIX time in milliseconds
 */
void run(long now) {
// A transaction manager is configured: deal with its state before sending data
if (transactionManager != null) {
try {
if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
// Check if the previous run expired batches which requires a reset of the producer state.
transactionManager.resetProducerId();
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a producer id
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
"some previously sent messages and can no longer retry them. It isn't safe to continue."));
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
// as long as there are outstanding transactional requests, we simply wait for them to return
client.poll(retryBackoffMs, now);
return;
}
// do not continue sending if the transaction manager is in a failed state or if there
// is no producer id (for the idempotent case).
if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, now);
return;
} else if (transactionManager.hasAbortableError()) {
accumulator.abortUndrainedBatches(transactionManager.lastError());
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
// NOTE(review): e is the only argument for the {} placeholder; confirm the logger renders it as intended
log.trace("Authentication exception while processing transactional request: {}", e);
transactionManager.authenticationFailed(e);
}
}
// Send the producer data (key step); the return value is the timeout for the poll below
long pollTimeout = sendProducerData(now);
// Poll the client (key step)
client.poll(pollTimeout, now);
}
此时会执行两个方法sendProducerData(now)和poll(pollTimeout, now)。
/**
 * Drains the batches that are ready from the accumulator, fails the expired ones, passes the
 * drained batches to {@code sendProduceRequests} and returns the timeout for the next poll.
 *
 * @param now the current time in milliseconds
 * @return 0 when at least one node is ready to be sent to; otherwise the smaller of the
 *         accumulator's next ready-check delay and the shortest connection delay of the
 *         nodes that were not ready
 */
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();

    // Ask the accumulator which nodes have data that is ready to be sent
    RecordAccumulator.ReadyCheckResult readiness = this.accumulator.ready(cluster, now);

    // Topics whose partition leader is unknown are added to the metadata and an update is requested
    if (!readiness.unknownLeaderTopics.isEmpty()) {
        for (String topic : readiness.unknownLeaderTopics) {
            this.metadata.add(topic);
        }
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to, remembering the shortest connection delay
    long notReadyTimeout = Long.MAX_VALUE;
    for (Iterator<Node> nodes = readiness.readyNodes.iterator(); nodes.hasNext(); ) {
        Node candidate = nodes.next();
        if (this.client.ready(candidate, now)) {
            continue;
        }
        nodes.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(candidate, now));
    }

    // create produce requests
    Map<Integer, List<ProducerBatch>> drained = this.accumulator.drain(cluster, readiness.readyNodes,
            this.maxRequestSize, now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> perNode : drained.values()) {
            for (ProducerBatch drainedBatch : perNode) {
                this.accumulator.mutePartition(drainedBatch.topicPartition);
            }
        }
    }

    // Fail every expired batch; a batch that was in retry additionally has its partition's
    // sequence marked unresolved when a transaction manager is present
    List<ProducerBatch> expired = this.accumulator.expiredBatches(this.requestTimeout, now);
    if (!expired.isEmpty()) {
        log.trace("Expired {} batches in accumulator", expired.size());
    }
    for (ProducerBatch stale : expired) {
        failBatch(stale, -1, NO_TIMESTAMP, stale.timeoutException(), false);
        if (transactionManager != null && stale.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(stale.topicPartition);
        }
    }

    sensors.updateProduceRequestMetrics(drained);

    // With ready nodes, poll with a zero timeout so the loop can come straight back and send more.
    // Otherwise wait for the next ready check or for the earliest not-ready node; nodes that have
    // data but are not ready are deliberately left out of the zero-timeout case.
    long pollTimeout;
    if (readiness.readyNodes.isEmpty()) {
        pollTimeout = Math.min(readiness.nextReadyCheckDelayMs, notReadyTimeout);
    } else {
        log.trace("Nodes with data ready to send: {}", readiness.readyNodes);
        pollTimeout = 0;
    }

    sendProduceRequests(drained, now);
    return pollTimeout;
}
/**
 * Create a produce request from the given record batches and send it to the destination node.
 * Does nothing when {@code batches} is empty.
 */
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty()) {
        return;
    }

    // Lowest magic value among the batches, starting from the maximum usable produce magic
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch candidate : batches) {
        if (candidate.magic() < minUsedMagic) {
            minUsedMagic = candidate.magic();
        }
    }

    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    for (ProducerBatch batch : batches) {
        TopicPartition partition = batch.topicPartition;
        MemoryRecords payload = batch.records();

        // Down-convert to the minimum magic in use when the batch was built with a different one.
        // A batch may have been built with a newer format than the request can carry, e.g. when
        // the format was chosen from stale metadata or during a cluster upgrade in which brokers
        // do not all support the same message format version.
        if (!payload.hasMatchingMagic(minUsedMagic)) {
            payload = batch.records().downConvert(minUsedMagic, 0, time).records();
        }
        produceRecordsByPartition.put(partition, payload);
        recordsByPartition.put(partition, batch);
    }

    // The transactional id is only set for a transactional producer
    String transactionalId = (transactionManager != null && transactionManager.isTransactional())
            ? transactionManager.transactionalId()
            : null;

    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);

    // Completion handler that forwards the response to handleProduceResponse
    RequestCompletionHandler onResponse = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    // A response is expected unless acks is 0
    ClientRequest produceRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, onResponse);
    client.send(produceRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
/**
 * Picks the request version for the destination node, builds the request and hands it to the
 * four-argument overload. An {@link UnsupportedVersionException} raised on the way is turned
 * into a client response queued in {@code abortedSends}.
 *
 * @throws IllegalStateException if the request is not internal and the node cannot be sent to
 */
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    ensureActive();
    String target = clientRequest.destination();

    // Requests from outside this client are checked here; internal requests skip this check
    // because their validation is slightly different (e.g. ApiVersionsRequests can be sent
    // before the connection is in the READY state).
    if (!isInternalRequest && !canSendRequest(target)) {
        throw new IllegalStateException("Attempt to send a request to node " + target + " which is not ready.");
    }

    AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        NodeApiVersions knownVersions = apiVersions.get(target);
        short version;
        if (knownVersions != null) {
            version = knownVersions.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                    builder.latestAllowedVersion());
        } else {
            // No version information for this node: fall back to the builder's latest allowed version
            version = builder.latestAllowedVersion();
            if (discoverBrokerVersions && log.isTraceEnabled()) {
                log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                        "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), target, version);
            }
        }
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException e) {
        log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
                clientRequest.correlationId(), clientRequest.destination(), e);
        ClientResponse failed = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                clientRequest.callback(), clientRequest.destination(), now, now,
                false, e, null);
        abortedSends.add(failed);
    }
}
/**
 * Builds the header and the network send for an already-built request, records it as an
 * in-flight request and passes the send to the selector.
 */
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String target = clientRequest.destination();
    RequestHeader requestHeader = clientRequest.makeHeader(request.version());

    if (log.isDebugEnabled()) {
        int latestClientVersion = clientRequest.apiKey().latestVersion();
        if (requestHeader.apiVersion() != latestClientVersion) {
            log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                    requestHeader.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), target);
        } else {
            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                    clientRequest.correlationId(), target);
        }
    }

    Send outgoing = request.toSend(target, requestHeader);

    // Track the request as in flight before handing its send to the selector
    InFlightRequest inFlight = new InFlightRequest(
            requestHeader,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            request,
            outgoing,
            now);
    this.inFlightRequests.add(inFlight);
    selector.send(inFlight.send);
}
/**
 * Queue the given request for sending in the subsequent {@link #poll(long)} calls
 *
 * @param send The request to send
 */
public void send(Send send) {
    String connectionId = send.destination();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);

    if (closingChannels.containsKey(connectionId)) {
        // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
        this.failedSends.add(connectionId);
        return;
    }

    try {
        // Attach the send to the channel
        channel.setSend(send);
    } catch (Exception e) {
        // update the state for consistency, the channel will be discarded after `close`
        channel.state(ChannelState.FAILED_SEND);
        // ensure notification via `disconnected` when `failedSends` are processed in the next poll
        this.failedSends.add(connectionId);
        close(channel, CloseMode.DISCARD_NO_NOTIFY);
        // A cancelled key is not logged or rethrown; everything else is
        if (e instanceof CancelledKeyException) {
            return;
        }
        log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                connectionId, e);
        throw e;
    }
}
/**
 * Stores the send on this channel and registers write interest on the transport layer.
 *
 * @throws IllegalStateException if a previous send is still set on this channel
 */
public void setSend(Send send) {
    if (this.send == null) {
        this.send = send;
        // Register interest in write readiness on the transport layer
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
        return;
    }
    throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
}
/**
 * Polls the selector and collects the resulting client responses.
 * When aborted sends are pending, they are handled and returned immediately without polling.
 *
 * @param timeout upper bound for the selector poll, combined with the metadata timeout and the request timeout
 * @param now the current time in milliseconds
 * @return the responses completed during this call
 */
@Override
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    List<ClientResponse> responses = new ArrayList<>();
    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }

    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions, using a timestamp taken after the selector poll
    long afterPoll = this.time.milliseconds();
    handleCompletedSends(responses, afterPoll);
    handleCompletedReceives(responses, afterPoll);
    handleDisconnections(responses, afterPoll);
    handleConnections();
    handleInitiateApiVersionRequests(afterPoll);
    handleTimedOutRequests(responses, afterPoll);

    completeResponses(responses);
    return responses;
}
/**
 * Runs one select on the NIO selector and processes the keys that need attention: keys with
 * buffered reads, keys reported ready by the select, and immediately connected keys.
 *
 * @param timeout the select timeout; forced to 0 when there are staged receives, immediately
 *                connected keys, or buffered data together with read progress in the last poll
 * @throws IllegalArgumentException if timeout is negative
 */
@Override
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
// Capture the read-progress flag of the previous poll before clear() is called
boolean madeReadProgressLastCall = madeReadProgressLastPoll;
clear();
boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
// Do not block in select when there is already work that can be done
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
timeout = 0;
if (!memoryPool.isOutOfMemory() && outOfMemory) {
//we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
log.trace("Broker no longer low on memory - unmuting incoming sockets");
for (KafkaChannel channel : channels.values()) {
if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
channel.unmute();
}
}
outOfMemory = false;
}
/* check ready keys */
long startSelect = time.nanoseconds();
int numReadyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
pollSelectionKeys(toPoll, false, endSelect);
}
// Poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);
// Clear all selected keys so that they are included in the ready count for the next select
readyKeys.clear();
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
immediatelyConnectedKeys.clear();
} else {
madeReadProgressLastPoll = true; //no work is also "progress"
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);
// Add to completedReceives after closing expired connections to avoid removing
// channels with completed receives until all staged receives are completed.
addToCompletedReceives();
}
/**
 * Handles every key in the given set: finishes pending connections, prepares channels that are
 * connected but not yet ready, attempts a read, attempts a write when the key is writable, and
 * closes channels whose key is no longer valid. Any exception closes the affected channel.
 *
 * @param selectionKeys the keys to handle
 * @param isImmediatelyConnected true when the keys belong to immediately connected channels
 * @param currentTimeNanos time passed to the idle expiry manager for each handled channel
 */
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
KafkaChannel channel = channel(key);
long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
// Set when channel.write() throws; selects the close mode in the catch block below
boolean sendFailed = false;
try {
/* complete any connections that have finished their handshake (either normally or immediately) */
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
} else
continue;
}
/* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready()) {
try {
channel.prepare();
} catch (AuthenticationException e) {
sensors.failedAuthentication.record();
throw e;
}
if (channel.ready())
sensors.successfulAuthentication.record();
}
// Attempt a read from the channel
attemptRead(key, channel);
if (channel.hasBytesBuffered()) {
//this channel has bytes enqueued in intermediary buffers that we could not read
//(possibly because no memory). it may be the case that the underlying socket will
//not come up in the next poll() and so we need to remember this channel for the
//next poll call otherwise data may be stuck in said buffers forever. If we attempt
//to process buffered data and no progress is made, the channel buffered status is
//cleared to avoid the overhead of checking every time.
keysWithBufferedRead.add(key);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send = null;
try {
send = channel.write();
} catch (Exception e) {
sendFailed = true;
throw e;
}
// A non-null send from write() is recorded as completed
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
/* cancel any defunct sockets */
if (!key.isValid())
close(channel, CloseMode.GRACEFUL);
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else if (e instanceof AuthenticationException) // will be logged later as error by clients
log.debug("Connection with {} disconnected due to authentication exception", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
} finally {
maybeRecordTimePerConnection(channel, channelStartTimeNanos);
}
}
}
/**
 * Reads from the channel when it is ready, has bytes to read from the socket or its buffers,
 * has no receive already staged, and is not explicitly muted. Every receive returned by the
 * channel is staged.
 */
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
    boolean shouldRead = channel.ready()
            && (key.isReadable() || channel.hasBytesBuffered())
            && !hasStagedReceive(channel)
            && !explicitlyMutedChannels.contains(channel);
    if (!shouldRead) {
        return;
    }

    // Keep reading until the channel returns no further receive
    for (NetworkReceive received = channel.read(); received != null; received = channel.read()) {
        madeReadProgressLastPoll = true;
        addToStagedReceives(channel, received);
    }

    if (channel.isMute()) {
        outOfMemory = true; //channel has muted itself due to memory pressure.
    } else {
        madeReadProgressLastPoll = true;
    }
}
/**
 * Continues (or starts) the current network receive.
 *
 * @return the receive once it is complete, with its payload rewound; {@code null} while it is
 *         still incomplete
 */
public NetworkReceive read() throws IOException {
    if (receive == null) {
        // No receive in progress: start a new one
        receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
    }

    receive(receive);

    if (receive.complete()) {
        receive.payload().rewind();
        NetworkReceive finished = receive;
        receive = null;
        return finished;
    }

    if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
        //pool must be out of memory, mute ourselves.
        mute();
    }
    return null;
}
addToStagedReceives方法:将接收到的数据按通道放入对应的暂存队列中
/**
 * adds a receive to staged receives, creating the channel's queue on first use
 */
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    Deque<NetworkReceive> staged;
    if (stagedReceives.containsKey(channel)) {
        staged = stagedReceives.get(channel);
    } else {
        staged = new ArrayDeque<NetworkReceive>();
        stagedReceives.put(channel, staged);
    }
    // Append the receive to the channel's queue
    staged.add(receive);
}
/**
 * Writes the pending send, if any.
 *
 * @return the send when the private send(...) call reports it as finished (the pending send is
 *         then cleared); {@code null} when nothing is pending or the send is not finished yet
 */
public Send write() throws IOException {
    if (send == null || !send(send)) {
        return null;
    }
    Send finished = send;
    send = null;
    return finished;
}
/**
 * Writes the send to the transport layer and, once the send is completed, removes the write
 * interest from the transport layer.
 *
 * @return whether the send is completed
 */
private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed()) {
        // Nothing left to write: stop listening for write readiness
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    }
    return send.completed();
}
/**
 * Writes the buffers to the channel, reduces the remaining byte count by the amount written
 * and records whether the channel still has pending writes.
 *
 * @return the number of bytes written by this call
 * @throws EOFException if the channel reports a negative number of written bytes
 */
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
    long bytesWritten = channel.write(buffers);
    if (bytesWritten < 0) {
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    }
    remaining -= bytesWritten;
    // Remember whether the channel still has writes pending
    pending = TransportLayers.hasPendingWrites(channel);
    return bytesWritten;
}