launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #06986
[Merge] lp:~abentley/launchpad/celery-job-feature-flag into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-job-feature-flag into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #972098 in Launchpad itself: "celery RabbitMQ queue slowly increasing on ackee"
https://bugs.launchpad.net/launchpad/+bug/972098
For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-job-feature-flag/+merge/100647
= Summary =
Fix 972098: celery RabbitMQ queue slowly increasing on ackee
== Proposed fix ==
Implement a feature flag to control which kinds of jobs run via Celery, so that none will run by default.
== Pre-implementation notes ==
Discussed with deryck
== Implementation details ==
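jobs.celery.enabled_classses (sic: the flag name is spelled with three s's throughout the code) is a space-separated list of the names of BaseRunnableJob classes that should be run via Celery. The flag is read by lp.services.job.runner.celery_enabled(), which returns False for every class when the flag is unset.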
To verify that branchChanged honours the flag, it had to be tested directly. However, branchChanged gave the test no way to obtain the Celery response on which to call response.wait, so I implemented monitor_celery, a context manager that collects the responses. This implementation also ensures that when responses will not be used, they are not sent in the first place: the task is queued with ignore_result=True unless a monitor is active.
== Tests ==
bin/test -t TestBranchJobViaCelery -t TestCeleryEnabled
== Demo and Q/A ==
Have webops determine the length of the "celery" RabbitMQ queue. Push a new branch, then have webops determine the queue length again. The length should not have changed.
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/job/tests/test_runner.py
lib/lp/services/job/runner.py
lib/lp/services/job/tests/test_celeryjob.py
lib/lp/code/model/branch.py
lib/lp/services/features/flags.py
lib/lp/code/model/tests/test_branch.py
--
https://code.launchpad.net/~abentley/launchpad/celery-job-feature-flag/+merge/100647
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-job-feature-flag into lp:launchpad.
=== modified file 'lib/lp/code/model/branch.py'
--- lib/lp/code/model/branch.py 2012-03-27 18:13:01 +0000
+++ lib/lp/code/model/branch.py 2012-04-03 16:24:23 +0000
@@ -150,6 +150,10 @@
from lp.services.helpers import shortlist
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.model.job import Job
+from lp.services.job.runner import (
+ BaseRunnableJob,
+ celery_enabled,
+ )
from lp.services.mail.notificationrecipientset import NotificationRecipientSet
from lp.services.propertycache import cachedproperty
from lp.services.webapp import urlappend
@@ -1060,13 +1064,18 @@
if self.last_scanned_id != last_revision_id:
from lp.code.model.branchjob import BranchScanJob
job_id = BranchScanJob.create(self).job_id
- if celery_scan:
+ if celery_scan and celery_enabled('BranchScanJob'):
# lp.services.job.celery is imported only where needed.
from lp.services.job.celeryjob import CeleryRunJob
current = transaction.get()
+
def runHook(succeeded):
+ ignore_result = (BaseRunnableJob.celery_responses is None)
if succeeded:
- CeleryRunJob.delay(job_id)
+ response = CeleryRunJob.apply_async(
+ (job_id,), ignore_result=ignore_result)
+ if not ignore_result:
+ BaseRunnableJob.celery_responses.append(response)
current.addAfterCommitHook(runHook)
self.control_format = control_format
self.branch_format = branch_format
=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py 2012-03-26 21:03:05 +0000
+++ lib/lp/code/model/tests/test_branch.py 2012-04-03 16:24:23 +0000
@@ -12,6 +12,7 @@
datetime,
timedelta,
)
+import os
from bzrlib.bzrdir import BzrDir
from bzrlib.revision import NULL_REVISION
@@ -109,6 +110,7 @@
from lp.services.config import config
from lp.services.database.constants import UTC_NOW
from lp.services.database.lpstorm import IStore
+from lp.services.features.testing import FeatureFixture
from lp.services.osutils import override_environ
from lp.services.propertycache import clear_property_cache
from lp.services.webapp.interfaces import IOpenLaunchBag
@@ -132,6 +134,7 @@
DatabaseFunctionalLayer,
LaunchpadFunctionalLayer,
LaunchpadZopelessLayer,
+ ZopelessAppServerLayer,
)
from lp.translations.model.translationtemplatesbuildjob import (
ITranslationTemplatesBuildJobSource,
@@ -285,6 +288,56 @@
branch.repository_format))
+class TestBranchJobViaCelery(TestCaseWithFactory):
+
+ layer = ZopelessAppServerLayer
+
+ def test_branchChanged_via_celery(self):
+ """Running a job via Celery succeeds and emits expected output."""
+ # Delay importing anything that uses Celery until RabbitMQLayer is
+ # running, so that config.rabbitmq.host is defined when
+ # lp.services.job.celeryconfig is loaded.
+ from lp.services.job.celeryjob import CeleryRunJob
+ from lp.services.job.tests.test_celeryjob import monitor_celery
+ from celery.exceptions import TimeoutError
+ from lazr.jobrunner.tests.test_celerytask import running
+ self.useFixture(FeatureFixture({'jobs.celery.enabled_classses':
+ 'BranchScanJob'}))
+ cmd_args = ('--config', 'lp.services.job.tests.celeryconfig')
+ env = dict(os.environ)
+ env['BROKER_URL'] = CeleryRunJob.app.conf['BROKER_URL']
+ with running('bin/celeryd', cmd_args, env=env) as proc:
+ self.useBzrBranches()
+ db_branch, bzr_tree = self.create_branch_and_tree()
+ bzr_tree.commit(
+ 'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
+ db_branch.branchChanged(None, 'rev1', None, None, None)
+ with monitor_celery() as responses:
+ transaction.commit()
+ try:
+ responses[-1].wait(30)
+ except TimeoutError:
+ pass
+ self.assertIn(
+ 'Updating branch scanner status: 1 revs', proc.stderr.read())
+ self.assertEqual(db_branch.revision_count, 1)
+
+ def test_branchChanged_via_celery_no_enabled(self):
+ """With the flag unset, branchChanged sends nothing to Celery."""
+ # Delay importing anything that uses Celery until RabbitMQLayer is
+ # running, so that config.rabbitmq.host is defined when
+ # lp.services.job.celeryconfig is loaded.
+ from lp.services.job.tests.test_celeryjob import monitor_celery
+ self.useBzrBranches()
+ db_branch, bzr_tree = self.create_branch_and_tree()
+ bzr_tree.commit(
+ 'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
+ db_branch.branchChanged(None, 'rev1', None, None, None)
+ with monitor_celery() as responses:
+ transaction.commit()
+ self.assertEqual([], responses)
+
+
class TestBranchRevisionMethods(TestCaseWithFactory):
"""Test the branch methods for adding and removing branch revisions."""
=== modified file 'lib/lp/services/features/flags.py'
--- lib/lp/services/features/flags.py 2012-03-26 21:23:40 +0000
+++ lib/lp/services/features/flags.py 2012-04-03 16:24:23 +0000
@@ -125,6 +125,12 @@
'',
'',
''),
+ ('jobs.celery.enabled_classses',
+ 'space delimited',
+ 'Names of Job classes that should be run via celery',
+ 'No jobs run via celery',
+ 'Celery-enabled job classes',
+ 'https://dev.launchpad.net/CeleryJobRunner'),
('js.combo_loader.enabled',
'boolean',
'Determines if we use a js combo loader or not.',
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-03-23 17:26:11 +0000
+++ lib/lp/services/job/runner.py 2012-04-03 16:24:23 +0000
@@ -9,6 +9,7 @@
'BaseJobRunner',
'BaseRunnableJob',
'BaseRunnableJobSource',
+ 'celery_enabled',
'JobCronScript',
'JobRunner',
'JobRunnerProcess',
@@ -62,6 +63,7 @@
config,
dbconfig,
)
+from lp.services.features import getFeatureFlag
from lp.services.job.interfaces.job import (
IJob,
IRunnableJob,
@@ -101,6 +103,8 @@
retry_error_types = ()
+ celery_responses = None
+
# We redefine __eq__ and __ne__ here to prevent the security proxy
# from mucking up our comparisons in tests and elsewhere.
def __eq__(self, job):
@@ -585,3 +589,14 @@
def __init__(self):
Exception.__init__(self, "Job ran too long.")
+
+
+def celery_enabled(class_name):
+ """Determine whether a given class is configured to run via Celery.
+
+ The name of a BaseRunnableJob must be specified.
+ """
+ flag = getFeatureFlag('jobs.celery.enabled_classses')
+ if flag is None:
+ return False
+ return class_name in flag.split(' ')
=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 2012-03-23 18:39:09 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 2012-04-03 16:24:23 +0000
@@ -2,15 +2,29 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
+from contextlib import contextmanager
import os
import transaction
from lp.code.model.branchjob import BranchScanJob
+from lp.services.job.runner import BaseRunnableJob
from lp.testing import TestCaseWithFactory
from lp.testing.layers import ZopelessAppServerLayer
+@contextmanager
+def monitor_celery():
+ """Context manager that provides a list of Celery responses."""
+ responses = []
+ old_responses = BaseRunnableJob.celery_responses
+ BaseRunnableJob.celery_responses = responses
+ try:
+ yield responses
+ finally:
+ BaseRunnableJob.celery_responses = old_responses
+
+
class TestCelery(TestCaseWithFactory):
layer = ZopelessAppServerLayer
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2012-03-30 18:13:38 +0000
+++ lib/lp/services/job/tests/test_runner.py 2012-04-03 16:24:23 +0000
@@ -19,6 +19,7 @@
from lp.code.interfaces.branchmergeproposal import IUpdatePreviewDiffJobSource
from lp.services.config import config
+from lp.services.features.testing import FeatureFixture
from lp.services.job.interfaces.job import (
IRunnableJob,
JobStatus,
@@ -26,6 +27,7 @@
from lp.services.job.model.job import Job
from lp.services.job.runner import (
BaseRunnableJob,
+ celery_enabled,
JobCronScript,
JobRunner,
TwistedJobRunner,
@@ -705,3 +707,32 @@
"""No --log-twisted sets JobCronScript.log_twisted False."""
jcs = JobCronScript(TwistedJobRunner, test_args=[])
self.assertFalse(jcs.log_twisted)
+
+
+class TestCeleryEnabled(TestCaseWithFactory):
+
+ layer = LaunchpadZopelessLayer
+
+ def test_no_flag(self):
+ """With no flag set, result is False."""
+ self.assertFalse(celery_enabled('foo'))
+
+ def test_matching_flag(self):
+ """A matching flag returns True."""
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classses': 'foo bar'}))
+ self.assertTrue(celery_enabled('foo'))
+ self.assertTrue(celery_enabled('bar'))
+
+ def test_non_matching_flag(self):
+ """A non-matching flag returns false."""
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classses': 'foo bar'}))
+ self.assertFalse(celery_enabled('baz'))
+ self.assertTrue(celery_enabled('bar'))
+
+ def test_substring(self):
+ """A substring of an enabled class does not match."""
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classses': 'foobar'}))
+ self.assertFalse(celery_enabled('bar'))