深入浅出ZooKeeper:功能、特性及核心实现

深入浅出系列

深入浅出ZooKeeper:功能、特性及核心实现

在分布式系统的世界里,有一个“隐形协调者”始终在默默发力——它就是ZooKeeper。无论是Hadoop、Kafka等大数据框架,还是Dubbo等微服务架构,都离不开它的支撑。很多开发者只知道它能实现分布式锁、服务注册,但很少深入了解其背后的设计逻辑:它的核心功能到底有哪些?独特特性是什么?又靠哪些架构和算法,实现了高可用、强一致性的承诺?今天这篇博客,就带你从零到一吃透ZooKeeper的核心逻辑。

一、先搞懂:ZooKeeper到底是什么?

ZooKeeper是一个开源的分布式协调服务,本质上是一个高性能、高可用的分布式键值存储系统,采用类似文件系统的树形结构组织数据,核心目标是为分布式应用提供简单易用的协调机制,封装复杂的分布式一致性问题,让开发者无需从零实现协调逻辑,专注于业务本身。它最初由雅虎开发,2010年成为Apache顶级项目,如今已成为分布式系统领域的基石组件。

简单来说,ZooKeeper就像分布式系统的“管家”,负责处理各个节点之间的“沟通协调”,解决分布式环境中常见的一致性、同步、配置管理等难题,确保整个分布式系统有序、稳定运行。

二、核心功能 (Core Functions):ZooKeeper能帮我们做什么?

ZooKeeper的功能围绕“分布式协调”展开,提供了一套标准化的分布式原语,覆盖分布式场景下的各类高频需求,具体分类及说明如下:

1. 统一命名服务:类似 DNS 的分布式命名系统,提供全局唯一标识,可用于全局ID生成、服务地址映射等场景

2. 配置管理:集中式配置存储与动态推送,支持配置变更实时通知,客户端无需重启即可加载最新配置

3. 集群管理:实时感知节点加入/退出,维护集群成员列表,实现节点状态的动态监控

4. 分布式锁:提供互斥机制,基于临时顺序节点实现,可实现互斥锁或读写锁,保障分布式环境下的资源协调控制

5. 队列管理:支持分布式队列(FIFO)和屏障(Barrier)模式,协调多个节点的同步执行(如等待所有节点就绪后再执行)

6. Master 选举:自动化的领导者选举机制,通过竞争创建临时节点实现,保障集群高可用,避免单点故障

7. 服务注册发现:服务提供者启动时注册自身信息(IP、端口等),消费者通过节点查询动态发现服务,无需硬编码地址

典型应用:Dubbo框架利用其实现服务注册发现,Kafka通过其完成Controller选举,Hadoop借助其实现NameNode HA故障转移,覆盖大数据、微服务等多个领域。

三、核心特点 (Key Characteristics):ZooKeeper的“过人之处”

ZooKeeper之所以能成为分布式协调的“首选工具”,核心在于它具备5个关键特性,这些特性共同保障了其高可用、强一致性和易用性,也是面试中的高频考点,具体如下:

1. 顺序一致性:同一客户端的请求按发送顺序执行,不会出现顺序错乱,由全局有序的事务ID(ZXID)提供支撑

2. 原子性:更新操作要么全部成功,要么全部失败,没有中间状态,避免集群数据不一致,由ZAB协议保障

3. 单一系统镜像:所有客户端无论连接到集群中的哪个节点,看到的数据视图都是一致的,不会出现数据偏差

4. 可靠性:更新一旦生效即持久化,直到被下一次更新覆盖,即使节点宕机重启,也能通过日志和快照恢复数据

5. 实时性:保证客户端最终能读到最新数据,数据变更会在几十到几百毫秒内被所有客户端感知,不保证实时但保证最终一致

6. 高可用:通过2N+1奇数节点部署实现,可容忍N个节点故障

7. 高性能:源于内存存储,读多写少场景下吞吐量极高,可通过Observer节点横向扩展读能力

四、核心架构 (Core Architecture):支撑特性的“底层骨架”

ZooKeeper的所有特性,都依赖其分布式集群架构和独特的数据模型实现。它采用主从架构(Leader-Follower-Observer),结合层次化ZNode数据模型,既保证一致性,又兼顾性能和扩展性,具体拆解如下:

