← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/simplify-twisted-runner-2 into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/simplify-twisted-runner-2 into lp:launchpad with lp:~abentley/launchpad/simplify-twisted-runner as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #833888 in Launchpad itself: "Twisted job runner is more complex than needed"
  https://bugs.launchpad.net/launchpad/+bug/833888

For more details, see:
https://code.launchpad.net/~abentley/launchpad/simplify-twisted-runner-2/+merge/73402

= Summary =
Ensure runJobInSubprocess always returns a Deferred, as its documented return value requires.

== Proposed fix ==
Always return a Deferred from runJobInSubprocess, by returning twisted.internet.defer.succeed(None) instead of None when a LeaseHeld is raised.

== Pre-implementation notes ==
None

== Implementation details ==
Unified lease acquisition by moving it to BaseJobRunner.acquireLease.
Also extracted job string generation to BaseJobRunner.job_str, so that it can be used by BaseJobRunner.acquireLease.

== Tests ==
bin/test -t test_lease_held_handled

== Demo and Q/A ==
Propose a merge on qastaging.  Run merge-proposal-jobs.py on qastaging.  It
should run successfully.

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py
-- 
https://code.launchpad.net/~abentley/launchpad/simplify-twisted-runner-2/+merge/73402
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/simplify-twisted-runner-2 into lp:launchpad.
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2011-08-30 16:10:29 +0000
+++ lib/lp/services/job/runner.py	2011-08-30 16:10:30 +0000
@@ -42,6 +42,9 @@
 from twisted.internet import (
     reactor,
     )
+from twisted.internet.defer import (
+    succeed,
+    )
 from twisted.protocols import amp
 from twisted.python import log
 from zope.component import getUtility
@@ -173,15 +176,32 @@
         if self.error_utility is None:
             self.error_utility = errorlog.globalErrorUtility
 
+    def acquireLease(self, job):
+        self.logger.debug(
+            'Trying to acquire lease for job in state %s' % (
+                job.status.title,))
+        try:
+            job.acquireLease()
+        except LeaseHeld:
+            self.logger.debug(
+                'Could not acquire lease for %s' % self.job_str(job))
+            self.incomplete_jobs.append(job)
+            return False
+        return True
+
+    @staticmethod
+    def job_str(job):
+        class_name = job.__class__.__name__
+        ijob_id = removeSecurityProxy(job).job.id
+        return '%s (ID %d)' % (class_name, ijob_id)
+
     def runJob(self, job):
         """Attempt to run a job, updating its status as appropriate."""
         job = IRunnableJob(job)
 
-        class_name = job.__class__.__name__
-        job_id = removeSecurityProxy(job).job.id
         self.logger.info(
-            'Running %s (ID %d) in status %s' % (
-                class_name, job_id, job.status.title,))
+            'Running %s in status %s' % (
+                self.job_str(job), job.status.title))
         job.start()
         transaction.commit()
         do_retry = False
@@ -291,14 +311,7 @@
         """Run all the Jobs for this JobRunner."""
         for job in self.jobs:
             job = IRunnableJob(job)
-            self.logger.debug(
-                'Trying to acquire lease for job in state %s' % (
-                    job.status.title,))
-            try:
-                job.acquireLease()
-            except LeaseHeld:
-                self.logger.debug('Could not acquire lease for job')
-                self.incomplete_jobs.append(job)
+            if not self.acquireLease(job):
                 continue
             # Commit transaction to clear the row lock.
             transaction.commit()
@@ -412,21 +425,16 @@
         :return: a Deferred that fires when the job has completed.
         """
         job = IRunnableJob(job)
-        try:
-            job.acquireLease()
-        except LeaseHeld:
-            self.incomplete_jobs.append(job)
-            return
+        if not self.acquireLease(job):
+            return succeed(None)
         # Commit transaction to clear the row lock.
         transaction.commit()
         job_id = job.id
         deadline = timegm(job.lease_expires.timetuple())
 
         # Log the job class and database ID for debugging purposes.
-        class_name = job.__class__.__name__
-        ijob_id = removeSecurityProxy(job).job.id
         self.logger.info(
-            'Running %s (ID %d).' % (class_name, ijob_id))
+            'Running %s.' % self.job_str(job))
         self.logger.debug(
             'Running %r, lease expires %s', job, job.lease_expires)
         deferred = self.pool.doWork(

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2011-08-30 16:10:29 +0000
+++ lib/lp/services/job/tests/test_runner.py	2011-08-30 16:10:30 +0000
@@ -23,6 +23,7 @@
 from lp.services.job.interfaces.job import (
     IRunnableJob,
     JobStatus,
+    LeaseHeld,
     SuspendJobException,
     )
 from lp.services.job.model.job import Job
@@ -507,6 +508,22 @@
     jobs = []
 
 
+class LeaseHeldJob(StaticJobSource):
+
+    implements(IRunnableJob)
+
+    jobs = [()]
+
+    done = False
+
+    def __init__(self, id):
+        self.job = Job()
+        self.id = id
+
+    def acquireLease(self):
+        raise LeaseHeld()
+
+
 class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
 
     layer = ZopelessDatabaseLayer
@@ -623,6 +640,16 @@
         self.assertEqual(
             (0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
 
+    def test_lease_held_handled(self):
+        """Jobs that raise LeaseHeld are handled correctly."""
+        logger = BufferLogger()
+        logger.setLevel(logging.DEBUG)
+        runner = TwistedJobRunner.runFromSource(
+            LeaseHeldJob, 'branchscanner', logger)
+        self.assertIn('Could not acquire lease', logger.getLogBuffer())
+        self.assertEqual(
+            (0, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+
 
 class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):