launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #29268
[Merge] lp:~cjwatson/python-oops-amqp/pytest-full into lp:python-oops-amqp
Colin Watson has proposed merging lp:~cjwatson/python-oops-amqp/pytest-full into lp:python-oops-amqp.
Commit message:
Port test suite to pytest.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/python-oops-amqp/pytest-full/+merge/430882
This turns out to be about three times faster on my system, probably mainly due to using a session-scoped server fixture.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/python-oops-amqp/pytest-full into lp:python-oops-amqp.
=== modified file 'NEWS'
--- NEWS 2022-09-30 22:53:58 +0000
+++ NEWS 2022-10-03 10:15:17 +0000
@@ -8,6 +8,7 @@
- Switch from buildout to tox. (Colin Watson)
- Switch to declarative setup.cfg. (Colin Watson)
+- Port test suite to pytest. (Colin Watson)
0.1.0
-----
=== modified file 'README'
--- README 2022-09-30 22:41:01 +0000
+++ README 2022-10-03 10:15:17 +0000
@@ -17,7 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
GNU Lesser General Public License version 3 (see the file LICENSE).
-The oops_amqp package provides an AMQP OOPS http://pypi.python.org/pypi/oops)
+The oops_amqp package provides an AMQP OOPS (https://pypi.org/project/oops)
publisher, and a small daemon that listens on amqp for OOPS reports and
republishes them (into a supplied publisher). The OOPS framework permits
falling back to additional publishers if AMQP is down.
@@ -29,20 +29,16 @@
* bson
-* oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer.
+* oops (https://pypi.org/project/oops) 0.0.11 or newer.
* amqp
Testing Dependencies
====================
-* oops-datedir-repo (http://pypi.python.org/pypi/oops_datedir_repo)
-
-* rabbitfixture (http://pypi.python.org/pypi/rabbitfixture)
-
-* testresources (http://pypi.python.org/pypi/testresources)
-
-* testtools (http://pypi.python.org/pypi/testtools)
+* pytest (https://pypi.org/project/pytest)
+
+* rabbitfixture (https://pypi.org/project/rabbitfixture)
Usage
=====
=== removed file 'oops_amqp/tests/__init__.py'
--- oops_amqp/tests/__init__.py 2018-03-12 11:48:55 +0000
+++ oops_amqp/tests/__init__.py 1970-01-01 00:00:00 +0000
@@ -1,135 +0,0 @@
-# Copyright (c) 2011, Canonical Ltd
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License as published by
-# the Free Software Foundation, version 3 only.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-# GNU Lesser General Public License version 3 (see the file LICENSE).
-
-"""Tests for oops_amqp."""
-
-from __future__ import absolute_import, print_function
-
-from unittest import TestLoader
-
-import amqp
-from fixtures import Fixture
-from rabbitfixture.server import RabbitServer
-import testtools
-from testresources import (
- _get_result,
- FixtureResource,
- OptimisingTestSuite,
- setUpResources,
- tearDownResources,
- )
-
-from oops_amqp.utils import close_ignoring_connection_errors
-
-__all__ = [
- 'ChannelFixture',
- 'QueueFixture',
- 'test_suite',
- 'TestCase',
- ]
-
-
-class QueueFixture(Fixture):
- """Create an exchange with a subscribed queue on an AMQP instance.
-
- The exchange and queue are made durable (to permit testing with amqp server
- restarts) and non-auto-delete (to permit dropping the test connection and
- restablishing it without losing the config (e.g. server restarts).
-
- If the server is restarted, the channel may need to be updated before
- teardown, or the teardown will not be able to delete the exchange and queue.
-
- Possibly wants to live in rabbitfixture, or a amqpfixture.
- """
-
- def __init__(self, channel, unique_string_factory):
- """Create a QueueFixture.
-
- :param channel: An amqplib channel to request the exchange and queue
- over.
- :param unique_string_factory: A helper that will return a (process
- lifetime scope) unique string.
- """
- self.channel = channel
- self.unique_string_factory = unique_string_factory
-
- def setUp(self):
- super(QueueFixture, self).setUp()
- self.exchange_name = self.unique_string_factory()
- self.channel.exchange_declare(
- exchange=self.exchange_name, type="fanout", durable=True,
- auto_delete=False)
- self.addCleanup(self.delete_exchange)
- self.queue_name, _, _ = self.channel.queue_declare(
- durable=True, auto_delete=False)
- self.addCleanup(self.delete_queue)
- self.channel.queue_bind(self.queue_name, self.exchange_name)
-
- def delete_queue(self):
- self.channel.queue_delete(self.queue_name)
-
- def delete_exchange(self):
- self.channel.exchange_delete(self.exchange_name)
-
-
-class ChannelFixture(Fixture):
- """Create an AMQP connection and channel for tests.
-
- :ivar connection: an amqplib connection.
- :ivar channel: an amqplib channel
- """
-
- def __init__(self, connection_factory):
- super(ChannelFixture, self).__init__()
- self.connection_factory = connection_factory
-
- def setUp(self):
- super(ChannelFixture, self).setUp()
- self.connection = self.connection_factory()
- self.connection.connect()
- self.addCleanup(close_ignoring_connection_errors, self.connection)
- self.channel = self.connection.channel()
- self.addCleanup(close_ignoring_connection_errors, self.channel)
-
-
-class TestCase(testtools.TestCase):
- """Subclass to start a RabbitMQ server."""
-
- resources = [('rabbit', FixtureResource(RabbitServer()))]
-
- def setUp(self):
- super(TestCase, self).setUp()
- # ResourcedTestCase handles teardown in the wrong order for us (we
- # need to ensure that the RabbitServer fixture is only cleaned up
- # after any other fixtures registered by individual tests), so we
- # imitate it manually.
- result = _get_result()
- setUpResources(self, self.resources, result)
- self.addCleanup(tearDownResources, self, self.resources, result)
-
- def connection_factory(self):
- """When called, return an amqplib connection."""
- return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname,
- self.rabbit.config.port), userid="guest", password="guest",
- virtual_host="/")
-
-
-def test_suite():
- test_mod_names = [
- 'publisher',
- 'receiver',
- ]
- return OptimisingTestSuite(TestLoader().loadTestsFromNames(
- ['oops_amqp.tests.test_' + name for name in test_mod_names]))
=== modified file 'setup.cfg'
--- setup.cfg 2022-09-30 22:53:58 +0000
+++ setup.cfg 2022-10-03 10:15:17 +0000
@@ -36,7 +36,6 @@
[options.extras_require]
test =
+ pytest
rabbitfixture
six
- testresources
- testtools
=== renamed directory 'oops_amqp/tests' => 'tests'
=== added file 'tests/conftest.py'
--- tests/conftest.py 1970-01-01 00:00:00 +0000
+++ tests/conftest.py 2022-10-03 10:15:17 +0000
@@ -0,0 +1,104 @@
+# Copyright (C) 2011-2022, Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, version 3 only.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# GNU Lesser General Public License version 3 (see the file LICENSE).
+
+import itertools
+from functools import partial
+
+import amqp
+import pytest
+from rabbitfixture.server import RabbitServer
+
+from oops_amqp.utils import close_ignoring_connection_errors
+
+
+_unique_id_gen = itertools.count(1)
+
+
+@pytest.fixture
+def get_unique_integer():
+ def get():
+ return next(_unique_id_gen)
+
+ return get
+
+
+@pytest.fixture
+def get_unique_string(get_unique_integer):
+ def get(prefix):
+ return "%s-%d" % (prefix, get_unique_integer())
+
+ return get
+
+
+@pytest.fixture(scope="session")
+def rabbit():
+ rabbit = RabbitServer()
+ rabbit.setUp()
+ try:
+ yield rabbit
+ finally:
+ rabbit.cleanUp()
+
+
+@pytest.fixture
+def connection_factory(rabbit):
+ return partial(
+ amqp.Connection,
+ host="%s:%s" % (rabbit.config.hostname, rabbit.config.port),
+ userid="guest",
+ password="guest",
+ virtual_host="/",
+ )
+
+
+@pytest.fixture
+def connection(connection_factory):
+ connection = connection_factory()
+ connection.connect()
+ try:
+ yield connection
+ finally:
+ close_ignoring_connection_errors(connection)
+
+
+@pytest.fixture
+def channel(connection):
+ channel = connection.channel()
+ try:
+ yield channel
+ finally:
+ close_ignoring_connection_errors(channel)
+
+
+@pytest.fixture
+def exchange_name(channel, get_unique_string):
+ exchange_name = get_unique_string("exchange")
+ channel.exchange_declare(
+ exchange=exchange_name, type="fanout", durable=True, auto_delete=False
+ )
+ try:
+ yield exchange_name
+ finally:
+ channel.exchange_delete(exchange_name)
+
+
+@pytest.fixture
+def queue_name(channel, exchange_name):
+ queue_name, _, _ = channel.queue_declare(durable=True, auto_delete=False)
+ try:
+ channel.queue_bind(queue_name, exchange_name)
+ yield queue_name
+ finally:
+ channel.queue_delete(queue_name)
=== modified file 'tests/test_publisher.py'
--- oops_amqp/tests/test_publisher.py 2018-03-12 11:48:55 +0000
+++ tests/test_publisher.py 2022-10-03 10:15:17 +0000
@@ -22,118 +22,125 @@
from oops_amqp import (
anybson as bson,
Publisher,
- )
-from oops_amqp.tests import (
- ChannelFixture,
- QueueFixture,
- TestCase,
- )
-
-
-class TestPublisher(TestCase):
-
- def test_publish_inherit_id(self):
- # OOPS id's can be set outside of Publisher().
- channel = self.useFixture(
- ChannelFixture(self.connection_factory)).channel
- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
- publisher = Publisher(self.connection_factory, queue.exchange_name, "",
- inherit_id=True)
- reference_oops = {'id': 'kept', 'akey': 'avalue'}
- oops = dict(reference_oops)
- expected_id = 'kept'
- oops_ids = publisher(oops)
- # Publication returns the oops ID allocated.
- self.assertEqual([expected_id], oops_ids)
- # The oops should not be altered by publication.
- self.assertEqual(reference_oops, oops)
- # The received OOPS should have the ID embedded and be a bson dict.
- def check_oops(msg):
- body = msg.body
- if not isinstance(body, bytes):
- body = body.encode(msg.content_encoding or 'UTF-8')
- self.assertEqual(reference_oops, bson.loads(body))
- channel.basic_ack(msg.delivery_tag)
- channel.basic_cancel(queue.queue_name)
- channel.basic_consume(
- queue.queue_name, callback=check_oops,
- consumer_tag=queue.queue_name)
- channel.connection.drain_events()
-
- def test_publish(self):
- # Publishing an oops sends it to the exchange, making a connection as
- # it goes.
- channel = self.useFixture(
- ChannelFixture(self.connection_factory)).channel
- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
- publisher = Publisher(self.connection_factory, queue.exchange_name, "")
- reference_oops = {'akey': 'avalue'}
- oops = dict(reference_oops)
- id_bson = md5(bson.dumps(oops)).hexdigest()
- expected_id = "OOPS-%s" % id_bson
- oops_ids = publisher(oops)
- # Publication returns the oops ID allocated.
- self.assertEqual([expected_id], oops_ids)
- # The oops should not be altered by publication.
- self.assertEqual(reference_oops, oops)
- # The received OOPS should have the ID embedded and be a bson dict.
- expected_oops = dict(reference_oops)
- expected_oops['id'] = oops_ids[0]
- def check_oops(msg):
- body = msg.body
- if not isinstance(body, bytes):
- body = body.encode(msg.content_encoding or 'UTF-8')
- self.assertEqual(expected_oops, bson.loads(body))
- channel.basic_ack(msg.delivery_tag)
- channel.basic_cancel(queue.queue_name)
- channel.basic_consume(
- queue.queue_name, callback=check_oops,
- consumer_tag=queue.queue_name)
- channel.connection.drain_events()
-
- def test_publish_amqp_already_down(self):
- # If amqp is down when a connection is attempted, None is returned to
- # indicate that publication failed - and publishing after it comes back
- # works.
- channel = self.useFixture(
- ChannelFixture(self.connection_factory)).channel
- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
- # The private method use and the restart of rabbit before it gets torn
- # down are bugs in rabbitfixture that will be fixed in a future
- # release.
- self.rabbit.runner._stop()
- try:
- publisher = Publisher(
- self.connection_factory, queue.exchange_name, "")
- oops = {'akey': 42}
- self.assertEqual([], publisher(oops))
- finally:
- self.rabbit.runner._start()
- connection = self.connection_factory()
- connection.connect()
- queue.channel = connection.channel()
- self.assertNotEqual([], publisher(oops))
-
- def test_publish_amqp_down_after_use(self):
- # If amqp goes down after its been successfully used, None is returned
- # to indicate that publication failed - and publishing after it comes
- # back works.
- channel = self.useFixture(
- ChannelFixture(self.connection_factory)).channel
- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
- publisher = Publisher(self.connection_factory, queue.exchange_name, "")
- oops = {'akey': 42}
- self.assertNotEqual(None, publisher(oops))
- # The private method use and the restart of rabbit before it gets torn
- # down are bugs in rabbitfixture that will be fixed in a future
- # release.
- self.rabbit.runner._stop()
- try:
- self.assertEqual([], publisher(oops))
- finally:
- self.rabbit.runner._start()
- connection = self.connection_factory()
- connection.connect()
- queue.channel = connection.channel()
- self.assertNotEqual([], publisher(oops))
-
+)
+
+
+def test_publish_inherit_id(
+ connection_factory, channel, exchange_name, queue_name
+):
+ # OOPS IDs can be set outside of Publisher().
+ publisher = Publisher(
+ connection_factory, exchange_name, "", inherit_id=True
+ )
+ reference_oops = {"id": "kept", "akey": "avalue"}
+ oops = dict(reference_oops)
+ expected_id = "kept"
+ oops_ids = publisher(oops)
+ # Publication returns the oops ID allocated.
+ assert oops_ids == [expected_id]
+ # The oops should not be altered by publication.
+ assert oops == reference_oops
+
+ # The received OOPS should have the ID embedded and be a bson dict.
+ def check_oops(msg):
+ body = msg.body
+ if not isinstance(body, bytes):
+ body = body.encode(msg.content_encoding or "UTF-8")
+ assert bson.loads(body) == reference_oops
+ channel.basic_ack(msg.delivery_tag)
+ channel.basic_cancel(queue_name)
+
+ channel.basic_consume(
+ queue_name, callback=check_oops, consumer_tag=queue_name
+ )
+ channel.connection.drain_events()
+
+
+def test_publish(connection_factory, channel, exchange_name, queue_name):
+ # Publishing an oops sends it to the exchange, making a connection as
+ # it goes.
+ publisher = Publisher(connection_factory, exchange_name, "")
+ reference_oops = {"akey": "avalue"}
+ oops = dict(reference_oops)
+ id_bson = md5(bson.dumps(oops)).hexdigest()
+ expected_id = "OOPS-%s" % id_bson
+ oops_ids = publisher(oops)
+ # Publication returns the oops ID allocated.
+ assert oops_ids == [expected_id]
+ # The oops should not be altered by publication.
+ assert oops == reference_oops
+ # The received OOPS should have the ID embedded and be a bson dict.
+ expected_oops = dict(reference_oops)
+ expected_oops["id"] = oops_ids[0]
+
+ def check_oops(msg):
+ body = msg.body
+ if not isinstance(body, bytes):
+ body = body.encode(msg.content_encoding or "UTF-8")
+ assert bson.loads(body) == expected_oops
+ channel.basic_ack(msg.delivery_tag)
+ channel.basic_cancel(queue_name)
+
+ channel.basic_consume(
+ queue_name, callback=check_oops, consumer_tag=queue_name
+ )
+ channel.connection.drain_events()
+
+
+def test_publish_amqp_already_down(
+ rabbit, connection_factory, channel, get_unique_string
+):
+ # If amqp is down when a connection is attempted, None is returned to
+ # indicate that publication failed - and publishing after it comes back
+ # works.
+ # The private method use and the restart of rabbit before it gets torn
+ # down are bugs in rabbitfixture that will be fixed in a future
+ # release.
+ exchange_name = get_unique_string("exchange")
+ channel.exchange_declare(
+ exchange=exchange_name, type="fanout", durable=True, auto_delete=False
+ )
+ try:
+ rabbit.runner._stop()
+ try:
+ publisher = Publisher(connection_factory, exchange_name, "")
+ oops = {"akey": 42}
+ assert publisher(oops) == []
+ finally:
+ rabbit.runner._start()
+ connection = connection_factory()
+ connection.connect()
+ channel = connection.channel()
+ assert publisher(oops) != []
+ finally:
+ channel.exchange_delete(exchange_name)
+
+
+def test_publish_amqp_down_after_use(
+ rabbit, connection_factory, channel, get_unique_string
+):
+ # If amqp goes down after its been successfully used, None is returned
+ # to indicate that publication failed - and publishing after it comes
+ # back works.
+ exchange_name = get_unique_string("exchange")
+ channel.exchange_declare(
+ exchange=exchange_name, type="fanout", durable=True, auto_delete=False
+ )
+ try:
+ publisher = Publisher(connection_factory, exchange_name, "")
+ oops = {"akey": 42}
+ assert publisher(oops) is not None
+ # The private method use and the restart of rabbit before it gets
+ # torn down are bugs in rabbitfixture that will be fixed in a future
+ # release.
+ rabbit.runner._stop()
+ try:
+ assert publisher(oops) == []
+ finally:
+ rabbit.runner._start()
+ connection = connection_factory()
+ connection.connect()
+ channel = connection.channel()
+ assert publisher(oops) != []
+ finally:
+ channel.exchange_delete(exchange_name)
=== modified file 'tests/test_receiver.py'
--- oops_amqp/tests/test_receiver.py 2018-03-12 11:48:55 +0000
+++ tests/test_receiver.py 2022-10-03 10:15:17 +0000
@@ -27,158 +27,176 @@
from oops_amqp import (
anybson as bson,
Receiver,
- )
-from oops_amqp.tests import (
- ChannelFixture,
- QueueFixture,
- TestCase,
- )
-
-
-class TestReceiver(TestCase):
-
- def test_stop_on_sentinel(self):
- # A sentinel can be used to stop the receiver (useful for testing).
- reports = []
- def capture(report):
- reports.append(report)
- return [report['id']]
- expected_report = {'id': 'foo', 'otherkey': 42}
- message = amqp.Message(bson.dumps(expected_report))
- channel = self.useFixture(
- ChannelFixture(self.connection_factory)).channel
- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
- channel.basic_publish(
- message, queue.exchange_name, routing_key="")
- sentinel = b"xxx"
- channel.basic_publish(
- amqp.Message(sentinel), queue.exchange_name, routing_key="")
- config = Config()
- config.publisher = capture
- receiver = Receiver(config, self.connection_factory, queue.queue_name)
- receiver.sentinel = sentinel
- receiver.run_forever()
- self.assertEqual([expected_report], reports)
-
- def test_stop_via_stopping(self):
- # Setting the stopping field should stop the run_forever loop.
- reports = []
- def capture(report):
- reports.append(report)
- return [report['id']]
- expected_report = {'id': 'foo', 'otherkey': 42}
- message = amqp.Message(bson.dumps(expected_report))
- channel = self.useFixture(
- ChannelFixture(self.connection_factory)).channel
- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
- channel.basic_publish(
- message, queue.exchange_name, routing_key="")
- config = Config()
- config.publisher = capture
- # We don't want to loop forever: patch the channel so that after one
- # call to wait (which will get our injected message) the loop will shut
- # down.
- def patching_factory():
- connection = self.connection_factory()
- old_channel = connection.channel
- def new_channel():
- result = old_channel()
- old_wait = result.wait
- def new_wait(*args, **kwargs):
- receiver.stopping = True
- return old_wait(*args, **kwargs)
- result.wait = new_wait
- return result
- connection.channel = new_channel
- return connection
- receiver = Receiver(config, patching_factory, queue.queue_name)
- receiver.run_forever()
- self.assertEqual([expected_report], reports)
-
- def test_run_forever(self):
- # run_forever subscribes and then calls drain_events in a loop.
- calls = []
- class FakeChannel:
- def __init__(self, calls):
- self.calls = calls
- self.is_open = True
- def basic_consume(self, queue_name, callback=None):
- self.calls.append(('basic_consume', queue_name, callback))
- return 'tag'
- def basic_cancel(self, tag):
- self.calls.append(('basic_cancel', tag))
- def close(self):
- self.is_open = False
- class FakeConnection:
- def __init__(self, calls):
- self.calls = calls
- def connect(self):
- pass
- def channel(self):
- return FakeChannel(calls)
- def drain_events(self, timeout=None):
- self.calls.append(('drain_events', timeout))
- if len(self.calls) > 2:
- receiver.stopping = True
- def close(self):
- pass
- receiver = Receiver(None, lambda: FakeConnection(calls), 'foo')
- receiver.run_forever()
- self.assertEqual(
- [('basic_consume', 'foo', receiver.handle_report),
- ('drain_events', 1),
- ('drain_events', 1),
- ('basic_cancel', 'tag')],
- calls)
-
- def test_tolerates_amqp_trouble(self):
- # If the AMQP server is unavailable for a short period, the receiver
- # will automatically reconnect.
- # Break a connection to raise socket.error (which we know from the
- # publisher tests is what leaks through when rabbit is shutdown).
- # We raise it the first time on each amqp method call.
- reports = []
- def capture(report):
- reports.append(report)
- return [report['id']]
- expected_report = {'id': 'foo', 'otherkey': 42}
- message = amqp.Message(bson.dumps(expected_report))
- channel = self.useFixture(
- ChannelFixture(self.connection_factory)).channel
- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
- channel.basic_publish(message, queue.exchange_name, routing_key="")
- config = Config()
- config.publisher = capture
- state = {}
- def error_once(func):
- def wrapped(*args, **kwargs):
- func_ref = six.get_function_code(func)
- if func_ref in state:
- return func(*args, **kwargs)
- else:
- state[func_ref] = True
- # Use EPIPE because the close() code checks that (though
- # the rest doesn't)
- raise socket.error(errno.EPIPE, "booyah")
- return wrapped
+)
+
+
+def test_stop_on_sentinel(
+ connection_factory, channel, exchange_name, queue_name
+):
+ # A sentinel can be used to stop the receiver (useful for testing).
+ reports = []
+
+ def capture(report):
+ reports.append(report)
+ return [report["id"]]
+
+ expected_report = {"id": "foo", "otherkey": 42}
+ message = amqp.Message(bson.dumps(expected_report))
+ channel.basic_publish(message, exchange_name, routing_key="")
+ sentinel = b"xxx"
+ channel.basic_publish(
+ amqp.Message(sentinel), exchange_name, routing_key=""
+ )
+ config = Config()
+ config.publisher = capture
+ receiver = Receiver(config, connection_factory, queue_name)
+ receiver.sentinel = sentinel
+ receiver.run_forever()
+ assert reports == [expected_report]
+
+
+def test_stop_via_stopping(
+ connection_factory, channel, exchange_name, queue_name
+):
+ # Setting the stopping field should stop the run_forever loop.
+ reports = []
+
+ def capture(report):
+ reports.append(report)
+ return [report["id"]]
+
+ expected_report = {"id": "foo", "otherkey": 42}
+ message = amqp.Message(bson.dumps(expected_report))
+ channel.basic_publish(message, exchange_name, routing_key="")
+ config = Config()
+ config.publisher = capture
+ # We don't want to loop forever: patch the channel so that after one
+ # call to wait (which will get our injected message) the loop will shut
+ # down.
+ def patching_factory():
+ connection = connection_factory()
+ old_channel = connection.channel
+
+ def new_channel():
+ result = old_channel()
+ old_wait = result.wait
+
+ def new_wait(*args, **kwargs):
+ receiver.stopping = True
+ return old_wait(*args, **kwargs)
+
+ result.wait = new_wait
+ return result
+
+ connection.channel = new_channel
+ return connection
+
+ receiver = Receiver(config, patching_factory, queue_name)
+ receiver.run_forever()
+ assert reports == [expected_report]
+
+
+def test_run_forever():
+ # run_forever subscribes and then calls drain_events in a loop.
+ calls = []
+
+ class FakeChannel:
+ def __init__(self, calls):
+ self.calls = calls
+ self.is_open = True
+
+ def basic_consume(self, queue_name, callback=None):
+ self.calls.append(("basic_consume", queue_name, callback))
+ return "tag"
+
+ def basic_cancel(self, tag):
+ self.calls.append(("basic_cancel", tag))
+
+ def close(self):
+ self.is_open = False
+
+ class FakeConnection:
+ def __init__(self, calls):
+ self.calls = calls
+
+ def connect(self):
+ pass
+
+ def channel(self):
+ return FakeChannel(calls)
+
+ def drain_events(self, timeout=None):
+ self.calls.append(("drain_events", timeout))
+ if len(self.calls) > 2:
+ receiver.stopping = True
+
+ def close(self):
+ pass
+
+ receiver = Receiver(None, lambda: FakeConnection(calls), "foo")
+ receiver.run_forever()
+ assert calls == [
+ ("basic_consume", "foo", receiver.handle_report),
+ ("drain_events", 1),
+ ("drain_events", 1),
+ ("basic_cancel", "tag"),
+ ]
+
+
+def test_tolerates_amqp_trouble(
+ connection_factory, channel, exchange_name, queue_name
+):
+ # If the AMQP server is unavailable for a short period, the receiver
+ # will automatically reconnect.
+ # Break a connection to raise socket.error (which we know from the
+ # publisher tests is what leaks through when rabbit is shutdown).
+ # We raise it the first time on each amqp method call.
+ reports = []
+
+ def capture(report):
+ reports.append(report)
+ return [report["id"]]
+
+ expected_report = {"id": "foo", "otherkey": 42}
+ message = amqp.Message(bson.dumps(expected_report))
+ channel.basic_publish(message, exchange_name, routing_key="")
+ config = Config()
+ config.publisher = capture
+ state = {}
+
+ def error_once(func):
+ def wrapped(*args, **kwargs):
+ func_ref = six.get_function_code(func)
+ if func_ref in state:
+ return func(*args, **kwargs)
+ else:
+ state[func_ref] = True
+ # Use EPIPE because the close() code checks that (though
+ # the rest doesn't)
+ raise socket.error(errno.EPIPE, "booyah")
+
+ return wrapped
+
+ @error_once
+ def patching_factory():
+ connection = connection_factory()
+ old_channel = connection.channel
+
@error_once
- def patching_factory():
- connection = self.connection_factory()
- old_channel = connection.channel
- @error_once
- def new_channel():
- result = old_channel()
- result.basic_consume = error_once(result.basic_consume)
- result.basic_cancel = error_once(result.basic_cancel)
- result.close = error_once(result.close)
- return result
- connection.channel = new_channel
- connection.drain_events = error_once(connection.drain_events)
- connection.close = error_once(connection.close)
- return connection
- receiver = Receiver(config, patching_factory, queue.queue_name)
- receiver.sentinel = b"arhh"
- channel.basic_publish(
- amqp.Message(b"arhh"), queue.exchange_name, routing_key="")
- receiver.run_forever()
- self.assertEqual([expected_report], reports)
+ def new_channel():
+ result = old_channel()
+ result.basic_consume = error_once(result.basic_consume)
+ result.basic_cancel = error_once(result.basic_cancel)
+ result.close = error_once(result.close)
+ return result
+
+ connection.channel = new_channel
+ connection.drain_events = error_once(connection.drain_events)
+ connection.close = error_once(connection.close)
+ return connection
+
+ receiver = Receiver(config, patching_factory, queue_name)
+ receiver.sentinel = b"arhh"
+ channel.basic_publish(amqp.Message(b"arhh"), exchange_name, routing_key="")
+ receiver.run_forever()
+ assert reports == [expected_report]
=== modified file 'tox.ini'
--- tox.ini 2022-09-30 22:41:01 +0000
+++ tox.ini 2022-10-03 10:15:17 +0000
@@ -11,6 +11,5 @@
[testenv]
deps =
.[test]
- pytest
commands =
pytest {posargs}