← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~tushar5526/lp-codeimport:move-to-kombu-from-amqp into lp-codeimport:focal

 

Tushar Gupta has proposed merging ~tushar5526/lp-codeimport:move-to-kombu-from-amqp into lp-codeimport:focal with ~tushar5526/lp-codeimport:deprecate-bzr as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~tushar5526/lp-codeimport/+git/lp-codeimport/+merge/488162
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~tushar5526/lp-codeimport:move-to-kombu-from-amqp into lp-codeimport:focal.
diff --git a/configs/development/codeimport-lazr.conf b/configs/development/codeimport-lazr.conf
index 30dc168..0696766 100644
--- a/configs/development/codeimport-lazr.conf
+++ b/configs/development/codeimport-lazr.conf
@@ -23,7 +23,5 @@ error_dir: /var/tmp/lperr
 ca_certificates_path: /etc/ssl/certs/ca-certificates.crt
 
 [rabbitmq]
-host: localhost:56720
-userid: guest
-password: guest
-virtual_host: /
+launch: True
+broker_urls: amqp://guest:guest@localhost:56720//
diff --git a/configs/testrunner/codeimport-lazr.conf b/configs/testrunner/codeimport-lazr.conf
index c91cff0..07c8580 100644
--- a/configs/testrunner/codeimport-lazr.conf
+++ b/configs/testrunner/codeimport-lazr.conf
@@ -10,7 +10,5 @@ oops_prefix: T
 error_dir: /var/tmp/lperr.test
 
 [rabbitmq]
-host: none
-userid: none
-password: none
-virtual_host: none
+launch: False
+broker_urls: none
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index 3041d6c..6d2d380 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -114,12 +114,26 @@ urlfetch_timeout: 30
 
 
 [rabbitmq]
-# The host:port at which RabbitMQ is listening.
+# Should RabbitMQ be launched by default?
+# datatype: boolean
+launch: False
+# The URL at which RabbitMQ is listening (in the form
+# amqp://USERNAME:PASSWORD@HOSTNAME:PORT/VIRTUAL_HOST), or a space-separated
+# list of such URLs to use round-robin failover between them.
+broker_urls: none
+# The host:port at which RabbitMQ is listening (ignored if broker_urls is
+# set).
 # datatype: string
 host: none
+# The username to use when connecting to RabbitMQ (ignored if broker_urls is
+# set).
 # datatype: string
 userid: none
+# The password to use when connecting to RabbitMQ (ignored if broker_urls is
+# set).
 # datatype: string
 password: none
+# The virtual host to use when connecting to RabbitMQ (ignored if
+# broker_urls is set).
 # datatype: string
 virtual_host: none
diff --git a/lib/lp/services/messaging/interfaces.py b/lib/lp/services/messaging/interfaces.py
index c3fa0e3..c73efac 100644
--- a/lib/lp/services/messaging/interfaces.py
+++ b/lib/lp/services/messaging/interfaces.py
@@ -3,20 +3,10 @@
 
 """Messaging interfaces."""
 
-__metaclass__ = type
 __all__ = [
-    'IMessageConsumer',
-    'IMessageProducer',
-    'IMessageSession',
-    'MessagingException',
-    'MessagingUnavailable',
-    'QueueEmpty',
-    'QueueNotFound',
-    ]
-
-
-from zope.interface import Interface
-from zope.schema import Bool
+    "MessagingException",
+    "MessagingUnavailable",
+]
 
 
 class MessagingException(Exception):
@@ -25,77 +15,3 @@ class MessagingException(Exception):
 
 class MessagingUnavailable(MessagingException):
     """Messaging systems are not available."""
-
-
-class QueueNotFound(MessagingException):
-    """Raised if the queue was not found."""
-
-
-class QueueEmpty(MessagingException):
-    """Raised if there are no queued messages on a non-blocking read."""
-
-
-class IMessageSession(Interface):
-
-    is_connected = Bool(
-        u"Whether the session is connected to the messaging system.")
-
-    def connect():
-        """Connect to the messaging system.
-
-        If the session is already connected this should be a no-op.
-        """
-
-    def disconnect():
-        """Disconnect from the messaging system.
-
-        If the session is already disconnected this should be a no-op.
-        """
-
-    def flush():
-        """Run deferred tasks."""
-
-    def finish():
-        """Flush the session and reset."""
-
-    def reset():
-        """Reset the session."""
-
-    def defer(func, *args, **kwargs):
-        """Schedule something to happen when this session is finished."""
-
-    def getProducer(name):
-        """Get a `IMessageProducer` associated with this session."""
-
-    def getConsumer(name):
-        """Get a `IMessageConsumer` associated with this session."""
-
-
-class IMessageConsumer(Interface):
-
-    def receive(blocking=True):
-        """Receive data from the queue.
-
-        :raises EmptyQueue: If non-blocking and the queue is empty.
-        """
-
-
-class IMessageProducer(Interface):
-
-    def send(data):
-        """Serialize `data` into JSON and send it to the queue on commit."""
-
-    def sendNow(data):
-        """Serialize `data` into JSON and send it to the queue immediately."""
-
-    def associateConsumer(consumer):
-        """Make the consumer receive messages from this producer on commit.
-
-        :param consumer: An `IMessageConsumer`
-        """
-
-    def associateConsumerNow(consumer):
-        """Make the consumer receive messages from this producer.
-
-        :param consumer: An `IMessageConsumer`
-        """
diff --git a/lib/lp/services/messaging/rabbit.py b/lib/lp/services/messaging/rabbit.py
index 417d3a6..bfdf60f 100644
--- a/lib/lp/services/messaging/rabbit.py
+++ b/lib/lp/services/messaging/rabbit.py
@@ -3,67 +3,28 @@
 
 """An API for messaging systems in Launchpad, e.g. RabbitMQ."""
 
