Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • Hadoop

    • Hadoop
    • 环境安装
    • HDFS
    • winutils
    • IDEA中创建hadoop项目
    • Java操作
    • HDFS的数据流
    • NameNode 工作机制
    • DataNode
    • MapReduce
    • MapReduce原理
    • Yarn
      • windows向yarn提交源码任务
      • 数据压缩
        • Hadoop压缩和解压
      • Yarn架构
      • 资源调度器
      • 容器调度器多队列配置
        • 多队列提交任务
      • 任务的推测执行
    • Hadoop企业优化
    • Hadoop 新特性
    • 日志
    • Hadoop HA高可用
  • Zookeeper

  • Hive

  • Flume

  • Kafka

  • Azkaban

  • Hbase

  • Scala

  • Spark

  • Flink

  • 离线数仓

  • 青训营

  • DolphinScheduler

  • Doris

  • 大数据
  • Hadoop
Iekr
2021-10-17
目录

Yarn

# Yarn

# windows 向 yarn 提交源码任务

  1. 在 Configuration 配置文件添加 yarn 的配置属性
  2. 用 Maven 构建 jar
  3. 修改 job 加载驱动类为 打包后的 jar 包
  4. image-20211017145420222

驱动类编码

package com.mywordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WcDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1 获取配置信息以及封装任务
        Configuration configuration = new Configuration();

        configuration.set("fs.defaultFS", "hdfs://hadoop102:8020");
        configuration.set("mapreduce.framework.name", "yarn");
        configuration.set("mapreduce.app-submission.cross-platform", "true");
        configuration.set("yarn.resourcemanager.hostname", "hadoop103");

        Job job = Job.getInstance(configuration);

        // 2 设置jar加载路径
//        job.setJarByClass(WcDriver.class);
        job.setJar("D:\\code\\mapreduce1\\target\\mapreduce1-1.0-SNAPSHOT.jar");
        // 3 设置map和reduce类
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        // 4 设置map输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7 提交
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

# 数据压缩

采用压缩技术减少了磁盘 IO 但同时增加了 CPU 运算负担 所以压缩特性运用得当能提高性能 但运用不当也可能降低性能

压缩格式 hadoop 自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 否 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 否 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 是 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 是 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 否 和文本处理一样,不需要修改

常用 Snappy 压缩 因为较高 其次是 LZO

不同阶段开启压缩:

  1. 如果输入阶段时为压缩包 则直接传递即可无需更改 Hadoop 自动解压缩并处理

  2. shuffle 阶段 在驱动类设置开启压缩 并指定压缩格式

    //开启压缩模式
    configuration.setBoolean("mapreduce.map.output.compress", true);
    //压缩格式为
    configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,
            CompressionCodec.class);
    
    1
    2
    3
    4
    5
  3. reduce 阶段 输出压缩

    //reduce阶段压缩
    configuration.setBoolean("mapreduce.output.fileoutputformat.compress", true);
    //指定压缩格式
    configuration.setClass("mapreduce.output.fileoutputformat.compress.codec", SnappyCodec.class,
            CompressionCodec.class);
    
    1
    2
    3
    4
    5

# Hadoop 压缩和解压

package com.compression;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

public class TestCompression {
    public static void main(String[] args) throws IOException {
        compress("d:/phone_data.txt", BZip2Codec.class);
        decompress("d:/phone_data.txt.bz2");
    }

    //解压
    private static void decompress(String file) throws IOException {
        Configuration configuration = new Configuration();
        //生成压缩格式工厂对象
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(configuration);

        //根据压缩格式工厂获取压缩对象
        CompressionCodec codec = codecFactory.getCodec(new Path(file));

        //输入流
        FileSystem fileSystem = FileSystem.get(configuration);

        FSDataInputStream fsDataInputStream = fileSystem.open(new Path(file));
        CompressionInputStream cis = codec.createInputStream(fsDataInputStream);

        //输出流
        String outputFile = file.substring(0, file.length() - codec.getDefaultExtension().length()); //获取文件名
        FSDataOutputStream fos = fileSystem.create(new Path(outputFile));
        IOUtils.copyBytes(cis, fos, 1024);//复制流 缓存为1024字节

        //关闭流
        IOUtils.closeStream(cis);
        IOUtils.closeStream(fos);


    }

    //压缩
    private static void compress(String file, Class<? extends CompressionCodec> codecClass) throws IOException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(configuration);

        //生成压缩格式对象
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, configuration);


        //开输入流
        FSDataInputStream fis = fileSystem.open(new Path(file));
        //输出流
        FSDataOutputStream fos = fileSystem.create(new Path(file + codec.getDefaultExtension()));

        //用压缩格式包装输出流
        CompressionOutputStream cos = codec.createOutputStream(fos);
        IOUtils.copyBytes(fis, cos, 1024);
        IOUtils.closeStream(fis);
        IOUtils.closeStream(cos);


    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

# Yarn 架构

YARN 主要由 ResourceManager、NodeManager、ApplicationMaster 和 Container 等组件构成。

image-20211017165440211

