上一篇文章我们已经把executor的启动创建介绍完了,这里我们接着介绍,在executor启动后driver如何切分RDD以及最后提交task任务给executor。

为了更好的探查application和job的关系,这里在我们一直使用demo上增加了一行执行“first()行为算子”的代码,现在整体的demo代码如下:

def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setAppName("WordCount"))

    val rdd: RDD[String] = sc.makeRDD(List(
      "spark hello", "hive", "hadoop hbase", "spark hadoop", "hbase"
    ))

    // 扁平化操作,拆分出数据
    val value: RDD[String] = rdd.flatMap(_.split(" "))

    // 挑选第一个元素打印(action算子)
    println(value.first())

    // map转换为(key,1)
    val mapRDD: RDD[(String, Int)] = value.map((_, 1))

    // reduceByKey根据key进行聚合
    val result: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)

    //打印最终获取的所有结果(action算子)
    result.collect().foreach(println)

}

这里对于RDD各个算子的详细功能实现原理以及转换算子与行为算子的区别,不是我们本次阅读源码的目的,所以这里略过RDD的具体处理,我们在大方向上追踪任务的拆解和下发执行。

使用过spark的可以知道,这里flatmap、map、reduceByKey都是转换算子,我们就不深入去看,我们着重看下行为算子的处理逻辑,因为只要执行行为算子时才会触发任务的下发执行。所以我们先断点追踪下first的执行逻辑,如下:

 withScope可以简单理解为一个代码块的封装,不涉及数据的具体处理逻辑,所以我们接着到take方法中看一下:

take方法中前面的步骤主要是计算分区和获取的结果数,真实的任务提交正是我们框起来的这一行,runJob有多个重载方法被调用封装参数,我们直接到最底层的地方去看看:

 可以看到job被提交后,最终是会交给DAGScheduler进行处理。到这我们也可以确定application与job的关系,因为一个行为算子对应一次job提交,所以一个application有多少个job主要看其调用了多少次行为算子。我们接着看下DAGScheduler的处理逻辑:

 在这一步DAGScheduler会调用submitJob方法继续提交job,这里我还多标记了两行,主要是说明DAGScheduler会无限阻塞直接job结果返回。我们接着看submitJob的处理逻辑:

[很多重要的逻辑截图不好表示,所以后面源码尽量是直接插入而不是截图]

  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // 对比分区数,确保没有在一个不存在的分区上发起任务
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }
    // 获取当前job的唯一标识(可以知道job唯一标识是从0开始,每多一个job就加1)
    val jobId = nextJobId.getAndIncrement()
    //如果要处理的分区为空
    if (partitions.isEmpty) {
      //克隆相关属性
      val clonedProperties = Utils.cloneProperties(properties)
      if (sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) == null) {
        clonedProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, callSite.shortForm)
      }
      val time = clock.getTimeMillis()
      //直接发送job启动和结束的事件
      listenerBus.post(
        SparkListenerJobStart(jobId, time, Seq.empty, clonedProperties))
      listenerBus.post(
        SparkListenerJobEnd(jobId, time, JobSucceeded))
      // 返回job运行结果封装类,由于没有要处理的分区数,所以其任务数直接为0
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }
    //判断分区是否真的不为空
    assert(partitions.nonEmpty)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
    //发送JobSubmitted事件
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      Utils.cloneProperties(properties)))
    waiter
  }

可以看到job的唯一标识是从0开始,且每次有新job则加1。另外在待处理分区为空时,spark会按照流程发送job开始和结束的事件通知,但是并不会向TaskScheduler下发任务,其直接返回的结果类也是记录task数为0。

