Change history

2.1.7

release-date:2012-04-27 6:00 P.M BST
  • compat.ConsumerSet now accepts an optional channel argument.

2.1.6

release-date:2012-04-23 1:30 P.M BST
  • SQLAlchemy transport was not working correctly after URL parser change.

  • maybe_declare now stores cached declarations per underlying connection instead of globally, in the rare case that data disappears from the broker after connection loss.

  • Django: Added South migrations.

    Contributed by Joseph Crosland.
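
As a rough illustration of the per-connection caching described for maybe_declare above, the sketch below keeps one set of declared names per connection object. It is not Kombu's implementation: the `_declared` mapping, the `declare` callback and the plain `object()` instances standing in for connections are all assumptions made for the example.

```python
from collections import defaultdict

# connection -> set of entity names already declared on it (hypothetical cache)
_declared = defaultdict(set)


def maybe_declare(entity_name, connection, declare):
    """Call declare(entity_name) once per connection; return True if it ran."""
    seen = _declared[connection]
    if entity_name in seen:
        return False
    declare(entity_name)
    seen.add(entity_name)
    return True


calls = []
first_conn, second_conn = object(), object()
maybe_declare("tasks", first_conn, calls.append)
maybe_declare("tasks", first_conn, calls.append)    # cached, skipped
maybe_declare("tasks", second_conn, calls.append)   # new connection, declared again
```

With a single global cache the third call would have been skipped, which is the situation the change guards against when the broker loses data after a reconnect.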

2.1.5

release-date:2012-04-13 3:30 P.M BST
  • The url parser removed more than the first leading slash (Issue #121).

  • SQLAlchemy: Can now specify url using + separator

    Example:

    BrokerConnection("sqla+mysql://localhost/db")
    
  • Better support for anonymous queues (Issue #116).

    Contributed by Michael Barrett.

  • Connection.as_uri now quotes url parts (Issue #117).

  • Beanstalk: Can now set message TTR as a message property.

    Contributed by Andrii Kostenko
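
The quoting of URL parts mentioned for Connection.as_uri above can be pictured with the standard library alone. This is a minimal sketch, not Kombu's code: the function signature and the `**` password mask are assumptions, and only `urllib.parse.quote` is a real API here.

```python
from urllib.parse import quote


def as_uri(transport, hostname, port, virtual_host,
           userid=None, password=None, include_password=False):
    """Build a broker URI with percent-encoded parts (illustrative only)."""
    credentials = ""
    if userid is not None:
        credentials = quote(userid, safe="")
        if password is not None:
            # "**" as the mask is an assumption made for this sketch.
            shown = quote(password, safe="") if include_password else "**"
            credentials += ":" + shown
        credentials += "@"
    # quote() keeps "/" by default, so a virtual host of "/" yields "//".
    return "%s://%s%s:%d/%s" % (
        transport, credentials, hostname, port, quote(virtual_host))


masked = as_uri("amqp", "localhost", 5672, "/", "guest", "p@ss/word")
full = as_uri("amqp", "localhost", 5672, "/", "guest", "p@ss/word",
              include_password=True)
```

Without quoting, the `@` and `/` in the password would make the resulting URI ambiguous to parse.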

2.1.4

release-date:2012-04-03 4:00 P.M GMT
  • MongoDB: URL parsing is now delegated to the pymongo library (Fixes Issue #103 and Issue #87).

    Fix contributed by Flavio Percoco Premoli and James Sullivan

  • SQS: A bug caused SimpleDB to be used even if sdb persistence was not enabled (Issue #108).

    Fix contributed by Anand Kumria.

  • Django: Transaction was committed in the wrong place, causing data cleanup to fail (Issue #115).

    Fix contributed by Daisuke Fujiwara.

  • MongoDB: Now supports replica set URLs.

    Contributed by Flavio Percoco Premoli.

  • Redis: Now raises a channel error if a queue key that is currently being consumed from disappears.

    Fix contributed by Stephan Jaekel.

  • All transport ‘channel_errors’ lists now include StdChannelError.

  • All kombu exceptions now inherit from a common KombuError.

2.1.3

release-date:2012-03-20 3:00 P.M GMT
by:Ask Solem
  • Fixes Jython compatibility issues.
  • Fixes Python 2.5 compatibility issues.

2.1.2

release-date:2012-03-01 01:00 P.M GMT
by:Ask Solem
  • amqplib: Last version broke SSL support.

2.1.1

release-date:2012-02-24 02:00 P.M GMT
by:Ask Solem
  • Connection URLs now support encoded characters.

  • Fixed a case where connection pool could not recover from connection loss.

    Fix contributed by Florian Munz.

  • We now patch amqplib’s __del__ method to skip trying to close the socket if it is not connected, as this resulted in an annoying warning.

  • Compression can now be used with binary message payloads.

    Fix contributed by Steeve Morin.

2.1.0

release-date:2012-02-04 10:38 P.M GMT
by:Ask Solem
  • MongoDB: Now supports fanout (broadcast) (Issue #98).

    Contributed by Scott Lyons.

  • amqplib: Now detects broken connections by using MSG_PEEK.

  • pylibrabbitmq: Now supports basic_get (Issue #97).

  • gevent: Now always uses the select polling backend.

  • pika transport: Now works with pika 0.9.5 and 0.9.6dev.

    The old pika transport (supporting 0.5.x) is now available as alias oldpika.

    (Note: terrible latency has been experienced with the new pika versions, so this is still an experimental transport.)

  • Virtual transports: can now set polling interval via the transport options (Issue #96).

    Example:

    >>> BrokerConnection("sqs://", transport_options={
    ...     "polling_interval": 5.0})
    

    The default interval is transport specific, but usually 1.0s (or 5.0s for the Django database transport, which can also be set using the KOMBU_POLLING_INTERVAL setting).

  • Adds convenience function: kombu.common.eventloop().

2.0.0

release-date:2012-01-15 18:34 P.M GMT
by:Ask Solem

Important Notes

Python Compatibility

  • No longer supports Python 2.4.

    Users of Python 2.4 can still use the 1.x series.

    The 1.x series has entered bugfix-only maintenance mode, and will stay that way as long as there is demand, and a willingness to maintain it.

New Transports

  • django-kombu is now part of Kombu core.

    The Django message transport uses the Django ORM to store messages.

    It uses polling, with a default polling interval of 5 seconds. The polling interval can be increased or decreased by configuring the KOMBU_POLLING_INTERVAL Django setting, which is the polling interval in seconds as an int or a float. Note that shorter polling intervals can cause extreme strain on the database: if responsiveness is needed you should consider switching to a non-polling transport.

    To use it, specify the transport alias "django", or use the URL:

    django://

    and then add kombu.transport.django to INSTALLED_APPS, and run manage.py syncdb to create the necessary database tables.

    Upgrading

    If you have previously used django-kombu, then the entry in INSTALLED_APPS must be changed from djkombu to kombu.transport.django:

    INSTALLED_APPS = (…,
                      "kombu.transport.django")

    If you have previously used django-kombu, then there is no need to recreate the tables, as the old tables will be fully compatible with the new version.

  • kombu-sqlalchemy is now part of Kombu core.

    This change requires no code changes given that the sqlalchemy transport alias is used.

News

  • kombu.mixins.ConsumerMixin is a mixin class that lets you easily write consumer programs and threads.

    See Examples and Consumers.

  • SQS Transport: Added support for SQS queue prefixes (Issue #84).

    The queue prefix can be set using the transport option queue_name_prefix:

    BrokerConnection("SQS://", transport_options={
        "queue_name_prefix": "myapp"})
    

    Contributed by Nitzan Miron.

  • Producer.publish now supports automatic retry.

    Retry is enabled by the retry argument, and retry options are set by the retry_policy argument:

    exchange = Exchange("foo")
    producer.publish(message, exchange=exchange, retry=True,
                     declare=[exchange], retry_policy={
                        "interval_start": 1.0})
    

    See ensure() for a list of supported retry policy options.

  • Producer.publish now supports a declare keyword argument.

    This is a list of entities (Exchange, or Queue) that should be declared before the message is published.

Fixes

  • Redis transport: Timeout was multiplied by 1000 seconds when using select for event I/O (Issue #86).

1.5.1

release-date:2011-11-30 01:00 P.M GMT
by:Ask Solem
  • Fixes issue with kombu.compat introduced in 1.5.0 (Issue #83).

  • Adds the ability to disable content_types in the serializer registry.

    Any message with a content type that is disabled will be refused. One example would be to disable the Pickle serializer:

    >>> from kombu.serialization import registry
    # by name
    >>> registry.disable("pickle")
    # or by mime-type.
    >>> registry.disable("application/x-python-serialize")
    

1.5.0

release-date:2011-11-27 06:00 P.M GMT
by:Ask Solem
  • kombu.pools: Fixed a bug resulting in resources not being properly released.

    This was caused by the use of __hash__ to distinguish them.

  • Virtual transports: Dead-letter queue is now disabled by default.

    The dead-letter queue was enabled by default to help application authors, but now that Kombu is stable it should be removed. There are, after all, many cases where messages should simply be dropped when there are no queues to buffer them, and keeping them without supporting automatic cleanup is a resource leak rather than a feature.

    If wanted, the dead-letter queue can still be enabled by using the deadletter_queue transport option:

    >>> x = BrokerConnection("redis://",
    ...       transport_options={"deadletter_queue": "ae.undeliver"})
    

    In addition, an UndeliverableWarning is now emitted when the dead-letter queue is enabled and a message ends up there.

    Contributed by Ionel Maries Cristian.

  • MongoDB transport now supports Replicasets (Issue #81).

    Contributed by Ivan Metzlar.

  • The Connection.ensure methods now accept a max_retries value of 0.

    A value of 0 now means do not retry, which is distinct from None which means retry indefinitely.

    Contributed by Dan McGee.

  • SQS Transport: Now has a lowercase sqs alias, so that it can be used with broker URLs (Issue #82).

    Fix contributed by Hong Minhee

  • SQS Transport: Fixes KeyError on message acknowledgements (Issue #73).

    The SQS transport now uses UUIDs for delivery tags, rather than a counter.

    Fix contributed by Brian Bernstein.

  • SQS Transport: Unicode related fixes (Issue #82).

    Fix contributed by Hong Minhee.

  • Redis version check could crash because of improper handling of types (Issue #63).

  • Fixed error with Resource.force_close_all when resources were not yet properly initialized (Issue #78).
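
The distinction between max_retries=0 and max_retries=None noted above for Connection.ensure can be sketched with a small stand-alone retry loop. This is only an illustration under assumptions: `call_with_retry`, `Flaky`, the `interval_step` parameter and the injected `sleep` callable are hypothetical names for the example, and the built-in ConnectionError stands in for a transport's connection errors.

```python
import itertools


def call_with_retry(fun, max_retries=None, interval_start=1.0,
                    interval_step=1.0, sleep=lambda seconds: None):
    """Call fun(), retrying on ConnectionError (illustrative sketch only).

    max_retries=0 means never retry; None means retry indefinitely.
    The sleep callable is injected so the example runs instantly.
    """
    for retries in itertools.count():
        try:
            return fun()
        except ConnectionError:
            if max_retries is not None and retries >= max_retries:
                raise
            sleep(interval_start + retries * interval_step)


class Flaky:
    """Fails a fixed number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("broker unavailable")
        return "ok"
```

Testing `max_retries is not None` explicitly, rather than relying on truthiness, is what keeps 0 and None from being treated alike.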

1.4.3

release-date:2011-10-27 10:00 P.M BST
  • Fixes bug in ProducerPool where too many resources would be acquired.

1.4.2

release-date:2011-10-26 05:00 P.M BST
by:Ask Solem
  • Eventio: Polling should ignore errno.EINTR

  • SQS: str.encode only started accepting keyword arguments in Python 2.7.

  • simple_task_queue example didn’t run correctly (Issue #72).

    Fix contributed by Stefan Eletzhofer.

  • Empty messages would raise an exception that could not be handled by on_decode_error (Issue #72)

    Fix contributed by Christophe Chauvet.

  • CouchDB: Properly authenticate if user/password set (Issue #70)

    Fix contributed by Rafael Duran Castaneda

  • BrokerConnection.Consumer had the wrong signature.

    Fix contributed by Pavel Skvazh

1.4.1

release-date:2011-09-26 04:00 P.M BST
by:Ask Solem
  • 1.4.0 broke the producer pool, resulting in new connections being established for every acquire.

1.4.0

release-date:2011-09-22 05:00 P.M BST
by:Ask Solem
  • Adds module kombu.mixins.

    This module contains a ConsumerMixin class that can be used to easily implement a message consumer thread that consumes messages from one or more kombu.messaging.Consumer instances.

  • New example: Task Queue Example

    Using the ConsumerMixin, default channels and the global connection pool to demonstrate new Kombu features.

  • MongoDB transport did not work with MongoDB >= 2.0 (Issue #66)

    Fix contributed by James Turk.

  • Redis-py version check did not account for beta identifiers in version string.

    Fix contributed by David Ziegler.

  • Producer and Consumer now accept a connection instance as the first argument.

    The connection's default channel will then be used.

    In addition, shortcut methods have been added to BrokerConnection:

    >>> connection.Producer(exchange)
    >>> connection.Consumer(queues=..., callbacks=...)
    
  • BrokerConnection has acquired a connected attribute that can be used to check if the connection instance has established a connection.

  • ConnectionPool.acquire_channel now returns the connection's default channel rather than establishing a new channel that must be manually handled.

  • Added kombu.common.maybe_declare

    maybe_declare(entity) declares an entity if it has not previously been declared in the same process.

  • kombu.compat.entry_to_queue() has been moved to kombu.common

  • New module kombu.clocks now contains an implementation of Lamport's logical clock.
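
For readers unfamiliar with the logical clock mentioned in the last item, here is a minimal sketch of the general technique. It is not the kombu.clocks module: the class name `SketchLamportClock` and its method names are chosen for this example only.

```python
import threading


class SketchLamportClock:
    """Minimal Lamport logical clock (hypothetical; not kombu.clocks itself)."""

    def __init__(self, initial_value=0):
        self.value = initial_value
        self._lock = threading.Lock()

    def forward(self):
        """Tick before a local event or before sending a message."""
        with self._lock:
            self.value += 1
            return self.value

    def adjust(self, other):
        """Merge a timestamp received from another process."""
        with self._lock:
            self.value = max(self.value, other) + 1
            return self.value


a, b = SketchLamportClock(), SketchLamportClock()
sent = a.forward()          # process A stamps an outgoing message
b.forward()                 # process B records a local event
b.forward()                 # and another one
received = b.adjust(sent)   # B receives A's message: max(2, 1) + 1
```

The receiver always ends up with a value greater than both its own and the sender's, so causally related events are ordered consistently.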

1.3.5

release-date:2011-09-16 06:00 P.M BST
by:Ask Solem
  • Python 3: AMQP_PROTOCOL_HEADER must be bytes, not str.

1.3.4

release-date:2011-09-16 06:00 P.M BST
by:Ask Solem
  • Fixes syntax error in pools.reset

1.3.3

release-date:2011-09-15 02:00 P.M BST
by:Ask Solem
  • pools.reset did not support after forker arguments.

1.3.2

release-date:2011-09-10 01:00 P.M BST
by:Mher Movsisyan
  • Broke Python 2.5 compatibility by importing parse_qsl from urlparse
  • Connection.default_channel is now closed when connection is revived after connection failures.
  • Pika: Channel now supports the connection.client attribute as required by the simple interface.
  • pools.set_limit now raises an exception if the limit is lower than the previous limit.
  • pools.set_limit no longer resets the pools.

1.3.1

release-date:2011-10-07 03:00 P.M BST
  • Last release broke after fork for pool reinitialization.

  • Producer/Consumer now has a connection attribute, giving access to the BrokerConnection of the instance.

  • Pika: Channels now have access to the underlying BrokerConnection instance using channel.connection.client.

    This was previously required by the Simple classes and is now also required by Consumer and Producer.

  • Connection.default_channel is now closed at object revival.

  • Adds kombu.clocks.LamportClock.

  • compat.entry_to_queue has been moved to new module kombu.common.

1.3.0

release-date:2011-10-05 01:00 P.M BST
  • Broker connection info can now be specified using URLs

    The broker hostname can now be given as a URL instead, in the format:

    transport://user:password@hostname:port/virtual_host

    for example the default broker is expressed as:

    >>> BrokerConnection("amqp://guest:guest@localhost:5672//")
    

    Transport defaults to amqp, and is not required. user, password, port and virtual_host are also optional and will default to the corresponding transport's defaults.

    Note

    Note that the path component (virtual_host) always starts with a forward-slash. This is necessary to distinguish between the virtual host ‘’ (empty) and ‘/’, which are both acceptable virtual host names.

    A virtual host of ‘/’ becomes:

    amqp://guest:guest@localhost:5672//

    and a virtual host of ‘’ (empty) becomes:

    amqp://guest:guest@localhost:5672/

    So the leading slash in the path component is always required.

  • Now comes with default global connection and producer pools.

    To acquire a connection using the connection parameters from a BrokerConnection:

    >>> from kombu import BrokerConnection, connections
    >>> connection = BrokerConnection("amqp://guest:guest@localhost//")
    >>> with connections[connection].acquire(block=True) as conn:
    ...     pass  # do something with conn
    

    To acquire a producer using the connection parameters from a BrokerConnection:

    >>> from kombu import BrokerConnection, producers
    >>> connection = BrokerConnection("amqp://guest:guest@localhost//")
    >>> with producers[connection].acquire(block=True) as producer:
    ...     producer.publish({"hello": "world"}, exchange="hello")
    

    Acquiring a producer will in turn also acquire a connection from the associated pool in connections, so the number of producers is bound by the same limit as the number of connections.

    The default limit of 100 connections per connection instance can be changed by doing:

    >>> from kombu import pools
    >>> pools.set_limit(10)
    

    The pool can also be forcefully closed by doing:

    >>> from kombu import pools
    >>> pools.reset()
    
  • SQS Transport: Persistence using SimpleDB is now disabled by default, after reports of unstable SimpleDB connections leading to errors.

  • Producer can now be used as a context manager.

  • Producer.__exit__ now properly calls release instead of close.

    The previous behavior would lead to a memory leak when using the kombu.pools.ProducerPool

  • Now silences all exceptions from import ctypes to match behaviour of the standard Python uuid module, and avoid passing on MemoryError exceptions on SELinux-enabled systems (Issue #52 + Issue #53)

  • amqp is now an alias to the amqplib transport.

  • kombu.syn.detect_environment now returns ‘default’, ‘eventlet’, or ‘gevent’ depending on what monkey patches have been installed.

  • Serialization registry has new attribute type_to_name so it is possible to look up the serializer name by content type.

  • Exchange argument to Producer.publish can now be an Exchange instance.

  • compat.Publisher now supports the channel keyword argument.

  • Acking a message on some transports could lead to KeyError being raised (Issue #57).

  • Connection pool: Connections are no longer instantiated when the pool is created, but instantiated as needed instead.

  • Tests now pass on PyPy.

  • Connection.as_uri now includes the password if the keyword argument include_password is set.

  • Virtual transports now come with a default default_connection_params attribute.
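
The virtual host rule from the broker URL note in this release (exactly one leading slash is stripped from the path) can be checked with a short standard-library sketch. This is not Kombu's URL parser: `parse_broker_url` and the returned dictionary keys are assumptions for illustration, built only on `urllib.parse`.

```python
from urllib.parse import unquote, urlsplit


def parse_broker_url(url):
    """Split a broker URL into connection parameters (illustrative only)."""
    parts = urlsplit(url)
    path = parts.path
    return {
        "transport": parts.scheme,
        "userid": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "hostname": parts.hostname,
        "port": parts.port,
        # Strip exactly one leading slash so "//" -> "/" and "/" -> "".
        "virtual_host": unquote(path[1:]) if path.startswith("/") else path,
    }


default_vhost = parse_broker_url("amqp://guest:guest@localhost:5672//")
empty_vhost = parse_broker_url("amqp://guest:guest@localhost:5672/")
```

Stripping more than one slash would make the two URLs indistinguishable, which is the kind of bug later fixed in 2.1.5 (Issue #121).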

1.2.1

release-date:2011-07-29 12:52 P.M BST
  • Now depends on amqplib >= 1.0.0.

  • Redis: Now automatically deletes auto_delete queues at basic_cancel.

  • serialization.unregister added so it is possible to remove unwanted serializers.

  • Fixes MemoryError while importing ctypes on SELinux (Issue #52).

  • BrokerConnection.autoretry is a version of ensure that works with arbitrary functions (i.e. it does not need an associated object that implements the revive method).

    Example usage:

    channel = connection.channel()
    try:
        ret, channel = connection.autoretry(send_messages, channel=channel)
    finally:
        channel.close()
    
  • ConnectionPool.acquire no longer force establishes the connection.

    The connection will be established as needed.

  • BrokerConnection.ensure now supports an on_revive callback that is applied whenever the connection is re-established.

  • Consumer.consuming_from(queue) returns True if the Consumer is consuming from queue.

  • Consumer.cancel_by_queue did not remove the queue from queues.

  • compat.ConsumerSet.add_queue_from_dict now automatically declares the queue if auto_declare is set.
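
To make the serializer registry operations in these notes more concrete (unregister from this release, and the later disable and type_to_name additions listed under 1.5.1 and 1.3.0), here is a toy registry. It is a sketch under assumptions, not kombu.serialization: the class `SketchRegistry`, the `name_to_type` mapping and the use of ValueError for refused messages are all hypothetical.

```python
import json
import pickle


class SketchRegistry:
    """Toy serializer registry (hypothetical; not kombu.serialization)."""

    def __init__(self):
        self._decoders = {}        # content type -> decode function
        self.name_to_type = {}     # serializer name -> content type
        self.type_to_name = {}     # content type -> serializer name
        self._disabled = set()     # content types that must be refused

    def register(self, name, decoder, content_type):
        self._decoders[content_type] = decoder
        self.name_to_type[name] = content_type
        self.type_to_name[content_type] = name

    def unregister(self, name):
        content_type = self.name_to_type.pop(name)
        del self.type_to_name[content_type]
        del self._decoders[content_type]

    def disable(self, name_or_type):
        # Accept either a serializer name or a MIME type.
        self._disabled.add(self.name_to_type.get(name_or_type, name_or_type))

    def decode(self, data, content_type):
        if content_type in self._disabled:
            raise ValueError("refusing disabled content type: " + content_type)
        return self._decoders[content_type](data)


registry = SketchRegistry()
registry.register("json", json.loads, "application/json")
registry.register("pickle", pickle.loads, "application/x-python-serialize")
registry.disable("pickle")
```

Checking the disabled set before looking up the decoder means a disabled payload is never passed to its deserializer at all.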

1.2.0

release-date:2011-07-15 12:00 P.M BST
  • Virtual: Fixes cyclic reference in Channel.close (Issue #49).
  • Producer.publish: Can now set additional properties using keyword arguments (Issue #48).
  • Adds Queue.no_ack option to control the no_ack option for individual queues.
  • Recent versions broke pylibrabbitmq support.
  • SimpleQueue and SimpleBuffer can now be used as contexts.
  • Test requirements now specify PyYAML==3.09, as 3.10 dropped Python 2.4 support.
  • Now properly reports default values in Connection.info/.as_uri

1.1.6

release-date:2011-06-13 04:00 P.M BST
  • Redis: Fixes issue introduced in 1.1.4, where a redis connection failure could leave the consumer hanging forever.

  • SQS: Now supports fanout messaging by using SimpleDB to store routing tables.

    This can be disabled by setting the supports_fanout transport option:

    >>> BrokerConnection(transport="SQS",
    ...                  transport_options={"supports_fanout": False})
    
  • SQS: Now properly deletes a message when a message is acked.

  • SQS: Can now set the Amazon AWS region, by using the region transport option.

  • amqplib: Now uses localhost as default hostname instead of raising an error.

1.1.5

release-date:2011-06-07 06:00 P.M BST
  • Fixes compatibility with redis-py 2.4.4.

1.1.4

release-date:2011-06-07 04:00 P.M BST
  • Redis transport: Now requires redis-py version 2.4.4 or later.

  • New Amazon SQS transport added.

    Usage:

    >>> conn = BrokerConnection(transport="SQS",
    ...                         userid=aws_access_key_id,
    ...                         password=aws_secret_access_key)
    

    The environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are also supported.

  • librabbitmq transport: Fixes default credentials support.

  • amqplib transport: Now supports login_method for SSL auth.

    BrokerConnection now supports the login_method keyword argument.

    Default login_method is AMQPLAIN.

1.1.3

release-date:2011-04-21 16:00 P.M CEST
  • Redis: Consuming from multiple connections now works with Eventlet.

  • Redis: Can now perform channel operations while the channel is in BRPOP/LISTEN mode (Issue #35).

    Also, the async BRPOP now times out after 1 second. This means that cancelling consuming from a queue, or starting to consume from additional queues, has a latency of up to one second (BRPOP does not support subsecond timeouts).

  • Virtual: Allow channel objects to be closed multiple times without error.

  • amqplib: AttributeError has been added to the list of known connection related errors (Connection.connection_errors).

  • amqplib: Now converts SSLError timeout errors to socket.timeout (http://bugs.python.org/issue10272)

  • Ensures cyclic references are destroyed when the connection is closed.

1.1.2

release-date:2011-04-06 16:00 P.M CEST
  • Redis: Fixes serious issue where messages could be lost.

    The issue could happen if the message exceeded a certain number of kilobytes in size.

    It is recommended that all users of the Redis transport should upgrade to this version, even if not currently experiencing any issues.

1.1.1

release-date:2011-04-05 15:51 P.M CEST
  • 1.1.0 started using Queue.LifoQueue which is only available in Python 2.6+ (Issue #33). We now ship with our own LifoQueue.
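
The effect of the LIFO pool ordering that 1.1.0 introduced can be seen with the standard library queues. This is only a sketch of the ordering behavior, not Kombu's pool: `reuse_pattern`, `filled` and the `conn-N` strings are stand-ins invented for the example.

```python
import queue


def reuse_pattern(pool, rounds=3):
    """Acquire and release one resource per round; return what was acquired."""
    seen = []
    for _ in range(rounds):
        resource = pool.get_nowait()   # acquire
        seen.append(resource)
        pool.put_nowait(resource)      # release
    return seen


def filled(pool_cls, names=("conn-1", "conn-2", "conn-3")):
    """Create a pool of the given queue class holding the named resources."""
    pool = pool_cls()
    for name in names:
        pool.put_nowait(name)
    return pool


lifo_seen = reuse_pattern(filled(queue.LifoQueue))
fifo_seen = reuse_pattern(filled(queue.Queue))
```

With LIFO ordering a single user keeps getting the same resource back, while FIFO ordering cycles through every resource in the pool.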

1.1.0

release-date:2011-04-05 01:05 P.M CEST

Important Notes

  • Virtual transports: Message body is now base64 encoded by default (Issue #27).

    This should solve problems sending binary data with virtual transports.

    Message compatibility is handled by adding a body_encoding property, so messages sent by older versions are compatible with this release. However, if you are accessing the messages directly without using Kombu, then you have to respect the body_encoding property.

    If you need to disable base64 encoding then you can do so via the transport options:

    BrokerConnection(transport="...",
                     transport_options={"body_encoding": None})
    

    For transport authors:

    You don’t have to change anything in your custom transports, as this is handled automatically by the base class.

    If you want to use a different encoder you can do so by adding a key to Channel.codecs. Default encoding is specified by the Channel.body_encoding attribute.

    A new codec must provide two methods: encode(data) and decode(data).

  • ConnectionPool/ChannelPool/Resource: Setting limit=None (or 0) now disables pool semantics, and will establish and close the resource whenever acquired or released.

  • ConnectionPool/ChannelPool/Resource: Is now using a LIFO queue instead of the previous FIFO behavior.

    This means that the last resource released will be the one acquired next. I.e. if only a single thread is using the pool this means only a single connection will ever be used.

  • BrokerConnection: Cloned connections did not inherit transport_options (__copy__).

  • contrib/requirements is now located in the top directory of the distribution.

  • MongoDB: Now supports authentication using the userid and password arguments to BrokerConnection (Issue #30).

  • BrokerConnection: Default authentication credentials are now delegated to the individual transports.

    This means that the userid and password arguments to BrokerConnection are no longer guest/guest by default.

    The amqplib and pika transports will still have the default credentials.

  • Consumer.__exit__() did not have the correct signature (Issue #32).

  • Channel objects now have a channel_id attribute.

  • MongoDB: Version sniffing broke with development versions of mongod (Issue #29).

  • New environment variable KOMBU_LOG_CONNECTION will now emit debug log messages for connection related actions.

    KOMBU_LOG_DEBUG will also enable KOMBU_LOG_CONNECTION.
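
The codec contract described in the body encoding note above (an object with encode(data) and decode(data), selected by a body_encoding property) might look roughly like this. The message layout, the `codecs` dictionary and the `pack`/`unpack` helpers are assumptions for the example and do not reflect Kombu's internal message format.

```python
import base64


class Base64Codec:
    """Codec with the two methods the notes require: encode and decode."""

    def encode(self, data):
        return base64.b64encode(data).decode("ascii")

    def decode(self, data):
        return base64.b64decode(data)


# Hypothetical codec table, playing the role the notes give to Channel.codecs.
codecs = {"base64": Base64Codec()}


def pack(body, body_encoding="base64"):
    """Build a message dict, recording how the body was encoded."""
    if body_encoding is None:
        return {"body": body, "properties": {}}
    return {"body": codecs[body_encoding].encode(body),
            "properties": {"body_encoding": body_encoding}}


def unpack(message):
    """Decode the body according to its body_encoding property, if any."""
    encoding = message["properties"].get("body_encoding")
    if encoding is None:
        return message["body"]   # no encoding recorded: leave the body as-is
    return codecs[encoding].decode(message["body"])


binary = b"\x00\xffpayload"
message = pack(binary)
```

Because the encoding travels with each message, a reader that honors the property can handle both encoded and unencoded bodies.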

1.0.7

release-date:2011-03-28 05:45 P.M CEST
  • Now depends on anyjson 0.3.1

    cjson is no longer a recommended json implementation, and anyjson will now emit a deprecation warning if used.

  • Please note that the Pika backend only works with version 0.5.2.

    The latest version (0.9.x) drastically changed API, and it is not compatible yet.

  • on_decode_error is now called for exceptions in message_to_python (Issue #24).

  • Redis: did not respect QoS settings.

  • Redis: Creating a connection now ensures the connection is established.

    This means BrokerConnection.ensure_connection works properly with Redis.

  • consumer_tag argument to Queue.consume can’t be None (Issue #21).

    A None value is now automatically converted to empty string. An empty string will make the server generate a unique tag.

  • BrokerConnection now supports a transport_options argument.

    This can be used to pass additional arguments to transports.

  • Pika: drain_events raised socket.timeout even if no timeout set (Issue #8).

1.0.6

release-date:2011-03-22 04:00 P.M CET
  • The delivery_mode aliases (persistent/transient) were not automatically converted to integer, and would cause a crash if using the amqplib transport.

  • Redis: The redis-py InvalidData exception suddenly changed name to DataError.

  • The KOMBU_LOG_DEBUG environment variable can now be set to log all channel method calls.

    Support for the following environment variables has been added:

    • KOMBU_LOG_CHANNEL will wrap channels in an object that logs every method call.
    • KOMBU_LOG_DEBUG both enables channel logging and configures the root logger to emit messages to standard error.

    Example Usage:

    $ KOMBU_LOG_DEBUG=1 python
    >>> from kombu import BrokerConnection
    >>> conn = BrokerConnection()
    >>> channel = conn.channel()
    Start from server, version: 8.0, properties:
        {u'product': 'RabbitMQ',..............  }
    Open OK! known_hosts []
    using channel_id: 1
    Channel open
    >>> channel.queue_declare("myq", passive=True)
    [Kombu channel:1] queue_declare('myq', passive=True)
    (u'myq', 0, 1)
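
The delivery_mode alias conversion fixed in this release can be pictured as a simple lookup. This is a sketch, not Kombu's code: `DELIVERY_MODES` and `normalize_delivery_mode` are hypothetical names, while the integer values 1 (transient) and 2 (persistent) are the ones AMQP defines for the delivery-mode property.

```python
# AMQP delivery-mode values: 1 = transient (non-persistent), 2 = persistent.
DELIVERY_MODES = {"transient": 1, "persistent": 2}


def normalize_delivery_mode(mode):
    """Return the integer delivery mode for an alias or an integer."""
    if isinstance(mode, str):
        return DELIVERY_MODES[mode]
    return int(mode)
```

Normalizing before the value reaches the transport avoids handing a string to code that expects an integer.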

1.0.5

release-date:2011-03-17 04:00 P.M CET
  • Fixed memory leak when creating virtual channels. All virtual transports affected (redis, mongodb, memory, django, sqlalchemy, couchdb, beanstalk).

  • Virtual Transports: Fixed potential race condition when acking messages.

    If you have been affected by this, the error would show itself as an exception raised by the OrderedDict implementation (object no longer exists).

  • MongoDB transport requires the findandmodify command only available in MongoDB 1.3+, so now raises an exception if connected to an incompatible server version.

  • Virtual Transports: basic.cancel should not try to remove unknown consumer tag.

1.0.4

release-date:2011-02-28 04:00 P.M CET
  • Added Transport.polling_interval

    Used by django-kombu to increase the time to sleep between SELECTs when there are no messages in the queue.

    Users of django-kombu should upgrade to django-kombu v0.9.2.

1.0.3

release-date:2011-02-12 04:00 P.M CET
  • ConnectionPool: Re-connect if amqplib connection closed
  • Adds Queue.as_dict + Exchange.as_dict.
  • Copyright headers updated to include 2011.

1.0.2

release-date:2011-01-31 10:45 P.M CET
  • amqplib: Message properties were not set properly.
  • Ghettoq backend names are now automatically translated to the new names.

1.0.1

release-date:2011-01-28 12:00 P.M CET
  • Redis: Now works with Linux (epoll)

1.0.0

release-date:2011-01-27 12:00 P.M CET
  • Initial release

0.1.0

release-date:2010-07-22 04:20 P.M CET
  • Initial fork of carrot