← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] lp:~jameinel/launchpad/lp-service into lp:launchpad

 

Review: Needs Information
Wow, what a branch.  I've been quite picky in this, that's not because
I hate you <wink> but because it's subtle code and has potential to
cause issues, so I wanted to make sure all the comments make sense and
there's no redundant code, etc.

I worry slightly about what would happen if the forking service fell
over -- but clearly, we rely on the sftp server itself staying up, so
this probably isn't such a big deal.  I guess we also need to make
sure the forking service is running in production.

All that said, I have no real problems with the code and look forward
to getting my branches onto and off of Launchpad that bit faster!

I'm marking this needs information because I want to see a response to my
comments.  But I don't think any of them are deep.

> === modified file 'Makefile'
> --- Makefile	2010-09-07 18:15:01 +0000
> +++ Makefile	2010-10-06 00:19:31 +0000
> @@ -253,7 +253,7 @@
>
>  run_codehosting: check_schema inplace stop
>  	$(RM) thread*.request
> -	bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
> +	bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
>
>
>  start_librarian: compile

You should add the forker service to the run_all target too.

> === added directory 'bzrplugins/lpserve'
> === renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
> --- bzrplugins/lpserve.py	2010-04-19 06:35:23 +0000
> +++ bzrplugins/lpserve/__init__.py	2010-10-06 00:19:31 +0000
> @@ -8,15 +8,33 @@
>
>  __metaclass__ = type
>
> -__all__ = ['cmd_launchpad_server']
> -
> -
> +__all__ = ['cmd_launchpad_server',
> +           'cmd_launchpad_forking_service',
> +          ]

This shoudl be formatted like this:

__all__ = [
    'cmd_launchpad_server',
    'cmd_launchpad_forking_service',
    ]

> +
> +
> +import errno
> +import os
>  import resource
> +import shlex
> +import shutil
> +import signal
> +import socket
>  import sys
> +import tempfile
> +import threading
> +import time
>
>  from bzrlib.commands import Command, register_command
>  from bzrlib.option import Option
> -from bzrlib import lockdir, ui
> +from bzrlib import (
> +    commands,
> +    errors,
> +    lockdir,
> +    osutils,
> +    trace,
> +    ui,
> +    )
>
>  from bzrlib.smart import medium, server
>  from bzrlib.transport import get_transport
> @@ -110,3 +128,724 @@
>
>
>  register_command(cmd_launchpad_server)
> +
> +
> +class LPForkingService(object):
> +    """A service that can be asked to start a new bzr subprocess via fork.
> +
> +    The basic idea is that python startup is very expensive. For example, the
> +    original 'lp-serve' command could take 2.5s just to start up, before any
> +    actual actions could be performed.

Well, it's not really Python startup, its more the importing thats
slow.  Maybe phrase this as "starting Python and importing Launchpad
is very...".

> +    This class provides a service sitting on a socket, which can then be
> +    requested to fork and run a given bzr command.
> +
> +    Clients connect to the socket and make a simple request, which then
> +    receives a response. The possible requests are:
> +
> +        "hello\n":  Trigger a heartbeat to report that the program is still
> +                    running, and write status information to the log file.
> +        "quit\n":   Stop the service, but do so 'nicely', waiting for children
> +                    to exit, etc. Once this is received the service will stop
> +                    taking new requests on the port.
> +        "fork <command>\n": Request a new subprocess to be started.
> +            <command> is the bzr command to be run, such as "rocks" or
> +            "lp-serve --inet 12".
> +            The immediate response will be the path-on-disk to a directory full
> +            of named pipes (fifos) that will be the stdout/stderr/stdin of the
> +            new process.

This doesn't quite make it clear what the names of the files will be.
The obvious guess is correct, but I'd rather be certain.

> +            If a client holds the socket open, when the child process exits,
> +            the exit status (as given by 'wait()') will be written to the
> +            socket.

This appears to be out of date -- there's also a fork-env command now.
Is the fork command still useful?

> +            Note that one of the key bits is that the client will not be
> +            started with exec*, we just call 'commands.run_bzr*()' directly.
> +            This way, any modules that are already loaded will not need to be
> +            loaded again. However, care must be taken with any global-state
> +            that should be reset.
> +    """
> +
> +    # Design decisions. These are bits where we could have chosen a different
> +    # method/implementation and weren't sure what would be best. Documenting
> +    # the current decision, and the alternatives.
> +    #
> +    # [Decision #1]
> +    #   Serve on a named AF_UNIX socket.
> +    #       1) It doesn't make sense to serve to arbitrary hosts, we only want
> +    #          the local host to make requests. (Since the client needs to
> +    #          access the named fifos on the current filesystem.)
> +    #       2) You can set security parameters on a filesystem path (g+rw,
> +    #          a-rw).
> +    # [Decision #2]
> +    #   SIGCHLD
> +    #       We want to quickly detect that children have exited so that we can
> +    #       inform the client process quickly. At the moment, we register a
> +    #       SIGCHLD handler that doesn't do anything. However, it means that
> +    #       when we get the signal, if we are currently blocked in something
> +    #       like '.accept()', we will jump out temporarily. At that point the
> +    #       main loop will check if any children have exited. We could have
> +    #       done this work as part of the signal handler, but that felt 'racy'
> +    #       doing any serious work in a signal handler.
> +    #       If we just used socket.timeout as the indicator to go poll for
> +    #       children exiting, it slows the disconnect by as much as the full
> +    #       timeout. (So a timeout of 1.0s will cause the process to hang by
> +    #       that long until it determines that a child has exited, and can
> +    #       close the connection.)
> +    #       The current flow means that we'll notice exited children whenever
> +    #       we finish the current work.
> +    # [Decision #3]
> +    #   Child vs Parent actions.
> +    #       There are several actions that are done when we get a new request.
> +    #       We have to create the fifos on disk, fork a new child, connect the
> +    #       child to those handles, and inform the client of the new path (not
> +    #       necessarily in that order.) It makes sense to wait to send the path
> +    #       message until after the fifos have been created. That way the
> +    #       client can just try to open them immediately, and the
> +    #       client-and-child will be synchronized by the open() calls.
> +    #       However, should the client be the one doing the mkfifo, should the
> +    #       server? Who should be sending the message? Should we fork after the
> +    #       mkfifo or before.
> +    #       The current thoughts:
> +    #           1) Try to do work in the child when possible. This should allow
> +    #              for 'scaling' because the server is single-threaded.
> +    #           2) We create the directory itself in the server, because that
> +    #              allows the server to monitor whether the client failed to
> +    #              clean up after itself or not.
> +    #           3) Otherwise we create the fifos in the client, and then send
> +    #              the message back.
> +    # [Decision #4]
> +    #   Exit information
> +    #       Inform the client that the child has exited on the socket they used
> +    #       to request the fork.
> +    #       1) Arguably they could see that stdout and stderr have been closed,
> +    #          and thus stop reading. In testing, I wrote a client which uses
> +    #          select.poll() over stdin/stdout/stderr and used that to ferry
> +    #          the content to the appropriate local handle. However for the
> +    #          FIFOs, when the remote end closed, I wouldn't see any
> +    #          corresponding information on the local end. There obviously
> +    #          wasn't any data to be read, so they wouldn't show up as
> +    #          'readable' (for me to try to read, and get 0 bytes, indicating
> +    #          it was closed). I also wasn't seeing POLLHUP, which seemed to be
> +    #          the correct indicator.  As such, we decided to inform the client
> +    #          on the socket that they originally made the fork request, rather
> +    #          than just closing the socket immediately.
> +    #       2) We could have had the forking server close the socket, and only
> +    #          the child hold the socket open. When the child exits, then the
> +    #          OS naturally closes the socket.
> +    #          If we want the returncode, then we should put that as bytes on
> +    #          the socket before we exit. Having the child do the work means
> +    #          that in error conditions, it could easily die before being able to
> +    #          write anything (think SEGFAULT, etc). The forking server is
> +    #          already 'wait'() ing on its children. So that we don't get
> +    #          zombies, and with wait3() we can get the rusage (user time,
> +    #          memory consumption, etc.)
> +    #          As such, it seems reasonable that the server can then also
> +    #          report back when a child is seen as exiting.
> +    # [Decision #5]
> +    #   cleanup once connected
> +    #       The child process blocks during 'open()' waiting for the client to
> +    #       connect to its fifos. Once the client has connected, the child then
> +    #       deletes the temporary directory and the fifos from disk. This means
> +    #       that there isn't much left for diagnosis, but it also means that
> +    #       the client won't leave garbage around if it crashes, etc.
> +    #       Note that the forking service itself still monitors the paths
> +    #       created, and will delete garbage if it sees that a child failed to
> +    #       do so.
> +    # [Decision #6]
> +    #   os._exit(retcode) in the child
> +    #       Calling sys.exit(retcode) raises an exception, which then bubbles
> +    #       up the stack and runs exit functions (and finally statements). When
> +    #       I tried using it originally, I would see the current child bubble
> +    #       all the way up the stack (through the server code that it fork()
> +    #       through), and then get to main() returning code 0. The process
> +    #       would still exit nonzero. My guess is that something in the atexit
> +    #       functions was failing, but that it was happening after logging, etc
> +    #       had been shut down.
> +    #       Any global state from the child process should be flushed before
> +    #       run_bzr_* has exited (which we *do* wait for), and any other global
> +    #       state is probably a remnant from the service process. Which will be
> +    #       cleaned up by the service itself, rather than the child.
> +    #       There is some concern that log files may not get flushed, so we
> +    #       currently call sys.exitfunc() first. The main problem is that I
> +    #       don't know any way to *remove* a function registered via 'atexit()'
> +    #       so if the forking service has some state, we my try to clean it up
> +    #       incorrectly.
> +    #       Note that the bzr script itself uses sys.exitfunc(); os._exit() in
> +    #       the 'bzr' main script, as the teardown time of all the python state
> +    #       was quite noticeable in real-world runtime. As such, bzrlib should
> +    #       be pretty safe, or it would have been failing for people already.
> +    # [Decision #7]
> +    #   prefork vs max children vs ?
> +    #       For simplicity it seemed easiest to just fork when requested. Over
> +    #       time, I realized it would be easy to allow running an arbitrary
> +    #       command (no harder than just running one command), so it seemed
> +    #       reasonable to switch over. If we go the prefork route, then we'll
> +    #       need a way to tell the pre-forked children what command to run.
> +    #       This could be as easy as just adding one more fifo that they wait
> +    #       on in the same directory.
> +    #       For now, I've chosen not to limit the number of forked children. I
> +    #       don't know what a reasonable value is, and probably there are
> +    #       already limitations at play. (If Conch limits connections, then it
> +    #       will already be doing all the work, etc.)
> +    # [Decision #8]
> +    #   nicer errors on the request socket
> +    #       This service is meant to be run only on the local system. As such,
> +    #       we don't try to be extra defensive about leaking information to
> +    #       the one connecting to the socket. (We should still watch out what
> +    #       we send across the per-child fifos, since those are connected to
> +    #       remote clients.) Instead we try to be helpful, and tell them as
> +    #       much as we know about what went wrong.

