数据采集模块
# 数据采集模块
# hadoop
# 安装准备工作
安装工具包
sudo yum install -y epel-release #额外软件源
sudo yum install -y psmisc nc net-tools rsync vim lrzsz ntp libzstd openssl-static tree iotop git wget #安装 psmisc工具包 nc工具包 net-tools工具包 rsync 远程同步 vim编辑器 lrzsz上传下载 ntp时间同步
2
修改静态 ip
sudo vim /etc/sysconfig/network-scripts/ifcfg-ens33
DEVICE=ens33
TYPE=Ethernet
ONBOOT=yes
BOOTPROTO=static
NAME="ens33"
IPADDR=192.168.42.102
PREFIX=24
GATEWAY=192.168.42.2
DNS1=192.168.42.2
2
3
4
5
6
7
8
9
service network restart
修改主机名
sudo vim /etc/hostname
修改 host
sudo vim /etc/hosts
192.168.42.102 hadoop102
192.168.42.103 hadoop103
192.168.42.104 hadoop104
2
3
windows 上也配置 host
关闭防火墙
sudo systemctl stop firewalld
sudo systemctl disable firewalld
2
创建用户 新版本 hadoop 无法使用 root 用户启动
sudo useradd atguigu
sudo passwd atguigu
2
修改 atguigu 权限
visudo
## Allow root to run any commands anywhere
root ALL=(ALL) ALL
atguigu ALL=(ALL) ALL #在root下添加此行内容
2
3
创建 opt 下的存放目录
cd /opt
sudo mkdir module
sudo mkdir software
sudo chown atguigu:atguigu /opt/module /opt/software
2
3
4
# 安装 java
先卸载
rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
解压
cd /opt/software/
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
2
配置环境变量
sudo vim /etc/profile.d/my_env.sh
#JAVA_HOME
#yum 为/usr/lib/jvm/java
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin
2
3
4
更新环境变量
source /etc/profile
java -version
2
# 安装 hadoop
解压
tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/
配置环境变量
sudo vim /etc/profile.d/my_env.sh
##HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
2
3
4
更新
source /etc/profile
hadoop version
2
# xsync 脚本
cd /home/atguigu
vim xsync
2
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
echo Not Enough Arguement!
exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
echo ==================== $host ====================
#3. 遍历所有目录,挨个发送
for file in $@
do
#4 判断文件是否存在
if [ -e $file ]
then
#5. 获取父目录
pdir=$(cd -P $(dirname $file); pwd)
#6. 获取当前文件的名称
fname=$(basename $file)
ssh $host "mkdir -p $pdir"
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
chmod +x xsync
sudo mv xsync /bin/ #将脚本移动到/bin中,以便全局调用
2
# ssh 免密
生成 ssh 密钥
ssh-keygen -t rsa
分发 ssh 密钥
ssh-copy-id hadoop102
ssh-copy-id hadoop103
ssh-copy-id hadoop104
2
3
4
5
以上生成和分发 需要在每台机器上执行
# 分发软件
cd /opt/module/
xsync jdk1.8.0_212/
xsync hadoop-3.1.3/
2
3
分发环境变量
sudo scp /etc/profile.d/my_env.sh root@hadoop103:/etc/profile.d/my_env.sh
sudo scp /etc/profile.d/my_env.sh root@hadoop104:/etc/profile.d/my_env.sh
2
# 集群配置
集群规划
| 服务器 hadoop102 | 服务器 hadoop103 | 服务器 hadoop104 | |
|---|---|---|---|
| HDFS | NameNode DataNode | DataNode | DataNode SecondaryNameNode |
| Yarn | NodeManager | Resourcemanager NodeManager | NodeManager |
进配置文件夹
cd /opt/module/hadoop-3.1.3/etc/hadoop
core-site.xml
vim core-site.xml
<property>
<name>fs.defaultFS</name>
<!-- 配置hdfs默认的地址 -->
<value>hdfs://hadoop102:8020</value>
</property>
<property>
<!-- 配置hadoop临时存放路径-->
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-3.1.3/data</value>
</property>
<property>
<!-- 兼容性配置hive -->
<name>hadoop.proxyuser.atguigu.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.atguigu.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.http.staticuser.user</name>
<value>atguigu</value>
</property>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
hdfs-site.xml
<property>
<!-- 2nn的地址 -->
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop104:9868</value>
</property>
2
3
4
5
yarn-site.xml
<property>
<!--设置NodeManager上运行的附属服务,需配置成mapreduce_shuffle才可运行MapReduce程序-->
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop103</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
<!--设定单个容器可以申领到的最小内存资源-->
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4096</value>
</property>
<!--设定物理节点有4G内存加入资源池-->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>4096</value>
</property>
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
mapred-site.xml
<!--Hadoop对MapReduce运行框架一共提供了3种实现,在mapred-site.xml中通过"mapreduce.framework.name"这个属性来设置为"classic"、"yarn"或者"local"-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
2
3
4
5
workers
vim /opt/module/hadoop-3.1.3/etc/hadoop/workers
删除 localhost 文件中添加的内容结尾不允许有空格,文件中不允许有空行。
hadoop102
hadoop103
hadoop104
2
3
同步
xsync /opt/module/hadoop-3.1.3/etc/hadoop/
# 格式化
格式化 hdfs
hdfs namenode -format
启动 dfs
start-dfs.sh
如果报 java 未找到 修改 hadoop.env.sh 文件
vim /opt/module/hadoop-3.1.3/etc/hadoop/hadoop-env.sh
修改 JAVA_HOME 为 并同步
export JAVA_HOME=/opt/module/jdk1.8.0_212
启动 yarn
start-yarn.sh #在103机器上启动
# hdfs 存储多目录
若 HDFS 存储空间紧张,需要对 DataNode 进行磁盘扩展。
在 DataNode 节点增加磁盘并进行挂载。

