Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • Hadoop

  • Zookeeper

  • Hive

  • Flume

  • Kafka

  • Azkaban

  • Hbase

  • Scala

  • Spark

  • Flink

  • 离线数仓

    • 数据仓库概念
    • 项目需求及架构设计
    • 数据生成模块
    • 数据采集模块
      • hadoop
        • 安装准备工作
        • 安装java
        • 安装hadoop
        • xsync脚本
        • ssh免密
        • 分发软件
        • 集群配置
        • 格式化
        • hdfs存储多目录
        • LZO压缩配置
        • 编译
        • 使用
        • LZO创建索引
        • 基准测试
        • 测试HDFS写性能
        • 测试HDFS读性能
        • 删除测试生成数据
        • 使用sort测试mapreduce
        • hadoop参数调优
      • zookeeper
        • 群起脚本
      • 日志生成
        • 代码参数说明
        • 启动方式
        • 日志收集
        • 集群日志生成脚本
        • 集群时间同步修改脚本(非正规临时脚本)
        • 集群所有进程查看脚本
      • 采集日志Flume
        • 安装
        • Flume组件选择
        • Flume配置
        • ETL和分类型拦截器
        • flume启动脚本
      • Kafka
        • 群起脚本
        • 创建Kafka Topic
        • 压力测试
        • 生产者
        • 消费者
        • kafka机器数量计算
      • 消费kafka数据flume
        • 日志消费flume配置
        • Flume组件选择
        • 日志消费Flume启动脚本
        • Flume内存优化
      • 采集通道启动/停止脚本
      • 采集模块总结
        • Linux&Shell相关总结
        • Hadoop相关总结
        • Zookeeper相关总结
        • Flume相关总结
        • 组成
        • taildir source
        • channel
        • HDFS sink
        • 拦截器
        • 选择器
        • 监控器
        • Flume 内存
        • 优化
        • Flume采集数据会丢失吗?
        • Kafka相关总结
      • Kafka过期数据清理
    • 电商业务简介
    • 业务数据采集模块
    • 数仓分层概念
    • 数仓搭建-ODS层
    • 数仓搭建-DWD层
    • 数仓搭建-DWS层
    • 数仓搭建-DWT层
  • 青训营

  • DolphinScheduler

  • Doris

  • 大数据
  • 离线数仓
Iekr
2021-12-09
目录

数据采集模块

# 数据采集模块

# hadoop

# 安装准备工作

安装工具包

sudo yum install -y epel-release  #额外软件源
sudo yum install -y psmisc nc net-tools rsync vim lrzsz ntp libzstd openssl-static tree iotop git wget #安装 psmisc工具包  nc工具包 net-tools工具包 rsync 远程同步  vim编辑器 lrzsz上传下载  ntp时间同步 
1
2

修改静态 ip

sudo vim /etc/sysconfig/network-scripts/ifcfg-ens33
1
DEVICE=ens33
TYPE=Ethernet
ONBOOT=yes
BOOTPROTO=static
NAME="ens33"
IPADDR=192.168.42.102
PREFIX=24
GATEWAY=192.168.42.2
DNS1=192.168.42.2
1
2
3
4
5
6
7
8
9
service network restart
1

修改主机名

sudo vim /etc/hostname
1

修改 host

sudo vim /etc/hosts
1
192.168.42.102 hadoop102
192.168.42.103 hadoop103
192.168.42.104 hadoop104
1
2
3

windows 上也配置 host

关闭防火墙

sudo systemctl stop firewalld
sudo systemctl disable firewalld
1
2

创建用户 新版本 hadoop 无法使用 root 用户启动

sudo useradd atguigu
sudo passwd atguigu
1
2

修改 atguigu 权限

visudo
1
## Allow root to run any commands anywhere
root    ALL=(ALL)     ALL
atguigu   ALL=(ALL)     ALL #在root下添加此行内容
1
2
3

创建 opt 下的存放目录

cd /opt
sudo mkdir module
sudo mkdir software
sudo chown atguigu:atguigu /opt/module /opt/software
1
2
3
4

# 安装 java

先卸载

rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
1

解压

cd /opt/software/
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
1
2

配置环境变量

sudo vim /etc/profile.d/my_env.sh
1
#JAVA_HOME
#yum 为/usr/lib/jvm/java
export JAVA_HOME=/opt/module/jdk1.8.0_212 
export PATH=$PATH:$JAVA_HOME/bin
1
2
3
4

更新环境变量

source /etc/profile
java -version
1
2

# 安装 hadoop

解压

tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/
1

配置环境变量

sudo vim /etc/profile.d/my_env.sh
1
##HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
1
2
3
4

更新

source /etc/profile
hadoop version
1
2

# xsync 脚本

cd /home/atguigu
vim xsync
1
2
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
  echo ====================  $host  ====================
  #3. 遍历所有目录,挨个发送
  for file in $@
  do
    #4 判断文件是否存在
    if [ -e $file ]
    then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file does not exists!
    fi
  done
done
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
chmod +x xsync
sudo mv xsync /bin/   #将脚本移动到/bin中,以便全局调用
1
2

# ssh 免密

生成 ssh 密钥

ssh-keygen -t rsa
1

分发 ssh 密钥

ssh-copy-id hadoop102

ssh-copy-id hadoop103

ssh-copy-id hadoop104
1
2
3
4
5

以上生成和分发 需要在每台机器上执行

# 分发软件

cd /opt/module/
xsync jdk1.8.0_212/
xsync hadoop-3.1.3/
1
2
3

分发环境变量

sudo scp /etc/profile.d/my_env.sh  root@hadoop103:/etc/profile.d/my_env.sh
sudo scp /etc/profile.d/my_env.sh  root@hadoop104:/etc/profile.d/my_env.sh
1
2

