← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~allenap/launchpad/longpoll-dead-rabbit into lp:launchpad

 

Gavin Panella has proposed merging lp:~allenap/launchpad/longpoll-dead-rabbit into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~allenap/launchpad/longpoll-dead-rabbit/+merge/76774

This adds a new abstraction around the bits and bobs in
lp.services.messaging, IMessageSession. This defines things like
connect, disconnect, flush, getConsumer and getProducer.

getConsumer returns an instance of RabbitQueue and getProducer returns
an instance of RabbitRoutingKey. Nothing outside of the messaging
package instantiates these objects any more; they are created via the
session.

Two implementations of IMessageSession are available, RabbitSession
and RabbitUnreliableSession. The latter is a subclass of the former,
and an instance of it is registered as the default IMessageSession
utility. This will allow us to selectively choose different
abstractions depending on the context we're running in.

RabbitUnreliableSession is unreliable in the sense that it does not
guarantee to send the *deferred* messages (see IMessageProducer later)
it's handed or to tell you that it has failed. This will help us cope
when Rabbit goes down or is not available.

RabbitSession - and by extension RabbitUnreliableSession - is a
subclass of threading.local, which means that __init__() is called
once for each thread that the object is used in, the first time the
object is referenced in any way, and the instance dict is different
for each thread it's used it.

RabbitRoutingKey implements IMessageProducer, which has changed to
provide deferred and immediate versions to send messages (send and
sendNow) and to associate consumers (associateConsumer and
associateConsumerNow).

It might actually be worth splitting the deferred and immediate
features by IMessageSession, i.e. having one session that defers
everything and one that does everything immediately. But this works
for now and can be changed later as we learn more.

-- 
https://code.launchpad.net/~allenap/launchpad/longpoll-dead-rabbit/+merge/76774
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~allenap/launchpad/longpoll-dead-rabbit into lp:launchpad.
=== modified file 'lib/lp/app/longpoll/tests/test_longpoll.py'
--- lib/lp/app/longpoll/tests/test_longpoll.py	2011-09-16 15:01:47 +0000
+++ lib/lp/app/longpoll/tests/test_longpoll.py	2011-09-23 16:28:26 +0000
@@ -5,7 +5,10 @@
 
 __metaclass__ = type
 
-from zope.component import adapts
+from zope.component import (
+    adapts,
+    getUtility,
+    )
 from zope.interface import (
     Attribute,
     implements,
@@ -22,10 +25,7 @@
     ILongPollEvent,
     ILongPollSubscriber,
     )
-from lp.services.messaging.queue import (
-    RabbitQueue,
-    RabbitRoutingKey,
-    )
+from lp.services.messaging.interfaces import IMessageSession
 from lp.testing import TestCase
 from lp.testing.fixture import ZopeAdapterFixture
 
@@ -59,7 +59,9 @@
 
     def emit(self, data):
         # Don't cargo-cult this; see .adapters.event.LongPollEvent instead.
-        RabbitRoutingKey(self.event_key).send_now(data)
+        session = getUtility(IMessageSession)
+        producer = session.getProducer(self.event_key)
+        producer.sendNow(data)
 
 
 class TestFunctions(TestCase):
@@ -71,18 +73,20 @@
         # and the ILongPollSubscriber for the given request (or the current
         # request is discovered). It subscribes the latter to the event, then
         # returns the event.
+        session = getUtility(IMessageSession)
         request = LaunchpadTestRequest()
         an_object = FakeObject(12345)
         with ZopeAdapterFixture(FakeEvent):
             event = subscribe(an_object, "foo", request=request)
         self.assertIsInstance(event, FakeEvent)
         self.assertEqual("event-key-12345-foo", event.event_key)
+        session.flush()
         # Emitting an event-key-12345-foo event will put something on the
         # subscriber's queue.
         event_data = {"1234": 5678}
         event.emit(event_data)
         subscriber = ILongPollSubscriber(request)
-        subscribe_queue = RabbitQueue(subscriber.subscribe_key)
+        subscribe_queue = session.getConsumer(subscriber.subscribe_key)
         message = subscribe_queue.receive(timeout=5)
         self.assertEqual(event_data, message)
 
@@ -93,9 +97,10 @@
         an_object = FakeObject(12345)
         with ZopeAdapterFixture(FakeEvent):
             event = emit(an_object, "bar", {})
-            routing_key = RabbitRoutingKey(event.event_key)
-            subscribe_queue = RabbitQueue("whatever")
-            routing_key.associateConsumer(subscribe_queue)
+            session = getUtility(IMessageSession)
+            producer = session.getProducer(event.event_key)
+            subscribe_queue = session.getConsumer("whatever")
+            producer.associateConsumerNow(subscribe_queue)
             # Emit the event again; the subscribe queue was not associated
             # with the event before now.
             event_data = {"8765": 4321}

