launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #03260
[Merge] lp:~stub/launchpad/garbo-locking into lp:launchpad
Stuart Bishop has proposed merging lp:~stub/launchpad/garbo-locking into lp:launchpad with lp:~stub/launchpad/garbo-logging as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~stub/launchpad/garbo-locking/+merge/56954
If we don't worry about a lock for the garbo script, and instead grab a lock for each individual task, we can happily have multiple garbo scripts running simultaneously. This allows us to cope better when there is a large backlog of work.
This branch also breaks out the core run-a-task-in-a-thread code into smaller components - the method was getting long and unwieldy.
--
https://code.launchpad.net/~stub/launchpad/garbo-locking/+merge/56954
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/garbo-locking into lp:launchpad.
=== modified file 'cronscripts/garbo-daily.py'
--- cronscripts/garbo-daily.py 2010-10-14 22:31:11 +0000
+++ cronscripts/garbo-daily.py 2011-04-08 14:31:01 +0000
@@ -18,5 +18,4 @@
if __name__ == '__main__':
script = DailyDatabaseGarbageCollector()
script.continue_on_failure = True
- script.lock_and_run()
-
+ script.run() # No global lock. Garbo tasks use individual locks.
=== modified file 'cronscripts/garbo-hourly.py'
--- cronscripts/garbo-hourly.py 2010-10-14 22:31:11 +0000
+++ cronscripts/garbo-hourly.py 2011-04-08 14:31:01 +0000
@@ -18,5 +18,4 @@
if __name__ == '__main__':
script = HourlyDatabaseGarbageCollector()
script.continue_on_failure = True
- script.lock_and_run()
-
+ script.run() # No global lock. Garbo tasks use individual locks.
=== modified file 'lib/lp/scripts/garbo.py'
--- lib/lp/scripts/garbo.py 2011-04-08 14:31:00 +0000
+++ lib/lp/scripts/garbo.py 2011-04-08 14:31:01 +0000
@@ -36,6 +36,10 @@
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
+from contrib.glock import (
+ GlobalLock,
+ LockAlreadyAcquired,
+ )
from canonical.config import config
from canonical.database import postgresql
from canonical.database.sqlbase import (
@@ -85,6 +89,7 @@
from lp.services.memcache.interfaces import IMemcacheClient
from lp.services.scripts.base import (
LaunchpadCronScript,
+ LOCK_PATH,
SilentLaunchpadScriptFailure,
)
from lp.services.session.model import SessionData
@@ -999,93 +1004,22 @@
% multiprocessing.cpu_count())
def main(self):
- start_time = time.time()
+ self.start_time = time.time()
# Stores the number of failed tasks.
self.failure_count = 0
+ # Copy the list so we can safely consume it.
+ tunable_loops = list(self.tunable_loops)
if self.options.experimental:
- tunable_loops = list(
- self.tunable_loops + self.experimental_tunable_loops)
- else:
- tunable_loops = list(self.tunable_loops)
-
- a_very_long_time = float(31536000) # 1 year
- abort_script = self.options.abort_script or a_very_long_time
-
- def worker():
- self.logger.debug(
- "Worker thread %s running.", threading.currentThread().name)
- self.login()
- while True:
- if start_time + abort_script - time.time() <= 0:
- # Exit silently. We warn later.
- self.logger.debug(
- "Worker thread %s detected script timeout.",
- threading.currentThread().name)
- break
-
- num_remaining_tasks = len(tunable_loops)
- if not num_remaining_tasks:
- break
- tunable_loop_class = tunable_loops.pop(0)
-
- loop_name = tunable_loop_class.__name__
-
- # Configure logging for this loop to use a prefix. Log
- # output from multiple threads will be interleaved, and
- # this lets us tell log output from different tasks
- # apart.
- loop_logger = logging.getLogger('garbo.' + loop_name)
- loop_logger.addFilter(PrefixFilter(loop_name))
-
- loop_logger.info("Running %s", loop_name)
-
- # How long until the script should abort.
- remaining_script_time = (
- abort_script + start_time - time.time())
-
- # How long until the task should abort.
- if self.options.abort_task is not None:
- # Task timeout specified on command line.
- abort_task = self.options.abort_task
-
- elif num_remaining_tasks <= self.options.threads:
- # We have a thread for every remaining task. Let the
- # task run until the script timeout.
- self.logger.debug2("Task may run until script timeout.")
- abort_task = remaining_script_time
-
- else:
- # Evenly distribute the remaining time to the
- # remaining tasks.
- abort_task = (
- self.options.threads
- * remaining_script_time / num_remaining_tasks)
-
- abort_time = min(abort_task, remaining_script_time)
- self.logger.debug2(
- "Task will be terminated in %0.3f seconds", abort_time)
-
- tunable_loop = tunable_loop_class(
- abort_time=abort_time, log=loop_logger)
-
- if self._maximum_chunk_size is not None:
- tunable_loop.maximum_chunk_size = self._maximum_chunk_size
-
- try:
- tunable_loop.run()
- loop_logger.debug("%s completed sucessfully.", loop_name)
- except Exception:
- loop_logger.exception("Unhandled exception")
- self.failure_count += 1
- finally:
- transaction.abort()
+ tunable_loops.extend(self.experimental_tunable_loops)
threads = set()
for count in range(0, self.options.threads):
thread = threading.Thread(
- target=worker, name='Worker-%d' % (count+1,))
+ target=self.run_tasks_in_thread,
+ name='Worker-%d' % (count+1,),
+ args=(tunable_loops,))
thread.start()
threads.add(thread)
@@ -1095,20 +1029,152 @@
# down when the script timeout is hit, and the extra time is to
# give them a chance to clean up.
for thread in threads:
- time_to_go = start_time + abort_script - time.time() + 60
+ time_to_go = self.get_remaining_script_time() + 60
if time_to_go > 0:
thread.join(time_to_go)
else:
break
# If the script ran out of time, warn.
- if start_time + abort_script - time.time() < 0:
+ if self.get_remaining_script_time() < 0:
self.logger.warn(
- "Script aborted after %d seconds.", abort_script)
+ "Script aborted after %d seconds.", self.script_timeout)
+
+ if tunable_loops:
+ self.logger.warn("%d tasks did not run.", len(tunable_loops))
if self.failure_count:
+ self.logger.error("%d tasks failed.", self.failure_count)
raise SilentLaunchpadScriptFailure(self.failure_count)
+ def get_remaining_script_time(self):
+ return self.start_time + self.script_timeout - time.time()
+
+ @property
+ def script_timeout(self):
+ a_very_long_time = 31536000 # 1 year
+ return self.options.abort_script or a_very_long_time
+
+ def get_loop_logger(self, loop_name):
+ """Retrieve a logger for use by a particular task.
+
+ The logger will be configured to add the loop_name as a
+ prefix to all log messages, making interleaved output from
+ multiple threads somewhat readable.
+ """
+ loop_logger = logging.getLogger('garbo.' + loop_name)
+ for filter in loop_logger.filters:
+ if isinstance(filter, PrefixFilter):
+ return loop_logger # Already have a PrefixFilter attached.
+ loop_logger.addFilter(PrefixFilter(loop_name))
+ return loop_logger
+
+ def get_loop_abort_time(self, num_remaining_tasks):
+ # How long until the task should abort.
+ if self.options.abort_task is not None:
+ # Task timeout specified on command line.
+ abort_task = self.options.abort_task
+
+ elif num_remaining_tasks <= self.options.threads:
+ # We have a thread for every remaining task. Let
+ # the task run until the script timeout.
+ self.logger.debug2(
+ "Task may run until script timeout.")
+ abort_task = self.get_remaining_script_time()
+
+ else:
+ # Evenly distribute the remaining time to the
+ # remaining tasks.
+ abort_task = (
+ self.options.threads
+ * self.get_remaining_script_time() / num_remaining_tasks)
+
+ return min(abort_task, self.get_remaining_script_time())
+
+ def run_tasks_in_thread(self, tunable_loops):
+ """Worker thread target to run tasks.
+
+ Tasks are removed from tunable_loops and run one at a time,
+ until all tasks that can be run have been run or the script
+ has timed out.
+ """
+ self.logger.debug(
+ "Worker thread %s running.", threading.currentThread().name)
+ self.login()
+
+ while True:
+ # How long until the script should abort.
+ if self.get_remaining_script_time() <= 0:
+ # Exit silently. We warn later.
+ self.logger.debug(
+ "Worker thread %s detected script timeout.",
+ threading.currentThread().name)
+ break
+
+ num_remaining_tasks = len(tunable_loops)
+ if not num_remaining_tasks:
+ break
+ tunable_loop_class = tunable_loops.pop(0)
+
+ loop_name = tunable_loop_class.__name__
+
+ loop_logger = self.get_loop_logger(loop_name)
+
+ # Aquire a lock for the task. Multiple garbo processes
+ # might be running simultaneously.
+ loop_lock_path = os.path.join(
+ LOCK_PATH, 'launchpad-garbo-%s.lock' % loop_name)
+ # No logger - too noisy, so report issues ourself.
+ loop_lock = GlobalLock(loop_lock_path, logger=None)
+ try:
+ loop_lock.acquire()
+ loop_logger.debug("Aquired lock %s.", loop_lock_path)
+ except LockAlreadyAcquired:
+ # If the lock cannot be acquired, but we have plenty
+ # of time remaining, just put the task back to the
+ # end of the queue.
+ if self.get_remaining_script_time() > 60:
+ loop_logger.debug3(
+ "Unable to acquire lock %s. Running elsewhere?",
+ loop_lock_path)
+ time.sleep(0.3) # Avoid spinning.
+ tunable_loops.append(tunable_loop_class)
+ # Otherwise, emit a warning and skip the task.
+ else:
+ loop_logger.warn(
+ "Unable to acquire lock %s. Running elsewhere?",
+ loop_lock_path)
+ continue
+
+ try:
+ loop_logger.info("Running %s", loop_name)
+
+ abort_time = self.get_loop_abort_time(num_remaining_tasks)
+ loop_logger.debug2(
+ "Task will be terminated in %0.3f seconds",
+ abort_time)
+
+ tunable_loop = tunable_loop_class(
+ abort_time=abort_time, log=loop_logger)
+
+ # Allow the test suite to override the chunk size.
+ if self._maximum_chunk_size is not None:
+ tunable_loop.maximum_chunk_size = (
+ self._maximum_chunk_size)
+
+ try:
+ tunable_loop.run()
+ loop_logger.debug(
+ "%s completed sucessfully.", loop_name)
+ except Exception:
+ loop_logger.exception("Unhandled exception")
+ self.failure_count += 1
+
+ finally:
+ loop_lock.release()
+ loop_logger.debug("Released lock %s.", loop_lock_path)
+ transaction.abort()
+
class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
script_name = 'garbo-hourly'
=== modified file 'lib/lp/services/scripts/base.py'
--- lib/lp/services/scripts/base.py 2011-03-30 16:39:06 +0000
+++ lib/lp/services/scripts/base.py 2011-04-08 14:31:01 +0000
@@ -6,6 +6,7 @@
'LaunchpadCronScript',
'LaunchpadScript',
'LaunchpadScriptFailure',
+ 'LOCK_PATH',
'SilentLaunchpadScriptFailure',
]
Follow ups