大数据 Shuffle 原理与实践| 青训营笔记
# 大数据 Shuffle 原理与实践| 青训营笔记
这是我参与「第四届青训营 」笔记创作活动的的第 6 天
# Shuffle 介绍
在开源实现的 MapReduce 中,存在 Map、Shuffle、Reduce 三个阶段。

为什么 shuffle 对性能非常重要
- M * R 次网络连接
- 大量数据移动
- 数据丢失风险
- 可能存在大量的排序操作
- 大量的数据序列化、反序列化操作
- 数据压缩
在大数据场景下,数据 shuffle 表示了不同分区数据交换的过程,不同的 shuffle 策略性能差异较大。目前在各个引擎中 shuffle 都是优化的重点,在 spark 框架中,shuffe 是支撑 spark 进行大规模复杂数据处理的基石。
# Shuffle 算子
Spark 中会产生 shuffle 的算子大概可以分为 4 类
| repartition | ByKey | join | Distinct |
|---|---|---|---|
| coalesce | groupByKey | cogroup | distinct |
| repartition | reduceByKey | join | |
| aggregateByKey | leftOuterJoin | ||
| combineByKey | intersection | ||
| sortByKey | subtract | ||
| sortBy | subtractByKey |
# Spark 中对 shuffle 的抽象 - 宽依赖、窄依赖

- 窄依赖:表示每一个父 (上游) RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。
- NarrowDependency
- OneToOneDependency
- RangeDependency
- PruneDependency
- 宽依赖(会产生 Shuffle):表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。
- ShuffleDependency:创建会产生 shuffle 的 RDD 时,RDD 会创建 Shuffle Dependency 来描述 Shuffle 相关的信息。下面为算子内部的依赖关系
- CoGroupedRDD
- Cogroup
- fullOuterJoin、rightOuterJoin、 leftOuterJoin
- join
- Cogroup
- ShuffledRDD
- combineByKeyWithClassTag
- combineByKey
- reduceByKey
- Coalesce
- sortByKey
- sortBy
- combineByKeyWithClassTag
- CoGroupedRDD
- ShuffleDependency:创建会产生 shuffle 的 RDD 时,RDD 会创建 Shuffle Dependency 来描述 Shuffle 相关的信息。下面为算子内部的依赖关系
# Shuffle Dependency 构造
构造函数
- A single key-value pair RDD, i.e. RDD[Product2[K, V]],
- Partitioner (available as partitioner property),
- Serializer,
- Optional key ordering (of Scala’s scala.math.Ordering type),
- Optional Aggregator,
- mapSideCombine flag which is disabled (i.e. false) by default.
Partitioner
- 用来将 record 映射到具体的 partition 的方法
- 接口
- numberPartitions
- getPartition
- 经典实现
- HashPartitioner

Aggregator
- 在 map 侧合并部分 record 的函数
- 接口
- createCombiner:只有一个 value 的时候初始化的方法
- mergeValue:合并一个 value 到 Aggregator 中
- mergeCombiners:合并两个 Aggregator
# Shuffle 过程
Shuffle 实现的发展历程
- Spark 0.8 及以前 Hash Based Shuffle
- Spark 0.8.1 为 Hash Based Shuffle 引入 File Consolidation 机制
- Spark 0.9 引入 ExternalAppendOnlyMap
- Spark 1.1 引入 Sort Based Shuffle,但默认仍为 Hash Based Shuffle
- Spark 1.2 默认的 Shuffle 方式改为 Sort Based Shuffle
- Spark 1.4 引入 Tungsten-Sort Based Shuffle
- Spark 1.6 Tungsten-Sort Based Shuffle 并入 Sort Based Shuffle
- Spark 2.0 Hash Based Shuffle 退出历史舞台
# Shuffle 写数据
# Hash Shuffle
写数据
每个 partition 会映射到一个独立的文件
.png)
写数据优化
每个 partition 会映射到一个文件片段

- 优点:不需要排序
- 缺点:打开,创建的文件过多
# Sort Shuffle
写数据
每个 task 生成一个包含所有 partiton 数据的文件

