launchpad-reviewers team mailing list archive

[Merge] lp:~julian-edwards/launchpad/buildd-manager-parallel-scan into lp:launchpad/devel

 

Julian Edwards has proposed merging lp:~julian-edwards/launchpad/buildd-manager-parallel-scan into lp:launchpad/devel.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)


= Summary =
New and improved buildd-manager with extra wow!

== Proposed fix ==
The buildd-manager currently scans all the builders in a single sequence, one 
after the other.  This has a couple of problems:

 1. Poor isolation of failures and comms errors.  A problem with a single 
builder or job can terminate the whole scan and leave the remainder of the 
builders in the sequence unscanned.
 2. Downloading files from one builder can't be done in parallel with 
uploading files to another.

I'm changing it here so that each builder is now scanned individually.
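
Roughly speaking, each builder now gets its own independent, repeating scan 
cycle on the Twisted reactor, so one builder's failure or slow file transfer 
no longer holds up the rest.  Here's a minimal sketch of that loop (a 
simplification of the real SlaveScanner code in the diff below; the 
scan_builder callable is a hypothetical stand-in for the per-builder 
scan/dispatch logic):

from twisted.internet import defer, reactor


class BuilderScanLoop:
    """Sketch only: one repeating scan cycle per builder."""

    SCAN_INTERVAL = 5  # seconds between scans of this one builder

    def __init__(self, builder_name, scan_builder):
        self.builder_name = builder_name
        # Hypothetical stand-in for the real single-builder scan/dispatch.
        self.scan_builder = scan_builder

    def scheduleNextScanCycle(self):
        reactor.callLater(self.SCAN_INTERVAL, self.startCycle)

    def startCycle(self):
        d = defer.maybeDeferred(self.scan_builder, self.builder_name)
        # An error here only affects this builder; its loop carries on.
        d.addErrback(lambda failure: None)
        d.addBoth(lambda _: self.scheduleNextScanCycle())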

== Pre-implementation notes ==
Countless hours going through the nightmare code with jml and mwh.

== Implementation details ==
I'm very sorry.  It's a big branch (1100 lines), and in my attempt to make the 
application code more understandable, the accompanying test fixes have become 
even more obtuse.

Here's the basic outline of the changes (a condensed sketch follows the list):

 * The old BuilddManager class is now a simple wrapper that creates a 
SlaveScanner instance for each builder, plus a single NewBuildersScanner.
 * The new SlaveScanner class is similar to the old BuilddManager except it's 
simplified to only deal with one builder instead of all of them.  I renamed a 
lot of the methods and properties to make it more obvious what they are and 
what they do.
 * The new NewBuildersScanner is a singleton (not enforced) that periodically 
checks whether new builders have been added, so that a SlaveScanner can be 
created for each of them.
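
To make the wiring concrete, here's a condensed sketch of how the pieces fit 
together.  It is a simplification, not the real classes: get_builder_names and 
make_scanner are hypothetical stand-ins for the IBuilderSet lookup and for 
constructing a SlaveScanner; the real code is in the diff below.

from twisted.application import service
from twisted.internet import reactor


class NewBuildersScannerSketch:
    """Sketch of NewBuildersScanner: poll for builders added after startup."""

    SCAN_INTERVAL = 300  # seconds between checks for new builders

    def __init__(self, manager, get_builder_names):
        self.manager = manager
        self.get_builder_names = get_builder_names
        self.current_builders = set(get_builder_names())

    def scan(self):
        extra = set(self.get_builder_names()) - self.current_builders
        if extra:
            self.current_builders |= extra
            self.manager.addScanForBuilders(sorted(extra))
        reactor.callLater(self.SCAN_INTERVAL, self.scan)


class BuilddManagerSketch(service.Service):
    """Sketch of the new BuilddManager: a thin wrapper owning the scanners."""

    def __init__(self, get_builder_names, make_scanner):
        self.builder_slaves = []
        self.get_builder_names = get_builder_names
        self.make_scanner = make_scanner  # e.g. SlaveScanner(name, logger)
        self.new_builders_scanner = NewBuildersScannerSketch(
            self, get_builder_names)

    def startService(self):
        # One scanner per existing builder, plus the watcher for new ones.
        self.addScanForBuilders(self.get_builder_names())
        self.new_builders_scanner.scan()

    def addScanForBuilders(self, builder_names):
        for name in builder_names:
            scanner = self.make_scanner(name)
            self.builder_slaves.append(scanner)
            scanner.scheduleNextScanCycle()
        return self.builder_slaves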

== Tests ==
bin/test -cvv test_manager

The tests will make your eyes bleed.  I've pretty much hacked them to death to 
get them working with the new code.  mwh and I both decided this was actually 
easier: making them look nicer would need a total re-write, and this stuff is 
going to get junked once the new unified job/build system is in place.

There are also some new tests for the NewBuildersScanner code, which I think 
you'll find a bit more pleasant.

== Demo and Q/A ==
It's currently running on dogfood and I've smashed the builders around to 
force errors while queueing up a load of builds, and it appears to cope 
admirably so far.

= Launchpad lint =

This is buggy output; the linter doesn't like defs inside defs.
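
For illustration only (this is not code from the branch, and the flagged lines 
may differ): the pattern the checker objects to is a nested helper defined 
with no blank line before it, e.g.

def outer():
    x = 1
    # The checker presumably reports E301 on the next line.
    def inner():
        return x
    return inner()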

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/buildmaster/tests/test_manager.py
  lib/lp/buildmaster/manager.py

./lib/lp/buildmaster/tests/test_manager.py
     443: E301 expected 1 blank line, found 0
     844: E301 expected 1 blank line, found 0
./lib/lp/buildmaster/manager.py
      55: E301 expected 1 blank line, found 0
     570: E301 expected 1 blank line, found 2
-- 
https://code.launchpad.net/~julian-edwards/launchpad/buildd-manager-parallel-scan/+merge/30672
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~julian-edwards/launchpad/buildd-manager-parallel-scan into lp:launchpad/devel.
=== modified file 'lib/lp/buildmaster/manager.py'
--- lib/lp/buildmaster/manager.py	2010-05-27 13:13:47 +0000
+++ lib/lp/buildmaster/manager.py	2010-07-22 15:26:12 +0000
@@ -29,8 +29,13 @@
 
 from canonical.config import config
 from canonical.launchpad.webapp import urlappend
+from canonical.launchpad.webapp.interfaces import (
+    IStoreSelector, MAIN_STORE, DEFAULT_FLAVOR)
 from canonical.librarian.db import write_transaction
+from lp.buildmaster.model.buildqueue import BuildQueue
 from lp.buildmaster.interfaces.buildbase import BUILDD_MANAGER_LOG_NAME
+from lp.services.job.model.job import Job
+from lp.services.job.interfaces.job import JobStatus
 from lp.services.twistedsupport.processmonitor import ProcessWithTimeout
 
 
@@ -208,172 +213,225 @@
         self._cleanJob(builder.currentjob)
 
 
-class BuilddManager(service.Service):
-    """Build slave manager."""
-
-    # Dispatch result factories, used to build objects of type
-    # BaseDispatchResult representing the result of a dispatching chain.
+class SlaveScanner:
+    """A manager for a single builder."""
+
+    SCAN_INTERVAL = 5
+
+    # These are for the benefit of tests, it seems.
     reset_result = ResetDispatchResult
     fail_result = FailDispatchResult
 
-    def __init__(self):
-        # Store for running chains.
-        self._deferreds = []
-
-        # Keep track of build slaves that need handling in a scan/dispatch
-        # cycle.
-        self.remaining_slaves = []
-
-        self.logger = self._setupLogger()
-
-    def _setupLogger(self):
-        """Setup a 'slave-scanner' logger that redirects to twisted.
-
-        It is going to be used locally and within the thread running
-        the scan() method.
-
-        Make it less verbose to avoid messing too much with the old code.
-        """
-        level = logging.INFO
-        logger = logging.getLogger(BUILDD_MANAGER_LOG_NAME)
-
-        # Redirect the output to the twisted log module.
-        channel = logging.StreamHandler(log.StdioOnnaStick())
-        channel.setLevel(level)
-        channel.setFormatter(logging.Formatter('%(message)s'))
-
-        logger.addHandler(channel)
-        logger.setLevel(level)
-        return logger
-
-    def startService(self):
-        """Service entry point, run at the start of a scan/dispatch cycle."""
-        self.logger.info('Starting scanning cycle.')
+    def __init__(self, builder_name, logger):
+        # XXX in the original code, it doesn't set the slave as a
+        # RecordingSlave until it tries to dispatch a build.  This is the
+        # only part that's using Deferreds right now, and can be
+        # improved if we add more methods to RecordingSlave.  In fact we
+        # should use it exclusively and remove the wrapper hack.
+        self.builder_name = builder_name
+        self.logger = logger
+        self._deferred_list = []
+
+    def scheduleNextScanCycle(self):
+        self._deferred_list = []
+        reactor.callLater(self.SCAN_INTERVAL, self.startCycle)
+
+    def startCycle(self):
+        """Main entry point for each scan cycle on this builder."""
+        self.logger.debug("Scanning builder: %s" % self.builder_name)
 
         d = defer.maybeDeferred(self.scan)
         d.addCallback(self.resumeAndDispatch)
         d.addErrback(self.scanFailed)
 
+    @write_transaction
+    def scan(self):
+        """Probe the builder and update/dispatch/collect as appropriate.
+
+        The whole method is wrapped in a transaction, but we do partial
+        commits to avoid holding locks on tables.
+        """
+        # We need to re-fetch the builder object on each cycle as the
+        # Storm store is invalidated over transaction boundaries.
+
+        # Avoid circular import.
+        from lp.buildmaster.interfaces.builder import IBuilderSet
+        builder_set = getUtility(IBuilderSet)
+        self.builder = builder_set[self.builder_name]
+
+        if self.builder.builderok:
+            self.builder.updateStatus(self.logger)
+            transaction.commit()
+
+        # See if we think there's an active build on the builder.
+        store = getUtility(
+            IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
+        buildqueue = store.find(
+            BuildQueue,
+            BuildQueue.job == Job.id,
+            BuildQueue.builder == self.builder.id,
+            Job._status == JobStatus.RUNNING,
+            Job.date_started != None).one()
+
+        # Scan the slave and get the logtail, or collect the build if
+        # it's ready.
+        if buildqueue is not None:
+            self.builder.updateBuild(buildqueue)
+            transaction.commit()
+
+        # If the builder is in manual mode, don't dispatch anything.
+        if self.builder.manual:
+            self.logger.debug(
+                '%s is in manual mode, not dispatching.' % self.builder.name)
+            return None
+
+        # If the builder is marked unavailable, don't dispatch anything.
+        # Additionally see if we think there's a job running on it.  If
+        # there is then it means the builder was just taken out of the
+        # pool and we need to put the job back in the queue.
+        if not self.builder.is_available:
+            job = self.builder.currentjob
+            if job is not None and not self.builder.builderok:
+                self.logger.info(
+                    "%s was made unavailable, resetting attached "
+                    "job" % self.builder.name)
+                job.reset()
+                transaction.commit()
+            return None
+
+        # See if there is a job we can dispatch to the builder slave.
+        slave = RecordingSlave(
+            self.builder.name, self.builder.url, self.builder.vm_host)
+        candidate = self.builder.findAndStartJob(buildd_slave=slave)
+        if self.builder.currentjob is not None:
+            transaction.commit()
+            return slave
+
+        return None
+
     def scanFailed(self, error):
         """Deal with scanning failures."""
-        self.logger.info(
-            'Scanning failed with: %s\n%s' %
+        self.logger.info("Scanning failed with: %s\n%s" %
             (error.getErrorMessage(), error.getTraceback()))
-        self.finishCycle()
-
-    def nextCycle(self):
-        """Schedule the next scanning cycle."""
-        self.logger.debug('Next cycle in 5 seconds.')
-        reactor.callLater(5, self.startService)
-
-    def slaveDone(self, slave):
-        """Mark slave as done for this cycle.
-
-        When all active slaves are processed, call `finishCycle`.
-        """
-        self.remaining_slaves.remove(slave)
-
-        self.logger.info(
-            '%r marked as done. [%d]' % (slave, len(self.remaining_slaves)))
-
-        if len(self.remaining_slaves) == 0:
-            self.finishCycle()
-
-    def finishCycle(self, r=None):
-        """Finishes a slave-scanning cycle.
-
-        Once all the active events were executed:
-
-         * Evaluate pending builder update results;
-         * Clean the list of active events (_deferreds);
-         * Call `nextCycle`.
-        """
-        def done(deferred_results):
-            """Called when all events quiesce.
-
-            Perform the finishing-cycle tasks mentioned above.
-            """
-            self.logger.debug('Scanning cycle finished.')
-            # We are only interested in returned objects of type
-            # BaseDispatchResults, those are the ones that needs evaluation.
-            # None, resulting from successful chains, are discarded.
-            dispatch_results = [
-                result for status, result in deferred_results
-                if isinstance(result, BaseDispatchResult)]
-
-            # Evaluate then, which will synchronize the database information.
-            for result in dispatch_results:
-                self.logger.info('%r' % result)
+        # We should probably detect continuous failures here and mark
+        # the builder down.
+        self.scheduleNextScanCycle()
+
+    def resumeAndDispatch(self, slave):
+        """Chain the resume and dispatching Deferreds.
+
+        If `slave` is None there is nothing to dispatch this cycle.
+        """
+        if slave is not None:
+            if slave.resume_requested:
+                # The slave needs to be reset before we can dispatch to
+                # it (e.g. a virtual slave)
+                d = slave.resumeSlave()
+                d.addBoth(self.checkResume, slave)
+            else:
+                # No resume required, build dispatching can commence.
+                d = defer.succeed(None)
+
+            # Use Twisted events to dispatch the build to the slave.
+            d.addCallback(self.initiateDispatch, slave)
+            # Store this deferred so we can wait for it along with all
+            # the others that will be generated.
+            self._deferred_list.append(d)
+        else:
+            # The scan for this builder didn't want to dispatch
+            # anything, so we can finish this cycle.
+            self.scheduleNextScanCycle()
+
+    def initiateDispatch(self, resume_result, slave):
+        """Start dispatching a build to a slave.
+
+        If the previous task in chain (slave resuming) has failed it will
+        receive a `ResetBuilderRequest` instance as 'resume_result' and
+        will immediately return that so the subsequent callback can collect
+        it.
+
+        If the slave resuming succeeded, it starts the XMLRPC dialogue.  The
+        dialogue may consist of many calls to the slave before the build
+        starts.  Each call is done via a Deferred event, where slave calls
+        are sent in callSlave(), and checked in checkDispatch() which will
+        keep firing events via callSlave() until all the events are done or
+        an error occurs.
+        """
+        if resume_result is not None:
+            self.waitOnDeferredList()
+            return resume_result
+
+        self.logger.info('Dispatching: %s' % slave)
+        self.callSlave(slave)
+
+    def _getProxyForSlave(self, slave):
+        """Return a twisted.web.xmlrpc.Proxy for the buildd slave.
+
+        Uses a protocol with timeout support; see QueryFactoryWithTimeout.
+        """
+        proxy = xmlrpc.Proxy(str(urlappend(slave.url, 'rpc')))
+        proxy.queryFactory = QueryFactoryWithTimeout
+        return proxy
+
+    def callSlave(self, slave):
+        """Dispatch the next XMLRPC for the given slave."""
+        if len(slave.calls) == 0:
+            # That's the end of the dialogue with the slave.
+            self.waitOnDeferredList()
+            return
+
+        # Get an XML-RPC proxy for the buildd slave.
+        proxy = self._getProxyForSlave(slave)
+        method, args = slave.calls.pop(0)
+        d = proxy.callRemote(method, *args)
+        d.addBoth(self.checkDispatch, method, slave)
+        self._deferred_list.append(d)
+        self.logger.debug('%s -> %s(%s)' % (slave, method, args))
+
+    def waitOnDeferredList(self):
+        """After all the Deferreds are set up, wait on them."""
+        dl = defer.DeferredList(self._deferred_list, consumeErrors=True)
+        dl.addBoth(self.evaluateDispatchResult)
+        return dl
+
+    def evaluateDispatchResult(self, deferred_list_results):
+        """Process the DispatchResult for this dispatch chain.
+
+        After waiting for the Deferred chain to finish, we'll have a
+        DispatchResult to evaluate, which deals with the result of
+        dispatching.
+        """
+        # The `deferred_list_results` is what we get when waiting on a
+        # DeferredList.  It's a list of tuples of (status, result) where
+        # result is what the last callback in that chain returned.
+
+        # If the result is an instance of BaseDispatchResult we need to
+        # evaluate it, as there's further action required at the end of
+        # the dispatch chain.  None, resulting from successful chains,
+        # are discarded.
+
+        dispatch_results = [
+            result for status, result in deferred_list_results
+            if isinstance(result, BaseDispatchResult)]
+
+        for result in dispatch_results:
+            if isinstance(result, BaseDispatchResult):
+                self.logger.info("%r" % result)
                 result()
 
-            # Clean the events stored for this cycle and schedule the
-            # next one.
-            self._deferreds = []
-            self.nextCycle()
-
-            # Return the evaluated events for testing purpose.
-            return deferred_results
-
-        self.logger.debug('Finishing scanning cycle.')
-        dl = defer.DeferredList(self._deferreds, consumeErrors=True)
-        dl.addBoth(done)
-        return dl
-
-    @write_transaction
-    def scan(self):
-        """Scan all builders and dispatch build jobs to the idle ones.
-
-        All builders are polled for status and any required post-processing
-        actions are performed.
-
-        Subsequently, build job candidates are selected and assigned to the
-        idle builders. The necessary build job assignment actions are not
-        carried out directly though but merely memorized by the recording
-        build slaves.
-
-        In a second stage (see resumeAndDispatch()) each of the latter will be
-        handled in an asynchronous and parallel fashion.
-        """
-        # Avoiding circular imports.
-        from lp.buildmaster.interfaces.builder import IBuilderSet
-
-        recording_slaves = []
-        builder_set = getUtility(IBuilderSet)
-
-        # Builddmaster will perform partial commits for avoiding
-        # long-living trasaction with changes that affects other
-        # parts of the system.
-        builder_set.pollBuilders(self.logger, transaction)
-
-        for builder in builder_set:
-            self.logger.debug("Considering %s" % builder.name)
-
-            if builder.manual:
-                self.logger.debug('Builder is in manual state, ignored.')
-                continue
-
-            if not builder.is_available:
-                self.logger.debug('Builder is not available, ignored.')
-                job = builder.currentjob
-                if job is not None and not builder.builderok:
-                    self.logger.debug('Reseting attached job.')
-                    job.reset()
-                    transaction.commit()
-                continue
-
-            slave = RecordingSlave(builder.name, builder.url, builder.vm_host)
-            candidate = builder.findAndStartJob(buildd_slave=slave)
-            if builder.currentjob is not None:
-                recording_slaves.append(slave)
-                transaction.commit()
-
-        return recording_slaves
+        # At this point, we're done dispatching, so we can schedule the
+        # next scan cycle.
+        self.scheduleNextScanCycle()
+
+        # For the test suite so that it can chain callback results.
+        return deferred_list_results
 
     def checkResume(self, response, slave):
-        """Deal with a slave resume failure.
+        """Check the result of resuming a slave.
 
-        Return a corresponding `ResetDispatchResult` dispatch result,
-        which is chained to the next callback, dispatchBuild.
+        If there's a problem resuming, we return a ResetDispatchResult which
+        will get evaluated at the end of the scan, or None if the resume
+        was OK.
         """
         # 'response' is the tuple that's constructed in
         # ProcessWithTimeout.processEnded(), or is a Failure that
@@ -395,9 +453,9 @@
         If it failed and it compromises the slave then return a corresponding
         `FailDispatchResult`, if it was a communication failure, simply
         reset the slave by returning a `ResetDispatchResult`.
-
-        Otherwise dispatch the next call if there are any and return None.
         """
+        # XXX these DispatchResult classes are badly named and do the
+        # same thing.  We need to fix that.
         self.logger.debug(
             '%s response for "%s": %s' % (slave, method, response))
 
@@ -405,15 +463,15 @@
             self.logger.warn(
                 '%s communication failed (%s)' %
                 (slave, response.getErrorMessage()))
-            self.slaveDone(slave)
+            self.waitOnDeferredList()
             return self.reset_result(slave)
 
-        if isinstance(response, list) and len(response) == 2 :
+        if isinstance(response, list) and len(response) == 2:
             if method in buildd_success_result_map.keys():
                 expected_status = buildd_success_result_map.get(method)
                 status, info = response
                 if status == expected_status:
-                    self._mayDispatch(slave)
+                    self.callSlave(slave)
                     return None
             else:
                 info = 'Unknown slave method: %s' % method
@@ -423,76 +481,98 @@
         self.logger.error(
             '%s failed to dispatch (%s)' % (slave, info))
 
-        self.slaveDone(slave)
+        self.waitOnDeferredList()
         return self.fail_result(slave, info)
 
-    def resumeAndDispatch(self, recording_slaves):
-        """Dispatch existing resume procedure calls and chain dispatching.
-
-        See `RecordingSlave.resumeSlaveHost` for more details.
-        """
-        self.logger.debug('Resuming slaves: %s' % recording_slaves)
-        self.remaining_slaves = recording_slaves
-        if len(self.remaining_slaves) == 0:
-            self.finishCycle()
-
-        for slave in recording_slaves:
-            if slave.resume_requested:
-                # The buildd slave needs to be reset before we can dispatch
-                # builds to it.
-                d = slave.resumeSlave()
-                d.addBoth(self.checkResume, slave)
-            else:
-                # Buildd slave is clean, we can dispatch a build to it
-                # straightaway.
-                d = defer.succeed(None)
-            d.addCallback(self.dispatchBuild, slave)
-            # Store the active deferred.
-            self._deferreds.append(d)
-
-    def dispatchBuild(self, resume_result, slave):
-        """Start dispatching a build to a slave.
-
-        If the previous task in chain (slave resuming) has failed it will
-        receive a `ResetBuilderRequest` instance as 'resume_result' and
-        will immediately return that so the subsequent callback can collect
-        it.
-
-        If the slave resuming succeed, it starts the XMLRPC dialog. See
-        `_mayDispatch` for more information.
-        """
-        if resume_result is not None:
-            self.slaveDone(slave)
-            return resume_result
-        self.logger.info('Dispatching: %s' % slave)
-        self._mayDispatch(slave)
-
-    def _getProxyForSlave(self, slave):
-        """Return a twisted.web.xmlrpc.Proxy for the buildd slave.
-
-        Uses a protocol with timeout support, See QueryFactoryWithTimeout.
-        """
-        proxy = xmlrpc.Proxy(str(urlappend(slave.url, 'rpc')))
-        proxy.queryFactory = QueryFactoryWithTimeout
-        return proxy
-
-    def _mayDispatch(self, slave):
-        """Dispatch the next XMLRPC for the given slave.
-
-        If there are no messages to dispatch return None and mark the slave
-        as done for this cycle. Otherwise it will fetch a new XMLRPC proxy,
-        dispatch the call and set `checkDispatch` as callback.
-        """
-        if len(slave.calls) == 0:
-            self.slaveDone(slave)
-            return
-
-        # Get an XMPRPC proxy for the buildd slave.
-        proxy = self._getProxyForSlave(slave)
-        method, args = slave.calls.pop(0)
-        d = proxy.callRemote(method, *args)
-        d.addBoth(self.checkDispatch, method, slave)
-
-        # Store another active event.
-        self._deferreds.append(d)
-        self.logger.debug('%s -> %s(%s)' % (slave, method, args))
+
+class NewBuildersScanner:
+    """If new builders appear, create a scanner for them."""
+
+    # How often to check for new builders, in seconds.
+    SCAN_INTERVAL = 300
+
+    def __init__(self, manager):
+        self.manager = manager
+        # Avoid circular import.
+        from lp.buildmaster.interfaces.builder import IBuilderSet
+        self.current_builders = [
+            builder.name for builder in getUtility(IBuilderSet)]
+
+    def scheduleScan(self):
+        """Schedule a callback SCAN_INTERVAL seconds later."""
+        reactor.callLater(self.SCAN_INTERVAL, self.scan)
+
+    def scan(self):
+        """If a new builder appears, create a SlaveScanner for it."""
+        new_builders = self.checkForNewBuilders()
+        if new_builders is not None:
+            self.manager.addScanForBuilders(new_builders)
+        self.scheduleScan()
+
+    def checkForNewBuilders(self):
+        """See if any new builders were added."""
+        # Avoid circular import.
+        from lp.buildmaster.interfaces.builder import IBuilderSet
+        new_builders = set(
+            builder.name for builder in getUtility(IBuilderSet))
+        old_builders = set(self.current_builders)
+        extra_builders = new_builders.difference(old_builders)
+        if len(extra_builders) == 0:
+            # No new builders.
+            return None
+
+        return list(extra_builders)
+
+
+class BuilddManager(service.Service):
+    """Main Buildd Manager service class."""
+
+    def __init__(self):
+        self.builder_slaves = []
+        self.logger = self._setupLogger()
+        self.new_builders_scanner = NewBuildersScanner(manager=self)
+
+    def _setupLogger(self):
+        """Setup a 'slave-scanner' logger that redirects to twisted.
+
+        It is going to be used locally and within the thread running
+        the scan() method.
+
+        Make it less verbose to avoid messing too much with the old code.
+        """
+        level = logging.INFO
+        logger = logging.getLogger(BUILDD_MANAGER_LOG_NAME)
+
+        # Redirect the output to the twisted log module.
+        channel = logging.StreamHandler(log.StdioOnnaStick())
+        channel.setLevel(level)
+        channel.setFormatter(logging.Formatter('%(message)s'))
+
+        logger.addHandler(channel)
+        logger.setLevel(level)
+        return logger
+
+    def startService(self):
+        """Service entry point, called when the application starts."""
+
+        # Get a list of builders and set up scanners on each one.
+
+        # Avoiding circular imports.
+        from lp.buildmaster.interfaces.builder import IBuilderSet
+        builder_set = getUtility(IBuilderSet)
+        builders = [builder.name for builder in builder_set]
+        self.addScanForBuilders(builders)
+        self.new_builders_scanner.scheduleScan()
+
+        # Events will now fire in the SlaveScanner objects to scan each
+        # builder.
+
+    def addScanForBuilders(self, builders):
+        """Set up scanner objects for the builders specified."""
+        for builder in builders:
+            slave_scanner = SlaveScanner(builder, self.logger)
+            self.builder_slaves.append(slave_scanner)
+            slave_scanner.scheduleNextScanCycle()
+
+        # Return the slave list for the benefit of tests.
+        return self.builder_slaves

=== modified file 'lib/lp/buildmaster/tests/test_manager.py'
--- lib/lp/buildmaster/tests/test_manager.py	2010-07-18 17:43:23 +0000
+++ lib/lp/buildmaster/tests/test_manager.py	2010-07-22 15:26:12 +0000
@@ -8,9 +8,9 @@
 import transaction
 import unittest
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from twisted.internet.error import ConnectionClosed
-from twisted.internet.task import Clock
+from twisted.internet.task import Clock, deferLater
 from twisted.python.failure import Failure
 from twisted.trial.unittest import TestCase as TrialTestCase
 
@@ -27,13 +27,16 @@
 from lp.buildmaster.interfaces.builder import IBuilderSet
 from lp.buildmaster.interfaces.buildqueue import IBuildQueueSet
 from lp.buildmaster.manager import (
-    BaseDispatchResult, BuilddManager, FailDispatchResult, RecordingSlave,
-    ResetDispatchResult, buildd_success_result_map)
+    BaseDispatchResult, BuilddManager, SlaveScanner, FailDispatchResult,
+    NewBuildersScanner, RecordingSlave, ResetDispatchResult,
+    buildd_success_result_map)
 from lp.buildmaster.tests.harness import BuilddManagerTestSetup
 from lp.registry.interfaces.distribution import IDistributionSet
 from lp.soyuz.interfaces.binarypackagebuild import IBinaryPackageBuildSet
 from lp.soyuz.tests.soyuzbuilddhelpers import BuildingSlave
 from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
+from lp.testing.factory import LaunchpadObjectFactory
+from lp.testing.fakemethod import FakeMethod
 
 
 class TestRecordingSlaves(TrialTestCase):
@@ -207,23 +210,22 @@
         self.processed = True
 
 
-class TestingBuilddManager(BuilddManager):
+class TestingSlaveScanner(SlaveScanner):
     """Override the dispatch result factories """
 
     reset_result = TestingResetDispatchResult
     fail_result = TestingFailDispatchResult
 
 
-class TestBuilddManager(TrialTestCase):
+class TestSlaveScanner(TrialTestCase):
     """Tests for the actual build slave manager."""
     layer = TwistedLayer
 
     def setUp(self):
         TrialTestCase.setUp(self)
-        self.manager = TestingBuilddManager()
-        self.manager.logger = BufferLogger()
+        self.manager = TestingSlaveScanner("bob", BufferLogger())
 
-        # We will use an instrumented BuilddManager instance for tests in
+        # We will use an instrumented SlaveScanner instance for tests in
         # this context.
 
         # Stop cyclic execution and record the end of the cycle.
@@ -232,7 +234,7 @@
         def testNextCycle():
             self.stopped = True
 
-        self.manager.nextCycle = testNextCycle
+        self.manager.scheduleNextScanCycle = testNextCycle
 
         # Return the testing Proxy version.
         self.test_proxy = TestingXMLRPCProxy()
@@ -247,56 +249,10 @@
         self.manager.scan = testScan
 
         # Stop automatic collection of dispatching results.
-        def testSlaveDone(slave):
+        def testwaitOnDeferredList():
             pass
-        self._realSlaveDone = self.manager.slaveDone
-        self.manager.slaveDone = testSlaveDone
-
-    def testFinishCycle(self):
-        """Check if the chain is terminated and database updates are done.
-
-        'BuilddManager.finishCycle' verifies the number of active deferreds
-        and once they cease it performs all needed database updates (builder
-        reset or failure) synchronously and call `BuilddManager.nextCycle`.
-        """
-        # There are no active deferreds in a just instantiated BuilddManager.
-        self.assertEqual(0, len(self.manager._deferreds))
-
-        # Fill the deferred list with events we can check later.
-        reset_me = TestingResetDispatchResult(
-            RecordingSlave('foo', 'http://foo', 'foo.host'))
-        fail_me = TestingFailDispatchResult(
-            RecordingSlave('bar', 'http://bar', 'bar.host'), 'boingo')
-        self.manager._deferreds.extend(
-            [defer.succeed(reset_me), defer.succeed(fail_me), defer.fail()])
-
-        # When `finishCycle` is called, and it is called after all build
-        # slave interaction, active deferreds are consumed.
-        events = self.manager.finishCycle()
-
-        def check_events(results):
-            # The cycle was stopped (and in the production the next cycle
-            # would have being scheduled).
-            self.assertTrue(self.stopped)
-
-            # The stored list of events from this cycle was consumed.
-            self.assertEqual(0, len(self.manager._deferreds))
-
-            # We have exactly 2 BaseDispatchResult events.
-            [reset, fail] = [
-                r for s, r in results if isinstance(r, BaseDispatchResult)]
-
-            # They corresponds to the ones created above and were already
-            # processed.
-            self.assertEqual(
-                '<foo:http://foo> reset failure', repr(reset))
-            self.assertTrue(reset.processed)
-            self.assertEqual(
-                '<bar:http://bar> failure (boingo)', repr(fail))
-            self.assertTrue(fail.processed)
-
-        events.addCallback(check_events)
-        return events
+        self._realwaitOnDeferredList = self.manager.waitOnDeferredList
+        self.manager.waitOnDeferredList = testwaitOnDeferredList
 
     def assertIsDispatchReset(self, result):
         self.assertTrue(
@@ -309,7 +265,7 @@
             'Dispatch failure did not result in a FailBuildResult object')
 
     def test_checkResume(self):
-        """`BuilddManager.checkResume` is chained after resume requests.
+        """`SlaveScanner.checkResume` is chained after resume requests.
 
         If the resume request succeed it returns None, otherwise it returns
         a `ResetBuildResult` (the one in the test context) that will be
@@ -355,24 +311,26 @@
         # Make a failing slave that is requesting a resume.
         slave = RecordingSlave('foo', 'http://foo.buildd:8221/', 'foo.host')
         slave.resume_requested = True
-        slave.resumeSlave = lambda: defer.fail(Failure(('out', 'err', 1)))
+        slave.resumeSlave = lambda: deferLater(
+            reactor, 0, defer.fail, Failure(('out', 'err', 1)))
 
         # Make the manager log the reset result calls.
         self.manager.reset_result = LoggingResetResult
-        # Restore the slaveDone method. It's very relevant to this test.
-        self.manager.slaveDone = self._realSlaveDone
+
         # We only care about this one slave. Reset the list of manager
         # deferreds in case setUp did something unexpected.
-        self.manager._deferreds = []
-
-        self.manager.resumeAndDispatch([slave])
-        # Note: finishCycle isn't generally called by external users, normally
-        # resumeAndDispatch or slaveDone calls it. However, these calls
-        # swallow the Deferred that finishCycle returns, and we need that
-        # Deferred to make sure this test completes properly.
-        d = self.manager.finishCycle()
-        return d.addCallback(
-            lambda ignored: self.assertEqual([slave], reset_result_calls))
+        self.manager._deferred_list = []
+
+        # Here, we're patching the waitOnDeferredList method to add an
+        # extra callback at the end of it, so we can verify that the
+        # reset_result was really called.
+        def _waitOnDeferredList():
+            d = self._realwaitOnDeferredList()
+            return d.addCallback(
+                lambda ignored: self.assertEqual([slave], reset_result_calls))
+        self.manager.waitOnDeferredList = _waitOnDeferredList
+
+        self.manager.resumeAndDispatch(slave)
 
     def test_failed_to_resume_slave_ready_for_reset(self):
         # When a slave fails to resume, the manager has a Deferred in its
@@ -385,11 +343,12 @@
 
         # We only care about this one slave. Reset the list of manager
         # deferreds in case setUp did something unexpected.
-        self.manager._deferreds = []
-        # Restore the slaveDone method. It's very relevant to this test.
-        self.manager.slaveDone = self._realSlaveDone
-        self.manager.resumeAndDispatch([slave])
-        [d] = self.manager._deferreds
+        self.manager._deferred_list = []
+        # Restore the waitOnDeferredList method. It's very relevant to
+        # this test.
+        self.manager.waitOnDeferredList = self._realwaitOnDeferredList
+        self.manager.resumeAndDispatch(slave)
+        [d] = self.manager._deferred_list
 
         # The Deferred for our failing slave should be ready to fire
         # successfully with a ResetDispatchResult.
@@ -400,7 +359,7 @@
         return d.addCallback(check_result)
 
     def testCheckDispatch(self):
-        """`BuilddManager.checkDispatch` is chained after dispatch requests.
+        """`SlaveScanner.checkDispatch` is chained after dispatch requests.
 
         If the dispatch request fails or a unknown method is given, it
         returns a `FailDispatchResult` (in the test context) that will
@@ -468,7 +427,7 @@
             '<foo:http://foo.buildd:8221/> failure '
             '(Unknown slave method: unknown-method)', repr(result))
 
-    def testDispatchBuild(self):
+    def test_initiateDispatch(self):
         """Check `dispatchBuild` in various scenarios.
 
         When there are no recording slaves (i.e. no build got dispatched
@@ -481,6 +440,26 @@
         On slave call failure the chain is stopped immediately and an
         FailDispatchResult is collected while finishing the cycle.
         """
+        def check_no_events(results):
+            errors = [
+                r for s, r in results if isinstance(r, BaseDispatchResult)]
+            self.assertEqual(0, len(errors))
+
+        def check_events(results):
+            [error] = [r for s, r in results if r is not None]
+            self.assertEqual(
+                '<foo:http://foo.buildd:8221/> failure (very broken slave)',
+                repr(error))
+            self.assertTrue(error.processed)
+
+        def _wait_on_deferreds_then_check_no_events():
+            dl = self._realwaitOnDeferredList()
+            return dl.addCallback(check_no_events)
+
+        def _wait_on_deferreds_then_check_events():
+            dl = self._realwaitOnDeferredList()
+            return dl.addCallback(check_events)
+
         # A functional slave charged with some interactions.
         slave = RecordingSlave('foo', 'http://foo.buildd:8221/', 'foo.host')
         slave.ensurepresent('arg1', 'arg2', 'arg3')
@@ -488,30 +467,28 @@
 
         # If the previous step (resuming) has failed nothing gets dispatched.
         reset_result = ResetDispatchResult(slave)
-        result = self.manager.dispatchBuild(reset_result, slave)
+        result = self.manager.initiateDispatch(reset_result, slave)
         self.assertTrue(result is reset_result)
         self.assertFalse(slave.resume_requested)
-        self.assertEqual(0, len(self.manager._deferreds))
+        self.assertEqual(0, len(self.manager._deferred_list))
 
         # Operation with the default (funcional slave), no resets or
         # failures results are triggered.
         slave.resume()
-        result = self.manager.dispatchBuild(None, slave)
+        result = self.manager.initiateDispatch(None, slave)
         self.assertEqual(None, result)
         self.assertTrue(slave.resume_requested)
         self.assertEqual(
             [('ensurepresent', 'arg1', 'arg2', 'arg3'),
              ('build', 'arg1', 'arg2', 'arg3')],
             self.test_proxy.calls)
-        self.assertEqual(2, len(self.manager._deferreds))
-
-        events = self.manager.finishCycle()
-
-        def check_no_events(results):
-            errors = [
-                r for s, r in results if isinstance(r, BaseDispatchResult)]
-            self.assertEqual(0, len(errors))
-        events.addCallback(check_no_events)
+        self.assertEqual(2, len(self.manager._deferred_list))
+
+        # Monkey patch the waitOnDeferredList method so we can chain a
+        # callback to check the end of the result chain.
+        self.manager.waitOnDeferredList = \
+            _wait_on_deferreds_then_check_no_events
+        events = self.manager.waitOnDeferredList()
 
         # Create a broken slave and insert interaction that will
         # cause the builder to be marked as fail.
@@ -520,29 +497,24 @@
         slave.ensurepresent('arg1', 'arg2', 'arg3')
         slave.build('arg1', 'arg2', 'arg3')
 
-        result = self.manager.dispatchBuild(None, slave)
+        result = self.manager.initiateDispatch(None, slave)
         self.assertEqual(None, result)
-        self.assertEqual(1, len(self.manager._deferreds))
+        self.assertEqual(3, len(self.manager._deferred_list))
         self.assertEqual(
             [('ensurepresent', 'arg1', 'arg2', 'arg3')],
             self.test_proxy.calls)
 
-        events = self.manager.finishCycle()
-
-        def check_events(results):
-            [error] = [r for s, r in results if r is not None]
-            self.assertEqual(
-                '<foo:http://foo.buildd:8221/> failure (very broken slave)',
-                repr(error))
-            self.assertTrue(error.processed)
-
-        events.addCallback(check_events)
+        # Monkey patch the waitOnDeferredList method so we can chain a
+        # callback to check the end of the result chain.
+        self.manager.waitOnDeferredList = \
+            _wait_on_deferreds_then_check_events
+        events = self.manager.waitOnDeferredList()
 
         return events
 
 
-class TestBuilddManagerScan(TrialTestCase):
-    """Tests `BuilddManager.scan` method.
+class TestSlaveScannerScan(TrialTestCase):
+    """Tests `SlaveScanner.scan` method.
 
     This method uses the old framework for scanning and dispatching builds.
     """
@@ -599,29 +571,24 @@
         self.assertEqual(job.logtail, logtail)
 
     def _getManager(self):
-        """Instantiate a BuilddManager object.
+        """Instantiate a SlaveScanner object.
 
         Replace its default logging handler by a testing version.
         """
-        manager = BuilddManager()
-
-        for handler in manager.logger.handlers:
-            manager.logger.removeHandler(handler)
-        manager.logger = BufferLogger()
+        manager = SlaveScanner("bob", BufferLogger())
         manager.logger.name = 'slave-scanner'
 
         return manager
 
-    def _checkDispatch(self, recording_slaves, builder):
-        """`BuilddManager.scan` return a list of `RecordingSlaves`.
+    def _checkDispatch(self, slave, builder):
+        """`SlaveScanner.scan` returns a `RecordingSlave`.
 
         The single slave returned should match the given builder and
         contain interactions that should be performed asynchronously for
         properly dispatching the sampledata job.
         """
-        self.assertEqual(
-            len(recording_slaves), 1, "Unexpected recording_slaves.")
-        [slave] = recording_slaves
+        self.assertFalse(
+            slave is None, "Expected a RecordingSlave, got None.")
 
         self.assertEqual(slave.name, builder.name)
         self.assertEqual(slave.url, builder.url)
@@ -664,15 +631,15 @@
         d.addCallback(self._checkDispatch, builder)
         return d
 
-    def _checkNoDispatch(self, recording_slaves, builder):
+    def _checkNoDispatch(self, recording_slave, builder):
         """Assert that no dispatch has occurred.
 
-        'recording_slaves' is empty, so no interations would be passed
+        'recording_slave' is None, so no interactions would be passed
         to the asynchonous dispatcher and the builder remained active
         and IDLE.
         """
-        self.assertEqual(
-            len(recording_slaves), 0, "Unexpected recording_slaves.")
+        self.assertTrue(
+            recording_slave is None, "Unexpected recording_slave.")
 
         builder = getUtility(IBuilderSet).get(builder.id)
         self.assertTrue(builder.builderok)
@@ -703,14 +670,14 @@
         d.addCallback(self._checkNoDispatch, builder)
         return d
 
-    def _checkJobRescued(self, recording_slaves, builder, job):
-        """`BuilddManager.scan` rescued the job.
+    def _checkJobRescued(self, slave, builder, job):
+        """`SlaveScanner.scan` rescued the job.
 
         Nothing gets dispatched,  the 'broken' builder remained disabled
         and the 'rescued' job is ready to be dispatched.
         """
-        self.assertEqual(
-            len(recording_slaves), 0, "Unexpected recording_slaves.")
+        self.assertTrue(
+            slave is None, "Unexpected slave.")
 
         builder = getUtility(IBuilderSet).get(builder.id)
         self.assertFalse(builder.builderok)
@@ -743,14 +710,13 @@
         d.addCallback(self._checkJobRescued, builder, job)
         return d
 
-    def _checkJobUpdated(self, recording_slaves, builder, job):
-        """`BuilddManager.scan` updates legitimate jobs.
+    def _checkJobUpdated(self, slave, builder, job):
+        """`SlaveScanner.scan` updates legitimate jobs.
 
         Job is kept assigned to the active builder and its 'logtail' is
         updated.
         """
-        self.assertEqual(
-            len(recording_slaves), 0, "Unexpected recording_slaves.")
+        self.assertTrue(slave is None, "Unexpected slave.")
 
         builder = getUtility(IBuilderSet).get(builder.id)
         self.assertTrue(builder.builderok)
@@ -867,6 +833,119 @@
         self.assertEqual('does not work!', builder.failnotes)
 
 
+class TestBuilddManager(TrialTestCase):
+
+    layer = LaunchpadZopelessLayer
+
+    def _stub_out_scheduleNextScanCycle(self):
+        # Stub out the code that adds a callLater, so that later tests
+        # aren't affected by a leftover pending call.
+        self._saved_schedule = SlaveScanner.scheduleNextScanCycle
+        def cleanup():
+            SlaveScanner.scheduleNextScanCycle = self._saved_schedule
+        self.addCleanup(cleanup)
+        SlaveScanner.scheduleNextScanCycle = FakeMethod()
+
+    def test_addScanForBuilders(self):
+        # Test that addScanForBuilders generates SlaveScanner objects.
+
+        self._stub_out_scheduleNextScanCycle()
+
+        manager = BuilddManager()
+        builder_names = [builder.name for builder in getUtility(IBuilderSet)]
+        scanners = manager.addScanForBuilders(builder_names)
+        scanner_names = [scanner.builder_name for scanner in scanners]
+        for builder in builder_names:
+            self.assertTrue(builder in scanner_names)
+
+    def test_startService_adds_NewBuildersScanner(self):
+        # When startService is called, the manager will start up a
+        # NewBuildersScanner object.
+        self._stub_out_scheduleNextScanCycle()
+        NewBuildersScanner.SCAN_INTERVAL = 0
+        manager = BuilddManager()
+
+        # Replace scan() with FakeMethod so we can see if it was called.
+        manager.new_builders_scanner.scan = FakeMethod()
+
+        def assertCalled():
+            self.failIf(manager.new_builders_scanner.scan.call_count == 0)
+
+        manager.startService()
+        reactor.callLater(0, assertCalled)
+
+
+class TestNewBuilders(TrialTestCase):
+    """Test detecting of new builders."""
+
+    layer = LaunchpadZopelessLayer
+
+    def _getScanner(self, manager=None):
+        return NewBuildersScanner(manager=manager)
+
+    def test_init_stores_existing_builders(self):
+        # Make sure that NewBuildersScanner initialises itself properly
+        # by storing a list of existing builders.
+        all_builders = [builder.name for builder in getUtility(IBuilderSet)]
+        builder_scanner = self._getScanner()
+        self.assertEqual(all_builders, builder_scanner.current_builders)
+
+    def test_scheduleScan(self):
+        # Test that scheduleScan calls the "scan" method.
+        builder_scanner = self._getScanner()
+        builder_scanner.SCAN_INTERVAL = 0
+
+        builder_scanner.scan = FakeMethod()
+        builder_scanner.scheduleScan()
+
+        def assertCalled():
+            self.failIf(
+                builder_scanner.scan.call_count == 0,
+                "scheduleScan did not schedule anything")
+
+        reactor.callLater(0, assertCalled)
+
+    def test_checkForNewBuilders(self):
+        # Test that checkForNewBuilders() detects a new builder
+
+        # The basic case, where no builders are added.
+        builder_scanner = self._getScanner()
+        self.failUnlessIdentical(None, builder_scanner.checkForNewBuilders())
+
+        # Add two builders and ensure they're returned.
+        new_builders = ["scooby", "lassie"]
+        factory = LaunchpadObjectFactory()
+        for builder_name in new_builders:
+            factory.makeBuilder(name=builder_name)
+        found = builder_scanner.checkForNewBuilders()
+        self.failUnlessEqual(sorted(new_builders), sorted(found))
+
+    def test_scan(self):
+        # See if scan detects new builders and schedules the next scan.
+
+        # stub out the addScanForBuilders and scheduleScan methods since
+        # they use callLater; we only want to assert that they get
+        # called.
+        def fake_checkForNewBuilders():
+            return "new_builders"
+
+        def fake_addScanForBuilders(new_builders):
+            self.failUnlessEqual("new_builders", new_builders)
+
+        def assertCalled():
+            self.failIf(
+                builder_scanner.scheduleScan.call_count == 0,
+                "scheduleScan did not get called")
+
+        builder_scanner = self._getScanner(BuilddManager())
+        builder_scanner.checkForNewBuilders = fake_checkForNewBuilders
+        builder_scanner.manager.addScanForBuilders = fake_addScanForBuilders
+        builder_scanner.scheduleScan = FakeMethod()
+
+        reactor.callLater(0, builder_scanner.scan)
+        reactor.callLater(0, assertCalled)
+
+
 class TestBuilddManagerScript(unittest.TestCase):
 
     layer = LaunchpadScriptLayer

