QPID通讯代码04

  • TextMessage
  • ListMessage
  • MapMessage
  • StreamMessage

1、MqConsumerStream.java

package com.neohope.qpid.test;

import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;

import javax.jms.*;

/**
 * Receives a single {@link StreamMessage} from {@code message_queue} on a local
 * Qpid broker and prints the message followed by each value in its stream.
 */
public class MqConsumerStream {
    /**
     * Connects to the broker, blocks until one message arrives, dumps it to
     * standard output and then closes the connection.
     *
     * @param args unused
     * @throws Exception if connecting, receiving or reading the message fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection =
                new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
        try {
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
            MessageConsumer consumer = session.createConsumer(queue);

            System.out.println("Receiving as StreamMessage");
            StreamMessage m = (StreamMessage) consumer.receive();
            System.out.println(m);
            System.out.println("==========================================");
            System.out.println("Printing stream contents:");
            try {
                while (true) {
                    System.out.println(m.readObject());
                }
            } catch (MessageEOFException ignored) {
                // readObject() reports end-of-stream by throwing; nothing left to print.
            }
        } finally {
            // Closing the connection also closes its sessions and consumers;
            // the original never closed it, leaking the broker connection.
            connection.close();
        }
    }
}

Continue reading QPID通讯代码04

QPID通讯代码03

  • TextMessage
  • ListMessage
  • MapMessage
  • StreamMessage

1、MqConsumerMap.java

package com.neohope.qpid.test;

import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;

import javax.jms.*;
import java.util.Enumeration;

/**
 * Receives a single {@link MapMessage} from {@code message_queue} on a local
 * Qpid broker and prints the message followed by every key/value pair it holds.
 */
public class MqConsumerMap {
    /**
     * Connects to the broker, blocks until one message arrives, dumps it to
     * standard output and then closes the connection.
     *
     * @param args unused
     * @throws Exception if connecting, receiving or reading the message fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection =
                new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
        try {
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
            MessageConsumer consumer = session.createConsumer(queue);

            System.out.println("Receiving as MapMessage");
            MapMessage m = (MapMessage) consumer.receive();
            System.out.println(m);
            System.out.println("==========================================");
            System.out.println("Printing map contents:");
            Enumeration<?> keys = m.getMapNames();
            while (keys.hasMoreElements()) {
                String key = (String) keys.nextElement();
                System.out.println(key + " => " + m.getObject(key));
            }
        } finally {
            // Closing the connection also closes its sessions and consumers;
            // the original never closed it, leaking the broker connection.
            connection.close();
        }
    }
}

Continue reading QPID通讯代码03

QPID通讯代码02

  • TextMessage
  • ListMessage
  • MapMessage
  • StreamMessage

1、MqConsumerList.java

package com.neohope.qpid.test;

import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ListMessage;

import javax.jms.*;
import java.util.Enumeration;
import java.util.Iterator;

/**
 * Receives a single Qpid {@link ListMessage} from {@code message_queue} on a
 * local broker and prints the message followed by each element of the list.
 */
public class MqConsumerList {
    /**
     * Connects to the broker, blocks until one message arrives, dumps it to
     * standard output and then closes the connection.
     *
     * @param args unused
     * @throws Exception if connecting, receiving or reading the message fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection =
                new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'");
        try {
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}");
            MessageConsumer consumer = session.createConsumer(queue);

            System.out.println("Receiving as ListMessage");
            ListMessage m = (ListMessage) consumer.receive();
            System.out.println(m);
            System.out.println("==========================================");
            System.out.println("Printing list contents:");
            Iterator<?> i = m.iterator();
            while (i.hasNext()) {
                System.out.println(i.next());
            }
        } finally {
            // Close on every path: previously a failure in receive/cast/print
            // skipped close() and left the broker connection open.
            connection.close();
        }
    }
}

Continue reading QPID通讯代码02

QPID通讯代码01

  • TextMessage
  • ListMessage
  • MapMessage
  • StreamMessage

1、MqConsumerText.java

package com.neohope.qpid.test;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;

/**
 * Receives a single {@link TextMessage} from the {@code amq.topic} destination
 * of a local Qpid broker, using JNDI properties built in code, and prints its text.
 */
public class MqConsumerText {

    /**
     * Looks up the connection factory and destination through an in-memory JNDI
     * context, blocks until one message arrives, prints its text, and releases
     * the connection and context. Any failure is reported via a stack trace.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        Context context = null;
        Connection connection = null;
        try
        {
            Properties properties = new Properties();
            properties.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
            properties.setProperty("connectionfactory.qpidConnectionfactory","amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'");
            properties.setProperty("destination.topicExchange","amq.topic");

            context = new InitialContext(properties);

            ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionfactory");
            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = (Destination) context.lookup("topicExchange");

            MessageConsumer messageConsumer = session.createConsumer(destination);
            TextMessage message = (TextMessage)messageConsumer.receive();
            System.out.println(message.getText());
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
        finally
        {
            // Release resources on every path; previously an exception after
            // createConnection() skipped both close() calls.
            if (connection != null)
            {
                try
                {
                    connection.close();
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }
            }
            if (context != null)
            {
                try
                {
                    context.close();
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }
            }
        }
    }
}

Continue reading QPID通讯代码01