Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • Hadoop

  • Zookeeper

  • Hive

  • Flume

  • Kafka

  • Azkaban

  • Hbase

  • Scala

  • Spark

  • Flink

  • 离线数仓

    • 数据仓库概念
    • 项目需求及架构设计
    • 数据生成模块
    • 数据采集模块
    • 电商业务简介
    • 业务数据采集模块
      • MySQL安装
        • 安装MySQL
        • 配置MySQL
      • Sqoop安装
        • 修改配置文件
        • 拷贝JDBC驱动
        • 验证Sqoop
        • 测试Sqoop是否能够成功连接数据库
      • 业务数据生成
        • 生成业务数据
      • 同步策略
        • 全量同步策略
        • 增量同步策略
        • 新增及变化策略
        • 特殊策略
      • 业务数据导入HDFS
        • 脚本编写
        • Sqoop 参数解析
        • Sqoop 导入导出注意事项
      • Hive安装部署
        • Hive元数据配置到MySql
        • 初始化元数据并启动Hive
        • 初始化元数据库
        • 启动metastore和hiveserver2
      • Hive 相关总结
        • Hive的架构
        • Hive和数据库比较
        • 内部表和外部表
        • 4个By区别
        • 函数
        • 系统函数
        • 自定义UDF、UDTF
        • 窗口函数
        • RANK
        • OVER
        • 其他
        • Hive优化
        • Hive解决数据倾斜方法
        • 用的是动态分区吗?动态分区的底层原理是什么?
        • Hive里边字段的分隔符用的什么?为什么用\t?有遇到过字段里边有\t的情况吗,怎么处理的?
      • Spoop 相关总结
        • Sqoop导入导出Null存储一致性问题
        • Sqoop数据导出的时候一次执行多长时间
        • 离线指标常识
        • Sqoop在导入数据的时候数据倾斜
        • Sqoop底层运行的任务是什么
    • 数仓分层概念
    • 数仓搭建-ODS层
    • 数仓搭建-DWD层
    • 数仓搭建-DWS层
    • 数仓搭建-DWT层
  • 青训营

  • DolphinScheduler

  • Doris

  • 大数据
  • 离线数仓
Iekr
2022-09-16
目录

业务数据采集模块

# 业务数据采集模块

image-20220916212952552

# MySQL 安装

# 安装 MySQL

卸载自带的 Mysql-libs,只在 hadoop102 中安装

sudo rpm -qa | grep -i -E mysql\|mariadb | xargs -n1 sudo rpm -e --nodeps
1

安装 mysql 依赖

cd /opt/software/
sudo rpm -ivh 01_mysql-community-common-5.7.29-1.el7.x86_64.rpm
sudo rpm -ivh 02_mysql-community-libs-5.7.29-1.el7.x86_64.rpm
sudo rpm -ivh 03_mysql-community-libs-compat-5.7.29-1.el7.x86_64.rpm
1
2
3
4

安装 mysql-client

sudo rpm -ivh 04_mysql-community-client-5.7.29-1.el7.x86_64.rpm
1

mysql-server

sudo rpm -ivh 05_mysql-community-server-5.7.29-1.el7.x86_64.rpm
1

启动 mysql

sudo systemctl start mysqld
1

查看 mysql 密码

sudo cat /var/log/mysqld.log | grep password
1

# 配置 MySQL

配置只要是 root 用户 + 密码,在任何主机上都能登录 MySQL 数据库。

mysql -uroot -p
1

更改 mysql 密码策略

set global validate_password_length=4;
set global validate_password_policy=0;
1
2

设置简单好记的密码

set password=password("000000");
1

进入 msyql 库,修改 user 表,把 Host 表内容修改为 %

use mysql
select user, host from user;
update user set host="%" where user="root";
flush privileges;
quit;
1
2
3
4
5

# Sqoop 安装

下载地址:http://mirrors.hust.edu.cn/apache/sqoop/1.4.6/

上传安装包 sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz 到 hadoop102 的 /opt/software 路径中

解压 sqoop 安装包到指定目录

tar -zxvf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz -C /opt/module/
cd /opt/module/
mv sqoop-1.4.6.bin__hadoop-2.0.4-alpha/ sqoop
1
2
3

# 修改配置文件

