← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/python-oops-datedir2amqp/amqp into lp:python-oops-datedir2amqp

 

Colin Watson has proposed merging lp:~cjwatson/python-oops-datedir2amqp/amqp into lp:python-oops-datedir2amqp.

Commit message:
Port from amqplib to amqp.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/python-oops-datedir2amqp/amqp/+merge/397248

This matches the change in oops-amqp 0.1.0.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/python-oops-datedir2amqp/amqp into lp:python-oops-datedir2amqp.
=== modified file 'NEWS'
--- NEWS	2012-09-03 23:29:34 +0000
+++ NEWS	2021-02-01 12:47:03 +0000
@@ -6,6 +6,8 @@
 NEXT
 ----
 
+* Port from amqplib to amqp. (Colin Watson)
+
 0.0.4
 -----
 

=== modified file 'oops_datedir2amqp/main.py'
--- oops_datedir2amqp/main.py	2011-12-01 01:31:15 +0000
+++ oops_datedir2amqp/main.py	2021-02-01 12:47:03 +0000
@@ -21,13 +21,19 @@
 from textwrap import dedent
 import sys
 
-from amqplib import client_0_8 as amqp
+import amqp
 import oops_amqp
 import oops_datedir_repo
 
 __all__ = ['main']
 
 
+def republish(factory, exchange, key, repo):
+    publisher = oops_amqp.Publisher(factory, exchange, key, inherit_id=True)
+    repo = oops_datedir_repo.DateDirRepo(repo)
+    repo.republish(publisher)
+
+
 def main(argv=None):
     if argv is None:
         argv=sys.argv
@@ -77,6 +83,4 @@
     factory = partial(
         amqp.Connection, host=options.host, userid=options.username,
         password=options.password, virtual_host=options.vhost)
-    publisher = oops_amqp.Publisher(factory, options.exchange, options.key, inherit_id=True)
-    repo = oops_datedir_repo.DateDirRepo(options.repo)
-    repo.republish(publisher)
+    republish(factory, options.exchange, options.key, options.repo)

=== modified file 'oops_datedir2amqp/tests/__init__.py'
--- oops_datedir2amqp/tests/__init__.py	2011-11-03 10:10:10 +0000
+++ oops_datedir2amqp/tests/__init__.py	2021-02-01 12:47:03 +0000
@@ -18,13 +18,114 @@
 
 from unittest import TestLoader
 
-from testresources import OptimisingTestSuite
+import amqp
+from fixtures import Fixture
+from oops_amqp.utils import close_ignoring_connection_errors
+from rabbitfixture.server import RabbitServer
+from testresources import (
+    _get_result,
+    FixtureResource,
+    OptimisingTestSuite,
+    setUpResources,
+    tearDownResources,
+    )
+import testtools
 
 
 __all__ = [
+    'ChannelFixture',
+    'QueueFixture',
     'test_suite',
+    'TestCase',
     ]
 
