Spark 入门
# Spark 入门
部署 Spark 集群大体上分为两种模式:单机模式与集群模式 大多数分布式框架都支持单机模式,方便开发者调试框架的运行环境。但是在生产环境中,并不会使用单机模式。因此,后续直接按照集群模式部署 Spark 集群。 下面详细列举了 Spark 目前支持的部署模式。
- Local 模式:在本地部署单个 Spark 服务
- Standalone 模式:Spark 自带的任务调度模式。(国内常用)
- YARN 模式:Spark 使用 Hadoop 的 YARN 组件进行资源与任务调度。(国内常用)
- Mesos 模式:Spark 使用 Mesos 平台进行资源与任务的调度。
1)官网地址:http://spark.apache.org/
2)文档查看地址:https://spark.apache.org/docs/2.1.1/
3)下载地址:https://spark.apache.org/downloads.html
# Local 模式
上传并解压安装包
cd /opt/software/
tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
cd /opt/module/
mv spark-2.1.1-bin-hadoop2.7/ spark-local
cd spark-local
2
3
4
5
# 官方求 PI 案例
利用蒙特・卡罗算法求 PI
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
2
3
4
5
--class:表示要执行程序的主类;
--master local[2]
local: 没有指定线程数,则所有计算都运行在一个线程当中,没有任何并行计算
local [K]: 指定使用 K 个 Core 来运行计算,比如 local [2] 就是运行 2 个 Core 来执行
local [*]: 自动帮你按照 CPU 最多核来设置线程数。比如 CPU 有 4 核,Spark 帮你自动设置 4 个线程计算
spark-examples_2.11-2.1.1.jar:要运行的程序;
10:要运行程序的输入参数(计算圆周率 π 的次数,计算次数越多,准确率越高);

# 官方 WordCount 案例
读取多个输入文件,统计每个单词出现的总次数

准备数据
mkdir input
echo hello world > input/1.txt
echo hello spark > input/2.txt
2
3
启动 spark-shell
bin/spark-shell

