← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/celery-everywhere-2 into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-everywhere-2 into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-2/+merge/101284

= Summary =
Support revision mail jobs and translation jobs via Celery.

== Pre-implementation notes ==
None

== Implementation details ==
Add celeryRunOnCommit calls for RosettaUploadJob, RevisionMailJob and RevisionsAddedJob, and test them.

Provide a pop_notifications task to retrieve the mail notifications from a running celeryd.

Rename the feature flag to remove an extra "s" in "classes".

Rename the task queue names to "job" and "branch_write_job", to be more descriptive.

Run jobs with the appropriate database user.  To do this, introduce a "config" member on BaseRunnableJob that gives the configuration for that job type.

== Tests ==
bin/test -vt ViaCelery

== Demo and Q/A ==
None

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py
  lib/lp/code/model/tests/test_branchjob.py
  lib/lp/services/job/tests/celery_helpers.py
  lib/lp/codehosting/scanner/tests/test_email.py
  lib/lp/services/features/flags.py
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/model/job.py
  lib/lp/services/job/tests/__init__.py
  lib/lp/codehosting/scanner/email.py
  lib/lp/code/model/tests/test_branch.py
  lib/lp/code/model/branchjob.py
-- 
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-2/+merge/101284
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-everywhere-2 into lp:launchpad.
=== modified file 'lib/lp/code/model/branchjob.py'
--- lib/lp/code/model/branchjob.py	2012-04-02 20:46:13 +0000
+++ lib/lp/code/model/branchjob.py	2012-04-09 19:51:32 +0000
@@ -80,7 +80,10 @@
 from lp.code.model.branch import Branch
 from lp.code.model.branchmergeproposal import BranchMergeProposal
 from lp.code.model.revision import RevisionSet
-from lp.codehosting.bzrutils import server
+from lp.codehosting.bzrutils import (
+    read_locked,
+    server,
+    )
 from lp.codehosting.scanner.bzrsync import BzrSync
 from lp.codehosting.vfs import (
     get_ro_server,
@@ -89,6 +92,7 @@
 from lp.codehosting.vfs.branchfs import get_real_branch_path
 from lp.registry.interfaces.productseries import IProductSeriesSet
 from lp.scripts.helpers import TransactionFreeOperation
+from lp.services.config import config
 from lp.services.database.enumcol import EnumCol
 from lp.services.database.lpstorm import IStore
 from lp.services.database.sqlbase import SQLBase
@@ -296,6 +300,8 @@
     class_job_type = BranchJobType.SCAN_BRANCH
     memory_limit = 2 * (1024 ** 3)
 
+    config = config.branchscanner
+
     @classmethod
     def create(cls, branch):
         """See `IBranchScanJobSource`."""
@@ -327,7 +333,9 @@
 
     user_error_types = (NotBranchError,)
 
-    task_queue = 'branch_write'
+    task_queue = 'branch_write_job'
+
+    config = config.upgrade_branches
 
     def getOperationDescription(self):
         return 'upgrading a branch'
@@ -413,6 +421,8 @@
 
     class_job_type = BranchJobType.REVISION_MAIL
 
+    config = config.sendbranchmail
+
     @classmethod
     def create(cls, branch, revno, from_address, body, subject):
         """See `IRevisionMailJobSource`."""
@@ -458,6 +468,8 @@
 
     class_job_type = BranchJobType.REVISIONS_ADDED_MAIL
 
+    config = config.sendbranchmail
+
     @classmethod
     def create(cls, branch, last_scanned_id, last_revision_id,
                from_address):
@@ -520,15 +532,13 @@
         if not subscriptions:
             return
 
-        self.bzr_branch.lock_read()
-        try:
+        with server(get_ro_server(), no_replace=True), \
+                read_locked(self.bzr_branch):
             for revision, revno in self.iterAddedMainline():
                 assert revno is not None
                 mailer = self.getMailerForRevision(
                     revision, revno, self.generateDiffs())
                 mailer.sendAll()
-        finally:
-            self.bzr_branch.unlock()
 
     def getDiffForRevisions(self, from_revision_id, to_revision_id):
         """Generate the diff between from_revision_id and to_revision_id."""
@@ -718,6 +728,8 @@
 
     class_job_type = BranchJobType.ROSETTA_UPLOAD
 
+    config = config.rosettabranches
+
     def __init__(self, branch_job):
         super(RosettaUploadJob, self).__init__(branch_job)
 
@@ -763,7 +775,9 @@
                                        force_translations_upload)
             branch_job = BranchJob(
                 branch, BranchJobType.ROSETTA_UPLOAD, metadata)
-            return cls(branch_job)
+            job = cls(branch_job)
+            job.celeryRunOnCommit()
+            return job
         else:
             return None
 
@@ -889,27 +903,28 @@
 
     def run(self):
         """See `IRosettaUploadJob`."""
-        # This is not called upon job creation because the branch would
-        # neither have been mirrored nor scanned then.
-        self._init_translation_file_lists()
-        # Get the product series that are connected to this branch and
-        # that want to upload translations.
-        productseriesset = getUtility(IProductSeriesSet)
-        productseries = productseriesset.findByTranslationsImportBranch(
-            self.branch, self.force_translations_upload)
-        translation_import_queue = getUtility(ITranslationImportQueue)
-        for series in productseries:
-            approver = TranslationBranchApprover(self.file_names,
-                                                 productseries=series)
-            for iter_info in self._iter_lists_and_uploaders(series):
-                file_names, changed_files, uploader = iter_info
-                for upload_file_name, upload_file_content in changed_files:
-                    if len(upload_file_content) == 0:
-                        continue  # Skip empty files
-                    entry = translation_import_queue.addOrUpdateEntry(
-                        upload_file_name, upload_file_content,
-                        True, uploader, productseries=series)
-                    approver.approve(entry)
+        with server(get_ro_server(), no_replace=True):
+            # This is not called upon job creation because the branch would
+            # neither have been mirrored nor scanned then.
+            self._init_translation_file_lists()
+            # Get the product series that are connected to this branch and
+            # that want to upload translations.
+            productseriesset = getUtility(IProductSeriesSet)
+            productseries = productseriesset.findByTranslationsImportBranch(
+                self.branch, self.force_translations_upload)
+            translation_import_queue = getUtility(ITranslationImportQueue)
+            for series in productseries:
+                approver = TranslationBranchApprover(self.file_names,
+                                                     productseries=series)
+                for iter_info in self._iter_lists_and_uploaders(series):
+                    file_names, changed_files, uploader = iter_info
+                    for upload_file_name, upload_file_content in changed_files:
+                        if len(upload_file_content) == 0:
+                            continue  # Skip empty files
+                        entry = translation_import_queue.addOrUpdateEntry(
+                            upload_file_name, upload_file_content,
+                            True, uploader, productseries=series)
+                        approver.approve(entry)
 
     @staticmethod
     def iterReady():
@@ -949,7 +964,9 @@
 
     class_job_type = BranchJobType.RECLAIM_BRANCH_SPACE
 