All this seems reasonable to me.

> +    DEFAULT_PATH = '/var/run/launchpad_forking_service.sock'

I'm not sure of the value of providing a default here, as it seems
that its always in practice going to be overridden by the config.  If
it makes testing easier, it's worth it I guess.

> +    DEFAULT_PERMISSIONS = 00660 # Permissions on the master socket (rw-rw----)
> +    WAIT_FOR_CHILDREN_TIMEOUT = 5*60 # Wait no more than 5 min for children
> +    SOCKET_TIMEOUT = 1.0
> +    SLEEP_FOR_CHILDREN_TIMEOUT = 1.0
> +    WAIT_FOR_REQUEST_TIMEOUT = 1.0 # No request should take longer than this to
> +                                   # be read
> +
> +    _fork_function = os.fork
> +
> +    def __init__(self, path=DEFAULT_PATH, perms=DEFAULT_PERMISSIONS):
> +        self.master_socket_path = path
> +        self._perms = perms
> +        self._start_time = time.time()

It's pedantry in the extreme, but does it make more sense to set this
in main_loop than here?

> +        self._should_terminate = threading.Event()

The use of a threading primitive is a bit surprising as this process
is single threaded.  Is this to make testing easier?

> +        # We address these locally, in case of shutdown socket may be gc'd
> +        # before we are
> +        self._socket_timeout = socket.timeout
> +        self._socket_error = socket.error

This must have been fun to figure out :/

> +        self._socket_timeout = socket.timeout
> +        self._socket_error = socket.error

You don't need to do it twice though :-)

> +        # Map from pid => information
> +        self._child_processes = {}

I'd love to know more than just 'information' here.

> +        self._children_spawned = 0
> +
> +    def _create_master_socket(self):
> +        self._server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
> +        self._server_socket.bind(self.master_socket_path)
> +        if self._perms is not None:
> +            os.chmod(self.master_socket_path, self._perms)

The pedant in me thinks this is a bit racy.  But I don't think it's
important (in fact, I wonder if the permission stuff is necessary
really).

> +        self._server_socket.setsockopt(socket.SOL_SOCKET,
> +            socket.SO_REUSEADDR, 1)

Does this work with unix domain sockets?  I didn't think it did, and
your comments in _cleanup_master_socket lend weight to my position :-)

Another option would be to use a socket in the 'abstract namespace',
see man 7 unix, although that might be a bit magical.

> +        self._sockname = self._server_socket.getsockname()
> +        # self.host = self._sockname[0]
> +        self.port = self._sockname[1]

Is there any point to these three lines?

> +        self._server_socket.listen(5)
> +        self._server_socket.settimeout(self.SOCKET_TIMEOUT)
> +        trace.mutter('set socket timeout to: %s' % (self.SOCKET_TIMEOUT,))
> +
> +    def _cleanup_master_socket(self):
> +        self._server_socket.close()
> +        try:
> +            os.remove(self.master_socket_path)
> +        except (OSError, IOError), e:
> +            # If we don't delete it, then we get 'address already in
> +            # use' failures
> +            trace.mutter('failed to cleanup: %s'
> +                         % (self.master_socket_path,))
> +
> +    def _handle_sigchld(self, signum, frm):
> +        # We don't actually do anything here, we just want an interrupt (EINTR)
> +        # on socket.accept() when SIGCHLD occurs.
> +        pass
> +
> +    def _handle_sigterm(self, signum, frm):
> +        # Unregister this as the default handler, 2 SIGTERMs will exit us.
> +        signal.signal(signal.SIGTERM, signal.SIG_DFL)
> +        # SIGTERM should also generate EINTR on our wait loop, so this should
> +        # be enough
> +        self._should_terminate.set()
> +
> +    def _register_signals(self):
> +        """Register a SIGCHILD and SIGTERM handler.
> +
> +        If we have a trigger for SIGCHILD then we can quickly respond to
> +        clients when their process exits. The main risk is getting more EAGAIN
> +        errors elsewhere.
> +
> +        SIGTERM allows us to cleanup nicely before we exit.
> +        """
> +        signal.signal(signal.SIGCHLD, self._handle_sigchld)
> +        signal.signal(signal.SIGTERM, self._handle_sigterm)

When you run 'make run_codehosting' and C-c it, do you know which
signal gets sent?  I don't, I have to admit.  I guess SIGINT will work
fine by default...

> +    def _unregister_signals(self):
> +        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
> +        signal.signal(signal.SIGTERM, signal.SIG_DFL)
> +
> +    def _create_child_file_descriptors(self, base_path):
> +        stdin_path = os.path.join(base_path, 'stdin')
> +        stdout_path = os.path.join(base_path, 'stdout')
> +        stderr_path = os.path.join(base_path, 'stderr')
> +        os.mkfifo(stdin_path)
> +        os.mkfifo(stdout_path)
> +        os.mkfifo(stderr_path)
> +
> +    def _bind_child_file_descriptors(self, base_path):
> +        import logging
> +        from bzrlib import ui

Any reason to do these imports at function level?  If there is, can
you explain it in a comment?

> +        stdin_path = os.path.join(base_path, 'stdin')
> +        stdout_path = os.path.join(base_path, 'stdout')
> +        stderr_path = os.path.join(base_path, 'stderr')
> +        # Opening for writing blocks (or fails), so do those last

man 3 mkfifo says that opening for both reading and writing blocks.  I
guess this means it's very important that the client and child process
open the fifos in the same order!

> +        # TODO: Consider buffering, though that might interfere with reading
> +        #       and writing the smart protocol
> +        stdin_fid = os.open(stdin_path, os.O_RDONLY)
> +        stdout_fid = os.open(stdout_path, os.O_WRONLY)
> +        stderr_fid = os.open(stderr_path, os.O_WRONLY)
> +        # Note: by this point bzrlib has opened stderr for logging
> +        #       (as part of starting the service process in the first place). As
> +        #       such, it has a stream handler that writes to stderr. logging
> +        #       tries to flush and close that, but the file is already closed.
> +        #       This just supresses that exception
> +        logging.raiseExceptions = False
> +        sys.stdin.close()
> +        sys.stdout.close()
> +        sys.stderr.close()
> +        os.dup2(stdin_fid, 0)
> +        os.dup2(stdout_fid, 1)
> +        os.dup2(stderr_fid, 2)
> +        sys.stdin = os.fdopen(stdin_fid, 'rb')
> +        sys.stdout = os.fdopen(stdout_fid, 'wb')
> +        sys.stderr = os.fdopen(stderr_fid, 'wb')
> +        ui.ui_factory.stdin = sys.stdin
> +        ui.ui_factory.stdout = sys.stdout
> +        ui.ui_factory.stderr = sys.stderr
> +        # Now that we've opened the handles, delete everything so that we don't
> +        # leave garbage around. Because the open() is done in blocking mode, we
> +        # know that someone has already connected to them, and we don't want
> +        # anyone else getting confused and connecting.
> +        # See [Decision #5]
> +        os.remove(stderr_path)
> +        os.remove(stdout_path)
> +        os.remove(stdin_path)
> +        os.rmdir(base_path)
> +
> +    def _close_child_file_descriptons(self):
> +        sys.stdin.close()
> +        sys.stderr.close()
> +        sys.stdout.close()

This is surely meant to be called _close_child_file_descriptoRs ?

> +
> +    def become_child(self, command_argv, path):
> +        """We are in the spawned child code, do our magic voodoo."""
> +        # Stop tracking new signals
> +        self._unregister_signals()
> +        # Reset the start time
> +        trace._bzr_log_start_time = time.time()
> +        trace.mutter('%d starting %r'
> +                     % (os.getpid(), command_argv,))
> +        self.host = None
> +        self.port = None
> +        self._sockname = None
> +        self._bind_child_file_descriptors(path)
> +        self._run_child_command(command_argv)
> +
> +    def _run_child_command(self, command_argv):
> +        # This is the point where we would actually want to do something with
> +        # our life
> +        # TODO: We may want to consider special-casing the 'lp-serve' command.
> +        #       As that is the primary use-case for this service, it might be
> +        #       interesting to have an already-instantiated instance, where we
> +        #       can just pop on an extra argument and be ready to go. However,
> +        #       that would probably only really be measurable if we prefork. As
> +        #       it looks like ~200ms is 'fork()' time, but only 50ms is
> +        #       run-the-command time.
> +        retcode = commands.run_bzr_catch_errors(command_argv)
> +        self._close_child_file_descriptons()
> +        trace.mutter('%d finished %r'
> +                     % (os.getpid(), command_argv,))
> +        # We force os._exit() here, because we don't want to unwind the stack,
> +        # which has complex results. (We can get it to unwind back to the
> +        # cmd_launchpad_forking_service code, and even back to main() reporting
> +        # thereturn code, but after that, suddenly the return code changes from
> +        # a '0' to a '1', with no logging of info.
> +        # TODO: Should we call sys.exitfunc() here? it allows atexit functions
> +        #       to fire, however, some of those may be still around from the
> +        #       parent process, which we don't really want.

Does bzr install atexit functions?

> +        sys.exitfunc()
> +        # See [Decision #6]
> +        os._exit(retcode)
> +
> +    @staticmethod
> +    def command_to_argv(command_str):
> +        """Convert a 'foo bar' style command to [u'foo', u'bar']"""
> +        # command_str must be a utf-8 string
> +        return [s.decode('utf-8') for s in shlex.split(command_str)]
> +
> +    @staticmethod
> +    def parse_env(env_str):
> +        """Convert the environment information into a dict.
> +
> +        :param env_str: A string full of environment variable declarations.
> +            Each key is simple ascii "key: value\n"
> +            The string must end with "end\n".
> +        :return: A dict of environment variables
> +        """
> +        env = {}
> +        if not env_str.endswith('end\n'):
> +            raise ValueError('Invalid env-str: %r' % (env_str,))
> +        env_str = env_str[:-5]
> +        if not env_str:
> +            return env
> +        env_entries = env_str.split('\n')
> +        for entry in env_entries:
> +            key, value = entry.split(': ', 1)
> +            env[key] = value
> +        return env
> +
> +    def fork_one_request(self, conn, client_addr, command_argv, env):
> +        """Fork myself and serve a request."""
> +        temp_name = tempfile.mkdtemp(prefix='lp-forking-service-child-')
> +        # Now that we've set everything up, send the response to the client we
> +        # create them first, so the client can start trying to connect to them,
> +        # while we fork and have the child do the same.
> +        self._children_spawned += 1
> +        pid = self._fork_function()
> +        if pid == 0:
> +            pid = os.getpid()
> +            trace.mutter('%d spawned' % (pid,))
> +            self._server_socket.close()
> +            for env_var, value in env.iteritems():
> +                osutils.set_or_unset_env(env_var, value)

It's bascially by chance, but I wanted to mention that this provides a
place to do something I've wanted to do for a long time: have bzr
lp-serve processes have BZR_LOG set to something based on their pid.
Not in this branch though :-)

