Hive实战merge
# Hive 实战 merge
# 文件字段说明
视频表
| 字段 | 备注 | 详细描述 |
|---|---|---|
| video id | 视频唯一 id(String) | 11 位字符串 |
| uploader | 视频上传者(String) | 上传视频的用户名 String |
| age | 视频年龄(int) | 视频在平台上的整数天 |
| category | 视频类别(Array <String> ) | 上传视频指定的视频分类 |
| length | 视频长度(Int) | 以整型数字表示的视频长度 |
| views | 观看次数(Int) | 视频被浏览的次数 |
| rate | 视频评分(Double) | 满分 5 分 |
| Ratings | 流量(Int) | 视频的流量,整型数字 |
| comments | 评论数(Int) | 一个视频的整数评论数 |
| related ids | 相关视频 id(Array <String> ) | 相关视频的 id,最多 20 个 |
用户表
| 字段 | 备注 | 字段类型 |
|---|---|---|
| uploader | 上传者用户名 | string |
| videos | 上传视频数 | int |
| friends | 朋友数量 | int |
# ETL Mapper 处理
编写 ETL Mapper 代码
在 pom.xml 中导入 Maven 依赖坐标
<!-- Maven dependencies for the ETL MapReduce job. -->
<dependencies>
    <!-- JUnit is only needed for tests; test scope keeps it off the runtime classpath. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <!-- SLF4J binding for Log4j 2. Version 2.12.0 brings in a log4j-core affected by
         CVE-2021-44228 (Log4Shell); 2.17.1 is a patched release that still runs on Java 8. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.17.1</version>
    </dependency>
    <!-- Hadoop client APIs (Mapper, Job, FileInputFormat, ...) matching the cluster version. -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
<!--    <dependency>-->
<!--        <groupId>org.apache.hadoop</groupId>-->
<!--        <artifactId>hadoop-client-runtime</artifactId>-->
<!--        <version>3.1.3</version>-->
<!--    </dependency>-->
</dependencies>
Mapper 类
package com.atguigu.etl;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Map-only ETL step that normalizes raw, tab-separated video log lines.
 *
 * <p>Lines with fewer than {@value #MIN_FIELDS} fields are dropped and counted under
 * ETL/Fail. Every other line is counted under ETL/Pass and rewritten as follows:
 * <ul>
 *   <li>spaces are removed from the 4th field (category), e.g. "a &amp; b" becomes "a&amp;b";</li>
 *   <li>the first {@value #MIN_FIELDS} fields stay tab-separated;</li>
 *   <li>all remaining fields (the related video ids) are joined with '&amp;'.</li>
 * </ul>
 */
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    /** Number of fixed leading tab-separated columns; anything after them is the related-id list. */
    private static final int MIN_FIELDS = 9;

    private Counter pass;
    private Counter fail;
    // Reused across map() calls to avoid a per-record allocation. StringBuilder replaces
    // StringBuffer because this buffer is only touched from this instance's map() calls.
    private final StringBuilder sb = new StringBuilder();
    private final Text result = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // Counters reporting how many lines were kept and how many were dropped.
        pass = context.getCounter("ETL", "Pass");
        fail = context.getCounter("ETL", "Fail");
    }

    /**
     * Cleans one log line: drops lines with too few fields, strips spaces from the
     * category field and re-joins the fields with the separators described above.
     *
     * @param key     input key (unused; with the default TextInputFormat it is the line's byte offset)
     * @param value   one raw log line
     * @param context job context the cleaned line is written to
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        String cleaned = clean(value.toString());
        if (cleaned == null) {
            // Not enough fields: discard the line.
            fail.increment(1);
            return;
        }
        result.set(cleaned);
        context.write(result, NullWritable.get());
        pass.increment(1);
    }

    /**
     * Normalizes a single raw line.
     *
     * @param line one raw, tab-separated log line
     * @return the normalized line, or {@code null} if it has fewer than {@value #MIN_FIELDS} fields
     */
    private String clean(String line) {
        String[] field = line.split("\t");
        if (field.length < MIN_FIELDS) {
            return null;
        }
        // Remove spaces inside the category list: "a & b" ==> "a&b".
        field[3] = field[3].replace(" ", "");

        sb.setLength(0);
        for (int i = 0; i < field.length; i++) {
            sb.append(field[i]);
            if (i == field.length - 1) {
                break; // no separator after the last field
            }
            // Fixed columns (indices 0..8) are tab-separated; the related-id tail uses '&'.
            sb.append(i < MIN_FIELDS ? '\t' : '&');
        }
        return sb.toString();
    }
}
Driver
package com.atguigu.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the map-only ETL job that cleans the raw video logs.
 *
 * <p>Usage: {@code yarn jar <jar> com.atguigu.etl.ETLDriver <input dir> <output dir>}
 */
