Kafka TopicId不一致导致的血案


发布于 2025-09-06 / 15 阅读 / 0 评论 /
Kafka TopicId不一致导致了一次事故

问题现象

使用低版本(小于2.8)的kafka客户端向高版本(>=2.8)的kafka集群以zookeeper模式进行topic创建和分区的扩容后,topic会因为topicId不一致导致副本同步失败,consumer会因为leaderEpoch不一致导致消费失败,producer会因为NO LEADER导致无法生产消息。

问题复现方法

稳定复现方法如下:

第一步:使用kafka低版本小于2.8)客户端工具kafka-topics.sh向kafka高版本(>=2.8)集群创建topic,以zookeeper连接模式创建,创建成功。

第二步:启动生产者生产消息,正常生产。

第三步:启动消费者消费消息,正常消费。

第四步:使用kafka低版本客户端工具kafka-topics.sh以zookeeper模式增加分区,分区扩容成功。

第五步:检查生产者和消费者,正常生产消费,新扩容的分区也能被正常生产和消费。

第六步:进行controller切换,可以通过手动删除zk上的/controller节点实现。切换后,zookeeper上的topicId与新扩容的分区目录下的partition.matadata保持一致,但是与之前的分区目录下的partition.matadata不一致。

第七步:检查生产者和消费者,正常生产消费。

第八步:重启某个节点。启动成功后,该节点退出ISR,生产者生产消息报错。

源码分析

Kafka 在 2.8 版本引入了 topicId 的概念,topic 在创建时会 controller 会生成一个 topicId(默认生成策略为 uuid)并存储到 zookeeper中,一个 topic 唯一映射一个 topicId。

在 Kafka 2.8.0 之前,Kafka 主要依赖 Topic 名称 来标识一个 Topic。但这在某些管理操作(如使用 MirrorMaker 2 进行跨集群同步)时可能带来歧义,例如当两个同名的 Topic 内容完全不同时。

topicId 的引入解决了这个问题:

  • 唯一性:全局唯一的标识符。

  • 持久性:一旦分配,在 Topic 的整个生命周期内不会改变。

  • 内部管理:Kafka 内部使用 topicId 来管理元数据,使其与易变的 Topic 名称解耦。

创建topic时生成topicId

kafka-2.8中,通过zookeeper模式创建topic的过程如下:

// 通过zookeeper创建topic
kafka.zk.AdminZkClient#createTopic(usesTopicId = false)
	kafka.zk.AdminZkClient#createTopicWithAssignment(usesTopicId)
		kafka.zk.AdminZkClient#writeTopicPartitionAssignment(isUpdate = false, usesTopicId)
			// 因usesTopicId为false,topicIdOpt为None
			val topicIdOpt = if (usesTopicId) Some(Uuid.randomUuid()) else None
			zkClient.createTopicAssignment(topic, topicIdOpt, assignment.map { case (k, v) => k -> v.replicas })
				TopicZNode.encode(topicId, persistedAssignments)
					// 只有topicId不为空时,才生成topic_id
					topicId.foreach(id => topicAssignment += "topic_id" -> id.toString)

可以看出,以zookeeper模式新创建的topic在zookeeper的zknode数据是不带topic_id的。

当zookeeper模式创建topic后,KafkaController会接收到TopicChange event,并对事件进行如下处理:

