- TextMessage
- ListMessage
- MapMessage
- StreamMessage
1、MqConsumerStream.java
package com.neohope.qpid.test;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import javax.jms.*;
/**
 * Demo consumer that receives a single JMS {@link StreamMessage} from the
 * {@code message_queue} address on a local Qpid broker and prints each of its
 * fields in the order they were written.
 */
public class MqConsumerStream {
    /**
     * Connects to the broker, blocks until one message arrives, prints the message
     * and then every field it contains, and finally closes the connection.
     *
     * @param args unused
     * @throws IllegalStateException if the received message is not a {@link StreamMessage}
     *                               (including the {@code null} returned when the consumer is closed)
     * @throws Exception             if connecting, receiving or reading the message fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection =
                new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
        try {
            // Message delivery to consumers does not begin until the connection is started.
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
            MessageConsumer consumer = session.createConsumer(queue);

            System.out.println("Receiving as StreamMessage");
            // receive() blocks with no timeout; validate the type before casting so a
            // message of another kind fails with a readable error instead of a bare cast failure.
            Object received = consumer.receive();
            if (!(received instanceof StreamMessage)) {
                throw new IllegalStateException(
                        "Expected a StreamMessage but received: " + received);
            }
            StreamMessage m = (StreamMessage) received;

            System.out.println(m);
            System.out.println("==========================================");
            System.out.println("Printing stream contents:");
            try {
                // StreamMessage offers no "has more" query; the end of the stream is
                // signalled only by MessageEOFException, so the loop runs until it is thrown.
                while (true) {
                    Object field = m.readObject();
                    if (field instanceof byte[]) {
                        // Printing a byte[] directly would show only its identity hash.
                        // The producer in this file writes UTF-8 encoded text, so decode as UTF-8.
                        System.out.println(
                                new String((byte[]) field, java.nio.charset.StandardCharsets.UTF_8));
                    } else {
                        System.out.println(field);
                    }
                }
            } catch (MessageEOFException endOfStream) {
                // Expected: every field has been read.
            }
        } finally {
            // Closing the connection also closes its sessions and consumers.
            connection.close();
        }
    }
}
2、MqProducerStream.java
package com.neohope.qpid.test;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ListMessage;
import javax.jms.*;
import java.nio.charset.Charset;
import java.util.*;
/**
 * Demo producer that sends a single JMS {@link StreamMessage} containing one field
 * of each of several types to the {@code message_queue} address on a local Qpid broker.
 */
public class MqProducerStream {
    /**
     * Connects to the broker, builds a stream message, sends it, prints it,
     * and closes the connection.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or sending the message fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection =
                new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
            MessageProducer producer = session.createProducer(queue);

            // createStreamMessage() is declared on the standard javax.jms.Session interface,
            // so no downcast to the Qpid-specific session type is needed.
            StreamMessage msg = session.createStreamMessage();
            // Fields are read back by the consumer in exactly this order.
            msg.writeInt(99);
            msg.writeString("-=99=-");
            msg.writeDouble(0.99);
            msg.writeBoolean(true);
            msg.writeChar('c');
            msg.writeBytes("QPID你好".getBytes(Charset.forName("UTF-8")));

            producer.send(msg);
            System.out.println("Sent: " + msg);
        } finally {
            // Always release the connection, even when creating or sending the message fails.
            connection.close();
        }
    }
}
3、测试
#开启qpid-broker #运行MqConsumerStream.Main #运行MqProducerStream.Main