launchpad-reviewers team mailing list archive
Message #05260
[Merge] lp:~lifeless/python-oops-amqp/0.0.2 into lp:python-oops-amqp
Robert Collins has proposed merging lp:~lifeless/python-oops-amqp/0.0.2 into lp:python-oops-amqp.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #875976 in python-oops-amqp: "pydoc oops_amqp misleading"
https://bugs.launchpad.net/python-oops-amqp/+bug/875976
Bug #875984 in python-oops-amqp: "pydoc refers to nonexisting OOPSDateDirRepo"
https://bugs.launchpad.net/python-oops-amqp/+bug/875984
For more details, see:
https://code.launchpad.net/~lifeless/python-oops-amqp/0.0.2/+merge/79637
0.0.2
-----
* Fix documentation warts from the initial release, bump the version to
0.0.2, and prepare for making the receiver handle interrupted services.
(Robert Collins, #875976, #875984)
* Fix Receiver.run_forever to actually run forever. (Robert Collins)
* Change API for constructing Receiver to take a connection factory rather than
a channel. This will permit handling transient faults internally rather than
forcing a restart. (Robert Collins)
* Implement resiliency for the Receiver: automatically reconnect if a socket
error is received from rabbit, for up to two minutes of downtime.
(Robert Collins)
=== modified file 'NEWS'
--- NEWS 2011-10-10 21:49:50 +0000
+++ NEWS 2011-10-18 00:08:23 +0000
@@ -6,6 +6,23 @@
NEXT
----
+0.0.2
+-----
+
+* Fix documentation warts from the initial release, bump the version to
+ 0.0.2, and prepare for making the receiver handle interrupted services.
+ (Robert Collins, #875976, #875984)
+
+* Fix Receiver.run_forever to actually run forever. (Robert Collins)
+
+* Change API for constructing Receiver to take a connection factory rather than
+ a channel. This will permit handling transient faults internally rather than
+ forcing a restart. (Robert Collins)
+
+* Implement resiliency for the Receiver: automatically reconnect if a socket
+ error is received from rabbit, for up to two minutes of downtime.
+ (Robert Collins)
+
0.0.1
-----
=== modified file 'README'
--- README 2011-10-10 21:49:50 +0000
+++ README 2011-10-18 00:08:23 +0000
@@ -98,17 +98,16 @@
AMQP. To use it you need to configure a local config to publish the received
reports. A full config is used because that includes support for filtering
(which can be useful if you need to throttle volume, for instance).
-Additionally you need an amqp channel object and a queue name to receive from.
+Additionally you need an amqp connection factory (to handle the amqp server
+being restarted) and a queue name to receive from.
-This example uses the OOPSDateDirRepo publisher, telling it to accept whatever
+This example uses the DateDirRepo publisher, telling it to accept whatever
id was assigned by the process publishing to AMQP::
- >>> publisher = oops_datedir_repo.OOPSDateDirRepo('.', inherit_id=True)
+ >>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True)
>>> config = oops.Config()
>>> config.publishers.append(publisher.publish)
- >>> connection = amqp.Connection(host="localhost:5672",
- ... userid="guest", password="guest", virtual_host="/", insist=False)
- >>> receiver = oops_amqp.Receiver(config, connection, "my queue")
+ >>> receiver = oops_amqp.Receiver(config, factory, "my queue")
>>> receiver.run_forever()
For more information see pydoc oops_amqp.
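The updated README and module docstring pass `factory` to `Receiver` without defining it. Judging from how the receiver calls it (`self.connection_factory()`), a factory is any zero-argument callable that returns a fresh connection on each call. The `amqp.Connection(...)` call removed from the 0.0.1 README can therefore be wrapped with `functools.partial`. The sketch below runs without a broker. `StubConnection` is a hypothetical stand-in for amqplib's `Connection`, and only the keyword arguments come from the old README.

```python
import functools


class StubConnection(object):
    """Hypothetical stand-in for amqplib's client_0_8.Connection.

    It only records the arguments it was built with, so this sketch runs
    without a RabbitMQ server.
    """

    def __init__(self, host, userid, password, virtual_host, insist=False):
        self.params = dict(host=host, userid=userid, password=password,
                           virtual_host=virtual_host, insist=insist)

    def close(self):
        pass


# With amqplib you would bind amqp.Connection instead of StubConnection;
# the keyword arguments are the ones the 0.0.1 README passed directly.
factory = functools.partial(
    StubConnection, host="localhost:5672", userid="guest",
    password="guest", virtual_host="/", insist=False)

# Every call builds a new connection object, which is what lets the
# Receiver reconnect after the server restarts.
first = factory()
second = factory()
```

A plain `def` or `lambda` returning a new connection would serve equally well. `partial` simply keeps the connection parameters in one place.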
=== modified file 'oops_amqp/__init__.py'
--- oops_amqp/__init__.py 2011-10-10 21:49:50 +0000
+++ oops_amqp/__init__.py 2011-10-18 00:08:23 +0000
@@ -73,14 +73,16 @@
AMQP. To use it you need to configure a local config to publish the received
reports. A full config is used because that includes support for filtering
(which can be useful if you need to throttle volume, for instance).
+Additionally you need an amqp connection factory (to handle the amqp server
+being restarted) and a queue name to receive from.
-This example uses the OOPSDateDirRepo publisher, telling it to accept whatever
+This example uses the DateDirRepo publisher, telling it to accept whatever
id was assigned by the process publishing to AMQP::
- >>> publisher = oops_datedir_repo.OOPSDateDirRepo('.', inherit_id=True)
+ >>> publisher = oops_datedir_repo.DateDirRepo('.', inherit_id=True)
>>> config = oops.Config()
>>> config.publishers.append(publisher.publish)
- >>> receiver = oops_amqp.Receiver(config)
+ >>> receiver = oops_amqp.Receiver(config, factory, "my queue")
>>> receiver.run_forever()
"""
@@ -95,7 +97,7 @@
# established at this point, and setup.py will use a version of next-$(revno).
# If the releaselevel is 'final', then the tarball will be major.minor.micro.
# Otherwise it is major.minor.micro~$(revno).
-__version__ = (0, 0, 1, 'beta', 0)
+__version__ = (0, 0, 2, 'beta', 0)
__all__ = [
'Publisher',
=== modified file 'oops_amqp/receiver.py'
--- oops_amqp/receiver.py 2011-10-10 21:49:50 +0000
+++ oops_amqp/receiver.py 2011-10-18 00:08:23 +0000
@@ -18,8 +18,13 @@
__metaclass__ = type
+import socket
+import time
+
import bson
+from oops_amqp.utils import close_ignoring_EPIPE
+
__all__ = [
'Receiver',
]
@@ -28,23 +33,25 @@
"""Republish OOPS reports from AMQP to a local oops.Config.
:ivar stopping: When True will cause Receiver to break out of run_forever.
+ Calls to run_forever reset this to False.
"""
- def __init__(self, config, channel, queue_name):
+ def __init__(self, config, connection_factory, queue_name):
"""Create a Receiver.
:param config: An oops.Config to republish the OOPS reports.
- :param channel: An amqplib Channel to listen for reports on.
+ :param connection_factory: A callable returning a new amqplib
+ connection; used to make the initial connection and to reconnect if
+ that connection is interrupted.
:param queue_name: The queue to listen for reports on.
"""
self.config = config
- self.channel = channel
+ self.connection = None
+ self.channel = None
+ self.connection_factory = connection_factory
self.queue_name = queue_name
- self.stopping = False
def handle_report(self, message):
- if self.stopping:
- self.channel.basic_cancel(self.consume_tag)
try:
report = bson.loads(message.body)
except KeyError:
@@ -56,6 +63,38 @@
self.channel.basic_ack(message.delivery_tag)
def run_forever(self):
- self.consume_tag = self.channel.basic_consume(
- self.queue_name, callback=self.handle_report)
- self.channel.wait()
+ """Run in a loop handling messages.
+
+ If the amqp server is down or unreachable for more than 120 seconds,
+ stop retrying and return.
+ """
+ self.stopping = False
+ self.connection_start = time.time()
+ while not self.stopping and time.time() < self.connection_start + 120:
+ try:
+ self._run_forever()
+ except socket.error:
+ # Don't probe immediately, give the network/process time to
+ # come back.
+ time.sleep(0.1)
+
+ def _run_forever(self):
+ self.connection = self.connection_factory()
+ # A successful connection: record this so run_forever won't bail early.
+ self.connection_start = time.time()
+ try:
+ self.channel = self.connection.channel()
+ try:
+ self.consume_tag = self.channel.basic_consume(
+ self.queue_name, callback=self.handle_report)
+ try:
+ while True:
+ self.channel.wait()
+ if self.stopping:
+ break
+ finally:
+ self.channel.basic_cancel(self.consume_tag)
+ finally:
+ close_ignoring_EPIPE(self.channel)
+ finally:
+ close_ignoring_EPIPE(self.connection)
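`run_forever` and `_run_forever` together implement a retry loop with a sliding deadline. Each successful connection restarts the 120-second window, and each `socket.error` causes a 0.1 s pause before the next attempt. The function below is a standalone sketch of that control flow, not the Receiver itself. `run_with_reconnect`, `connect`, `serve`, and the injectable `clock`/`sleep` parameters are hypothetical names, introduced so the behaviour can be exercised without a broker or real delays.

```python
import errno
import socket
import time


def run_with_reconnect(connect, serve, should_stop, timeout=120.0,
                       delay=0.1, clock=time.time, sleep=time.sleep):
    """Sketch of Receiver.run_forever's reconnect policy.

    Keep calling connect() then serve(connection) until should_stop() is
    true. A socket.error from either triggers a short sleep and a retry.
    Once `timeout` seconds pass since the start or the last successful
    connect, give up and return (no exception), as the diff's code does.
    """
    start = clock()
    while not should_stop() and clock() < start + timeout:
        try:
            connection = connect()
            start = clock()  # connected: restart the downtime window
            serve(connection)
        except socket.error:
            # Give the network/server time to recover before probing again.
            sleep(delay)


# Usage 1: two refused connections, then one that works; serving it asks
# the loop to stop.
attempts = []
state = {"stop": False}


def flaky_connect():
    attempts.append(len(attempts))
    if len(attempts) < 3:
        raise socket.error(errno.ECONNREFUSED, "connection refused")
    return "connection"


def serve(connection):
    state["stop"] = True


run_with_reconnect(flaky_connect, serve, lambda: state["stop"],
                   sleep=lambda seconds: None)

# Usage 2: a server that never comes back. A fake clock advanced only by
# the sleeps shows the loop giving up once the timeout elapses.
fake_now = [0.0]
failures = []


def fake_sleep(seconds):
    fake_now[0] += seconds


def always_down():
    failures.append(fake_now[0])
    raise socket.error(errno.ECONNREFUSED, "connection refused")


run_with_reconnect(always_down, serve, lambda: False, timeout=1.0,
                   delay=0.25, clock=lambda: fake_now[0], sleep=fake_sleep)
```

Two consequences of this design may be worth checking in review. The window restarts on every successful connect, so a server that accepts connections but fails straight afterwards keeps the loop alive indefinitely. When the window does expire, `run_forever` returns normally instead of raising.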
=== modified file 'oops_amqp/tests/__init__.py'
--- oops_amqp/tests/__init__.py 2011-10-10 21:49:50 +0000
+++ oops_amqp/tests/__init__.py 2011-10-18 00:08:23 +0000
@@ -16,8 +16,6 @@
"""Tests for oops_amqp."""
-import errno
-import socket
from unittest import TestLoader
from amqplib import client_0_8 as amqp
@@ -91,13 +89,6 @@
self.rabbit.config.port), userid="guest", password="guest",
virtual_host="/")
- def close_ignoring_EPIPE(self, closable):
- try:
- closable.close()
- except socket.error, e:
- if e.errno != errno.EPIPE:
- raise
-
def test_suite():
test_mod_names = [
=== modified file 'oops_amqp/tests/test_publisher.py'
--- oops_amqp/tests/test_publisher.py 2011-10-10 21:49:50 +0000
+++ oops_amqp/tests/test_publisher.py 2011-10-18 00:08:23 +0000
@@ -21,6 +21,7 @@
import bson
from oops_amqp import Publisher
+from oops_amqp.utils import close_ignoring_EPIPE
from oops_amqp.tests import (
QueueFixture,
TestCase,
@@ -64,9 +65,9 @@
# indicate that publication failed - and publishing after it comes back
# works.
connection = self.connection_factory()
- self.addCleanup(self.close_ignoring_EPIPE, connection)
+ self.addCleanup(close_ignoring_EPIPE, connection)
channel = connection.channel()
- self.addCleanup(self.close_ignoring_EPIPE, channel)
+ self.addCleanup(close_ignoring_EPIPE, channel)
queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
# The private method use and the restart of rabbit before it gets torn
# down are bugs in rabbitfixture that will be fixed in a future
@@ -87,9 +88,9 @@
# to indicate that publication failed - and publishing after it comes
# back works.
connection = self.connection_factory()
- self.addCleanup(self.close_ignoring_EPIPE, connection)
+ self.addCleanup(close_ignoring_EPIPE, connection)
channel = connection.channel()
- self.addCleanup(self.close_ignoring_EPIPE, channel)
+ self.addCleanup(close_ignoring_EPIPE, channel)
queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
publisher = Publisher(self.connection_factory, queue.exchange_name, "")
oops = {'akey': 42}
=== modified file 'oops_amqp/tests/test_receiver.py'
--- oops_amqp/tests/test_receiver.py 2011-10-10 21:49:50 +0000
+++ oops_amqp/tests/test_receiver.py 2011-10-18 00:08:23 +0000
@@ -16,6 +16,9 @@
"""Tests for AMQP receiving."""
+import errno
+import socket
+
from amqplib import client_0_8 as amqp
import bson
from oops import Config
@@ -45,33 +48,112 @@
channel.basic_publish(message, queue.exchange_name, routing_key="")
config = Config()
config.publishers.append(capture)
- receiver_channel = connection.channel()
- self.addCleanup(receiver_channel.close)
- receiver = Receiver(config, receiver_channel, queue.queue_name)
# We don't want to loop forever: patch the channel so that after one
# call to wait (which will get our injected message) the loop will shut
- # down. This also checks we use the consume_tag correctly.
- old_wait = receiver_channel.wait
- def new_wait(allowed_methods=None):
- receiver.stopping = True
- return old_wait(allowed_methods=allowed_methods)
- receiver_channel.wait = new_wait
+ # down.
+ def patching_factory():
+ connection = self.connection_factory()
+ old_channel = connection.channel
+ def new_channel():
+ result = old_channel()
+ old_wait = result.wait
+ def new_wait(allowed_methods=None):
+ receiver.stopping = True
+ return old_wait(allowed_methods=allowed_methods)
+ result.wait = new_wait
+ return result
+ connection.channel = new_channel
+ return connection
+ receiver = Receiver(config, patching_factory, queue.queue_name)
receiver.run_forever()
self.assertEqual([expected_report], reports)
def test_run_forever(self):
- # run_forever subscribes and then calls wait.
+ # run_forever subscribes and then calls wait in a loop.
config = None
+ calls = []
class FakeChannel:
- def __init__(self):
- self.calls = []
+ def __init__(self, calls):
+ self.calls = calls
def basic_consume(self, queue_name, callback=None):
self.calls.append(('basic_consume', queue_name, callback))
+ return 'tag'
def wait(self):
self.calls.append(('wait',))
- channel = FakeChannel()
- receiver = Receiver(None, channel, 'foo')
+ if len(self.calls) > 2:
+ receiver.stopping = True
+ def basic_cancel(self, tag):
+ self.calls.append(('basic_cancel', tag))
+ def close(self):
+ pass
+ class FakeConnection:
+ def channel(self):
+ return FakeChannel(calls)
+ def close(self):
+ pass
+ receiver = Receiver(None, FakeConnection, 'foo')
receiver.run_forever()
self.assertEqual(
- [('basic_consume', 'foo', receiver.handle_report), ('wait',)],
- channel.calls)
+ [('basic_consume', 'foo', receiver.handle_report),
+ ('wait',),
+ ('wait',),
+ ('basic_cancel', 'tag')],
+ calls)
+
+ def test_tolerates_amqp_trouble(self):
+ # If the AMQP server is unavailable for a short period, the receiver
+ # will automatically reconnect.
+ # Break a connection to raise socket.error (which we know from the
+ # publisher tests is what leaks through when rabbit is shut down).
+ # We raise it the first time on each amqp method call.
+ reports = []
+ def capture(report):
+ reports.append(report)
+ return report['id']
+ expected_report = {'id': 'foo', 'otherkey': 42}
+ message = amqp.Message(bson.dumps(expected_report))
+ connection = self.connection_factory()
+ self.addCleanup(connection.close)
+ channel = connection.channel()
+ self.addCleanup(channel.close)
+ queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
+ channel.basic_publish(message, queue.exchange_name, routing_key="")
+ config = Config()
+ config.publishers.append(capture)
+ state = {}
+ def error_once(func):
+ def wrapped(*args, **kwargs):
+ func_ref = func.func_code
+ if func_ref in state:
+ return func(*args, **kwargs)
+ else:
+ state[func_ref] = True
+ # Use EPIPE because the close() code checks that (though
+ # the rest doesn't)
+ raise socket.error(errno.EPIPE, "booyah")
+ return wrapped
+ @error_once
+ def patching_factory():
+ connection = self.connection_factory()
+ old_channel = connection.channel
+ @error_once
+ def new_channel():
+ result = old_channel()
+ result.wait = error_once(result.wait)
+ result.basic_consume = error_once(result.basic_consume)
+ result.basic_cancel = error_once(result.basic_cancel)
+ result.close = error_once(result.close)
+ return result
+ connection.channel = new_channel
+ connection.close = error_once(connection.close)
+ return connection
+ receiver = Receiver(config, patching_factory, queue.queue_name)
+ # Once a message has been handled, the receiver has evidently recovered
+ # from the injected faults, so stop it there.
+ old_handle = receiver.handle_report
+ def handle_and_stop(message):
+ old_handle(message)
+ receiver.stopping = True
+ receiver.handle_report = handle_and_stop
+ receiver.run_forever()
+ self.assertEqual([expected_report], reports)
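`test_tolerates_amqp_trouble` injects faults with `error_once`, which keys its state on the wrapped function's code object rather than on the function itself. This matters because `patching_factory` creates a new `new_channel` closure on every reconnect, and the new channel has its own bound methods. Keying on the code object makes each method fail once in total rather than once per connection, so the receiver can eventually get through. Below is a sketch of the same idea that also runs on Python 3 (`__code__` instead of `func_code`). The `make_greeter` demo is purely illustrative.

```python
import errno
import functools
import socket

# Code objects that have already raised once (shared by all wrappers).
failed_once = set()


def error_once(func):
    """Raise EPIPE the first time code like `func` runs, then pass through.

    Keyed on the code object, so fresh closures or bound methods built
    from the same source share a single failure between them.
    """
    # Bound methods expose the underlying function via __func__.
    code = getattr(func, "__func__", func).__code__

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        if code in failed_once:
            return func(*args, **kwargs)
        failed_once.add(code)
        raise socket.error(errno.EPIPE, "booyah")
    return wrapped


def make_greeter():
    # A new closure on every call, but always the same code object.
    @error_once
    def greet(name):
        return "hello " + name
    return greet


outcomes = []
for _ in range(2):
    greet = make_greeter()
    try:
        outcomes.append(greet("rabbit"))
    except socket.error as e:
        outcomes.append(e.errno)
```

The first closure raises and the second, although it is a different function object, succeeds. If the key were the function object, every reconnect would hit a fresh failure.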
=== added file 'oops_amqp/utils.py'
--- oops_amqp/utils.py 1970-01-01 00:00:00 +0000
+++ oops_amqp/utils.py 2011-10-18 00:08:23 +0000
@@ -0,0 +1,32 @@
+# Copyright (c) 2011, Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Utility functions for oops_amqp."""
+
+import errno
+import socket
+
+__all__ = [
+ 'close_ignoring_EPIPE',
+ ]
+
+
+def close_ignoring_EPIPE(closable):
+ try:
+ return closable.close()
+ except socket.error, e:
+ if e.errno != errno.EPIPE:
+ raise
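Moving `close_ignoring_EPIPE` into `oops_amqp/utils.py` lets the receiver's cleanup path share it with the tests. Its contract is narrow. An EPIPE on close, meaning the peer has already gone away (as when rabbit is shut down), is swallowed. Every other `socket.error` still propagates. The sketch below shows that behaviour with a hypothetical `FailingClosable`. It uses `except ... as`, so it runs on Python 2.6+ and Python 3; the diff itself uses the Python 2-only `except socket.error, e:` form.

```python
import errno
import socket


def close_ignoring_EPIPE(closable):
    """Close `closable`, ignoring only EPIPE (same logic as in utils.py)."""
    try:
        return closable.close()
    except socket.error as e:
        if e.errno != errno.EPIPE:
            raise


class FailingClosable(object):
    """Hypothetical object whose close() fails with a given errno."""

    def __init__(self, err):
        self.err = err

    def close(self):
        raise socket.error(self.err, "close failed")


# EPIPE is swallowed, so the call simply returns None.
swallowed = close_ignoring_EPIPE(FailingClosable(errno.EPIPE))

# Any other errno, e.g. ECONNRESET, is re-raised to the caller.
try:
    close_ignoring_EPIPE(FailingClosable(errno.ECONNRESET))
    propagated = None
except socket.error as e:
    propagated = e.errno
```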
=== modified file 'setup.py'
--- setup.py 2011-10-10 21:49:50 +0000
+++ setup.py 2011-10-18 00:08:23 +0000
@@ -23,7 +23,7 @@
os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
setup(name="oops_amqp",
- version="0.0.1",
+ version="0.0.2",
description=\
"OOPS AMQP transport.",
long_description=description,