进入到 /opt/module/sqoop/conf 目录,重命名配置文件

cd /opt/module/sqoop/conf
mv sqoop-env-template.sh sqoop-env.sh
1
2

修改配置文件

vim sqoop-env.sh 
1

增加如下内容

export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
export HIVE_HOME=/opt/module/hive
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.5.7
export ZOOCFGDIR=/opt/module/zookeeper-3.5.7/conf
1
2
3
4
5

# 拷贝 JDBC 驱动

将 mysql-connector-java-5.1.48.jar 上传到 /opt/software 路径

进入到 /opt/software/ 路径,拷贝 jdbc 驱动到 sqoop 的 lib 目录下

cd /opt/software/
cp mysql-connector-java-5.1.48.jar /opt/module/sqoop/lib/
1
2

# 验证 Sqoop

我们可以通过某一个 command 来验证 sqoop 配置是否正确:

cd /opt/module/sqoop
bin/sqoop help
1
2

# 测试 Sqoop 是否能够成功连接数据库

bin/sqoop list-databases --connect jdbc:mysql://hadoop102:3306/ --username root --password 000000
1

出现如下输出:读取 mysql 中能读取到的库

information_schema
mysql
performance_schema
sys
1
2
3
4

# 业务数据生成

创建数据库 gmall

image-20220916214911620

导入数据库结构脚本(gmall2020-03-16.sql)

# 生成业务数据

在 hadoop102 的 /opt/module/ 目录下创建 db_log 文件夹

mkdir /opt/module/db_log/
1

把 gmall-mock-db-2020-03-16-SNAPSHOT.jar 和 application.properties 上传到 hadoop102 的 /opt/module/db_log 路径上

根据需求修改 application.properties 相关配置

logging.level.root=info

spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://hadoop102:3306/gmall?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=000000

logging.pattern.console=%m%n

mybatis-plus.global-config.db-config.field-strategy=not_null

#业务日期
mock.date=2020-03-10
#是否重置
mock.clear=1

#是否生成新用户
mock.user.count=50
#男性比例
mock.user.male-rate=20

#收藏取消比例
mock.favor.cancel-rate=10
#收藏数量
mock.favor.count=100

#购物车数量
mock.cart.count=10
#每个商品最多购物个数
mock.cart.sku-maxcount-per-cart=3

#用户下单比例
mock.order.user-rate=80
#用户从购物中购买商品比例
mock.order.sku-rate=70
#是否参加活动
mock.order.join-activity=1
#是否使用购物券
mock.order.use-coupon=1
#购物券领取人数
mock.coupon.user-count=10

#支付比例
mock.payment.rate=70
#支付方式 支付宝:微信 :银联
mock.payment.payment-type=30:60:10

#评价比例 好:中:差:自动
mock.comment.appraise-rate=30:10:10:50

#退款原因比例:质量问题 商品描述与实际描述不一致 缺货 号码不合适 拍错 不想买了 其他
mock.refund.reason-rate=30:10:20:5:15:5:5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

并在该目录下执行,如下命令,生成 2020-03-10 日期数据:

java -jar gmall-mock-db-2020-03-16-SNAPSHOT.jar
1

在配置文件 application.properties 中修改

mock.date=2020-03-11
mock.clear=0
1
2

再次执行命令,生成 2020-03-11 日期数据:

java -jar gmall-mock-db-2020-03-16-SNAPSHOT.jar
1

# 同步策略

数据同步策略的类型包括:全量表、增量表、新增及变化表、特殊表

  • 全量表:存储完整的数据。
  • 增量表:存储新增加的数据。
  • 新增及变化表:存储新增加的数据和变化的数据。
  • 特殊表:只需要存储一次。

# 全量同步策略

image-20220916220117127

# 增量同步策略

image-20220916220215361

# 新增及变化策略

每日新增及变化,就是存储创建时间和操作时间都是今天的数据。

适用场景为,表的数据量大,既会有新增,又会有变化。

例如:用户表、订单表、优惠卷领用表。

# 特殊策略

某些特殊的维度表,可不必遵循上述同步策略。

  1. 客观世界维度:没变化的客观世界的维度(比如性别,地区,民族,政治成分,鞋子尺码)可以只存一份固定值。
  2. 日期维度:日期维度可以一次性导入一年或若干年的数据。
  3. 地区维度:省份表、地区表

