← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/python-oops-amqp/pytest-full into lp:python-oops-amqp

 

Colin Watson has proposed merging lp:~cjwatson/python-oops-amqp/pytest-full into lp:python-oops-amqp.

Commit message:
Port test suite to pytest.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/python-oops-amqp/pytest-full/+merge/430882

This turns out to be about three times faster on my system, probably mainly due to using a session-scoped server fixture.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/python-oops-amqp/pytest-full into lp:python-oops-amqp.
=== modified file 'NEWS'
--- NEWS	2022-09-30 22:53:58 +0000
+++ NEWS	2022-10-03 10:15:17 +0000
@@ -8,6 +8,7 @@
 
 - Switch from buildout to tox. (Colin Watson)
 - Switch to declarative setup.cfg. (Colin Watson)
+- Port test suite to pytest. (Colin Watson)
 
 0.1.0
 -----

=== modified file 'README'
--- README	2022-09-30 22:41:01 +0000
+++ README	2022-10-03 10:15:17 +0000
@@ -17,7 +17,7 @@
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
     GNU Lesser General Public License version 3 (see the file LICENSE).
 
-The oops_amqp package provides an AMQP OOPS http://pypi.python.org/pypi/oops)
+The oops_amqp package provides an AMQP OOPS (https://pypi.org/project/oops)
 publisher, and a small daemon that listens on amqp for OOPS reports and
 republishes them (into a supplied publisher). The OOPS framework permits
 falling back to additional publishers if AMQP is down.
@@ -29,20 +29,16 @@
 
 * bson
 
-* oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer.
+* oops (https://pypi.org/project/oops) 0.0.11 or newer.
 
 * amqp
 
 Testing Dependencies
 ====================
 
-* oops-datedir-repo (http://pypi.python.org/pypi/oops_datedir_repo)
-
-* rabbitfixture (http://pypi.python.org/pypi/rabbitfixture)
-
-* testresources (http://pypi.python.org/pypi/testresources)
-
-* testtools (http://pypi.python.org/pypi/testtools)
+* pytest (https://pypi.org/project/pytest)
+
+* rabbitfixture (https://pypi.org/project/rabbitfixture)
 
 Usage
 =====

=== removed file 'oops_amqp/tests/__init__.py'
--- oops_amqp/tests/__init__.py	2018-03-12 11:48:55 +0000
+++ oops_amqp/tests/__init__.py	1970-01-01 00:00:00 +0000
@@ -1,135 +0,0 @@
-# Copyright (c) 2011, Canonical Ltd
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License as published by
-# the Free Software Foundation, version 3 only.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-# GNU Lesser General Public License version 3 (see the file LICENSE).
-
-"""Tests for oops_amqp."""
-
-from __future__ import absolute_import, print_function
-
-from unittest import TestLoader
-
-import amqp
-from fixtures import Fixture
-from rabbitfixture.server import RabbitServer
-import testtools
-from testresources import (
-    _get_result,
-    FixtureResource,
-    OptimisingTestSuite,
-    setUpResources,
-    tearDownResources,
-    )
-
-from oops_amqp.utils import close_ignoring_connection_errors
-
-__all__ = [
-    'ChannelFixture',
-    'QueueFixture',
-    'test_suite',
-    'TestCase',
-    ]
-
-
-class QueueFixture(Fixture):
-    """Create an exchange with a subscribed queue on an AMQP instance.
-
-    The exchange and queue are made durable (to permit testing with amqp server
-    restarts) and non-auto-delete (to permit dropping the test connection and
-    restablishing it without losing the config (e.g. server restarts).
-
-    If the server is restarted, the channel may need to be updated before
-    teardown, or the teardown will not be able to delete the exchange and queue.
-    
-    Possibly wants to live in rabbitfixture, or a amqpfixture.
-    """
-
-    def __init__(self, channel, unique_string_factory):
-        """Create a QueueFixture.
-
-        :param channel: An amqplib channel to request the exchange and queue
-            over.
-        :param unique_string_factory: A helper that will return a (process
-            lifetime scope) unique string.
-        """
-        self.channel = channel
-        self.unique_string_factory = unique_string_factory
-
-    def setUp(self):
-        super(QueueFixture, self).setUp()
-        self.exchange_name = self.unique_string_factory()
-        self.channel.exchange_declare(
-            exchange=self.exchange_name, type="fanout", durable=True,
-            auto_delete=False)
-        self.addCleanup(self.delete_exchange)
-        self.queue_name, _, _ = self.channel.queue_declare(
-            durable=True, auto_delete=False)
-        self.addCleanup(self.delete_queue)
-        self.channel.queue_bind(self.queue_name, self.exchange_name)
-        
-    def delete_queue(self):
-        self.channel.queue_delete(self.queue_name)
-
-    def delete_exchange(self):
-        self.channel.exchange_delete(self.exchange_name)
-
-
-class ChannelFixture(Fixture):
-    """Create an AMQP connection and channel for tests.
-    
-    :ivar connection: an amqplib connection.
-    :ivar channel: an amqplib channel
-    """
-
-    def __init__(self, connection_factory):
-        super(ChannelFixture, self).__init__()
-        self.connection_factory = connection_factory
-
-    def setUp(self):
-        super(ChannelFixture, self).setUp()
-        self.connection = self.connection_factory()
-        self.connection.connect()
-        self.addCleanup(close_ignoring_connection_errors, self.connection)
-        self.channel = self.connection.channel()
-        self.addCleanup(close_ignoring_connection_errors, self.channel)
-
-
-class TestCase(testtools.TestCase):
-    """Subclass to start a RabbitMQ server."""
-
-    resources = [('rabbit', FixtureResource(RabbitServer()))]
-
-    def setUp(self):
-        super(TestCase, self).setUp()
-        # ResourcedTestCase handles teardown in the wrong order for us (we
-        # need to ensure that the RabbitServer fixture is only cleaned up
-        # after any other fixtures registered by individual tests), so we
-        # imitate it manually.
-        result = _get_result()
-        setUpResources(self, self.resources, result)
-        self.addCleanup(tearDownResources, self, self.resources, result)
-
-    def connection_factory(self):
-        """When called, return an amqplib connection."""
-        return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname,
-            self.rabbit.config.port), userid="guest", password="guest",
-            virtual_host="/")
-
-
-def test_suite():
-    test_mod_names = [
-        'publisher',
-        'receiver',
-        ]
-    return OptimisingTestSuite(TestLoader().loadTestsFromNames(
-        ['oops_amqp.tests.test_' + name for name in test_mod_names]))

=== modified file 'setup.cfg'
--- setup.cfg	2022-09-30 22:53:58 +0000
+++ setup.cfg	2022-10-03 10:15:17 +0000
@@ -36,7 +36,6 @@
 
 [options.extras_require]
 test =
+    pytest
     rabbitfixture
     six
-    testresources
-    testtools

=== renamed directory 'oops_amqp/tests' => 'tests'
=== added file 'tests/conftest.py'
--- tests/conftest.py	1970-01-01 00:00:00 +0000
+++ tests/conftest.py	2022-10-03 10:15:17 +0000
@@ -0,0 +1,104 @@
+# Copyright (C) 2011-2022, Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, version 3 only.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+# GNU Lesser General Public License version 3 (see the file LICENSE).
+
+import itertools
+from functools import partial
+
+import amqp
+import pytest
+from rabbitfixture.server import RabbitServer
+
+from oops_amqp.utils import close_ignoring_connection_errors
+
+
+_unique_id_gen = itertools.count(1)
+
+
+@pytest.fixture
+def get_unique_integer():
+    def get():
+        return next(_unique_id_gen)
+
+    return get
+
+
+@pytest.fixture
+def get_unique_string(get_unique_integer):
+    def get(prefix):
+        return "%s-%d" % (prefix, get_unique_integer())
+
+    return get
+
+
+@pytest.fixture(scope="session")
+def rabbit():
+    rabbit = RabbitServer()
+    rabbit.setUp()
+    try:
+        yield rabbit
+    finally:
+        rabbit.cleanUp()
+
+
+@pytest.fixture
+def connection_factory(rabbit):
+    return partial(
+        amqp.Connection,
+        host="%s:%s" % (rabbit.config.hostname, rabbit.config.port),
+        userid="guest",
+        password="guest",
+        virtual_host="/",
+    )
+
+
+@pytest.fixture
+def connection(connection_factory):
+    connection = connection_factory()
+    connection.connect()
+    try:
+        yield connection
+    finally:
+        close_ignoring_connection_errors(connection)
+
+
+@pytest.fixture
+def channel(connection):
+    channel = connection.channel()
+    try:
+        yield channel
+    finally:
+        close_ignoring_connection_errors(channel)
+
+
+@pytest.fixture
+def exchange_name(channel, get_unique_string):
+    exchange_name = get_unique_string("exchange")
+    channel.exchange_declare(
+        exchange=exchange_name, type="fanout", durable=True, auto_delete=False
+    )
+    try:
+        yield exchange_name
+    finally:
+        channel.exchange_delete(exchange_name)
+
+
+@pytest.fixture
+def queue_name(channel, exchange_name):
+    queue_name, _, _ = channel.queue_declare(durable=True, auto_delete=False)
+    try:
+        channel.queue_bind(queue_name, exchange_name)
+        yield queue_name
+    finally:
+        channel.queue_delete(queue_name)

=== modified file 'tests/test_publisher.py'
--- oops_amqp/tests/test_publisher.py	2018-03-12 11:48:55 +0000
+++ tests/test_publisher.py	2022-10-03 10:15:17 +0000
@@ -22,118 +22,125 @@
 from oops_amqp import (
     anybson as bson,
     Publisher,
-    )
-from oops_amqp.tests import (
-    ChannelFixture,
-    QueueFixture,
-    TestCase,
-    )
-
-
-class TestPublisher(TestCase):
-
-    def test_publish_inherit_id(self):
-        # OOPS id's can be set outside of Publisher().
-        channel = self.useFixture(
-            ChannelFixture(self.connection_factory)).channel
-        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
-        publisher = Publisher(self.connection_factory, queue.exchange_name, "",
-            inherit_id=True)
-        reference_oops = {'id': 'kept', 'akey': 'avalue'}
-        oops = dict(reference_oops)
-        expected_id = 'kept'
-        oops_ids = publisher(oops)
-        # Publication returns the oops ID allocated.
-        self.assertEqual([expected_id], oops_ids)
-        # The oops should not be altered by publication.
-        self.assertEqual(reference_oops, oops)
-        # The received OOPS should have the ID embedded and be a bson dict.
-        def check_oops(msg):
-            body = msg.body
-            if not isinstance(body, bytes):
-                body = body.encode(msg.content_encoding or 'UTF-8')
-            self.assertEqual(reference_oops, bson.loads(body))
-            channel.basic_ack(msg.delivery_tag)
-            channel.basic_cancel(queue.queue_name)
-        channel.basic_consume(
-            queue.queue_name, callback=check_oops,
-            consumer_tag=queue.queue_name)
-        channel.connection.drain_events()
-
-    def test_publish(self):
-        # Publishing an oops sends it to the exchange, making a connection as
-        # it goes.
-        channel = self.useFixture(
-            ChannelFixture(self.connection_factory)).channel
-        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
-        publisher = Publisher(self.connection_factory, queue.exchange_name, "")
-        reference_oops = {'akey': 'avalue'}
-        oops = dict(reference_oops)
-        id_bson = md5(bson.dumps(oops)).hexdigest()
-        expected_id = "OOPS-%s" % id_bson
-        oops_ids = publisher(oops)
-        # Publication returns the oops ID allocated.
-        self.assertEqual([expected_id], oops_ids)
-        # The oops should not be altered by publication.
-        self.assertEqual(reference_oops, oops)
-        # The received OOPS should have the ID embedded and be a bson dict.
-        expected_oops = dict(reference_oops)
-        expected_oops['id'] = oops_ids[0]
-        def check_oops(msg):
-            body = msg.body
-            if not isinstance(body, bytes):
-                body = body.encode(msg.content_encoding or 'UTF-8')
-            self.assertEqual(expected_oops, bson.loads(body))
-            channel.basic_ack(msg.delivery_tag)
-            channel.basic_cancel(queue.queue_name)
-        channel.basic_consume(
-            queue.queue_name, callback=check_oops,
-            consumer_tag=queue.queue_name)
-        channel.connection.drain_events()
-
-    def test_publish_amqp_already_down(self):
-        # If amqp is down when a connection is attempted, None is returned to
-        # indicate that publication failed - and publishing after it comes back
-        # works.
-        channel = self.useFixture(
-            ChannelFixture(self.connection_factory)).channel
-        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
-        # The private method use and the restart of rabbit before it gets torn
-        # down are bugs in rabbitfixture that will be fixed in a future
-        # release.
-        self.rabbit.runner._stop()
-        try:
-            publisher = Publisher(
-                self.connection_factory, queue.exchange_name, "")
-            oops = {'akey': 42}
-            self.assertEqual([], publisher(oops))
-        finally:
-            self.rabbit.runner._start()
-            connection = self.connection_factory()
-            connection.connect()
-            queue.channel = connection.channel()
-        self.assertNotEqual([], publisher(oops))
-
-    def test_publish_amqp_down_after_use(self):
-        # If amqp goes down after its been successfully used, None is returned
-        # to indicate that publication failed - and publishing after it comes
-        # back works.
-        channel = self.useFixture(
-            ChannelFixture(self.connection_factory)).channel
-        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
-        publisher = Publisher(self.connection_factory, queue.exchange_name, "")
-        oops = {'akey': 42}
-        self.assertNotEqual(None, publisher(oops))
-        # The private method use and the restart of rabbit before it gets torn
-        # down are bugs in rabbitfixture that will be fixed in a future
-        # release.
-        self.rabbit.runner._stop()
-        try:
-            self.assertEqual([], publisher(oops))
-        finally:
-            self.rabbit.runner._start()
-            connection = self.connection_factory()
-            connection.connect()
-            queue.channel = connection.channel()
-        self.assertNotEqual([], publisher(oops))
-
+)
+
+
+def test_publish_inherit_id(
+    connection_factory, channel, exchange_name, queue_name
+):
+    # OOPS IDs can be set outside of Publisher().
+    publisher = Publisher(
+        connection_factory, exchange_name, "", inherit_id=True
+    )
+    reference_oops = {"id": "kept", "akey": "avalue"}
+    oops = dict(reference_oops)
+    expected_id = "kept"
+    oops_ids = publisher(oops)
+    # Publication returns the oops ID allocated.
+    assert oops_ids == [expected_id]
+    # The oops should not be altered by publication.
+    assert oops == reference_oops
+
+    # The received OOPS should have the ID embedded and be a bson dict.
+    def check_oops(msg):
+        body = msg.body
+        if not isinstance(body, bytes):
+            body = body.encode(msg.content_encoding or "UTF-8")
+        assert bson.loads(body) == reference_oops
+        channel.basic_ack(msg.delivery_tag)
+        channel.basic_cancel(queue_name)
+
+    channel.basic_consume(
+        queue_name, callback=check_oops, consumer_tag=queue_name
+    )
+    channel.connection.drain_events()
+
+
+def test_publish(connection_factory, channel, exchange_name, queue_name):
+    # Publishing an oops sends it to the exchange, making a connection as
+    # it goes.
+    publisher = Publisher(connection_factory, exchange_name, "")
+    reference_oops = {"akey": "avalue"}
+    oops = dict(reference_oops)
+    id_bson = md5(bson.dumps(oops)).hexdigest()
+    expected_id = "OOPS-%s" % id_bson
+    oops_ids = publisher(oops)
+    # Publication returns the oops ID allocated.
+    assert oops_ids == [expected_id]
+    # The oops should not be altered by publication.
+    assert oops == reference_oops
+    # The received OOPS should have the ID embedded and be a bson dict.
+    expected_oops = dict(reference_oops)
+    expected_oops["id"] = oops_ids[0]
+
+    def check_oops(msg):
+        body = msg.body
+        if not isinstance(body, bytes):
+            body = body.encode(msg.content_encoding or "UTF-8")
+        assert bson.loads(body) == expected_oops
+        channel.basic_ack(msg.delivery_tag)
+        channel.basic_cancel(queue_name)
+
+    channel.basic_consume(
+        queue_name, callback=check_oops, consumer_tag=queue_name
+    )
+    channel.connection.drain_events()
+
+
+def test_publish_amqp_already_down(
+    rabbit, connection_factory, channel, get_unique_string
+):
+    # If amqp is down when a connection is attempted, an empty list is
+    # returned to indicate that publication failed - and publishing after it
+    # comes back works.
+    # The private method use and the restart of rabbit before it gets torn
+    # down are bugs in rabbitfixture that will be fixed in a future
+    # release.
+    exchange_name = get_unique_string("exchange")
+    channel.exchange_declare(
+        exchange=exchange_name, type="fanout", durable=True, auto_delete=False
+    )
+    try:
+        rabbit.runner._stop()
+        try:
+            publisher = Publisher(connection_factory, exchange_name, "")
+            oops = {"akey": 42}
+            assert publisher(oops) == []
+        finally:
+            rabbit.runner._start()
+            connection = connection_factory()
+            connection.connect()
+            channel = connection.channel()
+        assert publisher(oops) != []
+    finally:
+        channel.exchange_delete(exchange_name)
+
+
+def test_publish_amqp_down_after_use(
+    rabbit, connection_factory, channel, get_unique_string
+):
+    # If amqp goes down after it's been successfully used, an empty list is
+    # returned to indicate that publication failed - and publishing after it
+    # comes back works.
+    exchange_name = get_unique_string("exchange")
+    channel.exchange_declare(
+        exchange=exchange_name, type="fanout", durable=True, auto_delete=False
+    )
+    try:
+        publisher = Publisher(connection_factory, exchange_name, "")
+        oops = {"akey": 42}
+        assert publisher(oops) is not None
+        # The private method use and the restart of rabbit before it gets
+        # torn down are bugs in rabbitfixture that will be fixed in a future
+        # release.
+        rabbit.runner._stop()
+        try:
+            assert publisher(oops) == []
+        finally:
+            rabbit.runner._start()
+            connection = connection_factory()
+            connection.connect()
+            channel = connection.channel()
+        assert publisher(oops) != []
+    finally:
+        channel.exchange_delete(exchange_name)

=== modified file 'tests/test_receiver.py'
--- oops_amqp/tests/test_receiver.py	2018-03-12 11:48:55 +0000
+++ tests/test_receiver.py	2022-10-03 10:15:17 +0000
@@ -27,158 +27,176 @@
 from oops_amqp import (
     anybson as bson,
     Receiver,
-    )
-from oops_amqp.tests import (
-    ChannelFixture,
-    QueueFixture,
-    TestCase,
-    )
-
-
-class TestReceiver(TestCase):
-
-    def test_stop_on_sentinel(self):
-        # A sentinel can be used to stop the receiver (useful for testing).
-        reports = []
-        def capture(report):
-            reports.append(report)
-            return [report['id']]
-        expected_report = {'id': 'foo', 'otherkey': 42}
-        message = amqp.Message(bson.dumps(expected_report))
-        channel = self.useFixture(
-            ChannelFixture(self.connection_factory)).channel
-        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
-        channel.basic_publish(
-            message, queue.exchange_name, routing_key="")
-        sentinel = b"xxx"
-        channel.basic_publish(
-            amqp.Message(sentinel), queue.exchange_name, routing_key="")
-        config = Config()
-        config.publisher = capture
-        receiver = Receiver(config, self.connection_factory, queue.queue_name)
-        receiver.sentinel = sentinel
-        receiver.run_forever()
-        self.assertEqual([expected_report], reports)
-
-    def test_stop_via_stopping(self):
-        # Setting the stopping field should stop the run_forever loop.
-        reports = []
-        def capture(report):
-            reports.append(report)
-            return [report['id']]
-        expected_report = {'id': 'foo', 'otherkey': 42}
-        message = amqp.Message(bson.dumps(expected_report))
-        channel = self.useFixture(
-            ChannelFixture(self.connection_factory)).channel
-        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
-        channel.basic_publish(
-            message, queue.exchange_name, routing_key="")
-        config = Config()
-        config.publisher = capture
-        # We don't want to loop forever: patch the channel so that after one
-        # call to wait (which will get our injected message) the loop will shut
-        # down.
-        def patching_factory():
-            connection = self.connection_factory()
-            old_channel = connection.channel
-            def new_channel():
-                result = old_channel()
-                old_wait = result.wait
-                def new_wait(*args, **kwargs):
-                    receiver.stopping = True
-                    return old_wait(*args, **kwargs)
-                result.wait = new_wait
-                return result
-            connection.channel = new_channel
-            return connection
-        receiver = Receiver(config, patching_factory, queue.queue_name)
-        receiver.run_forever()
-        self.assertEqual([expected_report], reports)
-
-    def test_run_forever(self):
-        # run_forever subscribes and then calls drain_events in a loop.
-        calls = []
-        class FakeChannel:
-            def __init__(self, calls):
-                self.calls = calls
-                self.is_open = True
-            def basic_consume(self, queue_name, callback=None):
-                self.calls.append(('basic_consume', queue_name, callback))
-                return 'tag'
-            def basic_cancel(self, tag):
-                self.calls.append(('basic_cancel', tag))
-            def close(self):
-                self.is_open = False
-        class FakeConnection:
-            def __init__(self, calls):
-                self.calls = calls
-            def connect(self):
-                pass
-            def channel(self):
-                return FakeChannel(calls)
-            def drain_events(self, timeout=None):
-                self.calls.append(('drain_events', timeout))
-                if len(self.calls) > 2:
-                    receiver.stopping = True
-            def close(self):
-                pass
-        receiver = Receiver(None, lambda: FakeConnection(calls), 'foo')
-        receiver.run_forever()
-        self.assertEqual(
-            [('basic_consume', 'foo', receiver.handle_report),
-             ('drain_events', 1),
-             ('drain_events', 1),
-             ('basic_cancel', 'tag')],
-            calls)
-
-    def test_tolerates_amqp_trouble(self):
-        # If the AMQP server is unavailable for a short period, the receiver
-        # will automatically reconnect.
-        # Break a connection to raise socket.error (which we know from the 
-        # publisher tests is what leaks through when rabbit is shutdown).
-        # We raise it the first time on each amqp method call.
-        reports = []
-        def capture(report):
-            reports.append(report)
-            return [report['id']]
-        expected_report = {'id': 'foo', 'otherkey': 42}
-        message = amqp.Message(bson.dumps(expected_report))
-        channel = self.useFixture(
-            ChannelFixture(self.connection_factory)).channel
-        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
-        channel.basic_publish(message, queue.exchange_name, routing_key="")
-        config = Config()
-        config.publisher = capture
-        state = {}
-        def error_once(func):
-            def wrapped(*args, **kwargs):
-                func_ref = six.get_function_code(func)
-                if func_ref in state:
-                    return func(*args, **kwargs)
-                else:
-                    state[func_ref] = True
-                    # Use EPIPE because the close() code checks that (though
-                    # the rest doesn't)
-                    raise socket.error(errno.EPIPE, "booyah")
-            return wrapped
+)
+
+
+def test_stop_on_sentinel(
+    connection_factory, channel, exchange_name, queue_name
+):
+    # A sentinel can be used to stop the receiver (useful for testing).
+    reports = []
+
+    def capture(report):
+        reports.append(report)
+        return [report["id"]]
+
+    expected_report = {"id": "foo", "otherkey": 42}
+    message = amqp.Message(bson.dumps(expected_report))
+    channel.basic_publish(message, exchange_name, routing_key="")
+    sentinel = b"xxx"
+    channel.basic_publish(
+        amqp.Message(sentinel), exchange_name, routing_key=""
+    )
+    config = Config()
+    config.publisher = capture
+    receiver = Receiver(config, connection_factory, queue_name)
+    receiver.sentinel = sentinel
+    receiver.run_forever()
+    assert reports == [expected_report]
+
+
+def test_stop_via_stopping(
+    connection_factory, channel, exchange_name, queue_name
+):
+    # Setting the stopping field should stop the run_forever loop.
+    reports = []
+
+    def capture(report):
+        reports.append(report)
+        return [report["id"]]
+
+    expected_report = {"id": "foo", "otherkey": 42}
+    message = amqp.Message(bson.dumps(expected_report))
+    channel.basic_publish(message, exchange_name, routing_key="")
+    config = Config()
+    config.publisher = capture
+    # We don't want to loop forever: patch the channel so that after one
+    # call to wait (which will get our injected message) the loop will shut
+    # down.
+    def patching_factory():
+        connection = connection_factory()
+        old_channel = connection.channel
+
+        def new_channel():
+            result = old_channel()
+            old_wait = result.wait
+
+            def new_wait(*args, **kwargs):
+                receiver.stopping = True
+                return old_wait(*args, **kwargs)
+
+            result.wait = new_wait
+            return result
+
+        connection.channel = new_channel
+        return connection
+
+    receiver = Receiver(config, patching_factory, queue_name)
+    receiver.run_forever()
+    assert reports == [expected_report]
+
+
+def test_run_forever():
+    # run_forever subscribes and then calls drain_events in a loop.
+    calls = []
+
+    class FakeChannel:
+        def __init__(self, calls):
+            self.calls = calls
+            self.is_open = True
+
+        def basic_consume(self, queue_name, callback=None):
+            self.calls.append(("basic_consume", queue_name, callback))
+            return "tag"
+
+        def basic_cancel(self, tag):
+            self.calls.append(("basic_cancel", tag))
+
+        def close(self):
+            self.is_open = False
+
+    class FakeConnection:
+        def __init__(self, calls):
+            self.calls = calls
+
+        def connect(self):
+            pass
+
+        def channel(self):
+            return FakeChannel(calls)
+
+        def drain_events(self, timeout=None):
+            self.calls.append(("drain_events", timeout))
+            if len(self.calls) > 2:
+                receiver.stopping = True
+
+        def close(self):
+            pass
+
+    receiver = Receiver(None, lambda: FakeConnection(calls), "foo")
+    receiver.run_forever()
+    assert calls == [
+        ("basic_consume", "foo", receiver.handle_report),
+        ("drain_events", 1),
+        ("drain_events", 1),
+        ("basic_cancel", "tag"),
+    ]
+
+
+def test_tolerates_amqp_trouble(
+    connection_factory, channel, exchange_name, queue_name
+):
+    # If the AMQP server is unavailable for a short period, the receiver
+    # will automatically reconnect.
+    # Break a connection to raise socket.error (which we know from the
+    # publisher tests is what leaks through when rabbit is shut down).
+    # We raise it the first time on each amqp method call.
+    reports = []
+
+    def capture(report):
+        reports.append(report)
+        return [report["id"]]
+
+    expected_report = {"id": "foo", "otherkey": 42}
+    message = amqp.Message(bson.dumps(expected_report))
+    channel.basic_publish(message, exchange_name, routing_key="")
+    config = Config()
+    config.publisher = capture
+    state = {}
+
+    def error_once(func):
+        def wrapped(*args, **kwargs):
+            func_ref = six.get_function_code(func)
+            if func_ref in state:
+                return func(*args, **kwargs)
+            else:
+                state[func_ref] = True
+                # Use EPIPE because the close() code checks that (though
+                # the rest doesn't)
+                raise socket.error(errno.EPIPE, "booyah")
+
+        return wrapped
+
+    @error_once
+    def patching_factory():
+        connection = connection_factory()
+        old_channel = connection.channel
+
         @error_once
-        def patching_factory():
-            connection = self.connection_factory()
-            old_channel = connection.channel
-            @error_once
-            def new_channel():
-                result = old_channel()
-                result.basic_consume = error_once(result.basic_consume)
-                result.basic_cancel = error_once(result.basic_cancel)
-                result.close = error_once(result.close)
-                return result
-            connection.channel = new_channel
-            connection.drain_events = error_once(connection.drain_events)
-            connection.close = error_once(connection.close)
-            return connection
-        receiver = Receiver(config, patching_factory, queue.queue_name)
-        receiver.sentinel = b"arhh"
-        channel.basic_publish(
-            amqp.Message(b"arhh"), queue.exchange_name, routing_key="")
-        receiver.run_forever()
-        self.assertEqual([expected_report], reports)
+        def new_channel():
+            result = old_channel()
+            result.basic_consume = error_once(result.basic_consume)
+            result.basic_cancel = error_once(result.basic_cancel)
+            result.close = error_once(result.close)
+            return result
+
+        connection.channel = new_channel
+        connection.drain_events = error_once(connection.drain_events)
+        connection.close = error_once(connection.close)
+        return connection
+
+    receiver = Receiver(config, patching_factory, queue_name)
+    receiver.sentinel = b"arhh"
+    channel.basic_publish(amqp.Message(b"arhh"), exchange_name, routing_key="")
+    receiver.run_forever()
+    assert reports == [expected_report]

=== modified file 'tox.ini'
--- tox.ini	2022-09-30 22:41:01 +0000
+++ tox.ini	2022-10-03 10:15:17 +0000
@@ -11,6 +11,5 @@
 [testenv]
 deps =
     .[test]
-    pytest
 commands =
     pytest {posargs}