Spark SQL 内核精解:三层架构视角下的全链路拆解

适用读者:有一定大数据开发经验,但希望深入理解 Spark SQL 内部原理的数据工程师、数据平台开发人员。

阅读方式:本文采用逐层递进的结构——每章只引入该层的新概念并配模块图,最后一章才拼出完整全景图。建议顺序阅读,不要跳读。

全文线索:跟随一条 SELECT DISTINCT(query) FROM query_table WHERE pt_d='20260630',从 Driver 走到 Executor。


第一章:开篇——Spark 不是一个"软件",而是一组分层的组件

1.1 Spark Core 和 Spark SQL:底座与方言编译器

很多人刚开始接触 Spark 时,会模糊地知道它有"Spark SQL"和"DataFrame",但说不清它们和"Spark"本身是什么关系。

从架构上看,Spark 可以拆成两层:

┌─────────────────────────────────────────────────┐
│                  Spark SQL                      │
│   SQL 解析器 + Catalyst 优化器 + Tungsten 执行  │
│   ┌───────────────────────────────────────────┐ │
│   │    DataFrame / Dataset API               │ │
│   └───────────────────────────────────────────┘ │
├─────────────────────────────────────────────────┤
│               Spark Core (RDD)                  │
│   DAGScheduler + TaskScheduler + 内存管理 + IO  │
│   ┌───────────────────────────────────────────┐ │
│   │    分布式执行引擎:调度、容错、通信       │ │
│   └───────────────────────────────────────────┘ │
├─────────────────────────────────────────────────┤
│             Cluster Manager (外部编排)           │
│         YARN / Kubernetes / Standalone          │
└─────────────────────────────────────────────────┘

一句话说清关系:
- Spark Core 是分布式执行引擎——提供 RDD 抽象、DAG 调度、任务分发、容错、Shuffle 通信机制。它是整个 Spark 的底座
- Spark SQL 是构建在 Core 之上的一层"编译器"——它把 SQL 或 DataFrame API 调用编译成 RDD DAG,然后交给 Spark Core 去执行。
- DataFrame 本质上就是 Catalyst LogicalPlan 的 API 包装。你调用的 .groupBy().agg() 不是"执行",而是在构建一棵 Catalyst 算子树。

对读者意味着什么: 当你写 df.groupBy("city").count() 时,Spark 内部走的是:DataFrame API → Catalyst LogicalPlan → Catalyst PhysicalPlan → RDD DAG → Task → Executor。前面的路径里没有数据流动,直到最后的 RDD DAG 阶段才真正触发计算。

1.2 三层架构总览

Spark 运行时涉及的概念可以归入三个层级:

层级 包含什么 本质
进程层 Driver、Executor、Cluster Manager 跑在机器上的物理进程
抽象层 RDD、Job、Stage、Task、DAG Driver 内存中的逻辑调度单位
数据层 Partition、Shuffle Write/Read 文件 数据的分片与流动形态

核心原则: 这三层之间有严格的映射关系——一个 Stage 包含 N 个 Task,每个 Task 处理一个 Partition,Task 最终由 Executor 中的线程去跑。

为什么要这样分层? 因为你在 Spark Web UI 上看到的是 Job / Stage / Task(抽象层),但你配置的是 --executor-memory(进程层),你慢 SQL 的原因可能出在数据分布(数据层)。不理解分层,就无法把 UI 上的现象和配置参数对应起来。

1.3 从概念到实际的映射速览

你常说的概念 归属层级 一句话本质
spark-submit 进程层 提交命令,启动 Driver
Executor 进程层 跑 Task 的 JVM 进程
RDD 抽象层 数据集的"不存在的代理"——定义了从哪读、怎么算
Stage 抽象层 无需跨节点移动数据的"有机会并行"的任务块
Task 抽象层 处理"一个 Partition"的线程执行单元
Shuffle 数据层 跨节点重分区:网络传输 + 磁盘落盘
Partition 数据层 数据的最小分片:HDFS 的一个块,或 Shuffle 后的一个文件

第二章:进程层——物理世界中跑着哪些进程?

本章目标: 理解 Spark 运行时有哪几类进程、谁管谁、谁住哪里。

2.1 Driver——整个应用的"大脑"

专业定义: Driver 是 Spark 应用中运行 main() 方法的进程,持有 SparkContext,是整个应用的控制中心

职责:
1. 在 Driver 内部构建 SparkContext(包含 DAGScheduler、TaskScheduler 等组件)
2. 将用户代码/ SQL 转化为执行计划
3. 向 Cluster Manager 申请 Executor 资源
4. 将 Task 派发给 Executor
5. 处理 Executor 的心跳、失败重试

关键事实:
- 一个 Spark 应用有且只有一个 Driver
- Driver 运行在提交 spark-submit 的机器上(client 模式),或者运行在集群内的某台机器上(cluster 模式)
- Driver 不参与数据计算,只动脑不动手

区分口决:每次 spark-submit 一个新的应用 → 一个新的 Driver。如果用了 Spark Thrift Server,那是多个 SQL 共享一个常驻 Driver。

2.2 Executor——真正的"苦力"

专业定义: Executor 是运行在 Worker 节点上的 JVM 进程,负责执行 Driver 分发的 Task,并管理该进程内数据的存储和缓存。

