← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~free.ekanayaka/txlongpoll/notification-ack-and-reject into lp:txlongpoll

 

Free Ekanayaka has proposed merging lp:~free.ekanayaka/txlongpoll/notification-ack-and-reject into lp:txlongpoll.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~free.ekanayaka/txlongpoll/notification-ack-and-reject/+merge/298216

Add acknowledgement/rejection support to txlongpoll.notification.Notification.

-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~free.ekanayaka/txlongpoll/notification-ack-and-reject into lp:txlongpoll.
=== modified file 'txlongpoll/notification.py'
--- txlongpoll/notification.py	2016-06-06 13:07:31 +0000
+++ txlongpoll/notification.py	2016-06-23 12:20:23 +0000
@@ -10,6 +10,7 @@
 
 See also txlongpoll.frontend.FrontEndAjax.
 """
+from functools import partial
 
 from twisted.internet import reactor
 from twisted.internet.error import ConnectionClosed as TransportClosed
@@ -50,6 +51,10 @@
     """
 
 
+class Bounced(Exception):
+    """Raised if a Notification could not be ack'ed or rejected."""
+
+
 class NotificationConnector(object):
     """Provide ready-to-use AMQP channels."""
 
@@ -104,19 +109,30 @@
 class Notification(object):
     """A single notification from a stream."""
 
-    def __init__(self, source, message):
+    def __init__(self, source, channel, message):
         """
-        @param source: The NotificationSource the message was received through.
+        @param source: The NotificationSource that generated the notification.
+        @param channel: The AMQChannel the message was received through.
         @param message: The raw txamqp.message.Message received from the
             underlying AMQP queue.
         """
         self._source = source
+        self._channel = channel
         self._message = message
 
     @property
     def payload(self):
+        """Return the content of the notification."""
         return self._message.content.body
 
