launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #01364
[Merge] lp:~jameinel/launchpad/lp-service into lp:launchpad
John A Meinel has proposed merging lp:~jameinel/launchpad/lp-service into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Retargetted from https://code.edge.launchpad.net/~jameinel/launchpad/lp-service/+merge/35877
because I accidentally started from db-devel in the beginning. This patch includes all suggestions from the original submission.
The goal of this submission is to improve the time for "bzr+ssh" to connect and be useful. The new method must be activated by setting:
[codehosting]
use_forking_server = True
It is set to be enabled in "development" mode, but the default is still disabled. I can't give a recommendation for the production config, because the branch is private.
This implements a new service (LaunchpadForkingService). It sits on a socket and waits for a request to 'fork <command>'. When received, it creates a stdin/stdout/stderr fifo on disk, and forks itself, running 'run_bzr_*(command)', rather than using 'exec()' which requires bootstrapping the python process.
The benefit is seen with: time echo hello | ssh localhost bzr serve --inet ...
Without this patch, it is 2.5s to serve on the loopback. With this patch, it is 0.25s.
I'm very happy to work with someone to smooth out the finer points of this submission. I tried to be explicit about places in the code that I had a decision to make, and why I chose the method I did.
I didn't use the FeatureFlag system, because setting it up via the config file was a lot more straightforward. (globally enabling/disabling the functionality).
As near as I can tell, there should be no impact of rolling this code out on production, if use_forking_daemon: False.
(The make run_codehosting will not actually spawn the daemon, and the twisted Conch code just gets an 'if ' check to see that it should use the old code path.)
I haven't run the full test suite, but I have:
1) Run all of the locally relevant tests "bzr selftest -s bt.lp_serve' and 'bin/test
lp.codehosting.sshserver
2) Manually started the sftp service and run commands through the local instance. Both with
and without the forking service enabled. (And I have confirmed that it isn't used when
disabled, and is used when enabled, etc.)
--
https://code.launchpad.net/~jameinel/launchpad/lp-service/+merge/37531
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/launchpad/lp-service into lp:launchpad.
=== modified file 'Makefile'
--- Makefile 2010-09-07 18:15:01 +0000
+++ Makefile 2010-10-04 22:07:56 +0000
@@ -253,7 +253,7 @@
run_codehosting: check_schema inplace stop
$(RM) thread*.request
- bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
+ bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
start_librarian: compile
=== added directory 'bzrplugins/lpserve'
=== renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
--- bzrplugins/lpserve.py 2010-04-19 06:35:23 +0000
+++ bzrplugins/lpserve/__init__.py 2010-10-04 22:07:56 +0000
@@ -8,15 +8,33 @@
__metaclass__ = type
-__all__ = ['cmd_launchpad_server']
-
-
+__all__ = ['cmd_launchpad_server',
+ 'cmd_launchpad_forking_service',
+ ]
+
+
+import errno
+import os
import resource
+import shlex
+import shutil
+import signal
+import socket
import sys
+import tempfile
+import threading
+import time
from bzrlib.commands import Command, register_command
from bzrlib.option import Option
-from bzrlib import lockdir, ui
+from bzrlib import (
+ commands,
+ errors,
+ lockdir,
+ osutils,
+ trace,
+ ui,
+ )
from bzrlib.smart import medium, server
from bzrlib.transport import get_transport
@@ -110,3 +128,724 @@
register_command(cmd_launchpad_server)
+
+
+class LPForkingService(object):
+ """A service that can be asked to start a new bzr subprocess via fork.
+
+ The basic idea is that python startup is very expensive. For example, the
+ original 'lp-serve' command could take 2.5s just to start up, before any
+ actual actions could be performed.
+
+ This class provides a service sitting on a socket, which can then be
+ requested to fork and run a given bzr command.
+
+ Clients connect to the socket and make a simple request, which then
+ receives a response. The possible requests are:
+
+ "hello\n": Trigger a heartbeat to report that the program is still
+ running, and write status information to the log file.
+ "quit\n": Stop the service, but do so 'nicely', waiting for children
+ to exit, etc. Once this is received the service will stop
+ taking new requests on the port.
+ "fork <command>\n": Request a new subprocess to be started.
+ <command> is the bzr command to be run, such as "rocks" or
+ "lp-serve --inet 12".
+ The immediate response will be the path-on-disk to a directory full
+ of named pipes (fifos) that will be the stdout/stderr/stdin of the
+ new process.
+ If a client holds the socket open, when the child process exits,
+ the exit status (as given by 'wait()') will be written to the
+ socket.
+
+ Note that one of the key bits is that the client will not be
+ started with exec*, we just call 'commands.run_bzr*()' directly.
+ This way, any modules that are already loaded will not need to be
+ loaded again. However, care must be taken with any global-state
+ that should be reset.
+ """
+
+ # Design decisions. These are bits where we could have chosen a different
+ # method/implementation and weren't sure what would be best. Documenting
+ # the current decision, and the alternatives.
+ #
+ # [Decision #1]
+ # Serve on a named AF_UNIX socket.
+ # 1) It doesn't make sense to serve to arbitrary hosts, we only want
+ # the local host to make requests. (Since the client needs to
+ # access the named fifos on the current filesystem.)
+ # 2) You can set security parameters on a filesystem path (g+rw,
+ # a-rw).
+ # [Decision #2]
+ # SIGCHLD
+ # We want to quickly detect that children have exited so that we can
+ # inform the client process quickly. At the moment, we register a
+ # SIGCHLD handler that doesn't do anything. However, it means that
+ # when we get the signal, if we are currently blocked in something
+ # like '.accept()', we will jump out temporarily. At that point the
+ # main loop will check if any children have exited. We could have
+ # done this work as part of the signal handler, but that felt 'racy'
+ # doing any serious work in a signal handler.
+ # If we just used socket.timeout as the indicator to go poll for
+ # children exiting, it slows the disconnect by as much as the full
+ # timeout. (So a timeout of 1.0s will cause the process to hang by
+ # that long until it determines that a child has exited, and can
+ # close the connection.)
+ # The current flow means that we'll notice exited children whenever
+ # we finish the current work.
+ # [Decision #3]
+ # Child vs Parent actions.
+ # There are several actions that are done when we get a new request.
+ # We have to create the fifos on disk, fork a new child, connect the
+ # child to those handles, and inform the client of the new path (not
+ # necessarily in that order.) It makes sense to wait to send the path
+ # message until after the fifos have been created. That way the
+ # client can just try to open them immediately, and the
+ # client-and-child will be synchronized by the open() calls.
+ # However, should the client be the one doing the mkfifo, should the
+ # server? Who should be sending the message? Should we fork after the
+ # mkfifo or before.
+ # The current thoughts:
+ # 1) Try to do work in the child when possible. This should allow
+ # for 'scaling' because the server is single-threaded.
+ # 2) We create the directory itself in the server, because that
+ # allows the server to monitor whether the client failed to
+ # clean up after itself or not.
+ # 3) Otherwise we create the fifos in the client, and then send
+ # the message back.
+ # [Decision #4]
+ # Exit information
+ # Inform the client that the child has exited on the socket they used
+ # to request the fork.
+ # 1) Arguably they could see that stdout and stderr have been closed,
+ # and thus stop reading. In testing, I wrote a client which uses
+ # select.poll() over stdin/stdout/stderr and used that to ferry
+ # the content to the appropriate local handle. However for the
+ # FIFOs, when the remote end closed, I wouldn't see any
+ # corresponding information on the local end. There obviously
+ # wasn't any data to be read, so they wouldn't show up as
+ # 'readable' (for me to try to read, and get 0 bytes, indicating
+ # it was closed). I also wasn't seeing POLLHUP, which seemed to be
+ # the correct indicator. As such, we decided to inform the client
+ # on the socket that they originally made the fork request, rather
+ # than just closing the socket immediately.
+ # 2) We could have had the forking server close the socket, and only
+ # the child hold the socket open. When the child exits, then the
+ # OS naturally closes the socket.
+ # If we want the returncode, then we should put that as bytes on
+ # the socket before we exit. Having the child do the work means
+ # that in error conditions, it could easily die before being able to
+ # write anything (think SEGFAULT, etc). The forking server is
+ # already 'wait'() ing on its children. So that we don't get
+ # zombies, and with wait3() we can get the rusage (user time,
+ # memory consumption, etc.)
+ # As such, it seems reasonable that the server can then also
+ # report back when a child is seen as exiting.
+ # [Decision #5]
+ # cleanup once connected
+ # The child process blocks during 'open()' waiting for the client to
+ # connect to its fifos. Once the client has connected, the child then
+ # deletes the temporary directory and the fifos from disk. This means
+ # that there isn't much left for diagnosis, but it also means that
+ # the client won't leave garbage around if it crashes, etc.
+ # Note that the forking service itself still monitors the paths
+ # created, and will delete garbage if it sees that a child failed to
+ # do so.
+ # [Decision #6]
+ # os._exit(retcode) in the child
+ # Calling sys.exit(retcode) raises an exception, which then bubbles
+ # up the stack and runs exit functions (and finally statements). When
+ # I tried using it originally, I would see the current child bubble
+ # all the way up the stack (through the server code that it fork()
+ # through), and then get to main() returning code 0. The process
+ # would still exit nonzero. My guess is that something in the atexit
+ # functions was failing, but that it was happening after logging, etc
+ # had been shut down.
+ # Any global state from the child process should be flushed before
+ # run_bzr_* has exited (which we *do* wait for), and any other global
+ # state is probably a remnant from the service process. Which will be
+ # cleaned up by the service itself, rather than the child.
+ # There is some concern that log files may not get flushed, so we
+ # currently call sys.exitfunc() first. The main problem is that I
+ # don't know any way to *remove* a function registered via 'atexit()'
+ # so if the forking service has some state, we my try to clean it up
+ # incorrectly.
+ # Note that the bzr script itself uses sys.exitfunc(); os._exit() in
+ # the 'bzr' main script, as the teardown time of all the python state
+ # was quite noticeable in real-world runtime. As such, bzrlib should
+ # be pretty safe, or it would have been failing for people already.
+ # [Decision #7]
+ # prefork vs max children vs ?
+ # For simplicity it seemed easiest to just fork when requested. Over
+ # time, I realized it would be easy to allow running an arbitrary
+ # command (no harder than just running one command), so it seemed
+ # reasonable to switch over. If we go the prefork route, then we'll
+ # need a way to tell the pre-forked children what command to run.
+ # This could be as easy as just adding one more fifo that they wait
+ # on in the same directory.
+ # For now, I've chosen not to limit the number of forked children. I
+ # don't know what a reasonable value is, and probably there are
+ # already limitations at play. (If Conch limits connections, then it
+ # will already be doing all the work, etc.)
+ # [Decision #8]
+ # nicer errors on the request socket
+ # This service is meant to be run only on the local system. As such,
+ # we don't try to be extra defensive about leaking information to
+ # the one connecting to the socket. (We should still watch out what
+ # we send across the per-child fifos, since those are connected to
+ # remote clients.) Instead we try to be helpful, and tell them as
+ # much as we know about what went wrong.
+
+ DEFAULT_PATH = '/var/run/launchpad_forking_service.sock'
+ DEFAULT_PERMISSIONS = 00660 # Permissions on the master socket (rw-rw----)
+ WAIT_FOR_CHILDREN_TIMEOUT = 5*60 # Wait no more than 5 min for children
+ SOCKET_TIMEOUT = 1.0
+ SLEEP_FOR_CHILDREN_TIMEOUT = 1.0
+ WAIT_FOR_REQUEST_TIMEOUT = 1.0 # No request should take longer than this to
+ # be read
+
+ _fork_function = os.fork
+
+ def __init__(self, path=DEFAULT_PATH, perms=DEFAULT_PERMISSIONS):
+ self.master_socket_path = path
+ self._perms = perms
+ self._start_time = time.time()
+ self._should_terminate = threading.Event()
+ # We address these locally, in case of shutdown socket may be gc'd
+ # before we are
+ self._socket_timeout = socket.timeout
+ self._socket_error = socket.error
+ self._socket_timeout = socket.timeout
+ self._socket_error = socket.error
+ # Map from pid => information
+ self._child_processes = {}
+ self._children_spawned = 0
+
+ def _create_master_socket(self):
+ self._server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self._server_socket.bind(self.master_socket_path)
+ if self._perms is not None:
+ os.chmod(self.master_socket_path, self._perms)
+ self._server_socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1)
+ self._sockname = self._server_socket.getsockname()
+ # self.host = self._sockname[0]
+ self.port = self._sockname[1]
+ self._server_socket.listen(5)
+ self._server_socket.settimeout(self.SOCKET_TIMEOUT)
+ trace.mutter('set socket timeout to: %s' % (self.SOCKET_TIMEOUT,))
+
+ def _cleanup_master_socket(self):
+ self._server_socket.close()
+ try:
+ os.remove(self.master_socket_path)
+ except (OSError, IOError), e:
+ # If we don't delete it, then we get 'address already in
+ # use' failures
+ trace.mutter('failed to cleanup: %s'
+ % (self.master_socket_path,))
+
+ def _handle_sigchld(self, signum, frm):
+ # We don't actually do anything here, we just want an interrupt (EINTR)
+ # on socket.accept() when SIGCHLD occurs.
+ pass
+
+ def _handle_sigterm(self, signum, frm):
+ # Unregister this as the default handler, 2 SIGTERMs will exit us.
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ # SIGTERM should also generate EINTR on our wait loop, so this should
+ # be enough
+ self._should_terminate.set()
+
+ def _register_signals(self):
+ """Register a SIGCHILD and SIGTERM handler.
+
+ If we have a trigger for SIGCHILD then we can quickly respond to
+ clients when their process exits. The main risk is getting more EAGAIN
+ errors elsewhere.
+
+ SIGTERM allows us to cleanup nicely before we exit.
+ """
+ signal.signal(signal.SIGCHLD, self._handle_sigchld)
+ signal.signal(signal.SIGTERM, self._handle_sigterm)
+
+ def _unregister_signals(self):
+ signal.signal(signal.SIGCHLD, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+ def _create_child_file_descriptors(self, base_path):
+ stdin_path = os.path.join(base_path, 'stdin')
+ stdout_path = os.path.join(base_path, 'stdout')
+ stderr_path = os.path.join(base_path, 'stderr')
+ os.mkfifo(stdin_path)
+ os.mkfifo(stdout_path)
+ os.mkfifo(stderr_path)
+
+ def _bind_child_file_descriptors(self, base_path):
+ import logging
+ from bzrlib import ui
+ stdin_path = os.path.join(base_path, 'stdin')
+ stdout_path = os.path.join(base_path, 'stdout')
+ stderr_path = os.path.join(base_path, 'stderr')
+ # Opening for writing blocks (or fails), so do those last
+ # TODO: Consider buffering, though that might interfere with reading
+ # and writing the smart protocol
+ stdin_fid = os.open(stdin_path, os.O_RDONLY)
+ stdout_fid = os.open(stdout_path, os.O_WRONLY)
+ stderr_fid = os.open(stderr_path, os.O_WRONLY)
+ # Note: by this point bzrlib has opened stderr for logging
+ # (as part of starting the service process in the first place). As
+ # such, it has a stream handler that writes to stderr. logging
+ # tries to flush and close that, but the file is already closed.
+ # This just supresses that exception
+ logging.raiseExceptions = False
+ sys.stdin.close()
+ sys.stdout.close()
+ sys.stderr.close()
+ os.dup2(stdin_fid, 0)
+ os.dup2(stdout_fid, 1)
+ os.dup2(stderr_fid, 2)
+ sys.stdin = os.fdopen(stdin_fid, 'rb')
+ sys.stdout = os.fdopen(stdout_fid, 'wb')
+ sys.stderr = os.fdopen(stderr_fid, 'wb')
+ ui.ui_factory.stdin = sys.stdin
+ ui.ui_factory.stdout = sys.stdout
+ ui.ui_factory.stderr = sys.stderr
+ # Now that we've opened the handles, delete everything so that we don't
+ # leave garbage around. Because the open() is done in blocking mode, we
+ # know that someone has already connected to them, and we don't want
+ # anyone else getting confused and connecting.
+ # See [Decision #5]
+ os.remove(stderr_path)
+ os.remove(stdout_path)
+ os.remove(stdin_path)
+ os.rmdir(base_path)
+
+ def _close_child_file_descriptons(self):
+ sys.stdin.close()
+ sys.stderr.close()
+ sys.stdout.close()
+
+ def become_child(self, command_argv, path):
+ """We are in the spawned child code, do our magic voodoo."""
+ # Stop tracking new signals
+ self._unregister_signals()
+ # Reset the start time
+ trace._bzr_log_start_time = time.time()
+ trace.mutter('%d starting %r'
+ % (os.getpid(), command_argv,))
+ self.host = None
+ self.port = None
+ self._sockname = None
+ self._bind_child_file_descriptors(path)
+ self._run_child_command(command_argv)
+
+ def _run_child_command(self, command_argv):
+ # This is the point where we would actually want to do something with
+ # our life
+ # TODO: We may want to consider special-casing the 'lp-serve' command.
+ # As that is the primary use-case for this service, it might be
+ # interesting to have an already-instantiated instance, where we
+ # can just pop on an extra argument and be ready to go. However,
+ # that would probably only really be measurable if we prefork. As
+ # it looks like ~200ms is 'fork()' time, but only 50ms is
+ # run-the-command time.
+ retcode = commands.run_bzr_catch_errors(command_argv)
+ self._close_child_file_descriptons()
+ trace.mutter('%d finished %r'
+ % (os.getpid(), command_argv,))
+ # We force os._exit() here, because we don't want to unwind the stack,
+ # which has complex results. (We can get it to unwind back to the
+ # cmd_launchpad_forking_service code, and even back to main() reporting
+ # thereturn code, but after that, suddenly the return code changes from
+ # a '0' to a '1', with no logging of info.
+ # TODO: Should we call sys.exitfunc() here? it allows atexit functions
+ # to fire, however, some of those may be still around from the
+ # parent process, which we don't really want.
+ sys.exitfunc()
+ # See [Decision #6]
+ os._exit(retcode)
+
+ @staticmethod
+ def command_to_argv(command_str):
+ """Convert a 'foo bar' style command to [u'foo', u'bar']"""
+ # command_str must be a utf-8 string
+ return [s.decode('utf-8') for s in shlex.split(command_str)]
+
+ @staticmethod
+ def parse_env(env_str):
+ """Convert the environment information into a dict.
+
+ :param env_str: A string full of environment variable declarations.
+ Each key is simple ascii "key: value\n"
+ The string must end with "end\n".
+ :return: A dict of environment variables
+ """
+ env = {}
+ if not env_str.endswith('end\n'):
+ raise ValueError('Invalid env-str: %r' % (env_str,))
+ env_str = env_str[:-5]
+ if not env_str:
+ return env
+ env_entries = env_str.split('\n')
+ for entry in env_entries:
+ key, value = entry.split(': ', 1)
+ env[key] = value
+ return env
+
+ def fork_one_request(self, conn, client_addr, command_argv, env):
+ """Fork myself and serve a request."""
+ temp_name = tempfile.mkdtemp(prefix='lp-forking-service-child-')
+ # Now that we've set everything up, send the response to the client we
+ # create them first, so the client can start trying to connect to them,
+ # while we fork and have the child do the same.
+ self._children_spawned += 1
+ pid = self._fork_function()
+ if pid == 0:
+ pid = os.getpid()
+ trace.mutter('%d spawned' % (pid,))
+ self._server_socket.close()
+ for env_var, value in env.iteritems():
+ osutils.set_or_unset_env(env_var, value)
+ # See [Decision #3]
+ self._create_child_file_descriptors(temp_name)
+ conn.sendall('ok\n%d\n%s\n' % (pid, temp_name))
+ conn.close()
+ self.become_child(command_argv, temp_name)
+ trace.warning('become_child returned!!!')
+ sys.exit(1)
+ else:
+ self._child_processes[pid] = (temp_name, conn)
+ self.log(client_addr, 'Spawned process %s for %r: %s'
+ % (pid, command_argv, temp_name))
+
+ def main_loop(self):
+ self._should_terminate.clear()
+ self._register_signals()
+ self._create_master_socket()
+ trace.note('Listening on socket: %s' % (self.master_socket_path,))
+ try:
+ try:
+ self._do_loop()
+ finally:
+ # Stop talking to others, we are shutting down
+ self._cleanup_master_socket()
+ except KeyboardInterrupt:
+ # SIGINT received, try to shutdown cleanly
+ pass
+ trace.note('Shutting down. Waiting up to %.0fs for %d child processes'
+ % (self.WAIT_FOR_CHILDREN_TIMEOUT,
+ len(self._child_processes),))
+ self._shutdown_children()
+ trace.note('Exiting')
+
+ def _do_loop(self):
+ while not self._should_terminate.isSet():
+ try:
+ conn, client_addr = self._server_socket.accept()
+ except self._socket_timeout:
+ pass # run shutdown and children checks
+ except self._socket_error, e:
+ if e.args[0] == errno.EINTR:
+ pass # run shutdown and children checks
+ elif e.args[0] != errno.EBADF:
+ # We can get EBADF here while we are shutting down
+ # So we just ignore it for now
+ pass
+ else:
+ # Log any other failure mode
+ trace.warning("listening socket error: %s", e)
+ else:
+ self.log(client_addr, 'connected')
+ # TODO: We should probably trap exceptions coming out of this
+ # and log them, so that we don't kill the service because
+ # of an unhandled error
+ # Note: settimeout is used so that a malformed request doesn't
+ # cause us to hang forever. Note that the particular
+ # implementation means that a malicious client could
+ # probably send us one byte every Xms, and we would just
+ # keep trying to read it. However, as a local service, we
+ # aren't worrying about it.
+ conn.settimeout(self.WAIT_FOR_REQUEST_TIMEOUT)
+ try:
+ self.serve_one_connection(conn, client_addr)
+ except self._socket_timeout, e:
+ trace.log_exception_quietly()
+ self.log(client_addr, 'request timeout failure: %s' % (e,))
+ conn.sendall('FAILURE\nrequest timed out\n')
+ conn.close()
+ self._poll_children()
+
+ def log(self, client_addr, message):
+ """Log a message to the trace log.
+
+ Include the information about what connection is being served.
+ """
+ if client_addr is not None:
+ # Note, we don't use conn.getpeername() because if a client
+ # disconnects before we get here, that raises an exception
+ conn_info = '[%s] ' % (client_addr,)
+ else:
+ conn_info = ''
+ trace.mutter('%s%s' % (conn_info, message))
+
+ def log_information(self):
+ """Log the status information.
+
+ This includes stuff like number of children, and ... ?
+ """
+ self._poll_children()
+ self.log(None, 'Running for %.3fs' % (time.time() - self._start_time))
+ self.log(None, '%d children currently running (spawned %d total)'
+ % (len(self._child_processes), self._children_spawned))
+ # Read the current information about memory consumption, etc.
+ self.log(None, 'Self: %s'
+ % (resource.getrusage(resource.RUSAGE_SELF),))
+ # This seems to be the sum of all rusage for all children that have
+ # been collected (not for currently running children, or ones we
+ # haven't "wait"ed on.) We may want to read /proc/PID/status, since
+ # 'live' information is probably more useful.
+ self.log(None, 'Finished children: %s'
+ % (resource.getrusage(resource.RUSAGE_CHILDREN),))
+
+ def _poll_children(self):
+ """See if children are still running, etc.
+
+ One interesting hook here would be to track memory consumption, etc.
+ """
+ while self._child_processes:
+ try:
+ c_id, exit_code, rusage = os.wait3(os.WNOHANG)
+ except OSError, e:
+ if e.errno == errno.ECHILD:
+ # TODO: We handle this right now because the test suite
+ # fakes a child, since we wanted to test some code
+ # without actually forking anything
+ trace.mutter('_poll_children() called, and'
+ ' self._child_processes indicates there are'
+ ' children, but os.wait3() says there are not.'
+ ' current_children: %s' % (self._child_processes,))
+ return
+ if c_id == 0:
+ # No more children stopped right now
+ return
+ c_path, sock = self._child_processes.pop(c_id)
+ trace.mutter('%s exited %s and usage: %s'
+ % (c_id, exit_code, rusage))
+ # See [Decision #4]
+ try:
+ sock.sendall('exited\n%s\n' % (exit_code,))
+ except (self._socket_timeout, self._socket_error), e:
+ # The client disconnected before we wanted them to,
+ # no big deal
+ trace.mutter('%s\'s socket already closed: %s' % (c_id, e))
+ else:
+ sock.close()
+ if os.path.exists(c_path):
+ # The child failed to cleanup after itself, do the work here
+ trace.warning('Had to clean up after child %d: %s\n'
+ % (c_id, c_path))
+ shutil.rmtree(c_path, ignore_errors=True)
+
+ def _wait_for_children(self, secs):
+ start = time.time()
+ end = start + secs
+ while self._child_processes:
+ self._poll_children()
+ if secs > 0 and time.time() > end:
+ break
+ time.sleep(self.SLEEP_FOR_CHILDREN_TIMEOUT)
+
+ def _shutdown_children(self):
+ self._wait_for_children(self.WAIT_FOR_CHILDREN_TIMEOUT)
+ if self._child_processes:
+ trace.warning('Failed to stop children: %s'
+ % ', '.join(map(str, self._child_processes)))
+ for c_id in self._child_processes:
+ trace.warning('sending SIGINT to %d' % (c_id,))
+ os.kill(c_id, signal.SIGINT)
+ # We sent the SIGINT signal, see if they exited
+ self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
+ if self._child_processes:
+ # No? Then maybe something more powerful
+ for c_id in self._child_processes:
+ trace.warning('sending SIGKILL to %d' % (c_id,))
+ os.kill(c_id, signal.SIGKILL)
+ # We sent the SIGKILL signal, see if they exited
+ self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
+ if self._child_processes:
+ for c_id, (c_path, sock) in self._child_processes.iteritems():
+ # TODO: We should probably put something into this message?
+ # However, the likelyhood is very small that this isn't
+ # already closed because of SIGKILL + _wait_for_children
+ # And I don't really know what to say...
+ sock.close()
+ if os.path.exists(c_path):
+ trace.warning('Cleaning up after immortal child %d: %s\n'
+ % (c_id, c_path))
+ shutil.rmtree(c_path)
+
+ def _parse_fork_request(self, conn, client_addr, request):
+ if request.startswith('fork-env '):
+ while not request.endswith('end\n'):
+ request += osutils.read_bytes_from_socket(conn)
+ request = request.replace('\r\n', '\n')
+ command, env = request[9:].split('\n', 1)
+ else:
+ command = request[5:].strip()
+ env = 'end\n' # No env set
+ try:
+ command_argv = self.command_to_argv(command)
+ env = self.parse_env(env)
+ except Exception, e:
+ # TODO: Log the traceback?
+ self.log(client_addr, 'command or env parsing failed: %r'
+ % (str(e),))
+ conn.sendall('FAILURE\ncommand or env parsing failed: %r'
+ % (str(e),))
+ else:
+ return command_argv, env
+ return None, None
+
+ def serve_one_connection(self, conn, client_addr):
+ request = ''
+ while '\n' not in request:
+ request += osutils.read_bytes_from_socket(conn)
+ # telnet likes to use '\r\n' rather than '\n', and it is nice to have
+ # an easy way to debug.
+ request = request.replace('\r\n', '\n')
+ self.log(client_addr, 'request: %r' % (request,))
+ if request == 'hello\n':
+ conn.sendall('ok\nyep, still alive\n')
+ self.log_information()
+ elif request == 'quit\n':
+ self._should_terminate.set()
+ conn.sendall('ok\nquit command requested... exiting\n')
+ elif request.startswith('fork ') or request.startswith('fork-env '):
+ command_argv, env = self._parse_fork_request(conn, client_addr,
+ request)
+ if command_argv is not None:
+ # See [Decision #7]
+ # TODO: Do we want to limit the number of children? And/or
+ # prefork additional instances? (the design will need to
+ # change if we prefork and run arbitrary commands.)
+ self.fork_one_request(conn, client_addr, command_argv, env)
+ # We don't close the conn like other code paths, since we use
+ # it again later.
+ return
+ else:
+ self.log(client_addr, 'FAILURE: unknown request: %r' % (request,))
+ # See [Decision #8]
+ conn.sendall('FAILURE\nunknown request: %r\n' % (request,))
+ conn.close()
+
+
+class cmd_launchpad_forking_service(Command):
+ """Launch a long-running process, where you can ask for new processes.
+
+ The process will block on a given --port waiting for requests to be made.
+ When a request is made, it will fork itself and redirect stdout/in/err to
+ fifos on the filesystem, and start running the requested command. The
+ caller will be informed where those file handles can be found. Thus it only
+ makes sense that the process connecting to the port must be on the same
+ system.
+ """
+
+ aliases = ['lp-service']
+
+ takes_options = [Option('path',
+ help='Listen for connections at PATH',
+ type=str),
+ Option('perms',
+ help='Set the mode bits for the socket, interpreted'
+ ' as an octal integer (same as chmod)'),
+ Option('preload',
+ help="Do/don't preload libraries before startup."),
+ Option('children-timeout', type=int,
+ help="Only wait XX seconds for children to exit"),
+ ]
+
+ def _preload_libraries(self):
+ global libraries_to_preload
+ for pyname in libraries_to_preload:
+ try:
+ __import__(pyname)
+ except ImportError, e:
+ trace.mutter('failed to preload %s: %s' % (pyname, e))
+
+ def run(self, path=None, perms=None, preload=True,
+ children_timeout=LPForkingService.WAIT_FOR_CHILDREN_TIMEOUT):
+ if path is None:
+ path = LPForkingService.DEFAULT_PATH
+ if perms is None:
+ perms = LPForkingService.DEFAULT_PERMISSIONS
+ if preload:
+ # We 'note' this because it often takes a fair amount of time.
+ trace.note('Preloading %d modules' % (len(libraries_to_preload),))
+ self._preload_libraries()
+ service = LPForkingService(path, perms)
+ service.WAIT_FOR_CHILDREN_TIMEOUT = children_timeout
+ service.main_loop()
+
+register_command(cmd_launchpad_forking_service)
+
+
+class cmd_launchpad_replay(Command):
+ """Write input from stdin back to stdout or stderr.
+
+ This is a hidden command, primarily available for testing
+ cmd_launchpad_forking_service.
+ """
+
+ hidden = True
+
+ def run(self):
+ # Just read line-by-line from stdin, and write out to stdout or stderr
+ # depending on the prefix
+ for line in sys.stdin:
+ channel, contents = line.split(' ', 1)
+ channel = int(channel)
+ if channel == 1:
+ sys.stdout.write(contents)
+ sys.stdout.flush()
+ elif channel == 2:
+ sys.stderr.write(contents)
+ sys.stderr.flush()
+ else:
+ raise RuntimeError('Invalid channel request.')
+ return 0
+
+register_command(cmd_launchpad_replay)
+
+# This list was generated by run lsprofing a spawned child, and looking for
+# <module ...> times, which indicate an import occured. Other possibilities are
+# to just run "bzr lp-serve --profile-imports" manually, and observe what was
+# expensive to import. It doesn't seem very easy to get this right
+# automatically.
+libraries_to_preload = [
+ 'bzrlib.errors',
+ 'bzrlib.repofmt.groupcompress_repo',
+ 'bzrlib.repository',
+ 'bzrlib.smart',
+ 'bzrlib.smart.protocol',
+ 'bzrlib.smart.request',
+ 'bzrlib.smart.server',
+ 'bzrlib.smart.vfs',
+ 'bzrlib.transport.local',
+ 'bzrlib.transport.readonly',
+ 'lp.codehosting.bzrutils',
+ 'lp.codehosting.vfs',
+ 'lp.codehosting.vfs.branchfs',
+ 'lp.codehosting.vfs.branchfsclient',
+ 'lp.codehosting.vfs.hooks',
+ 'lp.codehosting.vfs.transport',
+ ]
+
+
+
+def load_tests(standard_tests, module, loader):
+ standard_tests.addTests(loader.loadTestsFromModuleNames(
+ [__name__ + '.' + x for x in [
+ 'test_lpserve',
+ ]]))
+ return standard_tests
=== added file 'bzrplugins/lpserve/test_lpserve.py'
--- bzrplugins/lpserve/test_lpserve.py 1970-01-01 00:00:00 +0000
+++ bzrplugins/lpserve/test_lpserve.py 2010-10-04 22:07:56 +0000
@@ -0,0 +1,541 @@
+# Copyright 2010 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+import os
+import signal
+import socket
+import subprocess
+import tempfile
+import threading
+import time
+
+from testtools import content
+
+from bzrlib import (
+ osutils,
+ tests,
+ trace,
+ )
+from bzrlib.plugins import lpserve
+
+from canonical.config import config
+from lp.codehosting import get_bzr_path, get_BZR_PLUGIN_PATH_for_subprocess
+
+
+class TestingLPForkingServiceInAThread(lpserve.LPForkingService):
+ """Wrap starting and stopping an LPForkingService instance in a thread."""
+
+ # For testing, we set the timeouts much lower, because we want the tests to
+ # run quickly
+ WAIT_FOR_CHILDREN_TIMEOUT = 0.5
+ SOCKET_TIMEOUT = 0.01
+ SLEEP_FOR_CHILDREN_TIMEOUT = 0.01
+ WAIT_FOR_REQUEST_TIMEOUT = 0.1
+
+ _fork_function = None
+
+ def __init__(self, path, perms=None):
+ self.service_started = threading.Event()
+ self.service_stopped = threading.Event()
+ self.this_thread = None
+ self.fork_log = []
+ super(TestingLPForkingServiceInAThread, self).__init__(
+ path=path, perms=None)
+
+ def _register_signals(self):
+ pass # Don't register it for the test suite
+
+ def _unregister_signals(self):
+ pass # We don't fork, and didn't register, so don't unregister
+
+ def _create_master_socket(self):
+ super(TestingLPForkingServiceInAThread, self)._create_master_socket()
+ self.service_started.set()
+
+ def main_loop(self):
+ self.service_stopped.clear()
+ super(TestingLPForkingServiceInAThread, self).main_loop()
+ self.service_stopped.set()
+
+ def fork_one_request(self, conn, client_addr, command, env):
+ # We intentionally don't allow the test suite to request a fork, as
+ # threads + forks and everything else don't exactly play well together
+ self.fork_log.append((command, env))
+ conn.sendall('ok\nfake forking\n')
+ conn.close()
+
+ @staticmethod
+ def start_service(test):
+ """Start a new LPForkingService in a thread at a random path.
+
+ This will block until the service has created its socket, and is ready
+ to communicate.
+
+ :return: A new TestingLPForkingServiceInAThread instance
+ """
+ fd, path = tempfile.mkstemp(prefix='tmp-lp-forking-service-',
+ suffix='.sock')
+ # We don't want a temp file, we want a temp socket
+ os.close(fd)
+ os.remove(path)
+ new_service = TestingLPForkingServiceInAThread(path=path)
+ thread = threading.Thread(target=new_service.main_loop,
+ name='TestingLPForkingServiceInAThread')
+ new_service.this_thread = thread
+ # should we be doing thread.setDaemon(True) ?
+ thread.start()
+ new_service.service_started.wait(10.0)
+ if not new_service.service_started.isSet():
+ raise RuntimeError(
+ 'Failed to start the TestingLPForkingServiceInAThread')
+ test.addCleanup(new_service.stop_service)
+ # what about returning new_service._sockname ?
+ return new_service
+
+ def stop_service(self):
+ """Stop the test-server thread. This can be called multiple times."""
+ if self.this_thread is None:
+ # We already stopped the process
+ return
+ self._should_terminate.set()
+ self.service_stopped.wait(10.0)
+ if not self.service_stopped.isSet():
+ raise RuntimeError(
+ 'Failed to stop the TestingLPForkingServiceInAThread')
+ self.this_thread.join()
+ # Break any refcycles
+ self.this_thread = None
+
+
+class TestTestingLPForkingServiceInAThread(tests.TestCaseWithTransport):
+
+ def test_start_and_stop_service(self):
+ service = TestingLPForkingServiceInAThread.start_service(self)
+ service.stop_service()
+
+ def test_multiple_stops(self):
+ service = TestingLPForkingServiceInAThread.start_service(self)
+ service.stop_service()
+ service.stop_service()
+
+ def test_autostop(self):
+ # We shouldn't leak a thread here, as it should be part of the test
+ # case teardown.
+ service = TestingLPForkingServiceInAThread.start_service(self)
+
+
+class TestCaseWithLPForkingService(tests.TestCaseWithTransport):
+
+ def setUp(self):
+ super(TestCaseWithLPForkingService, self).setUp()
+ self.service = TestingLPForkingServiceInAThread.start_service(self)
+
+ def send_message_to_service(self, message, one_byte_at_a_time=False):
+ client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ client_sock.connect(self.service.master_socket_path)
+ if one_byte_at_a_time:
+ for byte in message:
+ client_sock.send(byte)
+ else:
+ client_sock.sendall(message)
+ response = client_sock.recv(1024)
+ return response
+
+
+class TestLPForkingServiceCommandToArgv(tests.TestCase):
+
+ def assertAsArgv(self, argv, command_str):
+ self.assertEqual(argv,
+ lpserve.LPForkingService.command_to_argv(command_str))
+
+ def test_simple(self):
+ self.assertAsArgv([u'foo'], 'foo')
+ self.assertAsArgv([u'foo', u'bar'], 'foo bar')
+
+ def test_quoted(self):
+ self.assertAsArgv([u'foo'], 'foo')
+ self.assertAsArgv([u'foo bar'], '"foo bar"')
+
+ def test_unicode(self):
+ self.assertAsArgv([u'command', u'\xe5'], 'command \xc3\xa5')
+
+
+class TestLPForkingServiceParseEnv(tests.TestCase):
+
+ def assertEnv(self, env, env_str):
+ self.assertEqual(env, lpserve.LPForkingService.parse_env(env_str))
+
+ def assertInvalid(self, env_str):
+ self.assertRaises(ValueError, lpserve.LPForkingService.parse_env,
+ env_str)
+
+ def test_no_entries(self):
+ self.assertEnv({}, 'end\n')
+
+ def test_one_entries(self):
+ self.assertEnv({'BZR_EMAIL': 'joe@xxxxxxx'},
+ 'BZR_EMAIL: joe@xxxxxxx\n'
+ 'end\n')
+
+ def test_two_entries(self):
+ self.assertEnv({'BZR_EMAIL': 'joe@xxxxxxx', 'BAR': 'foo'},
+ 'BZR_EMAIL: joe@xxxxxxx\n'
+ 'BAR: foo\n'
+ 'end\n')
+
+ def test_invalid_empty(self):
+ self.assertInvalid('')
+
+ def test_invalid_end(self):
+ self.assertInvalid("BZR_EMAIL: joe@xxxxxxx\n")
+
+ def test_invalid_entry(self):
+ self.assertInvalid("BZR_EMAIL joe@xxxxxxx\nend\n")
+
+
+class TestLPForkingService(TestCaseWithLPForkingService):
+
+ def test_send_quit_message(self):
+ response = self.send_message_to_service('quit\n')
+ self.assertEqual('ok\nquit command requested... exiting\n', response)
+ self.service.service_stopped.wait(10.0)
+ self.assertTrue(self.service.service_stopped.isSet())
+
+ def test_send_invalid_message_fails(self):
+ response = self.send_message_to_service('unknown\n')
+ self.assertStartsWith(response, 'FAILURE')
+
+ def test_send_hello_heartbeat(self):
+ response = self.send_message_to_service('hello\n')
+ self.assertEqual('ok\nyep, still alive\n', response)
+
+ def test_hello_supports_crlf(self):
+ # telnet seems to always write in text mode. It is nice to be able to
+ # debug with simple telnet, so lets support it.
+ response = self.send_message_to_service('hello\r\n')
+ self.assertEqual('ok\nyep, still alive\n', response)
+
+ def test_send_simple_fork(self):
+ response = self.send_message_to_service('fork rocks\n')
+ self.assertEqual('ok\nfake forking\n', response)
+ self.assertEqual([(['rocks'], {})], self.service.fork_log)
+
+ def test_send_fork_env_with_empty_env(self):
+ response = self.send_message_to_service(
+ 'fork-env rocks\n'
+ 'end\n')
+ self.assertEqual('ok\nfake forking\n', response)
+ self.assertEqual([(['rocks'], {})], self.service.fork_log)
+
+ def test_send_fork_env_with_env(self):
+ response = self.send_message_to_service(
+ 'fork-env rocks\n'
+ 'BZR_EMAIL: joe@xxxxxxxxxxx\n'
+ 'end\n')
+ self.assertEqual('ok\nfake forking\n', response)
+ self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'})],
+ self.service.fork_log)
+
+ def test_send_fork_env_slowly(self):
+ response = self.send_message_to_service(
+ 'fork-env rocks\n'
+ 'BZR_EMAIL: joe@xxxxxxxxxxx\n'
+ 'end\n', one_byte_at_a_time=True)
+ self.assertEqual('ok\nfake forking\n', response)
+ self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'})],
+ self.service.fork_log)
+ # It should work with 'crlf' endings as well
+ response = self.send_message_to_service(
+ 'fork-env rocks\r\n'
+ 'BZR_EMAIL: joe@xxxxxxxxxxx\r\n'
+ 'end\r\n', one_byte_at_a_time=True)
+ self.assertEqual('ok\nfake forking\n', response)
+ self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'}),
+ (['rocks'], {'BZR_EMAIL': 'joe@xxxxxxxxxxx'})],
+ self.service.fork_log)
+
+ def test_send_incomplete_fork_env_timeout(self):
+ # We should get a failure message if we can't quickly read the whole
+ # content
+ response = self.send_message_to_service(
+ 'fork-env rocks\n'
+ 'BZR_EMAIL: joe@xxxxxxxxxxx\n',
+ one_byte_at_a_time=True)
+ # Note that we *don't* send a final 'end\n'
+ self.assertStartsWith(response, 'FAILURE\n')
+
+ def test_send_incomplete_request_timeout(self):
+ # Requests end with '\n', send one without it
+ response = self.send_message_to_service('hello',
+ one_byte_at_a_time=True)
+ self.assertStartsWith(response, 'FAILURE\n')
+
+
+class TestCaseWithSubprocess(tests.TestCaseWithTransport):
+ """Override the bzr start_bzr_subprocess command.
+
+ The launchpad infrastructure requires a fair amount of configuration to get
+ paths, etc correct. So this provides that work.
+ """
+
+ def get_python_path(self):
+ """Return the path to the Python interpreter."""
+ return '%s/bin/py' % config.root
+
+ def start_bzr_subprocess(self, process_args, env_changes=None,
+ working_dir=None):
+ """Start bzr in a subprocess for testing.
+
+ Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
+ This version removes some of the skipping stuff, some of the
+ irrelevant comments (e.g. about win32) and uses Launchpad's own
+ mechanisms for getting the path to 'bzr'.
+
+ Comments starting with 'LAUNCHPAD' are comments about our
+ modifications.
+ """
+ if env_changes is None:
+ env_changes = {}
+ env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
+ old_env = {}
+
+ def cleanup_environment():
+ for env_var, value in env_changes.iteritems():
+ old_env[env_var] = osutils.set_or_unset_env(env_var, value)
+
+ def restore_environment():
+ for env_var, value in old_env.iteritems():
+ osutils.set_or_unset_env(env_var, value)
+
+ cwd = None
+ if working_dir is not None:
+ cwd = osutils.getcwd()
+ os.chdir(working_dir)
+
+ # LAUNCHPAD: Because of buildout, we need to get a custom Python
+ # binary, not sys.executable.
+ python_path = self.get_python_path()
+ # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
+ # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
+ bzr_path = get_bzr_path()
+ try:
+ cleanup_environment()
+ command = [python_path, bzr_path]
+ command.extend(process_args)
+ process = self._popen(
+ command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ finally:
+ restore_environment()
+ if cwd is not None:
+ os.chdir(cwd)
+
+ return process
+
+
+class TestCaseWithLPForkingServiceSubprocess(TestCaseWithSubprocess):
+ """Tests will get a separate process to communicate to.
+
+ The number of these tests should be small, because it is expensive to start
+ and stop the daemon.
+
+ TODO: This should probably use testresources, or layers somehow...
+ """
+
+ def setUp(self):
+ super(TestCaseWithLPForkingServiceSubprocess, self).setUp()
+ self.service_process, self.service_path = self.start_service_subprocess()
+ self.addCleanup(self.stop_service)
+
+ def start_conversation(self, message, one_byte_at_a_time=False):
+ """Start talking to the service, and get the initial response."""
+ client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ trace.mutter('sending %r to socket %s' % (message, self.service_path))
+ client_sock.connect(self.service_path)
+ if one_byte_at_a_time:
+ for byte in message:
+ client_sock.send(byte)
+ else:
+ client_sock.sendall(message)
+ response = client_sock.recv(1024)
+ trace.mutter('response: %r' % (response,))
+ if response.startswith("FAILURE"):
+ raise RuntimeError('Failed to send message: %r' % (response,))
+ return response, client_sock
+
+ def send_message_to_service(self, message, one_byte_at_a_time=False):
+ response, client_sock = self.start_conversation(message,
+ one_byte_at_a_time=one_byte_at_a_time)
+ client_sock.close()
+ return response
+
+ def send_fork_request(self, command, env=None):
+ if env is not None:
+ request_lines = ['fork-env %s\n' % (command,)]
+ for key, value in env.iteritems():
+ request_lines.append('%s: %s\n' % (key, value))
+ request_lines.append('end\n')
+ request = ''.join(request_lines)
+ else:
+ request = 'fork %s\n' % (command,)
+ response, sock = self.start_conversation(request)
+ ok, pid, path, tail = response.split('\n')
+ self.assertEqual('ok', ok)
+ self.assertEqual('', tail)
+ # Don't really care what it is, but should be an integer
+ pid = int(pid)
+ path = path.strip()
+ self.assertContainsRe(path, '/lp-forking-service-child-')
+ return path, pid, sock
+
+ def start_service_subprocess(self):
+ # Make sure this plugin is exposed to the subprocess
+ # SLOOWWW (~2 seconds, which is why we are doing the work anyway)
+ fd, tempname = tempfile.mkstemp(prefix='tmp-log-bzr-lp-forking-')
+ # I'm not 100% sure about when cleanup runs versus addDetail, but I
+ # think this will work.
+ self.addCleanup(os.remove, tempname)
+ def read_log():
+ f = os.fdopen(fd)
+ f.seek(0)
+ content = f.read()
+ f.close()
+ return [content]
+ self.addDetail('server-log', content.Content(
+ content.ContentType('text', 'plain', {"charset": "utf8"}),
+ read_log))
+ service_fd, path = tempfile.mkstemp(prefix='tmp-lp-service-',
+ suffix='.sock')
+ os.close(service_fd)
+ os.remove(path) # service wants create it as a socket
+ env_changes = {'BZR_PLUGIN_PATH': lpserve.__path__[0],
+ 'BZR_LOG': tempname}
+ proc = self.start_bzr_subprocess(
+ ['lp-service', '--path', path, '--no-preload',
+ '--children-timeout=1'],
+ env_changes=env_changes)
+ trace.mutter('started lp-service subprocess')
+ # preload_line = proc.stderr.readline()
+ # self.assertStartsWith(preload_line, 'Preloading')
+ expected = 'Listening on socket: %s\n' % (path,)
+ path_line = proc.stderr.readline()
+ trace.mutter(path_line)
+ self.assertEqual(expected, path_line)
+ # The process won't delete it, so we do
+ return proc, path
+
+ def stop_service(self):
+ if self.service_process is None:
+ # Already stopped
+ return
+ # First, try to stop the service gracefully, by sending a 'quit'
+ # message
+ try:
+ response = self.send_message_to_service('quit\n')
+ except socket.error, e:
+ # Ignore a failure to connect, the service must be stopping/stopped
+ # already
+ response = None
+ tend = time.time() + 10.0
+ while self.service_process.poll() is None:
+ if time.time() > tend:
+ self.finish_bzr_subprocess(process=self.service_process,
+ send_signal=signal.SIGINT, retcode=3)
+ self.fail('Failed to quit gracefully after 10.0 seconds')
+ time.sleep(0.1)
+ if response is not None:
+ self.assertEqual('ok\nquit command requested... exiting\n',
+ response)
+
+ def _get_fork_handles(self, path):
+ trace.mutter('getting handles for: %s' % (path,))
+ stdin_path = os.path.join(path, 'stdin')
+ stdout_path = os.path.join(path, 'stdout')
+ stderr_path = os.path.join(path, 'stderr')
+ # Consider the ordering, the other side should open 'stdin' first, but
+ # we want it to block until we open the last one, or we race and it can
+ # delete the other handles before we get to open them.
+ child_stdin = open(stdin_path, 'wb')
+ child_stdout = open(stdout_path, 'rb')
+ child_stderr = open(stderr_path, 'rb')
+ return child_stdin, child_stdout, child_stderr
+
+ def communicate_with_fork(self, path, stdin=None):
+ child_stdin, child_stdout, child_stderr = self._get_fork_handles(path)
+ if stdin is not None:
+ child_stdin.write(stdin)
+ child_stdin.close()
+ stdout_content = child_stdout.read()
+ stderr_content = child_stderr.read()
+ return stdout_content, stderr_content
+
+ def assertReturnCode(self, expected_code, sock):
+ """Assert that we get the expected return code as a message."""
+ response = sock.recv(1024)
+ self.assertStartsWith(response, 'exited\n')
+ code = int(response.split('\n', 1)[1])
+ self.assertEqual(expected_code, code)
+
+ def test_fork_lp_serve_hello(self):
+ path, _, sock = self.send_fork_request('lp-serve --inet 2')
+ stdout_content, stderr_content = self.communicate_with_fork(path,
+ 'hello\n')
+ self.assertEqual('ok\x012\n', stdout_content)
+ self.assertEqual('', stderr_content)
+ self.assertReturnCode(0, sock)
+
+ def test_fork_replay(self):
+ path, _, sock = self.send_fork_request('launchpad-replay')
+ stdout_content, stderr_content = self.communicate_with_fork(path,
+ '1 hello\n2 goodbye\n1 maybe\n')
+ self.assertEqualDiff('hello\nmaybe\n', stdout_content)
+ self.assertEqualDiff('goodbye\n', stderr_content)
+ self.assertReturnCode(0, sock)
+
+ def test_just_run_service(self):
+ # Start and stop are defined in setUp()
+ pass
+
+ def test_fork_multiple_children(self):
+ paths = []
+ for idx in range(4):
+ paths.append(self.send_fork_request('launchpad-replay'))
+ # Do them out of order, as order shouldn't matter.
+ for idx in [3, 2, 0, 1]:
+ p, pid, sock = paths[idx]
+ stdout_msg = 'hello %d\n' % (idx,)
+ stderr_msg = 'goodbye %d\n' % (idx+1,)
+ stdout, stderr = self.communicate_with_fork(p,
+ '1 %s2 %s' % (stdout_msg, stderr_msg))
+ self.assertEqualDiff(stdout_msg, stdout)
+ self.assertEqualDiff(stderr_msg, stderr)
+ self.assertReturnCode(0, sock)
+
+ def test_fork_respects_env_vars(self):
+ path, pid, sock = self.send_fork_request('whoami',
+ env={'BZR_EMAIL': 'this_test@xxxxxxxxxxx'})
+ stdout_content, stderr_content = self.communicate_with_fork(path)
+ self.assertEqual('', stderr_content)
+ self.assertEqual('this_test@xxxxxxxxxxx\n', stdout_content)
+
+ def _check_exits_nicely(self, sig_id):
+ path, _, sock = self.send_fork_request('rocks')
+ self.assertEqual(None, self.service_process.poll())
+ # Now when we send SIGTERM, it should wait for the child to exit,
+ # before it tries to exit itself.
+ # In python2.6+ we could use self.service_process.terminate()
+ os.kill(self.service_process.pid, sig_id)
+ self.assertEqual(None, self.service_process.poll())
+ # Now talk to the child, so the service can close
+ stdout_content, stderr_content = self.communicate_with_fork(path)
+ self.assertEqual('It sure does!\n', stdout_content)
+ self.assertEqual('', stderr_content)
+ self.assertReturnCode(0, sock)
+ # And the process should exit cleanly
+ self.assertEqual(0, self.service_process.wait())
+
+ def test_sigterm_exits_nicely(self):
+ self._check_exits_nicely(signal.SIGTERM)
+
+ def test_sigint_exits_nicely(self):
+ self._check_exits_nicely(signal.SIGINT)
=== modified file 'configs/development/launchpad-lazr.conf'
--- configs/development/launchpad-lazr.conf 2010-09-20 21:35:21 +0000
+++ configs/development/launchpad-lazr.conf 2010-10-04 22:07:56 +0000
@@ -76,6 +76,7 @@
lp_url_hosts: dev
access_log: /var/tmp/bazaar.launchpad.dev/codehosting-access.log
blacklisted_hostnames:
+use_forking_daemon: True
[codeimport]
bazaar_branch_store: file:///tmp/bazaar-branches
=== modified file 'lib/canonical/config/schema-lazr.conf'
--- lib/canonical/config/schema-lazr.conf 2010-09-24 22:30:48 +0000
+++ lib/canonical/config/schema-lazr.conf 2010-10-04 22:07:56 +0000
@@ -301,6 +301,18 @@
# datatype: string
logfile: -
+# The location of the log file used by the LaunchpadForkingService
+# datatype: string
+forker_logfile: -
+
+# Should we be using the forking daemon? Or should we be calling spawnProcess
+# instead?
+# datatype: boolean
+use_forking_daemon: False
+# What disk path will the daemon listen on
+# datatype: string
+forking_daemon_socket: /var/tmp/launchpad_forking_service.sock
+
# The prefix of the web URL for all public branches. This should end with a
# slash.
#
=== modified file 'lib/canonical/launchpad/scripts/runlaunchpad.py'
--- lib/canonical/launchpad/scripts/runlaunchpad.py 2010-09-17 20:46:58 +0000
+++ lib/canonical/launchpad/scripts/runlaunchpad.py 2010-10-04 22:07:56 +0000
@@ -184,6 +184,53 @@
stop_at_exit(process)
+class ForkingSessionService(Service):
+ """A lp-forking-service for handling ssh access."""
+
+ # TODO: The SFTP (and bzr+ssh) server depends fairly heavily on this
+ # service. It would seem reasonable to make one always start if the
+ # other one is started. Though this might be a way to "FeatureFlag"
+ # whether this is active or not.
+
+ @property
+ def should_launch(self):
+ return (config.codehosting.launch and
+ config.codehosting.use_forking_daemon)
+
+ @property
+ def logfile(self):
+ """Return the log file to use.
+
+ Default to the value of the configuration key logfile.
+ """
+ return config.codehosting.forker_logfile
+
+ def launch(self):
+ # Following the logic in TacFile. Specifically, if you configure sftp
+ # to not run (and thus bzr+ssh) then we don't want to run the forking
+ # service.
+ if not self.should_launch:
+ return
+ from lp.codehosting import get_bzr_path
+ command = [config.root + '/bin/py', get_bzr_path(),
+ 'launchpad-forking-service',
+ '--path', config.codehosting.forking_daemon_socket,
+ ]
+ env = dict(os.environ)
+ env['BZR_PLUGIN_PATH'] = config.root + '/bzrplugins'
+ logfile = self.logfile
+ if logfile == '-':
+ # This process uses a different logging infrastructure from the
+ # rest of the Launchpad code. As such, it cannot trivially use '-'
+ # as the logfile. So we just ignore this setting.
+ pass
+ else:
+ env['BZR_LOG'] = logfile
+ process = subprocess.Popen(command, env=env, stdin=subprocess.PIPE)
+ process.stdin.close()
+ stop_at_exit(process)
+
+
def stop_at_exit(process):
"""Create and register an atexit hook for killing a process.
@@ -208,6 +255,9 @@
'librarian': TacFile('librarian', 'daemons/librarian.tac',
'librarian_server', prepare_for_librarian),
'sftp': TacFile('sftp', 'daemons/sftp.tac', 'codehosting'),
+ # TODO, we probably need to run the forking service whenever somebody
+ # requests the sftp service...
+ 'forker': ForkingSessionService(),
'mailman': MailmanService(),
'codebrowse': CodebrowseService(),
'google-webservice': GoogleWebService(),
=== modified file 'lib/lp/codehosting/sshserver/session.py'
--- lib/lp/codehosting/sshserver/session.py 2010-08-20 20:31:18 +0000
+++ lib/lp/codehosting/sshserver/session.py 2010-10-04 22:07:56 +0000
@@ -9,9 +9,20 @@
]
import os
+import signal
+import socket
import urlparse
-from twisted.internet.process import ProcessExitedAlready
+from zope.event import notify
+from zope.interface import implements
+
+from twisted.internet import (
+ error,
+ fdesc,
+ interfaces,
+ main,
+ process,
+ )
from twisted.python import log
from zope.event import notify
@@ -35,6 +46,229 @@
"""Raised when a session is asked to execute a forbidden command."""
+class _WaitForExit(process.ProcessReader):
+ """Wait on a socket for the exit status."""
+
+ def __init__(self, reactor, proc, sock):
+ super(_WaitForExit, self).__init__(reactor, proc, 'exit',
+ sock.fileno())
+ self._sock = sock
+ self.connected = 1
+
+ def close(self):
+ self._sock.close()
+
+ def dataReceived(self, data):
+ # TODO: how do we handle getting only *some* of the content?, Maybe we
+ # need to read more bytes first...
+
+ # This is the only thing we do differently from the standard
+ # ProcessReader. When we get data on this socket, we need to treat it
+ # as a return code, or a failure.
+ if not data.startswith('exited'):
+ # Bad data, we want to signal that we are closing the connection
+ # TODO: How?
+ # self.proc.?
+ self.close()
+ # I don't know what to put here if we get bogus data, but I *do*
+ # want to say that the process is now considered dead to me
+ log.err('Got invalid exit information: %r' % (data,))
+ exit_status = (255 << 8)
+ else:
+ exit_status = int(data.split('\n')[1])
+ self.proc.processEnded(exit_status)
+
+
+class ForkedProcessTransport(process.BaseProcess):
+ """Wrap the forked process in a ProcessTransport so we can talk to it.
+
+ Note that instantiating the class creates the fork and sets it up in the
+ reactor.
+ """
+
+ implements(interfaces.IProcessTransport)
+
+ # Design decisions
+ # [Decision #a]
+ # Inherit from process.BaseProcess
+ # This seems slightly risky, as process.BaseProcess is actually
+ # imported from twisted.internet._baseprocess.BaseProcess. The
+ # real-world Process then actually inherits from process._BaseProcess
+ # I've also had to copy a fair amount from the actual Process
+ # command.
+ # One option would be to inherit from process.Process, and just
+ # override stuff like __init__ and reapProcess which I don't want to
+ # do in the same way. (Is it ok not to call your Base classes
+ # __init__ if you don't want to do that exact work?)
+
+ def __init__(self, reactor, executable, args, environment, proto):
+ process.BaseProcess.__init__(self, proto)
+ # Map from standard file descriptor to the associated pipe
+ self.pipes = {}
+ pid, path, sock = self._spawn(executable, args, environment)
+ self._fifo_path = path
+ self.pid = pid
+ self.process_sock = sock
+ self._fifo_path = path
+ self._connectSpawnToReactor(reactor)
+ if self.proto is not None:
+ self.proto.makeConnection(self)
+
+ def _sendMessageToService(self, message):
+ """Send a message to the Forking service and get the response"""
+ path = config.codehosting.forking_daemon_socket
+ client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ log.msg('Connecting to Forking Service @ socket: %s for %r'
+ % (path, message))
+ try:
+ client_sock.connect(path)
+ client_sock.sendall(message)
+ # We define the requests to be no bigger than 1kB. (For now)
+ response = client_sock.recv(1024)
+ except socket.error, e:
+ # TODO: What exceptions should be raised?
+ # Raising the raw exception seems to kill the twisted reactor
+ # Note that if the connection is refused, we *could* just
+ # fall back on a regular 'spawnProcess' call.
+ log.err('Connection failed: %s' % (e,))
+ raise
+ if response.startswith("FAILURE"):
+ raise RuntimeError('Failed to send message: %r' % (response,))
+ return response, client_sock
+
+ def _spawn(self, executable, args, environment):
+ assert executable == 'bzr', executable # Maybe .endswith()
+ assert args[0] == 'bzr', args[0]
+ command_str = ' '.join(args[1:])
+ message = ['fork-env %s\n' % (' '.join(args[1:]),)]
+ for key, value in environment.iteritems():
+ # XXX: Currently we only pass BZR_EMAIL, should we be passing
+ # everything else? Note that many won't be handled properly,
+ # since the process is already running.
+ if key != 'BZR_EMAIL':
+ continue
+ message.append('%s: %s\n' % (key, value))
+ message.append('end\n')
+ message = ''.join(message)
+ response, sock = self._sendMessageToService(message)
+ if response.startswith('FAILURE'):
+ # TODO: Is there a better error to raise?
+ raise RuntimeError("Failed while sending message to forking "
+ "service. message: %r, failure: %r"
+ % (message, response))
+ ok, pid, path, tail = response.split('\n')
+ assert ok == 'ok'
+ assert tail == ''
+ pid = int(pid)
+ log.msg('Forking returned pid: %d, path: %s' % (pid, path))
+ return pid, path, sock
+
+ def _connectSpawnToReactor(self, reactor):
+ stdin_path = os.path.join(self._fifo_path, 'stdin')
+ stdout_path = os.path.join(self._fifo_path, 'stdout')
+ stderr_path = os.path.join(self._fifo_path, 'stderr')
+ child_stdin_fd = os.open(stdin_path, os.O_WRONLY)
+ self.pipes[0] = process.ProcessWriter(reactor, self, 0, child_stdin_fd)
+ child_stdout_fd = os.open(stdout_path, os.O_RDONLY)
+ # forceReadHack=True ? Used in process.py doesn't seem to be needed here
+ self.pipes[1] = process.ProcessReader(reactor, self, 1, child_stdout_fd)
+ child_stderr_fd = os.open(stderr_path, os.O_RDONLY)
+ self.pipes[2] = process.ProcessReader(reactor, self, 2, child_stderr_fd)
+ # Note: _exiter forms a GC cycle, since it points to us, and we hold a
+ # reference to it
+ self._exiter = _WaitForExit(reactor, self, self.process_sock)
+ self.pipes['exit'] = self._exiter
+
+ def _getReason(self, status):
+ # Copied from twisted.internet.process._BaseProcess
+ exitCode = sig = None
+ if os.WIFEXITED(status):
+ exitCode = os.WEXITSTATUS(status)
+ else:
+ sig = os.WTERMSIG(status)
+ if exitCode or sig:
+ return error.ProcessTerminated(exitCode, sig, status)
+ return error.ProcessDone(status)
+
+ def signalProcess(self, signalID):
+ """
+ Send the given signal C{signalID} to the process. It'll translate a
+ few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string
+ representation to its int value, otherwise it'll pass directly the
+ value provided
+
+ @type signalID: C{str} or C{int}
+ """
+ # Copied from twisted.internet.process._BaseProcess
+ if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
+ signalID = getattr(signal, 'SIG%s' % (signalID,))
+ if self.pid is None:
+ raise process.ProcessExitedAlready()
+ os.kill(self.pid, signalID)
+
+ # Implemented because conch.ssh.session uses it, the Process implementation
+ # ignores writes if channel '0' is not available
+ def write(self, data):
+ self.pipes[0].write(data)
+
+ def writeToChild(self, childFD, data):
+ # Copied from twisted.internet.process.Process
+ self.pipes[childFD].write(data)
+
+ def closeChildFD(self, childFD):
+ if childFD in self.pipes:
+ self.pipes[childFD].loseConnection()
+
+ def closeStdin(self):
+ self.closeChildFD(0)
+
+ def closeStdout(self):
+ self.closeChildFD(1)
+
+ def closeStderr(self):
+ self.closeChildFD(2)
+
+ def loseConnection(self):
+ self.closeStdin()
+ self.closeStdout()
+ self.closeStderr()
+
+ # Implemented because ProcessWriter/ProcessReader want to call it
+ # Copied from twisted.internet.Process
+ def childDataReceived(self, name, data):
+ self.proto.childDataReceived(name, data)
+
+ # Implemented because ProcessWriter/ProcessReader want to call it
+ # Copied from twisted.internet.Process
+ def childConnectionLost(self, childFD, reason):
+ close = getattr(self.pipes[childFD], 'close', None)
+ if close is not None:
+ close()
+ else:
+ os.close(self.pipes[childFD].fileno())
+ del self.pipes[childFD]
+ try:
+ self.proto.childConnectionLost(childFD)
+ except:
+ log.err()
+ self.maybeCallProcessEnded()
+
+ # Implemented because of childConnectionLost
+ # Adapted from twisted.internet.Process
+ # Note: Process.maybeCallProcessEnded() tries to reapProcess() at this
+ # point, but the daemon will be doing the reaping for us (we can't
+ # because the process isn't a direct child.)
+ def maybeCallProcessEnded(self):
+ if self.pipes:
+ # Not done if we still have open pipes
+ return
+ if not self.lostProcess:
+ return
+ process.BaseProcess.maybeCallProcessEnded(self)
+
+ # pauseProducing, present in process.py, not a IProcessTransport interface
+
+
class ExecOnlySession(DoNothingSession):
"""Conch session that only allows executing commands."""
@@ -58,7 +292,7 @@
notify(BazaarSSHClosed(self.avatar))
try:
self._transport.signalProcess('HUP')
- except (OSError, ProcessExitedAlready):
+ except (OSError, process.ProcessExitedAlready):
pass
self._transport.loseConnection()
@@ -81,8 +315,7 @@
except ForbiddenCommand, e:
self.errorWithMessage(protocol, str(e) + '\r\n')
return
- log.msg('Running: %r, %r, %r'
- % (executable, arguments, self.environment))
+ log.msg('Running: %r, %r' % (executable, arguments))
if self._transport is not None:
log.err(
"ERROR: %r already running a command on transport %r"
@@ -91,8 +324,12 @@
# violation. Apart from this line and its twin, this class knows
# nothing about Bazaar.
notify(BazaarSSHStarted(self.avatar))
- self._transport = self.reactor.spawnProcess(
- protocol, executable, arguments, env=self.environment)
+ self._transport = self._spawn(protocol, executable, arguments,
+ env=self.environment)
+
+ def _spawn(self, protocol, executable, arguments, env):
+ return self.reactor.spawnProcess(protocol, executable, arguments,
+ env=env)
def getCommandToRun(self, command):
"""Return the command that will actually be run given `command`.
@@ -144,21 +381,62 @@
% {'user_id': self.avatar.user_id})
+class ForkingRestrictedExecOnlySession(RestrictedExecOnlySession):
+ """Use the Forking Service instead of spawnProcess."""
+
+ def _simplifyEnvironment(self, env):
+ """Pull out the bits of the environment we want to pass along."""
+ env = {}
+ for env_var in ['BZR_EMAIL']:
+ if env_var in self.environment:
+ env[env_var] = self.environment[env_var]
+ return env
+
+ def getCommandToFork(self, executable, arguments, env):
+ assert executable.endswith('/bin/py')
+ assert arguments[0] == executable
+ assert arguments[1].endswith('/bzr')
+ executable = 'bzr'
+ arguments = arguments[1:]
+ arguments[0] = 'bzr'
+ env = self._simplifyEnvironment(env)
+ return executable, arguments, env
+
+ def _spawn(self, protocol, executable, arguments, env):
+ # When spawning, adapt the idea of "bin/py .../bzr" to just using "bzr"
+ # and the executable
+ executable, arguments, env = self.getCommandToFork(executable,
+ arguments, env)
+ return ForkedProcessTransport(self.reactor, executable,
+ arguments, env, protocol)
+
+
def launch_smart_server(avatar):
from twisted.internet import reactor
- command = (
- "%(root)s/bin/py %(bzr)s lp-serve --inet "
- % {'root': config.root, 'bzr': get_bzr_path()})
+ python_command = "%(root)s/bin/py %(bzr)s" % {
+ 'root': config.root, 'bzr': get_bzr_path()
+ }
+ args = " lp-serve --inet %(user_id)s"
+ command = python_command + args
+ forking_command = "bzr" + args
environment = dict(os.environ)
# Extract the hostname from the supermirror root config.
hostname = urlparse.urlparse(config.codehosting.supermirror_root)[1]
environment['BZR_EMAIL'] = '%s@%s' % (avatar.username, hostname)
- return RestrictedExecOnlySession(
+ klass = RestrictedExecOnlySession
+ # TODO: Use a FeatureFlag to enable this in a more fine-grained approach.
+ # If the forking daemon has been spawned, then we can use it if the
+ # feature is set to true for the given user, etc.
+ # A global config is a good first step, but does require restarting
+ # the service to change the flag. Or does 'config' support SIGHUP?
+ if config.codehosting.use_forking_daemon:
+ klass = ForkingRestrictedExecOnlySession
+ return klass(
avatar,
reactor,
'bzr serve --inet --directory=/ --allow-writes',
- command + ' %(user_id)s',
+ command,
environment=environment)
=== modified file 'lib/lp/codehosting/sshserver/tests/test_session.py'
--- lib/lp/codehosting/sshserver/tests/test_session.py 2010-08-20 20:31:18 +0000
+++ lib/lp/codehosting/sshserver/tests/test_session.py 2010-10-04 22:07:56 +0000
@@ -5,6 +5,7 @@
__metaclass__ = type
+import socket
import unittest
from twisted.conch.interfaces import ISession
@@ -21,7 +22,9 @@
from lp.codehosting.sshserver.session import (
ExecOnlySession,
ForbiddenCommand,
+ ForkingRestrictedExecOnlySession,
RestrictedExecOnlySession,
+ _WaitForExit,
)
from lp.codehosting.tests.helpers import AvatarTestCase
@@ -40,6 +43,9 @@
usePTY, childFDs))
return MockProcessTransport(executable)
+ def addReader(self, reader):
+ self.log.append(('addReader', reader))
+
class MockSSHSession:
"""Just enough of SSHSession to allow checking of reporting to stderr."""
@@ -60,6 +66,7 @@
self._executable = executable
self.log = []
self.session = MockSSHSession(self.log)
+ self.status = None
def closeStdin(self):
self.log.append(('closeStdin',))
@@ -77,6 +84,43 @@
def write(self, data):
self.log.append(('write', data))
+ def processEnded(self, status):
+ self.log.append(('processEnded', status))
+
+
+# TODO: What base *should* I be using?
+class Test_WaitForExit(AvatarTestCase):
+
+ def setUp(self):
+ AvatarTestCase.setUp(self)
+ # self.avatar = CodehostingAvatar(self.aliceUserDict, None)
+ # The logging system will try to get the id of avatar.transport, so
+ # let's give it something to take the id of.
+ # self.avatar.transport = object()
+ self.reactor = MockReactor()
+ self.proc = MockProcessTransport('executable')
+ sock = socket.socket()
+ self.exiter = _WaitForExit(self.reactor, self.proc, sock)
+
+ def test__init__starts_reading(self):
+ self.assertEqual([('addReader', self.exiter)], self.reactor.log)
+
+ def test_dataReceived_ends_cleanly(self):
+ self.exiter.dataReceived('exited\n0\n')
+ self.assertEqual([('processEnded', 0)], self.proc.log)
+
+ def test_dataReceived_ends_with_errno(self):
+ self.exiter.dataReceived('exited\n256\n')
+ self.assertEqual([('processEnded', 256)], self.proc.log)
+
+ def test_dataReceived_bad_data(self):
+ # Note: The dataReceived code calls 'log.err' which ends up getting
+ # printed during the test run. How do I suppress that or even
+ # better, check that it does so?
+ # self.flushLoggedErrors() doesn't seem to do anything.
+ self.exiter.dataReceived('bogus\n')
+ self.assertEqual([('processEnded', (255 << 8))], self.proc.log)
+
class TestExecOnlySession(AvatarTestCase):
"""Tests for ExecOnlySession.
@@ -340,6 +384,35 @@
self.assertRaises(
ForbiddenCommand, session.getCommandToRun, 'rm -rf /')
+ def test_avatarAdaptsToOnlyRestrictedSession(self):
+ config.push('codehosting-no-forking',
+ "[codehosting]\nuse_forking_daemon: False\n")
+ self.addCleanup(config.pop, 'codehosting-no-forking')
+ session = ISession(self.avatar)
+ self.failIf(isinstance(session, ForkingRestrictedExecOnlySession),
+ "ISession(avatar) shouldn't adapt to "
+ " ForkingRestrictedExecOnlySession when forking is disabled. ")
+
+ def test_avatarAdaptsToForkingRestrictedExecOnlySession(self):
+ # config.push('codehosting-forking',
+ # "[codehosting]\nuse_forking_daemon: True\n")
+ # self.addCleanup(config.pop, 'codehosting-forking')
+ session = ISession(self.avatar)
+ self.failUnless(
+ isinstance(session, ForkingRestrictedExecOnlySession),
+ "ISession(avatar) doesn't adapt to "
+ " ForkingRestrictedExecOnlySession. "
+ "Got %r instead." % (session,))
+ executable, arguments = session.getCommandToRun(
+ 'bzr serve --inet --directory=/ --allow-writes')
+ executable, arguments, env = session.getCommandToFork(
+ executable, arguments, session.environment)
+ self.assertEqual('bzr', executable)
+ self.assertEqual(
+ ['bzr', 'lp-serve',
+ '--inet', str(self.avatar.user_id)],
+ list(arguments))
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
=== modified file 'lib/lp/codehosting/tests/test_lpserve.py'
--- lib/lp/codehosting/tests/test_lpserve.py 2010-08-20 20:31:18 +0000
+++ lib/lp/codehosting/tests/test_lpserve.py 2010-10-04 22:07:56 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2010 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for the lp-serve plugin."""
@@ -8,6 +8,7 @@
import os
import re
from subprocess import PIPE
+import threading
import unittest
from bzrlib import (
@@ -17,16 +18,13 @@
from bzrlib.smart import medium
from bzrlib.tests import TestCaseWithTransport
from bzrlib.transport import remote
+from bzrlib.plugins.lpserve.test_lpserve import TestCaseWithSubprocess
from canonical.config import config
-from lp.codehosting import (
- get_bzr_path,
- get_BZR_PLUGIN_PATH_for_subprocess,
- )
from lp.codehosting.bzrutils import make_error_utility
-class TestLaunchpadServe(TestCaseWithTransport):
+class TestLaunchpadServe(TestCaseWithSubprocess):
"""Tests for the lp-serve plugin.
Most of the helper methods here are copied from bzrlib.tests and
@@ -38,59 +36,6 @@
"""Assert that a server process finished cleanly."""
self.assertEqual((0, '', ''), tuple(result))
- def get_python_path(self):
- """Return the path to the Python interpreter."""
- return '%s/bin/py' % config.root
-
- def start_bzr_subprocess(self, process_args, env_changes=None,
- working_dir=None):
- """Start bzr in a subprocess for testing.
-
- Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
- This version removes some of the skipping stuff, some of the
- irrelevant comments (e.g. about win32) and uses Launchpad's own
- mechanisms for getting the path to 'bzr'.
-
- Comments starting with 'LAUNCHPAD' are comments about our
- modifications.
- """
- if env_changes is None:
- env_changes = {}
- env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
- old_env = {}
-
- def cleanup_environment():
- for env_var, value in env_changes.iteritems():
- old_env[env_var] = osutils.set_or_unset_env(env_var, value)
-
- def restore_environment():
- for env_var, value in old_env.iteritems():
- osutils.set_or_unset_env(env_var, value)
-
- cwd = None
- if working_dir is not None:
- cwd = osutils.getcwd()
- os.chdir(working_dir)
-
- # LAUNCHPAD: Because of buildout, we need to get a custom Python
- # binary, not sys.executable.
- python_path = self.get_python_path()
- # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
- # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
- bzr_path = get_bzr_path()
- try:
- cleanup_environment()
- command = [python_path, bzr_path]
- command.extend(process_args)
- process = self._popen(
- command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- finally:
- restore_environment()
- if cwd is not None:
- os.chdir(cwd)
-
- return process
-
def finish_lpserve_subprocess(self, process):
"""Shut down the server process.
@@ -169,4 +114,10 @@
def test_suite():
- return unittest.TestLoader().loadTestsFromName(__name__)
+ from bzrlib import tests
+ from bzrlib.plugins import lpserve
+
+ loader = tests.TestLoader()
+ suite = loader.loadTestsFromName(__name__)
+ suite = lpserve.load_tests(suite, lpserve, loader)
+ return suite
Follow ups