4.1 整体架构

ZooKeeper集群采用去中心化的主从架构,无单点故障风险:集群中存在一个Leader节点、多个Follower节点,可根据需求添加Observer节点扩展读性能;所有写请求统一由Leader处理,读请求可由Follower或Observer处理,通过ZAB协议实现集群数据一致性。

4.2 节点角色

集群中各节点角色分工明确,协同保障服务稳定运行,具体职责如下:

A. Leader:处理所有写请求,发起事务提案,协调ZAB广播协议,主导Leader选举,确保集群数据一致性

B. Follower:处理读请求,参与Leader选举投票,接收Leader同步的数据,转发客户端写请求给Leader

C. Observer:处理读请求,不参与投票和Leader选举,只同步Leader数据,核心作用是扩展读性能、降低写延迟

4.3 数据模型

ZooKeeper采用类似文件系统的层次化树形命名空间,核心存储单元为ZNode,整个数据结构是一棵层级树,每个ZNode可存储少量数据(默认≤1MB,通常<1MB),适合存储配置、元数据等轻量信息,是实现各类协调功能的基础。 4.4 ZNode 类型

根据节点的生命周期、特性,ZNode分为6种类型,适配不同分布式场景,具体如下:

A. 持久节点 (Persistent):客户端断连后不删除,需手动执行删除操作,适合存储长期有效的配置信息

B. 临时节点 (Ephemeral):与客户端会话绑定,会话结束自动删除,常用于服务注册、节点状态监控

C. 持久顺序节点 (Persistent_Sequential):具备持久节点特性,创建时自动追加全局递增序号,保证节点名称唯一

D. 临时顺序节点 (Ephemeral_Sequential):具备临时节点特性,创建时自动追加全局递增序号,是实现分布式锁的核心

E. 容器节点 (Container):3.5.3+ 版本新增,当最后一个子节点被删除时,容器节点会自动清理

F. TTL 节点:带过期时间的持久节点,过期后自动删除,适合存储临时有效数据

4.5 关键架构设计原则(含请求处理流程)

ZooKeeper通过一系列设计原则,保障服务的高可用、高性能和可靠性,具体如下:

A. 集群节点部署:推荐部署奇数个节点(3、5、7个),遵循“2f+1”原则(f为允许故障的节点数),确保集群始终能形成多数派,避免脑裂问题。

B. 请求处理流程:
写请求:Follower接收写请求 → 转发给Leader → Leader发起提案 → 集群投票(多数派确认) → 提交日志 → 应用状态机 → 返回结果,全程由ZAB协议保障一致性。

C. 读请求:Follower或Observer直接返回本地数据(可能非最新,但保证单调一致性),无需经过Leader,确保读操作高性能。

D. 数据存储:采用“内存+磁盘”双重存储,内存存储全量ZNode树(快速响应读请求),磁盘通过事务日志(WAL)和快照(Snapshot)实现数据持久化,确保节点宕机可恢复。

E. 会话管理:客户端与集群通过TCP连接建立会话,由客户端心跳维持,超时后清除该会话创建的临时节点;支持自动重连和会话转移,连接不同节点可保持相同会话状态。

五、核心算法 (Core Algorithms):保障特性的“灵魂”

ZooKeeper的高可用、强一致性、顺序性等特性,核心依赖四大算法/协议,其中ZAB协议是核心,结合快速选举、2PC变种和数据同步算法,构成完整的一致性保障体系,具体如下:

5.1 ZAB 协议 (ZooKeeper Atomic Broadcast)

ZAB协议是ZooKeeper最核心的共识算法,本质是Paxos算法的工业级实现和优化,专门适配主从架构,核心作用是保证写操作的原子广播和顺序一致性,分为两个核心阶段:

1. 崩溃恢复 (Crash Recovery):Leader失效后,通过快速选举算法重新选举新Leader,新Leader同步自身数据到所有Follower/Observer,确保集群数据一致后,进入消息广播阶段。

2. 消息广播 (Message Broadcast):Leader接收写请求后,生成事务提案并广播给所有Follower,收集多数派ACK后提交事务,确保所有节点数据同步,流程类似2PC但经过优化。