# 集群配置

集群规划

服务器 hadoop102 服务器 hadoop103 服务器 hadoop104
HDFS NameNode DataNode DataNode DataNode SecondaryNameNode
Yarn NodeManager Resourcemanager NodeManager NodeManager

进配置文件夹

cd /opt/module/hadoop-3.1.3/etc/hadoop
1

core-site.xml

vim core-site.xml 
1
<property>
        <name>fs.defaultFS</name>
    <!-- 配置hdfs默认的地址 -->
        <value>hdfs://hadoop102:8020</value>
    </property>
    <property>
        <!-- 配置hadoop临时存放路径-->
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-3.1.3/data</value>
    </property>
    <property>
        <!-- 兼容性配置hive -->
        <name>hadoop.proxyuser.atguigu.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.atguigu.groups</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>atguigu</value>
    </property>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

hdfs-site.xml

<property>
    <!-- 2nn的地址 -->
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop104:9868</value>
    </property>
1
2
3
4
5

yarn-site.xml

<property>
    <!--设置NodeManager上运行的附属服务,需配置成mapreduce_shuffle才可运行MapReduce程序-->
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop103</value>
    </property>
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>
	<!--设定单个容器可以申领到的最小内存资源-->
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>2048</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>4096</value>
    </property>
<!--设定物理节点有4G内存加入资源池-->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>4096</value>
    </property>
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

mapred-site.xml

<!--Hadoop对MapReduce运行框架一共提供了3种实现,在mapred-site.xml中通过"mapreduce.framework.name"这个属性来设置为"classic"、"yarn"或者"local"-->
<property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
1
2
3
4
5

workers

vim /opt/module/hadoop-3.1.3/etc/hadoop/workers
1

删除 localhost 文件中添加的内容结尾不允许有空格,文件中不允许有空行。

hadoop102
hadoop103
hadoop104
1
2
3

同步

xsync /opt/module/hadoop-3.1.3/etc/hadoop/
1

# 格式化

格式化 hdfs

hdfs namenode -format
1

启动 dfs

start-dfs.sh
1

如果报 java 未找到 修改 hadoop.env.sh 文件

vim /opt/module/hadoop-3.1.3/etc/hadoop/hadoop-env.sh
1

修改 JAVA_HOME 为 并同步

export JAVA_HOME=/opt/module/jdk1.8.0_212
1

启动 yarn

start-yarn.sh #在103机器上启动
1

# hdfs 存储多目录

若 HDFS 存储空间紧张,需要对 DataNode 进行磁盘扩展。

  1. 在 DataNode 节点增加磁盘并进行挂载。

    image-20211209154611618

  2. 在 hdfs-site.xml 文件中配置多目录,注意新挂载磁盘的访问权限问题。

    <property>
        <name>dfs.datanode.data.dir</name>
    <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>
    </property>
    
    1
    2
    3
    4
  3. 增加磁盘后,保证每个目录数据均衡

    bin/start-balancer.sh –threshold 10
    
    1

    对于参数 10,代表的是集群中各个节点的磁盘空间利用率相差不超过 10%,可根据实际情况进行调整。

    停止数据均衡命令:

    bin/stop-balancer.sh
    
    1

# LZO 压缩配置

# 编译

hadoop 本身并不支持 lzo 压缩,故需要使用 twitter 提供的 hadoop-lzo 开源组件。hadoop-lzo 需依赖 hadoop 和 lzo 进行编译,编译步骤如下

下载、安装并编译 LZO

sudo yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool
wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz
tar -zxvf lzo-2.10.tar.gz
cd lzo-2.10
./configure -prefix=/usr/local/hadoop/lzo/
make
sudo make install
1
2
3
4
5
6
7

编译 hadoop-lzo 源码

下载 hadoop-lzo 的源码,下载地址:https://github.com/twitter/hadoop-lzo/archive/master.zip

解压之后,修改 pom.xml

<hadoop.current.version>2.7.2</hadoop.current.version>
1

声明两个临时环境变量

export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include
export LIBRARY_PATH=/usr/local/hadoop/lzo/lib 
1
2

编译打包

mvn package -Dmaven.test.skip=true
1

进入 target,hadoop-lzo-0.4.21-SNAPSHOT.jar 即编译成功的 hadoop-lzo 组件

# 使用

将编译好后的 hadoop-lzo-0.4.20.jar 放入 hadoop-3.1.3/share/hadoop/common/

cd /opt/software
mv hadoop-lzo-0.4.20.jar /opt/module/hadoop-3.1.3/share/hadoop/common/
1
2

同步分发

xsync /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar
1

core-site.xml 增加配置支持 LZO 压缩

vim /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml
1

添加以下内容

<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>

<property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

分发

xsync /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml
1

重新启动集群

stop-all.sh
start-dfs.sh #102
start-yarn.sh  #103
1
2
3

# LZO 创建索引

将 bigtable.lzo(150M)上传到集群的 /input 目录

hadoop fs -mkdir /input
hadoop fs -put bigtable.lzo /input
1
2

执行 wordcount 程序

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output1
1

没有创建索引则 LZO 文件的切片只有一个

image-20211209163816027

创建索引

# hadoop jar 编译好的lzo jar包  类引用路径  lzo数据文件
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo
1
2

image-20211210182357330

重新执行 wordcount

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output2
1

image-20211210182447681

# 基准测试

# 测试 HDFS 写性能

向 HDFS 写 10 个 128M 的文件

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB
1

image-20211210184417692

# 测试 HDFS 读性能

读取 HDFS 集群 10 个 128M 的文件

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB
1

image-20211210184740263

# 删除测试生成数据