-    task_queue = 'branch_write'
+    task_queue = 'branch_write_job'
+
+    config = config.reclaimbranchspace
 
     def __repr__(self):
         return '<RECLAIM_BRANCH_SPACE branch job (%(id)s) for %(branch)s>' % {

=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py	2012-04-04 13:16:03 +0000
+++ lib/lp/code/model/tests/test_branch.py	2012-04-09 19:51:32 +0000
@@ -316,8 +316,8 @@
         # lp.services.job.celeryconfig is loaded.
         from celery.exceptions import TimeoutError
         self.useFixture(FeatureFixture({
-            'jobs.celery.enabled_classses': 'BranchScanJob'}))
-        with celeryd('standard') as proc:
+            'jobs.celery.enabled_classes': 'BranchScanJob'}))
+        with celeryd('job') as proc:
             self.useBzrBranches()
             db_branch, bzr_tree = self.create_branch_and_tree()
             bzr_tree.commit(
@@ -348,8 +348,8 @@
         """Calling destroySelf causes Celery to delete the branch."""
         from celery.exceptions import TimeoutError
         self.useFixture(FeatureFixture({
-            'jobs.celery.enabled_classses': 'ReclaimBranchSpaceJob'}))
-        with celeryd('branch_write'):
+            'jobs.celery.enabled_classes': 'ReclaimBranchSpaceJob'}))
+        with celeryd('branch_write_job'):
             self.useBzrBranches()
             db_branch, tree = self.create_branch_and_tree()
             branch_path = get_real_branch_path(db_branch.id)
@@ -818,7 +818,7 @@
 
     def test_requestUpgradeUsesCelery(self):
         self.useFixture(FeatureFixture({
-            'jobs.celery.enabled_classses': 'BranchUpgradeJob'}))
+            'jobs.celery.enabled_classes': 'BranchUpgradeJob'}))
         cwd = os.getcwd()
         self.useBzrBranches()
         db_branch, tree = create_knit(self)
@@ -829,7 +829,7 @@
         db_branch.requestUpgrade(db_branch.owner)
         with monitor_celery() as responses:
             transaction.commit()
-            with celeryd('branch_write', cwd):
+            with celeryd('branch_write_job', cwd):
                 responses[-1].wait(30)
         new_branch = Branch.open(tree.branch.base)
         self.assertEqual(

=== modified file 'lib/lp/code/model/tests/test_branchjob.py'
--- lib/lp/code/model/tests/test_branchjob.py	2012-04-02 18:49:44 +0000
+++ lib/lp/code/model/tests/test_branchjob.py	2012-04-09 19:51:32 +0000
@@ -60,6 +60,7 @@
     RosettaUploadJob,
     )
 from lp.code.model.branchrevision import BranchRevision
+from lp.code.model.directbranchcommit import DirectBranchCommit
 from lp.code.model.tests.test_branch import create_knit
 from lp.code.model.revision import RevisionSet
 from lp.codehosting.vfs import branch_id_to_path
@@ -67,18 +68,27 @@
 from lp.services.config import config
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.lpstorm import IMasterStore
+from lp.services.features.testing import FeatureFixture
 from lp.services.identity.interfaces.emailaddress import EmailAddressStatus
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.model.job import Job
 from lp.services.job.runner import JobRunner
+from lp.services.job.tests import (
+    celeryd,
+    monitor_celery,
+    )
 from lp.services.osutils import override_environ
 from lp.services.webapp import canonical_url
-from lp.testing import TestCaseWithFactory
+from lp.testing import (
+    person_logged_in,
+    TestCaseWithFactory,
+    )
 from lp.testing.dbuser import (
     dbuser,
     switch_dbuser,
     )
 from lp.testing.layers import (
+    AppServerLayer,
     DatabaseFunctionalLayer,
     LaunchpadZopelessLayer,
     )
@@ -1222,6 +1232,41 @@
         self.assertEqual([], unfinished_jobs)
 
 