在 hdfs-site.xml 文件中配置多目录,注意新挂载磁盘的访问权限问题。
<property> <name>dfs.datanode.data.dir</name> <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value> </property>1
2
3
4增加磁盘后,保证每个目录数据均衡
bin/start-balancer.sh –threshold 101对于参数 10,代表的是集群中各个节点的磁盘空间利用率相差不超过 10%,可根据实际情况进行调整。
停止数据均衡命令:
bin/stop-balancer.sh1
# LZO 压缩配置
# 编译
hadoop 本身并不支持 lzo 压缩,故需要使用 twitter 提供的 hadoop-lzo 开源组件。hadoop-lzo 需依赖 hadoop 和 lzo 进行编译,编译步骤如下
下载、安装并编译 LZO
sudo yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool
wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz
tar -zxvf lzo-2.10.tar.gz
cd lzo-2.10
./configure -prefix=/usr/local/hadoop/lzo/
make
sudo make install
2
3
4
5
6
7
编译 hadoop-lzo 源码
下载 hadoop-lzo 的源码,下载地址:https://github.com/twitter/hadoop-lzo/archive/master.zip
解压之后,修改 pom.xml
<hadoop.current.version>2.7.2</hadoop.current.version>
声明两个临时环境变量
export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include
export LIBRARY_PATH=/usr/local/hadoop/lzo/lib
2
编译打包
mvn package -Dmaven.test.skip=true
进入 target,hadoop-lzo-0.4.21-SNAPSHOT.jar 即编译成功的 hadoop-lzo 组件
# 使用
将编译好后的 hadoop-lzo-0.4.20.jar 放入 hadoop-3.1.3/share/hadoop/common/
cd /opt/software
mv hadoop-lzo-0.4.20.jar /opt/module/hadoop-3.1.3/share/hadoop/common/
2
同步分发
xsync /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar
core-site.xml 增加配置支持 LZO 压缩
vim /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml
添加以下内容
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
分发
xsync /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml
重新启动集群
stop-all.sh
start-dfs.sh #102
start-yarn.sh #103
2
3
# LZO 创建索引
将 bigtable.lzo(150M)上传到集群的 /input 目录
hadoop fs -mkdir /input
hadoop fs -put bigtable.lzo /input
2
执行 wordcount 程序
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output1
没有创建索引则 LZO 文件的切片只有一个

创建索引
# hadoop jar 编译好的lzo jar包 类引用路径 lzo数据文件
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo
2

重新执行 wordcount
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output2

# 基准测试
# 测试 HDFS 写性能
向 HDFS 写 10 个 128M 的文件
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB

# 测试 HDFS 读性能
读取 HDFS 集群 10 个 128M 的文件
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB

# 删除测试生成数据
测试数据默认存储在 /benchmarks/TestDFSIO 上

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean
# 使用 sort 测试 mapreduce
使用 RandomWriter 来产生随机数,每个节点运行 10 个 Map 任务,每个 Map 产生大约 1G 大小的二进制随机数
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar randomwriter random-data
执行 sort 程序
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar sort random-data sorted-data
验证数据是否真正排好序了
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data
# hadoop 参数调优
HDFS 参数调优 hdfs-site.xml
dfs.namenode.handler.count=20 * log2(Cluster Size)1如集群规模为 8 台时,此参数设置为 60
NameNode 有一个工作线程池,用来处理不同 DataNode 的并发心跳以及客户端并发的元数据操作。对于大集群或者有大量客户端的集群来说,通常需要增大参数 dfs.namenode.handler.count 的默认值 10。设置该值的一般原则是将其设置为集群大小的自然对数乘以 20,即 20logN,N 为集群大小
YARN 参数调优 yarn-site.xml
情景描述:总共 7 台机器,每天几亿条数据,数据源 ->Flume->Kafka->HDFS->Hive
面临问题:数据统计主要用 HiveSQL,没有数据倾斜 (每个 reduce 数据没有偏大或偏小),小文件已经做了合并处理,开启的 JVM 重用,而且 IO 没有阻塞,内存用了不到 50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。
解决办法:内存利用率不够。这个一般是 Yarn 的 2 个配置造成的,单个任务可以申请的最大内存大小,和 Hadoop 单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。
yarn.nodemanager.resource.memory-mb
表示该节点上 YARN 可使用的物理内存总量,默认是 8192(MB),注意,如果你的节点内存资源不够 8GB,则需要调减小这个值,而 YARN 不会智能的探测节点的物理内存总量。
yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是 8192(MB)。

