1.LogSegment初始化过程
Segment的抽象为kafka.log.LogSegment,包含log和index文件。
kafka.log.LogSegment构造过程如下所示:
kafka.log.LogSegment(baseOffset)
new FileMessageSet(file = Log.logFilename(dir, startOffset))
new File(dir, filenamePrefixFromOffset(offset) + ".log") // 20位的数字,0补齐
new OffsetIndex(Log.indexFilename(dir, startOffset))
new File(dir, filenamePrefixFromOffset(offset) + ".index") // 20位的数字,0补齐
注意,baseOffset为long类型,最大值为9223372036854775807,共19位
2.Broker初始化过程对Segment的处理
Broker初始化过程会对数据目录下的所有segment进行加载。主要过程如下所示:
kafka.log.Log(dir) // 每一个数据目录,初始化一个Log实例
kafka.log.Log#loadSegments // 加载segments
遍历dir下的所有文件
文件不可读则抛异常
文件名以.index为后缀
确保有对应的.log为后缀的文件
文件名以.log为后缀
解析文件名,得到startOffset
创建kafka.log.LogSegment(startOffset)实例
如果dir下没有对应的segment,则初始化一个startOffset=0的segment
kafka.log.Log#roll // 创建kafka.log.LogSegment(0)实例
这里涉及到segment的滚动
3.Segment滚动过程
Segment滚动就是根据当前最新的offset创建新的segment的过程。
kafka.log.Log#roll // 滚动
kafka.log.LogSegment(logEndOffset) // 根据logEndOffset创建一个新的segment
在写入消息和清理过期segment的过程中都会有滚动操作。
kafka.log.Log#append // 写入消息时
kafka.log.Log#maybeRoll(messagesSize) // 根据消息的大小判断是否滚动创建新segment
kafka.log.Log#roll // 如果时间和大小都满足滚动的要求,则调用
kafka.log.Log#deleteOldSegments // 删除过期segment
kafka.log.Log#roll // 如果要删除的segment数量等于当前segment数量,则创建一个新的
滚动过程涉及到logEndOffset的变化,需要根据logEndOffset来确定segment的起始offset。
4.logEndOffset变更逻辑
logEndOffset的定义如下:
activeSegment = segments.lastEntry.getValue // 最新的一个segment
nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
logEndOffset = nextOffsetMetadata.messageOffset
logEndOffset也就是当前分区最新的一个segment的下一条消息的offset。
logEndOffset的变更点在updateLogEndOffset方法,如下所示:
private def updateLogEndOffset(messageOffset: Long) { // 直接更改当前分区的最新offset
nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
}
调用updateLogEndOffset进行更新的场景有四个:
kafka.log.Log#append // 新消息写入
updateLogEndOffset(appendInfo.lastOffset + 1) // 变更为最后一条消息的offset加1
kafka.log.Log#roll // segment滚动
创建新的segment后,将此segment的起始offset赋值为logEndOffset
updateLogEndOffset(nextOffsetMetadata.messageOffset)
kafka.log.Log#truncateTo
如果第一个segment的起始offset大于目标offset
truncateFullyAndStartAt(targetOffset) // 从targetOffset开始清理,
清理所有的segment
创建一个segment,baseOffset为targetOffset
updateLogEndOffset(newOffset)
如果第一个segment的起始offset大于或等于目标offset
将baseOffset大于targetOffset的所有segment清理
activeSegment.truncateTo(targetOffset) // 最新的segment清理部分数据,从targetOffset开始清理
updateLogEndOffset(targetOffset)
truncateTo是将某个offset之后的消息(包含)都删除,然后以此offset作为logEndOffset。