# 业务数据导入 HDFS

image-20220916220419446

# 脚本编写

在 /home/atguigu/bin 目录下创建

cd /home/atguigu/bin
vim gmall_mysql_to_hdfs.sh
1
2

添加如下内容:

#!/bin/bash

sqoop=/opt/module/sqoop/bin/sqoop
do_date=`date -d '-1 day' +%F`

if [[ -n "$2" ]]; then
    do_date=$2
fi

import_data(){
$sqoop import \
--connect jdbc:mysql://hadoop102:3306/gmall?userSSL=false \
--username root \
--password 000000 \
--target-dir /origin_data/gmall/db/$1/$do_date \
--delete-target-dir \
--query "$2 and  \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'

# 创建 lzop索引
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/gmall/db/$1/$do_date
}

import_order_info(){
  import_data order_info "select
                            id, 
                            final_total_amount, 
                            order_status, 
                            user_id, 
                            out_trade_no, 
                            create_time, 
                            operate_time,
                            province_id,
                            benefit_reduce_amount,
                            original_total_amount,
                            feight_fee      
                        from order_info
                        where (date_format(create_time,'%Y-%m-%d')='$do_date' 
                        or date_format(operate_time,'%Y-%m-%d')='$do_date')"
}

import_coupon_use(){
  import_data coupon_use "select
                          id,
                          coupon_id,
                          user_id,
                          order_id,
                          coupon_status,
                          get_time,
                          using_time,
                          used_time
                        from coupon_use
                        where (date_format(get_time,'%Y-%m-%d')='$do_date'
                        or date_format(using_time,'%Y-%m-%d')='$do_date'
                        or date_format(used_time,'%Y-%m-%d')='$do_date')"
}

import_order_status_log(){
  import_data order_status_log "select
                                  id,
                                  order_id,
                                  order_status,
                                  operate_time
                                from order_status_log
                                where date_format(operate_time,'%Y-%m-%d')='$do_date'"
}

import_activity_order(){
  import_data activity_order "select
                                id,
                                activity_id,
                                order_id,
                                create_time
                              from activity_order
                              where date_format(create_time,'%Y-%m-%d')='$do_date'"
}

import_user_info(){
  import_data "user_info" "select 
                            id,
                            name,
                            birthday,
                            gender,
                            email,
                            user_level, 
                            create_time,
                            operate_time
                          from user_info 
                          where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date' 
                          or DATE_FORMAT(operate_time,'%Y-%m-%d')='$do_date')"
}

import_order_detail(){
  import_data order_detail "select 
                              od.id,
                              order_id, 
                              user_id, 
                              sku_id,
                              sku_name,
                              order_price,
                              sku_num, 
                              od.create_time  
                            from order_detail od
                            join order_info oi
                            on od.order_id=oi.id
                            where DATE_FORMAT(od.create_time,'%Y-%m-%d')='$do_date'"
}

import_payment_info(){
  import_data "payment_info"  "select 
                                id,  
                                out_trade_no, 
                                order_id, 
                                user_id, 
                                alipay_trade_no, 
                                total_amount,  
                                subject, 
                                payment_type, 
                                payment_time 
                              from payment_info 
                              where DATE_FORMAT(payment_time,'%Y-%m-%d')='$do_date'"
}

import_comment_info(){
  import_data comment_info "select
                              id,
                              user_id,
                              sku_id,
                              spu_id,
                              order_id,
                              appraise,
                              comment_txt,
                              create_time
                            from comment_info
                            where date_format(create_time,'%Y-%m-%d')='$do_date'"
}

import_order_refund_info(){
  import_data order_refund_info "select
                                id,
                                user_id,
                                order_id,
                                sku_id,
                                refund_type,
                                refund_num,
                                refund_amount,
                                refund_reason_type,
                                create_time
                              from order_refund_info
                              where date_format(create_time,'%Y-%m-%d')='$do_date'"
}

import_sku_info(){
  import_data sku_info "select 
                          id,
                          spu_id,
                          price,
                          sku_name,
                          sku_desc,
                          weight,
                          tm_id,
                          category3_id,
                          create_time
                        from sku_info where 1=1"
}

