Kafka0.10查看group信息current-offset为unknown


发布于 2025-01-19 / 9 阅读 / 0 评论 /
kafka-consumer-groups.sh的describe模式查看group信息,发现某些消费分区的current-offset为unknown,其他分区正常

1.问题描述

在kafka0.10版本的环境中,我们通过kafka-consumer-groups.sh --new-consumer模式describe一个group信息,发现有些消费分区对应的current-offset为unknown。

部分分区offset为unknown有一种场景是正常的,那就是这个分区长时间没有消息生产,导致消费组offset的信息超过offsets.retention.minutes(默认是1440分钟,即1天)未被更新,那么对应的offset信息会被清理,这是kafka自我保护的机制,防止消息堆积过多导致服务异常。

2.kafka-consumer-groups.sh源码解析

describe过程如下所示:

kafka-consumer-groups.sh --describe
kafka.admin.ConsumerGroupCommand
    consumerGroupService赋值
    如果命令行中有--new-consumer,则初始化KafkaConsumerGroupService(opts)
    如果命令行中没有--new-consumer,则初始化ZkConsumerGroupService(opts)
    consumerGroupService.describe
        consumerGroupService.describeGroup


ZkConsumerGroupService#describeGroup
    kafka.utils.ZkUtils#getTopicsByConsumerGroup
        读取zookeeper上/consumers节点的信息

KafkaConsumerGroupService#describeGroup
    kafka.admin.AdminClient#describeConsumerGroup
        kafka.admin.AdminClient#describeGroup
            coordinator = findCoordinator(groupId)
            向coordinator发送DescribeGroups请求DescribeGroupsRequest
            将请求结果封装成GroupSummary类型返回

DescribeGroups请求处理过程如下所示:

# coordinator接收到DescribeGroups请求
kafka.server.KafkaApis#handle(DescribeGroupsRequest)
    kafka.server.KafkaApis#handleDescribeGroupRequest
        kafka.coordinator.GroupCoordinator#handleDescribeGroup
            kafka.coordinator.GroupMetadataManager#getGroup
                groupsCache.get(groupId)

这里group信息完全从缓存中获取,而缓存groupCache在以下场景中会变化:

groupsCache的变化
kafka.coordinator.GroupCoordinator#handleLeaveGroup
    group = groupManager.getGroup(groupId)
    member = group.get(consumerId)
    kafka.coordinator.GroupCoordinator#removeHeartbeatForLeavingMember(group, member)
        移除consumer的心跳
    kafka.coordinator.GroupCoordinator#onMemberFailure(group, member)

如果拿到的group信息中,没有某个分区的offset提交信息,则展示为unknown,如下图所示: