← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/celery-job-feature-flag into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-job-feature-flag into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #972098 in Launchpad itself: "celery RabbitMQ queue slowly increasing on ackee"
  https://bugs.launchpad.net/launchpad/+bug/972098

For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-job-feature-flag/+merge/100647

= Summary =
Fix bug 972098: celery RabbitMQ queue slowly increasing on ackee

== Proposed fix ==
Implement a feature flag that controls which job classes run via Celery, so that by default no jobs are run via Celery.

== Pre-implementation notes ==
Discussed with deryck

== Implementation details ==
The jobs.celery.enabled_classses feature flag is a space-separated list of the names of BaseRunnableJob subclasses (e.g. BranchScanJob) that should be run via Celery.  It is checked by lp.services.job.runner.celery_enabled().

To verify that branchChanged honours the flag, it needed a dedicated test.  However, branchChanged provided no way to obtain the Celery response, so the test could not call response.wait().  I therefore implemented monitor_celery, a context manager that collects the responses.  When no monitor is active, the job is dispatched with ignore_result=True, so responses that will never be used are not sent in the first place.

== Tests ==
bin/test -t TestBranchJobViaCelery -t TestCeleryEnabled

== Demo and Q/A ==
Have webops determine the length of the "celery" RabbitMQ queue.  Push a new branch.  Have webops check the length of the "celery" RabbitMQ queue again; it should not have changed.

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py
  lib/lp/services/job/tests/test_celeryjob.py
  lib/lp/code/model/branch.py
  lib/lp/services/features/flags.py
  lib/lp/code/model/tests/test_branch.py
-- 
https://code.launchpad.net/~abentley/launchpad/celery-job-feature-flag/+merge/100647
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-job-feature-flag into lp:launchpad.
=== modified file 'lib/lp/code/model/branch.py'
--- lib/lp/code/model/branch.py	2012-03-27 18:13:01 +0000
+++ lib/lp/code/model/branch.py	2012-04-03 16:24:23 +0000
@@ -150,6 +150,10 @@
 from lp.services.helpers import shortlist
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.model.job import Job
+from lp.services.job.runner import (
+    BaseRunnableJob,
+    celery_enabled,
+    )
 from lp.services.mail.notificationrecipientset import NotificationRecipientSet
 from lp.services.propertycache import cachedproperty
 from lp.services.webapp import urlappend
@@ -1060,13 +1064,18 @@
         if self.last_scanned_id != last_revision_id:
             from lp.code.model.branchjob import BranchScanJob
             job_id = BranchScanJob.create(self).job_id
-            if celery_scan:
+            if celery_scan and celery_enabled('BranchScanJob'):
                 # lp.services.job.celery is imported only where needed.
                 from lp.services.job.celeryjob import CeleryRunJob
                 current = transaction.get()
+
                 def runHook(succeeded):
+                    ignore_result = (BaseRunnableJob.celery_responses is None)
                     if succeeded:
-                        CeleryRunJob.delay(job_id)
+                        response = CeleryRunJob.apply_async(
+                            (job_id,), ignore_result=ignore_result)
+                        if not ignore_result:
+                            BaseRunnableJob.celery_responses.append(response)
                 current.addAfterCommitHook(runHook)
         self.control_format = control_format
         self.branch_format = branch_format

=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py	2012-03-26 21:03:05 +0000
+++ lib/lp/code/model/tests/test_branch.py	2012-04-03 16:24:23 +0000
@@ -12,6 +12,7 @@
     datetime,
     timedelta,
     )
+import os
 
 from bzrlib.bzrdir import BzrDir
 from bzrlib.revision import NULL_REVISION
@@ -109,6 +110,7 @@
 from lp.services.config import config
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.lpstorm import IStore
+from lp.services.features.testing import FeatureFixture
 from lp.services.osutils import override_environ
 from lp.services.propertycache import clear_property_cache
 from lp.services.webapp.interfaces import IOpenLaunchBag
@@ -132,6 +134,7 @@
     DatabaseFunctionalLayer,
     LaunchpadFunctionalLayer,
     LaunchpadZopelessLayer,
+    ZopelessAppServerLayer,
     )
 from lp.translations.model.translationtemplatesbuildjob import (
     ITranslationTemplatesBuildJobSource,
@@ -285,6 +288,56 @@
              branch.repository_format))
 
 
+class TestBranchJobViaCelery(TestCaseWithFactory):
+
+    layer = ZopelessAppServerLayer
+
+    def test_branchChanged_via_celery(self):
+        """With BranchScanJob enabled, branchChanged scans via Celery."""
+        # Delay importing anything that uses Celery until RabbitMQLayer is
+        # running, so that config.rabbitmq.host is defined when
+        # lp.services.job.celeryconfig is loaded.
+        from lp.services.job.celeryjob import CeleryRunJob
+        from lp.services.job.tests.test_celeryjob import monitor_celery
+        from celery.exceptions import TimeoutError
+        from lazr.jobrunner.tests.test_celerytask import running
+        self.useFixture(FeatureFixture({'jobs.celery.enabled_classses':
+            'BranchScanJob'}))
+        cmd_args = ('--config', 'lp.services.job.tests.celeryconfig')
+        env = dict(os.environ)
+        env['BROKER_URL'] = CeleryRunJob.app.conf['BROKER_URL']
+        with running('bin/celeryd', cmd_args, env=env) as proc:
+            self.useBzrBranches()
+            db_branch, bzr_tree = self.create_branch_and_tree()
+            bzr_tree.commit(
+                'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
+            db_branch.branchChanged(None, 'rev1', None, None, None)
+            with monitor_celery() as responses:
+                transaction.commit()
+                try:
+                    responses[-1].wait(30)
+                except TimeoutError:
+                    pass
+        self.assertIn(
+            'Updating branch scanner status: 1 revs', proc.stderr.read())
+        self.assertEqual(db_branch.revision_count, 1)
+
+    def test_branchChanged_via_celery_no_enabled(self):
+        """With the flag unset, branchChanged dispatches no Celery job."""
+        # Delay importing anything that uses Celery until RabbitMQLayer is
+        # running, so that config.rabbitmq.host is defined when
+        # lp.services.job.celeryconfig is loaded.
+        from lp.services.job.tests.test_celeryjob import monitor_celery
+        self.useBzrBranches()
+        db_branch, bzr_tree = self.create_branch_and_tree()
+        bzr_tree.commit(
+            'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
+        db_branch.branchChanged(None, 'rev1', None, None, None)
+        with monitor_celery() as responses:
+            transaction.commit()
+            self.assertEqual([], responses)
+
+
 class TestBranchRevisionMethods(TestCaseWithFactory):
     """Test the branch methods for adding and removing branch revisions."""
 

=== modified file 'lib/lp/services/features/flags.py'
--- lib/lp/services/features/flags.py	2012-03-26 21:23:40 +0000
+++ lib/lp/services/features/flags.py	2012-04-03 16:24:23 +0000
@@ -125,6 +125,12 @@
      '',
      '',
      ''),
+    ('jobs.celery.enabled_classses',
+     'space delimited',
+     'Names of Job classes that should be run via celery',
+     'No jobs run via celery',
+     'Celery-enabled job classes',
+     'https://dev.launchpad.net/CeleryJobRunner'),
     ('js.combo_loader.enabled',
      'boolean',
      'Determines if we use a js combo loader or not.',

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2012-03-23 17:26:11 +0000
+++ lib/lp/services/job/runner.py	2012-04-03 16:24:23 +0000
@@ -9,6 +9,7 @@
     'BaseJobRunner',
     'BaseRunnableJob',
     'BaseRunnableJobSource',
+    'celery_enabled',
     'JobCronScript',
     'JobRunner',
     'JobRunnerProcess',
@@ -62,6 +63,7 @@
     config,
     dbconfig,
     )
+from lp.services.features import getFeatureFlag
 from lp.services.job.interfaces.job import (
     IJob,
     IRunnableJob,
@@ -101,6 +103,8 @@
 
     retry_error_types = ()
 
+    celery_responses = None
+
     # We redefine __eq__ and __ne__ here to prevent the security proxy
     # from mucking up our comparisons in tests and elsewhere.
     def __eq__(self, job):
@@ -585,3 +589,14 @@
 
     def __init__(self):
         Exception.__init__(self, "Job ran too long.")
+
+
+def celery_enabled(class_name):
+    """Determine whether a given class is configured to run via Celery.
+
+    class_name must be the name of a BaseRunnableJob subclass.
+    """
+    flag = getFeatureFlag('jobs.celery.enabled_classses')
+    if flag is None:
+        return False
+    return class_name in flag.split()  # Ignore stray/extra whitespace.

=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py	2012-03-23 18:39:09 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py	2012-04-03 16:24:23 +0000
@@ -2,15 +2,29 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 
+from contextlib import contextmanager
 import os
 
 import transaction
 
 from lp.code.model.branchjob import BranchScanJob
+from lp.services.job.runner import BaseRunnableJob
 from lp.testing import TestCaseWithFactory
 from lp.testing.layers import ZopelessAppServerLayer
 
 
+@contextmanager
+def monitor_celery():
+    """Collect Celery responses in a list; restore prior state on exit."""
+    saved = BaseRunnableJob.celery_responses
+    # While set, Branch.branchChanged appends its Celery responses here.
+    collected = BaseRunnableJob.celery_responses = []
+    try:
+        yield collected
+    finally:
+        BaseRunnableJob.celery_responses = saved
+
+
 class TestCelery(TestCaseWithFactory):
 
     layer = ZopelessAppServerLayer

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2012-03-30 18:13:38 +0000
+++ lib/lp/services/job/tests/test_runner.py	2012-04-03 16:24:23 +0000
@@ -19,6 +19,7 @@
 
 from lp.code.interfaces.branchmergeproposal import IUpdatePreviewDiffJobSource
 from lp.services.config import config
+from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import (
     IRunnableJob,
     JobStatus,
@@ -26,6 +27,7 @@
 from lp.services.job.model.job import Job
 from lp.services.job.runner import (
     BaseRunnableJob,
+    celery_enabled,
     JobCronScript,
     JobRunner,
     TwistedJobRunner,
@@ -705,3 +707,32 @@
         """No --log-twisted sets JobCronScript.log_twisted False."""
         jcs = JobCronScript(TwistedJobRunner, test_args=[])
         self.assertFalse(jcs.log_twisted)
+
+
+class TestCeleryEnabled(TestCaseWithFactory):
+
+    layer = LaunchpadZopelessLayer
+
+    def test_no_flag(self):
+        """celery_enabled is False when the flag is unset."""
+        self.assertFalse(celery_enabled('foo'))
+
+    def test_matching_flag(self):
+        """Every name listed in the flag is enabled."""
+        flags = {'jobs.celery.enabled_classses': 'foo bar'}
+        self.useFixture(FeatureFixture(flags))
+        self.assertTrue(celery_enabled('foo'))
+        self.assertTrue(celery_enabled('bar'))
+
+    def test_non_matching_flag(self):
+        """An unlisted name is disabled while listed names stay enabled."""
+        flags = {'jobs.celery.enabled_classses': 'foo bar'}
+        self.useFixture(FeatureFixture(flags))
+        self.assertFalse(celery_enabled('baz'))
+        self.assertTrue(celery_enabled('bar'))
+
+    def test_substring(self):
+        """Only whole names match; a substring of a listed name does not."""
+        flags = {'jobs.celery.enabled_classses': 'foobar'}
+        self.useFixture(FeatureFixture(flags))
+        self.assertFalse(celery_enabled('bar'))