← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~jelmer/launchpad/128259-async-1 into lp:launchpad/devel

 

Jelmer Vernooij has proposed merging lp:~jelmer/launchpad/128259-async-1 into lp:launchpad/devel with lp:~jelmer/launchpad/refactor-uploadprocessor as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers): code
Related bugs:
  #128259 Buildmaster should not call "uploader" script for processing incoming binaries
  https://bugs.launchpad.net/bugs/128259


This branch adds a --builds option to process-upload. 

This new option makes process-upload the directory name in which the upload was found as <buildid>-<buildqueuerecordid>, and update the build status appropriately after processing the upload. This is the name format that is already used by the buildd manager.

This makes it possible for uploads of builds to no longer happen synchronously inside of the buildd manager, but rather for the buildd manager to move them into a separate queue where they can be processed by a process-upload independently without blocking the buildd manager.
-- 
https://code.launchpad.net/~jelmer/launchpad/128259-async-1/+merge/32439
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jelmer/launchpad/128259-async-1 into lp:launchpad/devel.
=== modified file 'lib/lp/archiveuploader/tests/test_uploadprocessor.py'
--- lib/lp/archiveuploader/tests/test_uploadprocessor.py	2010-08-12 13:42:54 +0000
+++ lib/lp/archiveuploader/tests/test_uploadprocessor.py	2010-08-12 13:43:14 +0000
@@ -25,7 +25,8 @@
 from lp.app.errors import NotFoundError
 from lp.archiveuploader.uploadpolicy import (AbstractUploadPolicy,
     findPolicyByOptions)
-from lp.archiveuploader.uploadprocessor import UploadProcessor
+from lp.archiveuploader.uploadprocessor import (UploadProcessor,
+    parse_build_upload_leaf_name)
 from lp.buildmaster.interfaces.buildbase import BuildStatus
 from canonical.config import config
 from canonical.database.constants import UTC_NOW
@@ -125,6 +126,7 @@
 
         self.options = MockOptions()
         self.options.base_fsroot = self.queue_folder
+        self.options.builds = True
         self.options.leafname = None
         self.options.distro = "ubuntu"
         self.options.distroseries = None
