← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:kombu-amqp into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:kombu-amqp into launchpad:master.

Commit message:
Support multiple RabbitMQ broker URLs

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/432065

`kombu` (already used by `celery`) is a higher-level messaging library than `amqp`.  For our purposes, we can treat it mostly as a wrapper for `amqp` with some slightly more convenient interfaces, but it has one key feature that's of interest to us: it supports multiple RabbitMQ broker URLs with round-robin failover between them.  This makes it possible to configure Launchpad to use RabbitMQ with high availability: if we have a RabbitMQ cluster, then we can configure Launchpad with broker URLs for all the nodes in the cluster, and if one fails then `kombu` will automatically fail over to the next.

To make it practical to configure this, I had to add a `rabbitmq.broker_urls` configuration option (either a single `amqp://USERNAME:PASSWORD@HOSTNAME:PORT/VIRTUAL_HOST` URL or a space-separated list of such URLs), which supersedes the existing broken-out configuration options (`rabbitmq.host`, `rabbitmq.userid`, `rabbitmq.password`, and `rabbitmq.virtual_host`).  For backward compatibility, the old options continue to work as long as `rabbitmq.broker_urls` is left unset (that is, at its default value of `none`).

This also includes upgrading to oops-amqp 0.2.0, since that includes a change to accept connection factories that return `kombu` connections.

Dependencies: https://code.launchpad.net/~cjwatson/lp-source-dependencies/+git/lp-source-dependencies/+merge/432064
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:kombu-amqp into launchpad:master.
diff --git a/configs/development/launchpad-lazr.conf b/configs/development/launchpad-lazr.conf
index 97e07c4..4e80454 100644
--- a/configs/development/launchpad-lazr.conf
+++ b/configs/development/launchpad-lazr.conf
@@ -155,10 +155,7 @@ private_base_url: http://private-ppa.launchpad.test
 
 [rabbitmq]
 launch: True
-host: localhost:56720
-userid: guest
-password: guest
-virtual_host: /
+broker_urls: amqp://guest:guest@localhost:56720//
 
 [snappy]
 tools_source: deb http://ppa.launchpad.net/snappy-dev/snapcraft-daily/ubuntu %(series)s main
diff --git a/configs/testrunner/launchpad-lazr.conf b/configs/testrunner/launchpad-lazr.conf
index 8c485dd..5de6421 100644
--- a/configs/testrunner/launchpad-lazr.conf
+++ b/configs/testrunner/launchpad-lazr.conf
@@ -160,10 +160,7 @@ logs_root: lib/lp/soyuz/scripts/tests/ppa-apache-log-files
 
 [rabbitmq]
 launch: False
-host: none
-userid: none
-password: none
-virtual_host: none
+broker_urls: none
 
 [rosetta]
 generate_templates: True
diff --git a/lib/lp/code/model/tests/test_branch.py b/lib/lp/code/model/tests/test_branch.py
index d78c340..919f8d8 100644
--- a/lib/lp/code/model/tests/test_branch.py
+++ b/lib/lp/code/model/tests/test_branch.py
@@ -323,7 +323,7 @@ class TestBranchJobViaCelery(TestCaseWithFactory):
     def test_branchChanged_via_celery(self):
         """Running a job via Celery succeeds and emits expected output."""
         # Delay importing anything that uses Celery until RabbitMQLayer is
