Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • Hadoop

  • Zookeeper

  • Hive

  • Flume

  • Kafka

  • Azkaban

  • Hbase

  • Scala

  • Spark

  • Flink

  • 离线数仓

  • 青训营

    • 第四届青训营

      • SQL Optimizer 解析| 青训营笔记
      • 流/批/OLAP 一体的 Flink 引擎介绍| 青训营笔记
      • Exactly Once 语义在 Flink 中的实现| 青训营笔记
      • 流式计算中的 Window 计算| 青训营笔记
        • 概述
          • 流式计算和批式计算的对比
          • 处理时间vs事件时间
          • 事件时间窗口
          • Watermark(水平线)
        • Watermark(水平线)
          • 如何产生Watermark
          • 如何传递Watermark
          • Per-partition VS per-subtask watermark 生成
          • 部分 partition/subtask 断流
          • 迟到数据处理
          • 总结
        • Window(窗口)
          • 滚动窗口
          • 滑动窗口
          • 会话窗口
          • 迟到数据处理
          • 增量 VS 全量计算
          • EMIT 触发
          • 总结
        • Window - 高级优化
          • Mini-batch 优化
          • local-global
          • Distinct 计算状态复用
          • Pane 优化
          • 总结
        • 案例分析
          • 计算抖音的日活曲线
          • 使用 FIink SQL 计算大数据任务的资源使用
          • 总结
      • Spark 原理与实践| 青训营笔记
      • 大数据 Shuffle 原理与实践| 青训营笔记
      • Presto 架构原理与优化介绍| 青训营笔记
      • HDFS 原理与应用| 青训营笔记
      • HDFS 高可用与高扩展性机制分析| 青训营笔记
      • 深入浅出 HBase 实战| 青训营笔记
      • 数据湖三剑客:Delta Lake、Hudi 与 Iceberg| 青训营笔记
      • 从 Kafka 到 Pulsar 的数据流演进之路| 青训营笔记
      • Parquet 和 ORC:高性能列式存储| 青训营笔记
      • LSMT 存储引擎浅析| 青训营笔记
      • 浅谈分布式一致性协议| 青训营笔记
      • 走进 YARN 资源管理和调度| 青训营笔记
      • 深入理解 K8S 资源管理和调度| 青训营笔记
      • 实时数据中心建设思路与企业实践| 青训营笔记
      • 用户数据分析理论与最佳实践| 青训营笔记
      • 大数据可视化理论与案例分析| 青训营笔记
  • DolphinScheduler

  • Doris

  • 大数据
  • 青训营
  • 第四届青训营
Iekr
2022-07-30
目录

流式计算中的 Window 计算| 青训营笔记

# 流式计算中的 Window 计算| 青训营笔记

这是我参与「第四届青训营 」笔记创作活动的的第 4 天

# 概述

# 流式计算和批式计算的对比

批式计算 流式计算
数据存储 HDFS、Hive Kafka、Pulsar
数据时效性 天级别 分钟级别
准确性 精准 精准和时效性之间取舍
典型计算引擎 Hive、Spark、Flink Flink
计算模型 Exactly-Once At Least Once / Exactly-Once
资源模型 定时调度 长期持有
主要场景 离线天级别数据报表 实时数仓、实时营销、实时风控

批处理:

  • 批处理模型典型的数仓架构为 T+1 架构,即数据计算时天级别的,当天只能看到前一天的计算结果。

  • 通常使用的计算引擎为 Hive 或者 Spark 等。计算的时候,数据是完全 ready 的,输入和输出都是确定性的。 image-20220730040357776

  • 小时级批计算:将大作业的批处理拆分为每小时的批计算作业

    image-20220730041029971

    • 批计算每次需要申请调度资源,作业完成后释放资源,周期调度问题,消耗大量资源
    • 计算需要时间,并且每次作业时间不指定分钟 / 小时等,数仓是分层的可能无法在指定时间完成作业,而占用资源

