Queue实现了生产者——消费者模式。
1、QueueTest.java
package com.neohope.zookeeper.test;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
/**
* Created by Hansen
*/
public class QueueTest implements Watcher {
static ZooKeeper zk = null;
static Object mutex;
private String root;
/**
* 构造函数
* @param hostPort
* @param name
*/
QueueTest(String hostPort, String name) {
this.root = name;
//创建连接
if (zk == null) {
try {
System.out.println("Starting ZK:");
zk = new ZooKeeper(hostPort, 30000, this);
mutex = new Object();
System.out.println("Finished starting ZK: " + zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
// 创建root节点
if (zk != null) {
try {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}
}
/**
* exists回调函数
* @param event 发生的事件
* @see org.apache.zookeeper.Watcher
*/
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
}
/**
* 添加任务队列
* @param i
* @return
*/
boolean produce(int i) throws KeeperException, InterruptedException {
String s = "element"+i;
zk.create(root + "/element", s.getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
/**
* 从任务队列获取任务
* @return
* @throws KeeperException
* @throws InterruptedException
*/
int consume() throws KeeperException, InterruptedException {
Stat stat = null;
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
} else {
//首先进行排序,找到id最小的任务编号
Integer min = Integer.MAX_VALUE;
for (String s : list) {
Integer tempValue = new Integer(s.substring(7));
if (tempValue < min) min = tempValue;
}
//从节点获取任务,处理,并删除节点
System.out.println("Processing task: " + root + "/element" + padLeft(min));
byte[] buff = zk.getData(root + "/element" + padLeft(min), false, stat);
System.out.println("The value in task is: " + new String(buff));
zk.delete(root + "/element" + padLeft(min), -1);
return min;
}
}
}
}
/**
* 格式化数字字符串
* @param num
*/
public static String padLeft(int num) {
return String.format("%010d", num);
}
/**
* 入口函数
* @param args
*/
public static void main(String args[]) {
String hostPort = "localhost:2181";
String root = "/neohope/queue";
int max = 10;
QueueTest q = new QueueTest(hostPort, root);
for (int i = 0; i < max; i++) {
try {
q.produce(i);
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
}
for (int i = 0; i < max; i++) {
try {
int r = q.consume();
System.out.println("Item: " + r);
} catch (KeeperException ex) {
ex.printStackTrace();
break;
} catch (InterruptedException ex) {
ex.printStackTrace();
break;
}
}
}
}
2、尝试运行一下。