> +            # See [Decision #3]
> +            self._create_child_file_descriptors(temp_name)
> +            conn.sendall('ok\n%d\n%s\n' % (pid, temp_name))
> +            conn.close()
> +            self.become_child(command_argv, temp_name)
> +            trace.warning('become_child returned!!!')
> +            sys.exit(1)

Would os._exit() not be more appropriate here?

> +        else:
> +            self._child_processes[pid] = (temp_name, conn)
> +            self.log(client_addr, 'Spawned process %s for %r: %s'
> +                            % (pid, command_argv, temp_name))
> +
> +    def main_loop(self):
> +        self._should_terminate.clear()
> +        self._register_signals()
> +        self._create_master_socket()
> +        trace.note('Listening on socket: %s' % (self.master_socket_path,))
> +        try:
> +            try:
> +                self._do_loop()
> +            finally:
> +                # Stop talking to others, we are shutting down
> +                self._cleanup_master_socket()
> +        except KeyboardInterrupt:
> +            # SIGINT received, try to shutdown cleanly
> +            pass
> +        trace.note('Shutting down. Waiting up to %.0fs for %d child processes'
> +                   % (self.WAIT_FOR_CHILDREN_TIMEOUT,
> +                      len(self._child_processes),))
> +        self._shutdown_children()
> +        trace.note('Exiting')
> +
> +    def _do_loop(self):
> +        while not self._should_terminate.isSet():
> +            try:
> +                conn, client_addr = self._server_socket.accept()
> +            except self._socket_timeout:
> +                pass # run shutdown and children checks
> +            except self._socket_error, e:
> +                if e.args[0] == errno.EINTR:
> +                    pass # run shutdown and children checks
> +                elif e.args[0] != errno.EBADF:
> +                    # We can get EBADF here while we are shutting down
> +                    # So we just ignore it for now
> +                    pass
> +                else:
> +                    # Log any other failure mode
> +                    trace.warning("listening socket error: %s", e)
> +            else:
> +                self.log(client_addr, 'connected')
> +                # TODO: We should probably trap exceptions coming out of this
> +                #       and log them, so that we don't kill the service because
> +                #       of an unhandled error

Er.  Yes.

> +                # Note: settimeout is used so that a malformed request doesn't
> +                #       cause us to hang forever. Note that the particular
> +                #       implementation means that a malicious client could
> +                #       probably send us one byte every Xms, and we would just
> +                #       keep trying to read it. However, as a local service, we
> +                #       aren't worrying about it.
> +                conn.settimeout(self.WAIT_FOR_REQUEST_TIMEOUT)
> +                try:
> +                    self.serve_one_connection(conn, client_addr)
> +                except self._socket_timeout, e:
> +                    trace.log_exception_quietly()
> +                    self.log(client_addr, 'request timeout failure: %s' % (e,))
> +                    conn.sendall('FAILURE\nrequest timed out\n')
> +                    conn.close()
> +            self._poll_children()
> +
> +    def log(self, client_addr, message):
> +        """Log a message to the trace log.
> +
> +        Include the information about what connection is being served.
> +        """
> +        if client_addr is not None:
> +            # Note, we don't use conn.getpeername() because if a client
> +            # disconnects before we get here, that raises an exception
> +            conn_info = '[%s] ' % (client_addr,)
> +        else:
> +            conn_info = ''
> +        trace.mutter('%s%s' % (conn_info, message))
> +
> +    def log_information(self):
> +        """Log the status information.
> +
> +        This includes stuff like number of children, and ... ?
> +        """
> +        self._poll_children()
> +        self.log(None, 'Running for %.3fs' % (time.time() - self._start_time))
> +        self.log(None, '%d children currently running (spawned %d total)'
> +                       % (len(self._child_processes), self._children_spawned))
> +        # Read the current information about memory consumption, etc.
> +        self.log(None, 'Self: %s'
> +                       % (resource.getrusage(resource.RUSAGE_SELF),))
> +        # This seems to be the sum of all rusage for all children that have
> +        # been collected (not for currently running children, or ones we
> +        # haven't "wait"ed on.) We may want to read /proc/PID/status, since
> +        # 'live' information is probably more useful.
> +        self.log(None, 'Finished children: %s'
> +                       % (resource.getrusage(resource.RUSAGE_CHILDREN),))
> +
> +    def _poll_children(self):
> +        """See if children are still running, etc.
> +
> +        One interesting hook here would be to track memory consumption, etc.
> +        """
> +        while self._child_processes:
> +            try:
> +                c_id, exit_code, rusage = os.wait3(os.WNOHANG)
> +            except OSError, e:
> +                if e.errno == errno.ECHILD:
> +                    # TODO: We handle this right now because the test suite
> +                    #       fakes a child, since we wanted to test some code
> +                    #       without actually forking anything
> +                    trace.mutter('_poll_children() called, and'
> +                        ' self._child_processes indicates there are'
> +                        ' children, but os.wait3() says there are not.'
> +                        ' current_children: %s' % (self._child_processes,))
> +                    return
> +            if c_id == 0:
> +                # No more children stopped right now
> +                return
> +            c_path, sock = self._child_processes.pop(c_id)
> +            trace.mutter('%s exited %s and usage: %s'
> +                         % (c_id, exit_code, rusage))
> +            # See [Decision #4]
> +            try:
> +                sock.sendall('exited\n%s\n' % (exit_code,))
> +            except (self._socket_timeout, self._socket_error), e:
> +                # The client disconnected before we wanted them to,
> +                # no big deal
> +                trace.mutter('%s\'s socket already closed: %s' % (c_id, e))
> +            else:
> +                sock.close()
> +            if os.path.exists(c_path):
> +                # The child failed to cleanup after itself, do the work here
> +                trace.warning('Had to clean up after child %d: %s\n'
> +                              % (c_id, c_path))
> +                shutil.rmtree(c_path, ignore_errors=True)
> +
> +    def _wait_for_children(self, secs):
> +        start = time.time()
> +        end = start + secs
> +        while self._child_processes:
> +            self._poll_children()
> +            if secs > 0 and time.time() > end:
> +                break
> +            time.sleep(self.SLEEP_FOR_CHILDREN_TIMEOUT)

I find the practice of calling this function with an argument of
self.SLEEP_FOR_CHILDREN_TIMEOUT a little strange.  Isn't that just the
same as calling self._poll_children()?

> +    def _shutdown_children(self):
> +        self._wait_for_children(self.WAIT_FOR_CHILDREN_TIMEOUT)
> +        if self._child_processes:
> +            trace.warning('Failed to stop children: %s'
> +                % ', '.join(map(str, self._child_processes)))

This comment seems a bit wrong -- there's not actually been an attempt
to close the children down yet has there?  All we did was wait, and
they haven't exited.

A nice feature of the general architecture is that it would be quite
easy to disconnect listening for more connections from exiting the
daemon, something that will help one day with reducing the downtime
associated with an update.  But that can be sorted out later.

> +            for c_id in self._child_processes:
> +                trace.warning('sending SIGINT to %d' % (c_id,))
> +                os.kill(c_id, signal.SIGINT)
> +            # We sent the SIGINT signal, see if they exited
> +            self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
> +        if self._child_processes:
> +            # No? Then maybe something more powerful
> +            for c_id in self._child_processes:
> +                trace.warning('sending SIGKILL to %d' % (c_id,))
> +                os.kill(c_id, signal.SIGKILL)
> +            # We sent the SIGKILL signal, see if they exited
> +            self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
> +        if self._child_processes:
> +            for c_id, (c_path, sock) in self._child_processes.iteritems():
> +                # TODO: We should probably put something into this message?
> +                #       However, the likelyhood is very small that this isn't
> +                #       already closed because of SIGKILL + _wait_for_children
> +                #       And I don't really know what to say...
> +                sock.close()
> +                if os.path.exists(c_path):
> +                    trace.warning('Cleaning up after immortal child %d: %s\n'
> +                                  % (c_id, c_path))
> +                    shutil.rmtree(c_path)
> +
> +    def _parse_fork_request(self, conn, client_addr, request):
> +        if request.startswith('fork-env '):
> +            while not request.endswith('end\n'):
> +                request += osutils.read_bytes_from_socket(conn)

Hang on.  You don't have any guarantee here that
read_bytes_from_socket won't return 'end\n' and then the start of the
next request do you?  Hmm, I guess you do really, because 'end\n' is
always the last thing the client sends.  But that does seem a little
delicate -- it should at least be documented.

> +                request = request.replace('\r\n', '\n')
> +            command, env = request[9:].split('\n', 1)
> +        else:
> +            command = request[5:].strip()
> +            env = 'end\n' # No env set
> +        try:
> +            command_argv = self.command_to_argv(command)
> +            env = self.parse_env(env)
> +        except Exception, e:
> +            # TODO: Log the traceback?
> +            self.log(client_addr, 'command or env parsing failed: %r'
> +                                  % (str(e),))
> +            conn.sendall('FAILURE\ncommand or env parsing failed: %r'
> +                         % (str(e),))
> +        else:
> +            return command_argv, env
> +        return None, None

I think it would be clearer if the "return None, None" was in the
except clause (which *is* equivalent, I think?).

> +    def serve_one_connection(self, conn, client_addr):
> +        request = ''
> +        while '\n' not in request:
> +            request += osutils.read_bytes_from_socket(conn)
> +        # telnet likes to use '\r\n' rather than '\n', and it is nice to have
> +        # an easy way to debug.
> +        request = request.replace('\r\n', '\n')

This must be a relic of listening to an INET port?  Can it be deleted now?

> +        self.log(client_addr, 'request: %r' % (request,))
> +        if request == 'hello\n':
> +            conn.sendall('ok\nyep, still alive\n')
> +            self.log_information()
> +        elif request == 'quit\n':
> +            self._should_terminate.set()
> +            conn.sendall('ok\nquit command requested... exiting\n')
> +        elif request.startswith('fork ') or request.startswith('fork-env '):
> +            command_argv, env = self._parse_fork_request(conn, client_addr,
> +                                                         request)
> +            if command_argv is not None:
> +                # See [Decision #7]
> +                # TODO: Do we want to limit the number of children? And/or
> +                #       prefork additional instances? (the design will need to
> +                #       change if we prefork and run arbitrary commands.)
> +                self.fork_one_request(conn, client_addr, command_argv, env)
> +                # We don't close the conn like other code paths, since we use
> +                # it again later.
> +                return
> +        else:
> +            self.log(client_addr, 'FAILURE: unknown request: %r' % (request,))
> +            # See [Decision #8]
> +            conn.sendall('FAILURE\nunknown request: %r\n' % (request,))
> +        conn.close()

I find this logic around closing the connections slightly obtuse.  I
realize it would be longer but I think closing the connection or not
in each branch of the elif tree would be easier to follow.

