- JMS方式调用
- Queue方式调用
- Topic方式调用
- ReqRsp方式调用
1、TestMsg.java
package com.neohope.ActiveMQ.test.beans;
public class TestMsg implements java.io.Serializable{
    private static final long serialVersionUID = 12345678;
    public TestMsg(int taskId, String taskInfo, int taskLevel) {
        this.taskId = taskId;
        this.taskInfo = taskInfo;
        this.taskLevel = taskLevel;
    }
    public int taskId;
    public String taskInfo;
    public int taskLevel;
}
2、MqQueueSender.java
package com.neohope.ActiveMQ.test.queue;
import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MqQueueSender {
    private static void ProduceMsg()
    {
        QueueConnectionFactory connectionFactory = null;
        QueueConnection  connection = null;
        QueueSession session = null;
        try {
            connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            connection = connectionFactory.createQueueConnection();
            connection.start();
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("NEOHOPE.TestQueue");
            QueueSender sender = session.createSender(queue);
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            for(int i=0; i<100; i++) {
                ObjectMessage message = session.createObjectMessage();
                message.setObject(new TestMsg(i, "task info "+i, 3));
                sender.send(message);
            }
            ObjectMessage message = session.createObjectMessage();
            message.setObject(new TestMsg(100, "-=END=-", 3));
            sender.send(message);
            session.commit();
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
        finally {
            try {
                if(session!=null) {
                    session.close();
                }
            } catch (JMSException ex) {
            }
            try {
                if(connection!=null) {
                    connection.close();
                }
            } catch (JMSException ex) {
            }
        }
    }
    public static void main(String[] args) {
        ProduceMsg();
    }
}
3、MqQueueReceive.java
package com.neohope.ActiveMQ.test.queue;
import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MqQueueReceive {
    private static void ReveiveMsg()
    {
        QueueConnectionFactory connectionFactory = null;
        QueueConnection connection = null;
        QueueSession session = null;
        final Object wait = new Object();
        try {
            connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            connection = connectionFactory.createQueueConnection();
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("NEOHOPE.TestQueue");
            QueueReceiver receive = session.createReceiver(queue);
            receive.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    if (msg instanceof ObjectMessage) {
                        ObjectMessage objMsg = (ObjectMessage) msg;
                        try {
                            TestMsg mqmsg = (TestMsg) objMsg.getObject();
                            String taskInfo = mqmsg.taskInfo;
                            System.out.println("msg received: " + taskInfo);
                            if ("-=END=-".equals(taskInfo)) {
                                synchronized (wait) {
                                    wait.notify();
                                }
                            }
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            connection.start();
            synchronized (wait){
                wait.wait();
            }
            session.commit();
        }
        catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if(session!=null) {
                    session.close();
                }
            } catch (JMSException ex) {
            }
            try {
                if(connection!=null) {
                    connection.close();
                }
            } catch (JMSException ex) {
            }
        }
    }
    public static void main(String[] args) {
        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", "*");
        ReveiveMsg();
    }
}