测试数据默认存储在 /benchmarks/TestDFSIO 上

image-20211210184829949

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean
1

# 使用 sort 测试 mapreduce

使用 RandomWriter 来产生随机数,每个节点运行 10 个 Map 任务,每个 Map 产生大约 1G 大小的二进制随机数

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar randomwriter random-data
1

执行 sort 程序

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar sort random-data sorted-data
1

验证数据是否真正排好序了

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data
1

# hadoop 参数调优

  • HDFS 参数调优 hdfs-site.xml

    dfs.namenode.handler.count=20 * log2(Cluster Size)
    
    1

    如集群规模为 8 台时,此参数设置为 60

    NameNode 有一个工作线程池,用来处理不同 DataNode 的并发心跳以及客户端并发的元数据操作。对于大集群或者有大量客户端的集群来说,通常需要增大参数 dfs.namenode.handler.count 的默认值 10。设置该值的一般原则是将其设置为集群大小的自然对数乘以 20,即 20logN,N 为集群大小

  • YARN 参数调优 yarn-site.xml

    • 情景描述:总共 7 台机器,每天几亿条数据,数据源 ->Flume->Kafka->HDFS->Hive

    • 面临问题:数据统计主要用 HiveSQL,没有数据倾斜 (每个 reduce 数据没有偏大或偏小),小文件已经做了合并处理,开启的 JVM 重用,而且 IO 没有阻塞,内存用了不到 50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。

    • 解决办法:内存利用率不够。这个一般是 Yarn 的 2 个配置造成的,单个任务可以申请的最大内存大小,和 Hadoop 单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。

      • yarn.nodemanager.resource.memory-mb

        表示该节点上 YARN 可使用的物理内存总量,默认是 8192(MB),注意,如果你的节点内存资源不够 8GB,则需要调减小这个值,而 YARN 不会智能的探测节点的物理内存总量。

      • yarn.scheduler.maximum-allocation-mb

        单个任务可申请的最多物理内存量,默认是 8192(MB)。

      • image-20211210192154377

  • Hadoop 宕机

    • 如果 MR 造成系统宕机。此时要控制 Yarn 同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是 8192MB)
    • 如果写入文件过量造成 NameNode 宕机。那么调高 Kafka 的存储大小,控制从 Kafka 到 HDFS 的写入速度。高峰期的时候用 Kafka 进行缓存,高峰期过去数据同步会自动跟上。

# zookeeper

集群规划

服务器 hadoop102 服务器 hadoop103 服务器 hadoop104
Zookeeper Zookeeper Zookeeper Zookeeper
tar -zxvf /opt/software/apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
mv /opt/module/apache-zookeeper-3.5.7-bin/ /opt/module/zookeeper

#环境变量
sudo vim /etc/profile.d/my_env.sh 
#追加
#ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/opt/module/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin


#同步环境变量
source /etc/profile.d/my_env.sh 
sudo xsync /etc/profile.d/my_env.sh 

#将配置文件改名称为zoo.cfg
cd /opt/module/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg

#配置zookeeper文件
vim zoo.cfg 
#追加下内容
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
#修改数据存储位置
dataDir=/opt/module/zookeeper/zkData

cd /opt/module/zookeeper
mkdir zkData

#创建myid用于zookeeper标记机器
#编辑为102的2用于唯一标识 103为3 104为4
echo 2 > /opt/module/zookeeper/zkData/myid

#同步
xsync /opt/module/zookeeper/

#103
echo 3 > /opt/module/zookeeper/zkData/myid
#104
echo 4 > /opt/module/zookeeper/zkData/myid
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

启动

#在三台机器上分别启动
zkServer.sh start
1
2

# 群起脚本

在 hadoop102 的 /home/atguigu/bin 目录下创建脚本

mkdir -p /home/atguigu/bin
cd /home/atguigu/bin
vim zk.sh
1
2
3
#!/bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo "------------- $i -------------"
        ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
    done 
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo "------------- $i -------------"
        ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
    done
};;
"status"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo "------------- $i -------------"
        ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
    done
};;
esac
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

增加权限

chmod 755 zk.sh
zk.sh start
1
2

# 日志生成

将之前日志生成的带依赖 jar 上传到 102 上

# 代码参数说明

// 参数一:控制发送每条的延时时间,默认是0
Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;

// 参数二:循环遍历次数
int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;
1
2
3
4
5

# 启动方式

启动方式有两种:

  1. 通过 jar -classpath 指定全类名启动

    java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain  >/opt/module/test.log
    
    1
  2. 通过 jar -jar 启动 必须打包时指定启动类类引用否则无法启动

    java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar  >/opt/module/test.log
    
    1

# 日志收集

我们通过上面方式启动 java 程序会收集控制台中的输出 但我们已经通过 logback 收集日志到指定路径文件了 我们无需保存控制台上的输出内容 可以通过将输出内容直接输出到 /dev/null 中

/dev/null 代表 linux 的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称 “黑洞”。

名称 代码 操作符 Java 中表示 Linux 下文件描述符(Debian 为例)
标准输入 (stdin) 0 < 或 << System.in (opens new window) /dev/stdin -> /proc/self/fd/0 -> /dev/pts/0
标准输出 (stdout) 1 >, >>, 1> 或 1>> System.out /dev/stdout -> /proc/self/fd/1 -> /dev/pts/0
标准错误输出 (stderr) 2 2> 或 2>> System.err /dev/stderr -> /proc/self/fd/2 -> /dev/pts/0
java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar 2>/dev/null 1>/dev/null
1

可以简写为

java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar  >/dev/null 2>&1
1

在 /tmp/logs 路径下查看生成的日志文件

cd /tmp/logs/
ls
1
2

# 集群日志生成脚本

