Kafka日志清理机制


发布于 2024-08-08 / 24 阅读 / 0 评论 /
本文基于kafka3.6.0源码,描述kafka日志段的清理机制

1.清理策略说明

清理机制由log.cleanup.policy参数配置,这是一个服务端参数。我们还可以为每个topic配置对应的cleanup.policy参数。

目前清理策略有两种——delete和compact。

2.delete清理机制

如果cleanup.policy为delete,则LogManager会对过期的日志段进行删除。

delete机制对应的模型如下图所示:

清理机制由LogManager来实现,在KafkaServer启动时启动对应的清理线程。

KafkaServer.startup
    创建LogManager实例
    LogManager.startup // 启动日志管理
        LogManager.startupWithConfigOverrides
            KafkaScheduler.schedule("kafka-log-retention", LogManager.cleanupLogs()) // 延迟时间固定为30秒,调度周期由log.retention.check.interval.ms决定,默认为5分钟
            KafkaScheduler.schedule("kafka-log-flusher", LogManager.flushDirtyLogs()) // 延迟时间固定为30秒,调度周期由log.flush.scheduler.interval.ms决定,默认为Long.MAX_VALUE
            KafkaScheduler.schedule("kafka-recovery-point-checkpoint", LogManager.checkpointLogRecoveryOffsets()) // 延迟时间固定为30秒,
            KafkaScheduler.schedule("kafka-log-start-offset-checkpoint", LogManager.checkpointLogStartOffsets()) // 延迟时间固定为30秒,
            KafkaScheduler.schedule("kafka-delete-logs", LogManager.deleteLogs()) // 延迟时间固定为30秒,仅调度一次。当有新的log需要被删除,会被再次动态调度
            如果log.cleaner.enable配置为true(默认值为true)
                创建LogCleaner实例
                LogCleaner.startup // 启动LogCleaner实例
                    创建清理线程CleanerThread,线程数由log.cleaner.threads配置决定,默认值为1,线程名称为kafka-log-cleaner-thread-${threadId}
                    CleanerThread.startup // 启动每个CleanerThread线程
                        在任务执行过程中,如果有发现没有需要被清理的log,则调用CountDownLatch.await,线程进入等待,等待时间为由cleaner.backoff.ms参数决定,默认值为15秒。等待完成后,继续往下执行。
                        LogCleanerManager.maintainUncleanablePartitions // 执行清理

具体的清理工作由LogManager.cleanupLogs函数来完成。

LogManager.cleanupLogs
    从当前内存中的“分区-日志目录”映射中过滤出cleanup.policy为delete的分区
    deletableLogs = currentLogs.filter { !log.config.compact }
    遍历deletableLogs,执行删除操作
        UnifiedLog.deleteOldSegments
            如果如果分区的cleanup.policy为delete
                UnifiedLog.deleteLogStartOffsetBreachedSegments()
                    // 删除baseOffset小于logStartOffset的日志段
                    // 如果当前日志段为active LogSegment,则不清理。
                UnifiedLog.deleteRetentionSizeBreachedSegments()
                    // 根据保留大小进行清理,与retention.bytes有关
                UnifiedLog.deleteRetentionMsBreachedSegments()
                    // 根据保留时间进行清理,与retention.ms有关
            如果如果分区的cleanup.policy为delete
                UnifiedLog.deleteLogStartOffsetBreachedSegments()
                    // 删除baseOffset小于logStartOffset的日志段
                    // 如果当前日志段为active LogSegment,则不清理。

这里涉及到对应retention.ms和retention.bytes参数的比较。

3.compact压缩机制

compact会将同一个key的消息进行聚合,仅保留最后一条数据,其他数据全部无效,会被清理。

compact机制对应的模型如下图所示:

仅保留同一个key值对应的最大offset的value值。

kafka集群内置__consumer_offsets的清理策略是compact。