Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • Hadoop

  • Zookeeper

  • Hive

  • Flume

  • Kafka

  • Azkaban

  • Hbase

  • Scala

  • Spark

  • Flink

  • 离线数仓

  • 青训营

    • 第四届青训营

      • SQL Optimizer 解析| 青训营笔记
      • 流/批/OLAP 一体的 Flink 引擎介绍| 青训营笔记
      • Exactly Once 语义在 Flink 中的实现| 青训营笔记
      • 流式计算中的 Window 计算| 青训营笔记
      • Spark 原理与实践| 青训营笔记
      • 大数据 Shuffle 原理与实践| 青训营笔记
        • Shuffle介绍
        • Shuffle 算子
          • Spark中对shuffle的抽象-宽依赖、窄依赖
          • Shuffle Dependency 构造
        • Shuffle 过程
          • Shuffle 写数据
          • Hash Shuffle
          • Sort Shuffle
          • Tungsten Sort Shuffle
          • Shuffle 读数据
          • Shuffle 过程的触发流程
          • Shuffle Handle 的创建
          • Shuffle Writer 的实现
          • Shuffle Reader 的实现
          • Shuffle Block FetchIterator
          • External Shuffle Service
          • Shuffle 优化
          • Zero Copy(零拷贝)
          • Netty 零拷贝
          • Shuffle 优化
          • Shuffle 倾斜优化
          • 案例
        • Push Shuffle
          • 为什么需要Push Shuffle?
          • Push Shuffle 的实现
          • Magnet 实现原理
          • Cloud Shuffle Service
          • Cloud Shuffle Service 思想
          • Cloud Shuffle Service 架构
          • Cloud Shuffle Service 读写流程
          • Cloud Shuffle Service AQE
      • Presto 架构原理与优化介绍| 青训营笔记
      • HDFS 原理与应用| 青训营笔记
      • HDFS 高可用与高扩展性机制分析| 青训营笔记
      • 深入浅出 HBase 实战| 青训营笔记
      • 数据湖三剑客:Delta Lake、Hudi 与 Iceberg| 青训营笔记
      • 从 Kafka 到 Pulsar 的数据流演进之路| 青训营笔记
      • Parquet 和 ORC:高性能列式存储| 青训营笔记
      • LSMT 存储引擎浅析| 青训营笔记
      • 浅谈分布式一致性协议| 青训营笔记
      • 走进 YARN 资源管理和调度| 青训营笔记
      • 深入理解 K8S 资源管理和调度| 青训营笔记
      • 实时数据中心建设思路与企业实践| 青训营笔记
      • 用户数据分析理论与最佳实践| 青训营笔记
      • 大数据可视化理论与案例分析| 青训营笔记
  • DolphinScheduler

  • Doris

  • 大数据
  • 青训营
  • 第四届青训营
Iekr
2022-08-01
目录

大数据 Shuffle 原理与实践| 青训营笔记

# 大数据 Shuffle 原理与实践| 青训营笔记

这是我参与「第四届青训营 」笔记创作活动的的第 6 天

# Shuffle 介绍

在开源实现的 MapReduce 中,存在 Map、Shuffle、Reduce 三个阶段。

image-20220801094750050

为什么 shuffle 对性能非常重要

  • M * R 次网络连接
  • 大量数据移动
  • 数据丢失风险
  • 可能存在大量的排序操作
  • 大量的数据序列化、反序列化操作
  • 数据压缩

在大数据场景下,数据 shuffle 表示了不同分区数据交换的过程,不同的 shuffle 策略性能差异较大。目前在各个引擎中 shuffle 都是优化的重点,在 spark 框架中,shuffe 是支撑 spark 进行大规模复杂数据处理的基石。

# Shuffle 算子

Spark 中会产生 shuffle 的算子大概可以分为 4 类

repartition ByKey join Distinct
coalesce groupByKey cogroup distinct
repartition reduceByKey join
aggregateByKey leftOuterJoin
combineByKey intersection
sortByKey subtract
sortBy subtractByKey

# Spark 中对 shuffle 的抽象 - 宽依赖、窄依赖