Hadoop 宕机
- 如果 MR 造成系统宕机。此时要控制 Yarn 同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是 8192MB)
- 如果写入文件过量造成 NameNode 宕机。那么调高 Kafka 的存储大小,控制从 Kafka 到 HDFS 的写入速度。高峰期的时候用 Kafka 进行缓存,高峰期过去数据同步会自动跟上。
# zookeeper
集群规划
| 服务器 hadoop102 | 服务器 hadoop103 | 服务器 hadoop104 | |
|---|---|---|---|
| Zookeeper | Zookeeper | Zookeeper | Zookeeper |
tar -zxvf /opt/software/apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
mv /opt/module/apache-zookeeper-3.5.7-bin/ /opt/module/zookeeper
#环境变量
sudo vim /etc/profile.d/my_env.sh
#追加
#ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/opt/module/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
#同步环境变量
source /etc/profile.d/my_env.sh
sudo xsync /etc/profile.d/my_env.sh
#将配置文件改名称为zoo.cfg
cd /opt/module/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
#配置zookeeper文件
vim zoo.cfg
#追加下内容
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
#修改数据存储位置
dataDir=/opt/module/zookeeper/zkData
cd /opt/module/zookeeper
mkdir zkData
#创建myid用于zookeeper标记机器
#编辑为102的2用于唯一标识 103为3 104为4
echo 2 > /opt/module/zookeeper/zkData/myid
#同步
xsync /opt/module/zookeeper/
#103
echo 3 > /opt/module/zookeeper/zkData/myid
#104
echo 4 > /opt/module/zookeeper/zkData/myid
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
启动
#在三台机器上分别启动
zkServer.sh start
2
# 群起脚本
在 hadoop102 的 /home/atguigu/bin 目录下创建脚本
mkdir -p /home/atguigu/bin
cd /home/atguigu/bin
vim zk.sh
2
3
#!/bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo "------------- $i -------------"
ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo "------------- $i -------------"
ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop102 hadoop103 hadoop104
do
echo "------------- $i -------------"
ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
done
};;
esac
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
增加权限
chmod 755 zk.sh
zk.sh start
2
# 日志生成
将之前日志生成的带依赖 jar 上传到 102 上
# 代码参数说明
// 参数一:控制发送每条的延时时间,默认是0
Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;
// 参数二:循环遍历次数
int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;
2
3
4
5
# 启动方式
启动方式有两种:
通过 jar -classpath 指定全类名启动
java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain >/opt/module/test.log1通过 jar -jar 启动 必须打包时指定启动类类引用否则无法启动
java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar >/opt/module/test.log1
# 日志收集
我们通过上面方式启动 java 程序会收集控制台中的输出 但我们已经通过 logback 收集日志到指定路径文件了 我们无需保存控制台上的输出内容 可以通过将输出内容直接输出到 /dev/null 中
/dev/null 代表 linux 的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称 “黑洞”。
| 名称 | 代码 | 操作符 | Java 中表示 | Linux 下文件描述符(Debian 为例) |
|---|---|---|---|---|
| 标准输入 (stdin) | 0 | < 或 << | System.in (opens new window) | /dev/stdin -> /proc/self/fd/0 -> /dev/pts/0 |
| 标准输出 (stdout) | 1 | >, >>, 1> 或 1>> | System.out | /dev/stdout -> /proc/self/fd/1 -> /dev/pts/0 |
| 标准错误输出 (stderr) | 2 | 2> 或 2>> | System.err | /dev/stderr -> /proc/self/fd/2 -> /dev/pts/0 |
java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar 2>/dev/null 1>/dev/null
可以简写为
java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar >/dev/null 2>&1
在 /tmp/logs 路径下查看生成的日志文件
cd /tmp/logs/
ls
2
# 集群日志生成脚本
先同步日志 jar 到集群其他机器上
xsync /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar
在 /home/atguigu/bin 目录下创建脚本 lg.sh
vim /home/atguigu/bin/lg.sh
#! /bin/bash
for i in hadoop102 hadoop103
do
ssh $i "java -jar /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar $1 $2 >/dev/null 2>&1 &"
done
2
3
4
5
6
chmod 755 /home/atguigu/bin/lg.sh
/home/atguigu/bin/lg.sh
2
# 集群时间同步修改脚本(非正规临时脚本)
在 /home/atguigu/bin 目录下创建脚本 dt.sh
vim /home/atguigu/bin/dt.sh
#!/bin/bash
for i in hadoop102 hadoop103 hadoop104
do
echo "========== $i =========="
ssh -t $i "sudo date -s $1"
done
2
3
4
5
6
7
ssh -t 通常用于 ssh 远程执行 sudo 命令
chmod 755 /home/atguigu/bin/dt.sh
/home/atguigu/bin/dt.sh 2020-03-10
2
# 集群所有进程查看脚本
在 /home/atguigu/bin 目录下创建脚本 xcall.sh
vim /home/atguigu/bin/xcall.sh
#! /bin/bash
for i in hadoop102 hadoop103 hadoop104
do
echo --------- $i ----------
ssh $i "$*"
done
2
3
4
5
6
7
chmod 755 /home/atguigu/bin/xcall.sh
xcall.sh jps
2
# 采集日志 Flume

