launchpad-reviewers team mailing list archive

[Merge] lp:~jml/launchpad/test-timeout-505913 into lp:launchpad

 

Jonathan Lange has proposed merging lp:~jml/launchpad/test-timeout-505913 into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #505913 in Launchpad itself: "TestTwistedJobRunner.test_timeout fails intermittently"
  https://bugs.launchpad.net/launchpad/+bug/505913

For more details, see:
https://code.launchpad.net/~jml/launchpad/test-timeout-505913/+merge/54598

This branch, I believe, fixes one of the intermittently failing tests. The linked bug has most of the relevant details. Essentially, the timeout could fire at any point in the worker's execution. When it fired while the job itself was running, everything worked. When it fired in the surrounding code (e.g. while the child process was unmarshalling input to turn it into a request to run a job), it was ignored.

This is because the timeout was implemented by raising an exception from the SIGHUP signal handler. In Twisted, when a SIGHUP signal handler raises an exception, the exception is simply raised in whatever code is currently running. If we're in the reactor loop at the time, it is logged as an unhandled error.
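
To illustrate the point above, here is a minimal standalone sketch (not Launchpad code) of an exception raised from a SIGHUP handler surfacing in whatever Python code is running when the signal arrives. The names JobTimeout, unrelated_machinery and demo are hypothetical, and the sketch assumes a POSIX system and that it runs in the main thread.

```python
import os
import signal
import time


class JobTimeout(Exception):
    """Hypothetical stand-in for the TimeoutError used in the branch."""


def handler(signum, frame):
    # Raising here interrupts whatever frame the main thread is executing.
    raise JobTimeout("Job ran too long.")


def unrelated_machinery():
    """Stand-in for surrounding code that happens to be running."""
    try:
        # Deliver SIGHUP to ourselves while this function is on the stack.
        os.kill(os.getpid(), signal.SIGHUP)
        time.sleep(0.1)  # gives the interpreter a chance to run the handler
        return "no timeout seen"
    except Exception as error:
        # A broad handler like this swallows the timeout, so nothing
        # further up the stack learns that the lease expired.
        return "swallowed %s" % type(error).__name__


def demo():
    previous = signal.signal(signal.SIGHUP, handler)
    try:
        return unrelated_machinery()
    finally:
        # Restore the previous disposition so the sketch has no side effects.
        signal.signal(signal.SIGHUP, previous)
```

Calling demo() shows the timeout being absorbed by code that has nothing to do with the job, which is the failure mode described above.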

Instead of raising an exception, this branch just exits the worker process with a particular exit code (TwistedJobRunner.TIMEOUT_CODE). When the runner sees a process exit with that code, it ignores the ProcessDone "failure" and logs a timeout OOPS instead.
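
The exit-code approach can be sketched outside Twisted roughly as follows. This is only an illustration under stated assumptions: it uses the standard library's subprocess module in place of the Twisted process pool used by the branch, and CHILD_SOURCE, classify_exit and run_child_and_time_out are hypothetical names. Only the value 42 and the use of os._exit in a SIGHUP handler come from the diff. It assumes a POSIX system.

```python
import signal
import subprocess
import sys

TIMEOUT_CODE = 42  # same value as TwistedJobRunner.TIMEOUT_CODE in the diff

# Hypothetical worker: installs the handler, reports readiness, then stalls.
CHILD_SOURCE = """
import os, signal, sys, time

def handler(signum, frame):
    # Exit immediately instead of raising, so no surrounding code can
    # swallow the timeout.
    os._exit(%d)

signal.signal(signal.SIGHUP, handler)
sys.stdout.write("ready\\n")
sys.stdout.flush()
time.sleep(30)
""" % TIMEOUT_CODE


def classify_exit(exit_code):
    """Map a worker exit code to the kind of report the parent would log."""
    if exit_code == TIMEOUT_CODE:
        return "timeout"
    return "failure"


def run_child_and_time_out():
    child = subprocess.Popen(
        [sys.executable, "-c", CHILD_SOURCE], stdout=subprocess.PIPE)
    try:
        # Wait until the child has installed its handler before signalling.
        child.stdout.readline()
        child.send_signal(signal.SIGHUP)
        exit_code = child.wait()
    finally:
        child.stdout.close()
    return classify_exit(exit_code)
```

Here run_child_and_time_out() plays the role of the runner: it does not need to know what the worker was doing when the signal arrived, only which code the worker exited with.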

The testing strategy for this was to duplicate the test_timeout test that was failing intermittently. One copy has the timeout bumped up a little (from 1 to 5) to make it more likely to exercise the case where the timeout happens during an actual job and not in the surrounding machinery. The other has a much, much shorter timeout (0.05), which is guaranteed to hit the machinery.

In the branch, you'll see lots of other comments and cleanups. It's been a meandering experience, and I've tried to leave what I've learned marked on the walls of the labyrinth.
-- 
https://code.launchpad.net/~jml/launchpad/test-timeout-505913/+merge/54598
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jml/launchpad/test-timeout-505913 into lp:launchpad.
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2011-02-09 17:43:54 +0000
+++ lib/lp/services/job/runner.py	2011-03-23 20:31:14 +0000
@@ -303,9 +303,8 @@
 
     @classmethod
     def __enter__(cls):
-
         def handler(signum, frame):
-            raise TimeoutError
+            os._exit(TwistedJobRunner.TIMEOUT_CODE)
         scripts.execute_zcml_for_scripts(use_web_security=False)
         signal(SIGHUP, handler)
         initZopeless(dbuser=cls.dbuser)
@@ -340,6 +339,8 @@
 class TwistedJobRunner(BaseJobRunner):
     """Run Jobs via twisted."""
 
+    TIMEOUT_CODE = 42
+
     def __init__(self, job_source, dbuser, logger=None, error_utility=None):
         env = {'PATH': os.environ['PATH']}
         for name in ('PYTHONPATH', 'LPCONFIG'):
@@ -373,7 +374,7 @@
         self.logger.debug(
             'Running %r, lease expires %s', job, job.lease_expires)
         deferred = self.pool.doWork(
-            RunJobCommand, job_id = job_id, _deadline=deadline)
+            RunJobCommand, job_id=job_id, _deadline=deadline)
 
         def update(response):
             if response['success']:
@@ -387,17 +388,33 @@
 
         def job_raised(failure):
             self.incomplete_jobs.append(job)
-            info = (failure.type, failure.value, failure.tb)
-            oops = self._doOops(job, info)
-            self._logOopsId(oops.id)
+            exit_code = getattr(failure.value, 'exitCode', None)
+            if exit_code == self.TIMEOUT_CODE:
+                self._logTimeout(job)
+            else:
+                info = (failure.type, failure.value, failure.tb)
+                oops = self._doOops(job, info)
+                self._logOopsId(oops.id)
         deferred.addCallbacks(update, job_raised)
         return deferred
 
+    def _logTimeout(self, job):
+        try:
+            raise TimeoutError
+        except TimeoutError:
+            oops = self._doOops(job, sys.exc_info())
+            self._logOopsId(oops.id)
+
     def getTaskSource(self):
         """Return a task source for all jobs in job_source."""
 
         def producer():
             while True:
+                # XXX: JonathanLange bug=741204: If we're getting all of the
+                # jobs at the start anyway, we can use a DeferredSemaphore,
+                # instead of the more complex PollingTaskSource, which is
+                # better suited to cases where we don't know how much work
+                # there will be.
                 jobs = list(self.job_source.iterReady())
                 if len(jobs) == 0:
                     yield None
@@ -407,9 +424,9 @@
 
     def doConsumer(self):
         """Create a ParallelLimitedTaskConsumer for this job type."""
-        logger = logging.getLogger('gloop')
-        logger.addHandler(logging.StreamHandler(sys.stdout))
-        logger.setLevel(logging.DEBUG)
+        # 1 is hard-coded for now until we're sure we'd get gains by running
+        # more than one at a time.  Note that test_timeout relies on this
+        # being 1.
         consumer = ParallelLimitedTaskConsumer(1, logger=None)
         return consumer.consume(self.getTaskSource())
 

=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py	2010-10-04 19:50:45 +0000
+++ lib/lp/services/job/tests/test_job.py	2011-03-23 20:31:14 +0000
@@ -1,11 +1,10 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
 
 from datetime import datetime
 import time
-from unittest import TestLoader
 
 import pytz
 from storm.locals import Store
@@ -18,7 +17,7 @@
     MAIN_STORE,
     )
 from canonical.launchpad.webapp.testing import verifyObject
-from canonical.testing.layers import LaunchpadZopelessLayer
+from canonical.testing.layers import ZopelessDatabaseLayer
 from lp.services.job.interfaces.job import (
     IJob,
     JobStatus,
@@ -34,7 +33,7 @@
 class TestJob(TestCase):
     """Ensure Job behaves as intended."""
 
-    layer = LaunchpadZopelessLayer
+    layer = ZopelessDatabaseLayer
 
     def test_implements_IJob(self):
         """Job should implement IJob."""
@@ -211,7 +210,7 @@
 class TestReadiness(TestCase):
     """Test the implementation of readiness."""
 
-    layer = LaunchpadZopelessLayer
+    layer = ZopelessDatabaseLayer
 
     def _sampleData(self):
         store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
@@ -296,7 +295,3 @@
         job = Job()
         job.acquireLease(-300)
         self.assertEqual(0, job.getTimeout())
-
-
-def test_suite():
-    return TestLoader().loadTestsFromName(__name__)

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2010-12-23 00:38:29 +0000
+++ lib/lp/services/job/tests/test_runner.py	2011-03-23 20:31:14 +0000
@@ -7,19 +7,16 @@
 import sys
 from textwrap import dedent
 from time import sleep
-from unittest import TestLoader
 
 import transaction
-from zope.component import getUtility
 from zope.interface import implements
 
+from canonical.config import config
 from canonical.launchpad.webapp import errorlog
-from canonical.launchpad.webapp.interfaces import (
-    DEFAULT_FLAVOR,
-    IStoreSelector,
-    MAIN_STORE,
+from canonical.testing.layers import (
+    LaunchpadZopelessLayer,
+    ZopelessDatabaseLayer,
     )
-from canonical.testing.layers import LaunchpadZopelessLayer
 from lp.code.interfaces.branchmergeproposal import IUpdatePreviewDiffJobSource
 from lp.services.job.interfaces.job import (
     IRunnableJob,
@@ -329,42 +326,68 @@
 
     done = False
 
+    # A list of jobs to run: id, lease_length, delay.
+    #
+    # For the first job, have a very long lease, so that it
+    # doesn't expire and so we soak up the ZCML loading time.  For the
+    # second job, have a short lease so we hit the timeout.
+    jobs = [
+        (0, 10000, 0),
+        (1, 5, 30),
+        ]
+
     @classmethod
     def iterReady(cls):
         if not cls.done:
-            yield StuckJob(1)
-            yield StuckJob(2)
+            for id, lease_length, delay in cls.jobs:
+                yield cls(id, lease_length, delay)
         cls.done = True
 
-    @staticmethod
-    def get(id):
-        return StuckJob(id)
+    @classmethod
+    def get(cls, id):
+        id, lease_length, delay = cls.jobs[id]
+        return cls(id, lease_length, delay)
 
-    def __init__(self, id):
+    def __init__(self, id, lease_length, delay):
         self.id = id
+        self.lease_length = lease_length
+        self.delay = delay
         self.job = Job()
 
+    def __repr__(self):
+        return '<StuckJob(%r, lease_length=%s, delay=%s)>' % (
+            self.id, self.lease_length, self.delay)
+
     def acquireLease(self):
-        if self.id == 2:
-            lease_length = 1
-        else:
-            lease_length = 10000
-        return self.job.acquireLease(lease_length)
+        return self.job.acquireLease(self.lease_length)
 
     def run(self):
-        if self.id == 2:
-            sleep(30)
-        else:
-            store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
-            assert (
-                'user=branchscanner' in store._connection._raw_connection.dsn)
+        sleep(self.delay)
+
+
+class ShorterStuckJob(StuckJob):
+    """Simulation of a job that stalls."""
+
+    jobs = [
+        (0, 10000, 0),
+        (1, 0.05, 30),
+        ]
 
 
 class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
 
-    layer = LaunchpadZopelessLayer
-
-    def test_timeout(self):
+    layer = ZopelessDatabaseLayer
+
+    def setUp(self):
+        super(TestTwistedJobRunner, self).setUp()
+        # The test relies on _pythonpath being importable. Thus we need to add
+        # a directory that contains _pythonpath to the sys.path. We can rely
+        # on the root directory of the checkout containing _pythonpath.
+        if config.root not in sys.path:
+            sys.path.append(config.root)
+            self.addCleanup(sys.path.remove, config.root)
+
+    def test_timeout_long(self):
         """When a job exceeds its lease, an exception is raised.
 
         Unfortunately, timeouts include the time it takes for the zope
@@ -373,18 +396,52 @@
         """
         logger = BufferLogger()
         logger.setLevel(logging.INFO)
+        # StuckJob is actually a source of two jobs. The first is fast, the
+        # second slow.
         runner = TwistedJobRunner.runFromSource(
             StuckJob, 'branchscanner', logger)
 
-        self.assertEqual(1, len(runner.completed_jobs))
-        self.assertEqual(1, len(runner.incomplete_jobs))
-        oops = errorlog.globalErrorUtility.getLastOopsReport()
-        self.assertEqual(dedent("""\
-             INFO Running through Twisted.
-             INFO Job resulted in OOPS: %s
-             """) % oops.id, logger.getLogBuffer())
-        self.assertEqual('TimeoutError', oops.type)
-        self.assertIn('Job ran too long.', oops.value)
+        # XXX: JonathanLange 2011-03-23 bug=740443: Potential source of race
+        # condition. Another OOPS could be logged.  Also confusing because it
+        # might be polluted by values from previous jobs.
+        oops = errorlog.globalErrorUtility.getLastOopsReport()
+        self.assertEqual(
+            (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+        self.assertEqual(
+            (dedent("""\
+             INFO Running through Twisted.
+             INFO Job resulted in OOPS: %s
+             """) % oops.id,
+             'TimeoutError', 'Job ran too long.'),
+            (logger.getLogBuffer(), oops.type, oops.value))
+
+    def test_timeout_short(self):
+        """When a job exceeds its lease, an exception is raised.
+
+        Unfortunately, timeouts include the time it takes for the zope
+        machinery to start up, so we run a job that will not time out first,
+        followed by a job that is sure to time out.
+        """
+        logger = BufferLogger()
+        logger.setLevel(logging.INFO)
+        # StuckJob is actually a source of two jobs. The first is fast, the
+        # second slow.
+        runner = TwistedJobRunner.runFromSource(
+            ShorterStuckJob, 'branchscanner', logger)
+
+        # XXX: JonathanLange 2011-03-23 bug=740443: Potential source of race
+        # condition. Another OOPS could be logged.  Also confusing because it
+        # might be polluted by values from previous jobs.
+        oops = errorlog.globalErrorUtility.getLastOopsReport()
+        self.assertEqual(
+            (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+        self.assertEqual(
+            (dedent("""\
+             INFO Running through Twisted.
+             INFO Job resulted in OOPS: %s
+             """) % oops.id,
+             'TimeoutError', 'Job ran too long.'),
+            (logger.getLogBuffer(), oops.type, oops.value))
 
 
 class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):
@@ -425,7 +482,3 @@
             cronscript.main()
         finally:
             errorlog.globalErrorUtility = old_errorlog
-
-
-def test_suite():
-    return TestLoader().loadTestsFromName(__name__)

=== modified file 'lib/lp/testing/__init__.py'
--- lib/lp/testing/__init__.py	2011-03-21 21:06:46 +0000
+++ lib/lp/testing/__init__.py	2011-03-23 20:31:14 +0000
@@ -12,7 +12,6 @@
     'api_url',
     'build_yui_unittest_suite',
     'BrowserTestCase',
-    'capture_events',
     'celebrity_logged_in',
     'FakeTime',
     'get_lsb_information',
@@ -29,7 +28,7 @@
     'normalize_whitespace',
     'oauth_access_token_for',
     'person_logged_in',
-    'quote_jquery_expression'
+    'quote_jquery_expression',
     'record_statements',
     'run_with_login',
     'run_with_storm_debug',