Zookeeper 部署及基础原理

一、zookeeper简介

1.1 zookeeper 介绍

Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的 Apache 项目。

1.2 zookeeper工作机制

Zookeeper从设计模式角度理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。

在这里插入图片描述

工作流程

  1. 服务器去zookeeper集群上注册信息,一般都是创建临时节点。
  2. 获取到当前在线服务器列表,并注册监听。
  3. 如果服务器出现问题,服务器节点下线,会通知zookeeper集群,就是zookeeper集群与这个服务器通信时间过长,就会,删除临时节点。
  4. zookeeper集群,将通知监听此节点的客服端,服务器已下线。
  5. 客服端随后做出反应。

1.3 zookeeper 特点

在这里插入图片描述

  1. zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群
  2. 集群中只要有半数以上节点存活,zookeeper集群就可以正常服务。所以zookeeper适合安装奇数台服务器。
  3. 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个server,数据都是一致的。
  4. 更新请求顺序执行,来自同一个client的更新请求按其发送顺序依次执行。
  5. 数据更新原子性,一次数据更新要么更新成功,要么失败。
  6. 实时性:在一定时间范围内,Client能读到最新数据。

1.4 数据结构

ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。

在这里插入图片描述

1.5 应用场景

提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

  1. 统一命名服务

在这里插入图片描述

在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。

  1. 统一配置管理

在这里插入图片描述

分布式环境下,配置文件同步非常常见:

  • 一般要求一个集群中的所有节点,配置信息都是一致的,比如kafka集群。
  • 对配置文件修改后,希望能够快速同步到各个节点上。

配置文件管理科员交由zookeeper实现:

  • 可将配置信息写入zookeeper上的一个znode
  • 各个客户端服务器监听这个znode
  • 一旦znode中的数据被修改,zookeeper将通知各个客户端服务器。
  1. 统一集群管理

在这里插入图片描述

分布式环境下,实时掌握每个节点的状态是必要的。

  • 可根据节点实时状态做出一些调整

zookeeper可以实现实时监控节点状态变换

  • 可将节点信息写入zookeeper上的znode
  • 监听这个znode可获取它的实时状态变化
  1. 服务器动态上下线

在这里插入图片描述

  1. 软负载均衡

在这里插入图片描述

在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。

二、安装zookeeper

2.1 官网地址

https://zookeeper.apache.org/

2.2 下载linux环境下安装的tar包

下载3.5.7版本,这个版本是最新稳定版本

在这里插入图片描述

在这里插入图片描述

2.3 上传到linux,解压缩

tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
mv apache-zookeeper-3.5.7 zookeeper-3.5.7

# 添加环境变量到my_env.sh

2.4 修改zookeeper配置

  1. 将/opt/module/zookeeper-3.5.7/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg;
  2. 将zoo.cfg中的dataDir修改为/opt/module/zookeeper-3.5.7/zkData
  3. 创建zkData文件夹
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg

dataDir=/opt/module/zookeeper-3.5.7/zkData

mkdir zkData

2.5 启动zookeeper

在这里插入图片描述

2.6 配置参数解读

在这里插入图片描述

Zookeeper中的配置文件zoo.cfg中参数含义解读如下:

  1. tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒

在这里插入图片描述

  1. initLimit = 10:LF初始通信时限

Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)

在这里插入图片描述

  1. syncLimit = 5:LF同步通信时限

Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。

在这里插入图片描述

  1. dataDir:保存Zookeeper中的数据

注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。

  1. clientPort = 2181:客户端连接端口,通常不做修改。

三、集群操作

3.1 集群安装

  1. 集群规划

在 hadoop102、hadoop103 和 hadoop104 三个节点上都部署 Zookeeper。

  1. 解压安装

(1)在 hadoop102 解压 Zookeeper 安装包到/opt/module/目录下

tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/

(2)修改 apache-zookeeper-3.5.7-bin 名称为 zookeeper-3.5.7

mv apache-zookeeper-3.5.7-bin/zookeeper-3.5.7

  1. 配置服务器编号

(1)在/opt/module/zookeeper-3.5.7/这个目录下创建 zkData

mkdir zkData

(2)在/opt/module/zookeeper-3.5.7/zkData 目录下创建一个 myid 的文件

vim myid

在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格)

2

注意:添加 myid 文件,一定要在 Linux 里面创建,在 notepad++里面很可能乱码

(3)拷贝配置好的 zookeeper 到其他机器上

xsync zookeeper-3.5.7