// 新建topic,controller接收到TopicChange event
kafka.controller.KafkaController#process(TopicChange)
	kafka.controller.KafkaController#processTopicChange
		// 如果当前不是active controller,则直接退出,不做处理
		if (!isActive) return
		// 获取zookeeper上所有的topic列表
		val topics = zkClient.getAllTopicsInCluster(true)
		// 获取新建的topic列表
		val newTopics = topics -- controllerContext.allTopics
		// 获取删除的topic列表
		val deletedTopics = controllerContext.allTopics.diff(topics)
		// 保持controller中缓存的topic列表与zookeeper保持一致
		controllerContext.setAllTopics(topics)
		// 处理新增的topic
		kafka.controller.KafkaController#registerPartitionModificationsHandlers(newTopics.toSeq)
		// 获取新增topic分区分配的结构
		val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentAndTopicIdForTopics(newTopics)
		// 对已删除topic的处理
		deletedTopics.foreach(controllerContext.removeTopic)
		kafka.controller.KafkaController#processTopicIds(topicIdAssignments = addedPartitionReplicaAssignment)
			// 如果当前版本大不小于2.8.0,则需要将topicId不存在的topic设置topicId
			if (interBrokerProtocolVersion >= KAFKA_2_8_IV0)
				val (withTopicIds, withoutTopicIds) = topicIdAssignments.partition(_.topicId.isDefined)
				withTopicIds ++ zkClient.setTopicIds(withoutTopicIds, controllerContext.epochZkVersion)
					// 对topicId不存在的topic,设置为Uuid.randomUuid()
					topicId = Uuid.randomUuid()
					SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(topicId, assignments), ZkVersion.MatchAnyVersion)
			// 将topicId和topicName映射添加到controllerContext中
			kafka.controller.ControllerContext#addTopicId
				topicIds.put(topic, id)
				topicNames.put(id, topic)

这里会为新创建的topic生成TopicId,并把TopicId更新到zookeeper和controller上下文中。

所以,以zookeeper模式创建topic后,zookeeper上topic对应的zknode会带有topicId信息。

扩容topic分区时zk上的topicId被置空

kafka-2.8.2版本中,通过zookeeper模式扩容topic分区的过程如下:

kafka.admin.TopicCommand.ZookeeperTopicService#alterTopic
	kafka.zk.AdminZkClient#addPartitions
		kafka.zk.AdminZkClient#createPartitionsWithAssignment
			kafka.zk.AdminZkClient#writeTopicPartitionAssignment(topic, combinedAssignment, isUpdate = true)
				// 首先从zookeeper获取topicId
				val topicIds = zkClient.getTopicIdsForTopics(Set(topic))
				// 然后为新增的分区添加元数据
        		zkClient.setTopicAssignment(topic, topicIds.get(topic), assignment)

kafka-2.8.0有topicId概念,所以新增的分区会和原来zookeeper上的topicId保持一致。

kafka-1.0.0版本中,通过zookeeper模式扩容topic分区的过程如下:

// kafka-1.0.0扩容分区的处理过程
kafka.admin.TopicCommand#alterTopic
	// 从zookeeepr中拿到所有topic的列表
	val topics = getTopics(zkUtils, opts)
	// 遍历topics列表
	topics.foreach { topic =>
		// 如果命令行有“--partitions”选项,扩容需要设置次选项,那就执行扩容操作
		// 如果是内置topic,则抛异常
		// 获取命令行中设置的分区数
		val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
		// 获取当前分区结构
		val existingAssignment = zkUtils.getReplicaAssignmentForTopics
		// 生成新的分区结构
		val newAssignment
		// 从zookeeper获取所有的broker信息,即活跃的broker
		val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
		// 新增分区
		AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers, nPartitions, newAssignment)
			// 对新增的分区进行处理,以update模式对zookeeper上的zknode进行更新
			AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK
				kafka.admin.AdminUtils#validateCreateOrUpdateTopic
				kafka.admin.AdminUtils#writeTopicPartitionAssignment
					// 获取topic对应的zknode
					val zkPath = getTopicPath(topic)
					// 分区结构转换为
					val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))
						// 仅包含2个字段——version和partitions
						Json.encode(Map("version" -> 1, "partitions" -> map))
					// 更新topic对应的zkPath的数据
					zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
	}

这里就可以解释“为什么扩容分区后,对应topic在zknode上的topic_id字段被清空”

