About neohope

一直在努力,还没想过要放弃...

通过JMS初步理解JNDI

JNDI服务模型
jndi-model

1、服务端

package com.neohope.jndi.test;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.io.IOException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.util.Hashtable;

/**
 * Created by Hansen on 2016/5/4.
 */
public class Server {
    private static InitialContext ctx;

    public static void initJNDI() {
        try {
            LocateRegistry.createRegistry(1234);
            final Hashtable jndiProperties = new Hashtable();
            jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.rmi.registry.RegistryContextFactory");
            jndiProperties.put(Context.PROVIDER_URL, "rmi://localhost:1234");
            ctx = new InitialContext(jndiProperties);
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (RemoteException e) {
            e.printStackTrace();
        }
    }

    public static void bindJNDI(String name, Object obj) throws NamingException {
        ctx.bind(name, obj);
    }

    public static void unInitJNDI() throws NamingException {
        ctx.close();
    }

    public static void main(String[] args) throws NamingException, IOException {
        initJNDI();
        NeoMessage msg = new NeoMessage("Just A Message");
        bindJNDI("java:com/neohope/jndi/test01", msg);
        System.in.read();
        unInitJNDI();
    }
}

2、客户端

package com.neohope.jndi.test;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Hashtable;

/**
 * Created by Hansen
 */
public class Client {
    public static void main(String[] args) throws NamingException {
        final Hashtable jndiProperties = new Hashtable();
        jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.rmi.registry.RegistryContextFactory");
        jndiProperties.put(Context.PROVIDER_URL, "rmi://localhost:1234");

        InitialContext ctx = new InitialContext(jndiProperties);
        NeoMessage msg = (NeoMessage) ctx.lookup("java:com/neohope/jndi/test01");
        System.out.println(msg.message);
        ctx.close();
    }
}

3、NeoMessage

package com.neohope.jndi.test;

import java.io.Serializable;
import java.rmi.Remote;

/**
 * Created by Hansen
 */
public class NeoMessage implements Remote, Serializable {
    public String message = "";

    public NeoMessage(String message)
    {
        this.message = message;
    }
}

大家可以看出,在这个简单的例子中:
1、服务端仅仅是把数据生成好,放到了LocateRegistry中。
2、而客户端,通过JNDI查到消息,获取到了对应的数据。
3、LocateRegistry完成了跨JVM/主机通讯的任务

反过来思考一下,对于JNDIL是不是更清楚一些了呢?
那再思考一下,那J2EE容器中的数据源是如何统一管理的呢?

JBoss EAP JMS的调用(Queue)

通讯流程图
jms-point-to-point-model.png

首先是Server端的开发及设置:
1、增加一个用户:

bin\add-user.bat

用户名密码随便,但要属于guest组

2、启动Server

standalone.bat -server-config=standalone-full.xml 

3、新建Queue

jboss-cli.bat --connect
jms-queue add --queue-address=jmsQueue --entries=queue/jmsQueue,java:jboss/exported/jms/queue/jmsQueue

到这里服务端已经完成了。

然后是客户端的设置:
1、Sender

package com.neohope.jms.test;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Hashtable;

/**
 * Created by Hansen
 */
public class TestQueueSender {
    public static void main(String[] args) throws NamingException, JMSException {

        final Hashtable jndiProperties = new Hashtable();
        jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
        jndiProperties.put(Context.PROVIDER_URL, "remote://localhost:4447");
        final InitialContext ctx = new InitialContext(jndiProperties);

        QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("jms/RemoteConnectionFactory");
        QueueConnection connection = factory.createQueueConnection("user001", "user001#");
        QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        Queue queue = (Queue) ctx.lookup("jms/queue/jmsQueue");
        TextMessage msg = session.createTextMessage("Queue Test Messagee");
        QueueSender sender = session.createSender(queue);
        sender.send(msg);


        session.close();
        connection.close();
    }
}

2、Receiver

package com.neohope.jms.test;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.io.IOException;
import java.util.Hashtable;

/**
 * Created by Hansen
 */
public class TestQueueReceiver {
    public static void main(String[] args) throws NamingException, JMSException, IOException {

        final Hashtable jndiProperties = new Hashtable();
        jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
        jndiProperties.put(Context.PROVIDER_URL, "remote://localhost:4447");
        final InitialContext ctx = new InitialContext(jndiProperties);

        QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("jms/RemoteConnectionFactory");
        QueueConnection connection = factory.createQueueConnection("user001", "user001#");
        QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        Queue queue = (Queue) ctx.lookup("jms/queue/jmsQueue");
        QueueReceiver receiver = session.createReceiver(queue);
        receiver.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try{
                    TextMessage msg=(TextMessage)message;
                    System.out.println("Queue message received:"+msg.getText());
                }
                catch(JMSException e)
                {
                    System.out.println(e);
                }
            }});

        connection.start();
        System.in.read();

        session.close();
        connection.close();
    }
}

