DolphinScheduler源码阅读日记(二)MasterServer工作流调度源码解析

DolphinScheduler源码阅读日记(二)MasterServer工作流调度源码解析

系统架构

系统架构图

MasterServer

MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。 MasterServer基于netty提供监听服务。

该服务内主要包含:

  • DistributedQuartz: 分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作;
  • MasterSchedulerService: 是一个扫描线程,定时扫描数据库中的t_ds_command表,根据不同的命令类型进行不同的业务操作;
  • WorkflowExecuteRunnable: 主要是负责DAG任务切分、任务提交监控、各种不同事件类型的逻辑处理;
  • TaskExecuteRunnable: 主要负责任务的处理和持久化,并生成任务事件提交到工作流的事件队列;
  • EventExecuteService: 主要负责工作流实例的事件队列的轮询;
  • StateWheelExecuteThread: 主要负责工作流和任务超时、任务重试、任务依赖的轮询,并生成对应的工作流或任务事件提交到工作流的事件队列;
  • FailoverExecuteThread: 主要负责Master容错和Worker容错的相关逻辑;

核心概念

概念 含义
Process Definition 工作流定义
Process Instance 工作流实例,实际运行时会被包装为WorkflowExecuteRunnable
Command 事件消息

关键流程分析

以下流程都在MasterServer中执行,不再在标题中赘述

拉取事件

MasterSchedulerBootstrap是用于从MySQL中拉取事件(Command)的主要线程,在MasterServer启动时启动,通过findCommands()方法找到待执行的事件,这里的事件不仅限于开始执行工作流,还有其他类型,具体参考CommandType的定义,如下
事件类型定义 org.apache.dolphinscheduler.common.enums.CommandType

而拉取事件流程中,如下
找到待处理事件 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap#run:126

采用无中心节点设计,所以每个节点通过取模的方式获取当前节点应该处理的事件。节点总数和当前节点的序号是如何生成的呢?从注册中心(ZK/ETCD/JDBC)获取所有节点列表,在本地生成序号,详情参考org.apache.dolphinscheduler.service.queue.MasterPriorityQueue.ServerComparator

根据节点数量和当前节点序号取事件
通过“事件序号%节点数量=当前节点序号”取事件

处理事件,创建工作流

处理事件,创建工作流在,如下
处理事件,创建工作流 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap#run:137

而创建工作流具体分为如下几个阶段,ProcessDefinition -> ProcessInstance (WorkflowInstance) -> WorkflowExecuteContext -> WorkflowExecuteRunnable,产生WorkflowExecuteRunnable实例即为最终需要调度的工作流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ProcessDefinition -> ProcessInstance(WorkflowInstance)
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContextFactory#createWorkflowExecuteRunnableContext:56
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContextFactory#createWorkflowInstance:81
org.apache.dolphinscheduler.service.process.ProcessServiceImpl#handleCommand:317
org.apache.dolphinscheduler.service.process.ProcessServiceImpl#constructProcessInstance:768
org.apache.dolphinscheduler.service.process.ProcessServiceImpl#generateNewProcessInstance:586

// ProcessInstance(WorkflowInstance) -> WorkflowExecuteContext
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnableFactory#createWorkflowExecuteRunnable:79
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContextFactory#createWorkflowExecuteRunnableContext:67

// WorkflowExecuteContext -> WorkflowExecuteRunnable
org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap#run:137
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnableFactory#createWorkflowExecuteRunnable:80

工作流被创建出来之后会生成一个工作流事件WorkflowEvent,放在内存的阻塞队列workflowEventQueue当中
发送工作流事件 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap#run:151

调度工作流

MasterSchedulerBootstrap线程启动后,还会再启动一个WorkflowEventLooper工作流事件处理线程,用于消费上一步放入内存的阻塞队列workflowEventQueue当中的工作流事件WorkflowEvent
启动工作流事件处理线程 org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap#start:89

WorkflowEventLooper线程会使用WorkflowEventHandler处理工作流事件
消费处理工作流事件 org.apache.dolphinscheduler.server.master.runner.WorkflowEventLooper#run:95

WorkflowEventHandler会将通过调用WorkflowExecuteRunnable工作流的call()方法,将工作流的启动异步提交到WorkflowExecuteThreadPool线程池中执行
异步提交工作流 org.apache.dolphinscheduler.server.master.event.WorkflowStartEventHandler#handleWorkflowEvent:63

最终调用工作流的submitPostNode()方法,开始执行工作流的节点
开始执行工作流节点入口 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#startWorkflow:733

解析工作流节点获取待提交的TaskNode列表submitTaskNodeList,并生成对应的TaskInstance实例
TODO 解析过程比想象的复杂,需要详解分析下
获取待执行的TaskNode,创建TaskInstance实例 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#submitPostNode:1348

将任务添加到内存中的standby优先级队列(堆)readyToSubmitTaskQueue中,并在开始提交任务
开始执行任务 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#submitPostNode:1413

开始提交任务 org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#submitPostNode:1967

之后分发步骤比较复杂,具体拆解如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 开始提交任务
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#submitStandByTask:1967
// 生成任务实例(TaskExecuteRunnable)
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#executeTask:956
// 开始分发任务实例
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#executeTask:993
// 分发
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#tryToDispatchTaskInstance:1011
// 使用TaskDispatchOperator分发
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable#dispatch:41
// 将任务添加到globalTaskDispatchWaitingQueue中
org.apache.dolphinscheduler.server.master.runner.operator.TaskDispatchOperator#handle:34
// GlobalTaskDispatchWaitingQueueLooper线程轮询queue,使用BaseTaskDispatcher分发任务实例给worker
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueueLooper#run:79

使用BaseTaskDispatcher分发任务实例给worker

最后使用找到可用worker节点,通过rpc启动待执行任务
获取可用节点,启动任务 org.apache.dolphinscheduler.server.master.runner.BaseTaskDispatcher#dispatchTask:74
通过masterRpcClient发送启动任务命令 org.apache.dolphinscheduler.server.master.runner.BaseTaskDispatcher#doDispatch:87

DolphinScheduler源码阅读日记(二)MasterServer工作流调度源码解析

https://jszero.github.io/2024/07/28/DolphinScheduler源码阅读日记(二)MasterServer工作流调度源码解析/

作者

jszero

发布于

2024-07-28

更新于

2025-05-13

许可协议

评论