Spark Source Code Analysis - Distributed Computation Execution Flow

Turning a DataFrame action into RDD operations

Take the Dataset#head action as an example:

```
org.apache.spark.sql.Dataset#head:2728
org.apache.spark.sql.Dataset#collectFromPlan:3715
org.apache.spark.sql.execution.TakeOrderedAndProjectExec#executeCollect // calls executeCollect as overridden by TakeOrderedAndProjectExec, a UnaryExecNode subclass of SparkPlan
org.apache.spark.sql.execution.TakeOrderedAndProjectExec#executeCollect:204 // the query is ORDER BY x LIMIT n, so the physical operator is TakeOrderedAndProjectExec
org.apache.spark.rdd.RDD#takeOrdered:1524
org.apache.spark.rdd.RDD#reduce:1120 // here sc.runJob(this, reducePartition, mergeResult) is called, which launches the distributed job
```
```mermaid
sequenceDiagram
    participant Dataset
    participant TakeOrderedAndProjectExec
    participant RDD

    Dataset->>Dataset: head(2728)
    Dataset->>Dataset: collectFromPlan(3715)
    Dataset->>TakeOrderedAndProjectExec: executeCollect
    Note right of TakeOrderedAndProjectExec: Calls executeCollect as overridden by TakeOrderedAndProjectExec, a UnaryExecNode subclass of SparkPlan
    TakeOrderedAndProjectExec->>TakeOrderedAndProjectExec: executeCollect(204)
    Note right of TakeOrderedAndProjectExec: The query is ORDER BY x LIMIT n, so the physical operator is TakeOrderedAndProjectExec
    TakeOrderedAndProjectExec->>RDD: takeOrdered(1524)
    RDD->>RDD: reduce(1120)
    Note right of RDD: sc.runJob(this, reducePartition, mergeResult) is called here, which launches the distributed job
```
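RDD#takeOrdered and RDD#reduce split the work into a reduce inside each partition and a merge on the driver. The single-process Python sketch below illustrates that split. It assumes partitions are plain lists and that tasks run one after another. heapq.nsmallest stands in for Spark's BoundedPriorityQueue. reduce_partitions and take_ordered are hypothetical names that only mirror the roles of the reducePartition and mergeResult functions passed to sc.runJob.

```python
import heapq
from typing import Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")


def reduce_partitions(partitions: List[List[T]], f: Callable[[T, T], T]) -> T:
    """Same shape as RDD#reduce: reduce inside each partition, merge on the driver."""

    def reduce_partition(it: Iterable[T]) -> Optional[T]:
        # Executor side: reduce one partition, or return None if it is empty.
        items = list(it)
        if not items:
            return None
        acc = items[0]
        for x in items[1:]:
            acc = f(acc, x)
        return acc

    job_result: Optional[T] = None

    def merge_result(index: int, task_result: Optional[T]) -> None:
        # Driver side: called once per finished task (the resultHandler role).
        nonlocal job_result
        if task_result is not None:
            job_result = task_result if job_result is None else f(job_result, task_result)

    # Stand-in for sc.runJob(this, reducePartition, mergeResult): tasks run one by one here.
    for index, partition in enumerate(partitions):
        merge_result(index, reduce_partition(partition))

    if job_result is None:
        raise ValueError("empty collection")
    return job_result


def take_ordered(partitions: List[List[T]], num: int) -> List[T]:
    """Same shape as RDD#takeOrdered: keep the num smallest per partition, then reduce."""
    if num <= 0 or not partitions:
        return []
    # Each partition becomes a single element: its own top-num list.
    per_partition = [[heapq.nsmallest(num, part)] for part in partitions]
    merged = reduce_partitions(per_partition, lambda a, b: heapq.nsmallest(num, a + b))
    return sorted(merged)
```

For example, take_ordered([[5, 1, 9], [], [3, 7, 2], [8]], 3) returns [1, 2, 3]. Only num elements per partition ever travel to the driver, which is why ORDER BY ... LIMIT n is cheap compared with a full sort.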

Submitting the distributed job to SparkContext

Call chain by which SparkContext runs the distributed job:

```
org.apache.spark.rdd.RDD#reduce:1120
org.apache.spark.SparkContext#runJob:2309
org.apache.spark.SparkContext#runJob:2214 // this is finally where dagScheduler.runJob is called
org.apache.spark.scheduler.DAGScheduler#runJob:888
org.apache.spark.scheduler.DAGScheduler#submitJob:860 // posts a JobSubmitted event to eventProcessLoop asynchronously, where it waits to be processed
```

[Figure: SparkContext_runJob]
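DAGScheduler#submitJob only posts JobSubmitted and hands back a JobWaiter. DAGScheduler#runJob then blocks until that waiter completes. The Python sketch below models this hand-off with a queue-backed loop thread and a waiter that counts finished tasks. EventLoop, JobWaiter and run_job here are simplified, hypothetical stand-ins, not Spark's classes. There is no failure handling, and all "tasks" run directly on the loop thread.

```python
import queue
import threading
from typing import Any, Callable, List


class EventLoop:
    """One daemon thread draining a blocking queue (modelled on Spark's EventLoop)."""

    def __init__(self, on_receive: Callable[[Any], None]) -> None:
        self._queue = queue.Queue()
        self._on_receive = on_receive
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def post(self, event: Any) -> None:
        # Returns at once; the event is handled later on the loop thread.
        self._queue.put(event)

    def stop(self) -> None:
        self._queue.put(None)  # sentinel that ends _run
        self._thread.join()

    def _run(self) -> None:
        while True:
            event = self._queue.get()
            if event is None:
                return
            self._on_receive(event)


class JobWaiter:
    """Counts finished tasks and forwards each result to the caller's handler."""

    def __init__(self, total_tasks: int, result_handler: Callable[[int, Any], None]) -> None:
        self._remaining = total_tasks
        self._handler = result_handler
        self._lock = threading.Lock()
        self._done = threading.Event()
        if total_tasks == 0:
            self._done.set()

    def task_succeeded(self, index: int, result: Any) -> None:
        with self._lock:
            self._handler(index, result)
            self._remaining -= 1
            if self._remaining == 0:
                self._done.set()

    def await_result(self, timeout: float) -> bool:
        return self._done.wait(timeout)


def run_job(partitions: List[List[int]],
            func: Callable[[List[int]], Any],
            result_handler: Callable[[int, Any], None],
            timeout: float = 5.0) -> None:
    waiter = JobWaiter(len(partitions), result_handler)

    def on_receive(event: Any) -> None:
        # Hypothetical stand-in for handleJobSubmitted, task execution and
        # handleTaskCompletion, all collapsed onto the event loop thread.
        kind, parts = event
        if kind == "JobSubmitted":
            for index, part in enumerate(parts):
                waiter.task_succeeded(index, func(part))

    loop = EventLoop(on_receive)
    loop.start()
    loop.post(("JobSubmitted", partitions))  # submitJob: post and return immediately
    try:
        if not waiter.await_result(timeout):  # runJob: block until every task reported
            raise TimeoutError("job did not finish in time")
    finally:
        loop.stop()
```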

Driver side: building the DAG and submitting tasks

```
org.apache.spark.util.EventLoop#eventThread:47
org.apache.spark.util.EventLoop#eventThread:49 // DAGSchedulerEventProcessLoop takes the JobSubmitted event off the queue and handles it
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#onReceive:2533
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive:2541
org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted:1198 // createResultStage recursively creates the upstream ShuffleMapStages, forming the DAG
org.apache.spark.scheduler.DAGScheduler#submitStage:1256 // key step: if any parent stage is not yet complete, submit the missing parents recursively; otherwise call submitMissingTasks for the current stage
org.apache.spark.scheduler.DAGScheduler#submitMissingTasks:1332 // find the partitions that still need to be computed
org.apache.spark.scheduler.DAGScheduler#submitMissingTasks:1479 // submit the tasks via taskScheduler.submitTasks
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks:239 // create a TaskSetManager, which handles scheduling, retries and failures for this TaskSet
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks:257 // add the TaskSetManager to the scheduling pool
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks:274 // tell the backend (YarnClusterSchedulerBackend) that new tasks can be scheduled, triggering resource offers and task launch
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers:618
```

[Figure: DAGScheduler_handleJobSubmitted]
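The recursion in DAGScheduler#submitStage can be sketched in Python as follows. The sketch assumes a stage is just an id, a list of parent stages and an `available` flag meaning all of its output exists. submit_missing_tasks only records the submission order. stage_finished is a hypothetical simplification of how Spark resubmits waiting child stages once a parent completes. Failure handling, retries and the actual task creation are left out.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass(eq=False)  # eq=False keeps identity hashing, so stages can live in sets
class Stage:
    stage_id: int
    parents: List["Stage"] = field(default_factory=list)
    available: bool = False  # True once all of the stage's outputs exist


class MiniDAGScheduler:
    def __init__(self) -> None:
        self.waiting: Set[Stage] = set()
        self.running: Set[Stage] = set()
        self.submitted_order: List[int] = []

    def missing_parents(self, stage: Stage) -> List[Stage]:
        return sorted((p for p in stage.parents if not p.available), key=lambda s: s.stage_id)

    def submit_stage(self, stage: Stage) -> None:
        if stage in self.waiting or stage in self.running:
            return
        missing = self.missing_parents(stage)
        if not missing:
            # Every parent is done: the tasks of this stage can be submitted now.
            self.submit_missing_tasks(stage)
        else:
            # Submit the parents first (recursively) and park this stage until they finish.
            for parent in missing:
                self.submit_stage(parent)
            self.waiting.add(stage)

    def submit_missing_tasks(self, stage: Stage) -> None:
        self.running.add(stage)
        self.submitted_order.append(stage.stage_id)

    def stage_finished(self, stage: Stage) -> None:
        # Mark the stage done, then retry each waiting child that depends on it.
        self.running.discard(stage)
        stage.available = True
        for child in sorted(self.waiting, key=lambda s: s.stage_id):
            if stage in child.parents:
                self.waiting.discard(child)
                self.submit_stage(child)
```

A result stage whose two parents are both missing therefore waits until the second parent reports completion, and only then are its own tasks submitted.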

Assigning WorkerOffer resources to tasks

```
org.apache.spark.rpc.netty.MessageLoop#receiveLoopRunnable:41
org.apache.spark.rpc.netty.MessageLoop#receiveLoop:75
org.apache.spark.rpc.netty.Inbox#process:115
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receive:171 // besides being triggered on task submission, this is also triggered periodically by a timer set up in CoarseGrainedSchedulerBackend.DriverEndpoint#onStart
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers:322 // create a WorkerOffer for each alive executor, describing its free resources
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers:330
// resourceOffers returns a nested structure: the outer Seq has one entry per WorkerOffer (i.e. per executor), and each inner Seq holds the TaskDescriptions assigned to that executor
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks:402 // send a LaunchTask message to the executor to start each task
```
```mermaid
sequenceDiagram
    participant MessageLoop
    participant Inbox
    participant DriverEndpoint

    activate MessageLoop
    MessageLoop->>MessageLoop: receiveLoopRunnable(41)
    MessageLoop->>MessageLoop: receiveLoop(75)
    MessageLoop->>Inbox: process(115)
    activate Inbox
    Inbox->>DriverEndpoint: receive(171)
    activate DriverEndpoint
    DriverEndpoint->>DriverEndpoint: makeOffers(322)
    Note right of DriverEndpoint: Create a WorkerOffer for each alive executor, describing its free resources
    DriverEndpoint->>TaskSchedulerImpl: resourceOffers()
    activate TaskSchedulerImpl
    TaskSchedulerImpl-->>DriverEndpoint: #32;
    deactivate TaskSchedulerImpl
    DriverEndpoint->>DriverEndpoint: launchTasks(402)
    Note right of DriverEndpoint: Send a LaunchTask message to the executor to start each task
    DriverEndpoint->>ExecutorEndpoint: send()
    activate ExecutorEndpoint
    ExecutorEndpoint-->>DriverEndpoint: #32;
    deactivate ExecutorEndpoint
    DriverEndpoint-->>Inbox: #32;
    deactivate DriverEndpoint
    Inbox-->>MessageLoop: #32;
    deactivate Inbox
    deactivate MessageLoop
```
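The nested result of resourceOffers is easier to picture with a toy version. The Python sketch below hands out tasks round-robin. Each pass over the offers gives every offer at most one task, as long as that offer still has cpus_per_task free cores. The result holds one list of task ids per offer. This is a simplified assumption, not TaskSchedulerImpl's algorithm: the real scheduler also shuffles the offers and takes locality, executor exclusion and custom resources into account. WorkerOffer here is a hypothetical two-field stand-in.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class WorkerOffer:
    executor_id: str
    free_cores: int


def resource_offers(offers: List[WorkerOffer], pending_tasks: List[int],
                    cpus_per_task: int = 1) -> List[List[int]]:
    """Assign pending task ids to offers round-robin; returns one list per offer."""
    assigned: List[List[int]] = [[] for _ in offers]
    available = [offer.free_cores for offer in offers]
    tasks = list(pending_tasks)
    launched_in_round = True
    while tasks and launched_in_round:
        launched_in_round = False
        # One pass over all offers: each offer takes at most one task per pass,
        # which spreads tasks across executors instead of filling one first.
        for i in range(len(offers)):
            if tasks and available[i] >= cpus_per_task:
                assigned[i].append(tasks.pop(0))
                available[i] -= cpus_per_task
                launched_in_round = True
    return assigned
```

Consider offers with 2, 1 and 0 free cores and five pending tasks (ids 0 to 4). The result is [[0, 2], [1], []]. Tasks 3 and 4 stay pending until a later round of offers.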

Executor side: running tasks and returning results

```
org.apache.spark.executor.CoarseGrainedExecutorBackend#receive:183
org.apache.spark.executor.Executor#launchTask:270 // runs asynchronously in the threadPool
org.apache.spark.executor.Executor.TaskRunner#run:466 // deserialize the task
org.apache.spark.executor.Executor.TaskRunner#run:501 // run the task
org.apache.spark.scheduler.Task#run:131
org.apache.spark.scheduler.ResultTask#runTask:90
org.apache.spark.executor.Executor.TaskRunner#run:642 // send a StatusUpdate event back to the driver
org.apache.spark.executor.CoarseGrainedExecutorBackend#statusUpdate:265

org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receive:149
org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate:815
org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask:119
org.apache.spark.scheduler.TaskSchedulerImpl#handleSuccessfulTask:870
org.apache.spark.scheduler.TaskSetManager#handleSuccessfulTask:824
org.apache.spark.scheduler.DAGScheduler#taskEnded:304 // posts a CompletionEvent to the DAGScheduler event loop
org.apache.spark.util.EventLoop#post:105

org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive:2588
org.apache.spark.scheduler.DAGScheduler#handleTaskCompletion:1686
org.apache.spark.scheduler.JobWaiter#taskSucceeded:59 // resultHandler is the callback passed in by the caller of sc.runJob
org.apache.spark.rdd.RDD#reduce:1112
// At this point the job has finished and its results have been collected on the driver
```
```mermaid
sequenceDiagram
    participant CoarseGrainedExecutorBackend
    participant Executor
    participant TaskRunner
    participant Task
    participant ResultTask
    participant DriverEndpoint
    participant TaskSchedulerImpl
    participant TaskResultGetter
    participant TaskSetManager
    participant DAGScheduler
    participant DAGSchedulerEventProcessLoop
    participant JobWaiter
    participant RDD

    %% Executor side: receive and handle the LaunchTask event
    activate CoarseGrainedExecutorBackend
    CoarseGrainedExecutorBackend->>CoarseGrainedExecutorBackend: receive(183)
    CoarseGrainedExecutorBackend->>Executor: launchTask(270)
    activate Executor
    Executor->>TaskRunner: async run(466)
    activate TaskRunner
    Executor-->>CoarseGrainedExecutorBackend: #32;
    deactivate Executor
    deactivate CoarseGrainedExecutorBackend
    Note right of TaskRunner: Deserialize the task
    TaskRunner->>TaskRunner: run(501)
    Note right of TaskRunner: Run the task
    TaskRunner->>Task: run(131)
    activate Task
    Task->>ResultTask: runTask(90)
    activate ResultTask
    ResultTask-->>Task: #32;
    deactivate ResultTask
    Task-->>TaskRunner: #32;
    deactivate Task
    TaskRunner->>TaskRunner: run(642)
    Note right of TaskRunner: Send a StatusUpdate event back to the driver
    TaskRunner->>CoarseGrainedExecutorBackend: statusUpdate(265)
    activate CoarseGrainedExecutorBackend
    CoarseGrainedExecutorBackend->>DriverEndpoint: send StatusUpdate
    activate DriverEndpoint
    DriverEndpoint-->>CoarseGrainedExecutorBackend: #32;
    deactivate DriverEndpoint
    CoarseGrainedExecutorBackend-->>TaskRunner: #32;
    deactivate CoarseGrainedExecutorBackend
    deactivate TaskRunner
    activate DriverEndpoint

    %% Driver side: receive and handle the StatusUpdate event
    DriverEndpoint->>DriverEndpoint: receive(149)
    Note right of DriverEndpoint: Receive the StatusUpdate event
    DriverEndpoint->>TaskSchedulerImpl: statusUpdate(815)
    activate TaskSchedulerImpl
    TaskSchedulerImpl->>TaskResultGetter: async enqueueSuccessfulTask(119)
    activate TaskResultGetter
    TaskSchedulerImpl-->>DriverEndpoint: #32;
    deactivate TaskSchedulerImpl
    deactivate DriverEndpoint

    %% A TaskResultGetter thread handles the successful task asynchronously
    TaskResultGetter->>TaskSchedulerImpl: handleSuccessfulTask(870)
    activate TaskSchedulerImpl
    TaskSchedulerImpl->>TaskSetManager: handleSuccessfulTask(824)
    activate TaskSetManager
    TaskSetManager->>DAGScheduler: taskEnded(304)
    activate DAGScheduler
    DAGScheduler->>DAGSchedulerEventProcessLoop: post(105)
    activate DAGSchedulerEventProcessLoop
    DAGSchedulerEventProcessLoop-->>DAGScheduler: #32;
    deactivate DAGSchedulerEventProcessLoop
    DAGScheduler-->>TaskSetManager: #32;
    deactivate DAGScheduler
    TaskSetManager-->>TaskSchedulerImpl: #32;
    deactivate TaskSetManager
    TaskSchedulerImpl-->>TaskResultGetter: #32;
    deactivate TaskSchedulerImpl
    deactivate TaskResultGetter

    %% Invoke the hook registered when the job was submitted to deliver the result
    activate DAGSchedulerEventProcessLoop
    DAGSchedulerEventProcessLoop->>DAGSchedulerEventProcessLoop: doOnReceive(2588)
    DAGSchedulerEventProcessLoop->>DAGScheduler: handleTaskCompletion(1686)
    activate DAGScheduler
    DAGScheduler->>JobWaiter: taskSucceeded(59)
    activate JobWaiter
    Note right of JobWaiter: resultHandler is the callback passed in by the caller of sc.runJob
    JobWaiter->>RDD: reduce(1114)
    activate RDD
    RDD-->>JobWaiter: #32;
    deactivate RDD
    JobWaiter-->>DAGScheduler: #32;
    deactivate JobWaiter
    DAGScheduler-->>DAGSchedulerEventProcessLoop: #32;
    deactivate DAGScheduler
    deactivate DAGSchedulerEventProcessLoop
```
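On the executor, launchTask only wraps the task and hands it to a thread pool. Deserialization, execution and the StatusUpdate all happen on a pool thread, matching the TaskRunner#run steps in the listing above. The Python sketch below models that flow with concurrent.futures.ThreadPoolExecutor and pickle. MiniExecutor, the status_update callback and the FINISHED/FAILED strings are hypothetical. pickle is only a stand-in for Spark's closure and result serializers.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


class MiniExecutor:
    """Toy executor: launch_task returns at once, a pool thread does the real work."""

    def __init__(self, status_update: Callable[[int, str, bytes], None], threads: int = 2) -> None:
        self._pool = ThreadPoolExecutor(max_workers=threads)
        self._status_update = status_update  # plays the role of statusUpdate to the driver

    def launch_task(self, task_id: int, serialized_task: bytes) -> None:
        # Like Executor#launchTask: wrap the task and hand it to the thread pool.
        self._pool.submit(self._task_runner, task_id, serialized_task)

    def _task_runner(self, task_id: int, serialized_task: bytes) -> None:
        try:
            func, partition = pickle.loads(serialized_task)  # 1. deserialize the task
            result = func(partition)                         # 2. run it
            state, payload = "FINISHED", pickle.dumps(result)
        except Exception as exc:  # a failure is reported too, not swallowed by the pool
            state, payload = "FAILED", pickle.dumps(repr(exc))
        self._status_update(task_id, state, payload)         # 3. report back to the driver

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)
```

Builtins such as sum or max work well as task functions in this toy, because pickle serializes them by reference.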

Miscellaneous

LiveListenerBus registers the following queues (see its companion object):

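  • shared (SHARED_QUEUE): queue shared by all non-internal listeners
  • appStatus (APP_STATUS_QUEUE): queue for application-status listeners
  • executorManagement (EXECUTOR_MANAGEMENT_QUEUE): queue for executor-management listeners
  • eventLog (EVENT_LOG_QUEUE): queue for the event-log listener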
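Each named queue is dispatched by its own thread, so a slow event-log listener does not hold up the appStatus listeners. The Python sketch below models that layout. Listeners are attached to a queue by name, and post() fans the same event out to every queue. The queue names come from the list above. NamedQueue, MiniListenerBus, add_to_queue, the stop sentinel and the unbounded queues are all hypothetical simplifications. Spark's AsyncEventQueue, for instance, uses a bounded queue and drops events when it is full.

```python
import queue
import threading
from typing import Any, Callable, Dict, List

Listener = Callable[[Any], None]


class NamedQueue:
    """One dispatch thread per queue, delivering events to its listeners in order."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.listeners: List[Listener] = []
        self._events = queue.Queue()
        self._thread = threading.Thread(target=self._dispatch, name=f"listener-{name}", daemon=True)
        self._thread.start()

    def _dispatch(self) -> None:
        while True:
            event = self._events.get()
            if event is None:  # stop sentinel
                return
            for listener in self.listeners:
                listener(event)

    def post(self, event: Any) -> None:
        self._events.put(event)

    def stop(self) -> None:
        self._events.put(None)  # queued behind pending events, so those are delivered first
        self._thread.join()


class MiniListenerBus:
    """Routes every posted event to every named queue (loosely modelled on LiveListenerBus)."""

    def __init__(self) -> None:
        self.queues: Dict[str, NamedQueue] = {}

    def add_to_queue(self, listener: Listener, queue_name: str) -> None:
        if queue_name not in self.queues:
            self.queues[queue_name] = NamedQueue(queue_name)
        self.queues[queue_name].listeners.append(listener)

    def post(self, event: Any) -> None:
        for named_queue in self.queues.values():
            named_queue.post(event)

    def stop(self) -> None:
        for named_queue in self.queues.values():
            named_queue.stop()
```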

Task types

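  • ShuffleMapTask: when the driver splits a job (triggered by an action) into multiple stages, every stage except the last one is a shuffle stage. Each shuffle stage is split into multiple ShuffleMapTasks. These tasks process data partitions and write their output to shuffle files for the downstream stage to consume. ShuffleMapTasks are generated only when there is a shuffle dependency between RDDs (for example groupByKey or reduceByKey).
  • ResultTask: the last stage (the final output stage) is split into multiple ResultTasks. These tasks produce the action's final result and return it to the driver, as in collect, count or saveAsTextFile. The driver submits the ResultTasks only after all the ShuffleMapTasks they depend on have completed.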
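The rule above can be sketched in Python for a simple linear lineage: cut at shuffle dependencies, use ShuffleMapTasks for every stage except the last, and use ResultTasks for the last. The sketch makes several assumptions:

  • each RDD has at most one parent;
  • a dependency is either "narrow" or "shuffle";
  • every partition of the final RDD is computed;
  • a stage has one task per partition of its last RDD.

The RDD class and plan_tasks are hypothetical. Real lineages are DAGs, and Spark builds stages by walking them back from the final RDD (createResultStage / getOrCreateParentStages).

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class RDD:
    name: str
    num_partitions: int
    parent: Optional["RDD"] = None
    dep: str = "narrow"  # dependency on `parent`: "narrow" or "shuffle"


def plan_tasks(final_rdd: RDD) -> List[Tuple[int, str, int]]:
    """Return (stage_id, task_type, num_tasks) per stage, upstream stages first."""
    # Walk from the final RDD back to the source, starting a new stage at every shuffle.
    stage_tails: List[RDD] = [final_rdd]  # last RDD of each stage, downstream first
    rdd = final_rdd
    while rdd.parent is not None:
        if rdd.dep == "shuffle":
            stage_tails.append(rdd.parent)
        rdd = rdd.parent
    stage_tails.reverse()  # upstream stages get smaller ids, since they must run first

    plan = []
    for stage_id, tail in enumerate(stage_tails):
        is_last = stage_id == len(stage_tails) - 1
        task_type = "ResultTask" if is_last else "ShuffleMapTask"
        plan.append((stage_id, task_type, tail.num_partitions))
    return plan
```

For a textFile, map, reduceByKey, mapValues chain with 4 input partitions and 2 reduce partitions, this yields a stage of 4 ShuffleMapTasks followed by a stage of 2 ResultTasks. A lineage without any shuffle collapses into a single ResultTask stage.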

Resource requests (to be covered in a separate post)

YarnAllocator periodically requests resources from the cluster
Static allocation mode

When the driver starts, it requests resources from YARN:
```
org.apache.spark.deploy.yarn.ApplicationMaster#runDriver:521
org.apache.spark.deploy.yarn.ApplicationMaster#createAllocator:489
org.apache.spark.deploy.yarn.YarnAllocator#allocateResources:388
org.apache.spark.deploy.yarn.YarnAllocator#allocateResources:393
```
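In static mode the target is simply the configured executor count (spark.executor.instances). Each allocateResources round only asks YARN for the difference between that target and what is already running, starting or pending. The Python sketch below shows that bookkeeping. AllocatorState and containers_to_request are hypothetical names. The formula is a simplification of YarnAllocator's request update, which also deals with locality and, when too much has been requested, cancels surplus pending requests.

```python
from dataclasses import dataclass


@dataclass
class AllocatorState:
    target: int    # desired executors (spark.executor.instances in static mode)
    running: int   # executors already running
    starting: int  # containers allocated and still launching
    pending: int   # container requests not yet satisfied by YARN


def containers_to_request(s: AllocatorState) -> int:
    """Positive: new container requests to add. Negative: pending requests to cancel."""
    missing = s.target - s.pending - s.starting - s.running
    if missing >= 0:
        return missing
    # Over-requested: only pending requests can be cancelled, never running executors.
    return -min(s.pending, -missing)
```

On the first round nothing exists yet, so the whole target is requested at once. Once the containers arrive, later rounds normally compute 0.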

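Dynamic allocation
ExecutorAllocationManager: the dynamic allocation manager
When SparkContext is initialized with dynamic allocation enabled (spark.dynamicAllocation.enabled), it creates an ExecutorAllocationManager. This manager tracks the workload in two ways. First, event listening: ExecutorAllocationListener listens for the submission and completion of Stages and Tasks, among other events. Second, periodic scheduling: the start method launches a thread that calls schedule at a fixed interval. It then works with YarnAllocator to request or release executors automatically, so resources scale up and down with the load.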
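The scale-up decision boils down to some backlog arithmetic. Below is a hedged Python sketch of it, based on Spark's documented behaviour and relying on these assumptions:

  • The most executors that can usefully run is ceil((pending + running tasks) / tasks per executor), where tasks per executor is executor cores divided by spark.task.cpus.
  • The target is kept within spark.dynamicAllocation.minExecutors and maxExecutors.
  • While the backlog persists, the number of executors added per round doubles (1, 2, 4, ...).
  • The backlog timeout has already expired on every tick.

The function and field names are hypothetical. Spark's real code also applies executorAllocationRatio and resource profiles, and removes idle executors after a timeout.

```python
import math
from dataclasses import dataclass


@dataclass
class AllocationState:
    min_executors: int
    max_executors: int
    target: int
    to_add: int = 1  # executors to add next round; doubles while the backlog persists


def max_executors_needed(pending_tasks: int, running_tasks: int,
                         executor_cores: int, task_cpus: int = 1) -> int:
    tasks_per_executor = executor_cores // task_cpus
    if tasks_per_executor <= 0:
        raise ValueError("executor_cores must be >= task_cpus")
    return math.ceil((pending_tasks + running_tasks) / tasks_per_executor)


def schedule_round(state: AllocationState, max_needed: int) -> int:
    """One periodic tick, assuming the backlog timeout has already expired."""
    if max_needed < state.target:
        # Load dropped: lower the target at once (never below the minimum), reset the ramp.
        state.target = max(max_needed, state.min_executors)
        state.to_add = 1
        return state.target
    upper = min(max_needed, state.max_executors)
    new_target = min(state.target + state.to_add, upper)
    added = new_target - state.target
    state.target = new_target
    # Exponential ramp-up: double the step only when the whole step was granted.
    state.to_add = state.to_add * 2 if added == state.to_add else 1
    return state.target
```

With 30 pending and 2 running tasks on 4-core executors, 8 executors are needed. Starting from a target of 1, successive rounds raise the target to 2, 4 and then 8.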

An asynchronous thread periodically calls YarnAllocator to request resources from the cluster:
```
org.apache.spark.deploy.yarn.ApplicationMaster#launchReporterThread:651
org.apache.spark.deploy.yarn.ApplicationMaster#allocationThreadImpl:581
org.apache.spark.deploy.yarn.YarnAllocator#allocateResources:388 // updates the resource requests, which the subsequent amClient.allocate call then actually sends to YARN
```
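The reporter thread does not poll YARN at a fixed rate. Roughly, while container requests are outstanding it sleeps for a short initial interval (spark.yarn.scheduler.initial-allocation.interval). That interval doubles after each round, capped at the heartbeat interval (spark.yarn.scheduler.heartbeat.interval-ms). Otherwise it waits a full heartbeat. The Python sketch below reproduces only that sleep schedule, under these assumptions: the defaults are 200 ms and 3000 ms, allocate_resources is a hypothetical callback standing in for YarnAllocator#allocateResources, and the loop runs once per entry in pending_per_round instead of until the application finishes.

```python
from typing import Callable, List


def allocation_sleep_schedule(pending_per_round: List[int],
                              heartbeat_ms: int = 3000,
                              initial_interval_ms: int = 200,
                              allocate_resources: Callable[[], None] = lambda: None) -> List[int]:
    """Return the sleep interval chosen after each allocation round."""
    initial = min(heartbeat_ms, initial_interval_ms)
    next_interval = initial
    sleeps: List[int] = []
    for pending in pending_per_round:
        allocate_resources()  # stand-in for YarnAllocator#allocateResources
        if pending > 0:
            # Requests outstanding: poll sooner, backing off exponentially up to the heartbeat.
            current = min(heartbeat_ms, next_interval)
            next_interval = current * 2
            sleeps.append(current)
        else:
            # Nothing pending: reset the back-off and sleep for a full heartbeat.
            next_interval = initial
            sleeps.append(heartbeat_ms)
        # A real loop would now block for sleeps[-1] ms (or until woken early).
    return sleeps
```

With pending requests in five consecutive rounds, the sleeps grow 200, 400, 800, 1600 and then 3000 ms. As soon as nothing is pending the back-off resets, which keeps allocation responsive right after new requests appear.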

Author: jszero
Published: 2025-08-10
Updated: 2025-08-18
