launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #20671
[Merge] lp:~free.ekanayaka/txlongpoll/notification-ack-and-reject into lp:txlongpoll
Free Ekanayaka has proposed merging lp:~free.ekanayaka/txlongpoll/notification-ack-and-reject into lp:txlongpoll.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~free.ekanayaka/txlongpoll/notification-ack-and-reject/+merge/298216
Add acknowledgement/rejection support to txlongpoll.notification.Notification.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~free.ekanayaka/txlongpoll/notification-ack-and-reject into lp:txlongpoll.
=== modified file 'txlongpoll/notification.py'
--- txlongpoll/notification.py 2016-06-06 13:07:31 +0000
+++ txlongpoll/notification.py 2016-06-23 12:20:23 +0000
@@ -10,6 +10,7 @@
See also txlongpoll.frontend.FrontEndAjax.
"""
+from functools import partial
from twisted.internet import reactor
from twisted.internet.error import ConnectionClosed as TransportClosed
@@ -50,6 +51,10 @@
"""
+class Bounced(Exception):
+ """Raised if a Notification could not be ack'ed or rejected."""
+
+
class NotificationConnector(object):
"""Provide ready-to-use AMQP channels."""
@@ -104,19 +109,30 @@
class Notification(object):
"""A single notification from a stream."""
- def __init__(self, source, message):
+ def __init__(self, source, channel, message):
"""
- @param source: The NotificationSource the message was received through.
+ @param source: The NotificationSource that generated the notification.
+ @param channel: The AMQChannel the message was received through.
@param message: The raw txamqp.message.Message received from the
underlying AMQP queue.
"""
self._source = source
+ self._channel = channel
self._message = message
@property
def payload(self):
+ """Return the content of the notification."""
return self._message.content.body
+ def ack(self):
+ """Confirm that the notification was successfully processed."""
+ return self._source._done(self, True)
+
+ def reject(self):
+ """Reject the the notification, it will be re-queued."""
+ return self._source._done(self, False)
+
class NotificationSource(object):
"""
@@ -235,7 +251,7 @@
else:
raise Timeout()
- returnValue(Notification(self, msg))
+ returnValue(Notification(self, channel, msg))
@inlineCallbacks
def _check_retriable(self, method, **kwargs):
@@ -275,6 +291,32 @@
finally:
self._channel_lock.release()
+ @inlineCallbacks
+ def _done(self, notification, successful):
+ """Confirm that a notification has been handled (successfully or not).
+
+ @param notification: The Notification to confirm.
+ @param successful: If True, then the notification has been correctly
+ processed and will be deleted. If False, it will be re-queued and
+ be available at the next NotificationSource.get() call for the
+ same UUID.
+ """
+ channel = notification._channel
+ if successful:
+ method = channel.basic_ack
+ else:
+ method = partial(channel.basic_reject, requeue=True)
+
+ yield self._channel_lock.acquire()
+ try:
+ yield method(delivery_tag=notification._message.delivery_tag)
+ except Closed:
+ # If we hit any channel or connection error, we raise an error
+ # since there's no way this can be re-tried.
+ raise Bounced()
+ finally:
+ self._channel_lock.release()
+
class _Retriable(Exception):
"""Raised by _check_retriable in case of transient errors."""
=== modified file 'txlongpoll/tests/test_integration.py'
--- txlongpoll/tests/test_integration.py 2016-06-23 07:56:35 +0000
+++ txlongpoll/tests/test_integration.py 2016-06-23 12:20:23 +0000
@@ -34,6 +34,7 @@
NotificationSource,
NotFound,
Timeout,
+ Bounced,
)
from txlongpoll.client import AMQP0_8_SPEC_PATH
from txlongpoll.frontend import DeprecatedQueueManager
@@ -315,6 +316,64 @@
self.assertEqual("hello", notification.payload)
self.assertEqual(2, proxy.connections)
+ @inlineCallbacks
+ def test_reject_notification(self):
+ """
+ Calling reject() on a Notification puts the associated message back in
+ the queue so that it's available to subsequent get() calls.
+ """
+ yield self.channel.basic_publish(
+ routing_key="uuid", content=Content("hello"))
+ notification = yield self.source.get("uuid", 0)
+ yield notification.reject()
+
+ notification = yield self.source.get("uuid", 1)
+ self.assertEqual("hello", notification.payload)
+
+ @inlineCallbacks
+ def test_ack_message(self):
+ """
+ Calling ack() on a Notification confirms the removal of the
+ associated message from the queue, making subsequent calls
+ waiting for another message.
+ """
+ yield self.channel.basic_publish(
+ routing_key="uuid", content=Content("hello"))
+ notification = yield self.source.get("uuid", 0)
+ yield notification.ack()
+
+ yield self.channel.basic_publish(
+ routing_key="uuid", content=Content("hello 2"))
+ notification = yield self.source.get("uuid", 1)
+ self.assertEqual("hello 2", notification.payload)
+
+ @inlineCallbacks
+ def test_ack_with_broker_shutdown(self):
+ """
+ If rabbitmq gets shutdown before we ack a Notification, an error is
+ raised.
+ """
+ client = yield self.service.whenConnected()
+
+ yield self.channel.basic_publish(
+ routing_key="uuid", content=Content("hello"))
+ notification = yield self.source.get("uuid", 0)
+
+ self.rabbit.cleanUp()
+
+ yield client.disconnected.wait()
+
+ try:
+ yield notification.ack()
+ except Bounced:
+ pass
+ else:
+ self.fail("Notification not bounced")
+
+ self.rabbit.config = RabbitServerResources(
+ port=self.rabbit.config.port) # Ensure that we use the same port
+ self.rabbit.setUp()
+
class DeprecatedQueueManagerTest(AMQTest):
=== modified file 'txlongpoll/tests/test_notification.py'
--- txlongpoll/tests/test_notification.py 2016-06-23 07:04:53 +0000
+++ txlongpoll/tests/test_notification.py 2016-06-23 12:20:23 +0000
@@ -12,6 +12,8 @@
succeeded,
failed,
)
+# XXX This testtools API should probably be public, see #1589657.
+from testtools.twistedsupport._deferred import extract_result
from twisted.internet.task import Clock
from twisted.logger import Logger
@@ -25,6 +27,7 @@
NotificationSource,
Timeout,
NotFound,
+ Bounced,
)
from txlongpoll.testing.unit import (
FakeConnector,
@@ -281,6 +284,83 @@
channel.basic_cancel_ok(consumer_tag="uuid2.1")
self.assertThat(deferred2, fires_with_payload("foo"))
+ def test_notification_ack(self):
+ """
+ Calling Notification.ack() acknowledges a notification.
+ """
+ deferred = self.source.get("uuid", 1)
+ channel = self.connector.transport.channel(1)
+ channel.basic_consume_ok(consumer_tag="uuid.1")
+ channel.deliver("foo", consumer_tag='uuid.1', delivery_tag=1)
+ channel.basic_cancel_ok(consumer_tag="uuid.1")
+ notification = extract_result(deferred)
+ self.connector.transport.outgoing.clear()
+ notification.ack()
+ [frame] = self.connector.transport.outgoing[1]
+ self.assertEqual("ack", frame.payload.method.name)
+ self.assertEqual((1, False), frame.payload.args)
+
+ def test_notification_ack_is_serialized(self):
+ """
+ Calls to Notification.ack() are serialized, so they don't get spurious
+ errors from unrelated channel commands.
+ """
+ deferred = self.source.get("uuid1", 1)
+ channel = self.connector.transport.channel(1)
+ channel.basic_consume_ok(consumer_tag="uuid1.1")
+ channel.deliver("foo", consumer_tag='uuid1.1', delivery_tag=1)
+ channel.basic_cancel_ok(consumer_tag="uuid1.1")
+ notification = extract_result(deferred)
+
+ # Simulate a concurrent get() call locking the channel during its
+ # basic-consume.
+ self.source.get("uuid2", 1)
+
+ # Calling Notification.ack() now will not result in any outgoing frame,
+ # since the call will wait for the basic-consume above to complete.
+ self.connector.transport.outgoing.clear()
+ notification.ack()
+ self.assertEqual({}, self.connector.transport.outgoing)
+
+ # As soon as the channel lock is released, the frame is sent.
+ channel.basic_consume_ok(consumer_tag="uuid2.1")
+ [frame] = self.connector.transport.outgoing[1]
+ self.assertEqual("ack", frame.payload.method.name)
+ self.assertEqual((1, False), frame.payload.args)
+
+ def test_notification_reject(self):
+ """
+ Calling Notification.reject() rejects a notification.
+ """
+ deferred = self.source.get("uuid", 1)
+ channel = self.connector.transport.channel(1)
+ channel.basic_consume_ok(consumer_tag="uuid.1")
+ channel.deliver("foo", consumer_tag='uuid.1', delivery_tag=1)
+ channel.basic_cancel_ok(consumer_tag="uuid.1")
+ notification = extract_result(deferred)
+ self.connector.transport.outgoing.clear()
+ notification.reject()
+ [frame] = self.connector.transport.outgoing[1]
+ self.assertEqual("reject", frame.payload.method.name)
+ self.assertEqual((1, True), frame.payload.args)
+
+ def test_notification_bounced(self):
+ """
+ If an error happens while ack'ing a Notification, a Bounced exception
+ is raised.
+ """
+ deferred = self.source.get("uuid1", 1)
+ channel = self.connector.transport.channel(1)
+ channel.basic_consume_ok(consumer_tag="uuid1.1")
+ channel.deliver("foo", consumer_tag='uuid1.1', delivery_tag=1)
+ channel.basic_cancel_ok(consumer_tag="uuid1.1")
+ notification = extract_result(deferred)
+
+ # Simulate the broker shutting down.
+ channel.connection_close(reply_code=320, reply_text="shutdown")
+
+ self.assertRaises(Bounced, extract_result, notification.ack())
+
def fires_with_channel(id):
"""Assert that a connector fires with the given channel ID."""