ZooKeeper DataPublisher(Java)

1、DataPublisher.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;

/**
 * Created by Hansen
 */
public class DataPublisher {

    public void publishTest(String hostPort,String znode) throws IOException, KeeperException, InterruptedException {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, new Watcher() {
            public void process(WatchedEvent event) {
                //do nothing
            }});

        //删掉节点
        Stat stat =zk.exists(znode, false);
        if(stat!=null)
        {
            zk.delete(znode, -1);
        }

        //开始测试
        zk.create(znode,"test01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        byte[] buff =zk.getData(znode, false, null);
        System.out.println("data is " + new String(buff,"UTF-8"));
        zk.setData(znode,"test02".getBytes(), -1);
        buff = zk.getData(znode, false, null);
        System.out.println("data is " + new String(buff,"UTF-8"));
        zk.delete(znode, -1);
        zk.close();
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        String hostPort = "localhost:2181";
        String znode = "/neohope/test";

        DataPublisher publisher = new DataPublisher();
        publisher.publishTest(hostPort,znode);
    }
}

2、与Zookeeper Watcher配合使用,试一下。

ZooKeeper Watcher(Java)

1、Executor.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

/**
 * Created by Hansen
 */
public class Executor implements Runnable, DataMonitor.DataMonitorListener
{
    DataMonitor dm;