sc 是 SparkCore 程序的入口;spark 是 SparkSQL 程序入口;master = local [*] 表示本地模式运行。
sc.textFile("/opt/module/spark-local/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

查看 web 页面

spark-shell 窗口关闭掉,则 hadoop102:4040 页面关闭
本地模式下,默认的调度器为 FIFO。

# 集群角色
# Master 和 Worker

# Driver 和 Executor

Master 和 Worker 是 Spark 的守护进程,即 Spark 在特定模式下正常运行所必须的进程。Driver 和 Executor 是临时程序,当有具体任务提交到 Spark 集群才会开启的程序
# Standalone 模式
Standalone 模式是 Spark 自带的资源调动引擎,构建一个由 Master + Slave 构成的 Spark 集群,Spark 运行在集群中。
这个要和 Hadoop 中的 Standalone 区别开来。这里的 Standalone 是指只用 Spark 来搭建一个集群,不需要借助其他的框架。是相对于 Yarn 和 Mesos 来说的。
集群规划
| hadoop102 | hadoop103 | hadoop104 | |
|---|---|---|---|
| Spark | Master Worker | Worker | Worker |
解压安装
cd /opt/software/
tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
cd /opt/module/
mv spark-2.1.1-bin-hadoop2.7/ spark-standalone
cd spark-standalone
2
3
4
5
配置 spark
cd conf
mv slaves.template slaves
vim slaves
hadoop102
hadoop103
hadoop104
2
3
4
5
6
7
修改 spark-env.sh 文件,添加 master 节点
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
SPARK_MASTER_HOST=hadoop102
SPARK_MASTER_PORT=7077
2
3
4
5
分发
xsync /opt/module/spark-standalone/
启动集群
cd ..
sbin/start-all.sh
2
查看进程
jps
================atguigu@hadoop102================
3330 Jps
3238 Worker
3163 Master
================atguigu@hadoop103================
2966 Jps
2908 Worker
================atguigu@hadoop104================
2978 Worker
3036 Jps
2
3
4
5
6
7
8
9
10
11
12
如果遇到 “JAVA_HOME not set” 异常,可以在 sbin 目录下的 spark-config.sh 文件中加入如下配置
export JAVA_HOME=/opt/module/jdk1.8.0_212
网页查看 访问 hadoop102:8080
官方求 PI 案例
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
2
3
4
5
查看 web 页面 hadoop102:8080

一共 12 个 cores 12 核 每个核 1024 内存

我们提交任务时也可以通过属性来控制 核数和内存
配置 Executor 可用内存为 2G,使用 CPU 核数为 2 个
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
2
3
4
5
6
7
| 参数 | 解释 | 可选值举例 |
|---|---|---|
| --class | Spark 程序中包含主函数的类 | |
| --master | Spark 程序运行的模式 | 本地模式:local [*]、spark://hadoop102:7077、 Yarn |
| --executor-memory 1G | 指定每个 executor 可用内存为 1G | 符合集群内存配置即可,具体情况具体分析。 |
| --total-executor-cores 2 | 指定所有 executor 使用的 cpu 核数为 2 个 | |
| application-jar | 打包好的应用 jar,包含依赖。这个 URL 在集群中全局可见。 比如 hdfs:// 共享存储系统,如果是 file://path,那么所有的节点的 path 都包含同样的 jar | |
| application-arguments | 传给 main () 方法的参数 |
# 配置历史服务
由于 spark-shell 停止掉后,hadoop102:4040 页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况。
先停止 spark
sbin/stop-all.sh
修改 spark-default.conf 文件
cd /opt/module/spark-standalone/conf/
mv spark-defaults.conf.template spark-defaults.conf
vi spark-defaults.conf
2
3
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop102:8020/directory
2
分发
xsync spark-defaults.conf
启动 hadoop 集群并且保证 logdir 的目录提前存在
start-dfs.sh
hadoop fs -mkdir /directory
2
修改 spark-env.sh 文件
vi spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory
-Dspark.history.retainedApplications=30"
2
3
4
5
6
参数 1 含义:WEBUI 访问的端口号为 18080
参数 2 含义:指定历史服务器日志存储路径
参数 3 含义:指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
分发配置文件
xsync spark-env.sh
启动 spark
sbin/start-all.sh
启动历史服务
sbin/start-history-server.sh

再执行任务
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
2
3
4
5
6
7
访问历史服务器
http://hadoop102:18080/

# 配置高可用(HA)

停止 spark 集群
sbin/stop-all.sh
启动 zookeeper
zk.sh start
修改 spark-env.sh 文件添加如下配置
vim conf/spark-env.sh
#注释掉如下内容:
#SPARK_MASTER_HOST=hadoop102
#SPARK_MASTER_PORT=7077
#添加上如下内容。配置由Zookeeper管理Master,在Zookeeper节点中自动创建/spark目录,用于管理:
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104
-Dspark.deploy.zookeeper.dir=/spark"
#Zookeeper3.5的AdminServer默认端口是8080,和Spark的WebUI冲突 所以要把spark默认8080端口改为8989
export SPARK_MASTER_WEBUI_PORT=8989
2
3
4
5
6
7
8
9
10
11
12
13
14
分发配置
xsync conf/spark-env.sh
启动
sbin/start-all.sh
在 103 上单独启动 master 节点 一共有两个 master 节点
sbin/start-master.sh

hadoop103 上的 master 处于待命状态
在 102 将 sprak-local/input 数据上传到 hadoop 集群的 /input 目录
hadoop fs -put /opt/module/spark-local/input/ /input
spark HA 集群访问 注意 master 为两个 spark master
bin/spark-shell \
--master spark://hadoop102:7077,hadoop103:7077 \
--executor-memory 2g \
--total-executor-cores 2
2
3
4
执行 Wordcount 程序
sc.textFile("hdfs://hadoop102:8020/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
测试高可用 在 102 上 kill 掉 Master 进程
查看 hadoop103:8989 是否从 STANDBY 变为 Alive
# 运行流程
Spark 有 standalone-client 和 standalone-cluster 两种模式,主要区别在于:Driver 程序的运行节点。
客户端模式
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077,hadoop103:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
2
3
4
5
6
7
8
--deploy-mode client,表示 Driver 程序运行在本地客户端 默认为 client

集群模式模式
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077,hadoop103:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
--deploy-mode cluster \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
2
3
4
5
6
7
8

查看 http://hadoop102:8989 / 页面,点击 Completed Drivers 里面的 Worker 的结果



# Yarn 模式
Spark 客户端直接连接 Yarn,不需要额外构建 Spark 集群。
停止 Standalone 模式下的 spark 集群
sbin/stop-all.sh
sbin/stop-master.sh #103的master
zk.sh stop
2
3
解压安装
cd /opt/software
tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
cd /opt/module/
mv spark-2.1.1-bin-hadoop2.7/ spark-yarn
2
3
4
修改 hadoop 配置文件 /opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml 添加如下内容
因为测试环境虚拟机内存较少,防止执行过程进行被意外杀死,做如下配置
vi /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
<!-- Spark2中jersey版本是2.22,但是yarn中还需要依赖1.9,版本不兼容 -->
<property>
<name>yarn.timeline-service.enabled</name>
<value>false</value>
</property>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
分发
xsync /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml
修改 /opt/module/spark/conf/spark-env.sh,添加 YARN_CONF_DIR 配置,保证后续运行任务的路径都变成集群路径
cd /opt/module/spark-yarn/conf/
mv spark-env.sh.template spark-env.sh
vi spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
2
3
4
5
分发 spark-yarn
xsync /opt/module/spark-yarn/
启动 HDFS 以及 YARN 集群
start-dfs.sh #102
start-yarn.sh #103
2
执行求 PI 案例
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
2
3
4
5
--master yarn,表示 Yarn 方式运行
如果遇到 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig 报错 上面配置 yarn-site.xml 已经解决此报错问题
1. 找到 yarn 下面相关包
find /usr/hdp/ |grep jersey
2. 拷贝 jar 到 spark
所缺的类在 jersey-core-1.9.jar 和 jersey-client-1.9.jar 两个 jar 包中 将 jersey-core-1.9.jar 和 jersey-client-1.9.jar 这两个包拷贝到 $SPARK_HOME/jars 目录下
# 配置历史服务
由于是重新解压的 Spark 压缩文件,所以需要针对 Yarn 模式,再次配置一下历史服务器。
修改 spark-default.conf.template
cd /opt/module/spark-yarn/conf/
mv spark-defaults.conf.template spark-defaults.conf
vi spark-defaults.conf
#配置spark历史服务
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop102:8020/directory
xsync spark-defaults.conf
2
3
4
5
6
7
8
9
修改 spark-env.sh 配置
vi spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory
-Dspark.history.retainedApplications=30"
xsync spark-env.sh
2
3
4
5
6
7
8
# 配置查看历史日志
为了从 Yarn 上关联到 Spark 历史服务器,需要配置关联路径
修改配置文件 /opt/module/spark/conf/spark-defaults.conf
vim /opt/module/spark-yarn/conf/spark-defaults.conf
spark.yarn.historyServer.address=hadoop102:18080
spark.history.ui.port=18080
xsync /opt/module/spark-yarn/conf/spark-defaults.conf
2
3
4
5
6
启动 spark 历史服务
cd /opt/module/spark-yarn/
sbin/start-history-server.sh
2
重新提交任务
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
2
3
4
5
查询 http://hadoop103:8088/cluster

# 运行流程
Spark 有 yarn-client 和 yarn-cluster 两种模式,主要区别在于:Driver 程序的运行节点。
yarn-client:Driver 程序运行在客户端,适用于交互、调试,希望立即看到 app 的输出。
yarn-cluster:Driver 程序运行在由 ResourceManager 启动的 APPMaster 适用于生产环境。
客户端模式(默认)
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
2
3
4
5
6

集群模式
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
2
3
4
5
6


默认无法访问需要在 yarn-site.xml 添加配置并启动 yarn 历史服务器
vim /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop104:19888/jobhistory/logs</value>
</property>
2
3
4
启动历史服务器
mapred --daemon start historyserver
http://hadoop102:19888/jobhistory/logs/hadoop103:44236/container_1639655468064_0005_01_000001/container_1639655468064_0005_01_000001/atguigu/stdout?start=-4096


# Mesos 模式
Spark 客户端直接连接 Mesos;不需要额外构建 Spark 集群。国内应用比较少,更多的是运用 Yarn 调度。
# 几种模式对比
| 模式 | Spark 安装机器数 | 需启动的进程 | 所属者 |
|---|---|---|---|
| Local | 1 | 无 | Spark |
| Standalone | 3 | Master 及 Worker | Spark |
| Yarn | 1 | Yarn 及 HDFS | Hadoop |
# 端口号总结
- Spark 历史服务器端口号:18080 (类比于 Hadoop 历史服务器端口号:19888)
- Spark Master Web 端口号:8080(类比于 Hadoop 的 NameNode Web 端口号:9870 (50070))
- Spark Master 内部通信服务端口号:7077 (类比于 Hadoop 的 8020 (9000) 端口)
- Spark 查看当前 Spark-shell 运行任务情况端口号:4040
- Hadoop YARN 任务运行情况查看端口号:8088
# WordCount 案例
Spark Shell 仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在 IDE 中编制程序,然后打成 Jar 包,然后提交到集群,最常用的是创建一个 Maven 项目,利用 Maven 来管理 Jar 包的依赖。
新建 maven 项目 并添加 scala 框架支持 导入 pom 文件
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
<build>
<finalName>WordCount</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.5.3</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
如果 maven 版本为 3.2.x,插件下载报错,那么修改插件版本为 3.3.2
创建伴生对象 WordCount,编写代码
package com.atguigu.spark.day01
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//创建SparckConfig配置文件
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//创建sparkContext对象
val sc: SparkContext = new SparkContext(conf)
//读取外部数据
val textRDD: RDD[String] = sc.textFile(args(0))
//对读取到的内容进行切割并进行扁平化操作
val flatMapRDD: RDD[String] = textRDD.flatMap(_.split(" "))
//对数据集中的内容进行结构的转换 -- 计数
val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))
//对相同单词 出现次数进行汇总
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
//存储为文件
reduceRDD.saveAsTextFile(args(1))
// val res: Array[(String, Int)] = reduceRDD.collect()
//释放资源
sc.stop()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
本地运行

添加打包插件
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.spark.day01.WordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
maven 点击 package 打包,将 WordCount.jar (不带依赖) 上传到 /opt/module/spark-yarn 目录
在 HDFS 上创建,存储输入数据文件的路径 /input
hadoop fs -mkdir /input
hadoop fs -put /opt/module/spark-local-standalone/input/1.txt /input
2
执行任务
cd /opt/module/spark-yarn
bin/spark-submit \
--class com.atguigu.spark.day01.WordCount \
--master yarn \
WordCount.jar \
/input \
/output
2
3
4
5
6
7