先同步日志 jar 到集群其他机器上

xsync /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar
1

在 /home/atguigu/bin 目录下创建脚本 lg.sh

vim /home/atguigu/bin/lg.sh
1
#! /bin/bash

for i in hadoop102 hadoop103 
do
	ssh $i "java -jar /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar $1 $2 >/dev/null 2>&1 &"
done
1
2
3
4
5
6
chmod 755 /home/atguigu/bin/lg.sh
/home/atguigu/bin/lg.sh
1
2

# 集群时间同步修改脚本(非正规临时脚本)

在 /home/atguigu/bin 目录下创建脚本 dt.sh

vim /home/atguigu/bin/dt.sh
1
#!/bin/bash

for i in hadoop102 hadoop103 hadoop104
do
    echo "========== $i =========="
    ssh -t $i "sudo date -s $1"
done
1
2
3
4
5
6
7

ssh -t 通常用于 ssh 远程执行 sudo 命令

chmod 755 /home/atguigu/bin/dt.sh
/home/atguigu/bin/dt.sh 2020-03-10
1
2

# 集群所有进程查看脚本

在 /home/atguigu/bin 目录下创建脚本 xcall.sh

vim /home/atguigu/bin/xcall.sh
1
#! /bin/bash

for i in hadoop102 hadoop103 hadoop104
do
    echo --------- $i ----------
    ssh $i "$*"
done
1
2
3
4
5
6
7
 chmod 755 /home/atguigu/bin/xcall.sh
 xcall.sh jps
1
2

# 采集日志 Flume

image-20211211153656826

集群规划:

服务器 hadoop102 服务器 hadoop103 服务器 hadoop104
Flume (采集日志) Flume Flume

# 安装

tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
cd /opt/module/
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
#将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
rm /opt/module/flume/lib/guava-11.0.2.jar
sudo vim /etc/profile.d/my_env.sh 

#FLUME_HOME
export FLUME_HOME=/opt/module/flume
export PATH=$PATH:$FLUME_HOME/bin

source /etc/profile.d/my_env.sh 
1
2
3
4
5
6
7
8
9
10
11
12

同步

cd /opt/module/
xsync flume/
sudo xsync /etc/profile.d/my_env.sh 
1
2
3

# Flume 组件选择

  1. Source

(1)Taildir Source 相比 Exec Source、Spooling Directory Source 的优势

TailDir Source:** 断点续传、多目录。**Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传。

Exec Source 可以实时搜集数据,但是在 Flume 不运行或者 Shell 命令出错的情况下,数据将会丢失。

Spooling Directory Source 监控目录,不支持断点续传。

(2)batchSize 大小如何设置?

答:Event 1K 左右时,500-1000 合适(默认为 100)

  1. Channel

采用 Kafka Channel,省去了 Sink,提高了效率。KafkaChannel 数据存储在 Kafka 里面,所以数据是存储在磁盘中。

注意在 Flume1.7 以前,Kafka Channel 很少有人使用,因为发现 parseAsFlumeEvent 这个配置起不了作用。也就是无论 parseAsFlumeEvent 配置为 true 还是 false,都会转为 Flume Event。这样的话,造成的结果是,会始终都把 Flume 的 headers 中的信息混合着内容一起写入 Kafka 的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

# Flume 配置

image-20211211154528000

file-flume-kafka.conf 文件

cd /opt/module/flume/conf
vim file-flume-kafka.conf
1
2
a1.sources=r1
a1.channels=c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors =  i1 i2
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

image-20211211160212975

# ETL 和分类型拦截器

本项目中自定义了两个拦截器,分别是:ETL 拦截器、日志类型区分拦截器。

ETL 拦截器主要用于,过滤时间戳不合法和 Json 数据不完整的日志

日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往 Kafka 的不

在 ide 创建 flume-interceptor 工程 导入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

