launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #32670
[Merge] ~tushar5526/lp-codeimport:move-to-kombu-from-amqp into ~tushar5526/lp-codeimport:deprecate-bzr
Tushar Gupta has proposed merging ~tushar5526/lp-codeimport:move-to-kombu-from-amqp into ~tushar5526/lp-codeimport:deprecate-bzr.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~tushar5526/lp-codeimport/+git/lp-codeimport/+merge/488158
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~tushar5526/lp-codeimport:move-to-kombu-from-amqp into ~tushar5526/lp-codeimport:deprecate-bzr.
diff --git a/configs/development/codeimport-lazr.conf b/configs/development/codeimport-lazr.conf
index 30dc168..0696766 100644
--- a/configs/development/codeimport-lazr.conf
+++ b/configs/development/codeimport-lazr.conf
@@ -23,7 +23,5 @@ error_dir: /var/tmp/lperr
ca_certificates_path: /etc/ssl/certs/ca-certificates.crt
[rabbitmq]
-host: localhost:56720
-userid: guest
-password: guest
-virtual_host: /
+launch: True
+broker_urls: amqp://guest:guest@localhost:56720//
diff --git a/configs/testrunner/codeimport-lazr.conf b/configs/testrunner/codeimport-lazr.conf
index c91cff0..07c8580 100644
--- a/configs/testrunner/codeimport-lazr.conf
+++ b/configs/testrunner/codeimport-lazr.conf
@@ -10,7 +10,5 @@ oops_prefix: T
error_dir: /var/tmp/lperr.test
[rabbitmq]
-host: none
-userid: none
-password: none
-virtual_host: none
+launch: False
+broker_urls: none
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index 3041d6c..6d2d380 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -114,12 +114,26 @@ urlfetch_timeout: 30
[rabbitmq]
-# The host:port at which RabbitMQ is listening.
+# Should RabbitMQ be launched by default?
+# datatype: boolean
+launch: False
+# The URL at which RabbitMQ is listening (in the form
+# amqp://USERNAME:PASSWORD@HOSTNAME:PORT/VIRTUAL_HOST), or a space-separated
+# list of such URLs to use round-robin failover between them.
+broker_urls: none
+# The host:port at which RabbitMQ is listening (ignored if broker_urls is
+# set).
# datatype: string
host: none
+# The username to use when connecting to RabbitMQ (ignored if broker_urls is
+# set).
# datatype: string
userid: none
+# The password to use when connecting to RabbitMQ (ignored if broker_urls is
+# set).
# datatype: string
password: none
+# The virtual host to use when connecting to RabbitMQ (ignored if
+# broker_urls is set).
# datatype: string
virtual_host: none
diff --git a/lib/lp/services/messaging/interfaces.py b/lib/lp/services/messaging/interfaces.py
index c3fa0e3..c73efac 100644
--- a/lib/lp/services/messaging/interfaces.py
+++ b/lib/lp/services/messaging/interfaces.py
@@ -3,20 +3,10 @@
"""Messaging interfaces."""
-__metaclass__ = type
__all__ = [
- 'IMessageConsumer',
- 'IMessageProducer',
- 'IMessageSession',
- 'MessagingException',
- 'MessagingUnavailable',
- 'QueueEmpty',
- 'QueueNotFound',
- ]
-
-
-from zope.interface import Interface
-from zope.schema import Bool
+ "MessagingException",
+ "MessagingUnavailable",
+]
class MessagingException(Exception):
@@ -25,77 +15,3 @@ class MessagingException(Exception):
class MessagingUnavailable(MessagingException):
"""Messaging systems are not available."""
-
-
-class QueueNotFound(MessagingException):
- """Raised if the queue was not found."""
-
-
-class QueueEmpty(MessagingException):
- """Raised if there are no queued messages on a non-blocking read."""
-
-
-class IMessageSession(Interface):
-
- is_connected = Bool(
- u"Whether the session is connected to the messaging system.")
-
- def connect():
- """Connect to the messaging system.
-
- If the session is already connected this should be a no-op.
- """
-
- def disconnect():
- """Disconnect from the messaging system.
-
- If the session is already disconnected this should be a no-op.
- """
-
- def flush():
- """Run deferred tasks."""
-
- def finish():
- """Flush the session and reset."""
-
- def reset():
- """Reset the session."""
-
- def defer(func, *args, **kwargs):
- """Schedule something to happen when this session is finished."""
-
- def getProducer(name):
- """Get a `IMessageProducer` associated with this session."""
-
- def getConsumer(name):
- """Get a `IMessageConsumer` associated with this session."""
-
-
-class IMessageConsumer(Interface):
-
- def receive(blocking=True):
- """Receive data from the queue.
-
- :raises EmptyQueue: If non-blocking and the queue is empty.
- """
-
-
-class IMessageProducer(Interface):
-
- def send(data):
- """Serialize `data` into JSON and send it to the queue on commit."""
-
- def sendNow(data):
- """Serialize `data` into JSON and send it to the queue immediately."""
-
- def associateConsumer(consumer):
- """Make the consumer receive messages from this producer on commit.
-
- :param consumer: An `IMessageConsumer`
- """
-
- def associateConsumerNow(consumer):
- """Make the consumer receive messages from this producer.
-
- :param consumer: An `IMessageConsumer`
- """
diff --git a/lib/lp/services/messaging/rabbit.py b/lib/lp/services/messaging/rabbit.py
index 417d3a6..bfdf60f 100644
--- a/lib/lp/services/messaging/rabbit.py
+++ b/lib/lp/services/messaging/rabbit.py
@@ -3,67 +3,28 @@
"""An API for messaging systems in Launchpad, e.g. RabbitMQ."""
-__metaclass__ = type
__all__ = [
"connect",
"is_configured",
- "session",
- "unreliable_session",
- ]
+]
-from collections import deque
-from functools import partial
-import json
-import socket
-import sys
-import threading
-import time
-
-import amqp
-import transaction
-from transaction._transaction import Status as TransactionStatus
-from zope.interface import implementer
+import kombu
+from lazr.config import as_host_port
from lp.services.config import config
-from lp.services.messaging.interfaces import (
- IMessageConsumer,
- IMessageProducer,
- IMessageSession,
- MessagingUnavailable,
- QueueEmpty,
- QueueNotFound,
- )
-
-
-LAUNCHPAD_EXCHANGE = "launchpad-exchange"
-
-
-@implementer(transaction.interfaces.ISynchronizer)
-class RabbitSessionTransactionSync:
-
- def __init__(self, session):
- self.session = session
-
- def newTransaction(self, txn):
- pass
-
- def beforeCompletion(self, txn):
- pass
-
- def afterCompletion(self, txn):
- if txn.status == TransactionStatus.COMMITTED:
- self.session.finish()
- else:
- self.session.reset()
+from lp.services.messaging.interfaces import MessagingUnavailable
def is_configured():
"""Return True if rabbit looks to be configured."""
+ if config.rabbitmq.broker_urls is not None:
+ return True
return not (
- config.rabbitmq.host is None or
- config.rabbitmq.userid is None or
- config.rabbitmq.password is None or
- config.rabbitmq.virtual_host is None)
+ config.rabbitmq.host is None
+ or config.rabbitmq.userid is None
+ or config.rabbitmq.password is None
+ or config.rabbitmq.virtual_host is None
+ )
def connect():
@@ -73,216 +34,16 @@ def connect():
"""
if not is_configured():
raise MessagingUnavailable("Incomplete configuration")
- connection = amqp.Connection(
- host=config.rabbitmq.host, userid=config.rabbitmq.userid,
- password=config.rabbitmq.password,
- virtual_host=config.rabbitmq.virtual_host)
+ if config.rabbitmq.broker_urls is not None:
+ connection = kombu.Connection(config.rabbitmq.broker_urls.split())
+ else:
+ hostname, port = as_host_port(config.rabbitmq.host, default_port=5672)
+ connection = kombu.Connection(
+ hostname=hostname,
+ userid=config.rabbitmq.userid,
+ password=config.rabbitmq.password,
+ virtual_host=config.rabbitmq.virtual_host,
+ port=port,
+ )
connection.connect()
return connection
-
-
-@implementer(IMessageSession)
-class RabbitSession(threading.local):
-
- exchange = LAUNCHPAD_EXCHANGE
-
- def __init__(self):
- super(RabbitSession, self).__init__()
- self._connection = None
- self._deferred = deque()
- # Maintain sessions according to transaction boundaries. Keep a strong
- # reference to the sync because the transaction manager does not. We
- # need one per thread (definining it here is enough to ensure that).
- self._sync = RabbitSessionTransactionSync(self)
- transaction.manager.registerSynch(self._sync)
-
- @property
- def is_connected(self):
- """See `IMessageSession`."""
- return self._connection is not None and self._connection.connected
-
- def connect(self):
- """See `IMessageSession`.
-
- Open a connection for this thread if necessary. Connections cannot be
- shared between threads.
- """
- if self._connection is None or not self._connection.connected:
- self._connection = connect()
- return self._connection
-
- def disconnect(self):
- """See `IMessageSession`."""
- if self._connection is not None:
- try:
- self._connection.close()
- except socket.error:
- # Socket error is fine; the connection is still closed.
- pass
- finally:
- self._connection = None
-
- def flush(self):
- """See `IMessageSession`."""
- tasks = self._deferred
- while len(tasks) != 0:
- tasks.popleft()()
-
- def finish(self):
- """See `IMessageSession`."""
- try:
- self.flush()
- finally:
- self.reset()
-
- def reset(self):
- """See `IMessageSession`."""
- self._deferred.clear()
- self.disconnect()
-
- def defer(self, func, *args, **kwargs):
- """See `IMessageSession`."""
- self._deferred.append(partial(func, *args, **kwargs))
-
- def getProducer(self, name):
- """See `IMessageSession`."""
- return RabbitRoutingKey(self, name)
-
- def getConsumer(self, name):
- """See `IMessageSession`."""
- return RabbitQueue(self, name)
-
-
-# Per-thread sessions.
-session = RabbitSession()
-session_finish_handler = (
- lambda event: session.finish())
-
-
-class RabbitUnreliableSession(RabbitSession):
- """An "unreliable" `RabbitSession`.
-
- Unreliable in this case means that certain errors in deferred tasks are
- silently suppressed. This means that services can continue to function
- even in the absence of a running and fully functional message queue.
-
- Other types of errors are also caught because we don't want this
- subsystem to destabilise other parts of Launchpad but we nonetheless
- record OOPses for these.
-
- XXX: We only suppress MessagingUnavailable for now because we want to
- monitor this closely before we add more exceptions to the
- suppressed_errors list. Potential candidates are `MessagingException`,
- `IOError` or `amqp.AMQPException`.
- """
-
- suppressed_errors = (
- MessagingUnavailable,
- )
-
- def finish(self):
- """See `IMessageSession`.
-
- Suppresses errors listed in `suppressed_errors`. Also suppresses
- other errors but files an oops report for these.
- """
- try:
- super(RabbitUnreliableSession, self).finish()
- except self.suppressed_errors:
- pass
- except Exception:
- from lp.services.webapp import errorlog
- errorlog.globalErrorUtility.raising(sys.exc_info())
-
-
-# Per-thread "unreliable" sessions.
-unreliable_session = RabbitUnreliableSession()
-unreliable_session_finish_handler = (
- lambda event: unreliable_session.finish())
-
-
-class RabbitMessageBase:
- """Base class for all RabbitMQ messaging."""
-
- def __init__(self, session):
- self.session = IMessageSession(session)
- self._channel = None
-
- @property
- def channel(self):
- if self._channel is None or not self._channel.is_open:
- connection = self.session.connect()
- self._channel = connection.channel()
- self._channel.exchange_declare(
- self.session.exchange, "direct", durable=False,
- auto_delete=False, nowait=False)
- return self._channel
-
-
-@implementer(IMessageProducer)
-class RabbitRoutingKey(RabbitMessageBase):
- """A RabbitMQ data origination point."""
-
- def __init__(self, session, routing_key):
- super(RabbitRoutingKey, self).__init__(session)
- self.key = routing_key
-
- def associateConsumer(self, consumer):
- """Only receive messages for requested routing key."""
- self.session.defer(self.associateConsumerNow, consumer)
-
- def associateConsumerNow(self, consumer):
- """Only receive messages for requested routing key."""
- # The queue will be auto-deleted 5 minutes after its last use.
- # http://www.rabbitmq.com/extensions.html#queue-leases
- self.channel.queue_declare(
- consumer.name, nowait=False, auto_delete=False,
- arguments={"x-expires": 300000}) # 5 minutes.
- self.channel.queue_bind(
- queue=consumer.name, exchange=self.session.exchange,
- routing_key=self.key, nowait=False)
-
- def send(self, data):
- """See `IMessageProducer`."""
- self.session.defer(self.sendNow, data)
-
- def sendNow(self, data):
- """Immediately send a message to the broker."""
- json_data = json.dumps(data)
- msg = amqp.Message(json_data)
- self.channel.basic_publish(
- exchange=self.session.exchange,
- routing_key=self.key, msg=msg)
-
-
-@implementer(IMessageConsumer)
-class RabbitQueue(RabbitMessageBase):
- """A RabbitMQ Queue."""
-
- def __init__(self, session, name):
- super(RabbitQueue, self).__init__(session)
- self.name = name
-
- def receive(self, timeout=0.0):
- """Pull a message from the queue.
-
- :param timeout: Wait a maximum of `timeout` seconds before giving up,
- trying at least once.
- :raises QueueEmpty: if the timeout passes.
- """
- endtime = time.time() + timeout
- while True:
- try:
- message = self.channel.basic_get(self.name)
- if message is None:
- if time.time() > endtime:
- raise QueueEmpty()
- time.sleep(0.1)
- else:
- self.channel.basic_ack(message.delivery_tag)
- return json.loads(message.body)
- except amqp.ChannelError as error:
- if error.reply_code == 404:
- raise QueueNotFound()
- else:
- raise
diff --git a/lib/lp/services/messaging/tests/test_rabbit.py b/lib/lp/services/messaging/tests/test_rabbit.py
index b25c073..f4d8f7c 100644
--- a/lib/lp/services/messaging/tests/test_rabbit.py
+++ b/lib/lp/services/messaging/tests/test_rabbit.py
@@ -1,417 +1,121 @@
-# Copyright 2011 Canonical Ltd. This software is licensed under the
+# Copyright 2022 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
-"""Messaging utility tests."""
+from kombu.utils.url import parse_url
+from testtools.matchers import MatchesStructure
-__metaclass__ = type
+from lp.services.config import config
+from lp.services.messaging import rabbit
+from lp.testing import TestCase
+from lp.testing.layers import BaseLayer, RabbitMQLayer
-from functools import partial
-from itertools import count
-import socket
-import six
-from testtools.testcase import ExpectedException
-import transaction
-from transaction._transaction import Status as TransactionStatus
+class TestIsConfigured(TestCase):
+ layer = BaseLayer
-from lp.services.messaging.interfaces import (
- IMessageConsumer,
- IMessageProducer,
- IMessageSession,
- MessagingUnavailable,
- QueueEmpty,
- QueueNotFound,
- )
-from lp.services.messaging.rabbit import (
- RabbitMessageBase,
- RabbitQueue,
- RabbitRoutingKey,
- RabbitSession,
- RabbitSessionTransactionSync,
- RabbitUnreliableSession,
- session as global_session,
- unreliable_session as global_unreliable_session,
- )
-from lp.testing import (
- monkey_patch,
- TestCase,
- )
-from lp.testing.fakemethod import FakeMethod
-from lp.testing.faketransaction import FakeTransaction
-from lp.testing.layers import RabbitMQLayer
-from lp.testing.matchers import Provides
+ def test_unconfigured(self):
+ self.assertFalse(rabbit.is_configured())
+ def test_broker_url(self):
+ self.pushConfig(
+ "rabbitmq", broker_urls="amqp://guest:guest@rabbitmq.example//"
+ )
+ self.assertTrue(rabbit.is_configured())
-# RabbitMQ is not (yet) torn down or reset between tests, so here are sources
-# of distinct names.
-queue_names = ("queue.%d" % num for num in count(1))
-key_names = ("key.%d" % num for num in count(1))
+ def test_partial_compat(self):
+ self.pushConfig("rabbitmq", host="rabbitmq.example")
+ self.assertFalse(rabbit.is_configured())
+ def test_full_compat(self):
+ self.pushConfig(
+ "rabbitmq",
+ host="rabbitmq.example",
+ userid="guest",
+ password="guest",
+ virtual_host="/",
+ )
+ self.assertTrue(rabbit.is_configured())
-class FakeRabbitSession:
- def __init__(self):
- self.log = []
-
- def finish(self):
- self.log.append("finish")
-
- def reset(self):
- self.log.append("reset")
-
-
-class TestRabbitSessionTransactionSync(TestCase):
-
- def test_interface(self):
- self.assertThat(
- RabbitSessionTransactionSync(None),
- Provides(transaction.interfaces.ISynchronizer))
-
- def test_afterCompletion_COMMITTED(self):
- txn = FakeTransaction()
- txn.status = TransactionStatus.COMMITTED
- fake_session = FakeRabbitSession()
- sync = RabbitSessionTransactionSync(fake_session)
- sync.afterCompletion(txn)
- self.assertEqual(["finish"], fake_session.log)
-
- def test_afterCompletion_ACTIVE(self):
- txn = FakeTransaction()
- txn.status = TransactionStatus.ACTIVE
- fake_session = FakeRabbitSession()
- sync = RabbitSessionTransactionSync(fake_session)
- sync.afterCompletion(txn)
- self.assertEqual(["reset"], fake_session.log)
-
-
-class RabbitTestCase(TestCase):
-
- layer = RabbitMQLayer
-
- def tearDown(self):
- super(RabbitTestCase, self).tearDown()
- global_session.reset()
- global_unreliable_session.reset()
-
-
-class TestRabbitSession(RabbitTestCase):
-
- session_factory = RabbitSession
-
- def test_interface(self):
- session = self.session_factory()
- self.assertThat(session, Provides(IMessageSession))
-
- def test_connect(self):
- session = self.session_factory()
- self.assertFalse(session.is_connected)
- connection = session.connect()
- self.assertTrue(session.is_connected)
- self.assertIs(connection, session._connection)
-
- def test_connect_with_incomplete_configuration(self):
- self.pushConfig("rabbitmq", host="none")
- session = self.session_factory()
- with ExpectedException(
- MessagingUnavailable, "Incomplete configuration"):
- session.connect()
-
- def test_disconnect(self):
- session = self.session_factory()
- session.connect()
- session.disconnect()
- self.assertFalse(session.is_connected)
-
- def test_disconnect_with_error(self):
- session = self.session_factory()
- session.connect()
- old_close = session._connection.close
-
- def new_close(*args, **kwargs):
- old_close(*args, **kwargs)
- raise socket.error
-
- with monkey_patch(session._connection, close=new_close):
- session.disconnect()
- self.assertFalse(session.is_connected)
-
- def test_is_connected(self):
- # is_connected is False once a connection has been closed.
- session = self.session_factory()
- session.connect()
- # Close the connection without using disconnect().
- session._connection.close()
- self.assertFalse(session.is_connected)
-
- def test_defer(self):
- task = lambda foo, bar: None
- session = self.session_factory()
- session.defer(task, "foo", bar="baz")
- self.assertEqual(1, len(session._deferred))
- [deferred_task] = session._deferred
- self.assertIsInstance(deferred_task, partial)
- self.assertIs(task, deferred_task.func)
- self.assertEqual(("foo",), deferred_task.args)
- self.assertEqual({"bar": "baz"}, deferred_task.keywords)
-
- def test_flush(self):
- # RabbitSession.flush() runs deferred tasks.
- log = []
- task = lambda: log.append("task")
- session = self.session_factory()
- session.defer(task)
- session.connect()
- session.flush()
- self.assertEqual(["task"], log)
- self.assertEqual([], list(session._deferred))
- self.assertTrue(session.is_connected)
-
- def test_reset(self):
- # RabbitSession.reset() resets session variables and does not run
- # deferred tasks.
- log = []
- task = lambda: log.append("task")
- session = self.session_factory()
- session.defer(task)
- session.connect()
- session.reset()
- self.assertEqual([], log)
- self.assertEqual([], list(session._deferred))
- self.assertFalse(session.is_connected)
-
- def test_finish(self):
- # RabbitSession.finish() resets session variables after running
- # deferred tasks.
- log = []
- task = lambda: log.append("task")
- session = self.session_factory()
- session.defer(task)
- session.connect()
- session.finish()
- self.assertEqual(["task"], log)
- self.assertEqual([], list(session._deferred))
- self.assertFalse(session.is_connected)
-
- def test_getProducer(self):
- session = self.session_factory()
- producer = session.getProducer("foo")
- self.assertIsInstance(producer, RabbitRoutingKey)
- self.assertIs(session, producer.session)
- self.assertEqual("foo", producer.key)
-
- def test_getConsumer(self):
- session = self.session_factory()
- consumer = session.getConsumer("foo")
- self.assertIsInstance(consumer, RabbitQueue)
- self.assertIs(session, consumer.session)
- self.assertEqual("foo", consumer.name)
-
-
-class TestRabbitUnreliableSession(TestRabbitSession):
-
- session_factory = RabbitUnreliableSession
+class TestConnect(TestCase):
layer = RabbitMQLayer
- def setUp(self):
- super(TestRabbitUnreliableSession, self).setUp()
- self.prev_oops = self.getOops()
-
- def getOops(self):
- try:
- self.oops_capture.sync()
- return self.oopses[-1]
- except IndexError:
- return None
-
- def assertNoOops(self):
- oops_report = self.getOops()
- self.assertEqual(repr(self.prev_oops), repr(oops_report))
-
- def assertOops(self, text_in_oops):
- oops_report = self.getOops()
- self.assertNotEqual(
- repr(self.prev_oops), repr(oops_report), 'No OOPS reported!')
- self.assertIn(text_in_oops, str(oops_report))
-
- def _test_finish_suppresses_exception(self, exception):
- # Simple helper to test that the given exception is suppressed
- # when raised by finish().
- session = self.session_factory()
- session.defer(FakeMethod(failure=exception))
- session.finish() # Look, no exceptions!
-
- def test_finish_suppresses_MessagingUnavailable(self):
- self._test_finish_suppresses_exception(
- MessagingUnavailable('Messaging borked.'))
- self.assertNoOops()
-
- def test_finish_suppresses_other_errors_with_oopses(self):
- exception = Exception("That hent worked.")
- self._test_finish_suppresses_exception(exception)
- self.assertOops(str(exception))
-
-
-class TestRabbitMessageBase(RabbitTestCase):
-
- def test_session(self):
- base = RabbitMessageBase(global_session)
- self.assertIs(global_session, base.session)
-
- def test_channel(self):
- # Referencing the channel property causes the session to connect.
- base = RabbitMessageBase(global_session)
- self.assertFalse(base.session.is_connected)
- channel = base.channel
- self.assertTrue(base.session.is_connected)
- self.assertIsNot(None, channel)
- # The same channel is returned every time.
- self.assertIs(channel, base.channel)
-
- def test_channel_session_closed(self):
- # When the session is disconnected the channel is thrown away too.
- base = RabbitMessageBase(global_session)
- channel1 = base.channel
- base.session.disconnect()
- channel2 = base.channel
- self.assertNotEqual(channel1, channel2)
-
-
-class TestRabbitRoutingKey(RabbitTestCase):
-
- def test_interface(self):
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- self.assertThat(routing_key, Provides(IMessageProducer))
-
- def test_associateConsumer(self):
- # associateConsumer() only associates the consumer at transaction
- # commit time. However, order is preserved.
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumer(consumer)
- # The session is still not connected.
- self.assertFalse(global_session.is_connected)
- routing_key.sendNow('now')
- routing_key.send('later')
- # The queue is not found because the consumer has not yet been
- # associated with the routing key and the queue declared.
- self.assertRaises(QueueNotFound, consumer.receive, timeout=2)
- transaction.commit()
- # Now that the transaction has been committed, the consumer is
- # associated, and receives the deferred message.
- self.assertEqual('later', consumer.receive(timeout=2))
-
- def test_associateConsumerNow(self):
- # associateConsumerNow() associates the consumer right away.
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
- routing_key.sendNow('now')
- routing_key.send('later')
- # There is already something in the queue.
- self.assertEqual('now', consumer.receive(timeout=2))
- transaction.commit()
- # Now that the transaction has been committed there is another item in
- # the queue.
- self.assertEqual('later', consumer.receive(timeout=2))
-
- def test_send(self):
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
-
- for data in range(90, 100):
- routing_key.send(data)
-
- routing_key.sendNow('sync')
- # There is nothing in the queue except the sync we just sent.
- self.assertEqual('sync', consumer.receive(timeout=2))
-
- # Messages get sent on commit
- transaction.commit()
- for data in range(90, 100):
- self.assertEqual(data, consumer.receive())
-
- # There are no more messages. They have all been consumed.
- routing_key.sendNow('sync')
- self.assertEqual('sync', consumer.receive(timeout=2))
-
- def test_sendNow(self):
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
-
- for data in range(50, 60):
- routing_key.sendNow(data)
- received_data = consumer.receive(timeout=2)
- self.assertEqual(data, received_data)
-
- def test_does_not_connect_session_immediately(self):
- # RabbitRoutingKey does not connect the session until necessary.
- RabbitRoutingKey(global_session, next(key_names))
- self.assertFalse(global_session.is_connected)
-
-
-class TestRabbitQueue(RabbitTestCase):
-
- def test_interface(self):
- consumer = RabbitQueue(global_session, next(queue_names))
- self.assertThat(consumer, Provides(IMessageConsumer))
-
- def test_receive(self):
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
-
- for data in range(55, 65):
- routing_key.sendNow(data)
- self.assertEqual(data, consumer.receive(timeout=2))
-
- # All the messages received were consumed.
- self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
-
- # New connections to the queue see an empty queue too.
- consumer.session.disconnect()
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
- self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
-
- def test_does_not_connect_session_immediately(self):
- # RabbitQueue does not connect the session until necessary.
- RabbitQueue(global_session, next(queue_names))
- self.assertFalse(global_session.is_connected)
-
-
-class TestRabbit(RabbitTestCase):
- """Integration-like tests for the RabbitMQ messaging abstractions."""
-
- def get_synced_sessions(self):
- try:
- syncs_set = transaction.manager.manager._synchs
- except KeyError:
- return set()
- else:
- return set(
- sync.session for sync in six.itervalues(syncs_set.data)
- if isinstance(sync, RabbitSessionTransactionSync))
-
- def test_global_session(self):
- self.assertIsInstance(global_session, RabbitSession)
- self.assertIn(global_session, self.get_synced_sessions())
-
- def test_global_unreliable_session(self):
- self.assertIsInstance(
- global_unreliable_session, RabbitUnreliableSession)
- self.assertIn(global_unreliable_session, self.get_synced_sessions())
-
- def test_abort(self):
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
-
- for data in range(90, 100):
- routing_key.send(data)
-
- # Messages sent using send() are forgotten on abort.
- transaction.abort()
- self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
+ def test_unconfigured(self):
+ self.pushConfig("rabbitmq", broker_urls="none")
+ self.assertRaisesWithContent(
+ rabbit.MessagingUnavailable,
+ "Incomplete configuration",
+ rabbit.connect,
+ )
+
+ def test_single_broker_url(self):
+ self.assertIsNotNone(config.rabbitmq.broker_urls)
+ [broker_url] = config.rabbitmq.broker_urls.split()
+ parsed_url = parse_url(broker_url)
+ with rabbit.connect() as connection:
+ self.assertThat(
+ connection,
+ MatchesStructure.byEquality(
+ # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+ hostname="127.0.0.1",
+ userid=parsed_url["userid"],
+ password=parsed_url["password"],
+ virtual_host=parsed_url["virtual_host"],
+ port=int(parsed_url["port"]),
+ alt=[broker_url],
+ ),
+ )
+
+ def test_multiple_broker_urls(self):
+ self.assertIsNotNone(config.rabbitmq.broker_urls)
+ [broker_url] = config.rabbitmq.broker_urls.split()
+ parsed_url = parse_url(broker_url)
+ self.assertEqual("localhost", parsed_url["hostname"])
+ self.pushConfig(
+ "rabbitmq",
+ broker_urls=(
+ "%s amqp://guest:guest@alternate.example//" % broker_url
+ ),
+ )
+ with rabbit.connect() as connection:
+ self.assertThat(
+ connection,
+ MatchesStructure.byEquality(
+ # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+ hostname="127.0.0.1",
+ userid=parsed_url["userid"],
+ password=parsed_url["password"],
+ virtual_host=parsed_url["virtual_host"],
+ port=int(parsed_url["port"]),
+ alt=[broker_url, "amqp://guest:guest@alternate.example//"],
+ ),
+ )
+
+ def test_compat_config(self):
+ # The old-style host/userid/password/virtual_host configuration
+ # format still works.
+ self.assertIsNotNone(config.rabbitmq.broker_urls)
+ [broker_url] = config.rabbitmq.broker_urls.split()
+ parsed_url = parse_url(broker_url)
+ self.assertEqual("localhost", parsed_url["hostname"])
+ self.pushConfig(
+ "rabbitmq",
+ broker_urls="none",
+ host="%s:%s" % (parsed_url["hostname"], parsed_url["port"]),
+ userid=parsed_url["userid"],
+ password=parsed_url["password"],
+ virtual_host=parsed_url["virtual_host"],
+ )
+ with rabbit.connect() as connection:
+ self.assertThat(
+ connection,
+ MatchesStructure.byEquality(
+ # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+ hostname="127.0.0.1",
+ userid=parsed_url["userid"],
+ password=parsed_url["password"],
+ virtual_host=parsed_url["virtual_host"],
+ port=int(parsed_url["port"]),
+ alt=[],
+ ),
+ )
diff --git a/lib/lp/services/rabbit/server.py b/lib/lp/services/rabbit/server.py
index 9cd3434..0cd8f6d 100644
--- a/lib/lp/services/rabbit/server.py
+++ b/lib/lp/services/rabbit/server.py
@@ -3,17 +3,9 @@
"""RabbitMQ server fixture."""
-from __future__ import (
- absolute_import,
- print_function,
- unicode_literals,
- )
-
-
-__metaclass__ = type
__all__ = [
- 'RabbitServer',
- ]
+ "RabbitServer",
+]
from textwrap import dedent
@@ -28,11 +20,14 @@ class RabbitServer(rabbitfixture.server.RabbitServer):
"""
def setUp(self):
- super(RabbitServer, self).setUp()
- self.config.service_config = dedent("""\
+ super().setUp()
+ # The two trailing slashes here are deliberate: this has the effect
+ # of setting the virtual host to "/" rather than to the empty
+ # string.
+ self.config.service_config = dedent(
+ """\
[rabbitmq]
- host: localhost:%d
- userid: guest
- password: guest
- virtual_host: /
- """ % self.config.port)
+ broker_urls: amqp://guest:guest@localhost:%d//
+ """
+ % self.config.port
+ )
diff --git a/lib/lp/services/rabbit/tests/test_server.py b/lib/lp/services/rabbit/tests/test_server.py
index ed93e2d..ec200d2 100644
--- a/lib/lp/services/rabbit/tests/test_server.py
+++ b/lib/lp/services/rabbit/tests/test_server.py
@@ -3,45 +3,36 @@
"""Tests for lp.services.rabbit.RabbitServer."""
-from __future__ import (
- absolute_import,
- print_function,
- unicode_literals,
- )
-
-
-__metaclass__ = type
-
import io
+from configparser import ConfigParser
from fixtures import EnvironmentVariableFixture
-from lp.services.compat import SafeConfigParser
from lp.services.rabbit.server import RabbitServer
from lp.testing import TestCase
from lp.testing.layers import BaseLayer
class TestRabbitServer(TestCase):
-
layer = BaseLayer
def test_service_config(self):
# Rabbit needs to fully isolate itself: an existing per user
# .erlang.cookie has to be ignored, and ditto bogus HOME if other
# tests fail to cleanup.
- self.useFixture(EnvironmentVariableFixture('HOME', '/nonsense/value'))
+ self.useFixture(EnvironmentVariableFixture("HOME", "/nonsense/value"))
+ # The default timeout is 15 seconds, but increase this a bit to
+ # allow some more leeway for slow test environments.
+ fixture = self.useFixture(RabbitServer(ctltimeout=120))
# RabbitServer pokes some .ini configuration into its config.
- fixture = self.useFixture(RabbitServer())
- service_config = SafeConfigParser()
- service_config.readfp(io.StringIO(fixture.config.service_config))
+ service_config = ConfigParser()
+ service_config.read_file(io.StringIO(fixture.config.service_config))
self.assertEqual(["rabbitmq"], service_config.sections())
expected = {
- "host": "localhost:%d" % fixture.config.port,
- "userid": "guest",
- "password": "guest",
- "virtual_host": "/",
- }
+ "broker_urls": (
+ "amqp://guest:guest@localhost:%d//" % fixture.config.port
+ ),
+ }
observed = dict(service_config.items("rabbitmq"))
self.assertEqual(expected, observed)
diff --git a/lib/lp/testing/__init__.py b/lib/lp/testing/__init__.py
index 68b2fcc..7ff3940 100644
--- a/lib/lp/testing/__init__.py
+++ b/lib/lp/testing/__init__.py
@@ -167,7 +167,18 @@ class TestCase(testtools.TestCase, fixtures.TestWithFixtures):
if msg is None:
msg = "%r is %r" % (expected, observed)
self.assertTrue(expected is not observed, msg)
-
+
+ def assertRaisesWithContent(
+ self, exception, exception_content, func, *args, **kwargs
+ ):
+ """Check if the given exception is raised with given content.
+
+ If the exception isn't raised or the exception_content doesn't
+ match what was raised an AssertionError is raised.
+ """
+ err = self.assertRaises(exception, func, *args, **kwargs)
+ self.assertEqual(exception_content, str(err))
+
def assertContentEqual(self, iter1, iter2):
"""Assert that 'iter1' has the same content as 'iter2'."""
self.assertThat(iter1, MatchesSetwise(*(map(Equals, iter2))))
diff --git a/lib/lp/testing/tests/test_layers_functional.py b/lib/lp/testing/tests/test_layers_functional.py
index b891061..6938f27 100644
--- a/lib/lp/testing/tests/test_layers_functional.py
+++ b/lib/lp/testing/tests/test_layers_functional.py
@@ -15,13 +15,13 @@ __metaclass__ = type
import os
import uuid
-import amqp
from fixtures import (
EnvironmentVariableFixture,
Fixture,
)
from lp.services.config import config
+from lp.services.messaging import rabbit
from lp.testing import TestCase
from lp.testing.layers import (
BaseLayer,
@@ -97,18 +97,10 @@ class BaseTestCase(TestCase):
self.assertEqual(BaseLayer.isSetUp, True)
def testRabbitWorking(self):
- rabbitmq = config.rabbitmq
if not self.want_rabbitmq:
- self.assertEqual(None, rabbitmq.host)
+ self.assertFalse(rabbit.is_configured())
else:
- self.assertNotEqual(None, rabbitmq.host)
- conn = amqp.Connection(
- host=rabbitmq.host,
- userid=rabbitmq.userid,
- password=rabbitmq.password,
- virtual_host=rabbitmq.virtual_host)
- conn.connect()
- conn.close()
+ rabbit.connect().close()
class RabbitMQTestCase(BaseTestCase):
diff --git a/requirements/lp-codeimport.txt b/requirements/lp-codeimport.txt
index 8f3c399..cbf253f 100644
--- a/requirements/lp-codeimport.txt
+++ b/requirements/lp-codeimport.txt
@@ -5,7 +5,7 @@
# Don't list entries from ztk-versions.cfg here unless overriding their
# versions; they will be included automatically.
-amqp==2.4.2
+amqp==2.6.1
appdirs==1.4.3
asn1crypto==0.23.0
attrs==19.1.0
@@ -29,6 +29,7 @@ incremental==17.5.0
ipaddress==1.0.18
iso8601==0.1.12
keyring==0.6.2
+kombu==4.6.11
launchpadlib==1.10.9
lazr.config==2.2.2
lazr.delegates==2.0.4
@@ -40,7 +41,7 @@ mistune==0.8.3
mock==1.0.1
oauthlib==3.1.0
oops==0.0.14
-oops-amqp==0.1.0
+oops-amqp==0.2.0
oops-datedir-repo==0.0.24
oops-datedir2amqp==0.1.0
oops-timeline==0.0.3
@@ -54,7 +55,7 @@ pymacaroons==0.13.0
PyNaCl==1.3.0
pyOpenSSL==17.5.0
python-dateutil==2.8.1
-rabbitfixture==0.4.2
+rabbitfixture==0.5.3
responses==0.9.0
scandir==1.7
service-identity==18.1.0
@@ -62,7 +63,6 @@ setuptools-git==1.2
setuptools-scm==3.4.3
six==1.15.0
subprocess32==3.2.6
-subvertpy==0.11.0
testresources==0.2.7
testscenarios==0.4
timeline==0.0.7
diff --git a/setup.py b/setup.py
index 8b4b377..fb9dede 100644
--- a/setup.py
+++ b/setup.py
@@ -162,11 +162,10 @@ setup(
# entirely, but we need to retain it until codeimport has been
# ported to Breezy.
'breezy',
- 'bzr; python_version < "3"',
- 'contextlib2; python_version < "3.3"',
'defusedxml',
'dulwich',
'fixtures',
+ 'kombu',
'lazr.config',
'lazr.enum',
'lazr.uri',
Follow ups