=== modified file 'lib/lp/services/configure.zcml'
--- lib/lp/services/configure.zcml	2011-09-16 14:59:47 +0000
+++ lib/lp/services/configure.zcml	2011-09-23 16:28:26 +0000
@@ -14,6 +14,7 @@
   <include package=".longpoll" />
   <include package=".memcache" />
   <include package=".messages" />
+  <include package=".messaging" />
   <include package=".openid" />
   <include package=".profile" />
   <include package=".salesforce" />

=== modified file 'lib/lp/services/longpoll/adapters/event.py'
--- lib/lp/services/longpoll/adapters/event.py	2011-09-16 14:41:21 +0000
+++ lib/lp/services/longpoll/adapters/event.py	2011-09-23 16:28:26 +0000
@@ -9,10 +9,14 @@
     "LongPollEvent",
     ]
 
-from lp.services.messaging.queue import RabbitRoutingKey
-
-
-router_factory = RabbitRoutingKey
+from zope.component import getUtility
+
+from lp.services.messaging.interfaces import IMessageSession
+
+
+def router_factory(event_key):
+    """Get a router for the given `event_key`."""
+    return getUtility(IMessageSession).getProducer(event_key)
 
 
 def generate_event_key(*components):

=== modified file 'lib/lp/services/longpoll/adapters/subscriber.py'
--- lib/lp/services/longpoll/adapters/subscriber.py	2011-09-16 14:59:47 +0000
+++ lib/lp/services/longpoll/adapters/subscriber.py	2011-09-23 16:28:26 +0000
@@ -12,15 +12,15 @@
 from uuid import uuid4
 
 from lazr.restful.interfaces import IJSONRequestCache
-from zope.component import adapts
+from zope.component import (
+    adapts,
+    getUtility,
+    )
 from zope.interface import implements
 from zope.publisher.interfaces import IApplicationRequest
 
 from lp.services.longpoll.interfaces import ILongPollSubscriber
-from lp.services.messaging.queue import (
-    RabbitQueue,
-    RabbitRoutingKey,
-    )
+from lp.services.messaging.interfaces import IMessageSession
 
 
 def generate_subscribe_key():
@@ -50,7 +50,8 @@
                 "key": generate_subscribe_key(),
                 "subscriptions": [],
                 }
-        subscribe_queue = RabbitQueue(self.subscribe_key)
-        routing_key = RabbitRoutingKey(event.event_key)
-        routing_key.associateConsumer(subscribe_queue)
+        session = getUtility(IMessageSession)
+        subscribe_queue = session.getConsumer(self.subscribe_key)
+        producer = session.getProducer(event.event_key)
+        producer.associateConsumer(subscribe_queue)
         cache.objects["longpoll"]["subscriptions"].append(event.event_key)

=== modified file 'lib/lp/services/longpoll/adapters/tests/test_event.py'
--- lib/lp/services/longpoll/adapters/tests/test_event.py	2011-09-16 14:59:47 +0000
+++ lib/lp/services/longpoll/adapters/tests/test_event.py	2011-09-23 16:28:26 +0000
@@ -16,7 +16,10 @@
     LongPollEvent,
     )
 from lp.services.longpoll.interfaces import ILongPollEvent
-from lp.services.messaging.queue import RabbitMessageBase
+from lp.services.longpoll.testing import (
+    capture_longpoll_emissions,
+    LongPollEventRecord,
+    )
 from lp.testing import TestCase
 from lp.testing.matchers import Contains
 
@@ -48,15 +51,14 @@
         # LongPollEvent.emit() sends the given data to `event_key`.
         event = FakeEvent("source", "event")
         event_data = {"hello": 1234}
-        event.emit(event_data)
-        expected_message = {
-            "event_key": event.event_key,
-            "event_data": event_data,
-            }
-        pending_messages = [
-            message for (call, message) in
-            RabbitMessageBase.class_locals.messages]
-        self.assertThat(pending_messages, Contains(expected_message))
+        with capture_longpoll_emissions() as log:
+            event.emit(event_data)
+        expected_message = LongPollEventRecord(
+            event_key=event.event_key, data={
+                "event_key": event.event_key,
+                "event_data": event_data,
+                })
+        self.assertThat(log, Contains(expected_message))
 
 
 class TestFunctions(TestCase):

=== modified file 'lib/lp/services/longpoll/adapters/tests/test_subscriber.py'
--- lib/lp/services/longpoll/adapters/tests/test_subscriber.py	2011-09-16 14:59:47 +0000
+++ lib/lp/services/longpoll/adapters/tests/test_subscriber.py	2011-09-23 16:28:26 +0000
@@ -12,6 +12,7 @@
     Not,
     StartsWith,
     )
+from zope.component import getUtility
 from zope.interface import implements
 
 from canonical.launchpad.webapp.servers import LaunchpadTestRequest
@@ -27,10 +28,7 @@
     ILongPollEvent,
     ILongPollSubscriber,
     )
-from lp.services.messaging.queue import (
-    RabbitQueue,
-    RabbitRoutingKey,
-    )
+from lp.services.messaging.interfaces import IMessageSession
 from lp.testing import TestCase
 from lp.testing.matchers import Contains
 