创建 com.atguigu.flume.interceptor 包 包下创建 LogETLInterceptor 类 并实现 Interceptor 接口

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //将event转换为string 方便处理
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        //判断日志是启动日志还是事件日志 并向header中添加标识
        if (log.contains("start")) {
            //清洗启动日志
            if (LogUtils.validateStart(log)) {
                return event;
            }
        } else {
            //清洗事件日志
            if (LogUtils.validateEvent(log)) {
                return event;
            }
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        ArrayList<Event> intercepts = new ArrayList<>();
        for (Event event : list) {
            Event intercept = intercept(event);
            if (intercept != null) {
                intercepts.add(intercept);
            }

        }
        return intercepts;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

LogUtils 工具类

package com.atguigu.flume.interceptor;

import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {
    public static boolean validateStart(String log) {
        if (log == null) {
            return false;
        }

        //开头不是以{或结尾不是以}结束的 不是json字符串
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
            return false;
        }

        return true;
    }

    public static boolean validateEvent(String log) {

        if (log == null) {
            return false;
        }
        //以 | 切割
        String[] logConents = log.split("\\|");
        //判断长度
        if (logConents.length != 2) {
            return false;
        }
        //判断服务器时间是否够13位 并且判断是否全为数字
        if (logConents[0].length() != 13 || !NumberUtils.isDigits(logConents[0])) {
            return false;
        }

        //判断json完整性
        if (!logConents[1].trim().startsWith("{") || !logConents[1].trim().endsWith("}")) {
            return false;
        }


        return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

分类型拦截器 LogTypeInterceptor

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //取出body数据
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        //header
        Map<String, String> headers = event.getHeaders();
        if (log.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        ArrayList<Event> resultEvent = new ArrayList<>();
        for (Event event : list) {
            resultEvent.add(event);
        }
        return resultEvent;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

打包上传 jar 包到 flume 的 lib 中 选择不带依赖即可

同步分发

xsync /opt/module/flume/
1

# flume 启动脚本

在 /home/atguigu/bin 目录下创建脚本 f1.sh

vim /home/atguigu/bin/f1.sh
1
#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1  &"
        done
};;	
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

说明 1:nohup,该命令可以在你退出帐户 / 关闭终端之后继续运行相应的进程。nohup 就是不挂起的意思,不挂断地运行命令。

说明 2:awk 默认分隔符为空格

说明 3:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。

cd /home/atguigu/bin/
chmod 755 f1.sh
f1.sh start
1
2
3

# Kafka

集群规划:

服务器 hadoop102 服务器 hadoop103 服务器 hadoop104
Kafka Kafka Kafka Kafka

要先启动 zookeeper

tar -zxvf /opt/software/kafka_2.11-2.4.1.tgz -C /opt/module/
cd /opt/module/
mv kafka_2.11-2.4.1/ kafka
cd kafka
mkdir logs

#环境变量
sudo vim /etc/profile.d/my_env.sh

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile.d/my_env.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14

配置 kafka

vim /opt/module/kafka/config/server.properties
1

除了删除功能是新添加属性 其他都是修改

#broker的全局唯一编号,不能重复
broker.id=0

#添加删除功能
#删除topic功能使能
delete.topic.enable=true

#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs

#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
1
2
3
4
5
6
7
8
9
10
11
12

同步

xsync /opt/module/kafka 
sudo xsync /etc/profile.d/my_env.sh
1
2

修改每个 kafka 中 broker.id

vim /opt/module/kafka/config/server.properties
#102 为 0, 103为1  , 104为2
1
2

# 群起脚本

在 /home/atguigu/bin 目录下创建脚本 kf.sh

vim /home/atguigu/bin/kf.sh
1
#! /bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
    done
};;
esac
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
cd /home/atguigu/bin/
chmod 755 kf.sh
kf.sh start
1
2
3

# 创建 Kafka Topic

进入到 /opt/module/kafka/ 目录下分别创建:启动日志主题、事件日志主题

kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka  --create --replication-factor 1 --partitions 1 --topic topic_start
kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka   --create --replication-factor 1 --partitions 1 --topic topic_event
1
2

查询 topic 列表

kafka-topics.sh --zookeeper hadoop102:2181/kafka --list
1

删除 topic

kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_start
kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_event
1
2

生产消息

kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_start
1

消费消息

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic topic_start
1

# 压力测试

用 Kafka 官方自带的脚本,对 Kafka 进行压测。Kafka 压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络 IO)。一般都是网络 IO 达到瓶颈。

# 生产者

kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
1

record-size 是一条信息有多大,单位是字节。

num-records 是总共发送多少条信息。

throughput 是每秒多少条信息,设成 - 1,表示不限流,可测出生产者最大吞吐量。

image-20211211214537511

本例中一共写入 10w 条消息,吞吐量为 7.05 MB/sec,每次写入的平均延迟为 408.12 毫秒,最大的延迟为 603.00 毫秒。

# 消费者

kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
1

--zookeeper 指定 zookeeper 的链接信息

--topic 指定 topic 的名称

--fetch-size 指定每次 fetch 的数据的大小

--messages 总共要消费的消息个数

image-20211211214836809

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
1

开始测试时间,测试结束数据,共消费数据 9.5367MB,吞吐量 0.7875MB/s****,共消费 100000 条,平均每秒消费 **8257 条。

# kafka 机器数量计算

Kafka 机器数量(经验公式)=2(峰值生产速度 * 副本数 / 100)+1*

先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署 Kafka 的数量。

比如我们的峰值生产速度是 50M/s。副本数为 2。

Kafka 机器数量 = 2*(50*2/100)+ 1=3 台

# 消费 kafka 数据 flume

此 flume 用于消费 kafka 中的数据 上传至 hdfs 上

集群规划

服务器 hadoop102 服务器 hadoop103 服务器 hadoop104
Flume(消费 Kafka) Flume

image-20211211235139863

# 日志消费 flume 配置

目前是启动日志和事件日志写在同一个 config 配置中 建议拆分为两个配置文件分开运行 解耦

在 hadoop104 的 /opt/module/flume/conf 目录下创建 kafka-flume-hdfs.conf 文件

vim /opt/module/flume/conf/kafka-flume-hdfs.conf
1
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#控制消费速度 每秒5000条
a1.sources.r1.batchSize = 5000
#延迟时间如果指定毫秒数没有到达指定消费速度 同样消费速度
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
#checkpointDir为存储日志 dataDirs为存储数据位置 两目录可以指定一个多目录路径提高读写性能
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
#文件最大值
a1.channels.c2.maxFileSize = 2146435071
#数据容量
a1.channels.c2.capacity = 1000000
#等待时长 如果容量未达到指定大小,并且超过指定时间则提交到sink
a1.channels.c2.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-

## 不要产生大量小文件 以下3个配置属性满足任意一个则滚动到下一个文件
#等待多少秒才滚动到下一个文件 默认为30s 建议为3600s
a1.sinks.k1.hdfs.rollInterval = 10
#多少字节滚动一次 默认为1024
a1.sinks.k1.hdfs.rollSize = 134217728
#event写入的个数滚动 默认为10  0则为禁用
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## 控制输出文件是否原生文件 此处设置为压缩流
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 

# 设置压缩格式
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

image-20211212002541874

# Flume 组件选择

  1. FileChannel 和 MemoryChannel 区别

MemoryChannel 传输数据速度更快,但因为数据保存在 JVM 的堆内存中,Agent 进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

FileChannel 传输速度相对于 Memory 慢,但数据安全保障高,Agent 进程挂掉也可以从失败中恢复数据。