并分别在 hadoop103、hadoop104 上修改 myid 文件中内容为 3、4

  1. 配置zoo.cfg文件

    (1)重命名/opt/module/zookeeper-3.5.7/conf 这个目录下的 zoo_sample.cfg 为 zoo.cfg

    mv zoo_sample.cfg zoo.cfg

    (2)打开 zoo.cfg 文件

    vim zoo.cfg

    #修改数据存储路径配置

    dataDir=/opt/module/zookeeper-3.5.7/zkData

    #增加如下配置

    #######################cluster##########################

    server.2=hadoop102:2888:3888

    server.3=hadoop103:2888:3888

    server.4=hadoop104:2888:3888

    (3)配置参数解读

    server.A=B:C:D

    A 是一个数字,表示这个是第几号服务器;

    集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server

    B 是这个服务器的地址;

    C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;

    D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

    (4)同步 zoo.cfg 配置文件

    xsync zoo.cfg

  2. ZK 集群启动停止脚本

    1. 在 hadoop102 的/home/atguigu/bin 目录下创建脚本

      vim zk.sh

      在脚本中编写以下内容

      #!/bin/bash
      case $1 in
      "start"){
      	for i in hadoop102 hadoop103 hadoop104
      	do
      		echo ---------- zookeeper $i 启动 ------------
      		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
      	done
      };;
      "stop"){
      	for i in hadoop102 hadoop103 hadoop104
      	do
      		echo ---------- zookeeper $i 停止 ------------ 
      		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
      	done
      };;
      "status"){
      	for i in hadoop102 hadoop103 hadoop104
      	do
      		echo ---------- zookeeper $i 状态 ------------ 
      		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
      	done
      };;
      esac
      
    2. 增加脚本执行权限,并分发脚本

      chmod 777 zk.sh

      xsync zk.sh

  3. 启动集群,查看集群状态

    zk.sh start

    zk.sh status

    在这里插入图片描述

3.2 选举机制(重点)

  1. 第一次启动

在这里插入图片描述

假设zookeeper集群中有5台服务器。下面服务器依次启动。

  1. 服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING

  2. 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1) 大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING;

  3. 服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;

  4. 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为 1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;

  5. 服务器5启动,同4一样当小弟。

  6. 非第一次启动

在这里插入图片描述

  1. 当zookeeper集群中有一台服务器出现以下两种情况之一时,就会进入Leader选举:

    • 服务器初始化
    • 服务器运行期间无法和leader保持连接
  2. 当一台机器进入Leader选举流程时,当前集群可能处于以下两种状态:

    • 集群中本来就存在一个Leader。

      对于这种情况,机器试图选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,只需要和Leader机器建立连接,并进行状态同步即可。

    • 集群中确实不存在Leader。

      假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。

                                            (EPOCH,ZXID,SID )
      

      SID为1、2、4的机器投票情况:(1,8,1)(1,8,2)(1,7,4)

      选举Leader规则:

      ① EPOCH大的直接胜出

      ② EPOCH相同,事务id大的胜出

      ③ 事务id相同,服务器id大的胜出

      所以最后2号当选Leader

    SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。

    ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑有关。

    Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加

四、客户端命令行操作

常用命令

启动客户端

zkCli.sh -server hadoop102:2181

显示所有操作命令

[zk: hadoop102:2181(CONNECTED) 1] help

在这里插入图片描述

4.1 ZNode数据节点信息

  1. 查看当前znode中所包含的内容

    ls /

    在这里插入图片描述

  2. 查看当前节点详细数据

    ls -s /

    在这里插入图片描述

    参数解读:

    (1)czxid:创建节点的事务 zxid

    每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。

    (2)ctime:znode 被创建的毫秒数(从 1970 年开始)

    (3)mzxid:znode 最后更新的事务 zxid

    (4)mtime:znode 最后修改的毫秒数(从 1970 年开始)

    (5)pZxid:znode 最后更新的子节点 zxid

    (6)cversion:znode 子节点变化号,znode 子节点修改次数

    (7)dataversion:znode 数据变化号

    (8)aclVersion:znode 访问控制列表的变化号

    (9)ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。

    (10)dataLength:znode 的数据长度

    (11)numChildren:znode 子节点数量

4.2 节点类型

节点类型分为四类:

  1. 持久无序号
  2. 持久有序号
  3. 临时无序号
  4. 临时有序号

在这里插入图片描述

持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除。

短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除。

注意:在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

  1. 创建持久节点

    # 创建持久无序号节点
    create /test "ce shi"
    
    create /test/test1 "ce shi1"
    
    # 注意:创建节点时,要赋值。
    
    # 获取节点的值
    get -s /test
    
    

在这里插入图片描述

  1. 创建持久带序号节点

    # 创建持久无序号节点
    create -s /test/test- "seq test"
    

    在这里插入图片描述

  2. 创建临时节点

    create -e /test/test2 "-e test"
    
  3. 创建临时带序号节点

    create -e -s /test/test2- "seq test"
    

退出客户端,再进入,会发现临时节点已经被删除。

4.3 监听器原理

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证 ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序。

