前面我们知道其重要的启动方法里面有关的方法:它的注释是非常具有启发性的, 启动API,以启动Kafka服务器的单个实例。实例化LogManager,SocketServer和请求处理程序-KafkaRequestHandlers。也即告诉我们它会启动kafka服务实例,同时实例是单个的,同时里面实例化日志管理器、socket服务器、kafka请求处理器。而启动kafka服务中,不免会启动kafka控制器,而kafka控制器中涉及到主从broker服务、主从副本、状态机、监听、重平衡以及与其他broker通信。下面我们来看日志相关的信息:
// Create the log manager through LogManager.apply and start it
// (startup launches the background log flush and log cleanup tasks).
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
//查看LogManager对象
object LogManager {

  /** Name of the recovery point checkpoint file. */
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"

  /** Name of the log start offset checkpoint file. */
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"

  /** Interval between producer id expiration checks: 10 minutes, expressed in milliseconds. */
  val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000

  /**
   * Build a [[LogManager]] from the broker configuration.
   *
   * The default log configuration is derived from `config`, the per-topic log configurations are
   * read from ZooKeeper, and the configured log directories are resolved to absolute files.
   * If reading the configuration of any topic failed, the first recorded failure is rethrown.
   */
  def apply(config: KafkaConfig,
            initialOfflineDirs: Seq[String],
            zkClient: KafkaZkClient,
            brokerState: BrokerState,
            kafkaScheduler: KafkaScheduler,
            time: Time,
            brokerTopicStats: BrokerTopicStats,
            logDirFailureChannel: LogDirFailureChannel): LogManager = {
    // Turns directory paths into absolute File handles.
    def absoluteFiles(paths: Seq[String]): Seq[File] =
      paths.map(path => new File(path).getAbsoluteFile)

    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
    val defaultLogConfig = LogConfig(defaultProps)

    // read the log configurations from zookeeper
    val allTopics = zkClient.getAllTopicsInCluster
    val (topicConfigs, failed) = zkClient.getLogConfigs(allTopics, defaultProps)
    failed.headOption.foreach { case (_, error) => throw error }

    val cleanerConfig = LogCleaner.cleanerConfig(config)

    new LogManager(
      logDirs = absoluteFiles(config.logDirs),
      initialOfflineDirs = absoluteFiles(initialOfflineDirs),
      topicConfigs = topicConfigs,
      initialDefaultConfig = defaultLogConfig,
      cleanerConfig = cleanerConfig,
      recoveryThreadsPerDataDir = config.numRecoveryThreadsPerDataDir,
      flushCheckMs = config.logFlushSchedulerIntervalMs,
      flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
      flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
      retentionCheckMs = config.logCleanupIntervalMs,
      maxPidExpirationMs = config.transactionIdExpirationMs,
      scheduler = kafkaScheduler,
      brokerState = brokerState,
      brokerTopicStats = brokerTopicStats,
      logDirFailureChannel = logDirFailureChannel,
      time = time)
  }
}
//日志管理
/**
 * The log manager. It keeps one pool of current logs and one pool of "future" logs,
 * both keyed by topic partition.
 *
 * The period arguments are used by startup() as the periods of the scheduled tasks:
 * retentionCheckMs for "kafka-log-retention", flushCheckMs for "kafka-log-flusher",
 * flushRecoveryOffsetCheckpointMs for "kafka-recovery-point-checkpoint" and
 * flushStartOffsetCheckpointMs for "kafka-log-start-offset-checkpoint".
 */
@threadsafe
class LogManager(logDirs: Seq[File],
initialOfflineDirs: Seq[File],
val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
val initialDefaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
recoveryThreadsPerDataDir: Int,
val flushCheckMs: Long,
val flushRecoveryOffsetCheckpointMs: Long,
val flushStartOffsetCheckpointMs: Long,
val retentionCheckMs: Long,
val maxPidExpirationMs: Int,
scheduler: Scheduler,
val brokerState: BrokerState,
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel,
time: Time) extends Logging with KafkaMetricsGroup {
// current logs, keyed by topic partition
private val currentLogs = new Pool[TopicPartition, Log]()
// "future" logs, keyed by topic partition
// NOTE(review): presumably the logs living in directories with the "-future" suffix - confirm
private val futureLogs = new Pool[TopicPartition, Log]()
}
//查看Log中的文件
//日志实际的段 =>日志段
/**
 * The log of a single topic partition. The log is not one file: it is held as a
 * sorted, concurrent map of log segments.
 */
