- TextMessage
- ListMessage
- MapMesage
- StreamMessage
1、MqConsumerStream.java
package com.neohope.qpid.test; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import javax.jms.*; public class MqConsumerStream { public static void main(String[] args) throws Exception { Connection connection = new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'"); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}"); MessageConsumer consumer = session.createConsumer(queue); System.out.println("Receiving as StreamMessage"); StreamMessage m = (StreamMessage) consumer.receive(); System.out.println(m); System.out.println("=========================================="); System.out.println("Printing stream contents:"); try { while (true) System.out.println(m.readObject()); } catch (MessageEOFException e) { // DONE } } }
2、MqProducerStream.java
package com.neohope.qpid.test; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.jms.ListMessage; import javax.jms.*; import java.nio.charset.Charset; import java.util.*; public class MqProducerStream { public static void main(String[] args) throws Exception { Connection connection = new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}"); MessageProducer producer = session.createProducer(queue); StreamMessage msg = ((org.apache.qpid.jms.Session) session).createStreamMessage(); msg.writeInt(99); msg.writeString("-=99=-"); msg.writeDouble(0.99); msg.writeBoolean(true); msg.writeChar('c'); msg.writeBytes("QPID你好".getBytes(Charset.forName("UTF-8"))); producer.send((Message) msg); System.out.println("Sent: " + msg); connection.close(); } }
3、测试
#开启qpid-broker #运行MqConsumerStream.Main #运行MqProducerStream.Main