launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #19598
[Merge] lp:~cjwatson/launchpad/modern-celery-invocation into lp:launchpad
Colin Watson has proposed merging lp:~cjwatson/launchpad/modern-celery-invocation into lp:launchpad.
Commit message:
Run "celery worker" rather than the deprecated "celeryd".
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/modern-celery-invocation/+merge/274139
Run "celery worker" rather than the deprecated "celeryd". This suppresses a bit of annoying/confusing test output.
I left some compatibility in place in the configuration code since it's still being run as "celeryd" in production.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/modern-celery-invocation into lp:launchpad.
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2015-08-03 07:23:57 +0000
+++ lib/lp/services/job/celeryconfig.py 2015-10-12 13:53:26 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2015 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
from datetime import timedelta
@@ -50,17 +50,18 @@
args = parser.parse_known_args(argv)
queues = args[0].queues
# A queue must be specified as a command line parameter for each
- # celeryd instance, but this is not required for a Launchpad app server.
- if 'celeryd' in argv[0]:
+ # "celery worker" instance, but this is not required for a Launchpad app
+ # server.
+ if 'celeryd' in argv[0] or ('celery' in argv[0] and argv[1] == 'worker'):
if queues is None or queues == '':
raise ConfigurationError('A queue must be specified.')
queues = queues.split(',')
- # Allow only one queue per celeryd instance. More than one queue
- # would require a check for consistent timeout values, and especially
- # a better way to specify a fallback queue.
+ # Allow only one queue per "celery worker" instance. More than one
+ # queue would require a check for consistent timeout values, and
+ # especially a better way to specify a fallback queue.
if len(queues) > 1:
raise ConfigurationError(
- 'A celeryd instance may serve only one queue.')
+ 'A "celery worker" instance may serve only one queue.')
queue = queues[0]
if queue not in celery_queues:
raise ConfigurationError(
@@ -103,7 +104,7 @@
# See http://ask.github.com/celery/userguide/optimizing.html:
# The AMQP message of a job should stay in the RabbitMQ server
# until the job has been finished. This allows to simply kill
- # a celeryd instance while a job is executed; when another
+ # a "celery worker" instance while a job is executed; when another
# instance is started later, it will run the aborted job again.
result['CELERYD_PREFETCH_MULTIPLIER'] = 1
result['CELERY_ACKS_LATE'] = True
=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py 2012-06-14 05:18:22 +0000
+++ lib/lp/services/job/tests/__init__.py 2015-10-12 13:53:26 +0000
@@ -1,11 +1,11 @@
-# Copyright 2012 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2015 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
__metaclass__ = type
__all__ = [
'block_on_job',
- 'celeryd',
+ 'celery_worker',
'monitor_celery',
'pop_remote_notifications',
]
@@ -19,11 +19,11 @@
from lp.testing.fixture import CaptureOops
-def celeryd(queue, cwd=None):
- """Return a ContextManager for a celeryd instance.
+def celery_worker(queue, cwd=None):
+ """Return a ContextManager for a "celery worker" instance.
- The celeryd instance will be configured to use the currently-configured
- BROKER_URL, and able to run CeleryRunJob tasks.
+ The "celery worker" instance will be configured to use the
+ currently-configured BROKER_URL, and able to run CeleryRunJob tasks.
"""
from lp.services.job.celeryjob import CeleryRunJob
from lazr.jobrunner.tests.test_celerytask import running
@@ -31,6 +31,7 @@
with CeleryRunJob.app.broker_connection() as connection:
broker_uri = connection.as_uri(include_password=True)
cmd_args = (
+ 'worker',
'--config', 'lp.services.job.celeryconfig',
'--broker', broker_uri,
'--concurrency', '1',
@@ -38,7 +39,7 @@
'--queues', queue,
'--include', 'lp.services.job.tests.celery_helpers',
)
- return running('bin/celeryd', cmd_args, cwd=cwd)
+ return running('bin/celery', cmd_args, cwd=cwd)
@contextmanager
@@ -80,6 +81,6 @@
def pop_remote_notifications():
- """Pop the notifications from a celeryd worker."""
+ """Pop the notifications from a celery worker."""
from lp.services.job.tests.celery_helpers import pop_notifications
return pop_notifications.delay().get(30)
=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 2015-08-03 10:24:06 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2015-10-12 13:53:26 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2015 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
from contextlib import contextmanager
@@ -17,12 +17,14 @@
config.pop('test_changes')
-class TestCeleryConfiguration(TestCase):
+class TestCeleryWorkerConfiguration(TestCase):
layer = RabbitMQLayer
+ command = ['celery', 'worker']
+
def check_default_common_parameters(self, config):
# Tests for default config values that are set for app servers
- # and for celeryd instances.
+ # and for "celery worker" instances.
# Four queues are defined; the binding key for each queue is
# just the queue name.
@@ -51,7 +53,7 @@
config = configure([''])
self.check_default_common_parameters(config)
- def check_job_specific_celeryd_configuration(self, expected, config):
+ def check_job_specific_celery_worker_configuration(self, expected, config):
self.check_default_common_parameters(config)
self.assertEqual(
expected['concurrency'], config['CELERYD_CONCURRENCY'])
@@ -60,34 +62,34 @@
self.assertEqual(
expected['fallback'], config.get('FALLBACK', None))
- def test_default_celeryd_configuration_fast_lanes(self):
+ def test_default_celery_worker_configuration_fast_lanes(self):
from lp.services.job.celeryconfig import configure
expected = {
'concurrency': 3,
'fallback': 'launchpad_job_slow',
'timeout': 300,
}
- config = configure(['celeryd', '-Q', 'launchpad_job'])
+ config = configure(self.command + ['-Q', 'launchpad_job'])
self.check_default_common_parameters(config)
- self.check_job_specific_celeryd_configuration(expected, config)
- config = configure(['celeryd', '-Q', 'branch_write_job'])
+ self.check_job_specific_celery_worker_configuration(expected, config)
+ config = configure(self.command + ['-Q', 'branch_write_job'])
self.check_default_common_parameters(config)
expected['fallback'] = 'branch_write_job_slow'
- self.check_job_specific_celeryd_configuration(expected, config)
+ self.check_job_specific_celery_worker_configuration(expected, config)
- def test_default_celeryd_configuration_slow_lanes(self):
+ def test_default_celery_worker_configuration_slow_lanes(self):
from lp.services.job.celeryconfig import configure
expected = {
'concurrency': 1,
'fallback': None,
'timeout': 86400,
}
- config = configure(['celeryd', '-Q', 'launchpad_job_slow'])
- self.check_default_common_parameters(config)
- self.check_job_specific_celeryd_configuration(expected, config)
- config = configure(['celeryd', '-Q', 'branch_write_job_slow'])
- self.check_default_common_parameters(config)
- self.check_job_specific_celeryd_configuration(expected, config)
+ config = configure(self.command + ['-Q', 'launchpad_job_slow'])
+ self.check_default_common_parameters(config)
+ self.check_job_specific_celery_worker_configuration(expected, config)
+ config = configure(self.command + ['-Q', 'branch_write_job_slow'])
+ self.check_default_common_parameters(config)
+ self.check_job_specific_celery_worker_configuration(expected, config)
def test_circular_fallback_lanes(self):
# Circular fallback lanes are detected.
@@ -109,8 +111,8 @@
self.assertRaisesWithContent(
ConfigurationError, error, configure, [''])
- def test_missing_queue_parameter_for_celeryd(self):
- # An exception is raised when celeryd is started without
+ def test_missing_queue_parameter_for_celery_worker(self):
+ # An exception is raised when "celery worker" is started without
# the parameter -Q.
# Import late because the RabbitMQ parameters are set during layer
# setup.
@@ -120,24 +122,25 @@
)
error = 'A queue must be specified.'
self.assertRaisesWithContent(
- ConfigurationError, error, configure, ['celeryd'])
+ ConfigurationError, error, configure, self.command)
- def test_two_queues_for_celeryd(self):
- # An exception is raised when celeryd is started for two queues.
+ def test_two_queues_for_celery_worker(self):
+ # An exception is raised when "celery worker" is started for two
+ # queues.
# Import late because the RabbitMQ parameters are set during layer
# setup.
from lp.services.job.celeryconfig import (
ConfigurationError,
configure,
)
- error = 'A celeryd instance may serve only one queue.'
+ error = 'A "celery worker" instance may serve only one queue.'
self.assertRaisesWithContent(
ConfigurationError, error, configure,
- ['celeryd', '--queue=launchpad_job,branch_write_job'])
+ self.command + ['--queue=launchpad_job,branch_write_job'])
- def test_unconfigured_queue_for_celeryd(self):
- # An exception is raised when celeryd is started for a queue that
- # is not configured.
+ def test_unconfigured_queue_for_celery_worker(self):
+ # An exception is raised when "celery worker" is started for a queue
+ # that is not configured.
# Import late because the RabbitMQ parameters are set during layer
# setup.
from lp.services.job.celeryconfig import (
@@ -146,4 +149,11 @@
)
error = 'Queue foo is not configured in schema-lazr.conf'
self.assertRaisesWithContent(
- ConfigurationError, error, configure, ['celeryd', '--queue=foo'])
+ ConfigurationError, error, configure,
+ self.command + ['--queue=foo'])
+
+
+class TestCelerydConfiguration(TestCeleryWorkerConfiguration):
+ """Test behaviour with legacy "celeryd" command name."""
+
+ command = ['celeryd']
=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 2012-11-08 04:29:11 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 2015-10-12 13:53:26 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2015 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
from cStringIO import StringIO
@@ -13,7 +13,7 @@
from lp.scripts.helpers import TransactionFreeOperation
from lp.services.features.testing import FeatureFixture
from lp.services.job.tests import (
- celeryd,
+ celery_worker,
drain_celery_queues,
monitor_celery,
)
@@ -133,17 +133,17 @@
result_queue_name = request.task_id.replace('-', '')
# Paranoia check: This test intends to prove that a Celery
# result queue for the task created above will _not_ be created.
- # This would also happen when "with celeryd()" would do nothing.
+ # This would also happen when "with celery_worker()" would do nothing.
# So let's be sure that a task is queued...
# Give the system some time to deliver the message
self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 1)
- # Wait at most 60 seconds for celeryd to start and process
+ # Wait at most 60 seconds for "celery worker" to start and process
# the task.
- with celeryd(job_queue_name):
+ with celery_worker(job_queue_name):
# Due to FIFO ordering, this will only return after
# RunMissingReady has finished.
noop.apply_async(queue=job_queue_name).wait(60)
- # But now the message has been consumed by celeryd.
+ # But now the message has been consumed by "celery worker".
self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 0)
# No result queue was created for the task.
try:
=== modified file 'lib/lp/testing/layers.py'
--- lib/lp/testing/layers.py 2015-10-08 12:08:25 +0000
+++ lib/lp/testing/layers.py 2015-10-12 13:53:26 +0000
@@ -116,7 +116,7 @@
from lp.services.googlesearch.tests.googleserviceharness import (
GoogleServiceTestSetup,
)
-from lp.services.job.tests import celeryd
+from lp.services.job.tests import celery_worker
from lp.services.librarian.model import LibraryFileAlias
from lp.services.librarianserver.testing.server import LibrarianServerFixture
from lp.services.mail.mailbox import (
@@ -1873,55 +1873,55 @@
class CeleryJobLayer(AppServerLayer):
"""Layer for tests that run jobs via Celery."""
- celeryd = None
+ celery_worker = None
@classmethod
@profiled
def setUp(cls):
- cls.celeryd = celeryd('launchpad_job')
- cls.celeryd.__enter__()
+ cls.celery_worker = celery_worker('launchpad_job')
+ cls.celery_worker.__enter__()
@classmethod
@profiled
def tearDown(cls):
- cls.celeryd.__exit__(None, None, None)
- cls.celeryd = None
+ cls.celery_worker.__exit__(None, None, None)
+ cls.celery_worker = None
class CeleryBzrsyncdJobLayer(AppServerLayer):
"""Layer for tests that run jobs that read from branches via Celery."""
- celeryd = None
+ celery_worker = None
@classmethod
@profiled
def setUp(cls):
- cls.celeryd = celeryd('bzrsyncd_job')
- cls.celeryd.__enter__()
+ cls.celery_worker = celery_worker('bzrsyncd_job')
+ cls.celery_worker.__enter__()
@classmethod
@profiled
def tearDown(cls):
- cls.celeryd.__exit__(None, None, None)
- cls.celeryd = None
+ cls.celery_worker.__exit__(None, None, None)
+ cls.celery_worker = None
class CeleryBranchWriteJobLayer(AppServerLayer):
"""Layer for tests that run jobs which write to branches via Celery."""
- celeryd = None
+ celery_worker = None
@classmethod
@profiled
def setUp(cls):
- cls.celeryd = celeryd('branch_write_job')
- cls.celeryd.__enter__()
+ cls.celery_worker = celery_worker('branch_write_job')
+ cls.celery_worker.__enter__()
@classmethod
@profiled
def tearDown(cls):
- cls.celeryd.__exit__(None, None, None)
- cls.celeryd = None
+ cls.celery_worker.__exit__(None, None, None)
+ cls.celery_worker = None
class ZopelessAppServerLayer(LaunchpadZopelessLayer):
Follow ups