← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/simplify-twisted-runner into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/simplify-twisted-runner into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #833888 in Launchpad itself: "Twisted job runner is more complex than needed"
  https://bugs.launchpad.net/launchpad/+bug/833888

For more details, see:
https://code.launchpad.net/~abentley/launchpad/simplify-twisted-runner/+merge/72933

= Summary =
Fix bug 833888, "Twisted job runner is more complex than needed" and possibly 605772, "merge-proposal-jobs is "hanging", apparently with nothing to do"

== Proposed fix ==
Stop using ParallelLimitedTaskConsumer

== Pre-implementation notes ==
Discussed with deryck

== Implementation details ==
The Twisted job runner now iterates through the list of jobs only once, instead of trying to process new jobs as they become ready.

== Tests ==
bin/test -v test_runner

== Demo and Q/A ==
Run any job script that uses the TwistedJobRunner.  It should work when there are pending jobs, and when there are no pending jobs.


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py
-- 
https://code.launchpad.net/~abentley/launchpad/simplify-twisted-runner/+merge/72933
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/simplify-twisted-runner into lp:launchpad.
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2011-08-19 14:12:28 +0000
+++ lib/lp/services/job/runner.py	2011-08-25 18:05:26 +0000
@@ -40,7 +40,6 @@
 from lazr.delegates import delegates
 import transaction
 from twisted.internet import (
-    defer,
     reactor,
     )
 from twisted.protocols import amp
@@ -61,10 +60,6 @@
 from lp.services.mail.sendmail import MailController
 from lp.services.scripts.base import LaunchpadCronScript
 from lp.services.twistedsupport import run_reactor
-from lp.services.twistedsupport.task import (
-    ParallelLimitedTaskConsumer,
-    PollingTaskSource,
-    )
 
 
 class BaseRunnableJobSource:
@@ -472,36 +467,21 @@
             oops = self._doOops(job, sys.exc_info())
             self._logOopsId(oops['id'])
 
-    def getTaskSource(self):
-        """Return a task source for all jobs in job_source."""
-
-        def producer():
-            while True:
-                # XXX: JonathanLange bug=741204: If we're getting all of the
-                # jobs at the start anyway, we can use a DeferredSemaphore,
-                # instead of the more complex PollingTaskSource, which is
-                # better suited to cases where we don't know how much work
-                # there will be.
-                jobs = list(self.job_source.iterReady())
-                if len(jobs) == 0:
-                    yield None
-                for job in jobs:
-                    yield lambda: self.runJobInSubprocess(job)
-        return PollingTaskSource(5, producer().next)
-
-    def doConsumer(self):
-        """Create a ParallelLimitedTaskConsumer for this job type."""
-        # 1 is hard-coded for now until we're sure we'd get gains by running
-        # more than one at a time.  Note that several tests, including
-        # test_timeout, rely on this being 1.
-        consumer = ParallelLimitedTaskConsumer(1, logger=None)
-        return consumer.consume(self.getTaskSource())
-
     def runAll(self):
-        """Run all ready jobs, and any that become ready while running."""
+        """Run all ready jobs."""
         self.pool.start()
-        d = defer.maybeDeferred(self.doConsumer)
-        d.addCallbacks(self.terminated, self.failed)
+        try:
+            jobs = list(self.job_source.iterReady())
+            if len(jobs) == 0:
+                self.terminated()
+                return
+            d = self.runJobInSubprocess(jobs[0])
+            for job in jobs[1:]:
+                d.addCallback(lambda ignored: self.runJobInSubprocess(job))
+            d.addCallbacks(self.terminated, self.failed)
+        except:
+            self.terminated()
+            raise
 
     def terminated(self, ignored=None):
         """Callback to stop the processpool and reactor."""

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2011-08-19 14:25:39 +0000
+++ lib/lp/services/job/tests/test_runner.py	2011-08-25 18:05:26 +0000
@@ -500,6 +500,13 @@
         self.x = '*' * (10 ** 6)
 
 
+class NoJobs(StaticJobSource):
+
+    done = False
+
+    jobs = []
+
+
 class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
 
     layer = ZopelessDatabaseLayer
@@ -608,6 +615,14 @@
         oops = self.getOopsReport(runner, 0)
         self.assertEqual('MemoryError', oops.type)
 
+    def test_no_jobs(self):
+        logger = BufferLogger()
+        logger.setLevel(logging.INFO)
+        runner = TwistedJobRunner.runFromSource(
+            NoJobs, 'branchscanner', logger)
+        self.assertEqual(
+            (0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+
 
 class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):