+class TestViaCelery(TestCaseWithFactory):
+
+    layer = AppServerLayer
+
+    def test_RosettaUploadJob(self):
+        """Ensure RosettaUploadJob can run under Celery."""
+        self.useContext(celeryd('job'))
+        self.useBzrBranches(direct_database=True)
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classes': 'BranchScanJob RosettaUploadJob'
+        }))
+        db_branch = self.factory.makeAnyBranch()
+        self.createBzrBranch(db_branch)
+        commit = DirectBranchCommit(db_branch, no_race_check=True)
+        commit.writeFile('foo.pot', 'gibberish')
+        with monitor_celery() as responses:
+            with person_logged_in(db_branch.owner):
+                commit.commit('message')
+                transaction.commit()
+                # Wait for branch scan to complete.
+                responses[0].wait(30)
+                series = self.factory.makeProductSeries(branch=db_branch)
+                RosettaUploadJob.create(
+                    commit.db_branch, NULL_REVISION,
+                    force_translations_upload=True)
+                transaction.commit()
+        # Wait for RosettaUploadJob to complete
+        responses[1].wait(30)
+        queue = getUtility(ITranslationImportQueue)
+        entries = list(queue.getAllEntries(target=series))
+        self.assertEqual(len(entries), 1)
+        entry = entries[0]
+        self.assertEqual('foo.pot', entry.path)
+
+
 class TestReclaimBranchSpaceJob(TestCaseWithFactory):
 
     layer = LaunchpadZopelessLayer

=== modified file 'lib/lp/codehosting/scanner/email.py'
--- lib/lp/codehosting/scanner/email.py	2012-01-01 02:58:52 +0000
+++ lib/lp/codehosting/scanner/email.py	2012-04-09 19:51:32 +0000
@@ -46,10 +46,11 @@
     # No diff is associated with the removed email.
     subject = "[Branch %s] %s removed" % (
         revisions_removed.db_branch.unique_name, count)
-    getUtility(IRevisionMailJobSource).create(
+    job = getUtility(IRevisionMailJobSource).create(
         revisions_removed.db_branch, revno='removed',
         from_address=config.canonical.noreply_from_address,
         body=contents, subject=subject)
+    job.celeryRunOnCommit()
 
 
 def queue_tip_changed_email_jobs(tip_changed):
@@ -66,11 +67,12 @@
                    revisions)
         subject = "[Branch %s] %s" % (
             tip_changed.db_branch.unique_name, revisions)
