← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~stub/launchpad/garbo-locking into lp:launchpad

 

Stuart Bishop has proposed merging lp:~stub/launchpad/garbo-locking into lp:launchpad with lp:~stub/launchpad/garbo-logging as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~stub/launchpad/garbo-locking/+merge/56954

If we don't worry about a lock for the garbo script, and instead grab a lock for each individual task, we can happily have multiple garbo scripts running simultaneously. This allows us to cope better when there is a large backlog of work.

This branch also breaks out the core run-a-task-in-a-thread code into smaller components - the method was getting long and unwieldy.
-- 
https://code.launchpad.net/~stub/launchpad/garbo-locking/+merge/56954
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/garbo-locking into lp:launchpad.
=== modified file 'cronscripts/garbo-daily.py'
--- cronscripts/garbo-daily.py	2010-10-14 22:31:11 +0000
+++ cronscripts/garbo-daily.py	2011-04-12 12:24:34 +0000
@@ -19,4 +19,3 @@
     script = DailyDatabaseGarbageCollector()
     script.continue_on_failure = True
     script.lock_and_run()
-

=== modified file 'cronscripts/garbo-hourly.py'
--- cronscripts/garbo-hourly.py	2010-10-14 22:31:11 +0000
+++ cronscripts/garbo-hourly.py	2011-04-12 12:24:34 +0000
@@ -19,4 +19,3 @@
     script = HourlyDatabaseGarbageCollector()
     script.continue_on_failure = True
     script.lock_and_run()
-

=== modified file 'lib/lp/scripts/garbo.py'
--- lib/lp/scripts/garbo.py	2011-04-12 12:24:31 +0000
+++ lib/lp/scripts/garbo.py	2011-04-12 12:24:34 +0000
@@ -36,6 +36,10 @@
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 
+from contrib.glock import (
+    GlobalLock,
+    LockAlreadyAcquired,
+    )
 from canonical.config import config
 from canonical.database import postgresql
 from canonical.database.sqlbase import (
@@ -85,6 +89,7 @@
 from lp.services.memcache.interfaces import IMemcacheClient
 from lp.services.scripts.base import (
     LaunchpadCronScript,
+    LOCK_PATH,
     SilentLaunchpadScriptFailure,
     )
 from lp.services.session.model import SessionData
@@ -999,93 +1004,22 @@
             % multiprocessing.cpu_count())
 
     def main(self):
-        start_time = time.time()
+        self.start_time = time.time()
 
         # Stores the number of failed tasks.
         self.failure_count = 0
 
+        # Copy the list so we can safely consume it.
+        tunable_loops = list(self.tunable_loops)
         if self.options.experimental:
-            tunable_loops = list(
-                self.tunable_loops + self.experimental_tunable_loops)
-        else:
-            tunable_loops = list(self.tunable_loops)
-
-        a_very_long_time = float(31536000) # 1 year
-        abort_script = self.options.abort_script or a_very_long_time
-
-        def worker():
-            self.logger.debug(
-                "Worker thread %s running.", threading.currentThread().name)
-            self.login()
-            while True:
-                if start_time + abort_script - time.time() <= 0:
-                    # Exit silently. We warn later.
-                    self.logger.debug(
-                        "Worker thread %s detected script timeout.",
-                        threading.currentThread().name)
-                    break
-
-                num_remaining_tasks = len(tunable_loops)
-                if not num_remaining_tasks:
-                    break
-                tunable_loop_class = tunable_loops.pop(0)
-
-                loop_name = tunable_loop_class.__name__
-
-                # Configure logging for this loop to use a prefix. Log
-                # output from multiple threads will be interleaved, and
-                # this lets us tell log output from different tasks
-                # apart.
-                loop_logger = logging.getLogger('garbo.' + loop_name)
-                loop_logger.addFilter(PrefixFilter(loop_name))
-
-                loop_logger.info("Running %s", loop_name)
-
-                # How long until the script should abort.
-                remaining_script_time = (
-                    abort_script + start_time - time.time())
-
-                # How long until the task should abort.
-                if self.options.abort_task is not None:
-                    # Task timeout specified on command line.
-                    abort_task = self.options.abort_task
-
-                elif num_remaining_tasks <= self.options.threads:
-                    # We have a thread for every remaining task. Let the
-                    # task run until the script timeout.
-                    self.logger.debug2("Task may run until script timeout.")
-                    abort_task = remaining_script_time
-
-                else:
-                    # Evenly distribute the remaining time to the
-                    # remaining tasks.
-                    abort_task = (
-                        self.options.threads
-                        * remaining_script_time / num_remaining_tasks)
-
-                abort_time = min(abort_task, remaining_script_time)
-                self.logger.debug2(
-                    "Task will be terminated in %0.3f seconds", abort_time)
-
-                tunable_loop = tunable_loop_class(
-                    abort_time=abort_time, log=loop_logger)
-
-                if self._maximum_chunk_size is not None:
-                    tunable_loop.maximum_chunk_size = self._maximum_chunk_size
-
-                try:
-                    tunable_loop.run()
-                    loop_logger.debug("%s completed sucessfully.", loop_name)
-                except Exception:
-                    loop_logger.exception("Unhandled exception")
-                    self.failure_count += 1
-                finally:
-                    transaction.abort()
+            tunable_loops.extend(self.experimental_tunable_loops)
 
         threads = set()
         for count in range(0, self.options.threads):
             thread = threading.Thread(
-                target=worker, name='Worker-%d' % (count+1,))
+                target=self.run_tasks_in_thread,
+                name='Worker-%d' % (count+1,),
+                args=(tunable_loops,))
             thread.start()
             threads.add(thread)
 
@@ -1095,20 +1029,152 @@
         # down when the script timeout is hit, and the extra time is to
         # give them a chance to clean up.
         for thread in threads:
-            time_to_go = start_time + abort_script - time.time() + 60
+            time_to_go = self.get_remaining_script_time() + 60
             if time_to_go > 0:
                 thread.join(time_to_go)
             else:
                 break
 
         # If the script ran out of time, warn.
-        if start_time + abort_script - time.time() < 0:
+        if self.get_remaining_script_time() < 0:
             self.logger.warn(
-                "Script aborted after %d seconds.", abort_script)
+                "Script aborted after %d seconds.", self.script_timeout)
+
+        if tunable_loops:
+            self.logger.warn("%d tasks did not run.", len(tunable_loops))
 
         if self.failure_count:
+            self.logger.error("%d tasks failed.", self.failure_count)
             raise SilentLaunchpadScriptFailure(self.failure_count)
 
