Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • Hadoop

  • Zookeeper

  • Hive

  • Flume

    • Flume
    • Flume基础架构
    • Flume安装
    • 入门案例
    • Flume 进阶
      • Flume 事务
      • Agent 内部原理
      • Flume 拓扑结构
        • 简单串联
        • 实现串联 输出到arvo
        • 复制和多路复用
        • 实现复制 selector=replicating
        • 实现多路复用 selector=multiplexing
        • 负载均衡和故障转移
        • 实现故障转移 processor=failover
        • 实现负载均衡 processor=load_balance
        • 聚合
        • 实现聚合
      • 拦截器 Interceptor
    • 自定义组件
    • 面试题
  • Kafka

  • Azkaban

  • Hbase

  • Scala

  • Spark

  • Flink

  • 离线数仓

  • 青训营

  • DolphinScheduler

  • Doris

  • 大数据
  • Flume
Iekr
2021-11-11
目录

Flume 进阶

# Flume 进阶

# Flume 事务

image-20211111180016220

# Agent 内部原理

image-20211111183315320

  1. ChannelSelector

    ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)。 默认为 Replicating(复制) ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel。

  2. SinkProcessor

    SinkProcessor 共有三种类型,分别是 DefaultSinkProcessor、LoadBalancingSinkProcessor 和 FailoverSinkProcessor

    1. DefaultSinkProcessor 对应的是单个的 Sink
    2. LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink Group,
    3. LoadBalancingSinkProcessor 可以实现负载均衡的功能
    4. FailoverSinkProcessor 可以错误恢复的功能。

# Flume 拓扑结构

# 简单串联

image-20211111184745477

这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

# 实现串联 输出到 arvo

flume1 配置文件

#agent1   netcatsource --> memorychannel --> arvosink
a1.sources = r1  
a1.sinks = k1
a1.channels = c1 

a1.sources.r1.type = netcat 
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 22222 

#设置为arvosink 向指定地址:端口输出数据
a1.sinks.k1.type = arvo 
#输出数据的地址 
a1.sinks.k1.hostname = hadoop103
#输出数据的地址
a1.sinks.k1.port = 33333

a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 


a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

flume2 配置文件

#agent2   netcatsource --> memorychannel --> loggersink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#输入数据类型改为 arvo
a1.sources.r1.type = arvo 
#输入地址
a1.sources.r1.bind = hadoop102
#输入端口
a1.sources.r1.port = 33333

#设置为logger 写入到log文件 持久化
a1.sinks.k1.type = logger

a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 


a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

测试

#先启动103 否则102发送数据无人接收  在hadoop103操作 
flume-ng agent --conf conf/ --name a1 --conf-file datas/avrosource_loggersink.conf -Dflume.root.logger=INFO,console
1
2
#在hadoop102操作
flume-ng agent --conf conf/ --name a1 --conf-file datas/netcatsource_avrosink.conf -Dflume.root.logger=INFO,console
#在另外个ssh窗口中操作
nc hadoop102 22222
1
2
3
4

# 复制和多路复用

image-20211111184918332

Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个 channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地。

# 实现复制 selector=replicating

image-20211112073258004

从指定文件中读取日志 复制转发到个 channel 中 channel 再转发给指定的 sink 方

#agent 1  hadoop102
a1.sources = r1

a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = exec 
#读取hive日志文件
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
#selector频道选择器 默认为replicating 为复制 不配置type也是这个方案
a1.sources.r1.selector.type = replicating 
#可选的channel
#a1.sources.r1.selector.optional = c3

#设置为arvosink 向指定地址:端口输出数据
a1.sinks.k1.type = arvo 
#输出数据的地址 
a1.sinks.k1.hostname = hadoop103
#输出数据的地址
a1.sinks.k1.port = 33333

#第二个sinks
a1.sinks.k2.type = arvo 
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444

a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
#第二个channel
a1.channels.c2.type = memory 
a1.channels.c2.capacity = 1000 
a1.channels.c2.transactionCapacity = 100 

#一个sources 对接两个channels
a1.sources.r1.channels = c1 c2
#每个sinks对应一个channel
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

sink 方 1 从 hadoop102 接收数据 再存储到 hdfs 中

#agent2 hadoop103
a1.sources = r1  
a1.sinks = k1
a1.channels = c1 

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 33333

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 


a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

sink2 从 hadoop102 接收数据 再通过 File_roll sink 存储到本地目录中

#agent3 hadoop104
a1.sources = r1  
a1.sinks = k1
a1.channels = c1 

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 44444

