[Merge] lp:~wgrant/launchpad/buildd-manager-inlineCallbacks into lp:launchpad


William Grant has proposed merging lp:~wgrant/launchpad/buildd-manager-inlineCallbacks into lp:launchpad.

Commit message:
Port various bits of SlaveScanner and BuilderInteractor to use inlineCallbacks.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~wgrant/launchpad/buildd-manager-inlineCallbacks/+merge/183578

Port various bits of SlaveScanner and BuilderInteractor to use inlineCallbacks, making their logic far easier to understand.
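
For reviewers unfamiliar with the idiom, the transformation is mechanical: a chain of nested callback closures becomes straight-line code. A minimal sketch of the before/after shape, condensed from the isAvailable() change below (FakeSlave and the blanket "except Exception" are illustrative simplifications; the real method traps xmlrpclib.Fault and socket.error):

    import sys

    from twisted.internet import defer


    class FakeSlave:
        """Stand-in for a buildd slave whose status() returns a Deferred."""

        def status(self):
            return defer.succeed(['BuilderStatus.IDLE'])


    # Old style: logic is scattered across closures, and errors travel
    # through a separate errback path.
    def is_available_old(slave):
        d = slave.status()

        def catch_fault(failure):
            return False

        def check_available(status):
            return status[0] == 'BuilderStatus.IDLE'
        return d.addCallbacks(check_available, catch_fault)


    # New style: the same logic reads top to bottom.  yield waits for a
    # Deferred to fire, errors arrive as ordinary exceptions, and the
    # caller still receives a Deferred.
    @defer.inlineCallbacks
    def is_available_new(slave):
        try:
            status = yield slave.status()
        except Exception:
            defer.returnValue(False)
        defer.returnValue(status[0] == 'BuilderStatus.IDLE')


    if __name__ == '__main__':
        for func in (is_available_old, is_available_new):
            func(FakeSlave()).addCallback(
                lambda result: sys.stdout.write('%r\n' % result))

Both calls print True without a running reactor, because the fake returns an already-fired Deferred; the caller-facing contract (a Deferred) is unchanged.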
-- 
https://code.launchpad.net/~wgrant/launchpad/buildd-manager-inlineCallbacks/+merge/183578
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/buildd-manager-inlineCallbacks into lp:launchpad.
=== modified file 'lib/lp/buildmaster/interactor.py'
--- lib/lp/buildmaster/interactor.py	2013-09-02 07:13:06 +0000
+++ lib/lp/buildmaster/interactor.py	2013-09-03 07:44:15 +0000
@@ -23,7 +23,6 @@
 
 from lp.buildmaster.interfaces.builder import (
     BuildDaemonError,
-    BuildSlaveFailure,
     CannotFetchFile,
     CannotResumeHost,
     CorruptBuildCookie,
@@ -209,14 +208,9 @@
         :param args: A dictionary of extra arguments. The contents depend on
             the build job type.
         """
-        d = self._with_timeout(self._server.callRemote(
+        return self._with_timeout(self._server.callRemote(
             'build', buildid, builder_type, chroot_sha1, filemap, args))
 
-        def got_fault(failure):
-            failure.trap(xmlrpclib.Fault)
-            raise BuildSlaveFailure(failure.value)
-        return d.addErrback(got_fault)
-
 
 class BuilderInteractor(object):
 
@@ -272,6 +266,7 @@
             self._cached_currentjob = currentjob
         return self._cached_build_behavior
 
+    @defer.inlineCallbacks
     def slaveStatus(self):
         """Get the slave status for this builder.
 
@@ -280,23 +275,20 @@
             potentially other values included by the current build
             behavior.
         """
-        d = self.slave.status()
-
-        def got_status(status_sentence):
-            status = {'builder_status': status_sentence[0]}
-
-            # Extract detailed status and log information if present.
-            # Although build_id is also easily extractable here, there is no
-            # valid reason for anything to use it, so we exclude it.
-            if status['builder_status'] == 'BuilderStatus.WAITING':
-                status['build_status'] = status_sentence[1]
-            else:
-                if status['builder_status'] == 'BuilderStatus.BUILDING':
-                    status['logtail'] = status_sentence[2]
-            return (status_sentence, status)
-
-        return d.addCallback(got_status)
-
+        status_sentence = yield self.slave.status()
+        status = {'builder_status': status_sentence[0]}
+
+        # Extract detailed status and log information if present.
+        # Although build_id is also easily extractable here, there is no
+        # valid reason for anything to use it, so we exclude it.
+        if status['builder_status'] == 'BuilderStatus.WAITING':
+            status['build_status'] = status_sentence[1]
+        else:
+            if status['builder_status'] == 'BuilderStatus.BUILDING':
+                status['logtail'] = status_sentence[2]
+        defer.returnValue((status_sentence, status))
+
+    @defer.inlineCallbacks
     def isAvailable(self):
         """Whether or not a builder is available for building new jobs.
 
@@ -304,16 +296,12 @@
             whether the builder is available or not.
         """
         if not self.builder.builderok:
-            return defer.succeed(False)
-        d = self.slave.status()
-
-        def catch_fault(failure):
-            failure.trap(xmlrpclib.Fault, socket.error)
-            return False
-
-        def check_available(status):
-            return status[0] == 'BuilderStatus.IDLE'
-        return d.addCallbacks(check_available, catch_fault)
+            defer.returnValue(False)
+        try:
+            status = yield self.slave.status()
+        except (xmlrpclib.Fault, socket.error):
+            defer.returnValue(False)
+        defer.returnValue(status[0] == 'BuilderStatus.IDLE')
 
     def verifySlaveBuildCookie(self, slave_build_cookie):
         """See `IBuildFarmJobBehavior`."""
@@ -323,6 +311,7 @@
         if slave_build_cookie != good_cookie:
             raise CorruptBuildCookie("Invalid slave build cookie.")
 
+    @defer.inlineCallbacks
     def rescueIfLost(self, logger=None):
         """Reset the slave if its job information doesn't match the DB.
 
@@ -344,60 +333,43 @@
             'BuilderStatus.WAITING': 2
             }
 
-        d = self.slave.status()
-
-        def got_status(status_sentence):
-            """After we get the status, clean if we have to.
-
-            Always return status_sentence.
-            """
-            # Isolate the BuilderStatus string, always the first token in
-            # BuilderSlave.status().
-            status = status_sentence[0]
-
-            # If the cookie test below fails, it will request an abort of the
-            # builder.  This will leave the builder in the aborted state and
-            # with no assigned job, and we should now "clean" the slave which
-            # will reset its state back to IDLE, ready to accept new builds.
-            # This situation is usually caused by a temporary loss of
-            # communications with the slave and the build manager had to reset
-            # the job.
-            if (status == 'BuilderStatus.ABORTED'
-                    and self.builder.currentjob is None):
-                if logger is not None:
-                    logger.info(
-                        "Builder '%s' being cleaned up from ABORTED" %
-                        (self.builder.name,))
-                d = self.cleanSlave()
-                return d.addCallback(lambda ignored: status_sentence)
+        # Isolate the BuilderStatus string, always the first token in
+        # BuilderSlave.status().
+        status_sentence = yield self.slave.status()
+        status = status_sentence[0]
+
+        # If the cookie test below fails, it will request an abort of the
+        # builder.  This will leave the builder in the aborted state and
+        # with no assigned job, and we should now "clean" the slave which
+        # will reset its state back to IDLE, ready to accept new builds.
+        # This situation is usually caused by a temporary loss of
+        # communications with the slave and the build manager had to reset
+        # the job.
+        if (status == 'BuilderStatus.ABORTED'
+                and self.builder.currentjob is None):
+            if logger is not None:
+                logger.info(
+                    "Builder '%s' being cleaned up from ABORTED" %
+                    (self.builder.name,))
+            yield self.cleanSlave()
+
+        # If the slave is neither building nor waiting, it's not in
+        # need of rescuing.
+        status = status_sentence[0]
+        if status not in ident_position.keys():
+            return
+        slave_build_id = status_sentence[ident_position[status]]
+        try:
+            self.verifySlaveBuildCookie(slave_build_id)
+        except CorruptBuildCookie as reason:
+            if status == 'BuilderStatus.WAITING':
+                yield self.cleanSlave()
             else:
-                return status_sentence
-
-        def rescue_slave(status_sentence):
-            # If slave is not building nor waiting, it's not in need of
-            # rescuing.
-            status = status_sentence[0]
-            if status not in ident_position.keys():
-                return
-            slave_build_id = status_sentence[ident_position[status]]
-            try:
-                self.verifySlaveBuildCookie(slave_build_id)
-            except CorruptBuildCookie as reason:
-                if status == 'BuilderStatus.WAITING':
-                    d = self.cleanSlave()
-                else:
-                    d = self.requestAbort()
-
-                def log_rescue(ignored):
-                    if logger:
-                        logger.info(
-                            "Builder '%s' rescued from '%s': '%s'" %
-                            (self.builder.name, slave_build_id, reason))
-                return d.addCallback(log_rescue)
-
-        d.addCallback(got_status)
-        d.addCallback(rescue_slave)
-        return d
+                yield self.requestAbort()
+            if logger:
+                logger.info(
+                    "Builder '%s' rescued from '%s': '%s'" %
+                    (self.builder.name, slave_build_id, reason))
 
     def updateStatus(self, logger=None):
         """Update the builder's status by probing it.
@@ -464,6 +436,7 @@
 
         return d.addCallback(got_resume_ok).addErrback(got_resume_bad)
 
+    @defer.inlineCallbacks
     def _startBuild(self, build_queue_item, logger):
         """Start a build on this builder.
 
@@ -481,8 +454,6 @@
                 "Inappropriate IBuildFarmJobBehavior: %r is not a %r" %
                 (self._current_build_behavior, needed_bfjb))
         self._current_build_behavior.logStartBuild(logger)
-
-        # Make sure the request is valid; an exception is raised if it's not.
         self._current_build_behavior.verifyBuildRequest(logger)
 
         # Set the build behavior depending on the provided build queue item.
@@ -490,34 +461,20 @@
             raise BuildDaemonError(
                 "Attempted to start a build on a known-bad builder.")
 
-        # If we are building a virtual build, resume the virtual machine.
+        # If we are building a virtual build, resume the virtual
+        # machine.  Before we try and contact the resumed slave, we're
+        # going to send it a message.  This is to ensure it's accepting
+        # packets from the outside world, because testing has shown that
+        # the first packet will randomly fail for no apparent reason.
+        # This could be a quirk of the Xen guest; we're not sure.  We
+        # also don't care about the result of this message, just that
+        # it's sent.  See bug 586359.
         if self.builder.virtualized:
-            d = self.resumeSlaveHost()
-        else:
-            d = defer.succeed(None)
-
-        def ping_done(ignored):
-            return self._current_build_behavior.dispatchBuildToSlave(
-                build_queue_item.id, logger)
-
-        def resume_done(ignored):
-            # Before we try and contact the resumed slave, we're going
-            # to send it a message.  This is to ensure it's accepting
-            # packets from the outside world, because testing has shown
-            # that the first packet will randomly fail for no apparent
-            # reason.  This could be a quirk of the Xen guest, we're not
-            # sure.  We also don't care about the result from this message,
-            # just that it's sent, hence the "addBoth".
-            # See bug 586359.
-            if self.builder.virtualized:
-                d = self.slave.echo("ping")
-            else:
-                d = defer.succeed(None)
-            d.addBoth(ping_done)
-            return d
-
-        d.addCallback(resume_done)
-        return d
+            yield self.resumeSlaveHost()
+            yield self.slave.echo("ping")
+
+        yield self._current_build_behavior.dispatchBuildToSlave(
+            build_queue_item.id, logger)
 
     def resetOrFail(self, logger, exception):
         """Handle "confirmed" build slave failures.
@@ -556,29 +513,29 @@
                 "Disabling builder: %s -- %s" % (
                     self.builder.url, error_message))
             self.builder.failBuilder(error_message)
+            transaction.commit()
         return defer.succeed(None)
 
+    @defer.inlineCallbacks
     def findAndStartJob(self):
         """Find a job to run and send it to the buildd slave.
 
         :return: A Deferred whose value is the `IBuildQueue` instance
             found or None if no job was found.
         """
+        logger = self._getSlaveScannerLogger()
         # XXX This method should be removed in favour of two separately
         # called methods that find and dispatch the job.  It will
         # require a lot of test fixing.
-        logger = self._getSlaveScannerLogger()
         candidate = self.builder.acquireBuildCandidate()
-
         if candidate is None:
             logger.debug("No build candidates available for builder.")
-            return defer.succeed(None)
-
-        # Using maybeDeferred ensures that any exceptions are also
-        # wrapped up and caught later.
-        d = defer.maybeDeferred(self._startBuild, candidate, logger)
-        return d.addCallback(lambda ignored: candidate)
-
+            defer.returnValue(None)
+
+        yield self._startBuild(candidate, logger)
+        defer.returnValue(candidate)
+
+    @defer.inlineCallbacks
     def updateBuild(self, queueItem):
         """Verify the current build job status.
 
@@ -586,36 +543,21 @@
 
         :return: A Deferred that fires when the slave dialog is finished.
         """
+        builder_status_handlers = {
+            'BuilderStatus.IDLE': self.updateBuild_IDLE,
+            'BuilderStatus.BUILDING': self.updateBuild_BUILDING,
+            'BuilderStatus.ABORTING': self.updateBuild_ABORTING,
+            'BuilderStatus.ABORTED': self.updateBuild_ABORTED,
+            'BuilderStatus.WAITING': self.updateBuild_WAITING,
+            }
+        statuses = yield self.slaveStatus()
         logger = logging.getLogger('slave-scanner')
-
-        d = self.slaveStatus()
-
-        def got_failure(failure):
-            failure.trap(xmlrpclib.Fault, socket.error)
-            info = failure.value
-            info = ("Could not contact the builder %s, caught a (%s)"
-                    % (queueItem.builder.url, info))
-            raise BuildSlaveFailure(info)
-
-        def got_status(statuses):
-            builder_status_handlers = {
-                'BuilderStatus.IDLE': self.updateBuild_IDLE,
-                'BuilderStatus.BUILDING': self.updateBuild_BUILDING,
-                'BuilderStatus.ABORTING': self.updateBuild_ABORTING,
-                'BuilderStatus.ABORTED': self.updateBuild_ABORTED,
-                'BuilderStatus.WAITING': self.updateBuild_WAITING,
-                }
-            status_sentence, status_dict = statuses
-            builder_status = status_dict['builder_status']
-            if builder_status not in builder_status_handlers:
-                raise AssertionError("Unknown status %s" % builder_status)
-            method = builder_status_handlers[builder_status]
-            return defer.maybeDeferred(
-                method, queueItem, status_sentence, status_dict, logger)
-
-        d.addErrback(got_failure)
-        d.addCallback(got_status)
-        return d
+        status_sentence, status_dict = statuses
+        builder_status = status_dict['builder_status']
+        if builder_status not in builder_status_handlers:
+            raise AssertionError("Unknown status %s" % builder_status)
+        method = builder_status_handlers[builder_status]
+        yield method(queueItem, status_sentence, status_dict, logger)
 
     def updateBuild_IDLE(self, queueItem, status_sentence, status_dict,
                          logger):

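One consequence of the port that is easy to miss in review: the defer.maybeDeferred() wrapper around _startBuild() could be dropped because an @defer.inlineCallbacks function never raises synchronously; any exception raised inside it surfaces as a failed Deferred. A standalone demonstration of that guarantee (not code from this branch):

    from twisted.internet import defer


    @defer.inlineCallbacks
    def boom():
        raise RuntimeError('broken')
        yield  # never reached; only makes this function a generator


    d = boom()  # does not raise here
    caught = []
    d.addErrback(caught.append)
    assert caught[0].check(RuntimeError)

The RuntimeError reaches the errback instead of escaping the call, so callers such as findAndStartJob() can simply yield on _startBuild() and let the surrounding try/except or errback chain handle failures.
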
=== modified file 'lib/lp/buildmaster/manager.py'
--- lib/lp/buildmaster/manager.py	2013-09-02 07:30:53 +0000
+++ lib/lp/buildmaster/manager.py	2013-09-03 07:44:15 +0000
@@ -249,6 +249,7 @@
             # value is not None if we resumed a slave host.
             defer.returnValue(value is not None)
 
+    @defer.inlineCallbacks
     def scan(self):
         """Probe the builder and update/dispatch/collect as appropriate.
 
@@ -278,83 +279,53 @@
         self.builder = get_builder(self.builder_name)
         self.interactor = BuilderInteractor(self.builder)
 
-        def status_updated(ignored):
-            # Commit the changes done while possibly rescuing jobs, to
-            # avoid holding table locks.
-            transaction.commit()
-
-            # See if we think there's an active build on the builder.
-            buildqueue = self.builder.currentjob
-
-            # Scan the slave and get the logtail, or collect the build if
-            # it's ready.  Yes, "updateBuild" is a bad name.
-            if buildqueue is not None:
-                return self.interactor.updateBuild(buildqueue)
-
-        def build_updated(ignored):
-            # Commit changes done while updating the build, to avoid
-            # holding table locks.
-            transaction.commit()
-
-            # If the builder is in manual mode, don't dispatch anything.
-            if self.builder.manual:
-                self.logger.debug(
-                    '%s is in manual mode, not dispatching.' %
-                    self.builder.name)
-                return
-
-            # If the builder is marked unavailable, don't dispatch anything.
-            # Additionaly, because builders can be removed from the pool at
-            # any time, we need to see if we think there was a build running
-            # on it before it was marked unavailable. In this case we reset
-            # the build thusly forcing it to get re-dispatched to another
-            # builder.
-
-            return self.interactor.isAvailable().addCallback(got_available)
-
-        def got_available(available):
-            if not available:
-                job = self.builder.currentjob
-                if job is not None and not self.builder.builderok:
-                    self.logger.info(
-                        "%s was made unavailable, resetting attached "
-                        "job" % self.builder.name)
-                    job.reset()
-                    transaction.commit()
-                return
-
-            # See if there is a job we can dispatch to the builder slave.
-
-            d = self.interactor.findAndStartJob()
-
-            def job_started(candidate):
-                if self.builder.currentjob is not None:
-                    # After a successful dispatch we can reset the
-                    # failure_count.
-                    self.builder.resetFailureCount()
-                    transaction.commit()
-                    return self.interactor.slave
-                else:
-                    return None
-            return d.addCallback(job_started)
-
-        def cancellation_checked(cancelled):
+        if self.builder.builderok:
+            cancelled = yield self.checkCancellation(self.builder)
             if cancelled:
-                return defer.succeed(None)
-            d = self.interactor.updateStatus(self.logger)
-            d.addCallback(status_updated)
-            d.addCallback(build_updated)
-            return d
-
-        if self.builder.builderok:
-            d = self.checkCancellation(self.builder)
-            d.addCallback(cancellation_checked)
-        else:
-            d = defer.succeed(None)
-            d.addCallback(status_updated)
-            d.addCallback(build_updated)
-
-        return d
+                return
+            yield self.interactor.updateStatus(self.logger)
+
+        # Commit the changes done while possibly rescuing jobs, to
+        # avoid holding table locks.
+        transaction.commit()
+
+        buildqueue = self.builder.currentjob
+        if buildqueue is not None:
+            # Scan the slave and get the logtail, or collect the build
+            # if it's ready.  Yes, "updateBuild" is a bad name.
+            yield self.interactor.updateBuild(buildqueue)
+
+        # If the builder is in manual mode, don't dispatch anything.
+        if self.builder.manual:
+            self.logger.debug(
+                '%s is in manual mode, not dispatching.' %
+                self.builder.name)
+            return
+
+        # If the builder is marked unavailable, don't dispatch anything.
+        # Additionally, because builders can be removed from the pool at
+        # any time, we need to see if we think there was a build running
+        # on it before it was marked unavailable. In this case we reset
+        # the build, thus forcing it to be re-dispatched to another
+        # builder.
+        available = yield self.interactor.isAvailable()
+        if not available:
+            job = self.builder.currentjob
+            if job is not None and not self.builder.builderok:
+                self.logger.info(
+                    "%s was made unavailable, resetting attached "
+                    "job" % self.builder.name)
+                job.reset()
+                transaction.commit()
+            return
+
+        # See if there is a job we can dispatch to the builder slave.
+        yield self.interactor.findAndStartJob()
+        if self.builder.currentjob is not None:
+            # After a successful dispatch we can reset the
+            # failure_count.
+            self.builder.resetFailureCount()
+            transaction.commit()
 
 
 class NewBuildersScanner:

=== modified file 'lib/lp/buildmaster/tests/test_manager.py'
--- lib/lp/buildmaster/tests/test_manager.py	2013-09-02 07:30:53 +0000
+++ lib/lp/buildmaster/tests/test_manager.py	2013-09-03 07:44:15 +0000
@@ -128,14 +128,7 @@
 
         return scanner
 
-    def _checkDispatch(self, slave, builder):
-        # SlaveScanner.scan returns a slave when a dispatch was
-        # successful.  We also check that the builder has a job on it.
-
-        self.assertTrue(slave is not None, "Expected a slave.")
-        self.assertEqual(0, builder.failure_count)
-        self.assertTrue(builder.currentjob is not None)
-
+    @defer.inlineCallbacks
     def testScanDispatchForResetBuilder(self):
         # A job gets dispatched to the sampledata builder after it's reset.
 
@@ -150,9 +143,9 @@
         # Run 'scan' and check its result.
         switch_dbuser(config.builddmaster.dbuser)
         scanner = self._getScanner()
-        d = defer.maybeDeferred(scanner.scan)
-        d.addCallback(self._checkDispatch, builder)
-        return d
+        yield scanner.scan()
+        self.assertEqual(0, builder.failure_count)
+        self.assertTrue(builder.currentjob is not None)
 
     def _checkNoDispatch(self, slave, builder):
         """Assert that no dispatch has occurred.


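A note on the test changes: trial waits on a Deferred returned by a test method, and @defer.inlineCallbacks produces exactly that, so assertions placed after a yield run only once the yielded work has finished. A minimal standalone example of the shape (FakeScanner is illustrative, not the test double used in this branch):

    from twisted.internet import defer
    from twisted.trial import unittest


    class FakeScanner:
        def scan(self):
            return defer.succeed(None)


    class TestScan(unittest.TestCase):

        @defer.inlineCallbacks
        def test_scan_completes(self):
            # trial keeps the test open until the Deferred built by
            # inlineCallbacks from this generator fires.
            result = yield FakeScanner().scan()
            self.assertEqual(None, result)

Run it with "trial modulename"; the test is reported as passing only after the Deferred fires.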