launchpad-reviewers team mailing list archive

[Merge] lp:~lifeless/python-oops-amqp/0.0.2 into lp:python-oops-amqp

 

Robert Collins has proposed merging lp:~lifeless/python-oops-amqp/0.0.2 into lp:python-oops-amqp.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #875976 in python-oops-amqp: "pydoc oops_amqp misleading"
  https://bugs.launchpad.net/python-oops-amqp/+bug/875976
  Bug #875984 in python-oops-amqp: "pydoc refers to nonexisting OOPSDateDirRepo"
  https://bugs.launchpad.net/python-oops-amqp/+bug/875984

For more details, see:
https://code.launchpad.net/~lifeless/python-oops-amqp/0.0.2/+merge/79637

0.0.2
-----

* Fix documentation warts from the initial release, bump the version to 0.0.2,
  and prepare for making the receiver deal with interrupted services.
  (Robert Collins, #875976, #875984)

* Fix Receiver.run_forever to actually run forever. (Robert Collins)

* Change API for constructing Receiver to take a connection factory rather than
  a channel. This will permit handling transient faults internally rather than
  forcing a restart. (Robert Collins)

* Implement resiliency for the Receiver: automatically reconnect if a socket
  error is received from rabbit, for up to two minutes of downtime.
  (Robert Collins)
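
A minimal sketch of what a connection factory could look like, assuming it is simply a zero-argument callable that returns a new amqplib connection each time it is called. The helper name make_connection_factory and its defaults are illustrative only; the keyword arguments mirror the amqp.Connection call that the README showed before this change.

```python
def make_connection_factory(host="localhost:5672", userid="guest",
                            password="guest", virtual_host="/"):
    """Return a zero-argument callable that opens a fresh AMQP connection.

    Hypothetical helper: the name and defaults are for illustration, not
    part of oops_amqp.
    """
    def factory():
        # Imported lazily so that building the factory does not itself
        # require a broker; every call opens a new connection.
        from amqplib import client_0_8 as amqp
        return amqp.Connection(
            host=host, userid=userid, password=password,
            virtual_host=virtual_host, insist=False)
    return factory


factory = make_connection_factory()
# The Receiver would then be built roughly as in the README:
# receiver = oops_amqp.Receiver(config, factory, "my queue")
```

Because the Receiver calls the factory again after a socket error, the factory should not cache or reuse a connection object between calls.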


-- 
https://code.launchpad.net/~lifeless/python-oops-amqp/0.0.2/+merge/79637
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~lifeless/python-oops-amqp/0.0.2 into lp:python-oops-amqp.
=== modified file 'NEWS'
--- NEWS	2011-10-10 21:49:50 +0000
+++ NEWS	2011-10-18 00:08:23 +0000
@@ -6,6 +6,23 @@
 NEXT
 ----
 
+0.0.2
+-----
+
+* Fix documentation warts from the initial release, bump the version to 0.0.2,
+  and prepare for making the receiver deal with interrupted services.
+  (Robert Collins, #875976, #875984)
+
+* Fix Receiver.run_forever to actually run forever. (Robert Collins)
+
+* Change API for constructing Receiver to take a connection factory rather than
+  a channel. This will permit handling transient faults internally rather than
+  forcing a restart. (Robert Collins)
+
+* Implement resiliency for the Receiver: automatically reconnect if a socket
+  error is received from rabbit, for up to two minutes of downtime.
+  (Robert Collins)
+
 0.0.1
 -----
 

=== modified file 'README'
--- README	2011-10-10 21:49:50 +0000
+++ README	2011-10-18 00:08:23 +0000
@@ -98,17 +98,16 @@
 AMQP. To use it you need to configure a local config to publish the received
 reports. A full config is used because that includes support for filtering
 (which can be useful if you need to throttle volume, for instance).
-Additionally you need an amqp channel object and a queue name to receive from.
+Additionally you need an amqp connection factory (to handle the amqp server
+being restarted) and a queue name to receive from.
 
-This example uses the OOPSDateDirRepo publisher, telling it to accept whatever
+This example uses the DateDirRepo publisher, telling it to accept whatever
 id was assigned by the process publishing to AMQP::
 
-  >>> publisher = oops_datedir_repo.OOPSDateDirRepo('.', inherit_id=True)
+  >>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True)
   >>> config = oops.Config()
   >>> config.publishers.append(publisher.publish)
-  >>> connection = amqp.Connection(host="localhost:5672",
-  ...     userid="guest", password="guest", virtual_host="/", insist=False)
-  >>> receiver = oops_amqp.Receiver(config, connection, "my queue")
+  >>> receiver = oops_amqp.Receiver(config, factory, "my queue")
   >>> receiver.run_forever()
 
 For more information see pydoc oops_amqp.

=== modified file 'oops_amqp/__init__.py'
--- oops_amqp/__init__.py	2011-10-10 21:49:50 +0000
+++ oops_amqp/__init__.py	2011-10-18 00:08:23 +0000
@@ -73,14 +73,16 @@
 AMQP. To use it you need to configure a local config to publish the received
 reports. A full config is used because that includes support for filtering
 (which can be useful if you need to throttle volume, for instance).
+Additionally you need an amqp connection factory (to handle the amqp server
+being restarted) and a queue name to receive from.
 
-This example uses the OOPSDateDirRepo publisher, telling it to accept whatever
+This example uses the DateDirRepo publisher, telling it to accept whatever
 id was assigned by the process publishing to AMQP::
 
-  >>> publisher = oops_datedir_repo.OOPSDateDirRepo('.', inherit_id=True)
+  >>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True)
   >>> config = oops.Config()
   >>> config.publishers.append(publisher.publish)
-  >>> receiver = oops_amqp.Receiver(config)
+  >>> receiver = oops_amqp.Receiver(config, factory, "my queue")
   >>> receiver.run_forever()
 """
 
@@ -95,7 +97,7 @@
 # established at this point, and setup.py will use a version of next-$(revno).
 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
 # Otherwise it is major.minor.micro~$(revno).
-__version__ = (0, 0, 1, 'beta', 0)
+__version__ = (0, 0, 2, 'beta', 0)
 
 __all__ = [
     'Publisher',

=== modified file 'oops_amqp/receiver.py'
--- oops_amqp/receiver.py	2011-10-10 21:49:50 +0000
+++ oops_amqp/receiver.py	2011-10-18 00:08:23 +0000
@@ -18,8 +18,13 @@
 
 __metaclass__ = type
 
+import socket
+import time
+
 import bson
 
+from oops_amqp.utils import close_ignoring_EPIPE
+
 __all__ = [
     'Receiver',
     ]
@@ -28,23 +33,25 @@
     """Republish OOPS reports from AMQP to a local oops.Config.
     
     :ivar stopping: When True will cause Receiver to break out of run_forever.
+        Calls to run_forever reset this to False.
     """
 
-    def __init__(self, config, channel, queue_name):
+    def __init__(self, config, connection_factory, queue_name):
         """Create a Receiver.
 
         :param config: An oops.Config to republish the OOPS reports.
-        :param channel: An amqplib Channel to listen for reports on.
+        :param connection_factory: An amqplib connection factory, used to make the
+            initial connection and to reconnect if that connection is
+            interrupted.
         :param queue_name: The queue to listen for reports on.
         """
         self.config = config
-        self.channel = channel
+        self.connection = None
+        self.channel = None
+        self.connection_factory = connection_factory
         self.queue_name = queue_name
-        self.stopping = False
 
     def handle_report(self, message):
-        if self.stopping:
-            self.channel.basic_cancel(self.consume_tag)
         try:
             report = bson.loads(message.body)
         except KeyError:
@@ -56,6 +63,38 @@
         self.channel.basic_ack(message.delivery_tag)
     
     def run_forever(self):
-        self.consume_tag = self.channel.basic_consume(
-            self.queue_name, callback=self.handle_report)
-        self.channel.wait()
+        """Run in a loop handling messages.
+
+        If the amqp server is down or uncontactable for > 120 seconds, stop
+        retrying and return.
+        """
+        self.stopping = False
+        self.connection_start = time.time()
+        while not self.stopping and time.time() < self.connection_start + 120:
+            try:
+                self._run_forever()
+            except socket.error:
+                # Don't probe immediately, give the network/process time to
+                # come back.
+                time.sleep(0.1)
+
+    def _run_forever(self):
+        self.connection = self.connection_factory()
+        # A successful connection: record this so run_forever won't bail early.
+        self.connection_start = time.time()
+        try:
+            self.channel = self.connection.channel()
+            try:
+                self.consume_tag = self.channel.basic_consume(
+                    self.queue_name, callback=self.handle_report)
+                try:
+                    while True:
+                        self.channel.wait()
+                        if self.stopping:
+                            break
+                finally:
+                    self.channel.basic_cancel(self.consume_tag)
+            finally:
+                close_ignoring_EPIPE(self.channel)
+        finally:
+            close_ignoring_EPIPE(self.connection)
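
The timing of the reconnect loop above can be explored without a broker by injecting a clock. The following is only a sketch of that idea: RetryLoop, connect and serve are hypothetical stand-ins, not oops_amqp code. It illustrates two properties of the loop as written in this diff: a successful connection restarts the 120 second window, and when the window expires the loop simply ends rather than raising.

```python
import socket
import time


class RetryLoop:
    """Stand-in for the Receiver's reconnect timing; not part of oops_amqp."""

    def __init__(self, connect, serve, window=120.0,
                 clock=time.time, sleep=time.sleep):
        self.connect = connect  # hypothetical: raises socket.error on failure
        self.serve = serve      # hypothetical: handles messages until stopped
        self.window = window
        self.clock = clock
        self.sleep = sleep
        self.stopping = False

    def run(self):
        self.stopping = False
        started = self.clock()
        while not self.stopping and self.clock() < started + self.window:
            try:
                self.connect()
                # A successful connection restarts the window.
                started = self.clock()
                self.serve(self)
            except socket.error:
                # Back off briefly before probing again.
                self.sleep(0.1)


# Drive the loop with a fake clock so the example finishes instantly.
now = [0.0]


def fake_sleep(seconds):
    now[0] += seconds


def always_down():
    raise socket.error("broker unreachable")


loop = RetryLoop(always_down, serve=None,
                 clock=lambda: now[0], sleep=fake_sleep)
loop.run()  # returns quietly once the simulated window has elapsed
```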

=== modified file 'oops_amqp/tests/__init__.py'
--- oops_amqp/tests/__init__.py	2011-10-10 21:49:50 +0000
+++ oops_amqp/tests/__init__.py	2011-10-18 00:08:23 +0000
@@ -16,8 +16,6 @@
 
 """Tests for oops_amqp."""
 
-import errno
-import socket
 from unittest import TestLoader
 
 from amqplib import client_0_8 as amqp
@@ -91,13 +89,6 @@
             self.rabbit.config.port), userid="guest", password="guest",
             virtual_host="/")
 
-    def close_ignoring_EPIPE(self, closable):
-        try:
-            closable.close()
-        except socket.error, e:
-            if e.errno != errno.EPIPE:
-                raise
-
 
 def test_suite():
     test_mod_names = [

=== modified file 'oops_amqp/tests/test_publisher.py'
--- oops_amqp/tests/test_publisher.py	2011-10-10 21:49:50 +0000
+++ oops_amqp/tests/test_publisher.py	2011-10-18 00:08:23 +0000
@@ -21,6 +21,7 @@
 import bson
 
 from oops_amqp import Publisher
+from oops_amqp.utils import close_ignoring_EPIPE
 from oops_amqp.tests import (
     QueueFixture,
     TestCase,
@@ -64,9 +65,9 @@
         # indicate that publication failed - and publishing after it comes back
         # works.
         connection = self.connection_factory()
-        self.addCleanup(self.close_ignoring_EPIPE, connection)
+        self.addCleanup(close_ignoring_EPIPE, connection)
         channel = connection.channel()
-        self.addCleanup(self.close_ignoring_EPIPE, channel)
+        self.addCleanup(close_ignoring_EPIPE, channel)
         queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
         # The private method use and the restart of rabbit before it gets torn
         # down are bugs in rabbitfixture that will be fixed in a future
@@ -87,9 +88,9 @@
         # to indicate that publication failed - and publishing after it comes
         # back works.
         connection = self.connection_factory()
-        self.addCleanup(self.close_ignoring_EPIPE, connection)
+        self.addCleanup(close_ignoring_EPIPE, connection)
         channel = connection.channel()
-        self.addCleanup(self.close_ignoring_EPIPE, channel)
+        self.addCleanup(close_ignoring_EPIPE, channel)
         queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
         publisher = Publisher(self.connection_factory, queue.exchange_name, "")
         oops = {'akey': 42}

=== modified file 'oops_amqp/tests/test_receiver.py'
--- oops_amqp/tests/test_receiver.py	2011-10-10 21:49:50 +0000
+++ oops_amqp/tests/test_receiver.py	2011-10-18 00:08:23 +0000
@@ -16,6 +16,9 @@
 
 """Tests for AMQP receiving."""
 
+import errno
+import socket
+
 from amqplib import client_0_8 as amqp
 import bson
 from oops import Config
@@ -45,33 +48,112 @@
         channel.basic_publish(message, queue.exchange_name, routing_key="")
         config = Config()
         config.publishers.append(capture)
-        receiver_channel = connection.channel()
-        self.addCleanup(receiver_channel.close)
-        receiver = Receiver(config, receiver_channel, queue.queue_name)
         # We don't want to loop forever: patch the channel so that after one
         # call to wait (which will get our injected message) the loop will shut
-        # down. This also checks we use the consume_tag correctly.
-        old_wait = receiver_channel.wait
-        def new_wait(allowed_methods=None):
-            receiver.stopping = True
-            return old_wait(allowed_methods=allowed_methods)
-        receiver_channel.wait = new_wait
+        # down.
+        def patching_factory():
+            connection = self.connection_factory()
+            old_channel = connection.channel
+            def new_channel():
+                result = old_channel()
+                old_wait = result.wait
+                def new_wait(allowed_methods=None):
+                    receiver.stopping = True
+                    return old_wait(allowed_methods=allowed_methods)
+                result.wait = new_wait
+                return result
+            connection.channel = new_channel
+            return connection
+        receiver = Receiver(config, patching_factory, queue.queue_name)
         receiver.run_forever()
         self.assertEqual([expected_report], reports)
 
     def test_run_forever(self):
-        # run_forever subscribes and then calls wait.
+        # run_forever subscribes and then calls wait in a loop.
         config = None
+        calls = []
         class FakeChannel:
-            def __init__(self):
-                self.calls = []
+            def __init__(self, calls):
+                self.calls = calls
             def basic_consume(self, queue_name, callback=None):
                 self.calls.append(('basic_consume', queue_name, callback))
+                return 'tag'
             def wait(self):
                 self.calls.append(('wait',))
-        channel = FakeChannel()
-        receiver = Receiver(None, channel, 'foo')
+                if len(self.calls) > 2:
+                    receiver.stopping = True
+            def basic_cancel(self, tag):
+                self.calls.append(('basic_cancel', tag))
+            def close(self):
+                pass
+        class FakeConnection:
+            def channel(self):
+                return FakeChannel(calls)
+            def close(self):
+                pass
+        receiver = Receiver(None, FakeConnection, 'foo')
         receiver.run_forever()
         self.assertEqual(
-            [('basic_consume', 'foo', receiver.handle_report), ('wait',)],
-            channel.calls)
+            [('basic_consume', 'foo', receiver.handle_report),
+             ('wait',),
+             ('wait',),
+             ('basic_cancel', 'tag')],
+            calls)
+
+    def test_tolerates_amqp_trouble(self):
+        # If the AMQP server is unavailable for a short period, the receiver
+        # will automatically reconnect.
+        # Break a connection to raise socket.error (which we know from the
+        # publisher tests is what leaks through when rabbit is shut down).
+        # We raise it the first time on each amqp method call.
+        reports = []
+        def capture(report):
+            reports.append(report)
+            return report['id']
+        expected_report = {'id': 'foo', 'otherkey': 42}
+        message = amqp.Message(bson.dumps(expected_report))
+        connection = self.connection_factory()
+        self.addCleanup(connection.close)
+        channel = connection.channel()
+        self.addCleanup(channel.close)
+        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
+        channel.basic_publish(message, queue.exchange_name, routing_key="")
+        config = Config()
+        config.publishers.append(capture)
+        state = {}
+        def error_once(func):
+            def wrapped(*args, **kwargs):
+                func_ref = func.func_code
+                if func_ref in state:
+                    return func(*args, **kwargs)
+                else:
+                    state[func_ref] = True
+                    # Use EPIPE because the close() code checks that (though
+                    # the rest doesn't)
+                    raise socket.error(errno.EPIPE, "booyah")
+            return wrapped
+        @error_once
+        def patching_factory():
+            connection = self.connection_factory()
+            old_channel = connection.channel
+            @error_once
+            def new_channel():
+                result = old_channel()
+                result.wait = error_once(result.wait)
+                result.basic_consume = error_once(result.basic_consume)
+                result.basic_cancel = error_once(result.basic_cancel)
+                result.close = error_once(result.close)
+                return result
+            connection.channel = new_channel
+            connection.close = error_once(connection.close)
+            return connection
+        receiver = Receiver(config, patching_factory, queue.queue_name)
+        # After we've successfully handled a message, we can be fairly sure
+        # we'll handle things.
+        old_handle = receiver.handle_report
+        def handle_and_stop(message):
+            old_handle(message)
+            receiver.stopping = True
+        receiver.handle_report = handle_and_stop
+        receiver.run_forever()
+        self.assertEqual([expected_report], reports)
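
The error_once helper in the test above keys its state on the wrapped function's code object, so a method fails once in total even when a reconnect produces a fresh channel object. A standalone sketch of that idea follows, using the __code__ spelling instead of func_code; make_error_once and Chan are hypothetical names for illustration.

```python
import errno
import socket


def make_error_once():
    """Build a decorator that makes each distinct function fail once."""
    seen = set()

    def error_once(func):
        def wrapped(*args, **kwargs):
            # Bound methods of different instances share one code object,
            # so the injected failure happens once per method, not once
            # per instance.
            key = func.__code__
            if key in seen:
                return func(*args, **kwargs)
            seen.add(key)
            raise socket.error(errno.EPIPE, "simulated failure")
        return wrapped

    return error_once


class Chan:
    """Hypothetical channel with a single method to wrap."""

    def wait(self):
        return "ok"


error_once = make_error_once()
first = error_once(Chan().wait)   # first call raises socket.error
second = error_once(Chan().wait)  # new instance, same underlying code
```

Calling first() raises the simulated EPIPE; any later call to first() or second() goes through to the real method.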

=== added file 'oops_amqp/utils.py'
--- oops_amqp/utils.py	1970-01-01 00:00:00 +0000
+++ oops_amqp/utils.py	2011-10-18 00:08:23 +0000
@@ -0,0 +1,32 @@
+# Copyright (c) 2011, Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Utility functions for oops_amqp."""
+
+import errno
+import socket
+
+__all__ = [
+    'close_ignoring_EPIPE',
+    ]
+
+
+def close_ignoring_EPIPE(closable):
+    try:
+        return closable.close()
+    except socket.error, e:
+        if e.errno != errno.EPIPE:
+            raise
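
The helper above uses the older "except socket.error, e" spelling. For readers on newer interpreters, an equivalent sketch with the "as" form is shown here together with a small demonstration; BrokenPipe is a hypothetical closable invented for the example.

```python
import errno
import socket


def close_ignoring_EPIPE(closable):
    """Close `closable`, swallowing only a broken-pipe socket error."""
    try:
        return closable.close()
    except socket.error as e:
        # Anything other than EPIPE is a real problem and is re-raised.
        if e.errno != errno.EPIPE:
            raise


class BrokenPipe:
    """Hypothetical closable whose peer has already gone away."""

    def close(self):
        raise socket.error(errno.EPIPE, "Broken pipe")


close_ignoring_EPIPE(BrokenPipe())  # the EPIPE is swallowed
```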

=== modified file 'setup.py'
--- setup.py	2011-10-10 21:49:50 +0000
+++ setup.py	2011-10-18 00:08:23 +0000
@@ -23,7 +23,7 @@
         os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
 
 setup(name="oops_amqp",
-      version="0.0.1",
+      version="0.0.2",
       description=\
               "OOPS AMQP transport.",
       long_description=description,