kombu.simple is a simple interface to AMQP queueing. It is only slightly different from the Queue class in the Python Standard Library, which makes it excellent for users with basic messaging needs.
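For comparison, here is a minimal sketch of the standard library pattern that the simple interface mirrors. It uses only the stdlib `queue` module; the variable names are illustrative. With kombu the calls have the same shape, except that `get()` returns a message object whose `payload` attribute holds the data and which is acknowledged with `ack()`.

```python
import queue

# A plain in-process queue from the standard library.
local_queue = queue.Queue()

# put() enqueues an item; get() blocks until one is available
# or the timeout expires.
local_queue.put({"message": "hello"})
item = local_queue.get(block=True, timeout=1)

# A non-blocking get() on an empty queue raises queue.Empty.
try:
    local_queue.get(block=False)
    drained = False
except queue.Empty:
    drained = True
```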
Instead of defining exchanges and queues, the simple classes require only two arguments: a connection channel and a name. The name is used as the queue name, the exchange name, and the routing key. If the need arises, you can pass a Queue instance as the name argument instead.
In addition, the BrokerConnection comes with shortcuts to create simple queues using the current connection:
>>> queue = connection.SimpleQueue("myqueue")
>>> # ... do something with queue
>>> queue.close()
This is equivalent to:
>>> from kombu.simple import SimpleQueue
>>> channel = connection.channel()
>>> queue = SimpleQueue(channel, "myqueue")
>>> # ... do something with queue
>>> queue.close()
>>> channel.close()
The simple interface defines two classes: SimpleQueue and SimpleBuffer. The former is used for persistent messages, and the latter is used for transient, buffer-like queues. Both have the same interface, so you can use them interchangeably.
Here is an example using the SimpleQueue class to produce and consume logging messages:
from datetime import datetime
from socket import gethostname
from time import time

from kombu import BrokerConnection


class Logger(object):

    def __init__(self, connection, queue_name="log_queue",
                 serializer="json", compression=None):
        self.queue = connection.SimpleQueue(queue_name)
        self.serializer = serializer
        self.compression = compression

    def log(self, message, level="INFO", context=None):
        self.queue.put({"message": message,
                        "level": level,
                        "context": context or {},
                        "hostname": gethostname(),
                        "timestamp": time()},
                       serializer=self.serializer,
                       compression=self.compression)

    def process(self, callback, n=1, timeout=1):
        for i in range(n):
            log_message = self.queue.get(block=True, timeout=timeout)
            entry = log_message.payload  # deserialized data.
            callback(entry)
            log_message.ack()  # remove message from queue

    def close(self):
        self.queue.close()


if __name__ == "__main__":
    from contextlib import closing

    with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
        with closing(Logger(conn)) as logger:

            # Send message
            logger.log("Error happened while encoding video",
                       level="ERROR",
                       context={"filename": "cutekitten.mpg"})

            # Consume and process message

            # This is the callback called when a log message is
            # received.
            def dump_entry(entry):
                date = datetime.fromtimestamp(entry["timestamp"])
                print("[%s %s %s] %s %r" % (date,
                                            entry["hostname"],
                                            entry["level"],
                                            entry["message"],
                                            entry["context"]))

            # Process a single message using the callback above.
            logger.process(dump_entry, n=1)