-__metaclass__ = type
 __all__ = [
     "connect",
     "is_configured",
-    "session",
-    "unreliable_session",
-    ]
+]
 
-from collections import deque
-from functools import partial
-import json
-import socket
-import sys
-import threading
-import time
-
-import amqp
-import transaction
-from transaction._transaction import Status as TransactionStatus
-from zope.interface import implementer
+import kombu
+from lazr.config import as_host_port
 
 from lp.services.config import config
-from lp.services.messaging.interfaces import (
-    IMessageConsumer,
-    IMessageProducer,
-    IMessageSession,
-    MessagingUnavailable,
-    QueueEmpty,
-    QueueNotFound,
-    )
-
-
-LAUNCHPAD_EXCHANGE = "launchpad-exchange"
-
-
-@implementer(transaction.interfaces.ISynchronizer)
-class RabbitSessionTransactionSync:
-
-    def __init__(self, session):
-        self.session = session
-
-    def newTransaction(self, txn):
-        pass
-
-    def beforeCompletion(self, txn):
-        pass
-
-    def afterCompletion(self, txn):
-        if txn.status == TransactionStatus.COMMITTED:
-            self.session.finish()
-        else:
-            self.session.reset()
+from lp.services.messaging.interfaces import MessagingUnavailable
 
 
 def is_configured():
     """Return True if rabbit looks to be configured."""
+    if config.rabbitmq.broker_urls is not None:
+        return True
     return not (
-        config.rabbitmq.host is None or
-        config.rabbitmq.userid is None or
-        config.rabbitmq.password is None or
-        config.rabbitmq.virtual_host is None)
+        config.rabbitmq.host is None
+        or config.rabbitmq.userid is None
+        or config.rabbitmq.password is None
+        or config.rabbitmq.virtual_host is None
+    )
 
 
 def connect():
