← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/celery-everywhere-3 into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-everywhere-3 into lp:launchpad with lp:~abentley/launchpad/celery-everywhere-2 as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-3/+merge/101626

= Summary =
Implement Celery support for BranchMergeProposalJobs

== Proposed fix ==
Add BranchMergeProposalJob support to UniversalJobSource, call celeryRunOnCommit whenever a BranchMergeProposalJobDerived job is created, and update the individual job classes so that they can run under Celery.

== Pre-implementation notes ==
None

== Implementation details ==
All jobs specify a config.

Some jobs send mail, so I extracted pop_remote_notifications from test_email.

Jobs which need access to branches are updated to use server(no_replace=True), and the corresponding functionality is removed from contextManager().

BranchMergeProposalJobDerived now uses EnumeratedSubclass as its metaclass, to match BranchJobDerived.  This makes BranchMergeProposalJobFactory redundant, so it is removed.

Some of BranchMergeProposalJobDerived.create is extracted to _create, so that subclasses can reuse it.

The contents of the mail sent by ReviewRequestedEmailJob were untested, so I added a test.

It turns out that apply_async(ignore_result=True) does not work as I expected, so I've added CeleryRunJobIgnoreResult to support it.

== Tests ==
bin/test -t TestViaCelery test_branchmergeproposaljobs

== Demo and Q/A ==
None


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/codehosting/scanner/tests/test_mergedetection.py
  lib/lp/services/job/runner.py
  lib/lp/code/model/tests/test_branchmergeproposaljobs.py
  lib/lp/code/model/branchmergeproposal.py
  lib/lp/code/model/tests/test_branchjob.py
  lib/lp/services/job/tests/celery_helpers.py
  lib/lp/services/job/celeryjob.py
  lib/lp/codehosting/scanner/tests/test_email.py
  lib/lp/services/job/tests/test_job.py
  lib/lp/services/features/flags.py
  lib/lp/code/model/branchmergeproposaljob.py
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/model/job.py
  lib/lp/services/job/tests/__init__.py
  lib/lp/codehosting/scanner/email.py
  lib/lp/code/model/tests/test_branch.py
  lib/lp/code/model/branchjob.py
-- 
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-3/+merge/101626
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-everywhere-3 into lp:launchpad.
=== modified file 'lib/lp/code/model/branchmergeproposal.py'
--- lib/lp/code/model/branchmergeproposal.py	2012-02-28 04:24:19 +0000
+++ lib/lp/code/model/branchmergeproposal.py	2012-04-11 18:49:25 +0000
@@ -208,8 +208,7 @@
     def next_preview_diff_job(self):
         # circular dependencies
         from lp.code.model.branchmergeproposaljob import (
-            BranchMergeProposalJob, BranchMergeProposalJobFactory,
-            BranchMergeProposalJobType)
+            BranchMergeProposalJob, BranchMergeProposalJobType)
         jobs = Store.of(self).find(
             BranchMergeProposalJob,
             BranchMergeProposalJob.branch_merge_proposal == self,
@@ -219,7 +218,7 @@
             Job._status.is_in([JobStatus.WAITING, JobStatus.RUNNING]))
         job = jobs.order_by(Job.scheduled_start, Job.date_created).first()
         if job is not None:
-            return BranchMergeProposalJobFactory.create(job)
+            return job.makeDerived()
         else:
             return None
 

