Barrier主要用于ZooKeeper中的同步。
1、BarrierTest.java
package com.neohope.zookeeper.test;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
/**
* Created by Hansen
*/
public class BarrierTest implements Watcher, Runnable {
static ZooKeeper zk = null;
static Object mutex;
String root;
int size;
String name;
/**
* 构造函数
*
* @param hostPort
* @param root
* @param name
* @param size
*/
BarrierTest(String hostPort, String root, String name, int size) {
this.root = root;
this.name = name;
this.size = size;
//创建连接
if (zk == null) {
try {
System.out.println("Begin Starting ZK:");
zk = new ZooKeeper(hostPort, 30000, this);
mutex = new Object();
System.out.println("Finished starting ZK: " + zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
// 创建barrier节点
if (zk != null) {
try {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}
/**
* exists回调函数
* @param event 发生的事件
* @see org.apache.zookeeper.Watcher
*/
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
}
/**
* 新建节点,并等待其他节点被新建
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean enter() throws KeeperException, InterruptedException{
zk.create(root + "/" + name, "Hi".getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
System.out.println("Begin enter barier:" + name);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() < size) {
mutex.wait();
} else {
System.out.println("Finished enter barier:" + name);
return true;
}
}
}
}
/**
* 新建节点,并等待其他节点被新建
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean doSomeThing()
{
System.out.println("Begin doSomeThing:" + name);
//do your job here
System.out.println("Finished doSomeThing:" + name);
return true;
}
/**
* 删除自己的节点,并等待其他节点被删除
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean leave() throws KeeperException, InterruptedException{
zk.delete(root + "/" + name, -1);
System.out.println("Begin leave barier:" + name);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() > 0) {
mutex.wait();
} else {
System.out.println("Finished leave barier:" + name);
return true;
}
}
}
}
/**
* 线程函数,等待DataMonitor退出
* @see java.lang.Runnable
*/
@Override
public void run() {
//进入barrier
try {
boolean flag = this.enter();
if (!flag) System.out.println("Error when entering the barrier");
} catch (KeeperException ex) {
ex.printStackTrace();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//处理同步业务
try {
doSomeThing();
Thread.sleep(1000);
} catch (InterruptedException e) {
}
//离开barrier
try {
this.leave();
} catch (KeeperException ex) {
ex.printStackTrace();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
/**
* 入口函数
* @param args
*/
public static void main(String args[]) throws IOException {
String hostPort = "localhost:2181";
String root = "/neohope/barrier";
try {
new Thread(new BarrierTest("127.0.0.1:2181", root,"001", 1)).start();
new Thread(new BarrierTest("127.0.0.1:2181", root,"002", 2)).start();
new Thread(new BarrierTest("127.0.0.1:2181", root,"003", 3)).start();
} catch (Exception e) {
e.printStackTrace();
}
System.in.read();
}
}
2、运行结果(由于Finished enter barier时,第一次同步已经结束了,所以是与Begin doSomeThing混在一起的)
Begin enter barier:001 Begin enter barier:003 Begin enter barier:002 Finished enter barier:001 Begin doSomeThing:001 Finished doSomeThing:001 Finished enter barier:002 Begin doSomeThing:002 Finished doSomeThing:002 Finished enter barier:003 Begin doSomeThing:003 Finished doSomeThing:003 Begin leave barier:002 Begin leave barier:001 Begin leave barier:003 Finished leave barier:002 Finished leave barier:003 Finished leave barier:001