JBoss EAP JMS的调用(Topic)

通讯流程图
jms-publisher-subscriber-model.png

首先是Server端的开发及设置:
1、增加一个用户:

bin\add-user.bat

用户名密码随便,但要属于guest组

2、启动Server

standalone.bat -server-config=standalone-full.xml 

3、新建Topic

jboss-cli.bat --connect
jms-topic add --topic-address=jmsTopic --entries=topic/jmsTopic,java:jboss/exported/jms/topic/jmsTopic

到这里服务端已经完成了。

然后是客户端的设置:
1、Publisher

package com.neohope.jms.test;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Hashtable;

/**
 * Created by Hansen
 */
public class TestTopicPublisher {
    public static void main(String[] args) throws NamingException, JMSException {
        final Hashtable jndiProperties = new Hashtable();
        jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
        jndiProperties.put(Context.PROVIDER_URL, "remote://localhost:4447");
        final InitialContext ctx = new InitialContext(jndiProperties);


        TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("jms/RemoteConnectionFactory");
        TopicConnection connection = factory.createTopicConnection("user001", "user001#");
        TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

        Topic topic = (Topic) ctx.lookup("jms/topic/jmsTopic");
        TextMessage msg = session.createTextMessage("Topic Test Message");
        TopicPublisher publisher = session.createPublisher(topic);
        publisher.publish(msg);

        session.close();
        connection.close();
    }
}

2、Subscriber

package com.neohope.jms.test;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.io.IOException;
import java.util.Hashtable;

/**
 * Created by Hansen
 */
public class TestTopicSubscriber {
    public static void main(String[] args) throws NamingException, JMSException, IOException {
        final Hashtable jndiProperties = new Hashtable();
        jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
        jndiProperties.put(Context.PROVIDER_URL, "remote://localhost:4447");
        final InitialContext ctx = new InitialContext(jndiProperties);


        TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("jms/RemoteConnectionFactory");
        TopicConnection connection = factory.createTopicConnection("user001", "user001#");
        TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

        Topic topic = (Topic) ctx.lookup("jms/topic/jmsTopic");
        TopicSubscriber subscriber = session.createSubscriber(topic);
        subscriber.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    TextMessage msg = (TextMessage) message;
                    System.out.println("Topic message received:" + msg.getText());
                } catch (JMSException e) {
                    System.out.println(e);
                }
            }
        });

        connection.start();
        System.in.read();

        session.close();
        connection.close();
    }
}

导致惨重代价的Bug

导致惨重代价的Bug

纪念“瞳”解体事件

千年虫事件
在上个世纪,软件行业初期,计算机硬件资源十分昂贵,很多软件为了节省内存会省略掉代表年份的前两位数字”19”,或默认前两位为”19”。按这个规则,1999年12月31日过后,系统日期会更新为1900年1月1日而不是2000年1月1日,这样可能意味着无数的灾难事件,导致数据丢失,系统异常或更加严重的灾难。

幸好大家发现的早,最终全球花了上亿的美元用来升级系统,没有引起毁灭性后果。

水手1号探测器
1962年7月28日,水手1号空间探测器发射升空,但由于程序编码中的轨道计算公式是错误的,导致火箭轨道偏离预定路线。最终在大西洋上空自爆。

南极臭氧层测绘事件
1978年,NASA启动臭氧层测绘的计划。但在设计时,用于该计划的数据分析软件忽略了和预测值有很大差距的数据。直到1985年,才发现南极洲上方的臭氧层空洞,但是英国科学家先发现的。直到NASA重新检测它们的数据,才发现这一错误。在修正错误后,NASA证实南极臭氧层的确有个很大的空洞。

