← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/retry-job into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/retry-job into lp:launchpad with lp:~abentley/launchpad/handle-concurrent-lint as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #787588 in Launchpad itself: "Jobs should support retries"
  https://bugs.launchpad.net/launchpad/+bug/787588

For more details, see:
https://code.launchpad.net/~abentley/launchpad/retry-job/+merge/62145

= Summary =
Fix bug #787588: Jobs should support retries.

== Proposed fix ==
Implement retries in BaseJobRunner.runJob.

== Pre-implementation notes ==
Discussed with Deryck.

== Implementation details ==
Provide Job.max_retries in the model, so that BaseJobRunner.runJob can use it.
Provide retry_error_types on runnable jobs (defaulting to an empty tuple) so that
retries are performed only for a limited list of exception types.
Implement the TestCase.expectedLog context manager, which asserts that log output matching a regex is written, to ensure that retries are logged.

== Tests ==
bin/test -vt test_runJob_retry_error

== Demo and Q/A ==
None

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/translations/scripts/tests/test_packaging_translations.py
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py
  lib/lp/services/job/interfaces/job.py
  lib/lp/testing/__init__.py
  lib/lp/services/job/model/job.py
-- 
https://code.launchpad.net/~abentley/launchpad/retry-job/+merge/62145
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/retry-job into lp:launchpad.
=== modified file 'lib/lp/services/job/interfaces/job.py'
--- lib/lp/services/job/interfaces/job.py	2010-09-30 21:29:11 +0000
+++ lib/lp/services/job/interfaces/job.py	2011-05-24 14:47:12 +0000
@@ -99,6 +99,9 @@
     attempt_count = Int(title=_(
         'The number of attempts to perform this job that have been made.'))
 
+    max_retries = Int(title=_(
+        'The number of retries permitted before this job permanently fails.'))
+
     def acquireLease(duration=300):
         """Acquire the lease for this Job, or raise LeaseHeld."""
 
@@ -149,6 +152,9 @@
     user_error_types = Attribute(
         'A tuple of exception classes which result from user error.')
 
+    retry_error_types = Attribute(
+        'A tuple of exception classes which should cause a retry.')
+
     def notifyUserError(e):
         """Notify interested parties that this job encountered a user error.
 

=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2011-03-23 15:30:02 +0000
+++ lib/lp/services/job/model/job.py	2011-05-24 14:47:12 +0000
@@ -68,6 +68,8 @@
 
     attempt_count = IntCol(default=0)
 
+    max_retries = IntCol(default=0)
+
     # Mapping of valid target states from a given state.
     _valid_transitions = {
         JobStatus.WAITING:

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2011-04-28 09:04:45 +0000
+++ lib/lp/services/job/runner.py	2011-05-24 14:47:12 +0000
@@ -73,6 +73,8 @@
 
     user_error_types = ()
 
+    retry_error_types = ()
+
     # We redefine __eq__ and __ne__ here to prevent the security proxy
     # from mucking up our comparisons in tests and elsewhere.
     def __eq__(self, job):
@@ -170,9 +172,16 @@
             'Running job in status %s' % (job.status.title,))
         job.start()
         transaction.commit()
-
+        do_retry = False
         try:
-            job.run()
+            try:
+                job.run()
+            except job.retry_error_types, e:
+                if job.attempt_count > job.max_retries:
+                    raise
+                self.logger.exception(
+                    "Scheduling retry due to %s.", e.__class__.__name__)
+                do_retry = True
         except Exception:
             self.logger.exception("Job execution raised an exception.")
             transaction.abort()
@@ -184,8 +193,12 @@
         else:
             # Commit transaction to update the DB time.
             transaction.commit()
-            job.complete()
-            self.completed_jobs.append(job)
+            if do_retry:
+                job.queue()
+                self.incomplete_jobs.append(job)
+            else:
+                job.complete()
+                self.completed_jobs.append(job)
         # Commit transaction to update job status.
         transaction.commit()
 

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2011-05-24 14:47:11 +0000
+++ lib/lp/services/job/tests/test_runner.py	2011-05-24 14:47:12 +0000
@@ -8,6 +8,7 @@
 from textwrap import dedent
 from time import sleep
 
+from testtools.testcase import ExpectedException
 import transaction
 from zope.interface import implements
 
@@ -116,6 +117,20 @@
         raise RaisingJobException('oops notifying users')
 
 
+class RetryError(Exception):
+    pass
+
+
+class RaisingRetryJob(NullJob):
+
+    retry_error_types = (RetryError,)
+
+    max_retries = 1
+
+    def run(self):
+        raise RetryError()
+
+
 class TestJobRunner(TestCaseWithFactory):
     """Ensure JobRunner behaves as expected."""
 
@@ -307,6 +322,18 @@
         runner.runJobHandleError(job)
         self.assertEqual(0, len(self.oopses))
 
+    def test_runJob_retry_error(self):
+        job = RaisingRetryJob('completion')
+        runner = JobRunner([job])
+        with self.expectedLog('Scheduling retry due to RetryError'):
+            runner.runJob(job)
+        self.assertEqual(0, len(self.oopses))
+        self.assertEqual(JobStatus.WAITING, job.status)
+        self.assertNotIn(job, runner.completed_jobs)
+        self.assertIn(job, runner.incomplete_jobs)
+        with ExpectedException(RetryError, ''):
+            runner.runJob(job)
+
     def test_runJobHandleErrors_oops_generated_notify_fails(self):
         """A second oops is logged if the notification of the oops fails."""
         job = RaisingJobRaisingNotifyOops('boom')

=== modified file 'lib/lp/testing/__init__.py'
--- lib/lp/testing/__init__.py	2011-05-24 14:47:11 +0000
+++ lib/lp/testing/__init__.py	2011-05-24 14:47:12 +0000
@@ -51,6 +51,7 @@
     'ZopeTestInSubProcess',
     ]
 
+from cStringIO import StringIO
 from contextlib import contextmanager
 from datetime import (
     datetime,
@@ -91,6 +92,7 @@
 import testtools
 from testtools.content import Content
 from testtools.content_type import UTF8_TEXT
+from testtools.matchers import MatchesRegex
 import transaction
 from windmill.authoring import WindmillTestClient
 from zope.component import (
@@ -529,6 +531,19 @@
         expected_vector, observed_vector = zip(*args)
         return self.assertEqual(expected_vector, observed_vector)
 
+    @contextmanager
+    def expectedLog(self, regex):
+        """Expect a log to be written that matches the regex."""
+        output = StringIO()
+        handler = logging.StreamHandler(output)
+        logger = logging.getLogger()
+        logger.addHandler(handler)
+        try:
+            yield
+        finally:
+            logger.removeHandler(handler)
+        self.assertThat(output.getvalue(), MatchesRegex(regex))
+
     def pushConfig(self, section, **kwargs):
         """Push some key-value pairs into a section of the config.