ActiveMQ通讯代码01

  • JMS方式调用
  • Queue方式调用
  • Topic方式调用
  • ReqRsp方式调用

1、TestMsg.java

package com.neohope.ActiveMQ.test.beans;

public class TestMsg implements java.io.Serializable{
    private static final long serialVersionUID = 12345678;

    public TestMsg(int taskId, String taskInfo, int taskLevel) {
        this.taskId = taskId;
        this.taskInfo = taskInfo;
        this.taskLevel = taskLevel;
    }

    public int taskId;
    public String taskInfo;
    public int taskLevel;
}

2、MqProducer.java

package com.neohope.ActiveMQ.test.jms;

import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MqProducer {
    private static void ProduceMsg()
    {
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;

        try {
            connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("NEOHOPE.TestQueue");

            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for(int i=0; i<100; i++) {
                ObjectMessage message = session.createObjectMessage();
                message.setObject(new TestMsg(i, "task info "+i, 3));
                producer.send(message);
            }
            ObjectMessage message = session.createObjectMessage();
            message.setObject(new TestMsg(100, "-=END=-", 3));
            producer.send(message);

            session.commit();

        } catch (JMSException ex) {
            ex.printStackTrace();
        }
        finally {
            try {
                if(session!=null) {
                    session.close();
                }
            } catch (JMSException ex) {
            }
            try {
                if(connection!=null) {
                    connection.close();
                }
            } catch (JMSException ex) {
            }
        }
    }

    public static void main(String[] args) {
        ProduceMsg();
    }
}

3、MqConsumer.java

package com.neohope.ActiveMQ.test.jms;

import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MqConsumer {
    private static void ReveiveMsg()
    {
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;

        try {
            connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            connection = connectionFactory.createConnection();
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("NEOHOPE.TestQueue");
            MessageConsumer consumer = session.createConsumer(destination);
            connection.start();

            while (true) {
                Message msg = consumer.receive();
                if (msg instanceof ObjectMessage) {
                    ObjectMessage objMsg = (ObjectMessage) msg;
                    TestMsg mqmsg = (TestMsg) objMsg.getObject();
                    String taskInfo = mqmsg.taskInfo;
                    System.out.println("msg received: " + taskInfo);
                    if ("-=END=-".equals(taskInfo)) {
                        break;
                    }
                }
            }
            session.commit();
        }
        catch (JMSException e) {
            e.printStackTrace();
        }
        finally {
            try {
                if(session!=null) {
                    session.close();
                }
            } catch (JMSException ex) {
            }
            try {
                if(connection!=null) {
                    connection.close();
                }
            } catch (JMSException ex) {
            }
        }
    }

    public static void main(String[] args) {
        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES","*");
        ReveiveMsg();
    }
}

4、MqListener.java

package com.neohope.ActiveMQ.test.jms;

import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MqListener {
    private static void StartListen()
    {
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        final Object wait = new Object();

        try {
            connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            connection = connectionFactory.createConnection();
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("NEOHOPE.TestQueue");
            MessageConsumer consumer = session.createConsumer(destination);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    if (msg instanceof ObjectMessage) {
                        ObjectMessage objMsg = (ObjectMessage) msg;
                        try {
                            TestMsg mqmsg = (TestMsg) objMsg.getObject();
                            String taskInfo = mqmsg.taskInfo;
                            System.out.println("msg received: " + taskInfo);
                            if ("-=END=-".equals(taskInfo)) {
                                synchronized (wait) {
                                    wait.notify();
                                }
                            }
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });

            connection.start();
            synchronized (wait){
                wait.wait();
            }
            session.commit();
        }
        catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if(session!=null) {
                    session.close();
                }
            } catch (JMSException ex) {
            }
            try {
                if(connection!=null) {
                    connection.close();
                }
            } catch (JMSException ex) {
            }
        }
    }

    public static void main(String[] args) {
        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", "*");
        StartListen();
    }
}

Leave a Reply

Your email address will not be published. Required fields are marked *

*