反导系统误报事件
1980年,北美防空联合司令部曾报告称美国遭受导弹袭击。后来证实,这是反馈系统的电路故障问题,但反馈系统软件没有考虑故障问题引发的误报。

1983年,苏联卫星报告有美国导弹入侵,但主管官员的直觉告诉他这是误报。后来事实证明的确是误报。

幸亏这些误报没有激活“核按钮”。在上述两个案例中,如果对方真的发起反击,核战争将全面爆发,后果不堪设想。

辐射治疗超标事件
1985到1987年,Therac-25辐射治疗设备卷入多宗因辐射剂量严重超标引发的医疗事故,其罪魁祸首是医疗设备软件的Bug。据统计,大量患者接受高达100倍的预定剂量的放射治疗,其中至少5人直接死于辐射剂量超标。

AT&T网络终端事件
1990年1月15日,纽约60万用户9个小时无法使用电话服务。原因是,AT&T交换机从故障中恢复后,就会发送一条特殊的小时给临近的设备,但在新版本软件中,这条消息会导致电话交换机重启。于是,每6秒,所有交换机都会重启一次。最后,将程序换回了上一个版本,解决了问题。

宰赫兰导弹事件
在1991年2月的第一次海湾战争中,一枚伊拉克发射的飞毛腿导弹准确击中美国在沙地阿拉伯的宰赫兰基地,当场炸死28个美国士兵,炸伤100多人,造成美军海湾战争中唯一一次伤亡超过百人的损失。

在后来的调查中发现,由于一个简单的计算机bug,使基地的爱国者反导弹系统失效,未能在空中拦截飞毛腿导弹。当时,负责防卫该基地的爱国者反导弹系统已经连续工作了100个小时,每工作一个小时,系统内的时钟会有一个微小的毫秒级延迟,这就是这个失效悲剧的根源。爱国者反导弹系统的时钟寄存器设计为24位,因而时间的精度也只限于24位的精度。在长时间的工作后,这个微小的精度误差被渐渐放大。在工作了100小时后,系统时间的延迟是三分之一秒。

对一般人人来说,0.33秒是微不足道的。但是对一个需要跟踪并摧毁一枚空中飞弹的雷达系统来说,这是灾难性的——侯赛因飞毛腿导弹空速达4.2马赫(每秒1.5公里),这个“微不足道的”0.33秒相当于大约600米的误差。在宰赫兰导弹事件中,雷达在空中发现了导弹,但是由于时钟误差没有能够准确地跟踪它,因此基地的反导弹并没有发射。

Intel奔腾处理器浮点错误
1993年。Intel奔腾处理器在计算特定范围的浮点数除法时,会发生技术错误。Intel最终花费了4.75亿美元来为用户置换处理器。

飞行事故
1993年,瑞典的一架JAS 39鹰狮战斗机因飞行控制软件的Bug而坠毁。

1994年,苏格兰一架吉努克型直升飞机坠毁,29名乘客全部罹难。然而最初指责声都指向飞行员,但后来有证据表明,直升飞机的系统错误才是罪魁祸首。

死Ping
1995-1996年,由于没有进行足够的校验,在收到特殊构造的Ping包时,会导致Windows设备蓝屏并重启。

阿丽亚娜5型运载火箭事件
1996年6月4日,阿丽亚娜5型运载火箭的首次发射点火后,火箭开始偏离路线,最终被逼引爆自毁,整个过程只有短短30秒。(原计划将运送4颗太阳风观察卫星到预定轨道)

阿丽亚娜5型运载火箭基于前一代4型火箭开发。在4型火箭系统中,对一个水平速率的测量值使用了16位的变量及内存,因为在4型火箭系统中反复验证过,这一值不会超过16位的变量,而5型火箭的开发人员简单复制了这部分程序,而没有对新火箭进行数值的验证,结果发生了致命的数值溢出。发射后这个64位带小数点的变量被转换成16位不带小数点的变量,引发了一系列的错误,从而影响了火箭上所有的计算机和硬件,瘫痪了整个系统,因而不得不选择自毁,4亿美元变成一个巨大的烟花。

