launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #22241
[Merge] lp:~cjwatson/python-oops-amqp/py3 into lp:python-oops-amqp
Colin Watson has proposed merging lp:~cjwatson/python-oops-amqp/py3 into lp:python-oops-amqp.
Commit message:
Add Python 3 support.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/python-oops-amqp/py3/+merge/341298
This also involves porting from amqplib to the better-maintained amqp.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/python-oops-amqp/py3 into lp:python-oops-amqp.
=== modified file '.bzrignore'
--- .bzrignore 2011-12-08 10:50:34 +0000
+++ .bzrignore 2018-03-12 11:56:56 +0000
@@ -1,3 +1,4 @@
+__pycache__
./eggs/*
./.installed.cfg
./develop-eggs
=== modified file 'NEWS'
--- NEWS 2015-10-08 14:49:18 +0000
+++ NEWS 2018-03-12 11:56:56 +0000
@@ -9,6 +9,8 @@
* Dropped dependency on pymongo in favor of bson. This avoids having
to depend both on bson and pymongo when installing in conjunction
with oops-datedir-repo. (Ricardo Kirkner)
+* Port from amqplib to amqp. (Colin Watson)
+* Add Python 3 support. (Colin Watson)
0.0.7
-----
=== modified file 'README'
--- README 2012-08-10 01:41:44 +0000
+++ README 2018-03-12 11:56:56 +0000
@@ -31,7 +31,7 @@
* oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer.
-* amqplib
+* amqp
Testing Dependencies
====================
@@ -57,7 +57,7 @@
connection - and the exchange name and routing key to submit to.
>>> factory = partial(amqp.Connection, host="localhost:5672",
- ... userid="guest", password="guest", virtual_host="/", insist=False)
+ ... userid="guest", password="guest", virtual_host="/")
>>> publisher = oops_amqp.Publisher(factory, "oopses", "")
Provide the publisher to your OOPS config::
@@ -70,7 +70,7 @@
OOPS ids are generated by hashing the oops message (without the id field) -
this ensures unique ids.
-The reason a factory is used is because amqplib is not threadsafe - the
+The reason a factory is used is because amqp is not threadsafe - the
publisher maintains a thread locals object to hold the factories and creates
connections when new threads are created (when they first generate an OOPS).
@@ -87,7 +87,7 @@
method failed::
>>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
- ... userid="guest", password="guest", virtual_host="/", insist=False)
+ ... userid="guest", password="guest", virtual_host="/")
>>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
>>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
=== modified file 'oops_amqp/__init__.py'
--- oops_amqp/__init__.py 2015-10-02 16:26:57 +0000
+++ oops_amqp/__init__.py 2018-03-12 11:56:56 +0000
@@ -32,7 +32,7 @@
connection - and the exchange name and routing key to submit to.
>>> factory = partial(amqp.Connection, host="localhost:5672",
- ... userid="guest", password="guest", virtual_host="/", insist=False)
+ ... userid="guest", password="guest", virtual_host="/")
>>> publisher = oops_amqp.Publisher(factory, "oopses", "")
Provide the publisher to your OOPS config::
@@ -45,7 +45,7 @@
OOPS ids are generated by hashing the oops message (without the id field) -
this ensures unique ids.
-The reason a factory is used is because amqplib is not threadsafe - the
+The reason a factory is used is because amqp is not threadsafe - the
publisher maintains a thread locals object to hold the factories and creates
connections when new threads are created (when they first generate an OOPS).
@@ -62,7 +62,7 @@
method failed::
>>> fallback_factory = partial(amqp.Connection, host="otherserver:5672",
- ... userid="guest", password="guest", virtual_host="/", insist=False)
+ ... userid="guest", password="guest", virtual_host="/")
>>> fallback_publisher = oops_amqp.Publisher(fallback_factory, "oopses", "")
>>> config.publisher = publish_with_fallback(publisher, fallback_publisher)
@@ -86,6 +86,8 @@
>>> receiver.run_forever()
"""
+from __future__ import absolute_import, print_function
+
# same format as sys.version_info: "A tuple containing the five components of
# the version number: major, minor, micro, releaselevel, and serial. All
# values except releaselevel are integers; the release level is 'alpha',
=== modified file 'oops_amqp/anybson.py'
--- oops_amqp/anybson.py 2012-02-10 19:27:09 +0000
+++ oops_amqp/anybson.py 2018-03-12 11:56:56 +0000
@@ -13,6 +13,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# GNU Lesser General Public License version 3 (see the file LICENSE).
+from __future__ import absolute_import, print_function
+
__all__ = [
'dumps',
'loads',
=== modified file 'oops_amqp/publisher.py'
--- oops_amqp/publisher.py 2012-08-10 01:41:44 +0000
+++ oops_amqp/publisher.py 2018-03-12 11:56:56 +0000
@@ -15,15 +15,17 @@
"""Publish OOPS reports over amqp."""
+from __future__ import absolute_import, print_function
+
__metaclass__ = type
from hashlib import md5
from threading import local
-from amqplib import client_0_8 as amqp
-from anybson import dumps
+import amqp
-from utils import (
+from oops_amqp.anybson import dumps
+from oops_amqp.utils import (
amqplib_error_types,
is_amqplib_connection_error,
)
@@ -63,8 +65,10 @@
def get_channel(self):
if getattr(self.channels, 'channel', None) is None:
try:
- self.channels.channel = self.connection_factory().channel()
- except amqplib_error_types, e:
+ connection = self.connection_factory()
+ connection.connect()
+ self.channels.channel = connection.channel()
+ except amqplib_error_types as e:
if is_amqplib_connection_error(e):
# Could not connect
return None
@@ -92,7 +96,7 @@
try:
channel.basic_publish(
message, self.exchange_name, routing_key=self.routing_key)
- except amqplib_error_types, e:
+ except amqplib_error_types as e:
self.channels.channel = None
if is_amqplib_connection_error(e):
# Could not connect / interrupted connection
=== modified file 'oops_amqp/receiver.py'
--- oops_amqp/receiver.py 2012-02-09 23:13:45 +0000
+++ oops_amqp/receiver.py 2018-03-12 11:56:56 +0000
@@ -15,12 +15,14 @@
"""Receive OOPS reports over amqp and republish locally."""
+from __future__ import absolute_import, print_function
+
__metaclass__ = type
import time
-import anybson as bson
-from utils import (
+from oops_amqp import anybson as bson
+from oops_amqp.utils import (
amqplib_error_types,
close_ignoring_connection_errors,
is_amqplib_connection_error,
@@ -56,12 +58,16 @@
self.sentinel = None
def handle_report(self, message):
- if message.body == self.sentinel:
+ # bson requires bytes.
+ body = message.body
+ if not isinstance(body, bytes):
+ body = body.encode(message.content_encoding or 'UTF-8')
+ if body == self.sentinel:
self.stopping = True
self.channel.basic_ack(message.delivery_tag)
return
try:
- report = bson.loads(message.body)
+ report = bson.loads(body)
except KeyError:
# Garbage in the queue. Possibly this should raise an OOPS itself
# (through a different config) or log an info level message.
@@ -82,7 +88,7 @@
(not self.went_bad or time.time() < self.went_bad + 120)):
try:
self._run_forever()
- except amqplib_error_types, e:
+ except amqplib_error_types as e:
if not is_amqplib_connection_error(e):
# Something unknown went wrong.
raise
@@ -94,6 +100,7 @@
def _run_forever(self):
self.connection = self.connection_factory()
+ self.connection.connect()
# A successful connection: record this so run_forever won't bail early.
self.went_bad = None
try:
@@ -103,7 +110,7 @@
self.queue_name, callback=self.handle_report)
try:
while True:
- self.channel.wait()
+ self.connection.drain_events(timeout=1)
if self.stopping:
break
finally:
=== modified file 'oops_amqp/tests/__init__.py'
--- oops_amqp/tests/__init__.py 2011-12-08 10:15:23 +0000
+++ oops_amqp/tests/__init__.py 2018-03-12 11:56:56 +0000
@@ -15,16 +15,20 @@
"""Tests for oops_amqp."""
+from __future__ import absolute_import, print_function
+
from unittest import TestLoader
-from amqplib import client_0_8 as amqp
+import amqp
from fixtures import Fixture
from rabbitfixture.server import RabbitServer
import testtools
from testresources import (
+ _get_result,
FixtureResource,
OptimisingTestSuite,
- ResourcedTestCase,
+ setUpResources,
+ tearDownResources,
)
from oops_amqp.utils import close_ignoring_connection_errors
@@ -94,16 +98,27 @@
def setUp(self):
super(ChannelFixture, self).setUp()
self.connection = self.connection_factory()
+ self.connection.connect()
self.addCleanup(close_ignoring_connection_errors, self.connection)
self.channel = self.connection.channel()
self.addCleanup(close_ignoring_connection_errors, self.channel)
-class TestCase(testtools.TestCase, ResourcedTestCase):
- """Subclass to mix in testresources ResourcedTestCase."""
+class TestCase(testtools.TestCase):
+ """Subclass to start a RabbitMQ server."""
resources = [('rabbit', FixtureResource(RabbitServer()))]
+ def setUp(self):
+ super(TestCase, self).setUp()
+ # ResourcedTestCase handles teardown in the wrong order for us (we
+ # need to ensure that the RabbitServer fixture is only cleaned up
+ # after any other fixtures registered by individual tests), so we
+ # imitate it manually.
+ result = _get_result()
+ setUpResources(self, self.resources, result)
+ self.addCleanup(tearDownResources, self, self.resources, result)
+
def connection_factory(self):
"""When called, return an amqplib connection."""
return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname,
=== modified file 'oops_amqp/tests/test_publisher.py'
--- oops_amqp/tests/test_publisher.py 2012-08-10 01:41:44 +0000
+++ oops_amqp/tests/test_publisher.py 2018-03-12 11:56:56 +0000
@@ -15,6 +15,8 @@
"""Tests for AMQP publishing."""
+from __future__ import absolute_import, print_function
+
from hashlib import md5
from oops_amqp import (
@@ -47,13 +49,16 @@
self.assertEqual(reference_oops, oops)
# The received OOPS should have the ID embedded and be a bson dict.
def check_oops(msg):
- self.assertEqual(reference_oops, bson.loads(msg.body))
+ body = msg.body
+ if not isinstance(body, bytes):
+ body = body.encode(msg.content_encoding or 'UTF-8')
+ self.assertEqual(reference_oops, bson.loads(body))
channel.basic_ack(msg.delivery_tag)
channel.basic_cancel(queue.queue_name)
channel.basic_consume(
queue.queue_name, callback=check_oops,
consumer_tag=queue.queue_name)
- channel.wait()
+ channel.connection.drain_events()
def test_publish(self):
# Publishing an oops sends it to the exchange, making a connection as
@@ -75,13 +80,16 @@
expected_oops = dict(reference_oops)
expected_oops['id'] = oops_ids[0]
def check_oops(msg):
- self.assertEqual(expected_oops, bson.loads(msg.body))
+ body = msg.body
+ if not isinstance(body, bytes):
+ body = body.encode(msg.content_encoding or 'UTF-8')
+ self.assertEqual(expected_oops, bson.loads(body))
channel.basic_ack(msg.delivery_tag)
channel.basic_cancel(queue.queue_name)
channel.basic_consume(
queue.queue_name, callback=check_oops,
consumer_tag=queue.queue_name)
- channel.wait()
+ channel.connection.drain_events()
def test_publish_amqp_already_down(self):
# If amqp is down when a connection is attempted, None is returned to
@@ -101,7 +109,9 @@
self.assertEqual([], publisher(oops))
finally:
self.rabbit.runner._start()
- queue.channel = self.connection_factory().channel()
+ connection = self.connection_factory()
+ connection.connect()
+ queue.channel = connection.channel()
self.assertNotEqual([], publisher(oops))
def test_publish_amqp_down_after_use(self):
@@ -122,6 +132,8 @@
self.assertEqual([], publisher(oops))
finally:
self.rabbit.runner._start()
- queue.channel = self.connection_factory().channel()
+ connection = self.connection_factory()
+ connection.connect()
+ queue.channel = connection.channel()
self.assertNotEqual([], publisher(oops))
=== modified file 'oops_amqp/tests/test_receiver.py'
--- oops_amqp/tests/test_receiver.py 2012-08-10 01:41:44 +0000
+++ oops_amqp/tests/test_receiver.py 2018-03-12 11:56:56 +0000
@@ -15,11 +15,14 @@
"""Tests for AMQP receiving."""
+from __future__ import absolute_import, print_function
+
import errno
import socket
-from amqplib import client_0_8 as amqp
+import amqp
from oops import Config
+import six
from oops_amqp import (
anybson as bson,
@@ -47,7 +50,7 @@
queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
channel.basic_publish(
message, queue.exchange_name, routing_key="")
- sentinel = "xxx"
+ sentinel = b"xxx"
channel.basic_publish(
amqp.Message(sentinel), queue.exchange_name, routing_key="")
config = Config()
@@ -81,9 +84,9 @@
def new_channel():
result = old_channel()
old_wait = result.wait
- def new_wait(allowed_methods=None):
+ def new_wait(*args, **kwargs):
receiver.stopping = True
- return old_wait(allowed_methods=allowed_methods)
+ return old_wait(*args, **kwargs)
result.wait = new_wait
return result
connection.channel = new_channel
@@ -93,8 +96,7 @@
self.assertEqual([expected_report], reports)
def test_run_forever(self):
- # run_forever subscribes and then calls wait in a loop.
- config = None
+ # run_forever subscribes and then calls drain_events in a loop.
calls = []
class FakeChannel:
def __init__(self, calls):
@@ -103,25 +105,29 @@
def basic_consume(self, queue_name, callback=None):
self.calls.append(('basic_consume', queue_name, callback))
return 'tag'
- def wait(self):
- self.calls.append(('wait',))
- if len(self.calls) > 2:
- receiver.stopping = True
def basic_cancel(self, tag):
self.calls.append(('basic_cancel', tag))
def close(self):
self.is_open = False
class FakeConnection:
+ def __init__(self, calls):
+ self.calls = calls
+ def connect(self):
+ pass
def channel(self):
return FakeChannel(calls)
+ def drain_events(self, timeout=None):
+ self.calls.append(('drain_events', timeout))
+ if len(self.calls) > 2:
+ receiver.stopping = True
def close(self):
pass
- receiver = Receiver(None, FakeConnection, 'foo')
+ receiver = Receiver(None, lambda: FakeConnection(calls), 'foo')
receiver.run_forever()
self.assertEqual(
[('basic_consume', 'foo', receiver.handle_report),
- ('wait',),
- ('wait',),
+ ('drain_events', 1),
+ ('drain_events', 1),
('basic_cancel', 'tag')],
calls)
@@ -146,7 +152,7 @@
state = {}
def error_once(func):
def wrapped(*args, **kwargs):
- func_ref = func.func_code
+ func_ref = six.get_function_code(func)
if func_ref in state:
return func(*args, **kwargs)
else:
@@ -162,17 +168,17 @@
@error_once
def new_channel():
result = old_channel()
- result.wait = error_once(result.wait)
result.basic_consume = error_once(result.basic_consume)
result.basic_cancel = error_once(result.basic_cancel)
result.close = error_once(result.close)
return result
connection.channel = new_channel
+ connection.drain_events = error_once(connection.drain_events)
connection.close = error_once(connection.close)
return connection
receiver = Receiver(config, patching_factory, queue.queue_name)
- receiver.sentinel = "arhh"
+ receiver.sentinel = b"arhh"
channel.basic_publish(
- amqp.Message("arhh"), queue.exchange_name, routing_key="")
+ amqp.Message(b"arhh"), queue.exchange_name, routing_key="")
receiver.run_forever()
self.assertEqual([expected_report], reports)
=== modified file 'oops_amqp/trace.py'
--- oops_amqp/trace.py 2012-08-10 02:56:01 +0000
+++ oops_amqp/trace.py 2018-03-12 11:56:56 +0000
@@ -15,17 +15,17 @@
"""Trace OOPS reports coming from an AMQP queue."""
+from __future__ import absolute_import, print_function
+
from functools import partial
import sys
import optparse
from textwrap import dedent
-import amqplib.client_0_8 as amqp
+import amqp
import oops
import oops_amqp
-import anybson as bson
-
def main(argv=None):
if argv is None:
=== modified file 'oops_amqp/utils.py'
--- oops_amqp/utils.py 2011-12-08 10:15:23 +0000
+++ oops_amqp/utils.py 2018-03-12 11:56:56 +0000
@@ -15,10 +15,11 @@
"""Utility functions for oops_amqp."""
-import errno
+from __future__ import absolute_import, print_function
+
import socket
-from amqplib.client_0_8.exceptions import AMQPConnectionException
+from amqp.exceptions import ConnectionError
__all__ = [
'amqplib_error_types',
@@ -30,7 +31,7 @@
# These exception types always indicate an AMQP connection error/closure.
# However you should catch amqplib_error_types and post-filter with
# is_amqplib_connection_error.
-amqplib_connection_errors = (socket.error, AMQPConnectionException)
+amqplib_connection_errors = (socket.error, ConnectionError)
# A tuple to reduce duplication in different code paths. Lists the types of
# exceptions legitimately raised by amqplib when the AMQP server goes down.
# Not all exceptions *will* be such errors - use is_amqplib_connection_error to
@@ -41,7 +42,7 @@
def close_ignoring_connection_errors(closable):
try:
return closable.close()
- except amqplib_error_types, e:
+ except amqplib_error_types as e:
if is_amqplib_connection_error(e):
return
raise
=== modified file 'setup.py'
--- setup.py 2015-10-02 16:25:32 +0000
+++ setup.py 2018-03-12 11:56:56 +0000
@@ -19,8 +19,8 @@
from distutils.core import setup
import os.path
-description = file(
- os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
+with open(os.path.join(os.path.dirname(__file__), 'README')) as f:
+ description = f.read()
setup(name="oops_amqp",
version="0.0.8b1",
@@ -38,15 +38,18 @@
'License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)',
'Operating System :: OS Independent',
'Programming Language :: Python',
+ 'Programming Language :: Python :: 2',
+ 'Programming Language :: Python :: 3',
],
install_requires = [
'bson',
'oops>=0.0.11',
- 'amqplib',
+ 'amqp>=2.0.0',
],
extras_require = dict(
test=[
'rabbitfixture',
+ 'six',
'testresources',
'testtools',
]
=== modified file 'versions.cfg'
--- versions.cfg 2012-08-10 01:41:44 +0000
+++ versions.cfg 2018-03-12 11:56:56 +0000
@@ -2,7 +2,7 @@
versions = versions
[versions]
-amqplib = 0.6.1
+amqp = 2.2.2
bson = 0.3.2
fixtures = 0.3.6
iso8601 = 0.1.4
@@ -11,8 +11,10 @@
pytz = 2010o
rabbitfixture = 0.3.2
setuptools = 0.6c11
+six = 1.11.0
testresources = 0.2.4-r58
testtools = 0.9.12-r228
+vine = 1.1.4
zc.recipe.egg = 1.3.2
z3c.recipe.filetemplate = 2.1.0
z3c.recipe.scripts = 1.0.1