launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #03125
[Merge] lp:~stub/launchpad/parallel-garbo into lp:launchpad
Stuart Bishop has proposed merging lp:~stub/launchpad/parallel-garbo into lp:launchpad with lp:~stub/launchpad/garbo-bulk-pruner as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~stub/launchpad/parallel-garbo/+merge/55598
Run garbo tasks in parallel.
--
https://code.launchpad.net/~stub/launchpad/parallel-garbo/+merge/55598
Your team, Launchpad code reviewers, is requested to review the proposed merge of lp:~stub/launchpad/parallel-garbo into lp:launchpad.
=== modified file 'lib/lp/scripts/garbo.py'
--- lib/lp/scripts/garbo.py 2011-03-30 16:47:22 +0000
+++ lib/lp/scripts/garbo.py 2011-03-30 16:47:23 +0000
@@ -14,9 +14,11 @@
timedelta,
)
from fixtures import TempDir
+import multiprocessing
import os
import signal
import subprocess
+import threading
import time
from psycopg2 import IntegrityError
@@ -134,16 +136,23 @@
# See `TunableLoop`. May be overridden.
maximum_chunk_size = 10000
+ _unique_counter = 0
+
def __init__(self, log, abort_time=None):
super(BulkPruner, self).__init__(log, abort_time)
self.store = IMasterStore(self.target_table_class)
self.target_table_name = self.target_table_class.__storm_table__
+ self._unique_counter += 1
+ self.cursor_name = (
+ 'bulkprunerid_%s_%d'
+ % (self.__class__.__name__, self._unique_counter)).lower()
+
# Open the cursor.
self.store.execute(
- "DECLARE bulkprunerid NO SCROLL CURSOR WITH HOLD FOR %s"
- % self.ids_to_prune_query)
+ "DECLARE %s NO SCROLL CURSOR WITH HOLD FOR %s"
+ % (self.cursor_name, self.ids_to_prune_query))
_num_removed = None
@@ -156,15 +165,16 @@
result = self.store.execute("""
DELETE FROM %s WHERE %s IN (
SELECT id FROM
- cursor_fetch('bulkprunerid', %d) AS f(id integer))
- """
- % (self.target_table_name, self.target_table_key, chunk_size))
+ cursor_fetch('%s', %d) AS f(id integer))
+ """ % (
+ self.target_table_name, self.target_table_key,
+ self.cursor_name, chunk_size))
self._num_removed = result.rowcount
transaction.commit()
def cleanUp(self):
"""See `ITunableLoop`."""
- self.store.execute("CLOSE bulkprunerid")
+ self.store.execute("CLOSE %s" % self.cursor_name)
class POTranslationPruner(BulkPruner):
@@ -894,56 +904,99 @@
default=False, action="store_true",
help="Run experimental jobs. Normally this is just for staging.")
self.parser.add_option("--abort-script",
- dest="abort_script", default=None, action="store", type="int",
+ dest="abort_script", default=None, action="store", type="float",
metavar="SECS", help="Abort script after SECS seconds.")
self.parser.add_option("--abort-task",
- dest="abort_task", default=None, action="store", type="int",
+ dest="abort_task", default=None, action="store", type="float",
metavar="SECS", help="Abort a task if it runs over SECS seconds.")
+ self.parser.add_option("--threads",
+ dest="threads", default=multiprocessing.cpu_count(),
+ action="store", type="int", metavar='NUM',
+ help="Run NUM tasks in parallel [Default %d]."
+ % multiprocessing.cpu_count())
def main(self):
start_time = time.time()
- failure_count = 0
+
+ # Stores the number of failed tasks.
+ self.failure_count = 0
if self.options.experimental:
- tunable_loops = (
+ tunable_loops = list(
self.tunable_loops + self.experimental_tunable_loops)
else:
- tunable_loops = self.tunable_loops
+ tunable_loops = list(self.tunable_loops)
a_very_long_time = 31536000 # 1 year
abort_task = self.options.abort_task or a_very_long_time
abort_script = self.options.abort_script or a_very_long_time
- for tunable_loop in tunable_loops:
- self.logger.info("Running %s" % tunable_loop.__name__)
-
- if abort_script <= 0:
- self.logger.warn(
- "Script aborted after %d seconds." % abort_script)
+ def worker():
+ self.logger.debug(
+ "Worker thread %s running.", threading.currentThread().name)
+ self.login()
+ while True:
+ try:
+ tunable_loop_class = tunable_loops.pop(0)
+ except IndexError:
+ break
+
+ if start_time + abort_script - time.time() <= 0:
+ # Exit silently. We warn later.
+ self.logger.debug(
+ "Worker thread %s detected script timeout.",
+ threading.currentThread().name)
+ break
+
+ self.logger.info("Running %s", tunable_loop_class.__name__)
+
+ abort_time = min(
+ abort_task,
+ abort_script + start_time - time.time())
+
+ tunable_loop = tunable_loop_class(
+ abort_time=abort_time, log=self.logger)
+
+ if self._maximum_chunk_size is not None:
+ tunable_loop.maximum_chunk_size = self._maximum_chunk_size
+
+ try:
+ tunable_loop.run()
+ self.logger.debug(
+ "%s completed sucessfully.",
+ tunable_loop_class.__name__)
+ except Exception:
+ self.logger.exception("Unhandled exception")
+ self.failure_count += 1
+ finally:
+ transaction.abort()
+
+ threads = set()
+ for count in range(0, self.options.threads):
+ thread = threading.Thread(
+ target=worker,name='Worker-%d' % (count+1,))
+ thread.start()
+ threads.add(thread)
+
+ # Wait for the worker threads to complete, but for no longer than
+ # the script timeout plus 60 seconds. The loops are supposed to
+ # shut themselves down when the script timeout is hit; the extra
+ # 60 seconds gives them a chance to clean up.
+ for thread in threads:
+ time_to_go = start_time + abort_script - time.time() + 60
+ if time_to_go > 0:
+ thread.join(time_to_go)
+ else:
break
- abort_time = min(
- abort_task, abort_script + start_time - time.time())
-
- tunable_loop = tunable_loop(
- abort_time=abort_time, log=self.logger)
-
- if self._maximum_chunk_size is not None:
- tunable_loop.maximum_chunk_size = self._maximum_chunk_size
-
- try:
- tunable_loop.run()
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- if not self.continue_on_failure:
- raise
- self.logger.exception("Unhandled exception")
- failure_count += 1
- transaction.abort()
- transaction.abort()
- if failure_count:
- raise SilentLaunchpadScriptFailure(failure_count)
+ # If the script ran out of time, warn.
+ if start_time + abort_script - time.time() < 0:
+ self.logger.warn(
+ "Script aborted after %d seconds.", abort_script)
+
+ if self.failure_count:
+ raise SilentLaunchpadScriptFailure(self.failure_count)
class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
=== modified file 'lib/lp/services/scripts/base.py'
--- lib/lp/services/scripts/base.py 2011-03-29 00:11:57 +0000
+++ lib/lp/services/scripts/base.py 2011-03-30 16:47:23 +0000
@@ -227,7 +227,7 @@
# Convenience or death
#
@log_unhandled_exception_and_exit
- def login(self, user):
+ def login(self, user=ANONYMOUS):
"""Super-convenience method that avoids the import."""
setupInteractionByEmail(user)
Follow ups