← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~free.ekanayaka/txlongpoll/queue-manager-into-own-module into lp:txlongpoll

 

Free Ekanayaka has proposed merging lp:~free.ekanayaka/txlongpoll/queue-manager-into-own-module into lp:txlongpoll.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~free.ekanayaka/txlongpoll/queue-manager-into-own-module/+merge/296791

Move QueueManager into its own module. There's no change in the logic, and it's a move towards deprecating a few APIs of QueueManager (in particular QueueManager.connected/QueueManager.disconnected) in order to be able to leverage the new ClientService from Twisted 16.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~free.ekanayaka/txlongpoll/queue-manager-into-own-module into lp:txlongpoll.
=== modified file 'setup.py'
--- setup.py	2016-06-08 11:02:39 +0000
+++ setup.py	2016-06-08 13:47:51 +0000
@@ -12,7 +12,7 @@
 
 setup(
     name='txlongpoll',
-    version="0.3.2",
+    version="4.0.0",
     packages=find_packages('.') + ['twisted.plugins'],
     include_package_data=True,
     zip_safe=False,

=== modified file 'txlongpoll/frontend.py'
--- txlongpoll/frontend.py	2011-09-30 09:56:12 +0000
+++ txlongpoll/frontend.py	2016-06-08 13:47:51 +0000
@@ -8,11 +8,12 @@
 import json
 
 from twisted.internet.defer import (
+    inlineCallbacks,
     Deferred,
-    inlineCallbacks,
-    returnValue,
-    )
+)
 from twisted.python import log
+from twisted.python.deprecate import deprecatedModuleAttribute
+from twisted.python.versions import Version
 from twisted.web.http import (
     BAD_REQUEST,
     INTERNAL_SERVER_ERROR,
@@ -21,49 +22,22 @@
     )
 from twisted.web.resource import Resource
 from twisted.web.server import NOT_DONE_YET