职责:
1. 接收 Driver 发来的 Task 并在线程池中调度执行
2. 以迭代器方式逐条处理数据
3. 在 Shuffle 时作为生产者写出数据、作为消费者拉取数据
4. 缓存持久化数据(cache() / persist()

关键事实:
- Executor 是"有状态"的——它内部的内存被划分为执行内存(算数据用)和存储内存(缓存数据用)
- Executor 启动后会主动反向注册到 Driver(不是 Driver 来找它)
- 一个节点可以运行多个 Executor,取决于资源配置

常见误区纠正: Executor 不是一口气把数据全加载到内存的。它是流水线式地逐条处理——读一条 → 算一下 → 输出 → 丢弃 → 读下一条。这就是为什么 10TB 数据 + 8GB 内存也能跑(如果没有聚合操作的话)。

2.3 Cluster Manager——不参与计算的"人事部"

专业定义: Cluster Manager 是独立于 Spark 应用的资源管理服务,负责在 Worker 节点上拉起和销毁 Executor 进程

常见实现:

CM 类型 工作方式 Driver 角色
YARN ResourceManager + NodeManager client / cluster 模式
Kubernetes API Server + kubelet driver / executor pod
Standalone Master + Worker Spark 自带的简化版

关键行为:
- CM 不参与 Task 分配、不参与数据计算
- Driver 通过 SparkContext 向 CM 申请资源,CM 负责在 Worker 上拉起 Executor JVM
- 为什么 Driver 不自己拉 Executor? 因为 Driver 只是一段普通代码,没有权限在集群节点启动进程。而且如果 Driver 直接拉,Executor 就成了 Driver 的子进程——一旦 Driver 挂了,Executor 就成了孤儿进程无法回收。CM 统一管理生命周期,能保证资源回收。

2.4 进程层模块图

flowchart TB
    subgraph "用户侧"
        A[spark-submit] --> B{部署模式}
    end

    subgraph "进程层"
        B -->|client| C[Driver
运行在提交机器] B -->|cluster| D[Driver
运行在集群节点] C --> E[SparkContext] D --> E E -->|申请资源| F[Cluster Manager
YARN / K8s / Standalone] F -->|拉起| G[Executor 1
JVM] F -->|拉起| H[Executor 2
JVM] F -->|拉起| I[Executor N
JVM] G -->|反向注册| C H -->|反向注册| C I -->|反向注册| C end subgraph "关键原则" J[Driver: 1个, 动脑不动手] K[Executor: N个, 动手不决策] L[CM: 负责招聘/辞退, 不施工] end

2.5 进程层小结

进程 数量 位置 核心职责
Driver 唯一 提交机 / 集群 解析、调度、协调
Executor 多个 Worker 节点 执行 Task、管理内存
Cluster Manager 1套 独立服务 管理 Executor 生命周期

第三章:Spark Core 抽象层——RDD、DAG 与执行单元

本章目标: 理解 Spark Core 中的核心抽象概念——RDD、依赖关系、Job/Stage/Task——以及它们与进程层的映射关系。

本章引导问题: 在第二章中,我们知道 Driver 负责"调度",Executor 负责"执行"。但调度什么?执行什么?这一章就来回答这个问题。

3.1 RDD——一切计算的起点

专业定义: RDD(Resilient Distributed Dataset)是 Spark Core 中最基础的数据抽象——它代表一个不可变、可分区、可并行计算的数据集。

RDD 不存数据,它只定义"从哪来、怎么算":

// RDD 的接口核心(伪代码)
abstract class RDD {
   partitions: Array[Partition]           // 数据如何分片
   compute(partition): Iterator[Record]   // 如何计算一个分区
   dependencies: Seq[Dependency]          // 依赖哪些父 RDD
}

当你在代码中做如下操作时:

lines = sc.textFile("hdfs://...")           # RDD
errors = lines.filter(lambda l: "ERROR" in l)  # 新 RDD
count = errors.count()                      # action 触发计算

每一步不是在处理数据,而是在构建一个 RDD 依赖链(Lineage)。只有当遇到 count() 这样的 Action 操作时,Spark Core 才会把这个链转化为真正的物理执行。

RDD 的"惰性"本质: Transformation(map / filter / flatMap)构建 DAG,不立即计算;Action(count / collect / saveAsTextFile)才触发 DAGScheduler 去计算。

3.2 RDD 依赖关系——窄依赖 vs 宽依赖

这是理解 Stage 切分的钥匙。

flowchart LR
    subgraph "窄依赖 Narrow Dependency"
        A1[父RDD Partition 0] --> B1[子RDD Partition 0]
        A1 --> B2[子RDD Partition 1]
        A2[父RDD Partition 1] --> B2
        A2 --> B3[子RDD Partition 2]
    end

    subgraph "宽依赖 Wide Dependency = Shuffle"
        C1[父RDD Partition 0] -->|Part of data| D1[子RDD Partition 0]
        C1 -->|Part of data| D2[子RDD Partition 1]
        C2[父RDD Partition 1] -->|Part of data| D1
        C2 -->|Part of data| D2
    end
特征 窄依赖 宽依赖(Shuffle)
父分区与子分区的关系 一对一或固定映射 多对多,需按 Key 重新分配
数据要不要跨节点 不需要 需要跨网络传输
是否切分 Stage 不切 切——形成 Stage 边界
典型算子 map、filter、mapPartitions groupByKey、reduceByKey、distinct、join
错误恢复代价 低——只重算丢失的分区 高——所有父分区都要重算

速记:窄依赖 = 各分区各算各的;宽依赖 = 必须互相交换数据。

3.3 Job / Stage / Task——三层执行粒度

这三个概念构成了 Spark 执行计划的粒度递进

flowchart TB
    subgraph "一次 Spark 应用"
        J1[Job 1
由一次 Action 触发] J2[Job 2
由下一次 Action 触发] end subgraph "Job 1" S1[Stage 0
无 Shuffle] S2[Stage 1
无 Shuffle] S3[Stage 2
无 Shuffle] S1 -->|Shuffle 边界| S2 S2 -->|Shuffle 边界| S3 end subgraph "Stage 1" T1[Task
Partition 0] T2[Task
Partition 1] T3[Task
Partition N] end

三个概念的精确定义:

概念 触发条件 粒度 数量级
Job 一次 Action 操作(count / save / collect) 一个完整的数据流计算 每个应用几个~几十个
Stage Shuffle 边界(宽依赖处切分) 一组无需跨节点移动数据的并行 Task 每个 Job 几个~十几个
Task 一个 Partition 的分布式计算请求 单个数据分片的处理线程 每个 Stage 几十~几万个

Stage 切分公式:

Stage 数 = Shuffle 依赖数 + 1

3.4 进程层与抽象层的映射

进程层                         抽象层
──────────────────────────────────────────────
Driver (1个 JVM)
├── SparkContext
│   ├── DAGScheduler → 负责 Job → Stage 的切分
│   └── TaskScheduler → 负责 Stage → Task 的派发
│
Executor (N个 JVM 进程)
├── 线程池
│   ├── 线程 1 → 跑 Task(Stage 0, Partition 3)
│   ├── 线程 2 → 跑 Task(Stage 0, Partition 5)
│   └── 线程 N → 跑 Task(Stage 1, Partition 0)
└── 内存区域(执行区 + 存储区)

核心映射关系:Task : Partition : Executor 线程 = 1 : 1 : 1

一个 Task 处理一个 Partition,在 Executor 的一个线程中运行。Partition 决定了 Task 的数量,Executor 的线程池决定了并行度。

3.5 本章模块图

flowchart TB
    subgraph "应用层(用户可见)"
        A[RDD/DF API] -->|Transformation| B[RDD Lineage DAG]
        B -->|Action 触发| C[Job]
    end

    subgraph "调度层(Driver 内部)"
        C --> D[DAGScheduler]
        D -->|按 Shuffle 切分| E[Stage 0]
        D -->|按 Shuffle 切分| F[Stage 1]
        E -->|按分区数拆分| G[TaskSet]
        F -->|按分区数拆分| H[TaskSet]
        
        G --> I[TaskScheduler]
        H --> I
        I -->|动态派发| J[Executor 线程池]
    end

    subgraph "执行层(Executor)"
        J --> K[线程1: Task(Stage0, P0)]
        J --> L[线程2: Task(Stage0, P1)]
        J --> M[线程3: Task(Stage0, P2)]
    end

第四章:Spark SQL 前端——Catalyst 优化器

本章目标: 理解 SQL / DataFrame 是如何被 Catalyst 翻译成物理计划的,以及为什么"优化器"能优化。

本章引导问题: 上一章讲的是 Spark Core——RDD 和调度。但我们的 SQL 是怎么变成 RDD 的?谁在做这个翻译工作?答案就在这一章。

4.1 定位:Catalyst 在整个流程中的位置

SQL 字符串 / DataFrame API
       ↓
  ┌─────────┐
  │ Parser  │  SQL → AST(抽象语法树)
  └────┬────┘
       ↓
  ┌──────────┐
  │ Analyzer │  绑定元数据(表在不在?列在不在?)
  └────┬─────┘
       ↓
  ┌───────────┐
  │ Optimizer │  RBO 规则优化(分区剪裁、谓词下推等)
  └────┬──────┘
       ↓
  ┌──────────────┐
  │ PhysicalPlan  │  选择执行策略 + 生成字节码
  └──────┬───────┘
       ↓
  RDD DAG → 交给 Spark Core 调度执行

4.2 Parser:SQL 字符串 → 抽象语法树(AST)

作用: 把人类可读的 SQL 转成机器能处理的树形结构。

输入: SELECT DISTINCT(query) FROM query_table WHERE pt_d='20260630'

输出(AST,简化):
Project(Distinct(query))
    └── Filter(pt_d='20260630')
            └── Scan(query_table)

SQL 中每个子句映射到 AST 中的一个节点:
- FROM query_tableScan(query_table)(从哪里读)
- WHERE pt_d='20260630'Filter(pt_d='20260630')(怎么过滤)
- SELECT DISTINCT(query)Project(Distinct(query))(选什么列、去重)

底层实现: Spark 使用 ANTLR 解析器 + SqlBase.g4 语法文件。如果你自定义 SQL 语法(比如加一条 HINT),就要改这个文件。

4.3 Analyzer:AST → 绑定元数据后的逻辑计划

Parser 产出的 AST 是"语义未经验证的"——它不知道 query_table 是否存在、pt_d 是什么类型。

Analyzer 的工作:连接 Catalog(元数据仓库),为 AST 中的每个节点"打上语义标签"。

Analyzer 检查清单:
├── query_table 在 Hive MetaStore 中是否存在?
├── pt_d 是否是分区列?
├── query 列的数据类型是 String 还是 Boolean?
└── pt_d='20260630' 的左右类型是否兼容?

实际案例: 如果你把分区列包在函数里(WHERE to_date(pt_d) = '2026-06-30'),Analyzer 阶段无法识别这是一个分区过滤,分区剪裁规则也就不再匹配。这直接导致全表扫描。

4.4 Optimizer(RBO——基于规则的优化)

核心思想: 在不改变语义的前提下,通过规则重写逻辑计划,减少数据读取量和计算量。

这是用户最容易影响性能的阶段——你写的 SQL 写法直接决定了优化器能否发挥效果。

flowchart LR
    subgraph "优化前"
        A1[Project Distinct] --> A2[Filter] --> A3[Scan all partitions]
    end
    
    subgraph "优化后"
        B1[Project Distinct] --> B2[Local Aggregate] --> B3[Filter pt_d] --> B4[Scan only pt_d dir]
        B1 --> B5[Global Aggregate]
    end

最关键的几条优化规则(按效果排序):

优化规则 做了什么 效果量化
分区剪裁 (Partition Pruning) 将分区过滤下推到 Scan 层,只读特定分区目录 Hive 表 365 个分区时,跳过 364/365 的目录遍历
谓词下推 (Predicate Pushdown) 将 WHERE 条件下推到 Parquet/ORC 读取时执行 Parquet 的 Min/Max 索引可以直接跳过不含匹配值的行组
列剪裁 (Column Pruning) 只读取 SELECT 中出现的列 表有 100 列时,IO 降低到 1%
常量折叠 (Constant Folding) 编译期预计算常量表达式 避免 Executor 运行时重复计算

⚠️ 分区剪裁失效的常见原因(面试/排障高频考点):
1. WHERE to_date(pt_d) = '2026-06-30' —— 分区列被函数包裹 → 失效
2. WHERE pt_d = '20260630' —— 类型不匹配(int vs string)→ 失效
3. WHERE pt_d LIKE '202606%' —— 非精确匹配 → 失效

4.5 生成物理计划 + Tungsten Codegen

优化后的逻辑计划需要转换成可执行的物理计划,并选择一个具体的执行策略。

策略选择示例:

逻辑操作 可选物理策略 选择依据
DISTINCT HashAggregate vs SortAggregate 是否有合适的哈希列、数据量预估
JOIN BroadcastHashJoin vs SortMergeJoin vs ShuffledHashJoin 小表是否足够小、数据分布是否有倾斜
聚合 Partial + Final 两阶段 vs 单阶段 是否可通过 partial 减少数据量

Tungsten Whole-Stage Codegen:

这是 Spark 性能创新的核心工程之一(另外两个是 Off-heap 内存管理和 Cache-friendly 数据布局)。简单说就是:

原理: Catalyst 在 Driver 的 JVM 中用 Janino 编译器动态生成一段 Java 代码,把 "读 Parquet → 过滤 → 哈希去重" 融合成一个函数,消除虚函数调用开销。

为什么这能加速?

// 没有 Codegen:Stage 0 的迭代器 -> 下一阶段 -> Stage 1 的迭代器
// 每次迭代都有大量方法调用和数据类型转换

// 有了 Codegen:生成一个闭包,所有逻辑在一个 while 循环里完成
// 数据在内存中保持原始二进制格式,无需反序列化

Codegen 的触发条件: 不是所有查询都能整阶段代码生成。例如 reduceByKey 的聚合函数逻辑可能包含深拷贝、Java 对象装箱等,这些场景下 Codegen 只能覆盖部分步骤。

4.6 本章模块图

flowchart TB
    subgraph "输入"
        SQL[SQL 字符串]
        DF[DataFrame API]
    end
    
    subgraph "Catalyst 优化器(Driver 内)"
        SQL --> P[Parser
ANTLR → AST] DF --> AO[Analyzer
绑定 Catalog 元数据] P --> AO AO --> O[Optimizer
RBO规则] O --> R1[分区剪裁] O --> R2[谓词下推] O --> R3[列剪裁] O --> R4[常量折叠] R1 --> LP[优化后的逻辑计划] R2 --> LP R3 --> LP R4 --> LP LP --> PP[物理计划选择] PP --> CG[WholeStage Codegen
Janino 生成字节码] end subgraph "输出" CG --> RDD_DAG[RDD DAG
交给 Spark Core] end

第五章:调度与执行——从逻辑计划到物理落地

本章目标: 理解 DAGScheduler 和 TaskScheduler 这两个 Driver 内部核心组件的分工协作,以及 Executor 执行 Task 的完整流程。

本章引导问题: Catalyst 输出了物理计划(怎么做)和 Codegen 字节码(具体实现)。但 "让谁去做" 和 "按什么顺序做" 还是未解决的问题。这就要靠调度系统了。

5.1 DAGScheduler——"怎么拆"

专业定义: DAGScheduler 是 SparkContext 内部的核心组件,负责将 RDD Lineage DAG 按宽依赖切分为 Stage,并以 Stage 为单位生成 TaskSet。

它的输入与输出:

输入: RDD DAG(由 Catalyst 物理计划生成)
输出: Stage 数组(每个 Stage 包含一个 TaskSet)

工作原理:

flowchart LR
    subgraph "RDD DAG"
        A[Read HDFS] --> B[Filter: pt_d]
        B --> C[Map: extract query]
        C --> D[Shuffle: DISTINCT
宽依赖] D --> E[Reduce: global distinct] end subgraph "DAGScheduler 切分后" S0["Stage 0
Task: Read→Filter→Map
本地计算, 无 Shuffle"] S1["Stage 1
Task: Reduce→global distinct
等待 Shuffle Read"] S0 -->|Shuffle Write| S1 end

切分规则(决定了几个 Stage):

Stage 数 = 宽依赖(Shuffle)数 + 1

本地性感知(Locality-Aware):
DAGScheduler 在划分 Stage 时,会标记每个 Partition 的首选位置(Preferred Location)——即数据块所在的节点。这个信息后面交给 TaskScheduler 用于决定 Task 发给谁,以实现计算向数据移动

5.2 TaskScheduler——"派给谁"

专业定义: TaskScheduler 是 SparkContext 内部负责将 Task 分配到具体 Executor 去执行的组件,维护 Executor 注册表、管理失败重试和本地性降级。

DAGScheduler 和 TaskScheduler 的分工:

flowchart TB
    subgraph "Driver 内部"
        DAG[DAGScheduler
职责:Stage 切分
产出:TaskSet] TS[TaskScheduler
职责:Task 派发
维护:Executor 心跳表] DAG -->|TaskSet| TS end subgraph "集群" TS -->|派发 Task| EX1[Executor 1
有数据, 有空闲CPU] TS -->|派发 Task| EX2[Executor 2
有数据, 但无空闲CPU
等还是换人?] TS -->|降级| EX3[Executor 3
无本地数据, 但有CPU
→ 网络拉取] end subgraph "本地性降级策略" L1[PROCESS_LOCAL
同一进程] --> L2[NODE_LOCAL
同一节点] L2 --> L3[RACK_LOCAL
同一机架] L3 --> L4[ANY
任意节点] end

关键行为——本地性降级:
1. Task 的首选位置是数据所在的节点(由 DAGScheduler 从 HDFS 块元数据获取)
2. TaskScheduler 先尝试在数据节点上找有空闲 CPU 的 Executor
3. 如果该节点没有空闲资源,等待 spark.locality.wait(默认 3 秒)
4. 超时后降级到更宽松的本地性级别(同一机架、任意节点)

容错机制: 如果 Executor 挂了,TaskScheduler 将该 Executor 上的 Task 重新标记为"待调度",交给其他活的 Executor。这就是"弹性(Resilient)"的来源。

5.3 Executor 执行 Task——流水线、Spill、Shuffle

Task 到达 Executor 后,Executor 内部流程如下:

flowchart TB
    subgraph "Executor JVM"
        subgraph "接收"
            A[反序列化 Task 字节码]
            B[线程池分配线程]
        end
        
        subgraph "执行"
            C[打开 Parquet 分片
返回迭代器 Iterator] D[逐条读取 Row] E{类型判断} E -->|纯过滤/投影| F[处理一条 → 输出 → 丢弃
内存占用 ≈ 一行大小] E -->|聚合/分组| G[维护 HashMap
内存不足时 Spill] end subgraph "产出" H["Shuffle Write
按 Key 哈希分区
写入 .data + .index 文件"] I["最终输出
直接写 HDFS / 返回 Driver"] end A --> B --> C --> D --> E F -->|无 Shuffle| I G -->|有 Shuffle| H end

Spill(磁盘溢写)机制——当内存不够时:

处理 DISTINCT/ GROUP BY 时内存爆炸的过程:
1. Executor 往 HashMap 里加 Key,监控内存使用率
2. 当使用率超过 spark.memory.fraction 阈值
3. 触发 Spill:将当前 HashMap 排序后写入本地磁盘
4. 清空 HashMap,继续处理剩余数据
5. 所有数据处理完 → 外部多路归并(External Merge Sort)
6. 归并时只需维护一个 Min-Heap 比较 Key,不再需要全量 HashMap

Spill 不是 OOM: 如果配置正确,Spark 不会因为单次聚合 OOM——它会 Spill。但如果 Spill 量非常大(写入几十 GB 临时数据),IO 会成为瓶颈,表现为 Task 极慢。这时候需要调整并行度或增加内存。

5.4 Shuffle——为什么它是"性能杀手"

Shuffle = 跨节点网络传输 + 磁盘落盘 + 等待消费

一个完整的 Shuffle 过程:

flowchart LR
    subgraph "Stage 0 (Map 端)"
        M1[Task 0] -->|按 Key 哈希分区| W1[本地磁盘
.data + .index] M2[Task 1] -->|按 Key 哈希分区| W2[本地磁盘
.data + .index] M3[Task 2] -->|按 Key 哈希分区| W3[本地磁盘
.data + .index] end subgraph "网络传输 (Netty)" W1 -->|"Executor 1 → Executor 2"| R1[Reduce Task 0] W2 -->|"Executor 2 → Executor 1"| R2[Reduce Task 1] W3 -->|"Executor 3 → Executor 3"| R3[Reduce Task 2] end subgraph "Stage 1 (Reduce 端)" R1 -->|聚合/排序| O1[最终结果] R2 -->|聚合/排序| O2[最终结果] R3 -->|聚合/排序| O3[最终结果] end

Shuffle 慢的三层原因:

层面 原因 影响
网络 数据跨节点传输 受限于带宽和延迟
磁盘 Map 端必须写入本地磁盘(即使不落盘到 HDFS) 触发大量随机 IO
计算 Reduce 端需拉取所有 Map 端输出并等待最慢的 Task 长尾效应

哪些操作触发 Shuffle:

操作 是否触发 原因
SELECT ... WHERE ... 只过滤,数据不移动
DISTINCT 全局去重,需汇聚相同 Key
GROUP BY 全局聚合,需汇聚相同 Key
ORDER BY 全局排序,需重分区
两张大表 JOIN 按关联 Key 重分区
JOIN ← BROADCAST(t) 小表复制到每个 Executor 本地 Join
map / filter 单条数据转换,无依赖

5.5 统一内存管理(Spark 3.x)

Executor 的 JVM 堆内内存划分:

flowchart TB
    subgraph "JVM Heap (8G 示例)"
        RES["预留内存 Reserved
300MB
防止 OOM"] USR["用户内存 User
比例可配
存储用户对象"] subgraph "统一内存 Unified Pool" EXEC["执行内存 Execution
默认 ~60%
聚合HashMap / Join表 / Shuffle缓冲"] STOR["存储内存 Storage
默认 ~40%
cache数据 / 广播变量"] end RES --- USR USR --- EXEC EXEC -.->|"Execution 内存不足时可
驱逐 Storage 缓存来借用"| STOR STOR -.->|"Storage 需要时
Execution 释放"| EXEC end

"统一内存管理"的核心机制:
- 执行内存和存储内存共享一个 UnifiedPool
- 执行内存不足时,可以向存储内存"借"(触发存储缓存驱逐)
- 存储内存有需要时,执行内存需要归还(如果正在执行则异步释放)

调优启示: 如果 Task 大量 Spill(表明执行内存不足),可以调高 spark.memory.fractionspark.memory.storageFraction 来给执行内存更多空间。


第六章:完整旅程——跟随一条 SQL 走一遍

本章目标: 将前五章的知识串联起来,沿一条 SQL 逐阶段还原完整执行流程。

6.1 起点

SELECT DISTINCT(query) FROM query_table WHERE pt_d='20260630'
  • query_table:Hive/Parquet 表,有 365 个分区(pt_d 分区列),100 列,总量 10TB
  • 查询:从当天分区中取出所有不重复的 query 值

6.2 阶段一:Catalyst 优化(Driver 内)

Parser: SQL → AST

Project(Distinct(query))
    └── Filter(pt_d='20260630')
            └── Scan(query_table)

Analyzer: 确认表存在、分区列正确、类型匹配

Optimizer:
1. 分区剪裁: pt_d='20260630' → 只读 1 个分区(跳过 364 个),数据量从 10TB → ~30GB
2. 列剪裁: 只读 query 列(跳过 99 列),数据量从 ~30GB → ~300MB
3. 谓词下推: 下推到 Parquet 读取时过滤,利用 Min/Max 索引跳过不含匹配值的行组

物理计划选择: HashAggregate(用 HashMap 做去重,比排序去重快)

Codegen: Driver 生成融合了"读 Parquet → 过滤 → 哈希"的 Java 字节码

6.3 阶段二:DAGScheduler(Driver 内)

输入: Catalyst 产出的物理计划 → RDD DAG

分析: DISTINCT 需要全局去重 → 宽依赖(Shuffle)

切分:

Stage 0(Map 端):每个 Executor 读本地 Parquet 分片 → Filter → 局部去重 → Shuffle Write
Stage 1(Reduce 端):各 Executor 拉取所有 Stage 0 输出 → 全局去重 → 输出结果

TaskSet 生成:
- Stage 0 包含 N 个 Task(N = 该分区目录下 Parquet 文件数,假设是 200 个)
- Stage 1 包含 spark.sql.shuffle.partitions 个 Task(默认 200)

6.4 阶段三:TaskScheduler(Driver 内)

  1. 查看每个 Task 的首选位置(DAGScheduler 从 HDFS 元数据获取的块位置)
  2. 查对应节点是否有空闲 Executor
  3. 有 → 发过去;没有 → 等 3 秒降级到其他节点

假设集群有 10 个 Executor,每个 4 核:
- Stage 0 的 200 个 Task,每轮并发 ~40 个(10 Executor × 4 核)
- Task 持续派发直到全部完成

6.5 阶段四:Executor 执行

Stage 0 执行:
1. 反序列化 Task(包含 Codegen 字节码和 Partition 索引)
2. 打开本地 Parquet 文件片,返回迭代器
3. 逐条读取 → 过滤 pt_d(Parquet 谓词下推层已过滤一部分)→ 局部 HashMap 去重
4. 如果 HashMap 内存超过阈值 → Spill 到磁盘
5. 所有数据处理完 → 按 query 哈希值分区写入 Shuffle Write 文件(.data + .index)
6. 向 Driver 汇报完成,标记 Task 成功

Stage 1 执行(在 Stage 0 全部完成后启动):
1. 每个 Reduce Task 知道自己需要拉取哪些 Map 端的哈希分区
2. 通过 Netty 直连上游 Executor 并发拉取
3. 收到数据 → 全局 HashMap 去重(多路归并合并)
4. 结果写入目标表或返回 Driver

6.6 本阶段全景图

flowchart TB
    subgraph "用户提交"
        A["SELECT DISTINCT(query)
FROM query_table
WHERE pt_d='20260630'"] end subgraph "Driver 进程" B["Parser: SQL → AST"] C["Analyzer: 绑定元数据"] D["Optimizer: 分区剪裁 / 列剪裁"] B --> C --> D D --> E["物理计划选择 + Codegen"] E --> F["DAGScheduler
→ Stage 0 (Map) + Stage 1 (Reduce)"] F --> G["TaskScheduler
→ 按本地性派发 Task"] end subgraph "Executor 进程集群" G --> H["Executor 1
Task(Stage0, P0)"] G --> I["Executor 2
Task(Stage0, P1)"] G --> J["Executor N
Task(Stage0, PN)"] H --> K["Shuffle Write
(按 Key 哈希分区)"] I --> K J --> K K --> L["Shuffle Read (Netty 跨节点拉取)"] L --> M["Global HashAggregate
(如果 OOM → Spill)"] M --> N["写入目标表 / 返回 Driver"] end subgraph "数据流动" O[("HDFS
只读 pt_d='20260630' 分区
只读 query 列")] O --> H O --> I O --> J end

第七章:异常场景与自适应优化(AQE)

本章目标: 理解实际生产中最常见的性能杀手——数据倾斜,以及 Spark 3.x 如何通过 AQE 自动应对。

7.1 数据倾斜——长尾任务的根源

定义: 在 Shuffle 或聚合操作中,某个 Key 的数据量远大于其他 Key,导致处理该 Key 的 Task 运行时间远超其他 Task。

影响:

正常 Task:30 秒完成
倾斜 Task:30 分钟 +(甚至 OOM)
           ↑ 整个 Job 等这个 Task 全部 Job 耗时 ≈ 30+ 分钟

经典场景:

场景 示例 表现
GROUP BY 倾斜 GROUP BY city,北京 80 亿行,其他城市各几万行 一个 Reduce Task 数据量是其他 Task 的几百倍
JOIN 倾斜 JOIN ON user_id,部分用户(刷单 / 爬虫)行为量异常 SortMergeJoin 中单个分区远超其他分区
空值聚合 GROUP BY col 中 col 有大量 NULL NULL 全部进入同一个分区

7.2 传统方案:手工加盐

GROUP BY city 倾斜为例:

-- 第一步:给倾斜 Key 加随机后缀,打散到多个分区做局部聚合
SELECT city, suffix, COUNT(*) AS partial_cnt
FROM (
    SELECT IF(city='北京', CONCAT(city, '_', FLOOR(RAND()*10)), city) AS city_salted, ...
    FROM ...
)
GROUP BY city, suffix

-- 第二步:去掉后缀,做全局聚合
SELECT city, SUM(partial_cnt) AS cnt FROM (
    SELECT SPLIT(city, '_')[0] AS city, partial_cnt FROM step1
)
GROUP BY city

缺点: 需要改写 SQL、需要提前知道哪个 Key 倾斜、引入了额外的一次 Shuffle。

7.3 AQE(Adaptive Query Execution)——Spark 3.x 的自动救星

专业定义: AQE 是 Driver 内部的一个运行时优化框架——在 Shuffle Write 完成后,根据 Executor 上报的真实数据统计信息,动态调整后续物理计划。

AQE 的三板斧:

特性 触发时机 作用
动态合并分区 Shuffle Read 之前 检测到小分区(几 KB)自动合并,减少 Task 数量
动态 Join 策略切换 Shuffle Read 之前 发现某张表比预期小得多,自动切换为 Broadcast Join
动态拆分倾斜分区 Shuffle Read 之前 检测到某个分区数据量远超中位数,自动拆分为子分区

Skew Join 的完整介入时序(重点理解):

flowchart LR
    subgraph "Stage 0 (Map)"
        A[Task 1
产出分区 0: 800MB
产出分区 1: 5MB] -->|Shuffle Write| B[本地磁盘] C[Task 2
产出分区 0: 5MB
产出分区 1: 5MB] -->|Shuffle Write| D[本地磁盘] E[Task 3
产出分区 0: 5MB
产出分区 1: 5MB] -->|Shuffle Write| F[本地磁盘] end subgraph "Stage 0 → Stage 1 之间
Driver AQE 介入" G[Executor 上报分区大小] H[Driver AQE 分析:
分区 0: 810MB
其他分区: ~15MB] I[AQE 决策:
分区 0 拆成 5 个子分区
每个 ~162MB] G --> H --> I end subgraph "Stage 1 (Reduce)" J[Reduce Task 1
处理子分区 0A
~162MB] K[Reduce Task 2
处理子分区 0B
~162MB] L[Reduce Task 3
处理子分区 0C
~162MB] M[Reduce Task 4
处理子分区 1
~15MB] end

AQE 的局限性:
1. AQE 的 Skew Join 优化的是 Join 场景——它是在 Shuffle Write 完成后调整 Reduce 端的并行度
2. 如果 Map 端(Stage 0)的 GROUP BY 本身就已经 OOM 了,AQE 救不了——因为还没走到 Shuffle Write
3. 此时仍然需要手工加盐,或者调整并行度减少单个分区数据量

AQE 和手工加盐的区别:

维度 手工加盐 AQE Skew Join
是否改 Key ✅ 加随机后缀 ❌ 不改 Key,只拆分区
是否需要预知倾斜 Key ✅ 是,需要先分析数据 ❌ 自动检测
是否入侵代码 ✅ 需要改 SQL ❌ 无需改代码
适用范围 GROUP BY、JOIN 都行 主要是 Join

第八章:性能调优实践

8.1 调优优先级(按投入产出比排序)

优先级 层面 操作 收益
🥇 第一 逻辑层(SQL / 表设计) 分区剪裁、列剪裁、广播小表 10x ~ 100x
🥈 第二 物理层(Shuffle 参数) 调整并行度、开启 AQE 2x ~ 10x
🥉 第三 资源层(内存 / 序列化) Kryo序列化、内存比例调整 1.2x ~ 2x

调优黄金法则: 先检查逻辑层,永远不要跳过它去调参数。一个 SQL 上的 SELECT *to_date(pt_d) 造成的浪费,是任何参数调整都补不回来的。

8.2 逻辑层速查(先检查这些!)

检查项 正确做法 反例
分区过滤 WHERE 直接命中分区列 WHERE to_date(pt_d) = '2026-06-30'(被函数包裹,分区剪裁失效)
列剪裁 只写出需要的列名 SELECT *(读 100 列只需 1 列时,IO 放大 100 倍)
广播 Join 小表(< 几百MB)加 /*+ BROADCAST(t) */ 两张几百 GB 的表直接 JOIN(触发 SortMergeJoin)
隐式类型转换 JOIN 键字段类型一致 INT JOIN STRING(无法用 Hash Shuffle)
不必要的 DISTINCT 确认是否真的需要去重 数据本身就不重复时,DISTINCT 浪费一次 Shuffle
不必要的 ORDER BY 最终写入目标表不需要排序 INSERT OVERWRITE ... ORDER BY ...(全局排序极其昂贵)

8.3 物理层速查(Shuffle 相关)

参数 推荐逻辑 说明
spark.sql.shuffle.partitions 目标每个分区 200MB ~ 500MB 数据量大调大,数据量小调小。200 个 Task 处理:200MB × 200 = 40GB Shuffle 数据
spark.sql.adaptive.enabled true(默认) 开启 AQE 运行时优化
spark.sql.adaptive.coalescePartitions.enabled true(默认) 自动合并小分区
spark.sql.adaptive.skewJoin.enabled true(默认) 自动处理 Join 数据倾斜
spark.sql.adaptive.advisoryPartitionSizeInBytes 64MB ~ 256MB AQE 合并分区的目标大小

8.4 资源层速查

参数 / 选项 推荐 说明
存储格式 Parquet / ORC + Snappy / ZSTD 列式存储 + 谓词下推 + 高压缩比
spark.serializer KryoSerializer 序列化体积缩小约 1/3,需注册自定义类
spark.memory.fraction 0.6 ~ 0.7 聚合/Join 任务可适当调高,减少 Spill
spark.executor.cores 3 ~ 5 核 避免单 Executor 核数过多导致内存竞争。核心经验:堆内存 / 核数 ≥ 2GB
spark.executor.memory 按数据量估算,避免单个 Executor > 64GB 过大易导致 GC 恶化
spark.sql.autoBroadcastJoinThreshold 默认 10MB,可调大到 100MB ~ 200MB(取决于可用内存) 不超过 2GB(因为要复制到所有 Executor)

8.5 慢 SQL 排查思路

遇到慢 SQL 时,按以下层次排查:

第一层:看 Spark Web UI(端口 4040)

→ 哪个 Stage 时间最长?
   → 该 Stage 的 Task 数量是否正常?
      → 太多(几万个)→ 合并分区或调大 shuffle.partitions
      → 太少(几个)→ 增大并行度
   → 个别 Task 远慢于其他 Task?
      → ✅ 数据倾斜 → 检查 AQE 是否生效,或手工加盐
   → Task 大量失败/重试?
      → ✅ 检查 Executor 日志:OOM?数据倾斜?网络问题?

第二层:看执行计划

EXPLAIN EXTENDED SELECT DISTINCT(query) FROM query_table WHERE pt_d='20260630'
→ PartitionFilters 是否包含 pt_d?
   → 否 → 分区剪裁未生效,检查 WHERE 条件是否包裹了函数
→ 是否出现 SortMergeJoin 而不是 BroadcastHashJoin?
   → 检查小表大小和 broadcastJoinThreshold
→ Scan 的 Output 列数是否合理?
   → 列剪裁是否生效(不是 SELECT *)

第三层:看 GC 时间

→ Spark UI Stage 页 → GC Time 占比
   → > 10% → 内存压力大
   → 措施:增加 spark.executor.memory,或减少 spark.executor.cores

第四层:看 Shuffle 读写量

→ Shuffle Write / Read 异常大
   → 是否有不必要的 DISTINCT 或 ORDER BY?
   → Join 是否满足广播条件?加 BROADCAST hint 试试
   → 是否数据本身有大量重复/倾斜?

8.6 一张完整的 SQL 调优决策树

flowchart TB
    A[SQL 慢] --> B{看看哪颗最慢}
    B --> C["Cat Stage 慢
数据量大"] B --> D["Shuffle Stage 慢"] B --> E["输出 Stage 慢
写入目标表"] C --> F{"检查 SQL 写法"} F --> G["分区剪裁生效?"] F --> H["列剪裁生效?"] F --> I["是否不需要 DISTINCT?"] G -->|否| J[改 WHERE 条件
去掉函数包裹] H -->|否| K[去掉 SELECT *] I -->|是| L[去掉 DISTINCT] D --> M{"检查 Task 分布"} M --> N["个别 Task 特别慢
→ 数据倾斜"] M --> O["Task 数量异常
→ 并行度问题"] N --> P["开启 AQE Skew Join
或手工加盐"] O --> Q["调整 shuffle.partitions"] E --> R{"检查写入方式"} R --> S["是否 dynamic partition?"] R --> T["文件数过多
小文件问题"] S -->|是| U[检查分区列基数
调小并行度] T --> V[合并小文件
用 coalesce/repartition]

附录A:完整知识全景图

本章读完后再回看此图——它是对全文的浓缩,不是预习材料。

flowchart TB
    subgraph "用户层"
        U["spark-submit / JDBC / DataFrame API"]
    end

    subgraph "进程层 - Driver (JVM)"
        D1["SparkContext 初始化"]
        D1 --> D2["Catalyst 优化器"]
        
        subgraph "Catalyst 优化流水线"
            D2_1["Parser
ANTLR → AST"] D2_2["Analyzer
绑定 Catalog 元数据"] D2_3["Optimizer (RBO)
分区剪裁 / 谓词下推 / 列剪裁"] D2_4["Physical Plan + Codegen
策略选择 + Janino 字节码"] D2_1 --> D2_2 --> D2_3 --> D2_4 end D2 --> D3["DAGScheduler
RDD DAG → Stage 划分"] D3 --> D4["TaskScheduler
Stage → Task 动态派发"] end subgraph "编排层 - Cluster Manager" CM["YARN / K8s / Standalone"] CM -->|"拉起"| EX1 CM -->|"拉起"| EX2 CM -->|"销毁"| EX_D end subgraph "进程层 - Executor 集群" EX1["Executor 1
JVM + 线程池"] EX2["Executor N
JVM + 线程池"] EX_D["已销毁的 Executor
资源已回收"] subgraph "Executor 内部" EX_T["接收 Task → 反序列化
→ 迭代器式逐条处理"] EX_M["统一内存管理
Execution + Storage 池"] EX_S["Shuffle
Map: 写入本地磁盘
Reduce: Netty 拉取"] EX_T --> EX_M --> EX_S end end subgraph "数据层" DATA_IN["HDFS / Hive
Parquet + 分区目录"] DATA_SHUFFLE["Shuffle 中间文件
.data + .index"] DATA_OUT["目标表 / 输出文件"] end subgraph "动态优化 AQE" AQE["Shuffle Write 完成后
Driver 动态分析"] AQE_C["动态合并小分区"] AQE_S["动态拆分倾斜分区"] AQE_J["动态切换 Join 策略"] end %% 连接关系 U --> D1 EX1 --> EX_T EX2 --> EX_T DATA_IN --> EX_T EX_T --> DATA_SHUFFLE DATA_SHUFFLE -.->|"Stage 边界
跨节点拉取"| EX_T EX_T --> DATA_OUT D4 -->|"Task 派发"| EX_T EX_T -->|"心跳 & 分区大小上报"| D4 DATA_SHUFFLE -.->|"分区元数据"| AQE AQE -->|"动态调整"| D4

附录B:常见疑问深度解答

下面的问题是在主流程中会自然产生的疑问。放在这里供查阅,但不属于主文的知识依赖链。

B.1 "结果集发回 Driver" 算不算 Shuffle?

不算。

Shuffle 的特指是 跨节点数据重洗牌 + 磁盘落盘 + 被下游 Stage 消费。Executor 把最终结果发回 Driver:
- 不经过磁盘(直接网络传输)
- 没有下游 Stage 来消费(这是最终产出)

在 ETL 任务中,大多数结果直接 INSERT OVERWRITE 写入 HDFS,连发回 Driver 这一步都省了。

B.2 "没有 Shuffle" 是不是一定只有一个 Stage?

是的。

公式:Stage 数 = Shuffle 依赖数 + 1。如果整个物理计划中没有宽依赖(Shuffle),则只有一个 Stage,所有 Task 并行执行,各算各的。

B.3 一个应用有几个 Driver?两个 SQL 提交有几个 Driver?

一个 Spark 应用(一次 spark-submit)有且仅有一个 Driver。

  • 独立应用模式: 每次提交都拉起一个新的 Driver → N 个应用 = N 个 Driver
  • Spark Thrift Server: 一个常驻 Driver 服务接收所有 SQL → 1 个 Driver 处理多个 SQL

B.4 为什么说 SELECT * 是性能杀手?

因为列剪裁失效了。

在列式存储(Parquet/ORC)下:
- 列剪裁有效时:只读 query 1 列,IO = 表大小的 1%
- 列剪裁失效时:读全部 100 列,IO = 表大小的 100%

对于 10TB 的表,这就是 100GB vs 10TB 的差距。列剪裁是调优时第一个要检查的东西

B.5 WITH(CTE)和嵌套子查询性能有区别吗?

在 Spark 中——没有区别。 CTE 在 Catalyst 优化器阶段会被内联展开,本质上和直接写 FROM (SELECT ...) 等价。Spark 不会自动缓存 CTE 结果,如果 CTE 被引用两次,会执行两次。如果要复用结果,用 cache()createOrReplaceTempView() + cacheTable()

在 ClickHouse 中——有区别。 ClickHouse 的 WITH 默认是"代码替换",多次引用重复执行。要复用需显式加 MATERIALIZED 关键字。

B.6 Spark 和 MapReduce 的核心区别是什么?

维度 MapReduce Spark
过程模型 Map → Shuffle → Reduce 严格两阶段 DAG 多阶段,任意层级依赖
中间结果 必须写入 HDFS 内存/本地磁盘,不落 HDFS
容错 重新运行整个任务 RDD Lineage 只重算丢失的分区
SQL 支持 Hive(独立系统) Catalyst 内建优化器

Spark 把 MapReduce 的 "Map → Shuffle → Reduce" 固定两阶段模式扩展成了更一般的 DAG 模式,同时利用内存避免中间结果写入 HDFS。这是性能差距的根本原因。


附录C:深度阅读与参考资料

官方文档

资源 地址 适合阅读阶段
Spark SQL Guide https://spark.apache.org/docs/latest/sql-programming-guide.html 入门后通读
Tuning Spark https://spark.apache.org/docs/latest/tuning.html 调优实战前
Monitoring https://spark.apache.org/docs/latest/monitoring.html 需要排查慢 SQL 时
Spark Configuration https://spark.apache.org/docs/latest/configuration.html 参数级调优参考

核心技术论文(按阅读顺序)

  1. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing (Zaharia et al., NSDI 2012)
  2. 提出 RDD 概念的原始论文。读完第三章后读,能理解 Lineage 容错的设计动机。

  3. Spark SQL: Relational Data Processing in Spark (Armbrust et al., SIGMOD 2015)

  4. Spark SQL 和 Catalyst 优化器的原论文。读完第四、五章后读最为合适。

  5. Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark (Armbrust et al., SIGMOD 2018)

  6. 如果后续要了解 Structured Streaming,这是必读。

内部实现解析

资源 说明
Jacek Laskowski 的 "The Internals of Spark SQL" https://github.com/jaceklaskowski/spark-sql-internals —— 逐类逐文件的 Spark SQL 源码解析,适合作为代码阅读时的参考路线图
Spark 源码中的 SqlBase.g4 parser 目录下的 ANTLR 语法文件,看它你就知道 Spark 支持哪些 SQL 语法
Tungsten 设计文档 SPARK-12795 https://issues.apache.org/jira/browse/SPARK-12795 —— Tungsten 项目的设计目标和技术细节

周边系统理解

系统 与 Spark 的关系 推荐学习资料
YARN Spark 最常用的资源管理器 《Hadoop: The Definitive Guide》第 4 章 "YARN"
HDFS Spark 最常用的数据存储层 同上第 3 章 "HDFS"
Parquet / ORC Spark 最常用的列式存储格式 Parquet 官方文档:https://parquet.apache.org/docs/
Hive Spark SQL 可以通过 Hive Metastore 读取元数据 Hive 分区表设计模式

调试工具

工具 用途
Spark Web UI (端口 4040) 实时查看 Job/Stage/Task 运行状态、GC 时间、Shuffle 读写量
EXPLAIN EXTENDED 查看 SQL 的执行计划,确认分区剪裁、列剪裁、Join 策略是否生效
spark.sql.debug.maxToStringFields 设置大值后打印更多计划细节
history-server 查看历史任务的 UI 信息
Spark Measure (Nexus 开源) 自定义指标采集,用于持续监控

修订记录

v1.0 - 2026.06.30:初版。三层架构 + 逐模块图解 + 单 SQL 全程跟随。


如果你遇到具体的慢 SQL 或异常场景,欢迎带着 EXPLAIN EXTENDED 输出 + Spark UI 截图 来进一步讨论。