本例基于ZooKeeper实现了一个简单的Queue(队列),演示生产者—消费者模式。
1、QueueTest.java
package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * A minimal producer/consumer queue built on ZooKeeper sequential znodes.
 *
 * <p>Producers create {@code PERSISTENT_SEQUENTIAL} children named
 * {@code element<sequence>} under the queue root. Consumers list the children,
 * pick the one with the smallest sequence number, read it and delete it.
 * When the queue is empty the consumer blocks on {@link #mutex} until the
 * child watch registered by {@code getChildren} fires.
 *
 * <p>The ZooKeeper handle and the monitor object are static, i.e. shared by
 * every instance of this class in the JVM.
 *
 * Created by Hansen
 */
public class QueueTest implements Watcher {

    /** Name prefix of every queue element node; ZooKeeper appends the sequence number. */
    private static final String ELEMENT_PREFIX = "element";

    /** Charset used for both writing and reading task payloads. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    static ZooKeeper zk = null;

    /*
     * Initialised eagerly: the watcher can be invoked from the ZooKeeper event
     * thread as soon as the ZooKeeper constructor is called, so the monitor
     * must already exist at that point.
     */
    static final Object mutex = new Object();

    private final String root;

    /**
     * Connects to ZooKeeper (only if no shared connection exists yet) and makes
     * sure the queue root node exists, creating missing parent nodes as needed.
     *
     * <p>Failures are reported on standard output; after an {@link IOException}
     * the shared handle {@link #zk} is left {@code null}.
     *
     * @param hostPort ZooKeeper connect string, e.g. {@code "localhost:2181"}
     * @param name     absolute path of the queue root node
     */
    QueueTest(String hostPort, String name) {
        this.root = name;

        // Create the shared connection on first use.
        if (zk == null) {
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(hostPort, 30000, this);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }

        // Create the root node for every instance, not only for the one that
        // happened to open the connection.
        if (zk != null) {
            try {
                ensurePath(root);
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating queue: " + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates {@code path} and every missing ancestor as an empty persistent node.
     * ZooKeeper's {@code create} is not recursive, so each level is created in turn.
     *
     * @param path absolute znode path
     * @throws KeeperException      if the server reports an error other than "node exists"
     * @throws InterruptedException if the calling thread is interrupted
     */
    private static void ensurePath(String path) throws KeeperException, InterruptedException {
        int slash = path.indexOf('/', 1);
        while (true) {
            String prefix = (slash < 0) ? path : path.substring(0, slash);
            Stat stat = zk.exists(prefix, false);
            if (stat == null) {
                try {
                    zk.create(prefix, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (KeeperException.NodeExistsException ignored) {
                    // Another client created the node between exists() and create(); that is fine.
                }
            }
            if (slash < 0) {
                return;
            }
            slash = path.indexOf('/', slash + 1);
        }
    }

    /**
     * Watcher callback: wakes up consumers waiting on {@link #mutex}.
     *
     * @param event the event that occurred
     * @see org.apache.zookeeper.Watcher
     */
    @Override
    public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    /**
     * Appends a task to the queue. The payload is the UTF-8 encoding of
     * {@code "element" + i}.
     *
     * @param i task number used to build the payload
     * @return always {@code true}
     * @throws KeeperException      if the server reports an error
     * @throws InterruptedException if the calling thread is interrupted
     */
    boolean produce(int i) throws KeeperException, InterruptedException {
        String payload = ELEMENT_PREFIX + i;
        zk.create(root + "/" + ELEMENT_PREFIX, payload.getBytes(UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        return true;
    }

    /**
     * Removes the task with the smallest sequence number from the queue,
     * blocking while the queue is empty.
     *
     * @return the sequence number of the consumed node
     * @throws KeeperException      if the server reports an error
     * @throws InterruptedException if the calling thread is interrupted
     */
    int consume() throws KeeperException, InterruptedException {
        while (true) {
            synchronized (mutex) {
                // 'true' registers the default watcher (this class) on the children of root.
                List<String> children = zk.getChildren(root, true);
                if (children.isEmpty()) {
                    System.out.println("Going to wait");
                    mutex.wait();
                    continue; // re-read the children after being woken up
                }

                // Find the child with the smallest sequence number. The node name
                // itself is kept so the path does not have to be re-formatted.
                String minName = null;
                int min = Integer.MAX_VALUE;
                for (String child : children) {
                    int sequence = Integer.parseInt(child.substring(ELEMENT_PREFIX.length()));
                    if (minName == null || sequence < min) {
                        min = sequence;
                        minName = child;
                    }
                }

                // Read the task, "process" it, then delete the node (-1 matches any version).
                String path = root + "/" + minName;
                System.out.println("Processing task: " + path);
                byte[] buff = zk.getData(path, false, null);
                System.out.println("The value in task is: " + new String(buff, UTF_8));
                zk.delete(path, -1);
                return min;
            }
        }
    }

    /**
     * Formats a number as a zero-padded, 10-character decimal string.
     *
     * @param num the number to format
     * @return the padded string
     */
    public static String padLeft(int num) {
        return String.format("%010d", num);
    }

    /**
     * Entry point: produces ten tasks, then consumes ten tasks.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        String hostPort = "localhost:2181";
        String root = "/neohope/queue";
        int max = 10;

        QueueTest q = new QueueTest(hostPort, root);
        if (zk == null) {
            System.out.println("Could not connect to ZooKeeper at " + hostPort);
            return;
        }

        try {
            for (int i = 0; i < max; i++) {
                try {
                    q.produce(i);
                } catch (KeeperException e) {
                    // Stop here: consuming without having produced could block forever.
                    e.printStackTrace();
                    return;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                    return;
                }
            }

            for (int i = 0; i < max; i++) {
                try {
                    int r = q.consume();
                    System.out.println("Item: " + r);
                } catch (KeeperException ex) {
                    ex.printStackTrace();
                    break;
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Release the session so the server does not have to wait for it to time out.
            try {
                zk.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
2、尝试运行一下。