@@ -73,216 +34,16 @@ def connect():
     """
     if not is_configured():
         raise MessagingUnavailable("Incomplete configuration")
-    connection = amqp.Connection(
-        host=config.rabbitmq.host, userid=config.rabbitmq.userid,
-        password=config.rabbitmq.password,
-        virtual_host=config.rabbitmq.virtual_host)
+    if config.rabbitmq.broker_urls is not None:
+        connection = kombu.Connection(config.rabbitmq.broker_urls.split())
+    else:
+        hostname, port = as_host_port(config.rabbitmq.host, default_port=5672)
+        connection = kombu.Connection(
+            hostname=hostname,
+            userid=config.rabbitmq.userid,
+            password=config.rabbitmq.password,
+            virtual_host=config.rabbitmq.virtual_host,
+            port=port,
+        )
     connection.connect()
     return connection
-
-
-@implementer(IMessageSession)
-class RabbitSession(threading.local):
-
-    exchange = LAUNCHPAD_EXCHANGE
-
-    def __init__(self):
-        super(RabbitSession, self).__init__()
-        self._connection = None
-        self._deferred = deque()
-        # Maintain sessions according to transaction boundaries. Keep a strong
-        # reference to the sync because the transaction manager does not. We
-        # need one per thread (definining it here is enough to ensure that).
-        self._sync = RabbitSessionTransactionSync(self)
-        transaction.manager.registerSynch(self._sync)
-
-    @property
-    def is_connected(self):
-        """See `IMessageSession`."""
-        return self._connection is not None and self._connection.connected
-
-    def connect(self):
-        """See `IMessageSession`.
-
-        Open a connection for this thread if necessary. Connections cannot be
-        shared between threads.
-        """
-        if self._connection is None or not self._connection.connected:
-            self._connection = connect()
-        return self._connection
-
-    def disconnect(self):
-        """See `IMessageSession`."""
-        if self._connection is not None:
-            try:
-                self._connection.close()
-            except socket.error:
-                # Socket error is fine; the connection is still closed.
-                pass
-            finally:
-                self._connection = None
-
-    def flush(self):
-        """See `IMessageSession`."""
-        tasks = self._deferred
-        while len(tasks) != 0:
-            tasks.popleft()()
-
-    def finish(self):
-        """See `IMessageSession`."""
-        try:
-            self.flush()
-        finally:
-            self.reset()
-
-    def reset(self):
-        """See `IMessageSession`."""
-        self._deferred.clear()
-        self.disconnect()
-
-    def defer(self, func, *args, **kwargs):
-        """See `IMessageSession`."""
-        self._deferred.append(partial(func, *args, **kwargs))
-
-    def getProducer(self, name):
-        """See `IMessageSession`."""
-        return RabbitRoutingKey(self, name)
-
-    def getConsumer(self, name):
-        """See `IMessageSession`."""
-        return RabbitQueue(self, name)
-
-
-# Per-thread sessions.
-session = RabbitSession()
-session_finish_handler = (
-    lambda event: session.finish())
-
-
-class RabbitUnreliableSession(RabbitSession):
-    """An "unreliable" `RabbitSession`.
-
-    Unreliable in this case means that certain errors in deferred tasks are
-    silently suppressed. This means that services can continue to function
-    even in the absence of a running and fully functional message queue.
-
-    Other types of errors are also caught because we don't want this
-    subsystem to destabilise other parts of Launchpad but we nonetheless
-    record OOPses for these.
-
-    XXX: We only suppress MessagingUnavailable for now because we want to
-    monitor this closely before we add more exceptions to the
-    suppressed_errors list. Potential candidates are `MessagingException`,
-    `IOError` or `amqp.AMQPException`.
-    """
-
-    suppressed_errors = (
-        MessagingUnavailable,
-        )
-
-    def finish(self):
-        """See `IMessageSession`.
-
-        Suppresses errors listed in `suppressed_errors`. Also suppresses
-        other errors but files an oops report for these.
-        """
-        try:
-            super(RabbitUnreliableSession, self).finish()
-        except self.suppressed_errors:
-            pass
-        except Exception:
-            from lp.services.webapp import errorlog
-            errorlog.globalErrorUtility.raising(sys.exc_info())
-
-
-# Per-thread "unreliable" sessions.
-unreliable_session = RabbitUnreliableSession()
-unreliable_session_finish_handler = (
-    lambda event: unreliable_session.finish())
-
-
-class RabbitMessageBase:
-    """Base class for all RabbitMQ messaging."""
-
-    def __init__(self, session):
-        self.session = IMessageSession(session)
-        self._channel = None
-
-    @property
-    def channel(self):
-        if self._channel is None or not self._channel.is_open:
-            connection = self.session.connect()
-            self._channel = connection.channel()
-            self._channel.exchange_declare(
-                self.session.exchange, "direct", durable=False,
-                auto_delete=False, nowait=False)
-        return self._channel
-
-
-@implementer(IMessageProducer)
-class RabbitRoutingKey(RabbitMessageBase):
-    """A RabbitMQ data origination point."""
-
-    def __init__(self, session, routing_key):
-        super(RabbitRoutingKey, self).__init__(session)
-        self.key = routing_key
-
-    def associateConsumer(self, consumer):
-        """Only receive messages for requested routing key."""
-        self.session.defer(self.associateConsumerNow, consumer)
-
-    def associateConsumerNow(self, consumer):
-        """Only receive messages for requested routing key."""
-        # The queue will be auto-deleted 5 minutes after its last use.
-        # http://www.rabbitmq.com/extensions.html#queue-leases
-        self.channel.queue_declare(
-            consumer.name, nowait=False, auto_delete=False,
-            arguments={"x-expires": 300000})  # 5 minutes.
-        self.channel.queue_bind(
-            queue=consumer.name, exchange=self.session.exchange,
-            routing_key=self.key, nowait=False)
-
-    def send(self, data):
-        """See `IMessageProducer`."""
-        self.session.defer(self.sendNow, data)
-
-    def sendNow(self, data):
-        """Immediately send a message to the broker."""
-        json_data = json.dumps(data)
-        msg = amqp.Message(json_data)
-        self.channel.basic_publish(
-            exchange=self.session.exchange,
-            routing_key=self.key, msg=msg)
-
-
-@implementer(IMessageConsumer)
-class RabbitQueue(RabbitMessageBase):
-    """A RabbitMQ Queue."""
-
-    def __init__(self, session, name):
-        super(RabbitQueue, self).__init__(session)
-        self.name = name
-
-    def receive(self, timeout=0.0):
-        """Pull a message from the queue.
-
-        :param timeout: Wait a maximum of `timeout` seconds before giving up,
-            trying at least once.
-        :raises QueueEmpty: if the timeout passes.
-        """
-        endtime = time.time() + timeout
-        while True:
-            try:
-                message = self.channel.basic_get(self.name)
-                if message is None:
-                    if time.time() > endtime:
-                        raise QueueEmpty()
-                    time.sleep(0.1)
-                else:
-                    self.channel.basic_ack(message.delivery_tag)
-                    return json.loads(message.body)
-            except amqp.ChannelError as error:
-                if error.reply_code == 404:
-                    raise QueueNotFound()
-                else:
-                    raise
diff --git a/lib/lp/services/messaging/tests/test_rabbit.py b/lib/lp/services/messaging/tests/test_rabbit.py
index b25c073..f4d8f7c 100644
--- a/lib/lp/services/messaging/tests/test_rabbit.py
+++ b/lib/lp/services/messaging/tests/test_rabbit.py
@@ -1,417 +1,121 @@
-# Copyright 2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
-"""Messaging utility tests."""
+from kombu.utils.url import parse_url
+from testtools.matchers import MatchesStructure
 
-__metaclass__ = type
+from lp.services.config import config
+from lp.services.messaging import rabbit
+from lp.testing import TestCase
+from lp.testing.layers import BaseLayer, RabbitMQLayer
 
-from functools import partial
-from itertools import count
-import socket
 
-import six
-from testtools.testcase import ExpectedException
-import transaction
-from transaction._transaction import Status as TransactionStatus
+class TestIsConfigured(TestCase):
+    layer = BaseLayer
 
-from lp.services.messaging.interfaces import (
-    IMessageConsumer,
-    IMessageProducer,
-    IMessageSession,
-    MessagingUnavailable,
-    QueueEmpty,
-    QueueNotFound,
-    )
-from lp.services.messaging.rabbit import (
-    RabbitMessageBase,
-    RabbitQueue,
-    RabbitRoutingKey,
-    RabbitSession,
-    RabbitSessionTransactionSync,
-    RabbitUnreliableSession,
-    session as global_session,
-    unreliable_session as global_unreliable_session,
-    )
-from lp.testing import (
-    monkey_patch,
-    TestCase,
-    )
-from lp.testing.fakemethod import FakeMethod
-from lp.testing.faketransaction import FakeTransaction
-from lp.testing.layers import RabbitMQLayer
-from lp.testing.matchers import Provides
+    def test_unconfigured(self):
+        self.assertFalse(rabbit.is_configured())
 
+    def test_broker_url(self):
+        self.pushConfig(
+            "rabbitmq", broker_urls="amqp://guest:guest@rabbitmq.example//"
+        )
+        self.assertTrue(rabbit.is_configured())
 
-# RabbitMQ is not (yet) torn down or reset between tests, so here are sources
-# of distinct names.
-queue_names = ("queue.%d" % num for num in count(1))
-key_names = ("key.%d" % num for num in count(1))
+    def test_partial_compat(self):
+        self.pushConfig("rabbitmq", host="rabbitmq.example")
+        self.assertFalse(rabbit.is_configured())
 
+    def test_full_compat(self):
+        self.pushConfig(
+            "rabbitmq",
+            host="rabbitmq.example",
+            userid="guest",
+            password="guest",
+            virtual_host="/",
+        )
+        self.assertTrue(rabbit.is_configured())
 
-class FakeRabbitSession:
 
-    def __init__(self):
-        self.log = []
-
-    def finish(self):
-        self.log.append("finish")
-
-    def reset(self):
-        self.log.append("reset")
-
-
-class TestRabbitSessionTransactionSync(TestCase):
-
-    def test_interface(self):
-        self.assertThat(
-            RabbitSessionTransactionSync(None),
-            Provides(transaction.interfaces.ISynchronizer))
-
-    def test_afterCompletion_COMMITTED(self):
-        txn = FakeTransaction()
-        txn.status = TransactionStatus.COMMITTED
-        fake_session = FakeRabbitSession()
-        sync = RabbitSessionTransactionSync(fake_session)
-        sync.afterCompletion(txn)
-        self.assertEqual(["finish"], fake_session.log)
-
-    def test_afterCompletion_ACTIVE(self):
-        txn = FakeTransaction()
-        txn.status = TransactionStatus.ACTIVE
-        fake_session = FakeRabbitSession()
-        sync = RabbitSessionTransactionSync(fake_session)
-        sync.afterCompletion(txn)
-        self.assertEqual(["reset"], fake_session.log)
-
-
-class RabbitTestCase(TestCase):
-
-    layer = RabbitMQLayer
-
-    def tearDown(self):
-        super(RabbitTestCase, self).tearDown()
-        global_session.reset()
-        global_unreliable_session.reset()
-
-
-class TestRabbitSession(RabbitTestCase):
-
-    session_factory = RabbitSession
-
-    def test_interface(self):
-        session = self.session_factory()
-        self.assertThat(session, Provides(IMessageSession))
-
-    def test_connect(self):
-        session = self.session_factory()
-        self.assertFalse(session.is_connected)
-        connection = session.connect()
-        self.assertTrue(session.is_connected)
-        self.assertIs(connection, session._connection)
-
-    def test_connect_with_incomplete_configuration(self):
-        self.pushConfig("rabbitmq", host="none")
-        session = self.session_factory()
-        with ExpectedException(
-            MessagingUnavailable, "Incomplete configuration"):
-            session.connect()
-
-    def test_disconnect(self):
-        session = self.session_factory()
-        session.connect()
-        session.disconnect()
-        self.assertFalse(session.is_connected)
-
-    def test_disconnect_with_error(self):
-        session = self.session_factory()
-        session.connect()
-        old_close = session._connection.close
-
-        def new_close(*args, **kwargs):
-            old_close(*args, **kwargs)
-            raise socket.error
-
-        with monkey_patch(session._connection, close=new_close):
-            session.disconnect()
-            self.assertFalse(session.is_connected)
-
-    def test_is_connected(self):
-        # is_connected is False once a connection has been closed.
-        session = self.session_factory()
-        session.connect()
-        # Close the connection without using disconnect().
-        session._connection.close()
-        self.assertFalse(session.is_connected)
-
-    def test_defer(self):
-        task = lambda foo, bar: None
-        session = self.session_factory()
-        session.defer(task, "foo", bar="baz")
-        self.assertEqual(1, len(session._deferred))
-        [deferred_task] = session._deferred
-        self.assertIsInstance(deferred_task, partial)
-        self.assertIs(task, deferred_task.func)
-        self.assertEqual(("foo",), deferred_task.args)
-        self.assertEqual({"bar": "baz"}, deferred_task.keywords)
-
-    def test_flush(self):
-        # RabbitSession.flush() runs deferred tasks.
-        log = []
-        task = lambda: log.append("task")
-        session = self.session_factory()
-        session.defer(task)
-        session.connect()
-        session.flush()
-        self.assertEqual(["task"], log)
-        self.assertEqual([], list(session._deferred))
-        self.assertTrue(session.is_connected)
-
-    def test_reset(self):
-        # RabbitSession.reset() resets session variables and does not run
-        # deferred tasks.
-        log = []
-        task = lambda: log.append("task")
-        session = self.session_factory()
-        session.defer(task)
-        session.connect()
-        session.reset()
-        self.assertEqual([], log)
-        self.assertEqual([], list(session._deferred))
-        self.assertFalse(session.is_connected)
-
-    def test_finish(self):
-        # RabbitSession.finish() resets session variables after running
-        # deferred tasks.
-        log = []
-        task = lambda: log.append("task")
-        session = self.session_factory()
-        session.defer(task)
-        session.connect()
-        session.finish()
-        self.assertEqual(["task"], log)
-        self.assertEqual([], list(session._deferred))
-        self.assertFalse(session.is_connected)
-
-    def test_getProducer(self):
-        session = self.session_factory()
-        producer = session.getProducer("foo")
-        self.assertIsInstance(producer, RabbitRoutingKey)
-        self.assertIs(session, producer.session)
-        self.assertEqual("foo", producer.key)
-
-    def test_getConsumer(self):
-        session = self.session_factory()
-        consumer = session.getConsumer("foo")
-        self.assertIsInstance(consumer, RabbitQueue)
-        self.assertIs(session, consumer.session)
-        self.assertEqual("foo", consumer.name)
-
-
-class TestRabbitUnreliableSession(TestRabbitSession):
-
-    session_factory = RabbitUnreliableSession
+class TestConnect(TestCase):
     layer = RabbitMQLayer
 
-    def setUp(self):
-        super(TestRabbitUnreliableSession, self).setUp()
-        self.prev_oops = self.getOops()
-
-    def getOops(self):
-        try:
-            self.oops_capture.sync()
-            return self.oopses[-1]
-        except IndexError:
-            return None
-
-    def assertNoOops(self):
-        oops_report = self.getOops()
-        self.assertEqual(repr(self.prev_oops), repr(oops_report))
-
-    def assertOops(self, text_in_oops):
-        oops_report = self.getOops()
-        self.assertNotEqual(
-            repr(self.prev_oops), repr(oops_report), 'No OOPS reported!')
-        self.assertIn(text_in_oops, str(oops_report))
-
-    def _test_finish_suppresses_exception(self, exception):
-        # Simple helper to test that the given exception is suppressed
-        # when raised by finish().
-        session = self.session_factory()
-        session.defer(FakeMethod(failure=exception))
-        session.finish()  # Look, no exceptions!
-
-    def test_finish_suppresses_MessagingUnavailable(self):
-        self._test_finish_suppresses_exception(
-            MessagingUnavailable('Messaging borked.'))
-        self.assertNoOops()
-
-    def test_finish_suppresses_other_errors_with_oopses(self):
-        exception = Exception("That hent worked.")
-        self._test_finish_suppresses_exception(exception)
-        self.assertOops(str(exception))
-
-
-class TestRabbitMessageBase(RabbitTestCase):
-
-    def test_session(self):
-        base = RabbitMessageBase(global_session)
-        self.assertIs(global_session, base.session)
-
-    def test_channel(self):
-        # Referencing the channel property causes the session to connect.
-        base = RabbitMessageBase(global_session)
-        self.assertFalse(base.session.is_connected)
-        channel = base.channel
-        self.assertTrue(base.session.is_connected)
-        self.assertIsNot(None, channel)
-        # The same channel is returned every time.
-        self.assertIs(channel, base.channel)
-
-    def test_channel_session_closed(self):
-        # When the session is disconnected the channel is thrown away too.
-        base = RabbitMessageBase(global_session)
-        channel1 = base.channel
-        base.session.disconnect()
-        channel2 = base.channel
-        self.assertNotEqual(channel1, channel2)
-
-
-class TestRabbitRoutingKey(RabbitTestCase):
-
-    def test_interface(self):
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        self.assertThat(routing_key, Provides(IMessageProducer))
-
-    def test_associateConsumer(self):
-        # associateConsumer() only associates the consumer at transaction
-        # commit time. However, order is preserved.
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumer(consumer)
-        # The session is still not connected.
-        self.assertFalse(global_session.is_connected)
-        routing_key.sendNow('now')
-        routing_key.send('later')
-        # The queue is not found because the consumer has not yet been
-        # associated with the routing key and the queue declared.
-        self.assertRaises(QueueNotFound, consumer.receive, timeout=2)
-        transaction.commit()
-        # Now that the transaction has been committed, the consumer is
-        # associated, and receives the deferred message.
-        self.assertEqual('later', consumer.receive(timeout=2))
-
-    def test_associateConsumerNow(self):
-        # associateConsumerNow() associates the consumer right away.
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-        routing_key.sendNow('now')
-        routing_key.send('later')
-        # There is already something in the queue.
-        self.assertEqual('now', consumer.receive(timeout=2))
-        transaction.commit()
-        # Now that the transaction has been committed there is another item in
-        # the queue.
-        self.assertEqual('later', consumer.receive(timeout=2))
-
-    def test_send(self):
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-
-        for data in range(90, 100):
-            routing_key.send(data)
-
-        routing_key.sendNow('sync')
-        # There is nothing in the queue except the sync we just sent.
-        self.assertEqual('sync', consumer.receive(timeout=2))
-
-        # Messages get sent on commit
-        transaction.commit()
-        for data in range(90, 100):
-            self.assertEqual(data, consumer.receive())
-
-        # There are no more messages. They have all been consumed.
-        routing_key.sendNow('sync')
-        self.assertEqual('sync', consumer.receive(timeout=2))
-
-    def test_sendNow(self):
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-
-        for data in range(50, 60):
-            routing_key.sendNow(data)
-            received_data = consumer.receive(timeout=2)
-            self.assertEqual(data, received_data)
-
-    def test_does_not_connect_session_immediately(self):
-        # RabbitRoutingKey does not connect the session until necessary.
-        RabbitRoutingKey(global_session, next(key_names))
-        self.assertFalse(global_session.is_connected)
-
-
-class TestRabbitQueue(RabbitTestCase):
-
-    def test_interface(self):
-        consumer = RabbitQueue(global_session, next(queue_names))
-        self.assertThat(consumer, Provides(IMessageConsumer))
-
-    def test_receive(self):
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-
-        for data in range(55, 65):
-            routing_key.sendNow(data)
-            self.assertEqual(data, consumer.receive(timeout=2))
-
-        # All the messages received were consumed.
-        self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
-
-        # New connections to the queue see an empty queue too.
-        consumer.session.disconnect()
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-        self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
-
-    def test_does_not_connect_session_immediately(self):
-        # RabbitQueue does not connect the session until necessary.
-        RabbitQueue(global_session, next(queue_names))
-        self.assertFalse(global_session.is_connected)
-
-
-class TestRabbit(RabbitTestCase):
-    """Integration-like tests for the RabbitMQ messaging abstractions."""
-
-    def get_synced_sessions(self):
-        try:
-            syncs_set = transaction.manager.manager._synchs
-        except KeyError:
-            return set()
-        else:
-            return set(
-                sync.session for sync in six.itervalues(syncs_set.data)
-                if isinstance(sync, RabbitSessionTransactionSync))
-
-    def test_global_session(self):
-        self.assertIsInstance(global_session, RabbitSession)
-        self.assertIn(global_session, self.get_synced_sessions())
-
-    def test_global_unreliable_session(self):
-        self.assertIsInstance(
-            global_unreliable_session, RabbitUnreliableSession)
-        self.assertIn(global_unreliable_session, self.get_synced_sessions())
-
-    def test_abort(self):
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-
-        for data in range(90, 100):
-            routing_key.send(data)
-
-        # Messages sent using send() are forgotten on abort.
-        transaction.abort()
-        self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
+    def test_unconfigured(self):
+        self.pushConfig("rabbitmq", broker_urls="none")
+        self.assertRaisesWithContent(
+            rabbit.MessagingUnavailable,
+            "Incomplete configuration",
+            rabbit.connect,
+        )
+
+    def test_single_broker_url(self):
+        self.assertIsNotNone(config.rabbitmq.broker_urls)
+        [broker_url] = config.rabbitmq.broker_urls.split()
+        parsed_url = parse_url(broker_url)
+        with rabbit.connect() as connection:
+            self.assertThat(
+                connection,
+                MatchesStructure.byEquality(
+                    # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+                    hostname="127.0.0.1",
+                    userid=parsed_url["userid"],
+                    password=parsed_url["password"],
+                    virtual_host=parsed_url["virtual_host"],
+                    port=int(parsed_url["port"]),
+                    alt=[broker_url],
+                ),
+            )
+
+    def test_multiple_broker_urls(self):
+        self.assertIsNotNone(config.rabbitmq.broker_urls)
+        [broker_url] = config.rabbitmq.broker_urls.split()
+        parsed_url = parse_url(broker_url)
+        self.assertEqual("localhost", parsed_url["hostname"])
+        self.pushConfig(
+            "rabbitmq",
+            broker_urls=(
+                "%s amqp://guest:guest@alternate.example//" % broker_url
+            ),
+        )
+        with rabbit.connect() as connection:
+            self.assertThat(
+                connection,
+                MatchesStructure.byEquality(
+                    # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+                    hostname="127.0.0.1",
+                    userid=parsed_url["userid"],
+                    password=parsed_url["password"],
+                    virtual_host=parsed_url["virtual_host"],
+                    port=int(parsed_url["port"]),
+                    alt=[broker_url, "amqp://guest:guest@alternate.example//"],
+                ),
+            )
+
+    def test_compat_config(self):
+        # The old-style host/userid/password/virtual_host configuration
+        # format still works.
+        self.assertIsNotNone(config.rabbitmq.broker_urls)
+        [broker_url] = config.rabbitmq.broker_urls.split()
+        parsed_url = parse_url(broker_url)
+        self.assertEqual("localhost", parsed_url["hostname"])
+        self.pushConfig(
+            "rabbitmq",
+            broker_urls="none",
+            host="%s:%s" % (parsed_url["hostname"], parsed_url["port"]),
+            userid=parsed_url["userid"],
+            password=parsed_url["password"],
+            virtual_host=parsed_url["virtual_host"],
+        )
+        with rabbit.connect() as connection:
+            self.assertThat(
+                connection,
+                MatchesStructure.byEquality(
+                    # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+                    hostname="127.0.0.1",
+                    userid=parsed_url["userid"],
+                    password=parsed_url["password"],
+                    virtual_host=parsed_url["virtual_host"],
+                    port=int(parsed_url["port"]),
+                    alt=[],
+                ),
+            )
diff --git a/lib/lp/services/rabbit/server.py b/lib/lp/services/rabbit/server.py
index 9cd3434..0cd8f6d 100644
--- a/lib/lp/services/rabbit/server.py
+++ b/lib/lp/services/rabbit/server.py
@@ -3,17 +3,9 @@
 
 """RabbitMQ server fixture."""
 
-from __future__ import (
-    absolute_import,
-    print_function,
-    unicode_literals,
-    )
-
-
-__metaclass__ = type
 __all__ = [
-    'RabbitServer',
-    ]
+    "RabbitServer",
+]
 
 from textwrap import dedent
 
@@ -28,11 +20,14 @@ class RabbitServer(rabbitfixture.server.RabbitServer):
     """
 
     def setUp(self):
