kombu.connection

Broker connection and pools.

Connection

class kombu.connection.Connection(hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy='round-robin', alternates=None, **kwargs)

A connection to the broker.

Parameters: URL – Broker URL, or a list of URLs, e.g.
Connection('amqp://guest:guest@localhost:5672//')
Connection('amqp://foo;amqp://bar', failover_strategy='round-robin')
Connection('redis://', transport_options={
    'visibility_timeout': 3000,
})

import ssl
Connection('amqp://', login_method='EXTERNAL', ssl={
    'ca_certs': '/etc/pki/tls/certs/something.crt',
    'keyfile': '/etc/something/system.key',
    'certfile': '/etc/something/system.cert',
    'cert_reqs': ssl.CERT_REQUIRED,
})

SSL compatibility

SSL currently only works with the py-amqp & amqplib transports. For other transports you can use stunnel.

Parameters:
  • hostname – Default host name/address if not provided in the URL.
  • userid – Default user name if not provided in the URL.
  • password – Default password if not provided in the URL.
  • virtual_host – Default virtual host if not provided in the URL.
  • port – Default port if not provided in the URL.
  • ssl – Use SSL to connect to the server. Default is False. May not be supported by the specified transport.
  • transport – Default transport if not specified in the URL.
  • connect_timeout – Timeout in seconds for connecting to the server. May not be supported by the specified transport.
  • transport_options – A dict of additional connection arguments to pass to alternate kombu channel implementations. Consult the transport documentation for available options.
  • heartbeat – Heartbeat interval in int/float seconds. Note that if heartbeats are enabled then the heartbeat_check() method must be called regularly, around once per second.
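
The parameters above act as defaults for whatever the URL leaves out. As a rough illustration only, the standard library's urllib.parse can show how a broker URL breaks down into those fields. This is a sketch, not kombu's own parser, which may differ in details; the helper name split_broker_url and the rule used for the virtual host are assumptions.

```python
from urllib.parse import urlsplit


def split_broker_url(url, default_vhost='/'):
    """Break a broker URL into the fields named in the parameter list.

    Hypothetical helper for illustration; not part of kombu.
    """
    parts = urlsplit(url)
    return {
        'transport': parts.scheme,
        'hostname': parts.hostname,
        'port': parts.port,
        'userid': parts.username,
        'password': parts.password,
        # Assumption: the path after the first '/' names the virtual host.
        'virtual_host': parts.path[1:] or default_vhost,
    }


fields = split_broker_url('amqp://guest:guest@localhost:5672//')

# A semicolon-separated string holds several alternative broker URLs.
alternates = 'amqp://foo;amqp://bar'.split(';')
```

Fields that the URL does not carry come back as None here, which is where defaults such as hostname or port would apply.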

Note

The connection is established lazily when needed. If you need the connection to be established, then force it by calling connect():

>>> conn = Connection('amqp://')
>>> conn.connect()

and always remember to close the connection:

>>> conn.release()

ChannelPool(limit=None, preload=None)

Pool of channels.

See ChannelPool.

Parameters:
  • limit – Maximum number of active channels. Default is no limit.
  • preload – Number of channels to preload when the pool is created. Default is 0.

Example usage:

>>> connection = Connection('amqp://')
>>> pool = connection.ChannelPool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()  # raises ChannelLimitExceeded: both channels are in use
>>> c1.release()
>>> c3 = pool.acquire()  # succeeds now that c1 has been released

Consumer(queues=None, channel=None, *args, **kwargs)

Create new kombu.Consumer instance using this connection.

Pool(limit=None, preload=None)

Pool of connections.

See ConnectionPool.

Parameters:
  • limit – Maximum number of active connections. Default is no limit.
  • preload – Number of connections to preload when the pool is created. Default is 0.

Example usage:

>>> connection = Connection('amqp://')
>>> pool = connection.Pool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
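>>> c3 = pool.acquire()  # raises ConnectionLimitExceeded: both connections are in use
>>> c1.release()
>>> c3 = pool.acquire()  # succeeds now that c1 has been released

Producer(channel=None, *args, **kwargs)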

Create new kombu.Producer instance using this connection.

SimpleBuffer(name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs)

Create new SimpleQueue using a channel from this connection.

Same as SimpleQueue(), but configured with buffering semantics. The resulting queue and exchange will not be durable, and auto-delete is enabled. Messages will be transient (not persistent), and acknowledgements are disabled (no_ack).

SimpleQueue(name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs)

Create new SimpleQueue, using a channel from this connection.

If name is a string, a queue and an exchange are automatically created with that name, which is also used as the default routing key.

Parameters:
  • name – Name of the queue, or a Queue instance.
  • no_ack – Disable acknowledgements. Default is false.
  • queue_opts – Additional keyword arguments passed to the constructor of the automatically created Queue.
  • exchange_opts – Additional keyword arguments passed to the constructor of the automatically created Exchange.
  • channel – Custom channel to use. If not specified the connection default channel is used.

as_uri(include_password=False, mask='**', getfields=<operator.itemgetter object at 0x7ff242b43150>)

Convert connection parameters to URL form.
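
To make the include_password and mask arguments concrete, here is a minimal standard-library sketch of masking the password in a URL. It is not kombu's implementation: the function name render_uri is hypothetical, and the way kombu assembles the URL from its connection fields is not shown here.

```python
from urllib.parse import urlsplit, urlunsplit


def render_uri(url, include_password=False, mask='**'):
    """Return url with its password replaced by mask unless requested.

    Hypothetical helper for illustration; not part of kombu.
    """
    parts = urlsplit(url)
    if include_password or parts.password is None:
        return url
    # Split "user:password@host:port" into credentials and host parts.
    userinfo, _, hostport = parts.netloc.rpartition('@')
    user = userinfo.split(':', 1)[0]
    masked_netloc = f'{user}:{mask}@{hostport}'
    return urlunsplit(parts._replace(netloc=masked_netloc))


masked = render_uri('amqp://guest:secret@localhost:5672//')
unmasked = render_uri('amqp://guest:secret@localhost:5672//',
                      include_password=True)
```

Masking by default keeps credentials out of logs and error messages that print the connection URL.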

autoretry(fun, channel=None, **ensure_options)

Decorator for functions supporting a channel keyword argument.

The resulting callable will retry calling the function if it raises connection or channel related errors. The return value will be a tuple of (retval, last_created_channel).

If a channel is not provided, then one will be automatically acquired (remember to close it afterwards).

See ensure() for the full list of supported keyword arguments.

Example usage:

channel = connection.channel()
try:
    ret, channel = connection.autoretry(publish_messages, channel)
finally:
    channel.close()

channel()

Create and return a new channel.

channel_errors

List of exceptions that may be raised by the channel.

clone(**kwargs)

Create a copy of the connection with the same connection settings.

close()

Close the connection (if open).

collect(socket_timeout=None)
completes_cycle(retries)

Return true if the cycle is complete after the given number of retries.

connect()

Establish connection to server immediately.

connect_timeout = 5
connected

Return true if the connection has been established.

connection

The underlying connection object.

Warning

This instance is transport specific, so do not depend on the interface of this object.

connection_errors

List of exceptions that may be raised by the connection.

create_transport()
cycle = None

Iterator returning the next broker URL to try in the event of connection failure (initialized by failover_strategy).

declared_entities = None

The cache of declared entities is per connection, in case the server loses data.

default_channel

Default channel, created upon access and closed when the connection is closed.

Useful for automatic channel handling when you only need one channel. It is also the channel used implicitly when a connection is passed, instead of a channel, to a function that requires one.

drain_events(**kwargs)

Wait for a single event from the server.

Parameters: timeout – Timeout in seconds before we give up.

Raises socket.timeout: if the timeout is exceeded.

ensure(obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1, on_revive=None)

Ensure operation completes, regardless of any channel/connection errors occurring.

Will retry by establishing the connection, and reapplying the function.

Parameters:
  • fun – Method to apply.
  • errback – Optional callback called each time the connection can’t be established. Arguments provided are the exception raised and the interval that will be slept (exc, interval).
  • max_retries – Maximum number of times to retry. If this limit is exceeded the connection error will be re-raised.
  • interval_start – The number of seconds we start sleeping for.
  • interval_step – Number of seconds added to the interval for each retry.
  • interval_max – Maximum number of seconds to sleep between each retry.

Example

This is an example ensuring a publish operation:

>>> import logging
>>> from kombu import Connection, Producer
>>> logger = logging.getLogger(__name__)
>>> conn = Connection('amqp://')
>>> producer = Producer(conn)

>>> def errback(exc, interval):
...     # Called with the error and the number of seconds slept before retrying.
...     logger.error('Error: %r', exc, exc_info=True)
...     logger.info('Retry in %s seconds.', interval)

>>> publish = conn.ensure(producer, producer.publish,
...                       errback=errback, max_retries=3)
>>> publish({'hello': 'world'}, routing_key='dest')
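
The retry behaviour described by the parameters above can be modelled without a broker. The following is a simplified sketch, not kombu's code: the name ensure_sketch, the errors argument and the injectable sleep are assumptions made for illustration, and the step where the real method re-establishes the connection between attempts is left out.

```python
import time


def ensure_sketch(fun, errors, errback=None, max_retries=None,
                  interval_start=1, interval_step=1, interval_max=1,
                  sleep=time.sleep):
    """Wrap fun so that it is retried when one of errors is raised."""
    def wrapped(*args, **kwargs):
        retries = 0
        while True:
            try:
                return fun(*args, **kwargs)
            except errors as exc:
                if max_retries is not None and retries >= max_retries:
                    raise  # retry budget exhausted: re-raise the last error
                interval = min(interval_start + retries * interval_step,
                               interval_max)
                if errback is not None:
                    errback(exc, interval)
                sleep(interval)
                retries += 1
    return wrapped


attempts = []


def flaky_publish(body):
    """Stand-in for a publish call that fails twice, then succeeds."""
    attempts.append(body)
    if len(attempts) < 3:
        raise ConnectionError('broker unavailable')
    return 'published'


reported = []
publish = ensure_sketch(flaky_publish, (ConnectionError,),
                        errback=lambda exc, interval: reported.append(interval),
                        max_retries=3,
                        sleep=lambda seconds: None)  # skip real sleeping
result = publish({'hello': 'world'})
```

Here the wrapped call succeeds on the third attempt, and the errback is invoked once for each of the two failures.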

ensure_connection(errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30, callback=None)

Ensure we have a connection to the server.

If not, retry establishing the connection with the settings specified.

Parameters:
  • errback – Optional callback called each time the connection can’t be established. Arguments provided are the exception raised and the interval that will be slept (exc, interval).
  • max_retries – Maximum number of times to retry. If this limit is exceeded the connection error will be re-raised.
  • interval_start – The number of seconds we start sleeping for.
  • interval_step – Number of seconds added to the interval for each retry.
  • interval_max – Maximum number of seconds to sleep between each retry.
  • callback – Optional callback that is called for every internal iteration (roughly once per second).
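
The three interval parameters above describe a linearly growing, capped back-off. A minimal sketch of one plausible reading follows, using the default values from the signature; the generator name retry_intervals is hypothetical, and the exact values kombu produces around the cap may differ.

```python
from itertools import count, islice


def retry_intervals(interval_start=2, interval_step=2, interval_max=30):
    """Yield the sleep interval for each successive retry, without end."""
    for attempt in count():
        # Grow linearly from interval_start, never exceeding interval_max.
        yield min(interval_start + attempt * interval_step, interval_max)


first_sixteen = list(islice(retry_intervals(), 16))
```

With the defaults, the interval climbs in steps of two seconds and then stays at the thirty-second cap for every later retry.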

failover_strategy = 'round-robin'

Strategy used to select new hosts when reconnecting after connection failure. One of “round-robin”, “shuffle” or any custom iterator constantly yielding new URLs to try.
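
A failover strategy is, in effect, an endless iterator over broker URLs. The sketch below shows what the two named strategies could look like and is not taken from kombu's source: the function names are hypothetical, and the real shuffle strategy may pick URLs differently.

```python
import random
from itertools import cycle, islice


def round_robin(urls):
    """Yield the URLs in their given order, over and over."""
    return cycle(urls)


def shuffle(urls, rng=random):
    """Yield the URLs in a freshly shuffled order on every pass."""
    urls = list(urls)  # copy so the caller's list is left untouched
    while True:
        rng.shuffle(urls)
        yield from urls


urls = ['amqp://foo', 'amqp://bar', 'amqp://baz']
rr_order = list(islice(round_robin(urls), 5))
one_shuffled_pass = list(islice(shuffle(urls), 3))
```

Any custom strategy only has to follow the same contract: keep yielding the next URL to try.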

get_heartbeat_interval()
get_manager(*args, **kwargs)
get_transport_cls()

Get the currently used transport class.

heartbeat = None

Heartbeat value, currently only supported by the py-amqp transport.

heartbeat_check(rate=2)

Allow the transport to perform any periodic tasks required to make heartbeats work. This should be called approximately every second.

If the current transport does not support heartbeats then this is a no-op.

Parameters: rate – Rate is how often the tick is called compared to the actual heartbeat value. E.g. if the heartbeat is set to 3 seconds, and the tick is called every 3 / 2 seconds, then the rate is 2. This value is currently unused by any transports.
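
The relationship between the heartbeat value and the rate is plain division, worked through here with the numbers from the description. The helper name tick_interval is hypothetical and exists only to spell out the arithmetic.

```python
def tick_interval(heartbeat, rate=2):
    """Seconds between ticks for a given heartbeat value and rate."""
    return heartbeat / rate


# A 3 second heartbeat checked at rate 2 means one tick every 1.5 seconds.
interval = tick_interval(3, rate=2)
```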

host

The host as a host name/port pair separated by colon.

hostname = None
info()

Get connection info.

is_evented
login_method = None
manager

Experimental manager that can be used to manage/monitor the broker instance. Not available for all transports.

maybe_close_channel(channel)

Close given channel, but ignore connection and channel errors.

maybe_switch_next()

Switch to next URL given by the current failover strategy (if any).

password = None
port = None
qos_semantics_matches_spec
recoverable_channel_errors

List of channel related exceptions that can be automatically recovered from without re-establishing the connection.

recoverable_connection_errors

List of connection related exceptions that can be recovered from, but where the connection must be closed and re-established first.

register_with_event_loop(loop)
release()

Close the connection (if open).

revive(new_channel)

Revive the connection after it has been re-established.

ssl = None
supports_heartbeats
switch(url)

Switch connection parameters to use a new URL (does not reconnect).

transport
transport_options = None

Additional transport specific options, passed on to the transport instance.

uri_prefix = None
userid = None
virtual_host = '/'

Pools

See also

The shortcut methods Connection.Pool() and Connection.ChannelPool() are the recommended way to instantiate these classes.

class kombu.connection.ConnectionPool(connection, limit=None, preload=None)
LimitExceeded = <class 'kombu.exceptions.ConnectionLimitExceeded'>
acquire(block=False, timeout=None)

Acquire resource.

Parameters:
  • block – If the limit is exceeded, block until there is an available item.
  • timeout – Timeout to wait if block is true. Default is None (forever).
Raises LimitExceeded: if block is false and the limit has been exceeded.
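
The acquire and release semantics can be modelled with a small stand-alone class. This is a simplified sketch under stated assumptions, not kombu's pool: the names LimitedPool and factory are hypothetical, and resources are created eagerly here purely to keep the example short.

```python
import queue


class LimitExceeded(Exception):
    """Raised when no resource is free and the caller did not wait."""


class LimitedPool:
    """Toy pool holding a fixed number of reusable resources."""

    def __init__(self, factory, limit):
        self._free = queue.LifoQueue()
        for _ in range(limit):
            self._free.put(factory())

    def acquire(self, block=False, timeout=None):
        try:
            # With block=True this waits up to timeout seconds (None: forever).
            return self._free.get(block=block, timeout=timeout)
        except queue.Empty:
            raise LimitExceeded() from None

    def release(self, resource):
        self._free.put(resource)


pool = LimitedPool(factory=object, limit=2)
c1 = pool.acquire()
c2 = pool.acquire()
try:
    pool.acquire()  # non-blocking, and both resources are in use
    exhausted = False
except LimitExceeded:
    exhausted = True
pool.release(c1)
c3 = pool.acquire()  # the released resource is handed out again
```

Passing block=True trades the immediate exception for a wait, which suits workers that can afford to pause until another user releases a resource.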

release(resource)
force_close_all()

Close and remove all resources in the pool (also those in use).

Can be used to close resources from parent processes after fork (e.g. sockets/connections).

class kombu.connection.ChannelPool(connection, limit=None, preload=None)
LimitExceeded = <class 'kombu.exceptions.ChannelLimitExceeded'>
acquire(block=False, timeout=None)

Acquire resource.

Parameters:
  • block – If the limit is exceeded, block until there is an available item.
  • timeout – Timeout to wait if block is true. Default is None (forever).
Raises LimitExceeded: if block is false and the limit has been exceeded.

release(resource)
force_close_all()

Close and remove all resources in the pool (also those in use).

Can be used to close resources from parent processes after fork (e.g. sockets/connections).