在这里插入图片描述

  1. 监听节点值的变化

    1. 在一个客户端上注册监听/test节点数据的变化

      get -w /test

    2. 在另一个客户端上修改/test的数据

      set /test “new data”

    3. 查看监听的客户端的反应:

    在这里插入图片描述

    注意:在hadoop103再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。

  2. 监听节点的子节点的变化

    1. 在一个客户端上注册监听/test的子节点的变化

      ls -w /test

    2. 在另一个客户端上创建一个/test的子节点

      create -e -s /test/test- “new seq-node”

    3. 查看监听的客户端的反应:

    在这里插入图片描述

4.4 节点的删除和查看

  1. 删除节点

    delete /test/test1

  2. 递归删除节点

    deleteall /test

  3. 查看节点的状态

    stat /test

    在这里插入图片描述

五、客户端 API 操作

前提:保证 hadoop102、hadoop103、hadoop104 服务器上 Zookeeper 集群服务端启动。

5.1 IDEA环境搭建

  1. 创建一个项目:zookeeper

  2. 再pom文件中添加依赖

    	<dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.8.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.5.7</version>
            </dependency>
    				
    				<!--Curator 框架依赖 分布式锁-->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-client</artifactId>
                <version>4.3.0</version>
            </dependency>
        </dependencies>
    
  3. 拷贝log4j.properties文件到项目根目录

    需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

    log4j.rootLogger=INFO, stdout 
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n 
    log4j.appender.logfile=org.apache.log4j.FileAppender 
    log4j.appender.logfile.File=target/spring.log 
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n
    
  4. 创建包名com.atguigu.zk

  5. 创建类名称zkClient

5.2 创建 ZooKeeper 客户端

package com.atroot.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

/**
 * @auther Kewei
 * @date 2021/10/4 14:51
 */

public class zkClient {

    private String connectString = "hadoop101:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zkClient;

    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                List<String> children = null;
                try {
                    children = zkClient.getChildren("/", true);
                    for (String child : children) {
                        System.out.println(child);
                    }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        });
    }

    @Test
    public void create() throws InterruptedException, KeeperException {
        String createNode = zkClient.create("/roottest", "test01".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    }

    @Test
    public void getChildren() throws InterruptedException, KeeperException {
        List<String> children = zkClient.getChildren("/", true);

        for (String child : children) {
            System.out.println(child);
        }

        Thread.sleep(Long.MAX_VALUE);
    }

    @Test
    public void exits() throws InterruptedException, KeeperException {
        Stat stat = zkClient.exists("/test", false);

        System.out.println(stat == null ? "not exits " : "exit");
    }
}

5.3 客户端向服务端写数据流程

  1. 写数据请求发送给Leader节点
    1. 客户端client向leader节点发送写数据请求
    2. leader收到写数据请求后,自己会先写数据,顺便将写数据的请求发给一个follower,让它也进行写数据。
    3. follower写完数据,向leader作出回应,说自己已经写好了。此时已经有半数以上(2个)的服务器已经完成了写数据的操作。
    4. leader向客户端回应,说我们已经写好数据了。
    5. 最后leader通知还没有写数据的follower,让它们写数据,它们写完数据,作出相应。

在这里插入图片描述

  1. 写数据请求发送给follower节点
    1. 客户端client向follower节点发送写数据请求
    2. follower收到请求后,马上向leader发送写数据请求(因为follower没有权限)
    3. leader收到写数据请求后,自己会先写数据,顺便将写数据的请求发给一个follower,让它也进行写数据。
    4. follower写完数据,向leader作出回应,说自己已经写好了。此时已经有半数以上(2个)的服务器已经完成了写数据的操作。
    5. leader向客户端连接的follower回应,说我们已经写好数据了。
    6. follower向客户端回应,说已经写好数据了。
    7. 最后leader通知还没有写数据的follower,让它们写数据,它们写完数据,作出相应。

在这里插入图片描述

六、服务器动态上下线监听案例

6.1 需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

6.2 需求分析

在这里插入图片描述

6.3 具体实现

服务端向zookeeper注册代码

package com.atroot.zkcasel;

import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * @auther Kewei
 * @date 2021/10/4 15:53
 */

public class DistributeServer {
    private String connectString = "hadoop101:2181";
    private int sessionTime = 2000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeServer server = new DistributeServer();

        // 创建连接
        server.getConnect();

        // 注册
        server.regist(args[0]);

