Spark
# Spark
# 了解 Spark 吗?
tag:
携程count:8
as:spark 用于解决什么问题
介绍一下 spark
你使用过程中对于 spark 和 hive 有什么见解,优劣
对于 spark 和 hive 区别
spark 或 hive 当中是对底层的那部分进行的优化(mapjoin)
怎么理解 spark 和 hive
# 数据倾斜
tag:
count:1
as:什么是数据倾斜,实习中遇到过吗怎么处理
数据倾斜根本问题存在于 key 的分布不均,在进行 shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,比如按照 key 进行聚合或 join 等操作。此时如果某个 key 对应的数据量特别大的话,就会发生数据倾斜。

# 如何定位数据倾斜
利用 SparkUI 平台,查看每个 stage 的执行时长,如果绝大多数 stage 耗时少,但个别 stage 耗时长甚至一直卡在那里,这就说明任务发生了数据倾斜。
前提:数据倾斜只会发生在 shuffle 过程中,通常发生在 join、groupby 等
如何定位?同样利用 SparkUI 平台,查看 DAG 执行图,搜索执行时长最长的 stage id 对应的代码,即为数据倾斜代码
# 如何优化数据倾斜
# 参数调整
hive
- sethive.auto.convert.join = true; (是否自动转化成 Map Join)
- sethive.map.aggr=true; (用于控制负载均衡,顶层的聚合操作放在 Map 阶段执行,从而减轻清洗阶段数据传输和 Reduce 阶段的执行时间,提升总体性能,该设置会消耗更多的内存)
- sethive.groupby.skewindata=true; (用于控制负载均衡,当数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行负载均衡)
- sethive.merge.mapfiles=true; (用于 hive 引擎合并小文件使用)
- set mapreduce.map.memory.mb=4096; (设置 Map 内存大小,解决 Memory 占用过大 / 小)
- setmapreduce.reduce.memory.mb=4096;(设置 Reduce 内存大小,解决 Memory 占用过大 / 小)
spark
- setspark.sql.adaptive.enabled=true;(是否开启调整 Partition 功能,如果开启,spark.sql.shuffle.partitions 设置的 Partition 可能会被合并到一个 Reducer 里运行。平台默认开启,同时强烈建议开启。理由:更好利用单个 Executor 的性能,还能缓解小文件问题)
- setspark.sql.hive.convertInsertingPartitionedTable=false;(解决数据无法同步 Impala 问题,使用 Spark3 引擎必填)
- setspark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes=2048M;(Spark 小文件合并)
# map 阶段层面
- 剪裁列和剪裁行,减少全表 全字段查询
- 条件限制,查询一定要带分区字段,子查询需要先限制分区再限制时间及条件限制,减少非必要数据输入
- distribute by rand ():用来控制 map 输出结果的分发,即 map 端如何拆分数据给 reduce 端。 会根据 distribute by 后边定义的列,根据 reduce 的个数进行数据分发,默认是采用 hash 算法。当 distribute by 后边跟的列是:rand () 时,即保证每个分区的数据量基本一致。
- 选择可切分的压缩算法【注意:lzo 压缩文件是可切片的,但是它的可切片特性依赖于其索引,所以需要手动为 lzo 压缩文件创建索引】
- 让每个数据文件大小基本一致
- 过滤异常数据:空值,无效数据
# reduce 阶段层面
# 消除 shuffle
map 端 join:
- 适用场景:大表 join 小表【10M】【不适用于大表 join 大表,如果广播的数据很大,可能内存溢出】 map join 会把小表全部读入内存中,在 map 阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在 map 是进行了 join 操作,省去了 reduce 运行的效率也会高很多,mapjoin 还有一个很大的好处是能够进行不等连接的 join 操作,如果将不等条件写在 where 中,那么 mapreduce 过程中会进行笛卡尔积,运行效率特别低,如果使用 mapjoin 操作,在 map 的过程中就完成了不等值的 join 操作,效率会高很多。
- 对较小的 RDD 创建一个广播变量【数据压缩、高效的通信框架 Netty、BT 协议】,广播给所有的 executor 节点,然后利用 map 算子实现来进行 join 即可
# 增大 reduce 并行度
- 计算公式:hashCode (key)% reduce 个数
优点:实现十分简单;缺点:可能缓解数据倾斜,不一定有效果
# 加盐
方法 1:两阶段聚合(局部聚合 + 全局聚合)
没有固定适应场景
实现思路:
- 第一次是局部聚合,先给每个 key 都打上一个随机数,比如 10 以内的随机数,此时原先一样的 key 就变成不一样的了,比如 (java, 1) (java, 1) (java, 1) (java, 1),就会变成 (1_java, 1) (1_java, 1) (2_java, 1) (2_java, 1)。
- 接着对打上随机数后的数据,进行局部聚合,那么局部聚合结果,就会变成了 (1_java, 2) (2_java, 2)。
- 然后将各个 key 的前缀给去掉,就会变成 (java,2)(java,2),再次进行全局聚合操作,就可以得到最终结果 (java, 4)。
实现原理:将原本相同的 key 通过附加随机前缀的方式,变成多个不同的 key,就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合,进而解决单个 task 处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。
方法 2:采样倾斜 key 并分拆 join 操作
适用场景:join 的时候发生数据倾斜,经检测是由少数 key 的数据量大造成的
实现原理:为数据量特别大的 key 增加随机前缀或后缀,使得这些 key 分散到不同的 task 中;那么此时数据倾斜的 key 变了,如何 join 呢?于是我想到了将另外一份对应相同 key 的数据与随机前缀或者后缀作笛卡尔积,保证两个表可以 join
方法 3:使用随机前缀和扩容 RDD 进行 join
适用场景:如果出现数据倾斜的 key 比较多,无法将这些倾斜拆分出来
实现原理:大表加盐,小表扩容【扩容就是将该表和前缀作笛卡尔积】
# Spark 原理
tag:
count:10
as:Spark 原理?和 MR 的区别?
spark 和 mapreduce 的应用场景。
spark 底层逻辑
spark 的 shuffle
Spark 的执行过程
spark 为什么比 MapReduce 快
介绍 spark 任务执行流程
介绍 spark shuffle
mapreduce 和 spark 的区别
MR 和 Spark 的 shuffle 过程?
# 宽依赖和窄依赖
tag:
count:3
as:Spark 的宽依赖和窄依赖
spark 宽窄依赖
spark 的 RDD 的几个特点
# Spark 算子
tag:
count:4
as:spark reducebykey 和 groupbykey Spark 几种 join spark 算子 spark 算子链
# Spark job
tag:
count:2
as:Spark job task 等怎么划分的
怎么确定 spark 分解成多少个 task,即 spark 任务的并行度怎么指定
# Sprak 优化
tag:
count:1
as:日常对于 spark 的优化,sql 的优化