← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~wgrant/launchpad/separate-celerys into lp:launchpad

 

William Grant has proposed merging lp:~wgrant/launchpad/separate-celerys into lp:launchpad.

Commit message:
Rename the job celery queue to launchpad_job, and move BranchScanJob and RosettaUploadJob to a new bzrsyncd_job queue. This will let us run celeryd instances as both launchpad@ and bzrsyncd@ on prod.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~wgrant/launchpad/separate-celerys/+merge/163832

Rename the job celery queue to launchpad_job, and move BranchScanJob and RosettaUploadJob to a new bzrsyncd_job queue. This will let us run celeryd instances as both launchpad@ and bzrsyncd@ on prod.
-- 
https://code.launchpad.net/~wgrant/launchpad/separate-celerys/+merge/163832
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/separate-celerys into lp:launchpad.
=== modified file 'lib/lp/code/model/branchjob.py'
--- lib/lp/code/model/branchjob.py	2012-11-08 09:18:40 +0000
+++ lib/lp/code/model/branchjob.py	2013-05-15 04:52:26 +0000
@@ -313,6 +313,8 @@
 
     retry_error_types = (AdvisoryLockHeld,)
 
+    task_queue = 'bzrsyncd_job'
+
     config = config.branchscanner
 
     @classmethod
@@ -752,6 +754,8 @@
 
     class_job_type = BranchJobType.ROSETTA_UPLOAD
 
+    task_queue = 'bzrsyncd_job'
+
     config = config.rosettabranches
 
     def __init__(self, branch_job):

=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py	2013-02-21 05:43:21 +0000
+++ lib/lp/code/model/tests/test_branch.py	2013-05-15 04:52:26 +0000
@@ -160,7 +160,7 @@
 from lp.testing.layers import (
     AppServerLayer,
     CeleryBranchWriteJobLayer,
-    CeleryJobLayer,
+    CeleryBzrsyncdJobLayer,
     DatabaseFunctionalLayer,
     LaunchpadFunctionalLayer,
     LaunchpadZopelessLayer,
@@ -331,7 +331,7 @@
 
 class TestBranchJobViaCelery(TestCaseWithFactory):
 
-    layer = CeleryJobLayer
+    layer = CeleryBzrsyncdJobLayer
 
     def test_branchChanged_via_celery(self):
         """Running a job via Celery succeeds and emits expected output."""

=== modified file 'lib/lp/code/model/tests/test_branchjob.py'
--- lib/lp/code/model/tests/test_branchjob.py	2012-11-12 22:27:55 +0000
+++ lib/lp/code/model/tests/test_branchjob.py	2013-05-15 04:52:26 +0000
@@ -87,7 +87,7 @@
     switch_dbuser,
     )
 from lp.testing.layers import (
-    CeleryJobLayer,
+    CeleryBzrsyncdJobLayer,
     DatabaseFunctionalLayer,
     LaunchpadZopelessLayer,
     )
@@ -1270,7 +1270,7 @@
 
 class TestViaCelery(TestCaseWithFactory):
 
-    layer = CeleryJobLayer
+    layer = CeleryBzrsyncdJobLayer
 
     def test_RosettaUploadJob(self):
         """Ensure RosettaUploadJob can run under Celery."""

=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf	2013-04-29 21:32:04 +0000
+++ lib/lp/services/config/schema-lazr.conf	2013-05-15 04:52:26 +0000
@@ -1817,19 +1817,35 @@
 
 [job_runner_queues]
 # The names of all queues.
-queues: job job_slow branch_write_job branch_write_job_slow celerybeat
+queues: launchpad_job launchpad_job_slow bzrsyncd_job bzrsyncd_job_slow branch_write_job branch_write_job_slow celerybeat
 
 # The main job queue.
-[job]
-# The maximum job run time in seconds. When a job runs longer, Celery
-# terminates it with a SoftTimeLimitExceeded exception.
-timeout: 300
-# If a job times out, it will be queued again in the fallback queue.
-fallback_queue: job_slow
-concurrency: 3
-
-# The queue for jobs that time out in the queue "job".
-[job_slow]
+[launchpad_job]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 300
+# If a job times out, it will be queued again in the fallback queue.
+fallback_queue: launchpad_job_slow
+concurrency: 3
+
+# The queue for jobs that time out in the queue "launchpad_job".
+[launchpad_job_slow]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 86400
+fallback_queue:
+concurrency: 1
+
+[bzrsyncd_job]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 300
+# If a job times out, it will be queued again in the fallback queue.
+fallback_queue: bzrsyncd_job_slow
+concurrency: 3
+
+# The queue for jobs that time out in the queue "bzrsyncd_job".
+[bzrsyncd_job_slow]
 # The maximum job run time in seconds. When a job runs longer, Celery
 # terminates it with a SoftTimeLimitExceeded exception.
 timeout: 86400

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2012-07-26 14:31:45 +0000
+++ lib/lp/services/job/celeryconfig.py	2013-05-15 04:52:26 +0000
@@ -79,7 +79,7 @@
     result['BROKER_VHOST'] = config.rabbitmq.virtual_host
     result['CELERY_CREATE_MISSING_QUEUES'] = False
     result['CELERY_DEFAULT_EXCHANGE'] = 'job'