@@ -86,9 +84,11 @@
         subscriber = ILongPollSubscriber(request)
         subscriber.subscribe(event)
         message = '{"hello": 1234}'
-        routing_key = RabbitRoutingKey(event.event_key)
-        routing_key.send_now(message)
-        subscribe_queue = RabbitQueue(subscriber.subscribe_key)
+        session = getUtility(IMessageSession)
+        routing_key = session.getProducer(event.event_key)
+        routing_key.send(message)
+        session.flush()
+        subscribe_queue = session.getConsumer(subscriber.subscribe_key)
         self.assertEqual(
             message, subscribe_queue.receive(timeout=5))
 

=== modified file 'lib/lp/services/longpoll/testing.py'
--- lib/lp/services/longpoll/testing.py	2011-09-19 12:58:06 +0000
+++ lib/lp/services/longpoll/testing.py	2011-09-23 16:28:26 +0000
@@ -20,12 +20,12 @@
 
 
 class LoggingRouter:
-    """A test double for instances of `RabbitRoutingKey`.
+    """A test double for `IMessageProducer`.
 
     Saves messages as `LongPollEventRecord` tuples to a log.
 
     :param log: A callable accepting a single `LongPollEventRecord`.
-    :param routing_key: See `RabbitRoutingKey.__init__`.
+    :param routing_key: See `IMessageSession.getProducer`.
     """
 
     def __init__(self, log, routing_key):

=== modified file 'lib/lp/services/messaging/interfaces.py'
--- lib/lp/services/messaging/interfaces.py	2011-07-01 14:14:46 +0000
+++ lib/lp/services/messaging/interfaces.py	2011-09-23 16:28:26 +0000
@@ -8,47 +8,80 @@
     'EmptyQueueException',
     'IMessageProducer',
     'IMessageConsumer',
+    'IMessageSession',
     ]
 
 
-from zope.interface import Interface
+from zope.interface import (
+    Attribute,
+    Interface,
+    )
 
 
 class EmptyQueueException(Exception):
     """Raised if there are no queued messages on a non-blocking read."""
-    pass
+
+
+class IMessageSession(Interface):
+
+    connection = Attribute("A connection to the messaging system.")
+
+    def connect():
+        """Connect to the messaging system.
+
+        If the session is already connected this should be a no-op.
+        """
+
+    def disconnect():
+        """Disconnect from the messaging system.
+
+        If the session is already disconnected this should be a no-op.
+        """
+
+    def flush():
+        """Run deferred tasks."""
+
+    def finish():
+        """Flush the session and reset."""
+
+    def reset():
+        """Reset the session."""
+
+    def defer(func, *args, **kwargs):
+        """Schedule something to happen when this session is finished."""
+
+    def getProducer(name):
+        """Get a `IMessageProducer` associated with this session."""
+
+    def getConsumer(name):
+        """Get a `IMessageConsumer` associated with this session."""
 
 
 class IMessageConsumer(Interface):
+
     def receive(blocking=True):
         """Receive data from the queue.
 
         :raises EmptyQueueException: If non-blocking and the queue is empty.
         """
 
-    def close():
-        """Cleanup nicely."""
-
 
 class IMessageProducer(Interface):
 
     def send(data):
         """Serialize `data` into JSON and send it to the queue on commit."""
 
-    def send_now(data):
+    def sendNow(data):
         """Serialize `data` into JSON and send it to the queue immediately."""
 
-    def close():
-        """Cleanup nicely."""
-
     def associateConsumer(consumer):
+        """Make the consumer receive messages from this producer on commit.
+
+        :param consumer: An `IMessageConsumer`
+        """
+
+    def associateConsumerNow(consumer):
         """Make the consumer receive messages from this producer.
 
         :param consumer: An `IMessageConsumer`
         """
-
-    def disassociateConsumer(consumer):
-        """Make the consumer stop receiving messages from this producer.
-
-        :param consumer: An `IMessageConsumer`
-        """

=== renamed file 'lib/lp/services/messaging/queue.py' => 'lib/lp/services/messaging/rabbit.py'
--- lib/lp/services/messaging/queue.py	2011-07-05 09:01:53 +0000
+++ lib/lp/services/messaging/rabbit.py	2011-09-23 16:28:26 +0000
@@ -5,108 +5,184 @@
 
 __metaclass__ = type
 __all__ = [
-    "RabbitRoutingKey",
-    "RabbitQueue",
+    "session",
+    "unreliable_session",
     ]
 
-from amqplib import client_0_8 as amqp
+from collections import deque
+from functools import partial
 import json
-from threading import local as thread_local
+import threading
 import time
+
+from amqplib import client_0_8 as amqp
 import transaction
+from transaction._transaction import Status as TransactionStatus
 from zope.interface import implements
 
 from canonical.config import config
 from lp.services.messaging.interfaces import (
+    EmptyQueueException,
     IMessageConsumer,
     IMessageProducer,
-    EmptyQueueException,
+    IMessageSession,
     )
 
+
 LAUNCHPAD_EXCHANGE = "launchpad-exchange"
 
 
-class MessagingDataManager:
-    """A Zope transaction data manager for Launchpad messaging.
-
-    This class implements the necessary code to send messages only when
-    the Zope transactions are committed.  It will iterate over the messages
-    and send them using queue.send(oncommit=False).
+class RabbitSessionTransactionSync:
+
+    implements(transaction.interfaces.ISynchronizer)
+
+    def __init__(self, session):
+        self.session = session
+
+    def newTransaction(self, txn):
+        pass
+
+    def beforeCompletion(self, txn):
+        pass
+
+    def afterCompletion(self, txn):
+        if txn.status == TransactionStatus.COMMITTED:
+            self.session.finish()
+        else:
+            self.session.reset()
+
+
+class RabbitSession(threading.local):
+
+    implements(IMessageSession)
+
+    exchange = LAUNCHPAD_EXCHANGE
+
+    def __init__(self):
+        super(RabbitSession, self).__init__()
+        self._connection = None
+        self._deferred = deque()
+        # Maintain sessions according to transaction boundaries. Keep a strong
+        # reference to the sync because the transaction manager does not. We
+        # need one per thread (definining it here is enough to ensure that).
+        self._sync = RabbitSessionTransactionSync(self)
+        transaction.manager.registerSynch(self._sync)
+
+    @property
+    def connection(self):
+        """See `IMessageSession`.
+
+        Don't return closed connection.
+        """
+        if self._connection is None:
+            return None
+        elif self._connection.transport is None:
+            return None
+        else:
+            return self._connection
+
+    def connect(self):
+        """See `IMessageSession`.
+
+        Open a connection for this thread if necessary. Connections cannot be
+        shared between threads.
+        """
+        if self._connection is None or self._connection.transport is None:
+            self._connection = amqp.Connection(
+                host=config.rabbitmq.host, userid=config.rabbitmq.userid,
+                password=config.rabbitmq.password,
+                virtual_host=config.rabbitmq.virtual_host, insist=False)
+        return self._connection
+
+    def disconnect(self):
+        """See `IMessageSession`."""
+        if self._connection is not None:
+            try:
+                self._connection.close()
+            finally:
+                self._connection = None
+
+    def flush(self):
+        """See `IMessageSession`."""
+        tasks = self._deferred
+        while len(tasks) != 0:
+            tasks.popleft()()
+
+    def finish(self):
+        """See `IMessageSession`."""
+        try:
+            self.flush()
+        finally:
+            self.reset()
+
+    def reset(self):
+        """See `IMessageSession`."""
+        self._deferred.clear()
+        self.disconnect()
+
+    def defer(self, func, *args, **kwargs):
+        """See `IMessageSession`."""
+        self._deferred.append(partial(func, *args, **kwargs))
+
+    def getProducer(self, name):
+        """See `IMessageSession`."""
+        return RabbitRoutingKey(self, name)
+
+    def getConsumer(self, name):
+        """See `IMessageSession`."""
+        return RabbitQueue(self, name)
+
+
+# Per-thread sessions.
+session = RabbitSession()
+
+
+class RabbitUnreliableSession(RabbitSession):
+    """An "unreliable" `RabbitSession`.
+
+    Unreliable in this case means that certain errors in deferred tasks are
+    silently suppressed, `AMQPException` in particular. This means that
+    services can continue to function even in the absence of a running and
+    fully functional message queue.
     """