#将event数据存储在指定sink为file_roll 本地存储模式
a1.sinks.k1.type = file_roll
#存放目录 输出的本地目录必须是已经存在的目录
a1.sinks.k1.sink.directory = /opt/module/flume/demo
#默认为30s 滚动文件 设置为0将不再滚动
a1.sinks.k1.sink.rollInterval = 30

a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 


a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

启动 先启动 103 和 104 的监听 再启动 102 的监听

# 实现多路复用 selector=multiplexing

agent1

#agent 1  hadoop102
a1.sources = r1

a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = exec 
#读取hive日志文件
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log


#复用配置
#selector频道选择器 默认为replicating  multiplexing为复用  需要配合拦截器使用
a1.sources.r1.selector.type = multiplexing 
# header的key 根据event的header里面指定key 判断值 分发给哪个channel 
a1.sources.r1.selector.header = state
#CZ为自定义value值 为上面指定key中对应值 如key中值为CZ 则分发给 c1 channel
a1.sources.r1.selector.mapping.CZ = c1
#值为US 则分发到 c2 channel
a1.sources.r1.selector.mapping.US = c2


# 设置拦截器 (用于向headers中添加指定键值对)
#拦截器名称
a1.sources.r1.interceptors = i1
#拦截器类型 static 向header添加 自定义键值对
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
#多个值只能通过自定义拦截器定义 此处是写死为CZ
a1.sources.r1.interceptors.i1.value = CZ


#设置为arvosink 向指定地址:端口输出数据
a1.sinks.k1.type = arvo 
#输出数据的地址 
a1.sinks.k1.hostname = hadoop103
#输出数据的地址
a1.sinks.k1.port = 33333

#第二个sinks
a1.sinks.k2.type = arvo 
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444

a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
#第二个channel
a1.channels.c2.type = memory 
a1.channels.c2.capacity = 1000 
a1.channels.c2.transactionCapacity = 100 

#一个sources 对接两个channels
a1.sources.r1.channels = c1 c2
#每个sinks对应一个channel
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

agent2 和 agent3 与上面复制一样 或 自定义

# 负载均衡和故障转移

image-20211111185044758

Flume 支持使用将多个 sink 逻辑上分到一个 sink 组,sink 组配合不同的 SinkProcessor 可以实现负载均衡和错误恢复的功能。

# 实现故障转移 processor=failover

image-20211112101633912

#agent1 hadoop102
a1.sources = r1  
a1.sinks = k1 k2
a1.channels = c1 

a1.sources.r1.type = netcat 
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 22222

#sinks1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 33333
#sinks2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444

#定义sinkgroups
a1.sinkgroups = g1
#该组下面有哪些sink实例
a1.sinkgroups.g1.sinks = k1 k2
#failover为故障转移  默认为一对一
a1.sinkgroups.g1.processor.type = failover
#优先级 值越大优先级越大
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
#sink连接超时时间 默认为30000毫秒
a1.sinkgroups.g1.processor.maxpenalty = 10000


a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 


a1.sources.r1.channels = c1 
#sinks绑定的channel 应为一个
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 实现负载均衡 processor=load_balance

#agent1 hadoop102
a1.sources = r1  
a1.sinks = k1 k2
a1.channels = c1 

a1.sources.r1.type = netcat 
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 22222

#sinks1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 33333
#sinks2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 44444

#定义sinkgroups
a1.sinkgroups = g1
#该组下面有哪些sink实例
a1.sinkgroups.g1.sinks = k1 k2
#load_balance 为负载均衡
a1.sinkgroups.g1.processor.type = load_balance
# 默认为round_robin轮询sink    random为随机发给某个sink
a1.sinkgroups.g1.processor.selector = random
#连接超时时间 30000毫秒
a1.sinkgroups.g1.processor.maxpenalty = 10000


a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 


a1.sources.r1.channels = c1 
#sinks绑定的channel 应为一个
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 聚合

image-20211111185129828

这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的 flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。

# 实现聚合

image-20211112105001063

将分开的 agent 的 sink 全部汇总到一个 agent 上 再进行持久化

# 拦截器 Interceptor

更多类型拦截器查看官方文档

通过配置文件 配置拦截器 agent 名称.sources.r1.interceptors

a1.sources = r1
a1.sinks = k1
a1.channels = c1 

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 设置拦截器 (用于向headers中添加时间戳)
#拦截器名称
a1.sources.r1.interceptors = i1
#拦截器类型 timestamp 向header添加时间戳
a1.sources.r1.interceptors.i1.type = timestamp

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

image-20211111194553208

编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
入门案例
自定义组件

← 入门案例 自定义组件→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式