
launchpad-reviewers team mailing list archive

[Merge] lp:~jtv/launchpad/commandspawner into lp:launchpad

 

Jeroen T. Vermeulen has proposed merging lp:~jtv/launchpad/commandspawner into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers): code
Related bugs:
  #181368 cron.daily explodes in non-obvious ways if drescher is under load
  https://bugs.launchpad.net/bugs/181368

For more details, see:
https://code.launchpad.net/~jtv/launchpad/commandspawner/+merge/48226

= CommandSpawner =

In order to fix bug 181368, we need to parallelize some runs of external commands.  There doesn't seem to be any good reusable way to do this, so this branch provides one: CommandSpawner.  It lets you run multiple commands in parallel, like so:

spawner = CommandSpawner()
spawner.start("/usr/local/bin/frobnicate")
spawner.start(["wget", url])
spawner.start("do_other_processing")
spawner.complete()

This example runs three commands in parallel, as separate processes, and in the last line waits for them all to complete.  There are optional callback hooks that let you deal with output and error output, as well as one for process completion.  Two simple handler classes provide a starting point for these callback hooks.
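The line-oriented handler's job is to turn raw output chunks into whole lines before passing them on. A standalone sketch of that buffering idea (names are illustrative; this mirrors the approach, not the exact class in the branch):

```python
class LineBuffer:
    """Collect raw output chunks and emit only complete lines.

    A sketch of the line-handler idea: partial output is held back
    until its terminating newline arrives.
    """

    def __init__(self, callback, prefix=""):
        self.callback = callback
        self.prefix = prefix
        self.pending = ""

    def __call__(self, output):
        lines = (self.pending + output).split("\n")
        # The final element is either "" (output ended on a newline) or
        # an incomplete line; either way, hold it back for later.
        self.pending = lines.pop()
        for line in lines:
            if line:
                self.callback(self.prefix + line)

    def finalize(self):
        """Flush any trailing text that never got its newline."""
        if self.pending:
            self.callback(self.prefix + self.pending)
            self.pending = ""
```

Each output chunk from the sub-process is simply fed into the handler as it arrives; finalize() at the end flushes whatever is left.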

(I also added two convenience methods to FakeMethod to facilitate testing.  They may come in handy elsewhere.)
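The idea behind the two helpers is just to project out one half of each recorded (args, kwargs) pair. A minimal stand-in sketch (this mirrors the additions, not the full lp.testing.fakemethod class):

```python
class FakeMethod:
    """Minimal stand-in: records each call as an (args, kwargs) pair."""

    def __init__(self, result=None):
        self.result = result
        self.calls = []

    def __call__(self, *args, **kwargs):
        self.calls.append((args, kwargs))
        return self.result

    def extract_args(self):
        """Return just the calls' positional-arguments tuples."""
        return [args for args, kwargs in self.calls]

    def extract_kwargs(self):
        """Return just the calls' keyword-arguments dicts."""
        return [kwargs for args, kwargs in self.calls]


fake = FakeMethod()
fake("hello", count=3)
```

This keeps test assertions short: instead of unpacking full call records, a test compares extract_args() against the expected tuples directly.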

I also converted ftparchive to use the CommandSpawner, though I left the actual parallelization of its run to a separate branch to keep this one under control.
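Under the hood the spawner multiplexes process output with select.poll() and non-blocking pipes, rather than blocking on each child in turn. A rough, self-contained sketch of that pattern for a single command (illustrative only, not the branch's code):

```python
import os
import select
import subprocess
from fcntl import F_GETFL, F_SETFL, fcntl


def read_all_output(command):
    """Drain one command's stdout via poll() and non-blocking reads.

    Returns a tuple of (output bytes, return code).
    """
    proc = subprocess.Popen(command, stdout=subprocess.PIPE)
    fd = proc.stdout.fileno()
    # Non-blocking mode: a read returns whatever is available instead
    # of waiting for the process to produce more.
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK)
    poller = select.poll()
    poller.register(fd, select.POLLIN | select.POLLPRI)
    chunks = []
    while True:
        events = dict(poller.poll(50))  # Wake up at least every 50 ms.
        revents = events.get(fd, 0)
        if revents & (select.POLLIN | select.POLLPRI):
            chunk = os.read(fd, 1024)
            if chunk:
                chunks.append(chunk)
                continue  # There may be more output waiting.
        if revents & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            break  # The write end closed: the process is done.
    proc.wait()
    return b"".join(chunks), proc.returncode
```

CommandSpawner generalizes this loop across many processes at once, routing each chunk of output to whatever handler was registered for that process.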

To test this I ran all Soyuz tests, as well as all tests mentioning "archive" or "publish."  In addition, a full EC2 run is underway.

For Q/A, we'll want to ensure that the publish-distro script still works as expected and still runs its usual apt-ftparchive command line.  This takes long enough that it's easy to observe in "top" or "ps."


No lint,

Jeroen
-- 
https://code.launchpad.net/~jtv/launchpad/commandspawner/+merge/48226
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jtv/launchpad/commandspawner into lp:launchpad.
=== modified file 'lib/lp/archivepublisher/ftparchive.py'
--- lib/lp/archivepublisher/ftparchive.py	2010-12-19 22:47:25 +0000
+++ lib/lp/archivepublisher/ftparchive.py	2011-02-01 18:46:59 +0000
@@ -2,9 +2,7 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 import os
-from select import select
 from StringIO import StringIO
-import subprocess
 
 from storm.expr import (
     Desc,
@@ -24,6 +22,11 @@
 from lp.archivepublisher.utils import process_in_batches
 from lp.registry.interfaces.pocket import PackagePublishingPocket
 from lp.registry.model.sourcepackagename import SourcePackageName
+from lp.services.command_spawner import (
+    CommandSpawner,
+    OutputLineHandler,
+    ReturnCodeReceiver,
+    )
 from lp.soyuz.enums import PackagePublishingStatus
 from lp.soyuz.model.component import Component
 from lp.soyuz.model.section import Section
@@ -46,49 +49,6 @@
         os.makedirs(path, 0755)
 
 
-# XXX malcc 2006-09-20 : Move this somewhere useful. If generalised with
-# timeout handling and stderr passthrough, could be a single method used for
-# this and the similar requirement in test_on_merge.py.
-
-def run_subprocess_with_logging(process_and_args, log, prefix):
-    """Run a subprocess, gathering the output as it runs and logging it.
-
-    process_and_args is a list containing the process to run and the
-    arguments for it, just as passed in the first argument to
-    subprocess.Popen.
-
-    log is a logger to pass the output we gather.
-
-    prefix is a prefix to attach to each line of output when we log it.
-    """
-    proc = subprocess.Popen(process_and_args,
-                            stdin=subprocess.PIPE,
-                            stdout=subprocess.PIPE,
-                            stderr=subprocess.PIPE,
-                            close_fds=True)
-    proc.stdin.close()
-    open_readers = set([proc.stdout, proc.stderr])
-    buf = ""
-    while open_readers:
-        rlist, wlist, xlist = select(open_readers, [], [])
-
-        for reader in rlist:
-            chunk = os.read(reader.fileno(), 1024)
-            if chunk == "":
-                open_readers.remove(reader)
-                if buf:
-                    log.debug(buf)
-            else:
-                buf += chunk
-                lines = buf.split("\n")
-                for line in lines[0:-1]:
-                    log.debug("%s%s" % (prefix, line))
-                buf = lines[-1]
-
-    ret = proc.wait()
-    return ret
-
-
 DEFAULT_COMPONENT = "main"
 
 CONFIG_HEADER = """
@@ -170,13 +130,31 @@
     def runApt(self, apt_config_filename):
         """Run apt in a subprocess and verify its return value. """
         self.log.debug("Filepath: %s" % apt_config_filename)
-        ret = run_subprocess_with_logging(["apt-ftparchive", "--no-contents",
-                                           "generate", apt_config_filename],
-                                          self.log, "a-f: ")
-        if ret:
+        # XXX JeroenVermeulen 2011-02-01 bug=181368: Run parallel
+        # apt-ftparchive processes for the various architectures (plus
+        # source).
+        stdout_handler = OutputLineHandler(self.log.debug, "a-f: ")
+        stderr_handler = OutputLineHandler(self.log.warning, "a-f: ")
+        completion_handler = ReturnCodeReceiver()
+        command = [
+            "apt-ftparchive",
+            "--no-contents",
+            "generate",
+            apt_config_filename,
+            ]
+        spawner = CommandSpawner()
+        spawner.start(
+            command, stdout_handler=stdout_handler,
+            stderr_handler=stderr_handler,
+            completion_handler=completion_handler)
+        spawner.complete()
+        stdout_handler.finalize()
+        stderr_handler.finalize()
+        if completion_handler.returncode != 0:
             raise AssertionError(
-                "Failure from apt-ftparchive. Return code %s" % ret)
-        return ret
+                "Failure from apt-ftparchive. Return code %s"
+                % completion_handler.returncode)
+        return completion_handler.returncode
 
     #
     # Empty Pocket Requests

=== added file 'lib/lp/services/command_spawner.py'
--- lib/lp/services/command_spawner.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/command_spawner.py	2011-02-01 18:46:59 +0000
@@ -0,0 +1,246 @@
+# Copyright 2011 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Execute commands in parallel sub-processes."""
+
+__metaclass__ = type
+__all__ = [
+    'CommandSpawner',
+    'OutputLineHandler',
+    'ReturnCodeReceiver',
+    ]
+
+from fcntl import (
+    fcntl,
+    F_GETFL,
+    F_SETFL,
+    )
+from os import O_NONBLOCK
+import select
+import subprocess
+
+
+def get_process_output_files(process):
+    """Return the files we watch for output coming from `process`."""
+    return [
+        process.stdout,
+        process.stderr,
+        ]
+
+
+def make_files_nonblocking(files):
+    """Put each of `files` in non-blocking mode.
+
+    This allows the `CommandSpawner` to read all available output from a
+    process without blocking until the process completes.
+    """
+    for this_file in files:
+        fcntl(this_file, F_SETFL, fcntl(this_file, F_GETFL) | O_NONBLOCK)
+
+
+def has_pending_output(poll_event):
+    """Does the given event mask from `poll` indicate there's data to read?"""
+    input_mask = (select.POLLIN | select.POLLPRI)
+    return (poll_event & input_mask) != 0
+
+
+def has_terminated(poll_event):
+    """Does the given event mask from `poll` indicate process death?"""
+    death_mask = (select.POLLERR | select.POLLHUP | select.POLLNVAL)
+    return (poll_event & death_mask) != 0
+
+
+STDOUT = 1
+STDERR = 2
+COMPLETION = 3
+
+
+class CommandSpawner:
+    """Simple manager to execute commands in parallel.
+
+    Lets you run commands in sub-processes that will run simultaneously.
+    The CommandSpawner looks for output from the running processes, and
+    manages their cleanup.
+
+    The typical usage pattern is:
+
+    >>> spawner = CommandSpawner()
+    >>> spawner.start(["echo", "One parallel process"])
+    >>> spawner.start(["echo", "Another parallel process"])
+    >>> spawner.complete()
+
+    There are facilities for processing output and error output from the
+    sub-processes, as well as dealing with success and failure.  You can
+    pass callbacks to the `start` method, to be called when these events
+    occur.
+
+    As yet there is no facility for feeding input to the processes.
+    """
+
+    def __init__(self):
+        self.running_processes = {}
+        self.poll = select.poll()
+
+    def start(self, command, stdout_handler=None, stderr_handler=None,
+              completion_handler=None):
+        """Run `command` in a sub-process.
+
+        This starts the command, but does not wait for it to complete.
+        Instead of waiting for completion, you can pass handlers that
+        will be called when certain events occur.
+
+        :param command: Command line to execute in a sub-process.  May be
+            either a string (for a single executable name) or a list of
+            strings (for an executable name plus arguments).
+        :param stdout_handler: Callback to handle output received from the
+            sub-process.  Must take the output as its sole argument.  May be
+            called any number of times as output comes in.
+        :param stderr_handler: Callback to handle error output received from
+            the sub-process.  Must take the output as its sole argument.  May
+            be called any number of times as output comes in.
+        :param completion_handler: Callback to be invoked, exactly once, when
+            the sub-process exits.  Must take `command`'s numeric return code
+            as its sole argument.
+        """
+        process = self._spawn(command)
+        handlers = {
+            STDOUT: stdout_handler,
+            STDERR: stderr_handler,
+            COMPLETION: completion_handler,
+        }
+        self.running_processes[process] = handlers
+        pipes = get_process_output_files(process)
+        for pipe in pipes:
+            self.poll.register(pipe, select.POLLIN | select.POLLPRI)
+        make_files_nonblocking(pipes)
+
+    def communicate(self):
+        """Execute one iteration of the main event loop.  Blocks."""
+        # Poll for output, but also wake up periodically to check for
+        # completed processes.
+        milliseconds = 1
+        poll_result = self.poll.poll(milliseconds)
+
+        # Map each file descriptor to its poll events.
+        events_by_fd = dict(poll_result)
+
+        # Iterate over a copy of the processes list: we may be removing
+        # items from the original as processes complete.
+        processes = self.running_processes.keys()
+        for process in processes:
+            self._service(process, events_by_fd)
+            if process.returncode is not None:
+                # Process has completed.  Remove it.
+                try:
+                    self._handle(process, COMPLETION, process.returncode)
+                finally:
+                    for pipe in get_process_output_files(process):
+                        self.poll.unregister(pipe)
+                    del self.running_processes[process]
+
+    def complete(self):
+        """Run `self.communicate` until all sub-processes have completed."""
+        while len(self.running_processes) > 0:
+            self.communicate()
+
+    def kill(self):
+        """Kill any remaining child processes."""
+        for process in self.running_processes.iterkeys():
+            process.terminate()
+
+    def _spawn(self, command):
+        """Spawn a sub-process for `command`.  Overridable in tests."""
+        return subprocess.Popen(
+            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+            close_fds=True)
+
+    def _handle(self, process, event, *args):
+        """If we have a handler for `event` on `process`, call it."""
+        process_handlers = self.running_processes[process]
+        handler = process_handlers.get(event)
+        if handler is not None:
+            handler(*args)
+
+    def _read(self, process, pipe_file, event):
+        """Read output from `pipe_file`."""
+        try:
+            output = pipe_file.read()
+        except IOError, e:
+            # "Resource temporarily unavailable"--not an error for a
+            # nonblocking socket, just nothing to read.
+            if e.errno != 11:
+                raise
+        else:
+            if len(output) > 0:
+                self._handle(process, event, output)
+
+    def _service(self, process, events_by_fd):
+        """Service `process`."""
+        stdout_events = events_by_fd.get(process.stdout.fileno(), 0)
+        stderr_events = events_by_fd.get(process.stderr.fileno(), 0)
+        if has_pending_output(stdout_events):
+            self._read(process, process.stdout, STDOUT)
+        if has_pending_output(stderr_events):
+            self._read(process, process.stderr, STDERR)
+        if has_terminated(stdout_events):
+            process.wait()
+
+
+class OutputLineHandler:
+    """Collect and handle lines of output from a sub-process.
+
+    Output received from a sub-process may not be neatly broken down by
+    line.  This class collects them into lines and processes them one by
+    one.  If desired, it can also add a prefix to each.
+    """
+
+    def __init__(self, line_processor, prefix=""):
+        """Set up an output line handler.
+
+        :param line_processor: A callback to be invoked for each line of
+            output received.  Will receive exactly one argument: a single
+            nonempty line of text, without the trailing newline.
+        :param prefix: An optional string to be prefixed to each line of
+            output before it is sent into the `line_processor`.
+        """
+        self.line_processor = line_processor
+        self.prefix = prefix
+        self.incomplete_buffer = ""
+
+    def process_line(self, line):
+        """Process a single line of output."""
+        if line != "":
+            self.line_processor("%s%s" % (self.prefix, line))
+
+    def __call__(self, output):
+        """Process multi-line output.
+
+        Any trailing text not (yet) terminated with a newline is buffered.
+        """
+        lines = (self.incomplete_buffer + output).split("\n")
+        if not output.endswith("\n") and len(lines) > 0:
+            self.incomplete_buffer = lines[-1]
+            lines = lines[:-1]
+        for line in lines:
+            self.process_line(line)
+
+    def finalize(self):
+        """Process the remaining incomplete line, if any."""
+        if self.incomplete_buffer != "":
+            self.process_line(self.incomplete_buffer)
+            self.incomplete_buffer = ""
+
+
+class ReturnCodeReceiver:
+    """A minimal completion handler for `CommandSpawner` processes.
+
+    Does nothing but collect the command's return code.
+
+    :ivar returncode: The numerical return code retrieved from the
+        process.  Stays None until the process completes.
+    """
+
+    returncode = None
+
+    def __call__(self, returncode):
+        self.returncode = returncode

=== added file 'lib/lp/services/tests/test_command_spawner.py'
--- lib/lp/services/tests/test_command_spawner.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/tests/test_command_spawner.py	2011-02-01 18:46:59 +0000
@@ -0,0 +1,396 @@
+# Copyright 2011 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for `CommandSpawner`."""
+
+__metaclass__ = type
+
+from datetime import (
+    datetime,
+    timedelta,
+    )
+from fcntl import (
+    fcntl,
+    F_GETFL,
+    )
+from os import (
+    fdopen,
+    O_NONBLOCK,
+    pipe,
+    )
+from pytz import utc
+from testtools.matchers import LessThan
+
+from lp.testing import TestCase
+from lp.testing.fakemethod import FakeMethod
+from lp.services.command_spawner import (
+    CommandSpawner,
+    OutputLineHandler,
+    ReturnCodeReceiver,
+    )
+
+
+def make_pipe():
+    """Create a pipe of `file` objects."""
+    r, w = pipe()
+    return fdopen(r, 'r'), fdopen(w, 'w')
+
+
+def write_and_flush(pipe, text):
+    """Write `text` into `pipe`, and flush."""
+    pipe.write(text)
+    pipe.flush()
+
+
+class FakeProcess:
+    """Fake `subprocess.Popen` result."""
+
+    def __init__(self, returncode=None):
+        self.returncode = returncode
+        self.stdout, self.stdout_sink = make_pipe()
+        self.stderr, self.stderr_sink = make_pipe()
+
+
+def instrument_spawn(spawner, process):
+    """Instrument `spawner` to spawn a fake process."""
+    spawner._spawn = FakeMethod(result=process)
+
+
+def is_nonblocking(this_file):
+    """Is `this_file` in non-blocking mode?"""
+    flags = fcntl(this_file, F_GETFL)
+    return flags & O_NONBLOCK != 0
+
+
+class TestCommandSpawner(TestCase):
+    """Unit tests for `CommandSpawner`.
+
+    Uses fake processes, so does not test all the way down to the bare metal.
+    Commands are not actually run.
+    """
+
+    def _makeSpawnerAndProcess(self, returncode=None):
+        """Create a `CommandSpawner` and instrument it with a `FakeProcess`.
+
+        :return: A tuple of the spawner and the fake process it will "run."
+        """
+        spawner = CommandSpawner()
+        process = FakeProcess(returncode=returncode)
+        instrument_spawn(spawner, process)
+        return spawner, process
+
+    def test_starts_out_with_no_processes(self):
+        spawner = CommandSpawner()
+        self.assertEqual({}, spawner.running_processes)
+
+    def test_completes_with_no_processes(self):
+        spawner = CommandSpawner()
+        spawner.complete()
+        self.assertEqual({}, spawner.running_processes)
+
+    def test_kill_works_with_no_processes(self):
+        spawner = CommandSpawner()
+        spawner.kill()
+        self.assertEqual({}, spawner.running_processes)
+
+    def test_start_adds_a_process(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        spawner.start("/bin/true")
+        self.assertEqual([process], spawner.running_processes.keys())
+
+    def test_start_runs_its_command(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        spawner.start("/bin/true")
+        spawn_calls = spawner._spawn.calls
+        self.assertEqual([("/bin/true", )], spawner._spawn.extract_args())
+
+    def test_output_is_nonblocking(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        spawner.start("/bin/true")
+        self.assertTrue(is_nonblocking(process.stdout))
+        self.assertTrue(is_nonblocking(process.stderr))
+
+    def test_can_add_multiple_processes(self):
+        spawner = CommandSpawner()
+
+        first_process = FakeProcess()
+        instrument_spawn(spawner, first_process)
+        spawner.start(["/bin/echo", "1"])
+
+        second_process = FakeProcess()
+        instrument_spawn(spawner, second_process)
+        spawner.start(["/bin/echo", "2"])
+
+        self.assertContentEqual(
+            [first_process, second_process], spawner.running_processes)
+
+    def test_kill_terminates_processes(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        process.terminate = FakeMethod()
+        spawner.start("/bin/cat")
+        spawner.kill()
+        self.assertNotEqual(0, process.terminate.call_count)
+
+    def test_handles_output(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        stdout_handler = FakeMethod()
+        spawner.start("ls", stdout_handler=stdout_handler)
+        write_and_flush(process.stdout_sink, "readme.txt\n")
+        spawner.communicate()
+        self.assertEqual([("readme.txt\n", )], stdout_handler.extract_args())
+
+    def test_handles_error_output(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        stderr_handler = FakeMethod()
+        spawner.start("ls", stderr_handler=stderr_handler)
+        write_and_flush(process.stderr_sink, "File not found.\n")
+        spawner.communicate()
+        self.assertEqual(
+            [("File not found.\n", )], stderr_handler.extract_args())
+
+    def test_does_not_call_completion_handler_until_completion(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=None)
+        completion_handler = FakeMethod()
+        spawner.start("echo", completion_handler=completion_handler)
+        spawner.communicate()
+        self.assertEqual(0, completion_handler.call_count)
+
+    def test_calls_completion_handler_on_success(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=0)
+        completion_handler = FakeMethod()
+        spawner.start("echo", completion_handler=completion_handler)
+        spawner.complete()
+        self.assertEqual(1, completion_handler.call_count)
+
+    def test_calls_completion_handler_on_failure(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=1)
+        completion_handler = FakeMethod()
+        spawner.start("echo", completion_handler=completion_handler)
+        spawner.complete()
+        self.assertEqual(1, completion_handler.call_count)
+
+    def test_does_not_call_completion_handler_twice(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=0)
+        completion_handler = FakeMethod()
+        spawner.start("echo", completion_handler=completion_handler)
+        spawner.complete()
+        spawner.complete()
+        self.assertEqual(1, completion_handler.call_count)
+
+    def test_passes_return_code_to_completion_handler(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=101)
+        completion_handler = FakeMethod()
+        spawner.start("echo", completion_handler=completion_handler)
+        spawner.complete()
+        self.assertEqual(((101, ), {}), completion_handler.calls[-1])
+
+    def test_handles_output_before_completion(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=0)
+        handler = FakeMethod()
+        spawner.start(
+            "hello", stdout_handler=handler, completion_handler=handler)
+        write_and_flush(process.stdout_sink, "Hello\n")
+        spawner.complete()
+        self.assertEqual([("Hello\n", ), (0, )], handler.extract_args())
+
+    def test_handles_multiple_processes(self):
+        spawner = CommandSpawner()
+        handler = FakeMethod()
+
+        first_process = FakeProcess(returncode=1)
+        instrument_spawn(spawner, first_process)
+        spawner.start(["/bin/echo", "1"], completion_handler=handler)
+
+        second_process = FakeProcess(returncode=2)
+        instrument_spawn(spawner, second_process)
+        spawner.start(["/bin/echo", "2"], completion_handler=handler)
+
+        spawner.complete()
+        self.assertContentEqual([(1, ), (2, )], handler.extract_args())
+
+
+class AcceptOutput:
+    """Simple stdout or stderr handler."""
+
+    def __call__(self, output):
+        self.output = output
+
+
+class TestCommandSpawnerAcceptance(TestCase):
+    """Acceptance tests for `CommandSpawner`.
+
+    This test spawns actual processes, so be careful:
+     * Think about security when running commands.
+     * Don't rely on nonstandard commands.
+     * Don't hold up the test suite with slow commands.
+    """
+
+    def test_command_can_be_string(self):
+        spawner = CommandSpawner()
+        spawner.start("/bin/pwd")
+        spawner.complete()
+
+    def test_command_can_be_list(self):
+        spawner = CommandSpawner()
+        spawner.start(["/bin/pwd"])
+        spawner.complete()
+
+    def test_calls_stdout_handler(self):
+        spawner = CommandSpawner()
+        stdout_handler = AcceptOutput()
+        spawner.start(["echo", "hi"], stdout_handler=stdout_handler)
+        spawner.complete()
+        self.assertEqual("hi\n", stdout_handler.output)
+
+    def test_calls_completion_handler(self):
+        spawner = CommandSpawner()
+        completion_handler = ReturnCodeReceiver()
+        spawner.start("/bin/true", completion_handler=completion_handler)
+        spawner.complete()
+        self.assertEqual(0, completion_handler.returncode)
+
+    def test_communicate_returns_after_event(self):
+        spawner = CommandSpawner()
+        self.addCleanup(spawner.kill)
+        before = datetime.now(utc)
+        spawner.start(["/bin/sleep", "10"])
+        spawner.start("/bin/pwd")
+        spawner.communicate()
+        after = datetime.now(utc)
+        self.assertThat(after - before, LessThan(timedelta(seconds=10)))
+
+    def test_kill_terminates_processes(self):
+        spawner = CommandSpawner()
+        spawner.start(["/bin/sleep", "10"])
+        spawner.start(["/bin/sleep", "10"])
+        before = datetime.now(utc)
+        spawner.kill()
+        spawner.complete()
+        after = datetime.now(utc)
+        self.assertThat(after - before, LessThan(timedelta(seconds=10)))
+
+    def test_start_does_not_block(self):
+        spawner = CommandSpawner()
+        self.addCleanup(spawner.kill)
+        before = datetime.now(utc)
+        spawner.start(["/bin/sleep", "10"])
+        after = datetime.now(utc)
+        self.assertThat(after - before, LessThan(timedelta(seconds=10)))
+
+    def test_subprocesses_run_in_parallel(self):
+        spawner = CommandSpawner()
+
+        processes = 10
+        seconds = 0.2
+        for counter in xrange(processes):
+            spawner.start(["/bin/sleep", str(seconds)])
+
+        before = datetime.now(utc)
+        spawner.complete()
+        after = datetime.now(utc)
+
+        sequential_time = timedelta(seconds=(seconds * processes))
+        self.assertThat(after - before, LessThan(sequential_time))
+
+    def test_integrates_with_outputlinehandler(self):
+        spawner = CommandSpawner()
+        handler = OutputLineHandler(FakeMethod())
+        spawner.start(["echo", "hello"], stdout_handler=handler)
+        spawner.complete()
+        self.assertEqual([("hello", )], handler.line_processor.extract_args())
+
+    def test_integrates_with_returncodereceiver(self):
+        spawner = CommandSpawner()
+        handler = ReturnCodeReceiver()
+        spawner.start("/bin/true", completion_handler=handler)
+        spawner.complete()
+        self.assertEqual(0, handler.returncode)
+
+
+class TestOutputLineHandler(TestCase):
+    """Unit tests for `OutputLineHandler`."""
+
+    def setUp(self):
+        super(TestOutputLineHandler, self).setUp()
+        self.handler = OutputLineHandler(FakeMethod())
+
+    def _getLines(self):
+        """Get the lines that were passed to `handler`'s line processor."""
+        return [
+            line
+            for (line, ) in self.handler.line_processor.extract_args()]
+
+    def test_processes_line(self):
+        self.handler("x\n")
+        self.assertEqual(["x"], self._getLines())
+
+    def test_buffers_partial_line(self):
+        self.handler("x")
+        self.assertEqual([], self._getLines())
+
+    def test_splits_lines(self):
+        self.handler("a\nb\n")
+        self.assertEqual(["a", "b"], self._getLines())
+
+    def test_ignores_empty_output(self):
+        self.handler("")
+        self.assertEqual([], self._getLines())
+
+    def test_finalize_ignores_empty_output(self):
+        self.handler("")
+        self.handler.finalize()
+        self.assertEqual([], self._getLines())
+
+    def test_ignores_empty_line(self):
+        self.handler("\n")
+        self.assertEqual([], self._getLines())
+
+    def test_joins_partial_lines(self):
+        self.handler("h")
+        self.handler("i\n")
+        self.assertEqual(["hi"], self._getLines())
+
+    def test_joins_lines_across_multiple_calls(self):
+        self.handler("h")
+        self.handler("i")
+        self.handler("!\n")
+        self.assertEqual(["hi!"], self._getLines())
+
+    def test_joins_lines_across_empty_calls(self):
+        self.handler("h")
+        self.handler("")
+        self.handler("i\n")
+        self.assertEqual(["hi"], self._getLines())
+
+    def test_finalize_processes_remaining_partial_line(self):
+        self.handler("foo")
+        self.handler.finalize()
+        self.assertEqual(["foo"], self._getLines())
+
+    def test_finalize_is_idempotent(self):
+        self.handler("foo")
+        self.handler.finalize()
+        self.handler.finalize()
+        self.assertEqual(["foo"], self._getLines())
+
+    def test_finalize_joins_partial_lines(self):
+        self.handler("h")
+        self.handler("i")
+        self.handler.finalize()
+        self.assertEqual(["hi"], self._getLines())
+
+    def test_adds_prefix(self):
+        self.handler.prefix = "->"
+        self.handler("here\n")
+        self.assertEqual(["->here"], self._getLines())
+
+    def test_finalize_adds_prefix(self):
+        self.handler.prefix = "->"
+        self.handler("here")
+        self.handler.finalize()
+        self.assertEqual(["->here"], self._getLines())
+
+    def test_empty_lines_are_ignored_despite_prefix(self):
+        self.handler.prefix = "->"
+        self.handler("\n")
+        self.assertEqual([], self._getLines())

=== modified file 'lib/lp/testing/fakemethod.py'
--- lib/lp/testing/fakemethod.py	2010-04-01 09:39:20 +0000
+++ lib/lp/testing/fakemethod.py	2011-02-01 18:46:59 +0000
@@ -1,9 +1,8 @@
-# Copyright 2009, 2010 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 # pylint: disable-msg=E0702
 
-
 __metaclass__ = type
 __all__ = [
     'FakeMethod',
@@ -55,3 +54,11 @@
     @property
     def call_count(self):
         return len(self.calls)
+
+    def extract_args(self):
+        """Return just the calls' positional-arguments tuples."""
+        return [args for args, kwargs in self.calls]
+
+    def extract_kwargs(self):
+        """Return just the calls' keyword-arguments dicts."""
+        return [kwargs for args, kwargs in self.calls]

