Kafka线程体系


发布于 2024-12-01 / 12 阅读 / 0 评论 /
了解Kafka线程体系,包括Kafka Broker、Producer、Consumer

学习Kafka线程体系,有助于进行Kafka服务端和客户端的排障和调优。

1.Kafka Server线程体系

一个Kafka Server进程中,包含所有的线程及其功能。

1.1.KafkaScheduler

这是一个调度器,也是个线程池,启动过程如下所示:

KafkaServer中有对KafkaScheduler的引用,如下所示:

kafkaScheduler = new KafkaScheduler(config.backgroundThreads)

线程池中线程数量与background.threads配置有关,默认值为10

BrokerServer中有对KafkaScheduler的引用,如下所示:

transactionCoordinator = TransactionCoordinator(config, replicaManager,
        new KafkaScheduler(1, true, "transaction-log-manager-"),
        producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)

主要是用于事务日志文件的管理。

KafkaController中有对KafkaScheduler的引用,如下所示:

private[controller] val kafkaScheduler = new KafkaScheduler(1)
private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner")

第一个用于Kafka服务的管理,线程名前缀为“kafka-scheduler-”。

第二个用于token的清理。

GroupMetadataManager中有对KafkaScheduler的引用,如下所示:

private val scheduler = new KafkaScheduler(1, true, "group-metadata-manager-")

线程池中线程数量为1,线程名称以“group-metadata-manager-”为前缀。

ZookeeperClient中也有对KafkaScheduler的引用,如下所示:

val reinitializeScheduler = new KafkaScheduler(1, true, s"zk-client-${threadPrefix}reinit-")

线程池中线程数量为1。

1.2.ReplicaFetcherThread

ReplicaFetcherThread是用于Broker中不同的分区replica从leader同步消息的线程。

ReplicaFetcherThread启动过程如下所示:

AbstractFetcherManager.addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState])
    将partitionAndOffsets根据分区号和分区的leader所在brokerid进行分组,得到partitionsPerFetcher
    遍历partitionsPerFetcher,针对每个fetcherId创建对应的副本同步线程。
        ReplicaFetcherManager.createFetcherThread(fetcherId)
            val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
            new ReplicaFetcherThread(threadName, fetcherId)
                val replicaId = brokerConfig.brokerId
                new ReplicaFetcherBlockingSend(fetcherId, s"broker-$replicaId-fetcher-$fetcherId")

fetcherId分区信息hash值与num.replica.fetchers配置值取模运算,num.replica.fetchers默认值为1。

理论上说,如果集群中有N个Broker,则ReplicaFetcherThread数量最大值为:

( N - 1 ) * ${num.replica.fetchers}

ReplicaFetcherThread数量实际值topic分区有关。

1.3.Acceptor

KafkaBroker在启动过程中,会根据listeners的配置启动对应的SocketServer,一个端口对应一个socketServer,一个socketServer对应一个acceptor线程

KafkaServer.startup()
    一个端口对应一个socketServer
    socketServer = new SocketServer()
        一个socketServer对应一个acceptor线程
        val thread: KafkaThread = KafkaThread.nonDaemon(s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}")

线程名称带有“kafka-socket-acceptor”

1.4.Processor

每一个listener对应一个dataPlaneAcceptor和一个controlPlaneAcceptor。

Processor线程名带有“kafka-network-thread”字样。

dataPlaneAcceptor会启动对应的Processor,Processor数量由num.network.threads参数决定,默认值为8

对每一个listener,添加一个dataPlaneAcceptor,每一个dataPlaneAcceptor对应个Processor
SocketServer.addListeners
    针对每个listener,创建Acceptor和Processor
        SockertServer.createDataPlaneAcceptorAndProcessors()
            val dataPlaneAcceptor = createDataPlaneAcceptor()
            dataPlaneAcceptor.configure()
                SocketServer.addProcessors(num.network.threads)
                    创建num.network.threads个Processor
                    SocketServer.newProcessor(id, listenerName, securityProtocol) // id从0开始递增
                        val name = s"data-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
                        new Processor(id, name)
                            KafkaThread.nonDaemon(name)

对应的线程名带有“data-plane-kafka-network-thread”

controlPlaneAcceptor也去启动对应的Processor,Processor数量为固定值1


KafkaServer.startup()
    socketServer = new SocketServer()
        Sockert实例初始化是,对每一个listener,创建对应的controlPlaneAcceptor
        SockerServer.createControlPlaneAcceptorAndProcessor()
            val controlPlaneAcceptor = createControlPlaneAcceptor()
            controlPlaneAcceptor.addProcessors(1)
                SocketServer.newProcessor(id, listenerName, securityProtocol)
                    val name = s"control-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
                    new Processor(id, name)
                        KafkaThread.nonDaemon(name)

可以看到,对应的线程名带有“control-plane-kafka-network-thread”

1.5.KafkaRequestHandlerPool

Kafka中有两种请求,一种是数据请求,一种是管理请求,分别由不同的KafkaApis实例进行处理。

数据请求的处理线程初始化过程如下

数据请求
参数为num.io.threads,默认值为8
RequestChannel阻塞队列请求数最大为queued.max.requests,默认值500
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.numIoThreads, "data-plane")
    创建numIoThreads个KafkaThread
        KafkaThread.daemon("data-plane-kafka-request-handler-" + id) // id为0到numIoThreads

数据请求的处理线程名带有“data-plane-kafka-request-handler”字样。

管理请求的处理线程初始化过程如下

controll请求,一个listener对应一个controlPlaneRequestProcessor
每个RequestChannel阻塞队列请求数最大为固定值20
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(1, "control-plane")
    创建1个KafkaThread
        KafkaThread.daemon("control-plane-kafka-request-handler-0")

管理请求的处理线程名带有“control-plane-kafka-request-handler”字样。

2.Kafka Producer线程体系

一个Kafka Producer进程中,包含所有的线程及其功能。

主要就两个线程:

(1)main线程:主线程最终把消息写入到了内存中

(2)IO线程,线程名前缀为kafka-producer-network-thread,将内存中的数据发送给Kafka Broker。

3.Kafka Consumer线程体系

一个Kafka Consumer进程中,包含所有的线程及其功能。