
launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/memlimit-jobs into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/memlimit-jobs into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #786804 in Launchpad itself: "branch scanner rlimit failures cause the next branch to be incorrectly scanned and fail"
  https://bugs.launchpad.net/launchpad/+bug/786804

For more details, see:
https://code.launchpad.net/~abentley/launchpad/memlimit-jobs/+merge/65256

= Summary =
Fix bug #786804: branch scanner rlimit failures cause the next branch to be incorrectly scanned and fail.

== Proposed fix ==
Memory-limit the job, rather than the script itself.

== Pre-implementation notes ==
None

== Implementation details ==
There are three interlocking fixes:
1. When a job dies due to an exception, its worker process is terminated.  We assume that the job's abrupt termination may have left the process in an unclean state, as seen in the bug.

2. We provide infrastructure that allows jobs run by the TwistedJobRunner to be run with a memory limit.

3. We move the branch scan memory limit to the BranchScanJob and use the TwistedJobRunner.  Using the TwistedJobRunner requires allowing access to IBranchJob.id, and providing BranchScanJob.get.
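
A minimal sketch of the per-job limit described in points 2 and 3, assuming a limit expressed in bytes and the standard resource module.  The name apply_memory_limit is hypothetical; in this branch the equivalent logic sits inline in the worker code that runs a job and reads job_source.memory_limit.

```python
import resource


def apply_memory_limit(memory_limit):
    """Set the soft RLIMIT_AS to memory_limit bytes; return True if changed.

    apply_memory_limit is a hypothetical helper name.  A limit of None
    means no limit was requested, so the process is left alone.
    """
    if memory_limit is None:
        return False
    soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_AS)
    if soft_limit == memory_limit:
        # Already in effect, e.g. set by an earlier job in the same worker.
        return False
    # Only the soft limit is changed; the hard limit is kept as found.
    resource.setrlimit(resource.RLIMIT_AS, (memory_limit, hard_limit))
    return True
```

Changing only the soft limit keeps the hard limit available, and skipping the call when the limit is already in place avoids a redundant setrlimit when successive jobs share a worker.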


Additionally, I:
- Extracted run_reactor, since it's not Job-specific.
- Added ErrorReportingUtility.getOopsReportById (and UniqueFileAllocator.listRecentReportFiles) to make our tests less race-prone.
- Provided an easy way to turn on Twisted logging.
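
The idea behind the extracted run_reactor can be sketched without Twisted: remember the SIGCHLD handler, run something that may replace it, and put the handler back afterwards.  The name run_preserving_sigchld and its callable argument are hypothetical, and the None check is an addition of this sketch, not part of the branch.

```python
import signal


def run_preserving_sigchld(run):
    """Call run() and restore the SIGCHLD handler that was installed before.

    run_preserving_sigchld is a hypothetical name; run stands in for
    something like a reactor's run method.
    """
    handler = signal.getsignal(signal.SIGCHLD)
    try:
        return run()
    finally:
        # getsignal() returns None for handlers not installed from Python;
        # those cannot be passed back to signal(), so leave them alone.
        if handler is not None:
            signal.signal(signal.SIGCHLD, handler)
```

Because the restore happens in a finally clause, the previous handler comes back even if the wrapped call raises.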

== Tests ==
bin/test -t scan_branches -t test_previous_failure_gives_new_process -t test_successful_jobs_share_process -t test_memory_hog_job

== Demo and Q/A ==
None


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/code/configure.zcml
  cronscripts/scan_branches.py
  lib/lp/services/job/runner.py
  lib/lp/services/job/interfaces/job.py
  lib/lp/code/interfaces/branchjob.py
  lib/canonical/launchpad/webapp/errorlog.py
  lib/lp/services/twistedsupport/__init__.py
  lib/lp/code/model/branchjob.py
  lib/lp/services/log/uniquefileallocator.py

./cronscripts/scan_branches.py
      10: '_pythonpath' imported but unused
=== modified file 'cronscripts/scan_branches.py'
--- cronscripts/scan_branches.py	2010-12-15 05:29:34 +0000
+++ cronscripts/scan_branches.py	2011-06-20 19:18:31 +0000
@@ -9,9 +9,8 @@
 
 import _pythonpath
 
-import resource
 
-from lp.services.job.runner import JobCronScript
+from lp.services.job.runner import JobCronScript, TwistedJobRunner
 from lp.code.interfaces.branchjob import IBranchScanJobSource
 
 
@@ -21,10 +20,11 @@
     config_name = 'branchscanner'
     source_interface = IBranchScanJobSource
 
+    def __init__(self):
+        super(RunScanBranches, self).__init__(runner_class=TwistedJobRunner)
+
 
 if __name__ == '__main__':
 
-    resource.setrlimit(resource.RLIMIT_AS, (2L << 30, 2L << 30))
-
     script = RunScanBranches()
     script.lock_and_run()

=== modified file 'lib/canonical/launchpad/webapp/errorlog.py'
--- lib/canonical/launchpad/webapp/errorlog.py	2011-06-14 15:03:56 +0000
+++ lib/canonical/launchpad/webapp/errorlog.py	2011-06-20 19:18:31 +0000
@@ -314,6 +314,28 @@
         finally:
             oops_report.close()
 
+    def getOopsReportById(self, oops_id):
+        """Return the oops report for a given OOPS-ID.
+
+        Only recent reports are found.  The report's filename is assumed to
+        have the same numeric suffix as the oops_id.  The OOPS report must be
+        located in the error directory used by this ErrorReportingUtility.
+
+        If no report is found, return None.
+        """
+        suffix = re.search('[0-9]*$', oops_id).group(0)
+        for directory, name in self.log_namer.listRecentReportFiles():
+            if not name.endswith(suffix):
+                continue
+            with open(os.path.join(directory, name), 'r') as oops_report_file:
+                try:
+                    report = ErrorReport.read(oops_report_file)
+                except TypeError:
+                    continue
+            if report.id != oops_id:
+                continue
+            return report
+
     def getLastOopsReport(self):
         """Return the last ErrorReport reported with the current config.
 

=== modified file 'lib/lp/code/configure.zcml'
--- lib/lp/code/configure.zcml	2011-06-02 20:30:39 +0000
+++ lib/lp/code/configure.zcml	2011-06-20 19:18:31 +0000
@@ -869,6 +869,7 @@
   </securedutility>
   <class class="lp.code.model.branchjob.BranchScanJob">
     <allow interface="lp.services.job.interfaces.job.IRunnableJob" />
+    <allow interface="lp.code.interfaces.branchjob.IBranchJob" />
   </class>
 
   <!-- Linked branches -->

=== modified file 'lib/lp/code/interfaces/branchjob.py'
--- lib/lp/code/interfaces/branchjob.py	2010-08-20 20:31:18 +0000
+++ lib/lp/code/interfaces/branchjob.py	2011-06-20 19:18:31 +0000
@@ -53,6 +53,8 @@
 class IBranchJob(Interface):
     """A job related to a branch."""
 
+    id = Int(title=_('Unique id of BranchScanJob.'))
+
     branch = Object(
         title=_('Branch to use for this job.'), required=False,
         schema=IBranch)
@@ -102,6 +104,7 @@
         :param branch: The database branch to upgrade.
         """
 
+
 class IBranchUpgradeJob(IRunnableJob):
     """A job to upgrade branches with out-of-date formats."""
 

=== modified file 'lib/lp/code/model/branchjob.py'
--- lib/lp/code/model/branchjob.py	2011-05-11 13:59:38 +0000
+++ lib/lp/code/model/branchjob.py	2011-06-20 19:18:31 +0000
@@ -41,6 +41,7 @@
 import simplejson
 from sqlobject import (
     ForeignKey,
+    SQLObjectNotFound,
     StringCol,
     )
 from storm.expr import (
@@ -58,6 +59,7 @@
 from canonical.config import config
 from canonical.database.enumcol import EnumCol
 from canonical.database.sqlbase import SQLBase
+from canonical.launchpad.interfaces.lpstorm import IStore
 from canonical.launchpad.webapp import (
     canonical_url,
     errorlog,
@@ -251,6 +253,18 @@
                 Job.id.is_in(Job.ready_jobs)))
         return (cls(job) for job in jobs)
 
+    @classmethod
+    def get(cls, key):
+        """Return the instance of this class whose key is supplied.
+
+        :raises: SQLObjectNotFound
+        """
+        instance = IStore(BranchJob).get(BranchJob, key)
+        if instance is None or instance.job_type != cls.class_job_type:
+            raise SQLObjectNotFound(
+                'No occurrence of %s has key %s' % (cls.__name__, key))
+        return cls(instance)
+
     def getOopsVars(self):
         """See `IRunnableJob`."""
         vars = BaseRunnableJob.getOopsVars(self)
@@ -313,6 +327,7 @@
 
     classProvides(IBranchScanJobSource)
     class_job_type = BranchJobType.SCAN_BRANCH
+    memory_limit = 2 * (1024 ** 3)
     server = None
 
     @classmethod
@@ -945,7 +960,7 @@
                 file_names, changed_files, uploader = iter_info
                 for upload_file_name, upload_file_content in changed_files:
                     if len(upload_file_content) == 0:
-                        continue # Skip empty files
+                        continue  # Skip empty files
                     entry = translation_import_queue.addOrUpdateEntry(
                         upload_file_name, upload_file_content,
                         True, uploader, productseries=series)

=== modified file 'lib/lp/services/job/interfaces/job.py'
--- lib/lp/services/job/interfaces/job.py	2011-05-26 16:29:59 +0000
+++ lib/lp/services/job/interfaces/job.py	2011-06-20 19:18:31 +0000
@@ -174,6 +174,9 @@
 class IJobSource(Interface):
     """Interface for creating and getting jobs."""
 
+    memory_limit = Int(
+        title=_('Maximum amount of memory which may be used by the process.'))
+
     def iterReady():
         """Iterate through all jobs."""
 

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2011-05-26 16:29:59 +0000
+++ lib/lp/services/job/runner.py	2011-06-20 19:18:31 +0000
@@ -19,9 +19,12 @@
 import contextlib
 import logging
 import os
+from resource import (
+    getrlimit,
+    RLIMIT_AS,
+    setrlimit,
+    )
 from signal import (
-    getsignal,
-    SIGCHLD,
     SIGHUP,
     signal,
     )
@@ -40,6 +43,7 @@
     reactor,
     )
 from twisted.protocols import amp
+from twisted.python import log
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 
@@ -55,6 +59,7 @@
     )
 from lp.services.mail.sendmail import MailController
 from lp.services.scripts.base import LaunchpadCronScript
+from lp.services.twistedsupport import run_reactor
 from lp.services.twistedsupport.task import (
     ParallelLimitedTaskConsumer,
     PollingTaskSource,
@@ -76,6 +81,8 @@
 
     retry_error_types = ()
 
+    memory_limit = None
+
     # We redefine __eq__ and __ne__ here to prevent the security proxy
     # from mucking up our comparisons in tests and elsewhere.
     def __eq__(self, job):
@@ -162,6 +169,7 @@
             logger = logging.getLogger()
         self.logger = logger
         self.error_utility = error_utility
+        self.oops_ids = []
         if self.error_utility is None:
             self.error_utility = errorlog.globalErrorUtility
 
@@ -250,6 +258,7 @@
         """Report oopses by id to the log."""
         if self.logger is not None:
             self.logger.info('Job resulted in OOPS: %s' % oops_id)
+        self.oops_ids.append(oops_id)
 
 
 class JobRunner(BaseJobRunner):
@@ -361,6 +370,11 @@
         """Run a job from this job_source according to its job id."""
         runner = BaseJobRunner()
         job = self.job_source.get(job_id)
+        if self.job_source.memory_limit is not None:
+            soft_limit, hard_limit = getrlimit(RLIMIT_AS)
+            if soft_limit != self.job_source.memory_limit:
+                limits = (self.job_source.memory_limit, hard_limit)
+                setrlimit(RLIMIT_AS, limits)
         oops = runner.runJobHandleError(job)
         if oops is None:
             oops_id = ''
@@ -416,6 +430,9 @@
             else:
                 self.incomplete_jobs.append(job)
                 self.logger.debug('Incomplete %r', job)
+                # Kill the worker that experienced a failure; this only
+                # works because there's a single worker.
+                self.pool.stopAWorker()
             if response['oops_id'] != '':
                 self._logOopsId(response['oops_id'])
 
@@ -461,8 +478,8 @@
     def doConsumer(self):
         """Create a ParallelLimitedTaskConsumer for this job type."""
         # 1 is hard-coded for now until we're sure we'd get gains by running
-        # more than one at a time.  Note that test_timeout relies on this
-        # being 1.
+        # more than one at a time.  Note that several tests, including
+        # test_timeout, rely on this being 1.
         consumer = ParallelLimitedTaskConsumer(1, logger=None)
         return consumer.consume(self.getTaskSource())
 
@@ -483,19 +500,25 @@
         self.terminated()
 
     @classmethod
-    def runFromSource(cls, job_source, dbuser, logger):
+    def runFromSource(cls, job_source, dbuser, logger, _log_twisted=False):
         """Run all ready jobs provided by the specified source.
 
         The dbuser parameter is not ignored.
+        :param _log_twisted: For debugging: If True, emit verbose Twisted
+            messages to stderr.
         """
         logger.info("Running through Twisted.")
+        if _log_twisted:
+            logging.getLogger().setLevel(0)
+            logger_object = logging.getLogger('twistedjobrunner')
+            handler = logging.StreamHandler(sys.stderr)
+            logger_object.addHandler(handler)
+            observer = log.PythonLoggingObserver(
+                loggerName='twistedjobrunner')
+            log.startLoggingWithObserver(observer.emit)
         runner = cls(job_source, dbuser, logger)
         reactor.callWhenRunning(runner.runAll)
-        handler = getsignal(SIGCHLD)
-        try:
-            reactor.run()
-        finally:
-            signal(SIGCHLD, handler)
+        run_reactor()
         return runner
 
 

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2011-05-26 16:29:59 +0000
+++ lib/lp/services/job/tests/test_runner.py	2011-06-20 19:18:31 +0000
@@ -9,6 +9,7 @@
 from time import sleep
 
 from testtools.testcase import ExpectedException
+from testtools.matchers import MatchesRegex
 import transaction
 from zope.interface import implements
 
@@ -377,7 +378,22 @@
         self.assertIn(job, runner.incomplete_jobs)
 
 
-class StuckJob(BaseRunnableJob):
+class StaticJobSource(BaseRunnableJob):
+
+    @classmethod
+    def iterReady(cls):
+        if not cls.done:
+            for index, args in enumerate(cls.jobs):
+                yield cls.get(index)
+        cls.done = True
+
+    @classmethod
+    def get(cls, index):
+        args = cls.jobs[index]
+        return cls(index, *args)
+
+
+class StuckJob(StaticJobSource):
     """Simulation of a job that stalls."""
     implements(IRunnableJob)
 
@@ -389,22 +405,10 @@
     # doesn't expire and so we soak up the ZCML loading time.  For the
     # second job, have a short lease so we hit the timeout.
     jobs = [
-        (0, 10000, 0),
-        (1, 5, 30),
+        (10000, 0),
+        (5, 30),
         ]
 
-    @classmethod
-    def iterReady(cls):
-        if not cls.done:
-            for id, lease_length, delay in cls.jobs:
-                yield cls(id, lease_length, delay)
-        cls.done = True
-
-    @classmethod
-    def get(cls, id):
-        id, lease_length, delay = cls.jobs[id]
-        return cls(id, lease_length, delay)
-
     def __init__(self, id, lease_length, delay):
         self.id = id
         self.lease_length = lease_length
@@ -426,11 +430,76 @@
     """Simulation of a job that stalls."""
 
     jobs = [
-        (0, 10000, 0),
-        (1, 0.05, 30),
+        (10000, 0),
+        (0.05, 30),
         ]
 
 
+class InitialFailureJob(StaticJobSource):
+
+    implements(IRunnableJob)
+
+    jobs = [(True,), (False,)]
+
+    has_failed = False
+
+    done = False
+
+    def __init__(self, id, fail):
+        self.id = id
+        self.job = Job()
+        self.fail = fail
+
+    def run(self):
+        if self.fail:
+            InitialFailureJob.has_failed = True
+            raise ValueError('I failed.')
+        else:
+            if InitialFailureJob.has_failed:
+                raise ValueError('Previous failure.')
+
+
+class ProcessSharingJob(StaticJobSource):
+
+    implements(IRunnableJob)
+
+    jobs = [(True,), (False,)]
+
+    initial_job_was_here = False
+
+    done = False
+
+    def __init__(self, id, first):
+        self.id = id
+        self.job = Job()
+        self.first = first
+
+    def run(self):
+        if self.first:
+            ProcessSharingJob.initial_job_was_here = True
+        else:
+            if not ProcessSharingJob.initial_job_was_here:
+                raise ValueError('Different process.')
+
+
+class MemoryHogJob(StaticJobSource):
+
+    implements(IRunnableJob)
+
+    jobs = [()]
+
+    done = False
+
+    memory_limit = 0
+
+    def __init__(self, id):
+        self.job = Job()
+        self.id = id
+
+    def run(self):
+        self.x = '*' * (10 ** 6)
+
+
 class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
 
     layer = ZopelessDatabaseLayer
@@ -444,6 +513,10 @@
             sys.path.append(config.root)
             self.addCleanup(sys.path.remove, config.root)
 
+    @staticmethod
+    def getOopsReport(runner, index):
+        return runner.error_utility.getOopsReportById(runner.oops_ids[index])
+
     def test_timeout_long(self):
         """When a job exceeds its lease, an exception is raised.
 
@@ -458,19 +531,16 @@
         runner = TwistedJobRunner.runFromSource(
             StuckJob, 'branchscanner', logger)
 
-        # XXX: JonathanLange 2011-03-23 bug=740443: Potential source of race
-        # condition. Another OOPS could be logged.  Also confusing because it
-        # might be polluted by values from previous jobs.
-        oops = errorlog.globalErrorUtility.getLastOopsReport()
         self.assertEqual(
             (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+        oops = self.getOopsReport(runner, 0)
         self.assertEqual(
-            (dedent("""\
-             INFO Running through Twisted.
-             INFO Job resulted in OOPS: %s
-             """) % oops.id,
-             'TimeoutError', 'Job ran too long.'),
-            (logger.getLogBuffer(), oops.type, oops.value))
+            ('TimeoutError', 'Job ran too long.'), (oops.type, oops.value))
+        self.assertThat(logger.getLogBuffer(), MatchesRegex(
+            dedent("""\
+            INFO Running through Twisted.
+            INFO Job resulted in OOPS: .*
+            """)))
 
     def test_timeout_short(self):
         """When a job exceeds its lease, an exception is raised.
@@ -486,10 +556,7 @@
         runner = TwistedJobRunner.runFromSource(
             ShorterStuckJob, 'branchscanner', logger)
 
-        # XXX: JonathanLange 2011-03-23 bug=740443: Potential source of race
-        # condition. Another OOPS could be logged.  Also confusing because it
-        # might be polluted by values from previous jobs.
-        oops = errorlog.globalErrorUtility.getLastOopsReport()
+        oops = self.getOopsReport(runner, 0)
         self.assertEqual(
             (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
         self.assertEqual(
@@ -500,6 +567,42 @@
              'TimeoutError', 'Job ran too long.'),
             (logger.getLogBuffer(), oops.type, oops.value))
 
+    def test_previous_failure_gives_new_process(self):
+        """Failed jobs cause their worker to be terminated.
+
+        When a job fails, it's not clear whether its process can be safely
+        reused for a new job, so we kill the worker.
+        """
+        logger = BufferLogger()
+        runner = TwistedJobRunner.runFromSource(
+            InitialFailureJob, 'branchscanner', logger)
+        self.assertEqual(
+            (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+
+    def test_successful_jobs_share_process(self):
+        """Successful jobs allow process reuse.
+
+        When a job succeeds, we assume that its process can be safely reused
+        for a new job, so we reuse the worker.
+        """
+        logger = BufferLogger()
+        runner = TwistedJobRunner.runFromSource(
+            ProcessSharingJob, 'branchscanner', logger)
+        self.assertEqual(
+            (2, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+
+    def test_memory_hog_job(self):
+        """A job with a memory limit will trigger MemoryError on excess."""
+        logger = BufferLogger()
+        logger.setLevel(logging.INFO)
+        runner = TwistedJobRunner.runFromSource(
+            MemoryHogJob, 'branchscanner', logger)
+        self.assertEqual(
+            (0, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+        self.assertIn('Job resulted in OOPS', logger.getLogBuffer())
+        oops = self.getOopsReport(runner, 0)
+        self.assertEqual('MemoryError', oops.type)
+
 
 class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):
 

=== modified file 'lib/lp/services/log/uniquefileallocator.py'
--- lib/lp/services/log/uniquefileallocator.py	2010-10-13 16:35:06 +0000
+++ lib/lp/services/log/uniquefileallocator.py	2011-06-20 19:18:31 +0000
@@ -180,6 +180,15 @@
                 self._lock.release()
         return result
 
+    def listRecentReportFiles(self):
+        now = datetime.datetime.now(UTC)
+        yesterday = now - datetime.timedelta(days=1)
+        directories = [self.output_dir(now), self.output_dir(yesterday)]
+        for directory in directories:
+            report_names = os.listdir(directory)
+            for name in sorted(report_names, reverse=True):
+                yield directory, name
+
     def setToken(self, token):
         """Append a string to the log subtype in filenames and log ids.
 

=== modified file 'lib/lp/services/twistedsupport/__init__.py'
--- lib/lp/services/twistedsupport/__init__.py	2011-04-08 03:16:01 +0000
+++ lib/lp/services/twistedsupport/__init__.py	2011-06-20 19:18:31 +0000
@@ -11,11 +11,17 @@
     'gatherResults',
     'no_traceback_failures',
     'suppress_stderr',
+    'run_reactor',
     ]
 
 
 import functools
 import StringIO
+from signal import (
+    getsignal,
+    SIGCHLD,
+    signal,
+    )
 import sys
 
 from twisted.internet import (
@@ -112,6 +118,7 @@
     if reactor is None:
         reactor = default_reactor
     delayed_call = reactor.callLater(timeout, d.cancel)
+
     def cancel_timeout(passthrough):
         if not delayed_call.called:
             delayed_call.cancel()
@@ -121,7 +128,7 @@
 
 def no_traceback_failures(func):
     """Decorator to return traceback-less Failures instead of raising errors.
-    
+
     This is useful for functions used as callbacks or errbacks for a Deferred.
     Traceback-less failures are much faster than the automatic Failures
     Deferred constructs internally.
@@ -134,3 +141,12 @@
             return Failure(e)
 
     return wrapped
+
+
+def run_reactor():
+    """Run the reactor and return with the SIGCHLD handler unchanged."""
+    handler = getsignal(SIGCHLD)
+    try:
+        default_reactor.run()
+    finally:
+        signal(SIGCHLD, handler)