@threadsafe
class Log(@volatile var dir: File,
@volatile var config: LogConfig,
@volatile var logStartOffset: Long,
@volatile var recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time,
val maxProducerIdExpirationMs: Int,
val producerIdExpirationCheckIntervalMs: Int,
val topicPartition: TopicPartition,
val producerStateManager: ProducerStateManager,
logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
// the segments of this log, in a skip-list map ordered by its Long key
// NOTE(review): the key is presumably the base offset of each segment - confirm
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
}
/**
 * A segment of the log. Constructor arguments, in order: the file-backed records, the offset index,
 * the timestamp index, the transaction index, the base offset, the index interval in bytes,
 * the roll jitter time, the maximum segment time, maxSegmentBytes and the clock.
 */
@nonthreadsafe
class LogSegment private[log] (val log: FileRecords,
                               val offsetIndex: OffsetIndex,
                               val timeIndex: TimeIndex,
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val maxSegmentMs: Long,
                               val maxSegmentBytes: Int,
                               val time: Time) extends Logging {

  /**
   * Append the given messages starting with the given offset. Add
   * an entry to the index if needed. Nothing happens when `records` is empty.
   *
   * It is assumed this method is being called from within a lock; the method itself is
   * not thread safe.
   *
   * @param firstOffset The first offset in the message set.
   * @param largestOffset The last offset in the message set
   * @param largestTimestamp The largest timestamp in the message set.
   * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
   * @param records The log entries to append.
   */
  @nonthreadsafe
  def append(firstOffset: Long,
             largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
        .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))

      // File position at which this batch starts; captured before writing so it can be indexed below.
      val startPosition = log.sizeInBytes()
      val segmentWasEmpty = startPosition == 0
      if (segmentWasEmpty)
        rollingBasedTimestamp = Some(largestTimestamp)

      // append the messages
      require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
      val bytesWritten = log.append(records)
      trace(s"Appended $bytesWritten to ${log.file()} at offset $firstOffset")

      // Update the in memory max timestamp and corresponding offset.
      val isNewMaxTimestamp = largestTimestamp > maxTimestampSoFar
      if (isNewMaxTimestamp) {
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
      }

      // append an entry to the index (if needed): only once more than indexIntervalBytes
      // have been written since the previous index entry
      val indexEntryDue = bytesSinceLastIndexEntry > indexIntervalBytes
      if (indexEntryDue) {
        offsetIndex.append(firstOffset, startPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }
}
执行拼接操作log.append(records):
/**
 * Append log batches by writing them to this instance's channel, then add the number of
 * bytes written to the tracked size.
 *
 * @param records The records to append
 * @return the number of bytes written to the underlying file
 * @throws IOException if writing to the channel fails
 */
public int append(MemoryRecords records) throws IOException {
    final int bytesWritten = records.writeFullyTo(channel);
    size.getAndAdd(bytesWritten);
    return bytesWritten;
}
/**
 * Write all records to the given channel (including partial records).
 * Writing is repeated until the number of bytes written reaches {@code sizeInBytes()}.
 *
 * @param channel The channel to write to
 * @return The number of bytes written
 * @throws IOException For any IO errors writing to the channel
 */
public int writeFullyTo(GatheringByteChannel channel) throws IOException {
    // Mark the current buffer position so that it can be restored after the writes.
    buffer.mark();
    int totalWritten = 0;
    while (totalWritten < sizeInBytes()) {
        totalWritten += channel.write(buffer);
    }
    buffer.reset();
    return totalWritten;
}
拼接操作offsetIndex.append偏移量索引拼接操作:
/**
 * Append an entry for the given offset/location pair to the index. Unless the index is still
 * empty, the offset must be larger than the last offset that was appended.
 *
 * @param offset   The offset to index; it is stored relative to `baseOffset`
 * @param position The file position stored for that offset
 * @throws IllegalArgumentException if the index is full (raised by `require`)
 * @throws InvalidOffsetException if the index is not empty and `offset` is not larger than the last appended offset
 */
