← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~stub/launchpad/garbo into lp:launchpad

 

Stuart Bishop has proposed merging lp:~stub/launchpad/garbo into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #721195 in Launchpad itself: "Remove garbage from POTranslation"
  https://bugs.launchpad.net/launchpad/+bug/721195

For more details, see:
https://code.launchpad.net/~stub/launchpad/garbo/+merge/56749

Now that garbo tasks can be run in parallel, the default static timeouts no longer make sense. Instead, calculate the default task timeout at run time.
-- 
https://code.launchpad.net/~stub/launchpad/garbo/+merge/56749
Your team, Launchpad code reviewers, is requested to review the proposed merge of lp:~stub/launchpad/garbo into lp:launchpad.
=== modified file 'lib/lp/scripts/garbo.py'
--- lib/lp/scripts/garbo.py	2011-03-31 20:55:25 +0000
+++ lib/lp/scripts/garbo.py	2011-04-08 08:12:55 +0000
@@ -962,6 +962,9 @@
     continue_on_failure = False # If True, an exception in a tunable loop
                                 # does not cause the script to abort.
 
+    # Default run time of the script in seconds. Override.
+    default_abort_script_time = None
+
     # _maximum_chunk_size is used to override the defined
     # maximum_chunk_size to allow our tests to ensure multiple calls to
     # __call__ are required without creating huge amounts of test data.
@@ -974,15 +977,19 @@
             test_args=test_args)
 
     def add_my_options(self):
+
         self.parser.add_option("-x", "--experimental", dest="experimental",
             default=False, action="store_true",
             help="Run experimental jobs. Normally this is just for staging.")
         self.parser.add_option("--abort-script",
-            dest="abort_script", default=None, action="store", type="float",
-            metavar="SECS", help="Abort script after SECS seconds.")
+            dest="abort_script", default=self.default_abort_script_time,
+            action="store", type="float", metavar="SECS",
+            help="Abort script after SECS seconds [Default %d]."
+            % self.default_abort_script_time)
         self.parser.add_option("--abort-task",
             dest="abort_task", default=None, action="store", type="float",
-            metavar="SECS", help="Abort a task if it runs over SECS seconds.")
+            metavar="SECS", help="Abort a task if it runs over SECS seconds "
+                "[Default (threads * abort_script / tasks)].")
         self.parser.add_option("--threads",
             dest="threads", default=multiprocessing.cpu_count(),
             action="store", type="int", metavar='NUM',
@@ -1001,8 +1008,7 @@
         else:
             tunable_loops = list(self.tunable_loops)
 
-        a_very_long_time = 31536000 # 1 year
-        abort_task = self.options.abort_task or a_very_long_time
+        a_very_long_time = float(31536000) # 1 year
         abort_script = self.options.abort_script or a_very_long_time
 
         def worker():
@@ -1010,11 +1016,6 @@
                 "Worker thread %s running.", threading.currentThread().name)
             self.login()
             while True:
-                try:
-                    tunable_loop_class = tunable_loops.pop(0)
-                except IndexError:
-                    break
-
                 if start_time + abort_script - time.time() <= 0:
                     # Exit silently. We warn later.
                     self.logger.debug(
@@ -1022,12 +1023,39 @@
                         threading.currentThread().name)
                     break
 
+                num_remaining_tasks = len(tunable_loops)
+                if not num_remaining_tasks:
+                    break
+                tunable_loop_class = tunable_loops.pop(0)
+
                 self.logger.info("Running %s", tunable_loop_class.__name__)
 
-                abort_time = min(
-                    abort_task,
+                # How long until the script should abort.
+                remaining_script_time = (
                     abort_script + start_time - time.time())
 
+                # How long until the task should abort.
+                if self.options.abort_task is not None:
+                    # Task timeout specified on command line.
+                    abort_task = self.options.abort_task
+
+                elif num_remaining_tasks <= self.options.threads:
+                    # We have a thread for every remaining task. Let the
+                    # task run until the script timeout.
+                    self.logger.debug2("Task may run until script timeout.")
+                    abort_task = remaining_script_time
+
+                else:
+                    # Evenly distribute the remaining time to the
+                    # remaining tasks.
+                    abort_task = (
+                        self.options.threads
+                        * remaining_script_time / num_remaining_tasks)
+
+                abort_time = min(abort_task, remaining_script_time)
+                self.logger.debug2(
+                    "Task will be terminated in %0.3f seconds", abort_time)
+
                 tunable_loop = tunable_loop_class(
                     abort_time=abort_time, log=self.logger)
 
@@ -1048,7 +1076,7 @@
         threads = set()
         for count in range(0, self.options.threads):
             thread = threading.Thread(
-                target=worker,name='Worker-%d' % (count+1,))
+                target=worker, name='Worker-%d' % (count+1,))
             thread.start()
             threads.add(thread)
 
@@ -1058,9 +1086,7 @@
         # down when the script timeout is hit, and the extra time is to
         # give them a chance to clean up.
         for thread in threads:
-            time_to_go = min(
-                abort_task,
-                start_time + abort_script - time.time()) + 60
+            time_to_go = start_time + abort_script - time.time() + 60
             if time_to_go > 0:
                 thread.join(time_to_go)
             else:
@@ -1091,12 +1117,7 @@
         ]
     experimental_tunable_loops = []
 
-    def add_my_options(self):
-        super(HourlyDatabaseGarbageCollector, self).add_my_options()
-        # By default, abort any tunable loop taking more than 15 minutes.
-        self.parser.set_defaults(abort_task=900)
-        # And abort the script if it takes more than 55 minutes.
-        self.parser.set_defaults(abort_script=55*60)
+    default_abort_script_time = 60 * 60
 
 
 class DailyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
@@ -1118,7 +1139,4 @@
         PersonPruner,
         ]
 
-    def add_my_options(self):
-        super(DailyDatabaseGarbageCollector, self).add_my_options()
-        # Abort script after 24 hours by default.
-        self.parser.set_defaults(abort_script=86400)
+    default_abort_script_time = 60 * 60 * 24


References