企业选型:

  • 金融类公司、对钱要求非常准确的公司通常会选择 FileChannel
  • 传输的是普通日志信息(京东内部一天丢 100 万 - 200 万条,这是非常正常的),通常选择 MemoryChannel。
  1. FileChannel 优化

    通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增大 Flume 吞吐量。

    官方说明如下:

    Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

    checkpointDir 和 backupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证 checkpoint 坏掉后,可以快速使用 backupCheckpointDir 恢复数据

  2. Sink:HDFS Sink

    (1)HDFS 存入大量小文件,有什么影响?

    ** 元数据层面:** 每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在 Namenode 内存中。所以小文件过多,会占用 Namenode 服务器大量内存,影响 Namenode 性能和使用寿命

    ** 计算层面:** 默认情况下 MR 会对每个小文件启用一个 Map 任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

    (2)HDFS 小文件处理

    官方默认的这三个参数配置写入 HDFS 后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

    基于以上 hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0 几个参数综合作用,效果如下:

    (1)文件在达到 128M 时会滚动生成新文件

    (2)文件创建超 3600 秒时会滚动生成新文件

# 日志消费 Flume 启动脚本

cd /home/atguigu/bin
vim f2.sh
1
2
#! /bin/bash

case $1 in
"start"){
        for i in hadoop104
        do
                echo " --------启动 $i 消费flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt   2>&1 &"
        done
};;
"stop"){
        for i in hadoop104
        do
                echo " --------停止 $i 消费flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
        done

};;
esac
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
chmod 755 f2.sh
f2.sh start
f2.sh stop
1
2
3

# Flume 内存优化

如果启动消费 Flume 抛出如下异常则表示该机器最小内存不满足于 Flume 配置的最小启动内存

ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
1
2

我们可以修改 Flume 的默认内存配置

vim /opt/module/flume/conf/flume-env.sh
1
#添加以下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
1
2

分发配置文件

xsync flume-env.sh
1

Flume 内存参数设置及优化

  1. JVM heap 一般设置为 4G 或更高,部署在单独的服务器上(4 核 8 线程 16G 内存)
  2. -Xmx 与 - Xms 最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁 fullgc。
  3. -Xms 表示 JVM Heap (堆内存) 最小尺寸,初始分配;-Xmx 表示 JVM Heap (堆内存) 最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发 fullgc。

# 采集通道启动 / 停止脚本

该脚本控制全部组件的启动和停止

cd /home/atguigu/bin
vim cluster.sh
1
2

注意要先启动 zookeeper 再起 kafka 关闭时候先关 kafka 再关 zookeeper

#! /bin/bash

case $1 in
"start"){
	echo " -------- 启动 集群 -------"

	echo " -------- 启动 hadoop集群 -------"
	/opt/module/hadoop-3.1.3/sbin/start-dfs.sh 
	ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"

	#启动 Zookeeper集群
	zk.sh start

sleep 4s;

	#启动 Flume采集集群
	f1.sh start

	#启动 Kafka采集集群
	kf.sh start

sleep 6s;

	#启动 Flume消费集群
	f2.sh start

	};;
"stop"){
    echo " -------- 停止 集群 -------"


    #停止 Flume消费集群
	f2.sh stop

	#停止 Kafka采集集群
	kf.sh stop

    sleep 6s;

	#停止 Flume采集集群
	f1.sh stop

	#停止 Zookeeper集群
	zk.sh stop

	echo " -------- 停止 hadoop集群 -------"
	ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
	/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh 
};;
esac
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
chmod 755 cluster.sh
cluster.sh start
cluster.sh stop
1
2
3

# 采集模块总结

# Linux&Shell 相关总结

Linux 常用高级命令

序号 命令 命令解释
1 top 查看内存
2 df -h 查看磁盘存储情况
3 iotop 查看磁盘 IO 读写 (yum install iotop 安装)
4 iotop -o 直接查看比较高的磁盘读写程序
5 netstat -tunlp | grep 端口号 查看端口占用情况
6 uptime 查看报告系统运行时长及平均负载
7 ps aux 查看进程

Shell 常用工具

awk、sed、cut、sort

# Hadoop 相关总结

  1. Hadoop 默认不支持 LZO 压缩,如果需要支持 LZO 压缩,需要添加 jar 包,并在 hadoop 的 cores-site.xml 文件中添加相关压缩配置。需要掌握让 LZO 文件支持切片。

  2. Hadoop 常用端口号,50070,8088,19888,9000

  3. Hadoop 配置文件以及简单的 Hadoop 集群搭建。8 个配置文件

    core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml workers(2.7 为 slaves)

  4. 默认块大小

    • 集群模式 128m
    • 本地模式 32m
    • hadoop1.x 64m
    • 在业务开发中 hdfs 块大小通常设置为 128m 或 256m
    • hive 的文件块大小 256m
  5. 小文件问题

    1. 危害

      • 占用 namenode 内存 一个文件块占用 namenode 150 字节
      • 增加切片数 每个文件开启一个 maptask (默认内存为 1g)
    2. 解决办法

      • har 归档 自定义 FileInputFormat
      • combineInputformat 减少切片个数,进而减少的是 maptask
      • 开启 JVM 重用
  6. MapReduce

    • shuffle 优化
    • map 方法之后 reduce 方法之前 混洗过程
  7. Yarn

    1. 工作机制

    2. 调度器

      1. FIFO、容量、公平调度器 Apache 默认调度器:容量 CDH 默认调度器:公平调度器

      2. FIFO 调度器特点:单队列,先进先出,在企业开发中没人使用

      3. 容量调度器:支持多队列,先进来的任务优先享有资源

      4. 公平调度器:支持多队列,每个任务公平享有资源 并发度最高。

      5. 对并发度要求比较高,同时机器性能比较好,选择公平; 大公司 如果并发度不高,机器性能比较差,选择容量: 中小公司

      6. 生产环境下队列怎么创建? 容量调度器默认只有一个 default 队列; 按照框架名称:hive、spark、flink

        按照业务名称:登录、购物车、支付模块、部门 1、部门 2 (居多)

        好处:解耦、降低风险、可以实现任务降级(部门 1》部门 2》购物车)

