launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #07085
[Merge] lp:~abentley/launchpad/celery-everywhere-3 into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-everywhere-3 into lp:launchpad with lp:~abentley/launchpad/celery-everywhere-2 as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-3/+merge/101626
= Summary =
Implement Celery support for BranchMergeProposalJobs
== Proposed fix ==
Add BranchMergeProposalJob support to UniversalJobSource, add celeryRunOnCommit to BranchMergeProposalJob.create, and update the Jobs
== Pre-implementation notes ==
None
== Implementation details ==
All jobs specify a config.
Some jobs send mail, so I extracted pop_remote_notifications from test_email.
Jobs which need access to branches are updated to use server(no_replace=True), and the corresponding functionality is removed from contextManager().
BranchMergeProposalJobDerived now uses EnumeratedSubclass as its metaclass, to match BranchJobDerived. This makes BranchMergeProposalJobFactory redundant, so it is removed.
Some of BranchMergeProposalJobDerived.create is extracted to _create, so that subclasses can reuse it.
The contents of ReviewRequestedEmail mail was untested, so I added a test.
It turns out that apply_async(ignore_result=True) does not work as I expected, so I've added CeleryRunJobIgnoreResult to support it.
== Tests ==
bin/test -t TestViaCelery test_branchmergeproposaljobs
== Demo and Q/A ==
None
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/job/tests/test_runner.py
lib/lp/codehosting/scanner/tests/test_mergedetection.py
lib/lp/services/job/runner.py
lib/lp/code/model/tests/test_branchmergeproposaljobs.py
lib/lp/code/model/branchmergeproposal.py
lib/lp/code/model/tests/test_branchjob.py
lib/lp/services/job/tests/celery_helpers.py
lib/lp/services/job/celeryjob.py
lib/lp/codehosting/scanner/tests/test_email.py
lib/lp/services/job/tests/test_job.py
lib/lp/services/features/flags.py
lib/lp/code/model/branchmergeproposaljob.py
lib/lp/services/job/celeryconfig.py
lib/lp/services/job/model/job.py
lib/lp/services/job/tests/__init__.py
lib/lp/codehosting/scanner/email.py
lib/lp/code/model/tests/test_branch.py
lib/lp/code/model/branchjob.py
--
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-3/+merge/101626
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-everywhere-3 into lp:launchpad.
=== modified file 'lib/lp/code/model/branchmergeproposal.py'
--- lib/lp/code/model/branchmergeproposal.py 2012-02-28 04:24:19 +0000
+++ lib/lp/code/model/branchmergeproposal.py 2012-04-11 18:49:25 +0000
@@ -208,8 +208,7 @@
def next_preview_diff_job(self):
# circular dependencies
from lp.code.model.branchmergeproposaljob import (
- BranchMergeProposalJob, BranchMergeProposalJobFactory,
- BranchMergeProposalJobType)
+ BranchMergeProposalJob, BranchMergeProposalJobType)
jobs = Store.of(self).find(
BranchMergeProposalJob,
BranchMergeProposalJob.branch_merge_proposal == self,
@@ -219,7 +218,7 @@
Job._status.is_in([JobStatus.WAITING, JobStatus.RUNNING]))
job = jobs.order_by(Job.scheduled_start, Job.date_created).first()
if job is not None:
- return BranchMergeProposalJobFactory.create(job)
+ return job.makeDerived()
else:
return None
=== modified file 'lib/lp/code/model/branchmergeproposaljob.py'
--- lib/lp/code/model/branchmergeproposaljob.py 2011-12-30 06:14:56 +0000
+++ lib/lp/code/model/branchmergeproposaljob.py 2012-04-11 18:49:25 +0000
@@ -14,7 +14,6 @@
__all__ = [
'BranchMergeProposalJob',
- 'BranchMergeProposalJobFactory',
'BranchMergeProposalJobSource',
'BranchMergeProposalJobType',
'CodeReviewCommentEmailJob',
@@ -89,6 +88,7 @@
from lp.code.mail.codereviewcomment import CodeReviewCommentMailer
from lp.code.model.branchmergeproposal import BranchMergeProposal
from lp.code.model.diff import PreviewDiff
+from lp.codehosting.bzrutils import server
from lp.codehosting.vfs import (
get_ro_server,
get_rw_server,
@@ -98,7 +98,10 @@
from lp.services.database.enumcol import EnumCol
from lp.services.database.stormbase import StormBase
from lp.services.job.interfaces.job import JobStatus
-from lp.services.job.model.job import Job
+from lp.services.job.model.job import (
+ EnumeratedSubclass,
+ Job,
+ )
from lp.services.job.runner import (
BaseRunnableJob,
BaseRunnableJobSource,
@@ -236,10 +239,15 @@
'No occurrence of %s has key %s' % (klass.__name__, key))
return instance
+ def makeDerived(self):
+ return BranchMergeProposalJobDerived.makeSubclass(self)
+
class BranchMergeProposalJobDerived(BaseRunnableJob):
-
"""Intermediate class for deriving from BranchMergeProposalJob."""
+
+ __metaclass__ = EnumeratedSubclass
+
delegates(IBranchMergeProposalJob)
def __init__(self, job):
@@ -256,9 +264,15 @@
@classmethod
def create(cls, bmp):
"""See `IMergeProposalCreationJob`."""
- job = BranchMergeProposalJob(
- bmp, cls.class_job_type, {})
- return cls(job)
+ return cls._create(bmp, {})
+
+ @classmethod
+ def _create(cls, bmp, metadata):
+ base_job = BranchMergeProposalJob(
+ bmp, cls.class_job_type, metadata)
+ job = cls(base_job)
+ job.celeryRunOnCommit()
+ return job
@classmethod
def get(cls, job_id):
@@ -317,6 +331,8 @@
class_job_type = BranchMergeProposalJobType.MERGE_PROPOSAL_NEEDS_REVIEW
+ config = config.merge_proposal_jobs
+
def run(self):
"""See `IMergeProposalNeedsReviewEmailJob`."""
mailer = BMPMailer.forCreation(
@@ -344,6 +360,8 @@
class_job_type = BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF
+ config = config.merge_proposal_jobs
+
user_error_types = (UpdatePreviewDiffNotReady, )
retry_error_types = (BranchHasPendingWrites, )
@@ -369,8 +387,9 @@
def run(self):
"""See `IRunnableJob`."""
self.checkReady()
- preview = PreviewDiff.fromBranchMergeProposal(
- self.branch_merge_proposal)
+ with server(get_ro_server(), no_replace=True):
+ preview = PreviewDiff.fromBranchMergeProposal(
+ self.branch_merge_proposal)
with BranchMergeProposalDelta.monitor(
self.branch_merge_proposal):
self.branch_merge_proposal.preview_diff = preview
@@ -473,6 +492,8 @@
class_job_type = BranchMergeProposalJobType.CODE_REVIEW_COMMENT_EMAIL
+ config = config.merge_proposal_jobs
+
def run(self):
"""See `IRunnableJob`."""
mailer = CodeReviewCommentMailer.forCreation(self.code_review_comment)
@@ -483,8 +504,7 @@
"""See `ICodeReviewCommentEmailJobSource`."""
metadata = cls.getMetadata(code_review_comment)
bmp = code_review_comment.branch_merge_proposal
- job = BranchMergeProposalJob(bmp, cls.class_job_type, metadata)
- return cls(job)
+ return cls._create(bmp, metadata)
@staticmethod
def getMetadata(code_review_comment):
@@ -525,6 +545,8 @@
class_job_type = BranchMergeProposalJobType.REVIEW_REQUEST_EMAIL
+ config = config.merge_proposal_jobs
+
def run(self):
"""See `IRunnableJob`."""
reason = RecipientReason.forReviewer(
@@ -538,8 +560,7 @@
"""See `IReviewRequestedEmailJobSource`."""
metadata = cls.getMetadata(review_request)
bmp = review_request.branch_merge_proposal
- job = BranchMergeProposalJob(bmp, cls.class_job_type, metadata)
- return cls(job)
+ return cls._create(bmp, metadata)
@staticmethod
def getMetadata(review_request):
@@ -591,6 +612,8 @@
class_job_type = BranchMergeProposalJobType.MERGE_PROPOSAL_UPDATED
+ config = config.merge_proposal_jobs
+
def run(self):
"""See `IRunnableJob`."""
mailer = BMPMailer.forModification(
@@ -601,9 +624,7 @@
def create(cls, merge_proposal, delta_text, editor):
"""See `IReviewRequestedEmailJobSource`."""
metadata = cls.getMetadata(delta_text, editor)
- job = BranchMergeProposalJob(
- merge_proposal, cls.class_job_type, metadata)
- return cls(job)
+ return cls._create(merge_proposal, metadata)
@staticmethod
def getMetadata(delta_text, editor):
@@ -658,6 +679,8 @@
class_job_type = BranchMergeProposalJobType.GENERATE_INCREMENTAL_DIFF
+ config = config.merge_proposal_jobs
+
def acquireLease(self, duration=600):
return self.job.acquireLease(duration)
@@ -665,15 +688,14 @@
revision_set = getUtility(IRevisionSet)
old_revision = revision_set.getByRevisionId(self.old_revision_id)
new_revision = revision_set.getByRevisionId(self.new_revision_id)
- self.branch_merge_proposal.generateIncrementalDiff(
- old_revision, new_revision)
+ with server(get_ro_server(), no_replace=True):
+ self.branch_merge_proposal.generateIncrementalDiff(
+ old_revision, new_revision)
@classmethod
def create(cls, merge_proposal, old_revision_id, new_revision_id):
metadata = cls.getMetadata(old_revision_id, new_revision_id)
- job = BranchMergeProposalJob(
- merge_proposal, cls.class_job_type, metadata)
- return cls(job)
+ return cls._create(merge_proposal, metadata)
@staticmethod
def getMetadata(old_revision_id, new_revision_id):
@@ -710,31 +732,6 @@
return format_address_for_person(registrant)
-class BranchMergeProposalJobFactory:
- """Construct a derived merge proposal job for a BranchMergeProposalJob."""
-
- job_classes = {
- BranchMergeProposalJobType.MERGE_PROPOSAL_NEEDS_REVIEW:
- MergeProposalNeedsReviewEmailJob,
- BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF:
- UpdatePreviewDiffJob,
- BranchMergeProposalJobType.CODE_REVIEW_COMMENT_EMAIL:
- CodeReviewCommentEmailJob,
- BranchMergeProposalJobType.REVIEW_REQUEST_EMAIL:
- ReviewRequestedEmailJob,
- BranchMergeProposalJobType.MERGE_PROPOSAL_UPDATED:
- MergeProposalUpdatedEmailJob,
- BranchMergeProposalJobType.GENERATE_INCREMENTAL_DIFF:
- GenerateIncrementalDiffJob,
- }
-
- @classmethod
- def create(cls, bmp_job):
- """Create the derived job for the bmp_job's job type."""
- job_class = cls.job_classes[bmp_job.job_type]
- return job_class(bmp_job)
-
-
class BranchMergeProposalJobSource(BaseRunnableJobSource):
"""Provide a job source for all merge proposal jobs.
@@ -748,10 +745,6 @@
def contextManager():
"""See `IJobSource`."""
errorlog.globalErrorUtility.configure('merge_proposal_jobs')
- server = get_ro_server()
- server.start_server()
- yield
- server.stop_server()
@staticmethod
def get(job_id):
@@ -763,7 +756,7 @@
or its job_type does not match the desired subclass.
"""
job = BranchMergeProposalJob.get(job_id)
- return BranchMergeProposalJobFactory.create(job)
+ return job.makeDerived()
@staticmethod
def iterReady(job_type=None):
@@ -802,7 +795,7 @@
# If the job is running, then skip it
if job.status == JobStatus.RUNNING:
continue
- derived_job = BranchMergeProposalJobFactory.create(bmp_job)
+ derived_job = bmp_job.makeDerived()
# If the job is an update preview diff, then check that it is
# ready.
if IUpdatePreviewDiffJob.providedBy(derived_job):
=== modified file 'lib/lp/code/model/tests/test_branchmergeproposaljobs.py'
--- lib/lp/code/model/tests/test_branchmergeproposaljobs.py 2012-01-20 15:42:44 +0000
+++ lib/lp/code/model/tests/test_branchmergeproposaljobs.py 2012-04-11 18:49:25 +0000
@@ -1,4 +1,4 @@
-# Copyright 2010-2011 Canonical Ltd. This software is licensed under the
+# Copyright 2010-2012 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for branch merge proposal jobs."""
@@ -55,9 +55,15 @@
)
from lp.code.subscribers.branchmergeproposal import merge_proposal_modified
from lp.services.config import config
+from lp.services.features.testing import FeatureFixture
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.model.job import Job
from lp.services.job.runner import JobRunner
+from lp.services.job.tests import (
+ celeryd,
+ monitor_celery,
+ pop_remote_notifications,
+ )
from lp.services.osutils import override_environ
from lp.services.webapp.testing import verifyObject
from lp.testing import (
@@ -65,7 +71,10 @@
TestCaseWithFactory,
)
from lp.testing.dbuser import dbuser
-from lp.testing.layers import LaunchpadZopelessLayer
+from lp.testing.layers import (
+ AppServerLayer,
+ LaunchpadZopelessLayer,
+ )
from lp.testing.mail_helpers import pop_notifications
@@ -549,6 +558,15 @@
'emailing a reviewer requesting a review',
job.getOperationDescription())
+ def test_run_sends_mail(self):
+ request = self.factory.makeCodeReviewVoteReference()
+ job = ReviewRequestedEmailJob.create(request)
+ job.run()
+ (notification,) = pop_notifications()
+ self.assertIn(
+ 'You have been requested to review the proposed merge',
+ notification.get_payload(decode=True))
+
class TestMergeProposalUpdatedEmailJob(TestCaseWithFactory):
@@ -573,3 +591,83 @@
self.assertEqual(
'emailing subscribers about merge proposal changes',
job.getOperationDescription())
+
+
+class TestViaCelery(TestCaseWithFactory):
+
+ layer = AppServerLayer
+
+ def test_MergeProposalNeedsReviewEmailJob(self):
+ """MergeProposalNeedsReviewEmailJob runs under Celery."""
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classes':
+ 'MergeProposalNeedsReviewEmailJob'}))
+ self.useContext(celeryd('job'))
+ bmp = self.factory.makeBranchMergeProposal()
+ with monitor_celery() as responses:
+ MergeProposalNeedsReviewEmailJob.create(bmp)
+ transaction.commit()
+ responses[0].wait(30)
+ self.assertEqual(2, len(pop_remote_notifications()))
+
+ def test_UpdatePreviewDiffJob(self):
+ """UpdatePreviewDiffJob runs under Celery."""
+ self.useContext(celeryd('job'))
+ self.useBzrBranches(direct_database=True)
+ bmp = create_example_merge(self)[0]
+ self.factory.makeRevisionsForBranch(bmp.source_branch, count=1)
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classes': 'UpdatePreviewDiffJob'}))
+ with monitor_celery() as responses:
+ UpdatePreviewDiffJob.create(bmp)
+ transaction.commit()
+ responses[0].wait(30)
+ self.assertIsNot(None, bmp.preview_diff)
+
+ def test_CodeReviewCommentEmailJob(self):
+ """CodeReviewCommentEmailJob runs under Celery."""
+ comment = self.factory.makeCodeReviewComment()
+ self.useContext(celeryd('job'))
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classes': 'CodeReviewCommentEmailJob'}))
+ with monitor_celery() as responses:
+ CodeReviewCommentEmailJob.create(comment)
+ transaction.commit()
+ responses[0].wait(30)
+ self.assertEqual(2, len(pop_remote_notifications()))
+
+ def test_ReviewRequestedEmailJob(self):
+ """ReviewRequestedEmailJob runs under Celery."""
+ request = self.factory.makeCodeReviewVoteReference()
+ self.useContext(celeryd('job'))
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classes': 'ReviewRequestedEmailJob'}))
+ with monitor_celery() as responses:
+ ReviewRequestedEmailJob.create(request)
+ transaction.commit()
+ responses[0].wait(30)
+ self.assertEqual(1, len(pop_remote_notifications()))
+
+ def test_MergeProposalUpdatedEmailJob(self):
+ """MergeProposalUpdatedEmailJob runs under Celery."""
+ bmp = self.factory.makeBranchMergeProposal()
+ self.useContext(celeryd('job'))
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classes': 'MergeProposalUpdatedEmailJob'}))
+ with monitor_celery() as responses:
+ MergeProposalUpdatedEmailJob.create(
+ bmp, 'change', bmp.registrant)
+ transaction.commit()
+ responses[0].wait(30)
+ self.assertEqual(2, len(pop_remote_notifications()))
+
+ def test_GenerateIncrementalDiffJob(self):
+ """GenerateIncrementalDiffJob runs under Celery."""
+ self.useContext(celeryd('job'))
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classes': 'GenerateIncrementalDiffJob'}))
+ with monitor_celery() as responses:
+ job = make_runnable_incremental_diff_job(self)
+ transaction.commit()
+ responses[0].wait(30)
+ self.assertEqual(JobStatus.COMPLETED, job.status)
=== modified file 'lib/lp/codehosting/scanner/tests/test_email.py'
--- lib/lp/codehosting/scanner/tests/test_email.py 2012-04-09 19:02:25 +0000
+++ lib/lp/codehosting/scanner/tests/test_email.py 2012-04-11 18:49:25 +0000
@@ -32,6 +32,7 @@
from lp.services.job.tests import (
celeryd,
monitor_celery,
+ pop_remote_notifications,
)
from lp.testing import TestCaseWithFactory
from lp.testing.dbuser import switch_dbuser
@@ -163,11 +164,6 @@
layer = ZopelessAppServerLayer
- @staticmethod
- def pop_notifications():
- from lp.services.job.tests.celery_helpers import pop_notifications
- return pop_notifications.delay().get(30)
-
def prepare(self, job_name):
self.useFixture(FeatureFixture(
{'jobs.celery.enabled_classes': job_name}))
@@ -186,7 +182,7 @@
with monitor_celery() as responses:
BzrSync(db_branch).syncBranchAndClose(tree.branch)
responses[-1].wait(30)
- self.assertEqual(1, len(self.pop_notifications()))
+ self.assertEqual(1, len(pop_remote_notifications()))
def test_uncommit_branch(self):
"""RevisionMailJob for removed revisions runs via Celery."""
@@ -196,11 +192,11 @@
with monitor_celery() as responses:
bzr_sync.syncBranchAndClose(tree.branch)
responses[0].wait(30)
- self.pop_notifications()
+ pop_remote_notifications()
uncommit(tree.branch)
bzr_sync.syncBranchAndClose(tree.branch)
responses[1].wait(30)
- self.assertEqual(1, len(self.pop_notifications()))
+ self.assertEqual(1, len(pop_remote_notifications()))
def test_revisions_added(self):
"""RevisionMailJob for removed revisions runs via Celery."""
@@ -208,12 +204,12 @@
tree.commit('message')
bzr_sync = BzrSync(db_branch)
bzr_sync.syncBranchAndClose(tree.branch)
- self.pop_notifications()
+ pop_remote_notifications()
tree.commit('message2')
with monitor_celery() as responses:
bzr_sync.syncBranchAndClose(tree.branch)
responses[-1].wait(30)
- self.assertEqual(1, len(self.pop_notifications()))
+ self.assertEqual(1, len(pop_remote_notifications()))
class TestScanBranches(TestCaseWithFactory):
=== modified file 'lib/lp/codehosting/scanner/tests/test_mergedetection.py'
--- lib/lp/codehosting/scanner/tests/test_mergedetection.py 2012-01-01 02:58:52 +0000
+++ lib/lp/codehosting/scanner/tests/test_mergedetection.py 2012-04-11 18:49:25 +0000
@@ -19,7 +19,6 @@
from lp.code.interfaces.branchlookup import IBranchLookup
from lp.code.model.branchmergeproposaljob import (
BranchMergeProposalJob,
- BranchMergeProposalJobFactory,
BranchMergeProposalJobType,
)
from lp.codehosting.scanner import (
@@ -285,7 +284,7 @@
BranchMergeProposalJob.branch_merge_proposal == proposal,
BranchMergeProposalJob.job_type ==
BranchMergeProposalJobType.MERGE_PROPOSAL_UPDATED).one()
- derived_job = BranchMergeProposalJobFactory.create(job)
+ derived_job = job.makeDerived()
derived_job.run()
notifications = pop_notifications()
self.assertIn('Work in progress => Merged',
=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py 2012-04-05 19:05:16 +0000
+++ lib/lp/services/job/celeryjob.py 2012-04-11 18:49:25 +0000
@@ -9,7 +9,10 @@
__metaclass__ = type
-__all__ = ['CeleryRunJob']
+__all__ = [
+ 'CeleryRunJob',
+ 'CeleryRunJobIgnoreResult',
+ ]
import os
@@ -28,3 +31,8 @@
def getJobRunner(self):
"""Return a BaseJobRunner, to support customization."""
return BaseJobRunner()
+
+
+class CeleryRunJobIgnoreResult(CeleryRunJob):
+
+ ignore_result = True
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py 2012-04-09 19:02:25 +0000
+++ lib/lp/services/job/model/job.py 2012-04-11 18:49:25 +0000
@@ -261,11 +261,18 @@
from lp.code.model.branchjob import (
BranchJob,
)
- store = IStore(BranchJob)
- branch_job = store.find(BranchJob, BranchJob.job == job_id).one()
- if branch_job is None:
+ from lp.code.model.branchmergeproposaljob import (
+ BranchMergeProposalJob,
+ )
+ store = IStore(Job)
+ for cls in [BranchJob, BranchMergeProposalJob]:
+ base_job = store.find(cls, cls.job == job_id).one()
+ if base_job is not None:
+ break
+ if base_job is None:
raise ValueError('No BranchJob with job=%s.' % job_id)
- return branch_job.makeDerived(), store
+
+ return base_job.makeDerived(), store
@classmethod
def switchDBUser(cls, job_id):
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-04-10 20:24:43 +0000
+++ lib/lp/services/job/runner.py 2012-04-11 18:49:25 +0000
@@ -194,10 +194,13 @@
"""Request that this job be run via celery."""
# Avoid importing from lp.services.job.celeryjob where not needed, to
# avoid configuring Celery when Rabbit is not configured.
- from lp.services.job.celeryjob import CeleryRunJob
- return CeleryRunJob.apply_async(
- (self.job_id,), queue=self.task_queue,
- ignore_result=ignore_result)
+ from lp.services.job.celeryjob import (
+ CeleryRunJob, CeleryRunJobIgnoreResult)
+ if ignore_result:
+ cls = CeleryRunJobIgnoreResult
+ else:
+ cls = CeleryRunJob
+ return cls.apply_async((self.job_id,), queue=self.task_queue)
def celeryCommitHook(self, succeeded):
"""Hook function to call when a commit completes."""
=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py 2012-04-10 20:24:43 +0000
+++ lib/lp/services/job/tests/__init__.py 2012-04-11 18:49:25 +0000
@@ -5,7 +5,8 @@
__all__ = [
'celeryd',
- 'monitor_celery'
+ 'monitor_celery',
+ 'pop_remote_notifications',
]
@@ -49,3 +50,9 @@
yield responses
finally:
BaseRunnableJob.celery_responses = old_responses
+
+
+def pop_remote_notifications():
+ """Pop the notifications from a celeryd worker."""
+ from lp.services.job.tests.celery_helpers import pop_notifications
+ return pop_notifications.delay().get(30)
=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py 2012-03-21 16:16:08 +0000
+++ lib/lp/services/job/tests/test_job.py 2012-04-11 18:49:25 +0000
@@ -10,6 +10,7 @@
from lazr.jobrunner.jobrunner import LeaseHeld
from storm.locals import Store
+from lp.code.model.branchmergeproposaljob import CodeReviewCommentEmailJob
from lp.services.database.constants import UTC_NOW
from lp.services.database.lpstorm import IStore
from lp.services.job.interfaces.job import (
@@ -19,6 +20,7 @@
from lp.services.job.model.job import (
InvalidTransition,
Job,
+ UniversalJobSource,
)
from lp.services.webapp.testing import verifyObject
from lp.testing import (
@@ -340,3 +342,14 @@
job = Job()
job.acquireLease(-300)
self.assertEqual(0, job.getTimeout())
+
+
+class TestUniversalJobSource(TestCaseWithFactory):
+
+ layer = ZopelessDatabaseLayer
+
+ def test_getDerived_with_merge_proposal_job(self):
+ comment = self.factory.makeCodeReviewComment()
+ job = CodeReviewCommentEmailJob.create(comment)
+ newjob = UniversalJobSource.getDerived(job.job_id)[0]
+ self.assertEqual(job, newjob)