import_base_category1(){
  import_data "base_category1" "select 
                                  id,
                                  name 
                                from base_category1 where 1=1"
}

import_base_category2(){
  import_data "base_category2" "select
                                  id,
                                  name,
                                  category1_id 
                                from base_category2 where 1=1"
}

import_base_category3(){
  import_data "base_category3" "select
                                  id,
                                  name,
                                  category2_id
                                from base_category3 where 1=1"
}

import_base_province(){
  import_data base_province "select
                              id,
                              name,
                              region_id,
                              area_code,
                              iso_code
                            from base_province
                            where 1=1"
}

import_base_region(){
  import_data base_region "select
                              id,
                              region_name
                            from base_region
                            where 1=1"
}

import_base_trademark(){
  import_data base_trademark "select
                                tm_id,
                                tm_name
                              from base_trademark
                              where 1=1"
}

import_spu_info(){
  import_data spu_info "select
                            id,
                            spu_name,
                            category3_id,
                            tm_id
                          from spu_info
                          where 1=1"
}

import_favor_info(){
  import_data favor_info "select
                          id,
                          user_id,
                          sku_id,
                          spu_id,
                          is_cancel,
                          create_time,
                          cancel_time
                        from favor_info
                        where 1=1"
}

import_cart_info(){
  import_data cart_info "select
                        id,
                        user_id,
                        sku_id,
                        cart_price,
                        sku_num,
                        sku_name,
                        create_time,
                        operate_time,
                        is_ordered,
                        order_time
                      from cart_info
                      where 1=1"
}

import_coupon_info(){
  import_data coupon_info "select
                          id,
                          coupon_name,
                          coupon_type,
                          condition_amount,
                          condition_num,
                          activity_id,
                          benefit_amount,
                          benefit_discount,
                          create_time,
                          range_type,
                          spu_id,
                          tm_id,
                          category3_id,
                          limit_num,
                          operate_time,
                          expire_time
                        from coupon_info
                        where 1=1"
}

import_activity_info(){
  import_data activity_info "select
                              id,
                              activity_name,
                              activity_type,
                              start_time,
                              end_time,
                              create_time
                            from activity_info
                            where 1=1"
}

import_activity_rule(){
    import_data activity_rule "select
                                    id,
                                    activity_id,
                                    condition_amount,
                                    condition_num,
                                    benefit_amount,
                                    benefit_discount,
                                    benefit_level
                                from activity_rule
                                where 1=1"
}

import_base_dic(){
    import_data base_dic "select
                            dic_code,
                            dic_name,
                            parent_code,
                            create_time,
                            operate_time
                          from base_dic
                          where 1=1" 
}

case $1 in
  "order_info")
     import_order_info
;;
  "base_category1")
     import_base_category1
;;
  "base_category2")
     import_base_category2
;;
  "base_category3")
     import_base_category3
;;
  "order_detail")
     import_order_detail
;;
  "sku_info")
     import_sku_info
;;
  "user_info")
     import_user_info
;;
  "payment_info")
     import_payment_info
;;
  "base_province")
     import_base_province
;;
  "base_region")
     import_base_region
;;
  "base_trademark")
     import_base_trademark
;;
  "activity_info")
      import_activity_info
;;
  "activity_order")
      import_activity_order
;;
  "cart_info")
      import_cart_info
;;
  "comment_info")
      import_comment_info
;;
  "coupon_info")
      import_coupon_info
;;
  "coupon_use")
      import_coupon_use
;;
  "favor_info")
      import_favor_info
;;
  "order_refund_info")
      import_order_refund_info
;;
  "order_status_log")
      import_order_status_log
;;
  "spu_info")
      import_spu_info
;;
  "activity_rule")
      import_activity_rule
;;
  "base_dic")
      import_base_dic
;;

"first")
   import_base_category1
   import_base_category2
   import_base_category3
   import_order_info
   import_order_detail
   import_sku_info
   import_user_info
   import_payment_info
   import_base_province
   import_base_region
   import_base_trademark
   import_activity_info
   import_activity_order
   import_cart_info
   import_comment_info
   import_coupon_use
   import_coupon_info
   import_favor_info
   import_order_refund_info
   import_order_status_log
   import_spu_info
   import_activity_rule
   import_base_dic
