Spark SQL 内核精解:三层架构视角下的全链路拆解
适用读者:有一定大数据开发经验,但希望深入理解 Spark SQL 内部原理的数据工程师、数据平台开发人员。
阅读方式:本文采用逐层递进的结构——每章只引入该层的新概念并配模块图,最后一章才拼出完整全景图。建议顺序阅读,不要跳读。
全文线索:跟随一条
SELECT DISTINCT(query) FROM query_table WHERE pt_d='20260630',从 Driver 走到 Executor。
第一章:开篇——Spark 不是一个"软件",而是一组分层的组件
1.1 Spark Core 和 Spark SQL:底座与方言编译器
很多人刚开始接触 Spark 时,会模糊地知道它有"Spark SQL"和"DataFrame",但说不清它们和"Spark"本身是什么关系。
从架构上看,Spark 可以拆成两层:
┌─────────────────────────────────────────────────┐
│ Spark SQL │
│ SQL 解析器 + Catalyst 优化器 + Tungsten 执行 │
│ ┌───────────────────────────────────────────┐ │
│ │ DataFrame / Dataset API │ │
│ └───────────────────────────────────────────┘ │
├─────────────────────────────────────────────────┤
│ Spark Core (RDD) │
│ DAGScheduler + TaskScheduler + 内存管理 + IO │
│ ┌───────────────────────────────────────────┐ │
│ │ 分布式执行引擎:调度、容错、通信 │ │
│ └───────────────────────────────────────────┘ │
├─────────────────────────────────────────────────┤
│ Cluster Manager (外部编排) │
│ YARN / Kubernetes / Standalone │
└─────────────────────────────────────────────────┘
一句话说清关系:
- Spark Core 是分布式执行引擎——提供 RDD 抽象、DAG 调度、任务分发、容错、Shuffle 通信机制。它是整个 Spark 的底座。
- Spark SQL 是构建在 Core 之上的一层"编译器"——它把 SQL 或 DataFrame API 调用编译成 RDD DAG,然后交给 Spark Core 去执行。
- DataFrame 本质上就是 Catalyst LogicalPlan 的 API 包装。你调用的 .groupBy().agg() 不是"执行",而是在构建一棵 Catalyst 算子树。
对读者意味着什么: 当你写
df.groupBy("city").count()时,Spark 内部走的是:DataFrame API → Catalyst LogicalPlan → Catalyst PhysicalPlan → RDD DAG → Task → Executor。前面的路径里没有数据流动,直到最后的 RDD DAG 阶段才真正触发计算。
1.2 三层架构总览
Spark 运行时涉及的概念可以归入三个层级:
| 层级 | 包含什么 | 本质 |
|---|---|---|
| 进程层 | Driver、Executor、Cluster Manager | 跑在机器上的物理进程 |
| 抽象层 | RDD、Job、Stage、Task、DAG | Driver 内存中的逻辑调度单位 |
| 数据层 | Partition、Shuffle Write/Read 文件 | 数据的分片与流动形态 |
核心原则: 这三层之间有严格的映射关系——一个 Stage 包含 N 个 Task,每个 Task 处理一个 Partition,Task 最终由 Executor 中的线程去跑。
为什么要这样分层? 因为你在 Spark Web UI 上看到的是 Job / Stage / Task(抽象层),但你配置的是
--executor-memory(进程层),你慢 SQL 的原因可能出在数据分布(数据层)。不理解分层,就无法把 UI 上的现象和配置参数对应起来。
1.3 从概念到实际的映射速览
| 你常说的概念 | 归属层级 | 一句话本质 |
|---|---|---|
spark-submit |
进程层 | 提交命令,启动 Driver |
| Executor | 进程层 | 跑 Task 的 JVM 进程 |
| RDD | 抽象层 | 数据集的"不存在的代理"——定义了从哪读、怎么算 |
| Stage | 抽象层 | 无需跨节点移动数据的"有机会并行"的任务块 |
| Task | 抽象层 | 处理"一个 Partition"的线程执行单元 |
| Shuffle | 数据层 | 跨节点重分区:网络传输 + 磁盘落盘 |
| Partition | 数据层 | 数据的最小分片:HDFS 的一个块,或 Shuffle 后的一个文件 |
第二章:进程层——物理世界中跑着哪些进程?
本章目标: 理解 Spark 运行时有哪几类进程、谁管谁、谁住哪里。
2.1 Driver——整个应用的"大脑"
专业定义: Driver 是 Spark 应用中运行 main() 方法的进程,持有 SparkContext,是整个应用的控制中心。
职责:
1. 在 Driver 内部构建 SparkContext(包含 DAGScheduler、TaskScheduler 等组件)
2. 将用户代码/ SQL 转化为执行计划
3. 向 Cluster Manager 申请 Executor 资源
4. 将 Task 派发给 Executor
5. 处理 Executor 的心跳、失败重试
关键事实:
- 一个 Spark 应用有且只有一个 Driver
- Driver 运行在提交 spark-submit 的机器上(client 模式),或者运行在集群内的某台机器上(cluster 模式)
- Driver 不参与数据计算,只动脑不动手
区分口决:每次
spark-submit一个新的应用 → 一个新的 Driver。如果用了 Spark Thrift Server,那是多个 SQL 共享一个常驻 Driver。
2.2 Executor——真正的"苦力"
专业定义: Executor 是运行在 Worker 节点上的 JVM 进程,负责执行 Driver 分发的 Task,并管理该进程内数据的存储和缓存。
职责:
1. 接收 Driver 发来的 Task 并在线程池中调度执行
2. 以迭代器方式逐条处理数据
3. 在 Shuffle 时作为生产者写出数据、作为消费者拉取数据
4. 缓存持久化数据(cache() / persist())
关键事实:
- Executor 是"有状态"的——它内部的内存被划分为执行内存(算数据用)和存储内存(缓存数据用)
- Executor 启动后会主动反向注册到 Driver(不是 Driver 来找它)
- 一个节点可以运行多个 Executor,取决于资源配置
常见误区纠正: Executor 不是一口气把数据全加载到内存的。它是流水线式地逐条处理——读一条 → 算一下 → 输出 → 丢弃 → 读下一条。这就是为什么 10TB 数据 + 8GB 内存也能跑(如果没有聚合操作的话)。
2.3 Cluster Manager——不参与计算的"人事部"
专业定义: Cluster Manager 是独立于 Spark 应用的资源管理服务,负责在 Worker 节点上拉起和销毁 Executor 进程。
常见实现:
| CM 类型 | 工作方式 | Driver 角色 |
|---|---|---|
| YARN | ResourceManager + NodeManager | client / cluster 模式 |
| Kubernetes | API Server + kubelet | driver / executor pod |
| Standalone | Master + Worker | Spark 自带的简化版 |
关键行为:
- CM 不参与 Task 分配、不参与数据计算
- Driver 通过 SparkContext 向 CM 申请资源,CM 负责在 Worker 上拉起 Executor JVM
- 为什么 Driver 不自己拉 Executor? 因为 Driver 只是一段普通代码,没有权限在集群节点启动进程。而且如果 Driver 直接拉,Executor 就成了 Driver 的子进程——一旦 Driver 挂了,Executor 就成了孤儿进程无法回收。CM 统一管理生命周期,能保证资源回收。
2.4 进程层模块图
flowchart TB
subgraph "用户侧"
A[spark-submit] --> B{部署模式}
end
subgraph "进程层"
B -->|client| C[Driver
运行在提交机器]
B -->|cluster| D[Driver
运行在集群节点]
C --> E[SparkContext]
D --> E
E -->|申请资源| F[Cluster Manager
YARN / K8s / Standalone]
F -->|拉起| G[Executor 1
JVM]
F -->|拉起| H[Executor 2
JVM]
F -->|拉起| I[Executor N
JVM]
G -->|反向注册| C
H -->|反向注册| C
I -->|反向注册| C
end
subgraph "关键原则"
J[Driver: 1个, 动脑不动手]
K[Executor: N个, 动手不决策]
L[CM: 负责招聘/辞退, 不施工]
end
2.5 进程层小结
| 进程 | 数量 | 位置 | 核心职责 |
|---|---|---|---|
| Driver | 唯一 | 提交机 / 集群 | 解析、调度、协调 |
| Executor | 多个 | Worker 节点 | 执行 Task、管理内存 |
| Cluster Manager | 1套 | 独立服务 | 管理 Executor 生命周期 |
第三章:Spark Core 抽象层——RDD、DAG 与执行单元
本章目标: 理解 Spark Core 中的核心抽象概念——RDD、依赖关系、Job/Stage/Task——以及它们与进程层的映射关系。
本章引导问题: 在第二章中,我们知道 Driver 负责"调度",Executor 负责"执行"。但调度什么?执行什么?这一章就来回答这个问题。
3.1 RDD——一切计算的起点
专业定义: RDD(Resilient Distributed Dataset)是 Spark Core 中最基础的数据抽象——它代表一个不可变、可分区、可并行计算的数据集。
RDD 不存数据,它只定义"从哪来、怎么算":
// RDD 的接口核心(伪代码)
abstract class RDD {
partitions: Array[Partition] // 数据如何分片
compute(partition): Iterator[Record] // 如何计算一个分区
dependencies: Seq[Dependency] // 依赖哪些父 RDD
}
当你在代码中做如下操作时:
lines = sc.textFile("hdfs://...") # RDD
errors = lines.filter(lambda l: "ERROR" in l) # 新 RDD
count = errors.count() # action 触发计算
每一步不是在处理数据,而是在构建一个 RDD 依赖链(Lineage)。只有当遇到 count() 这样的 Action 操作时,Spark Core 才会把这个链转化为真正的物理执行。
RDD 的"惰性"本质: Transformation(map / filter / flatMap)构建 DAG,不立即计算;Action(count / collect / saveAsTextFile)才触发 DAGScheduler 去计算。
3.2 RDD 依赖关系——窄依赖 vs 宽依赖
这是理解 Stage 切分的钥匙。
flowchart LR
subgraph "窄依赖 Narrow Dependency"
A1[父RDD Partition 0] --> B1[子RDD Partition 0]
A1 --> B2[子RDD Partition 1]
A2[父RDD Partition 1] --> B2
A2 --> B3[子RDD Partition 2]
end
subgraph "宽依赖 Wide Dependency = Shuffle"
C1[父RDD Partition 0] -->|Part of data| D1[子RDD Partition 0]
C1 -->|Part of data| D2[子RDD Partition 1]
C2[父RDD Partition 1] -->|Part of data| D1
C2 -->|Part of data| D2
end
| 特征 | 窄依赖 | 宽依赖(Shuffle) |
|---|---|---|
| 父分区与子分区的关系 | 一对一或固定映射 | 多对多,需按 Key 重新分配 |
| 数据要不要跨节点 | 不需要 | 需要跨网络传输 |
| 是否切分 Stage | 不切 | 切——形成 Stage 边界 |
| 典型算子 | map、filter、mapPartitions | groupByKey、reduceByKey、distinct、join |
| 错误恢复代价 | 低——只重算丢失的分区 | 高——所有父分区都要重算 |
速记:窄依赖 = 各分区各算各的;宽依赖 = 必须互相交换数据。
3.3 Job / Stage / Task——三层执行粒度
这三个概念构成了 Spark 执行计划的粒度递进:
flowchart TB
subgraph "一次 Spark 应用"
J1[Job 1
由一次 Action 触发]
J2[Job 2
由下一次 Action 触发]
end
subgraph "Job 1"
S1[Stage 0
无 Shuffle]
S2[Stage 1
无 Shuffle]
S3[Stage 2
无 Shuffle]
S1 -->|Shuffle 边界| S2
S2 -->|Shuffle 边界| S3
end
subgraph "Stage 1"
T1[Task
Partition 0]
T2[Task
Partition 1]
T3[Task
Partition N]
end
三个概念的精确定义:
| 概念 | 触发条件 | 粒度 | 数量级 |
|---|---|---|---|
| Job | 一次 Action 操作(count / save / collect) | 一个完整的数据流计算 | 每个应用几个~几十个 |
| Stage | Shuffle 边界(宽依赖处切分) | 一组无需跨节点移动数据的并行 Task | 每个 Job 几个~十几个 |
| Task | 一个 Partition 的分布式计算请求 | 单个数据分片的处理线程 | 每个 Stage 几十~几万个 |
Stage 切分公式:
Stage 数 = Shuffle 依赖数 + 1
3.4 进程层与抽象层的映射
进程层 抽象层
──────────────────────────────────────────────
Driver (1个 JVM)
├── SparkContext
│ ├── DAGScheduler → 负责 Job → Stage 的切分
│ └── TaskScheduler → 负责 Stage → Task 的派发
│
Executor (N个 JVM 进程)
├── 线程池
│ ├── 线程 1 → 跑 Task(Stage 0, Partition 3)
│ ├── 线程 2 → 跑 Task(Stage 0, Partition 5)
│ └── 线程 N → 跑 Task(Stage 1, Partition 0)
└── 内存区域(执行区 + 存储区)
核心映射关系:Task : Partition : Executor 线程 = 1 : 1 : 1
一个 Task 处理一个 Partition,在 Executor 的一个线程中运行。Partition 决定了 Task 的数量,Executor 的线程池决定了并行度。
3.5 本章模块图
flowchart TB
subgraph "应用层(用户可见)"
A[RDD/DF API] -->|Transformation| B[RDD Lineage DAG]
B -->|Action 触发| C[Job]
end
subgraph "调度层(Driver 内部)"
C --> D[DAGScheduler]
D -->|按 Shuffle 切分| E[Stage 0]
D -->|按 Shuffle 切分| F[Stage 1]
E -->|按分区数拆分| G[TaskSet]
F -->|按分区数拆分| H[TaskSet]
G --> I[TaskScheduler]
H --> I
I -->|动态派发| J[Executor 线程池]
end
subgraph "执行层(Executor)"
J --> K[线程1: Task(Stage0, P0)]
J --> L[线程2: Task(Stage0, P1)]
J --> M[线程3: Task(Stage0, P2)]
end
第四章:Spark SQL 前端——Catalyst 优化器
本章目标: 理解 SQL / DataFrame 是如何被 Catalyst 翻译成物理计划的,以及为什么"优化器"能优化。
本章引导问题: 上一章讲的是 Spark Core——RDD 和调度。但我们的 SQL 是怎么变成 RDD 的?谁在做这个翻译工作?答案就在这一章。
4.1 定位:Catalyst 在整个流程中的位置
SQL 字符串 / DataFrame API
↓
┌─────────┐
│ Parser │ SQL → AST(抽象语法树)
└────┬────┘
↓
┌──────────┐
│ Analyzer │ 绑定元数据(表在不在?列在不在?)
└────┬─────┘
↓
┌───────────┐
│ Optimizer │ RBO 规则优化(分区剪裁、谓词下推等)
└────┬──────┘
↓
┌──────────────┐
│ PhysicalPlan │ 选择执行策略 + 生成字节码
└──────┬───────┘
↓
RDD DAG → 交给 Spark Core 调度执行
4.2 Parser:SQL 字符串 → 抽象语法树(AST)
作用: 把人类可读的 SQL 转成机器能处理的树形结构。
输入: SELECT DISTINCT(query) FROM query_table WHERE pt_d='20260630'
输出(AST,简化):
Project(Distinct(query))
└── Filter(pt_d='20260630')
└── Scan(query_table)
SQL 中每个子句映射到 AST 中的一个节点:
- FROM query_table → Scan(query_table)(从哪里读)
- WHERE pt_d='20260630' → Filter(pt_d='20260630')(怎么过滤)
- SELECT DISTINCT(query) → Project(Distinct(query))(选什么列、去重)
底层实现: Spark 使用 ANTLR 解析器 + SqlBase.g4 语法文件。如果你自定义 SQL 语法(比如加一条 HINT),就要改这个文件。
4.3 Analyzer:AST → 绑定元数据后的逻辑计划
Parser 产出的 AST 是"语义未经验证的"——它不知道 query_table 是否存在、pt_d 是什么类型。
Analyzer 的工作:连接 Catalog(元数据仓库),为 AST 中的每个节点"打上语义标签"。
Analyzer 检查清单:
├── query_table 在 Hive MetaStore 中是否存在?
├── pt_d 是否是分区列?
├── query 列的数据类型是 String 还是 Boolean?
└── pt_d='20260630' 的左右类型是否兼容?
实际案例: 如果你把分区列包在函数里(
WHERE to_date(pt_d) = '2026-06-30'),Analyzer 阶段无法识别这是一个分区过滤,分区剪裁规则也就不再匹配。这直接导致全表扫描。
4.4 Optimizer(RBO——基于规则的优化)
核心思想: 在不改变语义的前提下,通过规则重写逻辑计划,减少数据读取量和计算量。
这是用户最容易影响性能的阶段——你写的 SQL 写法直接决定了优化器能否发挥效果。
flowchart LR
subgraph "优化前"
A1[Project Distinct] --> A2[Filter] --> A3[Scan all partitions]
end
subgraph "优化后"
B1[Project Distinct] --> B2[Local Aggregate] --> B3[Filter pt_d] --> B4[Scan only pt_d dir]
B1 --> B5[Global Aggregate]
end
最关键的几条优化规则(按效果排序):
| 优化规则 | 做了什么 | 效果量化 |
|---|---|---|
| 分区剪裁 (Partition Pruning) | 将分区过滤下推到 Scan 层,只读特定分区目录 | Hive 表 365 个分区时,跳过 364/365 的目录遍历 |
| 谓词下推 (Predicate Pushdown) | 将 WHERE 条件下推到 Parquet/ORC 读取时执行 | Parquet 的 Min/Max 索引可以直接跳过不含匹配值的行组 |
| 列剪裁 (Column Pruning) | 只读取 SELECT 中出现的列 | 表有 100 列时,IO 降低到 1% |
| 常量折叠 (Constant Folding) | 编译期预计算常量表达式 | 避免 Executor 运行时重复计算 |
⚠️ 分区剪裁失效的常见原因(面试/排障高频考点):
1.WHERE to_date(pt_d) = '2026-06-30'—— 分区列被函数包裹 → 失效
2.WHERE pt_d = '20260630'—— 类型不匹配(int vs string)→ 失效
3.WHERE pt_d LIKE '202606%'—— 非精确匹配 → 失效
4.5 生成物理计划 + Tungsten Codegen
优化后的逻辑计划需要转换成可执行的物理计划,并选择一个具体的执行策略。
策略选择示例:
| 逻辑操作 | 可选物理策略 | 选择依据 |
|---|---|---|
| DISTINCT | HashAggregate vs SortAggregate | 是否有合适的哈希列、数据量预估 |
| JOIN | BroadcastHashJoin vs SortMergeJoin vs ShuffledHashJoin | 小表是否足够小、数据分布是否有倾斜 |
| 聚合 | Partial + Final 两阶段 vs 单阶段 | 是否可通过 partial 减少数据量 |
Tungsten Whole-Stage Codegen:
这是 Spark 性能创新的核心工程之一(另外两个是 Off-heap 内存管理和 Cache-friendly 数据布局)。简单说就是:
原理: Catalyst 在 Driver 的 JVM 中用 Janino 编译器动态生成一段 Java 代码,把 "读 Parquet → 过滤 → 哈希去重" 融合成一个函数,消除虚函数调用开销。
为什么这能加速?
// 没有 Codegen:Stage 0 的迭代器 -> 下一阶段 -> Stage 1 的迭代器
// 每次迭代都有大量方法调用和数据类型转换
// 有了 Codegen:生成一个闭包,所有逻辑在一个 while 循环里完成
// 数据在内存中保持原始二进制格式,无需反序列化
Codegen 的触发条件: 不是所有查询都能整阶段代码生成。例如
reduceByKey的聚合函数逻辑可能包含深拷贝、Java 对象装箱等,这些场景下 Codegen 只能覆盖部分步骤。
4.6 本章模块图
flowchart TB
subgraph "输入"
SQL[SQL 字符串]
DF[DataFrame API]
end
subgraph "Catalyst 优化器(Driver 内)"
SQL --> P[Parser
ANTLR → AST]
DF --> AO[Analyzer
绑定 Catalog 元数据]
P --> AO
AO --> O[Optimizer
RBO规则]
O --> R1[分区剪裁]
O --> R2[谓词下推]
O --> R3[列剪裁]
O --> R4[常量折叠]
R1 --> LP[优化后的逻辑计划]
R2 --> LP
R3 --> LP
R4 --> LP
LP --> PP[物理计划选择]
PP --> CG[WholeStage Codegen
Janino 生成字节码]
end
subgraph "输出"
CG --> RDD_DAG[RDD DAG
交给 Spark Core]
end
第五章:调度与执行——从逻辑计划到物理落地
本章目标: 理解 DAGScheduler 和 TaskScheduler 这两个 Driver 内部核心组件的分工协作,以及 Executor 执行 Task 的完整流程。
本章引导问题: Catalyst 输出了物理计划(怎么做)和 Codegen 字节码(具体实现)。但 "让谁去做" 和 "按什么顺序做" 还是未解决的问题。这就要靠调度系统了。
5.1 DAGScheduler——"怎么拆"
专业定义: DAGScheduler 是 SparkContext 内部的核心组件,负责将 RDD Lineage DAG 按宽依赖切分为 Stage,并以 Stage 为单位生成 TaskSet。
它的输入与输出:
输入: RDD DAG(由 Catalyst 物理计划生成)
输出: Stage 数组(每个 Stage 包含一个 TaskSet)
工作原理:
flowchart LR
subgraph "RDD DAG"
A[Read HDFS] --> B[Filter: pt_d]
B --> C[Map: extract query]
C --> D[Shuffle: DISTINCT
宽依赖]
D --> E[Reduce: global distinct]
end
subgraph "DAGScheduler 切分后"
S0["Stage 0
Task: Read→Filter→Map
本地计算, 无 Shuffle"]
S1["Stage 1
Task: Reduce→global distinct
等待 Shuffle Read"]
S0 -->|Shuffle Write| S1
end
切分规则(决定了几个 Stage):
Stage 数 = 宽依赖(Shuffle)数 + 1
本地性感知(Locality-Aware):
DAGScheduler 在划分 Stage 时,会标记每个 Partition 的首选位置(Preferred Location)——即数据块所在的节点。这个信息后面交给 TaskScheduler 用于决定 Task 发给谁,以实现计算向数据移动。
5.2 TaskScheduler——"派给谁"
专业定义: TaskScheduler 是 SparkContext 内部负责将 Task 分配到具体 Executor 去执行的组件,维护 Executor 注册表、管理失败重试和本地性降级。
DAGScheduler 和 TaskScheduler 的分工:
flowchart TB
subgraph "Driver 内部"
DAG[DAGScheduler
职责:Stage 切分
产出:TaskSet]
TS[TaskScheduler
职责:Task 派发
维护:Executor 心跳表]
DAG -->|TaskSet| TS
end
subgraph "集群"
TS -->|派发 Task| EX1[Executor 1
有数据, 有空闲CPU]
TS -->|派发 Task| EX2[Executor 2
有数据, 但无空闲CPU
等还是换人?]
TS -->|降级| EX3[Executor 3
无本地数据, 但有CPU
→ 网络拉取]
end
subgraph "本地性降级策略"
L1[PROCESS_LOCAL
同一进程] --> L2[NODE_LOCAL
同一节点]
L2 --> L3[RACK_LOCAL
同一机架]
L3 --> L4[ANY
任意节点]
end
关键行为——本地性降级:
1. Task 的首选位置是数据所在的节点(由 DAGScheduler 从 HDFS 块元数据获取)
2. TaskScheduler 先尝试在数据节点上找有空闲 CPU 的 Executor
3. 如果该节点没有空闲资源,等待 spark.locality.wait(默认 3 秒)
4. 超时后降级到更宽松的本地性级别(同一机架、任意节点)
容错机制: 如果 Executor 挂了,TaskScheduler 将该 Executor 上的 Task 重新标记为"待调度",交给其他活的 Executor。这就是"弹性(Resilient)"的来源。
5.3 Executor 执行 Task——流水线、Spill、Shuffle
Task 到达 Executor 后,Executor 内部流程如下:
flowchart TB
subgraph "Executor JVM"
subgraph "接收"
A[反序列化 Task 字节码]
B[线程池分配线程]
end
subgraph "执行"
C[打开 Parquet 分片
返回迭代器 Iterator]
D[逐条读取 Row]
E{类型判断}
E -->|纯过滤/投影| F[处理一条 → 输出 → 丢弃
内存占用 ≈ 一行大小]
E -->|聚合/分组| G[维护 HashMap
内存不足时 Spill]
end
subgraph "产出"
H["Shuffle Write
按 Key 哈希分区
写入 .data + .index 文件"]
I["最终输出
直接写 HDFS / 返回 Driver"]
end
A --> B --> C --> D --> E
F -->|无 Shuffle| I
G -->|有 Shuffle| H
end
Spill(磁盘溢写)机制——当内存不够时:
处理 DISTINCT/ GROUP BY 时内存爆炸的过程:
1. Executor 往 HashMap 里加 Key,监控内存使用率
2. 当使用率超过 spark.memory.fraction 阈值
3. 触发 Spill:将当前 HashMap 排序后写入本地磁盘
4. 清空 HashMap,继续处理剩余数据
5. 所有数据处理完 → 外部多路归并(External Merge Sort)
6. 归并时只需维护一个 Min-Heap 比较 Key,不再需要全量 HashMap
Spill 不是 OOM: 如果配置正确,Spark 不会因为单次聚合 OOM——它会 Spill。但如果 Spill 量非常大(写入几十 GB 临时数据),IO 会成为瓶颈,表现为 Task 极慢。这时候需要调整并行度或增加内存。
5.4 Shuffle——为什么它是"性能杀手"
Shuffle = 跨节点网络传输 + 磁盘落盘 + 等待消费
一个完整的 Shuffle 过程:
flowchart LR
subgraph "Stage 0 (Map 端)"
M1[Task 0] -->|按 Key 哈希分区| W1[本地磁盘
.data + .index]
M2[Task 1] -->|按 Key 哈希分区| W2[本地磁盘
.data + .index]
M3[Task 2] -->|按 Key 哈希分区| W3[本地磁盘
.data + .index]
end
subgraph "网络传输 (Netty)"
W1 -->|"Executor 1 → Executor 2"| R1[Reduce Task 0]
W2 -->|"Executor 2 → Executor 1"| R2[Reduce Task 1]
W3 -->|"Executor 3 → Executor 3"| R3[Reduce Task 2]
end
subgraph "Stage 1 (Reduce 端)"
R1 -->|聚合/排序| O1[最终结果]
R2 -->|聚合/排序| O2[最终结果]
R3 -->|聚合/排序| O3[最终结果]
end
Shuffle 慢的三层原因:
| 层面 | 原因 | 影响 |
|---|---|---|
| 网络 | 数据跨节点传输 | 受限于带宽和延迟 |
| 磁盘 | Map 端必须写入本地磁盘(即使不落盘到 HDFS) | 触发大量随机 IO |
| 计算 | Reduce 端需拉取所有 Map 端输出并等待最慢的 Task | 长尾效应 |
哪些操作触发 Shuffle:
| 操作 | 是否触发 | 原因 |
|---|---|---|
SELECT ... WHERE ... |
❌ | 只过滤,数据不移动 |
DISTINCT |
✅ | 全局去重,需汇聚相同 Key |
GROUP BY |
✅ | 全局聚合,需汇聚相同 Key |
ORDER BY |
✅ | 全局排序,需重分区 |
两张大表 JOIN |
✅ | 按关联 Key 重分区 |
JOIN ← BROADCAST(t) |
❌ | 小表复制到每个 Executor 本地 Join |
| map / filter | ❌ | 单条数据转换,无依赖 |
5.5 统一内存管理(Spark 3.x)
Executor 的 JVM 堆内内存划分:
flowchart TB
subgraph "JVM Heap (8G 示例)"
RES["预留内存 Reserved
300MB
防止 OOM"]
USR["用户内存 User
比例可配
存储用户对象"]
subgraph "统一内存 Unified Pool"
EXEC["执行内存 Execution
默认 ~60%
聚合HashMap / Join表 / Shuffle缓冲"]
STOR["存储内存 Storage
默认 ~40%
cache数据 / 广播变量"]
end
RES --- USR
USR --- EXEC
EXEC -.->|"Execution 内存不足时可
驱逐 Storage 缓存来借用"| STOR
STOR -.->|"Storage 需要时
Execution 释放"| EXEC
end
"统一内存管理"的核心机制:
- 执行内存和存储内存共享一个 UnifiedPool
- 执行内存不足时,可以向存储内存"借"(触发存储缓存驱逐)
- 存储内存有需要时,执行内存需要归还(如果正在执行则异步释放)
调优启示: 如果 Task 大量 Spill(表明执行内存不足),可以调高
spark.memory.fraction或spark.memory.storageFraction来给执行内存更多空间。
第六章:完整旅程——跟随一条 SQL 走一遍
本章目标: 将前五章的知识串联起来,沿一条 SQL 逐阶段还原完整执行流程。
6.1 起点
SELECT DISTINCT(query) FROM query_table WHERE pt_d='20260630'
- 表
query_table:Hive/Parquet 表,有 365 个分区(pt_d分区列),100 列,总量 10TB - 查询:从当天分区中取出所有不重复的 query 值
6.2 阶段一:Catalyst 优化(Driver 内)
Parser: SQL → AST
Project(Distinct(query))
└── Filter(pt_d='20260630')
└── Scan(query_table)
Analyzer: 确认表存在、分区列正确、类型匹配
Optimizer:
1. 分区剪裁: pt_d='20260630' → 只读 1 个分区(跳过 364 个),数据量从 10TB → ~30GB
2. 列剪裁: 只读 query 列(跳过 99 列),数据量从 ~30GB → ~300MB
3. 谓词下推: 下推到 Parquet 读取时过滤,利用 Min/Max 索引跳过不含匹配值的行组
物理计划选择: HashAggregate(用 HashMap 做去重,比排序去重快)
Codegen: Driver 生成融合了"读 Parquet → 过滤 → 哈希"的 Java 字节码
6.3 阶段二:DAGScheduler(Driver 内)
输入: Catalyst 产出的物理计划 → RDD DAG
分析: DISTINCT 需要全局去重 → 宽依赖(Shuffle)
切分:
Stage 0(Map 端):每个 Executor 读本地 Parquet 分片 → Filter → 局部去重 → Shuffle Write
Stage 1(Reduce 端):各 Executor 拉取所有 Stage 0 输出 → 全局去重 → 输出结果
TaskSet 生成:
- Stage 0 包含 N 个 Task(N = 该分区目录下 Parquet 文件数,假设是 200 个)
- Stage 1 包含 spark.sql.shuffle.partitions 个 Task(默认 200)
6.4 阶段三:TaskScheduler(Driver 内)
- 查看每个 Task 的首选位置(DAGScheduler 从 HDFS 元数据获取的块位置)
- 查对应节点是否有空闲 Executor
- 有 → 发过去;没有 → 等 3 秒降级到其他节点
假设集群有 10 个 Executor,每个 4 核:
- Stage 0 的 200 个 Task,每轮并发 ~40 个(10 Executor × 4 核)
- Task 持续派发直到全部完成
6.5 阶段四:Executor 执行
Stage 0 执行:
1. 反序列化 Task(包含 Codegen 字节码和 Partition 索引)
2. 打开本地 Parquet 文件片,返回迭代器
3. 逐条读取 → 过滤 pt_d(Parquet 谓词下推层已过滤一部分)→ 局部 HashMap 去重
4. 如果 HashMap 内存超过阈值 → Spill 到磁盘
5. 所有数据处理完 → 按 query 哈希值分区写入 Shuffle Write 文件(.data + .index)
6. 向 Driver 汇报完成,标记 Task 成功
Stage 1 执行(在 Stage 0 全部完成后启动):
1. 每个 Reduce Task 知道自己需要拉取哪些 Map 端的哈希分区
2. 通过 Netty 直连上游 Executor 并发拉取
3. 收到数据 → 全局 HashMap 去重(多路归并合并)
4. 结果写入目标表或返回 Driver
6.6 本阶段全景图
flowchart TB
subgraph "用户提交"
A["SELECT DISTINCT(query)
FROM query_table
WHERE pt_d='20260630'"]
end
subgraph "Driver 进程"
B["Parser: SQL → AST"]
C["Analyzer: 绑定元数据"]
D["Optimizer: 分区剪裁 / 列剪裁"]
B --> C --> D
D --> E["物理计划选择 + Codegen"]
E --> F["DAGScheduler
→ Stage 0 (Map) + Stage 1 (Reduce)"]
F --> G["TaskScheduler
→ 按本地性派发 Task"]
end
subgraph "Executor 进程集群"
G --> H["Executor 1
Task(Stage0, P0)"]
G --> I["Executor 2
Task(Stage0, P1)"]
G --> J["Executor N
Task(Stage0, PN)"]
H --> K["Shuffle Write
(按 Key 哈希分区)"]
I --> K
J --> K
K --> L["Shuffle Read (Netty 跨节点拉取)"]
L --> M["Global HashAggregate
(如果 OOM → Spill)"]
M --> N["写入目标表 / 返回 Driver"]
end
subgraph "数据流动"
O[("HDFS
只读 pt_d='20260630' 分区
只读 query 列")]
O --> H
O --> I
O --> J
end
第七章:异常场景与自适应优化(AQE)
本章目标: 理解实际生产中最常见的性能杀手——数据倾斜,以及 Spark 3.x 如何通过 AQE 自动应对。
7.1 数据倾斜——长尾任务的根源
定义: 在 Shuffle 或聚合操作中,某个 Key 的数据量远大于其他 Key,导致处理该 Key 的 Task 运行时间远超其他 Task。
影响:
正常 Task:30 秒完成
倾斜 Task:30 分钟 +(甚至 OOM)
↑ 整个 Job 等这个 Task 全部 Job 耗时 ≈ 30+ 分钟
经典场景:
| 场景 | 示例 | 表现 |
|---|---|---|
| GROUP BY 倾斜 | GROUP BY city,北京 80 亿行,其他城市各几万行 |
一个 Reduce Task 数据量是其他 Task 的几百倍 |
| JOIN 倾斜 | JOIN ON user_id,部分用户(刷单 / 爬虫)行为量异常 |
SortMergeJoin 中单个分区远超其他分区 |
| 空值聚合 | GROUP BY col 中 col 有大量 NULL |
NULL 全部进入同一个分区 |
7.2 传统方案:手工加盐
以 GROUP BY city 倾斜为例:
-- 第一步:给倾斜 Key 加随机后缀,打散到多个分区做局部聚合
SELECT city, suffix, COUNT(*) AS partial_cnt
FROM (
SELECT IF(city='北京', CONCAT(city, '_', FLOOR(RAND()*10)), city) AS city_salted, ...
FROM ...
)
GROUP BY city, suffix
-- 第二步:去掉后缀,做全局聚合
SELECT city, SUM(partial_cnt) AS cnt FROM (
SELECT SPLIT(city, '_')[0] AS city, partial_cnt FROM step1
)
GROUP BY city
缺点: 需要改写 SQL、需要提前知道哪个 Key 倾斜、引入了额外的一次 Shuffle。
7.3 AQE(Adaptive Query Execution)——Spark 3.x 的自动救星
专业定义: AQE 是 Driver 内部的一个运行时优化框架——在 Shuffle Write 完成后,根据 Executor 上报的真实数据统计信息,动态调整后续物理计划。
AQE 的三板斧:
| 特性 | 触发时机 | 作用 |
|---|---|---|
| 动态合并分区 | Shuffle Read 之前 | 检测到小分区(几 KB)自动合并,减少 Task 数量 |
| 动态 Join 策略切换 | Shuffle Read 之前 | 发现某张表比预期小得多,自动切换为 Broadcast Join |
| 动态拆分倾斜分区 | Shuffle Read 之前 | 检测到某个分区数据量远超中位数,自动拆分为子分区 |
Skew Join 的完整介入时序(重点理解):
flowchart LR
subgraph "Stage 0 (Map)"
A[Task 1
产出分区 0: 800MB
产出分区 1: 5MB] -->|Shuffle Write| B[本地磁盘]
C[Task 2
产出分区 0: 5MB
产出分区 1: 5MB] -->|Shuffle Write| D[本地磁盘]
E[Task 3
产出分区 0: 5MB
产出分区 1: 5MB] -->|Shuffle Write| F[本地磁盘]
end
subgraph "Stage 0 → Stage 1 之间
Driver AQE 介入"
G[Executor 上报分区大小]
H[Driver AQE 分析:
分区 0: 810MB
其他分区: ~15MB]
I[AQE 决策:
分区 0 拆成 5 个子分区
每个 ~162MB]
G --> H --> I
end
subgraph "Stage 1 (Reduce)"
J[Reduce Task 1
处理子分区 0A
~162MB]
K[Reduce Task 2
处理子分区 0B
~162MB]
L[Reduce Task 3
处理子分区 0C
~162MB]
M[Reduce Task 4
处理子分区 1
~15MB]
end
AQE 的局限性:
1. AQE 的 Skew Join 优化的是 Join 场景——它是在 Shuffle Write 完成后调整 Reduce 端的并行度
2. 如果 Map 端(Stage 0)的 GROUP BY 本身就已经 OOM 了,AQE 救不了——因为还没走到 Shuffle Write
3. 此时仍然需要手工加盐,或者调整并行度减少单个分区数据量
AQE 和手工加盐的区别:
| 维度 | 手工加盐 | AQE Skew Join |
|---|---|---|
| 是否改 Key | ✅ 加随机后缀 | ❌ 不改 Key,只拆分区 |
| 是否需要预知倾斜 Key | ✅ 是,需要先分析数据 | ❌ 自动检测 |
| 是否入侵代码 | ✅ 需要改 SQL | ❌ 无需改代码 |
| 适用范围 | GROUP BY、JOIN 都行 | 主要是 Join |
第八章:性能调优实践
8.1 调优优先级(按投入产出比排序)
| 优先级 | 层面 | 操作 | 收益 |
|---|---|---|---|
| 🥇 第一 | 逻辑层(SQL / 表设计) | 分区剪裁、列剪裁、广播小表 | 10x ~ 100x |
| 🥈 第二 | 物理层(Shuffle 参数) | 调整并行度、开启 AQE | 2x ~ 10x |
| 🥉 第三 | 资源层(内存 / 序列化) | Kryo序列化、内存比例调整 | 1.2x ~ 2x |
调优黄金法则: 先检查逻辑层,永远不要跳过它去调参数。一个 SQL 上的
SELECT *或to_date(pt_d)造成的浪费,是任何参数调整都补不回来的。
8.2 逻辑层速查(先检查这些!)
| 检查项 | 正确做法 | 反例 |
|---|---|---|
| 分区过滤 | WHERE 直接命中分区列 | WHERE to_date(pt_d) = '2026-06-30'(被函数包裹,分区剪裁失效) |
| 列剪裁 | 只写出需要的列名 | SELECT *(读 100 列只需 1 列时,IO 放大 100 倍) |
| 广播 Join | 小表(< 几百MB)加 /*+ BROADCAST(t) */ |
两张几百 GB 的表直接 JOIN(触发 SortMergeJoin) |
| 隐式类型转换 | JOIN 键字段类型一致 | INT JOIN STRING(无法用 Hash Shuffle) |
| 不必要的 DISTINCT | 确认是否真的需要去重 | 数据本身就不重复时,DISTINCT 浪费一次 Shuffle |
| 不必要的 ORDER BY | 最终写入目标表不需要排序 | INSERT OVERWRITE ... ORDER BY ...(全局排序极其昂贵) |
8.3 物理层速查(Shuffle 相关)
| 参数 | 推荐逻辑 | 说明 |
|---|---|---|
spark.sql.shuffle.partitions |
目标每个分区 200MB ~ 500MB | 数据量大调大,数据量小调小。200 个 Task 处理:200MB × 200 = 40GB Shuffle 数据 |
spark.sql.adaptive.enabled |
true(默认) |
开启 AQE 运行时优化 |
spark.sql.adaptive.coalescePartitions.enabled |
true(默认) |
自动合并小分区 |
spark.sql.adaptive.skewJoin.enabled |
true(默认) |
自动处理 Join 数据倾斜 |
spark.sql.adaptive.advisoryPartitionSizeInBytes |
64MB ~ 256MB |
AQE 合并分区的目标大小 |
8.4 资源层速查
| 参数 / 选项 | 推荐 | 说明 |
|---|---|---|
| 存储格式 | Parquet / ORC + Snappy / ZSTD | 列式存储 + 谓词下推 + 高压缩比 |
spark.serializer |
KryoSerializer |
序列化体积缩小约 1/3,需注册自定义类 |
spark.memory.fraction |
0.6 ~ 0.7 | 聚合/Join 任务可适当调高,减少 Spill |
spark.executor.cores |
3 ~ 5 核 | 避免单 Executor 核数过多导致内存竞争。核心经验:堆内存 / 核数 ≥ 2GB |
spark.executor.memory |
按数据量估算,避免单个 Executor > 64GB | 过大易导致 GC 恶化 |
spark.sql.autoBroadcastJoinThreshold |
默认 10MB,可调大到 100MB ~ 200MB(取决于可用内存) | 不超过 2GB(因为要复制到所有 Executor) |
8.5 慢 SQL 排查思路
遇到慢 SQL 时,按以下层次排查:
第一层:看 Spark Web UI(端口 4040)
→ 哪个 Stage 时间最长?
→ 该 Stage 的 Task 数量是否正常?
→ 太多(几万个)→ 合并分区或调大 shuffle.partitions
→ 太少(几个)→ 增大并行度
→ 个别 Task 远慢于其他 Task?
→ ✅ 数据倾斜 → 检查 AQE 是否生效,或手工加盐
→ Task 大量失败/重试?
→ ✅ 检查 Executor 日志:OOM?数据倾斜?网络问题?
第二层:看执行计划
EXPLAIN EXTENDED SELECT DISTINCT(query) FROM query_table WHERE pt_d='20260630'
→ PartitionFilters 是否包含 pt_d?
→ 否 → 分区剪裁未生效,检查 WHERE 条件是否包裹了函数
→ 是否出现 SortMergeJoin 而不是 BroadcastHashJoin?
→ 检查小表大小和 broadcastJoinThreshold
→ Scan 的 Output 列数是否合理?
→ 列剪裁是否生效(不是 SELECT *)
第三层:看 GC 时间
→ Spark UI Stage 页 → GC Time 占比
→ > 10% → 内存压力大
→ 措施:增加 spark.executor.memory,或减少 spark.executor.cores
第四层:看 Shuffle 读写量
→ Shuffle Write / Read 异常大
→ 是否有不必要的 DISTINCT 或 ORDER BY?
→ Join 是否满足广播条件?加 BROADCAST hint 试试
→ 是否数据本身有大量重复/倾斜?
8.6 一张完整的 SQL 调优决策树
flowchart TB
A[SQL 慢] --> B{看看哪颗最慢}
B --> C["Cat Stage 慢
数据量大"]
B --> D["Shuffle Stage 慢"]
B --> E["输出 Stage 慢
写入目标表"]
C --> F{"检查 SQL 写法"}
F --> G["分区剪裁生效?"]
F --> H["列剪裁生效?"]
F --> I["是否不需要 DISTINCT?"]
G -->|否| J[改 WHERE 条件
去掉函数包裹]
H -->|否| K[去掉 SELECT *]
I -->|是| L[去掉 DISTINCT]
D --> M{"检查 Task 分布"}
M --> N["个别 Task 特别慢
→ 数据倾斜"]
M --> O["Task 数量异常
→ 并行度问题"]
N --> P["开启 AQE Skew Join
或手工加盐"]
O --> Q["调整 shuffle.partitions"]
E --> R{"检查写入方式"}
R --> S["是否 dynamic partition?"]
R --> T["文件数过多
小文件问题"]
S -->|是| U[检查分区列基数
调小并行度]
T --> V[合并小文件
用 coalesce/repartition]
附录A:完整知识全景图
本章读完后再回看此图——它是对全文的浓缩,不是预习材料。
flowchart TB
subgraph "用户层"
U["spark-submit / JDBC / DataFrame API"]
end
subgraph "进程层 - Driver (JVM)"
D1["SparkContext 初始化"]
D1 --> D2["Catalyst 优化器"]
subgraph "Catalyst 优化流水线"
D2_1["Parser
ANTLR → AST"]
D2_2["Analyzer
绑定 Catalog 元数据"]
D2_3["Optimizer (RBO)
分区剪裁 / 谓词下推 / 列剪裁"]
D2_4["Physical Plan + Codegen
策略选择 + Janino 字节码"]
D2_1 --> D2_2 --> D2_3 --> D2_4
end
D2 --> D3["DAGScheduler
RDD DAG → Stage 划分"]
D3 --> D4["TaskScheduler
Stage → Task 动态派发"]
end
subgraph "编排层 - Cluster Manager"
CM["YARN / K8s / Standalone"]
CM -->|"拉起"| EX1
CM -->|"拉起"| EX2
CM -->|"销毁"| EX_D
end
subgraph "进程层 - Executor 集群"
EX1["Executor 1
JVM + 线程池"]
EX2["Executor N
JVM + 线程池"]
EX_D["已销毁的 Executor
资源已回收"]
subgraph "Executor 内部"
EX_T["接收 Task → 反序列化
→ 迭代器式逐条处理"]
EX_M["统一内存管理
Execution + Storage 池"]
EX_S["Shuffle
Map: 写入本地磁盘
Reduce: Netty 拉取"]
EX_T --> EX_M --> EX_S
end
end
subgraph "数据层"
DATA_IN["HDFS / Hive
Parquet + 分区目录"]
DATA_SHUFFLE["Shuffle 中间文件
.data + .index"]
DATA_OUT["目标表 / 输出文件"]
end
subgraph "动态优化 AQE"
AQE["Shuffle Write 完成后
Driver 动态分析"]
AQE_C["动态合并小分区"]
AQE_S["动态拆分倾斜分区"]
AQE_J["动态切换 Join 策略"]
end
%% 连接关系
U --> D1
EX1 --> EX_T
EX2 --> EX_T
DATA_IN --> EX_T
EX_T --> DATA_SHUFFLE
DATA_SHUFFLE -.->|"Stage 边界
跨节点拉取"| EX_T
EX_T --> DATA_OUT
D4 -->|"Task 派发"| EX_T
EX_T -->|"心跳 & 分区大小上报"| D4
DATA_SHUFFLE -.->|"分区元数据"| AQE
AQE -->|"动态调整"| D4
附录B:常见疑问深度解答
下面的问题是在主流程中会自然产生的疑问。放在这里供查阅,但不属于主文的知识依赖链。
B.1 "结果集发回 Driver" 算不算 Shuffle?
不算。
Shuffle 的特指是 跨节点数据重洗牌 + 磁盘落盘 + 被下游 Stage 消费。Executor 把最终结果发回 Driver:
- 不经过磁盘(直接网络传输)
- 没有下游 Stage 来消费(这是最终产出)
在 ETL 任务中,大多数结果直接 INSERT OVERWRITE 写入 HDFS,连发回 Driver 这一步都省了。
B.2 "没有 Shuffle" 是不是一定只有一个 Stage?
是的。
公式:Stage 数 = Shuffle 依赖数 + 1。如果整个物理计划中没有宽依赖(Shuffle),则只有一个 Stage,所有 Task 并行执行,各算各的。
B.3 一个应用有几个 Driver?两个 SQL 提交有几个 Driver?
一个 Spark 应用(一次 spark-submit)有且仅有一个 Driver。
- 独立应用模式: 每次提交都拉起一个新的 Driver → N 个应用 = N 个 Driver
- Spark Thrift Server: 一个常驻 Driver 服务接收所有 SQL → 1 个 Driver 处理多个 SQL
B.4 为什么说 SELECT * 是性能杀手?
因为列剪裁失效了。
在列式存储(Parquet/ORC)下:
- 列剪裁有效时:只读 query 1 列,IO = 表大小的 1%
- 列剪裁失效时:读全部 100 列,IO = 表大小的 100%
对于 10TB 的表,这就是 100GB vs 10TB 的差距。列剪裁是调优时第一个要检查的东西。
B.5 WITH(CTE)和嵌套子查询性能有区别吗?
在 Spark 中——没有区别。 CTE 在 Catalyst 优化器阶段会被内联展开,本质上和直接写 FROM (SELECT ...) 等价。Spark 不会自动缓存 CTE 结果,如果 CTE 被引用两次,会执行两次。如果要复用结果,用 cache() 或 createOrReplaceTempView() + cacheTable()。
在 ClickHouse 中——有区别。 ClickHouse 的 WITH 默认是"代码替换",多次引用重复执行。要复用需显式加 MATERIALIZED 关键字。
B.6 Spark 和 MapReduce 的核心区别是什么?
| 维度 | MapReduce | Spark |
|---|---|---|
| 过程模型 | Map → Shuffle → Reduce 严格两阶段 | DAG 多阶段,任意层级依赖 |
| 中间结果 | 必须写入 HDFS | 内存/本地磁盘,不落 HDFS |
| 容错 | 重新运行整个任务 | RDD Lineage 只重算丢失的分区 |
| SQL 支持 | Hive(独立系统) | Catalyst 内建优化器 |
Spark 把 MapReduce 的 "Map → Shuffle → Reduce" 固定两阶段模式扩展成了更一般的 DAG 模式,同时利用内存避免中间结果写入 HDFS。这是性能差距的根本原因。
附录C:深度阅读与参考资料
官方文档
| 资源 | 地址 | 适合阅读阶段 |
|---|---|---|
| Spark SQL Guide | https://spark.apache.org/docs/latest/sql-programming-guide.html | 入门后通读 |
| Tuning Spark | https://spark.apache.org/docs/latest/tuning.html | 调优实战前 |
| Monitoring | https://spark.apache.org/docs/latest/monitoring.html | 需要排查慢 SQL 时 |
| Spark Configuration | https://spark.apache.org/docs/latest/configuration.html | 参数级调优参考 |
核心技术论文(按阅读顺序)
- Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing (Zaharia et al., NSDI 2012)
-
提出 RDD 概念的原始论文。读完第三章后读,能理解 Lineage 容错的设计动机。
-
Spark SQL: Relational Data Processing in Spark (Armbrust et al., SIGMOD 2015)
-
Spark SQL 和 Catalyst 优化器的原论文。读完第四、五章后读最为合适。
-
Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark (Armbrust et al., SIGMOD 2018)
- 如果后续要了解 Structured Streaming,这是必读。
内部实现解析
| 资源 | 说明 |
|---|---|
| Jacek Laskowski 的 "The Internals of Spark SQL" | https://github.com/jaceklaskowski/spark-sql-internals —— 逐类逐文件的 Spark SQL 源码解析,适合作为代码阅读时的参考路线图 |
| Spark 源码中的 SqlBase.g4 | parser 目录下的 ANTLR 语法文件,看它你就知道 Spark 支持哪些 SQL 语法 |
| Tungsten 设计文档 SPARK-12795 | https://issues.apache.org/jira/browse/SPARK-12795 —— Tungsten 项目的设计目标和技术细节 |
周边系统理解
| 系统 | 与 Spark 的关系 | 推荐学习资料 |
|---|---|---|
| YARN | Spark 最常用的资源管理器 | 《Hadoop: The Definitive Guide》第 4 章 "YARN" |
| HDFS | Spark 最常用的数据存储层 | 同上第 3 章 "HDFS" |
| Parquet / ORC | Spark 最常用的列式存储格式 | Parquet 官方文档:https://parquet.apache.org/docs/ |
| Hive | Spark SQL 可以通过 Hive Metastore 读取元数据 | Hive 分区表设计模式 |
调试工具
| 工具 | 用途 |
|---|---|
| Spark Web UI (端口 4040) | 实时查看 Job/Stage/Task 运行状态、GC 时间、Shuffle 读写量 |
| EXPLAIN EXTENDED | 查看 SQL 的执行计划,确认分区剪裁、列剪裁、Join 策略是否生效 |
| spark.sql.debug.maxToStringFields | 设置大值后打印更多计划细节 |
| history-server | 查看历史任务的 UI 信息 |
| Spark Measure (Nexus 开源) | 自定义指标采集,用于持续监控 |
修订记录
v1.0 - 2026.06.30:初版。三层架构 + 逐模块图解 + 单 SQL 全程跟随。
如果你遇到具体的慢 SQL 或异常场景,欢迎带着 EXPLAIN EXTENDED 输出 + Spark UI 截图 来进一步讨论。
还没有评论,来第一个吧