zookeeper上新增分区后,KafkaController会接收到PartitionModifications event,Kafka2.8.2版本对这个event的处理逻辑如下:

// 扩容一个topic的分区,controller接收到PartitionModifications event
kafka.controller.KafkaController#process(PartitionModifications)
	kafka.controller.KafkaController#processPartitionModifications
		// 如果当前不是active controller,则直接退出,不做处理
		if (!isActive) return
		// 获取topic的所有分区信息
		val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
		// 获取新增的分区信息
		val partitionsToBeAdded = partitionReplicaAssignment.filter
		info(s"New partitions to be added $partitionsToBeAdded")
		// 更新controller上下文中的分区信息
		controllerContext.updatePartitionFullReplicaAssignment(partitionsToBeAdded)
		// 新增分区处理
		kafka.controller.KafkaController#onNewPartitionCreation(newPartitions = partitionsToBeAdded)
			info(s"New partition creation callback for ${newPartitions.mkString(",")}")
			// 分区状态机更新
			partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
				kafka.controller.ZkPartitionStateMachine#handleStateChanges
					kafka.controller.AbstractControllerBrokerRequestBatch#sendRequestsToBrokers
						// 向分区副本对应的broker发送LeaderAndIsrRequest请求
						sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
						// 向分区副本对应的broker发送UpdateMetadataRequest请求
						sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
						// 向分区副本对应的broker发送UStopReplicaRequest请求
						sendStopReplicaRequests(controllerEpoch, stateChangeLog)
			// 副本状态机更新
			replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
				kafka.controller.ZkReplicaStateMachine#handleStateChanges
					kafka.controller.AbstractControllerBrokerRequestBatch#sendRequestsToBrokers
			partitionStateMachine.handleStateChanges(newPartitions.toSeq)
			replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)

并不会对topic的zknode数据进行修改。

所以,使用kafka-1.0.0对topic扩容分区后,topicId被持久置空。

controller切换下发topicId到partition.metadata文件

删除/controller或者对当前controller节点进行重启都会触发controller角色的切换

controller切换

controller切换过程如下:

kafka.controller.KafkaController#elect
	// 获取当前active controller
	activeControllerId = zkClient.getControllerId.getOrElse(-1)
	// 如果activeControllerId不为-1,表示当前有active controller,输出日志,并退出当前elect流程
		return
	// 如果activeControllerId为-1,表示当前没有active controller,需要进行如下的elect流程
		kafka.controller.KafkaController#onControllerFailover
			info("Initializing controller context")
			kafka.controller.KafkaController#initializeControllerContext
				val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(controllerContext.allTopics.toSet)
				kafka.controller.KafkaController#processTopicIds(topicIdAssignments = replicaAssignmentAndTopicIds)
					// 根据是否有topicId进行分组,两组
					val (withTopicIds, withoutTopicIds) = topicIdAssignments.partition(_.topicId.isDefined)
					// 将没有topicId的topic进行元数据更新,在zookeeper上增加topic_id信息,topicId为Uuid.randomUuid()
					withTopicIds ++ zkClient.setTopicIds(withoutTopicIds, controllerContext.epochZkVersion)
					// topicId都生成后,更新controller上下文中的topicId信息
					kafka.controller.ControllerContext#addTopicId
						topicIds.put(topic, id)
			    		topicNames.put(id, topic)
			info("Initializing controller context")
			info("Sending update metadata request")
			// 发送UpdateMetadataRequest和LeaderAndIsr请求
    		kafka.controller.KafkaController#sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
    			// 确保当前context中leaderAndIsrRequestMap,stopReplicaRequestMap和updateMetadataRequestBrokerSet为空,否则抛异常。表示不存在未发送的请求。
    			brokerRequestBatch.newBatch()
    			// 生成UpdateMetadataRequest请求,保存到updateMetadataRequestBrokerSet中
      			brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokerIds, partitions)
      				updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
      				partitions.foreach(partition => updateMetadataRequestPartitionInfo(partition)
      			// 向指定的分区对应的broker发送UpdateMetadataRequest请求
      			brokerRequestBatch.sendRequestsToBrokers(controllerEpoch)
					val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerEpoch)
					sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
						controllerContext.topicIds.getOrElse(topic, Uuid.ZERO_UUID)
					sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
						val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, topicIds.asJava)
						sendRequest(broker, updateMetadataRequestBuilder, responseCallback)
					sendStopReplicaRequests(controllerEpoch, stateChangeLog)
    		info(s"Ready to serve as the new controller with epoch $epoch")