;;
"all")
   import_base_category1
   import_base_category2
   import_base_category3
   import_order_info
   import_order_detail
   import_sku_info
   import_user_info
   import_payment_info
   import_base_trademark
   import_activity_info
   import_activity_order
   import_cart_info
   import_comment_info
   import_coupon_use
   import_coupon_info
   import_favor_info
   import_order_refund_info
   import_order_status_log
   import_spu_info
   import_activity_rule
   import_base_dic
;;
esac
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438

使用脚本导入数据,记得先启动 Hadoop

chmod 755 gmall_mysql_to_hdfs.sh
gmall_mysql_to_hdfs.sh first 2020-03-10 # 初次导入
gmall_mysql_to_hdfs.sh all 2020-03-11 # 每日导入
1
2
3

# Sqoop 参数解析

import \
--connect jdbc:mysql://hadoop102:3306/gmall \
--username root \
--password 000000 \
# 存储目标路径
--target-dir /origin_data/gmall/db/$1/$do_date \ 
# mr的目录路径必须不存在 加上此参数时如目标路径存在则先删除再执行mr
--delete-target-dir \ 
# 查询条件 and  \$CONDITIONS是sqoop固定语法
--query "$2 and  \$CONDITIONS" \ 
# 执行多少map任务 sqoop默认为4个
--num-mappers 1 \ 
# 数据的分隔符
--fields-terminated-by '\t' \ 
# 启动压缩
--compress \ 
# 压缩格式
--compression-codec lzop \ 
# string类型字段的null值替换为 \n,mysql中存储空值为null值,但hive存储空值为\N
--null-string '\\N' \ 
# 非string类型字段的null值替换为 \n
--null-non-string '\\N' 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# Sqoop 导入导出注意事项

Hive 中的 Null 在底层是以 “\N” 来存储,而 MySQL 中的 Null 在底层就是 Null,为了保证数据两端的一致性。在导出数据时采用 --input-null-string 和 --input-null-non-string 两个参数。导入数据时采用 --null-string 和 --null-non-string 。

# Hive 安装部署

把 apache-hive-3.1.2-bin.tar.gz 上传到 linux 的 /opt/software 目录下,只在 hadoop102 上安装

tar -zxvf /opt/software/apache-hive-3.1.2-bin.tar.gz -C /opt/module/
mv /opt/module/apache-hive-3.1.2-bin/ /opt/module/hive
1
2

添加 hive 环境变量

sudo vim /etc/profile.d/my_env.sh
1

添加以下内容

#HIVE_HOME
export HIVE_HOME=/opt/module/hive
export PATH=$PATH:$HIVE_HOME/bin
1
2
3

刷新环境变量

source /etc/profile.d/my_env.sh
1

解决日志 Jar 包冲突,进入目录

cd /opt/module/hive/lib
mv log4j-slf4j-impl-2.10.0.jar log4j-slf4j-impl-2.10.0.jar.bak
1
2

# Hive 元数据配置到 MySql

将 MySQL 的 JDBC 驱动拷贝到 Hive 的 lib 目录下

cp /opt/software/mysql-connector-java-5.1.48.jar /opt/module/hive/lib/
1

在 $HIVE_HOME/conf 目录下新建 hive-site.xml 文件

cd  /opt/module/hive/conf/
vim hive-site.xml
1
2

添加如下内容

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>000000</value>
    </property>

    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>

    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop102:9083</value>
    </property>

    <property>
    <name>hive.server2.thrift.port</name>
    <value>10000</value>
    </property>

    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop102</value>
    </property>

    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>
    
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>

    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
</configuration>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# 初始化元数据并启动 Hive

# 初始化元数据库

在 mysql 上新建元数据库

mysql -uroot -p000000
1
create database metastore;
quit;
1
2

初始化 Hive 元数据库

schematool -initSchema -dbType mysql -verbose
1

修改元数据库字符集,Hive 元数据库的字符集默认为 Latin1,由于其不支持中文字符,所以建表语句中如果包含中文注释,会出现乱码现象。如需解决乱码问题,须做以下修改。

修改 Hive 元数据库中存储注释的字段的字符集为 utf-8。