下面我们还是来着重看下分区不为空的处理逻辑,其会通过eventProcessLoop向DAGScheduler自己提交一个JobSubmitted事件,这个事件接收处理逻辑如下:

 我们到具体的handleJobSubmitted方法中看一下:

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties): Unit = {
    //重点一:依据当前RDD创建ResultStage
    var finalStage: ResultStage = null
    try {
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: BarrierJobSlotsNumberCheckFailed =>
        val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
          (_: Int, value: Int) => value + 1)

        logWarning(s"Barrier stage in job $jobId requires ${e.requiredConcurrentTasks} slots, " +
          s"but only ${e.maxConcurrentTasks} are available. " +
          s"Will retry up to ${maxFailureNumTasksCheck - numCheckFailures + 1} more times")

        if (numCheckFailures <= maxFailureNumTasksCheck) {
          messageScheduler.schedule(
            new Runnable {
              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
                partitions, callSite, listener, properties))
            },
            timeIntervalNumTasksCheck,
            TimeUnit.SECONDS
          )
          return
        } else {
          barrierJobIdToNumTasksCheckFailures.remove(jobId)
          listener.jobFailed(e)
          return
        }

      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    barrierJobIdToNumTasksCheckFailures.remove(jobId)
    //封装job对象
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    //在job和resultStage间建立互相引用
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    //通过总线发送job启动事件
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    //重点二:提交stage
    submitStage(finalStage)
  }

上面这块代码比较重要,有两个重点,一个是stage的创建,一个stage的提交,下面我们分别来看下:

重点一:依据当前RDD创建ResultStage

按理说这块代码很简单,不应该是重点才是。其实这块是理解DAGScheduler切割job成一个个stage算法的核心。首先我们看下ResultStage的类型信息,可以知道其是Stage的实现类之一:

 Stage一共有两个实现类,一个是ResultStage,一个是ShuffleMapStage,而且从Stage的构建参数我们可以知道,stage是有血缘关系的,其会记录自己依赖的父stage。为了便于理解后续的源码,我们先吧stage切割的逻辑先介绍下。

stage切割算法:以行为算子提交的RDD作为最后一个RDD,并以该RDD创建ResultStage,随后通过RDD的血缘关系往前查找其父RDD,如果其父RDD是窄依赖,则将其划入当前stage,如果父RDD是宽依赖,则将宽依赖的那个RDD作为新Stage的划分线,而宽依赖的那个RDD也是新stage里的第一个RDD,然后再通过新stage的RDD向前查找,如果父RDD是窄依赖,则将其划入新stage,如果是宽依赖,则重复前面创建stage的流程。这里有一点需要注意的是,以行为算子提交的RDD为基础创建的stage是ResultStage,其会计算出最终的结果,而因为宽依赖创建的stage是ShuffleMapStage,其会有一个shuffle操作的过程

 这里我们也可以看出job和stage的关系,即一个job至少有一个stage具体stage的数据要看整个job过程中有多少次宽依赖出现

下面我们在简单看下createResultStage方法:

  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    //通过rdd获取父stage
    val parents = getOrCreateParentStages(rdd, jobId)
    //获取stage唯一标识,其也是从0开始,每次增1
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    //记录stage、job相关信息
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

 这块代码很简单,唯一有点复杂的是通过rdd获取其父stage,这块不是我们此次阅读的目的,所以就不深入介绍了,我们继续下一个重点。

重点二:提交stage

有了前面stage划分的理论介绍,这块的阅读就不会那么困惑了,我们还是先看下源码:

  private def submitStage(stage: Stage): Unit = {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug(s"submitStage($stage (name=${stage.name};" +
        s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        //重点一:获取stage依赖的父stage
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          //重点二:提交stage任务
          submitMissingTasks(stage, jobId.get)
        } else {
          //如果stage存在父stage,则重复当前过程,知道没有父stage再提交任务
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

这块的方法逻辑大体是:查看stage是否存在父stage,有父stage则重复当前过程,直到没有父stage再提交任务。虽然逻辑不复杂,但是包含的内容很丰富,首先就是重点一getMissingParentStages方法包含了stage的划分算法,其次是重点二submitMissingTasks涉及了stage到task任务的转换与提交,下面我们分别来看下:

重点一:获取stage依赖的父stage

这块主要是通过RDD的血缘和宽窄依赖确定stage的范围以及是否需要开启新stage,源码如下:

 private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ListBuffer[RDD[_]]
    //将当前rdd加入等待访问的列表,便于后面正式开始往前查找
    waitingForVisit += stage.rdd
    //请注意,这块是一个内部方法,其会在下面的while循环中被往复调用
    def visit(rdd: RDD[_]): Unit = {
      //避免rdd被重读访问(重复访问很有可能出现死循环)
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          // 通过rdd血缘,查找其父依赖,如果是宽依赖则创建新stage,反之只用将父依赖加入待访问列表即可
          for (dep <- rdd.dependencies) {
            dep match {
              //如果是宽依赖,则以该宽依赖的rdd创建ShuffleMapStage
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              //如果是窄依赖,则将该依赖添加到待访问列表
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.prepend(narrowDep.rdd)
            }
          }
        }
      }
    }
    while (waitingForVisit.nonEmpty) {
      //访问未被处理过的rdd,并将其从待访问列表移除,这块要了解remove的用法,其是返回要移除的元素
      visit(waitingForVisit.remove(0))
    }
    missing.toList
  }

