← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/celery-job-layer into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-job-layer into lp:launchpad with lp:~abentley/launchpad/celery-everywhere-3 as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-job-layer/+merge/101806

= Summary =
Run celeryd via a layer in tests.

== Proposed fix ==
None

== Pre-implementation notes ==
Robert confirmed that Layers are still our preferred means of reducing redundant startup/teardown costs.

== Implementation details ==
Implement CeleryJobLayer and CeleryBranchWriteJobLayer to provide celeryd instances for the corresponding queues.

Update all existing tests to use these layers

Extract block_on_job from various tests that retrieve the job's response and then wait.

lp.services.job.tests.celeryd only specifies parameters to lazr.jobrunner.tests.test_celerytask.running, and does not change its behaviour, so it can return running instead of being a contextmanager, itself.

== Tests ==
bin/test --layer=CeleryJobLayer
bin/test --layer=CeleryBranchWriteJobLayer

== Demo and Q/A ==
None.


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/codehosting/scanner/tests/test_mergedetection.py
  lib/lp/services/job/runner.py
  lib/lp/code/model/tests/test_branchmergeproposaljobs.py
  lib/lp/code/model/branchmergeproposal.py
  lib/lp/code/model/tests/test_branchjob.py
  lib/lp/services/job/tests/celery_helpers.py
  lib/lp/services/job/celeryjob.py
  lib/lp/codehosting/scanner/tests/test_email.py
  lib/lp/services/job/tests/test_job.py
  lib/lp/services/features/flags.py
  lib/lp/code/model/branchmergeproposaljob.py
  lib/lp/testing/layers.py
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/model/job.py
  lib/lp/services/job/tests/__init__.py
  lib/lp/codehosting/scanner/email.py
  lib/lp/code/model/tests/test_branch.py
  lib/lp/code/model/branchjob.py
-- 
https://code.launchpad.net/~abentley/launchpad/celery-job-layer/+merge/101806
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-job-layer into lp:launchpad.
=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py	2012-04-10 20:24:43 +0000
+++ lib/lp/code/model/tests/test_branch.py	2012-04-12 20:15:24 +0000
@@ -12,7 +12,6 @@
     datetime,
     timedelta,
     )
-import os
 
 from bzrlib.branch import Branch
 from bzrlib.bzrdir import BzrDir
@@ -118,7 +117,7 @@
 from lp.services.database.lpstorm import IStore
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.tests import (
-    celeryd,
+    block_on_job,
     monitor_celery,
     )
 from lp.services.osutils import override_environ