use metastore;
alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
alter table TABLE_PARAMS modify column PARAM_VALUE mediumtext character set utf8;
quit;
1
2
3
4

# 启动 metastore 和 hiveserver2

Hive 2.x 以上版本,要先启动这两个服务,否则会报以下错误:

FAILED: HiveException java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
1

在 /opt/module/hive/bin 目录编写 hive 服务启动脚本

cd /opt/module/hive/bin/
vim hiveservices.sh
1
2

添加以下内容

#!/bin/bash
HIVE_LOG_DIR=$HIVE_HOME/logs

mkdir -p $HIVE_LOG_DIR

#检查进程是否运行正常,参数1为进程名,参数2为进程端口
function check_process()
{
    pid=$(ps -ef 2>/dev/null | grep -v grep | grep -i $1 | awk '{print $2}')
    ppid=$(netstat -nltp 2>/dev/null | grep $2 | awk '{print $7}' | cut -d '/' -f 1)
    echo $pid
    [[ "$pid" =~ "$ppid" ]] && [ "$ppid" ] && return 0 || return 1
}

function hive_start()
{
    metapid=$(check_process HiveMetastore 9083)
    cmd="nohup hive --service metastore >$HIVE_LOG_DIR/metastore.log 2>&1 &"
    cmd=$cmd" sleep 4; hdfs dfsadmin -safemode wait >/dev/null 2>&1"
    [ -z "$metapid" ] && eval $cmd || echo "Metastroe服务已启动"
    server2pid=$(check_process HiveServer2 10000)
    cmd="nohup hive --service hiveserver2 >$HIVE_LOG_DIR/hiveServer2.log 2>&1 &"
    [ -z "$server2pid" ] && eval $cmd || echo "HiveServer2服务已启动"
}

function hive_stop()
{
    metapid=$(check_process HiveMetastore 9083)
    [ "$metapid" ] && kill $metapid || echo "Metastore服务未启动"
    server2pid=$(check_process HiveServer2 10000)
    [ "$server2pid" ] && kill $server2pid || echo "HiveServer2服务未启动"
}

case $1 in
"start")
    hive_start
    ;;
"stop")
    hive_stop
    ;;
"restart")
    hive_stop
    sleep 2
    hive_start
    ;;
"status")
    check_process HiveMetastore 9083 >/dev/null && echo "Metastore服务运行正常" || echo "Metastore服务运行异常"
    check_process HiveServer2 10000 >/dev/null && echo "HiveServer2服务运行正常" || echo "HiveServer2服务运行异常"
    ;;
*)
    echo Invalid Args!
    echo 'Usage: '$(basename $0)' start|stop|restart|status'
    ;;
esac
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

启动脚本

chmod +x hiveservices.sh
hiveservices.sh start
1
2

查询 hive 后台服务状态

hiveservices.sh status
1
Metastore服务运行正常
HiveServer2服务运行正常
1
2

启动 hive

hive
1
show databases;
1

# Hive 相关总结

# Hive 的架构

image-20220917201822036

如何选择处理引擎:

  • mr 基于磁盘 统计周、月、年,数据量 比较大的场景(7 天)
  • tez 基于内存 最快;数据量的场景(OOM)
  • spark 基于磁盘(Shuffle)+ 内存 绝大多数(主要处理当天的任务)

# Hive 和数据库比较

Hive MySQL
数据量 大 小
速度 大数据场景快 小数据场景快
场景 查询 增删改查

Hive 和数据库除了拥有类似的查询语言,再无类似之处。

  1. 数据存储位置:Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。
  2. 数据更新:Hive 中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的
  3. 执行延迟:Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive 的并行计算显然能体现出优势。
  4. 数据规模:Hive 支持很大规模的数据计算;数据库可以支持的数据规模较小。

# 内部表和外部表

  1. 内部表:当我们删除一个内部表时,Hive 也会删除这个表中数据。内部表不适合和其他工具共享数据。
  2. 外部表:删除该表并不会删除掉原始数据,删除的是表的元数据

什么时候创建内部表,什么时候创建外部表

  • 绝大数表都是外部表
  • 只有自己使用的临时表,才是内部表