image-20220801032113040

  • 窄依赖:表示每一个父 (上游) RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。
    • NarrowDependency
    • OneToOneDependency
    • RangeDependency
    • PruneDependency
  • 宽依赖(会产生 Shuffle):表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。
    • ShuffleDependency:创建会产生 shuffle 的 RDD 时,RDD 会创建 Shuffle Dependency 来描述 Shuffle 相关的信息。下面为算子内部的依赖关系
      • CoGroupedRDD
        • Cogroup
          • fullOuterJoin、rightOuterJoin、 leftOuterJoin
          • join
      • ShuffledRDD
        • combineByKeyWithClassTag
          • combineByKey
          • reduceByKey
        • Coalesce
        • sortByKey
          • sortBy

# Shuffle Dependency 构造

构造函数

  • A single key-value pair RDD, i.e. RDD[Product2[K, V]],
  • Partitioner (available as partitioner property),
  • Serializer,
  • Optional key ordering (of Scala’s scala.math.Ordering type),
  • Optional Aggregator,
  • mapSideCombine flag which is disabled (i.e. false) by default.

Partitioner

  • 用来将 record 映射到具体的 partition 的方法
  • 接口
    • numberPartitions
    • getPartition
  • 经典实现
    • HashPartitioner

image-20220801101450467

Aggregator

  • 在 map 侧合并部分 record 的函数
  • 接口
    • createCombiner:只有一个 value 的时候初始化的方法
    • mergeValue:合并一个 value 到 Aggregator 中
    • mergeCombiners:合并两个 Aggregator

# Shuffle 过程

Shuffle 实现的发展历程

  • Spark 0.8 及以前 Hash Based Shuffle
  • Spark 0.8.1 为 Hash Based Shuffle 引入 File Consolidation 机制
  • Spark 0.9 引入 ExternalAppendOnlyMap
  • Spark 1.1 引入 Sort Based Shuffle,但默认仍为 Hash Based Shuffle
  • Spark 1.2 默认的 Shuffle 方式改为 Sort Based Shuffle
  • Spark 1.4 引入 Tungsten-Sort Based Shuffle
  • Spark 1.6 Tungsten-Sort Based Shuffle 并入 Sort Based Shuffle
  • Spark 2.0 Hash Based Shuffle 退出历史舞台

# Shuffle 写数据

# Hash Shuffle

写数据

每个 partition 会映射到一个独立的文件

image (1)

写数据优化

每个 partition 会映射到一个文件片段

606406c0-7230-481c-8e1f-b0610377be57

  • 优点:不需要排序
  • 缺点:打开,创建的文件过多

# Sort Shuffle

写数据

每个 task 生成一个包含所有 partiton 数据的文件

1a5bd618-be67-479f-8078-dd0337a97b12

  • 优点:打开的文件少、支持 map-side combine
  • 缺点:需要排序

# Tungsten Sort Shuffle

195230-364a26e9c63e6ec5.png

  • 优点:更快的排序效率,更高的内存利用效率
  • 缺点:不支持 map-side combine

# Shuffle 读数据

每个 reduce task 分别获取所有 map task 生成的属于自己的片段

89244d56-00f4-42b6-91c0-b52d38b0ae4e

# Shuffle 过程的触发流程

val text = sc.textFile("mytextfile.txt")
val counts = text
    .flatMap(line => Line.split(" "))
    .map(word => (word,1))
    .reduceByKey(_+_)
counts.coLlect
1
2
3
4
5
6

image-20220801133244323

# Shuffle Handle 的创建

Register Shuffle 时做的最重要的事情是根据不同条件创建不同的 shuffle Handle

  • 由 action 算子触发 DAG Scheduler 进行 shuffle register
  • Shuffle Register 会根据不同的条件决定注册不同的 ShuffleHandle

image-20220801135521872

  • spark.shuffle.sort.bypassMergeThreshold 默认为 200

# Shuffle Writer 的实现

三种 ShuffleHandle 对应了三种不同的 ShuffleWriter 的实现

