发布2020-11-03 15:57:45

前面我们已经了解了 Kafka 服务中重要的启动方法。它的注释非常具有启发性:"启动 API,用于启动 Kafka 服务器的单个实例;实例化 LogManager、SocketServer 和请求处理程序 KafkaRequestHandlers。"也就是说,它会启动一个 Kafka 服务实例(且实例是单个的),并在其中实例化日志管理器、Socket 服务器和 Kafka 请求处理器。启动 Kafka 服务时,不免会启动 Kafka 控制器,而控制器涉及主从 broker 服务、主从副本、状态机、监听、重平衡以及与其他 broker 的通信。下面我们来看日志相关的部分:

// Build the LogManager through the companion-object factory LogManager.apply.
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)

/**
 * Companion object of the LogManager class: checkpoint file names, the producer-id
 * expiration check interval, and a factory that builds a LogManager from the broker
 * configuration.
 */
object LogManager {

  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
  // Interval between producer-id expiration checks: 10 minutes, expressed in milliseconds.
  val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000

  /**
   * Creates a LogManager from the broker configuration.
   *
   * Per-topic log configs are read through `zkClient`; if loading failed for any topic,
   * the first recorded failure is rethrown and no LogManager is created.
   *
   * @param config               broker configuration supplying log dirs, intervals and thread counts
   * @param initialOfflineDirs   paths of log directories that are offline at startup
   * @param zkClient             client used to list topics and read their log configs
   * @param brokerState          passed through to the LogManager
   * @param kafkaScheduler       scheduler handed to the LogManager
   * @param time                 clock abstraction handed to the LogManager
   * @param brokerTopicStats     passed through to the LogManager
   * @param logDirFailureChannel passed through to the LogManager
   * @return the newly constructed LogManager
   */
  def apply(config: KafkaConfig,
            initialOfflineDirs: Seq[String],
            zkClient: KafkaZkClient,
            brokerState: BrokerState,
            kafkaScheduler: KafkaScheduler,
            time: Time,
            brokerTopicStats: BrokerTopicStats,
            logDirFailureChannel: LogDirFailureChannel): LogManager = {
    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
    val defaultLogConfig = LogConfig(defaultProps)

    // read the log configurations from zookeeper
    val (topicConfigs, failed) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
    // Fail fast with the first per-topic error.
    if (failed.nonEmpty) throw failed.head._2

    val cleanerConfig = LogCleaner.cleanerConfig(config)

    // Directory paths are normalised to absolute File instances before being handed over.
    new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
      initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
      topicConfigs = topicConfigs,
      initialDefaultConfig = defaultLogConfig,
      cleanerConfig = cleanerConfig,
      recoveryThreadsPerDataDir = config.numRecoveryThreadsPerDataDir,
      flushCheckMs = config.logFlushSchedulerIntervalMs,
      flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
      flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
      retentionCheckMs = config.logCleanupIntervalMs,
      maxPidExpirationMs = config.transactionIdExpirationMs,
      scheduler = kafkaScheduler,
      brokerState = brokerState,
      brokerTopicStats = brokerTopicStats,
      logDirFailureChannel = logDirFailureChannel,
      time = time)
  }
}

/**
 * Keeps two pools of Log instances keyed by TopicPartition: `currentLogs` and `futureLogs`.
 *
 * NOTE(review): only the constructor and the two pools are visible in this excerpt; the
 * meaning of each constructor parameter is presumably what its name suggests — confirm
 * against the full class.
 */
class LogManager(logDirs: Seq[File],
                 initialOfflineDirs: Seq[File],
                 val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
                 val initialDefaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 recoveryThreadsPerDataDir: Int,
                 val flushCheckMs: Long,
                 val flushRecoveryOffsetCheckpointMs: Long,
                 val flushStartOffsetCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 val maxPidExpirationMs: Int,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 brokerTopicStats: BrokerTopicStats,
                 logDirFailureChannel: LogDirFailureChannel,
                 time: Time) extends Logging with KafkaMetricsGroup {
  // Logs keyed by topic partition.
  private val currentLogs = new Pool[TopicPartition, Log]() 
  // Second pool with the same key/value types; presumably the "-future" replicas — TODO confirm.
  private val futureLogs = new Pool[TopicPartition, Log]()

// A Log is backed by its segments: the LogSegment instances held in the `segments` map below.
// NOTE(review): the rest of the class body is not shown in this excerpt.
class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
  // Sorted, concurrent map of segments keyed by a java.lang.Long (presumably the base offset — TODO confirm).
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

/**
 * A log segment: a records file (`log`) together with its offset, time and transaction
 * indexes.
 *
 * NOTE(review): only `append` is shown here; the mutable fields it uses
 * (rollingBasedTimestamp, maxTimestampSoFar, offsetOfMaxTimestamp, bytesSinceLastIndexEntry)
 * are declared in the part of the class that this excerpt omits.
 */
class LogSegment private[log] (val log: FileRecords,
                               val offsetIndex: OffsetIndex,
                               val timeIndex: TimeIndex,
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val maxSegmentMs: Long,
                               val maxSegmentBytes: Int,
                               val time: Time) extends Logging {

  /**
   * Append the given messages starting with the given offset. Add
   * an entry to the index if needed.
   *
   * It is assumed this method is being called from within a lock; the method does no
   * locking of its own, so it is not thread-safe by itself.
   *
   * @param firstOffset The first offset in the message set.
   * @param largestOffset The last offset in the message set
   * @param largestTimestamp The largest timestamp in the message set.
   * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
   * @param records The log entries to append.
   */
  def append(firstOffset: Long,
             largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
          .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
      // File size before the write = physical position at which this batch will start.
      val physicalPosition = log.sizeInBytes()
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)
      // append the messages
      require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
      // Key step: the actual write of the records to the segment's file.
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
      // Update the in memory max timestamp and corresponding offset.
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
      }
      // append an entry to the index (if needed)
      // Key step: an index entry is added only after more than indexIntervalBytes bytes
      // have been appended since the previous entry.
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        offsetIndex.append(firstOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }

  // ... remaining members are not shown in this excerpt ...
}


/**
 * Append log batches to the buffer
 * 将日志批次追加到缓冲区
 * @param records The records to append
 * @return the number of bytes written to the underlying file
 */
public int append(MemoryRecords records) throws IOException {
    int written = records.writeFullyTo(channel);
    return written;

    /**
     * Write all records to the given channel (including partial records).
     * 将所有记录写入给定通道(包括部分记录)。
     * @param channel The channel to write to
     * @return The number of bytes written
     * @throws IOException For any IO errors writing to the channel
     */
    public int writeFullyTo(GatheringByteChannel channel) throws IOException {
        int written = 0;
        while (written < sizeInBytes())
            written += channel.write(buffer);
        return written;


/**
 * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
 * 将给定偏移量/位置对的条目追加到索引。该条目的偏移量必须大于之前所有条目的偏移量。
 */
def append(offset: Long, position: Int): Unit = {
  // Appends one (relative offset, position) entry to the memory-mapped index under the lock.
  // Throws IllegalArgumentException (via require) when the index is full, and
  // InvalidOffsetException when `offset` is not larger than the last appended offset.
  inLock(lock) {
    require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
    if (_entries == 0 || offset > _lastOffset) {
      trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
      // An entry is a pair: the offset relative to baseOffset, then the physical position.
      mmap.putInt((offset - baseOffset).toInt)
      mmap.putInt(position)
      _entries += 1
      _lastOffset = offset
      // Sanity check: the buffer position must match the number of entries written.
      require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
    } else {
      throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
        s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
    }
  }
}

  // Memory-mapped view of the index file. A newly created file is pre-allocated to the
  // largest multiple of entrySize that fits in maxIndexSize; the mapping is read-write
  // or read-only depending on `writable`.
  protected var mmap: MappedByteBuffer = {
    // createNewFile() returns true only if the file did not exist before this call.
    val newlyCreated = file.createNewFile()
    val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
    try {
      /* pre-allocate the file if necessary */
      if (newlyCreated) {
        if (maxIndexSize < entrySize)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
      }

      /* memory-map the file */
      _length = raf.length()
      val idx = {
        if (writable)
          raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
        else
          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
      }
      /* set the position in the index for the next entry */
      if (newlyCreated)
        // a fresh, pre-allocated index holds no entries yet: start writing at the beginning
        idx.position(0)
      else
        // if this is a pre-existing index, assume it is valid and set position to last entry
        idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
      idx
    } finally {
      // The mapping stays valid after the file handle is closed; close errors are swallowed.
      CoreUtils.swallow(raf.close(), this)
    }
  }




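其中 FileMessageSet(对应上文代码中的 FileRecords)通过设定 segment 之内的 start 和 end 来读取 segment 内的文件;OffsetIndex 是 segment 里面的 message 索引,它并不是为每条 message 都建立索引,而是每写入超过 log.index.interval.bytes 字节的消息后才添加一条索引(即稀疏索引,对应上文 LogSegment.append 中 bytesSinceLastIndexEntry > indexIntervalBytes 的判断)。因此,如果给定 topic 和 offset 来查找某条记录,则分两步完成:先快速定位 segment file,再在 segment file 中查找 msg chunk。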


/**
 * Helper functions for logs
 * 日志的辅助功能
 */
object Log {
  // File-name suffixes and directory-name patterns used for log files and directories.

  /** a log file */
  // suffix of a log file: .log
  val LogFileSuffix = ".log"

  /** an index file */
  val IndexFileSuffix = ".index"

  /** a time index file */
  val TimeIndexFileSuffix = ".timeindex"

  val ProducerSnapshotFileSuffix = ".snapshot"

  /** an (aborted) txn index */
  val TxnIndexFileSuffix = ".txnindex"

  /** a file that is scheduled to be deleted */
  val DeletedFileSuffix = ".deleted"

  /** A temporary file that is being used for log cleaning */
  val CleanedFileSuffix = ".cleaned"

  /** A temporary file used when swapping files into the log */
  val SwapFileSuffix = ".swap"

  /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher.
   * This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
   * avoided by passing in the recovery point, however finding the correct position to do this
   * requires accessing the offset index which may not be safe in an unclean shutdown.
   * For more information see the discussion in PR#2104
   */
  val CleanShutdownFile = ".kafka_cleanshutdown"

  /** a directory that is scheduled to be deleted */
  val DeleteDirSuffix = "-delete"

  /** a directory that is used for future partition */
  val FutureDirSuffix = "-future"

  // Directory-name patterns: three whitespace-free groups shaped "<g1>-<g2>.<g3>", followed by
  // the delete / future suffix.
  private val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
  private val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
}


/**
 * A {@link Records} implementation backed by a file. An optional start and end position can be applied to this
 * instance to enable slicing a range of the log records.
 * 由文件支持的{@link Records}实现。可以将可选的开始位置和结束位置应用于此实例,以允许对一系列日志记录进行切片。
 */
public class FileRecords extends AbstractRecords implements Closeable {
    private final boolean isSlice;
    private final int start;
    private final int end;

    private final Iterable<FileLogInputStream.FileChannelRecordBatch> batches;

    // mutable state
    private final AtomicInteger size;
    private final FileChannel channel;
    private volatile File file;

     * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible.
     * The constructor is visible for tests.
     * 尽可能使用{@code FileRecords.open}方法代替此构造方法。该构造方法对于测试是可见的。
    public FileRecords(File file,
                       FileChannel channel,
                       int start,
                       int end,
                       boolean isSlice) throws IOException {
        this.file = file;
        this.channel = channel;
        this.start = start;
        this.end = end;
        this.isSlice = isSlice;
        this.size = new AtomicInteger();

        if (isSlice) {
            // don't check the file size if this is just a slice view
            size.set(end - start);
        } else {
            int limit = Math.min((int) channel.size(), end);
            size.set(limit - start);

            // if this is not a slice, update the file pointer to the end of the file
            // set the file position to the last byte in the file

        batches = batchesFrom(start);


/*
 * (Documentation of the `lookup` method shown inside the class below.)
 * Find the largest offset less than or equal to the given targetOffset
 * and return a pair holding this offset and its corresponding physical file position.
 * 查找小于或等于给定targetOffset的最大偏移量,并返回由该偏移量及其对应的物理文件位置组成的一对值。
 * @param targetOffset The offset to look up.
 * @return The offset found and the corresponding file position for this offset
 *         If the target offset is smaller than the least entry in the index (or the index is empty),
 *         the pair (baseOffset, 0) is returned.
 */
class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
    extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) {

  // Returns the entry chosen by largestLowerBoundSlotFor for targetOffset, or
  // (baseOffset, 0) when that search reports no slot (-1).
  def lookup(targetOffset: Long): OffsetPosition = {
    maybeLock(lock) {
      // Search on a duplicate view of the mapped buffer.
      val idx = mmap.duplicate
      val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
      if (slot == -1)
        OffsetPosition(baseOffset, 0)
      else
        parseEntry(idx, slot).asInstanceOf[OffsetPosition]
    }
  }

  // ... remaining members are not shown in this excerpt ...
}

/**
 * An index entry pairing an offset with a position.
 *
 * @param offset   the offset; used as the index key
 * @param position the position; exposed as the index value, widened to Long
 */
case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
  override def indexKey: Long = offset
  override def indexValue: Long = position.toLong
}

  // Decodes the n-th entry of `buffer`. The stored offset is relative, so baseOffset is
  // added back to yield the absolute offset.
  override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
    OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
  }

  /**
   * Get the nth offset mapping from the index
   * 从索引获取第n个偏移量映射
   * @param n The entry number in the index
   * @return The offset/position pair at that entry
   */
  def entry(n: Int): OffsetPosition = {
    // Returns the n-th entry, or throws IllegalArgumentException when n >= the entry count.
    // The returned offset is the stored RELATIVE offset: baseOffset is not added here,
    // unlike parseEntry. Kept as-is because callers may add baseOffset themselves.
    maybeLock(lock) {
      if (n >= _entries)
        throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " +
          s"which has size ${_entries}.")
      // Read through a duplicate view of the mapped buffer.
      val idx = mmap.duplicate
      OffsetPosition(relativeOffset(idx, n), physical(idx, n))
    }
  }


/**
 * Lookup lower and upper bounds for the given target.
 * 查找给定目标的上下限。
 */
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
  // Returns (lower slot, upper slot) bracketing `target`; -1 stands for "no such slot".
  // An exact match returns the same slot for both bounds.

  // check if the index is empty
  if (_entries == 0)
    return (-1, -1)

  // check if the target offset is smaller than the least offset
  if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
    return (-1, 0)

  // binary search for the entry
  var lo = 0
  var hi = _entries - 1
  while (lo < hi) {
    // The midpoint is rounded up so that `lo = mid` always makes progress.
    val mid = ceil(hi/2.0 + lo/2.0).toInt
    val found = parseEntry(idx, mid)
    val compareResult = compareIndexEntry(found, target, searchEntity)
    if (compareResult > 0)
      hi = mid - 1
    else if (compareResult < 0)
      lo = mid
    else
      return (mid, mid)
  }

  // No exact match: `lo` is the lower bound; the upper bound is the next slot, if any.
  (lo, if (lo == _entries - 1) -1 else lo + 1)
}


  // Names of the two checkpoint files kept in each log directory.
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"

  // One recovery-point checkpoint file per live log directory, keyed by that directory.
  @volatile private var recoveryPointCheckpoints = liveLogDirs.map { logDir =>
    val checkpointPath = new File(logDir, RecoveryPointCheckpointFile)
    logDir -> new OffsetCheckpointFile(checkpointPath, logDirFailureChannel)
  }.toMap

  // One log-start-offset checkpoint file per live log directory, keyed by that directory.
  @volatile private var logStartOffsetCheckpoints = liveLogDirs.map { logDir =>
    val checkpointPath = new File(logDir, LogStartOffsetCheckpointFile)
    logDir -> new OffsetCheckpointFile(checkpointPath, logDirFailureChannel)
  }.toMap

  // Maps a topic partition to a log directory path (the preferred one, going by the name).
  private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()

/**
 *  Start the background threads to flush logs and do log cleanup
 *  启动后台(background)线程来刷新日志并执行日志清理
 */
def startup(): Unit = {
  // Registers the periodic log-maintenance tasks on the scheduler (when one is present)
  // and then starts the log cleaner if it is enabled.
  // NOTE(review): the excerpt had lost the `scheduler.schedule("<name>",` openers, the
  // trailing time-unit arguments and the body of the final `if`. The task names and
  // `cleaner.startup()` below are restored from upstream Kafka — verify them against the
  // Kafka version under discussion.
  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       unit = TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs _,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       unit = TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointLogRecoveryOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushRecoveryOffsetCheckpointMs,
                       unit = TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint",
                       checkpointLogStartOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushStartOffsetCheckpointMs,
                       unit = TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       unit = TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner)
    cleaner.startup()
}


object Message {

  /**
   * The current offset and size for all the fixed-length fields.
   * Each offset is the previous field's offset plus the previous field's length.
   */
  val CrcOffset = 0
  val CrcLength = 4
  val MagicOffset = CrcOffset + CrcLength
  val MagicLength = 1
  val AttributesOffset = MagicOffset + MagicLength
  val AttributesLength = 1
  // Only message format version 1 has the timestamp field.
  val TimestampOffset = AttributesOffset + AttributesLength
  val TimestampLength = 8
  // In V0 the key size directly follows the attributes; in V1 it follows the timestamp.
  val KeySizeOffset_V0 = AttributesOffset + AttributesLength
  val KeySizeOffset_V1 = TimestampOffset + TimestampLength
  val KeySizeLength = 4
  val KeyOffset_V0 = KeySizeOffset_V0 + KeySizeLength
  val KeyOffset_V1 = KeySizeOffset_V1 + KeySizeLength
  val ValueSizeLength = 4
}


