launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #07044
[Merge] lp:~abentley/launchpad/celery-everywhere-2 into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-everywhere-2 into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-2/+merge/101284
= Summary =
Support revision mail jobs and translation jobs via Celery.
== Pre-implementation notes ==
None
== Implementation details ==
Add celeryRunJob calls for RosettaUploadJob, RevisionMailJob and RevisionsAddedJob, and test.
Provide pop_notifications task to retrieve the mail notifications from a celeryd.
Rename the feature flag to remove an extra "s" in "classes".
Rename the task queue names to "job" and "branch_write_job", to be more descriptive.
Run jobs with the appropriate database user. To do this, introduce a "config" member on BaseRunnableJob that gives the configuration for that job type.
== Tests ==
bin/test -vt ViaCelery
== Demo and Q/A ==
None
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/job/tests/test_runner.py
lib/lp/services/job/runner.py
lib/lp/code/model/tests/test_branchjob.py
lib/lp/services/job/tests/celery_helpers.py
lib/lp/codehosting/scanner/tests/test_email.py
lib/lp/services/features/flags.py
lib/lp/services/job/celeryconfig.py
lib/lp/services/job/model/job.py
lib/lp/services/job/tests/__init__.py
lib/lp/codehosting/scanner/email.py
lib/lp/code/model/tests/test_branch.py
lib/lp/code/model/branchjob.py
--
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-2/+merge/101284
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-everywhere-2 into lp:launchpad.
=== modified file 'lib/lp/code/model/branchjob.py'
--- lib/lp/code/model/branchjob.py 2012-04-02 20:46:13 +0000
+++ lib/lp/code/model/branchjob.py 2012-04-09 19:51:32 +0000
@@ -80,7 +80,10 @@
from lp.code.model.branch import Branch
from lp.code.model.branchmergeproposal import BranchMergeProposal
from lp.code.model.revision import RevisionSet
-from lp.codehosting.bzrutils import server
+from lp.codehosting.bzrutils import (
+ read_locked,
+ server,
+ )
from lp.codehosting.scanner.bzrsync import BzrSync
from lp.codehosting.vfs import (
get_ro_server,
@@ -89,6 +92,7 @@
from lp.codehosting.vfs.branchfs import get_real_branch_path
from lp.registry.interfaces.productseries import IProductSeriesSet
from lp.scripts.helpers import TransactionFreeOperation
+from lp.services.config import config
from lp.services.database.enumcol import EnumCol
from lp.services.database.lpstorm import IStore
from lp.services.database.sqlbase import SQLBase
@@ -296,6 +300,8 @@
class_job_type = BranchJobType.SCAN_BRANCH
memory_limit = 2 * (1024 ** 3)
+ config = config.branchscanner
+
@classmethod
def create(cls, branch):
"""See `IBranchScanJobSource`."""
@@ -327,7 +333,9 @@
user_error_types = (NotBranchError,)
- task_queue = 'branch_write'
+ task_queue = 'branch_write_job'
+
+ config = config.upgrade_branches
def getOperationDescription(self):
return 'upgrading a branch'
@@ -413,6 +421,8 @@
class_job_type = BranchJobType.REVISION_MAIL
+ config = config.sendbranchmail
+
@classmethod
def create(cls, branch, revno, from_address, body, subject):
"""See `IRevisionMailJobSource`."""
@@ -458,6 +468,8 @@
class_job_type = BranchJobType.REVISIONS_ADDED_MAIL
+ config = config.sendbranchmail
+
@classmethod
def create(cls, branch, last_scanned_id, last_revision_id,
from_address):
@@ -520,15 +532,13 @@
if not subscriptions:
return
- self.bzr_branch.lock_read()
- try:
+ with server(get_ro_server(), no_replace=True), \
+ read_locked(self.bzr_branch):
for revision, revno in self.iterAddedMainline():
assert revno is not None
mailer = self.getMailerForRevision(
revision, revno, self.generateDiffs())
mailer.sendAll()
- finally:
- self.bzr_branch.unlock()
def getDiffForRevisions(self, from_revision_id, to_revision_id):
"""Generate the diff between from_revision_id and to_revision_id."""
@@ -718,6 +728,8 @@
class_job_type = BranchJobType.ROSETTA_UPLOAD
+ config = config.rosettabranches
+
def __init__(self, branch_job):
super(RosettaUploadJob, self).__init__(branch_job)
@@ -763,7 +775,9 @@
force_translations_upload)
branch_job = BranchJob(
branch, BranchJobType.ROSETTA_UPLOAD, metadata)
- return cls(branch_job)
+ job = cls(branch_job)
+ job.celeryRunOnCommit()
+ return job
else:
return None
@@ -889,27 +903,28 @@
def run(self):
"""See `IRosettaUploadJob`."""
- # This is not called upon job creation because the branch would
- # neither have been mirrored nor scanned then.
- self._init_translation_file_lists()
- # Get the product series that are connected to this branch and
- # that want to upload translations.
- productseriesset = getUtility(IProductSeriesSet)
- productseries = productseriesset.findByTranslationsImportBranch(
- self.branch, self.force_translations_upload)
- translation_import_queue = getUtility(ITranslationImportQueue)
- for series in productseries:
- approver = TranslationBranchApprover(self.file_names,
- productseries=series)
- for iter_info in self._iter_lists_and_uploaders(series):
- file_names, changed_files, uploader = iter_info
- for upload_file_name, upload_file_content in changed_files:
- if len(upload_file_content) == 0:
- continue # Skip empty files
- entry = translation_import_queue.addOrUpdateEntry(
- upload_file_name, upload_file_content,
- True, uploader, productseries=series)
- approver.approve(entry)
+ with server(get_ro_server(), no_replace=True):
+ # This is not called upon job creation because the branch would
+ # neither have been mirrored nor scanned then.
+ self._init_translation_file_lists()
+ # Get the product series that are connected to this branch and
+ # that want to upload translations.
+ productseriesset = getUtility(IProductSeriesSet)
+ productseries = productseriesset.findByTranslationsImportBranch(
+ self.branch, self.force_translations_upload)
+ translation_import_queue = getUtility(ITranslationImportQueue)
+ for series in productseries:
+ approver = TranslationBranchApprover(self.file_names,
+ productseries=series)
+ for iter_info in self._iter_lists_and_uploaders(series):
+ file_names, changed_files, uploader = iter_info
+ for upload_file_name, upload_file_content in changed_files:
+ if len(upload_file_content) == 0:
+ continue # Skip empty files
+ entry = translation_import_queue.addOrUpdateEntry(
+ upload_file_name, upload_file_content,
+ True, uploader, productseries=series)
+ approver.approve(entry)
@staticmethod
def iterReady():
@@ -949,7 +964,9 @@
class_job_type = BranchJobType.RECLAIM_BRANCH_SPACE
- task_queue = 'branch_write'
+ task_queue = 'branch_write_job'
+
+ config = config.reclaimbranchspace
def __repr__(self):
return '<RECLAIM_BRANCH_SPACE branch job (%(id)s) for %(branch)s>' % {
=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py 2012-04-04 13:16:03 +0000
+++ lib/lp/code/model/tests/test_branch.py 2012-04-09 19:51:32 +0000
@@ -316,8 +316,8 @@
# lp.services.job.celeryconfig is loaded.
from celery.exceptions import TimeoutError
self.useFixture(FeatureFixture({
- 'jobs.celery.enabled_classses': 'BranchScanJob'}))
- with celeryd('standard') as proc:
+ 'jobs.celery.enabled_classes': 'BranchScanJob'}))
+ with celeryd('job') as proc:
self.useBzrBranches()
db_branch, bzr_tree = self.create_branch_and_tree()
bzr_tree.commit(
@@ -348,8 +348,8 @@
"""Calling destroySelf causes Celery to delete the branch."""
from celery.exceptions import TimeoutError
self.useFixture(FeatureFixture({
- 'jobs.celery.enabled_classses': 'ReclaimBranchSpaceJob'}))
- with celeryd('branch_write'):
+ 'jobs.celery.enabled_classes': 'ReclaimBranchSpaceJob'}))
+ with celeryd('branch_write_job'):
self.useBzrBranches()
db_branch, tree = self.create_branch_and_tree()
branch_path = get_real_branch_path(db_branch.id)
@@ -818,7 +818,7 @@
def test_requestUpgradeUsesCelery(self):
self.useFixture(FeatureFixture({
- 'jobs.celery.enabled_classses': 'BranchUpgradeJob'}))
+ 'jobs.celery.enabled_classes': 'BranchUpgradeJob'}))
cwd = os.getcwd()
self.useBzrBranches()
db_branch, tree = create_knit(self)
@@ -829,7 +829,7 @@
db_branch.requestUpgrade(db_branch.owner)
with monitor_celery() as responses:
transaction.commit()
- with celeryd('branch_write', cwd):
+ with celeryd('branch_write_job', cwd):
responses[-1].wait(30)
new_branch = Branch.open(tree.branch.base)
self.assertEqual(
=== modified file 'lib/lp/code/model/tests/test_branchjob.py'
--- lib/lp/code/model/tests/test_branchjob.py 2012-04-02 18:49:44 +0000
+++ lib/lp/code/model/tests/test_branchjob.py 2012-04-09 19:51:32 +0000
@@ -60,6 +60,7 @@
RosettaUploadJob,
)
from lp.code.model.branchrevision import BranchRevision
+from lp.code.model.directbranchcommit import DirectBranchCommit
from lp.code.model.tests.test_branch import create_knit
from lp.code.model.revision import RevisionSet
from lp.codehosting.vfs import branch_id_to_path
@@ -67,18 +68,27 @@
from lp.services.config import config
from lp.services.database.constants import UTC_NOW
from lp.services.database.lpstorm import IMasterStore
+from lp.services.features.testing import FeatureFixture
from lp.services.identity.interfaces.emailaddress import EmailAddressStatus
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.model.job import Job
from lp.services.job.runner import JobRunner
+from lp.services.job.tests import (
+ celeryd,
+ monitor_celery,
+ )
from lp.services.osutils import override_environ
from lp.services.webapp import canonical_url
-from lp.testing import TestCaseWithFactory
+from lp.testing import (
+ person_logged_in,
+ TestCaseWithFactory,
+ )
from lp.testing.dbuser import (
dbuser,
switch_dbuser,
)
from lp.testing.layers import (
+ AppServerLayer,
DatabaseFunctionalLayer,
LaunchpadZopelessLayer,
)
@@ -1222,6 +1232,41 @@
self.assertEqual([], unfinished_jobs)
+class TestViaCelery(TestCaseWithFactory):
+
+ layer = AppServerLayer
+
+ def test_RosettaUploadJob(self):
+ """Ensure RosettaUploadJob can run under Celery."""
+ self.useContext(celeryd('job'))
+ self.useBzrBranches(direct_database=True)
+ self.useFixture(FeatureFixture({
+ 'jobs.celery.enabled_classes': 'BranchScanJob RosettaUploadJob'
+ }))
+ db_branch = self.factory.makeAnyBranch()
+ self.createBzrBranch(db_branch)
+ commit = DirectBranchCommit(db_branch, no_race_check=True)
+ commit.writeFile('foo.pot', 'gibberish')
+ with monitor_celery() as responses:
+ with person_logged_in(db_branch.owner):
+ commit.commit('message')
+ transaction.commit()
+ # Wait for branch scan to complete.
+ responses[0].wait(30)
+ series = self.factory.makeProductSeries(branch=db_branch)
+ RosettaUploadJob.create(
+ commit.db_branch, NULL_REVISION,
+ force_translations_upload=True)
+ transaction.commit()
+ # Wait for RosettaUploadJob to complete
+ responses[1].wait(30)
+ queue = getUtility(ITranslationImportQueue)
+ entries = list(queue.getAllEntries(target=series))
+ self.assertEqual(len(entries), 1)
+ entry = entries[0]
+ self.assertEqual('foo.pot', entry.path)
+
+
class TestReclaimBranchSpaceJob(TestCaseWithFactory):
layer = LaunchpadZopelessLayer
=== modified file 'lib/lp/codehosting/scanner/email.py'
--- lib/lp/codehosting/scanner/email.py 2012-01-01 02:58:52 +0000
+++ lib/lp/codehosting/scanner/email.py 2012-04-09 19:51:32 +0000
@@ -46,10 +46,11 @@
# No diff is associated with the removed email.
subject = "[Branch %s] %s removed" % (
revisions_removed.db_branch.unique_name, count)
- getUtility(IRevisionMailJobSource).create(
+ job = getUtility(IRevisionMailJobSource).create(
revisions_removed.db_branch, revno='removed',
from_address=config.canonical.noreply_from_address,
body=contents, subject=subject)
+ job.celeryRunOnCommit()
def queue_tip_changed_email_jobs(tip_changed):
@@ -66,11 +67,12 @@
revisions)
subject = "[Branch %s] %s" % (
tip_changed.db_branch.unique_name, revisions)
- getUtility(IRevisionMailJobSource).create(
+ job = getUtility(IRevisionMailJobSource).create(
tip_changed.db_branch, 'initial',
config.canonical.noreply_from_address, message, subject)
else:
- getUtility(IRevisionsAddedJobSource).create(
+ job = getUtility(IRevisionsAddedJobSource).create(
tip_changed.db_branch, tip_changed.db_branch.last_scanned_id,
tip_changed.bzr_branch.last_revision(),
config.canonical.noreply_from_address)
+ job.celeryRunOnCommit()
=== modified file 'lib/lp/codehosting/scanner/tests/test_email.py'
--- lib/lp/codehosting/scanner/tests/test_email.py 2012-01-01 02:58:52 +0000
+++ lib/lp/codehosting/scanner/tests/test_email.py 2012-04-09 19:51:32 +0000
@@ -7,6 +7,7 @@
import email
+from bzrlib.uncommit import uncommit
from zope.component import getUtility
from zope.event import notify
@@ -20,13 +21,34 @@
IRevisionsAddedJobSource,
)
from lp.code.model.branchjob import RevisionMailJob
+from lp.codehosting.scanner.bzrsync import BzrSync
from lp.codehosting.scanner import events
from lp.codehosting.scanner.tests.test_bzrsync import BzrSyncTestCase
from lp.registry.interfaces.person import IPersonSet
+from lp.services.config import config
+from lp.services.features.testing import FeatureFixture
from lp.services.job.runner import JobRunner
from lp.services.mail import stub
+from lp.services.job.tests import (
+ celeryd,
+ monitor_celery,
+ )
from lp.testing import TestCaseWithFactory
-from lp.testing.layers import LaunchpadZopelessLayer
+from lp.testing.dbuser import switch_dbuser
+from lp.testing.layers import (
+ LaunchpadZopelessLayer,
+ ZopelessAppServerLayer,
+ )
+
+
+def add_subscriber(branch):
+ test_user = getUtility(IPersonSet).getByEmail('test@xxxxxxxxxxxxx')
+ branch.subscribe(
+ test_user,
+ BranchSubscriptionNotificationLevel.FULL,
+ BranchSubscriptionDiffSize.FIVEKLINES,
+ CodeReviewNotificationLevel.NOEMAIL,
+ test_user)
class TestBzrSyncEmail(BzrSyncTestCase):
@@ -39,13 +61,7 @@
def makeDatabaseBranch(self):
branch = BzrSyncTestCase.makeDatabaseBranch(self)
LaunchpadZopelessLayer.txn.begin()
- test_user = getUtility(IPersonSet).getByEmail('test@xxxxxxxxxxxxx')
- branch.subscribe(
- test_user,
- BranchSubscriptionNotificationLevel.FULL,
- BranchSubscriptionDiffSize.FIVEKLINES,
- CodeReviewNotificationLevel.NOEMAIL,
- test_user)
+ add_subscriber(branch)
LaunchpadZopelessLayer.txn.commit()
return branch
@@ -130,12 +146,12 @@
recommit_email_msg = email.message_from_string(recommit_email[2])
recommit_email_body = recommit_email_msg.get_payload()[0].get_payload(
decode=True)
- subject = '[Branch %s] Rev 1: second' % self.db_branch.unique_name
+ subject = '[Branch %s] Rev 1: second' % self.db_branch.unique_name
self.assertEmailHeadersEqual(subject, recommit_email_msg['Subject'])
body_bits = [
'revno: 1',
'committer: %s' % author,
- 'branch nick: %s' % self.bzr_branch.nick,
+ 'branch nick: %s' % self.bzr_branch.nick,
'message:\n second',
'added:\n hello.txt',
]
@@ -143,6 +159,63 @@
self.assertTextIn(bit, recommit_email_body)
+class TestViaCelery(TestCaseWithFactory):
+
+ layer = ZopelessAppServerLayer
+
+ @staticmethod
+ def pop_notifications():
+ from lp.services.job.tests.celery_helpers import pop_notifications
+ return pop_notifications.delay().get(30)
+
+ def prepare(self, job_name):
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classes': job_name}))
+ self.useContext(celeryd('job'))
+ self.useBzrBranches(direct_database=True)
+ db_branch, tree = self.create_branch_and_tree()
+ add_subscriber(db_branch)
+ switch_dbuser(config.branchscanner.dbuser)
+ # Needed for feature flag teardown
+ self.addCleanup(switch_dbuser, config.launchpad.dbuser)
+ return db_branch, tree
+
+ def test_empty_branch(self):
+ """RevisionMailJob for empty branches runs via Celery."""
+ db_branch, tree = self.prepare('RevisionMailJob')
+ with monitor_celery() as responses:
+ BzrSync(db_branch).syncBranchAndClose(tree.branch)
+ responses[-1].wait(30)
+ self.assertEqual(1, len(self.pop_notifications()))
+
+ def test_uncommit_branch(self):
+ """RevisionMailJob for removed revisions runs via Celery."""
+ db_branch, tree = self.prepare('RevisionMailJob')
+ tree.commit('message')
+ bzr_sync = BzrSync(db_branch)
+ with monitor_celery() as responses:
+ bzr_sync.syncBranchAndClose(tree.branch)
+ responses[0].wait(30)
+ self.pop_notifications()
+ uncommit(tree.branch)
+ bzr_sync.syncBranchAndClose(tree.branch)
+ responses[1].wait(30)
+ self.assertEqual(1, len(self.pop_notifications()))
+
+ def test_revisions_added(self):
+ """RevisionMailJob for removed revisions runs via Celery."""
+ db_branch, tree = self.prepare('RevisionsAddedJob')
+ tree.commit('message')
+ bzr_sync = BzrSync(db_branch)
+ bzr_sync.syncBranchAndClose(tree.branch)
+ self.pop_notifications()
+ tree.commit('message2')
+ with monitor_celery() as responses:
+ bzr_sync.syncBranchAndClose(tree.branch)
+ responses[-1].wait(30)
+ self.assertEqual(1, len(self.pop_notifications()))
+
+
class TestScanBranches(TestCaseWithFactory):
layer = LaunchpadZopelessLayer
=== modified file 'lib/lp/services/features/flags.py'
--- lib/lp/services/features/flags.py 2012-04-05 13:05:04 +0000
+++ lib/lp/services/features/flags.py 2012-04-09 19:51:32 +0000
@@ -125,7 +125,7 @@
'',
'',
''),
- ('jobs.celery.enabled_classses',
+ ('jobs.celery.enabled_classes',
'space delimited',
'Names of Job classes that should be run via celery',
'No jobs run via celery',
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-04-04 15:15:09 +0000
+++ lib/lp/services/job/celeryconfig.py 2012-04-09 19:51:32 +0000
@@ -8,9 +8,9 @@
CELERY_IMPORTS = ("lp.services.job.celeryjob", )
CELERY_RESULT_BACKEND = "amqp"
CELERY_QUEUES = {
- "branch_write": {"binding_key": "branch_write"},
- "standard": {"binding_key": "standard"},
+ "branch_write_job": {"binding_key": "branch_write_job"},
+ "job": {"binding_key": "job"},
}
-CELERY_DEFAULT_EXCHANGE = "standard"
-CELERY_DEFAULT_QUEUE = "standard"
+CELERY_DEFAULT_EXCHANGE = "job"
+CELERY_DEFAULT_QUEUE = "job"
CELERY_CREATE_MISSING_QUEUES = False
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py 2012-03-21 20:39:49 +0000
+++ lib/lp/services/job/model/job.py 2012-04-09 19:51:32 +0000
@@ -33,7 +33,9 @@
Int,
Reference,
)
+from storm.zope.interfaces import IZStorm
import transaction
+from zope.component import getUtility
from zope.interface import implements
from lp.services.config import dbconfig
@@ -253,17 +255,33 @@
needs_init = True
- @classmethod
- def get(cls, job_id):
- if cls.needs_init:
- scripts.execute_zcml_for_scripts(use_web_security=False)
- cls.needs_init = False
-
- dbconfig.override(
- dbuser='branchscanner', isolation_level='read_committed')
+ @staticmethod
+ def getDerived(job_id):
+ """Return the derived branch job associated with the job id."""
from lp.code.model.branchjob import (
BranchJob,
)
store = IStore(BranchJob)
branch_job = store.find(BranchJob, BranchJob.job == job_id).one()
- return branch_job.makeDerived()
+ if branch_job is None:
+ raise ValueError('No BranchJob with job=%s.' % job_id)
+ return branch_job.makeDerived(), store
+
+ @classmethod
+ def switchDBUser(cls, job_id):
+ """Switch to the DB user associated with this Job ID."""
+ derived, store = cls.getDerived(job_id)
+ dbconfig.override(
+ dbuser=derived.config.dbuser, isolation_level='read_committed')
+ transaction.abort()
+ getUtility(IZStorm).remove(store)
+ store.close()
+
+ @classmethod
+ def get(cls, job_id):
+ transaction.abort()
+ if cls.needs_init:
+ scripts.execute_zcml_for_scripts(use_web_security=False)
+ cls.needs_init = False
+ cls.switchDBUser(job_id)
+ return cls.getDerived(job_id)[0]
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-04-03 18:58:11 +0000
+++ lib/lp/services/job/runner.py 2012-04-09 19:51:32 +0000
@@ -103,7 +103,7 @@
retry_error_types = ()
- task_queue = 'standard'
+ task_queue = 'job'
celery_responses = None
@@ -190,23 +190,22 @@
return oops_config.create(
context=dict(exc_info=info))
- def runViaCelery(self):
+ def runViaCelery(self, ignore_result=False):
"""Request that this job be run via celery."""
# Avoid importing from lp.services.job.celeryjob where not needed, to
# avoid configuring Celery when Rabbit is not configured.
from lp.services.job.celeryjob import CeleryRunJob
- ignore_result = bool(BaseRunnableJob.celery_responses is None)
- response = CeleryRunJob.apply_async(
+ return CeleryRunJob.apply_async(
(self.job_id,), queue=self.task_queue,
ignore_result=ignore_result)
- if not ignore_result:
- BaseRunnableJob.celery_responses.append(response)
- return response
def celeryCommitHook(self, succeeded):
"""Hook function to call when a commit completes."""
if succeeded:
- self.runViaCelery()
+ ignore_result = bool(BaseRunnableJob.celery_responses is None)
+ response = self.runViaCelery(ignore_result)
+ if not ignore_result:
+ BaseRunnableJob.celery_responses.append(response)
def celeryRunOnCommit(self):
"""Configure transaction so that commit runs this job via Celery."""
@@ -623,7 +622,7 @@
The name of a BaseRunnableJob must be specified.
"""
- flag = getFeatureFlag('jobs.celery.enabled_classses')
+ flag = getFeatureFlag('jobs.celery.enabled_classes')
if flag is None:
return False
return class_name in flag.split(' ')
=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py 2012-04-04 15:15:09 +0000
+++ lib/lp/services/job/tests/__init__.py 2012-04-09 19:51:32 +0000
@@ -32,6 +32,7 @@
'--concurrency', '1',
'--loglevel', 'INFO',
'--queues', queue,
+ '--include', 'lp.services.job.tests.celery_helpers',
)
with running('bin/celeryd', cmd_args, cwd=cwd) as proc:
# Wait for celeryd startup to complete.
=== added file 'lib/lp/services/job/tests/celery_helpers.py'
--- lib/lp/services/job/tests/celery_helpers.py 1970-01-01 00:00:00 +0000
+++ lib/lp/services/job/tests/celery_helpers.py 2012-04-09 19:51:32 +0000
@@ -0,0 +1,19 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+__all__ = ['pop_notifications']
+
+# Force the correct celeryconfig to be used.
+import lp.services.job.celeryjob
+# Quiet lint unused import warning.
+lp.services.job.celeryjob
+
+from celery.task import task
+
+
+@task
+def pop_notifications():
+ from lp.testing.mail_helpers import pop_notifications
+ return pop_notifications()
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2012-04-03 16:06:43 +0000
+++ lib/lp/services/job/tests/test_runner.py 2012-04-09 19:51:32 +0000
@@ -720,19 +720,19 @@
def test_matching_flag(self):
"""A matching flag returns True."""
self.useFixture(FeatureFixture(
- {'jobs.celery.enabled_classses': 'foo bar'}))
+ {'jobs.celery.enabled_classes': 'foo bar'}))
self.assertTrue(celery_enabled('foo'))
self.assertTrue(celery_enabled('bar'))
def test_non_matching_flag(self):
"""A non-matching flag returns false."""
self.useFixture(FeatureFixture(
- {'jobs.celery.enabled_classses': 'foo bar'}))
+ {'jobs.celery.enabled_classes': 'foo bar'}))
self.assertFalse(celery_enabled('baz'))
self.assertTrue(celery_enabled('bar'))
def test_substring(self):
"""A substring of an enabled class does not match."""
self.useFixture(FeatureFixture(
- {'jobs.celery.enabled_classses': 'foobar'}))
+ {'jobs.celery.enabled_classes': 'foobar'}))
self.assertFalse(celery_enabled('bar'))