-    result['CELERY_DEFAULT_QUEUE'] = 'job'
+    result['CELERY_DEFAULT_QUEUE'] = 'launchpad_job'
     result['CELERY_IMPORTS'] = ("lp.services.job.celeryjob", )
     result['CELERY_QUEUES'] = celery_queues
     result['CELERY_RESULT_BACKEND'] = 'amqp'

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2012-11-26 08:33:03 +0000
+++ lib/lp/services/job/runner.py	2013-05-15 04:52:26 +0000
@@ -109,7 +109,7 @@
 
     retry_error_types = ()
 
-    task_queue = 'job'
+    task_queue = 'launchpad_job'
 
     celery_responses = None
 

=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py	2012-07-26 14:31:45 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py	2013-05-15 04:52:26 +0000
@@ -25,8 +25,9 @@
         # Four queues are defined; the binding key for each queue is
         # just the queue name.
         queue_names = [
-            'branch_write_job', 'branch_write_job_slow', 'celerybeat', 'job',
-            'job_slow']
+            'branch_write_job', 'branch_write_job_slow',
+            'bzrsyncd_job', 'bzrsyncd_job_slow', 'celerybeat',
+            'launchpad_job', 'launchpad_job_slow']
         queues = config['CELERY_QUEUES']
         self.assertEqual(queue_names, sorted(queues))
         for name in queue_names:
@@ -41,7 +42,7 @@
         self.assertEqual('/', config['BROKER_VHOST'])
         self.assertFalse(config['CELERY_CREATE_MISSING_QUEUES'])
         self.assertEqual('job', config['CELERY_DEFAULT_EXCHANGE'])
-        self.assertEqual('job', config['CELERY_DEFAULT_QUEUE'])
+        self.assertEqual('launchpad_job', config['CELERY_DEFAULT_QUEUE'])
         self.assertEqual(
             ('lp.services.job.celeryjob', ), config['CELERY_IMPORTS'])
         self.assertEqual('amqp', config['CELERY_RESULT_BACKEND'])
@@ -64,10 +65,10 @@
         from lp.services.job.celeryconfig import configure
         expected = {
             'concurrency': 3,
-            'fallback': 'job_slow',
+            'fallback': 'launchpad_job_slow',
             'timeout': 300,
             }
-        config = configure(['celeryd', '-Q', 'job'])
+        config = configure(['celeryd', '-Q', 'launchpad_job'])
         self.check_default_common_parameters(config)
         self.check_job_specific_celeryd_configuration(expected, config)
         config = configure(['celeryd', '-Q', 'branch_write_job'])
@@ -82,7 +83,7 @@
             'fallback': None,
             'timeout': 86400,
             }
-        config = configure(['celeryd', '-Q', 'job_slow'])
+        config = configure(['celeryd', '-Q', 'launchpad_job_slow'])
         self.check_default_common_parameters(config)
         self.check_job_specific_celeryd_configuration(expected, config)
         config = configure(['celeryd', '-Q', 'branch_write_job_slow'])
@@ -99,12 +100,12 @@
             )
         with changed_config(
             """
-            [job_slow]
-            fallback_queue: job
+            [launchpad_job_slow]
+            fallback_queue: launchpad_job
         """):
             error = (
-                "Circular chain of fallback queues: job already in "
-                "['job', 'job_slow']"
+                "Circular chain of fallback queues: launchpad_job already in "
+                "['launchpad_job', 'launchpad_job_slow']"
                 )
             self.assertRaisesWithContent(
                 ConfigurationError, error, configure, [''])
@@ -133,7 +134,7 @@
         error = 'A celeryd instance may serve only one queue.'
         self.assertRaisesWithContent(
             ConfigurationError, error, configure,
-            ['celeryd', '--queue=job,branch_write_job'])
+            ['celeryd', '--queue=launchpad_job,branch_write_job'])
 
     def test_unconfigured_queue_for_celeryd(self):
         # An exception is raised when celeryd is started for a queue that

=== modified file 'lib/lp/testing/layers.py'
--- lib/lp/testing/layers.py	2013-01-07 03:21:35 +0000
+++ lib/lp/testing/layers.py	2013-05-15 04:52:26 +0000
@@ -1904,7 +1904,25 @@
     @classmethod
     @profiled
     def setUp(cls):
-        cls.celeryd = celeryd('job')
+        cls.celeryd = celeryd('launchpad_job')
+        cls.celeryd.__enter__()
+
+    @classmethod
+    @profiled
+    def tearDown(cls):
+        cls.celeryd.__exit__(None, None, None)
+        cls.celeryd = None
+
+
+class CeleryBzrsyncdJobLayer(AppServerLayer):
+    """Layer for tests that run jobs that read from branches via Celery."""
+
+    celeryd = None
+
+    @classmethod
+    @profiled
+    def setUp(cls):
+        cls.celeryd = celeryd('bzrsyncd_job')
         cls.celeryd.__enter__()
 
     @classmethod