@@ -141,6 +140,8 @@
 from lp.testing.factory import LaunchpadObjectFactory
 from lp.testing.layers import (
     AppServerLayer,
+    CeleryBranchWriteJobLayer,
+    CeleryJobLayer,
     DatabaseFunctionalLayer,
     LaunchpadFunctionalLayer,
     LaunchpadZopelessLayer,
@@ -153,8 +154,9 @@
 
 def create_knit(test_case):
     db_branch, tree = test_case.create_branch_and_tree(format='knit')
-    db_branch.branch_format = BranchFormat.BZR_BRANCH_5
-    db_branch.repository_format = RepositoryFormat.BZR_KNIT_1
+    with person_logged_in(db_branch.owner):
+        db_branch.branch_format = BranchFormat.BZR_BRANCH_5
+        db_branch.repository_format = RepositoryFormat.BZR_KNIT_1
     return db_branch, tree
 
 
@@ -307,62 +309,75 @@
 
 class TestBranchJobViaCelery(TestCaseWithFactory):
 
-    layer = ZopelessAppServerLayer
+    layer = CeleryJobLayer
 
     def test_branchChanged_via_celery(self):
         """Running a job via Celery succeeds and emits expected output."""
         # Delay importing anything that uses Celery until RabbitMQLayer is
         # running, so that config.rabbitmq.host is defined when
         # lp.services.job.celeryconfig is loaded.
-        from celery.exceptions import TimeoutError
         self.useFixture(FeatureFixture({
             'jobs.celery.enabled_classes': 'BranchScanJob'}))
-        with celeryd('job') as proc:
-            self.useBzrBranches()
-            db_branch, bzr_tree = self.create_branch_and_tree()
-            bzr_tree.commit(
-                'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
+        self.useBzrBranches()
+        db_branch, bzr_tree = self.create_branch_and_tree()
+        bzr_tree.commit(
+            'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
+        with person_logged_in(db_branch.owner):
             db_branch.branchChanged(None, 'rev1', None, None, None)
-            with monitor_celery() as responses:
-                transaction.commit()
-                try:
-                    responses[-1].wait(30)
-                except TimeoutError:
-                    pass
-        self.assertIn(
-            'Updating branch scanner status: 1 revs', proc.stderr.read())
+        with block_on_job():
+            transaction.commit()
         self.assertEqual(db_branch.revision_count, 1)
 
     def test_branchChanged_via_celery_no_enabled(self):
-        """Running a job via Celery succeeds and emits expected output."""
+        """With no feature flag, no task is created."""
         self.useBzrBranches()
         db_branch, bzr_tree = self.create_branch_and_tree()
         bzr_tree.commit(
             'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
-        db_branch.branchChanged(None, 'rev1', None, None, None)
+        with person_logged_in(db_branch.owner):
+            db_branch.branchChanged(None, 'rev1', None, None, None)
         with monitor_celery() as responses:
             transaction.commit()
             self.assertEqual([], responses)
 
+
+class TestBranchWriteJobViaCelery(TestCaseWithFactory):
+
+    layer = CeleryBranchWriteJobLayer
+
     def test_destroySelf_via_celery(self):
         """Calling destroySelf causes Celery to delete the branch."""
-        from celery.exceptions import TimeoutError
         self.useFixture(FeatureFixture({
             'jobs.celery.enabled_classes': 'ReclaimBranchSpaceJob'}))
-        with celeryd('branch_write_job'):
-            self.useBzrBranches()
-            db_branch, tree = self.create_branch_and_tree()
-            branch_path = get_real_branch_path(db_branch.id)
-            self.assertThat(branch_path, PathExists())
+        self.useBzrBranches()
+        db_branch, tree = self.create_branch_and_tree()
+        branch_path = get_real_branch_path(db_branch.id)
+        self.assertThat(branch_path, PathExists())
+        with person_logged_in(db_branch.owner):
             db_branch.destroySelf()
-            with monitor_celery() as responses:
-                transaction.commit()
-                try:
-                    responses[-1].wait(30)
-                except TimeoutError:
-                    pass
+        with block_on_job():
+            transaction.commit()
         self.assertThat(branch_path, Not(PathExists()))
 
+    def test_requestUpgradeUsesCelery(self):
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classes': 'BranchUpgradeJob'}))
+        self.useBzrBranches()
+        db_branch, tree = create_knit(self)
+        self.assertEqual(
+            tree.branch.repository._format.get_format_string(),
+            'Bazaar-NG Knit Repository Format 1')
+
+        with person_logged_in(db_branch.owner):
+            db_branch.requestUpgrade(db_branch.owner)
+        with block_on_job():
+            transaction.commit()
+        new_branch = Branch.open(tree.branch.base)
+        self.assertEqual(
+            new_branch.repository._format.get_format_string(),
+            'Bazaar repository format 2a (needs bzr 1.16 or later)\n')
+        self.assertFalse(db_branch.needs_upgrading)
+
 
 class TestBranchRevisionMethods(TestCaseWithFactory):
     """Test the branch methods for adding and removing branch revisions."""
@@ -816,27 +831,6 @@
             jobs,
             [job, ])
 
-    def test_requestUpgradeUsesCelery(self):
-        self.useFixture(FeatureFixture({
-            'jobs.celery.enabled_classes': 'BranchUpgradeJob'}))
-        cwd = os.getcwd()
-        self.useBzrBranches()
-        db_branch, tree = create_knit(self)
-        self.assertEqual(
-            tree.branch.repository._format.get_format_string(),
-            'Bazaar-NG Knit Repository Format 1')
-
-        db_branch.requestUpgrade(db_branch.owner)
-        with monitor_celery() as responses:
-            transaction.commit()
-            with celeryd('branch_write_job', cwd):
-                responses[-1].wait(30)
-        new_branch = Branch.open(tree.branch.base)
-        self.assertEqual(
-            new_branch.repository._format.get_format_string(),
-            'Bazaar repository format 2a (needs bzr 1.16 or later)\n')
-        self.assertFalse(db_branch.needs_upgrading)
-
     def test_requestUpgrade_no_upgrade_needed(self):
         # If a branch doesn't need to be upgraded, requestUpgrade raises an
         # AlreadyLatestFormat.

=== modified file 'lib/lp/code/model/tests/test_branchjob.py'
--- lib/lp/code/model/tests/test_branchjob.py	2012-04-10 20:24:43 +0000
+++ lib/lp/code/model/tests/test_branchjob.py	2012-04-12 20:15:24 +0000
@@ -74,8 +74,7 @@
 from lp.services.job.model.job import Job
 from lp.services.job.runner import JobRunner
 from lp.services.job.tests import (
-    celeryd,
-    monitor_celery,
+    block_on_job,
     )
 from lp.services.osutils import override_environ
 from lp.services.webapp import canonical_url
@@ -88,7 +87,7 @@
     switch_dbuser,
     )
 from lp.testing.layers import (
-    AppServerLayer,
+    CeleryJobLayer,
     DatabaseFunctionalLayer,
     LaunchpadZopelessLayer,
     )
@@ -1234,11 +1233,10 @@
 
 class TestViaCelery(TestCaseWithFactory):
 
-    layer = AppServerLayer
+    layer = CeleryJobLayer
 
     def test_RosettaUploadJob(self):
         """Ensure RosettaUploadJob can run under Celery."""
-        self.useContext(celeryd('job'))
         self.useBzrBranches(direct_database=True)
         self.useFixture(FeatureFixture({
             'jobs.celery.enabled_classes': 'BranchScanJob RosettaUploadJob'
@@ -1247,19 +1245,17 @@
         self.createBzrBranch(db_branch)
         commit = DirectBranchCommit(db_branch, no_race_check=True)
         commit.writeFile('foo.pot', 'gibberish')
-        with monitor_celery() as responses:
-            with person_logged_in(db_branch.owner):
+        with person_logged_in(db_branch.owner):
+            # wait for branch scan
+            with block_on_job():
                 commit.commit('message')
                 transaction.commit()
-                # Wait for branch scan to complete.
-                responses[0].wait(30)
-                series = self.factory.makeProductSeries(branch=db_branch)
-                RosettaUploadJob.create(
-                    commit.db_branch, NULL_REVISION,
-                    force_translations_upload=True)
-                transaction.commit()
-        # Wait for RosettaUploadJob to complete
-        responses[1].wait(30)
+        series = self.factory.makeProductSeries(branch=db_branch)
+        with block_on_job():
+            RosettaUploadJob.create(
+                commit.db_branch, NULL_REVISION,
+                force_translations_upload=True)
+            transaction.commit()
         queue = getUtility(ITranslationImportQueue)
         entries = list(queue.getAllEntries(target=series))
         self.assertEqual(len(entries), 1)

=== modified file 'lib/lp/code/model/tests/test_branchmergeproposaljobs.py'
--- lib/lp/code/model/tests/test_branchmergeproposaljobs.py	2012-04-12 20:15:23 +0000
+++ lib/lp/code/model/tests/test_branchmergeproposaljobs.py	2012-04-12 20:15:24 +0000
@@ -60,8 +60,7 @@
 from lp.services.job.model.job import Job
 from lp.services.job.runner import JobRunner
 from lp.services.job.tests import (
-    celeryd,
-    monitor_celery,
+    block_on_job,
     pop_remote_notifications,
     )
 from lp.services.osutils import override_environ
@@ -72,7 +71,7 @@
     )
 from lp.testing.dbuser import dbuser
 from lp.testing.layers import (
-    AppServerLayer,
+    CeleryJobLayer,
     LaunchpadZopelessLayer,
     )
 from lp.testing.mail_helpers import pop_notifications
@@ -595,79 +594,67 @@
 
 class TestViaCelery(TestCaseWithFactory):
 
-    layer = AppServerLayer
+    layer = CeleryJobLayer
 
     def test_MergeProposalNeedsReviewEmailJob(self):
         """MergeProposalNeedsReviewEmailJob runs under Celery."""
         self.useFixture(FeatureFixture(
             {'jobs.celery.enabled_classes':
              'MergeProposalNeedsReviewEmailJob'}))
-        self.useContext(celeryd('job'))
         bmp = self.factory.makeBranchMergeProposal()
-        with monitor_celery() as responses:
+        with block_on_job():
             MergeProposalNeedsReviewEmailJob.create(bmp)
             transaction.commit()
-        responses[0].wait(30)
         self.assertEqual(2, len(pop_remote_notifications()))
 
     def test_UpdatePreviewDiffJob(self):
         """UpdatePreviewDiffJob runs under Celery."""
-        self.useContext(celeryd('job'))
         self.useBzrBranches(direct_database=True)
         bmp = create_example_merge(self)[0]
         self.factory.makeRevisionsForBranch(bmp.source_branch, count=1)
         self.useFixture(FeatureFixture(
             {'jobs.celery.enabled_classes': 'UpdatePreviewDiffJob'}))
-        with monitor_celery() as responses:
+        with block_on_job():
             UpdatePreviewDiffJob.create(bmp)
             transaction.commit()
-            responses[0].wait(30)
         self.assertIsNot(None, bmp.preview_diff)
 
     def test_CodeReviewCommentEmailJob(self):
         """CodeReviewCommentEmailJob runs under Celery."""
         comment = self.factory.makeCodeReviewComment()
-        self.useContext(celeryd('job'))
         self.useFixture(FeatureFixture(
             {'jobs.celery.enabled_classes': 'CodeReviewCommentEmailJob'}))
-        with monitor_celery() as responses:
+        with block_on_job():
             CodeReviewCommentEmailJob.create(comment)
             transaction.commit()
-        responses[0].wait(30)
         self.assertEqual(2, len(pop_remote_notifications()))
 
     def test_ReviewRequestedEmailJob(self):
         """ReviewRequestedEmailJob runs under Celery."""
         request = self.factory.makeCodeReviewVoteReference()
-        self.useContext(celeryd('job'))
         self.useFixture(FeatureFixture(
             {'jobs.celery.enabled_classes': 'ReviewRequestedEmailJob'}))
-        with monitor_celery() as responses:
+        with block_on_job():
             ReviewRequestedEmailJob.create(request)
             transaction.commit()
-        responses[0].wait(30)
         self.assertEqual(1, len(pop_remote_notifications()))
 
     def test_MergeProposalUpdatedEmailJob(self):
         """MergeProposalUpdatedEmailJob runs under Celery."""
         bmp = self.factory.makeBranchMergeProposal()
-        self.useContext(celeryd('job'))
         self.useFixture(FeatureFixture(
             {'jobs.celery.enabled_classes': 'MergeProposalUpdatedEmailJob'}))
-        with monitor_celery() as responses:
+        with block_on_job():
             MergeProposalUpdatedEmailJob.create(
                 bmp, 'change', bmp.registrant)
             transaction.commit()
-        responses[0].wait(30)
         self.assertEqual(2, len(pop_remote_notifications()))
 
     def test_GenerateIncrementalDiffJob(self):
         """GenerateIncrementalDiffJob runs under Celery."""
-        self.useContext(celeryd('job'))
         self.useFixture(FeatureFixture(
             {'jobs.celery.enabled_classes': 'GenerateIncrementalDiffJob'}))
-        with monitor_celery() as responses:
+        with block_on_job():
             job = make_runnable_incremental_diff_job(self)
             transaction.commit()
-        responses[0].wait(30)
         self.assertEqual(JobStatus.COMPLETED, job.status)

=== modified file 'lib/lp/codehosting/scanner/tests/test_email.py'
--- lib/lp/codehosting/scanner/tests/test_email.py	2012-04-12 20:15:23 +0000
+++ lib/lp/codehosting/scanner/tests/test_email.py	2012-04-12 20:15:24 +0000
@@ -30,15 +30,14 @@
 from lp.services.job.runner import JobRunner
 from lp.services.mail import stub
 from lp.services.job.tests import (
-    celeryd,
-    monitor_celery,
+    block_on_job,
     pop_remote_notifications,
     )
 from lp.testing import TestCaseWithFactory
 from lp.testing.dbuser import switch_dbuser
 from lp.testing.layers import (
+    CeleryJobLayer,
     LaunchpadZopelessLayer,
-    ZopelessAppServerLayer,
     )
 
 
@@ -162,12 +161,11 @@
 
 class TestViaCelery(TestCaseWithFactory):
 
-    layer = ZopelessAppServerLayer
+    layer = CeleryJobLayer
 
     def prepare(self, job_name):
         self.useFixture(FeatureFixture(
             {'jobs.celery.enabled_classes': job_name}))
-        self.useContext(celeryd('job'))
         self.useBzrBranches(direct_database=True)
         db_branch, tree = self.create_branch_and_tree()
         add_subscriber(db_branch)
@@ -179,9 +177,8 @@
     def test_empty_branch(self):
         """RevisionMailJob for empty branches runs via Celery."""
         db_branch, tree = self.prepare('RevisionMailJob')
-        with monitor_celery() as responses:
+        with block_on_job():
             BzrSync(db_branch).syncBranchAndClose(tree.branch)
-        responses[-1].wait(30)
         self.assertEqual(1, len(pop_remote_notifications()))
 
     def test_uncommit_branch(self):
@@ -189,13 +186,12 @@
         db_branch, tree = self.prepare('RevisionMailJob')
         tree.commit('message')
         bzr_sync = BzrSync(db_branch)
-        with monitor_celery() as responses:
-            bzr_sync.syncBranchAndClose(tree.branch)
-            responses[0].wait(30)
-            pop_remote_notifications()
-            uncommit(tree.branch)
-            bzr_sync.syncBranchAndClose(tree.branch)
-        responses[1].wait(30)
+        with block_on_job():
+            bzr_sync.syncBranchAndClose(tree.branch)
+        pop_remote_notifications()
+        uncommit(tree.branch)
+        with block_on_job():
+            bzr_sync.syncBranchAndClose(tree.branch)
         self.assertEqual(1, len(pop_remote_notifications()))
 
     def test_revisions_added(self):
@@ -206,10 +202,9 @@
         bzr_sync.syncBranchAndClose(tree.branch)
         pop_remote_notifications()
         tree.commit('message2')
-        with monitor_celery() as responses:
+        with block_on_job():
             bzr_sync.syncBranchAndClose(tree.branch)
-            responses[-1].wait(30)
-            self.assertEqual(1, len(pop_remote_notifications()))
+        self.assertEqual(1, len(pop_remote_notifications()))
 
 
 class TestScanBranches(TestCaseWithFactory):

=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2012-04-12 20:15:23 +0000
+++ lib/lp/services/job/model/job.py	2012-04-12 20:15:24 +0000
@@ -275,15 +275,20 @@
 
         return base_job.makeDerived(), store
 
+    @staticmethod
+    def clearStore(store):
+        transaction.abort()
+        getUtility(IZStorm).remove(store)
+        store.close()
+
     @classmethod
     def switchDBUser(cls, job_id):
         """Switch to the DB user associated with this Job ID."""
+        cls.clearStore(IStore(Job))
         derived, store = cls.getDerived(job_id)
         dbconfig.override(
             dbuser=derived.config.dbuser, isolation_level='read_committed')
-        transaction.abort()
-        getUtility(IZStorm).remove(store)
-        store.close()
+        cls.clearStore(store)
 
     @classmethod
     def get(cls, job_id):

=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py	2012-04-12 20:15:23 +0000
+++ lib/lp/services/job/tests/__init__.py	2012-04-12 20:15:24 +0000
@@ -4,6 +4,7 @@
 __metaclass__ = type
 
 __all__ = [
+    'block_on_job',
     'celeryd',
     'monitor_celery',
     'pop_remote_notifications',
@@ -15,7 +16,6 @@
 from lp.services.job.runner import BaseRunnableJob
 
 
-@contextmanager
 def celeryd(queue, cwd=None):
     """Return a ContextManager for a celeryd instance.
 
@@ -35,9 +35,7 @@
         '--queues', queue,
         '--include', 'lp.services.job.tests.celery_helpers',
     )
-    with running('bin/celeryd', cmd_args, cwd=cwd) as proc:
-        # Wait for celeryd startup to complete.
-        yield proc
+    return running('bin/celeryd', cmd_args, cwd=cwd)
 
 
 @contextmanager
@@ -52,6 +50,13 @@
         BaseRunnableJob.celery_responses = old_responses
 
 
+@contextmanager
+def block_on_job():
+    with monitor_celery() as responses:
+        yield
+    responses[-1].wait(30)
+
+
 def pop_remote_notifications():
     """Pop the notifications from a celeryd worker."""
     from lp.services.job.tests.celery_helpers import pop_notifications

=== modified file 'lib/lp/testing/layers.py'
--- lib/lp/testing/layers.py	2012-03-22 12:13:15 +0000
+++ lib/lp/testing/layers.py	2012-04-12 20:15:24 +0000
@@ -111,6 +111,7 @@
     ConfigUseFixture,
     )
 from lp.services.database.sqlbase import session_store
+from lp.services.job.tests import celeryd
 from lp.services.googlesearch.tests.googleserviceharness import (
     GoogleServiceTestSetup,
     )
@@ -1859,6 +1860,42 @@
         LayerProcessController.postTestInvariants()
 
 
+class CeleryJobLayer(AppServerLayer):
+    """Layer for tests that run jobs via Celery."""
+
+    celeryd = None
+
+    @classmethod
+    @profiled
+    def setUp(cls):
+        cls.celeryd = celeryd('job')
+        cls.celeryd.__enter__()
+
+    @classmethod
+    @profiled
+    def tearDown(cls):
+        cls.celeryd.__exit__(None, None, None)
+        cls.celeryd = None
+
+
+class CeleryBranchWriteJobLayer(AppServerLayer):
+    """Layer for tests that run jobs which write to branches via Celery."""
+
+    celeryd = None
+
+    @classmethod
+    @profiled
+    def setUp(cls):
+        cls.celeryd = celeryd('branch_write_job')
+        cls.celeryd.__enter__()
+
+    @classmethod
+    @profiled
+    def tearDown(cls):
+        cls.celeryd.__exit__(None, None, None)
+        cls.celeryd = None
+
+
 class ZopelessAppServerLayer(LaunchpadZopelessLayer):
     """Layer for tests that run in the zopeless environment with an appserver.
     """