launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #05347
[Merge] lp:~lifeless/python-oops-amqp/0.0.4 into lp:python-oops-amqp
Robert Collins has proposed merging lp:~lifeless/python-oops-amqp/0.0.4 into lp:python-oops-amqp.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #884540 in python-oops-amqp: "amqp connection exception when amqp is restarted"
https://bugs.launchpad.net/python-oops-amqp/+bug/884540
For more details, see:
https://code.launchpad.net/~lifeless/python-oops-amqp/0.0.4/+merge/80865
More robustness fixes, discovered by observing repeated stop-start cycles of amqp2disk on carob. I've filed an upstream bug about basic_cancel blowing up, but the guard isn't wrong in any case, so there is no need to wait or fudge things.
The basic_cancel change is tested (see the stub change), but the new exception handling is not, and it is still nasty to try to test.
--
https://code.launchpad.net/~lifeless/python-oops-amqp/0.0.4/+merge/80865
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~lifeless/python-oops-amqp/0.0.4 into lp:python-oops-amqp.
=== modified file 'NEWS'
--- NEWS 2011-10-27 03:36:02 +0000
+++ NEWS 2011-11-01 04:14:23 +0000
@@ -6,6 +6,16 @@
NEXT
----
+0.0.4
+-----
+
+* Do not attempt operations on closed amqplib channels.
+ (Robert Collins, #884539)
+
+* Catch AMQPConnectionException in addition to IOError and socket.error as that
+ is raised by amqplib when it has the chance to read the broker shutdown
+ warning in wait(). (Robert Collins, #884540)
+
0.0.3
-----
=== modified file 'oops_amqp/__init__.py'
--- oops_amqp/__init__.py 2011-10-27 03:36:02 +0000
+++ oops_amqp/__init__.py 2011-11-01 04:14:23 +0000
@@ -97,7 +97,7 @@
# established at this point, and setup.py will use a version of next-$(revno).
# If the releaselevel is 'final', then the tarball will be major.minor.micro.
# Otherwise it is major.minor.micro~$(revno).
-__version__ = (0, 0, 3, 'beta', 0)
+__version__ = (0, 0, 4, 'beta', 0)
__all__ = [
'Publisher',
=== modified file 'oops_amqp/publisher.py'
--- oops_amqp/publisher.py 2011-10-27 03:36:02 +0000
+++ oops_amqp/publisher.py 2011-11-01 04:14:23 +0000
@@ -19,13 +19,15 @@
__metaclass__ = type
from hashlib import md5
-import socket
from threading import local
from amqplib import client_0_8 as amqp
from bson import dumps
-from utils import is_amqplib_connection_error
+from utils import (
+ amqplib_error_types,
+ is_amqplib_connection_error,
+ )
__all__ = [
'Publisher',
@@ -63,7 +65,7 @@
if getattr(self.channels, 'channel', None) is None:
try:
self.channels.channel = self.connection_factory().channel()
- except (socket.error, IOError), e:
+ except amqplib_error_types, e:
if is_amqplib_connection_error(e):
# Could not connect
return None
@@ -91,7 +93,7 @@
try:
channel.basic_publish(
message, self.exchange_name, routing_key=self.routing_key)
- except (socket.error, IOError), e:
+ except amqplib_error_types, e:
self.channels.channel = None
if is_amqplib_connection_error(e):
# Could not connect / interrupted connection
=== modified file 'oops_amqp/receiver.py'
--- oops_amqp/receiver.py 2011-10-27 03:36:02 +0000
+++ oops_amqp/receiver.py 2011-11-01 04:14:23 +0000
@@ -18,12 +18,12 @@
__metaclass__ = type
-import socket
import time
import bson
from utils import (
+ amqplib_error_types,
close_ignoring_EPIPE,
is_amqplib_connection_error,
)
@@ -84,7 +84,7 @@
(not self.went_bad or time.time() < self.went_bad + 120)):
try:
self._run_forever()
- except (socket.error, IOError), e:
+ except amqplib_error_types, e:
if not is_amqplib_connection_error(e):
# Something unknown went wrong.
raise
@@ -109,7 +109,8 @@
if self.stopping:
break
finally:
- self.channel.basic_cancel(self.consume_tag)
+ if self.channel.is_open:
+ self.channel.basic_cancel(self.consume_tag)
finally:
close_ignoring_EPIPE(self.channel)
finally:
=== modified file 'oops_amqp/tests/test_receiver.py'
--- oops_amqp/tests/test_receiver.py 2011-10-18 03:29:27 +0000
+++ oops_amqp/tests/test_receiver.py 2011-11-01 04:14:23 +0000
@@ -98,6 +98,7 @@
class FakeChannel:
def __init__(self, calls):
self.calls = calls
+ self.is_open = True
def basic_consume(self, queue_name, callback=None):
self.calls.append(('basic_consume', queue_name, callback))
return 'tag'
@@ -108,7 +109,7 @@
def basic_cancel(self, tag):
self.calls.append(('basic_cancel', tag))
def close(self):
- pass
+ self.is_open = False
class FakeConnection:
def channel(self):
return FakeChannel(calls)
=== modified file 'oops_amqp/utils.py'
--- oops_amqp/utils.py 2011-10-27 03:36:02 +0000
+++ oops_amqp/utils.py 2011-11-01 04:14:23 +0000
@@ -19,12 +19,25 @@
import errno
import socket
+from amqplib.client_0_8.exceptions import AMQPConnectionException
+
__all__ = [
+ 'amqplib_error_types',
'close_ignoring_EPIPE',
+ 'is_amqplib_connection_error',
'is_amqplib_ioerror',
- 'is_amqplib_connection_error',
]
+# These exception types always indicate an AMQP connection error/closure.
+# However you should catch amqplib_error_types and post-filter with
+# is_amqplib_connection_error.
+amqplib_connection_errors = (socket.error, AMQPConnectionException)
+# A tuple to reduce duplication in different code paths. Lists the types of
+# exceptions legitimately raised by amqplib when the AMQP server goes down.
+# Not all exceptions *will* be such errors - use is_amqplib_connection_error to
+# do a second-stage filter after catching the exception.
+amqplib_error_types = amqplib_connection_errors + (IOError,)
+
def close_ignoring_EPIPE(closable):
try:
@@ -42,4 +55,4 @@
def is_amqplib_connection_error(e):
"""Return True if e was (probably) raised due to a connection issue."""
- return isinstance(e, socket.error) or is_amqplib_ioerror(e)
+ return isinstance(e, amqplib_connection_errors) or is_amqplib_ioerror(e)
=== modified file 'setup.py'
--- setup.py 2011-10-27 03:36:02 +0000
+++ setup.py 2011-11-01 04:14:23 +0000
@@ -23,7 +23,7 @@
os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
setup(name="oops_amqp",
- version="0.0.3",
+ version="0.0.4",
description=\
"OOPS AMQP transport.",
long_description=description,