-    def __init__(self, messages):
-        self.messages = messages
-
-    def _cleanup(self):
-        """Completely remove the list of stored messages"""
-        del self.messages[:]
-
-    def abort(self, txn):
-        self._cleanup()
-
-    def tpc_begin(self, txn):
-        pass
-
-    def tpc_vote(self, txn):
-        pass
-
-    def tpc_finish(self, txn):
-        self._cleanup()
-
-    def tpc_abort(self, txn):
-        self._cleanup()
-
-    def sortKey(self):
-        """Ensure that messages are sent after PostgresSQL connections
-        are committed."""
-        return "zz_messaging_%s" % id(self)
-
-    def commit(self, txn):
-        for send_func, data in self.messages:
-            send_func(data)
-        self._cleanup()
+
+    ignored_errors = (
+        amqp.AMQPException,
+        )
+
+    def finish(self):
+        """See `IMessageSession`.
+
+        Suppresses errors listed in `ignored_errors`.
+        """
+        try:
+            super(RabbitUnreliableSession, self).finish()
+        except self.ignored_errors:
+            pass
+
+
+# Per-thread "unreliable" sessions.
+unreliable_session = RabbitUnreliableSession()
 
 
 class RabbitMessageBase:
     """Base class for all RabbitMQ messaging."""
 