这块的逻辑跟我们前面介绍的stage划分算法大体相同,简言之就是通过rdd血缘,查找其父依赖,如果是宽依赖则创建新stage,反之只用将父依赖加入待访问列表即可。

重点二:提交stage任务

这块的内容比较多,对于stage还有参数属性等的一些处理没有细讲,我们只标注了和我们阅读源码目的相关的点,下面还是先看下源码:

  private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
    logDebug("submitMissingTasks(" + stage + ")")

    stage match {
      case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
        mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
      case _ =>
    }

    // 重点一:计算将要执行的计算所依赖的分区索引集合
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    val properties = jobIdToActiveJob(jobId).properties

    runningStages += stage
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

    if (partitionsToCompute.nonEmpty) {
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    var taskBinary: Broadcast[Array[Byte]] = null
    var partitions: Array[Partition] = null
    try {

      var taskBinaryBytes: Array[Byte] = null
      RDDCheckpointData.synchronized {
        taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        partitions = stage.rdd.partitions
      }

      if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
        logWarning(s"Broadcasting large task binary with size " +
          s"${Utils.bytesToString(taskBinaryBytes.length)}")
      }
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage
        return
      case e: Throwable =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
    
    //重点二:根据stage类型和要计算的分区数创建task集合
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    if (tasks.nonEmpty) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      //重点三:向TaskScheduler提交任务集合
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      markStageAsFinished(stage, None)

      stage match {
        case stage: ShuffleMapStage =>
          logDebug(s"Stage ${stage} is actually done; " +
              s"(available: ${stage.isAvailable}," +
              s"available outputs: ${stage.numAvailableOutputs}," +
              s"partitions: ${stage.numPartitions})")
          markMapStageJobsAsFinished(stage)
        case stage : ResultStage =>
          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
      }
      submitWaitingChildStages(stage)
    }
  }

可以看到这块代码虽然多,但是我们只需要关注三个点就行,一个是分区数的计算,一个是taskset集合的生成,还有一个是task任务的提交,下面我们分别看下:

重点一、二 计算分区数并根据分区数创建task集合

分区数的计算主要还是从RDD获取,task集合的创建也没什么好介绍的,其是根据stage的不同创建了一系列处理逻辑相同的task,只是其处理的分区不同

至此我们也可以获得stage和taskset的关系,一个stage对应一个taskset,一个taskset则包含有多个task,task的元素数量分stage中要处理的分区数相关

重点三:向TaskScheduler提交任务集合

这里taskScheduler是一个抽象接口,其只有一个TaskSchedulerImpl实现类,我们看下其提交逻辑:

