Spark原理和实践

Spark概念

Spark 是一个基于内存的大数据分布式计算框架
RDD:
Partition:
Job:
Task:
Driver:
Executor:

Spark为什么比MapReduce执行更快

  • DAG 执行模型:Spark 将整个计算构建成一个有向无环图(DAG),可对多步算子进行统一调度和优化;MapReduce 则强制每个 Job 都是单一的 Map→Shuffle→Reduce,阶段之间无融合
  • 算子融合(Pipelining):对多个窄依赖算子(如 map、filter)进行链式执行,在同一个 Task 中完成,不产生中间写盘;MapReduce 每步都要落盘并重新调度
  • 内存优先 + 本地磁盘优化:Spark 缓存中间数据到内存(或本地临时磁盘),shuffle 时只在必要时 spill;MapReduce 则把中间结果全部写到 HDFS,I/O 成本高

RDD、DataFrame 和 Dataset

RDD:弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表了一个弹性的、不可变的、可分区、里面的元素可进行计算的集合。RDD封装了计算逻辑,并不保存数据
- 不可变
- 分区化
- 有容错机制(Lineage)
- 惰性求值
- 分布式计算
DataFrame:结构化数据,类似关系型数据库表,支持 Catalyst 优化器,性能好但不类型安全
Dataset:结合 RDD 的类型安全和 DataFrame 的性能优化,Scala 和 Java 支持较好。特别的,DataFrame=DataSet[Row]

算子

Transformation:从现有的数据集创建一个新的数据集,返回一个新的 RDD 操作。Transformation都是惰性的,它们并不会立刻执行,只是记住了这些应用到 RDD 上的转换动作
Action:触发在 RDD 上的计算,这些计算可以是向应用程序返回结果,也可以是向存储系统保存数据
常见的 Transformation 包括:map、mapVaules、filter、flatMap、mapPartitions、uoin、join、distinct、xxxByKey
常见的 Action 包括:count、collect、collectAsMap、first、reduce、fold、aggregate、saveAsTextFile

宽依赖 & 窄依赖

广播变量 & 累加器

缓存 & checkpoint

数据本地化

GROUP BY、SORT BY、DISTRIBUTE BY、CLUSTER BY 区别是什么?

语法 用途 是否 Shuffle 是否排序 分区行为
GROUP BY 分组聚合(如 sum、count) ✅ 是 ❌ 否 相同 key 会到同一分区
SORT BY 分区内排序(非全局) ❌ 否 ✅ 是 保持原分区
DISTRIBUTE BY 控制数据如何分区(打散数据) ✅ 是 ❌ 否 同 key 到同一分区
CLUSTER BY 等于SORT BY + DISTRIBUTE BY,排序写入、建索引、聚簇优化,如写入parquet ✅ 是 ✅ 是 等价于 DISTRIBUTE + SORT

Spark On Yarn提交流程

Shuffle介绍

Hash-based shuffle

  1. 机制:
    • 普通运行机制:设下游stage的task数量为n,上游stage的shuffle write阶段每个map task,都会生成n个临时文件。下游shuffle read阶段每个task从上有的所有task所在节点,拉取自己所需要的临时文件,每次都拉取鱼自己buffer缓冲区相同大小的数据,然后通过内存中的Map数据结构进行聚合等操作。以此类推,直到最后所有数据都拉取处理完
    • 优化后的运行机制:增加参数spark.shuffle.consolidateFiles参数,默认值为false。开启后会开启consolidate机制,每个并行度(core)会创建一个shuffleFileGroup,每个group会产生下游task数量的临时文件,在同一个executor的core上执行的task会复用临时文件,可以极大减少临时文件数量。shuffle read时根据共享临时文件中offset读取
  2. 总结:
    • 优点:
      1. 可以省略不必要的排序开销
      2. 避免了排序所需的内存开销
    • 缺点:
      1. 生产的文件过多,会对文件系统造成压力
      2. 大量的小文件的随机读写带来一定磁盘开销
      3. 数据块写入时所需的缓存空间也会随之增加,对内存造成压力
      4. 即使是consolidate机制,在reduce task数量过多的情况下,文件也很多

Sorted-based shuffle

  1. 机制:
    • 普通运行机制:shuffle write的每个task将数据写入内存数据结构,数据结构中的数据量达到阈值(默认10000)之后,将数据排序写入磁盘临时文件,最后将一个task的所有spill溢写磁盘临时文件合并,每个map task写入一个磁盘和一个索引文件。排序是为了把所有属于同一个partition的数据放在一起,以便写入磁盘时合并连续数据段,shuffle read可以执行offfset拉取,不需要加载整个文件。
    • bypass运行机制:reduce任务数量比较少的情况,hash-based shuffle会比sorted-based shuffle效率更高。当满足如下条件时,会进行hash,根据hash值通过缓冲写入对应的磁盘文件,省去排序的过程,最后会对每个task产生的所有临时磁盘文件做合并,每个task生成一个磁盘文件,和一个索引文件
      1. shuffle read task的数量小于阈值spark.shuffle.sort.bypassMergeThreshold
      2. 使用的算子没有map-side的聚合行为,比如reduceByKey
    • Tungsten Sorted运行机制:对sort的优化,排序的不是内容本身,是内容序列化之后的指针(元数据),基于序列化之后的二进制数据操作,没有了序列化和反序列化的过程,内存消耗降低。使用的是堆外内存操作,gc开销减少。使用T-S机制的前提比较苛刻,具体如下
      1. shuffle依赖中不带聚合操作或者对输出进行排序的要求
      2. shuffle的序列化器支持序列化值的重定位(仅支持Kryo Serialized)
      3. shuffle过程中的输出分区个数少于16777216个

