launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #04819
[Merge] lp:~abentley/launchpad/simplify-twisted-runner-2 into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/simplify-twisted-runner-2 into lp:launchpad with lp:~abentley/launchpad/simplify-twisted-runner as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #833888 in Launchpad itself: "Twisted job runner is more complex than needed"
https://bugs.launchpad.net/launchpad/+bug/833888
For more details, see:
https://code.launchpad.net/~abentley/launchpad/simplify-twisted-runner-2/+merge/73402
= Summary =
Ensure runJobInSubprocess doesn't violate its expected return value.
== Proposed fix ==
Always return a Deferred from runJobInSubprocess, by returning twisted.internet.defer.success(None) instead of None when a LeaseHeld is raised.
== Pre-implementation notes ==
None
== Implementation details ==
Unified lease acquisition by moving it to BaseJobRunner.acquireLease.
Also extracted job string generation to BaseJobRunner.job_str, and so it can be used by BaseJobRunner.acquireLease.
== Tests ==
bin/test -t test_lease_held_handled
== Demo and Q/A ==
Propose a merge on qastaging. Run merge-proposal-jobs.py on qastaging. It
should run successfully.
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/job/tests/test_runner.py
lib/lp/services/job/runner.py
--
https://code.launchpad.net/~abentley/launchpad/simplify-twisted-runner-2/+merge/73402
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/simplify-twisted-runner-2 into lp:launchpad.
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2011-08-30 16:10:29 +0000
+++ lib/lp/services/job/runner.py 2011-08-30 16:10:30 +0000
@@ -42,6 +42,9 @@
from twisted.internet import (
reactor,
)
+from twisted.internet.defer import (
+ succeed,
+ )
from twisted.protocols import amp
from twisted.python import log
from zope.component import getUtility
@@ -173,15 +176,32 @@
if self.error_utility is None:
self.error_utility = errorlog.globalErrorUtility
+ def acquireLease(self, job):
+ self.logger.debug(
+ 'Trying to acquire lease for job in state %s' % (
+ job.status.title,))
+ try:
+ job.acquireLease()
+ except LeaseHeld:
+ self.logger.debug(
+ 'Could not acquire lease for %s' % self.job_str(job))
+ self.incomplete_jobs.append(job)
+ return False
+ return True
+
+ @staticmethod
+ def job_str(job):
+ class_name = job.__class__.__name__
+ ijob_id = removeSecurityProxy(job).job.id
+ return '%s (ID %d)' % (class_name, ijob_id)
+
def runJob(self, job):
"""Attempt to run a job, updating its status as appropriate."""
job = IRunnableJob(job)
- class_name = job.__class__.__name__
- job_id = removeSecurityProxy(job).job.id
self.logger.info(
- 'Running %s (ID %d) in status %s' % (
- class_name, job_id, job.status.title,))
+ 'Running %s in status %s' % (
+ self.job_str(job), job.status.title))
job.start()
transaction.commit()
do_retry = False
@@ -291,14 +311,7 @@
"""Run all the Jobs for this JobRunner."""
for job in self.jobs:
job = IRunnableJob(job)
- self.logger.debug(
- 'Trying to acquire lease for job in state %s' % (
- job.status.title,))
- try:
- job.acquireLease()
- except LeaseHeld:
- self.logger.debug('Could not acquire lease for job')
- self.incomplete_jobs.append(job)
+ if not self.acquireLease(job):
continue
# Commit transaction to clear the row lock.
transaction.commit()
@@ -412,21 +425,16 @@
:return: a Deferred that fires when the job has completed.
"""
job = IRunnableJob(job)
- try:
- job.acquireLease()
- except LeaseHeld:
- self.incomplete_jobs.append(job)
- return
+ if not self.acquireLease(job):
+ return succeed(None)
# Commit transaction to clear the row lock.
transaction.commit()
job_id = job.id
deadline = timegm(job.lease_expires.timetuple())
# Log the job class and database ID for debugging purposes.
- class_name = job.__class__.__name__
- ijob_id = removeSecurityProxy(job).job.id
self.logger.info(
- 'Running %s (ID %d).' % (class_name, ijob_id))
+ 'Running %s.' % self.job_str(job))
self.logger.debug(
'Running %r, lease expires %s', job, job.lease_expires)
deferred = self.pool.doWork(
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2011-08-30 16:10:29 +0000
+++ lib/lp/services/job/tests/test_runner.py 2011-08-30 16:10:30 +0000
@@ -23,6 +23,7 @@
from lp.services.job.interfaces.job import (
IRunnableJob,
JobStatus,
+ LeaseHeld,
SuspendJobException,
)
from lp.services.job.model.job import Job
@@ -507,6 +508,22 @@
jobs = []
+class LeaseHeldJob(StaticJobSource):
+
+ implements(IRunnableJob)
+
+ jobs = [()]
+
+ done = False
+
+ def __init__(self, id):
+ self.job = Job()
+ self.id = id
+
+ def acquireLease(self):
+ raise LeaseHeld()
+
+
class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
layer = ZopelessDatabaseLayer
@@ -623,6 +640,16 @@
self.assertEqual(
(0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+ def test_lease_held_handled(self):
+ """Jobs that raise LeaseHeld are handled correctly."""
+ logger = BufferLogger()
+ logger.setLevel(logging.DEBUG)
+ runner = TwistedJobRunner.runFromSource(
+ LeaseHeldJob, 'branchscanner', logger)
+ self.assertIn('Could not acquire lease', logger.getLogBuffer())
+ self.assertEqual(
+ (0, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+
class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):