image-20220801134301599

  • BypassMergeSortShuffleWriter:HashShuffle image-20220801134609707

    • 不需要排序,节省时间
    • 写操作的时候会打开大量文件
  • UnsafeShuffleWriter:TunstonShuffle

    image-20220801134856337

    • 使用类似内存页储存序列化数据

    • 数据写入后不再反序列化

    • 只适用 partition 数量较小的情况,使用堆外内存,没有垃圾回收的开销,没有对象模型类型开销

    • 只根据 partition 排序 Long Array

      image-20220801135127337

    • 数据不移动

    • 只保留 24 bit 用于存储 partition 数,其他空间用于存储这些 Long Array 数据,所以超过224 的 partition 数就不适用该 Writer

  • SortSHuffleWriter:SortShuffle image-20220801135800710

    • 支持 combine
    • 需要 combine 时,使用 PartitionedAppendOnlyMap,本质是个 HashTable
    • 不需要 combine 时 PartitionedPairBuffer 本质是个 array

# Shuffle Reader 的实现

网络时序图

image-20220801140820559

  • 使用基于 netty 的网络通信框架,并接受 reducetask 的 fetch 请求
    • 使用堆外内存,零拷贝
  • 位置信息记录在 MapOutputTracker 中
  • 主要会发送两种类型的请求
    • 首先发起 openBlocks 请求获得 streamId
    • 然后再处理 Chunk 请求或 Stream 请求

# Shuffle Block FetchIterator

f4b32a52-fd11-403b-ac8d-29a8ab268be1

  • 区分 local 和 remote 节省网络消耗

  • 防止 OOM

    • maxBytesInFlight
    • maxReqsInFlight
    • maxBlocksInFlightPerAddress
    • maxReqSizeShuffleToMem
    • maxAttemptsOnNettyOOM

# External Shuffle Service

67bf2c59-9350-4fbd-9377-344b225de0ff

为了解决 Executor 为了服务数据的 fetch 请求导致无法退出问题,我们在每个节点上部署一个 External Shuffle Service,这样产生数据的 Executor 在不需要继续处理任务时,可以随意退出。从而优化了 Spark 作业的资源利用率,MapTask 在运行结束后可以正常退出。

# Shuffle 优化

# Zero Copy(零拷贝)

DMA (Direct Memory Access) : 直接存储器存取,是指外部设备不通过 CPU 而直接与系统内存交换数据的接口技术。

不使用 zero copy

image-20220801142204849

使用 sendfile

image-20220801142454380

使用 sendfile + DMA gather copy

image-20220801142516789

# Netty 零拷贝

  • 可堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。
  • CompositeByteBuf 、 Unpooled.wrappedBuffer、 ByteBuf.slice ,可以合并、包装、切分数组,避免发生内存拷贝
  • Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo () 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝

# Shuffle 优化

  • 避免 shuffle

    • 使用 broadcast 替代 join

      image-20220801154536628

  • 使用可以 map-side 预聚合的算子

    • 没有使用 map-side 预聚合前 image-20220801154618087
    • 使用 map-side 预聚合后 image-20220801154635076
  • shuffle 参数优化

spark.default.parallelism && spark.sql.shuffle.partitions
spark.hadoopRDD.ignoreEmptySplits
spark.hadoop.mapreduce.input.fileinputformat.split.minsize
spark.sql.file.maxPartitionBytes
spark.sql.adaptive.enark.sql.adaptivetPostShufflelnputSize
spark.reducer.maxSizelnFlight
spark.reducer.maxReqslnFlight
spark.reducer.maxBlockslnFlightPerAddress
1
2
3
4
5
6
7
8

# Shuffle 倾斜优化

image-20220801155215221

  • 数据倾斜影响
    • 作业运行时间变长
    • Task OOM 导致作业失败
  • 可以通过提高并行度解决 image-20220801155502517
    • 优点:足够简单
    • 缺点:只缓解、不根治问题
  • Spark AQE Skew Join:AQE 根据 shuffle 文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的分区,然后各自进行 join。 image-20220801155731646

# 案例

image-20220801160035574

优化前

image-20220801160207831

spark.sql.adaptive.shuffle.targetPostShufflelnputSize: 64M -> 512M
spark.sql.files.maxPartitionBytes: 1 G - > 40G
1
2