集群规划:
| 服务器 hadoop102 | 服务器 hadoop103 | 服务器 hadoop104 | |
|---|---|---|---|
| Flume (采集日志) | Flume | Flume |
# 安装
tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
cd /opt/module/
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
#将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
rm /opt/module/flume/lib/guava-11.0.2.jar
sudo vim /etc/profile.d/my_env.sh
#FLUME_HOME
export FLUME_HOME=/opt/module/flume
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile.d/my_env.sh
2
3
4
5
6
7
8
9
10
11
12
同步
cd /opt/module/
xsync flume/
sudo xsync /etc/profile.d/my_env.sh
2
3
# Flume 组件选择
- Source
(1)Taildir Source 相比 Exec Source、Spooling Directory Source 的优势
TailDir Source:** 断点续传、多目录。**Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传。
Exec Source 可以实时搜集数据,但是在 Flume 不运行或者 Shell 命令出错的情况下,数据将会丢失。
Spooling Directory Source 监控目录,不支持断点续传。
(2)batchSize 大小如何设置?
答:Event 1K 左右时,500-1000 合适(默认为 100)
- Channel
采用 Kafka Channel,省去了 Sink,提高了效率。KafkaChannel 数据存储在 Kafka 里面,所以数据是存储在磁盘中。
注意在 Flume1.7 以前,Kafka Channel 很少有人使用,因为发现 parseAsFlumeEvent 这个配置起不了作用。也就是无论 parseAsFlumeEvent 配置为 true 还是 false,都会转为 Flume Event。这样的话,造成的结果是,会始终都把 Flume 的 headers 中的信息混合着内容一起写入 Kafka 的消息中,这显然不是我所需要的,我只是需要把内容写入即可。
# Flume 配置

file-flume-kafka.conf 文件
cd /opt/module/flume/conf
vim file-flume-kafka.conf
2
a1.sources=r1
a1.channels=c1 c2
# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# ETL 和分类型拦截器
本项目中自定义了两个拦截器,分别是:ETL 拦截器、日志类型区分拦截器。
ETL 拦截器主要用于,过滤时间戳不合法和 Json 数据不完整的日志
日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往 Kafka 的不
在 ide 创建 flume-interceptor 工程 导入依赖
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
创建 com.atguigu.flume.interceptor 包 包下创建 LogETLInterceptor 类 并实现 Interceptor 接口
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class LogETLInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//将event转换为string 方便处理
byte[] body = event.getBody();
String log = new String(body, StandardCharsets.UTF_8);
//判断日志是启动日志还是事件日志 并向header中添加标识
if (log.contains("start")) {
//清洗启动日志
if (LogUtils.validateStart(log)) {
return event;
}
} else {
//清洗事件日志
if (LogUtils.validateEvent(log)) {
return event;
}
}
return null;
}
@Override
public List<Event> intercept(List<Event> list) {
ArrayList<Event> intercepts = new ArrayList<>();
for (Event event : list) {
Event intercept = intercept(event);
if (intercept != null) {
intercepts.add(intercept);
}
}
return intercepts;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new LogETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
LogUtils 工具类
package com.atguigu.flume.interceptor;
import org.apache.commons.lang.math.NumberUtils;
public class LogUtils {
public static boolean validateStart(String log) {
if (log == null) {
return false;
}
//开头不是以{或结尾不是以}结束的 不是json字符串
if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
return false;
}
return true;
}
public static boolean validateEvent(String log) {
if (log == null) {
return false;
}
//以 | 切割
String[] logConents = log.split("\\|");
//判断长度
if (logConents.length != 2) {
return false;
}
//判断服务器时间是否够13位 并且判断是否全为数字
if (logConents[0].length() != 13 || !NumberUtils.isDigits(logConents[0])) {
return false;
}
//判断json完整性
if (!logConents[1].trim().startsWith("{") || !logConents[1].trim().endsWith("}")) {
return false;
}
return true;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
分类型拦截器 LogTypeInterceptor
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class LogTypeInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//取出body数据
byte[] body = event.getBody();
String log = new String(body, StandardCharsets.UTF_8);
//header
Map<String, String> headers = event.getHeaders();
if (log.contains("start")) {
headers.put("topic", "topic_start");
} else {
headers.put("topic", "topic_event");
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
ArrayList<Event> resultEvent = new ArrayList<>();
for (Event event : list) {
resultEvent.add(event);
}
return resultEvent;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new LogTypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
打包上传 jar 包到 flume 的 lib 中 选择不带依赖即可
同步分发
xsync /opt/module/flume/
# flume 启动脚本
在 /home/atguigu/bin 目录下创建脚本 f1.sh
vim /home/atguigu/bin/f1.sh
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1 &"
done
};;
"stop"){
for i in hadoop102 hadoop103
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done
};;
esac
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
说明 1:nohup,该命令可以在你退出帐户 / 关闭终端之后继续运行相应的进程。nohup 就是不挂起的意思,不挂断地运行命令。
说明 2:awk 默认分隔符为空格
说明 3:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。
cd /home/atguigu/bin/
chmod 755 f1.sh
f1.sh start
2
3
# Kafka
集群规划:
| 服务器 hadoop102 | 服务器 hadoop103 | 服务器 hadoop104 | |
|---|---|---|---|
| Kafka | Kafka | Kafka | Kafka |
要先启动 zookeeper
tar -zxvf /opt/software/kafka_2.11-2.4.1.tgz -C /opt/module/
cd /opt/module/
mv kafka_2.11-2.4.1/ kafka
cd kafka
mkdir logs
#环境变量
sudo vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile.d/my_env.sh
2
3
4
5
6
7
8
9
10
11
12
13
14
配置 kafka
vim /opt/module/kafka/config/server.properties
除了删除功能是新添加属性 其他都是修改
#broker的全局唯一编号,不能重复
broker.id=0
#添加删除功能
#删除topic功能使能
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
2
3
4
5
6
7
8
9
10
11
12
同步
xsync /opt/module/kafka
sudo xsync /etc/profile.d/my_env.sh
2
修改每个 kafka 中 broker.id
vim /opt/module/kafka/config/server.properties
#102 为 0, 103为1 , 104为2
2
# 群起脚本
在 /home/atguigu/bin 目录下创建脚本 kf.sh
vim /home/atguigu/bin/kf.sh
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
done
};;
esac
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
cd /home/atguigu/bin/
chmod 755 kf.sh
kf.sh start
2
3
# 创建 Kafka Topic
进入到 /opt/module/kafka/ 目录下分别创建:启动日志主题、事件日志主题
kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 1 --partitions 1 --topic topic_start
kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 1 --partitions 1 --topic topic_event
2
查询 topic 列表
kafka-topics.sh --zookeeper hadoop102:2181/kafka --list
删除 topic
kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_start
kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_event
2
生产消息
kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_start
消费消息
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic topic_start
# 压力测试
用 Kafka 官方自带的脚本,对 Kafka 进行压测。Kafka 压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络 IO)。一般都是网络 IO 达到瓶颈。
# 生产者
kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
record-size 是一条信息有多大,单位是字节。
num-records 是总共发送多少条信息。
throughput 是每秒多少条信息,设成 - 1,表示不限流,可测出生产者最大吞吐量。

