launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #15592
[Merge] lp:~wgrant/launchpad/separate-celerys into lp:launchpad
William Grant has proposed merging lp:~wgrant/launchpad/separate-celerys into lp:launchpad.
Commit message:
Rename the job celery queue to launchpad_job, and move BranchScanJob to a new bzrsyncd_job. This'll let us run celerys as both launchpad@ and bzrsyncd@ on prod.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~wgrant/launchpad/separate-celerys/+merge/163832
Rename the job celery queue to launchpad_job, and move BranchScanJob to a new bzrsyncd_job. This'll let us run celerys as both launchpad@ and bzrsyncd@ on prod.
--
https://code.launchpad.net/~wgrant/launchpad/separate-celerys/+merge/163832
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/separate-celerys into lp:launchpad.
=== modified file 'lib/lp/code/model/branchjob.py'
--- lib/lp/code/model/branchjob.py 2012-11-08 09:18:40 +0000
+++ lib/lp/code/model/branchjob.py 2013-05-15 04:52:26 +0000
@@ -313,6 +313,8 @@
retry_error_types = (AdvisoryLockHeld,)
+ task_queue = 'bzrsyncd_job'
+
config = config.branchscanner
@classmethod
@@ -752,6 +754,8 @@
class_job_type = BranchJobType.ROSETTA_UPLOAD
+ task_queue = 'bzrsyncd_job'
+
config = config.rosettabranches
def __init__(self, branch_job):
=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py 2013-02-21 05:43:21 +0000
+++ lib/lp/code/model/tests/test_branch.py 2013-05-15 04:52:26 +0000
@@ -160,7 +160,7 @@
from lp.testing.layers import (
AppServerLayer,
CeleryBranchWriteJobLayer,
- CeleryJobLayer,
+ CeleryBzrsyncdJobLayer,
DatabaseFunctionalLayer,
LaunchpadFunctionalLayer,
LaunchpadZopelessLayer,
@@ -331,7 +331,7 @@
class TestBranchJobViaCelery(TestCaseWithFactory):
- layer = CeleryJobLayer
+ layer = CeleryBzrsyncdJobLayer
def test_branchChanged_via_celery(self):
"""Running a job via Celery succeeds and emits expected output."""
=== modified file 'lib/lp/code/model/tests/test_branchjob.py'
--- lib/lp/code/model/tests/test_branchjob.py 2012-11-12 22:27:55 +0000
+++ lib/lp/code/model/tests/test_branchjob.py 2013-05-15 04:52:26 +0000
@@ -87,7 +87,7 @@
switch_dbuser,
)
from lp.testing.layers import (
- CeleryJobLayer,
+ CeleryBzrsyncdJobLayer,
DatabaseFunctionalLayer,
LaunchpadZopelessLayer,
)
@@ -1270,7 +1270,7 @@
class TestViaCelery(TestCaseWithFactory):
- layer = CeleryJobLayer
+ layer = CeleryBzrsyncdJobLayer
def test_RosettaUploadJob(self):
"""Ensure RosettaUploadJob can run under Celery."""
=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf 2013-04-29 21:32:04 +0000
+++ lib/lp/services/config/schema-lazr.conf 2013-05-15 04:52:26 +0000
@@ -1817,19 +1817,35 @@
[job_runner_queues]
# The names of all queues.
-queues: job job_slow branch_write_job branch_write_job_slow celerybeat
+queues: launchpad_job launchpad_job_slow bzrsyncd_job bzrsyncd_job_slow branch_write_job branch_write_job_slow celerybeat
# The main job queue.
-[job]
-# The maximum job run time in seconds. When a job runs longer, Celery
-# terminates it with a SoftTimeLimitExceeded exception.
-timeout: 300
-# If a job times out, it will be queued again in the fallback queue.
-fallback_queue: job_slow
-concurrency: 3
-
-# The queue for jobs that time out in the queue "job".
-[job_slow]
+[launchpad_job]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 300
+# If a job times out, it will be queued again in the fallback queue.
+fallback_queue: launchpad_job_slow
+concurrency: 3
+
+# The queue for jobs that time out in the queue "job".
+[launchpad_job_slow]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 86400
+fallback_queue:
+concurrency: 1
+
+[bzrsyncd_job]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 300
+# If a job times out, it will be queued again in the fallback queue.
+fallback_queue: bzrsyncd_job_slow
+concurrency: 3
+
+# The queue for jobs that time out in the queue "job".
+[bzrsyncd_job_slow]
# The maximum job run time in seconds. When a job runs longer, Celery
# terminates it with a SoftTimeLimitExceeded exception.
timeout: 86400
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-07-26 14:31:45 +0000
+++ lib/lp/services/job/celeryconfig.py 2013-05-15 04:52:26 +0000
@@ -79,7 +79,7 @@
result['BROKER_VHOST'] = config.rabbitmq.virtual_host
result['CELERY_CREATE_MISSING_QUEUES'] = False
result['CELERY_DEFAULT_EXCHANGE'] = 'job'
- result['CELERY_DEFAULT_QUEUE'] = 'job'
+ result['CELERY_DEFAULT_QUEUE'] = 'launchpad_job'
result['CELERY_IMPORTS'] = ("lp.services.job.celeryjob", )
result['CELERY_QUEUES'] = celery_queues
result['CELERY_RESULT_BACKEND'] = 'amqp'
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-11-26 08:33:03 +0000
+++ lib/lp/services/job/runner.py 2013-05-15 04:52:26 +0000
@@ -109,7 +109,7 @@
retry_error_types = ()
- task_queue = 'job'
+ task_queue = 'launchpad_job'
celery_responses = None
=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 2012-07-26 14:31:45 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2013-05-15 04:52:26 +0000
@@ -25,8 +25,9 @@
# Four queues are defined; the binding key for each queue is
# just the queue name.
queue_names = [
- 'branch_write_job', 'branch_write_job_slow', 'celerybeat', 'job',
- 'job_slow']
+ 'branch_write_job', 'branch_write_job_slow',
+ 'bzrsyncd_job', 'bzrsyncd_job_slow', 'celerybeat',
+ 'launchpad_job', 'launchpad_job_slow']
queues = config['CELERY_QUEUES']
self.assertEqual(queue_names, sorted(queues))
for name in queue_names:
@@ -41,7 +42,7 @@
self.assertEqual('/', config['BROKER_VHOST'])
self.assertFalse(config['CELERY_CREATE_MISSING_QUEUES'])
self.assertEqual('job', config['CELERY_DEFAULT_EXCHANGE'])
- self.assertEqual('job', config['CELERY_DEFAULT_QUEUE'])
+ self.assertEqual('launchpad_job', config['CELERY_DEFAULT_QUEUE'])
self.assertEqual(
('lp.services.job.celeryjob', ), config['CELERY_IMPORTS'])
self.assertEqual('amqp', config['CELERY_RESULT_BACKEND'])
@@ -64,10 +65,10 @@
from lp.services.job.celeryconfig import configure
expected = {
'concurrency': 3,
- 'fallback': 'job_slow',
+ 'fallback': 'launchpad_job_slow',
'timeout': 300,
}
- config = configure(['celeryd', '-Q', 'job'])
+ config = configure(['celeryd', '-Q', 'launchpad_job'])
self.check_default_common_parameters(config)
self.check_job_specific_celeryd_configuration(expected, config)
config = configure(['celeryd', '-Q', 'branch_write_job'])
@@ -82,7 +83,7 @@
'fallback': None,
'timeout': 86400,
}
- config = configure(['celeryd', '-Q', 'job_slow'])
+ config = configure(['celeryd', '-Q', 'launchpad_job_slow'])
self.check_default_common_parameters(config)
self.check_job_specific_celeryd_configuration(expected, config)
config = configure(['celeryd', '-Q', 'branch_write_job_slow'])
@@ -99,12 +100,12 @@
)
with changed_config(
"""
- [job_slow]
- fallback_queue: job
+ [launchpad_job_slow]
+ fallback_queue: launchpad_job
"""):
error = (
- "Circular chain of fallback queues: job already in "
- "['job', 'job_slow']"
+ "Circular chain of fallback queues: launchpad_job already in "
+ "['launchpad_job', 'launchpad_job_slow']"
)
self.assertRaisesWithContent(
ConfigurationError, error, configure, [''])
@@ -133,7 +134,7 @@
error = 'A celeryd instance may serve only one queue.'
self.assertRaisesWithContent(
ConfigurationError, error, configure,
- ['celeryd', '--queue=job,branch_write_job'])
+ ['celeryd', '--queue=launchpad_job,branch_write_job'])
def test_unconfigured_queue_for_celeryd(self):
# An exception is raised when celeryd is started for a queue that
=== modified file 'lib/lp/testing/layers.py'
--- lib/lp/testing/layers.py 2013-01-07 03:21:35 +0000
+++ lib/lp/testing/layers.py 2013-05-15 04:52:26 +0000
@@ -1904,7 +1904,25 @@
@classmethod
@profiled
def setUp(cls):
- cls.celeryd = celeryd('job')
+ cls.celeryd = celeryd('launchpad_job')
+ cls.celeryd.__enter__()
+
+ @classmethod
+ @profiled
+ def tearDown(cls):
+ cls.celeryd.__exit__(None, None, None)
+ cls.celeryd = None
+
+
+class CeleryBzrsyncdJobLayer(AppServerLayer):
+ """Layer for tests that run jobs that read from branches via Celery."""
+
+ celeryd = None
+
+ @classmethod
+ @profiled
+ def setUp(cls):
+ cls.celeryd = celeryd('bzrsyncd_job')
cls.celeryd.__enter__()
@classmethod