-from txamqp.client import Closed
-from txamqp.queue import (
-    Closed as QueueClosed,
-    Empty,
-    )
-
-
-__all__ = ["QueueManager", "FrontEndAjax"]
-
-
-class NotFound(Exception):
-    """Exception raised when a queue is not found in the message server."""
-
-
-class QueueManager(object):
-    """
-    An AMQP consumer which handles messages sent over a "frontend" queue to
-    set up temporary queues.  The L{get_message} method should be invoked to
-    retrieve one single message from those temporary queues.
-
-    @ivar message_timeout: time to wait for a message before giving up in
-        C{get_message}.
-    @ivar _channel: reference to the current C{AMQChannel}.
-    @ivar _client: reference to the current C{AMQClient}.
-    """
-
-    # The timeout must be lower than the Apache one in front, which by default
-    # is 5 minutes.
-    message_timeout = 270
-
-    def __init__(self, prefix=None):
-        self._prefix = prefix
-        self._channel = None
-        self._client = None
-        self._pending_requests = []
-        # Preserve compatibility by using special forms for naming when a
-        # prefix is specified.
-        if self._prefix is not None and len(self._prefix) != 0:
-            self._tag_form = "%s.notifications-tag.%%s.%%s" % self._prefix
-            self._queue_form = "%s.notifications-queue.%%s" % self._prefix
-        else:
-            self._tag_form = "%s.%s"
-            self._queue_form = "%s"
+from txamqp.queue import Empty
+from txlongpoll.notification import (
+    NotFound,
+    NotificationSource,
+)
+
+
+__all__ = ["DeprecatedQueueManager", "QueueManager", "FrontEndAjax"]
+
+
+class DeprecatedQueueManager(NotificationSource):
+    """
+    Legacy queue manager implementing the connected/disconnected callbacks.
+    This class is deprecated and will eventually be dropped in favour of
+    NotificationSource, which is designed to leverage txamqp's AMQService.
+    """
 
     def disconnected(self):
         """
@@ -88,69 +62,6 @@
             self._pending_requests.pop(0).callback(None)
         return d
 
-    def _wait_for_connection(self):
-        """
-        Return a L{Deferred} which will fire when the connection is available.
-        """
-        pending = Deferred()
-        self._pending_requests.append(pending)
-        return pending
-
-    @inlineCallbacks
-    def get_message(self, uuid, sequence):
-        """Consume and return one message for C{uuid}.
-
-        @param uuid: The identifier of the queue.
-        @param sequence: The sequential number for identifying the subscriber
-            in the queue.
-
-        If no message is received within the number of seconds in
-        L{message_timeout}, then the returned Deferred will errback with
-        L{Empty}.
-        """
-        if self._channel is None:
-            yield self._wait_for_connection()
-        tag = self._tag_form % (uuid, sequence)
-        try:
-            yield self._channel.basic_consume(
-                consumer_tag=tag, queue=(self._queue_form % uuid))
-
-            log.msg("Consuming from queue '%s'" % uuid)
-
-            queue = yield self._client.queue(tag)
-            msg = yield queue.get(self.message_timeout)
-        except Empty:
-            # Let's wait for the cancel here
-            yield self._channel.basic_cancel(consumer_tag=tag)
-            self._client.queues.pop(tag, None)
-            # Check for the messages arrived in the mean time
-            if queue.pending:
-                msg = queue.pending.pop()
-                returnValue((msg.content.body, msg.delivery_tag))
-            raise Empty()
-        except QueueClosed:
-            # The queue has been closed, presumably because of a side effect.
-            # Let's retry after reconnection.
-            yield self._wait_for_connection()
-            data = yield self.get_message(uuid, sequence)
-            returnValue(data)
-        except Closed, e:
-            if self._client and self._client.transport:
-                self._client.transport.loseConnection()
-            if e.args and e.args[0].reply_code == 404:
-                raise NotFound()
-            else:
-                raise
-        except:
-            if self._client and self._client.transport:
-                self._client.transport.loseConnection()
-            raise
-
-        yield self._channel.basic_cancel(consumer_tag=tag, nowait=True)
-        self._client.queues.pop(tag, None)
-
-        returnValue((msg.content.body, msg.delivery_tag))
-
     def reject_message(self, tag):
         """Put back a message."""
         return self._channel.basic_reject(tag, requeue=True)
@@ -174,6 +85,21 @@
             queue = yield self._client.queue(tag)
             queue.put(Empty)
 
+    def _wait_for_connection(self):
+        """
+        Return a L{Deferred} which will fire when the connection is available.
+        """
+        pending = Deferred()
+        self._pending_requests.append(pending)
+        return pending
+
+QueueManager = DeprecatedQueueManager  # For backward compatibility
+deprecatedModuleAttribute(
+        Version("txlongpoll", 4, 0, 0),
+        "Use txlongpoll.notification.NotificationSource instead.",
+        __name__,
+        "QueueManager")
+
 
 class FrontEndAjax(Resource):
     """

