Apache Qpid - AMQP Messaging for Java JMS, C++, Python, Ruby, and .NET | Apache Qpid Documentation |
Includes and Namespaces
// Standard library: the snippets below write to std::cout.
#include <iostream>

// Qpid client headers. Each directive sits on its own line because a
// preprocessor directive ends at the newline; several #include directives
// cannot share one line.
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/SubscriptionManager.h>

// Bring the qpid::client and qpid::framing names used by the snippets below
// into scope.
using namespace qpid::client;
using namespace qpid::framing;
Opening and closing connections and sessions:
// Opens `connection` with connection.open(host, port), creates a Session from
// it with newSession(), and closes the connection before returning 0.
// If a std::exception is thrown inside the try block, its what() text is
// written to std::cout and control falls through to `return 1`;
// connection.close() is not called on that path.
// `host` and `port` are not declared in this snippet, and "..." marks elided
// code.
Connection connection; try { connection.open(host, port); Session session = connection.newSession(); ... connection.close(); return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; } return 1;
Declaring and binding queues:
// Declare the queue named "message_queue" on the session.
session.queueDeclare(arg::queue = "message_queue");

// Bind that queue to the "amq.direct" exchange under the binding key
// "routing_key".
session.exchangeBind(
    arg::exchange   = "amq.direct",
    arg::queue      = "message_queue",
    arg::bindingKey = "routing_key");
Sending a message:
message.getDeliveryProperties().setRoutingKey("routing_key"); message.setData("Hi, Mom!"); session.messageTransfer(arg::content=message, arg::destination="amq.direct");
Sending a message (asynchronous):
// Header included by this snippet for the async(...) call below. The directive
// must sit on its own line: a preprocessor directive ends at the newline, so
// it cannot share a line with statements.
#include <qpid/client/AsyncSession.h>

// Same transfer as the synchronous snippet, issued through async(session).
async(session).messageTransfer(arg::content = message, arg::destination = "amq.direct");

...

// NOTE(review): presumably waits for the outstanding asynchronous commands to
// complete - confirm against the Session API documentation.
session.sync();
Replying to a message:
Message request, response;

...

// Reply only when the request carries a reply-to address.
if (request.getMessageProperties().hasReplyTo()) {
    // The reply-to address supplies both the routing key and the exchange
    // that the response is sent through.
    string routingKey = request.getMessageProperties().getReplyTo().getRoutingKey();
    string exchange = request.getMessageProperties().getReplyTo().getExchange();

    response.getDeliveryProperties().setRoutingKey(routingKey);

    // messageTransfer is invoked on the session, as in the "Sending a
    // message" snippet; a bare call has no object to act on.
    session.messageTransfer(arg::content = response, arg::destination = exchange);
}
A message listener:
// MessageListener implementation that keeps a reference to the
// SubscriptionManager it is constructed with, so that received() can call
// cancel() on it.
class Listener : public MessageListener {
  public:
    // explicit: a SubscriptionManager must never convert to a Listener
    // implicitly. The constructor is only declared here; its definition is
    // not part of this snippet.
    explicit Listener(SubscriptionManager& subscriptions);

    // Handler invoked with each received message.
    virtual void received(Message& message);

  private:
    // Held by reference, not by copy: the SubscriptionManager passed to the
    // constructor must outlive this listener.
    SubscriptionManager& subscriptions;
};
// Writes "Message: " followed by the message data to std::cout. When
// endCondition(message) is true, additionally cancels the subscription
// identified by the message's destination. endCondition is not defined in
// this snippet.
void Listener::received(Message& message)
{
    std::cout << "Message: " << message.getData() << std::endl;

    // Nothing more to do unless the end condition has been reached.
    if (!endCondition(message)) {
        return;
    }

    subscriptions.cancel(message.getDestination());
}
Using a message listener with a subscription manager:
// Create a subscription manager on the session.
SubscriptionManager subscriptions(session);

// The listener is handed the manager so that Listener::received can cancel
// the subscription; it is then subscribed to "message_queue".
Listener listener(subscriptions);
subscriptions.subscribe(listener, "message_queue");

// NOTE(review): presumably blocks while messages are delivered to the
// listener and returns once the subscriptions are cancelled - confirm against
// the SubscriptionManager API documentation.
subscriptions.run();
Using a LocalQueue with a subscription manager:
// Create a subscription manager on the session.
SubscriptionManager subscriptions(session);

// Subscribe a LocalQueue to "message_queue"; messages are then fetched from
// the local queue explicitly instead of being pushed to a listener.
LocalQueue local_queue;
subscriptions.subscribe(local_queue, string("message_queue"));

// Make up to ten attempts to fetch a message.
Message message;
for (int i = 0; i < 10; i++) {
    // get() reports whether it stored a message in `message`. Print only on
    // success, so a timed-out attempt does not print whatever `message`
    // already held.
    // NOTE(review): the unit of the 10000 timeout is not visible in this
    // snippet - confirm against the LocalQueue::get documentation.
    if (local_queue.get(message, 10000)) {
        std::cout << message.getData() << std::endl;
    }
}