> +class cmd_launchpad_forking_service(Command):
> +    """Launch a long-running process, where you can ask for new processes.
> +
> +    The process will block on a given --port waiting for requests to be made.

INET legacy again!

> +    When a request is made, it will fork itself and redirect stdout/in/err to
> +    fifos on the filesystem, and start running the requested command. The
> +    caller will be informed where those file handles can be found. Thus it only
> +    makes sense that the process connecting to the port must be on the same
> +    system.
> +    """
> +
> +    aliases = ['lp-service']
> +
> +    takes_options = [Option('path',
> +                        help='Listen for connections at PATH',
> +                        type=str),
> +                     Option('perms',
> +                        help='Set the mode bits for the socket, interpreted'
> +                             ' as an octal integer (same as chmod)'),
> +                     Option('preload',
> +                        help="Do/don't preload libraries before startup."),
> +                     Option('children-timeout', type=int,
> +                        help="Only wait XX seconds for children to exit"),

s/XX/this many/ ?

> +                    ]
> +
> +    def _preload_libraries(self):
> +        global libraries_to_preload

This line is unecesary.

> +        for pyname in libraries_to_preload:
> +            try:
> +                __import__(pyname)
> +            except ImportError, e:
> +                trace.mutter('failed to preload %s: %s' % (pyname, e))
> +
> +    def run(self, path=None, perms=None, preload=True,
> +            children_timeout=LPForkingService.WAIT_FOR_CHILDREN_TIMEOUT):
> +        if path is None:
> +            path = LPForkingService.DEFAULT_PATH
> +        if perms is None:
> +            perms = LPForkingService.DEFAULT_PERMISSIONS
> +        if preload:
> +            # We 'note' this because it often takes a fair amount of time.
> +            trace.note('Preloading %d modules' % (len(libraries_to_preload),))
> +            self._preload_libraries()
> +        service = LPForkingService(path, perms)
> +        service.WAIT_FOR_CHILDREN_TIMEOUT = children_timeout
> +        service.main_loop()
> +
> +register_command(cmd_launchpad_forking_service)
> +
> +
> +class cmd_launchpad_replay(Command):
> +    """Write input from stdin back to stdout or stderr.
> +
> +    This is a hidden command, primarily available for testing
> +    cmd_launchpad_forking_service.
> +    """
> +
> +    hidden = True
> +
> +    def run(self):
> +        # Just read line-by-line from stdin, and write out to stdout or stderr
> +        # depending on the prefix
> +        for line in sys.stdin:

In some situations

while True:
    line = sys.stdin.readline()
    if not line:
        break

will behave better wrt buffering.  But I guess what you have works for
what you need it for :-)

> +            channel, contents = line.split(' ', 1)
> +            channel = int(channel)
> +            if channel == 1:
> +                sys.stdout.write(contents)
> +                sys.stdout.flush()
> +            elif channel == 2:
> +                sys.stderr.write(contents)
> +                sys.stderr.flush()
> +            else:
> +                raise RuntimeError('Invalid channel request.')
> +        return 0
> +
> +register_command(cmd_launchpad_replay)
> +
> +# This list was generated by run lsprofing a spawned child, and looking for
> +# <module ...> times, which indicate an import occured. Other possibilities are
> +# to just run "bzr lp-serve --profile-imports" manually, and observe what was
> +# expensive to import. It doesn't seem very easy to get this right
> +# automatically.
> +libraries_to_preload = [
> +    'bzrlib.errors',
> +    'bzrlib.repofmt.groupcompress_repo',
> +    'bzrlib.repository',
> +    'bzrlib.smart',
> +    'bzrlib.smart.protocol',
> +    'bzrlib.smart.request',
> +    'bzrlib.smart.server',
> +    'bzrlib.smart.vfs',
> +    'bzrlib.transport.local',
> +    'bzrlib.transport.readonly',
> +    'lp.codehosting.bzrutils',
> +    'lp.codehosting.vfs',
> +    'lp.codehosting.vfs.branchfs',
> +    'lp.codehosting.vfs.branchfsclient',
> +    'lp.codehosting.vfs.hooks',
> +    'lp.codehosting.vfs.transport',
> +    ]
> +
> +
> +
> +def load_tests(standard_tests, module, loader):
> +    standard_tests.addTests(loader.loadTestsFromModuleNames(
> +        [__name__ + '.' + x for x in [
> +            'test_lpserve',
> +        ]]))
> +    return standard_tests

Is this actually necessary/useful in the launchpad tree?

> === added file 'bzrplugins/lpserve/test_lpserve.py'
> --- bzrplugins/lpserve/test_lpserve.py	1970-01-01 00:00:00 +0000
> +++ bzrplugins/lpserve/test_lpserve.py	2010-10-06 00:19:31 +0000
> @@ -0,0 +1,541 @@
> +# Copyright 2010 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +import os
> +import signal
> +import socket
> +import subprocess
> +import tempfile
> +import threading
> +import time
> +
> +from testtools import content
> +
> +from bzrlib import (
> +    osutils,
> +    tests,
> +    trace,
> +    )
> +from bzrlib.plugins import lpserve
> +
> +from canonical.config import config
> +from lp.codehosting import get_bzr_path, get_BZR_PLUGIN_PATH_for_subprocess
> +
> +
> +class TestingLPForkingServiceInAThread(lpserve.LPForkingService):
> +    """Wrap starting and stopping an LPForkingService instance in a thread."""

I think this class does a bit more than that, can you expand a bit?
For example, the way forking is nobbled.

> +    # For testing, we set the timeouts much lower, because we want the tests to
> +    # run quickly
> +    WAIT_FOR_CHILDREN_TIMEOUT = 0.5
> +    SOCKET_TIMEOUT = 0.01
> +    SLEEP_FOR_CHILDREN_TIMEOUT = 0.01
> +    WAIT_FOR_REQUEST_TIMEOUT = 0.1

It's a shame you can't use a deterministic fake time source.  But
settimeout doesn't support that idea :(

> +    _fork_function = None

This is a bit surprising.  Do you just do this so that any attempt to
fork will blow up, rather than do strange things to the test suite?
If so, comment please.

> +    def __init__(self, path, perms=None):
> +        self.service_started = threading.Event()
> +        self.service_stopped = threading.Event()
> +        self.this_thread = None
> +        self.fork_log = []
> +        super(TestingLPForkingServiceInAThread, self).__init__(
> +            path=path, perms=None)
> +
> +    def _register_signals(self):
> +        pass # Don't register it for the test suite
> +
> +    def _unregister_signals(self):
> +        pass # We don't fork, and didn't register, so don't unregister
> +
> +    def _create_master_socket(self):
> +        super(TestingLPForkingServiceInAThread, self)._create_master_socket()
> +        self.service_started.set()
> +
> +    def main_loop(self):
> +        self.service_stopped.clear()
> +        super(TestingLPForkingServiceInAThread, self).main_loop()
> +        self.service_stopped.set()
> +
> +    def fork_one_request(self, conn, client_addr, command, env):
> +        # We intentionally don't allow the test suite to request a fork, as
> +        # threads + forks and everything else don't exactly play well together
> +        self.fork_log.append((command, env))
> +        conn.sendall('ok\nfake forking\n')
> +        conn.close()
> +
> +    @staticmethod
> +    def start_service(test):
> +        """Start a new LPForkingService in a thread at a random path.
> +
> +        This will block until the service has created its socket, and is ready
> +        to communicate.
> +
> +        :return: A new TestingLPForkingServiceInAThread instance
> +        """
> +        fd, path = tempfile.mkstemp(prefix='tmp-lp-forking-service-',
> +                                    suffix='.sock')
> +        # We don't want a temp file, we want a temp socket
> +        os.close(fd)
> +        os.remove(path)
> +        new_service = TestingLPForkingServiceInAThread(path=path)
> +        thread = threading.Thread(target=new_service.main_loop,
> +                                  name='TestingLPForkingServiceInAThread')
> +        new_service.this_thread = thread
> +        # should we be doing thread.setDaemon(True) ?
> +        thread.start()
> +        new_service.service_started.wait(10.0)
> +        if not new_service.service_started.isSet():
> +            raise RuntimeError(
> +                'Failed to start the TestingLPForkingServiceInAThread')
> +        test.addCleanup(new_service.stop_service)
> +        # what about returning new_service._sockname ?
> +        return new_service
> +
> +    def stop_service(self):
> +        """Stop the test-server thread. This can be called multiple times."""
> +        if self.this_thread is None:
> +            # We already stopped the process
> +            return
> +        self._should_terminate.set()
> +        self.service_stopped.wait(10.0)
> +        if not self.service_stopped.isSet():
> +            raise RuntimeError(
> +                'Failed to stop the TestingLPForkingServiceInAThread')
> +        self.this_thread.join()
> +        # Break any refcycles
> +        self.this_thread = None
> +
> +
> +class TestTestingLPForkingServiceInAThread(tests.TestCaseWithTransport):
> +
> +    def test_start_and_stop_service(self):
> +        service = TestingLPForkingServiceInAThread.start_service(self)
> +        service.stop_service()
> +
> +    def test_multiple_stops(self):
> +        service = TestingLPForkingServiceInAThread.start_service(self)
> +        service.stop_service()
> +        service.stop_service()