- 优点:打开的文件少、支持 map-side combine
- 缺点:需要排序
# Tungsten Sort Shuffle

- 优点:更快的排序效率,更高的内存利用效率
- 缺点:不支持 map-side combine
# Shuffle 读数据
每个 reduce task 分别获取所有 map task 生成的属于自己的片段

# Shuffle 过程的触发流程
val text = sc.textFile("mytextfile.txt")
val counts = text
.flatMap(line => Line.split(" "))
.map(word => (word,1))
.reduceByKey(_+_)
counts.coLlect
2
3
4
5
6

# Shuffle Handle 的创建
Register Shuffle 时做的最重要的事情是根据不同条件创建不同的 shuffle Handle
- 由 action 算子触发 DAG Scheduler 进行 shuffle register
- Shuffle Register 会根据不同的条件决定注册不同的 ShuffleHandle

- spark.shuffle.sort.bypassMergeThreshold 默认为 200
# Shuffle Writer 的实现
三种 ShuffleHandle 对应了三种不同的 ShuffleWriter 的实现

BypassMergeSortShuffleWriter:HashShuffle

- 不需要排序,节省时间
- 写操作的时候会打开大量文件
UnsafeShuffleWriter:TunstonShuffle

使用类似内存页储存序列化数据
数据写入后不再反序列化
只适用 partition 数量较小的情况,使用堆外内存,没有垃圾回收的开销,没有对象模型类型开销
只根据 partition 排序 Long Array

数据不移动
只保留 24 bit 用于存储 partition 数,其他空间用于存储这些 Long Array 数据,所以超过
的 partition 数就不适用该 Writer
SortSHuffleWriter:SortShuffle

- 支持 combine
- 需要 combine 时,使用 PartitionedAppendOnlyMap,本质是个 HashTable
- 不需要 combine 时 PartitionedPairBuffer 本质是个 array
# Shuffle Reader 的实现
网络时序图

- 使用基于 netty 的网络通信框架,并接受 reducetask 的 fetch 请求
- 使用堆外内存,零拷贝
- 位置信息记录在 MapOutputTracker 中
- 主要会发送两种类型的请求
- 首先发起 openBlocks 请求获得 streamId
- 然后再处理 Chunk 请求或 Stream 请求
# Shuffle Block FetchIterator

区分 local 和 remote 节省网络消耗
防止 OOM
- maxBytesInFlight
- maxReqsInFlight
- maxBlocksInFlightPerAddress
- maxReqSizeShuffleToMem
- maxAttemptsOnNettyOOM
# External Shuffle Service

为了解决 Executor 为了服务数据的 fetch 请求导致无法退出问题,我们在每个节点上部署一个 External Shuffle Service,这样产生数据的 Executor 在不需要继续处理任务时,可以随意退出。从而优化了 Spark 作业的资源利用率,MapTask 在运行结束后可以正常退出。
# Shuffle 优化
# Zero Copy(零拷贝)
DMA (Direct Memory Access) : 直接存储器存取,是指外部设备不通过 CPU 而直接与系统内存交换数据的接口技术。
不使用 zero copy

使用 sendfile

使用 sendfile + DMA gather copy

# Netty 零拷贝
- 可堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。
- CompositeByteBuf 、 Unpooled.wrappedBuffer、 ByteBuf.slice ,可以合并、包装、切分数组,避免发生内存拷贝
- Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo () 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝
# Shuffle 优化
避免 shuffle
使用 broadcast 替代 join

使用可以 map-side 预聚合的算子
- 没有使用 map-side 预聚合前

- 使用 map-side 预聚合后

- 没有使用 map-side 预聚合前
shuffle 参数优化
spark.default.parallelism && spark.sql.shuffle.partitions
spark.hadoopRDD.ignoreEmptySplits
spark.hadoop.mapreduce.input.fileinputformat.split.minsize
spark.sql.file.maxPartitionBytes
spark.sql.adaptive.enark.sql.adaptivetPostShufflelnputSize
spark.reducer.maxSizelnFlight
spark.reducer.maxReqslnFlight
spark.reducer.maxBlockslnFlightPerAddress
2
3
4
5
6
7
8
# Shuffle 倾斜优化

