spark源码(五)DAGScheduler TaskScheduler如何配合提交任务,application、job、stage、taskset、task对应关系是什么?
DAGScheduler TaskScheduler如何配合提交任务,各个阶段如何划分?一个application可能有多个job提交,因为每个行为算子都调用runjob方法,即都对应一次job任务提交一个job可能有多个stage,划分依据主要是RDD是不是出现了shuffle,即每个宽依赖都会导致新stage的生成。一个stage对应一个taskset,且为stage中每个分区创建一个task
上一篇文章我们已经把executor的启动创建介绍完了,这里我们接着介绍,在executor启动后driver如何切分RDD以及最后提交task任务给executor。
为了更好的探查application和job的关系,这里在我们一直使用demo上增加了一行执行“first()行为算子”的代码,现在整体的demo代码如下:
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
val rdd: RDD[String] = sc.makeRDD(List(
"spark hello", "hive", "hadoop hbase", "spark hadoop", "hbase"
))
// 扁平化操作,拆分出数据
val value: RDD[String] = rdd.flatMap(_.split(" "))
// 挑选第一个元素打印(action算子)
println(value.first())
// map转换为(key,1)
val mapRDD: RDD[(String, Int)] = value.map((_, 1))
// reduceByKey根据key进行聚合
val result: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
//打印最终获取的所有结果(action算子)
result.collect().foreach(println)
}
这里对于RDD各个算子的详细功能实现原理以及转换算子与行为算子的区别,不是我们本次阅读源码的目的,所以这里略过RDD的具体处理,我们在大方向上追踪任务的拆解和下发执行。
使用过spark的可以知道,这里flatmap、map、reduceByKey都是转换算子,我们就不深入去看,我们着重看下行为算子的处理逻辑,因为只要执行行为算子时才会触发任务的下发执行。所以我们先断点追踪下first的执行逻辑,如下:
withScope可以简单理解为一个代码块的封装,不涉及数据的具体处理逻辑,所以我们接着到take方法中看一下:
take方法中前面的步骤主要是计算分区和获取的结果数,真实的任务提交正是我们框起来的这一行,runJob有多个重载方法被调用封装参数,我们直接到最底层的地方去看看:
可以看到job被提交后,最终是会交给DAGScheduler进行处理。到这我们也可以确定application与job的关系,因为一个行为算子对应一次job提交,所以一个application有多少个job主要看其调用了多少次行为算子。我们接着看下DAGScheduler的处理逻辑:
在这一步DAGScheduler会调用submitJob方法继续提交job,这里我还多标记了两行,主要是说明DAGScheduler会无限阻塞直接job结果返回。我们接着看submitJob的处理逻辑:
[很多重要的逻辑截图不好表示,所以后面源码尽量是直接插入而不是截图]
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// 对比分区数,确保没有在一个不存在的分区上发起任务
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
// 获取当前job的唯一标识(可以知道job唯一标识是从0开始,每多一个job就加1)
val jobId = nextJobId.getAndIncrement()
//如果要处理的分区为空
if (partitions.isEmpty) {
//克隆相关属性
val clonedProperties = Utils.cloneProperties(properties)
if (sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) == null) {
clonedProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, callSite.shortForm)
}
val time = clock.getTimeMillis()
//直接发送job启动和结束的事件
listenerBus.post(
SparkListenerJobStart(jobId, time, Seq.empty, clonedProperties))
listenerBus.post(
SparkListenerJobEnd(jobId, time, JobSucceeded))
// 返回job运行结果封装类,由于没有要处理的分区数,所以其任务数直接为0
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
//判断分区是否真的不为空
assert(partitions.nonEmpty)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
//发送JobSubmitted事件
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
waiter
}
可以看到job的唯一标识是从0开始,且每次有新job则加1。另外在待处理分区为空时,spark会按照流程发送job开始和结束的事件通知,但是并不会向TaskScheduler下发任务,其直接返回的结果类也是记录task数为0。
下面我们还是来着重看下分区不为空的处理逻辑,其会通过eventProcessLoop向DAGScheduler自己提交一个JobSubmitted事件,这个事件接收处理逻辑如下:
我们到具体的handleJobSubmitted方法中看一下:
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties): Unit = {
//重点一:依据当前RDD创建ResultStage
var finalStage: ResultStage = null
try {
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: BarrierJobSlotsNumberCheckFailed =>
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
(_: Int, value: Int) => value + 1)
logWarning(s"Barrier stage in job $jobId requires ${e.requiredConcurrentTasks} slots, " +
s"but only ${e.maxConcurrentTasks} are available. " +
s"Will retry up to ${maxFailureNumTasksCheck - numCheckFailures + 1} more times")
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
barrierJobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
return
}
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
barrierJobIdToNumTasksCheckFailures.remove(jobId)
//封装job对象
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
//在job和resultStage间建立互相引用
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
//通过总线发送job启动事件
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
//重点二:提交stage
submitStage(finalStage)
}
上面这块代码比较重要,有两个重点,一个是stage的创建,一个stage的提交,下面我们分别来看下:
重点一:依据当前RDD创建ResultStage
按理说这块代码很简单,不应该是重点才是。其实这块是理解DAGScheduler切割job成一个个stage算法的核心。首先我们看下ResultStage的类型信息,可以知道其是Stage的实现类之一:
Stage一共有两个实现类,一个是ResultStage,一个是ShuffleMapStage,而且从Stage的构建参数我们可以知道,stage是有血缘关系的,其会记录自己依赖的父stage。为了便于理解后续的源码,我们先吧stage切割的逻辑先介绍下。
stage切割算法:以行为算子提交的RDD作为最后一个RDD,并以该RDD创建ResultStage,随后通过RDD的血缘关系往前查找其父RDD,如果其父RDD是窄依赖,则将其划入当前stage,如果父RDD是宽依赖,则将宽依赖的那个RDD作为新Stage的划分线,而宽依赖的那个RDD也是新stage里的第一个RDD,然后再通过新stage的RDD向前查找,如果父RDD是窄依赖,则将其划入新stage,如果是宽依赖,则重复前面创建stage的流程。这里有一点需要注意的是,以行为算子提交的RDD为基础创建的stage是ResultStage,其会计算出最终的结果,而因为宽依赖创建的stage是ShuffleMapStage,其会有一个shuffle操作的过程。
这里我们也可以看出job和stage的关系,即一个job至少有一个stage,具体stage的数据要看整个job过程中有多少次宽依赖出现。
下面我们在简单看下createResultStage方法:
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
//通过rdd获取父stage
val parents = getOrCreateParentStages(rdd, jobId)
//获取stage唯一标识,其也是从0开始,每次增1
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
//记录stage、job相关信息
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
这块代码很简单,唯一有点复杂的是通过rdd获取其父stage,这块不是我们此次阅读的目的,所以就不深入介绍了,我们继续下一个重点。
重点二:提交stage
有了前面stage划分的理论介绍,这块的阅读就不会那么困惑了,我们还是先看下源码:
private def submitStage(stage: Stage): Unit = {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug(s"submitStage($stage (name=${stage.name};" +
s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
//重点一:获取stage依赖的父stage
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
//重点二:提交stage任务
submitMissingTasks(stage, jobId.get)
} else {
//如果stage存在父stage,则重复当前过程,知道没有父stage再提交任务
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
这块的方法逻辑大体是:查看stage是否存在父stage,有父stage则重复当前过程,直到没有父stage再提交任务。虽然逻辑不复杂,但是包含的内容很丰富,首先就是重点一getMissingParentStages方法包含了stage的划分算法,其次是重点二submitMissingTasks涉及了stage到task任务的转换与提交,下面我们分别来看下:
重点一:获取stage依赖的父stage
这块主要是通过RDD的血缘和宽窄依赖确定stage的范围以及是否需要开启新stage,源码如下:
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ListBuffer[RDD[_]]
//将当前rdd加入等待访问的列表,便于后面正式开始往前查找
waitingForVisit += stage.rdd
//请注意,这块是一个内部方法,其会在下面的while循环中被往复调用
def visit(rdd: RDD[_]): Unit = {
//避免rdd被重读访问(重复访问很有可能出现死循环)
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
// 通过rdd血缘,查找其父依赖,如果是宽依赖则创建新stage,反之只用将父依赖加入待访问列表即可
for (dep <- rdd.dependencies) {
dep match {
//如果是宽依赖,则以该宽依赖的rdd创建ShuffleMapStage
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
//如果是窄依赖,则将该依赖添加到待访问列表
case narrowDep: NarrowDependency[_] =>
waitingForVisit.prepend(narrowDep.rdd)
}
}
}
}
}
while (waitingForVisit.nonEmpty) {
//访问未被处理过的rdd,并将其从待访问列表移除,这块要了解remove的用法,其是返回要移除的元素
visit(waitingForVisit.remove(0))
}
missing.toList
}
这块的逻辑跟我们前面介绍的stage划分算法大体相同,简言之就是通过rdd血缘,查找其父依赖,如果是宽依赖则创建新stage,反之只用将父依赖加入待访问列表即可。
重点二:提交stage任务
这块的内容比较多,对于stage还有参数属性等的一些处理没有细讲,我们只标注了和我们阅读源码目的相关的点,下面还是先看下源码:
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
stage match {
case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
case _ =>
}
// 重点一:计算将要执行的计算所依赖的分区索引集合
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
val properties = jobIdToActiveJob(jobId).properties
runningStages += stage
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
if (partitionsToCompute.nonEmpty) {
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
}
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
var taskBinary: Broadcast[Array[Byte]] = null
var partitions: Array[Partition] = null
try {
var taskBinaryBytes: Array[Byte] = null
RDDCheckpointData.synchronized {
taskBinaryBytes = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
partitions = stage.rdd.partitions
}
if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
logWarning(s"Broadcasting large task binary with size " +
s"${Utils.bytesToString(taskBinaryBytes.length)}")
}
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage
return
case e: Throwable =>
abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
//重点二:根据stage类型和要计算的分区数创建task集合
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
if (tasks.nonEmpty) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
//重点三:向TaskScheduler提交任务集合
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
markStageAsFinished(stage, None)
stage match {
case stage: ShuffleMapStage =>
logDebug(s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})")
markMapStageJobsAsFinished(stage)
case stage : ResultStage =>
logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
}
submitWaitingChildStages(stage)
}
}
可以看到这块代码虽然多,但是我们只需要关注三个点就行,一个是分区数的计算,一个是taskset集合的生成,还有一个是task任务的提交,下面我们分别看下:
重点一、二 计算分区数并根据分区数创建task集合
分区数的计算主要还是从RDD获取,task集合的创建也没什么好介绍的,其是根据stage的不同创建了一系列处理逻辑相同的task,只是其处理的分区不同。
至此我们也可以获得stage和taskset的关系,一个stage对应一个taskset,一个taskset则包含有多个task,task的元素数量分stage中要处理的分区数相关。
重点三:向TaskScheduler提交任务集合
这里taskScheduler是一个抽象接口,其只有一个TaskSchedulerImpl实现类,我们看下其提交逻辑:
override def submitTasks(taskSet: TaskSet): Unit = {
//获取task集合信息
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
//创建任务集管理器
val manager = createTaskSetManager(taskSet, maxTaskFailures)
//获取taskset对应的stage信息
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets.foreach { case (_, ts) =>
ts.isZombie = true
}
//将任务管理器和任务相关关联起来
stageTaskSets(taskSet.stageAttemptId) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run(): Unit = {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
//重点:调用SchedulerBackend的reviveOffers方法进一步处理提交的taskset集合
backend.reviveOffers()
}
接着看下reviverOffers方法,这个方法有两个实现类,一个在LocalSchedulerBackend中,一个在CoarseGrainedSchedulerBackend中,我们的StandaloneSchedulerBackend实现类正好是继承了后者,所以我们去CoarseGrainedSchedulerBackend中看下其具体实现:
可以看到其会发送一个ReviveOffers事件出去,该事件的的接收者只有一个就是该类本身,我们再接着其接收事件往下看:
接着往里看:
private def makeOffers(): Unit = {
val taskDescs = withLock {
// 重点一:挑选符合条件的executor
val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort),
executorData.resourcesInfo.map { case (rName, rInfo) =>
(rName, rInfo.availableAddrs.toBuffer)
})
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
}
if (taskDescs.nonEmpty) {
//重点二:发送任务
launchTasks(taskDescs)
}
}
这里有两个重点,第一个重点其实很简单,之所以标记出来,是为了和我们前面一篇源码文章中的遗留问题相呼应(spark源码(三)spark 如何进行driver、executor任务的调度,以及executor向driver的注册)。这里就不细讲那个问题跟此处源码的关联了,感兴趣的可以往前看下,在前面的文章有详细介绍。
重点二:发送任务
还是先看下源码:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
//遍历task任务
for (task <- tasks.flatten) {
val serializedTask = TaskDescription.encode(task)
//验证任务序列化传输的内容是不是超过指定大小
if (serializedTask.limit() >= maxRpcMessageSize) {
Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
//获取executor信息
val executorData = executorDataMap(task.executorId)
//executor为task分配资源,该资源在task执行结束后释放
executorData.freeCores -= scheduler.CPUS_PER_TASK
task.resources.foreach { case (rName, rInfo) =>
assert(executorData.resourcesInfo.contains(rName))
executorData.resourcesInfo(rName).acquire(rInfo.addresses)
}
logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")
//向executor发送LaunchTask任务
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
可以看到,其会先判断任务的rpc传输大小是否超出限定值,如果超出,则抛异常,如果在正常范围内,则向executor发送task任务,注意这里task是一条条发,而不是整个taskset一起发。
至此我们的源码阅读目的已经达成,这块我们其实有两个行为算子,一个是first一个collect,这里只介绍了first的任务下发过程,collect的并没有介绍,主要是二者下发完全一样,都是通过runJob方法提交job,这里之所以加一个first方法,主要是为了探究application和job的关系,如下监控页面所示,我们有两个行为算子,自然而然有两个job。
总结:
1、application、job、stage、taskset、task关系
application和job对应关系:一个application可能有多个job提交,因为每个行为算子都调用runjob方法,即都对应一次job任务提交
job和stage对应关系:一个job可能有多个stage,划分依据主要是RDD是不是出现了shuffle,即每个宽依赖都会导致新stage的生成。
stage和taskset关系:一个stage对应一个taskset,且为stage中每个分区创建一个task任务。
taskset和task关系:taskset包含一组task,每个task执行逻辑相同,主要是面向stage不同的分区。
2、job、stage唯一标识从0开始统计,每来一个新的则加1。
3、stage切割算法:以行为算子提交的RDD作为最后一个RDD,并以该RDD创建ResultStage,随后通过RDD的血缘关系往前查找其父RDD,如果其父RDD是窄依赖,则将其划入当前stage,如果父RDD是宽依赖,则将宽依赖的那个RDD作为新Stage的划分线,而宽依赖的那个RDD也是新stage里的第一个RDD,然后再通过新stage的RDD向前查找,如果父RDD是窄依赖,则将其划入新stage,如果是宽依赖,则重复前面创建stage的流程。这里有一点需要注意的是,以行为算子提交的RDD为基础创建的stage是ResultStage,其会计算出最终的结果,而因为宽依赖创建的stage是ShuffleMapStage,其会有一个shuffle操作的过程。
4、stage有两个实现类ResultStage和ShuffleMapStage,task对应也有两种任务集合,分别是ResultTask和ShuffleMapTask。
更多推荐
所有评论(0)