本例中一共写入 10w 条消息,吞吐量为 7.05 MB/sec,每次写入的平均延迟为 408.12 毫秒,最大的延迟为 603.00 毫秒。
# 消费者
kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
--zookeeper 指定 zookeeper 的链接信息
--topic 指定 topic 的名称
--fetch-size 指定每次 fetch 的数据的大小
--messages 总共要消费的消息个数

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
开始测试时间,测试结束数据,共消费数据 9.5367MB,吞吐量 0.7875MB/s****,共消费 100000 条,平均每秒消费 **8257 条。
# kafka 机器数量计算
Kafka 机器数量(经验公式)=2(峰值生产速度 * 副本数 / 100)+1*
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署 Kafka 的数量。
比如我们的峰值生产速度是 50M/s。副本数为 2。
Kafka 机器数量 = 2*(50*2/100)+ 1=3 台
# 消费 kafka 数据 flume
此 flume 用于消费 kafka 中的数据 上传至 hdfs 上
集群规划
| 服务器 hadoop102 | 服务器 hadoop103 | 服务器 hadoop104 | |
|---|---|---|---|
| Flume(消费 Kafka) | Flume |

# 日志消费 flume 配置
目前是启动日志和事件日志写在同一个 config 配置中 建议拆分为两个配置文件分开运行 解耦
在 hadoop104 的 /opt/module/flume/conf 目录下创建 kafka-flume-hdfs.conf 文件
vim /opt/module/flume/conf/kafka-flume-hdfs.conf
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#控制消费速度 每秒5000条
a1.sources.r1.batchSize = 5000
#延迟时间如果指定毫秒数没有到达指定消费速度 同样消费速度
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
#checkpointDir为存储日志 dataDirs为存储数据位置 两目录可以指定一个多目录路径提高读写性能
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
#文件最大值
a1.channels.c2.maxFileSize = 2146435071
#数据容量
a1.channels.c2.capacity = 1000000
#等待时长 如果容量未达到指定大小,并且超过指定时间则提交到sink
a1.channels.c2.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
## 不要产生大量小文件 以下3个配置属性满足任意一个则滚动到下一个文件
#等待多少秒才滚动到下一个文件 默认为30s 建议为3600s
a1.sinks.k1.hdfs.rollInterval = 10
#多少字节滚动一次 默认为1024
a1.sinks.k1.hdfs.rollSize = 134217728
#event写入的个数滚动 默认为10 0则为禁用
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## 控制输出文件是否原生文件 此处设置为压缩流
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
# 设置压缩格式
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

