Group Rebalance机制v1
Kafka Consumer Group Rebalance最原始的方案是通过Zookeeper来实现的。
Rebalance机制
Consumer在启动时会向Zookeeper注册属于自己的zkNode,这是一种Ephemeral节点,当consumer与zookeeper连接断开时,此节点就会被删除。
与Consumer有关的zkNode如下所示:
val ConsumersPath = "/consumers"
class ZKGroupDirs(val group: String) {
def consumerDir = ConsumersPath
def consumerGroupDir = consumerDir + "/" + group
def consumerRegistryDir = consumerGroupDir + "/ids" // 此路径下使用Ephemeral(临时)zknode记录属于此group id的consumer id
def consumerGroupOffsetsDir = consumerGroupDir + “/offsets" // 此路径记录了此group id在某个topic分区上的消费完成的offset。
def consumerGroupOwnersDir = consumerGroupDir + "/owners" // 记录topic分区由哪些consumer消费
}上述zkNode在group.id新建时会创建,如果有新的consumer,则会出创建对应的子节点。
当consumerRegistryDir路径下的子节点发生变化时,表示消费者发生了变化。
当/brokers/ids路径下的子节点发生变化时,表示Broker数量有变化。
Consumer端正是通过监听这两个路径下的子节点变化,来感知Group和集群的状态。
zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)loadBalancerListener是ZKRebalancerListener类实例,有一个独立线程watcherExecutorThread监听,当需要进行rebalance时,就会调用ZKRebalancerListener.syncedRebalance方法。
V1的问题
这个方案严重依赖zookeeper,会有以下两个问题:
一个是羊群效应(Herd Effect):一个被监听的zknode变化后,大量的watcher通知需要由zookeeper通知到各个consumer,这会导致通知期间其他操作的延迟。因为监听的公共目录,导致任何broker或consumer加入或者推出,都会向其余所有的consumer发送通知触发rebalance,就出现了羊群效应。当然这种粗粒度的监听是不正确的,需要精准定位监听的zknode。
另一个是脑裂(Split Brain):每个Consumer都是通过Zookeeper中保存的这些元数据判断Consumer Group状态、Broker状态以及Rebalance结果,由于Zookeeper只保证“最终一致性”,不保证“Simultaneously Consistent Cross-Client views”,不同的Consumer在同一时刻可能连接到Zookeeper集群中不同服务器,看到的元数据可能不一样,这就会造成不正确的Rebalance尝试。
Deprecated声明
这是0.10.0版本之前使用的策略,在0.11.0.0版本以及以后,此种策略就处于Deprecated状态了。可以参考kafka.utils.ZkUtils和kafka.consumer.ZookeeperConsumerConnector中的注释信息。针对此种策略的操作,也主要是在这两个类中。
@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")Group Rebalance机制v2
为了解决V1方案的两个问题,Kafka在0.11版本之后在服务端引入了GroupCoordinator。
V2核心思想
将全部的Consumer Group分成多个子集,每个Consumer Group子集在服务端都对应一个GroupCoordinator对其进行管理,GroupCoordinator时KafkaServer中用于管理Consumer Group的组件。消费者就不再依赖Zookeeper,而只有GroupCoordinator在Zookeeper上添加Watcher,大大缓解了Zookeeper的压力。
消费者在加入或退出Consumer Group时会修改Zookeeper中保存的元数据,这点与上文描述的方案一类似,此时会触发GroupCoordinator设置的Watcher,通知GroupCoordinator开始Rebalance操作。
Rebalance过程
当前消费者准备加入某Consumer Group或是GroupCoordinator发生故障转移时,消费者并不知道GroupCoordinator的网络位置,消费者会向Kafka集群中的某一Broker发送ConsumerMetadataRequest,此请求中包含groupId,收到请求的Broker会返回ConsumerMetadataResponse作为响应,其中就包含管理此ConsumerGroup的GroupCoordinator的相关信息。
消费者根据ConsumerMetadataResponse中的GroupCoordinator信息,连接到GroupCoordinator并周期性地发送HeartbeatRequest。
发送HeartbeatRequest的主要作用是为了告诉GroupCoordinator此消费者正常在线,GroupCoordinator会认为长时间未发送HeartbeatRequest的消费者已经下线,触发新一轮的Rebalance操作。
如果HeartbeatResponse中带有IllegalGeneration异常,说明GroupCoordinator发起了Rebalance操作,此时消费者发送JoinGroupRequest给GroupCoordinator,JoinGroupRequest的主要目的是为了通知GroupCoordinator,当前消费者要加入指定的ConsumerGroup。
之后,GroupCoordinator会根据收到的JoinGroupRequest和Zookeeper中的元数据完成对此Consumer Group的分区分配。
GroupCoordinator在分配完成后,将分配结果写入Zookeeper保存,并通过JoinGroupResponse返回给消费者。消费者就可以根据JoinGroupResponse中分配的分区开始消费数据。
消费者成功成为Consumer Group的成员后,会周期性发送HeartbeatRequest。如果HeartbeatResponse包含IllegalGeneration异常,则执行步骤3,如果找不到对应的GoupCoordinator(HeartbeatResponse包含NotCoordinatorForGroup异常),则周期性地执行步骤1,直至成功。
V2的问题
当然,此方案也并非完美的方案,虽然解决了羊群效应和脑裂问题,但是还有两个问题:
(1)一是分区分配的操作是在服务端GroupCoordinator中完成的,这就要求服务端实现Partition的分配策略。当要使用新的Partition分配策略时,就必须修改服务端的代码或配置,然后重启服务,这就显得比较麻烦。
(2)二是不同的Rebalance策略有不同的验证需求,当需要自定义分区分配策略和验证需求时,就会很麻烦。
Group Rebalance机制v3
为了解决上述两个问题,Kafka进行了重新的设计,将分区分配的工作放到了消费者这一端进行,而Consumer Group管理的工作则依然由GroupCoordinator处理。
代码提交可参考https://issues.apache.org/jira/browse/KAFKA-2464
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
Rebalance过程
重平衡过程分为以下几步
FindCoordinator
当一个consumer启动并希望加入group时,它首先要通过向 Kafka 发送一个 FindCoordinator 请求来查找负责该group的 Kafka Broker,被称为Coordinator。这个请求的目的是为了找到负责该消费者组的协调者 Kafka 代理(Broker)。
加入组(JoinGroup)
找到Coordinator后consumer才能发送 JoinGroup 请求 来正式加入该group。
JoinGroup请求包含以下信息:
(1)session.timeout.ms和max.poll.interval.ms。如果消费者组中的某个消费者没有响应,协调器将使用这些属性将消费者踢出消费者组。
(2)消费者成员支持的客户端协议列表,这些协议通常包括消费者配置的分区策略(即 partition.assignment.strategy)。
(3)元数据,元数据包括消费者订阅的主题列表。
Coordinator不会立即响应 JoinGroup 请求。如果存在未完成的消费者请求,协调器会等待所有消费者的 JoinGroup 请求来确保组成员的完整性。这通常涉及到一个初始的延迟,这个延迟由 group.initial.rebalance.delay.ms 参数控制。这个参数定义了协调器在开始处理 JoinGroup 请求之前的等待时间。
举个例子,假设你设置了 group.initial.rebalance.delay.ms=10000(即 10 秒)。在这种情况下,协调器在接收到一个 JoinGroup 请求 后,会等待最多 10 秒,看看是否还有其他消费者加入该组。确保在开始重平衡时,所有消费者尽可能都已加入消费者组。
协调器返回JoinGroup响应时,消费者组内的第一个消费者会接收到活跃成员列表和选定的分配策略,并充当Leader,而其他成员则收到空响应。Leader负责在本地执行分区分配。
同步组(SyncGroup)
接下来,所有成员向协调器发送SyncGroup请求。Leader附上计算出的分区分配,而其他成员仅发送一个空请求。
一旦协调器响应所有SyncGroup请求,每个消费者都会收到分配给他们的分区,然后开始获取消息。
心跳(Heartbeat)
每个消费者定期向协调器发送Heartbeat请求以保持会话活跃(参见:heartbeat.interval.ms)。如果某个消费者在指定时间内未能发送 Heartbeat 请求,协调器可能会认为该消费者已失效,并触发一个 重新平衡(rebalance) 过程。
如果正在进行重新衡,协调器会在 Heartbeat 请求的响应中通知消费者需要重新加入组,以确保分区能够被正确地重新分配。
离开组(LeaveGroup)
当停止一个consumer实例。该consumer将在停止前向Coordinator发送LeaveGroup请求,通知它将退出group。
其余消费者将在下一个心跳时收到通知,必须进行重平衡,重新通过JoinGroup/SyncGroup请求以重新分配分区。
在整个重新平衡的过程中,只要分区还未被重新分配,所有的消费者都将暂停数据处理。这意味着消费者无法消费消息,从而导致潜在的处理延迟。默认情况下,重新平衡的超时时间固定为 5 分钟,这段时间内,消费者的延迟增加可能成为一个严重问题。
V3的问题
第一个问题是无法避免的停机效应。例如当Kafka消费者组中的一个消费者离开或加入时,整个组必须重新平衡,假设有三个消费者(C1、C2、C3)在处理数据。如果C1要离开,C1会发送LeaveGroup请求。此时,C2和C3会停止处理数据,等待协调器重新分配分区。当重新平衡完成后,C2和C3会重新开始处理数据。
但是,如果消费者在暂时故障后重新启动会发生什么?它将重新尝试加入消费者组。这一操作会再次触发重新平衡,导致所有消费者再次停止消费数据直到新的分区分配完成。这种反复的重新平衡可能会导致系统处理效率的显著下降。
另一个可能导致消费者重启的原因是群组的滚动升级。不幸的是,这种情况对消费者组来说是灾难性的。事实上,对于一个由三个消费者组成的消费者组,这样的操作将触发 6 次重平衡,这可能会对消息处理产生重大影响,导致显著的延迟和系统性能的下降。
在 Java 中运行 Kafka 消费者时,一个常见的问题是由于网络中断或长时间的 GC 暂停,导致心跳请求未能及时发送,或者 KafkaConsumer#poll() 方法未能按时调用。
心跳丢失:在第一种情况下,如果协调器在 session.timeout.ms 时间内未收到消费者的心跳信号,它会认为该消费者已经失效并启动重新平衡。这种情况通常是由于网络中断或长时间的 GC 暂停造成的。
轮询超时:在第二种情况下,如果 KafkaConsumer#poll() 方法未能在 max.poll.interval.ms 时间内被调用,可能是由于处理记录所需的时间过长,导致消费者无法及时发起新的轮询。这同样会触发重新平衡。