-        getUtility(IRevisionMailJobSource).create(
+        job = getUtility(IRevisionMailJobSource).create(
             tip_changed.db_branch, 'initial',
             config.canonical.noreply_from_address, message, subject)
     else:
-        getUtility(IRevisionsAddedJobSource).create(
+        job = getUtility(IRevisionsAddedJobSource).create(
             tip_changed.db_branch, tip_changed.db_branch.last_scanned_id,
             tip_changed.bzr_branch.last_revision(),
             config.canonical.noreply_from_address)
+    job.celeryRunOnCommit()

=== modified file 'lib/lp/codehosting/scanner/tests/test_email.py'
--- lib/lp/codehosting/scanner/tests/test_email.py	2012-01-01 02:58:52 +0000
+++ lib/lp/codehosting/scanner/tests/test_email.py	2012-04-09 19:51:32 +0000
@@ -7,6 +7,7 @@
 
 import email
 
+from bzrlib.uncommit import uncommit
 from zope.component import getUtility
 from zope.event import notify
 
@@ -20,13 +21,34 @@
     IRevisionsAddedJobSource,
     )
 from lp.code.model.branchjob import RevisionMailJob
+from lp.codehosting.scanner.bzrsync import BzrSync
 from lp.codehosting.scanner import events
 from lp.codehosting.scanner.tests.test_bzrsync import BzrSyncTestCase
 from lp.registry.interfaces.person import IPersonSet
+from lp.services.config import config
+from lp.services.features.testing import FeatureFixture
 from lp.services.job.runner import JobRunner
 from lp.services.mail import stub
+from lp.services.job.tests import (
+    celeryd,
+    monitor_celery,
+    )
 from lp.testing import TestCaseWithFactory
-from lp.testing.layers import LaunchpadZopelessLayer
+from lp.testing.dbuser import switch_dbuser
+from lp.testing.layers import (
+    LaunchpadZopelessLayer,
+    ZopelessAppServerLayer,
+    )
+
+
+def add_subscriber(branch):
+    test_user = getUtility(IPersonSet).getByEmail('test@xxxxxxxxxxxxx')
+    branch.subscribe(
+        test_user,
+        BranchSubscriptionNotificationLevel.FULL,
+        BranchSubscriptionDiffSize.FIVEKLINES,
+        CodeReviewNotificationLevel.NOEMAIL,
+        test_user)
 
 
 class TestBzrSyncEmail(BzrSyncTestCase):
@@ -39,13 +61,7 @@
     def makeDatabaseBranch(self):
         branch = BzrSyncTestCase.makeDatabaseBranch(self)
         LaunchpadZopelessLayer.txn.begin()
-        test_user = getUtility(IPersonSet).getByEmail('test@xxxxxxxxxxxxx')
-        branch.subscribe(
-            test_user,
-            BranchSubscriptionNotificationLevel.FULL,
-            BranchSubscriptionDiffSize.FIVEKLINES,
-            CodeReviewNotificationLevel.NOEMAIL,
-            test_user)
+        add_subscriber(branch)
         LaunchpadZopelessLayer.txn.commit()
         return branch
 
@@ -130,12 +146,12 @@
         recommit_email_msg = email.message_from_string(recommit_email[2])
         recommit_email_body = recommit_email_msg.get_payload()[0].get_payload(
             decode=True)
-        subject =  '[Branch %s] Rev 1: second' % self.db_branch.unique_name
+        subject = '[Branch %s] Rev 1: second' % self.db_branch.unique_name
         self.assertEmailHeadersEqual(subject, recommit_email_msg['Subject'])
         body_bits = [
             'revno: 1',
             'committer: %s' % author,
-            'branch nick: %s'  % self.bzr_branch.nick,
+            'branch nick: %s' % self.bzr_branch.nick,
             'message:\n  second',
             'added:\n  hello.txt',
             ]
@@ -143,6 +159,63 @@
             self.assertTextIn(bit, recommit_email_body)
 
 
+class TestViaCelery(TestCaseWithFactory):
+
+    layer = ZopelessAppServerLayer
+
+    @staticmethod
+    def pop_notifications():
+        from lp.services.job.tests.celery_helpers import pop_notifications
+        return pop_notifications.delay().get(30)
+
+    def prepare(self, job_name):
+        self.useFixture(FeatureFixture(
+            {'jobs.celery.enabled_classes': job_name}))
+        self.useContext(celeryd('job'))
+        self.useBzrBranches(direct_database=True)
+        db_branch, tree = self.create_branch_and_tree()
+        add_subscriber(db_branch)
+        switch_dbuser(config.branchscanner.dbuser)
+        # Needed for feature flag teardown
+        self.addCleanup(switch_dbuser, config.launchpad.dbuser)
+        return db_branch, tree
+
+    def test_empty_branch(self):
+        """RevisionMailJob for empty branches runs via Celery."""
+        db_branch, tree = self.prepare('RevisionMailJob')
+        with monitor_celery() as responses:
+            BzrSync(db_branch).syncBranchAndClose(tree.branch)
+        responses[-1].wait(30)
+        self.assertEqual(1, len(self.pop_notifications()))
+
+    def test_uncommit_branch(self):
+        """RevisionMailJob for removed revisions runs via Celery."""
+        db_branch, tree = self.prepare('RevisionMailJob')
+        tree.commit('message')
+        bzr_sync = BzrSync(db_branch)
+        with monitor_celery() as responses:
+            bzr_sync.syncBranchAndClose(tree.branch)
+            responses[0].wait(30)
+            self.pop_notifications()
+            uncommit(tree.branch)
+            bzr_sync.syncBranchAndClose(tree.branch)
+        responses[1].wait(30)
+        self.assertEqual(1, len(self.pop_notifications()))
+
+    def test_revisions_added(self):
+        """RevisionsAddedJob for added revisions runs via Celery."""
+        db_branch, tree = self.prepare('RevisionsAddedJob')
+        tree.commit('message')
+        bzr_sync = BzrSync(db_branch)
+        bzr_sync.syncBranchAndClose(tree.branch)
+        self.pop_notifications()
+        tree.commit('message2')
+        with monitor_celery() as responses:
+            bzr_sync.syncBranchAndClose(tree.branch)
+            responses[-1].wait(30)
+            self.assertEqual(1, len(self.pop_notifications()))
+
+
 class TestScanBranches(TestCaseWithFactory):
 
     layer = LaunchpadZopelessLayer

=== modified file 'lib/lp/services/features/flags.py'
--- lib/lp/services/features/flags.py	2012-04-05 13:05:04 +0000
+++ lib/lp/services/features/flags.py	2012-04-09 19:51:32 +0000
@@ -125,7 +125,7 @@
      '',
      '',
      ''),
