ActiveMQ通讯代码03

  • JMS方式调用
  • Queue方式调用
  • Topic方式调用
  • ReqRsp方式调用

1、TestMsg.java

package com.neohope.ActiveMQ.test.beans;

public class TestMsg implements java.io.Serializable{
    private static final long serialVersionUID = 12345678;

    public TestMsg(int taskId, String taskInfo, int taskLevel) {
        this.taskId = taskId;
        this.taskInfo = taskInfo;
        this.taskLevel = taskLevel;
    }

    public int taskId;
    public String taskInfo;
    public int taskLevel;
}

2、MqTopicPublisher.java

package com.neohope.ActiveMQ.test.topic;

import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by Hansen on 2015/12/18.
 */
public class MqTopicPublisher {
    private static void ProduceMsg()
    {
        TopicConnectionFactory connectionFactory = null;
        TopicConnection connection = null;
        TopicSession session = null;

        try {
            connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            connection = connectionFactory.createTopicConnection();
            connection.start();
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("NEOHOPE.TestTopic");
            TopicPublisher publisher = session.createPublisher(topic);
            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            for(int i=0; i<100; i++) {
                ObjectMessage message = session.createObjectMessage();
                message.setObject(new TestMsg(i, "topic task info "+i, 3));
                publisher.send(message);
            }
            ObjectMessage message = session.createObjectMessage();
            message.setObject(new TestMsg(100, "-=END=-", 3));
            publisher.send(message);

            session.commit();

        } catch (JMSException ex) {
            ex.printStackTrace();
        }
        finally {
            try {
                if(session!=null) {
                    session.close();
                }
            } catch (JMSException ex) {
            }
            try {
                if(connection!=null) {
                    connection.close();
                }
            } catch (JMSException ex) {
            }
        }
    }

    public static void main(String[] args) {
        ProduceMsg();
    }
}

3、MqTopicSubscriber.java

package com.neohope.ActiveMQ.test.topic;

import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by Hansen on 2015/12/18.
 */
public class MqTopicSubscriber {
    private static void ReveiveMsg()
    {
        TopicConnectionFactory connectionFactory = null;
        TopicConnection connection = null;
        TopicSession session = null;
        final Object wait = new Object();

        try {
            connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            connection = connectionFactory.createTopicConnection();
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("NEOHOPE.TestTopic");
            TopicSubscriber subscriber = session.createSubscriber(topic);

            subscriber.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    if (msg instanceof ObjectMessage) {
                        ObjectMessage objMsg = (ObjectMessage) msg;
                        try {
                            TestMsg mqmsg = (TestMsg) objMsg.getObject();
                            String taskInfo = mqmsg.taskInfo;
                            System.out.println("msg received: " + taskInfo);
                            if ("-=END=-".equals(taskInfo)) {
                                synchronized (wait) {
                                    wait.notify();
                                }
                            }
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });

            connection.start();
            synchronized (wait){
                wait.wait();
            }
            session.commit();
        }
        catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if(session!=null) {
                    session.close();
                }
            } catch (JMSException ex) {
            }
            try {
                if(connection!=null) {
                    connection.close();
                }
            } catch (JMSException ex) {
            }
        }
    }

    public static void main(String[] args) {
        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES","*");
        ReveiveMsg();
    }
}

Leave a Reply

Your email address will not be published. Required fields are marked *

*