# Flume 组件选择
- FileChannel 和 MemoryChannel 区别
MemoryChannel 传输数据速度更快,但因为数据保存在 JVM 的堆内存中,Agent 进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
FileChannel 传输速度相对于 Memory 慢,但数据安全保障高,Agent 进程挂掉也可以从失败中恢复数据。
企业选型:
- 金融类公司、对钱要求非常准确的公司通常会选择 FileChannel
- 传输的是普通日志信息(京东内部一天丢 100 万 - 200 万条,这是非常正常的),通常选择 MemoryChannel。
FileChannel 优化
通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增大 Flume 吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir 和 backupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证 checkpoint 坏掉后,可以快速使用 backupCheckpointDir 恢复数据
Sink:HDFS Sink
(1)HDFS 存入大量小文件,有什么影响?
** 元数据层面:** 每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在 Namenode 内存中。所以小文件过多,会占用 Namenode 服务器大量内存,影响 Namenode 性能和使用寿命
** 计算层面:** 默认情况下 MR 会对每个小文件启用一个 Map 任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
(2)HDFS 小文件处理
官方默认的这三个参数配置写入 HDFS 后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上 hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0 几个参数综合作用,效果如下:
(1)文件在达到 128M 时会滚动生成新文件
(2)文件创建超 3600 秒时会滚动生成新文件
# 日志消费 Flume 启动脚本
cd /home/atguigu/bin
vim f2.sh
2
#! /bin/bash
case $1 in
"start"){
for i in hadoop104
do
echo " --------启动 $i 消费flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt 2>&1 &"
done
};;
"stop"){
for i in hadoop104
do
echo " --------停止 $i 消费flume-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
done
};;
esac
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
chmod 755 f2.sh
f2.sh start
f2.sh stop
2
3
# Flume 内存优化
如果启动消费 Flume 抛出如下异常则表示该机器最小内存不满足于 Flume 配置的最小启动内存
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
2
我们可以修改 Flume 的默认内存配置
vim /opt/module/flume/conf/flume-env.sh
#添加以下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
2
分发配置文件
xsync flume-env.sh
Flume 内存参数设置及优化
- JVM heap 一般设置为 4G 或更高,部署在单独的服务器上(4 核 8 线程 16G 内存)
- -Xmx 与 - Xms 最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁 fullgc。
- -Xms 表示 JVM Heap (堆内存) 最小尺寸,初始分配;-Xmx 表示 JVM Heap (堆内存) 最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发 fullgc。
# 采集通道启动 / 停止脚本
该脚本控制全部组件的启动和停止
cd /home/atguigu/bin
vim cluster.sh
2
注意要先启动 zookeeper 再起 kafka 关闭时候先关 kafka 再关 zookeeper
#! /bin/bash
case $1 in
"start"){
echo " -------- 启动 集群 -------"
echo " -------- 启动 hadoop集群 -------"
/opt/module/hadoop-3.1.3/sbin/start-dfs.sh
ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
#启动 Zookeeper集群
zk.sh start
sleep 4s;
#启动 Flume采集集群
f1.sh start
#启动 Kafka采集集群
kf.sh start
sleep 6s;
#启动 Flume消费集群
f2.sh start
};;
"stop"){
echo " -------- 停止 集群 -------"
#停止 Flume消费集群
f2.sh stop
#停止 Kafka采集集群
kf.sh stop
sleep 6s;
#停止 Flume采集集群
f1.sh stop
#停止 Zookeeper集群
zk.sh stop
echo " -------- 停止 hadoop集群 -------"
ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh
};;
esac
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
chmod 755 cluster.sh
cluster.sh start
cluster.sh stop
2
3
# 采集模块总结
# Linux&Shell 相关总结
Linux 常用高级命令
| 序号 | 命令 | 命令解释 |
|---|---|---|
| 1 | top | 查看内存 |
| 2 | df -h | 查看磁盘存储情况 |
| 3 | iotop | 查看磁盘 IO 读写 (yum install iotop 安装) |
| 4 | iotop -o | 直接查看比较高的磁盘读写程序 |
| 5 | netstat -tunlp | grep 端口号 | 查看端口占用情况 |
| 6 | uptime | 查看报告系统运行时长及平均负载 |
| 7 | ps aux | 查看进程 |
Shell 常用工具
awk、sed、cut、sort
# Hadoop 相关总结
Hadoop 默认不支持 LZO 压缩,如果需要支持 LZO 压缩,需要添加 jar 包,并在 hadoop 的 cores-site.xml 文件中添加相关压缩配置。需要掌握让 LZO 文件支持切片。
Hadoop 常用端口号,50070,8088,19888,9000
Hadoop 配置文件以及简单的 Hadoop 集群搭建。8 个配置文件
core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml workers(2.7 为 slaves)
默认块大小
- 集群模式 128m
- 本地模式 32m
- hadoop1.x 64m
- 在业务开发中 hdfs 块大小通常设置为 128m 或 256m
- hive 的文件块大小 256m
小文件问题
危害
- 占用 namenode 内存 一个文件块占用 namenode 150 字节
- 增加切片数 每个文件开启一个 maptask (默认内存为 1g)
解决办法
- har 归档 自定义 FileInputFormat
- combineInputformat 减少切片个数,进而减少的是 maptask
- 开启 JVM 重用
MapReduce
- shuffle 优化
- map 方法之后 reduce 方法之前 混洗过程
Yarn
工作机制
调度器
FIFO、容量、公平调度器 Apache 默认调度器:容量 CDH 默认调度器:公平调度器
FIFO 调度器特点:单队列,先进先出,在企业开发中没人使用
容量调度器:支持多队列,先进来的任务优先享有资源
公平调度器:支持多队列,每个任务公平享有资源 并发度最高。
对并发度要求比较高,同时机器性能比较好,选择公平; 大公司 如果并发度不高,机器性能比较差,选择容量: 中小公司
生产环境下队列怎么创建? 容量调度器默认只有一个 default 队列; 按照框架名称:hive、spark、flink
按照业务名称:登录、购物车、支付模块、部门 1、部门 2 (居多)
好处:解耦、降低风险、可以实现任务降级(部门 1》部门 2》购物车)
# Zookeeper 相关总结
选举机制:
- 半数机制 pax,安装奇数台
- 10 台服务器几台:3 台
- 20 台服务器几台:5 台
- 100 台服务器几台:11 台
- 不是越多越好,也不是越少越好。 如果多,通信时间长,效率低;如果太少,可靠性差。
常用命令:
- ls、get、create
# Flume 相关总结
# 组成
Flume 组成,Put 事务,Take 事务
Taildir Source:断点续传、多目录。Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传。
File Channel:数据存储在磁盘,宕机数据可以保存。但是传输速率慢。适合对数据传输可靠性要求高的场景,比如,金融行业。
Memory Channel:数据存储在内存中,宕机数据丢失。传输速率快。适合对数据传输可靠性要求不高的场景,比如,普通的日志数据。
Kafka Channel:减少了 Flume 的 Sink 阶段,提高了传输效率。
Source 到 Channel 是 Put 事务
Channel 到 Sink 是 Take 事务
# taildir source
- 断点续传、多目录
- 在 Apache flume1.7 之后产生的;如果是 CDH,1.6 之后;
- 自定义 source 实现断点续传的功能(只要能自定义,有很多想象空间了)
- taildir source 挂了怎么办? 不会丢失、可能产生数据重复
- 对重复数据怎么处理? 不处理;
处理:(自身:自定义 source 实现事务,额外增加了运算量)
- 在下一级处理:hive 的数仓里面(dwd 层,数据清洗 ETL)
- spark Streaming 里面处理 去重的手段:group by (id) 开窗(id), 只取窗口第一条
- 是否支持递归读取文件? 不支持;自定义 tail source (递归 + 读取) 消费 kafka + 上传 hdfs
# channel
- File Channel : 基于磁盘、效率低、可靠性高
- memoryChannel:基于内存、效率高、可靠性低
- KafkaChannel:数据存储在 Kafka 里面,基于磁盘、效率高于 memoryChannel+kafkasink, 因为省了 sink flume1.6 时 topic + 内容; 无论 true 还是 false 都不起作用; bug flume1.7 解决 bug, 被大家广泛使用;
- 在生产环境:如果下一级是 kafka 的话,优先选择 KafkaChannel; 如果不是 kafka,如果更关心可靠性选择 FileChannel;如果更关心性能,选择 memoryChannel
# HDFS sink
控制小文件:
时间(1-2 小时)、大小(128m)、event 个数 (0 禁止)
压缩
开启压缩流;指定压缩编码方式(lzop/snappy)
# 拦截器
ETL(判断 json 的完整性 { }; 服务器时间(13 全数字)) 项目中自定义了:ETL 拦截器和区分类型拦截器。 用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些
分类型(启动日志、事件日志) kafka(的 topic 要满足下一级所又消费者) 一张表一个 topic
商品列表、商品详情、商品点击 广告 点赞、评论、收藏 后台活跃、通知 故障 启动日志自定义拦截器步骤
- 定义一个类 实现 interceptor 接口
- 重写 4 个方法:初始化、关闭、单 event、多 event()
- initialize 初始化
- public Event intercept (Event event) 处理单个 Event
- public List
<Event>intercept(List<Event>events) 处理多个 Event,在这个方法中调用 Event intercept (Event event) - close 方法
- 创建一个静态内部类 Builder
- 静态内部类,实现 Interceptor.Builder
拦截器不要行不行? 看具体业务需求决定
# 选择器

