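When Kafka starts, the broker comes up first. The producer then runs and its messages are written to the broker's storage, which requires the producer to interact with the broker. The consumer then sends its own requests and interacts with the broker in the same way. We saw earlier that concrete requests are handled in KafkaApis. Looking again at KafkaApis.handle, its parameter is a RequestChannel.Request, so the next thing to find is RequestChannel. (Recall that in RocketMQ the request parameters, ChannelHandlerContext and request, are likewise handed to a Processor.) Inside handle, request.header.apiKey is matched against the case clauses. On the client side, Sender's sendProducerData builds the requests and its poll completes the responses. Since KafkaApis works request by request, there must be a processor upstream that feeds it those requests.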
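So, in Sender, sendProducerData first wraps the messages to be sent into a ClientRequest and caches it in InFlightRequests. When a response arrives or an error occurs, the RequestCompletionHandler callback is invoked. Following the send method, KafkaChannel calls setSend, which adds a write operation (OP_WRITE) to the transport layer's interest set. This is the event-driven style familiar from Netty, although Kafka itself uses Java NIO directly. It tells the network layer that data is waiting, so the socket should be written as soon as it becomes writable.

On the broker side, SocketServer consists of one Acceptor and several Processors. The Processors are already running. run() first marks startup as complete, then loops while the processor is running:
- configure new connections
- process new responses
- poll
- handle completed receives and sends
- handle disconnections

Received requests are matched on their API key and handled in KafkaApis. During poll, KafkaChannel performs the actual writes (via writeTo) and reads (via read), pushing bytes out to the socket or pulling messages from it.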
/**
 * Top-level method that handles all requests and multiplexes to the right api
 */
def handle(request: RequestChannel.Request) {
  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
    // Scala pattern matching: dispatch each request to the handler for its API key
    request.header.apiKey match {
      // produce request
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      // fetch request
      case ApiKeys.FETCH => handleFetchRequest(request)
      // list-offsets request
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      // topic metadata request
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      // leader and ISR request (ISR: in-sync replicas; AR: all replicas)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      // stop-replica request
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      // update-metadata request
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
      // controlled shutdown of a broker
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
      // offset commit request
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      // offset fetch request
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      // find the coordinator
      case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
      // join-group request
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      // heartbeat request
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      // leave-group request
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      // sync-group request
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      // describe-groups request
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      // list-groups request
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      // SASL handshake request
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      // create-topics request
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
      // delete-topics request
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
      // delete-records request
      case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
      // init-producer-id request
      case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
      // offset-for-leader-epoch request
      case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
      // add partitions to a transaction
      case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
      // add offsets to a transaction
      case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
      case ApiKeys.END_TXN => handleEndTxnRequest(request)
      case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
      case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
      case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
      case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
      case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
      case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
      case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
      case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      // create-partitions request
      case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
      case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
      case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
      case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
      // delete-groups request
      case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => handleError(request, e)
  } finally {
    request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}
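The match in handle is a dispatch table keyed by API key. It is wrapped in try/catch/finally so that one bad request becomes an error response instead of killing the handler thread. The Java sketch below reproduces only that shape. DemoApiKey, DemoRequest and ApiDispatcher are hypothetical names, not Kafka classes, and plain strings stand in for real response objects.

```java
// Hypothetical stand-ins for ApiKeys / RequestChannel.Request; not Kafka's real classes.
enum DemoApiKey { PRODUCE, FETCH, METADATA }

final class DemoRequest {
    final DemoApiKey apiKey;
    final String body;

    DemoRequest(DemoApiKey apiKey, String body) {
        this.apiKey = apiKey;
        this.body = body;
    }
}

final class ApiDispatcher {
    int completedRequests; // counts requests that reached the finally block

    String handle(DemoRequest request) {
        try {
            // Like `request.header.apiKey match { case ... }`: one branch per API key.
            switch (request.apiKey) {
                case PRODUCE:
                    return "produce:" + request.body;
                case FETCH:
                    return "fetch:" + request.body;
                case METADATA:
                    if (request.body.isEmpty())
                        throw new IllegalArgumentException("no topics requested");
                    return "metadata:" + request.body;
                default:
                    throw new IllegalStateException("Unhandled API key " + request.apiKey);
            }
        } catch (RuntimeException e) {
            // Plays the role of handleError(request, e): answer with an error instead of crashing.
            return "error:" + e.getMessage();
        } finally {
            // Plays the role of recording apiLocalCompleteTimeNanos.
            completedRequests++;
        }
    }
}
```

In Scala, a match with no arm for some key throws MatchError at runtime, which handle's `case e: Throwable` would then route to handleError. The explicit default branch here makes that case visible.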
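What we need to understand is how producers, consumers and the broker are connected, starting with the producer and the broker. In SocketServer, the Acceptor works with several Processor threads. Each Processor has its own Selector and is responsible for reading requests from its connections and writing responses back. Processor threads and request-handler threads exchange data through RequestChannel. On the producer side, sendProducerData(now) ends up building each request in sendProduceRequest: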
/**
 * Create a produce request from the given record batches
 */
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }

    // loop over the batches
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        /*
         * Down-convert to the minimum magic used, if necessary. In general there can be a delay between the
         * time the producer starts building a batch and the time the request is sent, and the message format
         * may have been chosen based on out-of-date metadata. In the worst case we optimistically chose the
         * new message format but found that the broker does not support it, so the batch must be
         * down-converted on the client before sending. This handles edge cases around cluster upgrades, when
         * brokers may not all support the same message format version. For example, if a partition migrates
         * from a broker that supports the new magic version to one that does not, conversion is needed.
         */
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    // key step: build the request (similar to createProduceRequest)
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    // callback that handles the produce response
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    // key step: wrap everything in a new ClientRequest
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    // hand the request to the network client
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
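The first loop in sendProduceRequest picks one magic value for the whole request. It starts from apiVersions.maxUsableProduceMagic() and lowers it to the smallest magic any batch was built with. Batches whose magic differs are then down-converted.

Below is a minimal sketch of that bookkeeping only; no records are actually converted. It uses a hypothetical Batch class that holds just a partition name and a magic byte.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified batch: only the partition it belongs to and the magic it was built with.
final class Batch {
    final String topicPartition;
    final byte magic;

    Batch(String topicPartition, byte magic) {
        this.topicPartition = topicPartition;
        this.magic = magic;
    }
}

final class MagicNegotiation {
    // Start from the highest usable produce magic, then lower it to the smallest magic used by any batch.
    static byte minUsedMagic(byte maxUsableMagic, List<Batch> batches) {
        byte min = maxUsableMagic;
        for (Batch batch : batches) {
            if (batch.magic < min)
                min = batch.magic;
        }
        return min;
    }

    // Partitions whose batches do not match the chosen magic and would be down-converted before sending.
    static List<String> partitionsToDownConvert(List<Batch> batches, byte minUsedMagic) {
        List<String> result = new ArrayList<>();
        for (Batch batch : batches) {
            if (batch.magic != minUsedMagic)
                result.add(batch.topicPartition);
        }
        return result;
    }
}
```

Because every batch in one request must share a single format, the request is sent at the oldest magic any batch uses rather than the newest.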
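On its way out, the request goes through NetworkClient#doSend. There the ClientRequest is filled in step by step (request header, serialized Send) and recorded as an InFlightRequest: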
// send the request
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    // destination node id
    String nodeId = clientRequest.destination();
    // request header
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        int latestClientVersion = clientRequest.apiKey().latestVersion();
        if (header.apiVersion() == latestClientVersion) {
            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                    clientRequest.correlationId(), nodeId);
        } else {
            log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                    header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), nodeId);
        }
    }
    // serialize header + body into a Send
    Send send = request.toSend(nodeId, header);
    // create the in-flight request
    InFlightRequest inFlightRequest = new InFlightRequest(
            header,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            request,
            send,
            now);
    // every client request is recorded in inFlightRequests
    this.inFlightRequests.add(inFlightRequest);
    // hand the Send to the selector
    selector.send(inFlightRequest.send);
}
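doSend records every request in inFlightRequests before handing it to the selector. Below is a simplified sketch of that bookkeeping. It assumes, as Kafka does, that responses on one connection arrive in the order the requests were sent:
- the newest request is pushed onto the front of a per-node deque;
- a response completes the oldest request, after its correlation id is checked.

InFlightTracker and PendingRequest are hypothetical names. The capacity check is simplified compared with Kafka's canSendMore.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified in-flight request; not Kafka's class.
final class PendingRequest {
    final int correlationId;
    final Consumer<String> callback; // plays the role of RequestCompletionHandler

    PendingRequest(int correlationId, Consumer<String> callback) {
        this.correlationId = correlationId;
        this.callback = callback;
    }
}

final class InFlightTracker {
    private final Map<String, Deque<PendingRequest>> byNode = new HashMap<>();
    private final int maxPerConnection;

    InFlightTracker(int maxPerConnection) {
        this.maxPerConnection = maxPerConnection;
    }

    boolean canSendMore(String node) {
        Deque<PendingRequest> queue = byNode.get(node);
        return queue == null || queue.size() < maxPerConnection;
    }

    // The newest request goes on the front of the node's deque...
    void add(String node, PendingRequest request) {
        byNode.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(request);
    }

    // ...and since responses arrive in send order, the oldest one (at the back) completes next.
    void completeNext(String node, int responseCorrelationId, String response) {
        Deque<PendingRequest> queue = byNode.get(node);
        if (queue == null || queue.isEmpty())
            throw new IllegalStateException("No in-flight request for node " + node);
        PendingRequest request = queue.pollLast();
        if (request.correlationId != responseCorrelationId)
            throw new IllegalStateException("Correlation id mismatch: expected "
                    + request.correlationId + ", got " + responseCorrelationId);
        request.callback.accept(response);
    }

    int inFlightCount(String node) {
        Deque<PendingRequest> queue = byNode.get(node);
        return queue == null ? 0 : queue.size();
    }
}
```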
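As the code below shows, KafkaChannel.setSend adds OP_WRITE to the transport layer's interest set. This follows the same event-driven model as Netty, although Kafka's network layer is built directly on Java NIO. It tells the network layer that a send is pending, so the request bytes are written out as soon as the socket becomes writable. That is how they reach the broker, which then has I/O work to do.

The broker is normally started before any producer or consumer. An incoming request is therefore first taken in by the Acceptor, which hands the connection to one of the Processors. The Processor passes each request through the RequestChannel to the KafkaRequestHandlerPool. Its handler threads forward the request to the business-logic layer, KafkaApis, where every request is handled.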
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    // register interest in OP_WRITE on the transport layer
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
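setSend enforces at most one Send in progress per channel, then adds OP_WRITE to the interest set. The sketch below models just that state using the real java.nio SelectionKey bit constants. ChannelInterest is a hypothetical class, not KafkaChannel. It only tracks the bitmask instead of touching a real SelectionKey.

```java
import java.nio.channels.SelectionKey;

// Hypothetical model of the channel state touched by setSend; tracks interest ops as a plain bitmask.
final class ChannelInterest {
    private int interestOps = SelectionKey.OP_READ; // a connected channel normally listens for reads
    private Object pendingSend;

    void setSend(Object send) {
        if (pendingSend != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress");
        pendingSend = send;
        interestOps |= SelectionKey.OP_WRITE; // addInterestOps(OP_WRITE)
    }

    // Called once the whole send has been written.
    void sendCompleted() {
        pendingSend = null;
        interestOps &= ~SelectionKey.OP_WRITE; // removeInterestOps(OP_WRITE)
    }

    boolean wantsWrite() { return (interestOps & SelectionKey.OP_WRITE) != 0; }

    boolean wantsRead() { return (interestOps & SelectionKey.OP_READ) != 0; }
}
```

Read interest is left untouched, so the channel can keep receiving while a send is pending.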
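Now we move into SocketServer. It contains an Acceptor and several Processors, and the Processors are already running. Processor#run first calls startupComplete(). Then, as long as the processor is running, it loops over these steps:
- configure new connections
- process new responses
- poll
- handle completed receives and sends
- handle disconnections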
SocketServer#Processor#run
// key method: the Processor's main loop
override def run() {
  startupComplete()
  try {
    while (isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()
        poll()
        processCompletedReceives()
        processCompletedSends()
        processDisconnected()
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
        // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. These exceptions are caught and
        // processed by the individual methods above which close the failing channel and continue processing other
        // channels. So this catch block should only ever see ControlThrowables.
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug("Closing selector - processor " + id)
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
    shutdownComplete()
  }
}
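RequestChannel sits between the Processors and KafkaApis. The handoff runs as follows:
1. processCompletedReceives puts each complete request into RequestChannel.
2. KafkaRequestHandler threads take requests out and call KafkaApis.handle.
3. Responses come back to the Processor, which picks them up in processNewResponses.

Below is a minimal sketch of that handoff using java.util.concurrent queues. It assumes a single handler thread and uses strings in place of request and response objects. RequestChannelSketch is a hypothetical name, and the comments only indicate which Kafka step each line loosely corresponds to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

// Hypothetical sketch of Processor -> request queue -> handler thread -> response queue -> Processor.
final class RequestChannelSketch {
    // Bounded: if the handlers fall behind, the network thread blocks on put() (back-pressure).
    private final BlockingQueue<String> requestQueue;
    // Responses flow back to the network thread.
    private final BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();

    private RequestChannelSketch(int queueSize) {
        this.requestQueue = new ArrayBlockingQueue<>(queueSize);
    }

    // Pushes `requests` through one handler thread that applies `api`, and collects the responses in order.
    static List<String> roundTrip(List<String> requests, UnaryOperator<String> api) {
        RequestChannelSketch channel = new RequestChannelSketch(2);
        Thread handler = new Thread(() -> {
            try {
                for (int i = 0; i < requests.size(); i++) {
                    String request = channel.requestQueue.poll(5, TimeUnit.SECONDS); // like receiveRequest(timeout)
                    if (request == null) return; // nothing arrived in time
                    channel.responseQueue.put(api.apply(request)); // like KafkaApis.handle + sendResponse
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "request-handler-0");
        handler.start();

        List<String> responses = new ArrayList<>();
        try {
            for (String request : requests)
                channel.requestQueue.put(request); // like processCompletedReceives -> sendRequest
            for (int i = 0; i < requests.size(); i++) {
                String response = channel.responseQueue.poll(5, TimeUnit.SECONDS); // like processNewResponses
                if (response == null) break;
                responses.add(response);
            }
            handler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return responses;
    }
}
```

A bounded request queue is the important design point: it keeps slow handlers from letting the network threads accept unlimited work.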
Within this loop, processNewResponses hands each response to sendResponse:
processNewResponses -> sendResponse
// send a response
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
  val connectionId = response.request.context.connectionId
  trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
  // `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long
  if (channel(connectionId).isEmpty) {
    warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
    response.request.updateRequestMetrics(0L, response)
  }
  // Invoke send for closingChannel as well so that the send is failed and the channel closed properly and
  // removed from the Selector after discarding any pending staged receives.
  // `openOrClosingChannel` can be None if the selector closed the connection because it was idle for too long
  if (openOrClosingChannel(connectionId).isDefined) {
    // hand the response to the selector for sending
    selector.send(responseSend)
    inflightResponses += (connectionId -> response)
  }
}
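Whether messages are being produced or consumed, everything goes through the all-important poll operation, so poll is where the actual reads and writes happen. On the client side this is:
NetworkClient#poll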
/**
 * Do actual reads and writes to sockets.
 */
@Override
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        // handle aborted sends
        handleAbortedSends(responses);
        // complete the responses (invoke their callbacks)
        completeResponses(responses);
        return responses;
    }

    // start a metadata update if needed; returns the time until the next update
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // perform the selector poll
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    // handle completed sends
    handleCompletedSends(responses, updatedNow);
    // handle completed receives
    handleCompletedReceives(responses, updatedNow);
    // handle disconnections
    handleDisconnections(responses, updatedNow);
    // handle new connections
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    // complete the responses (invoke their callbacks)
    completeResponses(responses);

    return responses;
}
Within Selector#poll, the reads and writes themselves are done in pollSelectionKeys:
/**
 * handle any ready I/O on a set of selection keys
 *
 * @param selectionKeys set of keys to handle
 * @param isImmediatelyConnected true if running over a set of keys for just-connected sockets
 * @param currentTimeNanos time at which set of keys was determined
 */
// package-private for testing
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                       boolean isImmediatelyConnected,
                       long currentTimeNanos) {
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        KafkaChannel channel = channel(key);
        long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        boolean sendFailed = false;
        try {
            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                } else
                    continue;
            }

            /* if channel is not ready, finish preparing it */
            if (channel.isConnected() && !channel.ready()) {
                try {
                    // prepare the channel (transport handshake, then authentication)
                    channel.prepare();
                } catch (AuthenticationException e) {
                    sensors.failedAuthentication.record();
                    throw e;
                }
                if (channel.ready())
                    // record a successful authentication
                    sensors.successfulAuthentication.record();
            }

            // attempt to read
            attemptRead(key, channel);

            if (channel.hasBytesBuffered()) {
                keysWithBufferedRead.add(key);
            }

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = null;
                try {
                    // perform the write
                    send = channel.write();
                } catch (Exception e) {
                    sendFailed = true;
                    throw e;
                }
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel, CloseMode.GRACEFUL);

        } catch (Exception e) {
            // (abridged: logging omitted) close the failing channel; after a failed send only notify
            close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
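pollSelectionKeys is the standard java.nio readiness loop. For each selected key it finishes connecting if needed, reads when readable, writes when writable, and drops OP_WRITE once nothing is left to send.

The self-contained sketch below runs the same loop over a java.nio Pipe instead of a socket, so it needs no network. SelectorLoopDemo is a hypothetical name. The loop is bounded so it cannot spin forever.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class SelectorLoopDemo {
    // Sends `message` through a non-blocking pipe, handling only the keys the selector reports as ready.
    static String echoThroughSelector(String message) {
        ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        ByteBuffer in = ByteBuffer.allocate(out.remaining());
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                source.configureBlocking(false);
                sink.register(selector, SelectionKey.OP_WRITE); // like setSend -> addInterestOps(OP_WRITE)
                source.register(selector, SelectionKey.OP_READ);
                for (int round = 0; in.hasRemaining() && round < 1000; round++) {
                    selector.select(100); // wait at most 100 ms for a ready key
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isWritable()) { // only the sink is registered for writes
                            sink.write(out);
                            if (!out.hasRemaining())
                                key.interestOps(0); // like removeInterestOps(OP_WRITE) once the send completes
                        }
                        if (key.isReadable()) // only the source is registered for reads
                            source.read(in);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        in.flip();
        return StandardCharsets.UTF_8.decode(in).toString();
    }
}
```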
KafkaChannel performs the write:
// perform the write
public Send write() throws IOException {
    Send result = null;
    // key step: send(send) returns true once the whole Send has been written
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

// write the Send to the transport layer; once it is complete, drop interest in OP_WRITE
private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
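KafkaChannel.write only returns the Send once send.completed() is true. A non-blocking socket may accept only part of the bytes on each writable event, so OP_WRITE stays registered until the send is complete.

Below is a sketch using two hypothetical classes. TrickleChannel accepts at most a few bytes per write call. SimpleSend mirrors the completed() check.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Test channel that accepts at most `maxPerWrite` bytes per call, like a socket whose send buffer is nearly full.
final class TrickleChannel implements WritableByteChannel {
    final ByteArrayOutputStream received = new ByteArrayOutputStream();
    private final int maxPerWrite;
    private boolean open = true;

    TrickleChannel(int maxPerWrite) {
        if (maxPerWrite <= 0) throw new IllegalArgumentException("maxPerWrite must be positive");
        this.maxPerWrite = maxPerWrite;
    }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(maxPerWrite, src.remaining());
        for (int i = 0; i < n; i++) received.write(src.get());
        return n;
    }

    @Override public boolean isOpen() { return open; }

    @Override public void close() { open = false; }
}

// Hypothetical, simplified Send with a single buffer.
final class SimpleSend {
    private final ByteBuffer buffer;

    SimpleSend(byte[] payload) { this.buffer = ByteBuffer.wrap(payload); }

    boolean completed() { return !buffer.hasRemaining(); }

    void writeTo(WritableByteChannel channel) {
        try {
            channel.write(buffer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Each iteration stands for one "writable" event; OP_WRITE would stay registered
    // until completed() is true, which is when KafkaChannel.send() removes it.
    static int writableEventsUntilComplete(SimpleSend send, WritableByteChannel channel) {
        int events = 0;
        while (!send.completed()) {
            send.writeTo(channel);
            events++;
        }
        return events;
    }
}
```

For example, a 10-byte send over a channel that takes 4 bytes per call needs three writable events before it completes.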
The read side: Selector#attemptRead keeps calling KafkaChannel#read for as long as it returns a complete receive:
// attempt to read (Selector)
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
    //if channel is ready and has bytes to read from socket or buffer, and has no
    //previous receive(s) already staged or otherwise in progress then read from it
    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
            && !explicitlyMutedChannels.contains(channel)) {
        NetworkReceive networkReceive;
        // keep reading while read() returns a complete receive
        while ((networkReceive = channel.read()) != null) {
            madeReadProgressLastPoll = true;
            // stage the receive for this channel
            addToStagedReceives(channel, networkReceive);
        }
        if (channel.isMute()) {
            outOfMemory = true; //channel has muted itself due to memory pressure.
        } else {
            madeReadProgressLastPoll = true;
        }
    }
}

// network receive: the read operation (KafkaChannel)
public NetworkReceive read() throws IOException {
    NetworkReceive result = null;

    if (receive == null) {
        // create a new NetworkReceive
        receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
    }

    // read from the transport layer
    receive(receive);
    // if the receive is complete, rewind the payload and return it
    if (receive.complete()) {
        receive.payload().rewind();
        result = receive;
        receive = null;
    } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
        //pool must be out of memory, mute ourselves.
        mute();
    }
    return result;
}

// perform the receive
private long receive(NetworkReceive receive) throws IOException {
    return receive.readFrom(transportLayer);
}
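A NetworkReceive is size-delimited. Kafka's wire format puts a 4-byte size in front of each request or response, and read() keeps returning null until both the size and the full payload have arrived, possibly over several reads.

Below is a hypothetical sketch of that framing. SizedReceive and ChunkedChannel are illustrative names. maxSize plays the role of maxReceiveSize, and memory-pool handling is left out.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Test channel that hands out at most `chunk` bytes per read, like a socket delivering data piecemeal.
final class ChunkedChannel implements ReadableByteChannel {
    private final ByteBuffer data;
    private final int chunk;
    private boolean open = true;

    ChunkedChannel(byte[] bytes, int chunk) {
        if (chunk <= 0) throw new IllegalArgumentException("chunk must be positive");
        this.data = ByteBuffer.wrap(bytes);
        this.chunk = chunk;
    }

    @Override
    public int read(ByteBuffer dst) {
        if (!data.hasRemaining()) return -1; // end of stream
        int n = Math.min(chunk, Math.min(dst.remaining(), data.remaining()));
        for (int i = 0; i < n; i++) dst.put(data.get());
        return n;
    }

    @Override public boolean isOpen() { return open; }

    @Override public void close() { open = false; }
}

// Hypothetical, simplified NetworkReceive: a 4-byte big-endian size followed by that many payload bytes.
final class SizedReceive {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private final int maxSize;
    private ByteBuffer payload; // allocated only once the size is known

    SizedReceive(int maxSize) { this.maxSize = maxSize; }

    boolean complete() { return payload != null && !payload.hasRemaining(); }

    // Reads whatever is available right now; several calls may be needed before complete() is true.
    void readFrom(ReadableByteChannel channel) throws IOException {
        if (size.hasRemaining()) {
            if (channel.read(size) < 0) throw new EOFException();
            if (!size.hasRemaining()) {
                size.rewind();
                int length = size.getInt();
                if (length < 0 || length > maxSize)
                    throw new IOException("Invalid receive size " + length);
                payload = ByteBuffer.allocate(length);
            }
        }
        if (payload != null && payload.hasRemaining()) {
            if (channel.read(payload) < 0) throw new EOFException();
        }
    }

    // Drives readFrom until one complete frame has arrived, then decodes its payload.
    static String receiveAll(byte[] wire, int chunk, int maxSize) {
        SizedReceive receive = new SizedReceive(maxSize);
        ChunkedChannel channel = new ChunkedChannel(wire, chunk);
        try {
            while (!receive.complete()) receive.readFrom(channel);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        receive.payload.rewind(); // like receive.payload().rewind() in KafkaChannel.read
        return StandardCharsets.UTF_8.decode(receive.payload).toString();
    }
}
```

Checking the size against a maximum before allocating is what keeps a corrupt or hostile length prefix from exhausting memory.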
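The channel types used for reading and writing are ScatteringByteChannel and GatheringByteChannel. On the write side, ByteBufferSend writes its whole array of ByteBuffers to a GatheringByteChannel in a single gathering call: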
// NetworkReceive: read from a ScatteringByteChannel
public long readFrom(ScatteringByteChannel channel) throws IOException {
    return readFromReadableChannel(channel);
}

// ByteBufferSend: write the ByteBuffers to the channel
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
    // gathering write of all buffers in one call
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    // check whether the transport layer still holds pending writes
    pending = TransportLayers.hasPendingWrites(channel);
    return written;
}
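Gathering and scattering channels let one call cover several buffers, which suits Kafka's size-header-plus-body layout. Below is a small sketch over a java.nio Pipe, whose sink is a GatheringByteChannel and whose source is a ScatteringByteChannel. ScatterGatherDemo is a hypothetical name. The message must be small, because a single thread writes everything before reading anything.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.Pipe;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.StandardCharsets;

final class ScatterGatherDemo {
    // Writes [4-byte size header, body] with gathering writes, then reads them back with scattering reads.
    static String roundTrip(String text) {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer header = ByteBuffer.allocate(4);
        header.putInt(body.length);
        header.flip();
        ByteBuffer[] outgoing = {header, ByteBuffer.wrap(body)};
        ByteBuffer inHeader = ByteBuffer.allocate(4);
        ByteBuffer inBody = ByteBuffer.allocate(body.length);
        ByteBuffer[] incoming = {inHeader, inBody};
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                GatheringByteChannel gathering = sink;
                long remaining = header.remaining() + body.length;
                while (remaining > 0)
                    remaining -= gathering.write(outgoing); // same bookkeeping as ByteBufferSend.writeTo
                ScatteringByteChannel scattering = source;
                while (inHeader.hasRemaining() || inBody.hasRemaining()) {
                    if (scattering.read(incoming) < 0) // fills inHeader first, then inBody
                        throw new EOFException("pipe closed early");
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        inHeader.flip();
        inBody.flip();
        if (inHeader.getInt() != body.length)
            throw new IllegalStateException("size header does not match body length");
        return StandardCharsets.UTF_8.decode(inBody).toString();
    }
}
```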