优化后

image-20220801160221296

# Push Shuffle

# 为什么需要 Push Shuffle?

Shuffle 阶段常见问题

  • 数据存储在本地磁盘,没有备份
  • IO 并发:大量 RPC 请求(M*R)
  • IO 吞吐:随机读、写放大(3X)
  • GC 频繁,影响 NodeManager

image-20220801160545591

  • Avg IO size 太小,造成了大量的随机 IO,严重影响磁盘的吞吐
  • M*R 次读请求,造成大量的网络连接,影响稳定性

# Push Shuffle 的实现

为了优化该问题,有很多公司都做了思路相近的优化,push shuffle

  • Facebook: cosco (opens new window)
  • LinkedIn:magnet (opens new window)
  • Uber:Zeus (opens new window)
  • Alibaba: RSS (opens new window)
  • Tencent: FireStorm (opens new window)
  • Bytedance: Cloud Shuffle Service
  • Spark3.2: push based shuffle (opens new window)

# Magnet 实现原理

Magnet 主要流程

image-20220801160911588

主要为边写边 push 的模式,在原有的 shuffle 基础上尝试 push 聚合数据,但并不强制完成,读取时优先读取 push 聚合的结果,对于没有来得及完成聚合或者聚合失败的情况,则 fallback 到原模式。

  • Spark driver 组件,协调整体的 shuffle 操作
  • map 任务的 shuffle writer 过程完成后,增加了一个额外的操作 push-merge,将数据复制一份推到远程 shuffle 服务上
  • magnet shuffle service 是一个强化版的 ESS。将隶属于同一个 shuffle partition 的 block,会在远程传输到 magnet 后被 merge 到一个文件中
  • reduce 任务从 magnet shuffle service 接收合并好的 shuffle 数据

image-20220801164100903

  • bitmap: 存储已 merge 的 mapper id,防止重复 merge
  • position offset: 如果本次 block 没有正常 merge,可以恢复到上一个 block 的位置
  • currentMapId: 标识当前正 append 的 block,保证不同 mapper 的 block 能依次 append

Magnet 可靠性

  • 如果 Map task 输出的 Block 没有成功 Push 到 magnet 上,并且反复重试仍然失败,则 reduce task 直接从 ESS 上拉取原始 block 数据
  • 如果 magnet 上的 block 因为重复或者冲突等原因,没有正常完成 merge 的过程,则 reduce task 直接拉取末完成 merge 的 block
  • 如果 reduce 拉取已经 merge 好的 block 失败,则会直接拉取 merge 前的原始 block
  • 本质上,magnet 中维护了两份 shuffle 数据的副本

# Cloud Shuffle Service

# Cloud Shuffle Service 思想

  • IO 集合:所有 Mapper 的同一 Partition 数据都远程写到同一个文件 (或者多个文件)
  • 备份:HDFS 太重,使用双磁盘副本(成本低、速度快)
  • 写入速度:主从 InMemory 副本,异步刷盘,极小的失败几率去换取高速写入速度

# Cloud Shuffle Service 架构

0164a207-7d97-4ff6-9b1d-a0e68977c793

  • Zookeeper WorkerList [服务发现]
  • CSS Worker [Partitions / Disk | Hdfs]
  • Spark Driver [集成启动 CSS Master]
  • CSS Master [Shuffle 规划 / 统计]
  • CSS ShuffleClient [Write / Read]
  • Spark Executor [Mapper + Reducer]

# Cloud Shuffle Service 读写流程

e1d6d671-0933-4783-828d-d8bf0bbfc88d

# Cloud Shuffle Service AQE

一个 Partition 会最终对应到多个 Epoch file,每个 EPoch 目前设置是 512MB

在聚合文件时主动将文件切分为若干块,当触发 AQE 时,按照已经切分好的文件块进行拆分。

image-20220801170338848

编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
Spark 原理与实践| 青训营笔记
Presto 架构原理与优化介绍| 青训营笔记

← Spark 原理与实践| 青训营笔记 Presto 架构原理与优化介绍| 青训营笔记→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式