rep(默认) mul(选择性发往下一级通道)
# 监控器
ganglia 发现尝试提交的次数 远远大于最终提交成功次数; 说明 flume 性能不行;
自身;提高自己的内存 4-6g flume_env.sh 外援:增加 flume 台数 服务器配置(16g/32g 8T)
# Flume 内存
开发中在 flume-env.sh 中设置 JVM heap 为 4G 或更高,部署在单独的服务器上(4 核 8 线程 16G 内存)
-Xmx 与 - Xms 最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁 fullgc。
-Xms 表示 JVM Heap (堆内存) 最小尺寸,初始分配;-Xmx 表示 JVM Heap (堆内存) 最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发 fullgc。
# 优化
File Channel 能多目录就多目录(要求在不同的磁盘),提高吞吐量 checkpointDir 和 backupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证 checkpoint 坏掉后,可以快速使用 backupCheckpointDir 恢复数据
HDFS Sink 控制小文件; 时间(1-2 小时)、大小(128m)、event 个数 (0 禁止)
- HDFS 存入大量小文件,有什么影响?
** 元数据层面:** 每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在 Namenode 内存中。所以小文件过多,会占用 Namenode 服务器大量内存,影响 Namenode 性能和使用寿命
** 计算层面:** 默认情况下 MR 会对每个小文件启用一个 Map 任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
- HDFS 小文件处理
官方默认的这三个参数配置写入 HDFS 后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上 hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0 几个参数综合作用,效果如下:
(1)文件在达到 128M 时会滚动生成新文件
(2)文件创建超 3600 秒时会滚动生成新文件
监控器
# Flume 采集数据会丢失吗?
不会,Channel 存储可以存储在 File 中,数据传输自身有事务。
# Kafka 相关总结
Kafka 架构

基本信息:
Kafka 组成:zk 里面存储 broker 信息 消费者信息 唯独没有生产者信息。