-    class_locals = thread_local()
-
-    channel = None
-
-    def _initialize(self):
-        # Open a connection and channel for this thread if necessary.
-        # Connections cannot be shared between threads.
-        if not hasattr(self.class_locals, "rabbit_connection"):
-            conn = amqp.Connection(
-                host=config.rabbitmq.host, userid=config.rabbitmq.userid,
-                password=config.rabbitmq.password,
-                virtual_host=config.rabbitmq.virtual_host, insist=False)
-            self.class_locals.rabbit_connection = conn
-
-            # Initialize storage for oncommit messages.
-            self.class_locals.messages = []
-
-        conn = self.class_locals.rabbit_connection
-        self.channel = conn.channel()
-        #self.channel.access_request(
-        #    '/data', active=True, write=True, read=True)
-        self.channel.exchange_declare(
-            LAUNCHPAD_EXCHANGE, "direct", durable=False,
-            auto_delete=False, nowait=False)
-
-    def close(self):
-        # Note the connection is not closed - it is shared with other
-        # queues. Just close our channel.
-        if self.channel:
-            self.channel.close()
-
-    def _disconnect(self):
-        """Disconnect from rabbit. The connection is shared, so this will
-        break other RabbitQueue instances."""
-        self.close()
-        if hasattr(self.class_locals, 'rabbit_connection'):
-            self.class_locals.rabbit_connection.close()
-            del self.class_locals.rabbit_connection
+    def __init__(self, session):
+        self.session = IMessageSession(session)
+        self._channel = None
+
+    @property
+    def channel(self):
+        if self._channel is None or not self._channel.is_open:
+            connection = self.session.connect()
+            self._channel = connection.channel()
+            #self._channel.access_request(
+            #    '/data', active=True, write=True, read=True)
+            self._channel.exchange_declare(
+                self.session.exchange, "direct", durable=False,
+                auto_delete=False, nowait=False)
+        return self._channel
 
 
 class RabbitRoutingKey(RabbitMessageBase):
@@ -114,39 +190,31 @@
 
     implements(IMessageProducer)
 
-    def __init__(self, routing_key):
+    def __init__(self, session, routing_key):
+        super(RabbitRoutingKey, self).__init__(session)
         self.key = routing_key
 
     def associateConsumer(self, consumer):
         """Only receive messages for requested routing key."""
-        self._initialize()
+        self.session.defer(self.associateConsumerNow, consumer)
+
+    def associateConsumerNow(self, consumer):
+        """Only receive messages for requested routing key."""
         self.channel.queue_bind(
-            queue=consumer.name, exchange=LAUNCHPAD_EXCHANGE,
-            routing_key=self.key, nowait=False)
-
-    def disassociateConsumer(self, consumer):
-        """Stop receiving messages for the requested routing key."""
-        self._initialize()
-        self.channel.queue_unbind(
-            queue=consumer.name, exchange=LAUNCHPAD_EXCHANGE,
+            queue=consumer.name, exchange=self.session.exchange,
             routing_key=self.key, nowait=False)
 
     def send(self, data):
         """See `IMessageProducer`."""
-        self._initialize()
-        messages = self.class_locals.messages
-        # XXX: The data manager should close channels and flush too
-        if not messages:
-            transaction.get().join(MessagingDataManager(messages))
-        messages.append((self.send_now, data))
+        self.session.defer(self.sendNow, data)
 
-    def send_now(self, data):
+    def sendNow(self, data):
         """Immediately send a message to the broker."""
-        self._initialize()
         json_data = json.dumps(data)
         msg = amqp.Message(json_data)
         self.channel.basic_publish(
-            exchange=LAUNCHPAD_EXCHANGE, routing_key=self.key, msg=msg)
+            exchange=self.session.exchange,
+            routing_key=self.key, msg=msg)
 
 
 class RabbitQueue(RabbitMessageBase):
@@ -154,16 +222,16 @@
 
     implements(IMessageConsumer)
 
-    def __init__(self, name):
+    def __init__(self, session, name):
+        super(RabbitQueue, self).__init__(session)
         self.name = name
-        self._initialize()
         self.channel.queue_declare(self.name, nowait=False)
 
     def receive(self, timeout=0.0):
         """Pull a message from the queue.
 
         :param timeout: Wait a maximum of `timeout` seconds before giving up,
-            trying at least once.  If timeout is None, block forever.
+            trying at least once.
         :raises: EmptyQueueException if the timeout passes.
         """
         starttime = time.time()
@@ -181,10 +249,10 @@
         # XXX The code below will be useful when we can implement this
         # properly.
         result = []
+
         def callback(msg):
             result.append(json.loads(msg.body))
 
         self.channel.basic_consume(self.name, callback=callback)
         self.channel.wait()
         return result[0]
-

=== renamed file 'lib/lp/services/messaging/tests/test_rabbit_queue.py' => 'lib/lp/services/messaging/tests/test_rabbit.py'
--- lib/lp/services/messaging/tests/test_rabbit_queue.py	2011-07-01 15:17:23 +0000
+++ lib/lp/services/messaging/tests/test_rabbit.py	2011-09-23 16:28:26 +0000
@@ -5,93 +5,371 @@
 
 __metaclass__ = type
 
+from functools import partial
+from itertools import count
+import thread
+
+from amqplib import client_0_8 as amqp
 import transaction