+    def ack(self):
+        """Confirm that the notification was successfully processed."""
+        return self._source._done(self, True)
+
+    def reject(self):
+        """Reject the the notification, it will be re-queued."""
+        return self._source._done(self, False)
+
 
 class NotificationSource(object):
     """
@@ -235,7 +251,7 @@
             else:
                 raise Timeout()
 
-        returnValue(Notification(self, msg))
+        returnValue(Notification(self, channel, msg))
 
     @inlineCallbacks
     def _check_retriable(self, method, **kwargs):
@@ -275,6 +291,32 @@
         finally:
             self._channel_lock.release()
 
+    @inlineCallbacks
+    def _done(self, notification, successful):
+        """Confirm that a notification has been handled (successfully or not).
+
+        @param notification: The Notification to confirm.
+        @param successful: If True, then the notification has been correctly
+            processed and will be deleted. If False, it will be re-queued and
+            be available at the next NotificationSource.get() call for the
+            same UUID.
+        """
+        channel = notification._channel
+        if successful:
+            method = channel.basic_ack
+        else:
+            method = partial(channel.basic_reject, requeue=True)
+
+        yield self._channel_lock.acquire()
+        try:
+            yield method(delivery_tag=notification._message.delivery_tag)
+        except Closed:
+            # If we hit any channel or connection error, we raise an error
+            # since there's no way this can be re-tried.
+            raise Bounced()
+        finally:
+            self._channel_lock.release()
+
 
 class _Retriable(Exception):
     """Raised by _check_retriable in case of transient errors."""

=== modified file 'txlongpoll/tests/test_integration.py'
--- txlongpoll/tests/test_integration.py	2016-06-23 07:56:35 +0000
+++ txlongpoll/tests/test_integration.py	2016-06-23 12:20:23 +0000
@@ -34,6 +34,7 @@
     NotificationSource,
     NotFound,
     Timeout,
+    Bounced,
 )
 from txlongpoll.client import AMQP0_8_SPEC_PATH
 from txlongpoll.frontend import DeprecatedQueueManager
@@ -315,6 +316,64 @@
         self.assertEqual("hello", notification.payload)
         self.assertEqual(2, proxy.connections)
 
+    @inlineCallbacks
+    def test_reject_notification(self):
+        """
+        Calling reject() on a Notification puts the associated message back in
+        the queue so that it's available to subsequent get() calls.
+        """
+        yield self.channel.basic_publish(
+            routing_key="uuid", content=Content("hello"))
+        notification = yield self.source.get("uuid", 0)
+        yield notification.reject()
+
+        notification = yield self.source.get("uuid", 1)
+        self.assertEqual("hello", notification.payload)
+
+    @inlineCallbacks
+    def test_ack_message(self):
+        """
+        Calling ack() on a Notification confirms the removal of the
+        associated message from the queue, making subsequent calls
+        waiting for another message.
+        """
+        yield self.channel.basic_publish(
+            routing_key="uuid", content=Content("hello"))
+        notification = yield self.source.get("uuid", 0)
+        yield notification.ack()
+
+        yield self.channel.basic_publish(
+            routing_key="uuid", content=Content("hello 2"))
+        notification = yield self.source.get("uuid", 1)
+        self.assertEqual("hello 2", notification.payload)
+
+    @inlineCallbacks
+    def test_ack_with_broker_shutdown(self):
+        """
+        If rabbitmq gets shutdown before we ack a Notification, an error is
+        raised.
+        """
+        client = yield self.service.whenConnected()
+
+        yield self.channel.basic_publish(
+            routing_key="uuid", content=Content("hello"))
+        notification = yield self.source.get("uuid", 0)
+
+        self.rabbit.cleanUp()
+
+        yield client.disconnected.wait()
+
+        try:
+            yield notification.ack()
+        except Bounced:
+            pass
+        else:
+            self.fail("Notification not bounced")
+
+        self.rabbit.config = RabbitServerResources(
+            port=self.rabbit.config.port)  # Ensure that we use the same port
+        self.rabbit.setUp()
+
 
 class DeprecatedQueueManagerTest(AMQTest):
 

=== modified file 'txlongpoll/tests/test_notification.py'
--- txlongpoll/tests/test_notification.py	2016-06-23 07:04:53 +0000
+++ txlongpoll/tests/test_notification.py	2016-06-23 12:20:23 +0000
@@ -12,6 +12,8 @@
     succeeded,
     failed,
 )
+# XXX This testtools API should probably be public, see #1589657.
+from testtools.twistedsupport._deferred import extract_result
 
 from twisted.internet.task import Clock
 from twisted.logger import Logger
@@ -25,6 +27,7 @@
     NotificationSource,
     Timeout,
     NotFound,
+    Bounced,
 )
 from txlongpoll.testing.unit import (
     FakeConnector,
@@ -281,6 +284,83 @@
         channel.basic_cancel_ok(consumer_tag="uuid2.1")
         self.assertThat(deferred2, fires_with_payload("foo"))
 
+    def test_notification_ack(self):
+        """
+        Calling Notification.ack() acknowledges a notification.
+        """
+        deferred = self.source.get("uuid", 1)
+        channel = self.connector.transport.channel(1)
+        channel.basic_consume_ok(consumer_tag="uuid.1")
+        channel.deliver("foo", consumer_tag='uuid.1', delivery_tag=1)
+        channel.basic_cancel_ok(consumer_tag="uuid.1")
+        notification = extract_result(deferred)
+        self.connector.transport.outgoing.clear()
+        notification.ack()
+        [frame] = self.connector.transport.outgoing[1]
+        self.assertEqual("ack", frame.payload.method.name)
+        self.assertEqual((1, False), frame.payload.args)
+
+    def test_notification_ack_is_serialized(self):
+        """
+        Calls to Notification.ack() are serialized, so they don't get spurious
+        errors from unrelated channel commands.
+        """
+        deferred = self.source.get("uuid1", 1)
+        channel = self.connector.transport.channel(1)
+        channel.basic_consume_ok(consumer_tag="uuid1.1")
+        channel.deliver("foo", consumer_tag='uuid1.1', delivery_tag=1)
+        channel.basic_cancel_ok(consumer_tag="uuid1.1")
+        notification = extract_result(deferred)
+
+        # Simulate a concurrent get() call locking the channel during its
+        # basic-consume.
+        self.source.get("uuid2", 1)
+
+        # Calling Notification.ack() now will not result in any outgoing frame,
+        # since the call will wait for the basic-consume above to complete.
+        self.connector.transport.outgoing.clear()
+        notification.ack()
+        self.assertEqual({}, self.connector.transport.outgoing)
+
+        # As soon as the channel lock is released, the frame is sent.
+        channel.basic_consume_ok(consumer_tag="uuid2.1")
+        [frame] = self.connector.transport.outgoing[1]
+        self.assertEqual("ack", frame.payload.method.name)
+        self.assertEqual((1, False), frame.payload.args)
+
+    def test_notification_reject(self):
+        """
+        Calling Notification.reject() rejects a notification.
+        """
+        deferred = self.source.get("uuid", 1)
+        channel = self.connector.transport.channel(1)
+        channel.basic_consume_ok(consumer_tag="uuid.1")
+        channel.deliver("foo", consumer_tag='uuid.1', delivery_tag=1)
+        channel.basic_cancel_ok(consumer_tag="uuid.1")
+        notification = extract_result(deferred)
+        self.connector.transport.outgoing.clear()
+        notification.reject()
+        [frame] = self.connector.transport.outgoing[1]
+        self.assertEqual("reject", frame.payload.method.name)
+        self.assertEqual((1, True), frame.payload.args)
+
+    def test_notification_bounced(self):
+        """
+        If an error happens while ack'ing a Notification, a Bounced exception
+        is raised.
+        """
+        deferred = self.source.get("uuid1", 1)
+        channel = self.connector.transport.channel(1)
+        channel.basic_consume_ok(consumer_tag="uuid1.1")
+        channel.deliver("foo", consumer_tag='uuid1.1', delivery_tag=1)
+        channel.basic_cancel_ok(consumer_tag="uuid1.1")
+        notification = extract_result(deferred)
+
+        # Simulate the broker shutting down.
+        channel.connection_close(reply_code=320, reply_text="shutdown")
+
+        self.assertRaises(Bounced, extract_result, notification.ack())
+
 
 def fires_with_channel(id):
     """Assert that a connector fires with the given channel ID."""