controller初始化时,从zookeeper读取topicId数据,这里被低版本扩容的topic就不存在topicId,所以会自动生成一个新的topicId,并更新到zookeeper和controller上下文。

controller在发送LeaderAndIsr请求时,会从controller上下文中获取topicId并包含在请求中。

LeaderAndIsr请求处理

各个Broker接收到LeaderAndIsr请求,并进行如下处理:

// Broker处理LeaderAndIsrRequest请求
kafka.server.KafkaApis#handleLeaderAndIsrRequest
	// 副本角色确认
	kafka.server.ReplicaManager#becomeLeaderOrFollower
		stateChangeLogger.info(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId for ${requestPartitionStates.size} partitions")
		stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState correlation id $correlationId from controller $controllerId epoch ${leaderAndIsrRequest.controllerEpoch}")
		// 检查LeaderAndIsrRequest请求中的topicId与本地内存中的topicId是否一致
		kafka.cluster.Partition#checkOrSetTopicId(requestTopicId: Uuid)
			if (log.topicId == Uuid.ZERO_UUID) {
				// 当log中的topicId为空
				log.assignTopicId(requestTopicId)
					// 创建partition.metadata文件,并写入topicId信息
					kafka.log.Log#assignTopicId(topicId = requestTopicId)
				true
			} else if (log.topicId != requestTopicId) {
				stateChangeLogger.error(s"Topic Id in memory: ${log.topicId} does not match the topic Id for partition $topicPartition provided in the request: $requestTopicId.")
				false
			} else {
				// topic ID in log exists and matches request topic ID
				true
			}

其中kafka.log.Log#assignTopicId用于partition.metadata文件内容的创建和设置。

TopicId进行更新

kafka.log.Log#assignTopicId处理过程如下:

// 对已存在的topic进行topicId的更新
kafka.log.Log#assignTopicId(topicId: Uuid)
	// 更新内存中的topicId
	this.topicId = topicId
	// 当partitionMetadataFile不存在时
	if (!partitionMetadataFile.exists()) {
		partitionMetadataFile.record(topicId)
			// 更新dirtyTopicIdOpt
			dirtyTopicIdOpt = Some(topicId)
		scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
			kafka.log.Log#maybeFlushMetadataFile
				// 尝试写partition.metadata文件
				kafka.server.PartitionMetadataFile#maybeFlush
					如果dirtyTopicIdOpt存在,则执行以下方式写入。不存在则不写
						// 写入名为“partition.metadata.tmp”的文件
						val fileOutputStream = new FileOutputStream(tempPath.toFile)
						val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
						writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
						writer.flush()
						fileOutputStream.getFD().sync()
						// 将”partition.metadata.tmp“文件重命名为”partition.metadata“
						Utils.atomicMoveWithFallback(tempPath, path)
    }

这里,就重新生成了partition.metadata文件,并写入了新的内容。但是不会对已存在的partiton.metadata文件进行更新。

所以,切换controller后,没有topicId的分区生成新的topicId,然后随着LeaderAndIsrRequest下发到Broker,Broker在处理LeaderAndIsrRequest请求时,对未生成partition.metadata文件的分区新增partition.metadata文件,并写入新的topicId。

broker重启的影响

一个broker重启过程如下:

kafka controller监听/brokers/ids,当子节点有变化时,触发BrokerChange事件
kafkacontroller处理BrokerChange事件
kafka.controller.KafkaController#process(BrokerChange)
 kafka.controller.KafkaController#processBrokerChange
  // 如果不是active controller,则直接返回
  if (!isActive) return
  // 对新启动的broker进行处理
  kafka.controller.KafkaController#onBrokerStartup
   info(s"New broker startup callback for ${newBrokers.mkString(",")}")
   // 新启动的broker,则把对应节点处于Offline状态的replica从replicasOnOfflineDirs中移除
      newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
      val newBrokersSet = newBrokers.toSet
      // 获得当前活跃的brokers
      val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds.diff(newBrokersSet)
      // 向当前所有活跃的brokers发送UpdateMetadataRequest请求
      sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
      // 向新启动的broker发送UpdateMetadataRequest,以通知他们当前分区leader信息
      sendUpdateMetadataRequest(newBrokers, controllerContext.partitionsWithLeaders)
      // 获取新启动的broker上所有的副本
      val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
      // 向新启动的broker发送状态变更请求,让新启动的broker上的副本处于OnlineReplica状态
      kafka.controller.ZkReplicaStateMachine#handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
       controllerBrokerRequestBatch.newBatch()
       // 对replicas根据replicaId进行分组(replicaId表示brokerId)
       replicas.groupBy(replicaId).foreach {
        // 往不同的replicaId对应的broker发送
        kafka.controller.ZkReplicaStateMachine#doHandleStateChanges(replicaId,replicas,targetState)
         controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
          // 往leaderAndIsrRequestMap添加记录
          leaderAndIsrRequestMap.put
         // 修改controllerContext中replica的状态
         controllerContext.putReplicaState(replica, OnlineReplica)
       }
       controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
        sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
     sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
     sendStopReplicaRequests(controllerEpoch, stateChangeLog)
      partitionStateMachine.triggerOnlinePartitionStateChange()

节点的重启会触发Controller发送LeaderAndIsrRequest请求。

其他节点收到LeaderAndIsrRequest请求后,会对有状态更新的副本下发partition.metadata文件。

解决方案

为避免topicId不一致,提出以下措施

分区topicId不一致检测工具

开发提供分区topicId检测工具,用于检测哪些存在topicId不一致的分区副本。

工具原理

比较集群内每一个broker数据目录下的分区文件中的topic_id与zookeeper中记录的topic_id是否一致。

broker数据目录记录的topic_id在分区副本目录下的partition.metadata文件中,例如:/data/kafka-data/test001-1/partition.metadata

zookeeper中记录的topic_id记录在对应topic的zknode中,通过访问get方法拿到topic_id数据,比如: get /brokers/topics/test001

分区topicId不一致修复方案

主要是修复分区不一致的情况。

修复步骤

分为以下三步:

第一步:备份文件leader-epoch-checkpoint和partition.metadata

find /data/kafka-data -type f -name "partition.metadata" | grep -v -E 'consumer_offsets|transaction_state' | xargs tar -czf partition-metadata-backup.tar.gz

find /data/kafka-data -type f -name "leader-epoch-checkpoint" | grep -v -E 'consumer_offsets|transaction_state'| xargs tar -czf leader-epoch-checkpoint-backup.tar.gz

第二步:删除文件leader-epoch-checkpoint和partition.metadata

find /data/kafka-data -type f -name "partition.metadata" | grep -v -E 'consumer_offsets|transaction_state'| xargs rm

find /data/kafka-data -type f -name "leader-epoch-checkpoint" | grep -v -E 'consumer_offsets|transaction_state'| xargs rm

第三步:重启对应的broker,注意最后启动controller节点

注意事项

此方案不是无损方案,在变更期间要重启kafka节点,服务重启期间,可能会有部分topic无法读写,上层业务会有报错。但数据是安全的,业务端可正常失败重试。