# 4 个 By 区别

  1. Sort By:分区内有序;
  2. Order By:全局排序,只有一个 Reducer;
  3. Distrbute By:类似 MR 中 Partition,进行分区,结合 sort by 使用。
  4. Cluster By:当 Distribute by 和 Sorts by 字段相同时,可以使用 Cluster by 方式。Cluster by 除了具有 Distribute by 的功能外还兼具 Sort by 的功能。但是排序只能是升序排序,不能指定排序规则为 ASC 或者 DESC。

# 函数

# 系统函数

  1. 日 date_add、date_sub
  2. 周 next_day
  3. 月 last_day、date_formate ()
  4. 解析 json get_json_object

# 自定义 UDF、UDTF

在项目中是否自定义过 UDF、UDTF 函数,以及用他们处理了什么问题,及自定义步骤?

  • 用 UDF 函数解析公共字段;用 UDTF 函数解析事件字段。
  • 自定义 UDF:继承 UDF 类,重写 evaluate 方法
  • 自定义 UDTF:继承自 GenericUDTF 类,重写 3 个方法:initialize (自定义输出的列名和类型),process(将结果返回 forward (result)),close

为什么要自定义 UDF/UDTF?

  • 因为自定义函数,可以自己埋点 Log 打印日志,出错或者数据异常,方便调试.

# 窗口函数

# RANK

  • RANK () 排序相同时会重复,总数不会变
  • DENSE_RANK () 排序相同时会重复,总数会减少
  • ROW_NUMBER () 会根据顺序计算

# OVER

OVER ():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化

  • CURRENT ROW:当前行
  • n PRECEDING:往前 n 行数据
  • n FOLLOWING:往后 n 行数据
  • UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING 表示到后面的终点

# 其他

  • LAG (col,n):往前第 n 行数据
  • LEAD (col,n):往后第 n 行数据
  • NTILE (n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从 1 开始,对于每一行,NTILE 返回此行所属的组的编号。注意:n 必须为 int 类型。

# Hive 优化

  1. MapJoin

    • 如果不指定 MapJoin 或者不符合 MapJoin 的条件,那么 Hive 解析器会将 Join 操作转换成 Common Join,即:在 Reduce 阶段完成 join。容易发生数据倾斜。可以用 MapJoin 把小表全部加载到内存在 map 端进行 join,避免 reducer 处理。
  2. 行列过滤

    • 列处理:在 SELECT 中,只拿需要的列,如果有,尽量使用分区过滤,少用 SELECT *。

    • 行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在 Where 后面,那么就会先全表关联,之后再过滤。

  3. 采用分区技术,创建分区表(防止计算数据的全部扫描)、分桶(对数据量比较大的数据,进行采样)

  4. 合理设置 Map 个数和 reduce 个数

    • 128m 数据对应 1 个 maptask 对应 1G 的内存 切片:max (0,min (块大小,Long 的最大值))
  5. 处理小文件

    • 在 Map 执行前合并小文件,减少 Map 数:CombineHiveInputFormat 具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat 没有对小文件合并功能。
    • JVM 重用
    • (map only 任务默认开启,map reduce 任务需要手动开启):将小于 16m 的文件定义为小文件;将小文件合并到 256m;
  6. 压缩:设置 map 端输出、中间结果压缩。(不完全是解决数据倾斜的问题,但是减少了 IO 读写和网络传输,能提高很多效率)

  7. 列式存储 查询速度快(parquet、ORC)orc 用的比较的多,parquet 主要配合 spark 使用;增加压缩数据的比例。

  8. 开启 map 端 combiner(不影响最终业务逻辑) set hive.map.aggr=true;

# Hive 解决数据倾斜方法

数据倾斜长啥样?

image-20220917210939505


怎么产生的数据倾斜?

不同数据类型关联产生数据倾斜

情形:比如用户表中 user_id 字段为 int,log 表中 user_id 字段既有 string 类型也有 int 类型。当按照 user_id 进行两个表的 Join 操作时。

后果:处理此特殊值的 reduce 耗时;只有一个 reduce 任务 默认的 Hash 操作会按 int 型的 id 来进行分配,这样会导致所有 string 类型 id 的记录都分配到一个 Reducer 中。

解决方式:把数字类型转换成字符串类型

select * from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string)
1
2
3