override def submitTasks(taskSet: TaskSet): Unit = {
    //获取task集合信息
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      //创建任务集管理器
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      //获取taskset对应的stage信息
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets.foreach { case (_, ts) =>
        ts.isZombie = true
      }
      //将任务管理器和任务相关关联起来
      stageTaskSets(taskSet.stageAttemptId) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run(): Unit = {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    //重点:调用SchedulerBackend的reviveOffers方法进一步处理提交的taskset集合
    backend.reviveOffers()
  }

接着看下reviverOffers方法,这个方法有两个实现类,一个在LocalSchedulerBackend中,一个在CoarseGrainedSchedulerBackend中,我们的StandaloneSchedulerBackend实现类正好是继承了后者,所以我们去CoarseGrainedSchedulerBackend中看下其具体实现:

 可以看到其会发送一个ReviveOffers事件出去,该事件的的接收者只有一个就是该类本身,我们再接着其接收事件往下看:

 接着往里看:

    private def makeOffers(): Unit = {
      val taskDescs = withLock {
        // 重点一:挑选符合条件的executor
        val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
        val workOffers = activeExecutors.map {
          case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort),
              executorData.resourcesInfo.map { case (rName, rInfo) =>
                (rName, rInfo.availableAddrs.toBuffer)
              })
        }.toIndexedSeq
        scheduler.resourceOffers(workOffers)
      }
      if (taskDescs.nonEmpty) {
        //重点二:发送任务
        launchTasks(taskDescs)
      }
    }

这里有两个重点,第一个重点其实很简单,之所以标记出来,是为了和我们前面一篇源码文章中的遗留问题相呼应(spark源码(三)spark 如何进行driver、executor任务的调度,以及executor向driver的注册)。这里就不细讲那个问题跟此处源码的关联了,感兴趣的可以往前看下,在前面的文章有详细介绍。

重点二:发送任务

还是先看下源码:

private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
      //遍历task任务
      for (task <- tasks.flatten) {
        val serializedTask = TaskDescription.encode(task)
        //验证任务序列化传输的内容是不是超过指定大小
        if (serializedTask.limit() >= maxRpcMessageSize) {
          Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
                s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          //获取executor信息
          val executorData = executorDataMap(task.executorId)
          //executor为task分配资源,该资源在task执行结束后释放
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          task.resources.foreach { case (rName, rInfo) =>
            assert(executorData.resourcesInfo.contains(rName))
            executorData.resourcesInfo(rName).acquire(rInfo.addresses)
          }

          logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")
          //向executor发送LaunchTask任务
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

可以看到,其会先判断任务的rpc传输大小是否超出限定值,如果超出,则抛异常,如果在正常范围内,则向executor发送task任务,注意这里task是一条条发,而不是整个taskset一起发。

至此我们的源码阅读目的已经达成,这块我们其实有两个行为算子,一个是first一个collect,这里只介绍了first的任务下发过程,collect的并没有介绍,主要是二者下发完全一样,都是通过runJob方法提交job,这里之所以加一个first方法,主要是为了探究application和job的关系,如下监控页面所示,我们有两个行为算子,自然而然有两个job。

总结:

1、application、job、stage、taskset、task关系
application和job对应关系:一个application可能有多个job提交,因为每个行为算子都调用runjob方法,即都对应一次job任务提交
job和stage对应关系:一个job可能有多个stage,划分依据主要是RDD是不是出现了shuffle,即每个宽依赖都会导致新stage的生成。
stage和taskset关系:一个stage对应一个taskset,且为stage中每个分区创建一个task任务。
taskset和task关系:taskset包含一组task,每个task执行逻辑相同,主要是面向stage不同的分区。

2、job、stage唯一标识从0开始统计,每来一个新的则加1。

3、stage切割算法:以行为算子提交的RDD作为最后一个RDD,并以该RDD创建ResultStage,随后通过RDD的血缘关系往前查找其父RDD,如果其父RDD是窄依赖,则将其划入当前stage,如果父RDD是宽依赖,则将宽依赖的那个RDD作为新Stage的划分线,而宽依赖的那个RDD也是新stage里的第一个RDD,然后再通过新stage的RDD向前查找,如果父RDD是窄依赖,则将其划入新stage,如果是宽依赖,则重复前面创建stage的流程。这里有一点需要注意的是,以行为算子提交的RDD为基础创建的stage是ResultStage,其会计算出最终的结果,而因为宽依赖创建的stage是ShuffleMapStage,其会有一个shuffle操作的过程

4、stage有两个实现类ResultStage和ShuffleMapStage,task对应也有两种任务集合,分别是ResultTask和ShuffleMapTask。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