launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #06989
[Merge] lp:~abentley/launchpad/celery-everywhere into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-everywhere into lp:launchpad with lp:~abentley/launchpad/celery-job-feature-flag as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-everywhere/+merge/100686
= Summary =
Support running more jobs via Celery
== Pre-implementation notes ==
None
== Implementation details ==
In the prerequisite branch, a feature flag was introduced that controlled which kinds of jobs are run via Celery. This makes the celery_scan parameter of branchChanged obsolete, so it is removed.
Funtionality for running jobs via celery is extracted from branchChanged to BaseRunnableJob.celeryRunOnCommit (and callees).
A single config file is used, so that settings are common by default. The celeryd contexmanager overrides broker URL, concurrency and queue.
Jobs are assigned to the 'standard' queue by default. BranchUpgradeJob and ReclamBranchSpaceJob are assigned to the branch_write queue.
As a driveby, ReclaimBranchSpaceJob uses get_real_branch_path instead of reimplementing it.
create_knit moves to test_branch to facilitate testing branch upgrades.
Test ReclaimBranchSpaceJob and BranchUpgradeJob via celery.
TestCelery is deleted, because it was really only testing BranchScanJobs.
== Tests ==
bin/test -t elery branch
== Demo and Q/A ==
None
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/job/tests/test_runner.py
lib/lp/services/job/runner.py
lib/lp/code/interfaces/branch.py
lib/lp/services/job/interfaces/job.py
lib/lp/code/model/tests/test_branchjob.py
lib/lp/testing/factory.py
lib/lp/code/model/branch.py
lib/lp/services/features/flags.py
lib/lp/code/model/tests/test_branchtarget.py
lib/lp/services/job/celeryconfig.py
lib/lp/code/model/tests/test_branchpuller.py
lib/lp/services/job/tests/__init__.py
lib/lp/code/model/tests/test_branch.py
lib/lp/code/model/branchjob.py
--
https://code.launchpad.net/~abentley/launchpad/celery-everywhere/+merge/100686
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-everywhere into lp:launchpad.
=== modified file 'lib/lp/code/interfaces/branch.py'
--- lib/lp/code/interfaces/branch.py 2012-03-27 19:12:33 +0000
+++ lib/lp/code/interfaces/branch.py 2012-04-03 20:26:24 +0000
@@ -1078,7 +1078,7 @@
"""Create an IBranchUpgradeJob to upgrade this branch."""
def branchChanged(stacked_on_url, last_revision_id, control_format,
- branch_format, repository_format, celery_scan=True):
+ branch_format, repository_format):
"""Record that a branch has been changed.
This method records the stacked on branch tip revision id and format
@@ -1092,9 +1092,6 @@
:param branch_format: The entry from BranchFormat for the branch.
:param repository_format: The entry from RepositoryFormat for the
branch.
- :param celery_scan: If True, request a branch scan via Celery.
- Otherwise, a BranchScanJob may be created, but not requested to
- run. Should only be False in certain tests.
"""
@export_destructor_operation()
=== modified file 'lib/lp/code/model/branch.py'
--- lib/lp/code/model/branch.py 2012-04-03 20:26:23 +0000
+++ lib/lp/code/model/branch.py 2012-04-03 20:26:24 +0000
@@ -40,7 +40,6 @@
Reference,
)
from storm.store import Store
-import transaction
from zope.component import getUtility
from zope.event import notify
from zope.interface import implements
@@ -150,10 +149,6 @@
from lp.services.helpers import shortlist
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.model.job import Job
-from lp.services.job.runner import (
- BaseRunnableJob,
- celery_enabled,
- )
from lp.services.mail.notificationrecipientset import NotificationRecipientSet
from lp.services.propertycache import cachedproperty
from lp.services.webapp import urlappend
@@ -1037,8 +1032,7 @@
return getUtility(IBranchLookup).getByUniqueName(location)
def branchChanged(self, stacked_on_url, last_revision_id,
- control_format, branch_format, repository_format,
- celery_scan=True):
+ control_format, branch_format, repository_format):
"""See `IBranch`."""
self.mirror_status_message = None
if stacked_on_url == '' or stacked_on_url is None:
@@ -1063,20 +1057,8 @@
self.last_mirrored_id = last_revision_id
if self.last_scanned_id != last_revision_id:
from lp.code.model.branchjob import BranchScanJob
- job_id = BranchScanJob.create(self).job_id
- if celery_scan and celery_enabled('BranchScanJob'):
- # lp.services.job.celery is imported only where needed.
- from lp.services.job.celeryjob import CeleryRunJob
- current = transaction.get()
-
- def runHook(succeeded):
- ignore_result = (BaseRunnableJob.celery_responses is None)
- if succeeded:
- response = CeleryRunJob.apply_async(
- (job_id,), ignore_result=ignore_result)
- if not ignore_result:
- BaseRunnableJob.celery_responses.append(response)
- current.addAfterCommitHook(runHook)
+ job = BranchScanJob.create(self)
+ job.celeryRunOnCommit()
self.control_format = control_format
self.branch_format = branch_format
self.repository_format = repository_format
@@ -1151,7 +1133,8 @@
branch_id = self.id
SQLBase.destroySelf(self)
# And now create a job to remove the branch from disk when it's done.
- getUtility(IReclaimBranchSpaceJobSource).create(branch_id)
+ job = getUtility(IReclaimBranchSpaceJobSource).create(branch_id)
+ job.celeryRunOnCommit()
def commitsForDays(self, since):
"""See `IBranch`."""
@@ -1205,7 +1188,9 @@
def requestUpgrade(self, requester):
"""See `IBranch`."""
from lp.code.interfaces.branchjob import IBranchUpgradeJobSource
- return getUtility(IBranchUpgradeJobSource).create(self, requester)
+ job = getUtility(IBranchUpgradeJobSource).create(self, requester)
+ job.celeryRunOnCommit()
+ return job
def _checkBranchVisibleByUser(self, user):
"""Is *this* branch visible by the user.
=== modified file 'lib/lp/code/model/branchjob.py'
--- lib/lp/code/model/branchjob.py 2012-03-27 14:36:41 +0000
+++ lib/lp/code/model/branchjob.py 2012-04-03 20:26:24 +0000
@@ -83,13 +83,12 @@
from lp.codehosting.bzrutils import server
from lp.codehosting.scanner.bzrsync import BzrSync
from lp.codehosting.vfs import (
- branch_id_to_path,
get_ro_server,
get_rw_server,
)
+from lp.codehosting.vfs.branchfs import get_real_branch_path
from lp.registry.interfaces.productseries import IProductSeriesSet
from lp.scripts.helpers import TransactionFreeOperation
-from lp.services.config import config
from lp.services.database.enumcol import EnumCol
from lp.services.database.lpstorm import IStore
from lp.services.database.sqlbase import SQLBase
@@ -328,6 +327,8 @@
user_error_types = (NotBranchError,)
+ task_queue = 'branch_write'
+
def getOperationDescription(self):
return 'upgrading a branch'
@@ -948,6 +949,8 @@
class_job_type = BranchJobType.RECLAIM_BRANCH_SPACE
+ task_queue = 'branch_write'
+
def __repr__(self):
return '<RECLAIM_BRANCH_SPACE branch job (%(id)s) for %(branch)s>' % {
'id': self.context.id,
@@ -970,8 +973,6 @@
return self.metadata['branch_id']
def run(self):
- branch_path = os.path.join(
- config.codehosting.mirrored_branches_root,
- branch_id_to_path(self.branch_id))
+ branch_path = get_real_branch_path(self.branch_id)
if os.path.exists(branch_path):
shutil.rmtree(branch_path)
=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py 2012-04-03 20:26:23 +0000
+++ lib/lp/code/model/tests/test_branch.py 2012-04-03 20:26:24 +0000
@@ -14,6 +14,7 @@
)
import os
+from bzrlib.branch import Branch
from bzrlib.bzrdir import BzrDir
from bzrlib.revision import NULL_REVISION
from pytz import UTC
@@ -21,6 +22,10 @@
from sqlobject import SQLObjectNotFound
from storm.locals import Store
from testtools import ExpectedException
+from testtools.matchers import (
+ Not,
+ PathExists,
+ )
import transaction
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
@@ -104,6 +109,7 @@
from lp.code.model.revision import Revision
from lp.code.tests.helpers import add_revision_to_branch
from lp.codehosting.safe_open import BadUrl
+from lp.codehosting.vfs.branchfs import get_real_branch_path
from lp.registry.interfaces.person import PersonVisibility
from lp.registry.interfaces.pocket import PackagePublishingPocket
from lp.registry.model.sourcepackage import SourcePackage
@@ -111,6 +117,10 @@
from lp.services.database.constants import UTC_NOW
from lp.services.database.lpstorm import IStore
from lp.services.features.testing import FeatureFixture
+from lp.services.job.tests import (
+ celeryd,
+ monitor_celery,
+ )
from lp.services.osutils import override_environ
from lp.services.propertycache import clear_property_cache
from lp.services.webapp.interfaces import IOpenLaunchBag
@@ -141,6 +151,13 @@
)
+def create_knit(test_case):
+ db_branch, tree = test_case.create_branch_and_tree(format='knit')
+ db_branch.branch_format = BranchFormat.BZR_BRANCH_5
+ db_branch.repository_format = RepositoryFormat.BZR_KNIT_1
+ return db_branch, tree
+
+
class TestCodeImport(TestCase):
layer = LaunchpadZopelessLayer
@@ -297,16 +314,10 @@
# Delay importing anything that uses Celery until RabbitMQLayer is
# running, so that config.rabbitmq.host is defined when
# lp.services.job.celeryconfig is loaded.
- from lp.services.job.celeryjob import CeleryRunJob
- from lp.services.job.tests.test_celeryjob import monitor_celery
from celery.exceptions import TimeoutError
- from lazr.jobrunner.tests.test_celerytask import running
- self.useFixture(FeatureFixture({'jobs.celery.enabled_classses':
- 'BranchScanJob'}))
- cmd_args = ('--config', 'lp.services.job.tests.celeryconfig')
- env = dict(os.environ)
- env['BROKER_URL'] = CeleryRunJob.app.conf['BROKER_URL']
- with running('bin/celeryd', cmd_args, env=env) as proc:
+ self.useFixture(FeatureFixture({
+ 'jobs.celery.enabled_classses': 'BranchScanJob'}))
+ with celeryd('standard') as proc:
self.useBzrBranches()
db_branch, bzr_tree = self.create_branch_and_tree()
bzr_tree.commit(
@@ -327,7 +338,6 @@
# Delay importing anything that uses Celery until RabbitMQLayer is
# running, so that config.rabbitmq.host is defined when
# lp.services.job.celeryconfig is loaded.
- from lp.services.job.tests.test_celeryjob import monitor_celery
self.useBzrBranches()
db_branch, bzr_tree = self.create_branch_and_tree()
bzr_tree.commit(
@@ -337,6 +347,25 @@
transaction.commit()
self.assertEqual([], responses)
+ def test_destroySelf_via_celery(self):
+ """Calling destroySelf causes Celery to delete the branch."""
+ from celery.exceptions import TimeoutError
+ self.useFixture(FeatureFixture({
+ 'jobs.celery.enabled_classses': 'ReclaimBranchSpaceJob'}))
+ with celeryd('branch_write'):
+ self.useBzrBranches()
+ db_branch, tree = self.create_branch_and_tree()
+ branch_path = get_real_branch_path(db_branch.id)
+ self.assertThat(branch_path, PathExists())
+ db_branch.destroySelf()
+ with monitor_celery() as responses:
+ transaction.commit()
+ try:
+ responses[-1].wait(30)
+ except TimeoutError:
+ pass
+ self.assertThat(branch_path, Not(PathExists()))
+
class TestBranchRevisionMethods(TestCaseWithFactory):
"""Test the branch methods for adding and removing branch revisions."""
@@ -638,7 +667,7 @@
class TestBranchUpgrade(TestCaseWithFactory):
"""Test the upgrade functionalities of branches."""
- layer = DatabaseFunctionalLayer
+ layer = ZopelessAppServerLayer
def test_needsUpgrading_empty_formats(self):
branch = self.factory.makePersonalBranch()
@@ -790,6 +819,27 @@
jobs,
[job, ])
+ def test_requestUpgradeUsesCelery(self):
+ self.useFixture(FeatureFixture({
+ 'jobs.celery.enabled_classses': 'BranchUpgradeJob'}))
+ cwd = os.getcwd()
+ self.useBzrBranches()
+ db_branch, tree = create_knit(self)
+ self.assertEqual(
+ tree.branch.repository._format.get_format_string(),
+ 'Bazaar-NG Knit Repository Format 1')
+
+ db_branch.requestUpgrade(db_branch.owner)
+ with monitor_celery() as responses:
+ transaction.commit()
+ with celeryd('branch_write', cwd):
+ responses[-1].wait(30)
+ new_branch = Branch.open(tree.branch.base)
+ self.assertEqual(
+ new_branch.repository._format.get_format_string(),
+ 'Bazaar repository format 2a (needs bzr 1.16 or later)\n')
+ self.assertFalse(db_branch.needs_upgrading)
+
def test_requestUpgrade_no_upgrade_needed(self):
# If a branch doesn't need to be upgraded, requestUpgrade raises an
# AlreadyLatestFormat.
=== modified file 'lib/lp/code/model/tests/test_branchjob.py'
--- lib/lp/code/model/tests/test_branchjob.py 2012-02-21 19:13:45 +0000
+++ lib/lp/code/model/tests/test_branchjob.py 2012-04-03 20:26:24 +0000
@@ -60,6 +60,7 @@
RosettaUploadJob,
)
from lp.code.model.branchrevision import BranchRevision
+from lp.code.model.tests.test_branch import create_knit
from lp.code.model.revision import RevisionSet
from lp.codehosting.vfs import branch_id_to_path
from lp.scripts.helpers import TransactionFreeOperation
@@ -193,7 +194,7 @@
def test_upgrades_branch(self):
"""Ensure that a branch with an outdated format is upgraded."""
self.useBzrBranches(direct_database=True)
- db_branch, tree = self.create_knit()
+ db_branch, tree = create_knit(self)
self.assertEqual(
tree.branch.repository._format.get_format_string(),
'Bazaar-NG Knit Repository Format 1')
@@ -222,17 +223,11 @@
AlreadyLatestFormat, BranchUpgradeJob.create, branch,
self.factory.makePerson())
- def create_knit(self):
- db_branch, tree = self.create_branch_and_tree(format='knit')
- db_branch.branch_format = BranchFormat.BZR_BRANCH_5
- db_branch.repository_format = RepositoryFormat.BZR_KNIT_1
- return db_branch, tree
-
def test_existing_bzr_backup(self):
# If the target branch already has a backup.bzr dir, the upgrade copy
# should remove it.
self.useBzrBranches(direct_database=True)
- db_branch, tree = self.create_knit()
+ db_branch, tree = create_knit(self)
# Add a fake backup.bzr dir
source_branch_transport = get_transport(db_branch.getInternalBzrUrl())
=== modified file 'lib/lp/code/model/tests/test_branchpuller.py'
--- lib/lp/code/model/tests/test_branchpuller.py 2012-03-27 17:52:44 +0000
+++ lib/lp/code/model/tests/test_branchpuller.py 2012-04-03 20:26:24 +0000
@@ -89,7 +89,7 @@
transaction.commit()
branch.startMirroring()
removeSecurityProxy(branch).branchChanged(
- '', 'rev1', None, None, None, celery_scan=False)
+ '', 'rev1', None, None, None)
self.assertEqual(None, branch.next_mirror_time)
def test_mirrorFailureResetsMirrorRequest(self):
@@ -158,7 +158,7 @@
transaction.commit()
branch.startMirroring()
removeSecurityProxy(branch).branchChanged(
- '', 'rev1', None, None, None, celery_scan=False)
+ '', 'rev1', None, None, None)
self.assertInFuture(branch.next_mirror_time, self.increment)
self.assertEqual(0, branch.mirror_failures)
=== modified file 'lib/lp/code/model/tests/test_branchtarget.py'
--- lib/lp/code/model/tests/test_branchtarget.py 2012-03-27 17:52:44 +0000
+++ lib/lp/code/model/tests/test_branchtarget.py 2012-04-03 20:26:24 +0000
@@ -129,8 +129,7 @@
default_branch = self.factory.makePackageBranch(
sourcepackage=development_package)
removeSecurityProxy(default_branch).branchChanged(
- '', self.factory.getUniqueString(), None, None, None,
- celery_scan=False)
+ '', self.factory.getUniqueString(), None, None, None)
registrant = development_package.distribution.owner
with person_logged_in(registrant):
development_package.setBranch(
@@ -398,7 +397,7 @@
branch = self.factory.makeProductBranch(product=self.original)
self._setDevelopmentFocus(self.original, branch)
removeSecurityProxy(branch).branchChanged(
- '', 'rev1', None, None, None, celery_scan=False)
+ '', 'rev1', None, None, None)
target = IBranchTarget(self.original)
self.assertEqual(branch, target.default_stacked_on_branch)
@@ -538,8 +537,7 @@
branch = self.factory.makeAnyBranch(branch_type=BranchType.MIRRORED)
branch.startMirroring()
removeSecurityProxy(branch).branchChanged(
- '', self.factory.getUniqueString(), None, None, None,
- celery_scan=False)
+ '', self.factory.getUniqueString(), None, None, None)
removeSecurityProxy(branch).branch_type = BranchType.REMOTE
self.assertIs(None, check_default_stacked_on(branch))
@@ -555,16 +553,14 @@
branch = self.factory.makeAnyBranch(private=True)
naked_branch = removeSecurityProxy(branch)
naked_branch.branchChanged(
- '', self.factory.getUniqueString(), None, None, None,
- celery_scan=False)
+ '', self.factory.getUniqueString(), None, None, None)
self.assertIs(None, check_default_stacked_on(branch))
def test_been_mirrored(self):
# `check_default_stacked_on` returns the branch if it has revisions.
branch = self.factory.makeAnyBranch()
removeSecurityProxy(branch).branchChanged(
- '', self.factory.getUniqueString(), None, None, None,
- celery_scan=False)
+ '', self.factory.getUniqueString(), None, None, None)
self.assertEqual(branch, check_default_stacked_on(branch))
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-03-27 17:40:11 +0000
+++ lib/lp/services/job/celeryconfig.py 2012-04-03 20:26:24 +0000
@@ -2,3 +2,10 @@
BROKER_URL = "amqplib://%s" % config.rabbitmq.host
CELERY_IMPORTS = ("lp.services.job.celeryjob", )
CELERY_RESULT_BACKEND = "amqp"
+CELERY_QUEUES = {
+ "branch_write": {"binding_key": "branch_write"},
+ "standard": {"binding_key": "standard"},
+}
+CELERY_DEFAULT_EXCHANGE = "standard"
+CELERY_DEFAULT_QUEUE = "standard"
+CELERY_CREATE_MISSING_QUEUES = False
=== modified file 'lib/lp/services/job/interfaces/job.py'
--- lib/lp/services/job/interfaces/job.py 2012-03-21 16:16:08 +0000
+++ lib/lp/services/job/interfaces/job.py 2012-04-03 20:26:24 +0000
@@ -171,6 +171,9 @@
def run():
"""Run this job."""
+ def celeryRunOnCommit():
+ """Request Celery to run this job on transaction commit."""
+
class IJobSource(Interface):
"""Interface for creating and getting jobs."""
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-04-03 20:26:23 +0000
+++ lib/lp/services/job/runner.py 2012-04-03 20:26:24 +0000
@@ -103,6 +103,8 @@
retry_error_types = ()
+ task_queue = 'standard'
+
celery_responses = None
# We redefine __eq__ and __ne__ here to prevent the security proxy
@@ -188,6 +190,31 @@
return oops_config.create(
context=dict(exc_info=info))
+ def runViaCelery(self):
+ """Request that this job be run via celery."""
+ # Avoid importing from lp.services.job.celeryjob where not needed, to
+ # avoid configuring Celery when Rabbit is not configured.
+ from lp.services.job.celeryjob import CeleryRunJob
+ ignore_result = bool(BaseRunnableJob.celery_responses is None)
+ response = CeleryRunJob.apply_async(
+ (self.job_id,), queue=self.task_queue,
+ ignore_result=ignore_result)
+ if not ignore_result:
+ BaseRunnableJob.celery_responses.append(response)
+ return response
+
+ def celeryCommitHook(self, succeeded):
+ """Hook function to call when a commit completes."""
+ if succeeded:
+ self.runViaCelery()
+
+ def celeryRunOnCommit(self):
+ """Configure transaction so that commit runs this job via Celery."""
+ if not celery_enabled(self.__class__.__name__):
+ return
+ current = transaction.get()
+ current.addAfterCommitHook(self.celeryCommitHook)
+
class BaseJobRunner(LazrJobRunner):
"""Runner of Jobs."""
=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py 2009-05-13 20:03:39 +0000
+++ lib/lp/services/job/tests/__init__.py 2012-04-03 20:26:24 +0000
@@ -0,0 +1,44 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+__all__ = ['celeryd', 'monitor_celery']
+
+
+from contextlib import contextmanager
+
+from lp.services.job.runner import BaseRunnableJob
+
+
+@contextmanager
+def celeryd(queue, cwd=None):
+ """Return a ContextManager for a celeryd instance.
+
+ The celeryd instance will be configured to use the currently-configured
+ BROKER_URL, and able to run CeleryRunJob tasks.
+ """
+ from lp.services.job.celeryjob import CeleryRunJob
+ from lazr.jobrunner.tests.test_celerytask import running
+ cmd_args = (
+ '--config', 'lp.services.job.celeryconfig',
+ '--broker', CeleryRunJob.app.conf['BROKER_URL'],
+ '--concurrency', '1',
+ '--loglevel', 'INFO',
+ '--queues', queue,
+ )
+ with running('bin/celeryd', cmd_args, cwd=cwd) as proc:
+ # Wait for celeryd startup to complete.
+ yield proc
+
+
+@contextmanager
+def monitor_celery():
+ """Context manager that provides a list of Celery responses."""
+ responses = []
+ old_responses = BaseRunnableJob.celery_responses
+ BaseRunnableJob.celery_responses = responses
+ try:
+ yield responses
+ finally:
+ BaseRunnableJob.celery_responses = old_responses
=== removed file 'lib/lp/services/job/tests/celeryconfig.py'
--- lib/lp/services/job/tests/celeryconfig.py 2012-03-23 16:26:46 +0000
+++ lib/lp/services/job/tests/celeryconfig.py 1970-01-01 00:00:00 +0000
@@ -1,7 +0,0 @@
-import os
-BROKER_VHOST = "/"
-CELERY_RESULT_BACKEND = "amqp"
-CELERY_IMPORTS = ("lp.services.job.celeryjob", )
-CELERYD_LOG_LEVEL = 'INFO'
-CELERYD_CONCURRENCY = 1
-BROKER_URL = os.environ['BROKER_URL']
=== removed file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 2012-04-03 20:26:23 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 1970-01-01 00:00:00 +0000
@@ -1,56 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-
-from contextlib import contextmanager
-import os
-
-import transaction
-
-from lp.code.model.branchjob import BranchScanJob
-from lp.services.job.runner import BaseRunnableJob
-from lp.testing import TestCaseWithFactory
-from lp.testing.layers import ZopelessAppServerLayer
-
-
-@contextmanager
-def monitor_celery():
- """Context manager that provides a list of Celery responses."""
- responses = []
- old_responses = BaseRunnableJob.celery_responses
- BaseRunnableJob.celery_responses = responses
- try:
- yield responses
- finally:
- BaseRunnableJob.celery_responses = old_responses
-
-
-class TestCelery(TestCaseWithFactory):
-
- layer = ZopelessAppServerLayer
-
- def test_run_scan_job(self):
- """Running a job via Celery succeeds and emits expected output."""
- # Delay importing anything that uses Celery until RabbitMQLayer is
- # running, so that config.rabbitmq.host is defined when
- # lp.services.job.celeryconfig is loaded.
- from lp.services.job.celeryjob import CeleryRunJob
- from celery.exceptions import TimeoutError
- from lazr.jobrunner.tests.test_celerytask import running
- cmd_args = ('--config', 'lp.services.job.tests.celeryconfig')
- env = dict(os.environ)
- env['BROKER_URL'] = CeleryRunJob.app.conf['BROKER_URL']
- with running('bin/celeryd', cmd_args, env=env) as proc:
- self.useBzrBranches()
- db_branch, bzr_tree = self.create_branch_and_tree()
- bzr_tree.commit(
- 'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
- job = BranchScanJob.create(db_branch)
- transaction.commit()
- try:
- CeleryRunJob.delay(job.job_id).wait(30)
- except TimeoutError:
- pass
- self.assertIn(
- 'Updating branch scanner status: 1 revs', proc.stderr.read())
- self.assertEqual(db_branch.revision_count, 1)
=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py 2012-03-30 18:13:38 +0000
+++ lib/lp/testing/factory.py 2012-04-03 20:26:24 +0000
@@ -1407,7 +1407,7 @@
# We just remove the security proxies to be able to change the objects
# here.
removeSecurityProxy(branch).branchChanged(
- '', 'rev1', None, None, None, celery_scan=False)
+ '', 'rev1', None, None, None)
naked_series = removeSecurityProxy(product.development_focus)
naked_series.branch = branch
return branch
@@ -1422,7 +1422,7 @@
# We just remove the security proxies to be able to change the branch
# here.
removeSecurityProxy(branch).branchChanged(
- '', 'rev1', None, None, None, celery_scan=False)
+ '', 'rev1', None, None, None)
with person_logged_in(package.distribution.owner):
package.development_version.setBranch(
PackagePublishingPocket.RELEASE, branch,
@@ -1628,7 +1628,7 @@
if branch.branch_type not in (BranchType.REMOTE, BranchType.HOSTED):
branch.startMirroring()
removeSecurityProxy(branch).branchChanged(
- '', parent.revision_id, None, None, None, celery_scan=False)
+ '', parent.revision_id, None, None, None)
branch.updateScannedDetails(parent, sequence)
def makeBranchRevision(self, branch, revision_id=None, sequence=None,