Background: in a Kafka cluster running on Kubernetes, once the cluster has been running for a while and has accumulated a large number of data files, restarts become very difficult: it often takes several hours just to load the log files, which is not acceptable to us.

Below we analyse the Kafka source code, explain how snapshot files work, and look at the Kubernetes termination grace period, in order to explain the problem in detail and fix it at the root.

Log loading: the source code call chain

1. After Kafka starts, it uses LogManager to load the log files.

2. LogManager calls its loadLogs method.

3. loadLogs iterates over the log directories (one per topic partition) and calls loadLog for each of them.

4. loadLog calls loadSegments to load that topic partition's log segments.

5. Inside loadSegments the segment files are loaded, and any swap operations interrupted by a previous shutdown are completed by completeSwapOperations.

Loading and recovering all logs: loadLogs

Loading and recovering all logs consists of the following main steps:

  1. Load and record the files in each log directory that mark its state (.kafka_cleanshutdown, recovery-point-offset-checkpoint, log-start-offset-checkpoint)
  2. Concurrently load and recover the log of every topic partition (detailed in the next section)
  3. Record problematic log directories and handle them asynchronously

/**
 * Recover and load all logs in the given data directories
 */
private[log] def loadLogs(topicConfigOverrides: Map[String, LogConfig]): Unit = {
  // Load all live log directories (liveLogDirs). The Kafka server may be configured with
  // multiple disk directories for log files, but not all of them are necessarily available.
  info(s"Loading logs from log dirs $liveLogDirs")
  val startMs = time.hiResClockMs()
  val threadPools = ArrayBuffer.empty[ExecutorService]
  val offlineDirs = mutable.Set.empty[(String, IOException)]
  val jobs = ArrayBuffer.empty[Seq[Future[_]]]
  var numTotalLogs = 0

  // Iterate over all disks and load/recover their logs. If an IOException occurs,
  // record the directory in offlineDirs for later handling.
  for (dir <- liveLogDirs) {
    val logDirAbsolutePath = dir.getAbsolutePath
    var hadCleanShutdown: Boolean = false
    try {
      val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
      threadPools.append(pool)

      // If the .kafka_cleanshutdown file exists, delete it and remember the hadCleanShutdown
      // state; the recovery flow can then be skipped for this directory.
      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
      if (cleanShutdownFile.exists) {
        info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
        // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
        // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
        Files.deleteIfExists(cleanShutdownFile.toPath)
        hadCleanShutdown = true
      } else {
        // log recovery itself is being performed by `Log` class during initialization
        info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
      }

      // Read the recoveryPoint of every topic-partition directory from the
      // recovery-point-offset-checkpoint file.
      var recoveryPoints = Map[TopicPartition, Long]()
      try {
        recoveryPoints = this.recoveryPointCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
      }

      // Read the logStartOffset of every topic-partition directory from the
      // log-start-offset-checkpoint file.
      var logStartOffsets = Map[TopicPartition, Long]()
      try {
        logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
      }

      // Main loading/recovery flow: run loadLog concurrently for every topic partition.
      val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
        logDir.isDirectory && Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
      val numLogsLoaded = new AtomicInteger(0)
      numTotalLogs += logsToLoad.length

      val jobsForDir = logsToLoad.map { logDir =>
        val runnable: Runnable = () => {
          try {
            debug(s"Loading log $logDir")
            val logLoadStartMs = time.hiResClockMs()
            val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, topicConfigOverrides)
            val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
            val currentNumLoaded = numLogsLoaded.incrementAndGet()
            info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
              s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
          } catch {
            case e: IOException =>
              offlineDirs.add((logDirAbsolutePath, e))
              error(s"Error while loading log dir $logDirAbsolutePath", e)
          }
        }
        runnable
      }
      jobs += jobsForDir.map(pool.submit)
    } catch {
      case e: IOException =>
        offlineDirs.add((logDirAbsolutePath, e))
        error(s"Error while loading log dir $logDirAbsolutePath", e)
    }
  }

  try {
    // Wait for all concurrent log loading jobs to finish.
    for (dirJobs <- jobs) {
      dirJobs.foreach(_.get)
    }
    // Record all problematic directories; they will later be taken offline by the ReplicaManager.
    offlineDirs.foreach { case (dir, e) =>
      logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while loading log dir $dir", e)
    }
  } catch {
    case e: ExecutionException =>
      error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
      throw e.getCause
  } finally {
    threadPools.foreach(_.shutdown())
  }

  info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
}

Loading and recovering a single topic partition's log

A single topic partition's log is loaded and recovered in the initialization block of the Log class. If the partition's directory name ends with the -delete suffix, the partition is considered pending deletion and is added to the logsToBeDeleted collection, where a scheduled task will eventually clean it up.
The initialization block of the Log class loads the log through loadSegments:

private def loadSegments(): Long = {
  // Clean up temporary files (.deleted and .cleaned suffixes) and collect the usable swap files
  val swapFiles = removeTempFilesAndCollectSwapFiles()

  // retryOnOffsetOverflow catches any LogSegmentOffsetOverflowException and splits the offending segment
  retryOnOffsetOverflow {
    // Load all files in the directory and perform the necessary sanity checks
    logSegments.foreach(_.close())
    segments.clear()
    loadSegmentFiles()
  }

  // Complete any operations that were interrupted, based on the swap files
  completeSwapOperations(swapFiles)

  // If this is not a partition pending deletion, run the recovery flow
  if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
    val nextOffset = retryOnOffsetOverflow {
      recoverLog()
    }
    // reset the index size of the currently active log segment to allow more entries
    activeSegment.resizeIndexes(config.maxIndexSize)
    nextOffset
  } else {
    if (logSegments.isEmpty) {
      addSegment(LogSegment.open(dir = dir,
        baseOffset = 0,
        config,
        time = time,
        initFileSize = this.initFileSize))
    }
    0
  }
}

The core of recoverLog is as follows:

// if we have the clean shutdown marker, skip recovery
// Recovery is only needed if the broker did not shut down cleanly
if (!hadCleanShutdown) {
  // Take all segments after the recoveryPoint (normally there is only one)
  val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
  var truncated = false

  while (unflushed.hasNext && !truncated) {
    val segment = unflushed.next()
    info(s"Recovering unflushed segment ${segment.baseOffset}")
    val truncatedBytes =
      try {
        // Clear the segment's indexes, read and validate the data batch by batch, and rebuild the indexes
        recoverSegment(segment, leaderEpochCache)
      } catch {
        case _: InvalidOffsetException =>
          val startOffset = segment.baseOffset
          warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
            s"creating an empty one with starting offset $startOffset")
          segment.truncateTo(startOffset)
      }
    if (truncatedBytes > 0) {
      // If the previous segment was truncated, all segments after it are simply deleted.
      // unflushed is an iterator, so unflushed.toList contains only the segments not yet visited, not all segments
      warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
      removeAndDeleteSegments(unflushed.toList,
        asyncDelete = true,
        reason = LogRecovery)
      truncated = true
    }
  }
}

 
The core of completeSwapOperations:

private def completeSwapOperations(swapFiles: Set[File]): Unit = {
  for (swapFile <- swapFiles) {
    val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
    val baseOffset = offsetFromFile(logFile)
    val swapSegment = LogSegment.open(swapFile.getParentFile,
      baseOffset = baseOffset,
      config,
      time = time,
      fileSuffix = SwapFileSuffix)
    info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
    // Recover the log segment
    recoverSegment(swapSegment)

    // We create swap files for two cases:
    // (1) Log cleaning where multiple segments are merged into one, and
    // (2) Log splitting where one segment is split into multiple.
    //
    // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
    // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
    // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
    // do a replace with an existing segment.
    val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
      segment.readNextOffset > swapSegment.baseOffset
    }
    replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
  }
}

The recoverSegment code:

private def recoverSegment(segment: LogSegment,
                           leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized {
  val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
  // Rebuild the producer state (treated as an unclean shutdown)
  rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager)
  val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)
  // once we have recovered the segment's data, take a snapshot to ensure that we won't
  // need to reload the same segment again while recovering another segment.
  producerStateManager.takeSnapshot()
  bytesTruncated
}

The rebuildProducerState code:

// Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
// free of all side-effects, i.e. it must not update any log-specific state.
private def rebuildProducerState(lastOffset: Long,
                                 reloadFromCleanShutdown: Boolean,
                                 producerStateManager: ProducerStateManager): Unit = lock synchronized {
  checkIfMemoryMappedBufferClosed()
  val messageFormatVersion = config.messageFormatVersion.recordVersion.value
  val segments = logSegments
  val offsetsToSnapshot =
    if (segments.nonEmpty) {
      val nextLatestSegmentBaseOffset = lowerSegment(segments.last.baseOffset).map(_.baseOffset)
      Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), Some(lastOffset))
    } else {
      Seq(Some(lastOffset))
    }
  info(s"Loading producer state till offset $lastOffset with message format version $messageFormatVersion")

  // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
  // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
  // but we have to be careful not to assume too much in the presence of broker failures. The two most common
  // upgrade cases in which we expect to find no snapshots are the following:
  //
  // 1. The broker has been upgraded, but the topic is still on the old message format.
  // 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
  //
  // If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end
  // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
  // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
  // from the first segment.
  if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 ||
      (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
    // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
    // last two segments and the last offset. This should avoid the full scan in the case that the log needs
    // truncation.
    offsetsToSnapshot.flatten.foreach { offset =>
      producerStateManager.updateMapEndOffset(offset)
      producerStateManager.takeSnapshot()
    }
  } else {
    val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
    producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
    // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
    // offset (which would be the case on first startup) and there were active producers prior to truncation
    // (which could be the case if truncating after initial loading). If there weren't, then truncating
    // shouldn't change that fact (although it could cause a producerId to expire earlier than expected),
    // and we can skip the loading. This is an optimization for users which are not yet using
    // idempotent/transactional features yet.
    if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
      val segmentOfLastOffset = floorLogSegment(lastOffset)

      logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
        val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
        producerStateManager.updateMapEndOffset(startOffset)

        if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
          producerStateManager.takeSnapshot()

        val maxPosition = if (segmentOfLastOffset.contains(segment)) {
          Option(segment.translateOffset(lastOffset))
            .map(_.position)
            .getOrElse(segment.size)
        } else {
          segment.size
        }

        val fetchDataInfo = segment.read(startOffset,
          maxSize = Int.MaxValue,
          maxPosition = maxPosition,
          minOneMessage = false)
        if (fetchDataInfo != null)
          loadProducersFromLog(producerStateManager, fetchDataInfo.records)
      }
    }
    producerStateManager.updateMapEndOffset(lastOffset)
    producerStateManager.takeSnapshot()
  }
}

How .snapshot files work

Snapshot file contents: a .snapshot file records the latest producer state (each producer's latest offset) for a partition.

 1. A snapshot file is written every time the log rolls to a new segment.

 2. A snapshot file is also written when the broker is stopped with kafka-server-stop.sh (a clean shutdown); this is what makes a fast reload possible when Kafka restarts.

 3. On restart, Kafka only keeps the most recent three snapshot files.
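
To see these files on a broker, you can list the producer state snapshots inside a partition directory. The following standalone Scala sketch does exactly that; it is not part of the Kafka source, and the default directory path is a placeholder you would replace with a real partition directory on the broker's data disk:

import java.io.File

object ListProducerSnapshots {
  def main(args: Array[String]): Unit = {
    // Placeholder path: pass a real partition directory such as /var/lib/kafka/data/my-topic-0
    val partitionDir = new File(args.headOption.getOrElse("/var/lib/kafka/data/my-topic-0"))

    // Producer state snapshots are named <zero-padded base offset>.snapshot,
    // e.g. 00000000000000524288.snapshot
    val snapshots = Option(partitionDir.listFiles())
      .getOrElse(Array.empty[File])
      .filter(_.getName.endsWith(".snapshot"))
      .sortBy(_.getName.stripSuffix(".snapshot").toLong)

    snapshots.foreach { f =>
      val offset = f.getName.stripSuffix(".snapshot").toLong
      println(s"snapshot at offset $offset (${f.length()} bytes)")
    }
    println(s"${snapshots.length} snapshot file(s) in ${partitionDir.getAbsolutePath}")
  }
}

The fewer recent snapshots a partition has, the more log data rebuildProducerState has to scan on an unclean restart, which is exactly the slow-start behaviour described above.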

The Kubernetes termination grace period

What is terminationGracePeriodSeconds?
Before explaining this parameter, recall the steps of a Kubernetes rolling update:
1. Kubernetes first starts the new Pod.
2. Kubernetes waits for the new Pod to become Ready.
3. Kubernetes creates the Endpoint and adds the new Pod to the load balancer.
4. Kubernetes removes the Endpoints of the old Pod and marks it as Terminating; from this point no new requests reach the old Pod.
5. At the same time, Kubernetes sends the old Pod a SIGTERM signal and waits up to terminationGracePeriodSeconds (30 seconds by default).
6. Once terminationGracePeriodSeconds has elapsed, Kubernetes forcibly kills the old Pod.
In short, terminationGracePeriodSeconds is the final buffer Kubernetes gives your program to finish its shutdown work.

Summary: the grace period (terminationGracePeriodSeconds) defines how long a graceful shutdown may take, i.e. how much time the process gets after receiving the stop request to release resources and perform other cleanup. If it has not stopped by the deadline, it is killed forcibly.

Example:

apiVersion: v1
kind: Pod
metadata:
  name: busybox
spec:
  containers:
    - name: busybox
      image: busybox:stable
      command: ["/bin/sh", "-c", "sleep 3600"]
  terminationGracePeriodSeconds: 5

How to properly solve slow log loading when restarting Kafka on Kubernetes

With the three topics above in mind, the root cause of the slow restarts is now clear: if Kafka is killed before it can write its snapshot files within the default grace period, then on the next startup it has to load and recover every log segment one by one, which is not acceptable to us. So how large should terminationGracePeriodSeconds be?

Kubernetes sends the old Pod a SIGTERM and then waits up to terminationGracePeriodSeconds (30 seconds by default). If the Kafka process finishes shutting down within the grace period, i.e. it exits before the deadline, the Pod terminates immediately; terminationGracePeriodSeconds is only an upper bound that guarantees the Pod is eventually force-killed instead of lingering as a zombie. Therefore we simply set terminationGracePeriodSeconds to the largest value we can accept, so that Kafka has enough time to shut down cleanly and write its snapshot files before it is killed.
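
As an illustration, the snippet below shows where the setting goes in a Kafka StatefulSet. This is only a minimal sketch, not a complete manifest: the names, the image, and the 1800-second value are assumptions to adapt to your own deployment (the right value depends on how long your brokers need to flush data and write snapshots on shutdown).

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka                  # assumed name, adapt to your deployment
spec:
  serviceName: kafka
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      # Give the broker ample time to shut down cleanly and write its producer
      # state snapshots; 1800s (30 min) is an example value, not a recommendation.
      terminationGracePeriodSeconds: 1800
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:7.0.1   # example image
          ports:
            - containerPort: 9092

With a long enough grace period, a clean shutdown leaves the .kafka_cleanshutdown marker and up-to-date snapshots behind, so the next start skips recovery and loads quickly. Since the Pod still exits as soon as the broker process finishes, the larger value costs nothing in the normal case.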
