The puka module implements a client for AMQP 0-9-1 protocol. It’s tuned to work with RabbitMQ broker, but should work fine with other message brokers that support this protocol.
AMQP protocol defines a model which consists of Exchanges, Bindings and Queues:
Exchange ---binding---> Queue
For more information see:
- RabbitMQ tutorials.
- AMQP 0-9-1 Specification and Definitions
The puka module defines a single class:
Consturctor for Client class. amqp_url is an url-like address of the RabbitMQ server. Default points to amqp://guest:guest@localhost:5672/. pubacks tells if the client should take advantage of the publiser-acks feature on the server - autodetect by default. client_properties is a dict of properties to add to the connection properties sent to the server.
The puka module exposes the following exceptions, as defined by AMQP protocol.
Additionally puka defines a connection error:
Client object (Client) provides the public methods described below. They can be grouped in three distinct subcategories.
The following methods are responsible for networking and socket handling.
Return a socket’s file descriptor (a small integer). This is useful with select.select().
Return a socket.socket() object.
Return true if the send buffer is full, and needs to be written to the socket.
Run any outstanding user callbacks for any of the promises.
Wait up to timeout seconds for an event on given promise. If the event was received before timeout, run the callback for the promise and return AMQP response.
Enter an event loop, keep on handling network and executing user callbacks for up to timeout seconds.
Cause the event loop to break on next iteration.
Functions below return a promise. It’s a small number, that identifies an asynchronous request. You can wait for a promise to be done and receive a response for it.
Establishes an asynchronous connection with the server. You’re forbidden to do any other action before this step is finished.
Immediately closes the connection. All buffered data will be lost. All outstanding promises will be closed with an error.
For details see the documentation of exchange bindings RabbitMQ feature.
Return a consume_promise.
Return a consume_promise.
Given a consume_promise returned by basic_consume() or basic_consume_multi() changes the prefetch_count for that consumes.
Given a consume_promise returned by basic_consume() or basic_consume_multi() cancels the consume. You can wait on the returned promise.
Given a result of basic_consume or basic_consume_multi promise (ie: a message) acknowledges it. It’s an asynchronous method.
Given a result of basic_consume or basic_consume_multi promise (ie: a message) rejects it. It’s an asynchronous method.
Synchronously send a message:
import puka
client = puka.Client("amqp://localhost/")
promise = client.connect()
client.wait(promise)
promise = client.queue_declare(queue='test')
client.wait(promise)
promise = client.basic_publish(exchange='', routing_key='test',
body="Hello world!")
client.wait(promise)
promise = client.close()
client.wait(promise)
Synchronously receive three messages:
import puka
client = puka.Client("amqp://localhost/")
promise = client.connect()
client.wait(promise)
promise = client.queue_declare(queue='test')
client.wait(promise)
consume_promise = client.basic_consume(queue='test')
for i in range(3):
result = client.wait(consume_promise)
print " [x] Received message %r" % (result,)
client.basic_ack(result)
promise = client.basic_cancel(consume_promise)
client.wait(promise)
promise = client.close()
client.wait(promise)
Asynchronously send a message:
import puka
def on_connection(promise, result):
client.queue_declare(queue='test', callback=on_queue_declare)
def on_queue_declare(promise, result):
client.basic_publish(exchange='', routing_key='test',
body="Hello world!",
callback=on_basic_publish)
def on_basic_publish(promise, result):
print " [*] Message sent"
client.loop_break()
client = puka.Client("amqp://localhost/")
client.connect(callback=on_connection)
client.loop()
promise = client.close()
client.wait(promise)