launchpad-reviewers team mailing list archive
Message #25108
[Merge] ~cjwatson/launchpad:remove-bzr-forking-service into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:remove-bzr-forking-service into launchpad:master.
Commit message:
Remove the bzr forking service
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/388583
The performance optimisation idea was a reasonable one, but it never quite made it to production (https://bugs.launchpad.net/launchpad/+bug/732496), and now that we have git hosting support it's unlikely that we're going to want to put much effort into fixing a bzr optimisation. It can always be resurrected from history if need be.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:remove-bzr-forking-service into launchpad:master.
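For reviewers who have not touched this code: the service being removed listened on an AF_UNIX socket and spoke a small line-based protocol ("hello", "quit", "fork", "fork-env"). A minimal client sketch of that protocol, based on the docstring of the deleted LPForkingService (the helper names here are illustrative, not from the removed code; the socket path is the service's documented default):

```python
import socket

# Default path, from LPForkingService.DEFAULT_PATH in the removed code.
SOCKET_PATH = '/var/run/launchpad_forking_service.sock'


def build_fork_env_request(command, env):
    """Build a request in the documented wire format:

    "fork-env <command>\\n<key>: <value>\\n...end\\n"

    With an empty env dict this degenerates to "fork-env <command>\\nend\\n",
    matching the "No env set" case in the removed parse_env().
    """
    env_lines = ''.join('%s: %s\n' % (k, v) for k, v in sorted(env.items()))
    return 'fork-env %s\n%send\n' % (command, env_lines)


def fork_via_service(command, env, path=SOCKET_PATH):
    # Connect, send a single request, and read the response.  On success
    # the response is "ok\n<pid>\n<fifo_dir>\n", where <fifo_dir> holds
    # the stdin/stdout/stderr named pipes for the new child process.
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.connect(path)
        sock.sendall(build_fork_env_request(command, env).encode('UTF-8'))
        return sock.recv(4096).decode('UTF-8')
    finally:
        sock.close()
```

For example, `build_fork_env_request('lp-serve --inet 12', {'BRZ_EMAIL': 'joe@example.com'})` produces exactly the bytes the removed parse_env() expected.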
diff --git a/Makefile b/Makefile
index 1d81830..2a5ee82 100644
--- a/Makefile
+++ b/Makefile
@@ -327,7 +327,7 @@ start-gdb: build inplace stop support_files run.gdb
run_all: build inplace stop
bin/run \
- -r librarian,sftp,forker,codebrowse,bing-webservice,\
+ -r librarian,sftp,codebrowse,bing-webservice,\
memcached,rabbitmq -i $(LPCONFIG)
run_codebrowse: compile
@@ -340,7 +340,7 @@ stop_codebrowse:
$(PY) scripts/stop-loggerhead.py
run_codehosting: build inplace stop
- bin/run -r librarian,sftp,forker,codebrowse,rabbitmq -i $(LPCONFIG)
+ bin/run -r librarian,sftp,codebrowse,rabbitmq -i $(LPCONFIG)
start_librarian: compile
bin/start_librarian
diff --git a/brzplugins/lpserve/__init__.py b/brzplugins/lpserve/__init__.py
index eab193c..15a1b04 100644
--- a/brzplugins/lpserve/__init__.py
+++ b/brzplugins/lpserve/__init__.py
@@ -10,30 +10,13 @@ __metaclass__ = type
__all__ = [
'cmd_launchpad_server',
- 'cmd_launchpad_forking_service',
]
-import errno
-import fcntl
-import logging
-import os
import resource
-import shlex
-import shutil
-import signal
-import socket
-import sys
-import tempfile
-import threading
-import time
from breezy import (
- commands,
- errors,
lockdir,
- osutils,
- trace,
ui,
)
from breezy.commands import (
@@ -48,7 +31,6 @@ from breezy.transport import (
get_transport,
transport_server_registry,
)
-import six
class cmd_launchpad_server(Command):
@@ -157,844 +139,6 @@ class cmd_launchpad_server(Command):
register_command(cmd_launchpad_server)
-class LPForkingService(object):
- """A service that can be asked to start a new bzr subprocess via fork.
-
- The basic idea is that bootstrapping time is long. Most of this is time
- spent during import of all needed libraries (lp.*). For example, the
- original 'lp-serve' command could take 2.5s just to start up, before any
- actual actions could be performed.
-
- This class provides a service sitting on a socket, which can then be
- requested to fork and run a given bzr command.
-
- Clients connect to the socket and make a single request, which then
- receives a response. The possible requests are:
-
- "hello\n": Trigger a heartbeat to report that the program is still
- running, and write status information to the log file.
- "quit\n": Stop the service, but do so 'nicely', waiting for children
- to exit, etc. Once this is received the service will stop
- taking new requests on the port.
- "fork-env <command>\n<env>\nend\n": Request a new subprocess to be
- started. <command> is the bzr command to be run, such as "rocks"
- or "lp-serve --inet 12".
- The immediate response will be the path-on-disk to a directory
- full of named pipes (fifos) that will be the stdout/stderr/stdin
- (named accordingly) of the new process.
- If a client holds the socket open, when the child process exits,
- the exit status (as given by 'wait()') will be written to the
- socket.
-
- Note that one of the key bits is that the client will not be
- started with exec*, we just call 'commands.run_bzr*()' directly.
- This way, any modules that are already loaded will not need to be
- loaded again. However, care must be taken with any global-state
- that should be reset.
-
- fork-env allows you to supply environment variables such as
- "BRZ_EMAIL: joe@xxxxxxx" which will be set in os.environ before
- the command is run.
- """
-
- # Design decisions. These are bits where we could have chosen a different
- # method/implementation and weren't sure what would be best. Documenting
- # the current decision, and the alternatives.
- #
- # [Decision #1]
- # Serve on a named AF_UNIX socket.
- # 1) It doesn't make sense to serve to arbitrary hosts; we only want
- # the local host to make requests. (Since the client needs to
- # access the named fifos on the current filesystem.)
- # 2) You can set security parameters on a filesystem path (g+rw,
- # a-rw).
- # [Decision #2]
- # SIGCHLD
- # We want to quickly detect that children have exited so that we can
- # inform the client process quickly. At the moment, we register a
- # SIGCHLD handler that doesn't do anything. However, it means that
- # when we get the signal, if we are currently blocked in something
- # like '.accept()', we will jump out temporarily. At that point the
- # main loop will check if any children have exited. We could have
- # done this work as part of the signal handler, but doing any
- # serious work in a signal handler felt 'racy'.
- # If we just used socket.timeout as the indicator to go poll for
- # children exiting, it slows the disconnect by as much as the full
- # timeout. (So a timeout of 1.0s will cause the process to hang by
- # that long until it determines that a child has exited, and can
- # close the connection.)
- # The current flow means that we'll notice exited children whenever
- # we finish the current work.
- # [Decision #3]
- # Child vs Parent actions.
- # There are several actions that are done when we get a new request.
- # We have to create the fifos on disk, fork a new child, connect the
- # child to those handles, and inform the client of the new path (not
- # necessarily in that order.) It makes sense to wait to send the
- # path message until after the fifos have been created. That way the
- # client can just try to open them immediately, and the
- # client-and-child will be synchronized by the open() calls.
- # However, should the client be the one doing the mkfifo, or should
- # the server? Who should be sending the message? Should we fork after
- # the mkfifo or before?
- # The current thoughts:
- # 1) Try to do work in the child when possible. This should
- # allow for 'scaling' because the server is single-threaded.
- # 2) We create the directory itself in the server, because that
- # allows the server to monitor whether the client failed to
- # clean up after itself or not.
- # 3) Otherwise we create the fifos in the client, and then send
- # the message back.
- # [Decision #4]
- # Exit information
- # Inform the client that the child has exited on the socket they
- # used to request the fork.
- # 1) Arguably they could see that stdout and stderr have been
- # closed, and thus stop reading. In testing, I wrote a client
- # which uses select.poll() over stdin/stdout/stderr and used that
- # to ferry the content to the appropriate local handle. However
- # for the FIFOs, when the remote end closed, I wouldn't see any
- # corresponding information on the local end. There obviously
- # wasn't any data to be read, so they wouldn't show up as
- # 'readable' (for me to try to read, and get 0 bytes, indicating
- # it was closed). I also wasn't seeing POLLHUP, which seemed to
- # be the correct indicator. As such, we decided to inform the
- # client on the socket that they originally made the fork
- # request, rather than just closing the socket immediately.
- # 2) We could have had the forking server close the socket, and only
- # the child hold the socket open. When the child exits, then the
- # OS naturally closes the socket.
- # If we want the returncode, then we should put that as bytes on
- # the socket before we exit. Having the child do the work means
- # that in error conditions, it could easily die before being able
- # to write anything (think SEGFAULT, etc). The forking server is
- # already wait()ing on its children, so that we don't get
- # zombies, and with wait3() we can get the rusage (user time,
- # memory consumption, etc.)
- # As such, it seems reasonable that the server can then also
- # report back when a child is seen as exiting.
- # [Decision #5]
- # cleanup once connected
- # The child process blocks during 'open()' waiting for the client to
- # connect to its fifos. Once the client has connected, the child
- # then deletes the temporary directory and the fifos from disk. This
- # means that there isn't much left for diagnosis, but it also means
- # that the client won't leave garbage around if it crashes, etc.
- # Note that the forking service itself still monitors the paths
- # created, and will delete garbage if it sees that a child failed to
- # do so.
- # [Decision #6]
- # os._exit(retcode) in the child
- # Calling sys.exit(retcode) raises an exception, which then bubbles
- # up the stack and runs exit functions (and finally statements).
- # When I tried using it originally, I would see the current child
- # bubble all the way up the stack (through the server code that it
- # fork() through), and then get to main() returning code 0. The
- # process would still exit nonzero. My guess is that something in
- # the atexit functions was failing, but that it was happening after
- # logging, etc had been shut down.
- # Any global state from the child process should be flushed before
- # run_bzr_* has exited (which we *do* wait for), and any other
- # global state is probably a remnant from the service process. Which
- # will be cleaned up by the service itself, rather than the child.
- # There is some concern that log files may not get flushed, so we
- # currently call sys.exitfunc() first. The main problem is that I
- # don't know any way to *remove* a function registered via
- # 'atexit()' so if the forking service has some state, we may try to
- # clean it up incorrectly.
- # Note that the brz script itself uses sys.exitfunc(); os._exit() in
- # the 'brz' main script, as the teardown time of all the python
- # state was quite noticeable in real-world runtime. As such, breezy
- # should be pretty safe, or it would have been failing for people
- # already.
- # [Decision #7]
- # prefork vs max children vs ?
- # For simplicity it seemed easiest to just fork when requested. Over
- # time, I realized it would be easy to allow running an arbitrary
- # command (no harder than just running one command), so it seemed
- # reasonable to switch over. If we go the prefork route, then we'll
- # need a way to tell the pre-forked children what command to run.
- # This could be as easy as just adding one more fifo that they wait
- # on in the same directory.
- # For now, I've chosen not to limit the number of forked children. I
- # don't know what a reasonable value is, and probably there are
- # already limitations at play. (If Conch limits connections, then it
- # will already be doing all the work, etc.)
- # [Decision #8]
- # nicer errors on the request socket
- # This service is meant to be run only on the local system. As such,
- # we don't try to be extra defensive about leaking information to
- # the one connecting to the socket. (We should still watch out what
- # we send across the per-child fifos, since those are connected to
- # remote clients.) Instead we try to be helpful, and tell them as
- # much as we know about what went wrong.
-
- DEFAULT_PATH = '/var/run/launchpad_forking_service.sock'
-
- # Permissions on the master socket (rw-rw----)
- DEFAULT_PERMISSIONS = 0o0660
-
- # Wait no more than 5 minutes for children.
- WAIT_FOR_CHILDREN_TIMEOUT = 5 * 60
-
- SOCKET_TIMEOUT = 1.0
- SLEEP_FOR_CHILDREN_TIMEOUT = 1.0
-
- # No request should take longer than this to be read.
- WAIT_FOR_REQUEST_TIMEOUT = 1.0
-
- # If we get a fork() request, but nobody connects, just exit.
- # On a heavily loaded server it could take a few seconds, but it
- # should never take minutes.
- CHILD_CONNECT_TIMEOUT = 120
-
- _fork_function = os.fork
-
- def __init__(self, path=DEFAULT_PATH, perms=DEFAULT_PERMISSIONS):
- self.master_socket_path = path
- self._perms = perms
- self._start_time = None
- self._should_terminate = threading.Event()
- # We reference these locally, in case the socket module is gc'd
- # during shutdown before we are.
- self._socket_timeout = socket.timeout
- self._socket_error = socket.error
- # Map from pid => (temp_path_for_handles, request_socket)
- self._child_processes = {}
- self._children_spawned = 0
- self._child_connect_timeout = self.CHILD_CONNECT_TIMEOUT
-
- def _create_master_socket(self):
- self._server_socket = socket.socket(
- socket.AF_UNIX, socket.SOCK_STREAM)
- self._server_socket.bind(self.master_socket_path)
- if self._perms is not None:
- os.chmod(self.master_socket_path, self._perms)
- self._server_socket.listen(5)
- self._server_socket.settimeout(self.SOCKET_TIMEOUT)
- trace.mutter('set socket timeout to: %s' % (self.SOCKET_TIMEOUT,))
-
- def _cleanup_master_socket(self):
- self._server_socket.close()
- try:
- os.remove(self.master_socket_path)
- except (OSError, IOError):
- # If we don't delete it, then we get 'address already in
- # use' failures.
- trace.mutter('failed to cleanup: %s' % (self.master_socket_path,))
-
- def _handle_sigchld(self, signum, frm):
- # We don't actually do anything here, we just want an interrupt
- # (EINTR) on socket.accept() when SIGCHLD occurs.
- pass
-
- def _handle_sigterm(self, signum, frm):
- # Unregister this as the default handler, 2 SIGTERMs will exit us.
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- # SIGTERM should also generate EINTR on our wait loop, so this
- # should be enough.
- self._should_terminate.set()
-
- def _register_signals(self):
- """Register a SIGCHILD and SIGTERM handler.
-
- If we have a trigger for SIGCHLD then we can quickly respond to
- clients when their process exits. The main risk is getting more EAGAIN
- errors elsewhere.
-
- SIGTERM allows us to cleanup nicely before we exit.
- """
- signal.signal(signal.SIGCHLD, self._handle_sigchld)
- signal.signal(signal.SIGTERM, self._handle_sigterm)
-
- def _unregister_signals(self):
- signal.signal(signal.SIGCHLD, signal.SIG_DFL)
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
-
- def _compute_paths(self, base_path):
- stdin_path = os.path.join(base_path, 'stdin')
- stdout_path = os.path.join(base_path, 'stdout')
- stderr_path = os.path.join(base_path, 'stderr')
- return (stdin_path, stdout_path, stderr_path)
-
- def _create_child_file_descriptors(self, base_path):
- stdin_path, stdout_path, stderr_path = self._compute_paths(base_path)
- os.mkfifo(stdin_path)
- os.mkfifo(stdout_path)
- os.mkfifo(stderr_path)
-
- def _set_blocking(self, fd):
- """Change the file descriptor to unset the O_NONBLOCK flag."""
- flags = fcntl.fcntl(fd, fcntl.F_GETFD)
- flags = flags & (~os.O_NONBLOCK)
- fcntl.fcntl(fd, fcntl.F_SETFD, flags)
-
- def _open_handles(self, base_path):
- """Open the given file handles.
-
- This will attempt to open all of these file handles, but will not
- block while opening them, timing out after self._child_connect_timeout
- seconds.
-
- :param base_path: The directory where all FIFOs are located.
- :return: (stdin_fid, stdout_fid, stderr_fid).
- """
- stdin_path, stdout_path, stderr_path = self._compute_paths(base_path)
- # These open calls will block until another process connects (which
- # must connect in the same order)
- fids = []
- to_open = [(stdin_path, os.O_RDONLY), (stdout_path, os.O_WRONLY),
- (stderr_path, os.O_WRONLY)]
- # If we set it to 0, we won't get an alarm, so require some time > 0.
- signal.alarm(max(1, self._child_connect_timeout))
- tstart = time.time()
- for path, flags in to_open:
- try:
- fids.append(os.open(path, flags))
- except OSError:
- # In production code, signal.alarm will generally just kill
- # us. But if something installs a signal handler for SIGALRM,
- # do what we can to die gracefully.
- error = ('After %.3fs we failed to open %s, exiting'
- % (time.time() - tstart, path,))
- trace.warning(error)
- for fid in fids:
- try:
- os.close(fid)
- except OSError:
- pass
- raise errors.BzrError(error)
- # If we get to here, that means all the handles were opened
- # successfully, so cancel the wakeup call.
- signal.alarm(0)
- return fids
-
- def _cleanup_fifos(self, base_path):
- """Remove the FIFO objects and directory from disk."""
- stdin_path, stdout_path, stderr_path = self._compute_paths(base_path)
- # Now that we've opened the handles, delete everything so that
- # we don't leave garbage around. Because the open() is done in
- # blocking mode, we know that someone has already connected to
- # them, and we don't want anyone else getting confused and
- # connecting.
- # See [Decision #5].
- os.remove(stdin_path)
- os.remove(stdout_path)
- os.remove(stderr_path)
- os.rmdir(base_path)
-
- def _bind_child_file_descriptors(self, base_path):
- # Note: by this point breezy has opened stderr for logging
- # (as part of starting the service process in the first place).
- # As such, it has a stream handler that writes to stderr.
- # logging tries to flush and close that, but the file is already
- # closed.
- # This just suppresses that exception.
- stdin_fid, stdout_fid, stderr_fid = self._open_handles(base_path)
- logging.raiseExceptions = False
- sys.stdin.close()
- sys.stdout.close()
- sys.stderr.close()
- os.dup2(stdin_fid, 0)
- os.dup2(stdout_fid, 1)
- os.dup2(stderr_fid, 2)
- sys.stdin = os.fdopen(stdin_fid, 'rb')
- sys.stdout = os.fdopen(stdout_fid, 'wb')
- sys.stderr = os.fdopen(stderr_fid, 'wb')
- ui.ui_factory.stdin = sys.stdin
- ui.ui_factory.stdout = sys.stdout
- ui.ui_factory.stderr = sys.stderr
- self._cleanup_fifos(base_path)
-
- def _close_child_file_descriptors(self):
- sys.stdin.close()
- sys.stderr.close()
- sys.stdout.close()
-
- def become_child(self, command_argv, path):
- """We are in the spawned child code, do our magic voodoo."""
- retcode = 127 # Failed in a bad way, poor cleanup, etc.
- try:
- # Stop tracking new signals
- self._unregister_signals()
- # Reset the start time
- trace._bzr_log_start_time = time.time()
- trace.mutter('%d starting %r'
- % (os.getpid(), command_argv))
- self._bind_child_file_descriptors(path)
- retcode = self._run_child_command(command_argv)
- finally:
- # We force os._exit() here, because we don't want to unwind
- # the stack, which has complex results. (We can get it to
- # unwind back to the cmd_launchpad_forking_service code, and
- # even back to main() reporting the return code, but after
- # that, suddenly the return code changes from a '0' to a
- # '1', with no logging of info.)
- os._exit(retcode)
-
- def _run_child_command(self, command_argv):
- # This is the point where we would actually want to do something with
- # our life
- # TODO: We may want to consider special-casing the 'lp-serve'
- # command. As that is the primary use-case for this service, it
- # might be interesting to have an already-instantiated instance,
- # where we can just pop on an extra argument and be ready to go.
- # However, that would probably only really be measurable if we
- # prefork. As it looks like ~200ms is 'fork()' time, but only
- # 50ms is run-the-command time.
- retcode = commands.run_bzr_catch_errors(command_argv)
- self._close_child_file_descriptors()
- trace.mutter('%d finished %r'
- % (os.getpid(), command_argv))
- # TODO: Should we call sys.exitfunc() here? it allows atexit
- # functions to fire, however, some of those may be still
- # around from the parent process, which we don't really want.
- sys.exitfunc()
- # See [Decision #6]
- return retcode
-
- @staticmethod
- def command_to_argv(command_str):
- """Convert a 'foo bar' style command to [u'foo', u'bar']"""
- # command_str must be a utf-8 string
- return [s.decode('utf-8') for s in shlex.split(command_str)]
-
- @staticmethod
- def parse_env(env_str):
- """Convert the environment information into a dict.
-
- :param env_str: A string full of environment variable declarations.
- Each key is simple ascii "key: value\n"
- The string must end with "end\n".
- :return: A dict of environment variables
- """
- env = {}
- if not env_str.endswith('end\n'):
- raise ValueError('Invalid env-str: %r' % (env_str,))
- env_str = env_str[:-5]
- if not env_str:
- return env
- env_entries = env_str.split('\n')
- for entry in env_entries:
- key, value = entry.split(': ', 1)
- env[key] = value
- return env
-
- def fork_one_request(self, conn, client_addr, command_argv, env):
- """Fork myself and serve a request."""
- temp_name = tempfile.mkdtemp(prefix='lp-forking-service-child-')
- # Now that we've set everything up, send the response to the
- # client. We create them first, so the client can start trying to
- # connect to them, while we fork and have the child do the same.
- self._children_spawned += 1
- pid = self._fork_function()
- if pid == 0:
- pid = os.getpid()
- trace.mutter('%d spawned' % (pid,))
- self._server_socket.close()
- for env_var, value in six.iteritems(env):
- osutils.set_or_unset_env(env_var, value)
- # See [Decision #3]
- self._create_child_file_descriptors(temp_name)
- conn.sendall('ok\n%d\n%s\n' % (pid, temp_name))
- conn.close()
- self.become_child(command_argv, temp_name)
- trace.warning('become_child returned!!!')
- sys.exit(1)
- else:
- self._child_processes[pid] = (temp_name, conn)
- self.log(client_addr, 'Spawned process %s for %r: %s'
- % (pid, command_argv, temp_name))
-
- def main_loop(self):
- self._start_time = time.time()
- self._should_terminate.clear()
- self._register_signals()
- self._create_master_socket()
- trace.note('Listening on socket: %s' % (self.master_socket_path,))
- try:
- try:
- self._do_loop()
- finally:
- # Stop talking to others, we are shutting down
- self._cleanup_master_socket()
- except KeyboardInterrupt:
- # SIGINT received, try to shutdown cleanly
- pass
- trace.note('Shutting down. Waiting up to %.0fs for %d child processes'
- % (self.WAIT_FOR_CHILDREN_TIMEOUT,
- len(self._child_processes)))
- self._shutdown_children()
- trace.note('Exiting')
-
- def _do_loop(self):
- while not self._should_terminate.is_set():
- try:
- conn, client_addr = self._server_socket.accept()
- except self._socket_timeout:
- pass # Run shutdown and children checks.
- except self._socket_error as e:
- if e.args[0] == errno.EINTR:
- pass # Run shutdown and children checks.
- elif e.args[0] != errno.EBADF:
- # We can get EBADF here while we are shutting down
- # So we just ignore it for now
- pass
- else:
- # Log any other failure mode
- trace.warning("listening socket error: %s", e)
- else:
- self.log(client_addr, 'connected')
- # TODO: We should probably trap exceptions coming out of
- # this and log them, so that we don't kill the service
- # because of an unhandled error.
- # Note: settimeout is used so that a malformed request
- # doesn't cause us to hang forever. Also note that the
- # particular implementation means that a malicious
- # client could probably send us one byte every once in a
- # while, and we would just keep trying to read it.
- # However, as a local service, we aren't worrying about
- # it.
- conn.settimeout(self.WAIT_FOR_REQUEST_TIMEOUT)
- try:
- self.serve_one_connection(conn, client_addr)
- except self._socket_timeout as e:
- trace.log_exception_quietly()
- self.log(
- client_addr, 'request timeout failure: %s' % (e,))
- conn.sendall('FAILURE\nrequest timed out\n')
- conn.close()
- except Exception as e:
- trace.log_exception_quietly()
- self.log(client_addr, 'trapped a failure while handling'
- ' connection: %s' % (e,))
- self._poll_children()
-
- def log(self, client_addr, message):
- """Log a message to the trace log.
-
- Include the information about what connection is being served.
- """
- if client_addr is not None:
- # Note, we don't use conn.getpeername() because if a client
- # disconnects before we get here, that raises an exception
- conn_info = '[%s] ' % (client_addr,)
- else:
- conn_info = ''
- trace.mutter('%s%s' % (conn_info, message))
-
- def log_information(self):
- """Log the status information.
-
- This includes stuff like number of children, and ... ?
- """
- self._poll_children()
- self.log(None, 'Running for %.3fs' % (time.time() - self._start_time))
- self.log(None, '%d children currently running (spawned %d total)'
- % (len(self._child_processes), self._children_spawned))
- # Read the current information about memory consumption, etc.
- self.log(None, 'Self: %s'
- % (resource.getrusage(resource.RUSAGE_SELF),))
- # This seems to be the sum of all rusage for all children that have
- # been collected (not for currently running children, or ones we
- # haven't "wait"ed on.) We may want to read /proc/PID/status, since
- # 'live' information is probably more useful.
- self.log(None, 'Finished children: %s'
- % (resource.getrusage(resource.RUSAGE_CHILDREN),))
-
- def _poll_children(self):
- """See if children are still running, etc.
-
- One interesting hook here would be to track memory consumption, etc.
- """
- while self._child_processes:
- try:
- c_id, exit_code, rusage = os.wait3(os.WNOHANG)
- except OSError as e:
- if e.errno == errno.ECHILD:
- # TODO: We handle this right now because the test suite
- # fakes a child, since we wanted to test some code
- # without actually forking anything
- trace.mutter('_poll_children() called, and'
- ' self._child_processes indicates there are'
- ' children, but os.wait3() says there are not.'
- ' current_children: %s' % (self._child_processes,))
- return
- if c_id == 0:
- # No more children stopped right now
- return
- c_path, sock = self._child_processes.pop(c_id)
- trace.mutter('%s exited %s and usage: %s'
- % (c_id, exit_code, rusage))
- # Cleanup the child path, before mentioning it exited to the
- # caller. This avoids a race condition in the test suite.
- if os.path.exists(c_path):
- # The child failed to cleanup after itself, do the work here
- trace.warning('Had to clean up after child %d: %s\n'
- % (c_id, c_path))
- shutil.rmtree(c_path, ignore_errors=True)
- # See [Decision #4]
- try:
- sock.sendall('exited\n%s\n' % (exit_code,))
- except (self._socket_timeout, self._socket_error) as e:
- # The client disconnected before we wanted them to,
- # no big deal
- trace.mutter('%s\'s socket already closed: %s' % (c_id, e))
- else:
- sock.close()
-
- def _wait_for_children(self, secs):
- start = time.time()
- end = start + secs
- while self._child_processes:
- self._poll_children()
- if secs > 0 and time.time() > end:
- break
- time.sleep(self.SLEEP_FOR_CHILDREN_TIMEOUT)
-
- def _shutdown_children(self):
- self._wait_for_children(self.WAIT_FOR_CHILDREN_TIMEOUT)
- if self._child_processes:
- trace.warning('Children still running: %s'
- % ', '.join(map(str, self._child_processes)))
- for c_id in self._child_processes:
- trace.warning('sending SIGINT to %d' % (c_id,))
- os.kill(c_id, signal.SIGINT)
- # We sent the SIGINT signal, see if they exited
- self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
- if self._child_processes:
- # No? Then maybe something more powerful
- for c_id in self._child_processes:
- trace.warning('sending SIGKILL to %d' % (c_id,))
- os.kill(c_id, signal.SIGKILL)
- # We sent the SIGKILL signal, see if they exited
- self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
- if self._child_processes:
- for c_id, (c_path, sock) in six.iteritems(self._child_processes):
- # TODO: We should probably put something into this message?
- # However, the likelihood is very small that this isn't
- # already closed because of SIGKILL + _wait_for_children
- # And I don't really know what to say...
- sock.close()
- if os.path.exists(c_path):
- trace.warning('Cleaning up after immortal child %d: %s\n'
- % (c_id, c_path))
- shutil.rmtree(c_path)
-
- def _parse_fork_request(self, conn, client_addr, request):
- if request.startswith('fork-env '):
- while not request.endswith('end\n'):
- request += osutils.read_bytes_from_socket(conn)
- command, env = request[9:].split('\n', 1)
- else:
- command = request[5:].strip()
- env = 'end\n' # No env set.
- try:
- command_argv = self.command_to_argv(command)
- env = self.parse_env(env)
- except Exception as e:
- # TODO: Log the traceback?
- self.log(client_addr, 'command or env parsing failed: %r'
- % (str(e),))
- conn.sendall('FAILURE\ncommand or env parsing failed: %r'
- % (str(e),))
- else:
- return command_argv, env
- return None, None
-
- def serve_one_connection(self, conn, client_addr):
- request = ''
- while '\n' not in request:
- request += osutils.read_bytes_from_socket(conn)
- # telnet likes to use '\r\n' rather than '\n', and it is nice to have
- # an easy way to debug.
- request = request.replace('\r\n', '\n')
- self.log(client_addr, 'request: %r' % (request,))
- if request == 'hello\n':
- conn.sendall('ok\nyep, still alive\n')
- self.log_information()
- conn.close()
- elif request == 'quit\n':
- self._should_terminate.set()
- conn.sendall('ok\nquit command requested... exiting\n')
- conn.close()
- elif request.startswith('child_connect_timeout '):
- try:
- value = int(request.split(' ', 1)[1])
- except ValueError as e:
- conn.sendall('FAILURE: %r\n' % (e,))
- else:
- self._child_connect_timeout = value
- conn.sendall('ok\n')
- conn.close()
- elif request.startswith('fork ') or request.startswith('fork-env '):
- command_argv, env = self._parse_fork_request(conn, client_addr,
- request)
- if command_argv is not None:
- # See [Decision #7]
- # TODO: Do we want to limit the number of children? And/or
- # prefork additional instances? (the design will need to
- # change if we prefork and run arbitrary commands.)
- self.fork_one_request(conn, client_addr, command_argv, env)
- # We don't close the conn like other code paths, since we use
- # it again later.
- else:
- conn.close()
- else:
- self.log(client_addr, 'FAILURE: unknown request: %r' % (request,))
- # See [Decision #8]
- conn.sendall('FAILURE\nunknown request: %r\n' % (request,))
- conn.close()
-
-
-class cmd_launchpad_forking_service(Command):
- """Launch a long-running process, where you can ask for new processes.
-
- The process will block on a given AF_UNIX socket waiting for requests to
- be made. When a request is made, it will fork itself and redirect
- stdout/in/err to fifos on the filesystem, and start running the requested
- command. The caller will be informed where those file handles can be
- found. Thus it only makes sense that the process connecting to the port
- must be on the same system.
- """
-
- aliases = ['lp-service']
-
- takes_options = [Option('path',
- help='Listen for connections at PATH',
- type=str),
- Option('perms',
- help='Set the mode bits for the socket, interpreted'
- ' as an octal integer (same as chmod)'),
- Option('preload',
- help="Do/don't preload libraries before startup."),
- Option('children-timeout', type=int, argname='SEC',
- help="Only wait SEC seconds for children to exit"),
- Option('pid-file', type=unicode,
- help='Write the process PID to this file.')
- ]
-
- def _preload_libraries(self):
- for pyname in libraries_to_preload:
- try:
- __import__(pyname)
- except ImportError as e:
- trace.mutter('failed to preload %s: %s' % (pyname, e))
-
- def _daemonize(self, pid_filename):
- """Turn this process into a child-of-init daemon.
-
- Upon request, we relinquish our control and switch to daemon mode,
- writing out the final pid of the daemon process.
- """
- # If fork fails, it will bubble out naturally and be reported by the
- # cmd logic
- pid = os.fork()
- if pid > 0:
- # Original process exits cleanly
- os._exit(0)
-
- # Disconnect from the parent process
- os.setsid()
-
- # fork again, to truly become a daemon.
- pid = os.fork()
- if pid > 0:
- os._exit(0)
-
- # Redirect file handles
- stdin = open('/dev/null', 'r')
- os.dup2(stdin.fileno(), sys.stdin.fileno())
- stdout = open('/dev/null', 'a+')
- os.dup2(stdout.fileno(), sys.stdout.fileno())
- stderr = open('/dev/null', 'a+', 0)
- os.dup2(stderr.fileno(), sys.stderr.fileno())
-
- # Now that we are a daemon, let people know what pid is running
- f = open(pid_filename, 'wb')
- try:
- f.write('%d\n' % (os.getpid(),))
- finally:
- f.close()
-
- def run(self, path=None, perms=None, preload=True,
- children_timeout=LPForkingService.WAIT_FOR_CHILDREN_TIMEOUT,
- pid_file=None):
- if pid_file is not None:
- self._daemonize(pid_file)
- if path is None:
- path = LPForkingService.DEFAULT_PATH
- if perms is None:
- perms = LPForkingService.DEFAULT_PERMISSIONS
- if preload:
- # We 'note' this because it often takes a fair amount of time.
- trace.note('Preloading %d modules' % (len(libraries_to_preload),))
- self._preload_libraries()
- service = LPForkingService(path, perms)
- service.WAIT_FOR_CHILDREN_TIMEOUT = children_timeout
- service.main_loop()
- if pid_file is not None:
- try:
- os.remove(pid_file)
- except (OSError, IOError) as e:
- trace.mutter('Failed to cleanup pid_file: %s\n%s'
- % (pid_file, e))
-
-register_command(cmd_launchpad_forking_service)
-
-
-class cmd_launchpad_replay(Command):
- """Write input from stdin back to stdout or stderr.
-
- This is a hidden command, primarily available for testing
- cmd_launchpad_forking_service.
- """
-
- hidden = True
-
- def run(self):
- # Just read line-by-line from stdin, and write out to stdout or stderr
- # depending on the prefix
- for line in sys.stdin:
- channel, contents = line.split(' ', 1)
- channel = int(channel)
- if channel == 1:
- sys.stdout.write(contents)
- sys.stdout.flush()
- elif channel == 2:
- sys.stderr.write(contents)
- sys.stderr.flush()
- else:
- raise RuntimeError('Invalid channel request.')
- return 0
-
-register_command(cmd_launchpad_replay)
-
-# This list was generated by "run lsprof"ing a spawned child, and
-# looking for <module ...> times, which indicate that an import
-# occurred. Another option is to run "bzr lp-serve --profile-imports"
-# manually, and observe what was expensive to import. It doesn't seem
-# very easy to get this right automatically.
-libraries_to_preload = [
- 'breezy.bzr.groupcompress_repo',
- 'breezy.bzr.smart',
- 'breezy.bzr.smart.protocol',
- 'breezy.bzr.smart.request',
- 'breezy.bzr.smart.server',
- 'breezy.bzr.smart.vfs',
- 'breezy.errors',
- 'breezy.repository',
- 'breezy.transport.local',
- 'breezy.transport.readonly',
- 'lp.codehosting.bzrutils',
- 'lp.codehosting.vfs',
- 'lp.codehosting.vfs.branchfs',
- 'lp.codehosting.vfs.branchfsclient',
- 'lp.codehosting.vfs.hooks',
- 'lp.codehosting.vfs.transport',
- ]
-
-
def load_tests(standard_tests, module, loader):
standard_tests.addTests(loader.loadTestsFromModuleNames(
[__name__ + '.' + x for x in [
diff --git a/brzplugins/lpserve/test_lpserve.py b/brzplugins/lpserve/test_lpserve.py
index 1420ed7..a9563ec 100644
--- a/brzplugins/lpserve/test_lpserve.py
+++ b/brzplugins/lpserve/test_lpserve.py
@@ -1,313 +1,19 @@
# Copyright 2010-2017 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
-import errno
import os
-import shutil
-import signal
-import socket
import subprocess
-import tempfile
-import threading
-import time
from breezy import (
- errors,
osutils,
tests,
- trace,
)
-from breezy.plugins import lpserve
import six
-from testtools import content
from lp.codehosting import (
get_brz_path,
get_BRZ_PLUGIN_PATH_for_subprocess,
)
-from lp.testing.fakemethod import FakeMethod
-
-
-class TestingLPForkingServiceInAThread(lpserve.LPForkingService):
- """A test-double to run a "forking service" in a thread.
-
- Note that we don't allow actually forking, but it does allow us to
- interact with the service for other operations.
- """
-
- # For testing we set the timeouts much lower, because we want the
- # tests to run quickly.
- WAIT_FOR_CHILDREN_TIMEOUT = 0.5
- SOCKET_TIMEOUT = 0.01
- SLEEP_FOR_CHILDREN_TIMEOUT = 0.01
- WAIT_FOR_REQUEST_TIMEOUT = 0.1
-
- # We're running in a thread as part of the test suite. Blow up at
- # any attempt to fork.
- _fork_function = None
-
- def __init__(self, path, perms=None):
- self.service_started = threading.Event()
- self.service_stopped = threading.Event()
- self.this_thread = None
- self.fork_log = []
- super(TestingLPForkingServiceInAThread, self).__init__(
- path=path, perms=None)
-
- def _register_signals(self):
- # Don't register it for the test suite.
- pass
-
- def _unregister_signals(self):
- # We don't fork, and didn't register, so don't unregister.
- pass
-
- def _create_master_socket(self):
- super(TestingLPForkingServiceInAThread, self)._create_master_socket()
- self.service_started.set()
-
- def main_loop(self):
- self.service_stopped.clear()
- super(TestingLPForkingServiceInAThread, self).main_loop()
- self.service_stopped.set()
-
- def fork_one_request(self, conn, client_addr, command, env):
- # We intentionally don't allow the test suite to request a fork, as
- # threads + forks and everything else don't exactly play well together
- self.fork_log.append((command, env))
- conn.sendall('ok\nfake forking\n')
- conn.close()
-
- @staticmethod
- def start_service(test):
- """Start a new LPForkingService in a thread at a random path.
-
- This will block until the service has created its socket, and is ready
- to communicate.
-
- :return: A new TestingLPForkingServiceInAThread instance
- """
- fd, path = tempfile.mkstemp(prefix='tmp-lp-forking-service-',
- suffix='.sock')
- # We don't want a temp file, we want a temp socket
- os.close(fd)
- os.remove(path)
- new_service = TestingLPForkingServiceInAThread(path=path)
- thread = threading.Thread(target=new_service.main_loop,
- name='TestingLPForkingServiceInAThread')
- new_service.this_thread = thread
- # should we be doing thread.daemon = True ?
- thread.start()
- new_service.service_started.wait(10.0)
- if not new_service.service_started.is_set():
- raise RuntimeError(
- 'Failed to start the TestingLPForkingServiceInAThread')
- test.addCleanup(new_service.stop_service)
- # what about returning new_service._sockname ?
- return new_service
-
- def stop_service(self):
- """Stop the test-server thread. This can be called multiple times."""
- if self.this_thread is None:
- # We already stopped the process
- return
- self._should_terminate.set()
- self.service_stopped.wait(10.0)
- if not self.service_stopped.is_set():
- raise RuntimeError(
- 'Failed to stop the TestingLPForkingServiceInAThread')
- self.this_thread.join()
- # Break any refcycles
- self.this_thread = None
-
-
-class TestTestingLPForkingServiceInAThread(tests.TestCaseWithTransport):
-
- def test_start_and_stop_service(self):
- service = TestingLPForkingServiceInAThread.start_service(self)
- service.stop_service()
-
- def test_multiple_stops(self):
- service = TestingLPForkingServiceInAThread.start_service(self)
- service.stop_service()
- # calling stop_service repeatedly is a no-op (and not an error)
- service.stop_service()
-
- def test_autostop(self):
- # We shouldn't leak a thread here, as it should be part of the test
- # case teardown.
- TestingLPForkingServiceInAThread.start_service(self)
-
-
-class TestCaseWithLPForkingService(tests.TestCaseWithTransport):
-
- def setUp(self):
- super(TestCaseWithLPForkingService, self).setUp()
- self.service = TestingLPForkingServiceInAThread.start_service(self)
-
- def send_message_to_service(self, message, one_byte_at_a_time=False):
- client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- client_sock.connect(self.service.master_socket_path)
- if one_byte_at_a_time:
- for byte in message:
- client_sock.send(byte)
- else:
- client_sock.sendall(message)
- response = client_sock.recv(1024)
- return response
-
-
-class TestLPForkingServiceCommandToArgv(tests.TestCase):
-
- def assertAsArgv(self, argv, command_str):
- self.assertEqual(argv,
- lpserve.LPForkingService.command_to_argv(command_str))
-
- def test_simple(self):
- self.assertAsArgv([u'foo'], 'foo')
- self.assertAsArgv([u'foo', u'bar'], 'foo bar')
-
- def test_quoted(self):
- self.assertAsArgv([u'foo'], 'foo')
- self.assertAsArgv([u'foo bar'], '"foo bar"')
-
- def test_unicode(self):
- self.assertAsArgv([u'command', u'\xe5'], 'command \xc3\xa5')
-
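The tests above pin down the tokenisation contract: whitespace splits arguments, double quotes group them, and byte input is decoded as UTF-8. A plausible standalone equivalent, built on `shlex` (not the plugin's actual implementation):

```python
import shlex

def command_to_argv(command_str):
    """Split a request line into argv, honouring double-quoted groups."""
    if isinstance(command_str, bytes):
        # Requests arrive over a socket as bytes; treat them as UTF-8.
        command_str = command_str.decode('utf-8')
    return shlex.split(command_str)
```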
-
-class TestLPForkingServiceParseEnv(tests.TestCase):
-
- def assertEnv(self, env, env_str):
- self.assertEqual(env, lpserve.LPForkingService.parse_env(env_str))
-
- def assertInvalid(self, env_str):
- self.assertRaises(ValueError, lpserve.LPForkingService.parse_env,
- env_str)
-
- def test_no_entries(self):
- self.assertEnv({}, 'end\n')
-
- def test_one_entries(self):
- self.assertEnv({'BRZ_EMAIL': 'joe@xxxxxxx'},
- 'BRZ_EMAIL: joe@xxxxxxx\n'
- 'end\n')
-
- def test_two_entries(self):
- self.assertEnv({'BRZ_EMAIL': 'joe@xxxxxxx', 'BAR': 'foo'},
- 'BRZ_EMAIL: joe@xxxxxxx\n'
- 'BAR: foo\n'
- 'end\n')
-
- def test_invalid_empty(self):
- self.assertInvalid('')
-
- def test_invalid_end(self):
- self.assertInvalid("BRZ_EMAIL: joe@xxxxxxx\n")
-
- def test_invalid_entry(self):
- self.assertInvalid("BRZ_EMAIL joe@xxxxxxx\nend\n")
-
-
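The env block on the wire is a series of `KEY: value` lines closed by a literal `end` line; anything else is rejected. A sketch that satisfies the behaviour the tests above describe (an illustrative re-implementation, not the removed plugin code):

```python
def parse_env(env_str):
    """Parse 'KEY: value' lines terminated by an 'end' line into a dict."""
    lines = env_str.split('\n')
    if not lines or lines[-1] != '':
        raise ValueError('request must end with a newline')
    lines = lines[:-1]
    if not lines or lines[-1] != 'end':
        raise ValueError("missing 'end' terminator")
    env = {}
    for line in lines[:-1]:
        key, sep, value = line.partition(': ')
        if not sep:
            raise ValueError('invalid env line: %r' % (line,))
        env[key] = value
    return env
```

The address used below is illustrative; the archived tests use obfuscated addresses.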
-class TestLPForkingService(TestCaseWithLPForkingService):
-
- def test_send_quit_message(self):
- response = self.send_message_to_service('quit\n')
- self.assertEqual('ok\nquit command requested... exiting\n', response)
- self.service.service_stopped.wait(10.0)
- self.assertTrue(self.service.service_stopped.is_set())
-
- def test_send_invalid_message_fails(self):
- response = self.send_message_to_service('unknown\n')
- self.assertStartsWith(response, 'FAILURE')
-
- def test_send_hello_heartbeat(self):
- response = self.send_message_to_service('hello\n')
- self.assertEqual('ok\nyep, still alive\n', response)
-
- def test_send_simple_fork(self):
- response = self.send_message_to_service('fork rocks\n')
- self.assertEqual('ok\nfake forking\n', response)
- self.assertEqual([(['rocks'], {})], self.service.fork_log)
-
- def test_send_fork_env_with_empty_env(self):
- response = self.send_message_to_service(
- 'fork-env rocks\n'
- 'end\n')
- self.assertEqual('ok\nfake forking\n', response)
- self.assertEqual([(['rocks'], {})], self.service.fork_log)
-
- def test_send_fork_env_with_env(self):
- response = self.send_message_to_service(
- 'fork-env rocks\n'
- 'BRZ_EMAIL: joe@xxxxxxxxxxx\n'
- 'end\n')
- self.assertEqual('ok\nfake forking\n', response)
- self.assertEqual([(['rocks'], {'BRZ_EMAIL': 'joe@xxxxxxxxxxx'})],
- self.service.fork_log)
-
- def test_send_fork_env_slowly(self):
- response = self.send_message_to_service(
- 'fork-env rocks\n'
- 'BRZ_EMAIL: joe@xxxxxxxxxxx\n'
- 'end\n', one_byte_at_a_time=True)
- self.assertEqual('ok\nfake forking\n', response)
- self.assertEqual([(['rocks'], {'BRZ_EMAIL': 'joe@xxxxxxxxxxx'})],
- self.service.fork_log)
-
- def test_send_incomplete_fork_env_timeout(self):
- # We should get a failure message if we can't quickly read the whole
- # content
- response = self.send_message_to_service(
- 'fork-env rocks\n'
- 'BRZ_EMAIL: joe@xxxxxxxxxxx\n',
- one_byte_at_a_time=True)
- # Note that we *don't* send a final 'end\n'
- self.assertStartsWith(response, 'FAILURE\n')
-
- def test_send_incomplete_request_timeout(self):
- # Requests end with '\n', send one without it
- response = self.send_message_to_service('hello',
- one_byte_at_a_time=True)
- self.assertStartsWith(response, 'FAILURE\n')
-
- def test_child_connection_timeout(self):
- self.assertEqual(self.service.CHILD_CONNECT_TIMEOUT,
- self.service._child_connect_timeout)
- response = self.send_message_to_service('child_connect_timeout 1\n')
- self.assertEqual('ok\n', response)
- self.assertEqual(1, self.service._child_connect_timeout)
-
- def test_child_connection_timeout_bad_float(self):
- self.assertEqual(self.service.CHILD_CONNECT_TIMEOUT,
- self.service._child_connect_timeout)
- response = self.send_message_to_service('child_connect_timeout 1.2\n')
- self.assertStartsWith(response, 'FAILURE:')
-
- def test_child_connection_timeout_no_val(self):
- response = self.send_message_to_service('child_connect_timeout \n')
- self.assertStartsWith(response, 'FAILURE:')
-
- def test_child_connection_timeout_bad_val(self):
- response = self.send_message_to_service('child_connect_timeout b\n')
- self.assertStartsWith(response, 'FAILURE:')
-
- def test__open_handles_will_timeout(self):
- # signal.alarm() has only 1-second granularity. :(
- self.service._child_connect_timeout = 1
- tempdir = tempfile.mkdtemp(prefix='testlpserve-')
- self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)
- os.mkfifo(os.path.join(tempdir, 'stdin'))
- os.mkfifo(os.path.join(tempdir, 'stdout'))
- os.mkfifo(os.path.join(tempdir, 'stderr'))
-
- # catch SIGALRM so we don't stop the test suite. It will still
- # interrupt the blocking open() calls.
- signal.signal(signal.SIGALRM, FakeMethod())
-
- self.addCleanup(signal.signal, signal.SIGALRM, signal.SIG_DFL)
- e = self.assertRaises(errors.BzrError,
- self.service._open_handles, tempdir)
- self.assertContainsRe(str(e), r'After \d+.\d+s we failed to open.*')
class TestCaseWithSubprocess(tests.TestCaseWithTransport):
@@ -365,386 +71,3 @@ class TestCaseWithSubprocess(tests.TestCaseWithTransport):
os.chdir(cwd)
return process
-
-
-class TestCaseWithLPForkingServiceSubprocess(TestCaseWithSubprocess):
- """Tests will get a separate process to communicate to.
-
- The number of these tests should be small, because it is expensive to
- start and stop the daemon.
-
- TODO: This should probably use testresources, or layers somehow...
- """
-
- def setUp(self):
- super(TestCaseWithLPForkingServiceSubprocess, self).setUp()
- (self.service_process,
- self.service_path) = self.start_service_subprocess()
- self.addCleanup(self.stop_service)
-
- def start_conversation(self, message, one_byte_at_a_time=False):
- """Start talking to the service, and get the initial response."""
- client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- trace.mutter('sending %r to socket %s' % (message, self.service_path))
- client_sock.connect(self.service_path)
- if one_byte_at_a_time:
- for byte in message:
- client_sock.send(byte)
- else:
- client_sock.sendall(message)
- response = client_sock.recv(1024)
- trace.mutter('response: %r' % (response,))
- if response.startswith("FAILURE"):
- raise RuntimeError('Failed to send message: %r' % (response,))
- return response, client_sock
-
- def send_message_to_service(self, message, one_byte_at_a_time=False):
- response, client_sock = self.start_conversation(message,
- one_byte_at_a_time=one_byte_at_a_time)
- client_sock.close()
- return response
-
- def send_fork_request(self, command, env=None):
- if env is not None:
- request_lines = ['fork-env %s\n' % (command,)]
- for key, value in six.iteritems(env):
- request_lines.append('%s: %s\n' % (key, value))
- request_lines.append('end\n')
- request = ''.join(request_lines)
- else:
- request = 'fork %s\n' % (command,)
- response, sock = self.start_conversation(request)
- ok, pid, path, tail = response.split('\n')
- self.assertEqual('ok', ok)
- self.assertEqual('', tail)
- # Don't really care what it is, but should be an integer
- pid = int(pid)
- path = path.strip()
- self.assertContainsRe(path, '/lp-forking-service-child-')
- return path, pid, sock
-
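`send_fork_request` exercises both halves of the fork protocol: the request is either `fork <command>\n` or a `fork-env` block terminated by `end\n`, and a successful reply is `ok\n<pid>\n<path>\n`. The client side can be sketched as follows (the helper names are hypothetical):

```python
def build_fork_request(command, env=None):
    """Frame a fork request for the forking service's UNIX socket."""
    if env is None:
        return 'fork %s\n' % (command,)
    lines = ['fork-env %s\n' % (command,)]
    for key, value in sorted(env.items()):
        lines.append('%s: %s\n' % (key, value))
    lines.append('end\n')
    return ''.join(lines)

def parse_fork_response(response):
    """Split a successful 'ok' reply into (pid, fifo_directory)."""
    ok, pid, path, tail = response.split('\n')
    if ok != 'ok' or tail != '':
        raise ValueError('unexpected response: %r' % (response,))
    return int(pid), path.strip()
```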
- def _start_subprocess(self, path, env_changes):
- proc = self.start_bzr_subprocess(
- ['lp-service', '--path', path, '--no-preload',
- '--children-timeout=1'],
- env_changes=env_changes)
- trace.mutter('started lp-service subprocess')
- expected = 'Listening on socket: %s\n' % (path,)
- while True:
- path_line = proc.stderr.readline()
- # Stop once we have found the path line.
- if path_line.startswith('Listening on socket:'):
- break
- # If the subprocess has finished, there is no more to read.
- if proc.poll() is not None:
- break
- trace.mutter(path_line)
- self.assertEqual(expected, path_line)
- return proc
-
- def start_service_subprocess(self):
- # Make sure this plugin is exposed to the subprocess
- # SLOOWWW (~2 seconds, which is why we are doing the work anyway)
- fd, tempname = tempfile.mkstemp(prefix='tmp-log-bzr-lp-forking-')
- # I'm not 100% sure about when cleanup runs versus addDetail, but I
- # think this will work.
- self.addCleanup(os.remove, tempname)
-
- def read_log():
- f = os.fdopen(fd)
- f.seek(0)
- content = f.read()
- f.close()
- return [content]
- self.addDetail('server-log', content.Content(
- content.ContentType('text', 'plain', {"charset": "utf8"}),
- read_log))
- service_fd, path = tempfile.mkstemp(prefix='tmp-lp-service-',
- suffix='.sock')
- os.close(service_fd)
- # The service wants to create this file as a socket.
- os.remove(path)
- env_changes = {
- 'BRZ_PLUGIN_PATH': lpserve.__path__[0],
- 'BRZ_LOG': tempname,
- }
- proc = self._start_subprocess(path, env_changes)
- return proc, path
-
- def stop_service(self):
- if self.service_process is None:
- # Already stopped.
- return
- # First, try to stop the service gracefully, by sending a 'quit'
- # message.
- try:
- response = self.send_message_to_service('quit\n')
- except socket.error:
- # Ignore a failure to connect; the service must be
- # stopping/stopped already.
- response = None
- tend = time.time() + 10.0
- while self.service_process.poll() is None:
- if time.time() > tend:
- self.finish_bzr_subprocess(process=self.service_process,
- send_signal=signal.SIGINT, retcode=3)
- self.fail('Failed to quit gracefully after 10.0 seconds')
- time.sleep(0.1)
- if response is not None:
- self.assertEqual('ok\nquit command requested... exiting\n',
- response)
-
- def _get_fork_handles(self, path):
- trace.mutter('getting handles for: %s' % (path,))
- stdin_path = os.path.join(path, 'stdin')
- stdout_path = os.path.join(path, 'stdout')
- stderr_path = os.path.join(path, 'stderr')
- # The ordering must match the ordering of the service or we get a
- # deadlock.
- child_stdin = open(stdin_path, 'wb', 0)
- child_stdout = open(stdout_path, 'rb', 0)
- child_stderr = open(stderr_path, 'rb', 0)
- return child_stdin, child_stdout, child_stderr
-
- def communicate_with_fork(self, path, stdin=None):
- child_stdin, child_stdout, child_stderr = self._get_fork_handles(path)
- if stdin is not None:
- child_stdin.write(stdin)
- child_stdin.close()
- stdout_content = child_stdout.read()
- stderr_content = child_stderr.read()
- return stdout_content, stderr_content
-
- def assertReturnCode(self, expected_code, sock):
- """Assert that we get the expected return code as a message."""
- response = sock.recv(1024)
- self.assertStartsWith(response, 'exited\n')
- code = int(response.split('\n', 1)[1])
- self.assertEqual(expected_code, code)
-
-
-class TestLPServiceInSubprocess(TestCaseWithLPForkingServiceSubprocess):
-
- def test_fork_lp_serve_hello(self):
- path, _, sock = self.send_fork_request('lp-serve --inet 2')
- stdout_content, stderr_content = self.communicate_with_fork(path,
- 'hello\n')
- self.assertEqual('ok\x012\n', stdout_content)
- self.assertEqual('', stderr_content)
- self.assertReturnCode(0, sock)
-
- def DONT_test_fork_lp_serve_multiple_hello(self):
- # This ensures that the fifos are all set to blocking mode
- # We can't actually run this test, because by default 'bzr serve
- # --inet' does not flush after each message. So we end up blocking
- # forever waiting for the server to finish responding to the first
- # request.
- path, _, sock = self.send_fork_request('lp-serve --inet 2')
- child_stdin, child_stdout, child_stderr = self._get_fork_handles(path)
- child_stdin.write('hello\n')
- child_stdin.flush()
- self.assertEqual('ok\x012\n', child_stdout.read())
- child_stdin.write('hello\n')
- self.assertEqual('ok\x012\n', child_stdout.read())
- child_stdin.close()
- self.assertEqual('', child_stderr.read())
- child_stdout.close()
- child_stderr.close()
-
- def test_fork_replay(self):
- path, _, sock = self.send_fork_request('launchpad-replay')
- stdout_content, stderr_content = self.communicate_with_fork(path,
- '1 hello\n2 goodbye\n1 maybe\n')
- self.assertEqualDiff('hello\nmaybe\n', stdout_content)
- self.assertEqualDiff('goodbye\n', stderr_content)
- self.assertReturnCode(0, sock)
-
- def test_just_run_service(self):
- # Start and stop are defined in setUp()
- pass
-
- def test_fork_multiple_children(self):
- paths = []
- for idx in range(4):
- paths.append(self.send_fork_request('launchpad-replay'))
- # Do them out of order, as order shouldn't matter.
- for idx in [3, 2, 0, 1]:
- p, pid, sock = paths[idx]
- stdout_msg = 'hello %d\n' % (idx,)
- stderr_msg = 'goodbye %d\n' % (idx + 1,)
- stdout, stderr = self.communicate_with_fork(p,
- '1 %s2 %s' % (stdout_msg, stderr_msg))
- self.assertEqualDiff(stdout_msg, stdout)
- self.assertEqualDiff(stderr_msg, stderr)
- self.assertReturnCode(0, sock)
-
- def test_fork_respects_env_vars(self):
- path, pid, sock = self.send_fork_request('whoami',
- env={'BRZ_EMAIL': 'this_test@xxxxxxxxxxx'})
- stdout_content, stderr_content = self.communicate_with_fork(path)
- self.assertEqual('', stderr_content)
- self.assertEqual('this_test@xxxxxxxxxxx\n', stdout_content)
-
- def _check_exits_nicely(self, sig_id):
- path, _, sock = self.send_fork_request('rocks')
- self.assertEqual(None, self.service_process.poll())
- # Now when we send SIGTERM, it should wait for the child to exit,
- # before it tries to exit itself.
- self.service_process.send_signal(sig_id)
- self.assertEqual(None, self.service_process.poll())
- # Now talk to the child, so the service can close
- stdout_content, stderr_content = self.communicate_with_fork(path)
- self.assertEqual('It sure does!\n', stdout_content)
- self.assertEqual('', stderr_content)
- self.assertReturnCode(0, sock)
- # And the process should exit cleanly
- self.assertEqual(0, self.service_process.wait())
-
- def test_sigterm_exits_nicely(self):
- self._check_exits_nicely(signal.SIGTERM)
-
- def test_sigint_exits_nicely(self):
- self._check_exits_nicely(signal.SIGINT)
-
- def test_child_exits_eventually(self):
- # We won't ever bind to the socket the child wants, and after some
- # time, the child should exit cleanly.
- # First, tell the subprocess that we want children to exit quickly.
- # *sigh* signal.alarm only has 1s resolution, so this test is slow.
- response = self.send_message_to_service('child_connect_timeout 1\n')
- self.assertEqual('ok\n', response)
- # Now request a fork.
- path, pid, sock = self.send_fork_request('rocks')
- # We started opening the child, but stop before we get all handles
- # open. After 1 second, the child should get signaled and die.
- # The master process should notice, and tell us the status of the
- # exited child.
- val = sock.recv(4096)
- self.assertEqual('exited\n%s\n' % (signal.SIGALRM,), val)
- # The master process should clean up after the now deceased child.
- self.assertPathDoesNotExist(path)
-
-
-class TestCaseWithLPForkingServiceDaemon(
- TestCaseWithLPForkingServiceSubprocess):
- """Test LPForkingService interaction, when run in daemon mode."""
-
- def _cleanup_daemon(self, pid, pid_filename):
- try:
- os.kill(pid, signal.SIGKILL)
- except (OSError, IOError) as e:
- trace.mutter('failed to kill pid %d, might be already dead: %s'
- % (pid, e))
- try:
- os.remove(pid_filename)
- except (OSError, IOError) as e:
- if e.errno != errno.ENOENT:
- trace.mutter('failed to remove %r: %s'
- % (pid_filename, e))
-
- def _start_subprocess(self, path, env_changes):
- fd, pid_filename = tempfile.mkstemp(prefix='tmp-lp-forking-service-',
- suffix='.pid')
- self.service_pid_filename = pid_filename
- os.close(fd)
- proc = self.start_bzr_subprocess(
- ['lp-service', '--path', path, '--no-preload',
- '--children-timeout=1', '--pid-file', pid_filename],
- env_changes=env_changes)
- trace.mutter('started lp-service daemon')
- # We wait for the spawned process to exit, expecting it to report the
- # final pid into the pid_filename.
- tnow = time.time()
- tstop_waiting = tnow + 1.0
- # When this returns, the first fork has completed and the parent has
- # exited.
- proc.wait()
- while tnow < tstop_waiting:
- # Wait for the socket to become available
- if os.path.exists(path):
- # The service has created the socket for us to communicate
- break
- time.sleep(0.1)
- tnow = time.time()
-
- with open(pid_filename, 'rb') as f:
- pid = f.read()
- trace.mutter('found pid: %r' % (pid,))
- pid = int(pid.strip())
- # This is now the pid of the final daemon
- trace.mutter('lp-forking-service daemon at pid %s' % (pid,))
- # Because nothing else will clean this up, add this final handler to
- # clean up if all else fails.
- self.addCleanup(self._cleanup_daemon, pid, pid_filename)
- # self.service_process will now be the pid of the daemon,
- # rather than a Popen object.
- return pid
-
- def stop_service(self):
- if self.service_process is None:
- # Already stopped
- return
- # First, try to stop the service gracefully, by sending a 'quit'
- # message
- try:
- response = self.send_message_to_service('quit\n')
- except socket.error as e:
- # Ignore a failure to connect; the service must be
- # stopping/stopped already.
- response = None
- if response is not None:
- self.assertEqual('ok\nquit command requested... exiting\n',
- response)
- # Wait for the process to actually exit, or force it if necessary.
- tnow = time.time()
- tend = tnow + 2.0
- # We'll be nice a couple of times, and then get mean
- attempts = [None, None, None, signal.SIGTERM, signal.SIGKILL]
- stopped = False
- unclean = False
- while tnow < tend:
- try:
- os.kill(self.service_process, 0)
- except (OSError, IOError) as e:
- if e.errno == errno.ESRCH:
- # The process has successfully exited
- stopped = True
- break
- raise
- else:
- # The process has not exited yet
- time.sleep(0.1)
- if attempts:
- sig = attempts.pop(0)
- if sig is not None:
- unclean = True
- try:
- os.kill(self.service_process, sig)
- except (OSError, IOError) as e:
- if e.errno == errno.ESRCH:
- stopped = True
- break
- raise
- if not stopped:
- self.fail('Unable to stop the daemon process (pid %s) after 2.0s'
- % (self.service_process,))
- elif unclean:
- self.fail('Process (pid %s) had to be shut-down'
- % (self.service_process,))
- self.service_process = None
-
- def test_simple_start_and_stop(self):
- # All the work is done in setUp().
- pass
-
- def test_starts_and_cleans_up(self):
- # The service should be up and responsive.
- response = self.send_message_to_service('hello\n')
- self.assertEqual('ok\nyep, still alive\n', response)
- self.assertTrue(os.path.isfile(self.service_pid_filename))
- with open(self.service_pid_filename, 'rb') as f:
- content = f.read()
- self.assertEqualDiff('%d\n' % (self.service_process,), content)
- # We're done. Shut it down.
- self.stop_service()
- self.assertFalse(os.path.isfile(self.service_pid_filename))
diff --git a/configs/development/launchpad-lazr.conf b/configs/development/launchpad-lazr.conf
index 0e239fc..b8f2c55 100644
--- a/configs/development/launchpad-lazr.conf
+++ b/configs/development/launchpad-lazr.conf
@@ -47,7 +47,6 @@ bzr_lp_prefix: lp://dev/
lp_url_hosts: dev
access_log: /var/tmp/bazaar.launchpad.test/codehosting-access.log
blacklisted_hostnames:
-use_forking_daemon: True
internal_bzr_api_endpoint: http://bazaar.launchpad.test:8090/
internal_git_api_endpoint: http://git.launchpad.test:19417/
git_browse_root: https://git.launchpad.test/
diff --git a/lib/lp/codehosting/sshserver/session.py b/lib/lp/codehosting/sshserver/session.py
index a77855b..ef7825a 100644
--- a/lib/lp/codehosting/sshserver/session.py
+++ b/lib/lp/codehosting/sshserver/session.py
@@ -9,23 +9,13 @@ __all__ = [
]
import os
-import signal
-import socket
-import sys
from lazr.sshserver.events import AvatarEvent
from lazr.sshserver.session import DoNothingSession
-import six
-from six import reraise
from six.moves.urllib.parse import urlparse
-from twisted.internet import (
- error,
- interfaces,
- process,
- )
+from twisted.internet import process
from twisted.python import log
from zope.event import notify
-from zope.interface import implementer
from lp.codehosting import get_brz_path
from lp.services.config import config
@@ -45,266 +35,6 @@ class ForbiddenCommand(Exception):
"""Raised when a session is asked to execute a forbidden command."""
-class _WaitForExit(process.ProcessReader):
- """Wait on a socket for the exit status."""
-
- def __init__(self, reactor, proc, sock):
- super(_WaitForExit, self).__init__(reactor, proc, 'exit',
- sock.fileno())
- self._sock = sock
- self.connected = 1
-
- def close(self):
- self._sock.close()
-
- def dataReceived(self, data):
- # TODO: how do we handle getting only *some* of the content?, Maybe we
- # need to read more bytes first...
-
- # This is the only thing we do differently from the standard
- # ProcessReader. When we get data on this socket, we need to treat it
- # as a return code, or a failure.
- if not data.startswith('exited'):
- # Bad data, we want to signal that we are closing the connection
- # TODO: How?
- self.proc.childConnectionLost(self.name, "invalid data")
- self.close()
- # I don't know what to put here if we get bogus data, but I *do*
- # want to say that the process is now considered dead to me
- log.err('Got invalid exit information: %r' % (data,))
- exit_status = (255 << 8)
- else:
- exit_status = int(data.split('\n')[1])
- self.proc.processEnded(exit_status)
-
-
-@implementer(interfaces.IProcessTransport)
-class ForkedProcessTransport(process.BaseProcess):
- """Wrap the forked process in a ProcessTransport so we can talk to it.
-
- Note that instantiating the class creates the fork and sets it up in the
- reactor.
- """
-
- # Design decisions
- # [Decision #a]
- # Inherit from process.BaseProcess
- # This seems slightly risky, as process.BaseProcess is actually
- # imported from twisted.internet._baseprocess.BaseProcess. The
- # real-world Process then actually inherits from process._BaseProcess
- # I've also had to copy a fair amount from the actual Process
- # command.
- # One option would be to inherit from process.Process, and just
- # override stuff like __init__ and reapProcess which I don't want to
- # do in the same way. (Is it ok not to call your Base classes
- # __init__ if you don't want to do that exact work?)
- def __init__(self, reactor, executable, args, environment, proto):
- process.BaseProcess.__init__(self, proto)
- # Map from standard file descriptor to the associated pipe
- self.pipes = {}
- pid, path, sock = self._spawn(executable, args, environment)
- self._fifo_path = path
- self.pid = pid
- self.process_sock = sock
- self._fifo_path = path
- self._connectSpawnToReactor(reactor)
- if self.proto is not None:
- self.proto.makeConnection(self)
-
- def _sendMessageToService(self, message):
- """Send a message to the Forking service and get the response"""
- path = config.codehosting.forking_daemon_socket
- client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- log.msg('Connecting to Forking Service @ socket: %s for %r'
- % (path, message))
- try:
- client_sock.connect(path)
- client_sock.sendall(message)
- # We define the requests to be no bigger than 1kB. (For now)
- response = client_sock.recv(1024)
- except socket.error as e:
- # TODO: What exceptions should be raised?
- # Raising the raw exception seems to kill the twisted reactor
- # Note that if the connection is refused, we *could* just
- # fall back on a regular 'spawnProcess' call.
- log.err('Connection failed: %s' % (e,))
- raise
- if response.startswith("FAILURE"):
- client_sock.close()
- raise RuntimeError('Failed to send message: %r' % (response,))
- return response, client_sock
-
- def _spawn(self, executable, args, environment):
- """Start the new process.
-
- This talks to the ForkingSessionService and requests a new process be
- started. Similar to what Process.__init__/_fork would do.
-
- :return: The pid, communication directory, and request socket.
- """
- assert executable == 'brz', executable # Maybe .endswith()
- assert args[0] == 'brz', args[0]
- message = ['fork-env %s\n' % (' '.join(args[1:]),)]
- for key, value in six.iteritems(environment):
- # XXX: Currently we only pass BRZ_EMAIL, should we be passing
- # everything else? Note that many won't be handled properly,
- # since the process is already running.
- if key in ('BZR_EMAIL', 'BRZ_EMAIL'):
- # Map both of these to BRZ_EMAIL, since we run brz on the
- # server.
- message.append('BRZ_EMAIL: %s\n' % (value,))
- message.append('end\n')
- message = ''.join(message)
- response, sock = self._sendMessageToService(message)
- try:
- ok, pid, path, tail = response.split('\n')
- assert ok == 'ok'
- assert tail == ''
- pid = int(pid)
- log.msg('Forking returned pid: %d, path: %s' % (pid, path))
- except:
- sock.close()
- raise
- return pid, path, sock
-
- def _openHandleFailures(self, call_on_failure, path, flags, proc_class,
- reactor, child_fd):
- """Open the given path, adding a cleanup as appropriate.
-
- :param call_on_failure: A list holding (callback, args) tuples. We will
- append new entries for things that we open
- :param path: The path to open
- :param flags: Flags to pass to os.open
- :param proc_class: The ProcessWriter/ProcessReader class to wrap this
- connection.
- :param reactor: The Twisted reactor we are connecting to.
- :param child_fd: The child file descriptor number passed to proc_class
- """
- fd = os.open(path, flags)
- call_on_failure.append((os.close, fd))
- p = proc_class(reactor, self, child_fd, fd)
- # Now that p has been created, it will close fd for us. So switch the
- # cleanup to calling p.connectionLost()
- call_on_failure[-1] = (p.connectionLost, (None,))
- self.pipes[child_fd] = p
-
- def _connectSpawnToReactor(self, reactor):
- self._exiter = _WaitForExit(reactor, self, self.process_sock)
- call_on_failure = [(self._exiter.connectionLost, (None,))]
- stdin_path = os.path.join(self._fifo_path, 'stdin')
- stdout_path = os.path.join(self._fifo_path, 'stdout')
- stderr_path = os.path.join(self._fifo_path, 'stderr')
- try:
- self._openHandleFailures(call_on_failure, stdin_path, os.O_WRONLY,
- process.ProcessWriter, reactor, 0)
- self._openHandleFailures(call_on_failure, stdout_path, os.O_RDONLY,
- process.ProcessReader, reactor, 1)
- self._openHandleFailures(call_on_failure, stderr_path, os.O_RDONLY,
- process.ProcessReader, reactor, 2)
- except:
- exc_class, exc_value, exc_tb = sys.exc_info()
- for func, args in call_on_failure:
- try:
- func(*args)
- except:
- # Just log any exceptions at this point. This makes sure
- # all cleanups get called so we don't get leaks. We know
- # there is an active exception, or we wouldn't be here.
- log.err()
- reraise(exc_class, exc_value, tb=exc_tb)
- self.pipes['exit'] = self._exiter
-
- def _getReason(self, status):
- # Copied from twisted.internet.process._BaseProcess
- exitCode = sig = None
- if os.WIFEXITED(status):
- exitCode = os.WEXITSTATUS(status)
- else:
- sig = os.WTERMSIG(status)
- if exitCode or sig:
- return error.ProcessTerminated(exitCode, sig, status)
- return error.ProcessDone(status)
-
- def signalProcess(self, signalID):
- """
- Send the given signal C{signalID} to the process. It'll translate a
- few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string
- representation to its int value, otherwise it'll pass directly the
- value provided
-
- @type signalID: C{str} or C{int}
- """
- # Copied from twisted.internet.process._BaseProcess
- if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
- signalID = getattr(signal, 'SIG%s' % (signalID,))
- if self.pid is None:
- raise process.ProcessExitedAlready()
- os.kill(self.pid, signalID)
-
- # Implemented because conch.ssh.session uses it, the Process implementation
- # ignores writes if channel '0' is not available
- def write(self, data):
- self.pipes[0].write(data)
-
- def writeToChild(self, childFD, data):
- # Copied from twisted.internet.process.Process
- self.pipes[childFD].write(data)
-
- def closeChildFD(self, childFD):
- if childFD in self.pipes:
- self.pipes[childFD].loseConnection()
-
- def closeStdin(self):
- self.closeChildFD(0)
-
- def closeStdout(self):
- self.closeChildFD(1)
-
- def closeStderr(self):
- self.closeChildFD(2)
-
- def loseConnection(self):
- self.closeStdin()
- self.closeStdout()
- self.closeStderr()
-
- # Implemented because ProcessWriter/ProcessReader want to call it
- # Copied from twisted.internet.Process
- def childDataReceived(self, name, data):
- self.proto.childDataReceived(name, data)
-
- # Implemented because ProcessWriter/ProcessReader want to call it
- # Copied from twisted.internet.Process
- def childConnectionLost(self, childFD, reason):
- pipe = self.pipes.get(childFD)
- if pipe is not None:
- close = getattr(pipe, 'close', None)
- if close is not None:
- close()
- else:
- os.close(self.pipes[childFD].fileno())
- del self.pipes[childFD]
- try:
- self.proto.childConnectionLost(childFD)
- except:
- log.err()
- self.maybeCallProcessEnded()
-
- # Implemented because of childConnectionLost
- # Adapted from twisted.internet.Process
- # Note: Process.maybeCallProcessEnded() tries to reapProcess() at this
- # point, but the daemon will be doing the reaping for us (we can't
- # because the process isn't a direct child.)
- def maybeCallProcessEnded(self):
- if self.pipes:
- # Not done if we still have open pipes
- return
- if not self.lostProcess:
- return
- process.BaseProcess.maybeCallProcessEnded(self)
- # pauseProducing, present in process.py, not a IProcessTransport interface
-
-
class ExecOnlySession(DoNothingSession):
"""Conch session that only allows executing commands."""
@@ -414,36 +144,6 @@ class RestrictedExecOnlySession(ExecOnlySession):
% {'user_id': self.avatar.user_id})
-class ForkingRestrictedExecOnlySession(RestrictedExecOnlySession):
- """Use the Forking Service instead of spawnProcess."""
-
- def _simplifyEnvironment(self, env):
- """Pull out the bits of the environment we want to pass along."""
- env = {}
- for env_var in ['BRZ_EMAIL']:
- if env_var in self.environment:
- env[env_var] = self.environment[env_var]
- return env
-
- def getCommandToFork(self, executable, arguments, env):
- assert executable.endswith('/bin/py')
- assert arguments[0] == executable
- assert arguments[1].endswith('/brz')
- executable = 'brz'
- arguments = arguments[1:]
- arguments[0] = 'brz'
- env = self._simplifyEnvironment(env)
- return executable, arguments, env
-
- def _spawn(self, protocol, executable, arguments, env):
- # When spawning, adapt the idea of "bin/py .../brz" to just using "brz"
- # and the executable
- executable, arguments, env = self.getCommandToFork(executable,
- arguments, env)
- return ForkedProcessTransport(self.reactor, executable,
- arguments, env, protocol)
-
-
def lookup_command_template(command):
"""Map a command to a command template.
@@ -474,17 +174,6 @@ def launch_smart_server(avatar):
# Extract the hostname from the supermirror root config.
hostname = urlparse(config.codehosting.supermirror_root)[1]
environment['BRZ_EMAIL'] = '%s@%s' % (avatar.username, hostname)
- # TODO: Use a FeatureFlag to enable this in a more fine-grained approach.
- # If the forking daemon has been spawned, then we can use it if the
- # feature is set to true for the given user, etc.
- # A global config is a good first step, but does require restarting
- # the service to change the flag. 'config' doesn't support SIGHUP.
- # For now, restarting the service is necessary to enabled/disable the
- # forking daemon.
- if config.codehosting.use_forking_daemon:
- klass = ForkingRestrictedExecOnlySession
- else:
- klass = RestrictedExecOnlySession
-
- return klass(avatar, reactor, lookup_command_template,
- environment=environment)
+
+ return RestrictedExecOnlySession(
+ avatar, reactor, lookup_command_template, environment=environment)
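For readers skimming the removed `ForkedProcessTransport`, the wire protocol it spoke to the forking service was simple: send a `fork-env <args>` request plus environment lines terminated by `end\n` over an AF_UNIX socket, then parse an `ok\n<pid>\n<path>\n` reply. This is a minimal, self-contained sketch of that exchange (the `serve_one_fork_request` stub, the fake pid `4242`, and the fake fifo path are illustrative stand-ins, not part of the real service):

```python
import os
import socket
import tempfile
import threading


def serve_one_fork_request(server):
    """Stand-in for the removed forking service: accept one connection,
    read a fork-env request, and reply in the 'ok\n<pid>\n<path>\n'
    shape the client expected."""
    conn, _ = server.accept()
    request = b''
    while not request.endswith(b'end\n'):
        request += conn.recv(1024)
    # The real service would fork brz here; we echo a fake pid and
    # fifo directory instead.
    conn.sendall(b'ok\n4242\n/tmp/fifo-dir\n')
    conn.close()
    server.close()


def request_fork(sock_path, args, env):
    """Client side, mirroring _sendMessageToService/_spawn from the
    removed ForkedProcessTransport."""
    message = ['fork-env %s\n' % ' '.join(args)]
    for key, value in env.items():
        message.append('%s: %s\n' % (key, value))
    message.append('end\n')
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    client.connect(sock_path)
    client.sendall(''.join(message).encode())
    response = client.recv(1024).decode()
    client.close()
    if response.startswith('FAILURE'):
        raise RuntimeError('Failed to send message: %r' % response)
    ok, pid, path, tail = response.split('\n')
    assert ok == 'ok' and tail == ''
    return int(pid), path


sock_path = os.path.join(tempfile.mkdtemp(), 'forker.sock')
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(sock_path)
server.listen(1)
t = threading.Thread(target=serve_one_fork_request, args=(server,))
t.start()
pid, path = request_fork(sock_path, ['lp-serve', '--inet', '1234'],
                         {'BRZ_EMAIL': 'user@example.com'})
t.join()
```

After the reply, the real transport opened `stdin`/`stdout`/`stderr` fifos under the returned path and handed them to Twisted's `ProcessWriter`/`ProcessReader`; the design avoided the per-request interpreter startup cost by keeping one long-lived brz process to fork from.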
diff --git a/lib/lp/codehosting/sshserver/tests/test_session.py b/lib/lp/codehosting/sshserver/tests/test_session.py
index a212873..da06fc8 100644
--- a/lib/lp/codehosting/sshserver/tests/test_session.py
+++ b/lib/lp/codehosting/sshserver/tests/test_session.py
@@ -5,8 +5,6 @@
__metaclass__ = type
-import socket
-
from twisted.conch.interfaces import ISession
from twisted.conch.ssh import connection
from twisted.internet.process import ProcessExitedAlready
@@ -18,10 +16,8 @@ from lp.codehosting import (
)
from lp.codehosting.sshserver.daemon import CodehostingAvatar
from lp.codehosting.sshserver.session import (
- _WaitForExit,
ExecOnlySession,
ForbiddenCommand,
- ForkingRestrictedExecOnlySession,
lookup_command_template,
RestrictedExecOnlySession,
)
@@ -92,36 +88,6 @@ class MockProcessTransport:
self.log.append(('processEnded', status))
-class Test_WaitForExit(TestCase):
-
- def setUp(self):
- TestCase.setUp(self)
- self.reactor = MockReactor()
- self.proc = MockProcessTransport('executable')
- sock = socket.socket()
- self.exiter = _WaitForExit(self.reactor, self.proc, sock)
-
- def test__init__starts_reading(self):
- self.assertEqual([('addReader', self.exiter)], self.reactor.log)
-
- def test_dataReceived_ends_cleanly(self):
- self.exiter.dataReceived('exited\n0\n')
- self.assertEqual([('processEnded', 0)], self.proc.log)
-
- def test_dataReceived_ends_with_errno(self):
- self.exiter.dataReceived('exited\n256\n')
- self.assertEqual([('processEnded', 256)], self.proc.log)
-
- def test_dataReceived_bad_data(self):
- # Note: The dataReceived code calls 'log.err' which ends up getting
- # printed during the test run. How do I suppress that or even
- # better, check that it does so?
- # flush_logged_errors() doesn't seem to do anything.
- self.exiter.dataReceived('bogus\n')
- self.assertEqual([('childConnectionLost', 'exit', 'invalid data'),
- ('processEnded', (255 << 8))], self.proc.log)
-
-
class TestExecOnlySession(AvatarTestCase):
"""Tests for ExecOnlySession.
@@ -399,35 +365,6 @@ class TestSessionIntegration(AvatarTestCase):
self.assertRaises(
ForbiddenCommand, session.getCommandToRun, 'rm -rf /')
- def test_avatarAdaptsToOnlyRestrictedSession(self):
- config.push('codehosting-no-forking',
- "[codehosting]\nuse_forking_daemon: False\n")
- self.addCleanup(config.pop, 'codehosting-no-forking')
- session = ISession(self.avatar)
- self.assertFalse(isinstance(session, ForkingRestrictedExecOnlySession),
- "ISession(avatar) shouldn't adapt to "
- " ForkingRestrictedExecOnlySession when forking is disabled. ")
-
- def test_avatarAdaptsToForkingRestrictedExecOnlySession(self):
- config.push('codehosting-forking',
- "[codehosting]\nuse_forking_daemon: True\n")
- self.addCleanup(config.pop, 'codehosting-forking')
- session = ISession(self.avatar)
- self.assertTrue(
- isinstance(session, ForkingRestrictedExecOnlySession),
- "ISession(avatar) doesn't adapt to "
- " ForkingRestrictedExecOnlySession. "
- "Got %r instead." % (session,))
- executable, arguments = session.getCommandToRun(
- 'bzr serve --inet --directory=/ --allow-writes')
- executable, arguments, env = session.getCommandToFork(
- executable, arguments, session.environment)
- self.assertEqual('brz', executable)
- self.assertEqual(
- ['brz', 'lp-serve',
- '--inet', str(self.avatar.user_id)],
- list(arguments))
-
class TestLookupCommand(TestCase):
diff --git a/lib/lp/codehosting/tests/test_acceptance.py b/lib/lp/codehosting/tests/test_acceptance.py
index 38cb7e9..85ec8c0 100644
--- a/lib/lp/codehosting/tests/test_acceptance.py
+++ b/lib/lp/codehosting/tests/test_acceptance.py
@@ -7,10 +7,6 @@ __metaclass__ = type
import os
import re
-import signal
-import subprocess
-import sys
-import time
import breezy.branch
from breezy.tests import TestCaseWithTransport
@@ -56,85 +52,9 @@ from lp.testing import TestCaseWithFactory
from lp.testing.layers import ZopelessAppServerLayer
-class ForkingServerForTests(object):
- """Map starting/stopping a LPForkingService to setUp() and tearDown()."""
-
- def __init__(self):
- self.process = None
- self.socket_path = None
-
- def setUp(self):
- brz_path = get_brz_path()
- BRZ_PLUGIN_PATH = get_BRZ_PLUGIN_PATH_for_subprocess()
- env = os.environ.copy()
- env['BRZ_PLUGIN_PATH'] = BRZ_PLUGIN_PATH
- # TODO: We probably want to use a random disk path for
- # forking_daemon_socket, but we need to update config so that
- # the CodeHosting service can find it.
- # The main problem is that CodeHostingTac seems to start a tac
- # server directly from the disk configs, and doesn't use the
- # in-memory config. So we can't just override the memory
- # settings, we have to somehow pass it a new config-on-disk to
- # use.
- self.socket_path = config.codehosting.forking_daemon_socket
- command = [sys.executable, brz_path, 'launchpad-forking-service',
- '--path', self.socket_path, '-Derror']
- process = subprocess.Popen(
- command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
- self.process = process
- stderr = []
- # The first line should be "Preloading" indicating it is ready
- stderr.append(process.stderr.readline())
- # The next line is the "Listening on socket" line
- stderr.append(process.stderr.readline())
- # Now it should be ready. If there were any errors, let's check, and
- # report them.
- if (process.poll() is not None or
- not stderr[1].strip().startswith('Listening on socket')):
- if process.poll() is None:
- time.sleep(1) # Give the traceback a chance to render.
- os.kill(process.pid, signal.SIGTERM)
- process.wait()
- self.process = None
- # Looks like there was a problem. We cannot use the "addDetail"
- # method because this class is not a TestCase and does not have
- # access to one. It runs as part of a layer. A "print" is the
- # best we can do. That should still be visible on buildbot, which
- # is where we have seen spurious failures so far.
- print
- print "stdout:"
- print process.stdout.read()
- print "-" * 70
- print "stderr:"
- print ''.join(stderr)
- print process.stderr.read()
- print "-" * 70
- raise RuntimeError(
- 'Breezy server did not start correctly. See stdout and '
- 'stderr reported above. Command was "%s". PYTHONPATH was '
- '"%s". BRZ_PLUGIN_PATH was "%s".' %
- (' '.join(command),
- env.get('PYTHONPATH'),
- env.get('BRZ_PLUGIN_PATH')))
-
- def tearDown(self):
- # SIGTERM is the graceful exit request, potentially we could wait a
- # bit and send something stronger?
- if self.process is not None and self.process.poll() is None:
- os.kill(self.process.pid, signal.SIGTERM)
- self.process.wait()
- self.process = None
- # We want to make sure the socket path has been cleaned up, so that
- # future runs can work correctly
- if os.path.exists(self.socket_path):
- # Should there be a warning/error here?
- os.remove(self.socket_path)
-
-
class SSHServerLayer(ZopelessAppServerLayer):
_tac_handler = None
- _forker_service = None
@classmethod
def getTacHandler(cls):
@@ -144,26 +64,17 @@ class SSHServerLayer(ZopelessAppServerLayer):
return cls._tac_handler
@classmethod
- def getForker(cls):
- if cls._forker_service is None:
- cls._forker_service = ForkingServerForTests()
- return cls._forker_service
-
- @classmethod
@profiled
def setUp(cls):
tac_handler = SSHServerLayer.getTacHandler()
tac_handler.setUp()
SSHServerLayer._reset()
- forker = SSHServerLayer.getForker()
- forker.setUp()
@classmethod
@profiled
def tearDown(cls):
SSHServerLayer._reset()
SSHServerLayer.getTacHandler().tearDown()
- SSHServerLayer.getForker().tearDown()
@classmethod
@profiled
diff --git a/lib/lp/scripts/runlaunchpad.py b/lib/lp/scripts/runlaunchpad.py
index c7019bd..969cfd6 100644
--- a/lib/lp/scripts/runlaunchpad.py
+++ b/lib/lp/scripts/runlaunchpad.py
@@ -168,52 +168,6 @@ class MemcachedService(Service):
process.stdin.close()
-class ForkingSessionService(Service):
- """A lp-forking-service for handling codehosting access."""
-
- # TODO: The "sftp" (aka codehosting) server depends fairly heavily on this
- # service. It would seem reasonable to make one always start if the
- # other one is started. Though this might be a way to "FeatureFlag"
- # whether this is active or not.
- @property
- def should_launch(self):
- return (config.codehosting.launch and
- config.codehosting.use_forking_daemon)
-
- @property
- def logfile(self):
- """Return the log file to use.
-
- Default to the value of the configuration key logfile.
- """
- return config.codehosting.forker_logfile
-
- def launch(self):
- # Following the logic in TacFile. Specifically, if you configure sftp
- # to not run (and thus bzr+ssh) then we don't want to run the forking
- # service.
- if not self.should_launch:
- return
- from lp.codehosting import get_brz_path
- command = [config.root + '/bin/py', get_brz_path(),
- 'launchpad-forking-service',
- '--path', config.codehosting.forking_daemon_socket,
- ]
- env = dict(os.environ)
- env['BRZ_PLUGIN_PATH'] = config.root + '/brzplugins'
- logfile = self.logfile
- if logfile == '-':
- # This process uses a different logging infrastructure from the
- # rest of the Launchpad code. As such, it cannot trivially use '-'
- # as the logfile. So we just ignore this setting.
- pass
- else:
- env['BRZ_LOG'] = logfile
- process = subprocess.Popen(command, env=env, stdin=subprocess.PIPE)
- self.addCleanup(stop_process, process)
- process.stdin.close()
-
-
class RabbitService(Service):
"""A RabbitMQ service."""
@@ -247,7 +201,6 @@ SERVICES = {
'librarian': TacFile('librarian', 'daemons/librarian.tac',
'librarian_server', prepare_for_librarian),
'sftp': TacFile('sftp', 'daemons/sftp.tac', 'codehosting'),
- 'forker': ForkingSessionService(),
'bing-webservice': BingWebService(),
'codebrowse': CodebrowseService(),
'memcached': MemcachedService(),
@@ -312,7 +265,6 @@ def start_testapp(argv=list(sys.argv)):
from lp.testing.layers import (
BaseLayer,
DatabaseLayer,
- LayerProcessController,
LibrarianLayer,
RabbitMQLayer,
)
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index 24118e8..6a9eebb 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -226,18 +226,6 @@ secret_path:
# datatype: string
logfile: -
-# The location of the log file used by the LaunchpadForkingService
-# datatype: string
-forker_logfile: -
-
-# Should we be using the forking daemon? Or should we be calling spawnProcess
-# instead?
-# datatype: boolean
-use_forking_daemon: False
-# What disk path will the daemon listen on
-# datatype: string
-forking_daemon_socket: /var/tmp/launchpad_forking_service.sock
-
# The prefix of the web URL for all public branches. This should end with a
# slash.
#
diff --git a/lib/lp/services/features/__init__.py b/lib/lp/services/features/__init__.py
index 0c44375..6604472 100644
--- a/lib/lp/services/features/__init__.py
+++ b/lib/lp/services/features/__init__.py
@@ -134,7 +134,7 @@ of the 'getFeatureFlag' XML-RPC method.
server_proxy = xmlrpc_client.ServerProxy(
config.launchpad.feature_flags_endpoint, allow_none=True)
if server_proxy.getFeatureFlag(
- 'codehosting.use_forking_server', ['user:' + user_name]):
+ 'example_flag.enabled', ['user:' + user_name]):
pass
Debugging feature usage