Spark Executor结束状态传递也是状态机的一部分,探索这部分内容有助于我们分析并解决任务卡住的问题,特别是Spark提供的一些命令行工具在实时提交任务后卡住的现象。
结束状态整体流程是:Executor执行到一定阶段后,生成对应的状态更新信息,发送给Driver;Driver接收到状态更新的请求后,进行对应事件的处理。
1.Executor中StatusUpdate事件生成过程
这一步在TaskRunner中完成,随着任务的执行,生成不同的StatusUpdate事件。
如下所示:
TaskRunner.run // 任务执行
ExecutorBackend.statusUpdate(TaskState.RUNNING)
如果正常结束
ExecutorBackend.statusUpdate(TaskState.FINISHED)
组装StatusUpdate对象
向Driver发送StatusUpdate事件
如果执行过程中异常
ExecutorBackend.statusUpdate(TaskState.FAILED)
或ExecutorBackend.statusUpdate(TaskState.KILLED)
spark on yarn中,ExecutorBackend实现类为CoarseGrainedExecutorBackend。
ExecutorBackend.statusUpdate的实质就是向Driver发送StatusUpdate事件。
2.Driver对StatusUpdate事件的处理过程
Driver收到状态更新事件后,对该事件进行处理,更新任务状态。
CoarseGrainedSchedulerBackend.DriverEndpoint#receive(StatusUpdate)
TaskSchedulerImpl.statusUpdate
成功状态
TaskResultGetter.enqueueSuccessfulTask
TaskSchedulerImpl.handleSuccessfulTask
TaskSetManager.handleSuccessfulTask
DAGScheduler.taskEnded
DAGSchedulerEventProcessLoop.post(CompletionEvent)
失败状态
TaskResultGetter.enqueueFailedTask
TaskSchedulerImpl.handleFailedTask
TaskSetManager.handleFailedTask
DAGScheduler.taskEnded
DAGSchedulerEventProcessLoop.post(CompletionEvent)
RUNNING状态
TaskInfo.launchSucceeded // 设置任务状态,启动成功
不同状态,处理逻辑不同。
这一步最后生成了CompletionEvent事件,并发送给DAGScheduler。
3.DAGScheduler对CompletionEvent事件的处理过程
DAGScheduler接收到CompletionEvent事件后,进行结束处理
DAGScheduler.onReceive(CompletionEvent)
DAGScheduler.doOnReceive
DAGScheduler.handleTaskCompletion
DAGScheduler.postTaskEnd
如果任务成功
JobWaiter.taskSucceeded
如果任务失败
JobWaiter.jobFailed
这里的JobWaiter正是在DAGScheduler.submitJob中生成的监听器,用于任务结束后的回调。
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
如果DAGScheduler一直接收不到CompletionEvent事件,那么JobWaiter就会一直处于等待状态,DAGScheduler.runJob方法也就一直无法返回。这会引发一个典型的现象就是命令行终端阻塞。