-        super(RabbitServer, self).setUp()
-        self.config.service_config = dedent("""\
+        super().setUp()
+        # The two trailing slashes here are deliberate: this has the effect
+        # of setting the virtual host to "/" rather than to the empty
+        # string.
+        self.config.service_config = dedent(
+            """\
             [rabbitmq]
-            host: localhost:%d
-            userid: guest
-            password: guest
-            virtual_host: /
-            """ % self.config.port)
+            broker_urls: amqp://guest:guest@localhost:%d//
+            """
+            % self.config.port
+        )
diff --git a/lib/lp/services/rabbit/tests/test_server.py b/lib/lp/services/rabbit/tests/test_server.py
index ed93e2d..ec200d2 100644
--- a/lib/lp/services/rabbit/tests/test_server.py
+++ b/lib/lp/services/rabbit/tests/test_server.py
@@ -3,45 +3,36 @@
 
 """Tests for lp.services.rabbit.RabbitServer."""
 
-from __future__ import (
-    absolute_import,
-    print_function,
-    unicode_literals,
-    )
-
-
-__metaclass__ = type
-
 import io
+from configparser import ConfigParser
 
 from fixtures import EnvironmentVariableFixture
 
-from lp.services.compat import SafeConfigParser
 from lp.services.rabbit.server import RabbitServer
 from lp.testing import TestCase
 from lp.testing.layers import BaseLayer
 
 
 class TestRabbitServer(TestCase):
