← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~stub/launchpad/parallel-garbo into lp:launchpad

 

Stuart Bishop has proposed merging lp:~stub/launchpad/parallel-garbo into lp:launchpad with lp:~stub/launchpad/garbo-bulk-pruner as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~stub/launchpad/parallel-garbo/+merge/55598

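Run garbo tasks in parallel, using a pool of worker threads (one per CPU by default, configurable with the new --threads option).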
-- 
https://code.launchpad.net/~stub/launchpad/parallel-garbo/+merge/55598
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/parallel-garbo into lp:launchpad.
=== modified file 'lib/lp/scripts/garbo.py'
--- lib/lp/scripts/garbo.py	2011-03-30 16:47:22 +0000
+++ lib/lp/scripts/garbo.py	2011-03-30 16:47:23 +0000
@@ -14,9 +14,11 @@
     timedelta,
     )
 from fixtures import TempDir
+import multiprocessing
 import os
 import signal
 import subprocess
+import threading
 import time
 
 from psycopg2 import IntegrityError
@@ -134,16 +136,23 @@
     # See `TunableLoop`. May be overridden.
     maximum_chunk_size = 10000
 
+    _unique_counter = 0
+
     def __init__(self, log, abort_time=None):
         super(BulkPruner, self).__init__(log, abort_time)
 
         self.store = IMasterStore(self.target_table_class)
         self.target_table_name = self.target_table_class.__storm_table__
 
+        # Bump the shared class counter (self.x += 1 would shadow it).
+        BulkPruner._unique_counter += 1
+        self.cursor_name = 'bulkprunerid_%s_%d' % (
+            self.__class__.__name__.lower(), BulkPruner._unique_counter)
+
         # Open the cursor.
         self.store.execute(
-            "DECLARE bulkprunerid NO SCROLL CURSOR WITH HOLD FOR %s"
-            % self.ids_to_prune_query)
+            "DECLARE %s NO SCROLL CURSOR WITH HOLD FOR %s"
+            % (self.cursor_name, self.ids_to_prune_query))
 
     _num_removed = None
 
@@ -156,15 +165,16 @@
         result = self.store.execute("""
             DELETE FROM %s WHERE %s IN (
                 SELECT id FROM
-                cursor_fetch('bulkprunerid', %d) AS f(id integer))
-            """
-            % (self.target_table_name, self.target_table_key, chunk_size))
+                cursor_fetch('%s', %d) AS f(id integer))
+            """ % (
+                self.target_table_name, self.target_table_key,
+                self.cursor_name, chunk_size))
         self._num_removed = result.rowcount
         transaction.commit()
 
     def cleanUp(self):
         """See `ITunableLoop`."""
-        self.store.execute("CLOSE bulkprunerid")
+        self.store.execute("CLOSE %s" % self.cursor_name)
 
 
 class POTranslationPruner(BulkPruner):
@@ -894,56 +904,113 @@
             default=False, action="store_true",
             help="Run experimental jobs. Normally this is just for staging.")
         self.parser.add_option("--abort-script",
-            dest="abort_script", default=None, action="store", type="int",
+            dest="abort_script", default=None, action="store", type="float",
             metavar="SECS", help="Abort script after SECS seconds.")
         self.parser.add_option("--abort-task",
-            dest="abort_task", default=None, action="store", type="int",
+            dest="abort_task", default=None, action="store", type="float",
             metavar="SECS", help="Abort a task if it runs over SECS seconds.")
+        self.parser.add_option("--threads",
+            dest="threads", default=multiprocessing.cpu_count(),
+            action="store", type="int", metavar='NUM',
+            help="Run NUM tasks in parallel [Default %d]."
+            % multiprocessing.cpu_count())
 
     def main(self):
         start_time = time.time()
-        failure_count = 0
+
+        # Number of failed tasks. Worker threads may report failures
+        # concurrently, so updates are made while holding failure_lock.
+        self.failure_count = 0
+        failure_lock = threading.Lock()
 
         if self.options.experimental:
-            tunable_loops = (
+            tunable_loops = list(
                 self.tunable_loops + self.experimental_tunable_loops)
         else:
-            tunable_loops = self.tunable_loops
+            tunable_loops = list(self.tunable_loops)
 
         a_very_long_time = 31536000 # 1 year
         abort_task = self.options.abort_task or a_very_long_time
         abort_script = self.options.abort_script or a_very_long_time
 
-        for tunable_loop in tunable_loops:
-            self.logger.info("Running %s" % tunable_loop.__name__)
-
-            if abort_script <= 0:
-                self.logger.warn(
-                    "Script aborted after %d seconds." % abort_script)
+        def worker():
+            """Run queued tunable loops in this thread until none remain."""
+            self.logger.debug(
+                "Worker thread %s running.", threading.currentThread().name)
+            self.login()
+            while True:
+                try:
+                    # list.pop is atomic under the GIL, so no task runs twice.
+                    tunable_loop_class = tunable_loops.pop(0)
+                except IndexError:
+                    break
+
+                if start_time + abort_script - time.time() <= 0:
+                    # Exit silently. We warn later.
+                    self.logger.debug(
+                        "Worker thread %s detected script timeout.",
+                        threading.currentThread().name)
+                    break
+
+                self.logger.info("Running %s", tunable_loop_class.__name__)
+
+                abort_time = min(
+                    abort_task,
+                    abort_script + start_time - time.time())
+
+                try:
+                    # Construct inside the try block: constructors may
+                    # touch the database (BulkPruner declares a cursor)
+                    # and an exception escaping here would end this
+                    # thread without being counted as a failure.
+                    tunable_loop = tunable_loop_class(
+                        abort_time=abort_time, log=self.logger)
+
+                    if self._maximum_chunk_size is not None:
+                        tunable_loop.maximum_chunk_size = (
+                            self._maximum_chunk_size)
+
+                    tunable_loop.run()
+                    self.logger.debug(
+                        "%s completed successfully.",
+                        tunable_loop_class.__name__)
+                except Exception:
+                    self.logger.exception("Unhandled exception")
+                    with failure_lock:
+                        self.failure_count += 1
+                finally:
+                    transaction.abort()
+
+        # Start at least one worker; with none, no task would run and
+        # the script would still report success.
+        threads = set()
+        for count in range(max(1, self.options.threads)):
+            thread = threading.Thread(
+                target=worker, name='Worker-%d' % (count + 1,))
+            thread.start()
+            threads.add(thread)
+
+        # Block until all the worker threads have completed. We block
+        # until the script timeout is hit, plus 60 seconds. We wait the
+        # extra time because the loops are supposed to shut themselves
+        # down when the script timeout is hit, and the extra time is to
+        # give them a chance to clean up.
+        # NOTE(review): the workers are not daemon threads, so the
+        # process still waits for any that outlive these joins.
+        for thread in threads:
+            time_to_go = start_time + abort_script - time.time() + 60
+            if time_to_go > 0:
+                thread.join(time_to_go)
+            else:
                 break
 
-            abort_time = min(
-                abort_task, abort_script + start_time - time.time())
-
-            tunable_loop = tunable_loop(
-                abort_time=abort_time, log=self.logger)
-
-            if self._maximum_chunk_size is not None:
-                tunable_loop.maximum_chunk_size = self._maximum_chunk_size
-
-            try:
-                tunable_loop.run()
-            except (KeyboardInterrupt, SystemExit):
-                raise
-            except:
-                if not self.continue_on_failure:
-                    raise
-                self.logger.exception("Unhandled exception")
-                failure_count += 1
-                transaction.abort()
-            transaction.abort()
-        if failure_count:
-            raise SilentLaunchpadScriptFailure(failure_count)
+        # If the script ran out of time, warn.
+        if start_time + abort_script - time.time() < 0:
+            self.logger.warn(
+                "Script aborted after %d seconds.", abort_script)
+
+        if self.failure_count:
+            raise SilentLaunchpadScriptFailure(self.failure_count)
 
 
 class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):

=== modified file 'lib/lp/services/scripts/base.py'
--- lib/lp/services/scripts/base.py	2011-03-29 00:11:57 +0000
+++ lib/lp/services/scripts/base.py	2011-03-30 16:47:23 +0000
@@ -227,7 +227,7 @@
     # Convenience or death
     #
     @log_unhandled_exception_and_exit
-    def login(self, user):
+    def login(self, user=ANONYMOUS):
         """Super-convenience method that avoids the import."""
         setupInteractionByEmail(user)
 


Follow ups