1、Executor.java
package com.neohope.zookeeper.test; import org.apache.zookeeper.KeeperException; import java.io.IOException; import java.io.UnsupportedEncodingException; /** * Created by Hansen */ public class Executor implements Runnable, DataMonitor.DataMonitorListener { DataMonitor dm; /** * 构造函数 * @param hostPort host:port * @param znode /xxx/yyy/zzz */ public Executor(String hostPort, String znode) throws KeeperException, IOException { dm = new DataMonitor(hostPort, znode, null, this); } /** * 线程函数,等待DataMonitor退出 * @see java.lang.Runnable */ @Override public void run() { try { synchronized (this) { while (!dm.bEnd) { wait(); } } } catch (InterruptedException e) { } } /** * 关闭zk连接 * @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener */ @Override public void znodeConnectionClosing(int rc) { synchronized (this) { notifyAll(); } System.out.println("Connection is closing: "+ rc); } /** * znode节点状态或连接状态发生变化 * @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener */ @Override public void znodeStatusUpdate(byte[] data) { if (data == null) { System.out.println("data is null"); } else { try { String s = new String(data,"UTF-8"); System.out.println("data is "+s); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } /** * 入口函数 * @param args */ public static void main(String[] args) throws IOException { String hostPort = "localhost:2181"; String znode = "/neohope/test"; try { new Executor(hostPort, znode).run(); } catch (Exception e) { e.printStackTrace(); } } }
2、DataMonitor.java
package com.neohope.zookeeper.test; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Arrays; /** * Created by Hansen */ public class DataMonitor implements Watcher, AsyncCallback.StatCallback { ZooKeeper zk; String znode; Watcher chainedWatcher; DataMonitorListener listener; boolean bEnd; byte prevData[]; /** * 构造函数,并开始监视 * @param hostPort host:port * @param znode /xxx/yyy/zzz * @param chainedWatcher 传递事件到下一个Watcher * @param listener 回调对象 */ public DataMonitor(String hostPort, String znode, Watcher chainedWatcher, DataMonitorListener listener) throws IOException { this.zk = new ZooKeeper(hostPort, 30000, this); this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // 检查节点状态 zk.exists(znode, true, this, null); } /** * exists回调函数 * @param event 发生的事件 * @see org.apache.zookeeper.Watcher */ @Override public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // 连接状态发生变化 switch (event.getState()) { case SyncConnected: // 不需要做任何事情 break; case Expired: // 连接超时,关闭连接 System.out.println("SESSIONEXPIRED ending"); bEnd = true; listener.znodeConnectionClosing(KeeperException.Code.SESSIONEXPIRED.intValue()); break; } } else { //节点状态发生变化 if (path != null && path.equals(znode)) { //检查节点状态 zk.exists(znode, true, this, null); } } //传递事件 if (chainedWatcher != null) { chainedWatcher.process(event); } } /** * exists回调函数 * @param rc zk返回值 * @param path 路径 * @param ctx Context * @param stat 状态 * * @see org.apache.zookeeper.AsyncCallback.StatCallback */ @Override public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists = false; if(rc== KeeperException.Code.OK.intValue()) { //节点存在 exists = true; } else if(rc== KeeperException.Code.NONODE.intValue()){ //节点没有找到 exists = false; } else if(rc==KeeperException.Code.SESSIONEXPIRED.intValue() ){ //Session过期 bEnd = true; System.out.println("SESSIONEXPIRED ending"); 
listener.znodeConnectionClosing(rc); return; } else if( rc==KeeperException.Code.NOAUTH.intValue()) { //授权问题 bEnd = true; System.out.println("NOAUTH ending"); listener.znodeConnectionClosing(rc); return; } else { //重试 zk.exists(znode, true, this, null); return; } //获取数据 byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { return; } } //调用listener if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.znodeStatusUpdate(b); prevData = b; } } /** * Other classes use the DataMonitor by implementing this method */ public interface DataMonitorListener { /** * znode节点状态或连接状态发生变化 */ void znodeStatusUpdate(byte data[]); /** * 关闭zonde连接 * * @param rc ZooKeeper返回值 */ void znodeConnectionClosing(int rc); } }
3、运行Executor
4、运行zkCli.cmd
zkCli.cmd -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zk: 127.0.0.1:2181(CONNECTED) 2] create /neohope/test test01
[zk: 127.0.0.1:2181(CONNECTED) 3] set /neohope/test test02
[zk: 127.0.0.1:2181(CONNECTED) 4] set /neohope/test test03
[zk: 127.0.0.1:2181(CONNECTED) 5] delete /neohope/test
[zk: 127.0.0.1:2181(CONNECTED) 6] quit
(注意:如果父节点 /neohope 尚不存在,需要先执行 create /neohope "",否则创建 /neohope/test 会失败。)
5、观察Executor的输出