Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • Hadoop

    • Hadoop
    • 环境安装
    • HDFS
    • winutils
    • IDEA中创建hadoop项目
    • Java操作
    • HDFS的数据流
    • NameNode 工作机制
    • DataNode
    • MapReduce
      • Hadoop原生MapReduce
      • 常用数据序列化类型
      • WordCount编写
      • HaDoop序列化
        • 统计流量案例
    • MapReduce原理
    • Yarn
    • Hadoop企业优化
    • Hadoop 新特性
    • 日志
    • Hadoop HA高可用
  • Zookeeper

  • Hive

  • Flume

  • Kafka

  • Azkaban

  • Hbase

  • Scala

  • Spark

  • Flink

  • 离线数仓

  • 青训营

  • DolphinScheduler

  • Doris

  • 大数据
  • Hadoop
Iekr
2021-09-02
目录

MapReduce

# MapReduce

MapReduce 是一个分布式运算程序的编程框架 将用户编写的业务逻辑代码和自带默认组件整合一贯完整的分布式运算程序 并发运行在一个 Haoop 集群上

优点:它简单的实现一些接口,就可以完成一个分布式程序

缺点:每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。

image-20210902111736018

# Hadoop 原生 MapReduce

存储在 /opt/module/hadoop-3.1.3/share/hadoop/mapreduce

  • yarn jar MapReduce 路径 wordcount 输入 hdfs 文件路径 输出 hdfs 文件保存路径还是在 hdfs 上 (必须是不存在的文件夹否则报错)
cd /opt/module/hadoop-3.1.3
yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /fiddler.md /output
1
2

打开 hdfs 中 output 中的 part-r-xxxx 里面会统计出每个字词出现的次数

# 常用数据序列化类型

image-20210902113503162

# WordCount 编写

使用 IDEA 中创建 hadoop 项目 创建 maven 项目

pom.xml

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

WcDriver 类

package com.mywordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //1.获取Job实例
        Job job = Job.getInstance(new Configuration());
        //2.设置jar包
        job.setJarByClass(WcDriver.class);

        //设置Mapper和Reducer
        job.setMapOutputKeyClass(WcDriver.class);
        job.setReducerClass(WcReducer.class);

        //设置Map和Reduce的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置输入输出文件
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //提交job
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

WcMapper

package com.mywordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /**
     * 框架将数据拆成一行一行输入进来 把数据变成(单词,1)的形式
     *
     * @param key     行号
     * @param value   行内容
     * @param context 任务本身
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        //行数据
        String line = value.toString();

        //拆分成若干个单词
        String[] words = line.split(" ");

        //将(单词,1)写回框架
        for (String word : words) {
            this.word.set(word);
            context.write(this.word, this.one);
        }

    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

WcReducer

package com.mywordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    /**
     * 框架把单词分好组给我们, 我们将同一个单词的次数进行增加
     *
     * @param key     单词
     * @param values  此时为1 数量
     * @param context 任务本身
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        //累加
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);

    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

image-20210902145934845

打包项目 把 maprduce1-1.0 上传到集群中 打包前注意 java 版本 请用 1.8 打包

https://zhuanlan.zhihu.com/p/348660719 还有 pom.xml 版本要设置

image-20210902150008329

  • yarn jar mapreduce1-1.0-SNAPSHOT.jar 全类名引用路径 /fiddler.md/output2
yarn jar mapreduce1-1.0-SNAPSHOT.jar com.mywordcount.WcDriver /fiddler.md /output2
1

# HaDoop 序列化

序列号就是把内存中的对象,转换为二进制序列 以便持久化

JAVA 序列化是一个重量级序列化框架 会附带额外的信息 (校验信息 header 继承体系等)

但 Hadoop 不需要这么多信息,所以 Hadoop 拥有自己的一套序列化体系 (Writable)

# 统计流量案例

  1. FlowBean 类

    package com.flow;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class FlowBean implements Writable {
        private long upFlow;
        private long downFlow;
        private long sumFlow;
    
        @Override
        public String toString() {
            return "FlowBean{" +
                    "upFlow=" + upFlow +
                    ", downFlow=" + downFlow +
                    ", sumFlow=" + sumFlow +
                    '}';
        }
    
        public void set(long upFlow, long downFlow) {
            this.downFlow = downFlow;
            this.upFlow = upFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    
        /**
         * 将对象数据写出到框架指定地方  序列化
         *
         * @param dataOutput 数据的容器
         * @throws IOException
         */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(upFlow);
            dataOutput.writeLong(downFlow);
            dataOutput.writeLong(sumFlow);
        }
    
    
        /**
         * 从框架指定地方读取数据填充对象  反序列化
         *
         * @param dataInput
         * @throws IOException
         */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            //读写顺序要一致
            this.upFlow = dataInput.readLong();
            this.downFlow = dataInput.readLong();
            this.sumFlow = dataInput.readLong();
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
  2. FlowMapper 类

    package com.flow;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        private Text phone = new Text();
        private FlowBean flow = new FlowBean();
    
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
            //拿到一行数据
            String line = value.toString();
            //切分
            String[] split = line.split("\t");
            //封装
            phone.set(split[1]);
            flow.set(
                    Long.parseLong(split[split.length - 3]),//upFlow
                    Long.parseLong(split[split.length - 2]) //downFlow
            );
            context.write(phone, flow);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
  3. FlowReducer 类

    package com.flow;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    
        private FlowBean flow = new FlowBean();
    
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
            //累加流量
            long sumUpFlow = 0;
            long sumDownFlow = 0;
    
            for (FlowBean value : values) {
                sumUpFlow += value.getUpFlow();
                sumDownFlow += value.getDownFlow();
            }
    
            //封装为flow对象
            flow.set(sumUpFlow, sumDownFlow);
    
            context.write(key, flow);
    
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
  4. FlowDriver 类

    package com.flow;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FlowDriver {
    
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Job job = Job.getInstance(new Configuration());
    
            job.setJarByClass(FlowDriver.class);
    
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            boolean completion = job.waitForCompletion(true);
            System.exit(completion ? 0 : 1);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
  5. 打包成 jar 文件上传到 hadoop 中

    #创建目录
    hadoop fs -mkdir /input
    hadoop fs -put /home/atguigu/phone_data.txt /input
    yarn jar mapreduce1-1.0-SNAPSHOT.jar com.flow.FlowDriver /input /output3
    
    1
    2
    3
    4
编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
DataNode
MapReduce原理

← DataNode MapReduce原理→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式