public class ETLDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // For a local test run, hard-code the paths instead:
        // args = new String[]{"d:/input","d:/output"};
        if (args.length < 2) {
            System.err.println("Usage: ETLDriver <input dir> <output dir>");
            System.exit(2);
        }

        Configuration configuration = new Configuration();
        // NOTE(review): "hive.execution.engine" is a Hive property, so this MapReduce job does not
        // act on it. The original intent was to run on Tez (keep the default engine for local runs).
        // The MapReduce-side switch for that is mapreduce.framework.name=yarn-tez, which requires
        // Tez to be installed on the cluster -- confirm before changing.
        configuration.set("hive.execution.engine","yarn-tez");

        // Build the job from the configuration above. Job.getInstance() with no argument
        // creates its own Configuration and would silently ignore it.
        Job job = Job.getInstance(configuration);
        job.setJarByClass(ETLDriver.class);
        job.setMapperClass(ETLMapper.class);
        // Map-only job: the mapper's (Text, NullWritable) pairs are the final output.
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
打成 jar 包并上传到集群，运行自定义的 MapReduce 程序：
# Submit the map-only ETL job; arguments are <raw input dir> <cleaned output dir>.
yarn jar etltool20211110-1.0-SNAPSHOT.jar \
    com.atguigu.etl.ETLDriver \
    /gulivideo/video \
    /gulivideo/video_etl
