launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/celery-everywhere into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-everywhere into lp:launchpad with lp:~abentley/launchpad/celery-job-feature-flag as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-everywhere/+merge/100686

= Summary =
Support running more jobs via Celery

== Pre-implementation notes ==
None

== Implementation details ==
In the prerequisite branch, a feature flag was introduced that controls which kinds of jobs are run via Celery.  This makes the celery_scan parameter of branchChanged obsolete, so it is removed.

Functionality for running jobs via Celery is extracted from branchChanged to BaseRunnableJob.celeryRunOnCommit (and callees).
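
As a rough illustration of the commit-hook pattern described above, here is a minimal standalone sketch.  It is not the Launchpad code: FakeTransaction is a hypothetical stand-in for the object returned by transaction.get(), SketchJob is a hypothetical job class, and the enabled_classes set stands in for the feature flag.  The "sent" list replaces the real call to CeleryRunJob.apply_async.

```python
class FakeTransaction:
    """Hypothetical stand-in for the object returned by transaction.get()."""

    def __init__(self):
        self._hooks = []

    def addAfterCommitHook(self, hook):
        # Remember the hook; it is called with a success flag on commit.
        self._hooks.append(hook)

    def commit(self, succeeded=True):
        # Run each registered hook once, then forget them.
        hooks, self._hooks = self._hooks, []
        for hook in hooks:
            hook(succeeded)


class SketchJob:
    """Hypothetical job mirroring the three methods added in this branch."""

    # Stands in for the feature flag that lists Celery-enabled job classes.
    enabled_classes = {'SketchJob'}
    task_queue = 'standard'

    def __init__(self, job_id, txn, sent):
        self.job_id = job_id
        self.txn = txn
        self.sent = sent

    def runViaCelery(self):
        # The real code calls CeleryRunJob.apply_async here; this sketch
        # only records what would have been requested.
        self.sent.append((self.job_id, self.task_queue))

    def celeryCommitHook(self, succeeded):
        # Only request the job if the commit actually succeeded.
        if succeeded:
            self.runViaCelery()

    def celeryRunOnCommit(self):
        # Do nothing unless this job class is enabled for Celery.
        if self.__class__.__name__ not in self.enabled_classes:
            return
        self.txn.addAfterCommitHook(self.celeryCommitHook)
```

Under these assumptions, nothing is requested until commit: calling celeryRunOnCommit() only registers the hook, and a failed commit leaves "sent" empty, so Celery is never asked to run a job whose row was not committed.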

A single config file is used, so that settings are common by default.  The celeryd context manager overrides the broker URL, concurrency and queue.
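
To make the override idea concrete, here is a small sketch of how the per-test settings can be layered on top of the shared config through command-line arguments.  The helper name celeryd_args is hypothetical; the flags are the ones the celeryd context manager in this branch passes, and the broker URL in the usage note is only a placeholder.

```python
def celeryd_args(broker_url, queue, concurrency=1, loglevel='INFO'):
    """Build celeryd arguments that override the shared config's defaults.

    Hypothetical helper: the shared config module supplies everything
    else, and only the values that differ per test are passed explicitly.
    """
    return (
        '--config', 'lp.services.job.celeryconfig',
        '--broker', broker_url,
        '--concurrency', str(concurrency),
        '--loglevel', loglevel,
        '--queues', queue,
    )
```

For example, celeryd_args('amqplib://localhost', 'branch_write') yields arguments for a single-process worker that consumes only the branch_write queue.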

Jobs are assigned to the 'standard' queue by default.  BranchUpgradeJob and ReclaimBranchSpaceJob are assigned to the 'branch_write' queue.
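
The queue assignment is just a class attribute that subclasses override.  A minimal sketch, using hypothetical *Sketch class names rather than the real job classes, and a hypothetical apply_async_options helper in place of the real apply_async call:

```python
class BaseRunnableJobSketch:
    """Hypothetical base class: jobs go to the 'standard' queue by default."""

    task_queue = 'standard'

    def apply_async_options(self):
        # Hypothetical helper: the routing option a job would pass along
        # when asking Celery to run it.
        return {'queue': self.task_queue}


class BranchScanJobSketch(BaseRunnableJobSketch):
    """Inherits the default queue."""


class BranchUpgradeJobSketch(BaseRunnableJobSketch):
    """Writes to branches on disk, so it uses the dedicated queue."""

    task_queue = 'branch_write'
```

Keeping the queue on the class means a worker started with only the branch_write queue picks up just the jobs that write to branch storage.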

As a drive-by, ReclaimBranchSpaceJob uses get_real_branch_path instead of reimplementing it.

create_knit moves to test_branch to facilitate testing branch upgrades.

ReclaimBranchSpaceJob and BranchUpgradeJob are now tested via Celery.

TestCelery is deleted, because it was really only testing BranchScanJobs.

== Tests ==
bin/test -t elery branch

== Demo and Q/A ==
None

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py
  lib/lp/code/interfaces/branch.py
  lib/lp/services/job/interfaces/job.py
  lib/lp/code/model/tests/test_branchjob.py
  lib/lp/testing/factory.py
  lib/lp/code/model/branch.py
  lib/lp/services/features/flags.py
  lib/lp/code/model/tests/test_branchtarget.py
  lib/lp/services/job/celeryconfig.py
  lib/lp/code/model/tests/test_branchpuller.py
  lib/lp/services/job/tests/__init__.py
  lib/lp/code/model/tests/test_branch.py
  lib/lp/code/model/branchjob.py
-- 
https://code.launchpad.net/~abentley/launchpad/celery-everywhere/+merge/100686
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-everywhere into lp:launchpad.
=== modified file 'lib/lp/code/interfaces/branch.py'
--- lib/lp/code/interfaces/branch.py	2012-03-27 19:12:33 +0000
+++ lib/lp/code/interfaces/branch.py	2012-04-03 20:26:24 +0000
@@ -1078,7 +1078,7 @@
         """Create an IBranchUpgradeJob to upgrade this branch."""
 
     def branchChanged(stacked_on_url, last_revision_id, control_format,
-                      branch_format, repository_format, celery_scan=True):
+                      branch_format, repository_format):
         """Record that a branch has been changed.
 
         This method records the stacked on branch tip revision id and format
@@ -1092,9 +1092,6 @@
         :param branch_format: The entry from BranchFormat for the branch.
         :param repository_format: The entry from RepositoryFormat for the
             branch.
-        :param celery_scan: If True, request a branch scan via Celery.
-            Otherwise, a BranchScanJob may be created, but not requested to
-            run.  Should only be False in certain tests.
         """
 
     @export_destructor_operation()

=== modified file 'lib/lp/code/model/branch.py'
--- lib/lp/code/model/branch.py	2012-04-03 20:26:23 +0000
+++ lib/lp/code/model/branch.py	2012-04-03 20:26:24 +0000
@@ -40,7 +40,6 @@
     Reference,
     )
 from storm.store import Store
-import transaction
 from zope.component import getUtility
 from zope.event import notify
 from zope.interface import implements
@@ -150,10 +149,6 @@
 from lp.services.helpers import shortlist
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.model.job import Job
-from lp.services.job.runner import (
-    BaseRunnableJob,
-    celery_enabled,
-    )
 from lp.services.mail.notificationrecipientset import NotificationRecipientSet
 from lp.services.propertycache import cachedproperty
 from lp.services.webapp import urlappend
@@ -1037,8 +1032,7 @@
             return getUtility(IBranchLookup).getByUniqueName(location)
 
     def branchChanged(self, stacked_on_url, last_revision_id,
-                      control_format, branch_format, repository_format,
-                      celery_scan=True):
+                      control_format, branch_format, repository_format):
         """See `IBranch`."""
         self.mirror_status_message = None
         if stacked_on_url == '' or stacked_on_url is None:
@@ -1063,20 +1057,8 @@
         self.last_mirrored_id = last_revision_id
         if self.last_scanned_id != last_revision_id:
             from lp.code.model.branchjob import BranchScanJob
-            job_id = BranchScanJob.create(self).job_id
-            if celery_scan and celery_enabled('BranchScanJob'):
-                # lp.services.job.celery is imported only where needed.
-                from lp.services.job.celeryjob import CeleryRunJob
-                current = transaction.get()
-
-                def runHook(succeeded):
-                    ignore_result = (BaseRunnableJob.celery_responses is None)
-                    if succeeded:
-                        response = CeleryRunJob.apply_async(
-                            (job_id,), ignore_result=ignore_result)
-                        if not ignore_result:
-                            BaseRunnableJob.celery_responses.append(response)
-                current.addAfterCommitHook(runHook)
+            job = BranchScanJob.create(self)
+            job.celeryRunOnCommit()
         self.control_format = control_format
         self.branch_format = branch_format
         self.repository_format = repository_format
@@ -1151,7 +1133,8 @@
         branch_id = self.id
         SQLBase.destroySelf(self)
         # And now create a job to remove the branch from disk when it's done.
-        getUtility(IReclaimBranchSpaceJobSource).create(branch_id)
+        job = getUtility(IReclaimBranchSpaceJobSource).create(branch_id)
+        job.celeryRunOnCommit()
 
     def commitsForDays(self, since):
         """See `IBranch`."""
@@ -1205,7 +1188,9 @@
     def requestUpgrade(self, requester):
         """See `IBranch`."""
         from lp.code.interfaces.branchjob import IBranchUpgradeJobSource
-        return getUtility(IBranchUpgradeJobSource).create(self, requester)
+        job = getUtility(IBranchUpgradeJobSource).create(self, requester)
+        job.celeryRunOnCommit()
+        return job
 
     def _checkBranchVisibleByUser(self, user):
         """Is *this* branch visible by the user.

=== modified file 'lib/lp/code/model/branchjob.py'
--- lib/lp/code/model/branchjob.py	2012-03-27 14:36:41 +0000
+++ lib/lp/code/model/branchjob.py	2012-04-03 20:26:24 +0000
@@ -83,13 +83,12 @@
 from lp.codehosting.bzrutils import server
 from lp.codehosting.scanner.bzrsync import BzrSync
 from lp.codehosting.vfs import (
-    branch_id_to_path,
     get_ro_server,
     get_rw_server,
     )
+from lp.codehosting.vfs.branchfs import get_real_branch_path
 from lp.registry.interfaces.productseries import IProductSeriesSet
 from lp.scripts.helpers import TransactionFreeOperation
-from lp.services.config import config
 from lp.services.database.enumcol import EnumCol
 from lp.services.database.lpstorm import IStore
 from lp.services.database.sqlbase import SQLBase
@@ -328,6 +327,8 @@
 
     user_error_types = (NotBranchError,)
 
+    task_queue = 'branch_write'
+
     def getOperationDescription(self):
         return 'upgrading a branch'
 
@@ -948,6 +949,8 @@
 
     class_job_type = BranchJobType.RECLAIM_BRANCH_SPACE
 
+    task_queue = 'branch_write'
+
     def __repr__(self):
         return '<RECLAIM_BRANCH_SPACE branch job (%(id)s) for %(branch)s>' % {
             'id': self.context.id,
@@ -970,8 +973,6 @@
         return self.metadata['branch_id']
 
     def run(self):
-        branch_path = os.path.join(
-            config.codehosting.mirrored_branches_root,
-            branch_id_to_path(self.branch_id))
+        branch_path = get_real_branch_path(self.branch_id)
         if os.path.exists(branch_path):
             shutil.rmtree(branch_path)

=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py	2012-04-03 20:26:23 +0000
+++ lib/lp/code/model/tests/test_branch.py	2012-04-03 20:26:24 +0000
@@ -14,6 +14,7 @@
     )
 import os
 
+from bzrlib.branch import Branch
 from bzrlib.bzrdir import BzrDir
 from bzrlib.revision import NULL_REVISION
 from pytz import UTC
@@ -21,6 +22,10 @@
 from sqlobject import SQLObjectNotFound
 from storm.locals import Store
 from testtools import ExpectedException
+from testtools.matchers import (
+    Not,
+    PathExists,
+    )
 import transaction
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
@@ -104,6 +109,7 @@
 from lp.code.model.revision import Revision
 from lp.code.tests.helpers import add_revision_to_branch
 from lp.codehosting.safe_open import BadUrl
+from lp.codehosting.vfs.branchfs import get_real_branch_path
 from lp.registry.interfaces.person import PersonVisibility
 from lp.registry.interfaces.pocket import PackagePublishingPocket
 from lp.registry.model.sourcepackage import SourcePackage
@@ -111,6 +117,10 @@
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.lpstorm import IStore
 from lp.services.features.testing import FeatureFixture
+from lp.services.job.tests import (
+    celeryd,
+    monitor_celery,
+    )
 from lp.services.osutils import override_environ
 from lp.services.propertycache import clear_property_cache
 from lp.services.webapp.interfaces import IOpenLaunchBag
@@ -141,6 +151,13 @@
     )
 
 
+def create_knit(test_case):
+    db_branch, tree = test_case.create_branch_and_tree(format='knit')
+    db_branch.branch_format = BranchFormat.BZR_BRANCH_5
+    db_branch.repository_format = RepositoryFormat.BZR_KNIT_1
+    return db_branch, tree
+
+
 class TestCodeImport(TestCase):
 
     layer = LaunchpadZopelessLayer
@@ -297,16 +314,10 @@
         # Delay importing anything that uses Celery until RabbitMQLayer is
         # running, so that config.rabbitmq.host is defined when
         # lp.services.job.celeryconfig is loaded.
-        from lp.services.job.celeryjob import CeleryRunJob
-        from lp.services.job.tests.test_celeryjob import monitor_celery
         from celery.exceptions import TimeoutError
-        from lazr.jobrunner.tests.test_celerytask import running
-        self.useFixture(FeatureFixture({'jobs.celery.enabled_classses':
-            'BranchScanJob'}))
-        cmd_args = ('--config', 'lp.services.job.tests.celeryconfig')
-        env = dict(os.environ)
-        env['BROKER_URL'] = CeleryRunJob.app.conf['BROKER_URL']
-        with running('bin/celeryd', cmd_args, env=env) as proc:
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classses': 'BranchScanJob'}))
+        with celeryd('standard') as proc:
             self.useBzrBranches()
             db_branch, bzr_tree = self.create_branch_and_tree()
             bzr_tree.commit(
@@ -327,7 +338,6 @@
         # Delay importing anything that uses Celery until RabbitMQLayer is
         # running, so that config.rabbitmq.host is defined when
         # lp.services.job.celeryconfig is loaded.
-        from lp.services.job.tests.test_celeryjob import monitor_celery
         self.useBzrBranches()
         db_branch, bzr_tree = self.create_branch_and_tree()
         bzr_tree.commit(
@@ -337,6 +347,25 @@
             transaction.commit()
             self.assertEqual([], responses)
 
+    def test_destroySelf_via_celery(self):
+        """Calling destroySelf causes Celery to delete the branch."""
+        from celery.exceptions import TimeoutError
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classses': 'ReclaimBranchSpaceJob'}))
+        with celeryd('branch_write'):
+            self.useBzrBranches()
+            db_branch, tree = self.create_branch_and_tree()
+            branch_path = get_real_branch_path(db_branch.id)
+            self.assertThat(branch_path, PathExists())
+            db_branch.destroySelf()
+            with monitor_celery() as responses:
+                transaction.commit()
+                try:
+                    responses[-1].wait(30)
+                except TimeoutError:
+                    pass
+        self.assertThat(branch_path, Not(PathExists()))
+
 
 class TestBranchRevisionMethods(TestCaseWithFactory):
     """Test the branch methods for adding and removing branch revisions."""
@@ -638,7 +667,7 @@
 class TestBranchUpgrade(TestCaseWithFactory):
     """Test the upgrade functionalities of branches."""
 
-    layer = DatabaseFunctionalLayer
+    layer = ZopelessAppServerLayer
 
     def test_needsUpgrading_empty_formats(self):
         branch = self.factory.makePersonalBranch()
@@ -790,6 +819,27 @@
             jobs,
             [job, ])
 
+    def test_requestUpgradeUsesCelery(self):
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classses': 'BranchUpgradeJob'}))
+        cwd = os.getcwd()
+        self.useBzrBranches()
+        db_branch, tree = create_knit(self)
+        self.assertEqual(
+            tree.branch.repository._format.get_format_string(),
+            'Bazaar-NG Knit Repository Format 1')
+
+        db_branch.requestUpgrade(db_branch.owner)
+        with monitor_celery() as responses:
+            transaction.commit()
+            with celeryd('branch_write', cwd):
+                responses[-1].wait(30)
+        new_branch = Branch.open(tree.branch.base)
+        self.assertEqual(
+            new_branch.repository._format.get_format_string(),
+            'Bazaar repository format 2a (needs bzr 1.16 or later)\n')
+        self.assertFalse(db_branch.needs_upgrading)
+
     def test_requestUpgrade_no_upgrade_needed(self):
         # If a branch doesn't need to be upgraded, requestUpgrade raises an
         # AlreadyLatestFormat.

=== modified file 'lib/lp/code/model/tests/test_branchjob.py'
--- lib/lp/code/model/tests/test_branchjob.py	2012-02-21 19:13:45 +0000
+++ lib/lp/code/model/tests/test_branchjob.py	2012-04-03 20:26:24 +0000
@@ -60,6 +60,7 @@
     RosettaUploadJob,
     )
 from lp.code.model.branchrevision import BranchRevision
+from lp.code.model.tests.test_branch import create_knit
 from lp.code.model.revision import RevisionSet
 from lp.codehosting.vfs import branch_id_to_path
 from lp.scripts.helpers import TransactionFreeOperation
@@ -193,7 +194,7 @@
     def test_upgrades_branch(self):
         """Ensure that a branch with an outdated format is upgraded."""
         self.useBzrBranches(direct_database=True)
-        db_branch, tree = self.create_knit()
+        db_branch, tree = create_knit(self)
         self.assertEqual(
             tree.branch.repository._format.get_format_string(),
             'Bazaar-NG Knit Repository Format 1')
@@ -222,17 +223,11 @@
             AlreadyLatestFormat, BranchUpgradeJob.create, branch,
             self.factory.makePerson())
 
-    def create_knit(self):
-        db_branch, tree = self.create_branch_and_tree(format='knit')
-        db_branch.branch_format = BranchFormat.BZR_BRANCH_5
-        db_branch.repository_format = RepositoryFormat.BZR_KNIT_1
-        return db_branch, tree
-
     def test_existing_bzr_backup(self):
         # If the target branch already has a backup.bzr dir, the upgrade copy
         # should remove it.
         self.useBzrBranches(direct_database=True)
-        db_branch, tree = self.create_knit()
+        db_branch, tree = create_knit(self)
 
         # Add a fake backup.bzr dir
         source_branch_transport = get_transport(db_branch.getInternalBzrUrl())

=== modified file 'lib/lp/code/model/tests/test_branchpuller.py'
--- lib/lp/code/model/tests/test_branchpuller.py	2012-03-27 17:52:44 +0000
+++ lib/lp/code/model/tests/test_branchpuller.py	2012-04-03 20:26:24 +0000
@@ -89,7 +89,7 @@
         transaction.commit()
         branch.startMirroring()
         removeSecurityProxy(branch).branchChanged(
-            '', 'rev1', None, None, None, celery_scan=False)
+            '', 'rev1', None, None, None)
         self.assertEqual(None, branch.next_mirror_time)
 
     def test_mirrorFailureResetsMirrorRequest(self):
@@ -158,7 +158,7 @@
         transaction.commit()
         branch.startMirroring()
         removeSecurityProxy(branch).branchChanged(
-            '', 'rev1', None, None, None, celery_scan=False)
+            '', 'rev1', None, None, None)
         self.assertInFuture(branch.next_mirror_time, self.increment)
         self.assertEqual(0, branch.mirror_failures)
 

=== modified file 'lib/lp/code/model/tests/test_branchtarget.py'
--- lib/lp/code/model/tests/test_branchtarget.py	2012-03-27 17:52:44 +0000
+++ lib/lp/code/model/tests/test_branchtarget.py	2012-04-03 20:26:24 +0000
@@ -129,8 +129,7 @@
         default_branch = self.factory.makePackageBranch(
             sourcepackage=development_package)
         removeSecurityProxy(default_branch).branchChanged(
-            '', self.factory.getUniqueString(), None, None, None,
-            celery_scan=False)
+            '', self.factory.getUniqueString(), None, None, None)
         registrant = development_package.distribution.owner
         with person_logged_in(registrant):
             development_package.setBranch(
@@ -398,7 +397,7 @@
         branch = self.factory.makeProductBranch(product=self.original)
         self._setDevelopmentFocus(self.original, branch)
         removeSecurityProxy(branch).branchChanged(
-            '', 'rev1', None, None, None, celery_scan=False)
+            '', 'rev1', None, None, None)
         target = IBranchTarget(self.original)
         self.assertEqual(branch, target.default_stacked_on_branch)
 
@@ -538,8 +537,7 @@
         branch = self.factory.makeAnyBranch(branch_type=BranchType.MIRRORED)
         branch.startMirroring()
         removeSecurityProxy(branch).branchChanged(
-            '', self.factory.getUniqueString(), None, None, None,
-            celery_scan=False)
+            '', self.factory.getUniqueString(), None, None, None)
         removeSecurityProxy(branch).branch_type = BranchType.REMOTE
         self.assertIs(None, check_default_stacked_on(branch))
 
@@ -555,16 +553,14 @@
         branch = self.factory.makeAnyBranch(private=True)
         naked_branch = removeSecurityProxy(branch)
         naked_branch.branchChanged(
-            '', self.factory.getUniqueString(), None, None, None,
-            celery_scan=False)
+            '', self.factory.getUniqueString(), None, None, None)
         self.assertIs(None, check_default_stacked_on(branch))
 
     def test_been_mirrored(self):
         # `check_default_stacked_on` returns the branch if it has revisions.
         branch = self.factory.makeAnyBranch()
         removeSecurityProxy(branch).branchChanged(
-            '', self.factory.getUniqueString(), None, None, None,
-            celery_scan=False)
+            '', self.factory.getUniqueString(), None, None, None)
         self.assertEqual(branch, check_default_stacked_on(branch))
 
 

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2012-03-27 17:40:11 +0000
+++ lib/lp/services/job/celeryconfig.py	2012-04-03 20:26:24 +0000
@@ -2,3 +2,10 @@
 BROKER_URL = "amqplib://%s" % config.rabbitmq.host
 CELERY_IMPORTS = ("lp.services.job.celeryjob", )
 CELERY_RESULT_BACKEND = "amqp"
+CELERY_QUEUES = {
+    "branch_write": {"binding_key": "branch_write"},
+    "standard": {"binding_key": "standard"},
+}
+CELERY_DEFAULT_EXCHANGE = "standard"
+CELERY_DEFAULT_QUEUE = "standard"
+CELERY_CREATE_MISSING_QUEUES = False

=== modified file 'lib/lp/services/job/interfaces/job.py'
--- lib/lp/services/job/interfaces/job.py	2012-03-21 16:16:08 +0000
+++ lib/lp/services/job/interfaces/job.py	2012-04-03 20:26:24 +0000
@@ -171,6 +171,9 @@
     def run():
         """Run this job."""
 
+    def celeryRunOnCommit():
+        """Request Celery to run this job on transaction commit."""
+
 
 class IJobSource(Interface):
     """Interface for creating and getting jobs."""

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2012-04-03 20:26:23 +0000
+++ lib/lp/services/job/runner.py	2012-04-03 20:26:24 +0000
@@ -103,6 +103,8 @@
 
     retry_error_types = ()
 
+    task_queue = 'standard'
+
     celery_responses = None
 
     # We redefine __eq__ and __ne__ here to prevent the security proxy
@@ -188,6 +190,31 @@
         return oops_config.create(
             context=dict(exc_info=info))
 
+    def runViaCelery(self):
+        """Request that this job be run via celery."""
+        # Avoid importing from lp.services.job.celeryjob where not needed, to
+        # avoid configuring Celery when Rabbit is not configured.
+        from lp.services.job.celeryjob import CeleryRunJob
+        ignore_result = bool(BaseRunnableJob.celery_responses is None)
+        response = CeleryRunJob.apply_async(
+            (self.job_id,), queue=self.task_queue,
+            ignore_result=ignore_result)
+        if not ignore_result:
+            BaseRunnableJob.celery_responses.append(response)
+        return response
+
+    def celeryCommitHook(self, succeeded):
+        """Hook function to call when a commit completes."""
+        if succeeded:
+            self.runViaCelery()
+
+    def celeryRunOnCommit(self):
+        """Configure transaction so that commit runs this job via Celery."""
+        if not celery_enabled(self.__class__.__name__):
+            return
+        current = transaction.get()
+        current.addAfterCommitHook(self.celeryCommitHook)
+
 
 class BaseJobRunner(LazrJobRunner):
     """Runner of Jobs."""

=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py	2009-05-13 20:03:39 +0000
+++ lib/lp/services/job/tests/__init__.py	2012-04-03 20:26:24 +0000
@@ -0,0 +1,44 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+__all__ = ['celeryd', 'monitor_celery']
+
+
+from contextlib import contextmanager
+
+from lp.services.job.runner import BaseRunnableJob
+
+
+@contextmanager
+def celeryd(queue, cwd=None):
+    """Return a ContextManager for a celeryd instance.
+
+    The celeryd instance will be configured to use the currently-configured
+    BROKER_URL, and able to run CeleryRunJob tasks.
+    """
+    from lp.services.job.celeryjob import CeleryRunJob
+    from lazr.jobrunner.tests.test_celerytask import running
+    cmd_args = (
+        '--config', 'lp.services.job.celeryconfig',
+        '--broker', CeleryRunJob.app.conf['BROKER_URL'],
+        '--concurrency', '1',
+        '--loglevel', 'INFO',
+        '--queues', queue,
+    )
+    with running('bin/celeryd', cmd_args, cwd=cwd) as proc:
+        # Wait for celeryd startup to complete.
+        yield proc
+
+
+@contextmanager
+def monitor_celery():
+    """Context manager that provides a list of Celery responses."""
+    responses = []
+    old_responses = BaseRunnableJob.celery_responses
+    BaseRunnableJob.celery_responses = responses
+    try:
+        yield responses
+    finally:
+        BaseRunnableJob.celery_responses = old_responses

=== removed file 'lib/lp/services/job/tests/celeryconfig.py'
--- lib/lp/services/job/tests/celeryconfig.py	2012-03-23 16:26:46 +0000
+++ lib/lp/services/job/tests/celeryconfig.py	1970-01-01 00:00:00 +0000
@@ -1,7 +0,0 @@
-import os
-BROKER_VHOST = "/"
-CELERY_RESULT_BACKEND = "amqp"
-CELERY_IMPORTS = ("lp.services.job.celeryjob", )
-CELERYD_LOG_LEVEL = 'INFO'
-CELERYD_CONCURRENCY = 1
-BROKER_URL = os.environ['BROKER_URL']

=== removed file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py	2012-04-03 20:26:23 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py	1970-01-01 00:00:00 +0000
@@ -1,56 +0,0 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-
-from contextlib import contextmanager
-import os
-
-import transaction
-
-from lp.code.model.branchjob import BranchScanJob
-from lp.services.job.runner import BaseRunnableJob
-from lp.testing import TestCaseWithFactory
-from lp.testing.layers import ZopelessAppServerLayer
-
-
-@contextmanager
-def monitor_celery():
-    """Context manager that provides a list of Celery responses."""
-    responses = []
-    old_responses = BaseRunnableJob.celery_responses
-    BaseRunnableJob.celery_responses = responses
-    try:
-        yield responses
-    finally:
-        BaseRunnableJob.celery_responses = old_responses
-
-
-class TestCelery(TestCaseWithFactory):
-
-    layer = ZopelessAppServerLayer
-
-    def test_run_scan_job(self):
-        """Running a job via Celery succeeds and emits expected output."""
-        # Delay importing anything that uses Celery until RabbitMQLayer is
-        # running, so that config.rabbitmq.host is defined when
-        # lp.services.job.celeryconfig is loaded.
-        from lp.services.job.celeryjob import CeleryRunJob
-        from celery.exceptions import TimeoutError
-        from lazr.jobrunner.tests.test_celerytask import running
-        cmd_args = ('--config', 'lp.services.job.tests.celeryconfig')
-        env = dict(os.environ)
-        env['BROKER_URL'] = CeleryRunJob.app.conf['BROKER_URL']
-        with running('bin/celeryd', cmd_args, env=env) as proc:
-            self.useBzrBranches()
-            db_branch, bzr_tree = self.create_branch_and_tree()
-            bzr_tree.commit(
-                'First commit', rev_id='rev1', committer='me@xxxxxxxxxxx')
-            job = BranchScanJob.create(db_branch)
-            transaction.commit()
-            try:
-                CeleryRunJob.delay(job.job_id).wait(30)
-            except TimeoutError:
-                pass
-        self.assertIn(
-            'Updating branch scanner status: 1 revs', proc.stderr.read())
-        self.assertEqual(db_branch.revision_count, 1)

=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py	2012-03-30 18:13:38 +0000
+++ lib/lp/testing/factory.py	2012-04-03 20:26:24 +0000
@@ -1407,7 +1407,7 @@
         # We just remove the security proxies to be able to change the objects
         # here.
         removeSecurityProxy(branch).branchChanged(
-            '', 'rev1', None, None, None, celery_scan=False)
+            '', 'rev1', None, None, None)
         naked_series = removeSecurityProxy(product.development_focus)
         naked_series.branch = branch
         return branch
@@ -1422,7 +1422,7 @@
         # We just remove the security proxies to be able to change the branch
         # here.
         removeSecurityProxy(branch).branchChanged(
-            '', 'rev1', None, None, None, celery_scan=False)
+            '', 'rev1', None, None, None)
         with person_logged_in(package.distribution.owner):
             package.development_version.setBranch(
                 PackagePublishingPocket.RELEASE, branch,
@@ -1628,7 +1628,7 @@
         if branch.branch_type not in (BranchType.REMOTE, BranchType.HOSTED):
             branch.startMirroring()
         removeSecurityProxy(branch).branchChanged(
-            '', parent.revision_id, None, None, None, celery_scan=False)
+            '', parent.revision_id, None, None, None)
         branch.updateScannedDetails(parent, sequence)
 
     def makeBranchRevision(self, branch, revision_id=None, sequence=None,