=== modified file 'lib/lp/code/model/branchmergeproposaljob.py'
--- lib/lp/code/model/branchmergeproposaljob.py	2011-12-30 06:14:56 +0000
+++ lib/lp/code/model/branchmergeproposaljob.py	2012-04-11 18:49:25 +0000
@@ -14,7 +14,6 @@
 
 __all__ = [
     'BranchMergeProposalJob',
-    'BranchMergeProposalJobFactory',
     'BranchMergeProposalJobSource',
     'BranchMergeProposalJobType',
     'CodeReviewCommentEmailJob',
@@ -89,6 +88,7 @@
 from lp.code.mail.codereviewcomment import CodeReviewCommentMailer
 from lp.code.model.branchmergeproposal import BranchMergeProposal
 from lp.code.model.diff import PreviewDiff
+from lp.codehosting.bzrutils import server
 from lp.codehosting.vfs import (
     get_ro_server,
     get_rw_server,
@@ -98,7 +98,10 @@
 from lp.services.database.enumcol import EnumCol
 from lp.services.database.stormbase import StormBase
 from lp.services.job.interfaces.job import JobStatus
-from lp.services.job.model.job import Job
+from lp.services.job.model.job import (
+    EnumeratedSubclass,
+    Job,
+    )
 from lp.services.job.runner import (
     BaseRunnableJob,
     BaseRunnableJobSource,
@@ -236,10 +239,15 @@
                 'No occurrence of %s has key %s' % (klass.__name__, key))
         return instance
 
+    def makeDerived(self):
+        return BranchMergeProposalJobDerived.makeSubclass(self)
+
 
 class BranchMergeProposalJobDerived(BaseRunnableJob):
-
     """Intermediate class for deriving from BranchMergeProposalJob."""
+
+    __metaclass__ = EnumeratedSubclass
+
     delegates(IBranchMergeProposalJob)
 
     def __init__(self, job):
@@ -256,9 +264,15 @@
     @classmethod
     def create(cls, bmp):
         """See `IMergeProposalCreationJob`."""
-        job = BranchMergeProposalJob(
-            bmp, cls.class_job_type, {})
-        return cls(job)
+        return cls._create(bmp, {})
+
+    @classmethod
+    def _create(cls, bmp, metadata):
+        base_job = BranchMergeProposalJob(
+            bmp, cls.class_job_type, metadata)
+        job = cls(base_job)
+        job.celeryRunOnCommit()
+        return job
 
     @classmethod
     def get(cls, job_id):
@@ -317,6 +331,8 @@
 
     class_job_type = BranchMergeProposalJobType.MERGE_PROPOSAL_NEEDS_REVIEW
 
+    config = config.merge_proposal_jobs
+
     def run(self):
         """See `IMergeProposalNeedsReviewEmailJob`."""
         mailer = BMPMailer.forCreation(
@@ -344,6 +360,8 @@
 
     class_job_type = BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF
 
+    config = config.merge_proposal_jobs
+
     user_error_types = (UpdatePreviewDiffNotReady, )
 
     retry_error_types = (BranchHasPendingWrites, )
@@ -369,8 +387,9 @@
     def run(self):
         """See `IRunnableJob`."""
         self.checkReady()
-        preview = PreviewDiff.fromBranchMergeProposal(
-            self.branch_merge_proposal)
+        with server(get_ro_server(), no_replace=True):
+            preview = PreviewDiff.fromBranchMergeProposal(
+                self.branch_merge_proposal)
         with BranchMergeProposalDelta.monitor(
             self.branch_merge_proposal):
             self.branch_merge_proposal.preview_diff = preview
@@ -473,6 +492,8 @@
 
     class_job_type = BranchMergeProposalJobType.CODE_REVIEW_COMMENT_EMAIL
 
+    config = config.merge_proposal_jobs
+
     def run(self):
         """See `IRunnableJob`."""
         mailer = CodeReviewCommentMailer.forCreation(self.code_review_comment)
@@ -483,8 +504,7 @@
         """See `ICodeReviewCommentEmailJobSource`."""
         metadata = cls.getMetadata(code_review_comment)
         bmp = code_review_comment.branch_merge_proposal
-        job = BranchMergeProposalJob(bmp, cls.class_job_type, metadata)
-        return cls(job)
+        return cls._create(bmp, metadata)
 
     @staticmethod
     def getMetadata(code_review_comment):
@@ -525,6 +545,8 @@
 
     class_job_type = BranchMergeProposalJobType.REVIEW_REQUEST_EMAIL
 
+    config = config.merge_proposal_jobs
+
     def run(self):
         """See `IRunnableJob`."""
         reason = RecipientReason.forReviewer(
@@ -538,8 +560,7 @@
         """See `IReviewRequestedEmailJobSource`."""
         metadata = cls.getMetadata(review_request)
         bmp = review_request.branch_merge_proposal
-        job = BranchMergeProposalJob(bmp, cls.class_job_type, metadata)
-        return cls(job)
+        return cls._create(bmp, metadata)
 
     @staticmethod
     def getMetadata(review_request):
@@ -591,6 +612,8 @@
 
     class_job_type = BranchMergeProposalJobType.MERGE_PROPOSAL_UPDATED
 
+    config = config.merge_proposal_jobs
+
     def run(self):
         """See `IRunnableJob`."""
         mailer = BMPMailer.forModification(
@@ -601,9 +624,7 @@
     def create(cls, merge_proposal, delta_text, editor):
         """See `IReviewRequestedEmailJobSource`."""
         metadata = cls.getMetadata(delta_text, editor)
-        job = BranchMergeProposalJob(
-            merge_proposal, cls.class_job_type, metadata)
-        return cls(job)
+        return cls._create(merge_proposal, metadata)
 
     @staticmethod
     def getMetadata(delta_text, editor):
@@ -658,6 +679,8 @@
 
     class_job_type = BranchMergeProposalJobType.GENERATE_INCREMENTAL_DIFF
 
+    config = config.merge_proposal_jobs
+
     def acquireLease(self, duration=600):
         return self.job.acquireLease(duration)
 
@@ -665,15 +688,14 @@
         revision_set = getUtility(IRevisionSet)
         old_revision = revision_set.getByRevisionId(self.old_revision_id)
         new_revision = revision_set.getByRevisionId(self.new_revision_id)
-        self.branch_merge_proposal.generateIncrementalDiff(
-            old_revision, new_revision)
+        with server(get_ro_server(), no_replace=True):
+            self.branch_merge_proposal.generateIncrementalDiff(
+                old_revision, new_revision)
 
     @classmethod
     def create(cls, merge_proposal, old_revision_id, new_revision_id):
         metadata = cls.getMetadata(old_revision_id, new_revision_id)
-        job = BranchMergeProposalJob(
-            merge_proposal, cls.class_job_type, metadata)
-        return cls(job)
+        return cls._create(merge_proposal, metadata)
 
     @staticmethod
     def getMetadata(old_revision_id, new_revision_id):
@@ -710,31 +732,6 @@
         return format_address_for_person(registrant)
 
 
-class BranchMergeProposalJobFactory:
-    """Construct a derived merge proposal job for a BranchMergeProposalJob."""
-
-    job_classes = {
-        BranchMergeProposalJobType.MERGE_PROPOSAL_NEEDS_REVIEW:
-            MergeProposalNeedsReviewEmailJob,
-        BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF:
-            UpdatePreviewDiffJob,
-        BranchMergeProposalJobType.CODE_REVIEW_COMMENT_EMAIL:
-            CodeReviewCommentEmailJob,
-        BranchMergeProposalJobType.REVIEW_REQUEST_EMAIL:
-            ReviewRequestedEmailJob,
-        BranchMergeProposalJobType.MERGE_PROPOSAL_UPDATED:
-            MergeProposalUpdatedEmailJob,
-        BranchMergeProposalJobType.GENERATE_INCREMENTAL_DIFF:
-            GenerateIncrementalDiffJob,
-        }
-
-    @classmethod
-    def create(cls, bmp_job):
-        """Create the derived job for the bmp_job's job type."""
-        job_class = cls.job_classes[bmp_job.job_type]
-        return job_class(bmp_job)
-
-
 class BranchMergeProposalJobSource(BaseRunnableJobSource):
     """Provide a job source for all merge proposal jobs.
 
@@ -748,10 +745,6 @@
     def contextManager():
         """See `IJobSource`."""
         errorlog.globalErrorUtility.configure('merge_proposal_jobs')
-        server = get_ro_server()
-        server.start_server()
-        yield
-        server.stop_server()
 
     @staticmethod
     def get(job_id):
@@ -763,7 +756,7 @@
             or its job_type does not match the desired subclass.
         """
         job = BranchMergeProposalJob.get(job_id)
-        return BranchMergeProposalJobFactory.create(job)
+        return job.makeDerived()
 
     @staticmethod
     def iterReady(job_type=None):
@@ -802,7 +795,7 @@
             # If the job is running, then skip it
             if job.status == JobStatus.RUNNING:
                 continue
-            derived_job = BranchMergeProposalJobFactory.create(bmp_job)
+            derived_job = bmp_job.makeDerived()
             # If the job is an update preview diff, then check that it is
             # ready.
             if IUpdatePreviewDiffJob.providedBy(derived_job):

=== modified file 'lib/lp/code/model/tests/test_branchmergeproposaljobs.py'
--- lib/lp/code/model/tests/test_branchmergeproposaljobs.py	2012-01-20 15:42:44 +0000
+++ lib/lp/code/model/tests/test_branchmergeproposaljobs.py	2012-04-11 18:49:25 +0000
@@ -1,4 +1,4 @@
-# Copyright 2010-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2010-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for branch merge proposal jobs."""
@@ -55,9 +55,15 @@
     )
 from lp.code.subscribers.branchmergeproposal import merge_proposal_modified
 from lp.services.config import config
+from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.model.job import Job
 from lp.services.job.runner import JobRunner
+from lp.services.job.tests import (
+    celeryd,
+    monitor_celery,
+    pop_remote_notifications,
+    )
 from lp.services.osutils import override_environ
 from lp.services.webapp.testing import verifyObject
 from lp.testing import (
@@ -65,7 +71,10 @@
     TestCaseWithFactory,
     )
 from lp.testing.dbuser import dbuser
-from lp.testing.layers import LaunchpadZopelessLayer
+from lp.testing.layers import (
+    AppServerLayer,
+    LaunchpadZopelessLayer,
+    )
 from lp.testing.mail_helpers import pop_notifications
 
 
@@ -549,6 +558,15 @@
             'emailing a reviewer requesting a review',
             job.getOperationDescription())
 
+    def test_run_sends_mail(self):
+        request = self.factory.makeCodeReviewVoteReference()
+        job = ReviewRequestedEmailJob.create(request)
+        job.run()
+        (notification,) = pop_notifications()
+        self.assertIn(
+            'You have been requested to review the proposed merge',
+            notification.get_payload(decode=True))
+
 
 class TestMergeProposalUpdatedEmailJob(TestCaseWithFactory):
 
@@ -573,3 +591,83 @@
         self.assertEqual(
             'emailing subscribers about merge proposal changes',
             job.getOperationDescription())
+
+
+class TestViaCelery(TestCaseWithFactory):
+
+    layer = AppServerLayer
+
+    def test_MergeProposalNeedsReviewEmailJob(self):
+        """MergeProposalNeedsReviewEmailJob runs under Celery."""
+        self.useFixture(FeatureFixture(
+            {'jobs.celery.enabled_classes':
+             'MergeProposalNeedsReviewEmailJob'}))
+        self.useContext(celeryd('job'))
+        bmp = self.factory.makeBranchMergeProposal()
+        with monitor_celery() as responses:
+            MergeProposalNeedsReviewEmailJob.create(bmp)
+            transaction.commit()
+        responses[0].wait(30)
+        self.assertEqual(2, len(pop_remote_notifications()))
+
+    def test_UpdatePreviewDiffJob(self):
+        """UpdatePreviewDiffJob runs under Celery."""
+        self.useContext(celeryd('job'))
+        self.useBzrBranches(direct_database=True)
+        bmp = create_example_merge(self)[0]
+        self.factory.makeRevisionsForBranch(bmp.source_branch, count=1)
+        self.useFixture(FeatureFixture(
+            {'jobs.celery.enabled_classes': 'UpdatePreviewDiffJob'}))
+        with monitor_celery() as responses:
+            UpdatePreviewDiffJob.create(bmp)
+            transaction.commit()
+            responses[0].wait(30)
+        self.assertIsNot(None, bmp.preview_diff)
+
+    def test_CodeReviewCommentEmailJob(self):
+        """CodeReviewCommentEmailJob runs under Celery."""
+        comment = self.factory.makeCodeReviewComment()
+        self.useContext(celeryd('job'))
+        self.useFixture(FeatureFixture(
+            {'jobs.celery.enabled_classes': 'CodeReviewCommentEmailJob'}))
+        with monitor_celery() as responses:
+            CodeReviewCommentEmailJob.create(comment)
+            transaction.commit()
+        responses[0].wait(30)
+        self.assertEqual(2, len(pop_remote_notifications()))
+
+    def test_ReviewRequestedEmailJob(self):
+        """ReviewRequestedEmailJob runs under Celery."""
+        request = self.factory.makeCodeReviewVoteReference()
+        self.useContext(celeryd('job'))
+        self.useFixture(FeatureFixture(
+            {'jobs.celery.enabled_classes': 'ReviewRequestedEmailJob'}))
+        with monitor_celery() as responses:
+            ReviewRequestedEmailJob.create(request)
+            transaction.commit()
+        responses[0].wait(30)
+        self.assertEqual(1, len(pop_remote_notifications()))
+
+    def test_MergeProposalUpdatedEmailJob(self):
+        """MergeProposalUpdatedEmailJob runs under Celery."""
+        bmp = self.factory.makeBranchMergeProposal()
+        self.useContext(celeryd('job'))
+        self.useFixture(FeatureFixture(
+            {'jobs.celery.enabled_classes': 'MergeProposalUpdatedEmailJob'}))
+        with monitor_celery() as responses:
+            MergeProposalUpdatedEmailJob.create(
+                bmp, 'change', bmp.registrant)
+            transaction.commit()
+        responses[0].wait(30)
+        self.assertEqual(2, len(pop_remote_notifications()))
+
+    def test_GenerateIncrementalDiffJob(self):
+        """GenerateIncrementalDiffJob runs under Celery."""
+        self.useContext(celeryd('job'))
+        self.useFixture(FeatureFixture(
+            {'jobs.celery.enabled_classes': 'GenerateIncrementalDiffJob'}))
+        with monitor_celery() as responses:
+            job = make_runnable_incremental_diff_job(self)
+            transaction.commit()
+        responses[0].wait(30)
+        self.assertEqual(JobStatus.COMPLETED, job.status)

=== modified file 'lib/lp/codehosting/scanner/tests/test_email.py'
--- lib/lp/codehosting/scanner/tests/test_email.py	2012-04-09 19:02:25 +0000
+++ lib/lp/codehosting/scanner/tests/test_email.py	2012-04-11 18:49:25 +0000
@@ -32,6 +32,7 @@
 from lp.services.job.tests import (
     celeryd,
     monitor_celery,
+    pop_remote_notifications,
     )
 from lp.testing import TestCaseWithFactory
 from lp.testing.dbuser import switch_dbuser
@@ -163,11 +164,6 @@
 
     layer = ZopelessAppServerLayer
 
-    @staticmethod
-    def pop_notifications():
-        from lp.services.job.tests.celery_helpers import pop_notifications
-        return pop_notifications.delay().get(30)
-
     def prepare(self, job_name):
         self.useFixture(FeatureFixture(
             {'jobs.celery.enabled_classes': job_name}))
@@ -186,7 +182,7 @@
         with monitor_celery() as responses:
             BzrSync(db_branch).syncBranchAndClose(tree.branch)
         responses[-1].wait(30)
-        self.assertEqual(1, len(self.pop_notifications()))
+        self.assertEqual(1, len(pop_remote_notifications()))
 
     def test_uncommit_branch(self):
         """RevisionMailJob for removed revisions runs via Celery."""
@@ -196,11 +192,11 @@
         with monitor_celery() as responses:
             bzr_sync.syncBranchAndClose(tree.branch)
             responses[0].wait(30)
-            self.pop_notifications()
+            pop_remote_notifications()
             uncommit(tree.branch)
             bzr_sync.syncBranchAndClose(tree.branch)
         responses[1].wait(30)
-        self.assertEqual(1, len(self.pop_notifications()))
+        self.assertEqual(1, len(pop_remote_notifications()))
 
     def test_revisions_added(self):
         """RevisionMailJob for removed revisions runs via Celery."""
@@ -208,12 +204,12 @@
         tree.commit('message')
         bzr_sync = BzrSync(db_branch)
         bzr_sync.syncBranchAndClose(tree.branch)
-        self.pop_notifications()
+        pop_remote_notifications()
         tree.commit('message2')
         with monitor_celery() as responses:
             bzr_sync.syncBranchAndClose(tree.branch)
             responses[-1].wait(30)
-            self.assertEqual(1, len(self.pop_notifications()))
+            self.assertEqual(1, len(pop_remote_notifications()))
 
 
 class TestScanBranches(TestCaseWithFactory):

=== modified file 'lib/lp/codehosting/scanner/tests/test_mergedetection.py'
--- lib/lp/codehosting/scanner/tests/test_mergedetection.py	2012-01-01 02:58:52 +0000
+++ lib/lp/codehosting/scanner/tests/test_mergedetection.py	2012-04-11 18:49:25 +0000
@@ -19,7 +19,6 @@
 from lp.code.interfaces.branchlookup import IBranchLookup
 from lp.code.model.branchmergeproposaljob import (
     BranchMergeProposalJob,
-    BranchMergeProposalJobFactory,
     BranchMergeProposalJobType,
     )
 from lp.codehosting.scanner import (
@@ -285,7 +284,7 @@
             BranchMergeProposalJob.branch_merge_proposal == proposal,
             BranchMergeProposalJob.job_type ==
             BranchMergeProposalJobType.MERGE_PROPOSAL_UPDATED).one()
-        derived_job = BranchMergeProposalJobFactory.create(job)
+        derived_job = job.makeDerived()
         derived_job.run()
         notifications = pop_notifications()
         self.assertIn('Work in progress => Merged',

=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py	2012-04-05 19:05:16 +0000
+++ lib/lp/services/job/celeryjob.py	2012-04-11 18:49:25 +0000
@@ -9,7 +9,10 @@
 
 __metaclass__ = type
 
-__all__ = ['CeleryRunJob']
+__all__ = [
+    'CeleryRunJob',
+    'CeleryRunJobIgnoreResult',
+    ]
 
 import os
 
@@ -28,3 +31,8 @@
     def getJobRunner(self):
         """Return a BaseJobRunner, to support customization."""
         return BaseJobRunner()
+
+
+class CeleryRunJobIgnoreResult(CeleryRunJob):
+
+    ignore_result = True

=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2012-04-09 19:02:25 +0000
+++ lib/lp/services/job/model/job.py	2012-04-11 18:49:25 +0000
@@ -261,11 +261,18 @@
         from lp.code.model.branchjob import (
             BranchJob,
             )
-        store = IStore(BranchJob)
-        branch_job = store.find(BranchJob, BranchJob.job == job_id).one()
-        if branch_job is None:
+        from lp.code.model.branchmergeproposaljob import (
+            BranchMergeProposalJob,
+            )
+        store = IStore(Job)
+        for cls in [BranchJob, BranchMergeProposalJob]:
+            base_job = store.find(cls, cls.job == job_id).one()
+            if base_job is not None:
+                break
+        if base_job is None:
             raise ValueError('No BranchJob with job=%s.' % job_id)
-        return branch_job.makeDerived(), store
+
+        return base_job.makeDerived(), store
 
     @classmethod
     def switchDBUser(cls, job_id):

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2012-04-10 20:24:43 +0000
+++ lib/lp/services/job/runner.py	2012-04-11 18:49:25 +0000
@@ -194,10 +194,13 @@
         """Request that this job be run via celery."""
         # Avoid importing from lp.services.job.celeryjob where not needed, to
         # avoid configuring Celery when Rabbit is not configured.
-        from lp.services.job.celeryjob import CeleryRunJob
-        return CeleryRunJob.apply_async(
-            (self.job_id,), queue=self.task_queue,
-            ignore_result=ignore_result)
+        from lp.services.job.celeryjob import (
+            CeleryRunJob, CeleryRunJobIgnoreResult)
+        if ignore_result:
+            cls = CeleryRunJobIgnoreResult
+        else:
+            cls = CeleryRunJob
+        return cls.apply_async((self.job_id,), queue=self.task_queue)
 
     def celeryCommitHook(self, succeeded):
         """Hook function to call when a commit completes."""

=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py	2012-04-10 20:24:43 +0000
+++ lib/lp/services/job/tests/__init__.py	2012-04-11 18:49:25 +0000
@@ -5,7 +5,8 @@
 
 __all__ = [
     'celeryd',
-    'monitor_celery'
+    'monitor_celery',
+    'pop_remote_notifications',
     ]
 
 
@@ -49,3 +50,9 @@
         yield responses
     finally:
         BaseRunnableJob.celery_responses = old_responses
+
+
+def pop_remote_notifications():
+    """Pop the notifications from a celeryd worker."""
+    from lp.services.job.tests.celery_helpers import pop_notifications
+    return pop_notifications.delay().get(30)

=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py	2012-03-21 16:16:08 +0000
+++ lib/lp/services/job/tests/test_job.py	2012-04-11 18:49:25 +0000
@@ -10,6 +10,7 @@
 from lazr.jobrunner.jobrunner import LeaseHeld
 from storm.locals import Store
 
+from lp.code.model.branchmergeproposaljob import CodeReviewCommentEmailJob
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.lpstorm import IStore
 from lp.services.job.interfaces.job import (
@@ -19,6 +20,7 @@
 from lp.services.job.model.job import (
     InvalidTransition,
     Job,
+    UniversalJobSource,
     )
 from lp.services.webapp.testing import verifyObject
 from lp.testing import (
@@ -340,3 +342,14 @@
         job = Job()
         job.acquireLease(-300)
         self.assertEqual(0, job.getTimeout())
+
+
+class TestUniversalJobSource(TestCaseWithFactory):
+
+    layer = ZopelessDatabaseLayer
+
+    def test_getDerived_with_merge_proposal_job(self):
+        comment = self.factory.makeCodeReviewComment()
+        job = CodeReviewCommentEmailJob.create(comment)
+        newjob = UniversalJobSource.getDerived(job.job_id)[0]
+        self.assertEqual(job, newjob)