火星气候探测者号事件
1999年9月,火星气候探测者号在火星坠毁。火星气候探测者号的目的为研究火星气候,花费3亿多美元。探测者号在太空中飞行几个月时间,接近火星时,探测器的控制团队使用英制单位来发送导航指令,而探测器的软件系统使用公制来读取指令。这一错误导致探测器进入过低的火星轨道(大约100公里误差),在火星大气压力和摩擦下解体。

火火星极地登陆者号件
1999年12月,火星极地登录者号在火星坠毁,原因是设计缺陷导致其在达到行星地表之间就关闭了主引擎,最终撞毁。

辐射治疗超标事件
2000年11月,巴拿马美国国家癌症研究所,从美国Multidata公司引入了放射治疗设备及软件,但其辐射剂量计算值有误(软件本身运行医生画四个方块来保护患者健康组织,但医生需要五块,于是医生自己想办法欺骗了软件,殊不知该操作方式将放射剂量进行了加倍处理)。部分患者接受了超标剂量的治疗,至少有5人死亡。后续几年中,又有21人死亡,但很难确定这21人中到底有多少人是死于本身的癌症,还是辐射治疗剂量超标引发的不良后果。

北美大停电事件
2003年8月,由于GE的一个监控软件,没有有效的通知操作人员有一个地方电站断掉了,电力缺口导致了多米诺骨牌效应,最终导致了加拿大安大略州和美国八个州断电,影响到了5千万人,总损失达60亿美元。

丰田普锐斯混合动力汽车召回事件
2005年10月,丰田宣布召回16000辆锐斯混合动力汽车。这次召回的原因是“发动机熄火意外”及“警示灯无故开启”。本次召回的根本原因不是硬件问题,而是汽车嵌入式编程代码的问题。

东京证券交易所事件
2005年12月,J-COM公司上市,开盘价格61.2万日元一股,日本瑞穗证券的一个交易员接到了客户委托“请以61万日元的价格,卖出1股J-Com的股票”。但交易员输入成了“以每股1日元的价格,卖出61万股”。由于交易系统限制,交易系统自动调整为“以57万日元,出售61万股”。

2分钟后,操作员发现操作失误,执行了3次撤单操作全部失败。J-COM股票一路狂跌,瑞穗证券拼命拉高到77.2万日元,仅此一项,瑞穗证券一共损失了约270亿日。但仍引起了很大的连锁效应。

更大的问题是,J-COM的股票一共只发行了14000多股,但卖出去的可远不止这么多。最后经过协商,瑞穗证券用每股91万日元的价格,现金清算了股民手上的9万多股,全部损失,扩大到400多亿日元。

然后瑞穗证券状告东京证券和富士通,官司打了十年。最后判定,以当日瑞穗证券电话联络东京证券的时间点为分界线,之前全部由瑞穗证券承担,之后产生的损失由东京证券承担70%瑞穗证券承担30%。富士通及程序员没有收到罚款。

恩,痛定思痛,决定开始开发了新的交易系统,开发商仍然是富士通。

Gmail故障
2009年2月份Google的Gmail故障,导致用户几个小时内无法访问邮箱。据Google反馈,故障是因数据中心之间的负载均衡软件的Bug引发的。

温州7.23动车事故
2011年7月23日,甬温线浙江省温州市境内,由北京南站开往福州站的D301次列车与杭州站开往福州南站的D3115次列车发生动车组列车追尾事故,造成40人死亡、172人受伤,中断行车32小时35分,直接经济损失近2亿元。

上海铁路局事后反馈,“7·23”动车事故是由于温州南站信号设备在设计上存在严重缺陷,遭雷击发生故障后,导致本应显示为红灯的区间信号机错误显示为绿灯。

骑士资本事件
2012年8月1日,骑士资本的技术人员,在1台设备中部署了错误版本的应用(一共8台,7台正常),该设备触发了n年前的代码,在一个小时内就执行完了原本应该在几天内完成的交易,导致错误的买入卖出,直接将公司推向了破产的边缘,投资人紧急注资4亿美元才得以幸免,最终损失高达5亿美元。

12306订票助手拖垮GitHub
2013年1月15日,GitHub受到中国大陆的DDOS攻击,网站几乎被拖垮。

最后发现,是由于春运期间,各大浏览器厂商集成了一位网友iFish(木鱼)的“订票助手”插件,帮助春运大军抢票回家。但该软件的早期版本,直接引用了GitHub的Raw File而不是静态文件,并且在访问文件失败时,每5S会重试一次。这样,抢票大军的每一个人,没5S都会向GitHub发送一次请求,造成对GitHub的DDOS攻击。