# 创建表
- 创库
-- Create the project database (no-op if it already exists, so the script can be re-run) and switch to it.
CREATE DATABASE IF NOT EXISTS gulivideo;
USE gulivideo;
- 外部表
-- video table: external table over the ETL output
-- (tab-separated fields, '&'-separated array elements).
CREATE EXTERNAL TABLE video_ori (
    videoId    STRING,
    uploader   STRING,
    age        INT,
    category   ARRAY<STRING>,
    length     INT,
    views      INT,
    rate       FLOAT,
    ratings    INT,
    comments   INT,
    relatedId  ARRAY<STRING>
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY "\t"
    COLLECTION ITEMS TERMINATED BY "&"
LOCATION '/gulivideo/video_etl';
-- user table: external table over the tab-separated user files.
CREATE EXTERNAL TABLE user_ori (
    uploader  STRING,
    videos    INT,
    friends   INT
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY "\t"
LOCATION '/gulivideo/user';
- 内部表
-- video_orc table: managed ORC table (Snappy-compressed) with the same columns as video_ori.
CREATE TABLE video_orc (
    videoId    STRING,
    uploader   STRING,
    age        INT,
    category   ARRAY<STRING>,
    length     INT,
    views      INT,
    rate       FLOAT,
    ratings    INT,
    comments   INT,
    relatedId  ARRAY<STRING>
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- user_orc table: managed ORC table (Snappy-compressed) with the same columns as user_ori.
CREATE TABLE user_orc (
    uploader  STRING,
    videos    INT,
    friends   INT
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
- 插入数据
-- Load the ORC tables from the external text tables.
-- Columns are listed explicitly instead of SELECT *, so the mapping onto the target
-- schema is visible and does not silently depend on the source table's column order.
INSERT INTO TABLE video_orc
SELECT
    videoId,
    uploader,
    age,
    category,
    length,
    views,
    rate,
    ratings,
    comments,
    relatedId
FROM video_ori;

INSERT INTO TABLE user_orc
SELECT
    uploader,
    videos,
    friends
FROM user_ori;
# 需求实现
# 统计视频观看数 top10
使用 order by 按照 views 字段做一个全局排序即可,同时我们设置只显示前 10 条。
-- Top 10 videos by view count: global sort, keep the first 10 rows.
SELECT videoid, views
FROM video_orc
ORDER BY views DESC
LIMIT 10;
# 统计视频类别热度 Top10
- 即统计每个类别有多少个视频,显示出包含视频最多的前 10 个类别
- 我们需要按照类别 group by 聚合,然后 count 组内的 videoId 个数即可。
- 因为当前表结构为:一个视频对应一个或多个类别。所以如果要 group by 类别,需要先将类别进行列转行 (展开),然后再进行 count 即可。
-- Explode the category array: one row per (video, category) pair.
SELECT videoid, cate
FROM video_orc
LATERAL VIEW explode(category) tbl AS cate;
-- Using the exploded rows above (t1): count videos per category and keep the 10 largest.
SELECT cate, COUNT(videoid) AS n
FROM t1
GROUP BY cate
ORDER BY n DESC
LIMIT 10;
完整语句
-- Top 10 categories by number of videos.
WITH t1 AS (
    -- one row per (video, category)
    SELECT videoid, cate
    FROM video_orc
    LATERAL VIEW explode(category) tbl AS cate
)
SELECT cate, COUNT(videoid) AS n
FROM t1
GROUP BY cate
ORDER BY n DESC
LIMIT 10;
# 统计出视频观看数最高的 20 个视频的所属类别以及类别包含 Top20 视频的个数
- 统计前 20 视频和类别
-- The 20 most-viewed videos with their category arrays (t1).
SELECT videoid, views, category
FROM video_orc
ORDER BY views DESC
LIMIT 20;
- 打散类别 列转行
-- Explode the categories of the top-20 rows (t1): one row per (video, category).
SELECT videoid, cate
FROM t1
LATERAL VIEW explode(category) tbl AS cate;
- 按照类别统计个数
-- Number of top-20 videos in each category (t2), most frequent first.
SELECT cate, COUNT(videoid) AS n
FROM t2
GROUP BY cate
ORDER BY n DESC;
- 完整语句
-- Categories of the 20 most-viewed videos and how many of those videos each contains.
WITH t1 AS (
    -- the 20 most-viewed videos
    SELECT videoid, views, category
    FROM video_orc
    ORDER BY views DESC
    LIMIT 20
),
t2 AS (
    -- one row per (top-20 video, category)
    SELECT videoid, cate
    FROM t1
    LATERAL VIEW explode(category) tbl AS cate
)
SELECT cate, COUNT(videoid) AS n
FROM t2
GROUP BY cate
ORDER BY n DESC;
# 统计视频观看数 Top50 所关联视频的所属类别排序
- 统计观看数前 50 的视频的关联视频
-- The 50 most-viewed videos with their related-video id arrays (t1).
SELECT videoid, views, relatedid
FROM video_orc
ORDER BY views DESC
LIMIT 50;
- 打散关联视频 列转行
-- One row per related-video id of the top-50 videos (t1).
SELECT explode(relatedid) AS videoid
FROM t1;
- 和原表 join 获取关联视频的类别
-- Look up each related video's categories in video_orc; DISTINCT collapses
-- related ids that appear more than once in t2.
SELECT DISTINCT
    t2.videoid,
    v.category
FROM t2
INNER JOIN video_orc v
    ON t2.videoid = v.videoid;
- 打散类别
-- One row per category of each related video (t3).
SELECT explode(category) AS cate
FROM t3;
- 类别热度表 每个类别出现次数
-- Category popularity (t5): number of videos in each category across the whole table.
-- Fix: the statement was missing its terminating semicolon.
SELECT
    cate,
    COUNT(videoid) AS n
FROM (
    SELECT videoid, cate
    FROM video_orc
    LATERAL VIEW explode(category) tbl AS cate
) g1
GROUP BY cate;
- 和类别热度表 join 并排序
-- Rank the related videos' categories (t4) by overall category popularity (t5).
-- DISTINCT keeps each category once. ORDER BY uses the output alias n: after
-- SELECT DISTINCT, older / non-CBO Hive planners reject table-qualified references
-- such as t5.n ("Invalid table alias or column reference").
SELECT DISTINCT
    t4.cate,
    t5.n
FROM t4
INNER JOIN t5
    ON t4.cate = t5.cate
ORDER BY n DESC;
- 完整语句
-- Categories of the videos related to the 50 most-viewed videos, ordered by how
-- many videos each category has in the whole table.
WITH t1 AS (
    -- the 50 most-viewed videos and their related-video ids
    SELECT videoid, views, relatedid
    FROM video_orc
    ORDER BY views DESC
    LIMIT 50
),
t2 AS (
    -- one row per related-video id
    SELECT explode(relatedid) AS videoid
    FROM t1
),
t3 AS (
    -- categories of each distinct related video
    SELECT DISTINCT t2.videoid, v.category
    FROM t2
    INNER JOIN video_orc v
        ON t2.videoid = v.videoid
),
t4 AS (
    -- one row per category of a related video
    SELECT explode(category) AS cate
    FROM t3
),
t5 AS (
    -- overall popularity: number of videos per category
    SELECT cate, COUNT(videoid) AS n
    FROM (
        SELECT videoid, cate
        FROM video_orc
        LATERAL VIEW explode(category) tbl AS cate
    ) g1
    GROUP BY cate
)
-- ORDER BY uses the output alias n; older / non-CBO Hive rejects t5.n after SELECT DISTINCT.
SELECT DISTINCT
    t4.cate,
    t5.n
FROM t4
INNER JOIN t5
    ON t4.cate = t5.cate
ORDER BY n DESC;
# 统计每个类别中的视频热度 Top10,以 Music 为例
- 把视频表的类别炸开,生成中间表格 video_category
-- Intermediate table with the category array exploded: one row per (video, category),
-- stored as Snappy-compressed ORC.
CREATE TABLE video_category
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY")
AS
SELECT
    videoid, uploader, age, cate, length,
    views, rate, ratings, comments, relatedid
FROM video_orc
LATERAL VIEW explode(category) tbl AS cate;
- 从 video_category 直接查询 Music 类的前 10 视频
-- Top 10 Music videos by view count.
SELECT videoid, views
FROM video_category
WHERE cate = "Music"
ORDER BY views DESC
LIMIT 10;
# 统计每个类别中视频流量 Top10,以 Music 为例
- 从 video_category 直接查询 Music 类的流量前 10 视频
-- Top 10 Music videos by the ratings column.
SELECT videoid, ratings
FROM video_category
WHERE cate = "Music"
ORDER BY ratings DESC
LIMIT 10;
# 统计上传视频最多的用户 Top10 以及他们上传的观看次数在前 20 的视频
# 理解 1. 上传视频数最多的前十名用户，每人观看数前 20 的视频
- 统计上传视频数最多的 Top10 用户
-- The 10 users with the most uploaded videos (t1).
SELECT uploader, videos
FROM user_orc
ORDER BY videos DESC
LIMIT 10;
- 和 video_orc 联立,找出这些用户上传的视频,并按照热度排名
-- Rank each top uploader's videos by views (t2). Because of the LEFT JOIN, an
-- uploader with no matching video still yields one row with a NULL videoid.
SELECT
    t1.uploader,
    v.videoid,
    RANK() OVER (PARTITION BY t1.uploader ORDER BY v.views DESC) AS hot
FROM t1
LEFT JOIN video_orc v
    ON t1.uploader = v.uploader;
- 求前 20
-- Keep ranks 1..20 per uploader (RANK ties can produce more than 20 rows).
SELECT t2.uploader, t2.videoid, t2.hot
FROM t2
WHERE t2.hot <= 20;
- 完整语句
-- For the 10 users with the most uploads, their videos ranked 1..20 by views.
WITH t1 AS (
    -- the 10 users with the most uploaded videos
    SELECT uploader, videos
    FROM user_orc
    ORDER BY videos DESC
    LIMIT 10
),
t2 AS (
    -- rank each of those users' videos by views
    SELECT
        t1.uploader,
        v.videoid,
        RANK() OVER (PARTITION BY t1.uploader ORDER BY v.views DESC) AS hot
    FROM t1
    LEFT JOIN video_orc v
        ON t1.uploader = v.uploader
)
SELECT t2.uploader, t2.videoid, t2.hot
FROM t2
WHERE t2.hot <= 20;
# 理解 2. 上传视频数前十的用户，是否有视频进入播放数总榜前 20
- 统计视频上传最多的用户 Top10
-- The 10 users with the most uploaded videos (t1).
SELECT uploader, videos
FROM user_orc
ORDER BY videos DESC
LIMIT 10;
- 观看数前 20 的视频
-- The 20 most-viewed videos overall, with their uploaders (t2).
SELECT videoid, uploader, views
FROM video_orc
ORDER BY views DESC
LIMIT 20;
- 联立两表,看看有没有他们上传的
-- Match the top uploaders (t1) against the top-20 videos (t2); videoid is NULL
-- for an uploader with none of those videos.
SELECT t1.uploader, t2.videoid
FROM t1
LEFT JOIN t2
    ON t1.uploader = t2.uploader;
- 完整语句
-- Which of the 10 biggest uploaders have a video in the overall top 20 by views.
WITH t1 AS (
    -- the 10 users with the most uploaded videos
    SELECT uploader, videos
    FROM user_orc
    ORDER BY videos DESC
    LIMIT 10
),
t2 AS (
    -- the 20 most-viewed videos overall
    SELECT videoid, uploader, views
    FROM video_orc
    ORDER BY views DESC
    LIMIT 20
)
SELECT t1.uploader, t2.videoid
FROM t1
LEFT JOIN t2
    ON t1.uploader = t2.uploader;
# 统计每个类别视频观看数 Top10
- 从 video_category 表查出每个类别视频观看数排名
-- Rank videos by views within each category (t1).
SELECT
    cate,
    videoid,
    views,
    RANK() OVER (PARTITION BY cate ORDER BY views DESC) AS hot
FROM video_category;
- 取每个类别的 Top10
-- Keep ranks 1..10 in each category (RANK ties can produce more than 10 rows).
SELECT cate, videoid, views
FROM t1
WHERE hot <= 10;
- 完整语句
-- Top 10 videos by views within every category.
WITH t1 AS (
    -- per-category view ranking
    SELECT
        cate,
        videoid,
        views,
        RANK() OVER (PARTITION BY cate ORDER BY views DESC) AS hot
    FROM video_category
)
SELECT cate, videoid, views
FROM t1
WHERE hot <= 10;
编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48