解决数据倾斜的方法?

  1. group by:注:group by 优于 distinct group;解决方式:采用 sum () group by 的方式来替换 count (distinct) 完成计算。

  2. mapjoin 开启

  3. 开启数据倾斜时负载均衡 set hive.groupby.skewindata=true; 思想:就是先随机分发并处理,再按照 key group by 来分发处理。 操作:当选项设定为 true,生成的查询计划会有两个 MRJob。

    • 第一个 MRJob 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果。这样处理的结果是相同的 GroupBy Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;
    • 第二个 MRJob 再根据预处理的数据结果按照 GroupBy Key 分布到 Reduce 中(这个过程可以保证相同的原始 GroupBy Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
    • 所以需要第二次的 mapreduce, 这次就回归正常 shuffle, 但是数据分布不均匀的问题在第一次 mapreduce 已经有了很大的改善,因此基本解决数据倾斜。因为大量计算已经在第一次 mr 中随机分布到各个节点完成。
  4. 控制空值分布 将为空的 key 转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分不到多个 Reducer。 实践中,可以使用 case when 对空值赋上随机值。此方法比直接写 is not null 更好,因为前者 job 数为 1,后者为 2.

    --- 使用case when实例1
    select userid, name from user_info a 
    join (
    select case when userid is null  then  cast (rand(47)* 100000 as int )
    else userid end from user_read_log
    ) b  on a.userid = b.userid;
    
    ---- 使用case when实例2:
    select  '${date}' as thedate,
        a.search_type,
        a.query,
        a.category,
        a.cat_name,
        a.brand_id,
        a.brand_name,
        a.dir_type,
        a.rewcatid,
        a.new_cat_name,
        a.new_brand_id,
        f.brand_name as new_brand_name,
        a.pv,
        a.uv,
        a.ipv,
        a.ipvuv,
        a.trans_amt,
        a.trans_num,
        a.alipay_uv
    from fdi_search_query_cat_qp_temp a
    left outer join brand f
    on  f.pt='${date}000000' and case when a.new_brand_id is null then concat('hive',rand() ) else a.new_brand_id end = f.brand_id;
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30

    如果上述的方法还不能解决,比如当有多个 JOIN 的时候,建议建立临时表,然后拆分 HIVE SQL 语句。

# 用的是动态分区吗?动态分区的底层原理是什么?

  • 静态分区与动态分区的主要区别在于静态分区是手动指定,而动态分区是通过数据来进行判断。
  • 详细来说,静态分区的列实在编译时期,通过用户传递来决定的;动态分区只有在 SQL 执行时才能决定。
  • 动态分区是基于查询参数的位置去推断分区的名称,从而建立分区

# Hive 里边字段的分隔符用的什么?为什么用 \t?有遇到过字段里边有 \t 的情况吗,怎么处理的?

hive 默认的字段分隔符为 ascii 码的控制符 \001(^A), 建表的时候用 fields terminated by '\001'

遇到过字段里边有 \t 的情况,自定义 InputFormat,替换为其他分隔符再做后续处理

# Spoop 相关总结

# Sqoop 导入导出 Null 存储一致性问题

Hive 中的 Null 在底层是以 “\N” 来存储,而 MySQL 中的 Null 在底层就是 Null,为了保证数据两端的一致性。在导出数据时采用 --input-null-string 和 --input-null-non-string 两个参数。导入数据时采用 --null-string 和 --null-non-string。

# Sqoop 数据导出的时候一次执行多长时间

Sqoop 任务一般情况 40 -50 分钟的都有。取决于数据量(11:11,6:18)。

# 离线指标常识

第二天 8 点前,必须把报表指标计算完毕

# Sqoop 在导入数据的时候数据倾斜

https://blog.csdn.net/lizhiguo18/article/details/103969906

sqoop 抽数的并行化主要涉及到两个参数:num-mappers:启动 N 个 map 来并行导入数据,默认 4 个;split-by:按照某一列来切分表的工作单元。

通过 ROWNUM () 生成一个严格均匀分布的字段,然后指定为分割字段

# Sqoop 底层运行的任务是什么

只有 Map 阶段,没有 Reduce 阶段的任务。

编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
电商业务简介
数仓分层概念

← 电商业务简介 数仓分层概念→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式