-
     layer = BaseLayer
 
     def test_service_config(self):
         # Rabbit needs to fully isolate itself: an existing per user
         # .erlang.cookie has to be ignored, and ditto bogus HOME if other
         # tests fail to cleanup.
-        self.useFixture(EnvironmentVariableFixture('HOME', '/nonsense/value'))
+        self.useFixture(EnvironmentVariableFixture("HOME", "/nonsense/value"))
 
+        # The default timeout is 15 seconds, but increase this a bit to
+        # allow some more leeway for slow test environments.
+        fixture = self.useFixture(RabbitServer(ctltimeout=120))
         # RabbitServer pokes some .ini configuration into its config.
-        fixture = self.useFixture(RabbitServer())
-        service_config = SafeConfigParser()
-        service_config.readfp(io.StringIO(fixture.config.service_config))
+        service_config = ConfigParser()
+        service_config.read_file(io.StringIO(fixture.config.service_config))
         self.assertEqual(["rabbitmq"], service_config.sections())
         expected = {
-            "host": "localhost:%d" % fixture.config.port,
-            "userid": "guest",
-            "password": "guest",
-            "virtual_host": "/",
-            }
+            "broker_urls": (
+                "amqp://guest:guest@localhost:%d//" % fixture.config.port
+            ),
+        }
         observed = dict(service_config.items("rabbitmq"))
         self.assertEqual(expected, observed)
