Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • Hadoop

  • Zookeeper

  • Hive

  • Flume

  • Kafka

  • Azkaban

  • Hbase

  • Scala

  • Spark

  • Flink

    • Flink 简介
    • Flink 部署
      • 第一个 Word Count
        • 创建项目
        • 批处理
        • 流处理
        • 打包
      • Standalone 模式
        • 安装
      • Yarn 模式
        • Flink on Yarn
        • Session Cluster
        • Per Job Cluster
    • Flink 运行架构
    • Flink 流处理 API
    • Flink 中的 Window
    • Flink 时间语义与 Wartermark
    • ProcessFunction API
  • 离线数仓

  • 青训营

  • DolphinScheduler

  • Doris

  • 大数据
  • Flink
Iekr
2022-11-27
目录

Flink 部署

# Flink 部署

# 第一个 Word Count

# 创建项目

选择 Maven 原型创建项目

手动添加原型

image-20221127214241619

image-20221127214342329

image-20221127214356965

org.apache.flink
link-quickstart-scala
1.10.0
1
2
3

如果无法使用原型,则修改 pom.xml 文件导入依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 该插件用于将Scala代码编译成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <args>
                        <arg>-nobootcp</arg>
                    </args>
                </configuration>
                <executions>
                    <execution>
                        <!-- 声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

创建 在 src.main.scala 包创建 com.atguigu 包

# 批处理

object BathWordCount {
  case class WordWithCount(word: String, count: Int)

  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 从集合中创建数据源
    val stream = env.fromElements("hello world", "hello world", "hello world")
    val transformed = stream
      .flatMap(line => line.split(("\\s")))
      .map(w => WordWithCount(w, 1))
      .sum(1)

    // 将计算的结果输出到标准输出
    transformed.print()

    // 执行计算逻辑
    env.execute()

  }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 流处理

创建 WordCount.scala

package com.atguigu

// 导入隐式类型转换 implicit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * @author Iekr
 *         Date:  2022/11/28/0028 18:55 
 */
object WordCount {

  case class WordWithCount(word: String, count: Int)

  def main(args: Array[String]): Unit = {
    // 获取运行时环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置分区(并行任务)数量为1
    env.setParallelism(1)

    // 建立数据源
    // 需要先启动 nc -lk 9999 用于发送数据
    val stream = env.socketTextStream("localhost", 9999, '\n')

    // 写对流的转换处理逻辑
    val transformed: DataStream[WordWithCount] = stream
      //使用空格切分输入的字符串
      .flatMap(line => line.split("\\s"))
      // 类似与MR中的map
      .map(w => WordWithCount(w, 1))
      // 使用 word 字段进行分钟 类似 shuffle
      .keyBy(0)
      // 开了一个5s的滚动窗口
      .timeWindow(Time.seconds(5))
      //  针对count字段进行累加操作 类型mr中的reduce
      .sum(1)

    // 将计算的结果输出到标准输出
    transformed.print()


    // 执行计算逻辑
    env.execute()

  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# 打包

image-20221128204306382

打包后的 jar 在

image-20221130162045967

上传不带依赖的即不带后缀的

# Standalone 模式

# 安装

解压安装

tar -zxvf /opt/software/flink-1.10.0-bin-scala_2.11.tgz -C /opt/module/
mv /opt/module/flink-1.10.0/ /opt/module/flink
1
2

添加环境变量

sudo vim /etc/profile.d/my_env.sh 
1

添加以下内容

#FLINK_HOME
export FLINK_HOME=/opt/module/flink
export PATH=$PATH:$FLINK_HOME/bin
1
2
3

修改 /conf/ flink-conf.yaml 文件

vim /opt/module/flink/conf/flink-conf.yaml
1

修改以下内容

jobmanager.rpc.address: hadoop102
1

修改 /conf/ slaves 文件

vim /opt/module/flink/conf/slaves
1

覆盖写入以下内容

hadoop103
hadoop104
1
2

分发到另外两台机器

xsync /opt/module/flink/
sudo xsync /etc/profile.d/my_env.sh 
1
2

启动

cd /opt/module/flink/bin/
./start-cluster.sh
1
2

image-20221128194355819

访问 http://hadoop102:8081 可以对 flink 集群和任务进行监控管理

image-20221128194156300

测试

flink run /opt/module/flink/examples/batch/WordCount.jar
1

image-20221128195611740

# Yarn 模式

# Flink on Yarn

Flink 提供了两种在 yarn 上运行的模式,分别为 Session-Cluster 和 Per-Job-Cluster 模式。

  1. Session-cluster 模式: tt.jpg Session-Cluster 模式需要先启动集群,然后再提交作业,接着会向 yarn 申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 yarn 中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享 Dispatcher 和 ResourceManager;共享资源;适合规模小执行时间短的作业。 在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提交。这个 flink 集群会常驻在 yarn 集群中,除非手工停止。

  2. Per-Job-Cluster 模式:

    image-20221128195830495

    一个 Job 会对应一个集群,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享 Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。 每次提交都会创建一个新的 flink 集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

# Session Cluster

启动 hadoop 集群

start-all.sh
1

将 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 依赖放到 flink 目录下的 lib 中

image-20221129184047100

启动 yarn-session

yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
1
  • -n (--container):TaskManager 的数量。
  • -s (--slots): 每个 TaskManager 的 slot 数量,默认一个 slot 一个 core,默认每个 taskmanager 的 slot 的个数为 1,有时可以多一些 taskmanager,做冗余。
  • -jm:JobManager 的内存(单位 MB)。
  • -tm:每个 taskmanager 的内存(单位 MB)。
  • -nm:yarn 的 appName (现在 yarn 的 ui 上的名字)。
  • -d:后台执行。

启动成功后端口是随机生成

image-20221129184725475

访问 http://hadoop102:39737 / 网页

image-20221129184748443

执行我们打包上去的 Word Count 程序

flink run -c com.atguigu.BathWordCount FlinkWC-1.0-SNAPSHOT.jar --host hadoop102 –port 9999
1

image-20221129185148248

# Per Job Cluster

启动 hadoop 集群

start-all.sh
1

不启动 yarn-session,直接执行 job

flink run –m yarn-cluster -c com.atguigu.WordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar
1
编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
Flink 简介
Flink 运行架构

← Flink 简介 Flink 运行架构→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式