实时计算:处理时间窗口 image-20220730041419769

  • 数据实时流动,实时计算,窗口结束直接发送结果,不需要周期调度任务。
  • 数据价值:实时性越高,数据价值越高。对海量的 “流” 式数据进行实时的处理。
  • 为了能够做到更实时,需要支持小时级的批计算出现。分钟级别的时效性,数据存储在消息队列中间件中,长期占有资源模型,主要应用在实时得应用场景:实时数仓、实时营销等。

# 处理时间 vs 事件时间

image-20220730041518937

  • 处理时间(Processing Time):数据在流式计算系统中真正处理时所在机器的当前时间,是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。
  • 事件时间(Event Time):数据产生的时间就是事件创建的时间,它通常由事件中的时间戳描述,比如客户端、传感器、后端代码等上报数据的时间。Flink 通过时间戳分配器访问事件时间戳。
  • 进入时间(Ingestion Time):是数据进入 Flink 的时间。

image-20220730042812773

# 事件时间窗口

image-20220730042203134

  • 实时计算:事件时间窗口
    • 数据实时进入到真实时间发生的窗口中进行计算
    • 可以有效处理数据延迟和乱序
  • 什么时候窗口算结束?
    • 引用 watermark 来表示当前的真实时间
    • 数据存在乱序时,可以用来在乱序容忍和实时性间做一个平衡
    • 当收到 watermark 后有比 watermark 小的数据时认为是延时数据 舍弃

# Watermark(水平线)

在数据中插入一些 watermark,来表示当前的真实时间。

image-20220730042618158

在数据存在乱序的时候,watermark 就比较重要了,它可以用来在乱序容忍和实时性之间做一个平衡。

image-20220730042637927

# Watermark(水平线)

  • Watermark 是一种衡量 Event Time 进展的机制。
  • Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现。
  • 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。
  • Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。

当 Flink 接收到数据时,会按照一定的规则去生成 Watermark,这条 Watermark 就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是基于数据携带的时间戳生成的,一旦 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 event time 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。

下图中,我们设置的允许最大延迟到达时间为 2s,所以时间戳为 7s 的事件对应的 Watermark 是 5s,时间戳为 12s 的事件的 Watermark 是 10s,如果我们的窗口 1 是 1s-5s,窗口 2 是 6s-10s,那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时的 Watermark 恰好触发窗口 2。

乱序流的 Watermarker 如下图所示:(Watermark 设置为 2)

image-20220730081222640

Watermark 就是触发前一窗口的 “关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。

只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。

# 如何产生 Watermark

SQL:

CREATE TABLE Orders (
    user BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
)WITH(...);
1
2
3
4
5
6

DateStream:

WatermarkStrategy
        .<TupLe2<Long, String>>forBoundedOutOforderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.fo);
1
2
3

# 如何传递 Watermark

多个 Watermark 以最低 Watermark 为准

image-20220730050406388

# Per-partition VS per-subtask watermark 生成

  • Per-subtask watermark 生成
    • 早期版本都是这种机制。典型的问题是如果一个 source subtask 消费多个 partition,那么多个 Dartition 之间的数据读取可能会加剧乱,序程度。
  • Per-partition watermark 生成
    • 新版本引入了基于每个 partition 单独的 watermark 生成机制,这种机制可以有效避免上面的问题。

# 部分 partition/subtask 断流

根据上面提到的 watermark 传递机制,下游 subtask 会将上游所有 subtask 的 watermark 值的最小值作为自身的 watermark 值。如果上游有一个 subtask 的 watermark 不更新了,则下游的 watermark 都不更新。

解决方案: Idle source

当某个 subtask 断流超过配置的 idle 超时时间时,将当前 subtask 置为 idle,并下发一个 idle 的状态给下游。下游在计算自身 watermark 的时候,可以忽略掉当前是 idle 的那些 subtask。

# 迟到数据处理

因为 watermark 表示当前事件发生的真实时间,那晚于 watermark 的数据到来时,系统会认为这种数据是迟到的数据。