OpenSSL Bleeding Heart漏洞
2014年4月,谷歌和芬兰安全公司Codenomicon分别报告了OpenSSL存在重大缓冲区溢出漏洞。在OpenSSL心跳扩展的源码中,没有去校验缓冲区的长度,导致攻击者可以直接获取网站的私钥信息。这次漏洞的影响面很广,几乎所有厂商都在打补丁和更新私钥及证书。

Twitter丢失400万活跃用户
2015年2月,Twitter在在季度财报中指出,苹果iOS8升级后出现的漏洞让Twitter至少损失了400万用户。首先是iOS8和 Twitter的兼容性问题流失了100万用户,Safari流览器升级后的共用链接功能无法自动升级,这一问题流失了300万用户,另外还有100万iPhone使用者在升级系统后忘记了Twitter密码,导致无法使用而离开了Twitter。

日本天文卫星“瞳”解体事件
2016年03月,日本天文卫星升空不到一个月(仅正常工作3天),就解体了,该卫星造价约19亿人民币,并搭载了多国的观测设备。
首先,由于干扰,追星仪发生了故障,启用了备用的陀螺仪。
但是,陀螺仪也有故障,导致没有旋转的卫星开始加速旋转,并达到阀值。
然后,为了阻止卫星旋转,卫星开始反向喷气,但程序员把喷气的方向搞反了,卫星越转越快,最终解体。

============================================================
注:本文主要是整理了系统Bug导致的惨痛代价,没有记录下面几种情况(设计失败,黑客攻击,病毒爆发)
*从计算机诞生以来,众多失败的软件项目,没有加到列表中
*1982年,苏联天然气管道爆炸事件,涉及植入恶意代码,没有加到列表中

ZooKeeper配置集群

以在同一台机器上的三个节点的集群为例:

1、在每个节点的zoo.cfg增加下面的配置(只给出了变动的部分)

dataDir=D:/Publish/ZooKeeper/node01
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2888:3888
dataDir=D:/Publish/ZooKeeper/node02
clientPort=2182
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2888:3888
dataDir=D:/Publish/ZooKeeper/node03
clientPort=2183
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2888:3888

2、在每个dataDir增加一个myid文件,内容分别为1,2,3

3、现在可以启动哦

4、如果是在不同的服务器上,则dataDir、clientPort及2888:3888都不需要变动,localhost换成对应的计算机名称或ip即可。我这里是在一台电脑上运行的,所以要避免路径及端口冲突。

ZooKeeper Queue(Java)

Queue实现了生产者——消费者模式。

1、QueueTest.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * Created by Hansen
 */
public class QueueTest implements Watcher {
    static ZooKeeper zk = null;
    static Object mutex;
    private String root;

    /**
     * 构造函数
     * @param hostPort
     * @param name
     */
    QueueTest(String hostPort, String name) {
        this.root = name;

        //创建连接
        if (zk == null) {
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(hostPort, 30000, this);
                mutex = new Object();
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }

            // 创建root节点
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out.println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }
    }

    /**
     * exists回调函数
     * @param event     发生的事件
     * @see org.apache.zookeeper.Watcher
     */
    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

    /**
     * 添加任务队列
     * @param i
     * @return
     */
    boolean produce(int i) throws KeeperException, InterruptedException {
        String s = "element"+i;
        zk.create(root + "/element", s.getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);

        return true;
    }


    /**
     * 从任务队列获取任务
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    int consume() throws KeeperException, InterruptedException {
        Stat stat = null;

        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() == 0) {
                    System.out.println("Going to wait");
                    mutex.wait();
                } else {
                    //首先进行排序,找到id最小的任务编号
                    Integer min = Integer.MAX_VALUE;
                    for (String s : list) {
                        Integer tempValue = new Integer(s.substring(7));
                        if (tempValue < min) min = tempValue;
                    }

                    //从节点获取任务,处理,并删除节点
                    System.out.println("Processing task: " + root + "/element" + padLeft(min));
                    byte[] buff = zk.getData(root + "/element" + padLeft(min), false, stat);
                    System.out.println("The value in task is: " + new String(buff));
                    zk.delete(root + "/element" + padLeft(min), -1);

                    return min;
                }
            }
        }
    }

    /**
     * 格式化数字字符串
     * @param num
     */
    public static String padLeft(int num) {
        return String.format("%010d", num);
    }

    /**
     * 入口函数
     * @param args
     */
    public static void main(String args[]) {
        String hostPort = "localhost:2181";
        String root = "/neohope/queue";
        int max = 10;
        QueueTest q = new QueueTest(hostPort, root);

        for (int i = 0; i < max; i++) {
            try {
                q.produce(i);
            } catch (KeeperException e) {

            } catch (InterruptedException e) {
            }
        }

        for (int i = 0; i < max; i++) {
            try {
                int r = q.consume();
                System.out.println("Item: " + r);
            } catch (KeeperException ex) {
                ex.printStackTrace();
                break;
            } catch (InterruptedException ex) {
                ex.printStackTrace();
                break;
            }
        }
    }
}

