Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • Hadoop

  • Zookeeper

  • Hive

  • Flume

  • Kafka

  • Azkaban

  • Hbase

  • Scala

  • Spark

    • Spark
    • Spark 入门
    • SprakCore
      • RDD之装饰者模式
      • RDD 弹性分布式数据集
      • RDD 特性
      • RDD 编程
        • RDD创建
        • 从集合中创建RDD
        • 从外部存储系统的数据集创建
        • 分区规则
        • 默认分区规则
        • 指定分区
        • 读取文件指定分区
    • SparkSQL
  • Flink

  • 离线数仓

  • 青训营

  • DolphinScheduler

  • Doris

  • 大数据
  • Spark
Iekr
2021-12-17
目录

SprakCore

# SprakCore

# RDD 之装饰者模式

我们的 bufferreadReader 就是典型的装饰者模式,通过原有类不断增强功能,并保持原有类的功能

装饰者模式:动态地将责任附加到对象上。若要扩展功能,装饰者提供了比继承更有弹性的替代方案。

image-20211217162418781

并且拥有一个特殊的特征 只有最外层的装饰者调用时 才会调用内部原有的装饰对象 称为惰性加载

img

而我们 RDD 也是通过装饰者马上来包装不断装饰原有的功能

image-20211217162734927

# RDD 弹性分布式数据集

image-20211217164425412

# RDD 特性

image-20211217174135839

# RDD 编程

image-20211217174655627

算子:从认知心理学角度来讲,解决问题其实是将问题的初始状态,通过一系列的转换操作(operator),变成解决状态。

# RDD 创建

在 Spark 中创建 RDD 的创建方式可以分为三种:从集合中创建 RDD、从外部存储创建 RDD、从其他 RDD 创建。

# 从集合中创建 RDD

  • sc.parallelize(list)
  • sc.makeRDD(list)
package com.atguigu.spark.day02

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//通过读取内存集合中的数据 创建RDD
object Spark01_CreateRDD_mem {
  def main(args: Array[String]): Unit = {
    //Spark配置文件对象
    val conf: SparkConf = new SparkConf().setAppName(" Spark01_CreateRDD_mem").setMaster("local[*]")
    //创建SparkContext对象
    val sc: SparkContext = new SparkContext(conf)
    //创建一个集合
    val list: List[Int] = List(1, 2, 3, 4)

    //根据集合创建RDD 方式一
    //    val rdd: RDD[Int] = sc.parallelize(list)
    //根据集合创建RDD 方式二 底层调用是parallelize方法
    val rdd: RDD[Int] = sc.makeRDD(list)

    rdd.collect().foreach(println)


    sc.stop()
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 从外部存储系统的数据集创建

  • sc.textFile(path)
package com.atguigu.spark.day02

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


//通过读取外部文件 创建RDD
object Spark02_CreateRDD_file {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Spark02_CreateRDD_file").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //读取本地文件数据
    val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt")
    rdd.collect().foreach(println)

    //从HDFS读取数据
    val hdfsRdd: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input")
    hdfsRdd.collect().foreach(println)

    sc.stop()
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 分区规则

# 默认分区规则

  • rdd.partitions 查看分区
package com.atguigu.spark.day02

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkConf, SparkContext}

/**
 * 默认分区
 * - 从集合中创建RDD
 *    取决于分配给应用的CPU的核数   如为* 则为cpu全部核数
 * - 读取外部文件创建RDD
 *    math.min(取决于分配给应用的CPU的核数,2)
 */
object Spark03_Partition_default {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Spark03_Partition_default").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //通过集合创建RDD
//    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt")
    //查看分区效果
    val partitions: Array[Partition] = rdd.partitions
    println(partitions.size) //分区数
    rdd.saveAsTextFile("D:\\code\\spark\\output")


    sc.stop()

  }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

image-20211217200837737

# 指定分区

  • sc.makeRDD(data , partition)
package com.atguigu.spark.day02

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkConf, SparkContext}

/**
 * 指定分区分区
 *  -根据下标与分区数进行运算 求出 [x,y) 开始到结束下标元素的具体分区分配
 *    - start (i * arr.lent) / partition
 *    - end ((i +1) * arr.lent ) / partition
 */
object Spark04_Partition_mem {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Spark04_Partition_mem").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //通过集合创建RDD
//    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),4) //默认分区数为cpu核数
    //1)4个数据,设置4个分区,输出:0分区->1,1分区->2,2分区->3,3分区->4
    //val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)

    //2)4个数据,设置3个分区,输出:0分区->1,1分区->2,2分区->3,4
    //val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 3)

    //3)5个数据,设置3个分区,输出:0分区->1,1分区->2、3,2分区->4、5
    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5), 3)

    //查看分区效果
    val partitions: Array[Partition] = rdd.partitions
    println(partitions.size) //分区数
    rdd.saveAsTextFile("D:\\code\\spark\\output")


    sc.stop()

  }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

image-20211217201746464

# 读取文件指定分区

package com.atguigu.spark.day02

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 读取外部文件 创建RDD
 *  - 默认分区规则
 *    math.min(分配的核数,2)
 *   - 指定分区 minPartitions 最小分区数 并不是实际分区个数
 *   - 在实际计算分区个数的时候 会根据文件的总大小和 最小分区数进行相除运算
 *      -  如果余数为0  最小分区数为实际分区数
 *      - 如果余数不为0 则实际分区数要看实际切片
 *
 */
object Spark05_Partition_file {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Spark05_Partition_file").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //    val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt") //默认分区为2个
    //2)输入数据1-4,每行一个数字;输出:0=>{1、2} 1=>{3} 2=>{4} 3=>{空}
    //val rdd: RDD[String] = sc.textFile("input/3.txt",3)

    //3)输入数据1-4,一共一行;输出:0=>{1234} 1=>{空} 2=>{空} 3=>{空}
    val rdd: RDD[String] = sc.textFile("input/4.txt", 3)

    rdd.saveAsTextFile("D:\\code\\spark\\output")

    sc.stop()
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

image-20211217225032444

getSplits 文件返回的是切片规划,真正读取是在 compute 方法中创建 LineRecordReader 读取的,有两个关键变量

start=split.getStart()

end = start + split.getLength

假设读取 text.txt 指定最小分区为 5

abc
ef
g
hj
klm
1
2
3
4
5

此文件总大小为 19 字节 19%5=3 每个分区每次追加个数要求为 19/6=3 字节 19 字节总大小 / 3 每次字节 = 6 个分区 余 1 所有实际分区数为 6+1 =7

如果 目前总大小 /(总大小 / 最小分区数) < 1.1 则不切片 则以指定分区数 详情源码看上图的 while 循环

分区 0 的数据为 每个分区每次读取 3 字节 因为读取是读取一行如果这一行数据超过了每次读取则整行读取 并读取下个分区

0~3 索引的数据

abc\r\n
1

分区 1 的数据为 3~6 索引的数据

ef\r\n
1

分区 2 的数据为 6~9 索引的数据

g\r\n
1

分区 3 数据为 9~12 索引的数据

hj\r\n
1

分区 4 数据为 12~15 索引的数据

分区 5 数据为 15~18 索引的数据

klm\r
1

分区 6 数据为 18~1 索引的数据

编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
Spark 入门
SparkSQL

← Spark 入门 SparkSQL→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式