MapReduce
# MapReduce
MapReduce 是一个分布式运算程序的编程框架,它将用户编写的业务逻辑代码和自带的默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
优点:它简单的实现一些接口,就可以完成一个分布式程序
缺点:每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。

# Hadoop 原生 MapReduce
存储在 /opt/module/hadoop-3.1.3/share/hadoop/mapreduce
- yarn jar MapReduce 路径 wordcount 输入 hdfs 文件路径 输出 hdfs 文件保存路径还是在 hdfs 上 (必须是不存在的文件夹否则报错)
cd /opt/module/hadoop-3.1.3
yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /fiddler.md /output
1
2
2
打开 hdfs 中 output 中的 part-r-xxxx 里面会统计出每个字词出现的次数
# 常用数据序列化类型

# WordCount 编写
使用 IDEA 中创建 hadoop 项目 创建 maven 项目
pom.xml
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
WcDriver 类
package com.mywordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver that configures and submits the WordCount MapReduce job.
 *
 * <p>Usage: {@code yarn jar <jar> com.mywordcount.WcDriver <input path> <output path>}
 */
public class WcDriver {
    /**
     * Builds the job, wires up the mapper and reducer, and blocks until the job finishes.
     * The JVM exits with status 0 when the job succeeds and 1 otherwise.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws InterruptedException   if the thread is interrupted while waiting for the job
     * @throws ClassNotFoundException if a class configured on the job cannot be loaded
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Obtain a Job instance
        Job job = Job.getInstance(new Configuration());

        // 2. Tell Hadoop which jar to ship by pointing at a class inside it
        job.setJarByClass(WcDriver.class);

        // 3. Register the Mapper and Reducer. The mapper must be set explicitly;
        //    if it is not, WcMapper never runs and the declared map output types
        //    below no longer match what the map phase actually emits.
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        // 4. Declare the output types of the map phase and of the whole job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 5. Input and output locations
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6. Submit the job and wait for it to complete
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
WcMapper
package com.mywordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for WordCount: turns every input line into a series of (word, 1) pairs.
 */
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Both Writables are reused across map() calls instead of allocating one per record.
    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    /**
     * Splits one line of input on whitespace and emits (word, 1) for each non-empty token.
     * Blank lines and runs of consecutive whitespace produce no output, so the empty
     * string is never counted as a word.
     *
     * @param key     key supplied by the input format; with Hadoop's default text input
     *                this is the byte offset of the line, not a line number. Unused here.
     * @param value   the content of the line
     * @param context the task context used to write output pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // The raw line
        String line = value.toString();
        // Split on any run of whitespace (spaces, tabs, ...), not just a single space
        String[] tokens = line.split("\\s+");
        // Emit (word, 1) for every real token
        for (String token : tokens) {
            // A leading separator or an empty line yields an empty token; skip it
            if (token.isEmpty()) {
                continue;
            }
            this.word.set(token);
            context.write(this.word, this.one);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
WcReducer
package com.mywordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for WordCount: adds up the counts emitted for each word.
 */
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output value so that no new Writable is allocated per key.
    private final IntWritable total = new IntWritable();

    /**
     * Receives all counts grouped under one word and writes (word, sum of counts).
     *
     * @param key     the word
     * @param values  the counts emitted for this word
     * @param context the task context used to write the output pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // Accumulate the counts for this word
        int occurrences = 0;
        for (IntWritable count : values) {
            occurrences = occurrences + count.get();
        }

        this.total.set(occurrences);
        context.write(key, this.total);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

打包项目,把 mapreduce1-1.0 上传到集群中。打包前注意 java 版本,请用 1.8 打包
https://zhuanlan.zhihu.com/p/348660719 还有 pom.xml 版本要设置

- yarn jar mapreduce1-1.0-SNAPSHOT.jar 全类名引用路径 /fiddler.md /output2
yarn jar mapreduce1-1.0-SNAPSHOT.jar com.mywordcount.WcDriver /fiddler.md /output2
1
# Hadoop 序列化
序列化就是把内存中的对象转换为二进制序列,以便持久化
JAVA 序列化是一个重量级序列化框架 会附带额外的信息 (校验信息 header 继承体系等)
但 Hadoop 不需要这么多信息,所以 Hadoop 拥有自己的一套序列化体系 (Writable)
# 统计流量案例
FlowBean 类
package com.flow; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; @Override public String toString() { return "FlowBean{" + "upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow + '}'; } public void set(long upFlow, long downFlow) { this.downFlow = downFlow; this.upFlow = upFlow; this.sumFlow = upFlow + downFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } /** * 将对象数据写出到框架指定地方 序列化 * * @param dataOutput 数据的容器 * @throws IOException */ @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } /** * 从框架指定地方读取数据填充对象 反序列化 * * @param dataInput * @throws IOException */ @Override public void readFields(DataInput dataInput) throws IOException { //读写顺序要一致 this.upFlow = dataInput.readLong(); this.downFlow = dataInput.readLong(); this.sumFlow = dataInput.readLong(); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
FlowMapper 类
package com.flow; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> { private Text phone = new Text(); private FlowBean flow = new FlowBean(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException { //拿到一行数据 String line = value.toString(); //切分 String[] split = line.split("\t"); //封装 phone.set(split[1]); flow.set( Long.parseLong(split[split.length - 3]),//upFlow Long.parseLong(split[split.length - 2]) //downFlow ); context.write(phone, flow); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
FlowReducer 类
package com.flow; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> { private FlowBean flow = new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException { //累加流量 long sumUpFlow = 0; long sumDownFlow = 0; for (FlowBean value : values) { sumUpFlow += value.getUpFlow(); sumDownFlow += value.getDownFlow(); } //封装为flow对象 flow.set(sumUpFlow, sumDownFlow); context.write(key, flow); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
FlowDriver 类
package com.flow; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FlowDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(FlowDriver.class); job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean completion = job.waitForCompletion(true); System.exit(completion ? 0 : 1); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
打包成 jar 文件上传到 hadoop 中
```bash
# 创建目录
hadoop fs -mkdir /input
hadoop fs -put /home/atguigu/phone_data.txt /input
yarn jar mapreduce1-1.0-SNAPSHOT.jar com.flow.FlowDriver /input /output3
```
2
3
4
编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48