    /**
     * 构造函数
     * @param hostPort  host:port
     * @param znode      /xxx/yyy/zzz
     */
    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        dm = new DataMonitor(hostPort, znode, null, this);
    }

    /**
     * 线程函数,等待DataMonitor退出
     * @see java.lang.Runnable
     */
    @Override
    public void run() {
        try {
            synchronized (this) {
                while (!dm.bEnd) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    /**
     * 关闭zk连接
     * @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener
     */
    @Override
    public void znodeConnectionClosing(int rc) {
        synchronized (this) {
            notifyAll();
        }

        System.out.println("Connection is closing: "+ rc);
    }

    /**
     * znode节点状态或连接状态发生变化
     * @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener
     */
    @Override
    public void znodeStatusUpdate(byte[] data) {
        if (data == null) {
            System.out.println("data is null");
        } else {
            try {
                String s = new String(data,"UTF-8");
                System.out.println("data is "+s);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 入口函数
     * @param args
     */
    public static void main(String[] args) throws IOException {
        String hostPort = "localhost:2181";
        String znode = "/neohope/test";

        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、DataMonitor.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Arrays;

/**
 * Created by Hansen
 */
public class DataMonitor implements Watcher, AsyncCallback.StatCallback {
    ZooKeeper zk;
    String znode;
    Watcher chainedWatcher;
    DataMonitorListener listener;

    boolean bEnd;
    byte prevData[];

    /**
     * 构造函数,并开始监视
     * @param hostPort          host:port
     * @param znode              /xxx/yyy/zzz
     * @param chainedWatcher   传递事件到下一个Watcher
     * @param listener          回调对象
     */
    public DataMonitor(String hostPort, String znode, Watcher chainedWatcher,
                       DataMonitorListener listener) throws IOException {
        this.zk = new ZooKeeper(hostPort, 30000, this);
        this.znode = znode;
        this.chainedWatcher = chainedWatcher;
        this.listener = listener;

        // 检查节点状态
        zk.exists(znode, true, this, null);
    }

    /**
     * exists回调函数
     * @param event     发生的事件
     * @see org.apache.zookeeper.Watcher
     */
    @Override
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // 连接状态发生变化
            switch (event.getState()) {
                case SyncConnected:
                    // 不需要做任何事情
                    break;
                case Expired:
                    // 连接超时,关闭连接
                    System.out.println("SESSIONEXPIRED ending");
                    bEnd = true;
                    listener.znodeConnectionClosing(KeeperException.Code.SESSIONEXPIRED.intValue());
                    break;
            }
        } else {
            //节点状态发生变化
            if (path != null && path.equals(znode)) {
                //检查节点状态
                zk.exists(znode, true, this, null);
            }
        }

        //传递事件
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }

    /**
     * exists回调函数
     * @param rc     zk返回值
     * @param path   路径
     * @param ctx    Context
     * @param stat   状态
     *
     * @see org.apache.zookeeper.AsyncCallback.StatCallback
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        boolean exists = false;
        if(rc== KeeperException.Code.OK.intValue()) {
            //节点存在
            exists = true;
        }
        else if(rc== KeeperException.Code.NONODE.intValue()){
            //节点没有找到
            exists = false;
        }
        else if(rc==KeeperException.Code.SESSIONEXPIRED.intValue() ){
            //Session过期
            bEnd = true;
            System.out.println("SESSIONEXPIRED ending");
            listener.znodeConnectionClosing(rc);
            return;
        }
        else if( rc==KeeperException.Code.NOAUTH.intValue())
        {
            //授权问题
            bEnd = true;
            System.out.println("NOAUTH ending");
            listener.znodeConnectionClosing(rc);
            return;
        }
        else
        {
            //重试
            zk.exists(znode, true, this, null);
            return;
        }

        //获取数据
        byte b[] = null;
        if (exists) {
            try {
                b = zk.getData(znode, false, null);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }
        //调用listener
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            listener.znodeStatusUpdate(b);
            prevData = b;
        }
    }

    /**
     * Other classes use the DataMonitor by implementing this method
     */
    public interface DataMonitorListener {
        /**
         * znode节点状态或连接状态发生变化
         */
        void znodeStatusUpdate(byte data[]);

        /**
         * 关闭zonde连接
         *
         * @param rc ZooKeeper返回值
         */
        void znodeConnectionClosing(int rc);
    }
}

3、运行Executor

4、运行zkCli.cmd

zkCli.cmd -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zk: 127.0.0.1:2181(CONNECTED) 2] create /neohope/test test01
[zk: 127.0.0.1:2181(CONNECTED) 3] set /neohope/test test02
[zk: 127.0.0.1:2181(CONNECTED) 4] set /neohope/test test03
[zk: 127.0.0.1:2181(CONNECTED) 5] delete /neohope/test
[zk: 127.0.0.1:2181(CONNECTED) 6] quit

5、观察Executor的输出

ZooKeeper增删改查(Shell)

1、语法

ZooKeeper -server host:port cmd args
        connect host:port
        get path [watch]
        ls path [watch]
        set path data [version]
        rmr path
        delquota [-n|-b] path
        quit
        printwatches on|off
        create [-s] [-e] path data acl
        stat path [watch]
        close
        ls2 path [watch]
        history
        listquota path
        setAcl path acl
        getAcl path
        sync path
        redo cmdno
        addauth scheme auth
        delete path [version]
        setquota -n|-b val path

2、示例

zkCli.cmd -server 127.0.0.1:2181

[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zookeeper]

[zk: 127.0.0.1:2181(CONNECTED) 2] create /zktest test01
Created /zktest

[zk: 127.0.0.1:2181(CONNECTED) 3] ls /
[zktest, zookeeper]

[zk: 127.0.0.1:2181(CONNECTED) 4] get /zktest
test01
cZxid = 0x4
ctime = Sun May 01 09:34:09 CST 2016
mZxid = 0x4
mtime = Sun May 01 09:34:09 CST 2016
pZxid = 0x4
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

[zk: 127.0.0.1:2181(CONNECTED) 5] set /zktest test02
cZxid = 0x4
ctime = Sun May 01 09:34:09 CST 2016
mZxid = 0x5
mtime = Sun May 01 09:37:16 CST 2016
pZxid = 0x4
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

[zk: 127.0.0.1:2181(CONNECTED) 4] get /zktest
test02
cZxid = 0x4
ctime = Sun May 01 09:34:09 CST 2016
mZxid = 0x5
mtime = Sun May 01 09:37:16 CST 2016
pZxid = 0x4
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

[zk: 127.0.0.1:2181(CONNECTED) 7] delete /zktest

[zk: 127.0.0.1:2181(CONNECTED) 8] ls /
[zookeeper]

[zk: 127.0.0.1:2181(CONNECTED) 9] quit
Quitting...