2、尝试运行一下。

ZooKeeper Barrier(Java)

Barrier主要用于ZooKeeper中的同步。

1、BarrierTest.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * Created by Hansen
 */
public class BarrierTest implements Watcher, Runnable {
    static ZooKeeper zk = null;
    static Object mutex;
    String root;
    int size;
    String name;

    /**
     * 构造函数
     *
     * @param hostPort
     * @param root
     * @param name
     * @param size
     */
    BarrierTest(String hostPort, String root, String name, int size) {
        this.root = root;
        this.name = name;
        this.size = size;

        //创建连接
        if (zk == null) {
            try {
                System.out.println("Begin Starting ZK:");
                zk = new ZooKeeper(hostPort, 30000, this);
                mutex = new Object();
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }

        // 创建barrier节点
        if (zk != null) {
            try {
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating queue: "
                                + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
    }

    /**
     * exists回调函数
     * @param event     发生的事件
     * @see org.apache.zookeeper.Watcher
     */
    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

    /**
     * 新建节点,并等待其他节点被新建
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    boolean enter() throws KeeperException, InterruptedException{
        zk.create(root + "/" + name, "Hi".getBytes(Charset.forName("UTF-8")), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);

        System.out.println("Begin enter barier:" + name);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);

                if (list.size() < size) {
                    mutex.wait();
                } else {
                    System.out.println("Finished enter barier:" + name);
                    return true;
                }
            }
        }
    }


    /**
     * 新建节点,并等待其他节点被新建
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    boolean doSomeThing()
    {
        System.out.println("Begin doSomeThing:" + name);
        //do your job here
        System.out.println("Finished doSomeThing:" + name);
        return true;
    }

    /**
     * 删除自己的节点,并等待其他节点被删除
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */

    boolean leave() throws KeeperException, InterruptedException{
        zk.delete(root + "/" + name, -1);

        System.out.println("Begin leave barier:" + name);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() > 0) {
                    mutex.wait();
                } else {
                    System.out.println("Finished leave barier:" + name);
                    return true;
                }
            }
        }
    }

