← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/modern-celery-invocation into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/modern-celery-invocation into lp:launchpad.

Commit message:
Run "celery worker" rather than the deprecated "celeryd".

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/modern-celery-invocation/+merge/274139

Run "celery worker" rather than the deprecated "celeryd".  This suppresses a bit of annoying/confusing test output.

I left compatibility handling for the legacy "celeryd" command name in the configuration code, since the worker is still being run as "celeryd" in production.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/modern-celery-invocation into lp:launchpad.
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2015-08-03 07:23:57 +0000
+++ lib/lp/services/job/celeryconfig.py	2015-10-12 13:53:26 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2015 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from datetime import timedelta
@@ -50,17 +50,18 @@
     args = parser.parse_known_args(argv)
     queues = args[0].queues
     # A queue must be specified as a command line parameter for each
-    # celeryd instance, but this is not required for a Launchpad app server.
-    if 'celeryd' in argv[0]:
+    # "celery worker" instance, but this is not required for a Launchpad app
+    # server.
+    if 'celeryd' in argv[0] or ('celery' in argv[0] and argv[1] == 'worker'):
         if queues is None or queues == '':
             raise ConfigurationError('A queue must be specified.')
         queues = queues.split(',')
-        # Allow only one queue per celeryd instance. More than one queue
-        # would require a check for consistent timeout values, and especially
-        # a better way to specify a fallback queue.
+        # Allow only one queue per "celery worker" instance. More than one
+        # queue would require a check for consistent timeout values, and
+        # especially a better way to specify a fallback queue.
         if len(queues) > 1:
             raise ConfigurationError(
-                'A celeryd instance may serve only one queue.')
+                'A "celery worker" instance may serve only one queue.')
         queue = queues[0]
         if queue not in celery_queues:
             raise ConfigurationError(
@@ -103,7 +104,7 @@
     # See http://ask.github.com/celery/userguide/optimizing.html:
     # The AMQP message of a job should stay in the RabbitMQ server
     # until the job has been finished. This allows to simply kill
-    # a celeryd instance while a job is executed; when another
+    # a "celery worker" instance while a job is executed; when another
     # instance is started later, it will run the aborted job again.
     result['CELERYD_PREFETCH_MULTIPLIER'] = 1
     result['CELERY_ACKS_LATE'] = True

=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py	2012-06-14 05:18:22 +0000
+++ lib/lp/services/job/tests/__init__.py	2015-10-12 13:53:26 +0000
@@ -1,11 +1,11 @@
-# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2015 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
 
 __all__ = [
     'block_on_job',
-    'celeryd',
+    'celery_worker',
     'monitor_celery',
     'pop_remote_notifications',
     ]
@@ -19,11 +19,11 @@
 from lp.testing.fixture import CaptureOops
 
 
-def celeryd(queue, cwd=None):
-    """Return a ContextManager for a celeryd instance.
+def celery_worker(queue, cwd=None):
+    """Return a ContextManager for a "celery worker" instance.
 
-    The celeryd instance will be configured to use the currently-configured
-    BROKER_URL, and able to run CeleryRunJob tasks.
+    The "celery worker" instance will be configured to use the
+    currently-configured BROKER_URL, and able to run CeleryRunJob tasks.
     """
     from lp.services.job.celeryjob import CeleryRunJob
     from lazr.jobrunner.tests.test_celerytask import running
@@ -31,6 +31,7 @@
     with CeleryRunJob.app.broker_connection() as connection:
         broker_uri = connection.as_uri(include_password=True)
     cmd_args = (
+        'worker',
         '--config', 'lp.services.job.celeryconfig',
         '--broker', broker_uri,
         '--concurrency', '1',
@@ -38,7 +39,7 @@
         '--queues', queue,
         '--include', 'lp.services.job.tests.celery_helpers',
     )
-    return running('bin/celeryd', cmd_args, cwd=cwd)
+    return running('bin/celery', cmd_args, cwd=cwd)
 
 
 @contextmanager
@@ -80,6 +81,6 @@
 
 
 def pop_remote_notifications():
-    """Pop the notifications from a celeryd worker."""
+    """Pop the notifications from a celery worker."""
     from lp.services.job.tests.celery_helpers import pop_notifications
     return pop_notifications.delay().get(30)

=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py	2015-08-03 10:24:06 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py	2015-10-12 13:53:26 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2015 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from contextlib import contextmanager
@@ -17,12 +17,14 @@
     config.pop('test_changes')
 
 
-class TestCeleryConfiguration(TestCase):
+class TestCeleryWorkerConfiguration(TestCase):
     layer = RabbitMQLayer
 
+    command = ['celery', 'worker']
+
     def check_default_common_parameters(self, config):
         # Tests for default config values that are set for app servers
-        # and for celeryd instances.
+        # and for "celery worker" instances.
 
         # Four queues are defined; the binding key for each queue is
         # just the queue name.
@@ -51,7 +53,7 @@
         config = configure([''])
         self.check_default_common_parameters(config)
 
-    def check_job_specific_celeryd_configuration(self, expected, config):
+    def check_job_specific_celery_worker_configuration(self, expected, config):
         self.check_default_common_parameters(config)
         self.assertEqual(
             expected['concurrency'], config['CELERYD_CONCURRENCY'])
@@ -60,34 +62,34 @@
         self.assertEqual(
             expected['fallback'], config.get('FALLBACK', None))
 
-    def test_default_celeryd_configuration_fast_lanes(self):
+    def test_default_celery_worker_configuration_fast_lanes(self):
         from lp.services.job.celeryconfig import configure
         expected = {
             'concurrency': 3,
             'fallback': 'launchpad_job_slow',
             'timeout': 300,
             }
-        config = configure(['celeryd', '-Q', 'launchpad_job'])
+        config = configure(self.command + ['-Q', 'launchpad_job'])
         self.check_default_common_parameters(config)
-        self.check_job_specific_celeryd_configuration(expected, config)
-        config = configure(['celeryd', '-Q', 'branch_write_job'])
+        self.check_job_specific_celery_worker_configuration(expected, config)
+        config = configure(self.command + ['-Q', 'branch_write_job'])
         self.check_default_common_parameters(config)
         expected['fallback'] = 'branch_write_job_slow'
-        self.check_job_specific_celeryd_configuration(expected, config)
+        self.check_job_specific_celery_worker_configuration(expected, config)
 
-    def test_default_celeryd_configuration_slow_lanes(self):
+    def test_default_celery_worker_configuration_slow_lanes(self):
         from lp.services.job.celeryconfig import configure
         expected = {
             'concurrency': 1,
             'fallback': None,
             'timeout': 86400,
             }
-        config = configure(['celeryd', '-Q', 'launchpad_job_slow'])
-        self.check_default_common_parameters(config)
-        self.check_job_specific_celeryd_configuration(expected, config)
-        config = configure(['celeryd', '-Q', 'branch_write_job_slow'])
-        self.check_default_common_parameters(config)
-        self.check_job_specific_celeryd_configuration(expected, config)
+        config = configure(self.command + ['-Q', 'launchpad_job_slow'])
+        self.check_default_common_parameters(config)
+        self.check_job_specific_celery_worker_configuration(expected, config)
+        config = configure(self.command + ['-Q', 'branch_write_job_slow'])
+        self.check_default_common_parameters(config)
+        self.check_job_specific_celery_worker_configuration(expected, config)
 
     def test_circular_fallback_lanes(self):
         # Circular fallback lanes are detected.
@@ -109,8 +111,8 @@
             self.assertRaisesWithContent(
                 ConfigurationError, error, configure, [''])
 
-    def test_missing_queue_parameter_for_celeryd(self):
-        # An exception is raised when celeryd is started without
+    def test_missing_queue_parameter_for_celery_worker(self):
+        # An exception is raised when "celery worker" is started without
         # the parameter -Q.
         # Import late because the RabbitMQ parameters are set during layer
         # setup.
@@ -120,24 +122,25 @@
             )
         error = 'A queue must be specified.'
         self.assertRaisesWithContent(
-            ConfigurationError, error, configure, ['celeryd'])
+            ConfigurationError, error, configure, self.command)
 
-    def test_two_queues_for_celeryd(self):
-        # An exception is raised when celeryd is started for two queues.
+    def test_two_queues_for_celery_worker(self):
+        # An exception is raised when "celery worker" is started for two
+        # queues.
         # Import late because the RabbitMQ parameters are set during layer
         # setup.
         from lp.services.job.celeryconfig import (
             ConfigurationError,
             configure,
             )
-        error = 'A celeryd instance may serve only one queue.'
+        error = 'A "celery worker" instance may serve only one queue.'
         self.assertRaisesWithContent(
             ConfigurationError, error, configure,
-            ['celeryd', '--queue=launchpad_job,branch_write_job'])
+            self.command + ['--queue=launchpad_job,branch_write_job'])
 
-    def test_unconfigured_queue_for_celeryd(self):
-        # An exception is raised when celeryd is started for a queue that
-        # is not configured.
+    def test_unconfigured_queue_for_celery_worker(self):
+        # An exception is raised when "celery worker" is started for a queue
+        # that is not configured.
         # Import late because the RabbitMQ parameters are set during layer
         # setup.
         from lp.services.job.celeryconfig import (
@@ -146,4 +149,11 @@
             )
         error = 'Queue foo is not configured in schema-lazr.conf'
         self.assertRaisesWithContent(
-            ConfigurationError, error, configure, ['celeryd', '--queue=foo'])
+            ConfigurationError, error, configure,
+            self.command + ['--queue=foo'])
+
+
+class TestCelerydConfiguration(TestCeleryWorkerConfiguration):
+    """Test behaviour with legacy "celeryd" command name."""
+
+    command = ['celeryd']

=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py	2012-11-08 04:29:11 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py	2015-10-12 13:53:26 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2015 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from cStringIO import StringIO
@@ -13,7 +13,7 @@
 from lp.scripts.helpers import TransactionFreeOperation
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.tests import (
-    celeryd,
+    celery_worker,
     drain_celery_queues,
     monitor_celery,
     )
@@ -133,17 +133,17 @@
         result_queue_name = request.task_id.replace('-', '')
         # Paranoia check: This test intends to prove that a Celery
         # result queue for the task created above will _not_ be created.
-        # This would also happen when "with celeryd()" would do nothing.
+        # This would also happen when "with celery_worker()" would do nothing.
         # So let's be sure that a task is queued...
         # Give the system some time to deliver the message
         self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 1)
-        # Wait at most 60 seconds for celeryd to start and process
+        # Wait at most 60 seconds for "celery worker" to start and process
         # the task.
-        with celeryd(job_queue_name):
+        with celery_worker(job_queue_name):
             # Due to FIFO ordering, this will only return after
             # RunMissingReady has finished.
             noop.apply_async(queue=job_queue_name).wait(60)
-        # But now the message has been consumed by celeryd.
+        # But now the message has been consumed by "celery worker".
         self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 0)
         # No result queue was created for the task.
         try:

=== modified file 'lib/lp/testing/layers.py'
--- lib/lp/testing/layers.py	2015-10-08 12:08:25 +0000
+++ lib/lp/testing/layers.py	2015-10-12 13:53:26 +0000
@@ -116,7 +116,7 @@
 from lp.services.googlesearch.tests.googleserviceharness import (
     GoogleServiceTestSetup,
     )
-from lp.services.job.tests import celeryd
+from lp.services.job.tests import celery_worker
 from lp.services.librarian.model import LibraryFileAlias
 from lp.services.librarianserver.testing.server import LibrarianServerFixture
 from lp.services.mail.mailbox import (
@@ -1873,55 +1873,55 @@
 class CeleryJobLayer(AppServerLayer):
     """Layer for tests that run jobs via Celery."""
 
-    celeryd = None
+    celery_worker = None
 
     @classmethod
     @profiled
     def setUp(cls):
-        cls.celeryd = celeryd('launchpad_job')
-        cls.celeryd.__enter__()
+        cls.celery_worker = celery_worker('launchpad_job')
+        cls.celery_worker.__enter__()
 
     @classmethod
     @profiled
     def tearDown(cls):
-        cls.celeryd.__exit__(None, None, None)
-        cls.celeryd = None
+        cls.celery_worker.__exit__(None, None, None)
+        cls.celery_worker = None
 
 
 class CeleryBzrsyncdJobLayer(AppServerLayer):
     """Layer for tests that run jobs that read from branches via Celery."""
 
-    celeryd = None
+    celery_worker = None
 
     @classmethod
     @profiled
     def setUp(cls):
-        cls.celeryd = celeryd('bzrsyncd_job')
-        cls.celeryd.__enter__()
+        cls.celery_worker = celery_worker('bzrsyncd_job')
+        cls.celery_worker.__enter__()
 
     @classmethod
     @profiled
     def tearDown(cls):
-        cls.celeryd.__exit__(None, None, None)
-        cls.celeryd = None
+        cls.celery_worker.__exit__(None, None, None)
+        cls.celery_worker = None
 
 
 class CeleryBranchWriteJobLayer(AppServerLayer):
     """Layer for tests that run jobs which write to branches via Celery."""
 
-    celeryd = None
+    celery_worker = None
 
     @classmethod
     @profiled
     def setUp(cls):
-        cls.celeryd = celeryd('branch_write_job')
-        cls.celeryd.__enter__()
+        cls.celery_worker = celery_worker('branch_write_job')
+        cls.celery_worker.__enter__()
 
     @classmethod
     @profiled
     def tearDown(cls):
-        cls.celeryd.__exit__(None, None, None)
-        cls.celeryd = None
+        cls.celery_worker.__exit__(None, None, None)
+        cls.celery_worker = None
 
 
 class ZopelessAppServerLayer(LaunchpadZopelessLayer):


Follow ups