- 数据倾斜影响
- 作业运行时间变长
- Task OOM 导致作业失败
- 可以通过提高并行度解决
- 优点:足够简单
- 缺点:只缓解、不根治问题
- Spark AQE Skew Join:AQE 根据 shuffle 文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的分区,然后各自进行 join。

# 案例

优化前

spark.sql.adaptive.shuffle.targetPostShufflelnputSize: 64M -> 512M
spark.sql.files.maxPartitionBytes: 1 G - > 40G
2
优化后

# Push Shuffle
# 为什么需要 Push Shuffle?
Shuffle 阶段常见问题
- 数据存储在本地磁盘,没有备份
- IO 并发:大量 RPC 请求(M*R)
- IO 吞吐:随机读、写放大(3X)
- GC 频繁,影响 NodeManager

- Avg IO size 太小,造成了大量的随机 IO,严重影响磁盘的吞吐
- M*R 次读请求,造成大量的网络连接,影响稳定性
# Push Shuffle 的实现
为了优化该问题,有很多公司都做了思路相近的优化,push shuffle
- Facebook: cosco (opens new window)
- LinkedIn:magnet (opens new window)
- Uber:Zeus (opens new window)
- Alibaba: RSS (opens new window)
- Tencent: FireStorm (opens new window)
- Bytedance: Cloud Shuffle Service
- Spark3.2: push based shuffle (opens new window)
# Magnet 实现原理
Magnet 主要流程

主要为边写边 push 的模式,在原有的 shuffle 基础上尝试 push 聚合数据,但并不强制完成,读取时优先读取 push 聚合的结果,对于没有来得及完成聚合或者聚合失败的情况,则 fallback 到原模式。
- Spark driver 组件,协调整体的 shuffle 操作
- map 任务的 shuffle writer 过程完成后,增加了一个额外的操作 push-merge,将数据复制一份推到远程 shuffle 服务上
- magnet shuffle service 是一个强化版的 ESS。将隶属于同一个 shuffle partition 的 block,会在远程传输到 magnet 后被 merge 到一个文件中
- reduce 任务从 magnet shuffle service 接收合并好的 shuffle 数据

- bitmap: 存储已 merge 的 mapper id,防止重复 merge
- position offset: 如果本次 block 没有正常 merge,可以恢复到上一个 block 的位置
- currentMapId: 标识当前正 append 的 block,保证不同 mapper 的 block 能依次 append
Magnet 可靠性
- 如果 Map task 输出的 Block 没有成功 Push 到 magnet 上,并且反复重试仍然失败,则 reduce task 直接从 ESS 上拉取原始 block 数据
- 如果 magnet 上的 block 因为重复或者冲突等原因,没有正常完成 merge 的过程,则 reduce task 直接拉取末完成 merge 的 block
- 如果 reduce 拉取已经 merge 好的 block 失败,则会直接拉取 merge 前的原始 block
- 本质上,magnet 中维护了两份 shuffle 数据的副本
# Cloud Shuffle Service
# Cloud Shuffle Service 思想
- IO 集合:所有 Mapper 的同一 Partition 数据都远程写到同一个文件 (或者多个文件)
- 备份:HDFS 太重,使用双磁盘副本(成本低、速度快)
- 写入速度:主从 InMemory 副本,异步刷盘,极小的失败几率去换取高速写入速度
# Cloud Shuffle Service 架构

- Zookeeper WorkerList [服务发现]
- CSS Worker [Partitions / Disk | Hdfs]
- Spark Driver [集成启动 CSS Master]
- CSS Master [Shuffle 规划 / 统计]
- CSS ShuffleClient [Write / Read]
- Spark Executor [Mapper + Reducer]
# Cloud Shuffle Service 读写流程

# Cloud Shuffle Service AQE
一个 Partition 会最终对应到多个 Epoch file,每个 EPoch 目前设置是 512MB
在聚合文件时主动将文件切分为若干块,当触发 AQE 时,按照已经切分好的文件块进行拆分。
