Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • JavaSE

  • JavaEE

    • MyBatis
    • Jackson
    • Jedis
    • Maven
    • POI
    • Spring
    • Spring MVC
    • Maven 高级
    • Dubbo
    • Zookeeper
      • 安装
      • 数据模型
      • 服务端常用命令
      • 客户端常用命令
      • Curator
        • 获取客户端
        • 关闭客户端
        • 创建节点
        • 查询节点
        • 修改节点
        • 删除节点
        • Watch事件监听
        • NodeCache
        • PathChildrenCache
        • TreeCache
      • 分布式锁
        • Zookeeper分布式锁原理
        • Curator 实现分布式锁
      • 集群搭建
        • Leader选举
      • 模拟集群异常
      • 集群角色
    • Spring Security
    • Spring Boot
    • Spring Boot 高级
    • RabbitMQ
    • RabbitMQ 高级
    • Spring Cloud
    • Docker
    • ElasticSearch
    • ElasticSearch 高级
  • Linux

  • MySQL

  • NoSQL

  • Python

  • Python模块

  • 机器学习

  • 设计模式

  • 传智健康

  • 畅购商城

  • 博客项目

  • JVM

  • JUC

  • Golang

  • Kubernetes

  • 硅谷课堂

  • C

  • 源码

  • 神领物流

  • RocketMQ

  • 短链平台

  • 后端
  • JavaEE
Iekr
2021-09-14
目录

Zookeeper

# Zookeeper

Zookeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务

是一个分布式 开源的分布式应用程序的协调服务

主要功能包括:配置管理 分布式锁 集群管理

# 安装

tar -zxvf /opt/software/apache-zookeeper-3.5.6-bin.tar.gz -C /opt/
mv /opt/apache-zookeeper-3.5.6-bin /opt/zookeeper
mkdir zkdata
cd /opt/zookeeper/conf
cp zoo_sample.cfg zoo.cfg

vim zoo.cfg
1
2
3
4
5
6
7

修改存储路径

dataDir=/opt/zookeeper/zkdata
1

启动

cd /opt/zookeeper/bin/
./zkServer.sh start
1
2

# 数据模型

image-20210914221354385

# 服务端常用命令

#启动
./zkServer.sh start
#状态
./zkServer.sh status
#停止
./zkServer.sh stop
#重启
./zkServer.sh restart
1
2
3
4
5
6
7
8

# 客户端常用命令

#连接  如果是本机直接忽略后面的参数 忽略连接必须为默认端口2181
./zkCli.sh -server localhost:2181
1
2
  • create [节点 -e -s -es] 路径 [内容] 创建文件
  • get [节点] 路径 获取文件内容
  • ls 路径 查看路径下的所有文件
  • set [节点] 路径 内容 修改文件内容
  • delete [节点] 路径 删除文件
  • deleteall [节点] 路径 删除该文件夹下的所有文件
  • quit 退出
  • help 帮助

image-20210914223057012

# Curator

ZooKeeper 客户端库

  • 原生 Java Api
  • ZkClient
  • Curator

https://curator.apache.org/

坐标

<dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>

        <!--curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

# 获取客户端

        //第一种方式
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000,10);
        //多个服务端 connectString参数 地址用逗号隔开  连接超时时间  会话超时时间  重试策略
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.130.124:2181",
                60 * 1000, 15 * 1000, retryPolicy);
        client.start();  //开启连接
        client.close();
        //第二种方式  namespace为设置根节点路径
        client2 = CuratorFrameworkFactory.builder().connectString("192.168.130.124:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("itheima").build();
        client2.start();
1
2
3
4
5
6
7
8
9
10

# 关闭客户端

client2.close();
1

# 创建节点

//创建节点  节点默持久化的节点   内容默认将当前客户端ip作为内容
String path = client2.create().forPath("/app1");
System.out.println(path);
//带内容创建
client2.create().forPath("/app2","hello".getBytes(StandardCharsets.UTF_8));
//设置节点类型  使用withMode方法 值为一个枚举值
client2.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
//创建多级节点  creatingParentContainersIfNeeded如果父节点不存在则自动创建
client2.create().creatingParentContainersIfNeeded().forPath("/app4/p1");
1
2
3
4
5
6
7
8
9

# 查询节点

//查询节点
byte[] bytes = client2.getData().forPath("/app1");
System.out.println(new String(bytes));

//查询子节点
List<String> path = client2.getChildren().forPath("/app4");
System.out.println(path);

//查询节点状态信息  ls -s  需要一个Stat类
Stat stat =new Stat();
client2.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);
1
2
3
4
5
6
7
8
9
10
11
12

# 修改节点

//修改内容
client2.setData().forPath("/app1","hello".getBytes(StandardCharsets.UTF_8));
//根据版本修改
Stat stat =new Stat();
client2.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);
int version = stat.getVersion();
//withVersion  查询版本与之前版本是否相同 相同则修改 不相同则不执行
client2.setData().withVersion(version).forPath("/app1","hello world".getBytes(StandardCharsets.UTF_8));
1
2
3
4
5
6
7
8
9

# 删除节点