算子自身来决定如何处理迟到数据:

  • Window 聚合,默认会丢弃迟到数据
  • 双流 join,如果是 outer join,则可以认为它不能 join 到任何数据
  • CEP,默认丢弃

# 总结

  1. 含义:表示系统认为的当前真实时间
  2. 生成:可以通过 Watermark Generator 来生成
  3. 传递:取上游所有 subtask 的最小值
  4. 部分数据断流: ldle Source
  5. 迟到数据处理:Window 算子是丢弃;Join 算子认为跟之前的数据无法 join 到

# Window(窗口)

Window 类型:

  • Tumble Window (滚动窗口)
  • SlidingWindow(滑动窗口)
  • SessionWindow (会话窗口)

Window Api:

  • 自定义窗口(window function)
  • 时间窗口(TimeWindow)
  • 累计窗口(CountWindow)

image-20220730053104152

# 滚动窗口

image-20220730053420177

窗口划分:

  1. 每个 key 单独划分
  2. 每条数据只会属于一个窗口

窗口触发: Window 结束时间到达的时候一次性触发

# 滑动窗口

image-20220730053704979

窗口划分:

  1. 每个 key 单独划分
  2. 每条数据可能会属于多个窗口

窗口触发: Window 结束时间到达的时候一次性触发

# 会话窗口

image-20220730053841794

窗口划分:

  1. 每个 key 单独划分
  2. 每条数据会单独划分为一个窗口,如果 window 之间有交集,则会对窗口进行 merge

窗口触发:

Window 结束时间到达的时候一次性触发

# 迟到数据处理

怎么定义迟到?

一条数据到来后,会用 WindowAssigner 给它划分一个 window,一般时间窗口是一个时间区间,比如 [10:00,11:00),如果划分出来的 window end 比当前的 watermark 值还小,说明这个窗口已经触发了计算了,这条数据会被认为是迟到数据。

什么情况下会产生迟到数据?

只有事件时间下才会有迟到的数据。

处理方式:

  1. Allowlateness:这种方式需要设置一个允许迟到的时间。设置之后,窗口正常计算结束后,不会马上清理状态,而是会多保留 allowLateness 这么长时间,在这段时间内如果还有数据到来,则继续之前的状态进行计算。适用于:DataStream、SQL
  2. SideOutput (侧输出流):这种方式需要对迟到数据打一个 tag,然后在 DataStream 上根据这个 tag 获取到迟到数据流,然后业务层面自行选择进行处理。 适用于:DataStream
  3. 丢弃数据,这个是 Flink 迟到数据的默认处理。

# 增量 VS 全量计算

增量计算:

  • 每条数据到来,直接进行计算,window 只存储计算结果。比如计算 sum,状态中只需要存储 sum 的结果,不需要保存每条数据。
  • 典型的 reduce、aggregate 等函数都是增量计算
  • SQL 中的聚合只有增量计算

全量计算:

  • 每条数据到来,会存储到 window 的 state 中。等到 window 触发计算的时候,将所有数据拿出来一起计算。
  • 典型的 process 函数就是全量计算

# EMIT 触发

通常来讲,window 都是在结束的时候才能输出结果,比如 1h 的 tumble window,只有在 1 个小时结束的时候才能统一输出结果。

如果窗口比较大,比如 1h 或者 1 天,甚至于更大的话 **, 那计算结果输出的延迟就比较高,失去了实时计算的意义 **。

EMIT 输出指的是,在 window 没有结束的时候,提前把 window 计算的部分结果输出出来。


怎么实现?

在 DataStream 里面可以通过自定义 Trigger 来实现,Trigger 的结果可以是:

  • CONTINUE
  • FIRE (触发计算,但是不清理)
  • PURGE
  • FIRE_AND_PURGE

SQL 可以使用以下配置方式来实现

table.exec.emit.early-fire.enabled=true
table.exec.emit.early-fire.delay={time}
1
2