This test could do with an intent revealing comment ("after
stop_service is called, calling it again does nothing, silently"
maybe? -- if I've guessed the intent right)

> +    def test_autostop(self):
> +        # We shouldn't leak a thread here, as it should be part of the test
> +        # case teardown.
> +        service = TestingLPForkingServiceInAThread.start_service(self)
> +
> +
> +class TestCaseWithLPForkingService(tests.TestCaseWithTransport):
> +
> +    def setUp(self):
> +        super(TestCaseWithLPForkingService, self).setUp()
> +        self.service = TestingLPForkingServiceInAThread.start_service(self)
> +
> +    def send_message_to_service(self, message, one_byte_at_a_time=False):
> +        client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
> +        client_sock.connect(self.service.master_socket_path)
> +        if one_byte_at_a_time:
> +            for byte in message:
> +                client_sock.send(byte)
> +        else:
> +            client_sock.sendall(message)
> +        response = client_sock.recv(1024)
> +        return response
> +
> +
> +class TestLPForkingServiceCommandToArgv(tests.TestCase):
> +
> +    def assertAsArgv(self, argv, command_str):
> +        self.assertEqual(argv,
> +            lpserve.LPForkingService.command_to_argv(command_str))
> +
> +    def test_simple(self):
> +        self.assertAsArgv([u'foo'], 'foo')
> +        self.assertAsArgv([u'foo', u'bar'], 'foo bar')
> +
> +    def test_quoted(self):
> +        self.assertAsArgv([u'foo'], 'foo')
> +        self.assertAsArgv([u'foo bar'], '"foo bar"')
> +
> +    def test_unicode(self):
> +        self.assertAsArgv([u'command', u'\xe5'], 'command \xc3\xa5')
> +
> +
> +class TestLPForkingServiceParseEnv(tests.TestCase):
> +
> +    def assertEnv(self, env, env_str):
> +        self.assertEqual(env, lpserve.LPForkingService.parse_env(env_str))
> +
> +    def assertInvalid(self, env_str):
> +        self.assertRaises(ValueError, lpserve.LPForkingService.parse_env,
> +                                      env_str)
> +
> +    def test_no_entries(self):
> +        self.assertEnv({}, 'end\n')
> +
> +    def test_one_entries(self):
> +        self.assertEnv({'BZR_EMAIL': 'joe@xxxxxxx'},
> +                       'BZR_EMAIL: joe@xxxxxxx\n'
> +                       'end\n')
> +
> +    def test_two_entries(self):
> +        self.assertEnv({'BZR_EMAIL': 'joe@xxxxxxx', 'BAR': 'foo'},
> +                       'BZR_EMAIL: joe@xxxxxxx\n'
> +                       'BAR: foo\n'
> +                       'end\n')
> +
> +    def test_invalid_empty(self):
> +        self.assertInvalid('')
> +
> +    def test_invalid_end(self):
> +        self.assertInvalid("BZR_EMAIL: joe@xxxxxxx\n")
> +
> +    def test_invalid_entry(self):
> +        self.assertInvalid("BZR_EMAIL joe@xxxxxxx\nend\n")
> +
> +
> +class TestLPForkingService(TestCaseWithLPForkingService):
> +
> +    def test_send_quit_message(self):
> +        response = self.send_message_to_service('quit\n')
> +        self.assertEqual('ok\nquit command requested... exiting\n', response)
> +        self.service.service_stopped.wait(10.0)
> +        self.assertTrue(self.service.service_stopped.isSet())
> +
> +    def test_send_invalid_message_fails(self):
> +        response = self.send_message_to_service('unknown\n')
> +        self.assertStartsWith(response, 'FAILURE')
> +
> +    def test_send_hello_heartbeat(self):
> +        response = self.send_message_to_service('hello\n')
> +        self.assertEqual('ok\nyep, still alive\n', response)
> +
> +    def test_hello_supports_crlf(self):
> +        # telnet seems to always write in text mode. It is nice to be able to
> +        # debug with simple telnet, so lets support it.

As above, do you need this?

> +        response = self.send_message_to_service('hello\r\n')
> +        self.assertEqual('ok\nyep, still alive\n', response)
> +
> +    def test_send_simple_fork(self):
> +        response = self.send_message_to_service('fork rocks\n')
> +        self.assertEqual('ok\nfake forking\n', response)
> +        self.assertEqual([(['rocks'], {})], self.service.fork_log)
> +
> +    def test_send_fork_env_with_empty_env(self):
> +        response = self.send_message_to_service(
> +            'fork-env rocks\n'
> +            'end\n')
> +        self.assertEqual('ok\nfake forking\n', response)
> +        self.assertEqual([(['rocks'], {})], self.service.fork_log)
> +
> +    def test_send_fork_env_with_env(self):
> +        response = self.send_message_to_service(
> +            'fork-env rocks\n'
> +            'BZR_EMAIL: joe@xxxxxxxxxxx\n'
> +            'end\n')
> +        self.assertEqual('ok\nfake forking\n', response)
> +        self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'})],
> +                         self.service.fork_log)
> +
> +    def test_send_fork_env_slowly(self):
> +        response = self.send_message_to_service(
> +            'fork-env rocks\n'
> +            'BZR_EMAIL: joe@xxxxxxxxxxx\n'
> +            'end\n', one_byte_at_a_time=True)
> +        self.assertEqual('ok\nfake forking\n', response)
> +        self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'})],
> +                         self.service.fork_log)
> +        # It should work with 'crlf' endings as well
> +        response = self.send_message_to_service(
> +            'fork-env rocks\r\n'
> +            'BZR_EMAIL: joe@xxxxxxxxxxx\r\n'
> +            'end\r\n', one_byte_at_a_time=True)
> +        self.assertEqual('ok\nfake forking\n', response)
> +        self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'}),
> +                          (['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'})],
> +                         self.service.fork_log)
> +
> +    def test_send_incomplete_fork_env_timeout(self):
> +        # We should get a failure message if we can't quickly read the whole
> +        # content
> +        response = self.send_message_to_service(
> +            'fork-env rocks\n'
> +            'BZR_EMAIL: joe@xxxxxxxxxxx\n',
> +            one_byte_at_a_time=True)
> +        # Note that we *don't* send a final 'end\n'
> +        self.assertStartsWith(response, 'FAILURE\n')
> +
> +    def test_send_incomplete_request_timeout(self):
> +        # Requests end with '\n', send one without it
> +        response = self.send_message_to_service('hello',
> +                                                one_byte_at_a_time=True)
> +        self.assertStartsWith(response, 'FAILURE\n')
> +
> +
> +class TestCaseWithSubprocess(tests.TestCaseWithTransport):
> +    """Override the bzr start_bzr_subprocess command.
> +
> +    The launchpad infrastructure requires a fair amount of configuration to get
> +    paths, etc correct. So this provides that work.

provides ... what?  Missing word, I think.  And then I realize that
this probably isn't your typo :-)

> +    """
> +
> +    def get_python_path(self):
> +        """Return the path to the Python interpreter."""
> +        return '%s/bin/py' % config.root
> +
> +    def start_bzr_subprocess(self, process_args, env_changes=None,
> +                             working_dir=None):
> +        """Start bzr in a subprocess for testing.
> +
> +        Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
> +        This version removes some of the skipping stuff, some of the
> +        irrelevant comments (e.g. about win32) and uses Launchpad's own
> +        mechanisms for getting the path to 'bzr'.
> +
> +        Comments starting with 'LAUNCHPAD' are comments about our
> +        modifications.
> +        """
> +        if env_changes is None:
> +            env_changes = {}
> +        env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
> +        old_env = {}
> +
> +        def cleanup_environment():
> +            for env_var, value in env_changes.iteritems():
> +                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
> +
> +        def restore_environment():
> +            for env_var, value in old_env.iteritems():
> +                osutils.set_or_unset_env(env_var, value)
> +
> +        cwd = None
> +        if working_dir is not None:
> +            cwd = osutils.getcwd()
> +            os.chdir(working_dir)
> +
> +        # LAUNCHPAD: Because of buildout, we need to get a custom Python
> +        # binary, not sys.executable.
> +        python_path = self.get_python_path()
> +        # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
> +        # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
> +        bzr_path = get_bzr_path()
> +        try:
> +            cleanup_environment()
> +            command = [python_path, bzr_path]
> +            command.extend(process_args)
> +            process = self._popen(
> +                command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
> +                stderr=subprocess.PIPE)
> +        finally:
> +            restore_environment()
> +            if cwd is not None:
> +                os.chdir(cwd)
> +
> +        return process
> +
> +
> +class TestCaseWithLPForkingServiceSubprocess(TestCaseWithSubprocess):
> +    """Tests will get a separate process to communicate to.
> +
> +    The number of these tests should be small, because it is expensive to start
> +    and stop the daemon.
> +
> +    TODO: This should probably use testresources, or layers somehow...
> +    """
> +
> +    def setUp(self):
> +        super(TestCaseWithLPForkingServiceSubprocess, self).setUp()
> +        self.service_process, self.service_path = self.start_service_subprocess()
> +        self.addCleanup(self.stop_service)
> +
> +    def start_conversation(self, message, one_byte_at_a_time=False):
> +        """Start talking to the service, and get the initial response."""
> +        client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
> +        trace.mutter('sending %r to socket %s' % (message, self.service_path))
> +        client_sock.connect(self.service_path)
> +        if one_byte_at_a_time:
> +            for byte in message:
> +                client_sock.send(byte)
> +        else:
> +            client_sock.sendall(message)
> +        response = client_sock.recv(1024)
> +        trace.mutter('response: %r' % (response,))
> +        if response.startswith("FAILURE"):
> +            raise RuntimeError('Failed to send message: %r' % (response,))
> +        return response, client_sock
> +
> +    def send_message_to_service(self, message, one_byte_at_a_time=False):
> +        response, client_sock = self.start_conversation(message,
> +            one_byte_at_a_time=one_byte_at_a_time)
> +        client_sock.close()
> +        return response
> +
> +    def send_fork_request(self, command, env=None):
> +        if env is not None:
> +            request_lines = ['fork-env %s\n' % (command,)]
> +            for key, value in env.iteritems():
> +                request_lines.append('%s: %s\n' % (key, value))
> +            request_lines.append('end\n')
> +            request = ''.join(request_lines)
> +        else:
> +            request = 'fork %s\n' % (command,)
> +        response, sock = self.start_conversation(request)
> +        ok, pid, path, tail = response.split('\n')
> +        self.assertEqual('ok', ok)
> +        self.assertEqual('', tail)
> +        # Don't really care what it is, but should be an integer
> +        pid = int(pid)
> +        path = path.strip()
> +        self.assertContainsRe(path, '/lp-forking-service-child-')
> +        return path, pid, sock
> +
> +    def start_service_subprocess(self):
> +        # Make sure this plugin is exposed to the subprocess
> +        # SLOOWWW (~2 seconds, which is why we are doing the work anyway)
> +        fd, tempname = tempfile.mkstemp(prefix='tmp-log-bzr-lp-forking-')
> +        # I'm not 100% sure about when cleanup runs versus addDetail, but I
> +        # think this will work.
> +        self.addCleanup(os.remove, tempname)
> +        def read_log():
> +            f = os.fdopen(fd)
> +            f.seek(0)
> +            content = f.read()
> +            f.close()
> +            return [content]
> +        self.addDetail('server-log', content.Content(
> +            content.ContentType('text', 'plain', {"charset": "utf8"}),
> +            read_log))
> +        service_fd, path = tempfile.mkstemp(prefix='tmp-lp-service-',
> +                                            suffix='.sock')
> +        os.close(service_fd)
> +        os.remove(path) # service wants create it as a socket
> +        env_changes = {'BZR_PLUGIN_PATH': lpserve.__path__[0],
> +                       'BZR_LOG': tempname}
> +        proc = self.start_bzr_subprocess(
> +            ['lp-service', '--path', path, '--no-preload',
> +             '--children-timeout=1'],
> +            env_changes=env_changes)
> +        trace.mutter('started lp-service subprocess')
> +        # preload_line = proc.stderr.readline()
> +        # self.assertStartsWith(preload_line, 'Preloading')

Do you want this or not?

> +        expected = 'Listening on socket: %s\n' % (path,)
> +        path_line = proc.stderr.readline()
> +        trace.mutter(path_line)
> +        self.assertEqual(expected, path_line)
> +        # The process won't delete it, so we do
> +        return proc, path
> +
> +    def stop_service(self):
> +        if self.service_process is None:
> +            # Already stopped
> +            return
> +        # First, try to stop the service gracefully, by sending a 'quit'
> +        # message
> +        try:
> +            response = self.send_message_to_service('quit\n')
> +        except socket.error, e:
> +            # Ignore a failure to connect, the service must be stopping/stopped
> +            # already
> +            response = None
> +        tend = time.time() + 10.0
> +        while self.service_process.poll() is None:
> +            if time.time() > tend:
> +                self.finish_bzr_subprocess(process=self.service_process,
> +                    send_signal=signal.SIGINT, retcode=3)
> +                self.fail('Failed to quit gracefully after 10.0 seconds')
> +            time.sleep(0.1)
> +        if response is not None:
> +            self.assertEqual('ok\nquit command requested... exiting\n',
> +                             response)
> +
> +    def _get_fork_handles(self, path):
> +        trace.mutter('getting handles for: %s' % (path,))
> +        stdin_path = os.path.join(path, 'stdin')
> +        stdout_path = os.path.join(path, 'stdout')
> +        stderr_path = os.path.join(path, 'stderr')
> +        # Consider the ordering, the other side should open 'stdin' first, but
> +        # we want it to block until we open the last one, or we race and it can
> +        # delete the other handles before we get to open them.