def append(offset: Long, position: Int): Unit = {
  inLock(lock) {
    require(!isFull, s"Attempt to append to a full index (size = ${_entries}).")
    if (_entries != 0 && offset <= _lastOffset)
      throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
        s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")

    trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
    // An entry is two ints: the offset relative to baseOffset, followed by the file position.
    mmap.putInt((offset - baseOffset).toInt)
    mmap.putInt(position)
    _entries += 1
    _lastOffset = offset
    // The buffer position must stay in step with the entry count.
    require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
  }
}
//MappedByteBuffer
/**
 * The memory-mapped view of the index file. A newly created file is first pre-allocated to
 * `maxIndexSize` rounded down to a multiple of `entrySize`; the buffer position is then set to
 * where the next entry goes. The file handle is closed once the mapping exists.
 */
@volatile
protected var mmap: MappedByteBuffer = {
  val newlyCreated = file.createNewFile()
  val accessMode = if (writable) "rw" else "r"
  val raf = new RandomAccessFile(file, accessMode)
  try {
    /* pre-allocate the file if necessary */
    if (newlyCreated) {
      if (maxIndexSize < entrySize)
        throw new IllegalArgumentException(s"Invalid max index size: $maxIndexSize")
      raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
    }

    /* memory-map the file */
    _length = raf.length()
    val mapMode = if (writable) FileChannel.MapMode.READ_WRITE else FileChannel.MapMode.READ_ONLY
    val mapped = raf.getChannel.map(mapMode, 0, _length)

    /* set the position in the index for the next entry */
    val nextEntryPosition =
      if (newlyCreated) 0
      // if this is a pre-existing index, assume it is valid and set position to last entry
      else roundDownToExactMultiple(mapped.limit(), entrySize)
    mapped.position(nextEntryPosition)
    mapped
  } finally {
    CoreUtils.swallow(raf.close(), this)
  }
}
下面我们来看看日志:
一个主题对应多个分区,而一个分区又对应一个文件夹目录;目录中的每个日志段包含一个log文件(.log)以及两个索引文件:偏移量索引文件(.index)和时间戳索引文件(.timeindex),如果启用事务,还会有事务索引文件(.txnindex)。
日志段是日志的一部分。每个段都有两个组成部分:日志和索引。日志是包含实际消息的FileMessageSet(新版本中为FileRecords)。索引是一个OffsetIndex,它从逻辑偏移量映射到物理文件位置。kafka创建topic时可以指定topic的分区个数,每个broker按照自己分到的topic的分区创建对应的log,其中每个log由多个LogSegment组成,每个LogSegment以其第一条message的偏移量作为键,交由segments管理。
其中FileMessageSet通过设定segment之内的start和end来读取segment内的文件,OffsetIndex是Segment里面的Message索引,它并不是为每条message都建立索引,而是每写入超过log.index.interval.bytes字节的消息才添加一条索引(即稀疏索引)。因此,给定topic和offset查找某一条记录时,分两步完成:先快速定位segment file,再在segment file中查找对应的message块(chunk)。
日志相关参数信息:
/**
 * Helper functions for logs
 */
