← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~jameinel/launchpad/lp-serve-child-hangup into lp:launchpad

 

John A Meinel has proposed merging lp:~jameinel/launchpad/lp-serve-child-hangup into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~jameinel/launchpad/lp-serve-child-hangup/+merge/51893

I'm resubmitting, since it has been a while since the previous conversation. I now use 'signal.alarm' to kill the child after 120s if it hasn't finished opening its handles. (signal.alarm was preferred to threading.Timer, presumably because you don't really want to mix threading and signals, and because Python threads tend to be flaky anyway due to GIL issues.)

Thinking about it, I really wonder if it should be more like 30s or even less. If Conch asks for a process, it should be able to connect to that process immediately. It may take a while for the process to start, etc. But all my timings show fork plus the child starting work taking <100ms, not 30s. The original 30s to spawn a child included the SSH handshake and all the other work. However, this is supposed to be a rather exceptional condition, so I'll leave it at 2min for now.

This is the final step in fixing bug #717345 that I know of. The other patches make sure we clean up as much as we can, and this closes the one last handle the master process holds on to: the socket that tells it the child has exited. It doesn't close with the others, because it is connected to the master forking service process, waiting for child exit codes, etc.

-- 
https://code.launchpad.net/~jameinel/launchpad/lp-serve-child-hangup/+merge/51893
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/launchpad/lp-serve-child-hangup into lp:launchpad.
=== modified file 'bzrplugins/lpserve/__init__.py'
--- bzrplugins/lpserve/__init__.py	2011-02-17 21:47:07 +0000
+++ bzrplugins/lpserve/__init__.py	2011-03-02 13:11:28 +0000
@@ -15,6 +15,7 @@
 
 
 import errno
+import fcntl
 import logging
 import os
 import resource
@@ -31,6 +32,7 @@
 from bzrlib.option import Option
 from bzrlib import (
     commands,
+    errors,
     lockdir,
     osutils,
     trace,
@@ -309,6 +311,11 @@
     SLEEP_FOR_CHILDREN_TIMEOUT = 1.0
     WAIT_FOR_REQUEST_TIMEOUT = 1.0 # No request should take longer than this to
                                    # be read
+    CHILD_CONNECT_TIMEOUT = 120   # If we get a fork() request, but nobody
+                                  # connects just exit
+                                  # On a heavily loaded server, it could take a
+                                  # couple secs, but it should never take
+                                  # minutes
 
     _fork_function = os.fork
 
@@ -324,6 +331,7 @@
         # Map from pid => (temp_path_for_handles, request_socket)
         self._child_processes = {}
         self._children_spawned = 0
+        self._child_connect_timeout = self.CHILD_CONNECT_TIMEOUT
 
     def _create_master_socket(self):
         self._server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -372,28 +380,94 @@
         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
         signal.signal(signal.SIGTERM, signal.SIG_DFL)
 
+    def _compute_paths(self, base_path):
+        stdin_path = os.path.join(base_path, 'stdin')
+        stdout_path = os.path.join(base_path, 'stdout')
+        stderr_path = os.path.join(base_path, 'stderr')
+        return (stdin_path, stdout_path, stderr_path)
+
     def _create_child_file_descriptors(self, base_path):
-        stdin_path = os.path.join(base_path, 'stdin')
-        stdout_path = os.path.join(base_path, 'stdout')
-        stderr_path = os.path.join(base_path, 'stderr')
+        stdin_path, stdout_path, stderr_path = self._compute_paths(base_path)
         os.mkfifo(stdin_path)
         os.mkfifo(stdout_path)
         os.mkfifo(stderr_path)
 
-    def _bind_child_file_descriptors(self, base_path):
-        stdin_path = os.path.join(base_path, 'stdin')
-        stdout_path = os.path.join(base_path, 'stdout')
-        stderr_path = os.path.join(base_path, 'stderr')
+    def _set_blocking(self, fd):
+        """Change the file descriptor to unset the O_NONBLOCK flag."""
+        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+        flags = flags & (~os.O_NONBLOCK)
+        fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
+    def _get_child_connect_timeout(self):
+        """signal.alarm only supports 1s granularity.
+
+        We have to make sure we don't ever send 0, which would not generate an
+        alarm.
+        """
+        timeout = int(self._child_connect_timeout)
+        if timeout <= 0:
+            timeout = 1
+        return timeout
+
+    def _open_handles(self, base_path):
+        """Open the given file handles.
+
+        This will attempt to open all of these file handles, but will not block
+        while opening them, timing out after self._child_connect_timeout
+        seconds.
+
+        :param base_path: The directory where all FIFOs are located
+        :return: (stdin_fid, stdout_fid, stderr_fid)
+        """
+        stdin_path, stdout_path, stderr_path = self._compute_paths(base_path)
         # These open calls will block until another process connects (which
         # must connect in the same order)
-        stdin_fid = os.open(stdin_path, os.O_RDONLY)
-        stdout_fid = os.open(stdout_path, os.O_WRONLY)
-        stderr_fid = os.open(stderr_path, os.O_WRONLY)
+        fids = []
+        to_open = [(stdin_path, os.O_RDONLY), (stdout_path, os.O_WRONLY),
+                   (stderr_path, os.O_WRONLY)]
+        signal.alarm(self._get_child_connect_timeout())
+        tstart = time.time()
+        for path, flags in to_open:
+            try:
+                fids.append(os.open(path, flags))
+            except OSError, e:
+                if e.errno == errno.EINTR:
+                    error = ('After %.3fs we failed to open %s, exiting'
+                             % (time.time() - tstart, path,))
+                    trace.warning(error)
+                    for fid in fids:
+                        try:
+                            os.close(fid)
+                        except OSError:
+                            pass
+                    self._cleanup_fifos(base_path)
+                    raise errors.BzrError(error)
+                raise
+        # If we get to here, that means all the handles were opened
+        # successfully, so cancel the wakeup call.
+        signal.alarm(0)
+        return fids
+
+    def _cleanup_fifos(self, base_path):
+        """Remove the FIFO objects and directory from disk."""
+        stdin_path, stdout_path, stderr_path = self._compute_paths(base_path)
+        # Now that we've opened the handles, delete everything so that we don't
+        # leave garbage around. Because the open() is done in blocking mode, we
+        # know that someone has already connected to them, and we don't want
+        # anyone else getting confused and connecting.
+        # See [Decision #5]
+        os.remove(stdin_path)
+        os.remove(stdout_path)
+        os.remove(stderr_path)
+        os.rmdir(base_path)
+
+    def _bind_child_file_descriptors(self, base_path):
         # Note: by this point bzrlib has opened stderr for logging
         #       (as part of starting the service process in the first place).
         #       As such, it has a stream handler that writes to stderr. logging
         #       tries to flush and close that, but the file is already closed.
         #       This just supresses that exception
+        stdin_fid, stdout_fid, stderr_fid = self._open_handles(base_path)
         logging.raiseExceptions = False
         sys.stdin.close()
         sys.stdout.close()
@@ -407,15 +481,7 @@
         ui.ui_factory.stdin = sys.stdin
         ui.ui_factory.stdout = sys.stdout
         ui.ui_factory.stderr = sys.stderr
-        # Now that we've opened the handles, delete everything so that we don't
-        # leave garbage around. Because the open() is done in blocking mode, we
-        # know that someone has already connected to them, and we don't want
-        # anyone else getting confused and connecting.
-        # See [Decision #5]
-        os.remove(stderr_path)
-        os.remove(stdout_path)
-        os.remove(stdin_path)
-        os.rmdir(base_path)
+        self._cleanup_fifos(base_path)
 
     def _close_child_file_descriptors(self):
         sys.stdin.close()
@@ -724,6 +790,15 @@
             self._should_terminate.set()
             conn.sendall('ok\nquit command requested... exiting\n')
             conn.close()
+        elif request.startswith('child_connect_timeout '):
+            try:
+                value = int(request.split(' ', 1)[1])
+            except ValueError, e:
+                conn.sendall('FAILURE: %r\n' % (e,))
+            else:
+                self._child_connect_timeout = value
+                conn.sendall('ok\n')
+            conn.close()
         elif request.startswith('fork ') or request.startswith('fork-env '):
             command_argv, env = self._parse_fork_request(conn, client_addr,
                                                          request)

=== modified file 'bzrplugins/lpserve/test_lpserve.py'
--- bzrplugins/lpserve/test_lpserve.py	2010-11-08 22:43:58 +0000
+++ bzrplugins/lpserve/test_lpserve.py	2011-03-02 13:11:28 +0000
@@ -3,6 +3,7 @@
 
 import errno
 import os
+import shutil
 import signal
 import socket
 import subprocess
@@ -13,6 +14,7 @@
 from testtools import content
 
 from bzrlib import (
+    errors,
     osutils,
     tests,
     trace,
@@ -263,6 +265,46 @@
                                                 one_byte_at_a_time=True)
         self.assertStartsWith(response, 'FAILURE\n')
 
+    def test_child_connection_timeout(self):
+        self.assertEqual(self.service.CHILD_CONNECT_TIMEOUT,
+                         self.service._child_connect_timeout)
+        response = self.send_message_to_service('child_connect_timeout 1\n')
+        self.assertEqual('ok\n', response)
+        self.assertEqual(1, self.service._child_connect_timeout)
+
+    def test_child_connection_timeout_bad_float(self):
+        self.assertEqual(self.service.CHILD_CONNECT_TIMEOUT,
+                         self.service._child_connect_timeout)
+        response = self.send_message_to_service('child_connect_timeout 1.2\n')
+        self.assertStartsWith(response, 'FAILURE:')
+
+    def test_child_connection_timeout_no_val(self):
+        response = self.send_message_to_service('child_connect_timeout \n')
+        self.assertStartsWith(response, 'FAILURE:')
+
+    def test_child_connection_timeout_bad_val(self):
+        response = self.send_message_to_service('child_connect_timeout b\n')
+        self.assertStartsWith(response, 'FAILURE:')
+
+    def test__open_handles_will_timeout(self):
+        # signal.alarm() has only 1-second granularity. :(
+        self.service._child_connect_timeout = 1
+        tempdir = tempfile.mkdtemp(prefix='testlpserve-')
+        self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)
+        os.mkfifo(os.path.join(tempdir, 'stdin'))
+        os.mkfifo(os.path.join(tempdir, 'stdout'))
+        os.mkfifo(os.path.join(tempdir, 'stderr'))
+        def noop_on_alarm(signal, frame):
+            return
+        signal.signal(signal.SIGALRM, noop_on_alarm)
+        self.addCleanup(signal.signal, signal.SIGALRM, signal.SIG_DFL)
+        e = self.assertRaises(errors.BzrError,
+            self.service._open_handles, tempdir)
+        self.assertContainsRe(str(e), r'After \d+.\d+s we failed to open.*')
+        # Even though it timed out, we still cleanup the temp dir
+        self.assertFalse(os.path.exists(tempdir))
+
+
 
 class TestCaseWithSubprocess(tests.TestCaseWithTransport):
     """Override the bzr start_bzr_subprocess command.
@@ -452,9 +494,9 @@
         stderr_path = os.path.join(path, 'stderr')
         # The ordering must match the ordering of the service or we get a
         # deadlock.
-        child_stdin = open(stdin_path, 'wb')
-        child_stdout = open(stdout_path, 'rb')
-        child_stderr = open(stderr_path, 'rb')
+        child_stdin = open(stdin_path, 'wb', 0)
+        child_stdout = open(stdout_path, 'rb', 0)
+        child_stderr = open(stderr_path, 'rb', 0)
         return child_stdin, child_stdout, child_stderr
 
     def communicate_with_fork(self, path, stdin=None):
@@ -484,6 +526,24 @@
         self.assertEqual('', stderr_content)
         self.assertReturnCode(0, sock)
 
+    def DONT_test_fork_lp_serve_multiple_hello(self):
+        # This ensures that the fifos are all set to blocking mode
+        # We can't actually run this test, because by default 'bzr serve
+        # --inet' does not flush after each message. So we end up blocking
+        # forever waiting for the server to finish responding to the first
+        # request.
+        path, _, sock = self.send_fork_request('lp-serve --inet 2')
+        child_stdin, child_stdout, child_stderr = self._get_fork_handles(path)
+        child_stdin.write('hello\n')
+        child_stdin.flush()
+        self.assertEqual('ok\x012\n', child_stdout.read())
+        child_stdin.write('hello\n')
+        self.assertEqual('ok\x012\n', child_stdout.read())
+        child_stdin.close()
+        self.assertEqual('', child_stderr.read())
+        child_stdout.close()
+        child_stderr.close()
+
     def test_fork_replay(self):
         path, _, sock = self.send_fork_request('launchpad-replay')
         stdout_content, stderr_content = self.communicate_with_fork(path,
@@ -540,6 +600,29 @@
     def test_sigint_exits_nicely(self):
         self._check_exits_nicely(signal.SIGINT)
 
+    def test_child_exits_eventually(self):
+        # We won't ever bind to the socket the child wants, and after some
+        # time, the child should exit cleanly.
+        # First, tell the subprocess that we want children to exit quickly.
+        # *sigh* signal.alarm only has 1s resolution, so this test is slow.
+        response = self.send_message_to_service('child_connect_timeout 1\n')
+        self.assertEqual('ok\n', response)
+        # Now request a fork
+        path, pid, sock = self.send_fork_request('rocks')
+        # # Open one handle, but not all of them
+        stdin_path = os.path.join(path, 'stdin')
+        stdout_path = os.path.join(path, 'stdout')
+        stderr_path = os.path.join(path, 'stderr')
+        child_stdin = open(stdin_path, 'wb')
+        # We started opening the child, but stop before we get all handles
+        # open. After 1 second, the child should get signaled and die.
+        # The master process should notice, and tell us the status of the
+        # exited child.
+        val = sock.recv(4096)
+        self.assertEqual('exited\n%s\n' % (signal.SIGALRM,), val)
+        # The master process should clean up after the now deceased child.
+        self.failIfExists(path)
+
 
 class TestCaseWithLPForkingServiceDaemon(
     TestCaseWithLPForkingServiceSubprocess):