学习Kafka线程体系,有助于进行Kafka服务端和客户端的排障和调优。
1.Kafka Server线程体系
一个Kafka Server进程中,包含所有的线程及其功能。
1.1.KafkaScheduler
这是一个调度器,也是个线程池,启动过程如下所示:
KafkaServer中有对KafkaScheduler的引用,如下所示:
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
线程池中线程数量与background.threads配置有关,默认值为10
BrokerServer中有对KafkaScheduler的引用,如下所示:
transactionCoordinator = TransactionCoordinator(config, replicaManager,
new KafkaScheduler(1, true, "transaction-log-manager-"),
producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)
主要是用于事务日志文件的管理。
KafkaController中有对KafkaScheduler的引用,如下所示:
private[controller] val kafkaScheduler = new KafkaScheduler(1)
private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner")
第一个用于Kafka服务的管理,线程名前缀为“kafka-scheduler-”。
第二个用于token的清理。
GroupMetadataManager中有对KafkaScheduler的引用,如下所示:
private val scheduler = new KafkaScheduler(1, true, "group-metadata-manager-")
线程池中线程数量为1,线程名称以“group-metadata-manager-”为前缀。
ZookeeperClient中也有对KafkaScheduler的引用,如下所示:
val reinitializeScheduler = new KafkaScheduler(1, true, s"zk-client-${threadPrefix}reinit-")
线程池中线程数量为1。
1.2.ReplicaFetcherThread
ReplicaFetcherThread是用于Broker中不同的分区replica从leader同步消息的线程。
ReplicaFetcherThread启动过程如下所示:
AbstractFetcherManager.addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState])
将partitionAndOffsets根据分区号和分区的leader所在brokerid进行分组,得到partitionsPerFetcher
遍历partitionsPerFetcher,针对每个fetcherId创建对应的副本同步线程。
ReplicaFetcherManager.createFetcherThread(fetcherId)
val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
new ReplicaFetcherThread(threadName, fetcherId)
val replicaId = brokerConfig.brokerId
new ReplicaFetcherBlockingSend(fetcherId, s"broker-$replicaId-fetcher-$fetcherId")
fetcherId分区信息hash值与num.replica.fetchers配置值取模运算,num.replica.fetchers默认值为1。
理论上说,如果集群中有N个Broker,则ReplicaFetcherThread数量最大值为:
( N - 1 ) * ${num.replica.fetchers}
ReplicaFetcherThread数量实际值topic分区有关。
1.3.Acceptor
KafkaBroker在启动过程中,会根据listeners的配置启动对应的SocketServer,一个端口对应一个socketServer,一个socketServer对应一个acceptor线程
KafkaServer.startup()
一个端口对应一个socketServer
socketServer = new SocketServer()
一个socketServer对应一个acceptor线程
val thread: KafkaThread = KafkaThread.nonDaemon(s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}")
线程名称带有“kafka-socket-acceptor”
1.4.Processor
每一个listener对应一个dataPlaneAcceptor和一个controlPlaneAcceptor。
Processor线程名带有“kafka-network-thread”字样。
dataPlaneAcceptor会启动对应的Processor,Processor数量由num.network.threads参数决定,默认值为8
对每一个listener,添加一个dataPlaneAcceptor,每一个dataPlaneAcceptor对应个Processor
SocketServer.addListeners
针对每个listener,创建Acceptor和Processor
SockertServer.createDataPlaneAcceptorAndProcessors()
val dataPlaneAcceptor = createDataPlaneAcceptor()
dataPlaneAcceptor.configure()
SocketServer.addProcessors(num.network.threads)
创建num.network.threads个Processor
SocketServer.newProcessor(id, listenerName, securityProtocol) // id从0开始递增
val name = s"data-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
new Processor(id, name)
KafkaThread.nonDaemon(name)
对应的线程名带有“data-plane-kafka-network-thread”
controlPlaneAcceptor也去启动对应的Processor,Processor数量为固定值1
KafkaServer.startup()
socketServer = new SocketServer()
Sockert实例初始化是,对每一个listener,创建对应的controlPlaneAcceptor
SockerServer.createControlPlaneAcceptorAndProcessor()
val controlPlaneAcceptor = createControlPlaneAcceptor()
controlPlaneAcceptor.addProcessors(1)
SocketServer.newProcessor(id, listenerName, securityProtocol)
val name = s"control-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
new Processor(id, name)
KafkaThread.nonDaemon(name)
可以看到,对应的线程名带有“control-plane-kafka-network-thread”
1.5.KafkaRequestHandlerPool
Kafka中有两种请求,一种是数据请求,一种是管理请求,分别由不同的KafkaApis实例进行处理。
数据请求的处理线程初始化过程如下
数据请求
参数为num.io.threads,默认值为8
RequestChannel阻塞队列请求数最大为queued.max.requests,默认值500
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.numIoThreads, "data-plane")
创建numIoThreads个KafkaThread
KafkaThread.daemon("data-plane-kafka-request-handler-" + id) // id为0到numIoThreads
数据请求的处理线程名带有“data-plane-kafka-request-handler”字样。
管理请求的处理线程初始化过程如下
controll请求,一个listener对应一个controlPlaneRequestProcessor
每个RequestChannel阻塞队列请求数最大为固定值20
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(1, "control-plane")
创建1个KafkaThread
KafkaThread.daemon("control-plane-kafka-request-handler-0")
管理请求的处理线程名带有“control-plane-kafka-request-handler”字样。
2.Kafka Producer线程体系
一个Kafka Producer进程中,包含所有的线程及其功能。
主要就两个线程:
(1)main线程:主线程最终把消息写入到了内存中
(2)IO线程,线程名前缀为kafka-producer-network-thread,将内存中的数据发送给Kafka Broker。
3.Kafka Consumer线程体系
一个Kafka Consumer进程中,包含所有的线程及其功能。