As above, on my reading of the man pages, this reasoning is bogus.

> +        child_stdin = open(stdin_path, 'wb')
> +        child_stdout = open(stdout_path, 'rb')
> +        child_stderr = open(stderr_path, 'rb')
> +        return child_stdin, child_stdout, child_stderr
> +
> +    def communicate_with_fork(self, path, stdin=None):
> +        child_stdin, child_stdout, child_stderr = self._get_fork_handles(path)
> +        if stdin is not None:
> +            child_stdin.write(stdin)
> +        child_stdin.close()
> +        stdout_content = child_stdout.read()
> +        stderr_content = child_stderr.read()
> +        return stdout_content, stderr_content
> +
> +    def assertReturnCode(self, expected_code, sock):
> +        """Assert that we get the expected return code as a message."""
> +        response = sock.recv(1024)
> +        self.assertStartsWith(response, 'exited\n')
> +        code = int(response.split('\n', 1)[1])
> +        self.assertEqual(expected_code, code)
> +
> +    def test_fork_lp_serve_hello(self):
> +        path, _, sock = self.send_fork_request('lp-serve --inet 2')
> +        stdout_content, stderr_content = self.communicate_with_fork(path,
> +            'hello\n')
> +        self.assertEqual('ok\x012\n', stdout_content)
> +        self.assertEqual('', stderr_content)
> +        self.assertReturnCode(0, sock)
> +
> +    def test_fork_replay(self):
> +        path, _, sock = self.send_fork_request('launchpad-replay')
> +        stdout_content, stderr_content = self.communicate_with_fork(path,
> +            '1 hello\n2 goodbye\n1 maybe\n')
> +        self.assertEqualDiff('hello\nmaybe\n', stdout_content)
> +        self.assertEqualDiff('goodbye\n', stderr_content)
> +        self.assertReturnCode(0, sock)
> +
> +    def test_just_run_service(self):
> +        # Start and stop are defined in setUp()
> +        pass
> +
> +    def test_fork_multiple_children(self):
> +        paths = []
> +        for idx in range(4):
> +            paths.append(self.send_fork_request('launchpad-replay'))
> +        # Do them out of order, as order shouldn't matter.
> +        for idx in [3, 2, 0, 1]:
> +            p, pid, sock = paths[idx]
> +            stdout_msg = 'hello %d\n' % (idx,)
> +            stderr_msg = 'goodbye %d\n' % (idx+1,)
> +            stdout, stderr = self.communicate_with_fork(p,
> +                '1 %s2 %s' % (stdout_msg, stderr_msg))
> +            self.assertEqualDiff(stdout_msg, stdout)
> +            self.assertEqualDiff(stderr_msg, stderr)
> +            self.assertReturnCode(0, sock)
> +
> +    def test_fork_respects_env_vars(self):
> +        path, pid, sock = self.send_fork_request('whoami',
> +            env={'BZR_EMAIL': 'this_test@xxxxxxxxxxx'})
> +        stdout_content, stderr_content = self.communicate_with_fork(path)
> +        self.assertEqual('', stderr_content)
> +        self.assertEqual('this_test@xxxxxxxxxxx\n', stdout_content)
> +
> +    def _check_exits_nicely(self, sig_id):
> +        path, _, sock = self.send_fork_request('rocks')
> +        self.assertEqual(None, self.service_process.poll())
> +        # Now when we send SIGTERM, it should wait for the child to exit,
> +        # before it tries to exit itself.
> +        # In python2.6+ we could use self.service_process.terminate()
> +        os.kill(self.service_process.pid, sig_id)
> +        self.assertEqual(None, self.service_process.poll())
> +        # Now talk to the child, so the service can close
> +        stdout_content, stderr_content = self.communicate_with_fork(path)
> +        self.assertEqual('It sure does!\n', stdout_content)
> +        self.assertEqual('', stderr_content)
> +        self.assertReturnCode(0, sock)
> +        # And the process should exit cleanly
> +        self.assertEqual(0, self.service_process.wait())
> +
> +    def test_sigterm_exits_nicely(self):
> +        self._check_exits_nicely(signal.SIGTERM)
> +
> +    def test_sigint_exits_nicely(self):
> +        self._check_exits_nicely(signal.SIGINT)

> === modified file 'configs/development/launchpad-lazr.conf'
> --- configs/development/launchpad-lazr.conf	2010-09-20 21:35:21 +0000
> +++ configs/development/launchpad-lazr.conf	2010-10-06 00:19:31 +0000
> @@ -76,6 +76,7 @@
>  lp_url_hosts: dev
>  access_log: /var/tmp/bazaar.launchpad.dev/codehosting-access.log
>  blacklisted_hostnames:
> +use_forking_daemon: True
>
>  [codeimport]
>  bazaar_branch_store: file:///tmp/bazaar-branches

> === modified file 'lib/canonical/config/schema-lazr.conf'
> --- lib/canonical/config/schema-lazr.conf	2010-09-24 22:30:48 +0000
> +++ lib/canonical/config/schema-lazr.conf	2010-10-06 00:19:31 +0000
> @@ -301,6 +301,18 @@
>  # datatype: string
>  logfile: -
>
> +# The location of the log file used by the LaunchpadForkingService
> +# datatype: string
> +forker_logfile: -
> +
> +# Should we be using the forking daemon? Or should we be calling spawnProcess
> +# instead?
> +# datatype: boolean
> +use_forking_daemon: False
> +# What disk path will the daemon listen on
> +# datatype: string
> +forking_daemon_socket: /var/tmp/launchpad_forking_service.sock

I think I'd leave this defaulting to emptry or something invalid in
the schema.

>  # The prefix of the web URL for all public branches. This should end with a
>  # slash.
>  #

> === modified file 'lib/canonical/launchpad/scripts/runlaunchpad.py'
> --- lib/canonical/launchpad/scripts/runlaunchpad.py	2010-09-17 20:46:58 +0000
> +++ lib/canonical/launchpad/scripts/runlaunchpad.py	2010-10-06 00:19:31 +0000
> @@ -184,6 +184,53 @@
>          stop_at_exit(process)
>
>
> +class ForkingSessionService(Service):
> +    """A lp-forking-service for handling ssh access."""
> +
> +    # TODO: The SFTP (and bzr+ssh) server depends fairly heavily on this

The /codehosting/ sftp &c.  We have others now :)