//删除节点
client2.delete().forPath("/app1");
//删除带有子节点的节点
client2.delete().deletingChildrenIfNeeded().forPath("/app4");
//必须删除成功
client2.delete().guaranteed().forPath("/app2");
//回调
client2.delete().guaranteed().inBackground(new BackgroundCallback() {
    @Override
    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
        //curatorEvent 存储删除后的信息 如路径 是否成功
        System.out.println(curatorEvent);
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# Watch 事件监听

ZooKeeper 允许用户在指定节点上注册一些 Watcher, 并且在一些特定事情触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特征

Watch 机制来实现了发布 / 订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者

Curator 引入了 Cache 来实现对 ZooKeeper 服务端事件的监听

  • NodeCache 只是监听某一个特点的节点
  • PathChildrenCache 监控一个 ZNdoe 的子节点
  • TreeCache 可以监控整个树上的所有节点 类似于 NodeCache 和 PathChildrenCache 结合

# NodeCache

//创建NodeCache对象
final NodeCache nodeCache = new NodeCache(client2, "/app1");
//注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        System.out.println("节点发生变化");
        //获取修改节点后的数据
        byte[] data = nodeCache.getCurrentData().getData();
        System.out.println(new String(data));
    }
});

//开启监听  如果为true 则开启监听时,加载缓冲数据
nodeCache.start(true);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

发生增删改操作都会触发监听 并且当此方法 (线程) 结束后不再监听 一般我们持续让此线程存活

# PathChildrenCache

//创建监听器
PathChildrenCache pathChildrenCache = new PathChildrenCache(client2,"/app2",true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        System.out.println("子节点发生变化");
        System.out.println(pathChildrenCacheEvent);
        //监听子节点的数据变更,并获取到对应的数据
        //1.获取类型
        PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
        //判断类型
        if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
            byte[] data = pathChildrenCacheEvent.getData().getData();
            System.out.println(new String(data));
        }

    }
});
pathChildrenCache.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

当发生连接时 childEvent 会执行一次 我们可以通过 type 来判断

# TreeCache

//创建监听器
TreeCache treeCache = new TreeCache(client2, "/app2");
treeCache.getListenable().addListener(new TreeCacheListener() {
    @Override
    public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
        System.out.println("节点发生变化");
        byte[] data = treeCacheEvent.getData().getData();
        System.out.println(new String(data));
    }
});
treeCache.start();
1
2
3
4
5
6
7
8
9
10
11

# 分布式锁

在以前我们涉及并发同步的时候,我们往往采用 synchronized 或者 Lock 的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个 JVM 之下

我们分布式集群工作时,属于多 JVM 下的工作环境,跨 JVM 之间已经无法通过多线程的锁解决同步问题

分布式锁 ------ 处理跨机器的进程之间的数据同步问题

# Zookeeper 分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点

  1. 客户端获取锁时,在 lock 节点下创建临时顺序节点
  2. 然后获取 lock 下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就任务该客户端获取到了锁。使用完锁后,将该节点删除
  3. 如果发现自己创建的节点并非 lock 所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器, 监听删除事件
  4. 如果发现比自己小的那个节点被删除,则客户端的 Watcher 会收到相应的通知,此时再次判断自己创建的节点是否是 lock 子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听

# Curator 实现分布式锁

在 Curator 中有五种锁方案

  • InterProcessSemaphoreMutex 分布式排它锁 (非可重入锁)
  • InterProcessMutex 分布式可重入排它锁
  • InterProcessReadWriteLock 分布式读写锁
  • InterProcessMultiLock 将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2 共享信号量

获取锁对象

private InterProcessMutex lock;

public Ticket() {
    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 10);
    CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.130.124:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("itheima").build();
    client.start();
    lock = new InterProcessMutex(client, "lock");
}
1
2
3
4
5
6
7
8

获取锁

lock.acquire(3, TimeUnit.SECONDS);
1

释放锁

lock.release();
1

# 集群搭建

先安装 ZooKeeper

在每个 ZooKeeper 的 data 目录下创建一个 myid 文件,内容分别是 1 2 3 这个文件就是记录每个服务器的 ID

#1 
echo "1" > /opt/zookeeper/zkdata/myid

#2
echo "2" > /opt/zookeeper/zkdata/myid

#3
echo "2" > /opt/zookeeper/zkdata/myid
1
2
3
4
5
6
7
8

配置每一台的 ZooKeeper 的 zoo.cfg

vim /opt/zookeeper/conf/zoo.cfg
1

添加集群地址 ip 和端口根据配置修改。后面为 设置的 myid

server.1=realtime-1:2888:3888
server.2=realtime-2:2888:3888
server.3=realtime-3:2888:3888
1
2
3

server. 服务器 ID = 服务器 IP 地址:服务器之间通讯端口:服务器之间投票选举端口

启动集群

#1
./opt/zookeeper/bin/zkServer.sh start
#2
./opt/zookeeper/bin/zkServer.sh start
#3
./opt/zookeeper/bin/zkServer.sh start
1
2
3
4
5
6

# Leader 选举

Serverid: 服务器 id 编号越大在选择算法的权重越大

Zxid: 数据 id 值越大数据越新 在选举中数据越新 权重越大

如果某台 ZooKeeper 获得超过半数的选票 则此 ZooKeeper 就可以成为 Leader 了

# 模拟集群异常

关闭 3 号集群

./opt/zookeeper/bin/zkServer.sh stop
1

1 号和 2 号状态没有变化

再关闭 1 号机器

./opt/zookeeper/bin/zkServer.sh stop
1

此时只有 2 号机器 并没有关闭 但处于休眠状态

再把 1 号机器重新启动

./opt/zookeeper/bin/zkServer.sh start
1

2 号机器重新运行,

再把 3 号重新开启,然后停掉 2 号

此时重新选举 leader 3 号超过半数票成为 leader

# 集群角色

  • Leader 领导者
    1. 处理事务请求
    2. 集群内部各服务器的调度者
  • Follower 跟随者
    1. 处理客户端非事务请求,转发事务请求给 Leader 服务器
    2. 参与 Leader 选举投票
  • Observer 观察者
    1. 处理客户端非事务请求,转发事务请求给 Leader 服务器

image-20210916111548520

编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
Dubbo
Spring Security

← Dubbo Spring Security→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式