Kafka0.10.0对Segment的管理


发布于 2025-01-17 / 12 阅读 / 0 评论 /
Kafka0.10.0版本对Segment的管理,主要介绍Segment滚动机制

1.LogSegment初始化过程

Segment的抽象为kafka.log.LogSegment,包含log和index文件。

kafka.log.LogSegment构造过程如下所示:

kafka.log.LogSegment(baseOffset)
    new FileMessageSet(file = Log.logFilename(dir, startOffset))
        new File(dir, filenamePrefixFromOffset(offset) + ".log") // 20位的数字,0补齐
    new OffsetIndex(Log.indexFilename(dir, startOffset))
        new File(dir, filenamePrefixFromOffset(offset) + ".index") // 20位的数字,0补齐

注意,baseOffset为long类型,最大值为9223372036854775807,共19位

2.Broker初始化过程对Segment的处理

Broker初始化过程会对数据目录下的所有segment进行加载。主要过程如下所示:

kafka.log.Log(dir) // 每一个数据目录,初始化一个Log实例
    kafka.log.Log#loadSegments // 加载segments
        遍历dir下的所有文件
            文件不可读则抛异常
            文件名以.index为后缀
                确保有对应的.log为后缀的文件
            文件名以.log为后缀
                解析文件名,得到startOffset
                创建kafka.log.LogSegment(startOffset)实例
        如果dir下没有对应的segment,则初始化一个startOffset=0的segment
            kafka.log.Log#roll // 创建kafka.log.LogSegment(0)实例

这里涉及到segment的滚动

3.Segment滚动过程

Segment滚动就是根据当前最新的offset创建新的segment的过程。

kafka.log.Log#roll // 滚动
    kafka.log.LogSegment(logEndOffset) // 根据logEndOffset创建一个新的segment

在写入消息和清理过期segment的过程中都会有滚动操作。

kafka.log.Log#append // 写入消息时
    kafka.log.Log#maybeRoll(messagesSize) // 根据消息的大小判断是否滚动创建新segment
        kafka.log.Log#roll // 如果时间和大小都满足滚动的要求,则调用
    
kafka.log.Log#deleteOldSegments // 删除过期segment
    kafka.log.Log#roll // 如果要删除的segment数量等于当前segment数量,则创建一个新的 

滚动过程涉及到logEndOffset的变化,需要根据logEndOffset来确定segment的起始offset。

4.logEndOffset变更逻辑

logEndOffset的定义如下:

activeSegment = segments.lastEntry.getValue // 最新的一个segment
nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
logEndOffset = nextOffsetMetadata.messageOffset

logEndOffset也就是当前分区最新的一个segment的下一条消息的offset。

logEndOffset的变更点在updateLogEndOffset方法,如下所示:

private def updateLogEndOffset(messageOffset: Long) { // 直接更改当前分区的最新offset
    nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
}

调用updateLogEndOffset进行更新的场景有四个:

kafka.log.Log#append // 新消息写入
    updateLogEndOffset(appendInfo.lastOffset + 1) // 变更为最后一条消息的offset加1

kafka.log.Log#roll // segment滚动
    创建新的segment后,将此segment的起始offset赋值为logEndOffset
    updateLogEndOffset(nextOffsetMetadata.messageOffset)

kafka.log.Log#truncateTo
    如果第一个segment的起始offset大于目标offset
        truncateFullyAndStartAt(targetOffset) // 从targetOffset开始清理,
            清理所有的segment
            创建一个segment,baseOffset为targetOffset
            updateLogEndOffset(newOffset)
    如果第一个segment的起始offset大于或等于目标offset
        将baseOffset大于targetOffset的所有segment清理
        activeSegment.truncateTo(targetOffset) // 最新的segment清理部分数据,从targetOffset开始清理
        updateLogEndOffset(targetOffset)

truncateTo是将某个offset之后的消息(包含)都删除,然后以此offset作为logEndOffset。