diff --git a/lib/lp/testing/__init__.py b/lib/lp/testing/__init__.py
index 68b2fcc..7ff3940 100644
--- a/lib/lp/testing/__init__.py
+++ b/lib/lp/testing/__init__.py
@@ -167,7 +167,18 @@ class TestCase(testtools.TestCase, fixtures.TestWithFixtures):
         if msg is None:
             msg = "%r is %r" % (expected, observed)
         self.assertTrue(expected is not observed, msg)
-
+    
+    def assertRaisesWithContent(
+        self, exception, exception_content, func, *args, **kwargs
+    ):
+        """Check if the given exception is raised with given content.
+
+        If the exception isn't raised or the exception_content doesn't
+        match what was raised an AssertionError is raised.
+        """
+        err = self.assertRaises(exception, func, *args, **kwargs)
+        self.assertEqual(exception_content, str(err))
+    
     def assertContentEqual(self, iter1, iter2):
         """Assert that 'iter1' has the same content as 'iter2'."""
         self.assertThat(iter1, MatchesSetwise(*(map(Equals, iter2))))
diff --git a/lib/lp/testing/tests/test_layers_functional.py b/lib/lp/testing/tests/test_layers_functional.py
index b891061..6938f27 100644
--- a/lib/lp/testing/tests/test_layers_functional.py
+++ b/lib/lp/testing/tests/test_layers_functional.py
@@ -15,13 +15,13 @@ __metaclass__ = type
 import os
 import uuid
 
