← Back to team overview

openstack team mailing list archive

[ANN] Kombu acquires support for AMQP heartbeats and cancel notifications

 

Dear list,

I believe this is of interest to the Openstack people.

I've just started maintaining a fork of the amqplib library,
that differs by using AMQP 0.9.1 instead of 0.8, and that it supports
heartbeats and the RabbitMQ extensions (consumer cancel notifications,
publisher confirms and more).

Heartbeats are used to detect if a connection has been closed (on either end),
and is sometimes required in environments where network intermediates
complicates connection loss detection (like certain firewalls).

Consumer cancel notifications are important for HA, as it's the only
way RabbitMQ can tell consumers that the queue it's consuming from went away.

As you may or may not know, the amqp:// transport in Kombu now
uses two different underlying libraries:

 1) librabbitmq if installed

      Python extension written in C (using the rabbitmq-c library)
      http://github.com/celery/librabbitmq

 2) amqplib as a fallback.

The new client (http://pypi.python.org/pypi/amqp) will be the default fallback
in 3.0 after librabbitmq has also been updated to support the new features.

So for now if you requires these features you have to manually switch to the
new client, luckily that's as easy as installing kombu 2.3 or later,
+ the 'amqp' library:

 $ pip install kombu>=2.3 amqp

then setting the broker URL to use 'pyamqp://' (or setting the broker transport
to be 'pyamqp'):

 from kombu import Connection
 conn = Connection('pyamqp://guest:guest@localhost://')

There's more information in the Kombu 2.3 changelog:
http://kombu.readthedocs.org/en/latest/changelog.html


NOTE TO IMPLEMENTORS:

- Consumer cancel notifications

Requires no changes to your code,
all you need is to properly reconnect when one of the
errors in Connection.channel_errors occur, which is handled
automatically by Connection.ensure / Connection.autoretry (I don't believe
Nova uses that, but it probably should).

- Heartbeats

For heartbeats to be used you need to periodically call the
Connection.heartbeat_check method at regular intervals (
suggested is twice the rate of the configured heartbeat).

This shouldn't be any problem for Nova since it's using
Eventlet. Special care must be taken so that the heartbeat value
is not specified so low that blocking calls can defer heartbeats
being sent out.

An example of enabling heartbeats with eventlet could be:

import weakref
from kombu import Connection
from eventlet import spawn_after

def monitor_heartbeats(connection, rate=2):
if not connection.heartbeat:
    return
interval = connection.heartbeat / 2.0
cref = weakref.ref(connection)

def heartbeat_check():
    conn = cref()
    if conn is not None and conn.connected:
        conn.heartbeat_check(rate=rate)
        spawn_after(interval, heartbeat_check)

 return spawn_after(interval, heartbeat_check)

connection = Connection('pyamq://', heartbeat=10)

or:

connection = Connection('pyamqp://?heartbeat=10')


If you have any questions about implementing this,
or about Kombu in general then please don't hesitate to contact
me on this list, on twitter @asksol, or on IRC (also asksol)


Regards,

-- 
Ask Solem
twitter.com/asksol | +44 (0)7713357179



-- 
Ask Solem
twitter.com/asksol | +44 (0)7713357179



Follow ups