image-20211017172154802

  1. MR 程序提交到客户端所在的节点。
  2. YarnRunner 向 ResourceManager 申请一个 Application。
  3. RM 将该应用程序的资源路径返回给 YarnRunner。
  4. 该程序将运行所需资源提交到 HDFS 上。
  5. 程序资源提交完毕后,申请运行 mrAppMaster。
  6. RM 将用户的请求初始化成一个 Task。
  7. 其中一个 NodeManager 领取到 Task 任务。
  8. 该 NodeManager 创建容器 Container,并产生 MRAppmaster。
  9. Container 从 HDFS 上拷贝资源到本地。
  10. MRAppmaster 向 RM 申请运行 MapTask 资源。
  11. RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并创建容器。
  12. MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask,MapTask 对数据分区排序。
  13. MrAppMaster 等待所有 MapTask 运行完毕后,向 RM 申请容器,运行 ReduceTask。
  14. ReduceTask 向 MapTask 获取相应分区的数据。
  15. 程序运行完毕后,MR 会向 RM 申请注销自己。

# 资源调度器

目前,Hadoop 作业调度器主要有三种:FIFO、Capacity Scheduler 和 Fair Scheduler。Hadoop3.1.3 默认的资源调度器是 Capacity Scheduler。

通过 yarn-default.xml 配置

<property>
    <description>The class to use as the resource scheduler.</description>
    <name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
1
2
3
4
5
  1. 先进先出调度器(FIFO)

    image-20211017180750961

  2. 容量调度器(Capacity Scheduler)

    image-20211017180943516

  3. 公平调度器(Fair Scheduler)

    image-20211017181153628

image-20211017181229324

# 容器调度器多队列配置

容量调度器默认为 1 个队列 default 通过修改 capacity-scheduler.xml 文件来配置多队列

vim /opt/module/hadoop-3.1.3/etc/hadoop/capacity-scheduler.xml  #建议用图像界面
1
  1. 修改 yarn.scheduler.capacity.root.queues 的 value 添加新的队列

    <!-- 默认为default队列 可以设置多条队列-->
      <property>
        <name>yarn.scheduler.capacity.root.queues</name>
        <value>default,hive</value>
        <description>
          The queues at the this level (root is the root queue).
        </description>
      </property>
    
    1
    2
    3
    4
    5
    6
    7
    8
  2. 修改 default 队列占比为 40

    <!-- default队列默认占比为100  改为百分之40  剩下交给hive -->
      <property>
        <name>yarn.scheduler.capacity.root.default.capacity</name>
        <value>40</value>
        <description>Default queue target capacity.</description>
      </property>
    
    1
    2
    3
    4
    5
    6
  3. 修改 default 队列允许的最大占比为 60

    <!--default队列最大占比默认为100  改为60 -->
      <property>
        <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
        <value>60</value>
        <description>
          The maximum capacity of the default queue. 
        </description>
      </property>
    
    1
    2
    3
    4
    5
    6
    7
    8
  4. 把 default 队列的配置属性复制一份 修改为新增队列名 hive 并删除其中的 description 标签

    <!--hive队列设置-->
     <property>
        <name>yarn.scheduler.capacity.root.hive.capacity</name>
        <value>60</value>
      </property>
    
      <property>
        <name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
        <value>1</value>
      </property>
      <property>
        <name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
        <value>80</value>
      </property>
    
      <property>
        <name>yarn.scheduler.capacity.root.hive.state</name>
        <value>RUNNING</value>
      </property>
    
      <property>
        <name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
        <value>*</value>
      </property>
    
      <property>
        <name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
        <value>*</value>
      </property>
    
      <property>
        <name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
        <value>*</value>
      </property>
    
       <property>
         <name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime
         </name>
         <value>-1</value>
       </property>
    
       <property>
         <name>yarn.scheduler.capacity.root.hive.default-application-lifetime
         </name>
         <value>-1</value>
       </property>
     <!--hive队列设置结束-->
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
  5. 同步到其他集群中

    xsync /opt/module/hadoop-3.1.3/etc/hadoop/capacity-scheduler.xml
    
    1
  6. 重启 hadoop yarn

    stop-yarn.sh  #103中
    start-yarn.sh
    
    1
    2

image-20211017212821267

# 多队列提交任务

通过 configuration 设置 mapred.job.queue.name 为指定队列名

configuration.set("mapred.job.queue.name","hive");
1

# 任务的推测执行

  1. 推测执行机制

    APPmstr 会监控任务的运行速度如果某个任务运行速度远慢于平均任务 则为拖后腿的任务启动一个备份任务同时运行 谁先运行完 则采取谁的结果

  2. 执行推测任务的前提

    1. 每个 task 只能有一个备份任务

    2. 当前 job 已经完成的 task 必须不小于 5%

    3. 开启了推测执行设置 默认为打开的 在 mapred-site.xml 设置

      <property>
              <name>mapreduce.map.speculative</name>
              <value>true</value>
      </property>
      <property>
              <name>mapreduce.reduce.speculative</name>
              <value>true</value>
      </property>
      
      1
      2
      3
      4
      5
      6
      7
      8
  3. 不能启用推测执行机制情况

    1. 任务间存在严重的负载倾斜
    2. 特殊任务 如任务向数据库中写数据

img

编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
MapReduce原理
Hadoop企业优化

← MapReduce原理 Hadoop企业优化→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式