← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/parallel-germinate into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/parallel-germinate into lp:launchpad.

Commit message:
Parallelise generate-extra-overrides by architecture, cutting about two minutes from each publisher run.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1248867 in Launchpad itself: "Investigate parallelising cron.germinate by arch"
  https://bugs.launchpad.net/launchpad/+bug/1248867

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/parallel-germinate/+merge/220610

Parallelise generate-extra-overrides by architecture, cutting about two minutes from each publisher run.

I considered using the multiprocessing module, but gave up after six hours of fighting obscure errors; forking one child per architecture by hand and collecting its pickled results over a pipe is simple enough.  Note that doing this with threads instead is unlikely to scale well: germinate is CPU-heavy, and Python threads all have to share the GIL.
-- 
https://code.launchpad.net/~cjwatson/launchpad/parallel-germinate/+merge/220610
Your team, Launchpad code reviewers, is requested to review the proposed merge of lp:~cjwatson/launchpad/parallel-germinate into lp:launchpad.
=== modified file 'lib/lp/archivepublisher/scripts/generate_extra_overrides.py'
--- lib/lp/archivepublisher/scripts/generate_extra_overrides.py	2012-11-10 02:21:31 +0000
+++ lib/lp/archivepublisher/scripts/generate_extra_overrides.py	2014-05-22 10:42:13 +0000
@@ -1,4 +1,4 @@
-# Copyright 2011-2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2011-2014 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Generate extra overrides using Germinate."""
@@ -8,12 +8,16 @@
     'GenerateExtraOverrides',
     ]
 
+import copy
 from functools import partial
 import glob
 import logging
 from optparse import OptionValueError
 import os
+import pickle
 import re
+from StringIO import StringIO
+import traceback
 
 from germinate.archive import TagFile
 from germinate.germinator import Germinator
@@ -54,6 +58,21 @@
             os.rename("%s.new" % self.filename, self.filename)
 
 
+class BufferHandler(logging.Handler):
+    """A log handler which stores records for emission by another logger."""
+
+    def __init__(self):
+        super(BufferHandler, self).__init__()
+        self.records = []
+
+    def emit(self, record):
+        # Record arguments may not be available at the other end.
+        record_copy = copy.copy(record)
+        record_copy.msg = record.getMessage()
+        record_copy.args = None
+        self.records.append(pickle.dumps(record_copy, -1))
+
+
 def find_operable_series(distribution):
     """Find all the series we can operate on in this distribution.
 
@@ -133,9 +152,9 @@
         self.germinate_logger.setLevel(logging.INFO)
         self.log_file = os.path.join(
             self.config.germinateroot, "germinate.output")
-        handler = logging.FileHandler(self.log_file, mode="w")
-        handler.setFormatter(GerminateFormatter())
-        self.germinate_logger.addHandler(handler)
+        self.log_handler = logging.FileHandler(self.log_file, mode="w")
+        self.log_handler.setFormatter(GerminateFormatter())
+        self.germinate_logger.addHandler(self.log_handler)
         self.germinate_logger.propagate = False
 
     def setUp(self):
@@ -306,9 +325,15 @@
         if "build-essential" in structure.names and primary_flavour:
             write_overrides("build-essential", "Build-Essential", "yes")
 
-    def germinateArch(self, override_file, series_name, components, arch,
-                      flavours, structures, seed_outputs=None):
+    def germinateArch(self, series_name, components, arch, flavours,
+                      structures):
         """Germinate seeds on all flavours for a single architecture."""
+        # Buffer log output for each architecture so that it appears
+        # sequential.
+        self.germinate_logger.removeHandler(self.log_handler)
+        log_handler = BufferHandler()
+        self.germinate_logger.addHandler(log_handler)
+
         germinator = Germinator(arch)
 
         # Read archive metadata.
@@ -317,6 +342,8 @@
             "file://%s" % self.config.archiveroot, cleanup=True)
         germinator.parse_archive(archive)
 
+        override_file = StringIO()
+        seed_outputs = set()
         for flavour in flavours:
             self.logger.info(
                 "Germinating for %s/%s/%s", flavour, series_name, arch)
@@ -331,6 +358,20 @@
                 structures[flavour], flavour == flavours[0],
                 seed_outputs=seed_outputs)
 
+        return log_handler.records, override_file.getvalue(), seed_outputs
+
+    def germinateArchChild(self, close_in_child, wfd, *args):
+        """Helper method to call germinateArch in a forked child process."""
+        try:
+            for fd in close_in_child:
+                os.close(fd)
+            with os.fdopen(wfd, "wb") as writer:
+                pickle.dump(self.germinateArch(*args), writer, -1)
+            return 0
+        except:
+            traceback.print_exc()
+            return 1
+
     def removeStaleOutputs(self, series_name, seed_outputs):
         """Remove stale outputs for a series.
 
@@ -348,15 +389,35 @@
             series_name, flavours, seed_bases=seed_bases)
 
         if structures:
+            procs = []
+            close_in_child = []
+            for arch in architectures:
+                rfd, wfd = os.pipe()
+                close_in_child.append(rfd)
+                pid = os.fork()
+                if pid == 0:  # child
+                    os._exit(self.germinateArchChild(
+                        close_in_child, wfd,
+                        series_name, components, arch, flavours, structures))
+                else:  # parent
+                    os.close(wfd)
+                    reader = os.fdopen(rfd, "rb")
+                    procs.append((pid, reader))
+
             seed_outputs = set()
             override_path = os.path.join(
                 self.config.miscroot,
                 "more-extra.override.%s.main" % series_name)
             with AtomicFile(override_path) as override_file:
-                for arch in architectures:
-                    self.germinateArch(
-                        override_file, series_name, components, arch,
-                        flavours, structures, seed_outputs=seed_outputs)
+                for pid, reader in procs:
+                    log_records, overrides, arch_seed_outputs = pickle.load(
+                        reader)
+                    for log_record in log_records:
+                        self.germinate_logger.handle(pickle.loads(log_record))
+                    override_file.write(overrides)
+                    seed_outputs |= arch_seed_outputs
+                    reader.close()
+                    os.waitpid(pid, 0)
             self.removeStaleOutputs(series_name, seed_outputs)
 
     def process(self, seed_bases=None):

=== modified file 'lib/lp/archivepublisher/tests/test_generate_extra_overrides.py'
--- lib/lp/archivepublisher/tests/test_generate_extra_overrides.py	2013-09-24 05:45:06 +0000
+++ lib/lp/archivepublisher/tests/test_generate_extra_overrides.py	2014-05-22 10:42:13 +0000
@@ -1,4 +1,4 @@
-# Copyright 2011-2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2011-2014 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Test for the `generate-extra-overrides` script."""
@@ -9,7 +9,6 @@
 import logging
 from optparse import OptionValueError
 import os
-import tempfile
 
 from germinate import (
     archive,
@@ -356,13 +355,10 @@
             distroseries.name, flavours,
             seed_bases=["file://%s" % self.seeddir])
 
-        override_fd, override_path = tempfile.mkstemp()
-        with os.fdopen(override_fd, "w") as override_file:
-            script.germinateArch(
-                override_file, distroseries.name,
-                script.getComponents(distroseries), arch, flavours,
-                structures)
-        return file_contents(override_path).splitlines()
+        _, overrides, _ = script.germinateArch(
+            distroseries.name, script.getComponents(distroseries), arch,
+            flavours, structures)
+        return overrides.splitlines()
 
     def test_germinate_output(self):
         # A single call to germinateArch produces output for all flavours on


Follow-ups