launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #07113
[Merge] lp:~abentley/launchpad/celery-job-layer into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-job-layer into lp:launchpad with lp:~abentley/launchpad/celery-everywhere-3 as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-job-layer/+merge/101806
= Summary =
Run celeryd via a layer in tests.
== Proposed fix ==
None
== Pre-implementation notes ==
Robert confirmed that Layers are still our preferred means of reducing redundant startup/teardown costs.
== Implementation details ==
Implement CeleryJobLayer and CeleryBranchWriteJobLayer to provide celeryd instances for the corresponding queues.
Update all existing tests to use these layers.
Extract block_on_job from various tests that retrieve the job's response and then wait.
lp.services.job.tests.celeryd only supplies parameters to lazr.jobrunner.tests.test_celerytask.running and does not change its behaviour, so it can simply return the context manager produced by running() instead of being a context manager itself.
== Tests ==
bin/test --layer=CeleryJobLayer
bin/test --layer=CeleryBranchWriteJobLayer
== Demo and Q/A ==
None.
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/job/tests/test_runner.py
lib/lp/codehosting/scanner/tests/test_mergedetection.py
lib/lp/services/job/runner.py
lib/lp/code/model/tests/test_branchmergeproposaljobs.py
lib/lp/code/model/branchmergeproposal.py
lib/lp/code/model/tests/test_branchjob.py
lib/lp/services/job/tests/celery_helpers.py
lib/lp/services/job/celeryjob.py
lib/lp/codehosting/scanner/tests/test_email.py
lib/lp/services/job/tests/test_job.py
lib/lp/services/features/flags.py
lib/lp/code/model/branchmergeproposaljob.py
lib/lp/testing/layers.py
lib/lp/services/job/celeryconfig.py
lib/lp/services/job/model/job.py
lib/lp/services/job/tests/__init__.py
lib/lp/codehosting/scanner/email.py
lib/lp/code/model/tests/test_branch.py
lib/lp/code/model/branchjob.py
--
https://code.launchpad.net/~abentley/launchpad/celery-job-layer/+merge/101806
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-job-layer into lp:launchpad.
=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py 2012-04-10 20:24:43 +0000
+++ lib/lp/code/model/tests/test_branch.py 2012-04-12 20:15:24 +0000
@@ -12,7 +12,6 @@
datetime,
timedelta,
)
-import os
from bzrlib.branch import Branch
from bzrlib.bzrdir import BzrDir
@@ -118,7 +117,7 @@
from lp.services.database.lpstorm import IStore
from lp.services.features.testing import FeatureFixture
from lp.services.job.tests import (
- celeryd,
+ block_on_job,
monitor_celery,
)
from lp.services.osutils import override_environ
@@ -141,6 +140,8 @@
from lp.testing.factory import LaunchpadObjectFactory
from lp.testing.layers import (
AppServerLayer,
+ CeleryBranchWriteJobLayer,
+ CeleryJobLayer,
DatabaseFunctionalLayer,
LaunchpadFunctionalLayer,
LaunchpadZopelessLayer,
@@ -153,8 +154,9 @@
def create_knit(test_case):
db_branch, tree = test_case.create_branch_and_tree(format='knit')
- db_branch.branch_format = BranchFormat.BZR_BRANCH_5
- db_branch.repository_format = RepositoryFormat.BZR_KNIT_1
+ with person_logged_in(db_branch.owner):
+ db_branch.branch_format = BranchFormat.BZR_BRANCH_5
+ db_branch.repository_format = RepositoryFormat.BZR_KNIT_1
return db_branch, tree
@@ -307,62 +309,75 @@
class TestBranchJobViaCelery(TestCaseWithFactory):
- layer = ZopelessAppServerLayer
+ layer = CeleryJobLayer
def test_branchChanged_via_celery(self):
"""Running a job via Celery succeeds and emits expected output."""
# Delay importing anything that uses Celery until RabbitMQLayer is
# running, so that config.rabbitmq.host is defined when
# lp.services.job.celeryconfig is loaded.
- from celery.exceptions import TimeoutError
self.useFixture(FeatureFixture({
'jobs.celery.enabled_classes': 'BranchScanJob'}))
- with celeryd('job') as proc:
- self.useBzrBranches()
- db_branch, bzr_tree = self.create_branch_and_tree()
- bzr_tree.commit(
- 'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
+ self.useBzrBranches()
+ db_branch, bzr_tree = self.create_branch_and_tree()
+ bzr_tree.commit(
+ 'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
+ with person_logged_in(db_branch.owner):
db_branch.branchChanged(None, 'rev1', None, None, None)
- with monitor_celery() as responses:
- transaction.commit()
- try:
- responses[-1].wait(30)
- except TimeoutError:
- pass
- self.assertIn(
- 'Updating branch scanner status: 1 revs', proc.stderr.read())
+ with block_on_job():
+ transaction.commit()
self.assertEqual(db_branch.revision_count, 1)
def test_branchChanged_via_celery_no_enabled(self):
- """Running a job via Celery succeeds and emits expected output."""
+ """With no feature flag, no task is created."""
self.useBzrBranches()
db_branch, bzr_tree = self.create_branch_and_tree()
bzr_tree.commit(
'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
- db_branch.branchChanged(None, 'rev1', None, None, None)
+ with person_logged_in(db_branch.owner):
+ db_branch.branchChanged(None, 'rev1', None, None, None)
with monitor_celery() as responses:
transaction.commit()
self.assertEqual([], responses)
+
+class TestBranchWriteJobViaCelery(TestCaseWithFactory):
+
+ layer = CeleryBranchWriteJobLayer
+
def test_destroySelf_via_celery(self):
"""Calling destroySelf causes Celery to delete the branch."""
- from celery.exceptions import TimeoutError
self.useFixture(FeatureFixture({
'jobs.celery.enabled_classes': 'ReclaimBranchSpaceJob'}))
- with celeryd('branch_write_job'):
- self.useBzrBranches()
- db_branch, tree = self.create_branch_and_tree()
- branch_path = get_real_branch_path(db_branch.id)
- self.assertThat(branch_path, PathExists())
+ self.useBzrBranches()
+ db_branch, tree = self.create_branch_and_tree()
+ branch_path = get_real_branch_path(db_branch.id)
+ self.assertThat(branch_path, PathExists())
+ with person_logged_in(db_branch.owner):
db_branch.destroySelf()
- with monitor_celery() as responses:
- transaction.commit()
- try:
- responses[-1].wait(30)
- except TimeoutError:
- pass
+ with block_on_job():
+ transaction.commit()
self.assertThat(branch_path, Not(PathExists()))
+ def test_requestUpgradeUsesCelery(self):
+ self.useFixture(FeatureFixture({
+ 'jobs.celery.enabled_classes': 'BranchUpgradeJob'}))
+ self.useBzrBranches()
+ db_branch, tree = create_knit(self)
+ self.assertEqual(
+ tree.branch.repository._format.get_format_string(),
+ 'Bazaar-NG Knit Repository Format 1')
+
+ with person_logged_in(db_branch.owner):
+ db_branch.requestUpgrade(db_branch.owner)
+ with block_on_job():
+ transaction.commit()
+ new_branch = Branch.open(tree.branch.base)
+ self.assertEqual(
+ new_branch.repository._format.get_format_string(),
+ 'Bazaar repository format 2a (needs bzr 1.16 or later)\n')
+ self.assertFalse(db_branch.needs_upgrading)
+
class TestBranchRevisionMethods(TestCaseWithFactory):
"""Test the branch methods for adding and removing branch revisions."""
@@ -816,27 +831,6 @@
jobs,
[job, ])
- def test_requestUpgradeUsesCelery(self):
- self.useFixture(FeatureFixture({
- 'jobs.celery.enabled_classes': 'BranchUpgradeJob'}))
- cwd = os.getcwd()
- self.useBzrBranches()
- db_branch, tree = create_knit(self)
- self.assertEqual(
- tree.branch.repository._format.get_format_string(),
- 'Bazaar-NG Knit Repository Format 1')
-
- db_branch.requestUpgrade(db_branch.owner)
- with monitor_celery() as responses:
- transaction.commit()
- with celeryd('branch_write_job', cwd):
- responses[-1].wait(30)
- new_branch = Branch.open(tree.branch.base)
- self.assertEqual(
- new_branch.repository._format.get_format_string(),
- 'Bazaar repository format 2a (needs bzr 1.16 or later)\n')
- self.assertFalse(db_branch.needs_upgrading)
-
def test_requestUpgrade_no_upgrade_needed(self):
# If a branch doesn't need to be upgraded, requestUpgrade raises an
# AlreadyLatestFormat.
=== modified file 'lib/lp/code/model/tests/test_branchjob.py'
--- lib/lp/code/model/tests/test_branchjob.py 2012-04-10 20:24:43 +0000
+++ lib/lp/code/model/tests/test_branchjob.py 2012-04-12 20:15:24 +0000
@@ -74,8 +74,7 @@
from lp.services.job.model.job import Job
from lp.services.job.runner import JobRunner
from lp.services.job.tests import (
- celeryd,
- monitor_celery,
+ block_on_job,
)
from lp.services.osutils import override_environ
from lp.services.webapp import canonical_url
@@ -88,7 +87,7 @@
switch_dbuser,
)
from lp.testing.layers import (
- AppServerLayer,
+ CeleryJobLayer,
DatabaseFunctionalLayer,
LaunchpadZopelessLayer,
)
@@ -1234,11 +1233,10 @@
class TestViaCelery(TestCaseWithFactory):
- layer = AppServerLayer
+ layer = CeleryJobLayer
def test_RosettaUploadJob(self):
"""Ensure RosettaUploadJob can run under Celery."""
- self.useContext(celeryd('job'))
self.useBzrBranches(direct_database=True)
self.useFixture(FeatureFixture({
'jobs.celery.enabled_classes': 'BranchScanJob RosettaUploadJob'
@@ -1247,19 +1245,17 @@
self.createBzrBranch(db_branch)
commit = DirectBranchCommit(db_branch, no_race_check=True)
commit.writeFile('foo.pot', 'gibberish')
- with monitor_celery() as responses:
- with person_logged_in(db_branch.owner):
+ with person_logged_in(db_branch.owner):
+ # wait for branch scan
+ with block_on_job():
commit.commit('message')
transaction.commit()
- # Wait for branch scan to complete.
- responses[0].wait(30)
- series = self.factory.makeProductSeries(branch=db_branch)
- RosettaUploadJob.create(
- commit.db_branch, NULL_REVISION,
- force_translations_upload=True)
- transaction.commit()
- # Wait for RosettaUploadJob to complete
- responses[1].wait(30)
+ series = self.factory.makeProductSeries(branch=db_branch)
+ with block_on_job():
+ RosettaUploadJob.create(
+ commit.db_branch, NULL_REVISION,
+ force_translations_upload=True)
+ transaction.commit()
queue = getUtility(ITranslationImportQueue)
entries = list(queue.getAllEntries(target=series))
self.assertEqual(len(entries), 1)
=== modified file 'lib/lp/code/model/tests/test_branchmergeproposaljobs.py'
--- lib/lp/code/model/tests/test_branchmergeproposaljobs.py 2012-04-12 20:15:23 +0000
+++ lib/lp/code/model/tests/test_branchmergeproposaljobs.py 2012-04-12 20:15:24 +0000
@@ -60,8 +60,7 @@
from lp.services.job.model.job import Job
from lp.services.job.runner import JobRunner
from lp.services.job.tests import (
- celeryd,
- monitor_celery,
+ block_on_job,
pop_remote_notifications,
)
from lp.services.osutils import override_environ
@@ -72,7 +71,7 @@
)
from lp.testing.dbuser import dbuser
from lp.testing.layers import (
- AppServerLayer,
+ CeleryJobLayer,
LaunchpadZopelessLayer,
)
from lp.testing.mail_helpers import pop_notifications
@@ -595,79 +594,67 @@
class TestViaCelery(TestCaseWithFactory):
- layer = AppServerLayer
+ layer = CeleryJobLayer
def test_MergeProposalNeedsReviewEmailJob(self):
"""MergeProposalNeedsReviewEmailJob runs under Celery."""
self.useFixture(FeatureFixture(
{'jobs.celery.enabled_classes':
'MergeProposalNeedsReviewEmailJob'}))
- self.useContext(celeryd('job'))
bmp = self.factory.makeBranchMergeProposal()
- with monitor_celery() as responses:
+ with block_on_job():
MergeProposalNeedsReviewEmailJob.create(bmp)
transaction.commit()
- responses[0].wait(30)
self.assertEqual(2, len(pop_remote_notifications()))
def test_UpdatePreviewDiffJob(self):
"""UpdatePreviewDiffJob runs under Celery."""
- self.useContext(celeryd('job'))
self.useBzrBranches(direct_database=True)
bmp = create_example_merge(self)[0]
self.factory.makeRevisionsForBranch(bmp.source_branch, count=1)
self.useFixture(FeatureFixture(
{'jobs.celery.enabled_classes': 'UpdatePreviewDiffJob'}))
- with monitor_celery() as responses:
+ with block_on_job():
UpdatePreviewDiffJob.create(bmp)
transaction.commit()
- responses[0].wait(30)
self.assertIsNot(None, bmp.preview_diff)
def test_CodeReviewCommentEmailJob(self):
"""CodeReviewCommentEmailJob runs under Celery."""
comment = self.factory.makeCodeReviewComment()
- self.useContext(celeryd('job'))
self.useFixture(FeatureFixture(
{'jobs.celery.enabled_classes': 'CodeReviewCommentEmailJob'}))
- with monitor_celery() as responses:
+ with block_on_job():
CodeReviewCommentEmailJob.create(comment)
transaction.commit()
- responses[0].wait(30)
self.assertEqual(2, len(pop_remote_notifications()))
def test_ReviewRequestedEmailJob(self):
"""ReviewRequestedEmailJob runs under Celery."""
request = self.factory.makeCodeReviewVoteReference()
- self.useContext(celeryd('job'))
self.useFixture(FeatureFixture(
{'jobs.celery.enabled_classes': 'ReviewRequestedEmailJob'}))
- with monitor_celery() as responses:
+ with block_on_job():
ReviewRequestedEmailJob.create(request)
transaction.commit()
- responses[0].wait(30)
self.assertEqual(1, len(pop_remote_notifications()))
def test_MergeProposalUpdatedEmailJob(self):
"""MergeProposalUpdatedEmailJob runs under Celery."""
bmp = self.factory.makeBranchMergeProposal()
- self.useContext(celeryd('job'))
self.useFixture(FeatureFixture(
{'jobs.celery.enabled_classes': 'MergeProposalUpdatedEmailJob'}))
- with monitor_celery() as responses:
+ with block_on_job():
MergeProposalUpdatedEmailJob.create(
bmp, 'change', bmp.registrant)
transaction.commit()
- responses[0].wait(30)
self.assertEqual(2, len(pop_remote_notifications()))
def test_GenerateIncrementalDiffJob(self):
"""GenerateIncrementalDiffJob runs under Celery."""
- self.useContext(celeryd('job'))
self.useFixture(FeatureFixture(
{'jobs.celery.enabled_classes': 'GenerateIncrementalDiffJob'}))
- with monitor_celery() as responses:
+ with block_on_job():
job = make_runnable_incremental_diff_job(self)
transaction.commit()
- responses[0].wait(30)
self.assertEqual(JobStatus.COMPLETED, job.status)
=== modified file 'lib/lp/codehosting/scanner/tests/test_email.py'
--- lib/lp/codehosting/scanner/tests/test_email.py 2012-04-12 20:15:23 +0000
+++ lib/lp/codehosting/scanner/tests/test_email.py 2012-04-12 20:15:24 +0000
@@ -30,15 +30,14 @@
from lp.services.job.runner import JobRunner
from lp.services.mail import stub
from lp.services.job.tests import (
- celeryd,
- monitor_celery,
+ block_on_job,
pop_remote_notifications,
)
from lp.testing import TestCaseWithFactory
from lp.testing.dbuser import switch_dbuser
from lp.testing.layers import (
+ CeleryJobLayer,
LaunchpadZopelessLayer,
- ZopelessAppServerLayer,
)
@@ -162,12 +161,11 @@
class TestViaCelery(TestCaseWithFactory):
- layer = ZopelessAppServerLayer
+ layer = CeleryJobLayer
def prepare(self, job_name):
self.useFixture(FeatureFixture(
{'jobs.celery.enabled_classes': job_name}))
- self.useContext(celeryd('job'))
self.useBzrBranches(direct_database=True)
db_branch, tree = self.create_branch_and_tree()
add_subscriber(db_branch)
@@ -179,9 +177,8 @@
def test_empty_branch(self):
"""RevisionMailJob for empty branches runs via Celery."""
db_branch, tree = self.prepare('RevisionMailJob')
- with monitor_celery() as responses:
+ with block_on_job():
BzrSync(db_branch).syncBranchAndClose(tree.branch)
- responses[-1].wait(30)
self.assertEqual(1, len(pop_remote_notifications()))
def test_uncommit_branch(self):
@@ -189,13 +186,12 @@
db_branch, tree = self.prepare('RevisionMailJob')
tree.commit('message')
bzr_sync = BzrSync(db_branch)
- with monitor_celery() as responses:
- bzr_sync.syncBranchAndClose(tree.branch)
- responses[0].wait(30)
- pop_remote_notifications()
- uncommit(tree.branch)
- bzr_sync.syncBranchAndClose(tree.branch)
- responses[1].wait(30)
+ with block_on_job():
+ bzr_sync.syncBranchAndClose(tree.branch)
+ pop_remote_notifications()
+ uncommit(tree.branch)
+ with block_on_job():
+ bzr_sync.syncBranchAndClose(tree.branch)
self.assertEqual(1, len(pop_remote_notifications()))
def test_revisions_added(self):
@@ -206,10 +202,9 @@
bzr_sync.syncBranchAndClose(tree.branch)
pop_remote_notifications()
tree.commit('message2')
- with monitor_celery() as responses:
+ with block_on_job():
bzr_sync.syncBranchAndClose(tree.branch)
- responses[-1].wait(30)
- self.assertEqual(1, len(pop_remote_notifications()))
+ self.assertEqual(1, len(pop_remote_notifications()))
class TestScanBranches(TestCaseWithFactory):
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py 2012-04-12 20:15:23 +0000
+++ lib/lp/services/job/model/job.py 2012-04-12 20:15:24 +0000
@@ -275,15 +275,20 @@
return base_job.makeDerived(), store
+ @staticmethod
+ def clearStore(store):
+ transaction.abort()
+ getUtility(IZStorm).remove(store)
+ store.close()
+
@classmethod
def switchDBUser(cls, job_id):
"""Switch to the DB user associated with this Job ID."""
+ cls.clearStore(IStore(Job))
derived, store = cls.getDerived(job_id)
dbconfig.override(
dbuser=derived.config.dbuser, isolation_level='read_committed')
- transaction.abort()
- getUtility(IZStorm).remove(store)
- store.close()
+ cls.clearStore(store)
@classmethod
def get(cls, job_id):
=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py 2012-04-12 20:15:23 +0000
+++ lib/lp/services/job/tests/__init__.py 2012-04-12 20:15:24 +0000
@@ -4,6 +4,7 @@
__metaclass__ = type
__all__ = [
+ 'block_on_job',
'celeryd',
'monitor_celery',
'pop_remote_notifications',
@@ -15,7 +16,6 @@
from lp.services.job.runner import BaseRunnableJob
-@contextmanager
def celeryd(queue, cwd=None):
"""Return a ContextManager for a celeryd instance.
@@ -35,9 +35,7 @@
'--queues', queue,
'--include', 'lp.services.job.tests.celery_helpers',
)
- with running('bin/celeryd', cmd_args, cwd=cwd) as proc:
- # Wait for celeryd startup to complete.
- yield proc
+ return running('bin/celeryd', cmd_args, cwd=cwd)
@contextmanager
@@ -52,6 +50,13 @@
BaseRunnableJob.celery_responses = old_responses
+@contextmanager
+def block_on_job():
+ with monitor_celery() as responses:
+ yield
+ responses[-1].wait(30)
+
+
def pop_remote_notifications():
"""Pop the notifications from a celeryd worker."""
from lp.services.job.tests.celery_helpers import pop_notifications
=== modified file 'lib/lp/testing/layers.py'
--- lib/lp/testing/layers.py 2012-03-22 12:13:15 +0000
+++ lib/lp/testing/layers.py 2012-04-12 20:15:24 +0000
@@ -111,6 +111,7 @@
ConfigUseFixture,
)
from lp.services.database.sqlbase import session_store
+from lp.services.job.tests import celeryd
from lp.services.googlesearch.tests.googleserviceharness import (
GoogleServiceTestSetup,
)
@@ -1859,6 +1860,42 @@
LayerProcessController.postTestInvariants()
+class CeleryJobLayer(AppServerLayer):
+ """Layer for tests that run jobs via Celery."""
+
+ celeryd = None
+
+ @classmethod
+ @profiled
+ def setUp(cls):
+ cls.celeryd = celeryd('job')
+ cls.celeryd.__enter__()
+
+ @classmethod
+ @profiled
+ def tearDown(cls):
+ cls.celeryd.__exit__(None, None, None)
+ cls.celeryd = None
+
+
+class CeleryBranchWriteJobLayer(AppServerLayer):
+ """Layer for tests that run jobs which write to branches via Celery."""
+
+ celeryd = None
+
+ @classmethod
+ @profiled
+ def setUp(cls):
+ cls.celeryd = celeryd('branch_write_job')
+ cls.celeryd.__enter__()
+
+ @classmethod
+ @profiled
+ def tearDown(cls):
+ cls.celeryd.__exit__(None, None, None)
+ cls.celeryd = None
+
+
class ZopelessAppServerLayer(LaunchpadZopelessLayer):
"""Layer for tests that run in the zopeless environment with an appserver.
"""