launchpad-reviewers team mailing list archive

[Merge] lp:~jameinel/launchpad/lp-service into lp:launchpad/devel

 

John A Meinel has proposed merging lp:~jameinel/launchpad/lp-service into lp:launchpad/devel.

Requested reviews:
  Jonathan Lange (jml)
  Launchpad code reviewers (launchpad-reviewers)


I'm pretty sure this code is based on the 'launchpad/devel' branch, but it might be db-devel.

The goal of this submission is to reduce the time it takes for a "bzr+ssh" connection to become useful. The new method must be activated by setting:
  [codehosting]
  use_forking_daemon: True

It is enabled in "development" mode, but is still disabled by default. I can't give a recommendation for the production config, because that branch is private.

This implements a new service (LPForkingService). It listens on a socket and waits for a 'fork <command>' request. When one is received, it creates stdin/stdout/stderr fifos on disk and forks itself, running 'run_bzr_*(command)' directly rather than using 'exec()', which would require bootstrapping a new python process.
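
As a rough illustration of the fork-and-fifo step described above (this is not the code from the branch): the sketch below creates named pipes in a temporary directory, forks, and has the child bind the pipes to its stdin/stdout before running a callable in-process. The names fork_with_fifos, echo_upper and demo are hypothetical; it is written for Python 3, and it leaves out stderr, the socket protocol and error handling.

```python
import os
import shutil
import tempfile


def fork_with_fifos(child_main):
    """Fork a child whose stdin/stdout are fifos in a fresh directory.

    child_main is a hypothetical stand-in for run_bzr_*(command).
    Returns (pid, base_path) in the parent.
    """
    base_path = tempfile.mkdtemp(prefix='fork-sketch-')
    for name in ('stdin', 'stdout'):
        os.mkfifo(os.path.join(base_path, name))
    pid = os.fork()
    if pid == 0:
        # Child: each blocking open() waits for the peer to open the
        # other end of the same fifo.
        stdin_fd = os.open(os.path.join(base_path, 'stdin'), os.O_RDONLY)
        stdout_fd = os.open(os.path.join(base_path, 'stdout'), os.O_WRONLY)
        os.dup2(stdin_fd, 0)
        os.dup2(stdout_fd, 1)
        retcode = 1
        try:
            retcode = child_main()
        finally:
            # Skip stack unwinding and atexit handlers inherited from
            # the parent.
            os._exit(retcode)
    return pid, base_path


def echo_upper():
    """Toy command: read one chunk from fd 0, write it upper-cased to fd 1."""
    data = os.read(0, 1024)
    os.write(1, data.upper())
    return 0


def demo():
    pid, base_path = fork_with_fifos(echo_upper)
    try:
        # Open the fifos in the same order as the child to avoid deadlock.
        to_child = os.open(os.path.join(base_path, 'stdin'), os.O_WRONLY)
        from_child = os.open(os.path.join(base_path, 'stdout'), os.O_RDONLY)
        os.write(to_child, b'hello\n')
        os.close(to_child)
        reply = os.read(from_child, 1024)
        os.close(from_child)
        _, status = os.waitpid(pid, 0)
    finally:
        shutil.rmtree(base_path, ignore_errors=True)
    return reply, status
```

Because the child never calls exec, every module the parent had already imported is available to it immediately, which is where the saving is expected to come from.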

The benefit is seen with: time echo hello | ssh localhost bzr serve --inet ...

Without this patch, it takes 2.5s to serve on the loopback. With this patch, it takes 0.25s.
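
To get a feel for where the difference comes from, one could compare starting a fresh interpreter against a bare fork(). This is only a sketch under assumptions: the helper names are hypothetical, it is Python 3, and it does not reproduce the figures above, since the real cost includes importing bzrlib and the Launchpad plugins.

```python
import os
import subprocess
import sys
import time


def time_new_interpreter():
    """Seconds to start a fresh Python process that does nothing."""
    start = time.perf_counter()
    subprocess.check_call([sys.executable, '-c', 'pass'])
    return time.perf_counter() - start


def time_fork():
    """Seconds to fork a child that exits immediately, then reap it."""
    start = time.perf_counter()
    pid = os.fork()
    if pid == 0:
        os._exit(0)
    os.waitpid(pid, 0)
    return time.perf_counter() - start


if __name__ == '__main__':
    print('new interpreter: %.3fs' % time_new_interpreter())
    print('fork:            %.3fs' % time_fork())
```

The absolute numbers vary by machine; the point is only that the forked child skips interpreter start-up and module imports.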

I'm very happy to work with someone to smooth out the finer points of this submission. I tried to be explicit about places in the code that I had a decision to make, and why I chose the method I did.

I didn't use the FeatureFlag system, because setting it up via the config file was a lot more straightforward (it globally enables or disables the functionality).
As near as I can tell, rolling this code out to production should have no impact as long as use_forking_daemon: False.

('make run_codehosting' will not actually spawn the daemon, and the twisted Conch code just gets an 'if' check to see whether it should use the old code path.)
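
The kind of gate described here can be sketched as follows. This is an assumption-laden illustration, not the Conch code from the branch: the stdlib configparser stands in for Launchpad's own config machinery, and the function names and returned labels are hypothetical.

```python
from configparser import ConfigParser

SAMPLE_CONFIG = """
[codehosting]
use_forking_daemon: True
"""


def use_forking_daemon(config_text):
    """Return True only if the option is explicitly switched on."""
    parser = ConfigParser()
    parser.read_string(config_text)
    # A missing section or option falls back to the old behaviour.
    return parser.getboolean(
        'codehosting', 'use_forking_daemon', fallback=False)


def choose_code_path(config_text):
    # Hypothetical labels for the two ways of starting the bzr process.
    if use_forking_daemon(config_text):
        return 'forking-service'
    return 'spawn-process'
```

With the option absent or False, the old path is chosen, which matches the claim that the default rollout changes nothing.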

I haven't run the full test suite, but I have:
 1) Run all of the locally relevant tests: 'bzr selftest -s bt.lp_serve' and 'bin/test
    lp.codehosting.sshserver'
 2) Manually started the sftp service and run commands through the local instance. Both with 
    and without the forking service enabled. (And I have confirmed that it isn't used when 
    disabled, and is used when enabled, etc.)


-- 
https://code.launchpad.net/~jameinel/launchpad/lp-service/+merge/35877
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/launchpad/lp-service into lp:launchpad/devel.
=== modified file 'Makefile'
--- Makefile	2010-09-07 18:15:01 +0000
+++ Makefile	2010-09-17 20:34:36 +0000
@@ -253,7 +253,7 @@
 
 run_codehosting: check_schema inplace stop
 	$(RM) thread*.request
-	bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
+	bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
 
 
 start_librarian: compile

=== added file 'bzrplugins/connect_to_lpservice.py'
--- bzrplugins/connect_to_lpservice.py	1970-01-01 00:00:00 +0000
+++ bzrplugins/connect_to_lpservice.py	2010-09-17 20:34:36 +0000
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+"""Script to ask lp-service to fork and connect.
+
+Meant to be equivalent to 'bzr lp-serve --inet' only connecting to the service
+to handle the request.
+"""
+
+import os
+import select
+import socket
+import sys
+import time
+
+# This is done as a simple script so that startup time can be kept low, only
+# having a minimal set of dependencies.
+
+class TrivialForwarder(object):
+    """Forward requests from stdin/out/err to an appropriate pipe."""
+
+    _timeout = 10000
+    _buf_size = 8192
+
+    def __init__(self):
+        self.poller = select.poll()
+
+        self.in_to_out = {}
+        self.inout_to_buffer = {}
+        self.start_time = time.time()
+
+    def log(self, info):
+        sys.stderr.write('%.3fs %s\n' % (time.time() - self.start_time, info))
+
+    def add_fifo_to_fid(self, fifo_path, fid):
+        """Read from fifo, write to fid"""
+        buf = []
+        # out from the child gets mapped back to our out, so we read from
+        # the child, and write to stdout
+        fd_child = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)
+        self.in_to_out[fd_child] = fid
+        ## self.log('reading from %s to %d' % (fifo_path, fid))
+        self.poller.register(fd_child, select.POLLIN)
+        self.inout_to_buffer[fid] = buf
+        self.inout_to_buffer[fd_child] = buf
+
+    def add_fid_to_fifo(self, fid, fifo_path):
+        """Read from fid, write to fifo"""
+        buf = []
+        # We don't use O_NONBLOCK, because otherwise open() raises an error
+        # (ENXIO) if the read side isn't open yet, however it does mean we
+        # definitely need to open it *last*
+        fd_child = os.open(fifo_path, os.O_WRONLY)
+        ## self.log('reading from %d to %s' % (fid, fifo_path))
+        self.in_to_out[fid] = fd_child
+        self.poller.register(fid, select.POLLIN)
+        self.inout_to_buffer[fid] = buf
+        self.inout_to_buffer[fd_child] = buf
+
+    def run(self):
+        should_close = set()
+        while True:
+            events = self.poller.poll(self._timeout) # TIMEOUT?
+            if not events:
+                ## self.log('** timeout')
+                # TODO: check if all buffers are indicated 'closed' so we
+                #       should exit
+                continue
+            for fd, event in events:
+                ## self.log('event: %s %s  ' % (fd, event))
+                if event & select.POLLIN:
+                    # Register the output buffer, buffer a bit, and wait for
+                    # the output to be available
+                    buf = self.inout_to_buffer[fd]
+                    # TODO: We could set a maximum size for buf, and if we go
+                    #       beyond that, we stop reading
+                    # n_buffered = sum(map(len, buf))
+                    thebytes = os.read(fd, self._buf_size)
+                    buf.append(thebytes)
+                    out_fd = self.in_to_out[fd]
+                    ## self.log('read %d => %d register %d\n'
+                    ##          % (len(thebytes), sum(map(len, buf)),
+                    ##             out_fd))
+                    # Let the poller know that we need to do non-blocking output
+                    # We always re-register, we could know that it is already
+                    # active
+                    if not thebytes:
+                        # Input without content, treat this as a close request
+                        should_close.add(out_fd)
+                        self.poller.unregister(fd)
+                        self.log('%d empty read, closed' % (fd,))
+                        os.close(fd)
+                        ## self.log('no bytes closed closed, closing %d\n'
+                        ##          % (out_fd,))
+                    self.log('%d registering from %d' % (out_fd, fd))
+                    self.poller.register(out_fd, select.POLLOUT)
+                elif event & select.POLLOUT:
+                    # We can write some bytes without blocking, do so
+                    buf = self.inout_to_buffer[fd]
+                    if not buf:
+                        # the buffer is now empty, we have written everything
+                        # so unregister this buffer so we don't keep polling
+                        # for the ability to write without blocking
+                        ## self.log('unregistered\n')
+                        self.poller.unregister(fd)
+                        # Check to see if the input has been closed, and close
+                        # if true
+                        if fd in should_close:
+                            self.log('%d closed' % (fd,))
+                            os.close(fd)
+                        continue
+                    thebytes = ''.join(buf)
+                    n_written = os.write(fd, thebytes)
+                    thebytes = thebytes[n_written:]
+                    ## self.log('\n  wrote %d => %d remain\n'
+                    ##          % (n_written, len(thebytes)))
+                    if thebytes:
+                        buf[:] = [thebytes]
+                    else:
+                        del buf[:]
+                        # We *could* unregister the output here, but I have the
+                        # feeling waiting for another poll loop will be better
+                        # because it will avoid looping, oh we have bytes,
+                        # register, loop, find bytes, write them, unregister,
+                        # loop, find more bytes, register, loop, etc.
+                        # I don't know for sure, but I think this gives us at
+                        # least a chance to have more bytes to write before we
+                        # unregister
+                elif event & select.POLLHUP:
+                    # The connection hung up, I'm assuming these only occur on
+                    # the inputs for now..., but carry across the action.
+                    # Importantly, we don't close the out_fd yet, because we
+                    # want to flush the buffer first
+                    self.poller.unregister(fd)
+                    out_fd = self.in_to_out[fd]
+                    should_close.add(out_fd)
+                    ## self.log('closed, closing %d\n'
+                    ##          % (out_fd,))
+
+
+def _get_host_and_port(port):
+    host = None
+    if port is not None:
+        if ':' in port:
+            host, port = port.rsplit(':', 1)
+        port = int(port)
+    return host, port
+
+DEFAULT_HOST = '127.0.0.1'
+DEFAULT_PORT = 4156
+
+
+def main(args):
+    import optparse
+    p = optparse.OptionParser('%prog [options] userid')
+    p.add_option('--port',
+        help='The [host:]portnumber where the service is running')
+    opts, args = p.parse_args(args)
+    if len(args) != 1:
+        p.print_usage()
+        return 1
+    userid = int(args[0])
+    host, port = _get_host_and_port(opts.port)
+    if host is None:
+        host = DEFAULT_HOST
+    if port is None:
+        port = DEFAULT_PORT
+    t = time.time()
+    path = _request_fork(host, port, userid)
+    sys.stderr.write('got path in %.3fs: %s\n' % (time.time() - t, path))
+    _connect_to_fifos(path, t)
+
+
+def _request_fork(host, port, user_id):
+    """Ask the server to fork, and find out the new process's disk path."""
+    # Connect to the service, and request a new connection.
+    addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
+        socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
+    (family, socktype, proto, canonname, sockaddr) = addrs[0]
+    client_sock = socket.socket(family, socktype, proto)
+    try:
+        client_sock.connect(sockaddr)
+        client_sock.sendall('fork %d\n' % (user_id,))
+        response = client_sock.recv(1024)
+    except socket.error, e:
+        raise RuntimeError('Failed to connect: %s' % (e,))
+    if response.startswith('FAILURE'):
+        raise RuntimeError('Server rejected with: %s' % (response,))
+    # we got a valid path back, so lets return it
+    return response.strip()
+
+
+def _connect_to_fifos(path, tstart):
+    stdin_path = os.path.join(path, 'stdin')
+    stdout_path = os.path.join(path, 'stdout')
+    stderr_path = os.path.join(path, 'stderr')
+    forwarder = TrivialForwarder()
+    forwarder.start_time = tstart
+    forwarder.add_fid_to_fifo(sys.stdin.fileno(), stdin_path)
+    forwarder.add_fifo_to_fid(stdout_path, sys.stdout.fileno())
+    forwarder.add_fifo_to_fid(stderr_path, sys.stderr.fileno())
+    forwarder.run()
+
+if __name__ == '__main__':
+    sys.exit(main(sys.argv[1:]))

=== added directory 'bzrplugins/lpserve'
=== renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
--- bzrplugins/lpserve.py	2010-04-19 06:35:23 +0000
+++ bzrplugins/lpserve/__init__.py	2010-09-17 20:34:36 +0000
@@ -8,15 +8,33 @@
 
 __metaclass__ = type
 
-__all__ = ['cmd_launchpad_server']
-
-
+__all__ = ['cmd_launchpad_server',
+           'cmd_launchpad_forking_service',
+          ]
+
+
+import errno
+import os
 import resource
+import shlex
+import shutil
+import signal
+import socket
 import sys
+import tempfile
+import threading
+import time
 
 from bzrlib.commands import Command, register_command
 from bzrlib.option import Option
-from bzrlib import lockdir, ui
+from bzrlib import (
+    commands,
+    errors,
+    lockdir,
+    osutils,
+    trace,
+    ui,
+    )
 
 from bzrlib.smart import medium, server
 from bzrlib.transport import get_transport
@@ -110,3 +128,732 @@
 
 
 register_command(cmd_launchpad_server)
+
+
+class LPForkingService(object):
+    """A service that can be asked to start a new bzr subprocess via fork.
+
+    The basic idea is that python startup is very expensive. For example, the
+    original 'lp-serve' command could take 2.5s just to start up, before any
+    actual actions could be performed.
+
+    This class provides a service sitting on a socket, which can then be
+    requested to fork and run a given bzr command.
+
+    Clients connect to the socket and make a simple request, which then
+    receives a response. The possible requests are:
+
+        "hello\n":  Trigger a heartbeat to report that the program is still
+                    running, and write status information to the log file.
+        "quit\n":   Stop the service, but do so 'nicely', waiting for children
+                    to exit, etc. Once this is received the service will stop
+                    taking new requests on the port.
+        "fork <command>\n": Request a new subprocess to be started.
+            <command> is the bzr command to be run, such as "rocks" or
+            "lp-serve --inet 12".
+            The immediate response will be the path-on-disk to a directory full
+            of named pipes (fifos) that will be the stdout/stderr/stdin of the
+            new process.
+            If a client holds the socket open, when the child process exits,
+            the exit status (as given by 'wait()') will be written to the
+            socket.
+
+            Note that one of the key bits is that the child will not be
+            started with exec*, we just call 'commands.run_bzr*()' directly.
+            This way, any modules that are already loaded will not need to be
+            loaded again. However, care must be taken with any global-state
+            that should be reset.
+    """
+
+    # Design decisions. These are bits where we could have chosen a different
+    # method/implementation and weren't sure what would be best. Documenting
+    # the current decision, and the alternatives.
+    #
+    # [Decision #1]
+    #   Serve on a socket on port 4156 on the loopback
+    #       1) It doesn't make sense to serve to arbitrary hosts, we only want
+    #          the local host to make requests. (Since the client needs to
+    #          access the named fifos on the current filesystem.)
+    #       2) 4156 was chosen as the bzr serve port (4155) + 1.
+    #       3) It would also be reasonable to just use a named AF_UNIX socket
+    #          in a known or configurable location. Arguably we can provide
+    #          better security that way (since you can set rwx on a named
+    #          socket, but can't really do so on a port.)
+    # [Decision #2]
+    #   SIGCHLD
+    #       We want to quickly detect that children have exited so that we can
+    #       inform the client process quickly. At the moment, we register a
+    #       SIGCHLD handler that doesn't do anything. However, it means that
+    #       when we get the signal, if we are currently blocked in something
+    #       like '.accept()', we will jump out temporarily. At that point the
+    #       main loop will check if any children have exited. We could have
+    #       done this work as part of the signal handler, but that felt 'racy'
+    #       doing any serious work in a signal handler.
+    #       If we just used socket.timeout as the indicator to go poll for
+    #       children exiting, it slows the disconnect by as much as the full
+    #       timeout. (So a timeout of 1.0s will cause the process to hang by
+    #       that long until it determines that a child has exited, and can
+    #       close the connection.)
+    #       The current flow means that we'll notice exited children whenever
+    #       we finish the current work.
+    # [Decision #3]
+    #   Child vs Parent actions.
+    #       There are several actions that are done when we get a new request.
+    #       We have to create the fifos on disk, fork a new child, connect the
+    #       child to those handles, and inform the client of the new path (not
+    #       necessarily in that order.) It makes sense to wait to send the path
+    #       message until after the fifos have been created. That way the
+    #       client can just try to open them immediately, and the
+    #       client-and-child will be synchronized by the open() calls.
+    #       However, should the client be the one doing the mkfifo, should the
+    #       server? Who should be sending the message? Should we fork after the
+    #       mkfifo or before.
+    #       The current thoughts:
+    #           1) Try to do work in the child when possible. This should allow
+    #              for 'scaling' because the server is single-threaded.
+    #           2) We create the directory itself in the server, because that
+    #              allows the server to monitor whether the client failed to
+    #              clean up after itself or not.
+    #           3) Otherwise we create the fifos in the child, and then send
+    #              the message back.
+    # [Decision #4]
+    #   Exit information
+    #       How do we inform the client process that the child has exited?
+    #       1) Arguably they could see that stdout and stderr have been closed,
+    #          and thus stop reading. In testing, I wrote a client which uses
+    #          select.poll() over stdin/stdout/stderr and used that to ferry
+    #          the content to the appropriate local handle. However for the
+    #          FIFOs, when the remote end closed, I wouldn't see any
+    #          corresponding information on the local end. There obviously
+    #          wasn't any data to be read, so they wouldn't show up as
+    #          'readable' (for me to try to read, and get 0 bytes, indicating
+    #          it was closed). I also wasn't seeing POLLHUP, which seemed to be
+    #          the correct indicator.  As such, we decided to inform the client
+    #          on the socket that they originally made the fork request, rather
+    #          than just closing the socket immediately.
+    #       2) Going further, we could have had the forking server close the
+    #          socket, and only the child hold the socket open. When the child
+    #          exits, then the OS naturally closes the socket. However, if we
+    #          want the returncode, then we should put that as bytes on the
+    #          socket before we exit. Having the child do the work means that
+    #          in error conditions, it could easily die before being able to
+    #          write anything (think SEGFAULT, etc). We already want the
+    #          forking server to be wait()ing on its children, both so that
+    #          we don't get zombies, and because with wait3() we can get the
+    #          rusage (user time, memory consumption, etc.)
+    #          As such, it seems reasonable that the server can then also
+    #          report back when a child is seen as exiting.
+    # [Decision #5]
+    #   cleanup once connected
+    #       The child process blocks during 'open()' waiting for the client to
+    #       connect to its fifos. Once the client has connected, the child then
+    #       deletes the temporary directory and the fifos from disk. This means
+    #       that there isn't much left for diagnosis, but it also means that
+    #       the client won't leave garbage around if it crashes, etc.
+    #       Note that the forking service itself still monitors the paths
+    #       created, and will delete garbage if it sees that a child failed to
+    #       do so.
+    # [Decision #6]
+    #   os._exit(retcode) in the child
+    #       Calling sys.exit(retcode) raises an exception, which then bubbles
+    #       up the stack and runs exit functions (and finally statements). When
+    #       I tried using it originally, I would see the current child bubble
+    #       all the way up the stack (through the server code that it fork()
+    #       through), and then get to main() returning code 0. However, the
+    #       process would exit nonzero. My guess is that something in the
+    #       atexit functions was failing, but that it was happening after
+    #       logging, etc had been shut down.
+    #       Note that whatever global state has been set up by the child,
+    #       should have been flushed before run_bzr_* has exited (which we *do*
+    #       wait for), and any other global state is probably a remnant from
+    #       the service process. Which will be cleaned up by the service
+    #       itself, rather than the child.
+    #       There is some possibility that files won't get flushed, etc. So we
+    #       may want to be calling sys.exitfunc() first. Note that bzr itself
+    #       uses sys.exitfunc(); os._exit() in the 'bzr' main script, as the
+    #       teardown time of all the python state was quite noticeable in
+    #       real-world runtime. As such, bzrlib should be pretty safe, or it
+    #       would have been failing for people already.
+    # [Decision #7]
+    #   prefork vs max children vs ?
+    #       For simplicity it seemed easiest to just fork when requested. Over
+    #       time, I realized it would be easy to allow running an arbitrary
+    #       command (no harder than just running one command), so it seemed
+    #       reasonable to switch over. If we go the prefork route, then we'll
+    #       need a way to tell the pre-forked children what command to run.
+    #       This could be as easy as just adding one more fifo that they wait
+    #       on in the same directory.
+    #       For now, I've chosen not to limit the number of forked children. I
+    #       don't know what a reasonable value is, and probably there are
+    #       already limitations at play. (If Conch limits connections, then it
+    #       will already be doing all the work, etc.)
+    # [Decision #8]
+    #   env vars
+    #       We could go with a much more structured definition for this data.
+    #       Or we could use bencode, or rio, or ...
+    #       I wanted something that would be easy enough to parse, sufficient
+    #       in complexity for what we want to convey, and gives us a good
+    #       way to know if we need to read more content from the socket.
+    #       Also, if we go for structured data, then we should structure all of
+    #       the requests.
+    # [Decision #9]
+    #   nicer errors to clients
+    #       This service is meant to be run only on the local system. As such,
+    #       we don't try to be extra defensive about leaking information to
+    #       clients. Instead we try to be helpful, and tell them as much as we
+    #       know about what went wrong.
+
+    DEFAULT_HOST = '127.0.0.1' # See [Decision #1]
+    DEFAULT_PORT = 4156
+    WAIT_FOR_CHILDREN_TIMEOUT = 5*60 # Wait no more than 5 min for children
+    SOCKET_TIMEOUT = 1.0
+    SLEEP_FOR_CHILDREN_TIMEOUT = 1.0
+    WAIT_FOR_REQUEST_TIMEOUT = 1.0 # No request should take longer than this to
+                                   # be read
+
+    _fork_function = os.fork
+
+    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT):
+        if host is None:
+            self.host = self.DEFAULT_HOST
+        else:
+            self.host = host
+        if port is None:
+            self.port = self.DEFAULT_PORT
+        else:
+            self.port = port
+        self._start_time = time.time()
+        self._should_terminate = threading.Event()
+        # We address these locally, in case of shutdown socket may be gc'd
+        # before we are
+        self._socket_timeout = socket.timeout
+        self._socket_error = socket.error
+        self._socket_timeout = socket.timeout
+        self._socket_error = socket.error
+        # Map from pid => information
+        self._child_processes = {}
+        self._children_spawned = 0
+
+    def _create_master_socket(self):
+        addrs = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC,
+            socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
+        (family, socktype, proto, canonname, sockaddr) = addrs
+        self._server_socket = socket.socket(family, socktype, proto)
+        if sys.platform != 'win32':
+            self._server_socket.setsockopt(socket.SOL_SOCKET,
+                socket.SO_REUSEADDR, 1)
+        try:
+            self._server_socket.bind(sockaddr)
+        except self._socket_error, message:
+            raise errors.CannotBindAddress(self.host, self.port, message)
+        self._sockname = self._server_socket.getsockname()
+        # self.host = self._sockname[0]
+        self.port = self._sockname[1]
+        self._server_socket.listen(5)
+        self._server_socket.settimeout(self.SOCKET_TIMEOUT)
+        trace.mutter('set socket timeout to: %s' % (self.SOCKET_TIMEOUT,))
+
+    def _handle_sigchld(self, signum, frm):
+        # We don't actually do anything here, we just want an interrupt (EINTR)
+        # on socket.accept() when SIGCHLD occurs.
+        pass
+
+    def _handle_sigterm(self, signum, frm):
+        # Unregister this as the default handler, 2 SIGTERMs will exit us.
+        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+        # SIGTERM should also generate EINTR on our wait loop, so this should
+        # be enough
+        self._should_terminate.set()
+
+    def _register_signals(self):
+        """Register a SIGCHLD and SIGTERM handler.
+
+        If we have a trigger for SIGCHLD then we can quickly respond to
+        clients when their process exits. The main risk is getting more EINTR
+        errors elsewhere.
+
+        SIGTERM allows us to cleanup nicely before we exit.
+        """
+        signal.signal(signal.SIGCHLD, self._handle_sigchld)
+        signal.signal(signal.SIGTERM, self._handle_sigterm)
+
+    def _unregister_signals(self):
+        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
+        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+    def _create_child_file_descriptors(self, base_path):
+        stdin_path = os.path.join(base_path, 'stdin')
+        stdout_path = os.path.join(base_path, 'stdout')
+        stderr_path = os.path.join(base_path, 'stderr')
+        os.mkfifo(stdin_path)
+        os.mkfifo(stdout_path)
+        os.mkfifo(stderr_path)
+
+    def _bind_child_file_descriptors(self, base_path):
+        import logging
+        from bzrlib import ui
+        stdin_path = os.path.join(base_path, 'stdin')
+        stdout_path = os.path.join(base_path, 'stdout')
+        stderr_path = os.path.join(base_path, 'stderr')
+        # Opening for writing blocks (or fails), so do those last
+        # TODO: Consider buffering, though that might interfere with reading
+        #       and writing the smart protocol
+        stdin_fid = os.open(stdin_path, os.O_RDONLY)
+        stdout_fid = os.open(stdout_path, os.O_WRONLY)
+        stderr_fid = os.open(stderr_path, os.O_WRONLY)
+        # XXX: Cheap hack. by this point bzrlib has opened stderr for logging
+        #      (as part of starting the service process in the first place). As
+        #      such, it has a stream handler that writes to stderr. logging
+        #      tries to flush and close that, but the file is already closed.
+        #      This just suppresses that exception
+        logging.raiseExceptions = False
+        sys.stdin.close()
+        sys.stdout.close()
+        sys.stderr.close()
+        os.dup2(stdin_fid, 0)
+        os.dup2(stdout_fid, 1)
+        os.dup2(stderr_fid, 2)
+        sys.stdin = os.fdopen(stdin_fid, 'rb')
+        sys.stdout = os.fdopen(stdout_fid, 'wb')
+        sys.stderr = os.fdopen(stderr_fid, 'wb')
+        ui.ui_factory.stdin = sys.stdin
+        ui.ui_factory.stdout = sys.stdout
+        ui.ui_factory.stderr = sys.stderr
+        # Now that we've opened the handles, delete everything so that we don't
+        # leave garbage around. Because the open() is done in blocking mode, we
+        # know that someone has already connected to them, and we don't want
+        # anyone else getting confused and connecting.
+        # See [Decision #5]
+        os.remove(stderr_path)
+        os.remove(stdout_path)
+        os.remove(stdin_path)
+        os.rmdir(base_path)
+
+    def _close_child_file_descriptons(self):
+        sys.stdin.close()
+        sys.stderr.close()
+        sys.stdout.close()
+
+    def become_child(self, command_argv, path):
+        """We are in the spawned child code, do our magic voodoo."""
+        # Stop tracking new signals
+        self._unregister_signals()
+        # Reset the start time
+        trace._bzr_log_start_time = time.time()
+        trace.mutter('%d starting %r'
+                     % (os.getpid(), command_argv,))
+        self.host = None
+        self.port = None
+        self._sockname = None
+        self._bind_child_file_descriptors(path)
+        self._run_child_command(command_argv)
+
+    def _run_child_command(self, command_argv):
+        # This is the point where we would actually want to do something with
+        # our life
+        # TODO: We may want to consider special-casing the 'lp-serve' command.
+        #       As that is the primary use-case for this service, it might be
+        #       interesting to have an already-instantiated instance, where we
+        #       can just pop on an extra argument and be ready to go. However,
+        #       that would probably only really be measurable if we prefork. As
+        #       it looks like ~200ms is 'fork()' time, but only 50ms is
+        #       run-the-command time.
+        retcode = commands.run_bzr_catch_errors(command_argv)
+        self._close_child_file_descriptons()
+        trace.mutter('%d finished %r'
+                     % (os.getpid(), command_argv,))
+        # We force os._exit() here, because we don't want to unwind the stack,
+        # which has complex results. (We can get it to unwind back to the
+        # cmd_launchpad_forking_service code, and even back to main() reporting
+        # the return code, but after that, suddenly the return code changes
+        # from a '0' to a '1', with no logging of info.)
+        # TODO: Should we call sys.exitfunc() here? it allows atexit functions
+        #       to fire, however, some of those may be still around from the
+        #       parent process, which we don't really want.
+        ## sys.exitfunc()
+        # See [Decision #6]
+        os._exit(retcode)
+
+    @staticmethod
+    def command_to_argv(command_str):
+        """Convert a 'foo bar' style command to [u'foo', u'bar']"""
+        # command_str must be a utf-8 string
+        return [s.decode('utf-8') for s in shlex.split(command_str)]
+
+    @staticmethod
+    def parse_env(env_str):
+        """Convert the environment information into a dict.
+
+        :param env_str: A string full of environment variable declarations.
+            Each key is simple ascii "key: value\n"
+            The string must end with "end\n".
+        :return: A dict of environment variables
+        """
+        # See [Decision #8]
+        env = {}
+        if not env_str.endswith('end\n'):
+            raise ValueError('Invalid env-str: %r' % (env_str,))
+        env_str = env_str[:-5]
+        if not env_str:
+            return env
+        env_entries = env_str.split('\n')
+        for entry in env_entries:
+            key, value = entry.split(': ', 1)
+            env[key] = value
+        return env
+
+    def fork_one_request(self, conn, client_addr, command_argv, env):
+        """Fork myself and serve a request."""
+        temp_name = tempfile.mkdtemp(prefix='lp-forking-service-child-')
+        # Now that we've set everything up, send the response to the client.
+        # We create the fifos first, so the client can start trying to connect
+        # to them while we fork and have the child do the same.
+        self._children_spawned += 1
+        pid = self._fork_function()
+        if pid == 0:
+            pid = os.getpid()
+            trace.mutter('%d spawned' % (pid,))
+            self._server_socket.close()
+            for env_var, value in env.iteritems():
+                osutils.set_or_unset_env(env_var, value)
+            # See [Decision #3]
+            self._create_child_file_descriptors(temp_name)
+            conn.sendall('ok\n%d\n%s\n' % (pid, temp_name))
+            conn.close()
+            self.become_child(command_argv, temp_name)
+            trace.warning('become_child returned!!!')
+            sys.exit(1)
+        else:
+            self._child_processes[pid] = (temp_name, conn)
+            self.log(client_addr, 'Spawned process %s for %r: %s'
+                            % (pid, command_argv, temp_name))
+
+    def main_loop(self):
+        self._should_terminate.clear()
+        self._register_signals()
+        self._create_master_socket()
+        trace.note('Listening on port: %s' % (self.port,))
+        try:
+            try:
+                self._do_loop()
+            finally:
+                # Stop talking to others, we are shutting down
+                self._server_socket.close()
+        except KeyboardInterrupt:
+            # SIGINT received, try to shutdown cleanly
+            pass
+        trace.note('Shutting down. Waiting up to %.0fs for %d child processes'
+                   % (self.WAIT_FOR_CHILDREN_TIMEOUT,
+                      len(self._child_processes),))
+        self._shutdown_children()
+        trace.note('Exiting')
+
+    def _do_loop(self):
+        while not self._should_terminate.isSet():
+            try:
+                conn, client_addr = self._server_socket.accept()
+            except self._socket_timeout:
+                pass # run shutdown and children checks
+            except self._socket_error, e:
+                if e.args[0] == errno.EINTR:
+                    pass # run shutdown and children checks
+                elif e.args[0] == errno.EBADF:
+                    # We can get EBADF here while we are shutting down
+                    # So we just ignore it for now
+                    pass
+                else:
+                    # Log any other failure mode
+                    trace.warning("listening socket error: %s", e)
+            else:
+                self.log(client_addr, 'connected')
+                # TODO: We should probably trap exceptions coming out of this
+                #       and log them, so that we don't kill the service because
+                #       of an unhandled error
+                # Note: settimeout is used so that a malformed request doesn't
+                #       cause us to hang forever. Note that the particular
+                #       implementation means that a malicious client could
+                #       probably send us one byte every Xms, and we would just
+                #       keep trying to read it. However, as a local service, we
+                #       aren't worrying about it.
+                conn.settimeout(self.WAIT_FOR_REQUEST_TIMEOUT)
+                try:
+                    self.serve_one_connection(conn, client_addr)
+                except self._socket_timeout, e:
+                    trace.log_exception_quietly()
+                    self.log(client_addr, 'request timeout failure: %s' % (e,))
+                    conn.sendall('FAILURE\nrequest timed out\n')
+                    conn.close()
+            self._poll_children()
+
+    def log(self, client_addr, message):
+        """Log a message to the trace log.
+
+        Include the information about what connection is being served.
+        """
+        if client_addr is not None:
+            # Note, we don't use conn.getpeername() because if a client
+            # disconnects before we get here, that raises an exception
+            peer_host, peer_port = client_addr
+            conn_info = '[%s:%d] ' % (peer_host, peer_port)
+        else:
+            conn_info = ''
+        trace.mutter('%s%s' % (conn_info, message))
+
+    def log_information(self):
+        """Log the status information.
+
+        This includes stuff like number of children, and ... ?
+        """
+        self._poll_children()
+        self.log(None, 'Running for %.3fs' % (time.time() - self._start_time))
+        self.log(None, '%d children currently running (spawned %d total)'
+                       % (len(self._child_processes), self._children_spawned))
+        # Read the current information about memory consumption, etc.
+        self.log(None, 'Self: %s'
+                       % (resource.getrusage(resource.RUSAGE_SELF),))
+        # This seems to be the sum of all rusage for all children that have
+        # been collected (not for currently running children, or ones we
+        # haven't "wait"ed on.) We may want to read /proc/PID/status, since
+        # 'live' information is probably more useful.
+        self.log(None, 'Finished children: %s'
+                       % (resource.getrusage(resource.RUSAGE_CHILDREN),))
+
+    def _poll_children(self):
+        """See if children are still running, etc.
+
+        One interesting hook here would be to track memory consumption, etc.
+        """
+        to_remove = []
+        while self._child_processes:
+            try:
+                c_id, exit_code, rusage = os.wait3(os.WNOHANG)
+            except OSError, e:
+                if e.errno == errno.ECHILD:
+                    # TODO: We handle this right now because the test suite
+                    #       fakes a child, since we wanted to test some code
+                    #       without actually forking anything
+                    trace.mutter('_poll_children() called, and'
+                        ' self._child_processes indicates there are'
+                        ' children, but os.wait3() says there are not.'
+                        ' current_children: %s' % (self._child_processes,))
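+                    return
+                # Any other OSError is unexpected; re-raise rather than fall
+                # through with c_id unset.
+                raise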
+            if c_id == 0:
+                # No more children stopped right now
+                return
+            c_path, sock = self._child_processes.pop(c_id)
+            trace.mutter('%s exited %s and usage: %s'
+                         % (c_id, exit_code, rusage))
+            # See [Decision #4]
+            try:
+                sock.sendall('exited\n%s\n' % (exit_code,))
+            except (self._socket_timeout, self._socket_error), e:
+                # The client disconnected before we wanted them to,
+                # no big deal
+                trace.mutter('%s\'s socket already closed: %s' % (c_id, e))
+            else:
+                sock.close()
+            if os.path.exists(c_path):
+                # The child failed to cleanup after itself, do the work here
+                trace.warning('Had to clean up after child %d: %s\n'
+                              % (c_id, c_path))
+                shutil.rmtree(c_path)
+
+    def _wait_for_children(self, secs):
+        start = time.time()
+        end = start + secs
+        while self._child_processes:
+            self._poll_children()
+            if secs > 0 and time.time() > end:
+                break
+            time.sleep(self.SLEEP_FOR_CHILDREN_TIMEOUT)
+
+    def _shutdown_children(self):
+        self._wait_for_children(self.WAIT_FOR_CHILDREN_TIMEOUT)
+        if self._child_processes:
+            trace.warning('Failed to stop children: %s'
+                % ', '.join(map(str, self._child_processes)))
+            for c_id in self._child_processes:
+                trace.warning('sending SIGINT to %d' % (c_id,))
+                os.kill(c_id, signal.SIGINT)
+            # We sent the SIGINT signal, see if they exited
+            self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
+        if self._child_processes:
+            # No? Then maybe something more powerful
+            for c_id in self._child_processes:
+                trace.warning('sending SIGKILL to %d' % (c_id,))
+                os.kill(c_id, signal.SIGKILL)
+            # We sent the SIGKILL signal, see if they exited
+            self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
+        if self._child_processes:
+            for c_id, (c_path, sock) in self._child_processes.iteritems():
+                # TODO: We should probably put something into this message?
+                #       However, the likelihood is very small that this isn't
+                #       already closed because of SIGKILL + _wait_for_children
+                #       And I don't really know what to say...
+                sock.close()
+                if os.path.exists(c_path):
+                    trace.warning('Cleaning up after immortal child %d: %s\n'
+                                  % (c_id, c_path))
+                    shutil.rmtree(c_path)
+
+    def _parse_fork_request(self, conn, client_addr, request):
+        if request.startswith('fork-env '):
+            while not request.endswith('end\n'):
+                request += osutils.read_bytes_from_socket(conn)
+                request = request.replace('\r\n', '\n')
+            command, env = request[9:].split('\n', 1)
+        else:
+            command = request[5:].strip()
+            env = 'end\n' # No env set
+        try:
+            command_argv = self.command_to_argv(command)
+            env = self.parse_env(env)
+        except Exception, e:
+            # TODO: Log the traceback?
+            self.log(client_addr, 'command or env parsing failed: %r'
+                                  % (str(e),))
+            conn.sendall('FAILURE\ncommand or env parsing failed: %r\n'
+                         % (str(e),))
+        else:
+            return command_argv, env
+        return None, None
+
+    def serve_one_connection(self, conn, client_addr):
+        request = ''
+        while '\n' not in request:
+            request += osutils.read_bytes_from_socket(conn)
+        # telnet likes to use '\r\n' rather than '\n', and it is nice to have
+        # an easy way to debug.
+        request = request.replace('\r\n', '\n')
+        self.log(client_addr, 'request: %r' % (request,))
+        if request == 'hello\n':
+            conn.sendall('ok\nyep, still alive\n')
+            self.log_information()
+        elif request == 'quit\n':
+            self._should_terminate.set()
+            conn.sendall('ok\nquit command requested... exiting\n')
+        elif request.startswith('fork ') or request.startswith('fork-env '):
+            command_argv, env = self._parse_fork_request(conn, client_addr,
+                                                         request)
+            if command_argv is not None:
+                # See [Decision #7]
+                # TODO: Do we want to limit the number of children? And/or
+                #       prefork additional instances? (the design will need to
+                #       change if we prefork and run arbitrary commands.)
+                self.fork_one_request(conn, client_addr, command_argv, env)
+                # We don't close the conn like other code paths, since we use
+                # it again later.
+                return
+        else:
+            self.log(client_addr, 'FAILURE: unknown request: %r' % (request,))
+            # See [Decision #9]
+            conn.sendall('FAILURE\nunknown request: %r\n' % (request,))
+        conn.close()
+
+
+class cmd_launchpad_forking_service(Command):
+    """Launch a long-running process, where you can ask for new processes.
+
+    The process will block on a given --port waiting for requests to be made.
+    When a request is made, it will fork itself and redirect stdout/in/err to
+    fifos on the filesystem, and start running the requested command. The
+    caller will be informed where those file handles can be found. Thus it only
+    makes sense that the process connecting to the port must be on the same
+    system.
+    """
+
+    aliases = ['lp-service']
+
+    takes_options = [Option('port',
+                        help='Listen for connections on [host:]portnumber',
+                        type=str),
+                     Option('preload',
+                        help="Do/don't preload libraries before startup."),
+                     Option('children-timeout', type=int,
+                        help="Only wait XX seconds for children to exit"),
+                    ]
+
+    def _preload_libraries(self):
+        global libraries_to_preload
+        for pyname in libraries_to_preload:
+            try:
+                __import__(pyname)
+            except ImportError, e:
+                trace.mutter('failed to preload %s: %s' % (pyname, e))
+
+    def _get_host_and_port(self, port):
+        host = None
+        if port is not None:
+            if ':' in port:
+                host, port = port.rsplit(':', 1)
+            port = int(port)
+        return host, port
+
+    def run(self, port=None, preload=True,
+            children_timeout=LPForkingService.WAIT_FOR_CHILDREN_TIMEOUT):
+        host, port = self._get_host_and_port(port)
+        if preload:
+            # We note this because it often takes a fair amount of time.
+            trace.note('Preloading %d modules' % (len(libraries_to_preload),))
+            self._preload_libraries()
+        service = LPForkingService(host, port)
+        service.WAIT_FOR_CHILDREN_TIMEOUT = children_timeout
+        service.main_loop()
+
+register_command(cmd_launchpad_forking_service)
+
+
+class cmd_launchpad_replay(Command):
+    """Write input from stdin back to stdout or stderr.
+
+    This is a hidden command, primarily available for testing
+    cmd_launchpad_forking_service.
+    """
+
+    hidden = True
+
+    def run(self):
+        # Just read line-by-line from stdin, and write out to stdout or stderr
+        # depending on the prefix
+        for line in sys.stdin:
+            channel, contents = line.split(' ', 1)
+            channel = int(channel)
+            if channel == 1:
+                sys.stdout.write(contents)
+                sys.stdout.flush()
+            elif channel == 2:
+                sys.stderr.write(contents)
+                sys.stderr.flush()
+            else:
+                raise RuntimeError('Invalid channel request.')
+        return 0
+
+register_command(cmd_launchpad_replay)
+
+libraries_to_preload = [
+    'bzrlib.errors',
+    'bzrlib.repofmt.groupcompress_repo',
+    'bzrlib.repository',
+    'bzrlib.smart',
+    'bzrlib.smart.protocol',
+    'bzrlib.smart.request',
+    'bzrlib.smart.server',
+    'bzrlib.smart.vfs',
+    'bzrlib.transport.local',
+    'bzrlib.transport.readonly',
+    'lp.codehosting.bzrutils',
+    'lp.codehosting.vfs',
+    'lp.codehosting.vfs.branchfs',
+    'lp.codehosting.vfs.branchfsclient',
+    'lp.codehosting.vfs.hooks',
+    'lp.codehosting.vfs.transport',
+    ]
+
+
+
+def load_tests(standard_tests, module, loader):
+    standard_tests.addTests(loader.loadTestsFromModuleNames(
+        [__name__ + '.' + x for x in [
+            'test_lpserve',
+        ]]))
+    return standard_tests
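
The 'fork-env' request format handled by _parse_fork_request and parse_env above can be sketched in isolation as follows. This is only an illustrative Python 3 sketch, not part of the patch: build_fork_env_request and parse_fork_env_request are hypothetical helper names, and the sketch assumes the wire format is exactly "fork-env <command>\n", zero or more "key: value\n" lines, then "end\n".

```python
def build_fork_env_request(command, env):
    """Build a 'fork-env' request string (hypothetical helper).

    Layout assumed from the patch: a 'fork-env <command>' line, one
    'key: value' line per environment variable, then a final 'end' line.
    """
    lines = ['fork-env %s\n' % (command,)]
    for key, value in sorted(env.items()):
        lines.append('%s: %s\n' % (key, value))
    lines.append('end\n')
    return ''.join(lines)


def parse_fork_env_request(request):
    """Split a 'fork-env' request into (command, env dict).

    Raises ValueError for a request that is not terminated by 'end' or
    that contains an entry without a ': ' separator.
    """
    # Accept telnet-style line endings, as the service does.
    request = request.replace('\r\n', '\n')
    if not request.startswith('fork-env ') or not request.endswith('end\n'):
        raise ValueError('Invalid request: %r' % (request,))
    command, env_str = request[len('fork-env '):].split('\n', 1)
    env = {}
    # Everything before the terminating 'end' line is one entry per line.
    for entry in env_str[:-len('end\n')].splitlines():
        key, value = entry.split(': ', 1)
        env[key] = value
    return command, env
```

Building a request with build_fork_env_request('rocks', {'BZR_EMAIL': 'joe@example.com'}) and passing the result to parse_fork_env_request should round-trip to the same command and environment, which mirrors what test_send_fork_env_with_env checks against the real service.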

=== added file 'bzrplugins/lpserve/test_lpserve.py'
--- bzrplugins/lpserve/test_lpserve.py	1970-01-01 00:00:00 +0000
+++ bzrplugins/lpserve/test_lpserve.py	2010-09-17 20:34:36 +0000
@@ -0,0 +1,541 @@
+# Copyright 2010 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+import os
+import signal
+import socket
+import subprocess
+import tempfile
+import threading
+import time
+
+from testtools import content
+
+from bzrlib import (
+    osutils,
+    tests,
+    trace,
+    )
+from bzrlib.plugins import lpserve
+
+from canonical.config import config
+from lp.codehosting import get_bzr_path, get_BZR_PLUGIN_PATH_for_subprocess
+
+
+class TestingLPForkingServiceInAThread(lpserve.LPForkingService):
+    """Wrap starting and stopping an LPForkingService instance in a thread."""
+
+    # For testing, we set the timeouts much lower, because we want the tests to
+    # run quickly
+    WAIT_FOR_CHILDREN_TIMEOUT = 0.5
+    SOCKET_TIMEOUT = 0.01
+    SLEEP_FOR_CHILDREN_TIMEOUT = 0.01
+    WAIT_FOR_REQUEST_TIMEOUT = 0.1
+
+    _fork_function = None
+
+    def __init__(self, host='127.0.0.1', port=0):
+        self.service_started = threading.Event()
+        self.service_stopped = threading.Event()
+        self.this_thread = None
+        self.fork_log = []
+        super(TestingLPForkingServiceInAThread, self).__init__(host=host,
+                                                               port=port)
+
+    def _register_signals(self):
+        pass # Don't register it for the test suite
+
+    def _unregister_signals(self):
+        pass # We don't fork, and didn't register, so don't unregister
+
+    def _create_master_socket(self):
+        trace.mutter('creating master socket')
+        super(TestingLPForkingServiceInAThread, self)._create_master_socket()
+        trace.mutter('setting service_started')
+        self.service_started.set()
+
+    def main_loop(self):
+        self.service_stopped.clear()
+        super(TestingLPForkingServiceInAThread, self).main_loop()
+        self.service_stopped.set()
+
+    def fork_one_request(self, conn, client_addr, command, env):
+        # We intentionally don't allow the test suite to request a fork, as
+        # threads + forks and everything else don't exactly play well together
+        self.fork_log.append((command, env))
+        conn.sendall('ok\nfake forking\n')
+        conn.close()
+
+    @staticmethod
+    def start_service(test):
+        """Start a new LPForkingService in a thread on a random port.
+
+        This will block until the service has created its socket, and is ready
+        to communicate.
+
+        :return: A new TestingLPForkingServiceInAThread instance
+        """
+        # Allocate a new port on only the loopback device
+        new_service = TestingLPForkingServiceInAThread()
+        thread = threading.Thread(target=new_service.main_loop,
+                                  name='TestingLPForkingServiceInAThread')
+        new_service.this_thread = thread
+        # should we be doing thread.setDaemon(True) ?
+        thread.start()
+        new_service.service_started.wait(10.0)
+        if not new_service.service_started.isSet():
+            raise RuntimeError(
+                'Failed to start the TestingLPForkingServiceInAThread')
+        test.addCleanup(new_service.stop_service)
+        # what about returning new_service._sockname ?
+        return new_service
+
+    def stop_service(self):
+        """Stop the test-server thread. This can be called multiple times."""
+        if self.this_thread is None:
+            # We already stopped the process
+            return
+        self._should_terminate.set()
+        self.service_stopped.wait(10.0)
+        if not self.service_stopped.isSet():
+            raise RuntimeError(
+                'Failed to stop the TestingLPForkingServiceInAThread')
+        self.this_thread.join()
+        # Break any refcycles
+        self.this_thread = None
+
+
+class TestTestingLPForkingServiceInAThread(tests.TestCaseWithTransport):
+
+    def test_start_and_stop_service(self):
+        service = TestingLPForkingServiceInAThread.start_service(self)
+        service.stop_service()
+
+    def test_multiple_stops(self):
+        service = TestingLPForkingServiceInAThread.start_service(self)
+        service.stop_service()
+        service.stop_service()
+
+    def test_autostop(self):
+        # We shouldn't leak a thread here, as it should be part of the test
+        # case teardown.
+        service = TestingLPForkingServiceInAThread.start_service(self)
+
+
+class TestCaseWithLPForkingService(tests.TestCaseWithTransport):
+
+    def setUp(self):
+        super(TestCaseWithLPForkingService, self).setUp()
+        self.service = TestingLPForkingServiceInAThread.start_service(self)
+
+    def send_message_to_service(self, message, one_byte_at_a_time=False):
+        host, port = self.service._sockname
+        addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
+            socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
+        (family, socktype, proto, canonname, sockaddr) = addrs[0]
+        client_sock = socket.socket(family, socktype, proto)
+        client_sock.connect(sockaddr)
+        if one_byte_at_a_time:
+            for byte in message:
+                client_sock.send(byte)
+        else:
+            client_sock.sendall(message)
+        response = client_sock.recv(1024)
+        return response
+
+
+class TestLPForkingServiceCommandToArgv(tests.TestCase):
+
+    def assertAsArgv(self, argv, command_str):
+        self.assertEqual(argv,
+            lpserve.LPForkingService.command_to_argv(command_str))
+
+    def test_simple(self):
+        self.assertAsArgv([u'foo'], 'foo')
+        self.assertAsArgv([u'foo', u'bar'], 'foo bar')
+
+    def test_quoted(self):
+        self.assertAsArgv([u'foo'], 'foo')
+        self.assertAsArgv([u'foo bar'], '"foo bar"')
+
+    def test_unicode(self):
+        self.assertAsArgv([u'command', u'\xe5'], 'command \xc3\xa5')
+
+
+class TestLPForkingServiceParseEnv(tests.TestCase):
+
+    def assertEnv(self, env, env_str):
+        self.assertEqual(env, lpserve.LPForkingService.parse_env(env_str))
+
+    def assertInvalid(self, env_str):
+        self.assertRaises(ValueError, lpserve.LPForkingService.parse_env,
+                                      env_str)
+
+    def test_no_entries(self):
+        self.assertEnv({}, 'end\n')
+
+    def test_one_entries(self):
+        self.assertEnv({'BZR_EMAIL': 'joe@xxxxxxx'},
+                       'BZR_EMAIL: joe@xxxxxxx\n'
+                       'end\n')
+
+    def test_two_entries(self):
+        self.assertEnv({'BZR_EMAIL': 'joe@xxxxxxx', 'BAR': 'foo'},
+                       'BZR_EMAIL: joe@xxxxxxx\n'
+                       'BAR: foo\n'
+                       'end\n')
+
+    def test_invalid_empty(self):
+        self.assertInvalid('')
+
+    def test_invalid_end(self):
+        self.assertInvalid("BZR_EMAIL: joe@xxxxxxx\n")
+
+    def test_invalid_entry(self):
+        self.assertInvalid("BZR_EMAIL joe@xxxxxxx\nend\n")
+
+
+class TestLPForkingService(TestCaseWithLPForkingService):
+
+    def test_send_quit_message(self):
+        response = self.send_message_to_service('quit\n')
+        self.assertEqual('ok\nquit command requested... exiting\n', response)
+        self.service.service_stopped.wait(10.0)
+        self.assertTrue(self.service.service_stopped.isSet())
+
+    def test_send_invalid_message_fails(self):
+        response = self.send_message_to_service('unknown\n')
+        self.assertStartsWith(response, 'FAILURE')
+
+    def test_send_hello_heartbeat(self):
+        response = self.send_message_to_service('hello\n')
+        self.assertEqual('ok\nyep, still alive\n', response)
+
+    def test_hello_supports_crlf(self):
+        # telnet seems to always write in text mode. It is nice to be able to
+        # debug with simple telnet, so let's support it.
+        response = self.send_message_to_service('hello\r\n')
+        self.assertEqual('ok\nyep, still alive\n', response)
+
+    def test_send_simple_fork(self):
+        response = self.send_message_to_service('fork rocks\n')
+        self.assertEqual('ok\nfake forking\n', response)
+        self.assertEqual([(['rocks'], {})], self.service.fork_log)
+
+    def test_send_fork_env_with_empty_env(self):
+        response = self.send_message_to_service(
+            'fork-env rocks\n'
+            'end\n')
+        self.assertEqual('ok\nfake forking\n', response)
+        self.assertEqual([(['rocks'], {})], self.service.fork_log)
+
+    def test_send_fork_env_with_env(self):
+        response = self.send_message_to_service(
+            'fork-env rocks\n'
+            'BZR_EMAIL: joe@xxxxxxxxxxx\n'
+            'end\n')
+        self.assertEqual('ok\nfake forking\n', response)
+        self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'})],
+                         self.service.fork_log)
+
+    def test_send_fork_env_slowly(self):
+        response = self.send_message_to_service(
+            'fork-env rocks\n'
+            'BZR_EMAIL: joe@xxxxxxxxxxx\n'
+            'end\n', one_byte_at_a_time=True)
+        self.assertEqual('ok\nfake forking\n', response)
+        self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'})],
+                         self.service.fork_log)
+        # It should work with 'crlf' endings as well
+        response = self.send_message_to_service(
+            'fork-env rocks\r\n'
+            'BZR_EMAIL: joe@xxxxxxxxxxx\r\n'
+            'end\r\n', one_byte_at_a_time=True)
+        self.assertEqual('ok\nfake forking\n', response)
+        self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'}),
+                          (['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'})],
+                         self.service.fork_log)
+
+    def test_send_incomplete_fork_env_timeout(self):
+        # We should get a failure message if we can't quickly read the whole
+        # content
+        response = self.send_message_to_service(
+            'fork-env rocks\n'
+            'BZR_EMAIL: joe@xxxxxxxxxxx\n',
+            one_byte_at_a_time=True)
+        # Note that we *don't* send a final 'end\n'
+        self.assertStartsWith(response, 'FAILURE\n')
+
+    def test_send_incomplete_request_timeout(self):
+        # Requests end with '\n', send one without it
+        response = self.send_message_to_service('hello',
+                                                one_byte_at_a_time=True)
+        self.assertStartsWith(response, 'FAILURE\n')
+
+
+class TestCaseWithSubprocess(tests.TestCaseWithTransport):
+    """Override the bzr start_bzr_subprocess command.
+
+    The launchpad infrastructure requires a fair amount of configuration to get
+    paths, etc correct. So this provides that work.
+    """
+
+    def get_python_path(self):
+        """Return the path to the Python interpreter."""
+        return '%s/bin/py' % config.root
+
+    def start_bzr_subprocess(self, process_args, env_changes=None,
+                             working_dir=None):
+        """Start bzr in a subprocess for testing.
+
+        Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
+        This version removes some of the skipping stuff, some of the
+        irrelevant comments (e.g. about win32) and uses Launchpad's own
+        mechanisms for getting the path to 'bzr'.
+
+        Comments starting with 'LAUNCHPAD' are comments about our
+        modifications.
+        """
+        if env_changes is None:
+            env_changes = {}
+        env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
+        old_env = {}
+
+        def cleanup_environment():
+            for env_var, value in env_changes.iteritems():
+                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
+
+        def restore_environment():
+            for env_var, value in old_env.iteritems():
+                osutils.set_or_unset_env(env_var, value)
+
+        cwd = None
+        if working_dir is not None:
+            cwd = osutils.getcwd()
+            os.chdir(working_dir)
+
+        # LAUNCHPAD: Because of buildout, we need to get a custom Python
+        # binary, not sys.executable.
+        python_path = self.get_python_path()
+        # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
+        # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
+        bzr_path = get_bzr_path()
+        try:
+            cleanup_environment()
+            command = [python_path, bzr_path]
+            command.extend(process_args)
+            process = self._popen(
+                command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE)
+        finally:
+            restore_environment()
+            if cwd is not None:
+                os.chdir(cwd)
+
+        return process
+
+
+class TestCaseWithLPForkingServiceSubprocess(TestCaseWithSubprocess):
+    """Tests will get a separate process to communicate to.
+
+    The number of these tests should be small, because it is expensive to start
+    and stop the daemon.
+
+    TODO: This should probably use testresources, or layers somehow...
+    """
+
+    def setUp(self):
+        super(TestCaseWithLPForkingServiceSubprocess, self).setUp()
+        self.service_process, self.service_port = self.start_service_subprocess()
+        self.addCleanup(self.stop_service)
+
+    def start_conversation(self, message, one_byte_at_a_time=False):
+        """Start talking to the service, and get the initial response."""
+        addrs = socket.getaddrinfo('127.0.0.1', self.service_port,
+            socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
+        (family, socktype, proto, canonname, sockaddr) = addrs[0]
+        client_sock = socket.socket(family, socktype, proto)
+        trace.mutter('sending %r to port %s' % (message, self.service_port))
+        client_sock.connect(sockaddr)
+        if one_byte_at_a_time:
+            for byte in message:
+                client_sock.send(byte)
+        else:
+            client_sock.sendall(message)
+        response = client_sock.recv(1024)
+        trace.mutter('response: %r' % (response,))
+        if response.startswith("FAILURE"):
+            raise RuntimeError('Failed to send message: %r' % (response,))
+        return response, client_sock
+
+    def send_message_to_service(self, message, one_byte_at_a_time=False):
+        response, client_sock = self.start_conversation(message,
+            one_byte_at_a_time=one_byte_at_a_time)
+        client_sock.close()
+        return response
+
+    def send_fork_request(self, command, env=None):
+        if env is not None:
+            request_lines = ['fork-env %s\n' % (command,)]
+            for key, value in env.iteritems():
+                request_lines.append('%s: %s\n' % (key, value))
+            request_lines.append('end\n')
+            request = ''.join(request_lines)
+        else:
+            request = 'fork %s\n' % (command,)
+        response, sock = self.start_conversation(request)
+        ok, pid, path, tail = response.split('\n')
+        self.assertEqual('ok', ok)
+        self.assertEqual('', tail)
+        # Don't really care what it is, but should be an integer
+        pid = int(pid)
+        path = path.strip()
+        self.assertContainsRe(path, '/lp-forking-service-child-')
+        return path, pid, sock
+
+    def start_service_subprocess(self):
+        # Make sure this plugin is exposed to the subprocess
+        # SLOOWWW (~2.4 seconds, which is why we are doing the work anyway)
+        fd, tempname = tempfile.mkstemp(prefix='tmp-log-bzr-lp-forking-')
+        # I'm not 100% sure about when cleanup runs versus addDetail, but I
+        # think this will work.
+        self.addCleanup(os.remove, tempname)
+        def read_log():
+            f = os.fdopen(fd)
+            f.seek(0)
+            content = f.read()
+            f.close()
+            return [content]
+        self.addDetail('server-log', content.Content(
+            content.ContentType('text', 'plain', {"charset": "utf8"}),
+            read_log))
+        env_changes = {'BZR_PLUGIN_PATH': lpserve.__path__[0],
+                       'BZR_LOG': tempname}
+        proc = self.start_bzr_subprocess(
+            ['lp-service', '--port', '127.0.0.1:0', '--no-preload',
+             '--children-timeout=1'],
+            env_changes=env_changes)
+        trace.mutter('started lp-service subprocess')
+        # preload_line = proc.stderr.readline()
+        # self.assertStartsWith(preload_line, 'Preloading')
+        prefix = 'Listening on port: '
+        port_line = proc.stderr.readline()
+        self.assertStartsWith(port_line, prefix)
+        port = int(port_line[len(prefix):])
+        trace.mutter(port_line)
+        return proc, port
+
+    def stop_service(self):
+        if self.service_process is None:
+            # Already stopped
+            return
+        # First, try to stop the service gracefully, by sending a 'quit'
+        # message
+        try:
+            response = self.send_message_to_service('quit\n')
+        except socket.error, e:
+            # Ignore a failure to connect, the service must be stopping/stopped
+            # already
+            response = None
+        tend = time.time() + 10.0
+        while self.service_process.poll() is None:
+            if time.time() > tend:
+                self.finish_bzr_subprocess(process=self.service_process,
+                    send_signal=signal.SIGINT, retcode=3)
+                self.fail('Failed to quit gracefully after 10.0 seconds')
+            time.sleep(0.1)
+        if response is not None:
+            self.assertEqual('ok\nquit command requested... exiting\n',
+                             response)
+
+    def _get_fork_handles(self, path):
+        trace.mutter('getting handles for: %s' % (path,))
+        stdin_path = os.path.join(path, 'stdin')
+        stdout_path = os.path.join(path, 'stdout')
+        stderr_path = os.path.join(path, 'stderr')
+        # Consider the ordering, the other side should open 'stdin' first, but
+        # we want it to block until we open the last one, or we race and it can
+        # delete the other handles before we get to open them.
+        child_stdin = open(stdin_path, 'wb')
+        child_stdout = open(stdout_path, 'rb')
+        child_stderr = open(stderr_path, 'rb')
+        return child_stdin, child_stdout, child_stderr
+
+    def communicate_with_fork(self, path, stdin=None):
+        child_stdin, child_stdout, child_stderr = self._get_fork_handles(path)
+        if stdin is not None:
+            child_stdin.write(stdin)
+        child_stdin.close()
+        stdout_content = child_stdout.read()
+        stderr_content = child_stderr.read()
+        return stdout_content, stderr_content
+
+    def assertReturnCode(self, expected_code, sock):
+        """Assert that we get the expected return code as a message."""
+        response = sock.recv(1024)
+        self.assertStartsWith(response, 'exited\n')
+        code = int(response.split('\n', 1)[1])
+        self.assertEqual(expected_code, code)
+
+    def test_fork_lp_serve_hello(self):
+        path, _, sock = self.send_fork_request('lp-serve --inet 2')
+        stdout_content, stderr_content = self.communicate_with_fork(path,
+            'hello\n')
+        self.assertEqual('ok\x012\n', stdout_content)
+        self.assertEqual('', stderr_content)
+        self.assertReturnCode(0, sock)
+
+    def test_fork_replay(self):
+        path, _, sock = self.send_fork_request('launchpad-replay')
+        stdout_content, stderr_content = self.communicate_with_fork(path,
+            '1 hello\n2 goodbye\n1 maybe\n')
+        self.assertEqualDiff('hello\nmaybe\n', stdout_content)
+        self.assertEqualDiff('goodbye\n', stderr_content)
+        self.assertReturnCode(0, sock)
+
+    def test_just_run_service(self):
+        # Start and stop are defined in setUp()
+        pass
+
+    def test_fork_multiple_children(self):
+        paths = []
+        for idx in range(4):
+            paths.append(self.send_fork_request('launchpad-replay'))
+        for idx in [3, 2, 0, 1]:
+            p, pid, sock = paths[idx]
+            stdout_msg = 'hello %d\n' % (idx,)
+            stderr_msg = 'goodbye %d\n' % (idx+1,)
+            stdout, stderr = self.communicate_with_fork(p,
+                '1 %s2 %s' % (stdout_msg, stderr_msg))
+            self.assertEqualDiff(stdout_msg, stdout)
+            self.assertEqualDiff(stderr_msg, stderr)
+            self.assertReturnCode(0, sock)
+
+    def test_fork_respects_env_vars(self):
+        path, pid, sock = self.send_fork_request('whoami',
+            env={'BZR_EMAIL': 'this_test@xxxxxxxxxxx'})
+        stdout_content, stderr_content = self.communicate_with_fork(path)
+        self.assertEqual('', stderr_content)
+        self.assertEqual('this_test@xxxxxxxxxxx\n', stdout_content)
+
+    def _check_exits_nicely(self, sig_id):
+        path, _, sock = self.send_fork_request('rocks')
+        self.assertEqual(None, self.service_process.poll())
+        # Now when we send the signal (SIGTERM or SIGINT), the service should
+        # wait for the child to exit before it tries to exit itself.
+        # In python2.6+ we could use self.service_process.terminate()
+        os.kill(self.service_process.pid, sig_id)
+        self.assertEqual(None, self.service_process.poll())
+        # Now talk to the child, so the service can close
+        stdout_content, stderr_content = self.communicate_with_fork(path)
+        self.assertEqual('It sure does!\n', stdout_content)
+        self.assertEqual('', stderr_content)
+        self.assertReturnCode(0, sock)
+        # And the process should exit cleanly
+        self.assertEqual(0, self.service_process.wait())
+
+    def test_sigterm_exits_nicely(self):
+        self._check_exits_nicely(signal.SIGTERM)
+
+    def test_sigint_exits_nicely(self):
+        self._check_exits_nicely(signal.SIGINT)
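
For reference, the open ordering that _get_fork_handles relies on can be reproduced without the service. This is only a minimal sketch, not code from the branch: it assumes a POSIX system with os.mkfifo, it is written for Python 3, a thread stands in for the forked child, and the names run_fifo_demo, _child_side and _parent_side are hypothetical.

```python
import os
import shutil
import tempfile
import threading


def _child_side(path):
    # Stand-in for the forked child: open stdin, stdout, stderr in that
    # order, then echo stdin back upper-cased on stdout.
    with open(os.path.join(path, 'stdin'), 'rb') as stdin, \
            open(os.path.join(path, 'stdout'), 'wb') as stdout, \
            open(os.path.join(path, 'stderr'), 'wb'):
        stdout.write(stdin.read().upper())


def _parent_side(path, payload):
    # Same order as the child. Each open() blocks until the peer has
    # opened the other end of that FIFO.
    child_stdin = open(os.path.join(path, 'stdin'), 'wb')
    child_stdout = open(os.path.join(path, 'stdout'), 'rb')
    child_stderr = open(os.path.join(path, 'stderr'), 'rb')
    try:
        child_stdin.write(payload)
        child_stdin.close()  # EOF lets the child finish reading
        return child_stdout.read(), child_stderr.read()
    finally:
        child_stdout.close()
        child_stderr.close()


def run_fifo_demo(payload):
    # Hypothetical driver: create the three FIFOs in a scratch directory,
    # run the "child" in a thread, and talk to it from this thread.
    path = tempfile.mkdtemp(prefix='fifo-demo-')
    try:
        for name in ('stdin', 'stdout', 'stderr'):
            os.mkfifo(os.path.join(path, name))
        child = threading.Thread(target=_child_side, args=(path,))
        child.daemon = True
        child.start()
        result = _parent_side(path, payload)
        child.join()
        return result
    finally:
        shutil.rmtree(path)
```

Because opening a FIFO blocks until the other end is opened, both sides have to walk the three names in the same order; if one side opened stdout first while the other opened stdin first, each would wait on the other forever.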

=== modified file 'configs/development/launchpad-lazr.conf'
--- configs/development/launchpad-lazr.conf	2010-09-02 16:21:05 +0000
+++ configs/development/launchpad-lazr.conf	2010-09-17 20:34:36 +0000
@@ -76,6 +76,7 @@
 lp_url_hosts: dev
 access_log: /var/tmp/bazaar.launchpad.dev/codehosting-access.log
 blacklisted_hostnames:
+use_forking_daemon: True
 
 [codeimport]
 bazaar_branch_store: file:///tmp/bazaar-branches

=== modified file 'lib/canonical/config/schema-lazr.conf'
--- lib/canonical/config/schema-lazr.conf	2010-09-17 09:16:27 +0000
+++ lib/canonical/config/schema-lazr.conf	2010-09-17 20:34:36 +0000
@@ -301,6 +301,18 @@
 # datatype: string
 logfile: -
 
+# The location of the log file used by the LaunchpadForkingService
+# datatype: string
+forker_logfile: -
+
+# Should we be using the forking daemon? Or should we be calling spawnProcess
+# instead?
+# datatype: boolean
+use_forking_daemon: False
+# The host address and port the daemon listens on, as "host:port".
+# datatype: string
+forking_daemon_address: 127.0.0.1:4156
+
 # The prefix of the web URL for all public branches. This should end with a
 # slash.
 #

=== modified file 'lib/canonical/launchpad/scripts/runlaunchpad.py'
--- lib/canonical/launchpad/scripts/runlaunchpad.py	2010-08-20 20:31:18 +0000
+++ lib/canonical/launchpad/scripts/runlaunchpad.py	2010-09-17 20:34:36 +0000
@@ -180,6 +180,56 @@
         stop_at_exit(process)
 
 
+class ForkingSessionService(Service):
+    """A lp-forking-service for handling ssh access."""
+
+    # TODO: The SFTP (and bzr+ssh) server depends fairly heavily on this
+    #       service. It would seem reasonable to make one always start if the
+    #       other one is started. Though this might be a way to "FeatureFlag"
+    #       whether this is active or not.
+
+    @property
+    def should_launch(self):
+        # We tie this directly into the SFTP service. If one should launch,
+        # then both of them should
+        return (config.codehosting.launch and
+                config.codehosting.use_forking_daemon)
+
+    @property
+    def logfile(self):
+        """Return the log file to use.
+
+        Default to the value of the configuration key forker_logfile.
+        """
+        return config.codehosting.forker_logfile
+
+    def launch(self):
+        # Following the logic in TacFile. Specifically, if you configure sftp
+        # to not run (and thus bzr+ssh) then we don't want to run the forking
+        # service.
+        if not self.should_launch:
+            return
+        from lp.codehosting import get_bzr_path
+        # TODO: Configs for the port to serve on, etc.
+        command = [config.root + '/bin/py', get_bzr_path(),
+                   'launchpad-forking-service',
+                   '--port', config.codehosting.forking_daemon_address,
+                  ]
+        env = dict(os.environ)
+        env['BZR_PLUGIN_PATH'] = config.root + '/bzrplugins'
+        logfile = self.logfile
+        if logfile == '-':
+            # This process uses a different logging infrastructure from the
+            # rest of the Launchpad code. As such, it cannot trivially use '-'
+            # as the logfile. So we just ignore this setting.
+            pass
+        else:
+            env['BZR_LOG'] = logfile
+        process = subprocess.Popen(command, env=env, stdin=subprocess.PIPE)
+        process.stdin.close()
+        stop_at_exit(process)
+
+
 def stop_at_exit(process):
     """Create and register an atexit hook for killing a process.
 
@@ -203,6 +253,9 @@
     'librarian': TacFile('librarian', 'daemons/librarian.tac',
                          'librarian_server', prepare_for_librarian),
     'sftp': TacFile('sftp', 'daemons/sftp.tac', 'codehosting'),
+    # TODO, we probably need to run the forking service whenever somebody
+    #       requests the sftp service...
+    'forker': ForkingSessionService(),
     'mailman': MailmanService(),
     'codebrowse': CodebrowseService(),
     'google-webservice': GoogleWebService(),

=== modified file 'lib/lp/codehosting/sshserver/session.py'
--- lib/lp/codehosting/sshserver/session.py	2010-08-20 20:31:18 +0000
+++ lib/lp/codehosting/sshserver/session.py	2010-09-17 20:34:36 +0000
@@ -9,9 +9,24 @@
     ]
 
 import os
+import signal
+import socket
 import urlparse
 
+<<<<<<< TREE
 from twisted.internet.process import ProcessExitedAlready
+=======
+from zope.event import notify
+from zope.interface import implements
+
+from twisted.internet import (
+    error,
+    fdesc,
+    interfaces,
+    main,
+    process,
+    )
+>>>>>>> MERGE-SOURCE
 from twisted.python import log
 from zope.event import notify
 
@@ -35,6 +50,238 @@
     """Raised when a session is asked to execute a forbidden command."""
 
 
+class _WaitForExit(process.ProcessReader):
+    """Wait on a socket for the exit status."""
+
+    def __init__(self, reactor, proc, sock):
+        super(_WaitForExit, self).__init__(reactor, proc, 'exit',
+                                           sock.fileno())
+        self._sock = sock
+        self.connected = 1
+
+    def close(self):
+        self._sock.close()
+
+    def dataReceived(self, data):
+        # TODO: how do we handle getting only *some* of the content? Maybe we
+        #       need to read more bytes first...
+
+        # This is the only thing we do differently from the standard
+        # ProcessReader. When we get data on this socket, we need to treat it
+        # as a return code, or a failure.
+        if not data.startswith('exited'):
+            # Bad data, we want to signal that we are closing the connection
+            # TODO: How?
+            # self.proc.?
+            self.close()
+            # I don't know what to put here if we get bogus data, but I *do*
+            # want to say that the process is now considered dead to me
+            log.err('Got invalid exit information: %r' % (data,))
+            exit_status = (255 << 8)
+        else:
+            exit_status = int(data.split('\n')[1])
+        self.proc.processEnded(exit_status)
+
+
+class ForkedProcessTransport(process.BaseProcess):
+    """Wrap the forked process in a ProcessTransport so we can talk to it.
+
+    Note that instantiating the class creates the fork and sets it up in the
+    reactor.
+    """
+
+    implements(interfaces.IProcessTransport)
+
+    # Design decisions
+    # [Decision #a]
+    #   Inherit from process.BaseProcess
+    #       This seems slightly risky, as process.BaseProcess is actually
+    #       imported from twisted.internet._baseprocess.BaseProcess. The
+    #       real-world Process then actually inherits from process._BaseProcess
+    #       I've also had to copy a fair amount from the actual Process
+    #       class.
+    #       One option would be to inherit from process.Process, and just
+    #       override stuff like __init__ and reapProcess which I don't want to
+    #       do in the same way. (Is it ok not to call your base class's
+    #       __init__ if you don't want to do that exact work?)
+
+    def __init__(self, reactor, executable, args, environment, proto):
+        process.BaseProcess.__init__(self, proto)
+        # Map from standard file descriptor to the associated pipe
+        self.pipes = {}
+        pid, path, sock = self._spawn(executable, args, environment)
+        self._fifo_path = path
+        self.pid = pid
+        self.process_sock = sock
+        self._fifo_path = path
+        self._connectSpawnToReactor(reactor)
+        if self.proto is not None:
+            self.proto.makeConnection(self)
+
+    def _sendMessageToService(self, message):
+        """Send a message to the Forking service and get the response"""
+        address = config.codehosting.forking_daemon_address
+        if ':' in address:
+            address, port = address.split(':', 1)
+        else:
+            port = address
+            address = '127.0.0.1'
+        port = int(port)
+        addrs = socket.getaddrinfo(address, port,
+            socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
+        (family, socktype, proto, canonname, sockaddr) = addrs[0]
+        client_sock = socket.socket(family, socktype, proto)
+        log.msg('Connecting to Forking Service @ port: %s for %r'
+                % (sockaddr, message))
+        try:
+            client_sock.connect(sockaddr)
+            client_sock.sendall(message)
+            # We define the responses to be no bigger than 1kB. (For now)
+            response = client_sock.recv(1024)
+        except socket.error, e:
+            # TODO: What exceptions should be raised?
+            #       Raising the raw exception seems to kill the twisted reactor
+            #       Note that if the connection is refused, we *could* just
+            #       fall back on a regular 'spawnProcess' call.
+            log.err('Connection failed: %s' % (e,))
+            raise
+        if response.startswith("FAILURE"):
+            raise RuntimeError('Failed to send message: %r' % (response,))
+        return response, client_sock
+
+    def _spawn(self, executable, args, environment):
+        assert executable == 'bzr', executable # Maybe .endswith()
+        assert args[0] == 'bzr', args[0]
+        command_str = ' '.join(args[1:])
+        message = ['fork-env %s\n' % (command_str,)]
+        for key, value in environment.iteritems():
+            # XXX: Currently we only pass BZR_EMAIL, should we be passing
+            #      everything else? Note that many won't be handled properly,
+            #      since the process is already running.
+            if key != 'BZR_EMAIL':
+                continue
+            message.append('%s: %s\n' % (key, value))
+        message.append('end\n')
+        message = ''.join(message)
+        response, sock = self._sendMessageToService(message)
+        if response.startswith('FAILURE'):
+            # TODO: Is there a better error to raise?
+            raise RuntimeError("Failed while sending message to forking "
+                "service. message: %r, failure: %r"
+                % (message, response))
+        ok, pid, path, tail = response.split('\n')
+        assert ok == 'ok'
+        assert tail == ''
+        pid = int(pid)
+        log.msg('Forking returned pid: %d, path: %s' % (pid, path))
+        return pid, path, sock
+
+    def _connectSpawnToReactor(self, reactor):
+        stdin_path = os.path.join(self._fifo_path, 'stdin')
+        stdout_path = os.path.join(self._fifo_path, 'stdout')
+        stderr_path = os.path.join(self._fifo_path, 'stderr')
+        child_stdin_fd = os.open(stdin_path, os.O_WRONLY)
+        self.pipes[0] = process.ProcessWriter(reactor, self, 0, child_stdin_fd)
+        child_stdout_fd = os.open(stdout_path, os.O_RDONLY)
+        # forceReadHack=True ? Used in process.py doesn't seem to be needed here
+        self.pipes[1] = process.ProcessReader(reactor, self, 1, child_stdout_fd)
+        child_stderr_fd = os.open(stderr_path, os.O_RDONLY)
+        self.pipes[2] = process.ProcessReader(reactor, self, 2, child_stderr_fd)
+        # Note: _exiter forms a GC cycle, since it points to us, and we hold a
+        #       reference to it
+        self._exiter = _WaitForExit(reactor, self, self.process_sock)
+        self.pipes['exit'] = self._exiter
+
+    def _getReason(self, status):
+        # Copied from twisted.internet.process._BaseProcess
+        exitCode = sig = None
+        if os.WIFEXITED(status):
+            exitCode = os.WEXITSTATUS(status)
+        else:
+            sig = os.WTERMSIG(status)
+        if exitCode or sig:
+            return error.ProcessTerminated(exitCode, sig, status)
+        return error.ProcessDone(status)
+
+    def signalProcess(self, signalID):
+        """
+        Send the given signal C{signalID} to the process. It'll translate a
+        few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string
+        representation to its int value, otherwise it'll pass directly the
+        value provided
+
+        @type signalID: C{str} or C{int}
+        """
+        # Copied from twisted.internet.process._BaseProcess
+        if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
+            signalID = getattr(signal, 'SIG%s' % (signalID,))
+        if self.pid is None:
+            raise process.ProcessExitedAlready()
+        os.kill(self.pid, signalID)
+
+    # Implemented because conch.ssh.session uses it, the Process implementation
+    # ignores writes if channel '0' is not available
+    def write(self, data):
+        self.pipes[0].write(data)
+
+    def writeToChild(self, childFD, data):
+        # Copied from twisted.internet.process.Process
+        self.pipes[childFD].write(data)
+
+    def closeChildFD(self, childFD):
+        if childFD in self.pipes:
+            self.pipes[childFD].loseConnection()
+
+    def closeStdin(self):
+        self.closeChildFD(0)
+
+    def closeStdout(self):
+        self.closeChildFD(1)
+
+    def closeStderr(self):
+        self.closeChildFD(2)
+
+    def loseConnection(self):
+        self.closeStdin()
+        self.closeStdout()
+        self.closeStderr()
+
+    # Implemented because ProcessWriter/ProcessReader want to call it
+    # Copied from twisted.internet.process.Process
+    def childDataReceived(self, name, data):
+        self.proto.childDataReceived(name, data)
+
+    # Implemented because ProcessWriter/ProcessReader want to call it
+    # Copied from twisted.internet.process.Process
+    def childConnectionLost(self, childFD, reason):
+        close = getattr(self.pipes[childFD], 'close', None)
+        if close is not None:
+            close()
+        else:
+            os.close(self.pipes[childFD].fileno())
+        del self.pipes[childFD]
+        try:
+            self.proto.childConnectionLost(childFD)
+        except:
+            log.err()
+        self.maybeCallProcessEnded()
+
+    # Implemented because of childConnectionLost
+    # Adapted from twisted.internet.process.Process
+    # Note: Process.maybeCallProcessEnded() tries to reapProcess() at this
+    #       point, but the daemon will be doing the reaping for us (we can't
+    #       because the process isn't a direct child.)
+    def maybeCallProcessEnded(self):
+        if self.pipes:
+            # Not done if we still have open pipes
+            return
+        if not self.lostProcess:
+            return
+        process.BaseProcess.maybeCallProcessEnded(self)
+
+    # pauseProducing: present in process.py, but not part of IProcessTransport
+
+
 class ExecOnlySession(DoNothingSession):
     """Conch session that only allows executing commands."""
 
@@ -58,7 +305,7 @@
             notify(BazaarSSHClosed(self.avatar))
             try:
                 self._transport.signalProcess('HUP')
-            except (OSError, ProcessExitedAlready):
+            except (OSError, process.ProcessExitedAlready):
                 pass
             self._transport.loseConnection()
 
@@ -81,8 +328,7 @@
         except ForbiddenCommand, e:
             self.errorWithMessage(protocol, str(e) + '\r\n')
             return
-        log.msg('Running: %r, %r, %r'
-                % (executable, arguments, self.environment))
+        log.msg('Running: %r, %r' % (executable, arguments))
         if self._transport is not None:
             log.err(
                 "ERROR: %r already running a command on transport %r"
@@ -91,8 +337,12 @@
         # violation. Apart from this line and its twin, this class knows
         # nothing about Bazaar.
         notify(BazaarSSHStarted(self.avatar))
-        self._transport = self.reactor.spawnProcess(
-            protocol, executable, arguments, env=self.environment)
+        self._transport = self._spawn(protocol, executable, arguments,
+                                      env=self.environment)
+
+    def _spawn(self, protocol, executable, arguments, env):
+        return self.reactor.spawnProcess(protocol, executable, arguments,
+                                         env=env)
 
     def getCommandToRun(self, command):
         """Return the command that will actually be run given `command`.
@@ -144,21 +394,62 @@
             % {'user_id': self.avatar.user_id})
 
 
+class ForkingRestrictedExecOnlySession(RestrictedExecOnlySession):
+    """Use the Forking Service instead of spawnProcess."""
+
+    def _simplifyEnvironment(self, env):
+        """Pull out the bits of the environment we want to pass along."""
+        simplified = {}
+        for env_var in ['BZR_EMAIL']:
+            if env_var in env:
+                simplified[env_var] = env[env_var]
+        return simplified
+
+    def getCommandToFork(self, executable, arguments, env):
+        assert executable.endswith('/bin/py')
+        assert arguments[0] == executable
+        assert arguments[1].endswith('/bzr')
+        executable = 'bzr'
+        arguments = arguments[1:]
+        arguments[0] = 'bzr'
+        env = self._simplifyEnvironment(env)
+        return executable, arguments, env
+
+    def _spawn(self, protocol, executable, arguments, env):
+        # When spawning, adapt the idea of "bin/py .../bzr" to just using "bzr"
+        # as the executable
+        executable, arguments, env = self.getCommandToFork(executable,
+                                                           arguments, env)
+        return ForkedProcessTransport(self.reactor, executable,
+                                      arguments, env, protocol)
+
+
 def launch_smart_server(avatar):
     from twisted.internet import reactor
 
-    command = (
-        "%(root)s/bin/py %(bzr)s lp-serve --inet "
-        % {'root': config.root, 'bzr': get_bzr_path()})
+    python_command = "%(root)s/bin/py %(bzr)s" % {
+            'root': config.root, 'bzr': get_bzr_path()
+            }
+    args = " lp-serve --inet %(user_id)s"
+    command = python_command + args
+    forking_command = "bzr" + args
 
     environment = dict(os.environ)
 
     # Extract the hostname from the supermirror root config.
     hostname = urlparse.urlparse(config.codehosting.supermirror_root)[1]
     environment['BZR_EMAIL'] = '%s@%s' % (avatar.username, hostname)
-    return RestrictedExecOnlySession(
+    klass = RestrictedExecOnlySession
+    # TODO: Use a FeatureFlag to enable this in a more fine-grained approach.
+    #       If the forking daemon has been spawned, then we can use it if the
+    #       feature is set to true for the given user, etc.
+    #       A global config is a good first step, but does require restarting
+    #       the service to change the flag. Or does 'config' support SIGHUP?
+    if config.codehosting.use_forking_daemon:
+        klass = ForkingRestrictedExecOnlySession
+    return klass(
         avatar,
         reactor,
         'bzr serve --inet --directory=/ --allow-writes',
-        command + ' %(user_id)s',
+        command,
         environment=environment)
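
For reference, the exit handling split between _WaitForExit.dataReceived and ForkedProcessTransport._getReason can be condensed into two small functions. This is a rough sketch under assumptions rather than the code in the branch: parse_exit_message and describe_status are hypothetical names, the message is treated as a text string (as in the Python 2 code above), and the number after 'exited' is assumed to be a raw wait status, which is what the use of os.WIFEXITED in _getReason suggests.

```python
import os

# Status reported when the message is not understood; mirrors the
# (255 << 8) fallback used in _WaitForExit.dataReceived.
BOGUS_STATUS = 255 << 8


def parse_exit_message(data):
    """Return the wait status carried by an 'exited\\n<status>\\n' message."""
    if not data.startswith('exited'):
        return BOGUS_STATUS
    return int(data.split('\n')[1])


def describe_status(status):
    """Split a wait status into ('exit', code) or ('signal', number)."""
    if os.WIFEXITED(status):
        return ('exit', os.WEXITSTATUS(status))
    return ('signal', os.WTERMSIG(status))
```

Under this reading, the 'exited\n256\n' message used in test_dataReceived_ends_with_errno corresponds to a child that exited with code 1, not to a status of 256.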

=== modified file 'lib/lp/codehosting/sshserver/tests/test_session.py'
--- lib/lp/codehosting/sshserver/tests/test_session.py	2010-08-20 20:31:18 +0000
+++ lib/lp/codehosting/sshserver/tests/test_session.py	2010-09-17 20:34:36 +0000
@@ -5,6 +5,7 @@
 
 __metaclass__ = type
 
+import socket
 import unittest
 
 from twisted.conch.interfaces import ISession
@@ -19,10 +20,19 @@
     )
 from lp.codehosting.sshserver.daemon import CodehostingAvatar
 from lp.codehosting.sshserver.session import (
-    ExecOnlySession,
-    ForbiddenCommand,
-    RestrictedExecOnlySession,
-    )
+<<<<<<< TREE
+    ExecOnlySession,
+    ForbiddenCommand,
+    RestrictedExecOnlySession,
+    )
+=======
+    ExecOnlySession,
+    ForbiddenCommand,
+    ForkingRestrictedExecOnlySession,
+    RestrictedExecOnlySession,
+    _WaitForExit,
+    )
+>>>>>>> MERGE-SOURCE
 from lp.codehosting.tests.helpers import AvatarTestCase
 
 
@@ -40,6 +50,9 @@
                          usePTY, childFDs))
         return MockProcessTransport(executable)
 
+    def addReader(self, reader):
+        self.log.append(('addReader', reader))
+
 
 class MockSSHSession:
     """Just enough of SSHSession to allow checking of reporting to stderr."""
@@ -60,6 +73,7 @@
         self._executable = executable
         self.log = []
         self.session = MockSSHSession(self.log)
+        self.status = None
 
     def closeStdin(self):
         self.log.append(('closeStdin',))
@@ -77,6 +91,42 @@
     def write(self, data):
         self.log.append(('write', data))
 
+    def processEnded(self, status):
+        self.log.append(('processEnded', status))
+
+
+# TODO: What base *should* I be using?
+class Test_WaitForExit(AvatarTestCase):
+
+    def setUp(self):
+        AvatarTestCase.setUp(self)
+        # self.avatar = CodehostingAvatar(self.aliceUserDict, None)
+        # The logging system will try to get the id of avatar.transport, so
+        # let's give it something to take the id of.
+        # self.avatar.transport = object()
+        self.reactor = MockReactor()
+        self.proc = MockProcessTransport('executable')
+        sock = socket.socket()
+        self.exiter = _WaitForExit(self.reactor, self.proc, sock)
+
+    def test__init__starts_reading(self):
+        self.assertEqual([('addReader', self.exiter)], self.reactor.log)
+
+    def test_dataReceived_ends_cleanly(self):
+        self.exiter.dataReceived('exited\n0\n')
+        self.assertEqual([('processEnded', 0)], self.proc.log)
+
+    def test_dataReceived_ends_with_errno(self):
+        self.exiter.dataReceived('exited\n256\n')
+        self.assertEqual([('processEnded', 256)], self.proc.log)
+
+    def test_dataReceived_bad_data(self):
+        # XXX: The dataReceived code calls 'log.err' which ends up getting
+        #      printed during the test run. How do I suppress that or even
+        #      better, check that it does so?
+        self.exiter.dataReceived('bogus\n')
+        self.assertEqual([('processEnded', (255 << 8))], self.proc.log)
+
 
 class TestExecOnlySession(AvatarTestCase):
     """Tests for ExecOnlySession.
@@ -340,6 +390,35 @@
         self.assertRaises(
             ForbiddenCommand, session.getCommandToRun, 'rm -rf /')
 
+    def test_avatarAdaptsToOnlyRestrictedSession(self):
+        config.push('codehosting-no-forking',
+            "[codehosting]\nuse_forking_daemon: False\n")
+        self.addCleanup(config.pop, 'codehosting-no-forking')
+        session = ISession(self.avatar)
+        self.failIf(isinstance(session, ForkingRestrictedExecOnlySession),
+            "ISession(avatar) shouldn't adapt to "
+            " ForkingRestrictedExecOnlySession when forking is disabled. ")
+
+    def test_avatarAdaptsToForkingRestrictedExecOnlySession(self):
+        # config.push('codehosting-forking',
+        #     "[codehosting]\nuse_forking_daemon: True\n")
+        # self.addCleanup(config.pop, 'codehosting-forking')
+        session = ISession(self.avatar)
+        self.failUnless(
+            isinstance(session, ForkingRestrictedExecOnlySession),
+            "ISession(avatar) doesn't adapt to "
+            " ForkingRestrictedExecOnlySession. "
+            "Got %r instead." % (session,))
+        executable, arguments = session.getCommandToRun(
+            'bzr serve --inet --directory=/ --allow-writes')
+        executable, arguments, env = session.getCommandToFork(
+            executable, arguments, session.environment)
+        self.assertEqual('bzr', executable)
+        self.assertEqual(
+             ['bzr', 'lp-serve',
+              '--inet', str(self.avatar.user_id)],
+             list(arguments))
+
 
 def test_suite():
     return unittest.TestLoader().loadTestsFromName(__name__)

=== modified file 'lib/lp/codehosting/tests/test_lpserve.py'
--- lib/lp/codehosting/tests/test_lpserve.py	2010-08-20 20:31:18 +0000
+++ lib/lp/codehosting/tests/test_lpserve.py	2010-09-17 20:34:36 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2010 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for the lp-serve plugin."""
@@ -8,6 +8,7 @@
 import os
 import re
 from subprocess import PIPE
+import threading
 import unittest
 
 from bzrlib import (
@@ -17,16 +18,21 @@
 from bzrlib.smart import medium
 from bzrlib.tests import TestCaseWithTransport
 from bzrlib.transport import remote
+from bzrlib.plugins.lpserve.test_lpserve import TestCaseWithSubprocess
 
 from canonical.config import config
+<<<<<<< TREE
 from lp.codehosting import (
     get_bzr_path,
     get_BZR_PLUGIN_PATH_for_subprocess,
     )
+=======
+
+>>>>>>> MERGE-SOURCE
 from lp.codehosting.bzrutils import make_error_utility
 
 
-class TestLaunchpadServe(TestCaseWithTransport):
+class TestLaunchpadServe(TestCaseWithSubprocess):
     """Tests for the lp-serve plugin.
 
     Most of the helper methods here are copied from bzrlib.tests and
@@ -38,59 +44,6 @@
         """Assert that a server process finished cleanly."""
         self.assertEqual((0, '', ''), tuple(result))
 
-    def get_python_path(self):
-        """Return the path to the Python interpreter."""
-        return '%s/bin/py' % config.root
-
-    def start_bzr_subprocess(self, process_args, env_changes=None,
-                             working_dir=None):
-        """Start bzr in a subprocess for testing.
-
-        Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
-        This version removes some of the skipping stuff, some of the
-        irrelevant comments (e.g. about win32) and uses Launchpad's own
-        mechanisms for getting the path to 'bzr'.
-
-        Comments starting with 'LAUNCHPAD' are comments about our
-        modifications.
-        """
-        if env_changes is None:
-            env_changes = {}
-        env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
-        old_env = {}
-
-        def cleanup_environment():
-            for env_var, value in env_changes.iteritems():
-                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
-
-        def restore_environment():
-            for env_var, value in old_env.iteritems():
-                osutils.set_or_unset_env(env_var, value)
-
-        cwd = None
-        if working_dir is not None:
-            cwd = osutils.getcwd()
-            os.chdir(working_dir)
-
-        # LAUNCHPAD: Because of buildout, we need to get a custom Python
-        # binary, not sys.executable.
-        python_path = self.get_python_path()
-        # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
-        # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
-        bzr_path = get_bzr_path()
-        try:
-            cleanup_environment()
-            command = [python_path, bzr_path]
-            command.extend(process_args)
-            process = self._popen(
-                command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
-        finally:
-            restore_environment()
-            if cwd is not None:
-                os.chdir(cwd)
-
-        return process
-
     def finish_lpserve_subprocess(self, process):
         """Shut down the server process.
 
@@ -169,4 +122,10 @@
 
 
 def test_suite():
-    return unittest.TestLoader().loadTestsFromName(__name__)
+    from bzrlib import tests
+    from bzrlib.plugins import lpserve
+
+    loader = tests.TestLoader()
+    suite = loader.loadTestsFromName(__name__)
+    suite = lpserve.load_tests(suite, lpserve, loader)
+    return suite

=== added file 'lp_service_interface.txt'
--- lp_service_interface.txt	1970-01-01 00:00:00 +0000
+++ lp_service_interface.txt	2010-09-17 20:34:36 +0000
@@ -0,0 +1,17 @@
+=======================================
+Launchpad Codehosting Service Interface
+=======================================
+
+The codehosting service reduces the time it takes to spawn a new bzr
+smart-server for each codehosting connection.
+
+Overview
+========
+
+The service starts up and preloads various libraries. It then listens
+on a socket, where clients can make a few simple requests. One of
+these is to fork a smart server process. The service then responds to
+the client, indicating which FIFO handles it can use to communicate
+with the new process.
+
+.. vim: tw=71 sw=4 sts=4 ts=8 et si ft=rst
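
For readers who want to see the fork-and-FIFO pattern from the overview in isolation, here is a minimal sketch. It is not the code in this branch: it assumes Python 3 on a POSIX system, it leaves out the socket and the preloading entirely, and the names fork_command, make_fifos, echo_command and talk_to_child are hypothetical, made up for this illustration.

```python
import os
import tempfile


def make_fifos(base_dir):
    """Create stdin/stdout/stderr FIFOs under base_dir; return their paths."""
    paths = {}
    for name in ("stdin", "stdout", "stderr"):
        path = os.path.join(base_dir, name)
        os.mkfifo(path)
        paths[name] = path
    return paths


def fork_command(command, run):
    """Fork a child that calls run(command) with its stdio bound to FIFOs.

    Hypothetical helper. Returns (child_pid, fifo_paths) to the caller,
    which plays the role of the service. The temporary directory is not
    cleaned up here.
    """
    base_dir = tempfile.mkdtemp(prefix="forking-sketch-")
    paths = make_fifos(base_dir)
    pid = os.fork()
    if pid == 0:
        exit_code = 1
        try:
            # Opening a FIFO blocks until the peer opens the other end,
            # so the client must open them in the same order.
            stdin_fd = os.open(paths["stdin"], os.O_RDONLY)
            stdout_fd = os.open(paths["stdout"], os.O_WRONLY)
            stderr_fd = os.open(paths["stderr"], os.O_WRONLY)
            os.dup2(stdin_fd, 0)
            os.dup2(stdout_fd, 1)
            os.dup2(stderr_fd, 2)
            exit_code = run(command)
        finally:
            # Leave without running the parent's cleanup handlers.
            os._exit(exit_code)
    return pid, paths


def echo_command(command):
    """Stand-in for the real work: echo stdin back, prefixed by command."""
    data = os.read(0, 1024)
    os.write(1, command.encode() + b": " + data)
    return 0


def talk_to_child(paths, data):
    """Client side: send data on the stdin FIFO, collect stdout and stderr."""
    to_child = open(paths["stdin"], "wb")
    from_child = open(paths["stdout"], "rb")
    err_child = open(paths["stderr"], "rb")
    with to_child, from_child, err_child:
        to_child.write(data)
        to_child.close()  # flush and signal end of input
        return from_child.read(), err_child.read()


if __name__ == "__main__":
    pid, paths = fork_command("hello", echo_command)
    out, err = talk_to_child(paths, b"world")
    os.waitpid(pid, 0)
    print(out, err)
```

The child inherits whatever the parent has already imported, which is where the saving over exec() comes from. The matching open order on both sides matters, since each FIFO open blocks until the other end is opened.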


Follow ups