5.2 Fast Leader Election (快速选举算法)

该算法是ZAB协议崩溃恢复阶段的核心实现,用于快速选举Leader,避免脑裂,确保选举出数据最新的节点,具体要素如下:

1. 选举轮次 (logicalclock):每轮选举对应一个唯一轮次标识,防止旧轮次投票干扰当前选举结果。

2. 投票内容:包含 (sid, zxid, epoch),即服务器ID、事务ID、Leader纪元,用于判断节点优先级。

3. 胜出规则:1) epoch(纪元)大者优先;2) zxid(事务ID)大者优先;3) sid(服务器ID)大者优先。

4. 终止条件:某节点获得超过半数集群节点的投票,且自身优先级最高,即终止选举成为新Leader。

优势:选举速度快(200ms~2s,依赖tickTime配置),能快速完成Leader故障转移,保障集群高可用。

5.3 2PC 变种 (两阶段提交)

ZAB协议的消息广播阶段采用2PC变种机制,优化了传统2PC的性能,具体流程如下:

1. 阶段一(准备阶段):Leader广播事务提案(Proposal),Follower接收后写入本地事务日志,并返回ACK确认。

2. 阶段二(提交阶段):Leader收到超过半数Follower的ACK后,发送Commit指令,自身先执行事务,再通知所有Follower和Observer执行事务。

优化点:无需等待所有节点ACK,仅需半数以上即可提交,牺牲部分严格一致性换取更高的可用性和性能。

5.4 数据同步算法

Leader与Follower/Observer之间的数据同步,根据节点数据差异大小,采用三种不同同步方式,确保同步效率和一致性:

1. DIFF 同步:场景为节点与Leader数据差异较小;机制为Leader发送节点缺失的差异事务日志,节点回放日志完成同步。

2. TRUNC+DIFF:场景为节点与Leader部分数据冲突;机制为先截断节点不一致的事务日志,再发送差异日志完成同步。

3. SNAP 同步:场景为数据差异过大或新加入节点;机制为Leader直接发送完整的内存快照,节点加载快照后再同步增量日志。

六、关键机制详解

6.1 监听机制 (Watcher)

Watcher机制是ZooKeeper核心的事件通知机制,用于实现配置推送、服务发现等功能,核心特点是一次性触发、轻量级,具体说明如下:

1. 监听内容:客户端可监听ZNode的各类变化,包括数据变更、子节点增减、节点删除。

2. 触发规则:一次性触发(One-time trigger),事件触发后Watcher自动移除,需重新注册才能继续监听。

3. 通知特性:服务端异步推送事件,保证通知顺序性(FIFO),无需客户端轮询,降低资源消耗。

4. 核心流程:客户端注册Watcher → 监听事件发生 → 服务端推送通知 → 客户端执行对应业务逻辑 → Watcher失效。

6.2 会话管理 (Session)

会话是客户端与ZooKeeper集群的连接载体,管理临时节点的生命周期,核心特性如下:

1. 会话超时:由客户端定期发送心跳包维持会话,超时后集群自动清除该会话创建的所有临时节点。

2. 会话重连:客户端与当前节点断开连接后,支持自动重连到集群中的其他正常节点。

3. 会话转移:重连到其他节点后,可保持相同的会话状态,不影响客户端业务逻辑。

6.3 ACL 权限控制

ZooKeeper提供细粒度的ACL(访问控制列表)权限控制,用于保护ZNode节点的安全性,避免未授权访问,具体权限如下:

1. CREATE(缩写c):允许创建该节点的子节点

2. DELETE(缩写d):允许删除该节点的子节点

3. READ(缩写r):允许读取该节点的数据和子节点列表

4. WRITE(缩写w):允许修改该节点的数据

5. ADMIN(缩写a):允许设置该节点的ACL权限

七、性能与可靠性设计

ZooKeeper通过一系列针对性设计,在保证一致性的同时,兼顾性能和可靠性,具体设计策略如下:

1. 读性能扩展:通过Observer节点横向扩展读能力,Observer不参与投票,仅处理读请求,提升整体读吞吐量。