@@ -148,7 +150,7 @@
             return findPolicyByOptions(self.options)
         return UploadProcessor(
             self.options.base_fsroot, self.options.dryrun,
-            self.options.nomails,
+            self.options.nomails, self.options.builds,
             self.options.keep, getPolicy, txn, self.log)
 
     def publishPackage(self, packagename, version, source=True,
@@ -434,7 +436,7 @@
             # Move it
             self.options.base_fsroot = testdir
             up = self.getUploadProcessor(None)
-            up.moveUpload(upload, target_name)
+            up.moveUpload(upload, target_name, self.log)
 
             # Check it moved
             self.assertTrue(os.path.exists(os.path.join(target, upload_name)))
@@ -455,7 +457,7 @@
             # Remove it
             self.options.base_fsroot = testdir
             up = self.getUploadProcessor(None)
-            up.moveProcessedUpload(upload, "accepted")
+            up.moveProcessedUpload(upload, "accepted", self.log)
 
             # Check it was removed, not moved
             self.assertFalse(os.path.exists(os.path.join(
@@ -478,7 +480,7 @@
             # Move it
             self.options.base_fsroot = testdir
             up = self.getUploadProcessor(None)
-            up.moveProcessedUpload(upload, "rejected")
+            up.moveProcessedUpload(upload, "rejected", self.log)
 
             # Check it moved
             self.assertTrue(os.path.exists(os.path.join(testdir,
@@ -501,7 +503,7 @@
             # Remove it
             self.options.base_fsroot = testdir
             up = self.getUploadProcessor(None)
-            up.removeUpload(upload)
+            up.removeUpload(upload, self.log)
 
             # Check it was removed, not moved
             self.assertFalse(os.path.exists(os.path.join(
@@ -1313,6 +1315,7 @@
         used.
         That exception will then initiate the creation of an OOPS report.
         """
+        self.options.builds = False
         processor = self.getUploadProcessor(self.layer.txn)
 
         upload_dir = self.queueUpload("foocomm_1.0-1_proposed")
@@ -1822,3 +1825,101 @@
         pubrec.datepublished = UTC_NOW
         queue_item.setDone()
         self.PGPSignatureNotPreserved(archive=self.breezy.main_archive)
+
+
+class TestBuildUploadProcessor(TestUploadProcessorBase):
+    """Test that processing build uploads works."""
+
+    def setUp(self):
+        super(TestBuildUploadProcessor, self).setUp()
+        self.uploadprocessor = self.setupBreezyAndGetUploadProcessor()
+
+    def testInvalidLeafName(self):
+        upload_dir = self.queueUpload("bar_1.0-1")
+        self.uploadprocessor.processBuildUpload(upload_dir, "bar_1.0-1")
+        self.assertLogContains('Unable to extract build id from leaf '
+                               'name bar_1.0-1, skipping.')
+
+    def testNoBuildEntry(self):
+        upload_dir = self.queueUpload("bar_1.0-1", queue_entry="42-60")
+        self.assertRaises(NotFoundError, self.uploadprocessor.processBuildUpload,
+                upload_dir, "42-60")
+
+    def testNoFiles(self):
+        # Upload a source package
+        upload_dir = self.queueUpload("bar_1.0-1")
+        self.processUpload(self.uploadprocessor, upload_dir)
+        source_pub = self.publishPackage('bar', '1.0-1')
+        [build] = source_pub.createMissingBuilds()
+
+        # Move the source from the accepted queue.
+        [queue_item] = self.breezy.getQueueItems(
+            status=PackageUploadStatus.ACCEPTED,
+            version="1.0-1", name="bar")
+        queue_item.setDone()
+
+        build.jobStarted()
+        build.builder = self.factory.makeBuilder()
+
+        # Upload and accept a binary for the primary archive source.
+        shutil.rmtree(upload_dir)
+        self.layer.txn.commit()
+        leaf_name = "%d-%d" % (build.id, 60)
+        os.mkdir(os.path.join(self.incoming_folder, leaf_name))
+        self.options.context = 'buildd'
+        self.options.builds = True
+        self.uploadprocessor.processBuildUpload(self.incoming_folder, leaf_name)
+        self.layer.txn.commit()
+        self.assertEquals(BuildStatus.FAILEDTOUPLOAD, build.status)
+        log_contents = build.upload_log.read()
+        self.assertTrue('ERROR: Exception while processing upload '
+            in log_contents)
+        self.assertTrue('DEBUG: Moving upload directory '
+            in log_contents)
+
+    def testSuccess(self):
+        # Upload a source package
+        upload_dir = self.queueUpload("bar_1.0-1")
+        self.processUpload(self.uploadprocessor, upload_dir)
+        source_pub = self.publishPackage('bar', '1.0-1')
+        [build] = source_pub.createMissingBuilds()
+
+        # Move the source from the accepted queue.
+        [queue_item] = self.breezy.getQueueItems(
+            status=PackageUploadStatus.ACCEPTED,
+            version="1.0-1", name="bar")
+        queue_item.setDone()
+
+        build.jobStarted()
+        build.builder = self.factory.makeBuilder()
+
+        # Upload and accept a binary for the primary archive source.
+        shutil.rmtree(upload_dir)
+        self.layer.txn.commit()
+        leaf_name = "%d-%d" % (build.id, 60)
+        upload_dir = self.queueUpload("bar_1.0-1_binary",
+                queue_entry=leaf_name)
+        self.options.context = 'buildd'
+        self.options.builds = True
+        self.uploadprocessor.processBuildUpload(self.incoming_folder, leaf_name)
+        self.layer.txn.commit()
+        self.assertEquals(BuildStatus.FULLYBUILT, build.status)
+        log_lines = build.upload_log.read().splitlines()
+        self.assertTrue(
+            'INFO: Processing upload bar_1.0-1_i386.changes' in log_lines)
+        self.assertTrue(
+            'INFO: Committing the transaction and any mails associated with'
+            'this upload.')
+
+
+class ParseBuildUploadLeafNameTests(TestCase):
+    """Tests for parse_build_upload_leaf_name."""
+
+    def test_valid(self):
+        self.assertEquals((42, 60), parse_build_upload_leaf_name("42-60"))
+
+    def test_invalid_chars(self):
+        self.assertRaises(ValueError, parse_build_upload_leaf_name, "a42-460")
+
+    def test_no_dash(self):
+        self.assertRaises(ValueError, parse_build_upload_leaf_name, "32")

=== modified file 'lib/lp/archiveuploader/uploadpolicy.py'
--- lib/lp/archiveuploader/uploadpolicy.py	2010-06-23 15:54:45 +0000
+++ lib/lp/archiveuploader/uploadpolicy.py	2010-08-12 13:43:14 +0000
@@ -299,7 +299,8 @@
     def setOptions(self, options):
         AbstractUploadPolicy.setOptions(self, options)
         # We require a buildid to be provided
-        if getattr(options, 'buildid', None) is None:
+        if (getattr(options, 'buildid', None) is None and
+            not getattr(options, 'builds', False)):
             raise UploadPolicyError("BuildID required for buildd context")
 
     def policySpecificChecks(self, upload):

=== modified file 'lib/lp/archiveuploader/uploadprocessor.py'
--- lib/lp/archiveuploader/uploadprocessor.py	2010-08-12 13:42:54 +0000
+++ lib/lp/archiveuploader/uploadprocessor.py	2010-08-12 13:43:14 +0000
@@ -47,7 +47,9 @@
 
 __metaclass__ = type
 
+import datetime
 import os
+import pytz
 import shutil
 import stat
 import sys
@@ -61,9 +63,12 @@
     NascentUpload, FatalUploadError, EarlyReturnUploadError)
 from lp.archiveuploader.uploadpolicy import (
     UploadPolicyError)
+from lp.buildmaster.interfaces.buildbase import BuildStatus
 from lp.soyuz.interfaces.archive import IArchiveSet, NoSuchPPA
+from lp.soyuz.interfaces.binarypackagebuild import IBinaryPackageBuildSet
 from lp.registry.interfaces.distribution import IDistributionSet
 from lp.registry.interfaces.person import IPersonSet
+from canonical.launchpad.scripts.logger import BufferLogger
 from canonical.launchpad.webapp.errorlog import (
     ErrorReportingUtility, ScriptRequest)
 
@@ -71,6 +76,7 @@
 
 __all__ = [
     'UploadProcessor',
+    'parse_build_upload_leaf_name',
     'parse_upload_path',
     ]
 
@@ -84,6 +90,19 @@
 """)
 
 
+def parse_build_upload_leaf_name(name):
+    """Parse the leaf directory name of a build upload.
+
+    :param name: Directory name.
+    :return: Tuple with build id and build queue record id.
+    """
+    (build_id_str, queue_record_str) = name.split("-", 1)
+    try:
+        return int(build_id_str), int(queue_record_str)
+    except TypeError:
+        raise ValueError
+
+
 class UploadStatusEnum:
     """Possible results from processing an upload.
 
@@ -108,8 +127,8 @@
 class UploadProcessor:
     """Responsible for processing uploads. See module docstring."""
 
-    def __init__(self, base_fsroot, dry_run, no_mails, keep, policy_for_distro,
-                 ztm, log):
+    def __init__(self, base_fsroot, dry_run, no_mails, builds, keep,
+                 policy_for_distro, ztm, log):
         """Create a new upload processor.
 
         :param base_fsroot: Root path for queue to use
@@ -128,6 +147,7 @@
         self.last_processed_upload = None
         self.log = log
         self.no_mails = no_mails
+        self.builds = builds
         self._getPolicyForDistro = policy_for_distro
         self.ztm = ztm
 
@@ -140,6 +160,7 @@
         This method also creates the 'incoming', 'accepted', 'rejected', and
         'failed' directories inside the base_fsroot if they don't yet exist.
         """
+        self.ztm.commit()
         try:
             self.log.debug("Beginning processing")
 
@@ -159,11 +180,63 @@
                     self.log.debug("Skipping %s -- does not match %s" % (
                         upload, leaf_name))
                     continue
-                self.processUpload(fsroot, upload)
+                if self.builds:
+                    # Upload directories contain build results,
+                    # directories are named after build ids.
+                    self.processBuildUpload(fsroot, upload)
+                else:
+                    self.processUpload(fsroot, upload)
         finally:
             self.log.debug("Rolling back any remaining transactions.")
             self.ztm.abort()
 
+    def processBuildUpload(self, fsroot, upload):
+        """Process an upload that is the result of a build.
+
+        The name of the leaf is the build id of the build.
+        Build uploads always contain a single package per leaf.
+        """
+        try:
+            (build_id, build_queue_record_id) = parse_build_upload_leaf_name(
+                upload)
+        except ValueError:
+            self.log.warn("Unable to extract build id from leaf name %s,"
+                " skipping." % upload)
+            return
+        build = getUtility(IBinaryPackageBuildSet).getByBuildID(int(build_id))
+        self.log.debug("Build %s found" % build.id)
+        logger = BufferLogger()
+        upload_path = os.path.join(fsroot, upload)
+        try:
+            [changes_file] = self.locateChangesFiles(upload_path)
+            logger.debug("Considering changefile %s" % changes_file)
+            result = self.processChangesFile(upload_path, changes_file,
+                    logger)
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except:
+            info = sys.exc_info()
+            message = (
+                'Exception while processing upload %s' % upload_path)
+            properties = [('error-explanation', message)]
+            request = ScriptRequest(properties)
+            error_utility = ErrorReportingUtility()
+            error_utility.raising(info, request)
+            logger.error('%s (%s)' % (message, request.oopsid))
+            result = UploadStatusEnum.FAILED
+        destination = {
+            UploadStatusEnum.FAILED: "failed",
+            UploadStatusEnum.REJECTED: "rejected",
+            UploadStatusEnum.ACCEPTED: "accepted"}[result]
+        self.moveProcessedUpload(upload_path, destination, logger)
+        if not (result == UploadStatusEnum.ACCEPTED and
+                build.verifySuccessfulUpload() and
+                build.status == BuildStatus.FULLYBUILT):
+            build.status = BuildStatus.FAILEDTOUPLOAD
+            build.date_finished = datetime.datetime.now(pytz.UTC)
+            build.notify(extra_info="Uploading build %s failed." % upload)
+        build.storeUploadLog(logger.buffer.getvalue())
+
     def processUpload(self, fsroot, upload):
         """Process an upload's changes files, and move it to a new directory.
 
@@ -184,7 +257,8 @@
         for changes_file in changes_files:
             self.log.debug("Considering changefile %s" % changes_file)
             try:
-                result = self.processChangesFile(upload_path, changes_file)
+                result = self.processChangesFile(upload_path, changes_file,
+                        self.log)
                 if result == UploadStatusEnum.FAILED:
                     some_failed = True
                 elif result == UploadStatusEnum.REJECTED:
@@ -214,8 +288,7 @@
             # There were no changes files at all. We consider
             # the upload to be failed in this case.
             destination = "failed"
-
-        self.moveProcessedUpload(upload_path, destination)
+        self.moveProcessedUpload(upload_path, destination, self.log)
 
     def locateDirectories(self, fsroot):
         """Return a list of upload directories in a given queue.
@@ -278,7 +351,7 @@
                         os.path.join(relative_path, filename))
         return self.orderFilenames(changes_files)
 
-    def processChangesFile(self, upload_path, changes_file):
+    def processChangesFile(self, upload_path, changes_file, logger=None):
         """Process a single changes file.
 
         This is done by obtaining the appropriate upload policy (according
@@ -295,6 +368,8 @@
         Returns a value from UploadStatusEnum, or re-raises an exception
         from NascentUpload.
         """
+        if logger is None:
+            logger = self.log
         # Calculate the distribution from the path within the upload
         # Reject the upload since we could not process the path,
         # Store the exception information as a rejection message.
@@ -331,7 +406,7 @@
                          "Please check the documentation at "
                          "https://help.launchpad.net/Packaging/PPA#Uploading "
                          "and update your configuration.")))
-        self.log.debug("Finding fresh policy")
+        logger.debug("Finding fresh policy")
         policy = self._getPolicyForDistro(distribution)
         policy.archive = archive
 
@@ -345,8 +420,12 @@
         # The path we want for NascentUpload is the path to the folder
         # containing the changes file (and the other files referenced by it).
         changesfile_path = os.path.join(upload_path, changes_file)
+<<<<<<< TREE
         upload = NascentUpload.from_changesfile_path(
             changesfile_path, policy, self.log)
+=======
+        upload = NascentUpload(changesfile_path, policy, logger)
+>>>>>>> MERGE-SOURCE
 
         # Reject source upload to buildd upload paths.
         first_path = relative_path.split(os.path.sep)[0]
@@ -358,7 +437,7 @@
                 "Invalid upload path (%s) for this policy (%s)" %
                 (relative_path, policy.name))
             upload.reject(error_message)
-            self.log.error(error_message)
+            logger.error(error_message)
 
         # Reject upload with path processing errors.
         if upload_path_error is not None:
@@ -368,7 +447,7 @@
         self.last_processed_upload = upload
 
         try:
-            self.log.info("Processing upload %s" % upload.changes.filename)
+            logger.info("Processing upload %s" % upload.changes.filename)
             result = UploadStatusEnum.ACCEPTED
 
             try:
@@ -376,11 +455,11 @@
             except UploadPolicyError, e:
                 upload.reject("UploadPolicyError escaped upload.process: "
                               "%s " % e)
-                self.log.debug("UploadPolicyError escaped upload.process",
+                logger.debug("UploadPolicyError escaped upload.process",
                                exc_info=True)
             except FatalUploadError, e:
                 upload.reject("UploadError escaped upload.process: %s" % e)
-                self.log.debug("UploadError escaped upload.process",
+                logger.debug("UploadError escaped upload.process",
                                exc_info=True)
             except (KeyboardInterrupt, SystemExit):
                 raise
@@ -398,7 +477,7 @@
                 # the new exception will be handled by the caller just like
                 # the one we caught would have been, by failing the upload
                 # with no email.
-                self.log.exception("Unhandled exception processing upload")
+                logger.exception("Unhandled exception processing upload")
                 upload.reject("Unhandled exception processing upload: %s" % e)
 
             # XXX julian 2007-05-25 bug=29744:
@@ -416,20 +495,20 @@
                 successful = upload.do_accept(notify=notify)
                 if not successful:
                     result = UploadStatusEnum.REJECTED
-                    self.log.info("Rejection during accept. "
+                    logger.info("Rejection during accept. "
                                   "Aborting partial accept.")
                     self.ztm.abort()
 
             if upload.is_rejected:
-                self.log.warn("Upload was rejected:")
+                logger.warn("Upload was rejected:")
                 for msg in upload.rejections:
-                    self.log.warn("\t%s" % msg)
+                    logger.warn("\t%s" % msg)
 
             if self.dry_run:
-                self.log.info("Dry run, aborting transaction.")
+                logger.info("Dry run, aborting transaction.")
                 self.ztm.abort()
             else:
-                self.log.info("Committing the transaction and any mails "
+                logger.info("Committing the transaction and any mails "
                               "associated with this upload.")
                 self.ztm.commit()
         except:
@@ -438,49 +517,49 @@
 
         return result
 
-    def removeUpload(self, upload):
+    def removeUpload(self, upload, logger):
         """Remove an upload that has succesfully been processed.
 
         This includes moving the given upload directory and moving the
         matching .distro file, if it exists.
         """
         if self.keep or self.dry_run:
-            self.log.debug("Keeping contents untouched")
+            logger.debug("Keeping contents untouched")
             return
 
         pathname = os.path.basename(upload)
 
-        self.log.debug("Removing upload directory %s", upload)
+        logger.debug("Removing upload directory %s", upload)
         shutil.rmtree(upload)
 
         distro_filename = upload + ".distro"
         if os.path.isfile(distro_filename):
-            self.log.debug("Removing distro file %s", distro_filename)
+            logger.debug("Removing distro file %s", distro_filename)
             os.remove(distro_filename)
 
-    def moveProcessedUpload(self, upload_path, destination):
+    def moveProcessedUpload(self, upload_path, destination, logger):
         """Move or remove the upload depending on the status of the upload.
         """
         if destination == "accepted":
-            self.removeUpload(upload_path)
+            self.removeUpload(upload_path, logger)
         else:
-            self.moveUpload(upload_path, destination)
+            self.moveUpload(upload_path, destination, logger)
 
-    def moveUpload(self, upload, subdir_name):
+    def moveUpload(self, upload, subdir_name, logger):
         """Move the upload to the named subdir of the root, eg 'accepted'.
 
         This includes moving the given upload directory and moving the
         matching .distro file, if it exists.
         """
         if self.keep or self.dry_run:
-            self.log.debug("Keeping contents untouched")
+            logger.debug("Keeping contents untouched")
             return
 
         pathname = os.path.basename(upload)
 
         target_path = os.path.join(
             self.base_fsroot, subdir_name, pathname)
-        self.log.debug("Moving upload directory %s to %s" %
+        logger.debug("Moving upload directory %s to %s" %
             (upload, target_path))
         shutil.move(upload, target_path)
 
@@ -488,7 +567,7 @@
         if os.path.isfile(distro_filename):
             target_path = os.path.join(self.base_fsroot, subdir_name,
                                        os.path.basename(distro_filename))
-            self.log.debug("Moving distro file %s to %s" % (distro_filename,
+            logger.debug("Moving distro file %s to %s" % (distro_filename,
                                                             target_path))
             shutil.move(distro_filename, target_path)
 

=== modified file 'lib/lp/soyuz/scripts/soyuz_process_upload.py'
--- lib/lp/soyuz/scripts/soyuz_process_upload.py	2010-08-12 13:42:54 +0000
+++ lib/lp/soyuz/scripts/soyuz_process_upload.py	2010-08-12 13:43:14 +0000
@@ -35,6 +35,11 @@
             help="Whether to suppress the sending of mails or not.")
 
         self.parser.add_option(
+            "--builds", action="store_true",
+            dest="builds", default=False,
+            help="Whether to interpret leaf names as build ids.")
+
+        self.parser.add_option(
             "-J", "--just-leaf", action="store", dest="leafname",
             default=None, help="A specific leaf dir to limit to.",
             metavar = "LEAF")


Follow ups