- JMS方式调用
- Queue方式调用
- Topic方式调用
- ReqRsp方式调用
1、TestMsg.java
package com.neohope.ActiveMQ.test.beans;
public class TestMsg implements java.io.Serializable{
private static final long serialVersionUID = 12345678;
public TestMsg(int taskId, String taskInfo, int taskLevel) {
this.taskId = taskId;
this.taskInfo = taskInfo;
this.taskLevel = taskLevel;
}
public int taskId;
public String taskInfo;
public int taskLevel;
}
2、MqProducer.java
package com.neohope.ActiveMQ.test.jms;
import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MqProducer {
private static void ProduceMsg()
{
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
try {
connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("NEOHOPE.TestQueue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for(int i=0; i<100; i++) {
ObjectMessage message = session.createObjectMessage();
message.setObject(new TestMsg(i, "task info "+i, 3));
producer.send(message);
}
ObjectMessage message = session.createObjectMessage();
message.setObject(new TestMsg(100, "-=END=-", 3));
producer.send(message);
session.commit();
} catch (JMSException ex) {
ex.printStackTrace();
}
finally {
try {
if(session!=null) {
session.close();
}
} catch (JMSException ex) {
}
try {
if(connection!=null) {
connection.close();
}
} catch (JMSException ex) {
}
}
}
public static void main(String[] args) {
ProduceMsg();
}
}
3、MqConsumer.java
package com.neohope.ActiveMQ.test.jms;
import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MqConsumer {
private static void ReveiveMsg()
{
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
try {
connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
connection = connectionFactory.createConnection();
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("NEOHOPE.TestQueue");
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
while (true) {
Message msg = consumer.receive();
if (msg instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage) msg;
TestMsg mqmsg = (TestMsg) objMsg.getObject();
String taskInfo = mqmsg.taskInfo;
System.out.println("msg received: " + taskInfo);
if ("-=END=-".equals(taskInfo)) {
break;
}
}
}
session.commit();
}
catch (JMSException e) {
e.printStackTrace();
}
finally {
try {
if(session!=null) {
session.close();
}
} catch (JMSException ex) {
}
try {
if(connection!=null) {
connection.close();
}
} catch (JMSException ex) {
}
}
}
public static void main(String[] args) {
System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES","*");
ReveiveMsg();
}
}
4、MqListener.java
package com.neohope.ActiveMQ.test.jms;
import com.neohope.ActiveMQ.test.beans.TestMsg;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MqListener {
private static void StartListen()
{
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
final Object wait = new Object();
try {
connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
connection = connectionFactory.createConnection();
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("NEOHOPE.TestQueue");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage) msg;
try {
TestMsg mqmsg = (TestMsg) objMsg.getObject();
String taskInfo = mqmsg.taskInfo;
System.out.println("msg received: " + taskInfo);
if ("-=END=-".equals(taskInfo)) {
synchronized (wait) {
wait.notify();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
connection.start();
synchronized (wait){
wait.wait();
}
session.commit();
}
catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
if(session!=null) {
session.close();
}
} catch (JMSException ex) {
}
try {
if(connection!=null) {
connection.close();
}
} catch (JMSException ex) {
}
}
}
public static void main(String[] args) {
System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", "*");
StartListen();
}
}