object Log {
/** a log file */
// suffix of log files: .log
val LogFileSuffix = ".log"
/** an index file */
// suffix of index files
val IndexFileSuffix = ".index"
/** a time index file */
// suffix of time index files
val TimeIndexFileSuffix = ".timeindex"
// suffix of producer snapshot files
val ProducerSnapshotFileSuffix = ".snapshot"
/** an (aborted) txn index */
// suffix of (aborted) transaction index files
val TxnIndexFileSuffix = ".txnindex"
/** a file that is scheduled to be deleted */
// suffix of files scheduled for deletion
val DeletedFileSuffix = ".deleted"
/** A temporary file that is being used for log cleaning */
// suffix of the temporary files used by log cleaning
val CleanedFileSuffix = ".cleaned"
/** A temporary file used when swapping files into the log */
// suffix of the temporary files used when swapping files into the log
val SwapFileSuffix = ".swap"
/** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher.
* This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
* avoided by passing in the recovery point, however finding the correct position to do this
* requires accessing the offset index which may not be safe in an unclean shutdown.
* For more information see the discussion in PR#2104
*/
val CleanShutdownFile = ".kafka_cleanshutdown"
/** a directory that is scheduled to be deleted */
// suffix of directories scheduled for deletion
val DeleteDirSuffix = "-delete"
/** a directory that is used for future partition */
// suffix of directories used for future partitions
val FutureDirSuffix = "-future"
// pattern for the names of directories scheduled for deletion (name ends with DeleteDirSuffix)
private val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
// pattern for the names of future directories (name ends with FutureDirSuffix)
private val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
文件消息
/**
* A {@link Records} implementation backed by a file. An optional start and end position can be applied to this
* instance to enable slicing a range of the log records.
*/
public class FileRecords extends AbstractRecords implements Closeable {
// whether this instance is only a slice view of the file
private final boolean isSlice;
// start position
private final int start;
// end position
private final int end;
// the record batches, obtained from batchesFrom(start) in the constructor
private final Iterable<FileLogInputStream.FileChannelRecordBatch> batches;
// mutable state
// size in bytes; initialised in the constructor and increased by append
private final AtomicInteger size;
// the channel the records are written to
private final FileChannel channel;
// the backing file
private volatile File file;
/**
 * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible.
 * The constructor is visible for tests.
 *
 * @param file    the backing file
 * @param channel the channel used to access the file
 * @param start   the start position
 * @param end     the end position
 * @param isSlice whether this instance is only a slice view; if so the file size is not consulted
 *                and the channel position is left untouched
 * @throws IOException if querying the channel size or setting its position fails
 */
public FileRecords(File file,
                   FileChannel channel,
                   int start,
                   int end,
                   boolean isSlice) throws IOException {
    this.file = file;
    this.channel = channel;
    this.start = start;
    this.end = end;
    this.isSlice = isSlice;
    this.size = new AtomicInteger();

    if (isSlice) {
        // don't check the file size if this is just a slice view
        size.set(end - start);
    } else {
        // Take the minimum as longs and narrow afterwards: casting channel.size() to int first
        // would wrap around for files larger than Integer.MAX_VALUE bytes.
        int limit = (int) Math.min(channel.size(), (long) end);
        size.set(limit - start);

        // if this is not a slice, update the file pointer to the end of the file (capped at `end`)
        channel.position(limit);
    }

    batches = batchesFrom(start);
}
OffsetIndex:
/**
 * An index built on [[AbstractIndex]] whose lookups return [[OffsetPosition]] pairs
 * (an offset together with its physical file position).
 */
class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
    extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) {

  /**
   * Find the largest offset less than or equal to the given targetOffset
   * and return a pair holding this offset and its corresponding physical file position.
   *
   * @param targetOffset The offset to look up.
   * @return The offset found and the corresponding file position for this offset
   *         If the target offset is smaller than the least entry in the index (or the index is empty),
   *         the pair (baseOffset, 0) is returned.
   */
  def lookup(targetOffset: Long): OffsetPosition = {
    maybeLock(lock) {
      // Search on a duplicate of the mapped buffer rather than on the buffer itself.
      val view = mmap.duplicate
      largestLowerBoundSlotFor(view, targetOffset, IndexSearchType.KEY) match {
        case -1 => OffsetPosition(baseOffset, 0)
        case slot => parseEntry(view, slot).asInstanceOf[OffsetPosition]
      }
    }
  }
}
/**
 * An index entry pairing an offset with a file position.
 * The offset is the index key; the position, widened to Long, is the index value.
 */
case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
  override def indexKey: Long = offset

  override def indexValue: Long = position.toLong
}
/**
 * Read the nth entry from the buffer as an [[OffsetPosition]] whose offset is
 * `baseOffset` plus the stored relative offset.
 */
override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
  val absoluteOffset = baseOffset + relativeOffset(buffer, n)
  val filePosition = physical(buffer, n)
  OffsetPosition(absoluteOffset, filePosition)
}
/**
 * Get the nth offset mapping from the index.
 *
 * NOTE(review): the offset of the returned pair comes straight from `relativeOffset`; unlike
 * `parseEntry`, `baseOffset` is not added here - confirm that callers expect a relative offset.
 *
 * @param n The entry number in the index
 * @return The offset/position pair at that entry
 * @throws IllegalArgumentException if n is not smaller than the number of entries
 */