> +    #       service. It would seem reasonable to make one always start if the
> +    #       other one is started. Though this might be a way to "FeatureFlag"
> +    #       whether this is active or not.
> +
> +    @property
> +    def should_launch(self):
> +        return (config.codehosting.launch and
> +                config.codehosting.use_forking_daemon)
> +
> +    @property
> +    def logfile(self):
> +        """Return the log file to use.
> +
> +        Default to the value of the configuration key logfile.
> +        """
> +        return config.codehosting.forker_logfile
> +
> +    def launch(self):
> +        # Following the logic in TacFile. Specifically, if you configure sftp
> +        # to not run (and thus bzr+ssh) then we don't want to run the forking
> +        # service.
> +        if not self.should_launch:
> +            return
> +        from lp.codehosting import get_bzr_path
> +        command = [config.root + '/bin/py', get_bzr_path(),
> +                   'launchpad-forking-service',
> +                   '--path', config.codehosting.forking_daemon_socket,
> +                  ]
> +        env = dict(os.environ)
> +        env['BZR_PLUGIN_PATH'] = config.root + '/bzrplugins'
> +        logfile = self.logfile
> +        if logfile == '-':
> +            # This process uses a different logging infrastructure from the
> +            # rest of the Launchpad code. As such, it cannot trivially use '-'
> +            # as the logfile. So we just ignore this setting.
> +            pass
> +        else:
> +            env['BZR_LOG'] = logfile
> +        process = subprocess.Popen(command, env=env, stdin=subprocess.PIPE)
> +        process.stdin.close()
> +        stop_at_exit(process)
> +
> +
>  def stop_at_exit(process):
>      """Create and register an atexit hook for killing a process.
>
> @@ -208,6 +255,9 @@
>      'librarian': TacFile('librarian', 'daemons/librarian.tac',
>                           'librarian_server', prepare_for_librarian),
>      'sftp': TacFile('sftp', 'daemons/sftp.tac', 'codehosting'),
> +    # TODO, we probably need to run the forking service whenever somebody
> +    #       requests the sftp service...
> +    'forker': ForkingSessionService(),
>      'mailman': MailmanService(),
>      'codebrowse': CodebrowseService(),
>      'google-webservice': GoogleWebService(),

> === modified file 'lib/lp/codehosting/sshserver/session.py'
> --- lib/lp/codehosting/sshserver/session.py	2010-08-20 20:31:18 +0000
> +++ lib/lp/codehosting/sshserver/session.py	2010-10-06 00:19:31 +0000
> @@ -9,9 +9,20 @@
>      ]
>
>  import os
> +import signal
> +import socket
>  import urlparse
>
> -from twisted.internet.process import ProcessExitedAlready
> +from zope.event import notify
> +from zope.interface import implements
> +
> +from twisted.internet import (
> +    error,
> +    fdesc,
> +    interfaces,
> +    main,
> +    process,
> +    )
>  from twisted.python import log
>  from zope.event import notify
>
> @@ -35,6 +46,229 @@
>      """Raised when a session is asked to execute a forbidden command."""
>
>
> +class _WaitForExit(process.ProcessReader):

I don't really understand why this inherits from ProcessReader.  But
I'm 1600 lines into this review now, so maybe the problem is at my end
:-)

> +    """Wait on a socket for the exit status."""
> +
> +    def __init__(self, reactor, proc, sock):
> +        super(_WaitForExit, self).__init__(reactor, proc, 'exit',
> +                                           sock.fileno())
> +        self._sock = sock
> +        self.connected = 1
> +
> +    def close(self):
> +        self._sock.close()
> +
> +    def dataReceived(self, data):
> +        # TODO: how do we handle getting only *some* of the content?, Maybe we
> +        #       need to read more bytes first...

Yeah.  You want lineReceiver's logic really.

> +        # This is the only thing we do differently from the standard
> +        # ProcessReader. When we get data on this socket, we need to treat it
> +        # as a return code, or a failure.
> +        if not data.startswith('exited'):
> +            # Bad data, we want to signal that we are closing the connection
> +            # TODO: How?
> +            # self.proc.?

childConnectionLost ?

> +            self.close()
> +            # I don't know what to put here if we get bogus data, but I *do*
> +            # want to say that the process is now considered dead to me
> +            log.err('Got invalid exit information: %r' % (data,))
> +            exit_status = (255 << 8)
> +        else:
> +            exit_status = int(data.split('\n')[1])
> +        self.proc.processEnded(exit_status)
> +
> +
> +class ForkedProcessTransport(process.BaseProcess):
> +    """Wrap the forked process in a ProcessTransport so we can talk to it.
> +
> +    Note that instantiating the class creates the fork and sets it up in the
> +    reactor.
> +    """
> +
> +    implements(interfaces.IProcessTransport)
> +
> +    # Design decisions
> +    # [Decision #a]
> +    #   Inherit from process.BaseProcess
> +    #       This seems slightly risky, as process.BaseProcess is actually
> +    #       imported from twisted.internet._baseprocess.BaseProcess. The
> +    #       real-world Process then actually inherits from process._BaseProcess
> +    #       I've also had to copy a fair amount from the actual Process
> +    #       command.
> +    #       One option would be to inherit from process.Process, and just
> +    #       override stuff like __init__ and reapProcess which I don't want to
> +    #       do in the same way. (Is it ok not to call your Base classes
> +    #       __init__ if you don't want to do that exact work?)
> +
> +    def __init__(self, reactor, executable, args, environment, proto):
> +        process.BaseProcess.__init__(self, proto)
> +        # Map from standard file descriptor to the associated pipe
> +        self.pipes = {}
> +        pid, path, sock = self._spawn(executable, args, environment)
> +        self._fifo_path = path
> +        self.pid = pid
> +        self.process_sock = sock
> +        self._fifo_path = path
> +        self._connectSpawnToReactor(reactor)
> +        if self.proto is not None:
> +            self.proto.makeConnection(self)
> +
> +    def _sendMessageToService(self, message):
> +        """Send a message to the Forking service and get the response"""
> +        path = config.codehosting.forking_daemon_socket
> +        client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
> +        log.msg('Connecting to Forking Service @ socket: %s for %r'
> +                % (path, message))
> +        try:
> +            client_sock.connect(path)
> +            client_sock.sendall(message)
> +            # We define the requests to be no bigger than 1kB. (For now)
> +            response = client_sock.recv(1024)
> +        except socket.error, e:
> +            # TODO: What exceptions should be raised?
> +            #       Raising the raw exception seems to kill the twisted reactor
> +            #       Note that if the connection is refused, we *could* just
> +            #       fall back on a regular 'spawnProcess' call.

What does the client see if the forking service is not running?

> +            log.err('Connection failed: %s' % (e,))
> +            raise
> +        if response.startswith("FAILURE"):
> +            raise RuntimeError('Failed to send message: %r' % (response,))
> +        return response, client_sock
> +
> +    def _spawn(self, executable, args, environment):
> +        assert executable == 'bzr', executable # Maybe .endswith()
> +        assert args[0] == 'bzr', args[0]
> +        command_str = ' '.join(args[1:])
> +        message = ['fork-env %s\n' % (' '.join(args[1:]),)]
> +        for key, value in environment.iteritems():
> +            # XXX: Currently we only pass BZR_EMAIL, should we be passing
> +            #      everything else? Note that many won't be handled properly,
> +            #      since the process is already running.
> +            if key != 'BZR_EMAIL':
> +                continue
> +            message.append('%s: %s\n' % (key, value))
> +        message.append('end\n')
> +        message = ''.join(message)
> +        response, sock = self._sendMessageToService(message)
> +        if response.startswith('FAILURE'):
> +            # TODO: Is there a better error to raise?
> +            raise RuntimeError("Failed while sending message to forking "
> +                "service. message: %r, failure: %r"
> +                % (message, response))
> +        ok, pid, path, tail = response.split('\n')
> +        assert ok == 'ok'
> +        assert tail == ''
> +        pid = int(pid)
> +        log.msg('Forking returned pid: %d, path: %s' % (pid, path))
> +        return pid, path, sock
> +
> +    def _connectSpawnToReactor(self, reactor):
> +        stdin_path = os.path.join(self._fifo_path, 'stdin')
> +        stdout_path = os.path.join(self._fifo_path, 'stdout')
> +        stderr_path = os.path.join(self._fifo_path, 'stderr')
> +        child_stdin_fd = os.open(stdin_path, os.O_WRONLY)
> +        self.pipes[0] = process.ProcessWriter(reactor, self, 0, child_stdin_fd)
> +        child_stdout_fd = os.open(stdout_path, os.O_RDONLY)
> +        # forceReadHack=True ? Used in process.py doesn't seem to be needed here
> +        self.pipes[1] = process.ProcessReader(reactor, self, 1, child_stdout_fd)
> +        child_stderr_fd = os.open(stderr_path, os.O_RDONLY)
> +        self.pipes[2] = process.ProcessReader(reactor, self, 2, child_stderr_fd)
> +        # Note: _exiter forms a GC cycle, since it points to us, and we hold a
> +        #       reference to it
> +        self._exiter = _WaitForExit(reactor, self, self.process_sock)
> +        self.pipes['exit'] = self._exiter
> +
> +    def _getReason(self, status):
> +        # Copied from twisted.internet.process._BaseProcess
> +        exitCode = sig = None
> +        if os.WIFEXITED(status):
> +            exitCode = os.WEXITSTATUS(status)
> +        else:
> +            sig = os.WTERMSIG(status)
> +        if exitCode or sig:
> +            return error.ProcessTerminated(exitCode, sig, status)
> +        return error.ProcessDone(status)
> +
> +    def signalProcess(self, signalID):
> +        """
> +        Send the given signal C{signalID} to the process. It'll translate a
> +        few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string
> +        representation to its int value, otherwise it'll pass directly the
> +        value provided
> +
> +        @type signalID: C{str} or C{int}
> +        """
> +        # Copied from twisted.internet.process._BaseProcess
> +        if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
> +            signalID = getattr(signal, 'SIG%s' % (signalID,))
> +        if self.pid is None:
> +            raise process.ProcessExitedAlready()
> +        os.kill(self.pid, signalID)
> +
> +    # Implemented because conch.ssh.session uses it, the Process implementation
> +    # ignores writes if channel '0' is not available
> +    def write(self, data):
> +        self.pipes[0].write(data)
> +
> +    def writeToChild(self, childFD, data):
> +        # Copied from twisted.internet.process.Process
> +        self.pipes[childFD].write(data)
> +
> +    def closeChildFD(self, childFD):
> +        if childFD in self.pipes:
> +            self.pipes[childFD].loseConnection()
> +
> +    def closeStdin(self):
> +        self.closeChildFD(0)
> +
> +    def closeStdout(self):
> +        self.closeChildFD(1)
> +
> +    def closeStderr(self):
> +        self.closeChildFD(2)
> +
> +    def loseConnection(self):
> +        self.closeStdin()
> +        self.closeStdout()
> +        self.closeStderr()
> +
> +    # Implemented because ProcessWriter/ProcessReader want to call it
> +    # Copied from twisted.internet.Process
> +    def childDataReceived(self, name, data):
> +        self.proto.childDataReceived(name, data)
> +
> +    # Implemented because ProcessWriter/ProcessReader want to call it
> +    # Copied from twisted.internet.Process
> +    def childConnectionLost(self, childFD, reason):
> +        close = getattr(self.pipes[childFD], 'close', None)
> +        if close is not None:
> +            close()
> +        else:
> +            os.close(self.pipes[childFD].fileno())
> +        del self.pipes[childFD]
> +        try:
> +            self.proto.childConnectionLost(childFD)
> +        except:
> +            log.err()
> +        self.maybeCallProcessEnded()
> +
> +    # Implemented because of childConnectionLost
> +    # Adapted from twisted.internet.Process
> +    # Note: Process.maybeCallProcessEnded() tries to reapProcess() at this
> +    #       point, but the daemon will be doing the reaping for us (we can't
> +    #       because the process isn't a direct child.)
> +    def maybeCallProcessEnded(self):
> +        if self.pipes:
> +            # Not done if we still have open pipes
> +            return
> +        if not self.lostProcess:
> +            return
> +        process.BaseProcess.maybeCallProcessEnded(self)
> +
> +    # pauseProducing, present in process.py, not a IProcessTransport interface

It's a shame there's so much copy & paste here.  Also, the use of
blocking code in a Twisted application always makes me uneasy, but I
don't know how serious a problem it is here.

>  class ExecOnlySession(DoNothingSession):
>      """Conch session that only allows executing commands."""
>
> @@ -58,7 +292,7 @@
>              notify(BazaarSSHClosed(self.avatar))
>              try:
>                  self._transport.signalProcess('HUP')
> -            except (OSError, ProcessExitedAlready):
> +            except (OSError, process.ProcessExitedAlready):
>                  pass
>              self._transport.loseConnection()
>
> @@ -81,8 +315,7 @@
>          except ForbiddenCommand, e:
>              self.errorWithMessage(protocol, str(e) + '\r\n')
>              return
> -        log.msg('Running: %r, %r, %r'
> -                % (executable, arguments, self.environment))
> +        log.msg('Running: %r, %r' % (executable, arguments))
>          if self._transport is not None:
>              log.err(
>                  "ERROR: %r already running a command on transport %r"
> @@ -91,8 +324,12 @@
>          # violation. Apart from this line and its twin, this class knows
>          # nothing about Bazaar.
>          notify(BazaarSSHStarted(self.avatar))
> -        self._transport = self.reactor.spawnProcess(
> -            protocol, executable, arguments, env=self.environment)
> +        self._transport = self._spawn(protocol, executable, arguments,
> +                                      env=self.environment)
> +
> +    def _spawn(self, protocol, executable, arguments, env):

I would really like to see this method grow a docstring.

> +        return self.reactor.spawnProcess(protocol, executable, arguments,
> +                                         env=env)
>
>      def getCommandToRun(self, command):
>          """Return the command that will actually be run given `command`.
> @@ -144,21 +381,62 @@
>              % {'user_id': self.avatar.user_id})
>
>
> +class ForkingRestrictedExecOnlySession(RestrictedExecOnlySession):
> +    """Use the Forking Service instead of spawnProcess."""
> +
> +    def _simplifyEnvironment(self, env):
> +        """Pull out the bits of the environment we want to pass along."""
> +        env = {}
> +        for env_var in ['BZR_EMAIL']:
> +            if env_var in self.environment:
> +                env[env_var] = self.environment[env_var]
> +        return env
> +
> +    def getCommandToFork(self, executable, arguments, env):
> +        assert executable.endswith('/bin/py')
> +        assert arguments[0] == executable
> +        assert arguments[1].endswith('/bzr')
> +        executable = 'bzr'
> +        arguments = arguments[1:]
> +        arguments[0] = 'bzr'
> +        env = self._simplifyEnvironment(env)
> +        return executable, arguments, env
> +
> +    def _spawn(self, protocol, executable, arguments, env):
> +        # When spawning, adapt the idea of "bin/py .../bzr" to just using "bzr"
> +        # and the executable
> +        executable, arguments, env = self.getCommandToFork(executable,
> +                                                           arguments, env)
> +        return ForkedProcessTransport(self.reactor, executable,
> +                                      arguments, env, protocol)
> +
> +
>  def launch_smart_server(avatar):
>      from twisted.internet import reactor
>
> -    command = (
> -        "%(root)s/bin/py %(bzr)s lp-serve --inet "
> -        % {'root': config.root, 'bzr': get_bzr_path()})
> +    python_command = "%(root)s/bin/py %(bzr)s" % {
> +            'root': config.root, 'bzr': get_bzr_path()
> +            }
> +    args = " lp-serve --inet %(user_id)s"
> +    command = python_command + args
> +    forking_command = "bzr" + args
>
>      environment = dict(os.environ)
>
>      # Extract the hostname from the supermirror root config.
>      hostname = urlparse.urlparse(config.codehosting.supermirror_root)[1]
>      environment['BZR_EMAIL'] = '%s@%s' % (avatar.username, hostname)
> -    return RestrictedExecOnlySession(
> +    klass = RestrictedExecOnlySession
> +    # TODO: Use a FeatureFlag to enable this in a more fine-grained approach.
> +    #       If the forking daemon has been spawned, then we can use it if the
> +    #       feature is set to true for the given user, etc.
> +    #       A global config is a good first step, but does require restarting
> +    #       the service to change the flag. Or does 'config' support SIGHUP?

To answer the question, no.

> +    if config.codehosting.use_forking_daemon:
> +        klass = ForkingRestrictedExecOnlySession

It's good that we can so simply disable the new feature, if it is
unstable in production.

> +    return klass(
>          avatar,
>          reactor,
>          'bzr serve --inet --directory=/ --allow-writes',
> -        command + ' %(user_id)s',
> +        command,
>          environment=environment)

> === modified file 'lib/lp/codehosting/sshserver/tests/test_session.py'
> --- lib/lp/codehosting/sshserver/tests/test_session.py	2010-08-20 20:31:18 +0000
> +++ lib/lp/codehosting/sshserver/tests/test_session.py	2010-10-06 00:19:31 +0000
> @@ -5,6 +5,7 @@
>
>  __metaclass__ = type
>
> +import socket
>  import unittest
>
>  from twisted.conch.interfaces import ISession
> @@ -21,7 +22,9 @@
>  from lp.codehosting.sshserver.session import (
>      ExecOnlySession,
>      ForbiddenCommand,
> +    ForkingRestrictedExecOnlySession,
>      RestrictedExecOnlySession,
> +    _WaitForExit,
>      )
>  from lp.codehosting.tests.helpers import AvatarTestCase
>
> @@ -40,6 +43,9 @@
>                           usePTY, childFDs))
>          return MockProcessTransport(executable)
>
> +    def addReader(self, reader):
> +        self.log.append(('addReader', reader))
> +
>
>  class MockSSHSession:
>      """Just enough of SSHSession to allow checking of reporting to stderr."""
> @@ -60,6 +66,7 @@
>          self._executable = executable
>          self.log = []
>          self.session = MockSSHSession(self.log)
> +        self.status = None
>
>      def closeStdin(self):
>          self.log.append(('closeStdin',))
> @@ -77,6 +84,43 @@
>      def write(self, data):
>          self.log.append(('write', data))
>
> +    def processEnded(self, status):
> +        self.log.append(('processEnded', status))
> +
> +
> +# TODO: What base *should* I be using?
> +class Test_WaitForExit(AvatarTestCase):