-    ('jobs.celery.enabled_classses',
+    ('jobs.celery.enabled_classes',
      'space delimited',
      'Names of Job classes that should be run via celery',
      'No jobs run via celery',

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2012-04-04 15:15:09 +0000
+++ lib/lp/services/job/celeryconfig.py	2012-04-09 19:51:32 +0000
@@ -8,9 +8,9 @@
 CELERY_IMPORTS = ("lp.services.job.celeryjob", )
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_QUEUES = {
-    "branch_write": {"binding_key": "branch_write"},
-    "standard": {"binding_key": "standard"},
+    "branch_write_job": {"binding_key": "branch_write_job"},
+    "job": {"binding_key": "job"},
 }
-CELERY_DEFAULT_EXCHANGE = "standard"
-CELERY_DEFAULT_QUEUE = "standard"
+CELERY_DEFAULT_EXCHANGE = "job"
+CELERY_DEFAULT_QUEUE = "job"
 CELERY_CREATE_MISSING_QUEUES = False

=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2012-03-21 20:39:49 +0000
+++ lib/lp/services/job/model/job.py	2012-04-09 19:51:32 +0000
@@ -33,7 +33,9 @@
     Int,
     Reference,
     )
+from storm.zope.interfaces import IZStorm
 import transaction
+from zope.component import getUtility
 from zope.interface import implements
 
 from lp.services.config import dbconfig
@@ -253,17 +255,33 @@
 
     needs_init = True
 
-    @classmethod
-    def get(cls, job_id):
-        if cls.needs_init:
-            scripts.execute_zcml_for_scripts(use_web_security=False)
-            cls.needs_init = False
-
-        dbconfig.override(
-            dbuser='branchscanner', isolation_level='read_committed')
+    @staticmethod
+    def getDerived(job_id):
+        """Return the derived branch job associated with the job id."""
         from lp.code.model.branchjob import (
             BranchJob,
             )
         store = IStore(BranchJob)
         branch_job = store.find(BranchJob, BranchJob.job == job_id).one()
-        return branch_job.makeDerived()
+        if branch_job is None:
+            raise ValueError('No BranchJob with job=%s.' % job_id)
+        return branch_job.makeDerived(), store
+
+    @classmethod
+    def switchDBUser(cls, job_id):
+        """Switch to the DB user associated with this Job ID."""
+        derived, store = cls.getDerived(job_id)
+        dbconfig.override(
+            dbuser=derived.config.dbuser, isolation_level='read_committed')
+        transaction.abort()
+        getUtility(IZStorm).remove(store)
+        store.close()
+
+    @classmethod
+    def get(cls, job_id):
+        transaction.abort()
+        if cls.needs_init:
+            scripts.execute_zcml_for_scripts(use_web_security=False)
+            cls.needs_init = False
+        cls.switchDBUser(job_id)
+        return cls.getDerived(job_id)[0]

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2012-04-03 18:58:11 +0000
+++ lib/lp/services/job/runner.py	2012-04-09 19:51:32 +0000
@@ -103,7 +103,7 @@
 
     retry_error_types = ()
 
-    task_queue = 'standard'
+    task_queue = 'job'
 
     celery_responses = None
 
@@ -190,23 +190,22 @@
         return oops_config.create(
             context=dict(exc_info=info))
 
-    def runViaCelery(self):
+    def runViaCelery(self, ignore_result=False):
         """Request that this job be run via celery."""
         # Avoid importing from lp.services.job.celeryjob where not needed, to
         # avoid configuring Celery when Rabbit is not configured.
         from lp.services.job.celeryjob import CeleryRunJob
-        ignore_result = bool(BaseRunnableJob.celery_responses is None)
-        response = CeleryRunJob.apply_async(
+        return CeleryRunJob.apply_async(
             (self.job_id,), queue=self.task_queue,
             ignore_result=ignore_result)
-        if not ignore_result:
-            BaseRunnableJob.celery_responses.append(response)
-        return response
 
     def celeryCommitHook(self, succeeded):
         """Hook function to call when a commit completes."""
         if succeeded:
-            self.runViaCelery()
+            ignore_result = bool(BaseRunnableJob.celery_responses is None)
+            response = self.runViaCelery(ignore_result)
+            if not ignore_result:
+                BaseRunnableJob.celery_responses.append(response)
 
     def celeryRunOnCommit(self):
         """Configure transaction so that commit runs this job via Celery."""
@@ -623,7 +622,7 @@
 
     The name of a BaseRunnableJob must be specified.
     """
-    flag = getFeatureFlag('jobs.celery.enabled_classses')
+    flag = getFeatureFlag('jobs.celery.enabled_classes')
     if flag is None:
         return False
     return class_name in flag.split(' ')

=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py	2012-04-04 15:15:09 +0000
+++ lib/lp/services/job/tests/__init__.py	2012-04-09 19:51:32 +0000
@@ -32,6 +32,7 @@
         '--concurrency', '1',
         '--loglevel', 'INFO',
         '--queues', queue,
+        '--include', 'lp.services.job.tests.celery_helpers',
     )
     with running('bin/celeryd', cmd_args, cwd=cwd) as proc:
         # Wait for celeryd startup to complete.

=== added file 'lib/lp/services/job/tests/celery_helpers.py'
--- lib/lp/services/job/tests/celery_helpers.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/job/tests/celery_helpers.py	2012-04-09 19:51:32 +0000
@@ -0,0 +1,19 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+__all__ = ['pop_notifications']
+
+# Force the correct celeryconfig to be used.
+import lp.services.job.celeryjob
+# Quiet lint unused import warning.
+lp.services.job.celeryjob
+
+from celery.task import task
+
+
+@task
+def pop_notifications():
+    from lp.testing.mail_helpers import pop_notifications
+    return pop_notifications()

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2012-04-03 16:06:43 +0000
+++ lib/lp/services/job/tests/test_runner.py	2012-04-09 19:51:32 +0000
@@ -720,19 +720,19 @@
     def test_matching_flag(self):
         """A matching flag returns True."""
         self.useFixture(FeatureFixture(
-            {'jobs.celery.enabled_classses': 'foo bar'}))
+            {'jobs.celery.enabled_classes': 'foo bar'}))
         self.assertTrue(celery_enabled('foo'))
         self.assertTrue(celery_enabled('bar'))
 
     def test_non_matching_flag(self):
         """A non-matching flag returns false."""
         self.useFixture(FeatureFixture(
-            {'jobs.celery.enabled_classses': 'foo bar'}))
+            {'jobs.celery.enabled_classes': 'foo bar'}))
         self.assertFalse(celery_enabled('baz'))
         self.assertTrue(celery_enabled('bar'))
 
     def test_substring(self):
         """A substring of an enabled class does not match."""
         self.useFixture(FeatureFixture(
-            {'jobs.celery.enabled_classses': 'foobar'}))
+            {'jobs.celery.enabled_classes': 'foobar'}))
         self.assertFalse(celery_enabled('bar'))