2. 写性能优化:采用顺序写磁盘(事务日志)+ 内存数据库(ZKDatabase),顺序写比随机写效率更高,内存数据库快速响应请求。

3. 高可用:2N+1节点部署,容忍N个节点故障,Leader故障后快速选举新Leader,避免单点故障。

4. 数据持久化:通过事务日志(log)记录所有写操作,定期生成内存快照(snapshot),双重保障数据不丢失。

5. 快速恢复:节点重启时,先加载最新快照,再回放增量事务日志,快速恢复到故障前的状态。

八、典型应用场景

ZooKeeper的核心价值在于提供分布式协调能力,广泛应用于大数据、微服务等领域,具体场景及实现方式如下:

1. HBase:用于Master选举、元数据存储,保障HBase集群的高可用。

2. Kafka:用于Broker注册、Topic元数据存储、Controller选举,协调Kafka集群运行。

3. Dubbo:作为服务注册中心,实现服务提供者注册和消费者动态发现。

4. Hadoop:用于NameNode HA自动故障转移,避免NameNode单点故障。

5. 分布式锁:基于临时顺序节点 + Watcher监听,实现分布式环境下的资源互斥访问。

九、版本演进要点

ZooKeeper版本迭代过程中,不断优化性能、增加新特性,核心版本演进要点如下:

3.4.x:稳定版,完善Observer节点、ACL权限控制,是目前应用最广泛的版本。

3.5.x:支持动态重新配置、容器节点、SSL加密,提升集群灵活性和安全性。

3.6.x:新增持久化监听器(解决Watcher一次性触发问题)、流式快照,优化性能。

3.7.x+:性能优化,移除Jetty依赖,简化部署,提升稳定性。

十、与其他系统对比

ZooKeeper、etcd、Consul是分布式协调/配置存储领域的主流工具,三者在算法、数据模型、定位上各有侧重,具体对比如下:

1. 共识算法:ZooKeeper采用ZAB,etcd采用Raft,Consul采用Raft。

2. 数据模型:ZooKeeper为层次树形,etcd为扁平KV,Consul支持多模型。

3. 监听机制:ZooKeeper为Watcher(一次性),etcd为Watch(可持久),Consul为健康检查+Watch。

4. 定位:ZooKeeper侧重强一致协调,etcd侧重配置存储,Consul侧重服务发现+健康检查。

5. 性能侧重:ZooKeeper侧重读优化,etcd侧重读写均衡,Consul侧重服务网格集成。

十一、总结:ZooKeeper的核心价值

ZooKeeper 的核心价值在于通过 ZAB 协议 实现了高可用的分布式一致性协调,以层次化的 ZNode 数据模型为基础,配合临时节点+Watcher 机制,为分布式系统提供了可靠的状态同步、配置管理、leader 选举等基础设施能力。

其架构设计遵循”顺序一致性 + 最终一致性”的折中策略,在保证核心协调功能的同时,通过 Observer 等机制实现了读性能的水平扩展;通过事务日志和快照实现数据持久化,通过快速选举算法实现故障快速恢复,最终成为分布式系统中不可或缺的协调基石。

当然,ZooKeeper也有局限性:写性能受Leader瓶颈限制(单集群写TPS通常不超过1000)、单个ZNode数据上限默认1MB、Watcher机制为一次性触发等,实际使用时需结合业务场景合理设计,优先用于读多写少的分布式协调场景。

如果觉得这篇博客对你有帮助,欢迎点赞、收藏,也可以在评论区留言讨论你在使用ZooKeeper时遇到的问题~

ZooKeeper配置集群

以在同一台机器上的三个节点的集群为例:

1、在每个节点的zoo.cfg增加下面的配置(只给出了变动的部分)

dataDir=D:/Publish/ZooKeeper/node01
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2888:3888
dataDir=D:/Publish/ZooKeeper/node02
clientPort=2182
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2888:3888
dataDir=D:/Publish/ZooKeeper/node03
clientPort=2183
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2888:3888

2、在每个dataDir增加一个myid文件,内容分别为1,2,3

3、现在可以启动哦

4、如果是在不同的服务器上,则dataDir、clientPort及2888:3888都不需要变动,localhost换成对应的计算机名称或ip即可。我这里是在一台电脑上运行的,所以要避免路径及端口冲突。

