launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #26155
[Merge] lp:~cjwatson/python-oops-datedir2amqp/amqp into lp:python-oops-datedir2amqp
Colin Watson has proposed merging lp:~cjwatson/python-oops-datedir2amqp/amqp into lp:python-oops-datedir2amqp.
Commit message:
Port from amqplib to amqp.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/python-oops-datedir2amqp/amqp/+merge/397248
This matches the change in oops-amqp 0.1.0.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/python-oops-datedir2amqp/amqp into lp:python-oops-datedir2amqp.
=== modified file 'NEWS'
--- NEWS 2012-09-03 23:29:34 +0000
+++ NEWS 2021-02-01 12:47:03 +0000
@@ -6,6 +6,8 @@
NEXT
----
+* Port from amqplib to amqp. (Colin Watson)
+
0.0.4
-----
=== modified file 'oops_datedir2amqp/main.py'
--- oops_datedir2amqp/main.py 2011-12-01 01:31:15 +0000
+++ oops_datedir2amqp/main.py 2021-02-01 12:47:03 +0000
@@ -21,13 +21,19 @@
from textwrap import dedent
import sys
-from amqplib import client_0_8 as amqp
+import amqp
import oops_amqp
import oops_datedir_repo
__all__ = ['main']
+def republish(factory, exchange, key, repo):
+ publisher = oops_amqp.Publisher(factory, exchange, key, inherit_id=True)
+ repo = oops_datedir_repo.DateDirRepo(repo)
+ repo.republish(publisher)
+
+
def main(argv=None):
if argv is None:
argv=sys.argv
@@ -77,6 +83,4 @@
factory = partial(
amqp.Connection, host=options.host, userid=options.username,
password=options.password, virtual_host=options.vhost)
- publisher = oops_amqp.Publisher(factory, options.exchange, options.key, inherit_id=True)
- repo = oops_datedir_repo.DateDirRepo(options.repo)
- repo.republish(publisher)
+ republish(factory, options.exchange, options.key, options.repo)
=== modified file 'oops_datedir2amqp/tests/__init__.py'
--- oops_datedir2amqp/tests/__init__.py 2011-11-03 10:10:10 +0000
+++ oops_datedir2amqp/tests/__init__.py 2021-02-01 12:47:03 +0000
@@ -18,13 +18,114 @@
from unittest import TestLoader
-from testresources import OptimisingTestSuite
+import amqp
+from fixtures import Fixture
+from oops_amqp.utils import close_ignoring_connection_errors
+from rabbitfixture.server import RabbitServer
+from testresources import (
+ _get_result,
+ FixtureResource,
+ OptimisingTestSuite,
+ setUpResources,
+ tearDownResources,
+ )
+import testtools
__all__ = [
+ 'ChannelFixture',
+ 'QueueFixture',
'test_suite',
+ 'TestCase',
]
+
+class QueueFixture(Fixture):
+ """Create an exchange with a subscribed queue on an AMQP instance.
+
+ The exchange and queue are made durable (to permit testing with amqp server
+ restarts) and non-auto-delete (to permit dropping the test connection and
+ re-establishing it without losing the config, e.g. across server restarts).
+
+ If the server is restarted, the channel may need to be updated before
+ teardown, or the teardown will not be able to delete the exchange and
+ queue.
+
+ Possibly wants to live in rabbitfixture, or an amqpfixture.
+ """
+
+ def __init__(self, channel, unique_string_factory):
+ """Create a QueueFixture.
+
+ :param channel: An amqp channel to request the exchange and queue
+ over.
+ :param unique_string_factory: A helper that will return a (process
+ lifetime scope) unique string.
+ """
+ self.channel = channel
+ self.unique_string_factory = unique_string_factory
+
+ def setUp(self):
+ super(QueueFixture, self).setUp()
+ self.exchange_name = self.unique_string_factory()
+ self.channel.exchange_declare(
+ exchange=self.exchange_name, type="fanout", durable=True,
+ auto_delete=False)
+ self.addCleanup(self.delete_exchange)
+ self.queue_name, _, _ = self.channel.queue_declare(
+ durable=True, auto_delete=False)
+ self.addCleanup(self.delete_queue)
+ self.channel.queue_bind(self.queue_name, self.exchange_name)
+
+ def delete_queue(self):
+ self.channel.queue_delete(self.queue_name)
+
+ def delete_exchange(self):
+ self.channel.exchange_delete(self.exchange_name)
+
+
+class ChannelFixture(Fixture):
+ """Create an AMQP connection and channel for tests.
+
+ :ivar connection: an amqp connection.
+ :ivar channel: an amqp channel.
+ """
+
+ def __init__(self, connection_factory):
+ super(ChannelFixture, self).__init__()
+ self.connection_factory = connection_factory
+
+ def setUp(self):
+ super(ChannelFixture, self).setUp()
+ self.connection = self.connection_factory()
+ self.connection.connect()
+ self.addCleanup(close_ignoring_connection_errors, self.connection)
+ self.channel = self.connection.channel()
+ self.addCleanup(close_ignoring_connection_errors, self.channel)
+
+
+class TestCase(testtools.TestCase):
+ """Subclass to start a RabbitMQ server."""
+
+ resources = [('rabbit', FixtureResource(RabbitServer()))]
+
+ def setUp(self):
+ super(TestCase, self).setUp()
+ # ResourcedTestCase handles teardown in the wrong order for us (we
+ # need to ensure that the RabbitServer fixture is only cleaned up
+ # after any other fixtures registered by individual tests), so we
+ # imitate it manually.
+ result = _get_result()
+ setUpResources(self, self.resources, result)
+ self.addCleanup(tearDownResources, self, self.resources, result)
+
+ def connection_factory(self):
+ """When called, return an amqp connection."""
+ return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname,
+ self.rabbit.config.port), userid="guest", password="guest",
+ virtual_host="/")
+
+
def test_suite():
test_mod_names = [
'main',
=== modified file 'oops_datedir2amqp/tests/test_main.py'
--- oops_datedir2amqp/tests/test_main.py 2011-11-03 10:10:10 +0000
+++ oops_datedir2amqp/tests/test_main.py 2021-02-01 12:47:03 +0000
@@ -16,11 +16,53 @@
"""Tests for main()."""
-from testtools import TestCase
+from fixtures import TempDir
+from oops_datedir_repo import DateDirRepo
+from oops_amqp import anybson as bson
+from testtools.matchers import (
+ ContainsDict,
+ Equals,
+ MatchesSetwise,
+ )
+
+from oops_datedir2amqp.main import republish
+from oops_datedir2amqp.tests import (
+ ChannelFixture,
+ QueueFixture,
+ TestCase,
+ )
class TestMain(TestCase):
- def test_importable(self):
- # This is where setup looks for it.
- from oops_datedir2amqp import main
+ def test_publishes(self):
+ channel = self.useFixture(
+ ChannelFixture(self.connection_factory)).channel
+ queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
+ repo = DateDirRepo(self.useFixture(TempDir()).path)
+ reports = [{'something': self.getUniqueString()} for _ in range(3)]
+ for report in reports:
+ [oops_id] = repo.publish(report)
+ report['id'] = oops_id
+ republish(self.connection_factory, queue.exchange_name, '', repo.root)
+ republished = []
+
+ def consumer(msg):
+ body = msg.body
+ if not isinstance(body, bytes):
+ body = body.encode(msg.content_encoding or 'UTF-8')
+ republished.append(bson.loads(body))
+ channel.basic_ack(msg.delivery_tag)
+ if len(republished) == len(reports):
+ channel.basic_cancel(queue.queue_name)
+
+ channel.basic_consume(
+ queue.queue_name, callback=consumer, consumer_tag=queue.queue_name)
+ while len(republished) < len(reports):
+ channel.connection.drain_events()
+ self.assertThat(
+ republished,
+ MatchesSetwise(*(
+ ContainsDict(
+ {key: Equals(value) for key, value in report.items()})
+ for report in reports)))
=== modified file 'setup.py'
--- setup.py 2012-09-03 23:34:02 +0000
+++ setup.py 2021-02-01 12:47:03 +0000
@@ -40,8 +40,9 @@
'Programming Language :: Python',
],
install_requires = [
+ 'amqp>=2.0.0',
'oops_datedir_repo',
- 'oops_amqp',
+ 'oops_amqp>=0.1.0',
],
extras_require = dict(
test=[
=== modified file 'versions.cfg'
--- versions.cfg 2012-09-03 23:29:34 +0000
+++ versions.cfg 2021-02-01 12:47:03 +0000
@@ -2,7 +2,7 @@
versions = versions
[versions]
-amqplib = 0.6.1
+amqp = 2.6.1
bson = 0.3.2
elementtree = 1.2.6-20050316
fixtures = 0.3.6
@@ -15,7 +15,7 @@
lazr.uri = 1.0.2
oauth = 1.0.1
oops = 0.0.13
-oops-amqp = 0.0.7
+oops-amqp = 0.1.0
oops-datedir-repo = 0.0.18
pymongo = 2.1.1
pytz = 2011n
Follow ups