Spark Source Code Analysis - Distributed Computation Execution Flow
Converting a DataFrame action into RDD operations
Using the Dataset#head action as an example
Call chain entry point: org.apache.spark.sql.Dataset#head, line 2728
sequenceDiagram
participant Dataset
participant TakeOrderedAndProjectExec
participant RDD
Dataset->>Dataset: head(2728)
Dataset->>Dataset: collectFromPlan(3715)
Dataset->>TakeOrderedAndProjectExec: executeCollect
Note right of TakeOrderedAndProjectExec: Calls executeCollect as overridden by TakeOrderedAndProjectExec, a UnaryExecNode subclass of SparkPlan
TakeOrderedAndProjectExec->>TakeOrderedAndProjectExec: executeCollect(204)
Note right of TakeOrderedAndProjectExec: The query uses ORDER BY x LIMIT n, so the plan node is TakeOrderedAndProjectExec
TakeOrderedAndProjectExec->>RDD: takeOrdered(1524)
RDD->>RDD: reduce(1120)
Note right of RDD: Calls sc.runJob(this, reducePartition, mergeResult) here, which launches the distributed job
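To make the data flow of the takeOrdered and reduce path concrete, here is a simplified Python model. It is a sketch under stated assumptions, not Spark code:

- Partitions are plain lists.
- The loop runs sequentially instead of going through sc.runJob.
- rdd_reduce and take_ordered are hypothetical stand-ins for RDD#reduce and RDD#takeOrdered.

Each partition keeps only its n smallest elements (Spark uses a BoundedPriorityQueue for this). The driver-side merge plays the role of mergeResult.

```python
import heapq
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


def rdd_reduce(partitions: List[List[T]], f: Callable[[T, T], T]) -> T:
    """Simplified model of RDD#reduce: reduce inside each partition, merge on the driver."""

    # Executor side: plays the role of reducePartition (None for an empty partition).
    def reduce_partition(items: List[T]) -> Optional[T]:
        if not items:
            return None
        acc = items[0]
        for x in items[1:]:
            acc = f(acc, x)
        return acc

    job_result: Optional[T] = None

    # Driver side: plays the role of mergeResult, the resultHandler given to sc.runJob.
    def merge_result(index: int, task_result: Optional[T]) -> None:
        nonlocal job_result
        if task_result is not None:
            job_result = task_result if job_result is None else f(job_result, task_result)

    # Sequential stand-in for sc.runJob(this, reducePartition, mergeResult).
    for i, part in enumerate(partitions):
        merge_result(i, reduce_partition(part))

    if job_result is None:
        raise ValueError("empty collection")
    return job_result


def take_ordered(partitions: List[List[T]], n: int) -> List[T]:
    """Simplified model of RDD#takeOrdered: keep the n smallest per partition, then reduce."""
    if n == 0 or not partitions:
        return []
    # mapPartitions step: every partition emits exactly one candidate list.
    candidates = [[heapq.nsmallest(n, part)] for part in partitions]
    # reduce step: merge two candidate lists, keeping only the n smallest elements.
    merged = rdd_reduce(candidates, lambda a, b: heapq.nsmallest(n, a + b))
    return sorted(merged)
```

For example, take_ordered([[5, 1, 9], [3, 8], [7, 2, 6]], 3) returns [1, 2, 3]. Each partition contributes at most 3 candidates, so the driver never materializes the full dataset.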
Submitting the distributed job to SparkContext
Call chain through which SparkContext executes the job
Call chain entry point: org.apache.spark.rdd.RDD#reduce, line 1120
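sc.runJob takes three things:

- the RDD;
- a function applied to each partition inside a task;
- a resultHandler that the driver calls once per finished partition with (index, result).

Below is a minimal Python model of that contract. It assumes a thread pool stands in for the executors, and run_job and run_job_collect are hypothetical names:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def run_job(partitions: List[List[T]],
            func: Callable[[Iterable[T]], U],
            result_handler: Callable[[int, U], None],
            max_workers: int = 4) -> None:
    """Hypothetical model of sc.runJob(rdd, func, resultHandler).

    func is applied to every partition (one "task" each); result_handler runs on the
    calling ("driver") thread once per partition, in completion order.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:  # stand-in for executors
        futures = {pool.submit(func, part): idx for idx, part in enumerate(partitions)}
        for fut in as_completed(futures):
            result_handler(futures[fut], fut.result())


def run_job_collect(partitions: List[List[T]], func: Callable[[Iterable[T]], U]) -> List[U]:
    """Array-returning variant: the handler stores each result at its partition index,
    so the output order follows the partitions, not task completion."""
    results: List[U] = [None] * len(partitions)  # type: ignore[list-item]

    def store(index: int, res: U) -> None:
        results[index] = res

    run_job(partitions, func, store)
    return results
```

Results reach the handler in completion order, so a handler that needs ordered output has to index by partition, as run_job_collect does. The array-returning SparkContext.runJob overload uses the same approach: it passes a handler that writes each result into its array slot.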
The driver builds the DAG and submits the tasks
Call chain entry point: org.apache.spark.util.EventLoop#eventThread, line 47
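DAGScheduler does not handle job submission on the caller's thread. Instead, submitJob posts a JobSubmitted event to DAGSchedulerEventProcessLoop. That class is an EventLoop subclass whose single daemon thread takes events off a blocking queue and dispatches them in doOnReceive.

The sketch below reproduces that pattern in Python under two simplifying assumptions:

- A sentinel object replaces Spark's thread interrupt in stop(), so events posted before stop() are still processed.
- ToyDAGSchedulerLoop is a hypothetical subclass that only records which handler would run.

```python
import queue
import threading
from typing import Generic, List, TypeVar

E = TypeVar("E")

_STOP = object()  # sentinel; Spark interrupts the thread instead


class EventLoop(Generic[E]):
    """Simplified model of org.apache.spark.util.EventLoop: one daemon thread
    takes events from a blocking queue and hands each one to on_receive."""

    def __init__(self, name: str) -> None:
        self._queue: "queue.Queue[object]" = queue.Queue()
        self._thread = threading.Thread(target=self._run, name=name, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def post(self, event: E) -> None:
        # The poster returns immediately; handling happens on the loop thread.
        self._queue.put(event)

    def stop(self) -> None:
        self._queue.put(_STOP)
        self._thread.join()

    def _run(self) -> None:
        while True:
            event = self._queue.get()
            if event is _STOP:
                return
            try:
                self.on_receive(event)  # type: ignore[arg-type]
            except Exception as e:  # Spark catches NonFatal errors and calls onError
                self.on_error(e)

    def on_receive(self, event: E) -> None:
        raise NotImplementedError

    def on_error(self, e: Exception) -> None:
        pass


class ToyDAGSchedulerLoop(EventLoop[tuple]):
    """Hypothetical subclass mimicking the doOnReceive dispatch of DAGSchedulerEventProcessLoop."""

    def __init__(self) -> None:
        super().__init__("dag-scheduler-event-loop")
        self.handled: List[str] = []

    def on_receive(self, event: tuple) -> None:
        kind, payload = event
        if kind == "JobSubmitted":
            self.handled.append(f"handleJobSubmitted({payload})")
        elif kind == "CompletionEvent":
            self.handled.append(f"handleTaskCompletion({payload})")
```

Funneling every scheduler event through one thread means the DAGScheduler's internal state is only mutated by that thread, so it needs no fine-grained locking.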
Assigning WorkerOffer resources to run the tasks
Call chain entry point: org.apache.spark.rpc.netty.MessageLoop#receiveLoopRunnable, line 41
sequenceDiagram
participant MessageLoop
participant Inbox
participant DriverEndpoint
activate MessageLoop
MessageLoop->>MessageLoop: receiveLoopRunnable(41)
MessageLoop->>MessageLoop: receiveLoop(75)
MessageLoop->>Inbox: process(115)
activate Inbox
Inbox->>DriverEndpoint: receive(171)
activate DriverEndpoint
DriverEndpoint->>DriverEndpoint: makeOffers(322)
Note right of DriverEndpoint: Creates a WorkerOffer for each active executor, describing its available resources
DriverEndpoint->>TaskSchedulerImpl: resourceOffers()
activate TaskSchedulerImpl
TaskSchedulerImpl-->>DriverEndpoint: #32;
deactivate TaskSchedulerImpl
DriverEndpoint->>DriverEndpoint: launchTasks(402)
Note right of DriverEndpoint: Sends a LaunchTask message to each executor to start its tasks
DriverEndpoint->>ExecutorEndpoint: send()
activate ExecutorEndpoint
ExecutorEndpoint-->>DriverEndpoint: #32;
deactivate ExecutorEndpoint
DriverEndpoint-->>Inbox: #32;
deactivate DriverEndpoint
Inbox-->>MessageLoop: #32;
deactivate Inbox
deactivate MessageLoop
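Inside resourceOffers, TaskSchedulerImpl first shuffles the offers. It then walks over them repeatedly, giving each executor at most one task per pass as long as the executor still has at least spark.task.cpus free cores. This spreads tasks across executors instead of filling one executor first.

The Python sketch models only that loop. It assumes a single task set and ignores locality levels, excluded executors, barrier stages and custom resources. WorkerOffer here is a trimmed-down dataclass, not Spark's full case class.

```python
import random
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class WorkerOffer:
    # Trimmed-down version of the idea behind Spark's WorkerOffer: an executor and its free cores.
    executor_id: str
    host: str
    cores: int


def resource_offers(offers: List[WorkerOffer], pending_tasks: List[int],
                    cpus_per_task: int = 1, seed: Optional[int] = None) -> Dict[str, List[int]]:
    """Hypothetical, heavily simplified model of TaskSchedulerImpl.resourceOffers."""
    shuffled = offers[:]
    random.Random(seed).shuffle(shuffled)  # avoid always favoring the same executors
    free = {o.executor_id: o.cores for o in shuffled}
    assigned: Dict[str, List[int]] = {o.executor_id: [] for o in shuffled}
    tasks = list(pending_tasks)

    launched = True
    while launched and tasks:
        launched = False
        # One pass gives each offer at most one task, so tasks are spread round-robin.
        for o in shuffled:
            if tasks and free[o.executor_id] >= cpus_per_task:
                assigned[o.executor_id].append(tasks.pop(0))
                free[o.executor_id] -= cpus_per_task
                launched = True
    return assigned
```

With executors of 4 and 2 cores and 5 single-core tasks, the first two passes give each executor two tasks. The third pass can only place the last task on the 4-core executor.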
The executor runs the task and returns the result
Call chain entry point: org.apache.spark.executor.CoarseGrainedExecutorBackend#receive, line 183
sequenceDiagram
participant CoarseGrainedExecutorBackend
participant Executor
participant TaskRunner
participant Task
participant ResultTask
participant DriverEndpoint
participant TaskSchedulerImpl
participant TaskResultGetter
participant TaskSetManager
participant DAGScheduler
participant DAGSchedulerEventProcessLoop
participant JobWaiter
participant RDD
%% Executor receives and handles the LaunchTask event
activate CoarseGrainedExecutorBackend
CoarseGrainedExecutorBackend->>CoarseGrainedExecutorBackend: receive(183)
CoarseGrainedExecutorBackend->>Executor: launchTask(270)
activate Executor
Executor->>TaskRunner: async run(466)
activate TaskRunner
Executor-->>CoarseGrainedExecutorBackend: #32;
deactivate Executor
deactivate CoarseGrainedExecutorBackend
Note right of TaskRunner: Deserializes the task
TaskRunner->>TaskRunner: run(501)
Note right of TaskRunner: Runs the task
TaskRunner->>Task: run(131)
activate Task
Task->>ResultTask: runTask(90)
activate ResultTask
ResultTask-->>Task: #32;
deactivate ResultTask
Task-->>TaskRunner: #32;
deactivate Task
TaskRunner->>TaskRunner: run(642)
Note right of TaskRunner: Sends a StatusUpdate event back to the driver
TaskRunner->>CoarseGrainedExecutorBackend: statusUpdate(265)
activate CoarseGrainedExecutorBackend
CoarseGrainedExecutorBackend->>DriverEndpoint: send StatusUpdate event
activate DriverEndpoint
DriverEndpoint-->>CoarseGrainedExecutorBackend: #32;
deactivate DriverEndpoint
CoarseGrainedExecutorBackend-->>TaskRunner: #32;
deactivate CoarseGrainedExecutorBackend
deactivate TaskRunner
activate DriverEndpoint
%% Driver receives and handles the StatusUpdate event
DriverEndpoint->>DriverEndpoint: receive(149)
Note right of DriverEndpoint: Receives the StatusUpdate event
DriverEndpoint->>TaskSchedulerImpl: statusUpdate(815)
activate TaskSchedulerImpl
TaskSchedulerImpl->>TaskResultGetter: async enqueueSuccessfulTask(119)
activate TaskResultGetter
TaskSchedulerImpl-->>DriverEndpoint: #32;
deactivate TaskSchedulerImpl
deactivate DriverEndpoint
%% A TaskResultGetter thread handles the successful task asynchronously
TaskResultGetter->>TaskSchedulerImpl: handleSuccessfulTask(870)
activate TaskSchedulerImpl
TaskSchedulerImpl->>TaskSetManager: handleSuccessfulTask(824)
activate TaskSetManager
TaskSetManager->>DAGScheduler: taskEnded(304)
activate DAGScheduler
DAGScheduler->>DAGSchedulerEventProcessLoop: post(105)
activate DAGSchedulerEventProcessLoop
DAGSchedulerEventProcessLoop-->>DAGScheduler: #32;
deactivate DAGSchedulerEventProcessLoop
DAGScheduler-->>TaskSetManager: #32;
deactivate DAGScheduler
TaskSetManager-->>TaskSchedulerImpl: #32;
deactivate TaskSetManager
TaskSchedulerImpl-->>TaskResultGetter: #32;
deactivate TaskSchedulerImpl
deactivate TaskResultGetter
%% Invoke the hook supplied at job submission to hand back the result
activate DAGSchedulerEventProcessLoop
DAGSchedulerEventProcessLoop->>DAGSchedulerEventProcessLoop: doOnReceive(2588)
DAGSchedulerEventProcessLoop->>DAGScheduler: handleTaskCompletion(1686)
activate DAGScheduler
DAGScheduler->>JobWaiter: taskSucceeded(59)
activate JobWaiter
Note right of JobWaiter: resultHandler is the hook passed in by the caller of sc.runJob
JobWaiter->>RDD: reduce(1114)
activate RDD
RDD-->>JobWaiter: #32;
deactivate RDD
JobWaiter-->>DAGScheduler: #32;
deactivate JobWaiter
DAGScheduler-->>DAGSchedulerEventProcessLoop: #32;
deactivate DAGScheduler
deactivate DAGSchedulerEventProcessLoop
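JobWaiter is the JobListener that DAGScheduler.runJob blocks on. Every successful task calls taskSucceeded, which does two things:

- runs the resultHandler (for reduce, the mergeResult closure);
- increments a counter.

The job's promise completes when the counter reaches the number of tasks. Below is a Python model, assuming a concurrent.futures.Future in place of Scala's Promise:

```python
import threading
from concurrent.futures import Future
from typing import Any, Callable, Optional


class JobWaiter:
    """Simplified model of Spark's JobWaiter: counts finished tasks and completes a
    future once every partition of the job has reported a result."""

    def __init__(self, total_tasks: int, result_handler: Callable[[int, Any], None]) -> None:
        self.total_tasks = total_tasks
        self.result_handler = result_handler
        self._finished = 0
        self._lock = threading.Lock()
        self.completion: "Future[None]" = Future()
        if total_tasks == 0:
            # Like Spark, a job with no tasks is complete from the start.
            self.completion.set_result(None)

    def task_succeeded(self, index: int, result: Any) -> None:
        with self._lock:
            # Serialize resultHandler calls, mirroring the synchronized block in Spark.
            self.result_handler(index, result)
            self._finished += 1
            if self._finished == self.total_tasks:
                self.completion.set_result(None)

    def job_failed(self, exc: BaseException) -> None:
        with self._lock:
            # Like Promise.tryFailure: ignore the failure if the job already finished.
            if not self.completion.done():
                self.completion.set_exception(exc)

    def wait(self, timeout: Optional[float] = None) -> None:
        """Blocks until the job finishes; re-raises the failure, if any."""
        self.completion.result(timeout)
```

In Spark, taskSucceeded is invoked from the single dag-scheduler-event-loop thread. The lock therefore mainly mirrors the synchronized block around resultHandler rather than guarding against heavy contention.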
Miscellaneous
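LiveListenerBus registers the following event queues (see the LiveListenerBus companion object):
- shared (SHARED_QUEUE): queue shared by all non-internal listeners
- appStatus (APP_STATUS_QUEUE): queue for application-status listeners
- executorManagement (EXECUTOR_MANAGEMENT_QUEUE): queue for executor-management listeners
- eventLog (EVENT_LOG_QUEUE): queue for the event-log listener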
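Each of these queues is an AsyncEventQueue with its own dispatch thread. A slow listener (for example, event-log writing) therefore delays only the listeners in its own queue. LiveListenerBus.post hands every event to all queues.

The Python model below, with hypothetical class names, shows that fan-out. Unlike Spark, which bounds each queue (spark.scheduler.listenerbus.eventqueue.capacity) and drops events once a queue is full, this model uses unbounded queues.

```python
import queue
import threading
from typing import Callable, Dict, List

Listener = Callable[[str], None]
_STOP = object()  # sentinel that ends a dispatch thread


class AsyncEventQueue:
    """Simplified model of one named queue: its own dispatch thread, so a slow
    listener only delays the listeners registered on the same queue."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.listeners: List[Listener] = []
        self._events: "queue.Queue[object]" = queue.Queue()
        self._thread = threading.Thread(
            target=self._dispatch, name=f"spark-listener-group-{name}", daemon=True)
        self._thread.start()

    def post(self, event: str) -> None:
        self._events.put(event)

    def stop(self) -> None:
        self._events.put(_STOP)
        self._thread.join()

    def _dispatch(self) -> None:
        while True:
            event = self._events.get()
            if event is _STOP:
                return
            for listener in self.listeners:
                listener(event)


class ToyListenerBus:
    """Hypothetical stand-in for LiveListenerBus: post() fans each event out to every queue."""

    def __init__(self) -> None:
        self.queues: Dict[str, AsyncEventQueue] = {}

    def add_to_queue(self, listener: Listener, queue_name: str) -> None:
        if queue_name not in self.queues:
            self.queues[queue_name] = AsyncEventQueue(queue_name)
        self.queues[queue_name].listeners.append(listener)

    def post(self, event: str) -> None:
        for q in self.queues.values():
            q.post(event)

    def stop(self) -> None:
        for q in self.queues.values():
            q.stop()
```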
Task types
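- ShuffleMapTask: when the driver splits a job (triggered by an action) into multiple stages, every stage except the last one is a shuffle map stage. Each shuffle map stage is split into multiple ShuffleMapTasks, which process their data partitions and write the output to shuffle files for the downstream stage to read. ShuffleMapTasks are created only when there is a shuffle between dependent RDDs (for example, groupByKey or reduceByKey).
- ResultTask: the last stage (the final output stage) is split into multiple ResultTasks. ResultTasks produce the action's final result, either returning it to the driver (collect, count) or writing it out (saveAsTextFile). The driver submits the ResultTasks only after all the ShuffleMapTasks they depend on have finished.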
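The stage boundaries described above fall at shuffle dependencies. The sketch below works as follows:

- It walks an RDD lineage backwards from the final RDD.
- It follows narrow dependencies within the current stage.
- It starts a parent shuffle map stage at each shuffle dependency.
- Each stage gets one task per partition of its last RDD.

ToyRDD and plan_stages are hypothetical. The model ignores stage reuse across jobs and skipped stages.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class ToyRDD:
    """Hypothetical RDD node: a name, a partition count and its parent dependencies."""
    name: str
    num_partitions: int
    # (parent, is_shuffle): True marks a shuffle dependency, False a narrow one
    deps: List[Tuple["ToyRDD", bool]] = field(default_factory=list)


def plan_stages(final_rdd: ToyRDD) -> List[Tuple[str, str, int]]:
    """Split the lineage at shuffle dependencies, parents first.

    Returns (stage kind, last RDD of the stage, number of tasks). A shuffle map stage
    runs one ShuffleMapTask per partition of its map-side RDD; the final stage runs
    one ResultTask per partition of the final RDD.
    """
    stages: List[Tuple[str, str, int]] = []
    seen = set()

    def visit(rdd: ToyRDD, kind: str) -> None:
        # Follow narrow deps inside this stage; collect the RDDs behind shuffle deps.
        stack, parents = [rdd], []
        while stack:
            cur = stack.pop()
            for parent, is_shuffle in cur.deps:
                if is_shuffle:
                    parents.append(parent)
                else:
                    stack.append(parent)
        # Parent stages must be planned (and run) before this one.
        for p in parents:
            if id(p) not in seen:
                seen.add(id(p))
                visit(p, "ShuffleMapStage")
        stages.append((kind, rdd.name, rdd.num_partitions))

    visit(final_rdd, "ResultStage")
    return stages
```

For a lineage textFile (4 partitions), then map, then reduceByKey (2 partitions), then mapValues, followed by collect, the result is [("ShuffleMapStage", "map", 4), ("ResultStage", "mapValues", 2)]. That is 4 ShuffleMapTasks followed by 2 ResultTasks.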
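Resource requests (to be covered in a separate post)
YarnAllocator periodically requests resources from the cluster
Fixed resource mode
Call chain entry point: org.apache.spark.deploy.yarn.ApplicationMaster#runDriver, line 521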
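In fixed mode, the target executor count comes from spark.executor.instances. Each allocation round (the ApplicationMaster reporter thread calls allocateResources on a heartbeat interval set by spark.yarn.scheduler.heartbeat.interval-ms) compares that target with what YarnAllocator already has pending, starting or running.

The Python sketch models that "missing executors" comparison and a toy heartbeat loop. AllocatorState, container_request_delta and simulate_heartbeats are hypothetical. The loop assumes the ResourceManager grants a fixed number of containers per heartbeat and every granted container starts immediately.

```python
from dataclasses import dataclass


@dataclass
class AllocatorState:
    """Hypothetical snapshot of the counters YarnAllocator tracks."""
    target: int    # fixed mode: spark.executor.instances
    pending: int   # container requests sent to the RM but not yet allocated
    starting: int  # containers allocated, executor still launching
    running: int   # executors up and running


def container_request_delta(s: AllocatorState) -> int:
    """Positive: new containers to request; negative: pending requests to cancel."""
    missing = s.target - s.pending - s.starting - s.running
    if missing > 0:
        return missing
    if missing < 0 and s.pending > 0:
        # Only outstanding requests are cancelled here; running executors are kept.
        return -min(s.pending, -missing)
    return 0


def simulate_heartbeats(target: int, granted_per_heartbeat: int, heartbeats: int) -> int:
    """Toy reporter-thread loop: request what is missing, then let the RM grant
    at most granted_per_heartbeat containers per round."""
    s = AllocatorState(target=target, pending=0, starting=0, running=0)
    for _ in range(heartbeats):
        delta = container_request_delta(s)
        if delta > 0:
            s.pending += delta
        granted = min(s.pending, granted_per_heartbeat)
        s.pending -= granted
        s.running += granted
    return s.running
```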
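Dynamic resource allocation
ExecutorAllocationManager, the dynamic resource manager
When SparkContext is initialized with dynamic allocation enabled (spark.dynamicAllocation.enabled=true), it creates an ExecutorAllocationManager to request resources on demand. ExecutorAllocationManager tracks the workload in two ways: event listening (ExecutorAllocationListener listens for stage and task submission and completion) and periodic scheduling (its start method launches a thread that calls schedule at a fixed interval). It then works with YarnAllocator, through the scheduler backend, to request or release executors automatically, so resources scale elastically with the load.
Call chain entry point: org.apache.spark.deploy.yarn.ApplicationMaster#launchReporterThread, line 651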
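ExecutorAllocationManager sizes its target from the task backlog, roughly as ceil((pending + running tasks) × executorAllocationRatio / tasksPerExecutor), where tasksPerExecutor = spark.executor.cores / spark.task.cpus. It ramps toward that number exponentially: add 1, then 2, then 4, and so on.

The Python sketch models one schedule tick with these simplifications:

- It ignores the backlog timeouts (spark.dynamicAllocation.schedulerBacklogTimeout).
- It ignores resource profiles and the current executor count.
- DynAllocConf, max_executors_needed and next_target are hypothetical names.

Lowering the target only cancels outstanding requests. Idle executors are released separately after spark.dynamicAllocation.executorIdleTimeout.

```python
import math
from dataclasses import dataclass
from typing import Tuple


@dataclass
class DynAllocConf:
    # The dataclass is hypothetical; comments name the real config keys it stands for.
    executor_cores: int = 4          # spark.executor.cores
    task_cpus: int = 1               # spark.task.cpus
    allocation_ratio: float = 1.0    # spark.dynamicAllocation.executorAllocationRatio
    min_executors: int = 0           # spark.dynamicAllocation.minExecutors
    max_executors: int = 100         # spark.dynamicAllocation.maxExecutors


def max_executors_needed(pending_tasks: int, running_tasks: int, conf: DynAllocConf) -> int:
    """Executors needed to run all pending and running tasks at full parallelism."""
    tasks_per_executor = conf.executor_cores // conf.task_cpus
    return math.ceil((pending_tasks + running_tasks) * conf.allocation_ratio / tasks_per_executor)


def next_target(current_target: int, to_add: int, pending_tasks: int, running_tasks: int,
                conf: DynAllocConf) -> Tuple[int, int]:
    """One simplified schedule() tick. Returns (new_target, next_to_add)."""
    needed = max_executors_needed(pending_tasks, running_tasks, conf)
    if needed < current_target:
        # Shrink straight to what is needed (never below the minimum) and reset the ramp.
        return max(needed, conf.min_executors), 1
    if current_target >= conf.max_executors:
        return current_target, 1
    new_target = min(current_target + to_add, needed)
    new_target = max(min(new_target, conf.max_executors), conf.min_executors)
    delta = new_target - current_target
    if delta == 0:
        return current_target, 1
    # Double the step only when the whole step was used; otherwise start over at 1.
    return new_target, (to_add * 2 if delta == to_add else 1)
```

With 100 pending tasks and 4 cores per executor, 25 executors are needed. Starting from 0, successive ticks move the target through 1, 3, 7, 15 and then 25.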