# Zookeeper 相关总结

选举机制:

  • 半数机制 pax,安装奇数台
  • 10 台服务器几台:3 台
  • 20 台服务器几台:5 台
  • 100 台服务器几台:11 台
  • 不是越多越好,也不是越少越好。 如果多,通信时间长,效率低;如果太少,可靠性差。

常用命令:

  • ls、get、create

# Flume 相关总结

# 组成

Flume 组成,Put 事务,Take 事务

​ Taildir Source:断点续传、多目录。Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传。

​ File Channel:数据存储在磁盘,宕机数据可以保存。但是传输速率慢。适合对数据传输可靠性要求高的场景,比如,金融行业。

​ Memory Channel:数据存储在内存中,宕机数据丢失。传输速率快。适合对数据传输可靠性要求不高的场景,比如,普通的日志数据。

​ Kafka Channel:减少了 Flume 的 Sink 阶段,提高了传输效率。

​ Source 到 Channel 是 Put 事务

​ Channel 到 Sink 是 Take 事务

# taildir source
  1. 断点续传、多目录
  2. 在 Apache flume1.7 之后产生的;如果是 CDH,1.6 之后;
  3. 自定义 source 实现断点续传的功能(只要能自定义,有很多想象空间了)
  4. taildir source 挂了怎么办? 不会丢失、可能产生数据重复
  5. 对重复数据怎么处理? 不处理; 处理:(自身:自定义 source 实现事务,额外增加了运算量)
    1. 在下一级处理:hive 的数仓里面(dwd 层,数据清洗 ETL)
    2. spark Streaming 里面处理 去重的手段:group by (id) 开窗(id), 只取窗口第一条
  6. 是否支持递归读取文件? 不支持;自定义 tail source (递归 + 读取) 消费 kafka + 上传 hdfs
# channel
  1. File Channel : 基于磁盘、效率低、可靠性高
  2. memoryChannel:基于内存、效率高、可靠性低
  3. KafkaChannel:数据存储在 Kafka 里面,基于磁盘、效率高于 memoryChannel+kafkasink, 因为省了 sink flume1.6 时 topic + 内容; 无论 true 还是 false 都不起作用; bug flume1.7 解决 bug, 被大家广泛使用;
  4. 在生产环境:如果下一级是 kafka 的话,优先选择 KafkaChannel; 如果不是 kafka,如果更关心可靠性选择 FileChannel;如果更关心性能,选择 memoryChannel
# HDFS sink
  1. 控制小文件:

    时间(1-2 小时)、大小(128m)、event 个数 (0 禁止)

  2. 压缩

    开启压缩流;指定压缩编码方式(lzop/snappy)

# 拦截器

  1. ETL(判断 json 的完整性 { }; 服务器时间(13 全数字)) 项目中自定义了:ETL 拦截器和区分类型拦截器。 用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些

  2. 分类型(启动日志、事件日志) kafka(的 topic 要满足下一级所又消费者) 一张表一个 topic

    商品列表、商品详情、商品点击
    广告
    点赞、评论、收藏
    后台活跃、通知
    故障
    启动日志
    
  3. 自定义拦截器步骤

    1. 定义一个类 实现 interceptor 接口
    2. 重写 4 个方法:初始化、关闭、单 event、多 event()
      • initialize 初始化
      • public Event intercept (Event event) 处理单个 Event
      • public List <Event> intercept(List <Event> events) 处理多个 Event,在这个方法中调用 Event intercept (Event event)
      • close 方法
    3. 创建一个静态内部类 Builder
      • 静态内部类,实现 Interceptor.Builder
  4. 拦截器不要行不行? 看具体业务需求决定

# 选择器

image-20220414220614503

rep(默认) mul(选择性发往下一级通道)

# 监控器

ganglia 发现尝试提交的次数 远远大于最终提交成功次数; 说明 flume 性能不行;

自身;提高自己的内存 4-6g flume_env.sh 外援:增加 flume 台数 服务器配置(16g/32g 8T)

# Flume 内存

开发中在 flume-env.sh 中设置 JVM heap 为 4G 或更高,部署在单独的服务器上(4 核 8 线程 16G 内存)

-Xmx 与 - Xms 最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁 fullgc。

-Xms 表示 JVM Heap (堆内存) 最小尺寸,初始分配;-Xmx 表示 JVM Heap (堆内存) 最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发 fullgc。

# 优化

  1. File Channel 能多目录就多目录(要求在不同的磁盘),提高吞吐量 checkpointDir 和 backupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证 checkpoint 坏掉后,可以快速使用 backupCheckpointDir 恢复数据

  2. HDFS Sink 控制小文件; 时间(1-2 小时)、大小(128m)、event 个数 (0 禁止)

    1. HDFS 存入大量小文件,有什么影响?

    ** 元数据层面:** 每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在 Namenode 内存中。所以小文件过多,会占用 Namenode 服务器大量内存,影响 Namenode 性能和使用寿命

    ** 计算层面:** 默认情况下 MR 会对每个小文件启用一个 Map 任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

    1. HDFS 小文件处理

    官方默认的这三个参数配置写入 HDFS 后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

    基于以上 hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0 几个参数综合作用,效果如下:

    (1)文件在达到 128M 时会滚动生成新文件

    (2)文件创建超 3600 秒时会滚动生成新文件

  3. 监控器

