[Merge] lp:~adeuring/launchpad/celery-config into lp:launchpad
Abel Deuring has proposed merging lp:~adeuring/launchpad/celery-config into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~adeuring/launchpad/celery-config/+merge/102535
This branch adds the configuration details we need for the Celery-based
job runner.
Celery reads its configuration from a Python file (we use
lp.services.job.celeryconfig). This module reads the "real config data"
from our regular lp.services.config.config module.
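For example, the RabbitMQ connection settings end up as the module-level
names that Celery expects; the same lines appear (inside configure()) in
the branch below:

from lp.services.config import config

host, port = config.rabbitmq.host.split(':')
BROKER_HOST = host
BROKER_PORT = port
BROKER_USER = config.rabbitmq.userid
BROKER_PASSWORD = config.rabbitmq.password
BROKER_VHOST = config.rabbitmq.virtual_host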
lp.services.job.celeryconfig already existed; I added several parameters
that were missing, and I added some sanity checks.
Note that the config module is used in two different places: the app
servers load it in order to send messages to a RabbitMQ server, while
several celeryd instances consume these messages and run the jobs they
describe.
Normally, a celeryconfig module looks like this:
PARAMETER_1 = value_1
PARAMETER_2 = value_2
i.e., the variables are set when the module is loaded. This makes it
difficult to test variants of the configuration, because re-loading a
Python module can be quite painful, so I moved the assignments into a
function configure().
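In outline, the pattern now looks like this (a minimal, self-contained
sketch; the real module reads its values from lp.services.config instead
of the stand-ins used here):

# Names that Celery reads at module level; filled in by configure().
BROKER_HOST = None
CELERYD_CONCURRENCY = None


def configure():
    # Assigning inside a function lets a test re-run the configuration
    # against a modified config without re-importing the module.
    global BROKER_HOST
    global CELERYD_CONCURRENCY
    BROKER_HOST = 'localhost'  # stand-in for config.rabbitmq.host
    CELERYD_CONCURRENCY = 3    # stand-in for config[queue].concurrency


configure()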
The check for circular chains of linked fallback queues is quite naive
and inefficient, but I think it is good enough for our purposes: it is
unlikely that we will ever have, say, 500 different queues with complex
fallback chains that need to be checked.
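The idea of the check, as a self-contained sketch (a plain dict stands in
for the lazr config sections, and ValueError for the ConfigurationError
raised in the branch):

def check_circular_fallbacks(queue, fallbacks):
    # fallbacks maps each queue name to its fallback queue; '' means none.
    linked_queues = []
    while fallbacks[queue] != '':
        linked_queues.append(queue)
        queue = fallbacks[queue]
        if queue in linked_queues:
            raise ValueError(
                'Circular chain of fallback queues: '
                '%s already in %s' % (queue, linked_queues))

# job -> job_slow -> (no fallback) passes; pointing job_slow back at job
# would raise the error above.
check_circular_fallbacks('job', {'job': 'job_slow', 'job_slow': ''})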
test: ./bin/test -vvt lp.services.job.tests.test_celery_configuration
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/config/schema-lazr.conf
lib/lp/services/job/celeryconfig.py
lib/lp/services/job/tests/test_celery_configuration.py
./lib/lp/services/config/schema-lazr.conf
501: Line exceeds 80 characters.
1112: Line exceeds 80 characters.
1119: Line exceeds 80 characters.
1711: Line exceeds 80 characters.
These are lines I did not change.
=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf 2012-04-06 17:28:25 +0000
+++ lib/lp/services/config/schema-lazr.conf 2012-04-18 15:04:32 +0000
@@ -1955,3 +1955,42 @@
module: lp.answers.interfaces.questionjob
dbuser: answertracker
crontab_group: MAIN
+
+[job_runner_queues]
+# Each key is the name of a queue; the value is the AMQP binding key.
+# Each of the queues must have a config section with the queue name.
+job: job
+job_slow: job_slow
+branch_write_job: branch_write_job
+branch_write_job_slow: branch_write_job_slow
+
+# The main job queue.
+[job]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 600
+# If a job times out, it will be queued again in the fallback queue.
+fallback_queue: job_slow
+concurrency: 3
+
+# The queue for jobs that time out in the queue "job".
+[job_slow]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 86400
+fallback_queue:
+concurrency: 1
+
+[branch_write_job]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 600
+fallback_queue: branch_write_job_slow
+concurrency: 3
+
+[branch_write_job_slow]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 86400
+fallback_queue:
+concurrency: 1
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-04-10 20:24:43 +0000
+++ lib/lp/services/job/celeryconfig.py 2012-04-18 15:04:32 +0000
@@ -1,16 +1,97 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+import argparse
+import sys
from lp.services.config import config
-host, port = config.rabbitmq.host.split(':')
-BROKER_HOST = host
-BROKER_PORT = port
-BROKER_USER = config.rabbitmq.userid
-BROKER_PASSWORD = config.rabbitmq.password
-BROKER_VHOST = config.rabbitmq.virtual_host
-CELERY_IMPORTS = ("lp.services.job.celeryjob", )
-CELERY_RESULT_BACKEND = "amqp"
-CELERY_QUEUES = {
- "branch_write_job": {"binding_key": "branch_write_job"},
- "job": {"binding_key": "job"},
-}
-CELERY_DEFAULT_EXCHANGE = "job"
-CELERY_DEFAULT_QUEUE = "job"
-CELERY_CREATE_MISSING_QUEUES = False
+
+
+class ConfigurationError(Exception):
+    pass
+
+
+def check_circular_fallbacks(queue):
+ """Check for curcular fallback queues.
+
+ A circular chain of fallback queues could keep a job forever queued
+ if it times out in all queues.
+ """
+ linked_queues = []
+ while config[queue].fallback_queue != '':
+ linked_queues.append(queue)
+ queue = config[queue].fallback_queue
+ if queue in linked_queues:
+ raise ConfigurationError(
+ 'Circular chain of fallback queues: '
+ '%s already in %s' % (queue, linked_queues))
+
+
+def configure():
+ """Set the Celery parameters.
+
+ Doing this in a function is convenient for testing.
+ """
+ global BROKER_HOST
+ global BROKER_PORT
+ global BROKER_USER
+ global BROKER_PASSWORD
+ global BROKER_VHOST
+ global CELERY_CREATE_MISSING_QUEUES
+ global CELERY_DEFAULT_EXCHANGE
+ global CELERY_DEFAULT_QUEUE
+ global CELERY_IMPORTS
+ global CELERY_QUEUES
+ global CELERY_RESULT_BACKEND
+ global CELERYD_CONCURRENCY
+ global CELERYD_TASK_SOFT_TIME_LIMIT
+ global FALLBACK
+
+ CELERY_QUEUES = {}
+ for queue_name in config.job_runner_queues:
+ CELERY_QUEUES[queue_name] = {
+ 'binding_key': config.job_runner_queues[queue_name],
+ }
+ check_circular_fallbacks(queue_name)
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-Q', '--queues')
+ args = parser.parse_known_args(sys.argv)
+ queues = args[0].queues
+ # A queue must be specified as a command line parameter for each
+ # celeryd instance, but this is not required for a Launchpad app server.
+ if 'celeryd' in sys.argv[0]:
+ if queues is None or queues == '':
+ raise ConfigurationError('A queue must be specified.')
+ queues = queues.split(',')
+ # Allow only one queue per celeryd instance. More than one queue
+ # would require a check for consistent timeout values, and especially
+ # a better way to specify a fallback queue.
+ if len(queues) > 1:
+ raise ConfigurationError(
+ 'A celeryd instance may serve only one queue.')
+ queue = queues[0]
+ if queue not in CELERY_QUEUES:
+ raise ConfigurationError(
+ 'Queue %s is not configured in schema-lazr.conf' % queue)
+ CELERYD_TASK_SOFT_TIME_LIMIT = config[queue].timeout
+ if config[queue].fallback_queue != '':
+ FALLBACK = config[queue].fallback_queue
+ CELERYD_CONCURRENCY = config[queue].concurrency
+
+ host, port = config.rabbitmq.host.split(':')
+ BROKER_HOST = host
+ BROKER_PORT = port
+ BROKER_USER = config.rabbitmq.userid
+ BROKER_PASSWORD = config.rabbitmq.password
+ BROKER_VHOST = config.rabbitmq.virtual_host
+ CELERY_IMPORTS = ("lp.services.job.celeryjob", )
+ CELERY_RESULT_BACKEND = "amqp"
+ CELERY_DEFAULT_EXCHANGE = "job"
+ CELERY_DEFAULT_QUEUE = "job"
+ CELERY_CREATE_MISSING_QUEUES = False
+
+try:
+    configure()
+except ConfigurationError, error:
+    print >>sys.stderr, error
+    sys.exit(1)
=== added file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 1970-01-01 00:00:00 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2012-04-18 15:04:32 +0000
@@ -0,0 +1,167 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from contextlib import contextmanager
+import sys
+
+from lp.services.config import config
+from lp.testing import TestCase
+from lp.testing.layers import RabbitMQLayer
+
+
+def get_celery_configuration():
+ """Return the current celeryconfiguration"""
+ # Import late because the RabbitMQ parameters are set during layer setup.
+ from lp.services.job import celeryconfig
+ celeryconfig.configure()
+ return celeryconfig
+
+
+@contextmanager
+def faked_command_line(argv):
+ """Fake sys.argv to pretend that celeryd is started."""
+ real_argv = sys.argv
+ sys.argv = argv
+ yield
+ sys.argv = real_argv
+
+
+@contextmanager
+def changed_config(changes):
+    config.push('test_changes', changes)
+    yield
+    config.pop('test_changes')
+
+
+class TestCeleryConfiguration(TestCase):
+    layer = RabbitMQLayer
+
+    def tearDown(self):
+        # celeryconfig.configure() defines celeryconfig.FALLBACK in some
+        # tests but subsequent tests may assume that this variable does
+        # not exist, so remove this variable, if it has been created
+        # by a test.
+        from lp.services.job import celeryconfig
+        try:
+            del celeryconfig.FALLBACK
+        except AttributeError:
+            pass
+        super(TestCeleryConfiguration, self).tearDown()
+
+    def check_default_common_parameters(self, config):
+        # Tests for default config values that are set for app servers
+        # and for celeryd instances.
+
+        # Four queues are defined; the binding key for each queue is
+        # just the queue name.
+        queue_names = [
+            'branch_write_job', 'branch_write_job_slow', 'job', 'job_slow']
+        queues = config.CELERY_QUEUES
+        self.assertEqual(queue_names, sorted(queues))
+        for name in queue_names:
+            self.assertEqual(name, config.CELERY_QUEUES[name]['binding_key'])
+
+        self.assertEqual('localhost', config.BROKER_HOST)
+        # BROKER_PORT changes between test runs, so just check that it
+        # is defined.
+        config.BROKER_PORT
+        self.assertEqual('guest', config.BROKER_USER)
+        self.assertEqual('guest', config.BROKER_PASSWORD)
+        self.assertEqual('/', config.BROKER_VHOST)
+        self.assertFalse(config.CELERY_CREATE_MISSING_QUEUES)
+        self.assertEqual('job', config.CELERY_DEFAULT_EXCHANGE)
+        self.assertEqual('job', config.CELERY_DEFAULT_QUEUE)
+        self.assertEqual(
+            ('lp.services.job.celeryjob', ), config.CELERY_IMPORTS)
+        self.assertEqual('amqp', config.CELERY_RESULT_BACKEND)
+
+    def test_app_server_configuration(self):
+        self.check_default_common_parameters(get_celery_configuration())
+
+    def check_job_specific_celeryd_configuration(self, expected, config):
+        self.check_default_common_parameters(config)
+        self.assertEqual(expected['concurrency'], config.CELERYD_CONCURRENCY)
+        self.assertEqual(
+            expected['timeout'], config.CELERYD_TASK_SOFT_TIME_LIMIT)
+        self.assertEqual(
+            expected['fallback'], getattr(config, 'FALLBACK', None))
+
+    def test_default_celeryd_configuration_fast_lanes(self):
+        expected = {
+            'concurrency': 3,
+            'fallback': 'job_slow',
+            'timeout': 600,
+            }
+        with faked_command_line(['celeryd', '-Q', 'job']):
+            config = get_celery_configuration()
+        self.check_default_common_parameters(config)
+        self.check_job_specific_celeryd_configuration(expected, config)
+        with faked_command_line(['celeryd', '-Q', 'branch_write_job']):
+            config = get_celery_configuration()
+        self.check_default_common_parameters(config)
+        expected['fallback'] = 'branch_write_job_slow'
+        self.check_job_specific_celeryd_configuration(expected, config)
+
+    def test_default_celeryd_configuration_slow_lanes(self):
+        expected = {
+            'concurrency': 1,
+            'fallback': None,
+            'timeout': 86400,
+            }
+        with faked_command_line(['celeryd', '-Q', 'job_slow']):
+            config = get_celery_configuration()
+        self.check_default_common_parameters(config)
+        self.check_job_specific_celeryd_configuration(expected, config)
+        with faked_command_line(['celeryd', '-Q', 'branch_write_job_slow']):
+            config = get_celery_configuration()
+        self.check_default_common_parameters(config)
+        self.check_job_specific_celeryd_configuration(expected, config)
+
+    def test_circular_fallback_lanes(self):
+        # Circular fallback lanes are detected.
+        # Import late because the RabbitMQ parameters are set during layer
+        # setup.
+        from lp.services.job.celeryconfig import ConfigurationError
+        with changed_config(
+            """
+            [job_slow]
+            fallback_queue: job
+            """):
+            error = (
+                "Circular chain of fallback queues: job already in "
+                "['job', 'job_slow']"
+                )
+            self.assertRaisesWithContent(
+                ConfigurationError, error, get_celery_configuration)
+
+    def test_missing_queue_parameter_for_celeryd(self):
+        # An exception is raised when celeryd is started without
+        # the parameter -Q.
+        # Import late because the RabbitMQ parameters are set during layer
+        # setup.
+        from lp.services.job.celeryconfig import ConfigurationError
+        with faked_command_line(['celeryd']):
+            error = 'A queue must be specified.'
+            self.assertRaisesWithContent(
+                ConfigurationError, error, get_celery_configuration)
+
+    def test_two_queues_for_celeryd(self):
+        # An exception is raised when celeryd is started for two queues.
+        # Import late because the RabbitMQ parameters are set during layer
+        # setup.
+        from lp.services.job.celeryconfig import ConfigurationError
+        with faked_command_line(['celeryd', '--queue=job,branch_write_job']):
+            error = 'A celeryd instance may serve only one queue.'
+            self.assertRaisesWithContent(
+                ConfigurationError, error, get_celery_configuration)
+
+    def test_unconfigured_queue_for_celeryd(self):
+        # An exception is raised when celeryd is started for a queue that
+        # is not configured.
+        # Import late because the RabbitMQ parameters are set during layer
+        # setup.
+        from lp.services.job.celeryconfig import ConfigurationError
+        with faked_command_line(['celeryd', '--queue=foo']):
+            error = 'Queue foo is not configured in schema-lazr.conf'
+            self.assertRaisesWithContent(
+                ConfigurationError, error, get_celery_configuration)