- JMS方式调用
- Queue方式调用
- Topic方式调用
- ReqRsp方式调用
1、TestMsg.java
package com.neohope.ActiveMQ.test.beans; public class TestMsg implements java.io.Serializable{ private static final long serialVersionUID = 12345678; public TestMsg(int taskId, String taskInfo, int taskLevel) { this.taskId = taskId; this.taskInfo = taskInfo; this.taskLevel = taskLevel; } public int taskId; public String taskInfo; public int taskLevel; }
2、MqQueueSender.java
package com.neohope.ActiveMQ.test.queue;

import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Sample queue producer: sends 100 {@link TestMsg} tasks followed by one
 * end-marker task to {@code NEOHOPE.TestQueue} inside a single JMS transaction.
 */
public class MqQueueSender {

    private static final String BROKER_URL = "tcp://127.0.0.1:61616";
    private static final String QUEUE_NAME = "NEOHOPE.TestQueue";
    /** Text of the last message; the consumer uses it to know the batch is over. */
    private static final String END_MARKER = "-=END=-";
    private static final int TASK_COUNT = 100;
    private static final int TASK_LEVEL = 3;

    /**
     * Connects to the broker, sends the task batch plus the end marker and commits.
     *
     * <p>JMS failures while sending are printed and abort the batch without a
     * commit. Failures while closing the session or connection are reported on
     * stderr instead of being silently dropped.
     */
    private static void produceMsg() {
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            connection = connectionFactory.createQueueConnection();
            connection.start();

            // Transacted session: the messages only become visible to consumers on commit().
            session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            QueueSender sender = session.createSender(queue);
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            for (int i = 0; i < TASK_COUNT; i++) {
                sendTask(session, sender, new TestMsg(i, "task info " + i, TASK_LEVEL));
            }
            sendTask(session, sender, new TestMsg(TASK_COUNT, END_MARKER, TASK_LEVEL));

            session.commit();
        } catch (JMSException ex) {
            ex.printStackTrace();
        } finally {
            // Cleanup is best-effort, but a failure is still worth seeing.
            try {
                if (session != null) {
                    session.close();
                }
            } catch (JMSException ex) {
                System.err.println("Failed to close JMS session: " + ex);
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (JMSException ex) {
                System.err.println("Failed to close JMS connection: " + ex);
            }
        }
    }

    /** Wraps {@code task} in an {@link ObjectMessage} and sends it with {@code sender}. */
    private static void sendTask(QueueSession session, QueueSender sender, TestMsg task)
            throws JMSException {
        ObjectMessage message = session.createObjectMessage();
        message.setObject(task);
        sender.send(message);
    }

    public static void main(String[] args) {
        produceMsg();
    }
}
3、MqQueueReceive.java
package com.neohope.ActiveMQ.test.queue; import com.neohope.ActiveMQ.test.beans.TestMsg; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class MqQueueReceive { private static void ReveiveMsg() { QueueConnectionFactory connectionFactory = null; QueueConnection connection = null; QueueSession session = null; final Object wait = new Object(); try { connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); connection = connectionFactory.createQueueConnection(); session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("NEOHOPE.TestQueue"); QueueReceiver receive = session.createReceiver(queue); receive.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg instanceof ObjectMessage) { ObjectMessage objMsg = (ObjectMessage) msg; try { TestMsg mqmsg = (TestMsg) objMsg.getObject(); String taskInfo = mqmsg.taskInfo; System.out.println("msg received: " + taskInfo); if ("-=END=-".equals(taskInfo)) { synchronized (wait) { wait.notify(); } } } catch (JMSException e) { e.printStackTrace(); } } } }); connection.start(); synchronized (wait){ wait.wait(); } session.commit(); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { if(session!=null) { session.close(); } } catch (JMSException ex) { } try { if(connection!=null) { connection.close(); } } catch (JMSException ex) { } } } public static void main(String[] args) { System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", "*"); ReveiveMsg(); } }