    /**
     * 线程函数,等待DataMonitor退出
     * @see java.lang.Runnable
     */
    @Override
    public void run() {
        //进入barrier
        try {
            boolean flag = this.enter();
            if (!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException ex) {
            ex.printStackTrace();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

        //处理同步业务
        try {
            doSomeThing();
            Thread.sleep(1000);
        } catch (InterruptedException e) {

        }

        //离开barrier
        try {
            this.leave();
        } catch (KeeperException ex) {
            ex.printStackTrace();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    /**
     * 入口函数
     * @param args
     */
    public static void main(String args[]) throws IOException {
        String hostPort = "localhost:2181";
        String root = "/neohope/barrier";

        try {
            new Thread(new BarrierTest("127.0.0.1:2181", root,"001", 1)).start();
            new Thread(new BarrierTest("127.0.0.1:2181", root,"002", 2)).start();
            new Thread(new BarrierTest("127.0.0.1:2181", root,"003", 3)).start();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.in.read();
    }
}

2、运行结果(由于Finished enter barier时,第一次同步已经结束了,所以是与Begin doSomeThing混在一起的)

Begin enter barier:001
Begin enter barier:003
Begin enter barier:002

Finished enter barier:001
Begin doSomeThing:001
Finished doSomeThing:001
Finished enter barier:002
Begin doSomeThing:002
Finished doSomeThing:002
Finished enter barier:003
Begin doSomeThing:003
Finished doSomeThing:003

Begin leave barier:002
Begin leave barier:001
Begin leave barier:003
Finished leave barier:002
Finished leave barier:003
Finished leave barier:001

ZooKeeper DataPublisher(Java)

1、DataPublisher.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;

/**
 * Created by Hansen
 */
public class DataPublisher {

    public void publishTest(String hostPort,String znode) throws IOException, KeeperException, InterruptedException {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, new Watcher() {
            public void process(WatchedEvent event) {
                //do nothing
            }});

        //删掉节点
        Stat stat =zk.exists(znode, false);
        if(stat!=null)
        {
            zk.delete(znode, -1);
        }

        //开始测试
        zk.create(znode,"test01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        byte[] buff =zk.getData(znode, false, null);
        System.out.println("data is " + new String(buff,"UTF-8"));
        zk.setData(znode,"test02".getBytes(), -1);
        buff = zk.getData(znode, false, null);
        System.out.println("data is " + new String(buff,"UTF-8"));
        zk.delete(znode, -1);
        zk.close();
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        String hostPort = "localhost:2181";
        String znode = "/neohope/test";

        DataPublisher publisher = new DataPublisher();
        publisher.publishTest(hostPort,znode);
    }
}

2、与Zookeeper Watcher配合使用,试一下。

ZooKeeper Watcher(Java)

1、Executor.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

/**
 * Created by Hansen
 */
public class Executor implements Runnable, DataMonitor.DataMonitorListener
{
    DataMonitor dm;

    /**
     * 构造函数
     * @param hostPort  host:port
     * @param znode      /xxx/yyy/zzz
     */
    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        dm = new DataMonitor(hostPort, znode, null, this);
    }

    /**
     * 线程函数,等待DataMonitor退出
     * @see java.lang.Runnable
     */
    @Override
    public void run() {
        try {
            synchronized (this) {
                while (!dm.bEnd) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    /**
     * 关闭zk连接
     * @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener
     */
    @Override
    public void znodeConnectionClosing(int rc) {
        synchronized (this) {
            notifyAll();
        }

        System.out.println("Connection is closing: "+ rc);
    }

    /**
     * znode节点状态或连接状态发生变化
     * @see com.neohope.zookeeper.test.DataMonitor.DataMonitorListener
     */
    @Override
    public void znodeStatusUpdate(byte[] data) {
        if (data == null) {
            System.out.println("data is null");
        } else {
            try {
                String s = new String(data,"UTF-8");
                System.out.println("data is "+s);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 入口函数
     * @param args
     */
    public static void main(String[] args) throws IOException {
        String hostPort = "localhost:2181";
        String znode = "/neohope/test";

        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、DataMonitor.java

package com.neohope.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Arrays;

/**
 * Created by Hansen
 */
public class DataMonitor implements Watcher, AsyncCallback.StatCallback {
    ZooKeeper zk;
    String znode;
    Watcher chainedWatcher;
    DataMonitorListener listener;

    boolean bEnd;
    byte prevData[];

    /**
     * 构造函数,并开始监视
     * @param hostPort          host:port
     * @param znode              /xxx/yyy/zzz
     * @param chainedWatcher   传递事件到下一个Watcher
     * @param listener          回调对象
     */
    public DataMonitor(String hostPort, String znode, Watcher chainedWatcher,
                       DataMonitorListener listener) throws IOException {
        this.zk = new ZooKeeper(hostPort, 30000, this);
        this.znode = znode;
        this.chainedWatcher = chainedWatcher;
        this.listener = listener;

        // 检查节点状态
        zk.exists(znode, true, this, null);
    }

    /**
     * exists回调函数
     * @param event     发生的事件
     * @see org.apache.zookeeper.Watcher
     */
    @Override
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // 连接状态发生变化
            switch (event.getState()) {
                case SyncConnected:
                    // 不需要做任何事情
                    break;
                case Expired:
                    // 连接超时,关闭连接
                    System.out.println("SESSIONEXPIRED ending");
                    bEnd = true;
                    listener.znodeConnectionClosing(KeeperException.Code.SESSIONEXPIRED.intValue());
                    break;
            }
        } else {
            //节点状态发生变化
            if (path != null && path.equals(znode)) {
                //检查节点状态
                zk.exists(znode, true, this, null);
            }
        }

        //传递事件
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }

    /**
     * exists回调函数
     * @param rc     zk返回值
     * @param path   路径
     * @param ctx    Context
     * @param stat   状态
     *
     * @see org.apache.zookeeper.AsyncCallback.StatCallback
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        boolean exists = false;
        if(rc== KeeperException.Code.OK.intValue()) {
            //节点存在
            exists = true;
        }
        else if(rc== KeeperException.Code.NONODE.intValue()){
            //节点没有找到
            exists = false;
        }
        else if(rc==KeeperException.Code.SESSIONEXPIRED.intValue() ){
            //Session过期
            bEnd = true;
            System.out.println("SESSIONEXPIRED ending");
            listener.znodeConnectionClosing(rc);
            return;
        }
        else if( rc==KeeperException.Code.NOAUTH.intValue())
        {
            //授权问题
            bEnd = true;
            System.out.println("NOAUTH ending");
            listener.znodeConnectionClosing(rc);
            return;
        }
        else
        {
            //重试
            zk.exists(znode, true, this, null);
            return;
        }

        //获取数据
        byte b[] = null;
        if (exists) {
            try {
                b = zk.getData(znode, false, null);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }
        //调用listener
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            listener.znodeStatusUpdate(b);
            prevData = b;
        }
    }

    /**
     * Other classes use the DataMonitor by implementing this method
     */
    public interface DataMonitorListener {
        /**
         * znode节点状态或连接状态发生变化
         */
        void znodeStatusUpdate(byte data[]);

        /**
         * 关闭zonde连接
         *
         * @param rc ZooKeeper返回值
         */
        void znodeConnectionClosing(int rc);
    }
}

3、运行Executor

4、运行zkCli.cmd

zkCli.cmd -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zk: 127.0.0.1:2181(CONNECTED) 2] create /neohope/test test01
[zk: 127.0.0.1:2181(CONNECTED) 3] set /neohope/test test02
[zk: 127.0.0.1:2181(CONNECTED) 4] set /neohope/test test03
[zk: 127.0.0.1:2181(CONNECTED) 5] delete /neohope/test
[zk: 127.0.0.1:2181(CONNECTED) 6] quit

5、观察Executor的输出

ZooKeeper增删改查(Shell)

1、语法

ZooKeeper -server host:port cmd args
        connect host:port
        get path [watch]
        ls path [watch]
        set path data [version]
        rmr path
        delquota [-n|-b] path
        quit
        printwatches on|off
        create [-s] [-e] path data acl
        stat path [watch]
        close
        ls2 path [watch]
        history
        listquota path
        setAcl path acl
        getAcl path
        sync path
        redo cmdno
        addauth scheme auth
        delete path [version]
        setquota -n|-b val path

2、示例

zkCli.cmd -server 127.0.0.1:2181

[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zookeeper]

[zk: 127.0.0.1:2181(CONNECTED) 2] create /zktest test01
Created /zktest

[zk: 127.0.0.1:2181(CONNECTED) 3] ls /
[zktest, zookeeper]

[zk: 127.0.0.1:2181(CONNECTED) 4] get /zktest
test01
cZxid = 0x4
ctime = Sun May 01 09:34:09 CST 2016
mZxid = 0x4
mtime = Sun May 01 09:34:09 CST 2016
pZxid = 0x4
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

[zk: 127.0.0.1:2181(CONNECTED) 5] set /zktest test02
cZxid = 0x4
ctime = Sun May 01 09:34:09 CST 2016
mZxid = 0x5
mtime = Sun May 01 09:37:16 CST 2016
pZxid = 0x4
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

[zk: 127.0.0.1:2181(CONNECTED) 4] get /zktest
test02
cZxid = 0x4
ctime = Sun May 01 09:34:09 CST 2016
mZxid = 0x5
mtime = Sun May 01 09:37:16 CST 2016
pZxid = 0x4
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

[zk: 127.0.0.1:2181(CONNECTED) 7] delete /zktest

[zk: 127.0.0.1:2181(CONNECTED) 8] ls /
[zookeeper]

[zk: 127.0.0.1:2181(CONNECTED) 9] quit
Quitting...