-        # running, so that config.rabbitmq.host is defined when
+        # running, so that config.rabbitmq.broker_urls is defined when
         # lp.services.job.celeryconfig is loaded.
         self.useFixture(
             FeatureFixture({"jobs.celery.enabled_classes": "BranchScanJob"})
diff --git a/lib/lp/scripts/runlaunchpad.py b/lib/lp/scripts/runlaunchpad.py
index 904fe57..f753ead 100644
--- a/lib/lp/scripts/runlaunchpad.py
+++ b/lib/lp/scripts/runlaunchpad.py
@@ -8,6 +8,7 @@ import signal
 import subprocess
 import sys
 from contextlib import ExitStack
+from urllib.parse import urlparse
 
 import fixtures
 from lazr.config import as_host_port
@@ -172,7 +173,12 @@ class RabbitService(Service):
         return config.rabbitmq.launch
 
     def launch(self):
-        hostname, port = as_host_port(config.rabbitmq.host, None, None)
+        if config.rabbitmq.broker_urls:
+            parsed_url = urlparse(config.rabbitmq.broker_urls.split()[0])
+            hostname = parsed_url.hostname
+            port = parsed_url.port
+        else:
+            hostname, port = as_host_port(config.rabbitmq.host, None, None)
         self.server = RabbitServer(
             RabbitServerResources(hostname=hostname, port=port)
         )
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index cf9c276..ccb5be3 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -1560,13 +1560,24 @@ memory_profile_log:
 # Should RabbitMQ be launched by default?
 # datatype: boolean
 launch: False
-# The host:port at which RabbitMQ is listening.
+# The URL at which RabbitMQ is listening (in the form
+# amqp://USERNAME:PASSWORD@HOSTNAME:PORT/VIRTUAL_HOST), or a space-separated
+# list of such URLs to use round-robin failover between them.
+broker_urls: none
+# The host:port at which RabbitMQ is listening (ignored if broker_urls is
+# set).
 # datatype: string
 host: none
+# The username to use when connecting to RabbitMQ (ignored if broker_urls is
+# set).
 # datatype: string
 userid: none
+# The password to use when connecting to RabbitMQ (ignored if broker_urls is
+# set).
 # datatype: string
 password: none
+# The virtual host to use when connecting to RabbitMQ (ignored if
+# broker_urls is set).
 # datatype: string
 virtual_host: none
 
diff --git a/lib/lp/services/job/celeryconfig.py b/lib/lp/services/job/celeryconfig.py
index f52186d..7048ca8 100644
--- a/lib/lp/services/job/celeryconfig.py
+++ b/lib/lp/services/job/celeryconfig.py
@@ -86,12 +86,18 @@ def configure(argv):
         "interval_step": 0.1,
         "interval_max": 0.1,
     }
