launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #04010
[Merge] lp:~abentley/launchpad/memlimit-jobs into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/memlimit-jobs into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #786804 in Launchpad itself: "branch scanner rlimit failures cause the next branch to be incorrectly scanned and fail"
https://bugs.launchpad.net/launchpad/+bug/786804
For more details, see:
https://code.launchpad.net/~abentley/launchpad/memlimit-jobs/+merge/65256
= Summary =
Fix bug #786804: branch scanner rlimit failures cause the next branch to be incorrectly scanned and fail.
== Proposed fix ==
Memory-limit the job, rather than the script itself.
== Pre-implementation notes ==
None
== Implementation details ==
There are interlocking fixes:
1. When a job dies due to an exception, its process is terminated. We assume that its abrupt termination may have led to an unclean process state, as seen in the bug.
2. We provide an infrastructure that allows jobs run by the TwistedJobRunner to be run with a memory limit.
3. We move the branch scan memory limit to the BranchScanJob and use the TwistedJobRunner. Using the TwistedJobRunner requires allowing access to IBranchJob.id, and providing BranchScanJob.get.
Additionally, I
- Extracted run_reactor, since it's not Job-specific.
- Added ErrorReportingUtility.getOopsReportById (and UniqueFileAllocator.listRecentReportFiles) to make our tests less race-prone.
- Provided an easy way to turn on Twisted logging.
== Tests ==
bin/test -t scan_branches -t test_previous_failure_gives_new_process -t test_successful_jobs_share_process -t test_memory_hog_job
== Demo and Q/A ==
None
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/job/tests/test_runner.py
lib/lp/code/configure.zcml
cronscripts/scan_branches.py
lib/lp/services/job/runner.py
lib/lp/services/job/interfaces/job.py
lib/lp/code/interfaces/branchjob.py
lib/canonical/launchpad/webapp/errorlog.py
lib/lp/services/twistedsupport/__init__.py
lib/lp/code/model/branchjob.py
lib/lp/services/log/uniquefileallocator.py
./cronscripts/scan_branches.py
10: '_pythonpath' imported but unused
--
https://code.launchpad.net/~abentley/launchpad/memlimit-jobs/+merge/65256
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/memlimit-jobs into lp:launchpad.
=== modified file 'cronscripts/scan_branches.py'
--- cronscripts/scan_branches.py 2010-12-15 05:29:34 +0000
+++ cronscripts/scan_branches.py 2011-06-20 19:18:31 +0000
@@ -9,9 +9,8 @@
import _pythonpath
-import resource
-from lp.services.job.runner import JobCronScript
+from lp.services.job.runner import JobCronScript, TwistedJobRunner
from lp.code.interfaces.branchjob import IBranchScanJobSource
@@ -21,10 +20,11 @@
config_name = 'branchscanner'
source_interface = IBranchScanJobSource
+ def __init__(self):
+ super(RunScanBranches, self).__init__(runner_class=TwistedJobRunner)
+
if __name__ == '__main__':
- resource.setrlimit(resource.RLIMIT_AS, (2L << 30, 2L << 30))
-
script = RunScanBranches()
script.lock_and_run()
=== modified file 'lib/canonical/launchpad/webapp/errorlog.py'
--- lib/canonical/launchpad/webapp/errorlog.py 2011-06-14 15:03:56 +0000
+++ lib/canonical/launchpad/webapp/errorlog.py 2011-06-20 19:18:31 +0000
@@ -314,6 +314,28 @@
finally:
oops_report.close()
+ def getOopsReportById(self, oops_id):
+ """Return the oops report for a given OOPS-ID.
+
+ Only recent reports are found. The report's filename is assumed to
+ have the same numeric suffix as the oops_id. The OOPS report must be
+ located in the error directory used by this ErrorReportingUtility.
+
+ If no report is found, return None.
+ """
+ suffix = re.search('[0-9]*$', oops_id).group(0)
+ for directory, name in self.log_namer.listRecentReportFiles():
+ if not name.endswith(suffix):
+ continue
+ with open(os.path.join(directory, name), 'r') as oops_report_file:
+ try:
+ report = ErrorReport.read(oops_report_file)
+ except TypeError:
+ continue
+ if report.id != oops_id:
+ continue
+ return report
+
def getLastOopsReport(self):
"""Return the last ErrorReport reported with the current config.
=== modified file 'lib/lp/code/configure.zcml'
--- lib/lp/code/configure.zcml 2011-06-02 20:30:39 +0000
+++ lib/lp/code/configure.zcml 2011-06-20 19:18:31 +0000
@@ -869,6 +869,7 @@
</securedutility>
<class class="lp.code.model.branchjob.BranchScanJob">
<allow interface="lp.services.job.interfaces.job.IRunnableJob" />
+ <allow interface="lp.code.interfaces.branchjob.IBranchJob" />
</class>
<!-- Linked branches -->
=== modified file 'lib/lp/code/interfaces/branchjob.py'
--- lib/lp/code/interfaces/branchjob.py 2010-08-20 20:31:18 +0000
+++ lib/lp/code/interfaces/branchjob.py 2011-06-20 19:18:31 +0000
@@ -53,6 +53,8 @@
class IBranchJob(Interface):
"""A job related to a branch."""
+ id = Int(title=_('Unique id of BranchScanJob.'))
+
branch = Object(
title=_('Branch to use for this job.'), required=False,
schema=IBranch)
@@ -102,6 +104,7 @@
:param branch: The database branch to upgrade.
"""
+
class IBranchUpgradeJob(IRunnableJob):
"""A job to upgrade branches with out-of-date formats."""
=== modified file 'lib/lp/code/model/branchjob.py'
--- lib/lp/code/model/branchjob.py 2011-05-11 13:59:38 +0000
+++ lib/lp/code/model/branchjob.py 2011-06-20 19:18:31 +0000
@@ -41,6 +41,7 @@
import simplejson
from sqlobject import (
ForeignKey,
+ SQLObjectNotFound,
StringCol,
)
from storm.expr import (
@@ -58,6 +59,7 @@
from canonical.config import config
from canonical.database.enumcol import EnumCol
from canonical.database.sqlbase import SQLBase
+from canonical.launchpad.interfaces.lpstorm import IStore
from canonical.launchpad.webapp import (
canonical_url,
errorlog,
@@ -251,6 +253,18 @@
Job.id.is_in(Job.ready_jobs)))
return (cls(job) for job in jobs)
+ @classmethod
+ def get(cls, key):
+ """Return the instance of this class whose key is supplied.
+
+ :raises: SQLObjectNotFound
+ """
+ instance = IStore(BranchJob).get(BranchJob, key)
+ if instance is None or instance.job_type != cls.class_job_type:
+ raise SQLObjectNotFound(
+ 'No occurrence of %s has key %s' % (cls.__name__, key))
+ return cls(instance)
+
def getOopsVars(self):
"""See `IRunnableJob`."""
vars = BaseRunnableJob.getOopsVars(self)
@@ -313,6 +327,7 @@
classProvides(IBranchScanJobSource)
class_job_type = BranchJobType.SCAN_BRANCH
+ memory_limit = 2 * (1024 ** 3)
server = None
@classmethod
@@ -945,7 +960,7 @@
file_names, changed_files, uploader = iter_info
for upload_file_name, upload_file_content in changed_files:
if len(upload_file_content) == 0:
- continue # Skip empty files
+ continue # Skip empty files
entry = translation_import_queue.addOrUpdateEntry(
upload_file_name, upload_file_content,
True, uploader, productseries=series)
=== modified file 'lib/lp/services/job/interfaces/job.py'
--- lib/lp/services/job/interfaces/job.py 2011-05-26 16:29:59 +0000
+++ lib/lp/services/job/interfaces/job.py 2011-06-20 19:18:31 +0000
@@ -174,6 +174,9 @@
class IJobSource(Interface):
"""Interface for creating and getting jobs."""
+ memory_limit = Int(
+ title=_('Maximum amount of memory which may be used by the process.'))
+
def iterReady():
"""Iterate through all jobs."""
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2011-05-26 16:29:59 +0000
+++ lib/lp/services/job/runner.py 2011-06-20 19:18:31 +0000
@@ -19,9 +19,12 @@
import contextlib
import logging
import os
+from resource import (
+ getrlimit,
+ RLIMIT_AS,
+ setrlimit,
+ )
from signal import (
- getsignal,
- SIGCHLD,
SIGHUP,
signal,
)
@@ -40,6 +43,7 @@
reactor,
)
from twisted.protocols import amp
+from twisted.python import log
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
@@ -55,6 +59,7 @@
)
from lp.services.mail.sendmail import MailController
from lp.services.scripts.base import LaunchpadCronScript
+from lp.services.twistedsupport import run_reactor
from lp.services.twistedsupport.task import (
ParallelLimitedTaskConsumer,
PollingTaskSource,
@@ -76,6 +81,8 @@
retry_error_types = ()
+ memory_limit = None
+
# We redefine __eq__ and __ne__ here to prevent the security proxy
# from mucking up our comparisons in tests and elsewhere.
def __eq__(self, job):
@@ -162,6 +169,7 @@
logger = logging.getLogger()
self.logger = logger
self.error_utility = error_utility
+ self.oops_ids = []
if self.error_utility is None:
self.error_utility = errorlog.globalErrorUtility
@@ -250,6 +258,7 @@
"""Report oopses by id to the log."""
if self.logger is not None:
self.logger.info('Job resulted in OOPS: %s' % oops_id)
+ self.oops_ids.append(oops_id)
class JobRunner(BaseJobRunner):
@@ -361,6 +370,11 @@
"""Run a job from this job_source according to its job id."""
runner = BaseJobRunner()
job = self.job_source.get(job_id)
+ if self.job_source.memory_limit is not None:
+ soft_limit, hard_limit = getrlimit(RLIMIT_AS)
+ if soft_limit != self.job_source.memory_limit:
+ limits = (self.job_source.memory_limit, hard_limit)
+ setrlimit(RLIMIT_AS, limits)
oops = runner.runJobHandleError(job)
if oops is None:
oops_id = ''
@@ -416,6 +430,9 @@
else:
self.incomplete_jobs.append(job)
self.logger.debug('Incomplete %r', job)
+ # Kill the worker that experienced a failure; this only
+ # works because there's a single worker.
+ self.pool.stopAWorker()
if response['oops_id'] != '':
self._logOopsId(response['oops_id'])
@@ -461,8 +478,8 @@
def doConsumer(self):
"""Create a ParallelLimitedTaskConsumer for this job type."""
# 1 is hard-coded for now until we're sure we'd get gains by running
- # more than one at a time. Note that test_timeout relies on this
- # being 1.
+ # more than one at a time. Note that several tests, including
+ # test_timeout, rely on this being 1.
consumer = ParallelLimitedTaskConsumer(1, logger=None)
return consumer.consume(self.getTaskSource())
@@ -483,19 +500,25 @@
self.terminated()
@classmethod
- def runFromSource(cls, job_source, dbuser, logger):
+ def runFromSource(cls, job_source, dbuser, logger, _log_twisted=False):
"""Run all ready jobs provided by the specified source.
The dbuser parameter is not ignored.
+ :param _log_twisted: For debugging: If True, emit verbose Twisted
+ messages to stderr.
"""
logger.info("Running through Twisted.")
+ if _log_twisted:
+ logging.getLogger().setLevel(0)
+ logger_object = logging.getLogger('twistedjobrunner')
+ handler = logging.StreamHandler(sys.stderr)
+ logger_object.addHandler(handler)
+ observer = log.PythonLoggingObserver(
+ loggerName='twistedjobrunner')
+ log.startLoggingWithObserver(observer.emit)
runner = cls(job_source, dbuser, logger)
reactor.callWhenRunning(runner.runAll)
- handler = getsignal(SIGCHLD)
- try:
- reactor.run()
- finally:
- signal(SIGCHLD, handler)
+ run_reactor()
return runner
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2011-05-26 16:29:59 +0000
+++ lib/lp/services/job/tests/test_runner.py 2011-06-20 19:18:31 +0000
@@ -9,6 +9,7 @@
from time import sleep
from testtools.testcase import ExpectedException
+from testtools.matchers import MatchesRegex
import transaction
from zope.interface import implements
@@ -377,7 +378,22 @@
self.assertIn(job, runner.incomplete_jobs)
-class StuckJob(BaseRunnableJob):
+class StaticJobSource(BaseRunnableJob):
+
+ @classmethod
+ def iterReady(cls):
+ if not cls.done:
+ for index, args in enumerate(cls.jobs):
+ yield cls.get(index)
+ cls.done = True
+
+ @classmethod
+ def get(cls, index):
+ args = cls.jobs[index]
+ return cls(index, *args)
+
+
+class StuckJob(StaticJobSource):
"""Simulation of a job that stalls."""
implements(IRunnableJob)
@@ -389,22 +405,10 @@
# doesn't expire and so we soak up the ZCML loading time. For the
# second job, have a short lease so we hit the timeout.
jobs = [
- (0, 10000, 0),
- (1, 5, 30),
+ (10000, 0),
+ (5, 30),
]
- @classmethod
- def iterReady(cls):
- if not cls.done:
- for id, lease_length, delay in cls.jobs:
- yield cls(id, lease_length, delay)
- cls.done = True
-
- @classmethod
- def get(cls, id):
- id, lease_length, delay = cls.jobs[id]
- return cls(id, lease_length, delay)
-
def __init__(self, id, lease_length, delay):
self.id = id
self.lease_length = lease_length
@@ -426,11 +430,76 @@
"""Simulation of a job that stalls."""
jobs = [
- (0, 10000, 0),
- (1, 0.05, 30),
+ (10000, 0),
+ (0.05, 30),
]
+class InitialFailureJob(StaticJobSource):
+
+ implements(IRunnableJob)
+
+ jobs = [(True,), (False,)]
+
+ has_failed = False
+
+ done = False
+
+ def __init__(self, id, fail):
+ self.id = id
+ self.job = Job()
+ self.fail = fail
+
+ def run(self):
+ if self.fail:
+ InitialFailureJob.has_failed = True
+ raise ValueError('I failed.')
+ else:
+ if InitialFailureJob.has_failed:
+ raise ValueError('Previous failure.')
+
+
+class ProcessSharingJob(StaticJobSource):
+
+ implements(IRunnableJob)
+
+ jobs = [(True,), (False,)]
+
+ initial_job_was_here = False
+
+ done = False
+
+ def __init__(self, id, first):
+ self.id = id
+ self.job = Job()
+ self.first = first
+
+ def run(self):
+ if self.first:
+ ProcessSharingJob.initial_job_was_here = True
+ else:
+ if not ProcessSharingJob.initial_job_was_here:
+ raise ValueError('Different process.')
+
+
+class MemoryHogJob(StaticJobSource):
+
+ implements(IRunnableJob)
+
+ jobs = [()]
+
+ done = False
+
+ memory_limit = 0
+
+ def __init__(self, id):
+ self.job = Job()
+ self.id = id
+
+ def run(self):
+ self.x = '*' * (10 ** 6)
+
+
class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
layer = ZopelessDatabaseLayer
@@ -444,6 +513,10 @@
sys.path.append(config.root)
self.addCleanup(sys.path.remove, config.root)
+ @staticmethod
+ def getOopsReport(runner, index):
+ return runner.error_utility.getOopsReportById(runner.oops_ids[index])
+
def test_timeout_long(self):
"""When a job exceeds its lease, an exception is raised.
@@ -458,19 +531,16 @@
runner = TwistedJobRunner.runFromSource(
StuckJob, 'branchscanner', logger)
- # XXX: JonathanLange 2011-03-23 bug=740443: Potential source of race
- # condition. Another OOPS could be logged. Also confusing because it
- # might be polluted by values from previous jobs.
- oops = errorlog.globalErrorUtility.getLastOopsReport()
self.assertEqual(
(1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+ oops = self.getOopsReport(runner, 0)
self.assertEqual(
- (dedent("""\
- INFO Running through Twisted.
- INFO Job resulted in OOPS: %s
- """) % oops.id,
- 'TimeoutError', 'Job ran too long.'),
- (logger.getLogBuffer(), oops.type, oops.value))
+ ('TimeoutError', 'Job ran too long.'), (oops.type, oops.value))
+ self.assertThat(logger.getLogBuffer(), MatchesRegex(
+ dedent("""\
+ INFO Running through Twisted.
+ INFO Job resulted in OOPS: .*
+ """)))
def test_timeout_short(self):
"""When a job exceeds its lease, an exception is raised.
@@ -486,10 +556,7 @@
runner = TwistedJobRunner.runFromSource(
ShorterStuckJob, 'branchscanner', logger)
- # XXX: JonathanLange 2011-03-23 bug=740443: Potential source of race
- # condition. Another OOPS could be logged. Also confusing because it
- # might be polluted by values from previous jobs.
- oops = errorlog.globalErrorUtility.getLastOopsReport()
+ oops = self.getOopsReport(runner, 0)
self.assertEqual(
(1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
self.assertEqual(
@@ -500,6 +567,42 @@
'TimeoutError', 'Job ran too long.'),
(logger.getLogBuffer(), oops.type, oops.value))
+ def test_previous_failure_gives_new_process(self):
+ """Failed jobs cause their worker to be terminated.
+
+ When a job fails, it's not clear whether its process can be safely
+ reused for a new job, so we kill the worker.
+ """
+ logger = BufferLogger()
+ runner = TwistedJobRunner.runFromSource(
+ InitialFailureJob, 'branchscanner', logger)
+ self.assertEqual(
+ (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+
+ def test_successful_jobs_share_process(self):
+ """Successful jobs allow process reuse.
+
+ When a job succeeds, we assume that its process can be safely reused
+ for a new job, so we reuse the worker.
+ """
+ logger = BufferLogger()
+ runner = TwistedJobRunner.runFromSource(
+ ProcessSharingJob, 'branchscanner', logger)
+ self.assertEqual(
+ (2, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+
+ def test_memory_hog_job(self):
+ """A job with a memory limit will trigger MemoryError on excess."""
+ logger = BufferLogger()
+ logger.setLevel(logging.INFO)
+ runner = TwistedJobRunner.runFromSource(
+ MemoryHogJob, 'branchscanner', logger)
+ self.assertEqual(
+ (0, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+ self.assertIn('Job resulted in OOPS', logger.getLogBuffer())
+ oops = self.getOopsReport(runner, 0)
+ self.assertEqual('MemoryError', oops.type)
+
class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):
=== modified file 'lib/lp/services/log/uniquefileallocator.py'
--- lib/lp/services/log/uniquefileallocator.py 2010-10-13 16:35:06 +0000
+++ lib/lp/services/log/uniquefileallocator.py 2011-06-20 19:18:31 +0000
@@ -180,6 +180,15 @@
self._lock.release()
return result
+ def listRecentReportFiles(self):
+ now = datetime.datetime.now(UTC)
+ yesterday = now - datetime.timedelta(days=1)
+ directories = [self.output_dir(now), self.output_dir(yesterday)]
+ for directory in directories:
+ report_names = os.listdir(directory)
+ for name in sorted(report_names, reverse=True):
+ yield directory, name
+
def setToken(self, token):
"""Append a string to the log subtype in filenames and log ids.
=== modified file 'lib/lp/services/twistedsupport/__init__.py'
--- lib/lp/services/twistedsupport/__init__.py 2011-04-08 03:16:01 +0000
+++ lib/lp/services/twistedsupport/__init__.py 2011-06-20 19:18:31 +0000
@@ -11,11 +11,17 @@
'gatherResults',
'no_traceback_failures',
'suppress_stderr',
+ 'run_reactor',
]
import functools
import StringIO
+from signal import (
+ getsignal,
+ SIGCHLD,
+ signal,
+ )
import sys
from twisted.internet import (
@@ -112,6 +118,7 @@
if reactor is None:
reactor = default_reactor
delayed_call = reactor.callLater(timeout, d.cancel)
+
def cancel_timeout(passthrough):
if not delayed_call.called:
delayed_call.cancel()
@@ -121,7 +128,7 @@
def no_traceback_failures(func):
"""Decorator to return traceback-less Failures instead of raising errors.
-
+
This is useful for functions used as callbacks or errbacks for a Deferred.
Traceback-less failures are much faster than the automatic Failures
Deferred constructs internally.
@@ -134,3 +141,12 @@
return Failure(e)
return wrapped
+
+
+def run_reactor():
+ """Run the reactor and return with the SIGCHLD handler unchanged."""
+ handler = getsignal(SIGCHLD)
+ try:
+ default_reactor.run()
+ finally:
+ signal(SIGCHLD, handler)