搭建多少台 Kafka:2(生产者峰值生产速率 * 副本 / 100)+1 =3 2 *(生产者峰值生产速率 * 2/100)+ 1= 3 => 生产者峰值生产速率 < 50m/s 50m/s * 60 秒 = 3G
副本数:2 个居多、3 个
- 好处:提高可靠性;
- 坏处:增加了网络 IO
压测(生产者峰值生产速率) 消费速率
默认数据保存多久 默认保存 7 天 但在生产环境中一般设置为 3 天
Kafka 的磁盘预留多大空间 100g 数据 * 2 个副本 * 3 天 / 0.7
数据量计算 100 万日活 1 个人 100 条日志 100 万 * 100 条 = 1 亿条 平均速度是的多少 1 亿条 /(24*3600s)=1150 条 /s 每秒多少 m 1 条日志 1k => 1m/s 生产环境,你的数据量什么时候达到峰值?618 1111 早上中午、晚上 晚上 8 点以后 只要不超过 50m/s 就行 20-30m/s
分区数设置多少? 先设置一个分区; 压测他的 峰值生产速率 tp;峰值消费速率 tc 用户有个期望的吞吐量 p p/min (tp,tc)= 分区数 P 100m/s tp 20m/s tc 30m/s
100/20 = 5 个分区 企业当中分区数一般为 3-10 个 消费者要有对应的 CPU 核数
ISR 主要解决 Leader 挂了谁当老大? 在 ISR 队列里面都有机会当老大;
- 旧版:延迟时间和延迟条数;
- 新版:延迟时间
分区分配策略
range(默认) 容易导致数据倾斜
10 个 3 个分区 [0 1 2 3] [4 5 6] [7 8 9]
round robin 能够减少数据倾斜 hash 随机打散,再采用轮询的方式;
监控 Kafka
- eagle
- Kafkamanager
- Kafkamontor
Kafka 挂掉 重新启动
短时间内数据进入 flume channel 长时间,日志服务器保存数据 30 天
丢失数据
- ack = 0:发送过去数据,就不管了;可靠性最差,传输效率是最快的
- ack = 1:发送过去数据,Leader 应答;可靠性一般;传输效率一般;
- ack = -1:发送过去数据,Leader+follower 应答;可靠性最高;传输效率最低
- 在生产环境,通常不会用 0:选择 1 的最多; 在绝大多数场景都是传输的普通日志,都是效率至上;选择 1 金融行业,和钱有关的行业,要选 - 1
重复数据
- 幂等性 + 事务 + ack=-1 下一级去重(hive 的 dwd 层;sparksteaming)group by 和开窗 幂等性的是(单分区、会话内不重;Kafka 重启容易数据重复)
积压
- 自身:增加分区,消费者 CPU 核数
- 外援:增加消费者消费速度 flume ==> kafka ==> flume ==> hdfs batchsize 1000 条 ==>2000 条~3000 条
参数优化
- 网络和 io 操作线程配置优化
- 计算型任务线程数
num.network.threads =cpu核数+1默认为 3 - IO 密集型任务线程数
num.io.threads = CPU核数*2
- 计算型任务线程数
- log 数据文件刷盘策略
- 每当 producer 写入 10000 条消息时,刷数据到磁盘
log.flush.interval.messages=10000 - 每间隔 1 秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000
- 每当 producer 写入 10000 条消息时,刷数据到磁盘
- 日志保留策略配置
- 日志保留时间小时 log.retention.hours=72 默认为 168 小时
- 默认副本数
- 这个参数指新创建一个 topic 时,默认的 Replica 数量,Replica 过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在 2~3 为宜。
offsets.topic.replication.factor=3
- 这个参数指新创建一个 topic 时,默认的 Replica 数量,Replica 过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在 2~3 为宜。
- Producer 优化(producer.properties)
- 在 Producer 端用来存放尚未发送出去的 Message 的缓冲区大小。缓冲区满了之后可以选择阻塞发送或抛出异常,由 block.on.buffer.full 的配置来决定。
buffer.memory=33554432(32m) - 默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和 Broker 的存储压力。
compression.type=none
- 在 Producer 端用来存放尚未发送出去的 Message 的缓冲区大小。缓冲区满了之后可以选择阻塞发送或抛出异常,由 block.on.buffer.full 的配置来决定。
- Kafka 内存调整(kafka-server-start.sh)
- 默认内存 1 个 G,生产环境尽量不要超过 6 个 G。
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
- 默认内存 1 个 G,生产环境尽量不要超过 6 个 G。
- 网络和 io 操作线程配置优化
其他
- Kafka 高效读写数据
- Kafka 本身是分布式集群,同时采用分区技术,并发度高
- 顺序写磁盘
- 零复制技术
- Kafka 支持传输
- kafka 对于消息体的大小默认为单条最大值是 1M 但是在我们应用场景中,常常会出现一条消息大于 1M,如果不对 kafka 进行配置。则会出现生产者无法将消息推送到 kafka 或消费者无法去消费 kafka 里面的数据,这时我们就要对 kafka 进行以下配置:(server.properties)
replica.fetch.max.byes=12582912message.max.byes=11534336
- kafka 对于消息体的大小默认为单条最大值是 1M 但是在我们应用场景中,常常会出现一条消息大于 1M,如果不对 kafka 进行配置。则会出现生产者无法将消息推送到 kafka 或消费者无法去消费 kafka 里面的数据,这时我们就要对 kafka 进行以下配置:(server.properties)
# Kafka 过期数据清理
保证数据没有被引用(没人消费他) 日志清理保存的策略只有 delete 和 compact 两种log.cleanup.policy=delete启用删除策略log.cleanup.policy=compact启用压缩策略
- Kafka 高效读写数据