-import amqp
 from fixtures import (
     EnvironmentVariableFixture,
     Fixture,
     )
 
 from lp.services.config import config
+from lp.services.messaging import rabbit
 from lp.testing import TestCase
 from lp.testing.layers import (
     BaseLayer,
@@ -97,18 +97,10 @@ class BaseTestCase(TestCase):
         self.assertEqual(BaseLayer.isSetUp, True)
 
     def testRabbitWorking(self):
-        rabbitmq = config.rabbitmq
         if not self.want_rabbitmq:
-            self.assertEqual(None, rabbitmq.host)
+            self.assertFalse(rabbit.is_configured())
         else:
-            self.assertNotEqual(None, rabbitmq.host)
-            conn = amqp.Connection(
-                host=rabbitmq.host,
-                userid=rabbitmq.userid,
-                password=rabbitmq.password,
-                virtual_host=rabbitmq.virtual_host)
-            conn.connect()
-            conn.close()
+            rabbit.connect().close()
 
 
 class RabbitMQTestCase(BaseTestCase):
diff --git a/requirements/lp-codeimport.txt b/requirements/lp-codeimport.txt
index 8f3c399..cbf253f 100644
--- a/requirements/lp-codeimport.txt
+++ b/requirements/lp-codeimport.txt
@@ -5,7 +5,7 @@
 # Don't list entries from ztk-versions.cfg here unless overriding their
 # versions; they will be included automatically.
 
-amqp==2.4.2
+amqp==2.6.1
 appdirs==1.4.3
 asn1crypto==0.23.0
 attrs==19.1.0
@@ -29,6 +29,7 @@ incremental==17.5.0
 ipaddress==1.0.18
 iso8601==0.1.12
 keyring==0.6.2
+kombu==4.6.11
 launchpadlib==1.10.9
 lazr.config==2.2.2
 lazr.delegates==2.0.4
@@ -40,7 +41,7 @@ mistune==0.8.3
 mock==1.0.1
 oauthlib==3.1.0
 oops==0.0.14
-oops-amqp==0.1.0
+oops-amqp==0.2.0
 oops-datedir-repo==0.0.24
 oops-datedir2amqp==0.1.0
 oops-timeline==0.0.3
@@ -54,7 +55,7 @@ pymacaroons==0.13.0
 PyNaCl==1.3.0
 pyOpenSSL==17.5.0
 python-dateutil==2.8.1
-rabbitfixture==0.4.2
+rabbitfixture==0.5.3
 responses==0.9.0
 scandir==1.7
 service-identity==18.1.0
@@ -62,7 +63,6 @@ setuptools-git==1.2
 setuptools-scm==3.4.3
 six==1.15.0
 subprocess32==3.2.6
-subvertpy==0.11.0
 testresources==0.2.7
 testscenarios==0.4
 timeline==0.0.7
diff --git a/setup.py b/setup.py
index 8b4b377..fb9dede 100644
--- a/setup.py
+++ b/setup.py
@@ -162,11 +162,10 @@ setup(
         # entirely, but we need to retain it until codeimport has been
         # ported to Breezy.
         'breezy',
-        'bzr; python_version < "3"',
-        'contextlib2; python_version < "3.3"',
         'defusedxml',
         'dulwich',
         'fixtures',
+        'kombu',
         'lazr.config',
         'lazr.enum',
         'lazr.uri',