ActiveMQ通讯代码04

  • JMS方式调用
  • Queue方式调用
  • Topic方式调用
  • ReqRsp方式调用

1、TestMsg.java

package com.neohope.ActiveMQ.test.beans;

public class TestMsg implements java.io.Serializable{
    private static final long serialVersionUID = 12345678;

    public TestMsg(int taskId, String taskInfo, int taskLevel) {
        this.taskId = taskId;
        this.taskInfo = taskInfo;
        this.taskLevel = taskLevel;
    }

    public int taskId;
    public String taskInfo;
    public int taskLevel;
}

2、MqQueueSender.java

package com.neohope.ActiveMQ.test.queue;

import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MqQueueSender {
    private static void ProduceMsg()
    {
        QueueConnectionFactory connectionFactory = null;
        QueueConnection  connection = null;
        QueueSession session = null;

        try {
            connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            connection = connectionFactory.createQueueConnection();
            connection.start();
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("NEOHOPE.TestQueue");
            QueueSender sender = session.createSender(queue);
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            for(int i=0; i<100; i++) {
                ObjectMessage message = session.createObjectMessage();
                message.setObject(new TestMsg(i, "task info "+i, 3));
                sender.send(message);
            }
            ObjectMessage message = session.createObjectMessage();
            message.setObject(new TestMsg(100, "-=END=-", 3));
            sender.send(message);

            session.commit();

        } catch (JMSException ex) {
            ex.printStackTrace();
        }
        finally {
            try {
                if(session!=null) {
                    session.close();
                }
            } catch (JMSException ex) {
            }
            try {
                if(connection!=null) {
                    connection.close();
                }
            } catch (JMSException ex) {
            }
        }
    }

    public static void main(String[] args) {
        ProduceMsg();
    }
}

3、MqQueueReceive.java

package com.neohope.ActiveMQ.test.queue;

import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MqQueueReceive {
    private static void ReveiveMsg()
    {
        QueueConnectionFactory connectionFactory = null;
        QueueConnection connection = null;
        QueueSession session = null;
        final Object wait = new Object();

        try {
            connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            connection = connectionFactory.createQueueConnection();
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("NEOHOPE.TestQueue");
            QueueReceiver receive = session.createReceiver(queue);

            receive.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    if (msg instanceof ObjectMessage) {
                        ObjectMessage objMsg = (ObjectMessage) msg;
                        try {
                            TestMsg mqmsg = (TestMsg) objMsg.getObject();
                            String taskInfo = mqmsg.taskInfo;
                            System.out.println("msg received: " + taskInfo);
                            if ("-=END=-".equals(taskInfo)) {
                                synchronized (wait) {
                                    wait.notify();
                                }
                            }
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });

            connection.start();
            synchronized (wait){
                wait.wait();
            }
            session.commit();
        }
        catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if(session!=null) {
                    session.close();
                }
            } catch (JMSException ex) {
            }
            try {
                if(connection!=null) {
                    connection.close();
                }
            } catch (JMSException ex) {
            }
        }
    }

    public static void main(String[] args) {
        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", "*");
        ReveiveMsg();
    }
}

Leave a Reply

Your email address will not be published. Required fields are marked *

*