+
+class QueueFixture(Fixture):
+    """Create an exchange with a subscribed queue on an AMQP instance.
+
+    The exchange and queue are made durable (to permit testing with amqp server
+    restarts) and non-auto-delete (to permit dropping the test connection and
+    restablishing it without losing the config (e.g. server restarts).
+
+    If the server is restarted, the channel may need to be updated before
+    teardown, or the teardown will not be able to delete the exchange and
+    queue.
+
+    Possibly wants to live in rabbitfixture, or a amqpfixture.
+    """
+
+    def __init__(self, channel, unique_string_factory):
+        """Create a QueueFixture.
+
+        :param channel: An amqplib channel to request the exchange and queue
+            over.
+        :param unique_string_factory: A helper that will return a (process
+            lifetime scope) unique string.
+        """
+        self.channel = channel
+        self.unique_string_factory = unique_string_factory
+
+    def setUp(self):
+        super(QueueFixture, self).setUp()
+        self.exchange_name = self.unique_string_factory()
+        self.channel.exchange_declare(
+            exchange=self.exchange_name, type="fanout", durable=True,
+            auto_delete=False)
+        self.addCleanup(self.delete_exchange)
+        self.queue_name, _, _ = self.channel.queue_declare(
+            durable=True, auto_delete=False)
+        self.addCleanup(self.delete_queue)
+        self.channel.queue_bind(self.queue_name, self.exchange_name)
+
+    def delete_queue(self):
+        self.channel.queue_delete(self.queue_name)
+
+    def delete_exchange(self):
+        self.channel.exchange_delete(self.exchange_name)
+
+
+class ChannelFixture(Fixture):
+    """Create an AMQP connection and channel for tests.
+
+    :ivar connection: an amqplib connection.
+    :ivar channel: an amqplib channel
+    """
+
+    def __init__(self, connection_factory):
+        super(ChannelFixture, self).__init__()
+        self.connection_factory = connection_factory
+
+    def setUp(self):
+        super(ChannelFixture, self).setUp()
+        self.connection = self.connection_factory()
+        self.connection.connect()
+        self.addCleanup(close_ignoring_connection_errors, self.connection)
+        self.channel = self.connection.channel()
+        self.addCleanup(close_ignoring_connection_errors, self.channel)
+
+
+class TestCase(testtools.TestCase):
+    """Subclass to start a RabbitMQ server."""
+
+    resources = [('rabbit', FixtureResource(RabbitServer()))]
+
+    def setUp(self):
+        super(TestCase, self).setUp()
+        # ResourcedTestCase handles teardown in the wrong order for us (we
+        # need to ensure that the RabbitServer fixture is only cleaned up
+        # after any other fixtures registered by individual tests), so we
+        # imitate it manually.
+        result = _get_result()
+        setUpResources(self, self.resources, result)
+        self.addCleanup(tearDownResources, self, self.resources, result)
+
+    def connection_factory(self):
+        """When called, return an amqplib connection."""
+        return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname,
+            self.rabbit.config.port), userid="guest", password="guest",
+            virtual_host="/")
+
+
 def test_suite():
     test_mod_names = [
         'main',

=== modified file 'oops_datedir2amqp/tests/test_main.py'
--- oops_datedir2amqp/tests/test_main.py	2011-11-03 10:10:10 +0000
+++ oops_datedir2amqp/tests/test_main.py	2021-02-01 12:47:03 +0000
@@ -16,11 +16,53 @@
 
 """Tests for main()."""
 
-from testtools import TestCase
+from fixtures import TempDir
+from oops_datedir_repo import DateDirRepo
+from oops_amqp import anybson as bson
+from testtools.matchers import (
+    ContainsDict,
+    Equals,
+    MatchesSetwise,
+    )
+
+from oops_datedir2amqp.main import republish
+from oops_datedir2amqp.tests import (
+    ChannelFixture,
+    QueueFixture,
+    TestCase,
+    )
 
 
 class TestMain(TestCase):
 
-    def test_importable(self):
-        # This is where setup looks for it.
-        from oops_datedir2amqp import main
+    def test_publishes(self):
+        channel = self.useFixture(
+            ChannelFixture(self.connection_factory)).channel
+        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
+        repo = DateDirRepo(self.useFixture(TempDir()).path)
+        reports = [{'something': self.getUniqueString()} for _ in range(3)]
+        for report in reports:
+            [oops_id] = repo.publish(report)
+            report['id'] = oops_id
+        republish(self.connection_factory, queue.exchange_name, '', repo.root)
+        republished = []
+
+        def consumer(msg):
+            body = msg.body
+            if not isinstance(body, bytes):
+                body = body.encode(msg.content_encoding or 'UTF-8')
+            republished.append(bson.loads(body))
+            channel.basic_ack(msg.delivery_tag)
+            if len(republished) == len(reports):
+                channel.basic_cancel(queue.queue_name)
+
+        channel.basic_consume(
+            queue.queue_name, callback=consumer, consumer_tag=queue.queue_name)
+        while len(republished) < len(reports):
+            channel.connection.drain_events()
+        self.assertThat(
+            republished,
+            MatchesSetwise(*(
+                ContainsDict(
+                    {key: Equals(value) for key, value in report.items()})
+                for report in reports)))

=== modified file 'setup.py'
--- setup.py	2012-09-03 23:34:02 +0000
+++ setup.py	2021-02-01 12:47:03 +0000
@@ -40,8 +40,9 @@
           'Programming Language :: Python',
           ],
       install_requires = [
+          'amqp>=2.0.0',
           'oops_datedir_repo',
-          'oops_amqp',
+          'oops_amqp>=0.1.0',
           ],
       extras_require = dict(
           test=[

=== modified file 'versions.cfg'
--- versions.cfg	2012-09-03 23:29:34 +0000
+++ versions.cfg	2021-02-01 12:47:03 +0000
@@ -2,7 +2,7 @@
 versions = versions
 
 [versions]
-amqplib = 0.6.1
+amqp = 2.6.1
 bson = 0.3.2
 elementtree = 1.2.6-20050316
 fixtures = 0.3.6
@@ -15,7 +15,7 @@
 lazr.uri = 1.0.2
 oauth = 1.0.1
 oops = 0.0.13
-oops-amqp = 0.0.7
+oops-amqp = 0.1.0
 oops-datedir-repo = 0.0.18
 pymongo = 2.1.1
 pytz = 2011n


Follow ups