        // 业务逻辑
        server.besiness();
    }

    private void besiness() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void regist(String hostname) throws InterruptedException, KeeperException {
        String create = zk.create("/server/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(hostname+" is online");
    }

    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTime, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

客户端监听代码

package com.atroot.zkcasel;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @auther Kewei
 * @date 2021/10/4 16:02
 */

public class DistributeClient {
    private String connectString = "hadoop101:2181";
    private int sessionTime = 2000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeClient client = new DistributeClient();

        client.connect();

        client.getServerList();

        client.beniness();
    }

    private void beniness() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void getServerList() throws InterruptedException, KeeperException {
        List<String> children = zk.getChildren("/server", true);

        ArrayList<String> servers = new ArrayList<>();
        for (String child : children) {
            byte[] data = zk.getData("/server/" + child, false, null);

            servers.add(new String(data));
        }

        System.out.println(servers);
    }

    private void connect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTime, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    getServerList();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

七、ZooKeeper 分布式锁案例

什么叫做分布式锁?

比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

在这里插入图片描述

7.1 原生zookeeper代码实现分布式锁

分布式锁实现

package com.atroot.zkcase2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @auther Kewei
 * @date 2021/10/5 8:47
 */

public class DistributedLock {
    private final String connectString = "hadoop101:2181";
    private final int sessionTimeout = 2000;
    private final ZooKeeper zk;

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);

    private String waitPath;
    private String currentMode;

    public DistributedLock() throws IOException, InterruptedException, KeeperException {

        // 创建连接
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
                    connectLatch.countDown();
                }

                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
                    waitLatch.countDown();
                }
            }
        });

        // 等待前一个进程结束
        connectLatch.await();

        //判断节点是否存在
        Stat exists = zk.exists("/locks", false);

        if (exists == null){
            String s = zk.create("/locks", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // 加锁
    public void zklock() throws InterruptedException, KeeperException {
        // 创建临时带序号的节点
        currentMode = zk.create("/locks/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        // 判断是否是最小节点
        List<String> children = zk.getChildren("/locks", false);

        if (children.size() == 1){
            return;
        }else {
            Collections.sort(children);
            String thisNode = currentMode.substring("/locks/".length());
            int index = children.indexOf(thisNode);

            if (index == -1){
                System.out.println("数据异常");
            }else if(index == 0){
                return;
            }else {
                waitPath = "/locks/"+children.get(index-1);
                zk.getData(waitPath,true,null);

                waitLatch.await();
            }
        }
    }

    // 解锁
    public void unzklock() throws InterruptedException, KeeperException {
        // 删除节点
        zk.delete(currentMode,-1);
    }
}

分布式锁测试

package com.atroot.zkcase2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

/**
 * @auther Kewei
 * @date 2021/10/5 9:17
 */

public class DistributedLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributedLock lock1 = new DistributedLock();
        DistributedLock lock2 = new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.zklock();
                    System.out.println("线程一 获取到锁");

                    Thread.sleep(3*1000);
                    lock1.unzklock();
                    System.out.println("线程一 释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zklock();
                    System.out.println("线程二 获取到锁");

                    Thread.sleep(3*1000);
                    lock2.unzklock();
                    System.out.println("线程二 释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

7.2 Curator 框架实现分布式锁案例

1)原生的 Java API 开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch

(2)Watch 需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

2)Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。

详情请查看官方文档:https://curator.apache.org/index.html

代码实现

package com.atroot.lock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @auther Kewei
 * @date 2021/10/5 9:26
 */

public class CuratorLockTest {

    public static void main(String[] args) {

        //创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        //创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1 获取到锁");

                    lock1.acquire();
                    System.out.println("线程1 再次获取到锁");

                    Thread.sleep(3*1000);

                    lock1.release();
                    System.out.println("线程1 释放锁");

                    lock1.release();
                    System.out.println("线程1 再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2 获取到锁");

                    lock2.acquire();
                    System.out.println("线程2 再次获取到锁");

                    Thread.sleep(3*1000);

                    lock2.release();
                    System.out.println("线程2 释放锁");

                    lock2.release();
                    System.out.println("线程2 再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

    }

    private static CuratorFramework getCuratorFramework() {

        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
								.connectString("hadoop101:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();

        curatorFramework.start();

        System.out.println("zookeeper 启动成功");
        return curatorFramework;
    }
}

八、原理问题

8.1 选举机制

半数机制,超过半数的投票通过,即通过。

  1. 第一次启动选举规则:

    投票过半数时,服务器 id 大的胜出

  2. 第二次启动选举规则:

    EPOCH 大的直接胜出

    EPOCH 相同,事务 id 大的胜出

    事务 id 相同,服务器 id 大的胜出

8.2 生产集群安装多少 zk 合适?

安装奇数台。

生产经验:

  • 10 台服务器:3 台 zk;
  • 20 台服务器:5 台 zk;
  • 100 台服务器:11 台 zk;
  • 200 台服务器:11 台 zk

服务器台数多:好处,提高可靠性;坏处:提高通信延时

8**.3 常用命令**

ls、get、create、delete

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

努力生活的黄先生

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值