ZooKeeper Queue(Java)

Queue实现了生产者——消费者模式。

1、QueueTest.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * Created by Hansen
 */
public class QueueTest implements Watcher {
    static ZooKeeper zk = null;
    static Object mutex;
    private String root;

    /**
     * 构造函数
     * @param hostPort
     * @param name
     */
    QueueTest(String hostPort, String name) {
        this.root = name;

        //创建连接
        if (zk == null) {
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(hostPort, 30000, this);
                mutex = new Object();
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }

            // 创建root节点
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out.println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }
    }

    /**
     * exists回调函数
     * @param event     发生的事件
     * @see org.apache.zookeeper.Watcher
     */
    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

    /**
     * 添加任务队列
     * @param i
     * @return
     */
    boolean produce(int i) throws KeeperException, InterruptedException {
        String s = "element"+i;
        zk.create(root + "/element", s.getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);

        return true;
    }


    /**
     * 从任务队列获取任务
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    int consume() throws KeeperException, InterruptedException {
        Stat stat = null;

        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() == 0) {
                    System.out.println("Going to wait");
                    mutex.wait();
                } else {
                    //首先进行排序,找到id最小的任务编号
                    Integer min = Integer.MAX_VALUE;
                    for (String s : list) {
                        Integer tempValue = new Integer(s.substring(7));
                        if (tempValue < min) min = tempValue;
                    }

                    //从节点获取任务,处理,并删除节点
                    System.out.println("Processing task: " + root + "/element" + padLeft(min));
                    byte[] buff = zk.getData(root + "/element" + padLeft(min), false, stat);
                    System.out.println("The value in task is: " + new String(buff));
                    zk.delete(root + "/element" + padLeft(min), -1);

                    return min;
                }
            }
        }
    }

    /**
     * 格式化数字字符串
     * @param num
     */
    public static String padLeft(int num) {
        return String.format("%010d", num);
    }

    /**
     * 入口函数
     * @param args
     */
    public static void main(String args[]) {
        String hostPort = "localhost:2181";
        String root = "/neohope/queue";
        int max = 10;
        QueueTest q = new QueueTest(hostPort, root);

        for (int i = 0; i < max; i++) {
            try {
                q.produce(i);
            } catch (KeeperException e) {

            } catch (InterruptedException e) {
            }
        }

        for (int i = 0; i < max; i++) {
            try {
                int r = q.consume();
                System.out.println("Item: " + r);
            } catch (KeeperException ex) {
                ex.printStackTrace();
                break;
            } catch (InterruptedException ex) {
                ex.printStackTrace();
                break;
            }
        }
    }
}

2、尝试运行一下。

ZooKeeper Barrier(Java)

Barrier主要用于ZooKeeper中的同步。

1、BarrierTest.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * Created by Hansen
 */
public class BarrierTest implements Watcher, Runnable {
    static ZooKeeper zk = null;
    static Object mutex;
    String root;
    int size;
    String name;

    /**
     * 构造函数
     *
     * @param hostPort
     * @param root
     * @param name
     * @param size
     */
    BarrierTest(String hostPort, String root, String name, int size) {
        this.root = root;
        this.name = name;
        this.size = size;

        //创建连接
        if (zk == null) {
            try {
                System.out.println("Begin Starting ZK:");
                zk = new ZooKeeper(hostPort, 30000, this);
                mutex = new Object();
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }

        // 创建barrier节点
        if (zk != null) {
            try {
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating queue: "
                                + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
    }

    /**
     * exists回调函数
     * @param event     发生的事件
     * @see org.apache.zookeeper.Watcher
     */
    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

    /**
     * 新建节点,并等待其他节点被新建
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    boolean enter() throws KeeperException, InterruptedException{
        zk.create(root + "/" + name, "Hi".getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);

        System.out.println("Begin enter barier:" + name);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);

                if (list.size() < size) {
                    mutex.wait();
                } else {
                    System.out.println("Finished enter barier:" + name);
                    return true;
                }
            }
        }
    }


    /**
     * 新建节点,并等待其他节点被新建
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    boolean doSomeThing()
    {
        System.out.println("Begin doSomeThing:" + name);
        //do your job here
        System.out.println("Finished doSomeThing:" + name);
        return true;
    }

    /**
     * 删除自己的节点,并等待其他节点被删除
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */

    boolean leave() throws KeeperException, InterruptedException{
        zk.delete(root + "/" + name, -1);

        System.out.println("Begin leave barier:" + name);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() > 0) {
                    mutex.wait();
                } else {
                    System.out.println("Finished leave barier:" + name);
                    return true;
                }
            }
        }
    }

