
launchpad-reviewers team mailing list archive

[Merge] lp:~adeuring/launchpad/celery-config into lp:launchpad

 

Abel Deuring has proposed merging lp:~adeuring/launchpad/celery-config into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~adeuring/launchpad/celery-config/+merge/102535

This branch adds configuration details we need for the Celery-based
job runner.

Celery reads its configuration from a Python file (we use
lp.services.job.celeryconfig). This module reads the "real config data"
from our regular lp.services.config.config module.

lp.services.job.celeryconfig already existed; I added several parameters
that were missing, and I added some sanity checks.

Note that the config module is used in two different places: The app
servers load it in order to send messages to a RabbitMQ server; several
celeryd instances consume these messages and run the jobs described by
them.
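
To illustrate how one config module can serve both kinds of process, here
is a minimal sketch modelled on the check in the diff below. The function
name requested_queues() and the use of ValueError are assumptions made for
this example (the branch raises its own ConfigurationError), and the code
is written for Python 3 rather than the Python 2 used in the branch.

```python
import argparse


def requested_queues(argv):
    """Return the queues named with -Q/--queues, or None for an app server.

    Hypothetical helper: a process counts as a celeryd instance when
    'celeryd' appears in argv[0], mirroring the check in the branch.
    """
    if 'celeryd' not in argv[0]:
        # App servers only send messages, so they need no queue argument.
        return None
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('-Q', '--queues')
    # parse_known_args() ignores the other celeryd options.
    args, _unknown = parser.parse_known_args(argv[1:])
    if not args.queues:
        raise ValueError('A queue must be specified.')
    return args.queues.split(',')
```

An app server started as, say, bin/run gets None back, while
requested_queues(['celeryd', '-Q', 'job']) yields the one-element list
['job'].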

Normally, a celeryconfig module looks like this:

PARAMETER_1 = value_1
PARAMETER_2 = value_2

i.e., the variables are set when the module is loaded. This makes it
difficult to test variants of the configuration -- re-loading a Python
module can be quite painful, so I moved the assignments into a function,
configure().
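
The pattern can be sketched as follows. This is a simplified illustration,
not the code from the branch: the SETTINGS dict is a hypothetical stand-in
for lp.services.config.config, and only two parameters are shown. It is
written for Python 3.

```python
# Hypothetical stand-in for the real Launchpad config object.
SETTINGS = {'host': 'localhost:5672'}

BROKER_HOST = None
BROKER_PORT = None


def configure(settings=None):
    """(Re)compute the module-level Celery parameters.

    Because the assignments live in a function, a test can call it
    again with different settings instead of reloading the module.
    """
    global BROKER_HOST
    global BROKER_PORT
    if settings is None:
        settings = SETTINGS
    host, port = settings['host'].split(':')
    BROKER_HOST = host
    # The port is kept as a string, as in the branch.
    BROKER_PORT = port


# Still run once at import time, so Celery sees plain module variables.
configure()
```

A test can then call configure() with an alternative settings dict and
inspect the module variables afterwards, without touching the import
machinery.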

The check for a circular chain of linked fallback queues is quite naive
and inefficient, but I think it is good enough for our purposes: It is
unlikely that we will have, let's say, 500 different queues where some
complex fallback chains need to be tested.
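
For reference, the check amounts to the following sketch. Unlike the
branch, which reads config[queue].fallback_queue, this version takes a
plain dict mapping each queue name to its fallback queue; that "fallbacks"
parameter is an assumption made to keep the example self-contained. It is
written for Python 3.

```python
class ConfigurationError(Exception):
    pass


def check_circular_fallbacks(queue, fallbacks):
    """Raise ConfigurationError if the fallback chain loops back.

    `fallbacks` maps a queue name to its fallback queue; an empty
    string (or a missing key) ends the chain.
    """
    linked_queues = []
    while fallbacks.get(queue, '') != '':
        linked_queues.append(queue)
        queue = fallbacks[queue]
        # List membership is a linear scan, so following one chain is
        # quadratic in its length: fine for a handful of queues.
        if queue in linked_queues:
            raise ConfigurationError(
                'Circular chain of fallback queues: '
                '%s already in %s' % (queue, linked_queues))
```

With the default mapping {'job': 'job_slow', 'job_slow': ''} the call
returns silently; if job_slow falls back to job, it raises
ConfigurationError.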

test: ./bin/test -vvt lp.services.job.tests.test_celery_configuration

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/config/schema-lazr.conf
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/tests/test_celery_configuration.py

./lib/lp/services/config/schema-lazr.conf
     501: Line exceeds 80 characters.
    1112: Line exceeds 80 characters.
    1119: Line exceeds 80 characters.
    1711: Line exceeds 80 characters.

These are lines I did not change.

-- 
https://code.launchpad.net/~adeuring/launchpad/celery-config/+merge/102535
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/launchpad/celery-config into lp:launchpad.
=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf	2012-04-06 17:28:25 +0000
+++ lib/lp/services/config/schema-lazr.conf	2012-04-18 15:04:32 +0000
@@ -1955,3 +1955,42 @@
 module: lp.answers.interfaces.questionjob
 dbuser: answertracker
 crontab_group: MAIN
+
+[job_runner_queues]
+# Each key is the name of a queue; the value is the AMQP binding key.
+# Each of the queues must have a config section with the queue name.
+job: job
+job_slow: job_slow
+branch_write_job: branch_write_job
+branch_write_job_slow: branch_write_job_slow
+
+# The main job queue.
+[job]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 600
+# If a job times out, it will be queued again in the fallback queue.
+fallback_queue: job_slow
+concurrency: 3
+
+# The queue for jobs that time out in the queue "job".
+[job_slow]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 86400
+fallback_queue:
+concurrency: 1
+
+[branch_write_job]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 600
+fallback_queue: branch_write_job_slow
+concurrency: 3
+
+[branch_write_job_slow]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 86400
+fallback_queue:
+concurrency: 1

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2012-04-10 20:24:43 +0000
+++ lib/lp/services/job/celeryconfig.py	2012-04-18 15:04:32 +0000
@@ -1,16 +1,97 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+import argparse
+import sys
 from lp.services.config import config
-host, port = config.rabbitmq.host.split(':')
-BROKER_HOST = host
-BROKER_PORT = port
-BROKER_USER = config.rabbitmq.userid
-BROKER_PASSWORD = config.rabbitmq.password
-BROKER_VHOST = config.rabbitmq.virtual_host
-CELERY_IMPORTS = ("lp.services.job.celeryjob", )
-CELERY_RESULT_BACKEND = "amqp"
-CELERY_QUEUES = {
-    "branch_write_job": {"binding_key": "branch_write_job"},
-    "job": {"binding_key": "job"},
-}
-CELERY_DEFAULT_EXCHANGE = "job"
-CELERY_DEFAULT_QUEUE = "job"
-CELERY_CREATE_MISSING_QUEUES = False
+
+
+class ConfigurationError(Exception):
+    pass
+
+
+def check_circular_fallbacks(queue):
+    """Check for circular fallback queues.
+
+    A circular chain of fallback queues could keep a job forever queued
+    if it times out in all queues.
+    """
+    linked_queues = []
+    while config[queue].fallback_queue != '':
+        linked_queues.append(queue)
+        queue = config[queue].fallback_queue
+        if queue in linked_queues:
+            raise ConfigurationError(
+                'Circular chain of fallback queues: '
+                '%s already in %s' % (queue, linked_queues))
+
+
+def configure():
+    """Set the Celery parameters.
+
+    Doing this in a function is convenient for testing.
+    """
+    global BROKER_HOST
+    global BROKER_PORT
+    global BROKER_USER
+    global BROKER_PASSWORD
+    global BROKER_VHOST
+    global CELERY_CREATE_MISSING_QUEUES
+    global CELERY_DEFAULT_EXCHANGE
+    global CELERY_DEFAULT_QUEUE
+    global CELERY_IMPORTS
+    global CELERY_QUEUES
+    global CELERY_RESULT_BACKEND
+    global CELERYD_CONCURRENCY
+    global CELERYD_TASK_SOFT_TIME_LIMIT
+    global FALLBACK
+
+    CELERY_QUEUES = {}
+    for queue_name in config.job_runner_queues:
+        CELERY_QUEUES[queue_name] = {
+            'binding_key': config.job_runner_queues[queue_name],
+            }
+        check_circular_fallbacks(queue_name)
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-Q',  '--queues')
+    args = parser.parse_known_args(sys.argv)
+    queues = args[0].queues
+    # A queue must be specified as a command line parameter for each
+    # celeryd instance, but this is not required for a Launchpad app server.
+    if 'celeryd' in sys.argv[0]:
+        if queues is None or queues == '':
+            raise ConfigurationError('A queue must be specified.')
+        queues = queues.split(',')
+        # Allow only one queue per celeryd instance. More than one queue
+        # would require a check for consistent timeout values, and especially
+        # a better way to specify a fallback queue.
+        if len(queues) > 1:
+            raise ConfigurationError(
+                'A celeryd instance may serve only one queue.')
+        queue = queues[0]
+        if queue not in CELERY_QUEUES:
+            raise ConfigurationError(
+                'Queue %s is not configured in schema-lazr.conf' % queue)
+        CELERYD_TASK_SOFT_TIME_LIMIT = config[queue].timeout
+        if config[queue].fallback_queue != '':
+            FALLBACK = config[queue].fallback_queue
+        CELERYD_CONCURRENCY = config[queue].concurrency
+
+    host, port = config.rabbitmq.host.split(':')
+    BROKER_HOST = host
+    BROKER_PORT = port
+    BROKER_USER = config.rabbitmq.userid
+    BROKER_PASSWORD = config.rabbitmq.password
+    BROKER_VHOST = config.rabbitmq.virtual_host
+    CELERY_IMPORTS = ("lp.services.job.celeryjob", )
+    CELERY_RESULT_BACKEND = "amqp"
+    CELERY_DEFAULT_EXCHANGE = "job"
+    CELERY_DEFAULT_QUEUE = "job"
+    CELERY_CREATE_MISSING_QUEUES = False
+
+try:
+    configure()
+except ConfigurationError, error:
+    print >>sys.stderr, error
+    sys.exit(1)

=== added file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py	2012-04-18 15:04:32 +0000
@@ -0,0 +1,167 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from contextlib import contextmanager
+import sys
+
+from lp.services.config import config
+from lp.testing import TestCase
+from lp.testing.layers import RabbitMQLayer
+
+
+def get_celery_configuration():
+    """Return the current Celery configuration."""
+    # Import late because the RabbitMQ parameters are set during layer setup.
+    from lp.services.job import celeryconfig
+    celeryconfig.configure()
+    return celeryconfig
+
+
+@contextmanager
+def faked_command_line(argv):
+    """Fake sys.argv to pretend that celeryd is started."""
+    real_argv = sys.argv
+    sys.argv = argv
+    yield
+    sys.argv = real_argv
+
+
+@contextmanager
+def changed_config(changes):
+    config.push('test_changes', changes)
+    yield
+    config.pop('test_changes')
+
+
+class TestCeleryConfiguration(TestCase):
+    layer = RabbitMQLayer
+
+    def tearDown(self):
+        # celeryconfig.configure() defines celeryconfig.FALLBACK in some
+        # tests but subsequent tests may assume that this variable does
+        # not exist, so remove this variable, if it has been created
+        # by a test.
+        from lp.services.job import celeryconfig
+        try:
+            del celeryconfig.FALLBACK
+        except AttributeError:
+            pass
+        super(TestCeleryConfiguration, self).tearDown()
+
+    def check_default_common_parameters(self, config):
+        # Tests for default config values that are set for app servers
+        # and for celeryd instances.
+
+        # Four queues are defined; the binding key for each queue is
+        # just the queue name.
+        queue_names = [
+            'branch_write_job', 'branch_write_job_slow', 'job', 'job_slow']
+        queues = config.CELERY_QUEUES
+        self.assertEqual(queue_names, sorted(queues))
+        for name in queue_names:
+            self.assertEqual(name, config.CELERY_QUEUES[name]['binding_key'])
+
+        self.assertEqual('localhost', config.BROKER_HOST)
+        # BROKER_PORT changes between test runs, so just check that it
+        # is defined.
+        config.BROKER_PORT
+        self.assertEqual('guest', config.BROKER_USER)
+        self.assertEqual('guest', config.BROKER_PASSWORD)
+        self.assertEqual('/', config.BROKER_VHOST)
+        self.assertFalse(config.CELERY_CREATE_MISSING_QUEUES)
+        self.assertEqual('job', config.CELERY_DEFAULT_EXCHANGE)
+        self.assertEqual('job', config.CELERY_DEFAULT_QUEUE)
+        self.assertEqual(
+            ('lp.services.job.celeryjob', ), config.CELERY_IMPORTS)
+        self.assertEqual('amqp', config.CELERY_RESULT_BACKEND)
+
+    def test_app_server_configuration(self):
+        self.check_default_common_parameters(get_celery_configuration())
+
+    def check_job_specific_celeryd_configutartion(self, expected, config):
+        self.check_default_common_parameters(config)
+        self.assertEqual(expected['concurrency'], config.CELERYD_CONCURRENCY)
+        self.assertEqual(
+            expected['timeout'], config.CELERYD_TASK_SOFT_TIME_LIMIT)
+        self.assertEqual(
+            expected['fallback'], getattr(config, 'FALLBACK', None))
+
+    def test_default_celeryd_configuration_fast_lanes(self):
+        expected = {
+            'concurrency': 3,
+            'fallback': 'job_slow',
+            'timeout': 600,
+            }
+        with faked_command_line(['celeryd', '-Q', 'job']):
+            config = get_celery_configuration()
+            self.check_default_common_parameters(config)
+            self.check_job_specific_celeryd_configutartion(expected, config)
+        with faked_command_line(['celeryd', '-Q', 'branch_write_job']):
+            config = get_celery_configuration()
+            self.check_default_common_parameters(config)
+            expected['fallback'] = 'branch_write_job_slow'
+            self.check_job_specific_celeryd_configutartion(expected, config)
+
+    def test_default_celeryd_configuration_slow_lanes(self):
+        expected = {
+            'concurrency': 1,
+            'fallback': None,
+            'timeout': 86400,
+            }
+        with faked_command_line(['celeryd', '-Q', 'job_slow']):
+            config = get_celery_configuration()
+            self.check_default_common_parameters(config)
+            self.check_job_specific_celeryd_configutartion(expected, config)
+        with faked_command_line(['celeryd', '-Q', 'branch_write_job_slow']):
+            config = get_celery_configuration()
+            self.check_default_common_parameters(config)
+            self.check_job_specific_celeryd_configutartion(expected, config)
+
+    def test_circular_fallback_lanes(self):
+        # Circular fallback lanes are detected.
+        # Import late because the RabbitMQ parameters are set during layer
+        # setup.
+        from lp.services.job.celeryconfig import ConfigurationError
+        with changed_config(
+            """
+            [job_slow]
+            fallback_queue: job
+        """):
+            error = (
+                "Circular chain of fallback queues: job already in "
+                "['job', 'job_slow']"
+                )
+            self.assertRaisesWithContent(
+                ConfigurationError, error, get_celery_configuration)
+
+    def test_missing_queue_parameter_for_celeryd(self):
+        # An exception is raised when celeryd is started without
+        # the parameter -Q.
+        # Import late because the RabbitMQ parameters are set during layer
+        # setup.
+        from lp.services.job.celeryconfig import ConfigurationError
+        with faked_command_line(['celeryd']):
+            error = 'A queue must be specified.'
+            self.assertRaisesWithContent(
+                ConfigurationError, error, get_celery_configuration)
+
+    def test_two_queues_for_celeryd(self):
+        # An exception is raised when celeryd is started for two queues.
+        # Import late because the RabbitMQ parameters are set during layer
+        # setup.
+        from lp.services.job.celeryconfig import ConfigurationError
+        with faked_command_line(['celeryd', '--queue=job,branch_write_job']):
+            error = 'A celeryd instance may serve only one queue.'
+            self.assertRaisesWithContent(
+                ConfigurationError, error, get_celery_configuration)
+
+    def test_unconfigured_queue_for_celeryd(self):
+        # An exception is raised when celeryd is started for a queue that
+        # is not configured.
+        # Import late because the RabbitMQ parameters are set during layer
+        # setup.
+        from lp.services.job.celeryconfig import ConfigurationError
+        with faked_command_line(['celeryd', '--queue=foo']):
+            error = 'Queue foo is not configured in schema-lazr.conf'
+            self.assertRaisesWithContent(
+                ConfigurationError, error, get_celery_configuration)