Spark Executor结束状态传递过程


发布于 2024-08-12 / 28 阅读 / 0 评论 /
探索Spark Executor在任务执行过程中的状态传递

Spark Executor结束状态传递也是状态机的一部分,探索这部分内容有助于我们分析并解决任务卡住的问题,特别是Spark提供的一些命令行工具在实时提交任务后卡住的现象。

结束状态整体流程是:Executor执行到一定阶段后,生成对应的状态更新信息,发送给Driver;Driver接收到状态更新的请求后,进行对应事件的处理。

1.Executor中StatusUpdate事件生成过程

这一步在TaskRunner中完成,随着任务的执行,生成不同的StatusUpdate事件。

如下所示:

TaskRunner.run // 任务执行
    ExecutorBackend.statusUpdate(TaskState.RUNNING)
    如果正常结束
        ExecutorBackend.statusUpdate(TaskState.FINISHED)
            组装StatusUpdate对象
            向Driver发送StatusUpdate事件
    如果执行过程中异常
        ExecutorBackend.statusUpdate(TaskState.FAILED)
        或ExecutorBackend.statusUpdate(TaskState.KILLED)

spark on yarn中,ExecutorBackend实现类为CoarseGrainedExecutorBackend。

ExecutorBackend.statusUpdate的实质就是向Driver发送StatusUpdate事件。

2.Driver对StatusUpdate事件的处理过程

Driver收到状态更新事件后,对该事件进行处理,更新任务状态。

CoarseGrainedSchedulerBackend.DriverEndpoint#receive(StatusUpdate)
    TaskSchedulerImpl.statusUpdate
        成功状态
            TaskResultGetter.enqueueSuccessfulTask
                TaskSchedulerImpl.handleSuccessfulTask
                    TaskSetManager.handleSuccessfulTask
                        DAGScheduler.taskEnded
                            DAGSchedulerEventProcessLoop.post(CompletionEvent)
        失败状态
            TaskResultGetter.enqueueFailedTask
                TaskSchedulerImpl.handleFailedTask
                    TaskSetManager.handleFailedTask
                        DAGScheduler.taskEnded
                            DAGSchedulerEventProcessLoop.post(CompletionEvent)
        RUNNING状态
            TaskInfo.launchSucceeded // 设置任务状态,启动成功

不同状态,处理逻辑不同。

这一步最后生成了CompletionEvent事件,并发送给DAGScheduler。

3.DAGScheduler对CompletionEvent事件的处理过程

DAGScheduler接收到CompletionEvent事件后,进行结束处理

DAGScheduler.onReceive(CompletionEvent)
    DAGScheduler.doOnReceive
        DAGScheduler.handleTaskCompletion
            DAGScheduler.postTaskEnd
            如果任务成功
                JobWaiter.taskSucceeded
            如果任务失败
                JobWaiter.jobFailed

这里的JobWaiter正是在DAGScheduler.submitJob中生成的监听器,用于任务结束后的回调。

  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

如果DAGScheduler一直接收不到CompletionEvent事件,那么JobWaiter就会一直处于等待状态,DAGScheduler.runJob方法也就一直无法返回。这会引发一个典型的现象就是命令行终端阻塞。