Remote Shuffle

Shuffle存在的问题

  • 磁盘随机读写严重:离线数据量增长,一定会增加分区数量来增大并发,因此shuffle文件会以mapper*reducer平方关系增加。单个shuffler文件大概是几k,几十k的样子。磁盘随机读写非常严重
  • shuffle read负载不均衡:请求的多个container分布在同一台机器上时,reduce时会请求多次
  • 数据无备份:shuffle文件存储在mapper所在本地磁盘,磁盘故障或者宕机会导致无法读取

Remote Shuffle service

RSS过程

  1. mapper端数据不落盘,按照reducer划分,push到对应的rss内存中
  2. rss内存中merge不同mapper发过来的数据,写到分布式存储中
  3. reducer直接去分布式存储中读取已经merge好的shuffle数据
    优化点
  4. 减少随机读:按照reducer组织数据,每个文件只归属于一个reducer,reducer读取变成顺序读
  5. 负载均衡:通过rss和hdfs来实现shuffle read负载均衡
  6. 数据多备份:分布式存储shuffle文件多备份,提高可用性

Push-based shuffle

sorted-based shuffle的增强。mapper端结束后主动将数据推送给reduce节点,避免reduce拉取。与rss的区别是,rss写的是外部存储,pss写的是reduce节点,交由reduce节点的ess来管理。但是shuffle数据推送过去之后,如果executor被回收或者宕机,那么数据会丢失不可用。任务规模越大、时间越长,发生executor丢失的概率越高,Push-based Shuffle风险越大,所以PSS更适合中等规模的任务。

Q:什么是ess?External shuffle service
A:
背景:1. 在动态资源申请机制下,executor被移除之后仍然需要保留其上运行的任务结果状态 2.对于shuffle而言,map task溢写的临时文件
定义:运行在集群中每一个节点上的常驻进程,独立于Spark application和executor存在,开启这个服务之后,executor会从这个服务拉取shuffle文件,而不是从上游executor拉取。即executor终止之后,状态依然是可用的

不同类型shuffle对比

维度/类型 Hash Shuffle Sort-based Shuffle Push-based Shuffle (Spark 3.1+) Remote Shuffle (RSS、Uniffle 等)
是否默认启用 否(已废弃) ✅ 是(Spark 默认 Shuffle 类型) 否(需配置启用) 否(需接入插件)
数据写入方式 每个下游 partition 写一个临时文件 将所有输出排序后写一个文件 + 索引 Map 端主动推送数据至 Reduce 端 Map 端写入远程 Shuffle 服务
文件数量 多(Map端每个任务 × Reduce数) 少(一个任务一个输出文件 + index) 少(推到 Reduce 本地) 可控,由服务端统一管理
是否排序 ❌ 否 ✅ 是 ✅ 是(基于 Sort Shuffle) 取决于服务配置(一般是支持排序的)
性能表现 较快但容易产生大量小文件 稍慢但更稳定 性能好,减少 Reduce 拉取等待 性能稳定,容错强
容错能力 差,executor 丢失后数据丢 差,executor 丢失后 Reduce 会失败 较差(数据存在 Reduce 节点) ✅ 强,数据在远程独立服务中
是否支持动态资源释放 ❌ 否(数据存在 executor) ❌ 否 ❌ 否(数据存在 Reduce 节点) ✅ 支持(Executor 可动态释放)
是否需要外部服务 ❌ 否 ❌ 否 ❌ 否 ✅ 是(部署 Shuffle Server)
推荐使用场景 旧版本 Spark,小任务 ✅ 默认通用型 Shuffle 中等规模任务,想减少 Reduce 阶段等待 大规模作业、Spark on K8s、动态资源分配场景
是否支持 Combine ✅ 是 ✅ 是 ✅ 是 ✅ 是(服务端支持)
是否支持 Spark on K8s 不推荐 ✅ 支持 ✅ 支持 ✅ 推荐

实践中遇到的问题

  1. roaringbitmap依赖重复 sbt shade
  2. 迭代计算 StackOverflowError?血缘过深,checkpoint切断依赖
  3. 数据倾斜的问题group by join,加盐处理
  4. spark写数据库,数据库连接过多,每个executor初始化一个连接池
作者

jszero

发布于

2025-05-13

更新于

2025-08-09

许可协议

评论