# Flume 采集数据会丢失吗?

不会,Channel 存储可以存储在 File 中,数据传输自身有事务。

# Kafka 相关总结

Kafka 架构

image-20220916084132502

基本信息:

  1. Kafka 组成:zk 里面存储 broker 信息 消费者信息 唯独没有生产者信息。 image-20220916104954380

  2. 搭建多少台 Kafka:2(生产者峰值生产速率 * 副本 / 100)+1 =3 2 *(生产者峰值生产速率 * 2/100)+ 1= 3 => 生产者峰值生产速率 < 50m/s 50m/s * 60 秒 = 3G

  3. 副本数:2 个居多、3 个

    • 好处:提高可靠性;
    • 坏处:增加了网络 IO
  4. 压测(生产者峰值生产速率) 消费速率

  5. 默认数据保存多久 默认保存 7 天 但在生产环境中一般设置为 3 天

  6. Kafka 的磁盘预留多大空间 100g 数据 * 2 个副本 * 3 天 / 0.7

  7. 数据量计算 100 万日活 1 个人 100 条日志 100 万 * 100 条 = 1 亿条 平均速度是的多少 1 亿条 /(24*3600s)=1150 条 /s 每秒多少 m 1 条日志 1k => 1m/s 生产环境,你的数据量什么时候达到峰值?618 1111 早上中午、晚上 晚上 8 点以后 只要不超过 50m/s 就行 20-30m/s

  8. 分区数设置多少? 先设置一个分区; 压测他的 峰值生产速率 tp;峰值消费速率 tc 用户有个期望的吞吐量 p p/min (tp,tc)= 分区数 P 100m/s tp 20m/s tc 30m/s

    100/20 = 5 个分区 企业当中分区数一般为 3-10 个 消费者要有对应的 CPU 核数

  9. ISR 主要解决 Leader 挂了谁当老大? 在 ISR 队列里面都有机会当老大;

    • 旧版:延迟时间和延迟条数;
    • 新版:延迟时间
  10. 分区分配策略

    • range(默认) 容易导致数据倾斜

      10 个 3 个分区 [0 1 2 3] [4 5 6] [7 8 9]

    • round robin 能够减少数据倾斜 hash 随机打散,再采用轮询的方式;

  11. 监控 Kafka

    • eagle
    • Kafkamanager
    • Kafkamontor
  12. Kafka 挂掉 重新启动

    短时间内数据进入 flume channel 长时间,日志服务器保存数据 30 天

  13. 丢失数据

    • ack = 0:发送过去数据,就不管了;可靠性最差,传输效率是最快的
    • ack = 1:发送过去数据,Leader 应答;可靠性一般;传输效率一般;
    • ack = -1:发送过去数据,Leader+follower 应答;可靠性最高;传输效率最低
    • 在生产环境,通常不会用 0:选择 1 的最多; 在绝大多数场景都是传输的普通日志,都是效率至上;选择 1 金融行业,和钱有关的行业,要选 - 1
  14. 重复数据

    • 幂等性 + 事务 + ack=-1 下一级去重(hive 的 dwd 层;sparksteaming)group by 和开窗 幂等性的是(单分区、会话内不重;Kafka 重启容易数据重复)
  15. 积压

    • 自身:增加分区,消费者 CPU 核数
    • 外援:增加消费者消费速度 flume ==> kafka ==> flume ==> hdfs batchsize 1000 条 ==>2000 条~3000 条
  16. 参数优化

    • 网络和 io 操作线程配置优化
      • 计算型任务线程数 num.network.threads =cpu核数+1 默认为 3
      • IO 密集型任务线程数 num.io.threads = CPU核数*2
    • log 数据文件刷盘策略
      • 每当 producer 写入 10000 条消息时,刷数据到磁盘 log.flush.interval.messages=10000
      • 每间隔 1 秒钟时间,刷数据到磁盘 log.flush.interval.ms=1000
    • 日志保留策略配置
      • 日志保留时间小时 log.retention.hours=72 默认为 168 小时
    • 默认副本数
      • 这个参数指新创建一个 topic 时,默认的 Replica 数量,Replica 过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在 2~3 为宜。 offsets.topic.replication.factor=3
    • Producer 优化(producer.properties)
      • 在 Producer 端用来存放尚未发送出去的 Message 的缓冲区大小。缓冲区满了之后可以选择阻塞发送或抛出异常,由 block.on.buffer.full 的配置来决定。 buffer.memory=33554432 (32m)
      • 默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和 Broker 的存储压力。 compression.type=none
    • Kafka 内存调整(kafka-server-start.sh)
      • 默认内存 1 个 G,生产环境尽量不要超过 6 个 G。 export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
  17. 其他

    • Kafka 高效读写数据
      1. Kafka 本身是分布式集群,同时采用分区技术,并发度高
      2. 顺序写磁盘
      3. 零复制技术
    • Kafka 支持传输
      • kafka 对于消息体的大小默认为单条最大值是 1M 但是在我们应用场景中,常常会出现一条消息大于 1M,如果不对 kafka 进行配置。则会出现生产者无法将消息推送到 kafka 或消费者无法去消费 kafka 里面的数据,这时我们就要对 kafka 进行以下配置:(server.properties) replica.fetch.max.byes=12582912 message.max.byes=11534336
    • # Kafka 过期数据清理

      保证数据没有被引用(没人消费他) 日志清理保存的策略只有 delete 和 compact 两种
      • log.cleanup.policy=delete 启用删除策略
      • log.cleanup.policy=compact 启用压缩策略
编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
数据生成模块
电商业务简介

← 数据生成模块 电商业务简介→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式