launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #04779
[Merge] lp:~abentley/launchpad/simplify-twisted-runner into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/simplify-twisted-runner into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #833888 in Launchpad itself: "Twisted job runner is more complex than needed"
https://bugs.launchpad.net/launchpad/+bug/833888
For more details, see:
https://code.launchpad.net/~abentley/launchpad/simplify-twisted-runner/+merge/72933
= Summary =
Fix bug 833888, "Twisted job runner is more complex than needed", and possibly bug 605772, "merge-proposal-jobs is 'hanging', apparently with nothing to do".
== Proposed fix ==
Stop using ParallelLimitedTaskConsumer
== Pre-implementation notes ==
Discussed with deryck
== Implementation details ==
The Twisted job runner now iterates through the list of jobs only once, instead of trying to process new jobs as they become ready.
== Tests ==
bin/test -v test_runner
== Demo and Q/A ==
Run any job script that uses the TwistedJobRunner. It should work both when there are pending jobs and when there are none.
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/job/tests/test_runner.py
lib/lp/services/job/runner.py
--
https://code.launchpad.net/~abentley/launchpad/simplify-twisted-runner/+merge/72933
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/simplify-twisted-runner into lp:launchpad.
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2011-08-19 14:12:28 +0000
+++ lib/lp/services/job/runner.py 2011-08-25 18:05:26 +0000
@@ -40,7 +40,6 @@
from lazr.delegates import delegates
import transaction
from twisted.internet import (
- defer,
reactor,
)
from twisted.protocols import amp
@@ -61,10 +60,6 @@
from lp.services.mail.sendmail import MailController
from lp.services.scripts.base import LaunchpadCronScript
from lp.services.twistedsupport import run_reactor
-from lp.services.twistedsupport.task import (
- ParallelLimitedTaskConsumer,
- PollingTaskSource,
- )
class BaseRunnableJobSource:
@@ -472,36 +467,21 @@
oops = self._doOops(job, sys.exc_info())
self._logOopsId(oops['id'])
- def getTaskSource(self):
- """Return a task source for all jobs in job_source."""
-
- def producer():
- while True:
- # XXX: JonathanLange bug=741204: If we're getting all of the
- # jobs at the start anyway, we can use a DeferredSemaphore,
- # instead of the more complex PollingTaskSource, which is
- # better suited to cases where we don't know how much work
- # there will be.
- jobs = list(self.job_source.iterReady())
- if len(jobs) == 0:
- yield None
- for job in jobs:
- yield lambda: self.runJobInSubprocess(job)
- return PollingTaskSource(5, producer().next)
-
- def doConsumer(self):
- """Create a ParallelLimitedTaskConsumer for this job type."""
- # 1 is hard-coded for now until we're sure we'd get gains by running
- # more than one at a time. Note that several tests, including
- # test_timeout, rely on this being 1.
- consumer = ParallelLimitedTaskConsumer(1, logger=None)
- return consumer.consume(self.getTaskSource())
-
def runAll(self):
- """Run all ready jobs, and any that become ready while running."""
+ """Run all ready jobs."""
self.pool.start()
- d = defer.maybeDeferred(self.doConsumer)
- d.addCallbacks(self.terminated, self.failed)
+ try:
+ jobs = list(self.job_source.iterReady())
+ if len(jobs) == 0:
+ self.terminated()
+ return
+ d = self.runJobInSubprocess(jobs[0])
+ for job in jobs[1:]:
+ d.addCallback(lambda ignored: self.runJobInSubprocess(job))
+ d.addCallbacks(self.terminated, self.failed)
+ except:
+ self.terminated()
+ raise
def terminated(self, ignored=None):
"""Callback to stop the processpool and reactor."""
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2011-08-19 14:25:39 +0000
+++ lib/lp/services/job/tests/test_runner.py 2011-08-25 18:05:26 +0000
@@ -500,6 +500,13 @@
self.x = '*' * (10 ** 6)
+class NoJobs(StaticJobSource):
+
+ done = False
+
+ jobs = []
+
+
class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
layer = ZopelessDatabaseLayer
@@ -608,6 +615,14 @@
oops = self.getOopsReport(runner, 0)
self.assertEqual('MemoryError', oops.type)
+ def test_no_jobs(self):
+ logger = BufferLogger()
+ logger.setLevel(logging.INFO)
+ runner = TwistedJobRunner.runFromSource(
+ NoJobs, 'branchscanner', logger)
+ self.assertEqual(
+ (0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+
class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):