← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~lifeless/python-oops-amqp/0.0.4 into lp:python-oops-amqp

 

Robert Collins has proposed merging lp:~lifeless/python-oops-amqp/0.0.4 into lp:python-oops-amqp.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #884540 in python-oops-amqp: "amqp connection exception when amqp is restarted"
  https://bugs.launchpad.net/python-oops-amqp/+bug/884540

For more details, see:
https://code.launchpad.net/~lifeless/python-oops-amqp/0.0.4/+merge/80865

More robustness fixes, discovered by observing repeated stop-start cycles of amqp2disk on carob. I've filed a bug upstream about basic_cancel blowing up, but the guard isn't wrong, so there is no need to wait or fudge things.

The basic_cancel change is tested (see the stub change), but the handling of the new exception isn't, and it is still nasty to try to test.
-- 
https://code.launchpad.net/~lifeless/python-oops-amqp/0.0.4/+merge/80865
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~lifeless/python-oops-amqp/0.0.4 into lp:python-oops-amqp.
=== modified file 'NEWS'
--- NEWS	2011-10-27 03:36:02 +0000
+++ NEWS	2011-11-01 04:14:23 +0000
@@ -6,6 +6,16 @@
 NEXT
 ----
 
+0.0.4
+-----
+
+* Do not attempt operations on closed amqplib channels.
+  (Robert Collins, #884539)
+
+* Catch AMQPConnectionException in addition to IOError and socket.error as that
+  is raised by amqplib when it has the chance to read the broker shutdown
+  warning in wait(). (Robert Collins, #884540)
+
 0.0.3
 -----
 

=== modified file 'oops_amqp/__init__.py'
--- oops_amqp/__init__.py	2011-10-27 03:36:02 +0000
+++ oops_amqp/__init__.py	2011-11-01 04:14:23 +0000
@@ -97,7 +97,7 @@
 # established at this point, and setup.py will use a version of next-$(revno).
 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
 # Otherwise it is major.minor.micro~$(revno).
-__version__ = (0, 0, 3, 'beta', 0)
+__version__ = (0, 0, 4, 'beta', 0)
 
 __all__ = [
     'Publisher',

=== modified file 'oops_amqp/publisher.py'
--- oops_amqp/publisher.py	2011-10-27 03:36:02 +0000
+++ oops_amqp/publisher.py	2011-11-01 04:14:23 +0000
@@ -19,13 +19,15 @@
 __metaclass__ = type
 
 from hashlib import md5
-import socket
 from threading import local
 
 from amqplib import client_0_8 as amqp
 from bson import dumps
 
-from utils import is_amqplib_connection_error
+from utils import (
+    amqplib_error_types,
+    is_amqplib_connection_error,
+    )
 
 __all__ = [
     'Publisher',
@@ -63,7 +65,7 @@
         if getattr(self.channels, 'channel', None) is None:
             try:
                 self.channels.channel = self.connection_factory().channel()
-            except (socket.error, IOError), e:
+            except amqplib_error_types, e:
                 if is_amqplib_connection_error(e):
                     # Could not connect
                     return None
@@ -91,7 +93,7 @@
         try:
             channel.basic_publish(
                 message, self.exchange_name, routing_key=self.routing_key)
-        except (socket.error, IOError), e:
+        except amqplib_error_types, e:
             self.channels.channel = None
             if is_amqplib_connection_error(e):
                 # Could not connect / interrupted connection

=== modified file 'oops_amqp/receiver.py'
--- oops_amqp/receiver.py	2011-10-27 03:36:02 +0000
+++ oops_amqp/receiver.py	2011-11-01 04:14:23 +0000
@@ -18,12 +18,12 @@
 
 __metaclass__ = type
 
-import socket
 import time
 
 import bson
 
 from utils import (
+    amqplib_error_types,
     close_ignoring_EPIPE,
     is_amqplib_connection_error,
     )
@@ -84,7 +84,7 @@
                (not self.went_bad or time.time() < self.went_bad + 120)):
             try:
                 self._run_forever()
-            except (socket.error, IOError), e:
+            except amqplib_error_types, e:
                 if not is_amqplib_connection_error(e):
                     # Something unknown went wrong.
                     raise
@@ -109,7 +109,8 @@
                         if self.stopping:
                             break
                 finally:
-                    self.channel.basic_cancel(self.consume_tag)
+                    if self.channel.is_open:
+                        self.channel.basic_cancel(self.consume_tag)
             finally:
                 close_ignoring_EPIPE(self.channel)
         finally:

=== modified file 'oops_amqp/tests/test_receiver.py'
--- oops_amqp/tests/test_receiver.py	2011-10-18 03:29:27 +0000
+++ oops_amqp/tests/test_receiver.py	2011-11-01 04:14:23 +0000
@@ -98,6 +98,7 @@
         class FakeChannel:
             def __init__(self, calls):
                 self.calls = calls
+                self.is_open = True
             def basic_consume(self, queue_name, callback=None):
                 self.calls.append(('basic_consume', queue_name, callback))
                 return 'tag'
@@ -108,7 +109,7 @@
             def basic_cancel(self, tag):
                 self.calls.append(('basic_cancel', tag))
             def close(self):
-                pass
+                self.is_open = False
         class FakeConnection:
             def channel(self):
                 return FakeChannel(calls)

=== modified file 'oops_amqp/utils.py'
--- oops_amqp/utils.py	2011-10-27 03:36:02 +0000
+++ oops_amqp/utils.py	2011-11-01 04:14:23 +0000
@@ -19,12 +19,25 @@
 import errno
 import socket
 
+from amqplib.client_0_8.exceptions import AMQPConnectionException
+
 __all__ = [
+    'amqplib_error_types',
     'close_ignoring_EPIPE',
+    'is_amqplib_connection_error',
     'is_amqplib_ioerror',
-    'is_amqplib_connection_error',
     ]
 
+# These exception types always indicate an AMQP connection error/closure.
+# However you should catch amqplib_error_types and post-filter with
+# is_amqplib_connection_error.
+amqplib_connection_errors = (socket.error, AMQPConnectionException)
+# A tuple to reduce duplication in different code paths. Lists the types of
+# exceptions legitimately raised by amqplib when the AMQP server goes down.
+# Not all exceptions *will* be such errors - use is_amqplib_connection_error to
+# do a second-stage filter after catching the exception.
+amqplib_error_types = amqplib_connection_errors + (IOError,)
+
 
 def close_ignoring_EPIPE(closable):
     try:
@@ -42,4 +55,4 @@
 
 def is_amqplib_connection_error(e):
     """Return True if e was (probably) raised due to a connection issue."""
-    return isinstance(e, socket.error) or is_amqplib_ioerror(e)
+    return isinstance(e, amqplib_connection_errors) or is_amqplib_ioerror(e)

=== modified file 'setup.py'
--- setup.py	2011-10-27 03:36:02 +0000
+++ setup.py	2011-11-01 04:14:23 +0000
@@ -23,7 +23,7 @@
         os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
 
 setup(name="oops_amqp",
-      version="0.0.3",
+      version="0.0.4",
       description=\
               "OOPS AMQP transport.",
       long_description=description,