# 总结

  1. 三种(滚动、滑动、会话)窗口的定义
  2. 迟到数据处理:AllowLateness、SideOutput
  3. 增量计算和全量计算模型公司章
  4. EMIT 触发提前输出窗口的结果

# Window - 高级优化

# Mini-batch 优化

当未开启 MiniBatch 时,Aggregate 的处理模式是每来一条数据,查询一次状态,进行聚合计算,然后写入一次状态。当有 4 条数据时,需要操作 2*4 次状态

在这里插入图片描述

当开启 MicroBatch 时,对于缓存下来的 N 条数据一起触发,同 key 的数据只会读写状态一次。例如下缓存的 4 条 A 的记录,只会对状态读写各一次。所以当数据的 key 的重复率越大,攒批的大小越大,那么对状态的访问会越少,得到的吞吐量越高。

image-20220730082147431

显然,Mini-Batch 机制会导致数据处理出现一定的延迟,用户需要自己权衡时效性和吞吐量的重要程度再决定。

Mini-Batch 聚合默认是关闭的。要开启它,可以设定如下 3 个参数。

val tEnv: TableEnvironment = ...
val configuration = tEnv.getConfig().getConfiguration()
 
configuration.setString("table.exec.mini-batch.enabled", "true")         // 启用
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")    // 缓存超时时长
configuration.setString("table.exec.mini-batch.size", "5000")            // 缓存大小
1
2
3
4
5
6

# local-global

Local-Global 其实就是自动利用两阶段聚合思想解决数据倾斜的优化方案(是不是很方便),与 MapReduce 中引入 Combiner 类似。

image-20220730065102324

要启用 Local-Global 聚合,需要在启用 Mini-Batch 的基础上指定如下参数。

configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")
1

# Distinct 计算状态复用

SELECT
date_time,
shop_id,
COUNT (DISTINCT item_id) AS item_coll,
COUNT (DISTINCT item_id) FILTER (WHERE flag IN ('iphone')) AS item_col2,
COUNT (DISTINCT item_id) FILTER (WHERE flag IN ('android')) AS item_col3,
COUNT (DISTINCT item_id) FILTER (WHERE flag IN ('pc')) AS item_col4,
COUNT (DISTINCT item_id) FILTER (WHERE flag IN ('wap' )) AS item_co L5,
COUNT (DISTINCT item_id) FILTER (WHERE flag IN ('other')) AS item_col6,
COUNT (DISTINCT item_id) FILTER (WHERE flag IN ( 'iphone', 'android' )) AS item_col7,
COUNT (DISTINCT item_id) FILTER (WHERE flag IN ('pc', 'other')) AS item_co18,
COUNT (DISTINCT item_id ) FILTER (WHERE flag IN ( 'iphone ' 'android', 'wap')) AS item_col9,
COUNT (DISTINCT item id) FILTER (WHERE flaq IN ('iphone', 'android', 'wap', 'pc', 'other')) As item col10
COUNT (DISTINCT visitor_id) AS visitor_col1,
COUNT (DISTINCT visitor_id) FILTER (WHERE flag IN ('iphone')) AS visitor_col2,
COUNT (DISTINCT visitor_id)FILTER(WHERE flag IN ('android')) AS visitor_col3,
COUNT (DISTINCT visitor_id) FILTER (WHERE flag IN ('pc')) AS visitor_col4,
COUNT (DISTINCT visitor_id) FILTER (WHERE flag IN ('wap')) AS visitor_co15,
COUNT (DISTINCT visitor_id)FILTER(WHERE flag IN ('other' )) AS visitor_col6,
COUNT (DISTINCT visitor_id) FILTER (WHERE flag IN ('iphone', 'android' )) AS visitor_col7,
COUNT (DISTINCT visitor_id) FILTER (WHERE flag IN ('pc', 'other')) visitor_col8,
COUNT (DISTINCT visitor_id) FILTER (WHERE flag IN ('iphone', 'android', 'wap' ))AS visitor_col9,
COUNT (DISTINCT visitor_id)FILTER (WHERE flag IN ( 'iphone', 'android', 'wap', 'pc', 'other')) AS visitor_col10
FROM logs
GROUP BY date_time, shop_id
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