+    def get_remaining_script_time(self):
+        return self.start_time + self.script_timeout - time.time()
+
+    @property
+    def script_timeout(self):
+        a_very_long_time = 31536000 # 1 year
+        return self.options.abort_script or a_very_long_time
+
+    def get_loop_logger(self, loop_name):
+        """Retrieve a logger for use by a particular task.
+
+        The logger will be configured to add the loop_name as a
+        prefix to all log messages, making interleaved output from
+        multiple threads somewhat readable.
+        """
+        loop_logger = logging.getLogger('garbo.' + loop_name)
+        for filter in loop_logger.filters:
+            if isinstance(filter, PrefixFilter):
+                return loop_logger # Already have a PrefixFilter attached.
+        loop_logger.addFilter(PrefixFilter(loop_name))
+        return loop_logger
+
+    def get_loop_abort_time(self, num_remaining_tasks):
+        # How long until the task should abort.
+        if self.options.abort_task is not None:
+            # Task timeout specified on command line.
+            abort_task = self.options.abort_task
+
+        elif num_remaining_tasks <= self.options.threads:
+            # We have a thread for every remaining task. Let
+            # the task run until the script timeout.
+            self.logger.debug2(
+                "Task may run until script timeout.")
+            abort_task = self.get_remaining_script_time()
+
+        else:
+            # Evenly distribute the remaining time to the
+            # remaining tasks.
+            abort_task = (
+                self.options.threads
+                * self.get_remaining_script_time() / num_remaining_tasks)
+
+        return min(abort_task, self.get_remaining_script_time())
+
+    def run_tasks_in_thread(self, tunable_loops):
+        """Worker thread target to run tasks.
+
+        Tasks are removed from tunable_loops and run one at a time,
+        until all tasks that can be run have been run or the script
+        has timed out.
+        """
+        self.logger.debug(
+            "Worker thread %s running.", threading.currentThread().name)
+        self.login()
+
+        while True:
+            # How long until the script should abort.
+            if self.get_remaining_script_time() <= 0:
+                # Exit silently. We warn later.
+                self.logger.debug(
+                    "Worker thread %s detected script timeout.",
+                    threading.currentThread().name)
+                break
+
+            num_remaining_tasks = len(tunable_loops)
+            if not num_remaining_tasks:
+                break
+            tunable_loop_class = tunable_loops.pop(0)
+
+            loop_name = tunable_loop_class.__name__
+
+            loop_logger = self.get_loop_logger(loop_name)
+
+            # Acquire a lock for the task. Multiple garbo processes
+            # might be running simultaneously.
+            loop_lock_path = os.path.join(
+                LOCK_PATH, 'launchpad-garbo-%s.lock' % loop_name)
+            # No logger - too noisy, so report issues ourselves.
+            loop_lock = GlobalLock(loop_lock_path, logger=None)
+            try:
+                loop_lock.acquire()
+                loop_logger.debug("Aquired lock %s.", loop_lock_path)
+            except LockAlreadyAcquired:
+                # If the lock cannot be acquired, but we have plenty
+                # of time remaining, just put the task back to the
+                # end of the queue.
+                if self.get_remaining_script_time() > 60:
+                    loop_logger.debug3(
+                        "Unable to acquire lock %s. Running elsewhere?",
+                        loop_lock_path)
+                    time.sleep(0.3) # Avoid spinning.
+                    tunable_loops.append(tunable_loop_class)
+                # Otherwise, emit a warning and skip the task.
+                else:
+                    loop_logger.warn(
+                        "Unable to acquire lock %s. Running elsewhere?",
+                        loop_lock_path)
+                continue
+
+            try:
+                loop_logger.info("Running %s", loop_name)
+
+                abort_time = self.get_loop_abort_time(num_remaining_tasks)
+                loop_logger.debug2(
+                    "Task will be terminated in %0.3f seconds",
+                    abort_time)
+
+                tunable_loop = tunable_loop_class(
+                    abort_time=abort_time, log=loop_logger)
+
+                # Allow the test suite to override the chunk size.
+                if self._maximum_chunk_size is not None:
+                    tunable_loop.maximum_chunk_size = (
+                        self._maximum_chunk_size)
+
+                try:
+                    tunable_loop.run()
+                    loop_logger.debug(
+                        "%s completed sucessfully.", loop_name)
+                except Exception:
+                    loop_logger.exception("Unhandled exception")
+                    self.failure_count += 1
+
+            finally:
+                loop_lock.release()
+                loop_logger.debug("Released lock %s.", loop_lock_path)
+                transaction.abort()
+
 
 class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
     script_name = 'garbo-hourly'

=== modified file 'lib/lp/services/scripts/base.py'
--- lib/lp/services/scripts/base.py	2011-04-06 18:03:54 +0000
+++ lib/lp/services/scripts/base.py	2011-04-12 12:24:34 +0000
@@ -6,7 +6,11 @@
     'LaunchpadCronScript',
     'LaunchpadScript',
     'LaunchpadScriptFailure',
+<<<<<<< TREE
     'disable_oops_handler',
+=======
+    'LOCK_PATH',
+>>>>>>> MERGE-SOURCE
     'SilentLaunchpadScriptFailure',
     ]
 


References