← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/python-oops-amqp/py3 into lp:python-oops-amqp

 

Colin Watson has proposed merging lp:~cjwatson/python-oops-amqp/py3 into lp:python-oops-amqp.

Commit message:
Add Python 3 support.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/python-oops-amqp/py3/+merge/341298

This branch also ports the code from amqplib to the better-maintained amqp library.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/python-oops-amqp/py3 into lp:python-oops-amqp.
=== modified file '.bzrignore'
--- .bzrignore	2011-12-08 10:50:34 +0000
+++ .bzrignore	2018-03-12 11:56:56 +0000
@@ -1,3 +1,4 @@
+__pycache__
 ./eggs/*
 ./.installed.cfg
 ./develop-eggs

=== modified file 'NEWS'
--- NEWS	2015-10-08 14:49:18 +0000
+++ NEWS	2018-03-12 11:56:56 +0000
@@ -9,6 +9,8 @@
 * Dropped dependency on pymongo in favor of bson. This avoids having
   to depend both on bson and pymongo when installing in conjunction
   with oops-datedir-repo. (Ricardo Kirkner)
+* Port from amqplib to amqp. (Colin Watson)
+* Add Python 3 support. (Colin Watson)
 
 0.0.7
 -----

=== modified file 'README'
--- README	2012-08-10 01:41:44 +0000
+++ README	2018-03-12 11:56:56 +0000
@@ -31,7 +31,7 @@
 
 * oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer.
 
-* amqplib
+* amqp
 
 Testing Dependencies
 ====================
@@ -57,7 +57,7 @@
 connection - and the exchange name and routing key to submit to.
 
   >>> factory = partial(amqp.Connection, host="localhost:5672",
-  ...     userid="guest", password="guest", virtual_host="/", insist=False)
+  ...     userid="guest", password="guest", virtual_host="/")
   >>> publisher = oops_amqp.Publisher(factory, "oopses", "")
 
 Provide the publisher to your OOPS config::
@@ -70,7 +70,7 @@
 OOPS ids are generating by hashing the oops message (without the id field) -
 this ensures unique ids.
 
-The reason a factory is used is because amqplib is not threadsafe - the
+The reason a factory is used is because amqp is not threadsafe - the
 publisher maintains a thread locals object to hold the factories and creates
 connections when new threads are created(when they first generate an OOPS).
 
@@ -87,7 +87,7 @@
 method failed::
 
   >>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
-  ...     userid="guest", password="guest", virtual_host="/", insist=False)
+  ...     userid="guest", password="guest", virtual_host="/")
   >>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
   >>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
 

=== modified file 'oops_amqp/__init__.py'
--- oops_amqp/__init__.py	2015-10-02 16:26:57 +0000
+++ oops_amqp/__init__.py	2018-03-12 11:56:56 +0000
@@ -32,7 +32,7 @@
 connection - and the exchange name and routing key to submit to.
 
   >>> factory = partial(amqp.Connection, host="localhost:5672",
-  ...     userid="guest", password="guest", virtual_host="/", insist=False)
+  ...     userid="guest", password="guest", virtual_host="/")
   >>> publisher = oops_amqp.Publisher(factory, "oopses", "")
 
 Provide the publisher to your OOPS config::
@@ -45,7 +45,7 @@
 OOPS ids are generating by hashing the oops message (without the id field) -
 this ensures unique ids.
 
-The reason a factory is used is because amqplib is not threadsafe - the
+The reason a factory is used is because amqp is not threadsafe - the
 publisher maintains a thread locals object to hold the factories and creates
 connections when new threads are created(when they first generate an OOPS).
 
@@ -62,7 +62,7 @@
 method failed::
 
   >>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
-  ...     userid="guest", password="guest", virtual_host="/", insist=False)
+  ...     userid="guest", password="guest", virtual_host="/")
   >>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
   >>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
 
@@ -86,6 +86,8 @@
   >>> receiver.run_forever()
 """
 
+from __future__ import absolute_import, print_function
+
 # same format as sys.version_info: "A tuple containing the five components of
 # the version number: major, minor, micro, releaselevel, and serial. All
 # values except releaselevel are integers; the release level is 'alpha',

=== modified file 'oops_amqp/anybson.py'
--- oops_amqp/anybson.py	2012-02-10 19:27:09 +0000
+++ oops_amqp/anybson.py	2018-03-12 11:56:56 +0000
@@ -13,6 +13,8 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 # GNU Lesser General Public License version 3 (see the file LICENSE).
 
+from __future__ import absolute_import, print_function
+
 __all__ = [
     'dumps',
     'loads',

=== modified file 'oops_amqp/publisher.py'
--- oops_amqp/publisher.py	2012-08-10 01:41:44 +0000
+++ oops_amqp/publisher.py	2018-03-12 11:56:56 +0000
@@ -15,15 +15,17 @@
 
 """Publish OOPS reports over amqp."""
 
+from __future__ import absolute_import, print_function
+
 __metaclass__ = type
 
 from hashlib import md5
 from threading import local
 
-from amqplib import client_0_8 as amqp
-from anybson import dumps
+import amqp
 
-from utils import (
+from oops_amqp.anybson import dumps
+from oops_amqp.utils import (
     amqplib_error_types,
     is_amqplib_connection_error,
     )
@@ -63,8 +65,10 @@
     def get_channel(self):
         if getattr(self.channels, 'channel', None) is None:
             try:
-                self.channels.channel = self.connection_factory().channel()
-            except amqplib_error_types, e:
+                connection = self.connection_factory()
+                connection.connect()
+                self.channels.channel = connection.channel()
+            except amqplib_error_types as e:
                 if is_amqplib_connection_error(e):
                     # Could not connect
                     return None
@@ -92,7 +96,7 @@
         try:
             channel.basic_publish(
                 message, self.exchange_name, routing_key=self.routing_key)
-        except amqplib_error_types, e:
+        except amqplib_error_types as e:
             self.channels.channel = None
             if is_amqplib_connection_error(e):
                 # Could not connect / interrupted connection

=== modified file 'oops_amqp/receiver.py'
--- oops_amqp/receiver.py	2012-02-09 23:13:45 +0000
+++ oops_amqp/receiver.py	2018-03-12 11:56:56 +0000
@@ -15,12 +15,14 @@
 
 """Receive OOPS reports over amqp and republish locally."""
 
+from __future__ import absolute_import, print_function
+
 __metaclass__ = type
 
 import time
 
-import anybson as bson
-from utils import (
+from oops_amqp import anybson as bson
+from oops_amqp.utils import (
     amqplib_error_types,
     close_ignoring_connection_errors,
     is_amqplib_connection_error,
@@ -56,12 +58,16 @@
         self.sentinel = None
 
     def handle_report(self, message):
-        if message.body == self.sentinel:
+        # bson requires bytes.
+        body = message.body
+        if not isinstance(body, bytes):
+            body = body.encode(message.content_encoding or 'UTF-8')
+        if body == self.sentinel:
             self.stopping = True
             self.channel.basic_ack(message.delivery_tag)
             return
         try:
-            report = bson.loads(message.body)
+            report = bson.loads(body)
         except KeyError:
             # Garbage in the queue. Possibly this should raise an OOPS itself
             # (through a different config) or log an info level message.
@@ -82,7 +88,7 @@
                (not self.went_bad or time.time() < self.went_bad + 120)):
             try:
                 self._run_forever()
-            except amqplib_error_types, e:
+            except amqplib_error_types as e:
                 if not is_amqplib_connection_error(e):
                     # Something unknown went wrong.
                     raise
@@ -94,6 +100,7 @@
 
     def _run_forever(self):
         self.connection = self.connection_factory()
+        self.connection.connect()
         # A successful connection: record this so run_forever won't bail early.
         self.went_bad = None
         try:
@@ -103,7 +110,7 @@
                     self.queue_name, callback=self.handle_report)
                 try:
                     while True:
-                        self.channel.wait()
+                        self.connection.drain_events(timeout=1)
                         if self.stopping:
                             break
                 finally:

=== modified file 'oops_amqp/tests/__init__.py'
--- oops_amqp/tests/__init__.py	2011-12-08 10:15:23 +0000
+++ oops_amqp/tests/__init__.py	2018-03-12 11:56:56 +0000
@@ -15,16 +15,20 @@
 
 """Tests for oops_amqp."""
 
+from __future__ import absolute_import, print_function
+
 from unittest import TestLoader
 
-from amqplib import client_0_8 as amqp
+import amqp
 from fixtures import Fixture
 from rabbitfixture.server import RabbitServer
 import testtools
 from testresources import (
+    _get_result,
     FixtureResource,
     OptimisingTestSuite,
-    ResourcedTestCase,
+    setUpResources,
+    tearDownResources,
     )
 
 from oops_amqp.utils import close_ignoring_connection_errors
@@ -94,16 +98,27 @@
     def setUp(self):
         super(ChannelFixture, self).setUp()
         self.connection = self.connection_factory()
+        self.connection.connect()
         self.addCleanup(close_ignoring_connection_errors, self.connection)
         self.channel = self.connection.channel()
         self.addCleanup(close_ignoring_connection_errors, self.channel)
 
 
-class TestCase(testtools.TestCase, ResourcedTestCase):
-    """Subclass to mix in testresources ResourcedTestCase."""
+class TestCase(testtools.TestCase):
+    """Subclass to start a RabbitMQ server."""
 
     resources = [('rabbit', FixtureResource(RabbitServer()))]
 
+    def setUp(self):
+        super(TestCase, self).setUp()
+        # ResourcedTestCase handles teardown in the wrong order for us (we
+        # need to ensure that the RabbitServer fixture is only cleaned up
+        # after any other fixtures registered by individual tests), so we
+        # imitate it manually.
+        result = _get_result()
+        setUpResources(self, self.resources, result)
+        self.addCleanup(tearDownResources, self, self.resources, result)
+
     def connection_factory(self):
         """When called, return an amqplib connection."""
         return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname,

=== modified file 'oops_amqp/tests/test_publisher.py'
--- oops_amqp/tests/test_publisher.py	2012-08-10 01:41:44 +0000
+++ oops_amqp/tests/test_publisher.py	2018-03-12 11:56:56 +0000
@@ -15,6 +15,8 @@
 
 """Tests for AMQP publishing."""
 
+from __future__ import absolute_import, print_function
+
 from hashlib import md5
 
 from oops_amqp import (
@@ -47,13 +49,16 @@
         self.assertEqual(reference_oops, oops)
         # The received OOPS should have the ID embedded and be a bson dict.
         def check_oops(msg):
-            self.assertEqual(reference_oops, bson.loads(msg.body))
+            body = msg.body
+            if not isinstance(body, bytes):
+                body = body.encode(msg.content_encoding or 'UTF-8')
+            self.assertEqual(reference_oops, bson.loads(body))
             channel.basic_ack(msg.delivery_tag)
             channel.basic_cancel(queue.queue_name)
         channel.basic_consume(
             queue.queue_name, callback=check_oops,
             consumer_tag=queue.queue_name)
-        channel.wait()
+        channel.connection.drain_events()
 
     def test_publish(self):
         # Publishing an oops sends it to the exchange, making a connection as
@@ -75,13 +80,16 @@
         expected_oops = dict(reference_oops)
         expected_oops['id'] = oops_ids[0]
         def check_oops(msg):
-            self.assertEqual(expected_oops, bson.loads(msg.body))
+            body = msg.body
+            if not isinstance(body, bytes):
+                body = body.encode(msg.content_encoding or 'UTF-8')
+            self.assertEqual(expected_oops, bson.loads(body))
             channel.basic_ack(msg.delivery_tag)
             channel.basic_cancel(queue.queue_name)
         channel.basic_consume(
             queue.queue_name, callback=check_oops,
             consumer_tag=queue.queue_name)
-        channel.wait()
+        channel.connection.drain_events()
 
     def test_publish_amqp_already_down(self):
         # If amqp is down when a connection is attempted, None is returned to
@@ -101,7 +109,9 @@
             self.assertEqual([], publisher(oops))
         finally:
             self.rabbit.runner._start()
-            queue.channel = self.connection_factory().channel()
+            connection = self.connection_factory()
+            connection.connect()
+            queue.channel = connection.channel()
         self.assertNotEqual([], publisher(oops))
 
     def test_publish_amqp_down_after_use(self):
@@ -122,6 +132,8 @@
             self.assertEqual([], publisher(oops))
         finally:
             self.rabbit.runner._start()
-            queue.channel = self.connection_factory().channel()
+            connection = self.connection_factory()
+            connection.connect()
+            queue.channel = connection.channel()
         self.assertNotEqual([], publisher(oops))
 

=== modified file 'oops_amqp/tests/test_receiver.py'
--- oops_amqp/tests/test_receiver.py	2012-08-10 01:41:44 +0000
+++ oops_amqp/tests/test_receiver.py	2018-03-12 11:56:56 +0000
@@ -15,11 +15,14 @@
 
 """Tests for AMQP receiving."""
 
+from __future__ import absolute_import, print_function
+
 import errno
 import socket
 
-from amqplib import client_0_8 as amqp
+import amqp
 from oops import Config
+import six
 
 from oops_amqp import (
     anybson as bson,
@@ -47,7 +50,7 @@
         queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
         channel.basic_publish(
             message, queue.exchange_name, routing_key="")
-        sentinel = "xxx"
+        sentinel = b"xxx"
         channel.basic_publish(
             amqp.Message(sentinel), queue.exchange_name, routing_key="")
         config = Config()
@@ -81,9 +84,9 @@
             def new_channel():
                 result = old_channel()
                 old_wait = result.wait
-                def new_wait(allowed_methods=None):
+                def new_wait(*args, **kwargs):
                     receiver.stopping = True
-                    return old_wait(allowed_methods=allowed_methods)
+                    return old_wait(*args, **kwargs)
                 result.wait = new_wait
                 return result
             connection.channel = new_channel
@@ -93,8 +96,7 @@
         self.assertEqual([expected_report], reports)
 
     def test_run_forever(self):
-        # run_forever subscribes and then calls wait in a loop.
-        config = None
+        # run_forever subscribes and then calls drain_events in a loop.
         calls = []
         class FakeChannel:
             def __init__(self, calls):
@@ -103,25 +105,29 @@
             def basic_consume(self, queue_name, callback=None):
                 self.calls.append(('basic_consume', queue_name, callback))
                 return 'tag'
-            def wait(self):
-                self.calls.append(('wait',))
-                if len(self.calls) > 2:
-                    receiver.stopping = True
             def basic_cancel(self, tag):
                 self.calls.append(('basic_cancel', tag))
             def close(self):
                 self.is_open = False
         class FakeConnection:
+            def __init__(self, calls):
+                self.calls = calls
+            def connect(self):
+                pass
             def channel(self):
                 return FakeChannel(calls)
+            def drain_events(self, timeout=None):
+                self.calls.append(('drain_events', timeout))
+                if len(self.calls) > 2:
+                    receiver.stopping = True
             def close(self):
                 pass
-        receiver = Receiver(None, FakeConnection, 'foo')
+        receiver = Receiver(None, lambda: FakeConnection(calls), 'foo')
         receiver.run_forever()
         self.assertEqual(
             [('basic_consume', 'foo', receiver.handle_report),
-             ('wait',),
-             ('wait',),
+             ('drain_events', 1),
+             ('drain_events', 1),
              ('basic_cancel', 'tag')],
             calls)
 
@@ -146,7 +152,7 @@
         state = {}
         def error_once(func):
             def wrapped(*args, **kwargs):
-                func_ref = func.func_code
+                func_ref = six.get_function_code(func)
                 if func_ref in state:
                     return func(*args, **kwargs)
                 else:
@@ -162,17 +168,17 @@
             @error_once
             def new_channel():
                 result = old_channel()
-                result.wait = error_once(result.wait)
                 result.basic_consume = error_once(result.basic_consume)
                 result.basic_cancel = error_once(result.basic_cancel)
                 result.close = error_once(result.close)
                 return result
             connection.channel = new_channel
+            connection.drain_events = error_once(connection.drain_events)
             connection.close = error_once(connection.close)
             return connection
         receiver = Receiver(config, patching_factory, queue.queue_name)
-        receiver.sentinel = "arhh"
+        receiver.sentinel = b"arhh"
         channel.basic_publish(
-            amqp.Message("arhh"), queue.exchange_name, routing_key="")
+            amqp.Message(b"arhh"), queue.exchange_name, routing_key="")
         receiver.run_forever()
         self.assertEqual([expected_report], reports)

=== modified file 'oops_amqp/trace.py'
--- oops_amqp/trace.py	2012-08-10 02:56:01 +0000
+++ oops_amqp/trace.py	2018-03-12 11:56:56 +0000
@@ -15,17 +15,17 @@
 
 """Trace OOPS reports coming from an AMQP queue."""
 
+from __future__ import absolute_import, print_function
+
 from functools import partial
 import sys
 import optparse
 from textwrap import dedent
 
-import amqplib.client_0_8 as amqp
+import amqp
 import oops
 import oops_amqp
 
-import anybson as bson
-
 
 def main(argv=None):
     if argv is None:

=== modified file 'oops_amqp/utils.py'
--- oops_amqp/utils.py	2011-12-08 10:15:23 +0000
+++ oops_amqp/utils.py	2018-03-12 11:56:56 +0000
@@ -15,10 +15,11 @@
 
 """Utility functions for oops_amqp."""
 
-import errno
+from __future__ import absolute_import, print_function
+
 import socket
 
-from amqplib.client_0_8.exceptions import AMQPConnectionException
+from amqp.exceptions import ConnectionError
 
 __all__ = [
     'amqplib_error_types',
@@ -30,7 +31,7 @@
 # These exception types always indicate an AMQP connection error/closure.
 # However you should catch amqplib_error_types and post-filter with
 # is_amqplib_connection_error.
-amqplib_connection_errors = (socket.error, AMQPConnectionException)
+amqplib_connection_errors = (socket.error, ConnectionError)
 # A tuple to reduce duplication in different code paths. Lists the types of
 # exceptions legitimately raised by amqplib when the AMQP server goes down.
 # Not all exceptions *will* be such errors - use is_amqplib_connection_error to
@@ -41,7 +42,7 @@
 def close_ignoring_connection_errors(closable):
     try:
         return closable.close()
-    except amqplib_error_types, e:
+    except amqplib_error_types as e:
         if is_amqplib_connection_error(e):
             return
         raise

=== modified file 'setup.py'
--- setup.py	2015-10-02 16:25:32 +0000
+++ setup.py	2018-03-12 11:56:56 +0000
@@ -19,8 +19,8 @@
 from distutils.core import setup
 import os.path
 
-description = file(
-        os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
+with open(os.path.join(os.path.dirname(__file__), 'README')) as f:
+    description = f.read()
 
 setup(name="oops_amqp",
       version="0.0.8b1",
@@ -38,15 +38,18 @@
           'License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)',
           'Operating System :: OS Independent',
           'Programming Language :: Python',
+          'Programming Language :: Python :: 2',
+          'Programming Language :: Python :: 3',
           ],
       install_requires = [
           'bson',
           'oops>=0.0.11',
-          'amqplib',
+          'amqp>=2.0.0',
           ],
       extras_require = dict(
           test=[
               'rabbitfixture',
+              'six',
               'testresources',
               'testtools',
               ]

=== modified file 'versions.cfg'
--- versions.cfg	2012-08-10 01:41:44 +0000
+++ versions.cfg	2018-03-12 11:56:56 +0000
@@ -2,7 +2,7 @@
 versions = versions
 
 [versions]
-amqplib = 0.6.1
+amqp = 2.2.2
 bson = 0.3.2
 fixtures = 0.3.6
 iso8601 = 0.1.4
@@ -11,8 +11,10 @@
 pytz = 2010o
 rabbitfixture = 0.3.2
 setuptools = 0.6c11
+six = 1.11.0
 testresources = 0.2.4-r58
 testtools = 0.9.12-r228
+vine = 1.1.4
 zc.recipe.egg = 1.3.2
 z3c.recipe.filetemplate = 2.1.0
 z3c.recipe.scripts = 1.0.1