launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #29353
[Merge] ~cjwatson/launchpad:kombu-amqp into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:kombu-amqp into launchpad:master.
Commit message:
Support multiple RabbitMQ broker URLs
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/432065
`kombu` (already used by `celery`) is a higher-level messaging library than `amqp`. For our purposes, we can treat it mostly as a wrapper for `amqp` with some slightly more convenient interfaces, but it has one key feature that's of interest to us: it supports multiple RabbitMQ broker URLs with round-robin failover between them. This makes it possible to configure Launchpad to use RabbitMQ with high availability: if we have a RabbitMQ cluster, then we can configure Launchpad with broker URLs for all the nodes in the cluster, and if one fails then `kombu` will automatically fail over to the next.
To make it practical to configure this, I had to add a `rabbitmq.broker_urls` configuration option, which supersedes the existing broken-out configuration options (`rabbitmq.host`, `rabbitmq.userid`, `rabbitmq.password`, and `rabbitmq.virtual_host`). For backward compatibility, the old options continue to work as long as `rabbitmq.broker_urls` is unset.
This also includes upgrading to oops-amqp 0.2.0, since that includes a change to accept connection factories that return `kombu` connections.
Dependencies: https://code.launchpad.net/~cjwatson/lp-source-dependencies/+git/lp-source-dependencies/+merge/432064
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:kombu-amqp into launchpad:master.
diff --git a/configs/development/launchpad-lazr.conf b/configs/development/launchpad-lazr.conf
index 97e07c4..4e80454 100644
--- a/configs/development/launchpad-lazr.conf
+++ b/configs/development/launchpad-lazr.conf
@@ -155,10 +155,7 @@ private_base_url: http://private-ppa.launchpad.test
[rabbitmq]
launch: True
-host: localhost:56720
-userid: guest
-password: guest
-virtual_host: /
+broker_urls: amqp://guest:guest@localhost:56720//
[snappy]
tools_source: deb http://ppa.launchpad.net/snappy-dev/snapcraft-daily/ubuntu %(series)s main
diff --git a/configs/testrunner/launchpad-lazr.conf b/configs/testrunner/launchpad-lazr.conf
index 8c485dd..5de6421 100644
--- a/configs/testrunner/launchpad-lazr.conf
+++ b/configs/testrunner/launchpad-lazr.conf
@@ -160,10 +160,7 @@ logs_root: lib/lp/soyuz/scripts/tests/ppa-apache-log-files
[rabbitmq]
launch: False
-host: none
-userid: none
-password: none
-virtual_host: none
+broker_urls: none
[rosetta]
generate_templates: True
diff --git a/lib/lp/code/model/tests/test_branch.py b/lib/lp/code/model/tests/test_branch.py
index d78c340..919f8d8 100644
--- a/lib/lp/code/model/tests/test_branch.py
+++ b/lib/lp/code/model/tests/test_branch.py
@@ -323,7 +323,7 @@ class TestBranchJobViaCelery(TestCaseWithFactory):
def test_branchChanged_via_celery(self):
"""Running a job via Celery succeeds and emits expected output."""
# Delay importing anything that uses Celery until RabbitMQLayer is
- # running, so that config.rabbitmq.host is defined when
+ # running, so that config.rabbitmq.broker_urls is defined when
# lp.services.job.celeryconfig is loaded.
self.useFixture(
FeatureFixture({"jobs.celery.enabled_classes": "BranchScanJob"})
diff --git a/lib/lp/scripts/runlaunchpad.py b/lib/lp/scripts/runlaunchpad.py
index 904fe57..f753ead 100644
--- a/lib/lp/scripts/runlaunchpad.py
+++ b/lib/lp/scripts/runlaunchpad.py
@@ -8,6 +8,7 @@ import signal
import subprocess
import sys
from contextlib import ExitStack
+from urllib.parse import urlparse
import fixtures
from lazr.config import as_host_port
@@ -172,7 +173,12 @@ class RabbitService(Service):
return config.rabbitmq.launch
def launch(self):
- hostname, port = as_host_port(config.rabbitmq.host, None, None)
+ if config.rabbitmq.broker_urls:
+ parsed_url = urlparse(config.rabbitmq.broker_urls.split()[0])
+ hostname = parsed_url.hostname
+ port = parsed_url.port
+ else:
+ hostname, port = as_host_port(config.rabbitmq.host, None, None)
self.server = RabbitServer(
RabbitServerResources(hostname=hostname, port=port)
)
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index cf9c276..ccb5be3 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -1560,13 +1560,24 @@ memory_profile_log:
# Should RabbitMQ be launched by default?
# datatype: boolean
launch: False
-# The host:port at which RabbitMQ is listening.
+# The URL at which RabbitMQ is listening (in the form
+# amqp://USERNAME:PASSWORD@HOSTNAME:PORT/VIRTUAL_HOST), or a space-separated
+# list of such URLs to use round-robin failover between them.
+broker_urls: none
+# The host:port at which RabbitMQ is listening (ignored if broker_urls is
+# set).
# datatype: string
host: none
+# The username to use when connecting to RabbitMQ (ignored if broker_urls is
+# set).
# datatype: string
userid: none
+# The password to use when connecting to RabbitMQ (ignored if broker_urls is
+# set).
# datatype: string
password: none
+# The virtual host to use when connecting to RabbitMQ (ignored if
+# broker_urls is set).
# datatype: string
virtual_host: none
diff --git a/lib/lp/services/job/celeryconfig.py b/lib/lp/services/job/celeryconfig.py
index f52186d..7048ca8 100644
--- a/lib/lp/services/job/celeryconfig.py
+++ b/lib/lp/services/job/celeryconfig.py
@@ -86,12 +86,18 @@ def configure(argv):
"interval_step": 0.1,
"interval_max": 0.1,
}
- result["broker_url"] = "amqp://%s:%s@%s/%s" % (
- config.rabbitmq.userid,
- config.rabbitmq.password,
- config.rabbitmq.host,
- config.rabbitmq.virtual_host,
- )
+ if config.rabbitmq.broker_urls:
+ result["broker_url"] = config.rabbitmq.broker_urls.split()
+ else:
+ result["broker_url"] = [
+ "amqp://%s:%s@%s/%s"
+ % (
+ config.rabbitmq.userid,
+ config.rabbitmq.password,
+ config.rabbitmq.host,
+ config.rabbitmq.virtual_host,
+ )
+ ]
result["beat_schedule"] = {
"schedule-missing": {
"task": "lp.services.job.celeryjob.run_missing_ready",
diff --git a/lib/lp/services/job/runner.py b/lib/lp/services/job/runner.py
index 351f1d3..a69ad20 100644
--- a/lib/lp/services/job/runner.py
+++ b/lib/lp/services/job/runner.py
@@ -49,6 +49,7 @@ from lp.services.mail.sendmail import (
MailController,
set_immediate_mail_delivery,
)
+from lp.services.messaging import rabbit
from lp.services.statsd.interfaces.statsd_client import IStatsdClient
from lp.services.timeout import (
get_default_timeout_function,
@@ -291,7 +292,7 @@ class BaseRunnableJob(BaseRunnableJobSource):
def celeryRunOnCommit(self):
"""Configure transaction so that commit runs this job via Celery."""
- if config.rabbitmq.host is None or not celery_enabled(
+ if not rabbit.is_configured() or not celery_enabled(
self.__class__.__name__
):
return
diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py
index 495f5e2..df50435 100644
--- a/lib/lp/services/job/tests/test_celery.py
+++ b/lib/lp/services/job/tests/test_celery.py
@@ -212,8 +212,8 @@ class TestJobsViaCelery(TestCaseWithFactory):
self.assertEqual(JobStatus.COMPLETED, job.status)
def test_without_rabbitmq(self):
- # If no RabbitMQ host is configured, the job is not run via Celery.
- self.pushConfig("rabbitmq", host="none")
+ # If no RabbitMQ broker is configured, the job is not run via Celery.
+ self.pushConfig("rabbitmq", broker_urls="none")
self.useFixture(
FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
)
diff --git a/lib/lp/services/job/tests/test_celery_configuration.py b/lib/lp/services/job/tests/test_celery_configuration.py
index 3c0735d..4efcc5d 100644
--- a/lib/lp/services/job/tests/test_celery_configuration.py
+++ b/lib/lp/services/job/tests/test_celery_configuration.py
@@ -2,8 +2,10 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
from contextlib import contextmanager
+from textwrap import dedent
-from testtools.matchers import MatchesRegex
+from kombu.utils.url import parse_url
+from testtools.matchers import MatchesListwise, MatchesRegex
from lp.services.config import config
from lp.testing import TestCase
@@ -45,7 +47,9 @@ class TestCeleryWorkerConfiguration(TestCase):
# The port changes between test runs.
self.assertThat(
config["broker_url"],
- MatchesRegex(r"amqp://guest:guest@localhost:\d+//\Z"),
+ MatchesListwise(
+ [MatchesRegex(r"amqp://guest:guest@localhost:\d+//\Z")]
+ ),
)
self.assertFalse(config["task_create_missing_queues"])
self.assertEqual("job", config["task_default_exchange"])
@@ -159,3 +163,25 @@ class TestCeleryWorkerConfiguration(TestCase):
configure,
self.command + ["--queue=foo"],
)
+
+ def test_compat_config(self):
+ # The old-style host/userid/password/virtual_host configuration
+ # format still works.
+ from lp.services.job.celeryconfig import configure
+
+ [broker_url] = config.rabbitmq.broker_urls.split()
+ parsed_url = parse_url(broker_url)
+ service_config = dedent(
+ """\
+ [rabbitmq]
+ broker_urls: none
+ host: {hostname}:{port}
+ userid: {userid}
+ password: {password}
+ virtual_host: {virtual_host}
+ """.format(
+ **parsed_url
+ )
+ )
+ with changed_config(service_config):
+ self.check_default_common_parameters(configure([""]))
diff --git a/lib/lp/services/messaging/rabbit.py b/lib/lp/services/messaging/rabbit.py
index 6c8bd34..bfdf60f 100644
--- a/lib/lp/services/messaging/rabbit.py
+++ b/lib/lp/services/messaging/rabbit.py
@@ -8,7 +8,8 @@ __all__ = [
"is_configured",
]
-import amqp
+import kombu
+from lazr.config import as_host_port
from lp.services.config import config
from lp.services.messaging.interfaces import MessagingUnavailable
@@ -16,6 +17,8 @@ from lp.services.messaging.interfaces import MessagingUnavailable
def is_configured():
"""Return True if rabbit looks to be configured."""
+ if config.rabbitmq.broker_urls is not None:
+ return True
return not (
config.rabbitmq.host is None
or config.rabbitmq.userid is None
@@ -31,11 +34,16 @@ def connect():
"""
if not is_configured():
raise MessagingUnavailable("Incomplete configuration")
- connection = amqp.Connection(
- host=config.rabbitmq.host,
- userid=config.rabbitmq.userid,
- password=config.rabbitmq.password,
- virtual_host=config.rabbitmq.virtual_host,
- )
+ if config.rabbitmq.broker_urls is not None:
+ connection = kombu.Connection(config.rabbitmq.broker_urls.split())
+ else:
+ hostname, port = as_host_port(config.rabbitmq.host, default_port=5672)
+ connection = kombu.Connection(
+ hostname=hostname,
+ userid=config.rabbitmq.userid,
+ password=config.rabbitmq.password,
+ virtual_host=config.rabbitmq.virtual_host,
+ port=port,
+ )
connection.connect()
return connection
diff --git a/lib/lp/services/messaging/tests/__init__.py b/lib/lp/services/messaging/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/messaging/tests/__init__.py
diff --git a/lib/lp/services/messaging/tests/test_rabbit.py b/lib/lp/services/messaging/tests/test_rabbit.py
new file mode 100644
index 0000000..f4d8f7c
--- /dev/null
+++ b/lib/lp/services/messaging/tests/test_rabbit.py
@@ -0,0 +1,121 @@
+# Copyright 2022 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from kombu.utils.url import parse_url
+from testtools.matchers import MatchesStructure
+
+from lp.services.config import config
+from lp.services.messaging import rabbit
+from lp.testing import TestCase
+from lp.testing.layers import BaseLayer, RabbitMQLayer
+
+
+class TestIsConfigured(TestCase):
+ layer = BaseLayer
+
+ def test_unconfigured(self):
+ self.assertFalse(rabbit.is_configured())
+
+ def test_broker_url(self):
+ self.pushConfig(
+ "rabbitmq", broker_urls="amqp://guest:guest@rabbitmq.example//"
+ )
+ self.assertTrue(rabbit.is_configured())
+
+ def test_partial_compat(self):
+ self.pushConfig("rabbitmq", host="rabbitmq.example")
+ self.assertFalse(rabbit.is_configured())
+
+ def test_full_compat(self):
+ self.pushConfig(
+ "rabbitmq",
+ host="rabbitmq.example",
+ userid="guest",
+ password="guest",
+ virtual_host="/",
+ )
+ self.assertTrue(rabbit.is_configured())
+
+
+class TestConnect(TestCase):
+ layer = RabbitMQLayer
+
+ def test_unconfigured(self):
+ self.pushConfig("rabbitmq", broker_urls="none")
+ self.assertRaisesWithContent(
+ rabbit.MessagingUnavailable,
+ "Incomplete configuration",
+ rabbit.connect,
+ )
+
+ def test_single_broker_url(self):
+ self.assertIsNotNone(config.rabbitmq.broker_urls)
+ [broker_url] = config.rabbitmq.broker_urls.split()
+ parsed_url = parse_url(broker_url)
+ with rabbit.connect() as connection:
+ self.assertThat(
+ connection,
+ MatchesStructure.byEquality(
+ # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+ hostname="127.0.0.1",
+ userid=parsed_url["userid"],
+ password=parsed_url["password"],
+ virtual_host=parsed_url["virtual_host"],
+ port=int(parsed_url["port"]),
+ alt=[broker_url],
+ ),
+ )
+
+ def test_multiple_broker_urls(self):
+ self.assertIsNotNone(config.rabbitmq.broker_urls)
+ [broker_url] = config.rabbitmq.broker_urls.split()
+ parsed_url = parse_url(broker_url)
+ self.assertEqual("localhost", parsed_url["hostname"])
+ self.pushConfig(
+ "rabbitmq",
+ broker_urls=(
+ "%s amqp://guest:guest@alternate.example//" % broker_url
+ ),
+ )
+ with rabbit.connect() as connection:
+ self.assertThat(
+ connection,
+ MatchesStructure.byEquality(
+ # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+ hostname="127.0.0.1",
+ userid=parsed_url["userid"],
+ password=parsed_url["password"],
+ virtual_host=parsed_url["virtual_host"],
+ port=int(parsed_url["port"]),
+ alt=[broker_url, "amqp://guest:guest@alternate.example//"],
+ ),
+ )
+
+ def test_compat_config(self):
+ # The old-style host/userid/password/virtual_host configuration
+ # format still works.
+ self.assertIsNotNone(config.rabbitmq.broker_urls)
+ [broker_url] = config.rabbitmq.broker_urls.split()
+ parsed_url = parse_url(broker_url)
+ self.assertEqual("localhost", parsed_url["hostname"])
+ self.pushConfig(
+ "rabbitmq",
+ broker_urls="none",
+ host="%s:%s" % (parsed_url["hostname"], parsed_url["port"]),
+ userid=parsed_url["userid"],
+ password=parsed_url["password"],
+ virtual_host=parsed_url["virtual_host"],
+ )
+ with rabbit.connect() as connection:
+ self.assertThat(
+ connection,
+ MatchesStructure.byEquality(
+ # kombu.transport.pyamqp forces "localhost" to "127.0.0.1".
+ hostname="127.0.0.1",
+ userid=parsed_url["userid"],
+ password=parsed_url["password"],
+ virtual_host=parsed_url["virtual_host"],
+ port=int(parsed_url["port"]),
+ alt=[],
+ ),
+ )
diff --git a/lib/lp/services/rabbit/server.py b/lib/lp/services/rabbit/server.py
index a023fcb..0cd8f6d 100644
--- a/lib/lp/services/rabbit/server.py
+++ b/lib/lp/services/rabbit/server.py
@@ -21,13 +21,13 @@ class RabbitServer(rabbitfixture.server.RabbitServer):
def setUp(self):
super().setUp()
+ # The two trailing slashes here are deliberate: this has the effect
+ # of setting the virtual host to "/" rather than to the empty
+ # string.
self.config.service_config = dedent(
"""\
[rabbitmq]
- host: localhost:%d
- userid: guest
- password: guest
- virtual_host: /
+ broker_urls: amqp://guest:guest@localhost:%d//
"""
% self.config.port
)
diff --git a/lib/lp/services/rabbit/tests/test_server.py b/lib/lp/services/rabbit/tests/test_server.py
index aa14ad8..4afc670 100644
--- a/lib/lp/services/rabbit/tests/test_server.py
+++ b/lib/lp/services/rabbit/tests/test_server.py
@@ -31,10 +31,9 @@ class TestRabbitServer(TestCase):
service_config.read_file(io.StringIO(fixture.config.service_config))
self.assertEqual(["rabbitmq"], service_config.sections())
expected = {
- "host": "localhost:%d" % fixture.config.port,
- "userid": "guest",
- "password": "guest",
- "virtual_host": "/",
+ "broker_urls": (
+ "amqp://guest:guest@localhost:%d//" % fixture.config.port
+ ),
}
observed = dict(service_config.items("rabbitmq"))
self.assertEqual(expected, observed)
diff --git a/lib/lp/testing/fixture.py b/lib/lp/testing/fixture.py
index f112eb1..d5a7b74 100644
--- a/lib/lp/testing/fixture.py
+++ b/lib/lp/testing/fixture.py
@@ -23,7 +23,6 @@ import socket
import time
from configparser import ConfigParser
-import amqp
import oops
import oops_amqp
import pgbouncer.fixture
@@ -391,24 +390,17 @@ class CaptureOops(Fixture):
return
# Send ourselves a message: when we receive this, we've processed all
# oopses created before sync() was invoked.
- message = amqp.Message(self.AMQP_SENTINEL)
- # Match what oops publishing does
- message.properties["delivery_mode"] = 2
# Publish the message via a new channel (otherwise rabbit
# shortcircuits it straight back to us, apparently).
- connection = connect()
- try:
- channel = connection.channel()
- try:
- channel.basic_publish(
- message,
- config.error_reports.error_exchange,
- config.error_reports.error_queue_key,
+ with connect() as connection:
+ with connection.channel() as channel:
+ connection.Producer(channel).publish(
+ body=self.AMQP_SENTINEL,
+ routing_key=config.error_reports.error_queue_key,
+ # Match what oops publishing does.
+ delivery_mode="persistent",
+ exchange=config.error_reports.error_exchange,
)
- finally:
- channel.close()
- finally:
- connection.close()
receiver = oops_amqp.Receiver(
self.oops_config, connect, self.queue_name
)
diff --git a/lib/lp/testing/tests/test_layers_functional.py b/lib/lp/testing/tests/test_layers_functional.py
index 17a3412..15452fd 100644
--- a/lib/lp/testing/tests/test_layers_functional.py
+++ b/lib/lp/testing/tests/test_layers_functional.py
@@ -13,7 +13,6 @@ import signal
from urllib.error import HTTPError
from urllib.request import urlopen
-import amqp
import six
from fixtures import EnvironmentVariableFixture, Fixture, TestWithFixtures
from zope.component import getUtility
@@ -23,6 +22,7 @@ from lp.services.config import config
from lp.services.librarian.client import LibrarianClient, UploadFailed
from lp.services.librarian.interfaces.client import ILibrarianClient
from lp.services.memcache.client import memcache_client_factory
+from lp.services.messaging import rabbit
from lp.services.pidfile import pidfile_path
from lp.testing import TestCase
from lp.testing.layers import (
@@ -259,19 +259,10 @@ class BaseTestCase(TestCase):
)
def testRabbitWorking(self):
- rabbitmq = config.rabbitmq
if not self.want_rabbitmq:
- self.assertEqual(None, rabbitmq.host)
+ self.assertFalse(rabbit.is_configured())
else:
- self.assertNotEqual(None, rabbitmq.host)
- conn = amqp.Connection(
- host=rabbitmq.host,
- userid=rabbitmq.userid,
- password=rabbitmq.password,
- virtual_host=rabbitmq.virtual_host,
- )
- conn.connect()
- conn.close()
+ rabbit.connect().close()
class MemcachedTestCase(BaseTestCase):
diff --git a/requirements/launchpad.txt b/requirements/launchpad.txt
index 30b4cfe..fe72892 100644
--- a/requirements/launchpad.txt
+++ b/requirements/launchpad.txt
@@ -98,7 +98,7 @@ netifaces==0.11.0
oauth==1.0
oauthlib==3.1.0
oops==0.0.14
-oops-amqp==0.1.0
+oops-amqp==0.2.0
oops-datedir-repo==0.0.24
oops-datedir2amqp==0.1.0
oops-timeline==0.0.3
diff --git a/setup.cfg b/setup.cfg
index a13cb50..f3f51e3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -44,6 +44,7 @@ install_requires =
importlib-resources; python_version < "3.7"
ipython
jsautobuild
+ kombu
launchpad-buildd
launchpadlib
lazr.batchnavigator