I think testtools.TestCase or lp.testing.TestCase is probably enough
for these tests.

> +    def setUp(self):
> +        AvatarTestCase.setUp(self)
> +        # self.avatar = CodehostingAvatar(self.aliceUserDict, None)
> +        # The logging system will try to get the id of avatar.transport, so
> +        # let's give it something to take the id of.
> +        # self.avatar.transport = object()
> +        self.reactor = MockReactor()
> +        self.proc = MockProcessTransport('executable')
> +        sock = socket.socket()
> +        self.exiter = _WaitForExit(self.reactor, self.proc, sock)
> +
> +    def test__init__starts_reading(self):
> +        self.assertEqual([('addReader', self.exiter)], self.reactor.log)
> +
> +    def test_dataReceived_ends_cleanly(self):
> +        self.exiter.dataReceived('exited\n0\n')
> +        self.assertEqual([('processEnded', 0)], self.proc.log)
> +
> +    def test_dataReceived_ends_with_errno(self):
> +        self.exiter.dataReceived('exited\n256\n')
> +        self.assertEqual([('processEnded', 256)], self.proc.log)
> +
> +    def test_dataReceived_bad_data(self):
> +        # Note: The dataReceived code calls 'log.err' which ends up getting
> +        #      printed during the test run. How do I suppress that or even
> +        #      better, check that it does so?
> +        #      self.flushLoggedErrors() doesn't seem to do anything.
> +        self.exiter.dataReceived('bogus\n')
> +        self.assertEqual([('processEnded', (255 << 8))], self.proc.log)
> +
>
>  class TestExecOnlySession(AvatarTestCase):
>      """Tests for ExecOnlySession.
> @@ -340,6 +384,35 @@
>          self.assertRaises(
>              ForbiddenCommand, session.getCommandToRun, 'rm -rf /')
>
> +    def test_avatarAdaptsToOnlyRestrictedSession(self):
> +        config.push('codehosting-no-forking',
> +            "[codehosting]\nuse_forking_daemon: False\n")
> +        self.addCleanup(config.pop, 'codehosting-no-forking')

If you have lp.testing.TestCase in your test class hierarchy, you can
say self.pushConfig(...) instead of this.

> +        session = ISession(self.avatar)
> +        self.failIf(isinstance(session, ForkingRestrictedExecOnlySession),
> +            "ISession(avatar) shouldn't adapt to "
> +            " ForkingRestrictedExecOnlySession when forking is disabled. ")
> +
> +    def test_avatarAdaptsToForkingRestrictedExecOnlySession(self):
> +        # config.push('codehosting-forking',
> +        #     "[codehosting]\nuse_forking_daemon: True\n")
> +        # self.addCleanup(config.pop, 'codehosting-forking')

I think you can leave this here, to be super explicit.

> +        session = ISession(self.avatar)
> +        self.failUnless(
> +            isinstance(session, ForkingRestrictedExecOnlySession),
> +            "ISession(avatar) doesn't adapt to "
> +            " ForkingRestrictedExecOnlySession. "
> +            "Got %r instead." % (session,))
> +        executable, arguments = session.getCommandToRun(
> +            'bzr serve --inet --directory=/ --allow-writes')
> +        executable, arguments, env = session.getCommandToFork(
> +            executable, arguments, session.environment)
> +        self.assertEqual('bzr', executable)
> +        self.assertEqual(
> +             ['bzr', 'lp-serve',
> +              '--inet', str(self.avatar.user_id)],
> +             list(arguments))
> +
>
>  def test_suite():
>      return unittest.TestLoader().loadTestsFromName(__name__)

> === modified file 'lib/lp/codehosting/tests/test_lpserve.py'
> --- lib/lp/codehosting/tests/test_lpserve.py	2010-08-20 20:31:18 +0000
> +++ lib/lp/codehosting/tests/test_lpserve.py	2010-10-06 00:19:31 +0000
> @@ -1,4 +1,4 @@
> -# Copyright 2009 Canonical Ltd.  This software is licensed under the
> +# Copyright 2009-2010 Canonical Ltd.  This software is licensed under the
>  # GNU Affero General Public License version 3 (see the file LICENSE).
>
>  """Tests for the lp-serve plugin."""
> @@ -8,6 +8,7 @@
>  import os
>  import re
>  from subprocess import PIPE
> +import threading
>  import unittest
>
>  from bzrlib import (
> @@ -17,16 +18,13 @@
>  from bzrlib.smart import medium
>  from bzrlib.tests import TestCaseWithTransport
>  from bzrlib.transport import remote
> +from bzrlib.plugins.lpserve.test_lpserve import TestCaseWithSubprocess
>
>  from canonical.config import config
> -from lp.codehosting import (
> -    get_bzr_path,
> -    get_BZR_PLUGIN_PATH_for_subprocess,
> -    )
>  from lp.codehosting.bzrutils import make_error_utility
>
>
> -class TestLaunchpadServe(TestCaseWithTransport):
> +class TestLaunchpadServe(TestCaseWithSubprocess):
>      """Tests for the lp-serve plugin.
>
>      Most of the helper methods here are copied from bzrlib.tests and
> @@ -38,59 +36,6 @@
>          """Assert that a server process finished cleanly."""
>          self.assertEqual((0, '', ''), tuple(result))
>
> -    def get_python_path(self):
> -        """Return the path to the Python interpreter."""
> -        return '%s/bin/py' % config.root
> -
> -    def start_bzr_subprocess(self, process_args, env_changes=None,
> -                             working_dir=None):
> -        """Start bzr in a subprocess for testing.
> -
> -        Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
> -        This version removes some of the skipping stuff, some of the
> -        irrelevant comments (e.g. about win32) and uses Launchpad's own
> -        mechanisms for getting the path to 'bzr'.
> -
> -        Comments starting with 'LAUNCHPAD' are comments about our
> -        modifications.
> -        """
> -        if env_changes is None:
> -            env_changes = {}
> -        env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
> -        old_env = {}
> -
> -        def cleanup_environment():
> -            for env_var, value in env_changes.iteritems():
> -                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
> -
> -        def restore_environment():
> -            for env_var, value in old_env.iteritems():
> -                osutils.set_or_unset_env(env_var, value)
> -
> -        cwd = None
> -        if working_dir is not None:
> -            cwd = osutils.getcwd()
> -            os.chdir(working_dir)
> -
> -        # LAUNCHPAD: Because of buildout, we need to get a custom Python
> -        # binary, not sys.executable.
> -        python_path = self.get_python_path()
> -        # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
> -        # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
> -        bzr_path = get_bzr_path()
> -        try:
> -            cleanup_environment()
> -            command = [python_path, bzr_path]
> -            command.extend(process_args)
> -            process = self._popen(
> -                command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
> -        finally:
> -            restore_environment()
> -            if cwd is not None:
> -                os.chdir(cwd)
> -
> -        return process
> -
>      def finish_lpserve_subprocess(self, process):
>          """Shut down the server process.
>
> @@ -169,4 +114,10 @@
>
>
>  def test_suite():
> -    return unittest.TestLoader().loadTestsFromName(__name__)
> +    from bzrlib import tests
> +    from bzrlib.plugins import lpserve
> +
> +    loader = tests.TestLoader()
> +    suite = loader.loadTestsFromName(__name__)
> +    suite = lpserve.load_tests(suite, lpserve, loader)
> +    return suite

Ah ok, so load_tests does get called :-)

Cheers,
mwh

-- 
https://code.launchpad.net/~jameinel/launchpad/lp-service/+merge/37531
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/launchpad/lp-service into lp:launchpad.



Follow ups

References