1.问题描述
在高可用的测试场景中,有一个案例是通过tc命令模拟计算机节点80%网络丢包率,测试结果发现该节点上的KafkaBroker不可用,对应的9092服务端口不处于监听状态。
2.问题分析
首先,从kafka.log日志入手,发现以下异常日志
Failed to process feature ZK node change event. The broker will eventually exit.
根据日志中的broker exit,我们初步判定,这个日志后就是broker退出的过程。
2.1.日志输出过程
下面,我们通过源码解析这个日志触发的过程,如下所示:
KafkaServer.startup()
featureChangeListener = new FinalizedFeatureChangeListener()
thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
featureChangeListener.initOrThrow()
thread.start()
info("Starting")
ChangeNotificationProcessorThread.doWork() // 循环执行while (isRunning)
queue.take.updateLatestOrThrow()
如果上述命令遇到异常,则打印错误日志,抛出FatalExitError异常
error("Failed to process feature ZK node change event. The broker will eventually exit.", e)
throw new FatalExitError(1)
在ChangeNotificationProcessorThread.doWork方法中,如果遇到FatalExitError,则执行以下操作
shutdownInitiated.countDown()
shutdownComplete.countDown()
info("Stopped")
Exit.exit(e.statusCode())
在ChangeNotificationProcessorThread.doWork方法中,如果遇到其他异常,则忽略,仅打印错误日志
error("Error due to", e)
info("Stopped")
这里FinalizedFeatureChangeListener是对/feature这个zknode的监听,包括StateChangeHandler和ZNodeChangeHandler。
输出了对应的日志后,通过Exit.exit方法给KafkaBroker进程发送终止的信号。而这个中止信号会引发kafka shutdown hook钩子函数的执行。
2.2.kafka-shutdown-hook执行过程
下面是kafka-shutdown-hook注册和执行过程。
Kafka#main()
给当前进程添加kafka-shutdown-hook,对应的钩子函数为KafkaServer.shutdown()
遇到Exit.exit函数调用后,就会执行shutdown钩子函数
KafkaServer#shutdown()
info("shutting down")
CoreUtils.swallow(controlledShutdown(), this)
info("Starting controlled shutdown")
如果shutdown成功
info("Controlled shutdown succeeded")
如果shutdown不成功
info("Remaining partitions to move")
info(s"Error from controller")
warn("Retrying controlled shutdown after the previous attempt failed...")
如果shutdown过程中出现异常
warn("Error during controlled shutdown, possibly because leader movement took longer than the configured controller.socket.timeout.ms and/or request.timeout.ms: ")
warn("Retrying controlled shutdown after the previous attempt failed...")
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
info("Stopping socket server request processors")
info("Stopped socket server request processors")
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
info("shutting down")
info("shut down completely")
CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
info("shutting down")
info("shut down completely")
CoreUtils.swallow(kafkaScheduler.shutdown(), this)
debug("Shutting down task scheduler.")
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
info("Shutdown complete.")
CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
info("Shutdown complete.")
CoreUtils.swallow(authorizer.foreach(_.close()), this)
logger.info("close() called on authorizer."); // 在RangerKafkaAuthorizer中输出
CoreUtils.swallow(kafkaController.shutdown(), this)
debug("Resigning")
info("Resigned")
CoreUtils.swallow(socketServer.shutdown(), this)
info("Shutting down socket server")
SocketServer.stopProcessingRequests()
info("Stopping socket server request processors")
info("Stopped socket server request processors")
info("Shutdown completed")
info("shut down completed")
这里主要是通过日志的形式,展现整个过程。跟我们看到的kafka.log日志完全吻合。