    /**
     * 线程函数,等待DataMonitor退出
     * @see java.lang.Runnable
     */
    @Override
    public void run() {
        //进入barrier
        try {
            boolean flag = this.enter();
            if (!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException ex) {
            ex.printStackTrace();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

        //处理同步业务
        try {
            doSomeThing();
            Thread.sleep(1000);
        } catch (InterruptedException e) {

        }

        //离开barrier
        try {
            this.leave();
        } catch (KeeperException ex) {
            ex.printStackTrace();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    /**
     * 入口函数
     * @param args
     */
    public static void main(String args[]) throws IOException {
        String hostPort = "localhost:2181";
        String root = "/neohope/barrier";

        try {
            new Thread(new BarrierTest("127.0.0.1:2181", root,"001", 1)).start();
            new Thread(new BarrierTest("127.0.0.1:2181", root,"002", 2)).start();
            new Thread(new BarrierTest("127.0.0.1:2181", root,"003", 3)).start();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.in.read();
    }
}

2、运行结果(由于Finished enter barier时,第一次同步已经结束了,所以是与Begin doSomeThing混在一起的)

Begin enter barier:001
Begin enter barier:003
Begin enter barier:002

Finished enter barier:001
Begin doSomeThing:001
Finished doSomeThing:001
Finished enter barier:002
Begin doSomeThing:002
Finished doSomeThing:002
Finished enter barier:003
Begin doSomeThing:003
Finished doSomeThing:003

Begin leave barier:002
Begin leave barier:001
Begin leave barier:003
Finished leave barier:002
Finished leave barier:003
Finished leave barier:001

ZooKeeper DataPublisher(Java)

1、DataPublisher.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;

/**
 * Created by Hansen
 */
public class DataPublisher {

    public void publishTest(String hostPort,String znode) throws IOException, KeeperException, InterruptedException {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, new Watcher() {
            public void process(WatchedEvent event) {
                //do nothing
            }});

        //删掉节点
        Stat stat =zk.exists(znode, false);
        if(stat!=null)
        {
            zk.delete(znode, -1);
        }

        //开始测试
        zk.create(znode,"test01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        byte[] buff =zk.getData(znode, false, null);
        System.out.println("data is " + new String(buff,"UTF-8"));
        zk.setData(znode,"test02".getBytes(), -1);
        buff = zk.getData(znode, false, null);
        System.out.println("data is " + new String(buff,"UTF-8"));
        zk.delete(znode, -1);
        zk.close();
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        String hostPort = "localhost:2181";
        String znode = "/neohope/test";

        DataPublisher publisher = new DataPublisher();
        publisher.publishTest(hostPort,znode);
    }
}

2、与Zookeeper Watcher配合使用,试一下。

ZooKeeper Watcher(Java)

1、Executor.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

/**
 * Created by Hansen
 */
public class Executor implements Runnable, DataMonitor.DataMonitorListener
{
    DataMonitor dm;

    /**
     * 构造函数
     * @param hostPort  host:port
     * @param znode      /xxx/yyy/zzz
     */
    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        dm = new DataMonitor(hostPort, znode, null, this);
    }

    /**
     * 线程函数,等待DataMonitor退出
     * @see java.lang.Runnable
     */
    @Override
    public void run() {
        try {
            synchronized (this) {
                while (!dm.bEnd) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    /**
     * 关闭zk连接
     * @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener
     */
    @Override
    public void znodeConnectionClosing(int rc) {
        synchronized (this) {
            notifyAll();
        }

        System.out.println("Connection is closing: "+ rc);
    }

    /**
     * znode节点状态或连接状态发生变化
     * @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener
     */
    @Override
    public void znodeStatusUpdate(byte[] data) {
        if (data == null) {
            System.out.println("data is null");
        } else {
            try {
                String s = new String(data,"UTF-8");
                System.out.println("data is "+s);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 入口函数
     * @param args
     */
    public static void main(String[] args) throws IOException {
        String hostPort = "localhost:2181";
        String znode = "/neohope/test";

        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、DataMonitor.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Arrays;

/**
 * Created by Hansen
 */
public class DataMonitor implements Watcher, AsyncCallback.StatCallback {
    ZooKeeper zk;
    String znode;
    Watcher chainedWatcher;
    DataMonitorListener listener;

    boolean bEnd;
    byte prevData[];

    /**
     * 构造函数,并开始监视
     * @param hostPort          host:port
     * @param znode              /xxx/yyy/zzz
     * @param chainedWatcher   传递事件到下一个Watcher
     * @param listener          回调对象
     */
    public DataMonitor(String hostPort, String znode, Watcher chainedWatcher,
                       DataMonitorListener listener) throws IOException {
        this.zk = new ZooKeeper(hostPort, 30000, this);
        this.znode = znode;
        this.chainedWatcher = chainedWatcher;
        this.listener = listener;

        // 检查节点状态
        zk.exists(znode, true, this, null);
    }

    /**
     * exists回调函数
     * @param event     发生的事件
     * @see org.apache.zookeeper.Watcher
     */
    @Override
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // 连接状态发生变化
            switch (event.getState()) {
                case SyncConnected:
                    // 不需要做任何事情
                    break;
                case Expired:
                    // 连接超时,关闭连接
                    System.out.println("SESSIONEXPIRED ending");
                    bEnd = true;
                    listener.znodeConnectionClosing(KeeperException.Code.SESSIONEXPIRED.intValue());
                    break;
            }
        } else {
            //节点状态发生变化
            if (path != null && path.equals(znode)) {
                //检查节点状态
                zk.exists(znode, true, this, null);
            }
        }

        //传递事件
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }

    /**
     * exists回调函数
     * @param rc     zk返回值
     * @param path   路径
     * @param ctx    Context
     * @param stat   状态
     *
     * @see org.apache.zookeeper.AsyncCallback.StatCallback
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        boolean exists = false;
        if(rc== KeeperException.Code.OK.intValue()) {
            //节点存在
            exists = true;
        }
        else if(rc== KeeperException.Code.NONODE.intValue()){
            //节点没有找到
            exists = false;
        }
        else if(rc==KeeperException.Code.SESSIONEXPIRED.intValue() ){
            //Session过期
            bEnd = true;
            System.out.println("SESSIONEXPIRED ending");
            listener.znodeConnectionClosing(rc);
            return;
        }
        else if( rc==KeeperException.Code.NOAUTH.intValue())
        {
            //授权问题
            bEnd = true;
            System.out.println("NOAUTH ending");
            listener.znodeConnectionClosing(rc);
            return;
        }
        else
        {
            //重试
            zk.exists(znode, true, this, null);
            return;
        }

        //获取数据
        byte b[] = null;
        if (exists) {
            try {
                b = zk.getData(znode, false, null);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }
        //调用listener
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            listener.znodeStatusUpdate(b);
            prevData = b;
        }
    }

    /**
     * Other classes use the DataMonitor by implementing this method
     */
    public interface DataMonitorListener {
        /**
         * znode节点状态或连接状态发生变化
         */
        void znodeStatusUpdate(byte data[]);

        /**
         * 关闭zonde连接
         *
         * @param rc ZooKeeper返回值
         */
        void znodeConnectionClosing(int rc);
    }
}

3、运行Executor

4、运行zkCli.cmd

zkCli.cmd -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zk: 127.0.0.1:2181(CONNECTED) 2] create /neohope/test test01
[zk: 127.0.0.1:2181(CONNECTED) 3] set /neohope/test test02
[zk: 127.0.0.1:2181(CONNECTED) 4] set /neohope/test test03
[zk: 127.0.0.1:2181(CONNECTED) 5] delete /neohope/test
[zk: 127.0.0.1:2181(CONNECTED) 6] quit

5、观察Executor的输出