-    result["broker_url"] = "amqp://%s:%s@%s/%s" % (
-        config.rabbitmq.userid,
-        config.rabbitmq.password,
-        config.rabbitmq.host,
-        config.rabbitmq.virtual_host,
-    )
+    if config.rabbitmq.broker_urls:
+        result["broker_url"] = config.rabbitmq.broker_urls.split()
+    else:
+        result["broker_url"] = [
+            "amqp://%s:%s@%s/%s"
+            % (
+                config.rabbitmq.userid,
+                config.rabbitmq.password,
+                config.rabbitmq.host,
+                config.rabbitmq.virtual_host,
+            )
+        ]
     result["beat_schedule"] = {
         "schedule-missing": {
             "task": "lp.services.job.celeryjob.run_missing_ready",
diff --git a/lib/lp/services/job/runner.py b/lib/lp/services/job/runner.py
index 351f1d3..a69ad20 100644
--- a/lib/lp/services/job/runner.py
+++ b/lib/lp/services/job/runner.py
@@ -49,6 +49,7 @@ from lp.services.mail.sendmail import (
     MailController,
     set_immediate_mail_delivery,
 )
+from lp.services.messaging import rabbit
 from lp.services.statsd.interfaces.statsd_client import IStatsdClient
 from lp.services.timeout import (
     get_default_timeout_function,
@@ -291,7 +292,7 @@ class BaseRunnableJob(BaseRunnableJobSource):
 
     def celeryRunOnCommit(self):
         """Configure transaction so that commit runs this job via Celery."""
-        if config.rabbitmq.host is None or not celery_enabled(
+        if not rabbit.is_configured() or not celery_enabled(
             self.__class__.__name__
         ):
             return
diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py
index 495f5e2..df50435 100644
--- a/lib/lp/services/job/tests/test_celery.py
+++ b/lib/lp/services/job/tests/test_celery.py
@@ -212,8 +212,8 @@ class TestJobsViaCelery(TestCaseWithFactory):
         self.assertEqual(JobStatus.COMPLETED, job.status)
 
     def test_without_rabbitmq(self):
-        # If no RabbitMQ host is configured, the job is not run via Celery.
-        self.pushConfig("rabbitmq", host="none")
+        # If no RabbitMQ broker is configured, the job is not run via Celery.
+        self.pushConfig("rabbitmq", broker_urls="none")
         self.useFixture(
             FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
         )
diff --git a/lib/lp/services/job/tests/test_celery_configuration.py b/lib/lp/services/job/tests/test_celery_configuration.py
index 3c0735d..4efcc5d 100644
--- a/lib/lp/services/job/tests/test_celery_configuration.py
+++ b/lib/lp/services/job/tests/test_celery_configuration.py
@@ -2,8 +2,10 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from contextlib import contextmanager
+from textwrap import dedent
 
-from testtools.matchers import MatchesRegex
+from kombu.utils.url import parse_url
+from testtools.matchers import MatchesListwise, MatchesRegex
 
 from lp.services.config import config
 from lp.testing import TestCase
@@ -45,7 +47,9 @@ class TestCeleryWorkerConfiguration(TestCase):
         # The port changes between test runs.
         self.assertThat(
             config["broker_url"],
-            MatchesRegex(r"amqp://guest:guest@localhost:\d+//\Z"),
+            MatchesListwise(
+                [MatchesRegex(r"amqp://guest:guest@localhost:\d+//\Z")]
+            ),
         )
         self.assertFalse(config["task_create_missing_queues"])
         self.assertEqual("job", config["task_default_exchange"])
@@ -159,3 +163,25 @@ class TestCeleryWorkerConfiguration(TestCase):
             configure,
             self.command + ["--queue=foo"],
         )
+
+    def test_compat_config(self):
+        # The old-style host/userid/password/virtual_host configuration
+        # format still works.
+        from lp.services.job.celeryconfig import configure
+
+        [broker_url] = config.rabbitmq.broker_urls.split()
+        parsed_url = parse_url(broker_url)
+        service_config = dedent(
+            """\
+            [rabbitmq]
+            broker_urls: none
+            host: {hostname}:{port}
+            userid: {userid}
+            password: {password}
+            virtual_host: {virtual_host}
+            """.format(
+                **parsed_url
+            )
+        )
+        with changed_config(service_config):
+            self.check_default_common_parameters(configure([""]))
diff --git a/lib/lp/services/messaging/rabbit.py b/lib/lp/services/messaging/rabbit.py
index 6c8bd34..bfdf60f 100644
--- a/lib/lp/services/messaging/rabbit.py
+++ b/lib/lp/services/messaging/rabbit.py
@@ -8,7 +8,8 @@ __all__ = [
     "is_configured",
 ]
 
-import amqp
+import kombu
+from lazr.config import as_host_port
 
 from lp.services.config import config
 from lp.services.messaging.interfaces import MessagingUnavailable
@@ -16,6 +17,8 @@ from lp.services.messaging.interfaces import MessagingUnavailable
 
 def is_configured():
     """Return True if rabbit looks to be configured."""
+    if config.rabbitmq.broker_urls is not None:
+        return True
     return not (
         config.rabbitmq.host is None
         or config.rabbitmq.userid is None
@@ -31,11 +34,16 @@ def connect():
     """
     if not is_configured():
         raise MessagingUnavailable("Incomplete configuration")
-    connection = amqp.Connection(
-        host=config.rabbitmq.host,
-        userid=config.rabbitmq.userid,
-        password=config.rabbitmq.password,
-        virtual_host=config.rabbitmq.virtual_host,
-    )
+    if config.rabbitmq.broker_urls is not None:
+        connection = kombu.Connection(config.rabbitmq.broker_urls.split())
+    else:
+        hostname, port = as_host_port(config.rabbitmq.host, default_port=5672)
+        connection = kombu.Connection(
+            hostname=hostname,
+            userid=config.rabbitmq.userid,
+            password=config.rabbitmq.password,
+            virtual_host=config.rabbitmq.virtual_host,
+            port=port,
+        )
     connection.connect()
     return connection
diff --git a/lib/lp/services/messaging/tests/__init__.py b/lib/lp/services/messaging/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/messaging/tests/__init__.py
diff --git a/lib/lp/services/messaging/tests/test_rabbit.py b/lib/lp/services/messaging/tests/test_rabbit.py
new file mode 100644
index 0000000..f4d8f7c
--- /dev/null
+++ b/lib/lp/services/messaging/tests/test_rabbit.py
@@ -0,0 +1,121 @@
+# Copyright 2022 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from kombu.utils.url import parse_url
+from testtools.matchers import MatchesStructure
+
+from lp.services.config import config
+from lp.services.messaging import rabbit
+from lp.testing import TestCase
+from lp.testing.layers import BaseLayer, RabbitMQLayer
+
+
+class TestIsConfigured(TestCase):
+    layer = BaseLayer
+
+    def test_unconfigured(self):
+        self.assertFalse(rabbit.is_configured())
+
+    def test_broker_url(self):
+        self.pushConfig(
+            "rabbitmq", broker_urls="amqp://guest:guest@rabbitmq.example//"
+        )
+        self.assertTrue(rabbit.is_configured())
+
+    def test_partial_compat(self):
+        self.pushConfig("rabbitmq", host="rabbitmq.example")
+        self.assertFalse(rabbit.is_configured())
+
+    def test_full_compat(self):
+        self.pushConfig(
+            "rabbitmq",
+            host="rabbitmq.example",
+            userid="guest",
+            password="guest",
+            virtual_host="/",
+        )
+        self.assertTrue(rabbit.is_configured())
+
+
+class TestConnect(TestCase):
+    layer = RabbitMQLayer
+
+    def test_unconfigured(self):
+        self.pushConfig("rabbitmq", broker_urls="none")
+        self.assertRaisesWithContent(
+            rabbit.MessagingUnavailable,
+            "Incomplete configuration",
+            rabbit.connect,
+        )
+
+    def test_single_broker_url(self):
+        self.assertIsNotNone(config.rabbitmq.broker_urls)
+        [broker_url] = config.rabbitmq.broker_urls.split()
+        parsed_url = parse_url(broker_url)
+        with rabbit.connect() as connection:
+            self.assertThat(
+                connection,
+                MatchesStructure.byEquality(
+                    # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+                    hostname="127.0.0.1",
+                    userid=parsed_url["userid"],
+                    password=parsed_url["password"],
+                    virtual_host=parsed_url["virtual_host"],
+                    port=int(parsed_url["port"]),
+                    alt=[broker_url],
+                ),
+            )
+
+    def test_multiple_broker_urls(self):
+        self.assertIsNotNone(config.rabbitmq.broker_urls)
+        [broker_url] = config.rabbitmq.broker_urls.split()
+        parsed_url = parse_url(broker_url)
+        self.assertEqual("localhost", parsed_url["hostname"])
+        self.pushConfig(
+            "rabbitmq",
+            broker_urls=(
+                "%s amqp://guest:guest@alternate.example//" % broker_url
+            ),
+        )
+        with rabbit.connect() as connection:
+            self.assertThat(
+                connection,
+                MatchesStructure.byEquality(
+                    # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+                    hostname="127.0.0.1",
+                    userid=parsed_url["userid"],
+                    password=parsed_url["password"],
+                    virtual_host=parsed_url["virtual_host"],
+                    port=int(parsed_url["port"]),
+                    alt=[broker_url, "amqp://guest:guest@alternate.example//"],
+                ),
+            )
+
+    def test_compat_config(self):
+        # The old-style host/userid/password/virtual_host configuration
+        # format still works.
+        self.assertIsNotNone(config.rabbitmq.broker_urls)
+        [broker_url] = config.rabbitmq.broker_urls.split()
+        parsed_url = parse_url(broker_url)
+        self.assertEqual("localhost", parsed_url["hostname"])
+        self.pushConfig(
+            "rabbitmq",
+            broker_urls="none",
+            host="%s:%s" % (parsed_url["hostname"], parsed_url["port"]),
+            userid=parsed_url["userid"],
+            password=parsed_url["password"],
+            virtual_host=parsed_url["virtual_host"],
+        )
+        with rabbit.connect() as connection:
+            self.assertThat(
+                connection,
+                MatchesStructure.byEquality(
+                    # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+                    hostname="127.0.0.1",
+                    userid=parsed_url["userid"],
+                    password=parsed_url["password"],
+                    virtual_host=parsed_url["virtual_host"],
+                    port=int(parsed_url["port"]),
+                    alt=[],
+                ),
+            )
diff --git a/lib/lp/services/rabbit/server.py b/lib/lp/services/rabbit/server.py
index a023fcb..0cd8f6d 100644
--- a/lib/lp/services/rabbit/server.py
+++ b/lib/lp/services/rabbit/server.py
@@ -21,13 +21,13 @@ class RabbitServer(rabbitfixture.server.RabbitServer):
 
     def setUp(self):
         super().setUp()
+        # The two trailing slashes here are deliberate: this has the effect
+        # of setting the virtual host to "/" rather than to the empty
+        # string.
         self.config.service_config = dedent(
             """\
             [rabbitmq]
-            host: localhost:%d
-            userid: guest
-            password: guest
-            virtual_host: /
+            broker_urls: amqp://guest:guest@localhost:%d//
             """
             % self.config.port
         )
diff --git a/lib/lp/services/rabbit/tests/test_server.py b/lib/lp/services/rabbit/tests/test_server.py
index aa14ad8..4afc670 100644
--- a/lib/lp/services/rabbit/tests/test_server.py
+++ b/lib/lp/services/rabbit/tests/test_server.py
@@ -31,10 +31,9 @@ class TestRabbitServer(TestCase):
         service_config.read_file(io.StringIO(fixture.config.service_config))
         self.assertEqual(["rabbitmq"], service_config.sections())
         expected = {
-            "host": "localhost:%d" % fixture.config.port,
-            "userid": "guest",
-            "password": "guest",
-            "virtual_host": "/",
+            "broker_urls": (
+                "amqp://guest:guest@localhost:%d//" % fixture.config.port
+            ),
         }
         observed = dict(service_config.items("rabbitmq"))
         self.assertEqual(expected, observed)
diff --git a/lib/lp/testing/fixture.py b/lib/lp/testing/fixture.py
index f112eb1..d5a7b74 100644
--- a/lib/lp/testing/fixture.py
+++ b/lib/lp/testing/fixture.py
@@ -23,7 +23,6 @@ import socket
 import time
 from configparser import ConfigParser
 
-import amqp
 import oops
 import oops_amqp
 import pgbouncer.fixture
@@ -391,24 +390,17 @@ class CaptureOops(Fixture):
             return
         # Send ourselves a message: when we receive this, we've processed all
         # oopses created before sync() was invoked.
-        message = amqp.Message(self.AMQP_SENTINEL)
-        # Match what oops publishing does
-        message.properties["delivery_mode"] = 2
         # Publish the message via a new channel (otherwise rabbit
         # shortcircuits it straight back to us, apparently).
-        connection = connect()
-        try:
-            channel = connection.channel()
-            try:
-                channel.basic_publish(
-                    message,
-                    config.error_reports.error_exchange,
-                    config.error_reports.error_queue_key,
+        with connect() as connection:
+            with connection.channel() as channel:
+                connection.Producer(channel).publish(
+                    body=self.AMQP_SENTINEL,
+                    routing_key=config.error_reports.error_queue_key,
+                    # Match what oops publishing does.
+                    delivery_mode="persistent",
+                    exchange=config.error_reports.error_exchange,
                 )
-            finally:
-                channel.close()
-        finally:
-            connection.close()
         receiver = oops_amqp.Receiver(
             self.oops_config, connect, self.queue_name
         )
diff --git a/lib/lp/testing/tests/test_layers_functional.py b/lib/lp/testing/tests/test_layers_functional.py
index 17a3412..15452fd 100644
--- a/lib/lp/testing/tests/test_layers_functional.py
+++ b/lib/lp/testing/tests/test_layers_functional.py
@@ -13,7 +13,6 @@ import signal
 from urllib.error import HTTPError
 from urllib.request import urlopen
 
-import amqp
 import six
 from fixtures import EnvironmentVariableFixture, Fixture, TestWithFixtures
 from zope.component import getUtility
@@ -23,6 +22,7 @@ from lp.services.config import config
 from lp.services.librarian.client import LibrarianClient, UploadFailed
 from lp.services.librarian.interfaces.client import ILibrarianClient
 from lp.services.memcache.client import memcache_client_factory
+from lp.services.messaging import rabbit
 from lp.services.pidfile import pidfile_path
 from lp.testing import TestCase
 from lp.testing.layers import (
@@ -259,19 +259,10 @@ class BaseTestCase(TestCase):
             )
 
     def testRabbitWorking(self):
-        rabbitmq = config.rabbitmq
         if not self.want_rabbitmq:
-            self.assertEqual(None, rabbitmq.host)
+            self.assertFalse(rabbit.is_configured())
         else:
-            self.assertNotEqual(None, rabbitmq.host)
-            conn = amqp.Connection(
-                host=rabbitmq.host,
-                userid=rabbitmq.userid,
-                password=rabbitmq.password,
-                virtual_host=rabbitmq.virtual_host,
-            )
-            conn.connect()
-            conn.close()
+            rabbit.connect().close()
 
 
 class MemcachedTestCase(BaseTestCase):
diff --git a/requirements/launchpad.txt b/requirements/launchpad.txt
index 30b4cfe..fe72892 100644
--- a/requirements/launchpad.txt
+++ b/requirements/launchpad.txt
@@ -98,7 +98,7 @@ netifaces==0.11.0
 oauth==1.0
 oauthlib==3.1.0
 oops==0.0.14
-oops-amqp==0.1.0
+oops-amqp==0.2.0
 oops-datedir-repo==0.0.24
 oops-datedir2amqp==0.1.0
 oops-timeline==0.0.3
diff --git a/setup.cfg b/setup.cfg
index a13cb50..f3f51e3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -44,6 +44,7 @@ install_requires =
     importlib-resources; python_version < "3.7"
     ipython
     jsautobuild
+    kombu
     launchpad-buildd
     launchpadlib
     lazr.batchnavigator