=== added file 'txlongpoll/notification.py'
--- txlongpoll/notification.py	1970-01-01 00:00:00 +0000
+++ txlongpoll/notification.py	2016-06-08 13:47:51 +0000
@@ -0,0 +1,120 @@
+# Copyright 2005-2011 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Fire notifications by consuming AMQP queues.
+
+This module provides a small abstraction around the AMQP protocol/transport,
+implementing an API that can be consumed by call sites wanting to receive
+notifications from specific streams identified by UUIDs, that map to AMQP
+queues.
+
+See also txlongpoll.frontend.FrontEndAjax.
+"""
+
+from twisted.internet.defer import (
+    inlineCallbacks,
+    returnValue,
+)
+from twisted.python import log
+from txamqp.client import Closed
+from txamqp.queue import (
+    Closed as QueueClosed,
+    Empty,
+    )
+
+
+__all__ = ["NotFound", "NotificationSource"]
+
+
+class NotFound(Exception):
+    """Raised when the notifications stream for a given UUID is not available.
+
+    This typically happens when the associated AMQP queue doesn't exist
+    or was deleted from the broker.
+    """
+
+
+class NotificationSource(object):
+    """
+    An AMQP consumer which handles messages sent over a "frontend" queue to
+    set up temporary queues.  The L{get_message} method should be invoked to
+    retrieve one single message from those temporary queues.
+
+    @ivar message_timeout: time to wait for a message before giving up in
+        C{get_message}.
+    @ivar _channel: reference to the current C{AMQChannel}.
+    @ivar _client: reference to the current C{AMQClient}.
+    """
+
+    # The timeout must be lower than the Apache one in front, which by default
+    # is 5 minutes.
+    message_timeout = 270
+
+    def __init__(self, prefix=None):
+        self._prefix = prefix
+        self._channel = None
+        self._client = None
+        self._pending_requests = []
+        # Preserve compatibility by using special forms for naming when a
+        # prefix is specified.
+        if self._prefix is not None and len(self._prefix) != 0:
+            self._tag_form = "%s.notifications-tag.%%s.%%s" % self._prefix
+            self._queue_form = "%s.notifications-queue.%%s" % self._prefix
+        else:
+            self._tag_form = "%s.%s"
+            self._queue_form = "%s"
+
+    @inlineCallbacks
+    def get_message(self, uuid, sequence):
+        """Consume and return one message for C{uuid}.
+
+        @param uuid: The identifier of the queue.
+        @param sequence: The sequential number for identifying the subscriber
+            in the queue.
+
+        If no message is received within the number of seconds in
+        L{message_timeout}, then the returned Deferred will errback with
+        L{Empty}.
+        """
+        if self._channel is None:
+            yield self._wait_for_connection()
+        tag = self._tag_form % (uuid, sequence)
+        try:
+            yield self._channel.basic_consume(
+                consumer_tag=tag, queue=(self._queue_form % uuid))
+
+            log.msg("Consuming from queue '%s'" % uuid)
+
+            queue = yield self._client.queue(tag)
+            msg = yield queue.get(self.message_timeout)
+        except Empty:
+            # Let's wait for the cancel here
+            yield self._channel.basic_cancel(consumer_tag=tag)
+            self._client.queues.pop(tag, None)
+            # Check for the messages arrived in the mean time
+            if queue.pending:
+                msg = queue.pending.pop()
+                returnValue((msg.content.body, msg.delivery_tag))
+            raise Empty()
+        except QueueClosed:
+            # The queue has been closed, presumably because of a side effect.
+            # Let's retry after reconnection.
+            yield self._wait_for_connection()
+            data = yield self.get_message(uuid, sequence)
+            returnValue(data)
+        except Closed, e:
+            if self._client and self._client.transport:
+                self._client.transport.loseConnection()
+            if e.args and e.args[0].reply_code == 404:
+                raise NotFound()
+            else:
+                raise
+        except:
+            if self._client and self._client.transport:
+                self._client.transport.loseConnection()
+            raise
+
+        yield self._channel.basic_cancel(consumer_tag=tag, nowait=True)
+        self._client.queues.pop(tag, None)
+
+        returnValue((msg.content.body, msg.delivery_tag))

=== modified file 'txlongpoll/plugin.py'
--- txlongpoll/plugin.py	2012-04-16 07:33:18 +0000
+++ txlongpoll/plugin.py	2016-06-08 13:47:51 +0000
@@ -34,7 +34,7 @@
 from txlongpoll.client import AMQFactory
 from txlongpoll.frontend import (
     FrontEndAjax,
-    QueueManager,
+    DeprecatedQueueManager,
     )
 from txlongpoll.services import (
     LogService,
@@ -148,7 +148,7 @@
         frontend_port = frontend_config["port"]
         frontend_prefix = frontend_config["prefix"]
         frontend_interface = frontend_config["interface"]
-        frontend_manager = QueueManager(frontend_prefix)
+        frontend_manager = DeprecatedQueueManager(frontend_prefix)
 
         broker_config = config["broker"]
         broker_port = broker_config["port"]

=== modified file 'txlongpoll/tests/test_frontend.py'
--- txlongpoll/tests/test_frontend.py	2015-11-23 09:02:27 +0000
+++ txlongpoll/tests/test_frontend.py	2016-06-08 13:47:51 +0000
@@ -3,271 +3,15 @@
 
 from cStringIO import StringIO
 import json
-from unittest import defaultTestLoader
 
 from testtools import TestCase
-from testtools.deferredruntest import (
-    assert_fails_with,
-    run_with_log_observers,
-    )
-from twisted.internet import reactor
+from testtools.deferredruntest import run_with_log_observers
 from twisted.internet.defer import (
     Deferred,
     fail,
-    inlineCallbacks,
     succeed,
     )
-from twisted.internet.task import (
-    Clock,
-    deferLater,
-    )
-from txamqp.content import Content
-from txamqp.protocol import (
-    AMQChannel,
-    AMQClient,
-    )
-from txamqp.queue import Empty
-from txlongpoll.frontend import (
-    FrontEndAjax,
-    NotFound,
-    QueueManager,
-    )
-from txlongpoll.testing.client import (
-    AMQTest,
-    QueueWrapper,
-    )
-
-
-class QueueManagerTest(AMQTest):
-
-    prefix = None
-    tag_prefix = ""
-    queue_prefix = ""
-
-    def setUp(self):
-        self.clock = Clock()
-        self.manager = QueueManager(self.prefix)
-        return AMQTest.setUp(self)
-
-    def test_wb_connected(self):
-        """
-        The C{connected} callback of L{QueueManager} sets the C{_client} and
-        C{_channel} attributes.
-        """
-        d = self.manager.connected((self.client, self.channel))
-        self.assertTrue(isinstance(self.manager._client, AMQClient))
-        self.assertTrue(isinstance(self.manager._channel, AMQChannel))
-        self.assertIs(self.manager._client, self.client)
-        self.assertIs(self.manager._channel, self.channel)
-        return d
-
-    @inlineCallbacks
-    def test_get_message(self):
-        """
-        C{get_message} returns the message exposed to a previously created
-        queue.
-        """
-        yield self.manager.connected((self.client, self.channel))
-        yield self.channel.queue_declare(
-            queue=self.queue_prefix + "uuid1", auto_delete=True)
-        content = Content("some content")
-
-        yield self.channel.basic_publish(
-            routing_key=self.queue_prefix + "uuid1",
-            content=content)
-        message = yield self.manager.get_message("uuid1", "0")
-        self.assertEquals(message[0], "some content")
-
-        self.assertNotIn(self.tag_prefix + "uuid1.0", self.client.queues)
-
-    @inlineCallbacks
-    def test_reject_message(self):
-        """
-        C{reject_message} puts back a message in the queue so that it's
-        available to subsequent C{get_message} calls.
-        """
-        yield self.manager.connected((self.client, self.channel))
-        yield self.channel.queue_declare(
-            queue=self.queue_prefix + "uuid1")
-        content = Content("some content")
-
-        yield self.channel.basic_publish(
-            routing_key=self.queue_prefix + "uuid1",
-            content=content)
-        message, tag = yield self.manager.get_message("uuid1", "0")
-        yield self.manager.reject_message(tag)
-        message2, tag2 = yield self.manager.get_message("uuid1", "1")
-        self.assertEquals(message2, "some content")
-
-    @inlineCallbacks
-    def test_ack_message(self):
-        """
-        C{ack_message} confirms the removal of a message from the queue, making
-        subsequent C{get_message} calls waiting for another message.
-        """
-        yield self.manager.connected((self.client, self.channel))
-        yield self.channel.queue_declare(
-            queue=self.queue_prefix + "uuid1")
-        content = Content("some content")
-
-        yield self.channel.basic_publish(
-            routing_key=self.queue_prefix + "uuid1",
-            content=content)
-        message, tag = yield self.manager.get_message("uuid1", "0")
-        yield self.manager.ack_message(tag)
-
-        reply = yield self.client.queue(self.tag_prefix + "uuid1.1")
-        reply.clock = self.clock
-        event_queue = QueueWrapper(reply).event_queue
-
-        d = self.manager.get_message("uuid1", "1")
-        yield event_queue.get()
-        yield deferLater(reactor, 0, lambda: None)
-        self.clock.advance(self.manager.message_timeout + 1)
-        yield assert_fails_with(d, Empty)
-
-    @inlineCallbacks
-    def test_get_message_after_error(self):
-        """
-        If an error occurs in C{get_message}, the transport is closed so that
-        we can query messages again.
-        """
-        yield self.manager.connected((self.client, self.channel))
-        d = self.manager.get_message("uuid_unknown", "0")
-        yield assert_fails_with(d, NotFound)
-        self.assertTrue(self.channel.closed)
-        yield deferLater(reactor, 0.1, lambda: None)
-        self.assertTrue(self.client.transport.disconnected)
-
-    @inlineCallbacks
-    def test_get_message_during_error(self):
-        """
-        If an error occurs in C{get_message}, other C{get_message} calls don't
-        fail, and are retried on reconnection instead.
-        """
-        self.factory.initialDelay = 0.1
-        self.factory.resetDelay()
-        self.amq_disconnected = self.manager.disconnected
-        yield self.manager.connected((self.client, self.channel))
-        yield self.channel.queue_declare(
-            queue=self.queue_prefix + "uuid1")
-        content = Content("some content")
-
-        reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
-        reply.clock = self.clock
-        event_queue = QueueWrapper(reply).event_queue
-
-        d1 = self.manager.get_message("uuid1", "0")
-        yield event_queue.get()
-
-        d2 = self.manager.get_message("uuid_unknown", "0")
-
-        yield assert_fails_with(d2, NotFound)
-        self.assertTrue(self.channel.closed)
-
-        self.connected_deferred = Deferred()
-        yield self.connected_deferred
-
-        yield self.manager.connected((self.client, self.channel))
-        yield self.channel.basic_publish(
-            routing_key=self.queue_prefix + "uuid1",
-            content=content)
-
-        message = yield d1
-        self.assertEquals(message[0], "some content")
-
-    @inlineCallbacks
-    def test_get_message_timeout(self):
-        """
-        C{get_message} timeouts after a certain amount of time if no message
-        arrived on the queue.
-        """
-        yield self.manager.connected((self.client, self.channel))
-        yield self.channel.queue_declare(
-            queue=self.queue_prefix + "uuid1")
-
-        reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
-        reply.clock = self.clock
-        event_queue = QueueWrapper(reply).event_queue
-
-        d = self.manager.get_message("uuid1", "0")
-        yield event_queue.get()
-        yield deferLater(reactor, 0, lambda: None)
-        self.clock.advance(self.manager.message_timeout + 1)
-        yield assert_fails_with(d, Empty)
-
-        self.assertNotIn(self.tag_prefix + "uuid1.0", self.client.queues)
-
-    @inlineCallbacks
-    def test_wb_timeout_race_condition(self):
-        """
-        If a message is received between the time the queue gets a timeout and
-        C{get_message} cancels the subscription, the message is recovered and
-        returned by C{get_message}.
-        """
-        yield self.manager.connected((self.client, self.channel))
-        yield self.channel.queue_declare(
-            queue=self.queue_prefix + "uuid1")
-        content = Content("some content")
-
-        reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
-        reply.clock = self.clock
-        event_queue = QueueWrapper(reply).event_queue
-        old_timeout = reply._timeout
-
-        def timeout(deferred):
-            self.channel.basic_publish(
-                routing_key=self.queue_prefix + "uuid1",
-                content=content)
-            old_timeout(deferred)
-
-        reply._timeout = timeout
-
-        d = self.manager.get_message("uuid1", "0")
-        yield event_queue.get()
-        yield deferLater(reactor, 0, lambda: None)
-        self.clock.advance(self.manager.message_timeout + 1)
-
-        message = yield d
-        self.assertEquals(message[0], "some content")
-
-    @inlineCallbacks
-    def test_retry_after_timeout(self):
-        """
-        If a timeout happens, one can retry to consume message from the queue
-        later on.
-        """
-        yield self.manager.connected((self.client, self.channel))
-        yield self.channel.queue_declare(
-            queue=self.queue_prefix + "uuid1")
-
-        reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
-        reply.clock = self.clock
-        event_queue = QueueWrapper(reply).event_queue
-
-        d1 = self.manager.get_message("uuid1", "0")
-        yield event_queue.get()
-        yield deferLater(reactor, 0, lambda: None)
-        self.clock.advance(self.manager.message_timeout + 1)
-        yield assert_fails_with(d1, Empty)
-
-        # Let's wrap the queue again
-        reply = yield self.client.queue(self.tag_prefix + "uuid1.1")
-        reply.clock = self.clock
-        event_queue = QueueWrapper(reply).event_queue
-
-        d2 = self.manager.get_message("uuid1", "1")
-        yield event_queue.get()
-        yield deferLater(reactor, 0, lambda: None)
-        self.clock.advance(self.manager.message_timeout + 1)
-        yield assert_fails_with(d2, Empty)
-
-
-class QueueManagerTestWithPrefix(QueueManagerTest):
-
-    prefix = "test"
-    tag_prefix = "test.notifications-tag."
-    queue_prefix = "test.notifications-queue."
+from txlongpoll.frontend import FrontEndAjax
 
 
 class FakeMessageQueue(object):
@@ -434,7 +178,3 @@
         self.assertEqual("Invalid request", data)
         self.assertEqual("", request.written.getvalue())
         self.assertEqual(400, request.code)
-
-
-def test_suite():
-    return defaultTestLoader.loadTestsFromName(__name__)

=== added file 'txlongpoll/tests/test_integration.py'
--- txlongpoll/tests/test_integration.py	1970-01-01 00:00:00 +0000
+++ txlongpoll/tests/test_integration.py	2016-06-08 13:47:51 +0000
@@ -0,0 +1,262 @@
+# Copyright 2005-2011 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Integration tests running a real RabbitMQ broker."""
+
+from twisted.internet import reactor
+from twisted.internet.defer import (
+    inlineCallbacks,
+    Deferred,
+)
+from twisted.internet.task import (
+    Clock,
+    deferLater,
+    )
+
+from txamqp.content import Content
+from txamqp.protocol import (
+    AMQChannel,
+    AMQClient,
+)
+from txamqp.queue import Empty
+
+from testtools.deferredruntest import assert_fails_with
+
+from txlongpoll.notification import NotFound
+from txlongpoll.frontend import DeprecatedQueueManager
+from txlongpoll.testing.client import (
+    AMQTest,
+    QueueWrapper,
+)
+
+
+class DeprecatedQueueManagerTest(AMQTest):
+
+    prefix = None
+    tag_prefix = ""
+    queue_prefix = ""
+
+    def setUp(self):
+        self.clock = Clock()
+        self.manager = DeprecatedQueueManager(self.prefix)
+        return AMQTest.setUp(self)
+
+    def test_wb_connected(self):
+        """
+        The C{connected} callback of L{DeprecatedQueueManager} sets the
+        C{_client} and C{_channel} attributes.
+        """
+        d = self.manager.connected((self.client, self.channel))
+        self.assertTrue(isinstance(self.manager._client, AMQClient))
+        self.assertTrue(isinstance(self.manager._channel, AMQChannel))
+        self.assertIs(self.manager._client, self.client)
+        self.assertIs(self.manager._channel, self.channel)
+        return d
+
+    @inlineCallbacks
+    def test_get_message(self):
+        """
+        C{get_message} returns the message exposed to a previously created
+        queue.
+        """
+        yield self.manager.connected((self.client, self.channel))
+        yield self.channel.queue_declare(
+            queue=self.queue_prefix + "uuid1", auto_delete=True)
+        content = Content("some content")
+
+        yield self.channel.basic_publish(
+            routing_key=self.queue_prefix + "uuid1",
+            content=content)
+        message = yield self.manager.get_message("uuid1", "0")
+        self.assertEquals(message[0], "some content")
+
+        self.assertNotIn(self.tag_prefix + "uuid1.0", self.client.queues)
+
+    @inlineCallbacks
+    def test_reject_message(self):
+        """
+        C{reject_message} puts back a message in the queue so that it's
+        available to subsequent C{get_message} calls.
+        """
+        yield self.manager.connected((self.client, self.channel))
+        yield self.channel.queue_declare(
+            queue=self.queue_prefix + "uuid1")
+        content = Content("some content")
+
+        yield self.channel.basic_publish(
+            routing_key=self.queue_prefix + "uuid1",
+            content=content)
+        message, tag = yield self.manager.get_message("uuid1", "0")
+        yield self.manager.reject_message(tag)
+        message2, tag2 = yield self.manager.get_message("uuid1", "1")
+        self.assertEquals(message2, "some content")
+
+    @inlineCallbacks
+    def test_ack_message(self):
+        """
+        C{ack_message} confirms the removal of a message from the queue, making
+        subsequent C{get_message} calls waiting for another message.
+        """
+        yield self.manager.connected((self.client, self.channel))
+        yield self.channel.queue_declare(
+            queue=self.queue_prefix + "uuid1")
+        content = Content("some content")
+
+        yield self.channel.basic_publish(
+            routing_key=self.queue_prefix + "uuid1",
+            content=content)
+        message, tag = yield self.manager.get_message("uuid1", "0")
+        yield self.manager.ack_message(tag)
+
+        reply = yield self.client.queue(self.tag_prefix + "uuid1.1")
+        reply.clock = self.clock
+        event_queue = QueueWrapper(reply).event_queue
+
+        d = self.manager.get_message("uuid1", "1")
+        yield event_queue.get()
+        yield deferLater(reactor, 0, lambda: None)
+        self.clock.advance(self.manager.message_timeout + 1)
+        yield assert_fails_with(d, Empty)
+
+    @inlineCallbacks
+    def test_get_message_after_error(self):
+        """
+        If an error occurs in C{get_message}, the transport is closed so that
+        we can query messages again.
+        """
+        yield self.manager.connected((self.client, self.channel))
+        d = self.manager.get_message("uuid_unknown", "0")
+        yield assert_fails_with(d, NotFound)
+        self.assertTrue(self.channel.closed)
+        yield deferLater(reactor, 0.1, lambda: None)
+        self.assertTrue(self.client.transport.disconnected)
+
+    @inlineCallbacks
+    def test_get_message_during_error(self):
+        """
+        If an error occurs in C{get_message}, other C{get_message} calls don't
+        fail, and are retried on reconnection instead.
+        """
+        self.factory.initialDelay = 0.1
+        self.factory.resetDelay()
+        self.amq_disconnected = self.manager.disconnected
+        yield self.manager.connected((self.client, self.channel))
+        yield self.channel.queue_declare(
+            queue=self.queue_prefix + "uuid1")
+        content = Content("some content")
+
+        reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
+        reply.clock = self.clock
+        event_queue = QueueWrapper(reply).event_queue
+
+        d1 = self.manager.get_message("uuid1", "0")
+        yield event_queue.get()
+
+        d2 = self.manager.get_message("uuid_unknown", "0")
+
+        yield assert_fails_with(d2, NotFound)
+        self.assertTrue(self.channel.closed)
+
+        self.connected_deferred = Deferred()
+        yield self.connected_deferred
+
+        yield self.manager.connected((self.client, self.channel))
+        yield self.channel.basic_publish(
+            routing_key=self.queue_prefix + "uuid1",
+            content=content)
+
+        message = yield d1
+        self.assertEquals(message[0], "some content")
+
+    @inlineCallbacks
+    def test_get_message_timeout(self):
+        """
+        C{get_message} timeouts after a certain amount of time if no message
+        arrived on the queue.
+        """
+        yield self.manager.connected((self.client, self.channel))
+        yield self.channel.queue_declare(
+            queue=self.queue_prefix + "uuid1")
+
+        reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
+        reply.clock = self.clock
+        event_queue = QueueWrapper(reply).event_queue
+
+        d = self.manager.get_message("uuid1", "0")
+        yield event_queue.get()
+        yield deferLater(reactor, 0, lambda: None)
+        self.clock.advance(self.manager.message_timeout + 1)
+        yield assert_fails_with(d, Empty)
+
+        self.assertNotIn(self.tag_prefix + "uuid1.0", self.client.queues)
+
+    @inlineCallbacks
+    def test_wb_timeout_race_condition(self):
+        """
+        If a message is received between the time the queue gets a timeout and
+        C{get_message} cancels the subscription, the message is recovered and
+        returned by C{get_message}.
+        """
+        yield self.manager.connected((self.client, self.channel))
+        yield self.channel.queue_declare(
+            queue=self.queue_prefix + "uuid1")
+        content = Content("some content")
+
+        reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
+        reply.clock = self.clock
+        event_queue = QueueWrapper(reply).event_queue
+        old_timeout = reply._timeout
+
+        def timeout(deferred):
+            self.channel.basic_publish(
+                routing_key=self.queue_prefix + "uuid1",
+                content=content)
+            old_timeout(deferred)
+
+        reply._timeout = timeout
+
+        d = self.manager.get_message("uuid1", "0")
+        yield event_queue.get()
+        yield deferLater(reactor, 0, lambda: None)
+        self.clock.advance(self.manager.message_timeout + 1)
+
+        message = yield d
+        self.assertEquals(message[0], "some content")
+
+    @inlineCallbacks
+    def test_retry_after_timeout(self):
+        """
+        If a timeout happens, one can retry to consume message from the queue
+        later on.
+        """
+        yield self.manager.connected((self.client, self.channel))
+        yield self.channel.queue_declare(
+            queue=self.queue_prefix + "uuid1")
+
+        reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
+        reply.clock = self.clock
+        event_queue = QueueWrapper(reply).event_queue
+
+        d1 = self.manager.get_message("uuid1", "0")
+        yield event_queue.get()
+        yield deferLater(reactor, 0, lambda: None)
+        self.clock.advance(self.manager.message_timeout + 1)
+        yield assert_fails_with(d1, Empty)
+
+        # Let's wrap the queue again
+        reply = yield self.client.queue(self.tag_prefix + "uuid1.1")
+        reply.clock = self.clock
+        event_queue = QueueWrapper(reply).event_queue
+
+        d2 = self.manager.get_message("uuid1", "1")
+        yield event_queue.get()
+        yield deferLater(reactor, 0, lambda: None)
+        self.clock.advance(self.manager.message_timeout + 1)
+        yield assert_fails_with(d2, Empty)
+
+
+class DeprecatedQueueManagerTestWithPrefix(DeprecatedQueueManagerTest):
+
+    prefix = "test"
+    tag_prefix = "test.notifications-tag."
+    queue_prefix = "test.notifications-queue."


Follow ups