1.清理策略说明
清理机制由log.cleanup.policy参数配置,这是一个服务端参数。我们还可以为每个topic配置对应的cleanup.policy参数。
目前清理策略有两种——delete和compact。
2.delete清理机制
如果cleanup.policy为delete,则LogManager会对过期的日志段进行删除。
delete机制对应的模型如下图所示:
清理机制由LogManager来实现,在KafkaServer启动时启动对应的清理线程。
KafkaServer.startup
创建LogManager实例
LogManager.startup // 启动日志管理
LogManager.startupWithConfigOverrides
KafkaScheduler.schedule("kafka-log-retention", LogManager.cleanupLogs()) // 延迟时间固定为30秒,调度周期由log.retention.check.interval.ms决定,默认为5分钟
KafkaScheduler.schedule("kafka-log-flusher", LogManager.flushDirtyLogs()) // 延迟时间固定为30秒,调度周期由log.flush.scheduler.interval.ms决定,默认为Long.MAX_VALUE
KafkaScheduler.schedule("kafka-recovery-point-checkpoint", LogManager.checkpointLogRecoveryOffsets()) // 延迟时间固定为30秒,
KafkaScheduler.schedule("kafka-log-start-offset-checkpoint", LogManager.checkpointLogStartOffsets()) // 延迟时间固定为30秒,
KafkaScheduler.schedule("kafka-delete-logs", LogManager.deleteLogs()) // 延迟时间固定为30秒,仅调度一次。当有新的log需要被删除,会被再次动态调度
如果log.cleaner.enable配置为true(默认值为true)
创建LogCleaner实例
LogCleaner.startup // 启动LogCleaner实例
创建清理线程CleanerThread,线程数由log.cleaner.threads配置决定,默认值为1,线程名称为kafka-log-cleaner-thread-${threadId}
CleanerThread.startup // 启动每个CleanerThread线程
在任务执行过程中,如果有发现没有需要被清理的log,则调用CountDownLatch.await,线程进入等待,等待时间为由cleaner.backoff.ms参数决定,默认值为15秒。等待完成后,继续往下执行。
LogCleanerManager.maintainUncleanablePartitions // 执行清理
具体的清理工作由LogManager.cleanupLogs函数来完成。
LogManager.cleanupLogs
从当前内存中的“分区-日志目录”映射中过滤出cleanup.policy为delete的分区
deletableLogs = currentLogs.filter { !log.config.compact }
遍历deletableLogs,执行删除操作
UnifiedLog.deleteOldSegments
如果如果分区的cleanup.policy为delete
UnifiedLog.deleteLogStartOffsetBreachedSegments()
// 删除baseOffset小于logStartOffset的日志段
// 如果当前日志段为active LogSegment,则不清理。
UnifiedLog.deleteRetentionSizeBreachedSegments()
// 根据保留大小进行清理,与retention.bytes有关
UnifiedLog.deleteRetentionMsBreachedSegments()
// 根据保留时间进行清理,与retention.ms有关
如果如果分区的cleanup.policy为delete
UnifiedLog.deleteLogStartOffsetBreachedSegments()
// 删除baseOffset小于logStartOffset的日志段
// 如果当前日志段为active LogSegment,则不清理。
这里涉及到对应retention.ms和retention.bytes参数的比较。
3.compact压缩机制
compact会将同一个key的消息进行聚合,仅保留最后一条数据,其他数据全部无效,会被清理。
compact机制对应的模型如下图所示:
仅保留同一个key值对应的最大offset的value值。
kafka集群内置__consumer_offsets的清理策略是compact。