launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #20592
[Merge] lp:~free.ekanayaka/txlongpoll/queue-manager-into-own-module into lp:txlongpoll
Free Ekanayaka has proposed merging lp:~free.ekanayaka/txlongpoll/queue-manager-into-own-module into lp:txlongpoll.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~free.ekanayaka/txlongpoll/queue-manager-into-own-module/+merge/296791
Move QueueManager into its own module. There's no change in the logic, and it's a move towards deprecating a few APIs of QueueManager (in particular QueueManager.connected/QueueManager.disconnected) in order to be able to leverage the new ClientService from Twisted 16.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~free.ekanayaka/txlongpoll/queue-manager-into-own-module into lp:txlongpoll.
=== modified file 'setup.py'
--- setup.py 2016-06-08 11:02:39 +0000
+++ setup.py 2016-06-08 13:47:51 +0000
@@ -12,7 +12,7 @@
setup(
name='txlongpoll',
- version="0.3.2",
+ version="4.0.0",
packages=find_packages('.') + ['twisted.plugins'],
include_package_data=True,
zip_safe=False,
=== modified file 'txlongpoll/frontend.py'
--- txlongpoll/frontend.py 2011-09-30 09:56:12 +0000
+++ txlongpoll/frontend.py 2016-06-08 13:47:51 +0000
@@ -8,11 +8,12 @@
import json
from twisted.internet.defer import (
+ inlineCallbacks,
Deferred,
- inlineCallbacks,
- returnValue,
- )
+)
from twisted.python import log
+from twisted.python.deprecate import deprecatedModuleAttribute
+from twisted.python.versions import Version
from twisted.web.http import (
BAD_REQUEST,
INTERNAL_SERVER_ERROR,
@@ -21,49 +22,22 @@
)
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
-from txamqp.client import Closed
-from txamqp.queue import (
- Closed as QueueClosed,
- Empty,
- )
-
-
-__all__ = ["QueueManager", "FrontEndAjax"]
-
-
-class NotFound(Exception):
- """Exception raised when a queue is not found in the message server."""
-
-
-class QueueManager(object):
- """
- An AMQP consumer which handles messages sent over a "frontend" queue to
- set up temporary queues. The L{get_message} method should be invoked to
- retrieve one single message from those temporary queues.
-
- @ivar message_timeout: time to wait for a message before giving up in
- C{get_message}.
- @ivar _channel: reference to the current C{AMQChannel}.
- @ivar _client: reference to the current C{AMQClient}.
- """
-
- # The timeout must be lower than the Apache one in front, which by default
- # is 5 minutes.
- message_timeout = 270
-
- def __init__(self, prefix=None):
- self._prefix = prefix
- self._channel = None
- self._client = None
- self._pending_requests = []
- # Preserve compatibility by using special forms for naming when a
- # prefix is specified.
- if self._prefix is not None and len(self._prefix) != 0:
- self._tag_form = "%s.notifications-tag.%%s.%%s" % self._prefix
- self._queue_form = "%s.notifications-queue.%%s" % self._prefix
- else:
- self._tag_form = "%s.%s"
- self._queue_form = "%s"
+from txamqp.queue import Empty
+from txlongpoll.notification import (
+ NotFound,
+ NotificationSource,
+)
+
+
+__all__ = ["DeprecatedQueueManager", "QueueManager", "FrontEndAjax"]
+
+
+class DeprecatedQueueManager(NotificationSource):
+ """
+ Legacy queue manager implementing the connected/disconnected callbacks.
+ This class is deprecated and will eventually be dropped in favour of
+ NotificationSource, which is designed to leverage txamqp's AMQService.
+ """
def disconnected(self):
"""
@@ -88,69 +62,6 @@
self._pending_requests.pop(0).callback(None)
return d
- def _wait_for_connection(self):
- """
- Return a L{Deferred} which will fire when the connection is available.
- """
- pending = Deferred()
- self._pending_requests.append(pending)
- return pending
-
- @inlineCallbacks
- def get_message(self, uuid, sequence):
- """Consume and return one message for C{uuid}.
-
- @param uuid: The identifier of the queue.
- @param sequence: The sequential number for identifying the subscriber
- in the queue.
-
- If no message is received within the number of seconds in
- L{message_timeout}, then the returned Deferred will errback with
- L{Empty}.
- """
- if self._channel is None:
- yield self._wait_for_connection()
- tag = self._tag_form % (uuid, sequence)
- try:
- yield self._channel.basic_consume(
- consumer_tag=tag, queue=(self._queue_form % uuid))
-
- log.msg("Consuming from queue '%s'" % uuid)
-
- queue = yield self._client.queue(tag)
- msg = yield queue.get(self.message_timeout)
- except Empty:
- # Let's wait for the cancel here
- yield self._channel.basic_cancel(consumer_tag=tag)
- self._client.queues.pop(tag, None)
- # Check for the messages arrived in the mean time
- if queue.pending:
- msg = queue.pending.pop()
- returnValue((msg.content.body, msg.delivery_tag))
- raise Empty()
- except QueueClosed:
- # The queue has been closed, presumably because of a side effect.
- # Let's retry after reconnection.
- yield self._wait_for_connection()
- data = yield self.get_message(uuid, sequence)
- returnValue(data)
- except Closed, e:
- if self._client and self._client.transport:
- self._client.transport.loseConnection()
- if e.args and e.args[0].reply_code == 404:
- raise NotFound()
- else:
- raise
- except:
- if self._client and self._client.transport:
- self._client.transport.loseConnection()
- raise
-
- yield self._channel.basic_cancel(consumer_tag=tag, nowait=True)
- self._client.queues.pop(tag, None)
-
- returnValue((msg.content.body, msg.delivery_tag))
-
def reject_message(self, tag):
"""Put back a message."""
return self._channel.basic_reject(tag, requeue=True)
@@ -174,6 +85,21 @@
queue = yield self._client.queue(tag)
queue.put(Empty)
+ def _wait_for_connection(self):
+ """
+ Return a L{Deferred} which will fire when the connection is available.
+ """
+ pending = Deferred()
+ self._pending_requests.append(pending)
+ return pending
+
+QueueManager = DeprecatedQueueManager # For backward compatibility
+deprecatedModuleAttribute(
+ Version("txlongpoll", 4, 0, 0),
+ "Use txlongpoll.notification.NotificationSource instead.",
+ __name__,
+ "QueueManager")
+
class FrontEndAjax(Resource):
"""
=== added file 'txlongpoll/notification.py'
--- txlongpoll/notification.py 1970-01-01 00:00:00 +0000
+++ txlongpoll/notification.py 2016-06-08 13:47:51 +0000
@@ -0,0 +1,120 @@
+# Copyright 2005-2011 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Fire notifications from by consuming AMQP queues.
+
+This module provides a small abstraction around the AMQP protocol/transport,
+implementing an API that can be consumed by call sites wanting to receive
+notifications from specific streams identified by UUIDs, that map to AMQP
+queues.
+
+See also txlongpoll.frontend.FrontEndAjax.
+"""
+
+from twisted.internet.defer import (
+ inlineCallbacks,
+ returnValue,
+)
+from twisted.python import log
+from txamqp.client import Closed
+from txamqp.queue import (
+ Closed as QueueClosed,
+ Empty,
+ )
+
+
+__all__ = ["NotFound", "NotificationSource"]
+
+
+class NotFound(Exception):
+ """Raised the notifications stream for a given UUID is not available.
+
+ This typically happens when the associated AMQP queue doesn't exist
+ or was delelated from the broker.
+ """
+
+
+class NotificationSource(object):
+ """
+ An AMQP consumer which handles messages sent over a "frontend" queue to
+ set up temporary queues. The L{get_message} method should be invoked to
+ retrieve one single message from those temporary queues.
+
+ @ivar message_timeout: time to wait for a message before giving up in
+ C{get_message}.
+ @ivar _channel: reference to the current C{AMQChannel}.
+ @ivar _client: reference to the current C{AMQClient}.
+ """
+
+ # The timeout must be lower than the Apache one in front, which by default
+ # is 5 minutes.
+ message_timeout = 270
+
+ def __init__(self, prefix=None):
+ self._prefix = prefix
+ self._channel = None
+ self._client = None
+ self._pending_requests = []
+ # Preserve compatibility by using special forms for naming when a
+ # prefix is specified.
+ if self._prefix is not None and len(self._prefix) != 0:
+ self._tag_form = "%s.notifications-tag.%%s.%%s" % self._prefix
+ self._queue_form = "%s.notifications-queue.%%s" % self._prefix
+ else:
+ self._tag_form = "%s.%s"
+ self._queue_form = "%s"
+
+ @inlineCallbacks
+ def get_message(self, uuid, sequence):
+ """Consume and return one message for C{uuid}.
+
+ @param uuid: The identifier of the queue.
+ @param sequence: The sequential number for identifying the subscriber
+ in the queue.
+
+ If no message is received within the number of seconds in
+ L{message_timeout}, then the returned Deferred will errback with
+ L{Empty}.
+ """
+ if self._channel is None:
+ yield self._wait_for_connection()
+ tag = self._tag_form % (uuid, sequence)
+ try:
+ yield self._channel.basic_consume(
+ consumer_tag=tag, queue=(self._queue_form % uuid))
+
+ log.msg("Consuming from queue '%s'" % uuid)
+
+ queue = yield self._client.queue(tag)
+ msg = yield queue.get(self.message_timeout)
+ except Empty:
+ # Let's wait for the cancel here
+ yield self._channel.basic_cancel(consumer_tag=tag)
+ self._client.queues.pop(tag, None)
+ # Check for the messages arrived in the mean time
+ if queue.pending:
+ msg = queue.pending.pop()
+ returnValue((msg.content.body, msg.delivery_tag))
+ raise Empty()
+ except QueueClosed:
+ # The queue has been closed, presumably because of a side effect.
+ # Let's retry after reconnection.
+ yield self._wait_for_connection()
+ data = yield self.get_message(uuid, sequence)
+ returnValue(data)
+ except Closed, e:
+ if self._client and self._client.transport:
+ self._client.transport.loseConnection()
+ if e.args and e.args[0].reply_code == 404:
+ raise NotFound()
+ else:
+ raise
+ except:
+ if self._client and self._client.transport:
+ self._client.transport.loseConnection()
+ raise
+
+ yield self._channel.basic_cancel(consumer_tag=tag, nowait=True)
+ self._client.queues.pop(tag, None)
+
+ returnValue((msg.content.body, msg.delivery_tag))
=== modified file 'txlongpoll/plugin.py'
--- txlongpoll/plugin.py 2012-04-16 07:33:18 +0000
+++ txlongpoll/plugin.py 2016-06-08 13:47:51 +0000
@@ -34,7 +34,7 @@
from txlongpoll.client import AMQFactory
from txlongpoll.frontend import (
FrontEndAjax,
- QueueManager,
+ DeprecatedQueueManager,
)
from txlongpoll.services import (
LogService,
@@ -148,7 +148,7 @@
frontend_port = frontend_config["port"]
frontend_prefix = frontend_config["prefix"]
frontend_interface = frontend_config["interface"]
- frontend_manager = QueueManager(frontend_prefix)
+ frontend_manager = DeprecatedQueueManager(frontend_prefix)
broker_config = config["broker"]
broker_port = broker_config["port"]
=== modified file 'txlongpoll/tests/test_frontend.py'
--- txlongpoll/tests/test_frontend.py 2015-11-23 09:02:27 +0000
+++ txlongpoll/tests/test_frontend.py 2016-06-08 13:47:51 +0000
@@ -3,271 +3,15 @@
from cStringIO import StringIO
import json
-from unittest import defaultTestLoader
from testtools import TestCase
-from testtools.deferredruntest import (
- assert_fails_with,
- run_with_log_observers,
- )
-from twisted.internet import reactor
+from testtools.deferredruntest import run_with_log_observers
from twisted.internet.defer import (
Deferred,
fail,
- inlineCallbacks,
succeed,
)
-from twisted.internet.task import (
- Clock,
- deferLater,
- )
-from txamqp.content import Content
-from txamqp.protocol import (
- AMQChannel,
- AMQClient,
- )
-from txamqp.queue import Empty
-from txlongpoll.frontend import (
- FrontEndAjax,
- NotFound,
- QueueManager,
- )
-from txlongpoll.testing.client import (
- AMQTest,
- QueueWrapper,
- )
-
-
-class QueueManagerTest(AMQTest):
-
- prefix = None
- tag_prefix = ""
- queue_prefix = ""
-
- def setUp(self):
- self.clock = Clock()
- self.manager = QueueManager(self.prefix)
- return AMQTest.setUp(self)
-
- def test_wb_connected(self):
- """
- The C{connected} callback of L{QueueManager} sets the C{_client} and
- C{_channel} attributes.
- """
- d = self.manager.connected((self.client, self.channel))
- self.assertTrue(isinstance(self.manager._client, AMQClient))
- self.assertTrue(isinstance(self.manager._channel, AMQChannel))
- self.assertIs(self.manager._client, self.client)
- self.assertIs(self.manager._channel, self.channel)
- return d
-
- @inlineCallbacks
- def test_get_message(self):
- """
- C{get_message} returns the message exposed to a previously created
- queue.
- """
- yield self.manager.connected((self.client, self.channel))
- yield self.channel.queue_declare(
- queue=self.queue_prefix + "uuid1", auto_delete=True)
- content = Content("some content")
-
- yield self.channel.basic_publish(
- routing_key=self.queue_prefix + "uuid1",
- content=content)
- message = yield self.manager.get_message("uuid1", "0")
- self.assertEquals(message[0], "some content")
-
- self.assertNotIn(self.tag_prefix + "uuid1.0", self.client.queues)
-
- @inlineCallbacks
- def test_reject_message(self):
- """
- C{reject_message} puts back a message in the queue so that it's
- available to subsequent C{get_message} calls.
- """
- yield self.manager.connected((self.client, self.channel))
- yield self.channel.queue_declare(
- queue=self.queue_prefix + "uuid1")
- content = Content("some content")
-
- yield self.channel.basic_publish(
- routing_key=self.queue_prefix + "uuid1",
- content=content)
- message, tag = yield self.manager.get_message("uuid1", "0")
- yield self.manager.reject_message(tag)
- message2, tag2 = yield self.manager.get_message("uuid1", "1")
- self.assertEquals(message2, "some content")
-
- @inlineCallbacks
- def test_ack_message(self):
- """
- C{ack_message} confirms the removal of a message from the queue, making
- subsequent C{get_message} calls waiting for another message.
- """
- yield self.manager.connected((self.client, self.channel))
- yield self.channel.queue_declare(
- queue=self.queue_prefix + "uuid1")
- content = Content("some content")
-
- yield self.channel.basic_publish(
- routing_key=self.queue_prefix + "uuid1",
- content=content)
- message, tag = yield self.manager.get_message("uuid1", "0")
- yield self.manager.ack_message(tag)
-
- reply = yield self.client.queue(self.tag_prefix + "uuid1.1")
- reply.clock = self.clock
- event_queue = QueueWrapper(reply).event_queue
-
- d = self.manager.get_message("uuid1", "1")
- yield event_queue.get()
- yield deferLater(reactor, 0, lambda: None)
- self.clock.advance(self.manager.message_timeout + 1)
- yield assert_fails_with(d, Empty)
-
- @inlineCallbacks
- def test_get_message_after_error(self):
- """
- If an error occurs in C{get_message}, the transport is closed so that
- we can query messages again.
- """
- yield self.manager.connected((self.client, self.channel))
- d = self.manager.get_message("uuid_unknown", "0")
- yield assert_fails_with(d, NotFound)
- self.assertTrue(self.channel.closed)
- yield deferLater(reactor, 0.1, lambda: None)
- self.assertTrue(self.client.transport.disconnected)
-
- @inlineCallbacks
- def test_get_message_during_error(self):
- """
- If an error occurs in C{get_message}, other C{get_message} calls don't
- fail, and are retried on reconnection instead.
- """
- self.factory.initialDelay = 0.1
- self.factory.resetDelay()
- self.amq_disconnected = self.manager.disconnected
- yield self.manager.connected((self.client, self.channel))
- yield self.channel.queue_declare(
- queue=self.queue_prefix + "uuid1")
- content = Content("some content")
-
- reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
- reply.clock = self.clock
- event_queue = QueueWrapper(reply).event_queue
-
- d1 = self.manager.get_message("uuid1", "0")
- yield event_queue.get()
-
- d2 = self.manager.get_message("uuid_unknown", "0")
-
- yield assert_fails_with(d2, NotFound)
- self.assertTrue(self.channel.closed)
-
- self.connected_deferred = Deferred()
- yield self.connected_deferred
-
- yield self.manager.connected((self.client, self.channel))
- yield self.channel.basic_publish(
- routing_key=self.queue_prefix + "uuid1",
- content=content)
-
- message = yield d1
- self.assertEquals(message[0], "some content")
-
- @inlineCallbacks
- def test_get_message_timeout(self):
- """
- C{get_message} timeouts after a certain amount of time if no message
- arrived on the queue.
- """
- yield self.manager.connected((self.client, self.channel))
- yield self.channel.queue_declare(
- queue=self.queue_prefix + "uuid1")
-
- reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
- reply.clock = self.clock
- event_queue = QueueWrapper(reply).event_queue
-
- d = self.manager.get_message("uuid1", "0")
- yield event_queue.get()
- yield deferLater(reactor, 0, lambda: None)
- self.clock.advance(self.manager.message_timeout + 1)
- yield assert_fails_with(d, Empty)
-
- self.assertNotIn(self.tag_prefix + "uuid1.0", self.client.queues)
-
- @inlineCallbacks
- def test_wb_timeout_race_condition(self):
- """
- If a message is received between the time the queue gets a timeout and
- C{get_message} cancels the subscription, the message is recovered and
- returned by C{get_message}.
- """
- yield self.manager.connected((self.client, self.channel))
- yield self.channel.queue_declare(
- queue=self.queue_prefix + "uuid1")
- content = Content("some content")
-
- reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
- reply.clock = self.clock
- event_queue = QueueWrapper(reply).event_queue
- old_timeout = reply._timeout
-
- def timeout(deferred):
- self.channel.basic_publish(
- routing_key=self.queue_prefix + "uuid1",
- content=content)
- old_timeout(deferred)
-
- reply._timeout = timeout
-
- d = self.manager.get_message("uuid1", "0")
- yield event_queue.get()
- yield deferLater(reactor, 0, lambda: None)
- self.clock.advance(self.manager.message_timeout + 1)
-
- message = yield d
- self.assertEquals(message[0], "some content")
-
- @inlineCallbacks
- def test_retry_after_timeout(self):
- """
- If a timeout happens, one can retry to consume message from the queue
- later on.
- """
- yield self.manager.connected((self.client, self.channel))
- yield self.channel.queue_declare(
- queue=self.queue_prefix + "uuid1")
-
- reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
- reply.clock = self.clock
- event_queue = QueueWrapper(reply).event_queue
-
- d1 = self.manager.get_message("uuid1", "0")
- yield event_queue.get()
- yield deferLater(reactor, 0, lambda: None)
- self.clock.advance(self.manager.message_timeout + 1)
- yield assert_fails_with(d1, Empty)
-
- # Let's wrap the queue again
- reply = yield self.client.queue(self.tag_prefix + "uuid1.1")
- reply.clock = self.clock
- event_queue = QueueWrapper(reply).event_queue
-
- d2 = self.manager.get_message("uuid1", "1")
- yield event_queue.get()
- yield deferLater(reactor, 0, lambda: None)
- self.clock.advance(self.manager.message_timeout + 1)
- yield assert_fails_with(d2, Empty)
-
-
-class QueueManagerTestWithPrefix(QueueManagerTest):
-
- prefix = "test"
- tag_prefix = "test.notifications-tag."
- queue_prefix = "test.notifications-queue."
+from txlongpoll.frontend import FrontEndAjax
class FakeMessageQueue(object):
@@ -434,7 +178,3 @@
self.assertEqual("Invalid request", data)
self.assertEqual("", request.written.getvalue())
self.assertEqual(400, request.code)
-
-
-def test_suite():
- return defaultTestLoader.loadTestsFromName(__name__)
=== added file 'txlongpoll/tests/test_integration.py'
--- txlongpoll/tests/test_integration.py 1970-01-01 00:00:00 +0000
+++ txlongpoll/tests/test_integration.py 2016-06-08 13:47:51 +0000
@@ -0,0 +1,262 @@
+# Copyright 2005-2011 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Integration tests running a real RabbitMQ broker."""
+
+from twisted.internet import reactor
+from twisted.internet.defer import (
+ inlineCallbacks,
+ Deferred,
+)
+from twisted.internet.task import (
+ Clock,
+ deferLater,
+ )
+
+from txamqp.content import Content
+from txamqp.protocol import (
+ AMQChannel,
+ AMQClient,
+)
+from txamqp.queue import Empty
+
+from testtools.deferredruntest import assert_fails_with
+
+from txlongpoll.notification import NotFound
+from txlongpoll.frontend import DeprecatedQueueManager
+from txlongpoll.testing.client import (
+ AMQTest,
+ QueueWrapper,
+)
+
+
+class DeprecatedQueueManagerTest(AMQTest):
+
+ prefix = None
+ tag_prefix = ""
+ queue_prefix = ""
+
+ def setUp(self):
+ self.clock = Clock()
+ self.manager = DeprecatedQueueManager(self.prefix)
+ return AMQTest.setUp(self)
+
+ def test_wb_connected(self):
+ """
+ The C{connected} callback of L{DeprecatedQueueManager} sets the
+ C{_client} and C{_channel} attributes.
+ """
+ d = self.manager.connected((self.client, self.channel))
+ self.assertTrue(isinstance(self.manager._client, AMQClient))
+ self.assertTrue(isinstance(self.manager._channel, AMQChannel))
+ self.assertIs(self.manager._client, self.client)
+ self.assertIs(self.manager._channel, self.channel)
+ return d
+
+ @inlineCallbacks
+ def test_get_message(self):
+ """
+ C{get_message} returns the message exposed to a previously created
+ queue.
+ """
+ yield self.manager.connected((self.client, self.channel))
+ yield self.channel.queue_declare(
+ queue=self.queue_prefix + "uuid1", auto_delete=True)
+ content = Content("some content")
+
+ yield self.channel.basic_publish(
+ routing_key=self.queue_prefix + "uuid1",
+ content=content)
+ message = yield self.manager.get_message("uuid1", "0")
+ self.assertEquals(message[0], "some content")
+
+ self.assertNotIn(self.tag_prefix + "uuid1.0", self.client.queues)
+
+ @inlineCallbacks
+ def test_reject_message(self):
+ """
+ C{reject_message} puts back a message in the queue so that it's
+ available to subsequent C{get_message} calls.
+ """
+ yield self.manager.connected((self.client, self.channel))
+ yield self.channel.queue_declare(
+ queue=self.queue_prefix + "uuid1")
+ content = Content("some content")
+
+ yield self.channel.basic_publish(
+ routing_key=self.queue_prefix + "uuid1",
+ content=content)
+ message, tag = yield self.manager.get_message("uuid1", "0")
+ yield self.manager.reject_message(tag)
+ message2, tag2 = yield self.manager.get_message("uuid1", "1")
+ self.assertEquals(message2, "some content")
+
+ @inlineCallbacks
+ def test_ack_message(self):
+ """
+ C{ack_message} confirms the removal of a message from the queue, making
+ subsequent C{get_message} calls waiting for another message.
+ """
+ yield self.manager.connected((self.client, self.channel))
+ yield self.channel.queue_declare(
+ queue=self.queue_prefix + "uuid1")
+ content = Content("some content")
+
+ yield self.channel.basic_publish(
+ routing_key=self.queue_prefix + "uuid1",
+ content=content)
+ message, tag = yield self.manager.get_message("uuid1", "0")
+ yield self.manager.ack_message(tag)
+
+ reply = yield self.client.queue(self.tag_prefix + "uuid1.1")
+ reply.clock = self.clock
+ event_queue = QueueWrapper(reply).event_queue
+
+ d = self.manager.get_message("uuid1", "1")
+ yield event_queue.get()
+ yield deferLater(reactor, 0, lambda: None)
+ self.clock.advance(self.manager.message_timeout + 1)
+ yield assert_fails_with(d, Empty)
+
+ @inlineCallbacks
+ def test_get_message_after_error(self):
+ """
+ If an error occurs in C{get_message}, the transport is closed so that
+ we can query messages again.
+ """
+ yield self.manager.connected((self.client, self.channel))
+ d = self.manager.get_message("uuid_unknown", "0")
+ yield assert_fails_with(d, NotFound)
+ self.assertTrue(self.channel.closed)
+ yield deferLater(reactor, 0.1, lambda: None)
+ self.assertTrue(self.client.transport.disconnected)
+
+ @inlineCallbacks
+ def test_get_message_during_error(self):
+ """
+ If an error occurs in C{get_message}, other C{get_message} calls don't
+ fail, and are retried on reconnection instead.
+ """
+ self.factory.initialDelay = 0.1
+ self.factory.resetDelay()
+ self.amq_disconnected = self.manager.disconnected
+ yield self.manager.connected((self.client, self.channel))
+ yield self.channel.queue_declare(
+ queue=self.queue_prefix + "uuid1")
+ content = Content("some content")
+
+ reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
+ reply.clock = self.clock
+ event_queue = QueueWrapper(reply).event_queue
+
+ d1 = self.manager.get_message("uuid1", "0")
+ yield event_queue.get()
+
+ d2 = self.manager.get_message("uuid_unknown", "0")
+
+ yield assert_fails_with(d2, NotFound)
+ self.assertTrue(self.channel.closed)
+
+ self.connected_deferred = Deferred()
+ yield self.connected_deferred
+
+ yield self.manager.connected((self.client, self.channel))
+ yield self.channel.basic_publish(
+ routing_key=self.queue_prefix + "uuid1",
+ content=content)
+
+ message = yield d1
+ self.assertEquals(message[0], "some content")
+
+ @inlineCallbacks
+ def test_get_message_timeout(self):
+ """
+ C{get_message} timeouts after a certain amount of time if no message
+ arrived on the queue.
+ """
+ yield self.manager.connected((self.client, self.channel))
+ yield self.channel.queue_declare(
+ queue=self.queue_prefix + "uuid1")
+
+ reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
+ reply.clock = self.clock
+ event_queue = QueueWrapper(reply).event_queue
+
+ d = self.manager.get_message("uuid1", "0")
+ yield event_queue.get()
+ yield deferLater(reactor, 0, lambda: None)
+ self.clock.advance(self.manager.message_timeout + 1)
+ yield assert_fails_with(d, Empty)
+
+ self.assertNotIn(self.tag_prefix + "uuid1.0", self.client.queues)
+
+ @inlineCallbacks
+ def test_wb_timeout_race_condition(self):
+ """
+ If a message is received between the time the queue gets a timeout and
+ C{get_message} cancels the subscription, the message is recovered and
+ returned by C{get_message}.
+ """
+ yield self.manager.connected((self.client, self.channel))
+ yield self.channel.queue_declare(
+ queue=self.queue_prefix + "uuid1")
+ content = Content("some content")
+
+ reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
+ reply.clock = self.clock
+ event_queue = QueueWrapper(reply).event_queue
+ old_timeout = reply._timeout
+
+ def timeout(deferred):
+ self.channel.basic_publish(
+ routing_key=self.queue_prefix + "uuid1",
+ content=content)
+ old_timeout(deferred)
+
+ reply._timeout = timeout
+
+ d = self.manager.get_message("uuid1", "0")
+ yield event_queue.get()
+ yield deferLater(reactor, 0, lambda: None)
+ self.clock.advance(self.manager.message_timeout + 1)
+
+ message = yield d
+ self.assertEquals(message[0], "some content")
+
+ @inlineCallbacks
+ def test_retry_after_timeout(self):
+ """
+ If a timeout happens, one can retry to consume message from the queue
+ later on.
+ """
+ yield self.manager.connected((self.client, self.channel))
+ yield self.channel.queue_declare(
+ queue=self.queue_prefix + "uuid1")
+
+ reply = yield self.client.queue(self.tag_prefix + "uuid1.0")
+ reply.clock = self.clock
+ event_queue = QueueWrapper(reply).event_queue
+
+ d1 = self.manager.get_message("uuid1", "0")
+ yield event_queue.get()
+ yield deferLater(reactor, 0, lambda: None)
+ self.clock.advance(self.manager.message_timeout + 1)
+ yield assert_fails_with(d1, Empty)
+
+ # Let's wrap the queue again
+ reply = yield self.client.queue(self.tag_prefix + "uuid1.1")
+ reply.clock = self.clock
+ event_queue = QueueWrapper(reply).event_queue
+
+ d2 = self.manager.get_message("uuid1", "1")
+ yield event_queue.get()
+ yield deferLater(reactor, 0, lambda: None)
+ self.clock.advance(self.manager.message_timeout + 1)
+ yield assert_fails_with(d2, Empty)
+
+
+class DeprecatedQueueManagerTestWithPrefix(DeprecatedQueueManagerTest):
+
+ prefix = "test"
+ tag_prefix = "test.notifications-tag."
+ queue_prefix = "test.notifications-queue."
Follow ups