+from transaction._transaction import Status as TransactionStatus
+from zope.component import getUtility
 
-from canonical.testing.layers import RabbitMQLayer
+from canonical.testing.layers import (
+    LaunchpadFunctionalLayer,
+    RabbitMQLayer,
+    )
 from lp.services.messaging.interfaces import (
     EmptyQueueException,
     IMessageConsumer,
     IMessageProducer,
+    IMessageSession,
     )
-from lp.services.messaging.queue import (
+from lp.services.messaging.rabbit import (
+    RabbitMessageBase,
     RabbitQueue,
     RabbitRoutingKey,
+    RabbitSession,
+    RabbitSessionTransactionSync,
+    RabbitUnreliableSession,
+    session as global_session,
+    unreliable_session as global_unreliable_session,
     )
 from lp.testing import TestCase
-
-
-class TestRabbitQueue(TestCase):
+from lp.testing.faketransaction import FakeTransaction
+from lp.testing.matchers import Provides
+
+# RabbitMQ is not (yet) torn down or reset between tests, so here are sources
+# of distinct names.
+queue_names = ("queue.%d" % num for num in count(1))
+key_names = ("key.%d" % num for num in count(1))
+
+
+class FakeRabbitSession:
+
+    def __init__(self):
+        self.log = []
+
+    def finish(self):
+        self.log.append("finish")
+
+    def reset(self):
+        self.log.append("reset")
+
+
+class TestRabbitSessionTransactionSync(TestCase):
+
+    def test_interface(self):
+        self.assertThat(
+            RabbitSessionTransactionSync(None),
+            Provides(transaction.interfaces.ISynchronizer))
+
+    def test_afterCompletion_COMMITTED(self):
+        txn = FakeTransaction()
+        txn.status = TransactionStatus.COMMITTED
+        fake_session = FakeRabbitSession()
+        sync = RabbitSessionTransactionSync(fake_session)
+        sync.afterCompletion(txn)
+        self.assertEqual(["finish"], fake_session.log)
+
+    def test_afterCompletion_ACTIVE(self):
+        txn = FakeTransaction()
+        txn.status = TransactionStatus.ACTIVE
+        fake_session = FakeRabbitSession()
+        sync = RabbitSessionTransactionSync(fake_session)
+        sync.afterCompletion(txn)
+        self.assertEqual(["reset"], fake_session.log)
+
+
+class RabbitTestCase(TestCase):
+
     layer = RabbitMQLayer
 
-    def setUp(self):
-        super(TestCase, self).setUp()
-        self.queue_name = 'whatever'
-        self.queue = RabbitQueue(self.queue_name)
-        self.key_name = "arbitrary.routing.key"
-        self.key = RabbitRoutingKey(self.key_name)
-        self.key.associateConsumer(self.queue)
-
     def tearDown(self):
-        self.queue._disconnect()
-        super(TestCase, self).tearDown()
-
-    def test_implements(self):
-        self.assertTrue(IMessageConsumer.providedBy(self.queue))
-        self.assertTrue(IMessageProducer.providedBy(self.key))
-
-    def test_send_now(self):
+        super(RabbitTestCase, self).tearDown()
+        global_session.reset()
+        global_unreliable_session.reset()
+
+
+class TestRabbitSession(RabbitTestCase):
+
+    def test_interface(self):
+        session = RabbitSession()
+        self.assertThat(session, Provides(IMessageSession))
+
+    def test_connect(self):
+        session = RabbitSession()
+        self.assertIs(None, session.connection)
+        connection = session.connect()
+        self.assertIsNot(None, session.connection)
+        self.assertIs(connection, session.connection)
+
+    def test_disconnect(self):
+        session = RabbitSession()
+        session.connect()
+        session.disconnect()
+        self.assertIs(None, session.connection)
+
+    def test_connection(self):
+        # The connection property is None once a connection has been closed.
+        session = RabbitSession()
+        session.connect()
+        # Close the connection without using disconnect().
+        session.connection.close()
+        self.assertIs(None, session.connection)
+
+    def test_defer(self):
+        task = lambda foo, bar: None
+        session = RabbitSession()
+        session.defer(task, "foo", bar="baz")
+        self.assertEqual(1, len(session._deferred))
+        [deferred_task] = session._deferred
+        self.assertIsInstance(deferred_task, partial)
+        self.assertIs(task, deferred_task.func)
+        self.assertEqual(("foo",), deferred_task.args)
+        self.assertEqual({"bar": "baz"}, deferred_task.keywords)
+
+    def test_flush(self):
+        # RabbitSession.flush() runs deferred tasks.
+        log = []
+        task = lambda: log.append("task")
+        session = RabbitSession()
+        session.defer(task)
+        session.connect()
+        session.flush()
+        self.assertEqual(["task"], log)
+        self.assertEqual([], list(session._deferred))
+        self.assertIsNot(None, session.connection)
+
+    def test_reset(self):
+        # RabbitSession.reset() resets session variables and does not run
+        # deferred tasks.
+        log = []
+        task = lambda: log.append("task")
+        session = RabbitSession()
+        session.defer(task)
+        session.connect()
+        session.reset()
+        self.assertEqual([], log)
+        self.assertEqual([], list(session._deferred))
+        self.assertIs(None, session.connection)
+
+    def test_finish(self):
+        # RabbitSession.finish() resets session variables after running
+        # deferred tasks.
+        log = []
+        task = lambda: log.append("task")
+        session = RabbitSession()
+        session.defer(task)
+        session.connect()
+        session.finish()
+        self.assertEqual(["task"], log)
+        self.assertEqual([], list(session._deferred))
+        self.assertIs(None, session.connection)
+
+    def test_getProducer(self):
+        session = RabbitSession()
+        producer = session.getProducer("foo")
+        self.assertIsInstance(producer, RabbitRoutingKey)
+        self.assertIs(session, producer.session)
+        self.assertEqual("foo", producer.key)
+
+    def test_getConsumer(self):
+        session = RabbitSession()
+        consumer = session.getConsumer("foo")
+        self.assertIsInstance(consumer, RabbitQueue)
+        self.assertIs(session, consumer.session)
+        self.assertEqual("foo", consumer.name)
+
+
+class TestRabbitUnreliableSession(RabbitTestCase):
+
+    def raise_AMQPException(self):
+        raise amqp.AMQPException(123, "Suffin broke.", "Whut?")
+
+    def test_finish_suppresses_some_errors(self):
+        session = RabbitUnreliableSession()
+        session.defer(self.raise_AMQPException)
+        session.finish()
+        # Look, no exceptions!
+
+    def raise_Exception(self):
+        raise Exception("That hent worked.")
+
+    def test_finish_does_not_suppress_other_errors(self):
+        session = RabbitUnreliableSession()
+        session.defer(self.raise_Exception)
+        self.assertRaises(Exception, session.finish)
+
+
+class TestRabbitMessageBase(RabbitTestCase):
+
+    def test_session(self):
+        base = RabbitMessageBase(global_session)
+        self.assertIs(global_session, base.session)
+
+    def test_channel(self):
+        # Referencing the channel property causes the session to connect.
+        base = RabbitMessageBase(global_session)
+        self.assertIs(None, base.session.connection)
+        channel = base.channel
+        self.assertIsNot(None, base.session.connection)
+        self.assertIsNot(None, channel)
+        # The same channel is returned every time.
+        self.assertIs(channel, base.channel)
+
+    def test_channel_session_closed(self):
+        # When the session is disconnected the channel is thrown away too.
+        base = RabbitMessageBase(global_session)
+        channel1 = base.channel
+        base.session.disconnect()
+        channel2 = base.channel
+        self.assertNotEqual(channel1, channel2)
+
+
+class TestRabbitRoutingKey(RabbitTestCase):
+
+    def test_interface(self):
+        routing_key = RabbitRoutingKey(global_session, next(key_names))
+        self.assertThat(routing_key, Provides(IMessageProducer))
+
+    def test_associateConsumer(self):
+        # associateConsumer() only associates the consumer at transaction
+        # commit time. However, order is preserved.
+        consumer = RabbitQueue(global_session, next(queue_names))
+        routing_key = RabbitRoutingKey(global_session, next(key_names))
+        routing_key.associateConsumer(consumer)
+        routing_key.sendNow('now')
+        routing_key.send('later')
+        # There is nothing in the queue because the consumer has not yet been
+        # associated with the routing key.
+        self.assertRaises(EmptyQueueException, consumer.receive, timeout=2)
+        transaction.commit()
+        # Now that the transaction has been committed, the consumer is
+        # associated, and receives the deferred message.
+        self.assertEqual('later', consumer.receive(timeout=2))
+
+    def test_associateConsumerNow(self):
+        # associateConsumerNow() associates the consumer right away.
+        consumer = RabbitQueue(global_session, next(queue_names))
+        routing_key = RabbitRoutingKey(global_session, next(key_names))
+        routing_key.associateConsumerNow(consumer)
+        routing_key.sendNow('now')
+        routing_key.send('later')
+        # There is already something in the queue.
+        self.assertEqual('now', consumer.receive(timeout=2))
+        transaction.commit()
+        # Now that the transaction has been committed there is another item in
+        # the queue.
+        self.assertEqual('later', consumer.receive(timeout=2))
+
+    def test_send(self):
+        consumer = RabbitQueue(global_session, next(queue_names))
+        routing_key = RabbitRoutingKey(global_session, next(key_names))
+        routing_key.associateConsumerNow(consumer)
+
+        for data in range(90, 100):
+            routing_key.send(data)
+
+        routing_key.sendNow('sync')
+        # There is nothing in the queue except the sync we just sent.
+        self.assertEqual('sync', consumer.receive(timeout=2))
+
+        # Messages get sent on commit
+        transaction.commit()
+        for data in range(90, 100):
+            self.assertEqual(data, consumer.receive())
+
+        # There are no more messages. They have all been consumed.
+        routing_key.sendNow('sync')
+        self.assertEqual('sync', consumer.receive(timeout=2))
+
+    def test_sendNow(self):
+        consumer = RabbitQueue(global_session, next(queue_names))
+        routing_key = RabbitRoutingKey(global_session, next(key_names))
+        routing_key.associateConsumerNow(consumer)
+
         for data in range(50, 60):
-            self.key.send_now(data)
-            received_data = self.queue.receive(timeout=5)
-            self.assertEqual(received_data, data)
-
-    def test_receive_consumes(self):
+            routing_key.sendNow(data)
+            received_data = consumer.receive(timeout=2)
+            self.assertEqual(data, received_data)
+
+
+class TestRabbitQueue(RabbitTestCase):
+
+    def test_interface(self):
+        consumer = RabbitQueue(global_session, next(queue_names))
+        self.assertThat(consumer, Provides(IMessageConsumer))
+
+    def test_receive(self):
+        consumer = RabbitQueue(global_session, next(queue_names))
+        routing_key = RabbitRoutingKey(global_session, next(key_names))
+        routing_key.associateConsumerNow(consumer)
+
         for data in range(55, 65):
-            self.key.send_now(data)
-            self.assertEqual(self.queue.receive(timeout=5), data)
+            routing_key.sendNow(data)
+            self.assertEqual(data, consumer.receive(timeout=2))
 
-        # None of the messages we received were put back. They were all
-        # consumed.
+        # All the messages received were consumed.
         self.assertRaises(
             EmptyQueueException,
-            self.queue.receive, timeout=5)
+            consumer.receive, timeout=2)
 
         # New connections to the queue see an empty queue too.