def entry(n: Int): OffsetPosition = {
  maybeLock(lock) {
    val outOfRange = n >= _entries
    if (outOfRange)
      throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " +
        s"which has size ${_entries}.")

    val view = mmap.duplicate
    val offsetPart = relativeOffset(view, n)
    val positionPart = physical(view, n)
    OffsetPosition(offsetPart, positionPart)
  }
}
对日志段进行具体查询,查询哪一个文件
/**
 * Lookup lower and upper bounds for the given target.
 *
 * @return (-1, -1) when the index is empty; (-1, 0) when the target is smaller than the first
 *         entry; (slot, slot) on an exact match; otherwise the slot of the largest entry not above
 *         the target together with the following slot, or -1 if there is no following slot.
 */
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
  // Binary search over the slots [low, high]; the entry at `low` never compares above the target.
  @scala.annotation.tailrec
  def search(low: Int, high: Int): (Int, Int) =
    if (low >= high)
      (low, if (low == _entries - 1) -1 else low + 1)
    else {
      // Upper midpoint, so that the range still shrinks when the search continues from `middle`.
      val middle = (low + high + 1) >>> 1
      val comparison = compareIndexEntry(parseEntry(idx, middle), target, searchEntity)
      if (comparison > 0) search(low, middle - 1)
      else if (comparison < 0) search(middle, high)
      else (middle, middle)
    }

  // check if the index is empty
  if (_entries == 0) (-1, -1)
  // check if the target offset is smaller than the least offset
  else if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) (-1, 0)
  else search(0, _entries - 1)
}
日志管理启动:
// name of the recovery point checkpoint file
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
// name of the log start offset checkpoint file
val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"

// one recovery point checkpoint file per live log directory, keyed by that directory
@volatile private var recoveryPointCheckpoints = liveLogDirs.map { dir =>
  dir -> new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel)
}.toMap

// one log start offset checkpoint file per live log directory, keyed by that directory
@volatile private var logStartOffsetCheckpoints = liveLogDirs.map { dir =>
  dir -> new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel)
}.toMap

// maps a topic partition to a log directory name
private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
/**
 * Start the background threads to flush logs and do log cleanup.
 *
 * When a scheduler is available, five tasks are registered, all starting after
 * InitialTaskDelayMs: log retention, log flushing, recovery point checkpointing, log start
 * offset checkpointing and log deletion. Independently of the scheduler, the log cleaner is
 * started when it is enabled in the cleaner configuration.
 */
def startup(): Unit = {
  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs _,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointLogRecoveryOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushRecoveryOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint",
                       checkpointLogStartOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushStartOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       unit = TimeUnit.MILLISECONDS)
  }
  // Start the log cleaner if it is enabled in the cleaner configuration.
  if (cleanerConfig.enableCleaner)
    cleaner.startup()
}
消息组成:
/**
 * The current offset and size for all the fixed-length fields of a message.
 * Each offset is the previous field's offset plus that field's length.
 */
object Message {
  // CRC field
  val CrcOffset: Int = 0
  val CrcLength: Int = 4

  // magic field
  val MagicOffset: Int = CrcOffset + CrcLength
  val MagicLength: Int = 1

  // attributes field
  val AttributesOffset: Int = MagicOffset + MagicLength
  val AttributesLength: Int = 1

  // Only message format version 1 has the timestamp field.
  val TimestampOffset: Int = AttributesOffset + AttributesLength
  val TimestampLength: Int = 8

  // key size field: follows the attributes in V0 and the timestamp in V1
  val KeySizeOffset_V0: Int = AttributesOffset + AttributesLength
  val KeySizeOffset_V1: Int = TimestampOffset + TimestampLength
  val KeySizeLength: Int = 4

  // the key itself starts right after the key size field
  val KeyOffset_V0: Int = KeySizeOffset_V0 + KeySizeLength
  val KeyOffset_V1: Int = KeySizeOffset_V1 + KeySizeLength

  // value size field
  val ValueSizeLength: Int = 4
}
总结:kafka中,一个主题topic包含多个分区,一个broker通常会分到多个分区,而每个分区对应一个日志目录,目录中的日志又被划分为多个日志段;一个日志段对应一个日志文件、一个偏移量索引文件和一个时间戳索引文件,如果存在事务,还有记录中止事务的事务索引文件。由于kafka中存在日志段的概念,因此其采用跳跃表(ConcurrentSkipListMap)的方式定位到具体的区间,从而定位到具体的哪一个日志段文件。其所建的索引是采用间隔的方式建立的,也即通过稀疏索引在日志文件内定位消息的物理位置。每一个LogSegment以第一个message的偏移量为键提供segment管理。在原来的版本中,LogSegment采用FileMessageSet和start、end进行确定,而现在则是采用FileRecords。