image-20220730074615983

# Pane 优化

image-20220730074811430

# 总结

  1. Mini-batch 优化解决频繁访问状态的问题
  2. local-global 优化解决倾斜问题
  3. Distinct 状态复用降低状态量
  4. Pane 优化降低滑动窗口的状态存储量

# 案例分析

# 计算抖音的日活曲线

SELECT
COUNT(DISTINCT uid) as dau
TUMBLE_START(event_time, INTERVAL '1' DAY) as wstart,
LOCALTIMESTAMP AS current_ts
FROM user_activity
GROUP BY
TUMBLE(event_time, INTERVAL '1' DAY)
1
2
3
4
5
6
7

开启 EMIT 触发

table.exec.emit.early-fire.enabled=true
table.exec.emit.early-fire.delay=5min
1
2

上面的实现的问题是:所有数据都需要在一个 subtask 中完成窗口计算,无法并行计算。

我们通过两阶段聚合来把数据打散,完成第一轮聚合,第二轮聚合只需要对各个分桶的结果求和即可。(分桶为了打散数据并倾斜优化)

SELECT
    SUM(partial_cnt) as dau
    TUMBLE_START(event_time, INTERVAL '1' DAY) as wstart,
    LOCALTIMESTAMP ascurrent_ts
FROM (
    SELECT
        COUNT(DISTINCT uid) as partial_cnt,
        TUMBLE_ROWTIME(event_time, INTERVAL '1' DAY) as event_time
    FROM user_activity
    GROUP BY
        TUMBLE(event_time, INTERVAL, '1' DAY),
        MOD(uid, 100000) -- 根据uid分为10000个桶
)
GROUP BY TUMBLE(event_time, INTERVAL '1' DAY)
1
2
3
4
5
6
7
8
9
10
11
12
13
14

开启 EMIT 触发,并开启 retract 撤回流

table.exec.emit.early-fire.enabled=true
table.exec.emit.early-fire.delay=5min
table.exec.window.allow-retract-input=true
1
2
3

# 使用 FIink SQL 计算大数据任务的资源使用

问题描述: 大数据任务(特指离线任务)运行时通常会有多个 container 启动并运行,每个 container 在运行结束的时候,YARN 会负责将它的资源使用 (CPU、内存)情况上报。一般大数据任务运行时间从几分钟到几小时不等。


需求: 根据 YARN 上报的各个 container 的信息,在任务结束的时候,尽快的计算出一个任务运行所消耗的总的资源。

假设前后两个 container 结束时间差不超过 10min


典型的可以通过会话窗口来将数据划分到一个 window 中,然后再将结果求和即可。

SELECT
    application_id
    SUM(cpu_usage) as cpu_total
    SUM(memory_usage) as memory_total,
FROM resource_usage
GROUP BY
    application_id,
    SESSION(event_time,INTERVAL '10' MINUTE)
1
2
3
4
5
6
7
8

# 总结

  1. 第一部分介绍了流式计算基本概念,以及和批式计算的区别
  2. 第二部分介绍了 watermark 的含义、如何生成、如何传递,以及如何处理部分 partition 断流的问题
  3. 第三部分介绍了三种基本的 window 的定义,以及迟到数据处理、增量计算 vS 全量计算、EMIT 输出; 同时也介绍了 local-global 优化、mini-batch 优化、distinct 状态优化、滑动窗口的 pane 的优化等
  4. 两个案例介绍滚动窗口、会话窗口,以及两阶段聚合解决倾斜问题
编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
Exactly Once 语义在 Flink 中的实现| 青训营笔记
Spark 原理与实践| 青训营笔记

← Exactly Once 语义在 Flink 中的实现| 青训营笔记 Spark 原理与实践| 青训营笔记→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式