-        self.queue._disconnect()
-        key = RabbitRoutingKey(self.key_name)
-        queue = RabbitQueue(self.queue_name)
-        key.associateConsumer(queue)
-        key.send_now('new conn sync')
-        self.assertEqual(queue.receive(timeout=5), 'new conn sync')
-
-    def test_send(self):
-        for data in range(90, 100):
-            self.key.send(data)
-
-        self.key.send_now('sync')
-        # There is nothing in the queue except the sync we just sent.
-        self.assertEqual(self.queue.receive(timeout=5), 'sync')
-
-        # Messages get sent on commit
-        transaction.commit()
-        for data in range(90, 100):
-            self.assertEqual(self.queue.receive(), data)
-
-        # There are no more messages. They have all been consumed.
-        self.key.send_now('sync')
-        self.assertEqual(self.queue.receive(timeout=5), 'sync')
+        consumer.session.disconnect()
+        consumer = RabbitQueue(global_session, next(queue_names))
+        routing_key = RabbitRoutingKey(global_session, next(key_names))
+        routing_key.associateConsumerNow(consumer)
+        self.assertRaises(
+            EmptyQueueException,
+            consumer.receive, timeout=2)
+
+
+class TestRabbit(RabbitTestCase):
+    """Integration-like tests for the RabbitMQ messaging abstractions."""
+
+    def get_synced_sessions(self):
+        thread_id = thread.get_ident()
+        try:
+            syncs_set = transaction.manager._synchs[thread_id]
+        except KeyError:
+            return set()
+        else:
+            return set(
+                sync.session for sync in syncs_set.data.itervalues()
+                if isinstance(sync, RabbitSessionTransactionSync))
+
+    def test_global_session(self):
+        self.assertIsInstance(global_session, RabbitSession)
+        self.assertIn(global_session, self.get_synced_sessions())
+
+    def test_global_unreliable_session(self):
+        self.assertIsInstance(
+            global_unreliable_session, RabbitUnreliableSession)
+        self.assertIn(global_unreliable_session, self.get_synced_sessions())
 
     def test_abort(self):
+        consumer = RabbitQueue(global_session, next(queue_names))
+        routing_key = RabbitRoutingKey(global_session, next(key_names))
+        routing_key.associateConsumerNow(consumer)
+
         for data in range(90, 100):
-            self.key.send(data)
-
-        self.key.send_now('sync')
-        # There is nothing in the queue except the sync we just sent.
-        self.assertEqual(self.queue.receive(timeout=5), 'sync')
-
-        # Messages get forgotten on abort.
+            routing_key.send(data)
+
+        # Messages sent using send() are forgotten on abort.
         transaction.abort()
-
-        # There are no more messages. They have all been consumed.
-        self.key.send_now('sync2')
-        self.assertEqual(self.queue.receive(timeout=5), 'sync2')
+        self.assertRaises(
+            EmptyQueueException,
+            consumer.receive, timeout=2)
+
+
+class TestRabbitWithLaunchpad(RabbitTestCase):
+    """Integration-like tests for the RabbitMQ messaging abstractions."""
+
+    layer = LaunchpadFunctionalLayer
+
+    def test_utility(self):
+        # The unreliable session is registered as the default IMessageSession
+        # utility.
+        self.assertIs(
+            global_unreliable_session,
+            getUtility(IMessageSession))