← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~jameinel/launchpad/lp-serve-child-hangup into lp:launchpad

 

John A Meinel has proposed merging lp:~jameinel/launchpad/lp-serve-child-hangup into lp:launchpad with lp:~jameinel/launchpad/lp-forking-serve-cleaner-childre as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~jameinel/launchpad/lp-serve-child-hangup/+merge/50055

This branch builds on my earlier branch, which has the children spawned by LPForkingService clean themselves up more readily.

This adds code so that the blocking calls will be unblocked after 2 minutes. At the moment, if the Conch server requests a fork, but is unable to actually connect to the child, that child ends up hanging forever on a file handle that it will never open.

Originally, I got this working using a loop an O_NONBLOCK. But after doing some testing, fcntl.fcntl(...) was unable to actually change the file descriptor to be blocking again. It looks good in man, and it does unset the flag, but I still get EAGAIN failures in the smart server code.

I don't really like spawning a thread to send SIGUSR1 back to myself, but it seemed the best tradeoff. If we want, we could even make it SIGTERM or something, since we know we are going to kill the process if the connection fails.

I figured 2 minutes was reasonable. This is the time from a successful ssh handshake until we actually connect to a newly spawned child. Even the worst-case time that I've seen was about 30s for a child to be spawned. So this gives us a 4x margin of error.

This is cleanup related to bug #717345, but it doesn't directly fix the problem. That will be in my next branch.
-- 
https://code.launchpad.net/~jameinel/launchpad/lp-serve-child-hangup/+merge/50055
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/launchpad/lp-serve-child-hangup into lp:launchpad.
=== modified file 'bzrplugins/lpserve/__init__.py'
--- bzrplugins/lpserve/__init__.py	2011-02-16 21:44:15 +0000
+++ bzrplugins/lpserve/__init__.py	2011-02-16 21:44:17 +0000
@@ -15,6 +15,7 @@
 
 
 import errno
+import fcntl
 import logging
 import os
 import resource
@@ -31,6 +32,7 @@
 from bzrlib.option import Option
 from bzrlib import (
     commands,
+    errors,
     lockdir,
     osutils,
     trace,
@@ -131,6 +133,31 @@
 register_command(cmd_launchpad_server)
 
 
+def _handle_sigusr1(signum, frame):
+    """This is used by _wake_me_up_in_a_few to just ignore the signal"""
+    # We only want to handle the signal one time
+    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
+    return
+
+
+def _wake_me_up_in_a_few(sleep_time, sig_handler=_handle_sigusr1):
+    """Start a thread that will send a signal to the current process.
+
+    :param sleep_time: The number of seconds to wait before firing SIGUSR1.
+    :return: cancel() A callable which will cancel the signal.
+    """
+    # Set up the signal handler
+    timer = threading.Timer(sleep_time, os.kill,
+                            args=(os.getpid(), signal.SIGUSR1))
+    signal.signal(signal.SIGUSR1, sig_handler)
+    timer.start()
+    def cancel():
+        timer.cancel()
+        signal.signal(signal.SIGUSR1, signal.SIG_DFL)
+        timer.join()
+    return cancel, timer
+
+
 class LPForkingService(object):
     """A service that can be asked to start a new bzr subprocess via fork.
 
@@ -309,6 +336,11 @@
     SLEEP_FOR_CHILDREN_TIMEOUT = 1.0
     WAIT_FOR_REQUEST_TIMEOUT = 1.0 # No request should take longer than this to
                                    # be read
+    CHILD_CONNECT_TIMEOUT = 120.0 # If we get a fork() request, but nobody
+                                  # connects just exit
+                                  # On a heavily loaded server, it could take a
+                                  # couple secs, but it should never take
+                                  # minutes
 
     _fork_function = os.fork
 
@@ -324,6 +356,7 @@
         # Map from pid => (temp_path_for_handles, request_socket)
         self._child_processes = {}
         self._children_spawned = 0
+        self._child_connect_timeout = self.CHILD_CONNECT_TIMEOUT
 
     def _create_master_socket(self):
         self._server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -372,28 +405,83 @@
         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
         signal.signal(signal.SIGTERM, signal.SIG_DFL)
 
+    def _compute_paths(self, base_path):
+        stdin_path = os.path.join(base_path, 'stdin')
+        stdout_path = os.path.join(base_path, 'stdout')
+        stderr_path = os.path.join(base_path, 'stderr')
+        return (stdin_path, stdout_path, stderr_path)
+
     def _create_child_file_descriptors(self, base_path):
-        stdin_path = os.path.join(base_path, 'stdin')
-        stdout_path = os.path.join(base_path, 'stdout')
-        stderr_path = os.path.join(base_path, 'stderr')
+        stdin_path, stdout_path, stderr_path = self._compute_paths(base_path)
         os.mkfifo(stdin_path)
         os.mkfifo(stdout_path)
         os.mkfifo(stderr_path)
 
-    def _bind_child_file_descriptors(self, base_path):
-        stdin_path = os.path.join(base_path, 'stdin')
-        stdout_path = os.path.join(base_path, 'stdout')
-        stderr_path = os.path.join(base_path, 'stderr')
+    def _set_blocking(self, fd):
+        """Change the file descriptor to unset the O_NONBLOCK flag."""
+        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+        flags = flags & (~os.O_NONBLOCK)
+        fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
+    def _open_handles(self, base_path):
+        """Open the given file handles.
+
+        This will attempt to open all of these file handles, but will not block
+        while opening them, timing out after self._child_connect_timeout
+        seconds.
+
+        :param base_path: The directory where all FIFOs are located
+        :return: (stdin_fid, stdout_fid, stderr_fid)
+        """
+        stdin_path, stdout_path, stderr_path = self._compute_paths(base_path)
         # These open calls will block until another process connects (which
         # must connect in the same order)
-        stdin_fid = os.open(stdin_path, os.O_RDONLY)
-        stdout_fid = os.open(stdout_path, os.O_WRONLY)
-        stderr_fid = os.open(stderr_path, os.O_WRONLY)
+        fids = []
+        to_open = [(stdin_path, os.O_RDONLY), (stdout_path, os.O_WRONLY),
+                   (stderr_path, os.O_WRONLY)]
+        cancel, _ = _wake_me_up_in_a_few(self._child_connect_timeout)
+        tstart = time.time()
+        for path, flags in to_open:
+            try:
+                fids.append(os.open(path, flags))
+            except OSError, e:
+                if e.errno == errno.EINTR:
+                    error = ('After %.3fs we failed to open %s, exiting'
+                             % (time.time() - tstart, path,))
+                    trace.warning(error)
+                    for fid in fids:
+                        try:
+                            os.close(fid)
+                        except OSError:
+                            pass
+                    self._cleanup_fifos(base_path)
+                    raise errors.BzrError(error)
+                raise
+        # If we get to here, that means all the handles were opened
+        # successfully, so cancel the wakeup call.
+        cancel()
+        return fids
+
+    def _cleanup_fifos(self, base_path):
+        """Remove the FIFO objects and directory from disk."""
+        stdin_path, stdout_path, stderr_path = self._compute_paths(base_path)
+        # Now that we've opened the handles, delete everything so that we don't
+        # leave garbage around. Because the open() is done in blocking mode, we
+        # know that someone has already connected to them, and we don't want
+        # anyone else getting confused and connecting.
+        # See [Decision #5]
+        os.remove(stdin_path)
+        os.remove(stdout_path)
+        os.remove(stderr_path)
+        os.rmdir(base_path)
+
+    def _bind_child_file_descriptors(self, base_path):
         # Note: by this point bzrlib has opened stderr for logging
         #       (as part of starting the service process in the first place).
         #       As such, it has a stream handler that writes to stderr. logging
         #       tries to flush and close that, but the file is already closed.
         #       This just supresses that exception
+        stdin_fid, stdout_fid, stderr_fid = self._open_handles(base_path)
         logging.raiseExceptions = False
         sys.stdin.close()
         sys.stdout.close()
@@ -407,15 +495,7 @@
         ui.ui_factory.stdin = sys.stdin
         ui.ui_factory.stdout = sys.stdout
         ui.ui_factory.stderr = sys.stderr
-        # Now that we've opened the handles, delete everything so that we don't
-        # leave garbage around. Because the open() is done in blocking mode, we
-        # know that someone has already connected to them, and we don't want
-        # anyone else getting confused and connecting.
-        # See [Decision #5]
-        os.remove(stderr_path)
-        os.remove(stdout_path)
-        os.remove(stdin_path)
-        os.rmdir(base_path)
+        self._cleanup_fifos(base_path)
 
     def _close_child_file_descriptors(self):
         sys.stdin.close()
@@ -724,6 +804,15 @@
             self._should_terminate.set()
             conn.sendall('ok\nquit command requested... exiting\n')
             conn.close()
+        elif request.startswith('child_connect_timeout '):
+            try:
+                value = float(request.split(' ', 1)[1])
+            except ValueError, e:
+                conn.sendall('FAILURE: %r\n' % (e,))
+            else:
+                self._child_connect_timeout = value
+                conn.sendall('ok\n')
+            conn.close()
         elif request.startswith('fork ') or request.startswith('fork-env '):
             command_argv, env = self._parse_fork_request(conn, client_addr,
                                                          request)

=== modified file 'bzrplugins/lpserve/test_lpserve.py'
--- bzrplugins/lpserve/test_lpserve.py	2010-11-08 22:43:58 +0000
+++ bzrplugins/lpserve/test_lpserve.py	2011-02-16 21:44:17 +0000
@@ -3,6 +3,7 @@
 
 import errno
 import os
+import shutil
 import signal
 import socket
 import subprocess
@@ -13,6 +14,7 @@
 from testtools import content
 
 from bzrlib import (
+    errors,
     osutils,
     tests,
     trace,
@@ -263,6 +265,35 @@
                                                 one_byte_at_a_time=True)
         self.assertStartsWith(response, 'FAILURE\n')
 
+    def test_child_connection_timeout(self):
+        self.assertEqual(self.service.CHILD_CONNECT_TIMEOUT,
+                         self.service._child_connect_timeout)
+        response = self.send_message_to_service('child_connect_timeout 1.0\n')
+        self.assertEqual('ok\n', response)
+        self.assertEqual(1.0, self.service._child_connect_timeout)
+
+    def test_child_connection_timeout_no_val(self):
+        response = self.send_message_to_service('child_connect_timeout \n')
+        self.assertStartsWith(response, 'FAILURE:')
+
+    def test_child_connection_timeout_bad_val(self):
+        response = self.send_message_to_service('child_connect_timeout b\n')
+        self.assertStartsWith(response, 'FAILURE:')
+
+    def test__open_handles_will_timeout(self):
+        self.service._child_connect_timeout = 0.1
+        tempdir = tempfile.mkdtemp(prefix='testlpserve-')
+        self.addCleanup(shutil.rmtree, tempdir, ignore_errors=True)
+        os.mkfifo(os.path.join(tempdir, 'stdin'))
+        os.mkfifo(os.path.join(tempdir, 'stdout'))
+        os.mkfifo(os.path.join(tempdir, 'stderr'))
+        e = self.assertRaises(errors.BzrError,
+            self.service._open_handles, tempdir)
+        self.assertContainsRe(str(e), r'After \d+.\d+s we failed to open.*')
+        # Even though it timed out, we still cleanup the temp dir
+        self.assertFalse(os.path.exists(tempdir))
+
+
 
 class TestCaseWithSubprocess(tests.TestCaseWithTransport):
     """Override the bzr start_bzr_subprocess command.
@@ -452,9 +483,9 @@
         stderr_path = os.path.join(path, 'stderr')
         # The ordering must match the ordering of the service or we get a
         # deadlock.
-        child_stdin = open(stdin_path, 'wb')
-        child_stdout = open(stdout_path, 'rb')
-        child_stderr = open(stderr_path, 'rb')
+        child_stdin = open(stdin_path, 'wb', 0)
+        child_stdout = open(stdout_path, 'rb', 0)
+        child_stderr = open(stderr_path, 'rb', 0)
         return child_stdin, child_stdout, child_stderr
 
     def communicate_with_fork(self, path, stdin=None):
@@ -484,6 +515,24 @@
         self.assertEqual('', stderr_content)
         self.assertReturnCode(0, sock)
 
+    def DONT_test_fork_lp_serve_multiple_hello(self):
+        # This ensures that the fifos are all set to blocking mode
+        # We can't actually run this test, because by default 'bzr serve
+        # --inet' does not flush after each message. So we end up blocking
+        # forever waiting for the server to finish responding to the first
+        # request.
+        path, _, sock = self.send_fork_request('lp-serve --inet 2')
+        child_stdin, child_stdout, child_stderr = self._get_fork_handles(path)
+        child_stdin.write('hello\n')
+        child_stdin.flush()
+        self.assertEqual('ok\x012\n', child_stdout.read())
+        child_stdin.write('hello\n')
+        self.assertEqual('ok\x012\n', child_stdout.read())
+        child_stdin.close()
+        self.assertEqual('', child_stderr.read())
+        child_stdout.close()
+        child_stderr.close()
+
     def test_fork_replay(self):
         path, _, sock = self.send_fork_request('launchpad-replay')
         stdout_content, stderr_content = self.communicate_with_fork(path,
@@ -540,6 +589,27 @@
     def test_sigint_exits_nicely(self):
         self._check_exits_nicely(signal.SIGINT)
 
+    def test_child_exits_eventually(self):
+        # We won't ever bind to the socket the child wants, and after some
+        # time, the child should exit cleanly.
+        # First, tell the subprocess that we want children to exit quickly.
+        response = self.send_message_to_service('child_connect_timeout 0.05\n')
+        self.assertEqual('ok\n', response)
+        # Now request a fork
+        path, pid, sock = self.send_fork_request('rocks')
+        # # Open one handle, but not all of them
+        stdin_path = os.path.join(path, 'stdin')
+        stdout_path = os.path.join(path, 'stdout')
+        stderr_path = os.path.join(path, 'stderr')
+        child_stdin = open(stdin_path, 'wb')
+        # # I hate adding time.sleep here, but I don't see a better way, yet
+        for i in xrange(10):
+            if not os.path.exists(path):
+                break
+            time.sleep(0.01)
+        else:
+            self.fail('Child process failed to cleanup after timeout.')
+
 
 class TestCaseWithLPForkingServiceDaemon(
     TestCaseWithLPForkingServiceSubprocess):
@@ -663,3 +733,39 @@
         # We're done, shut it down
         self.stop_service()
         self.failIf(os.path.isfile(self.service_pid_filename))
+
+
+class Test_WakeUp(tests.TestCaseInTempDir):
+
+    def test_wakeup_interrupts_fifo_open(self):
+        os.mkfifo('test-fifo')
+        self.addCleanup(os.remove, 'test-fifo')
+        cancel, t = lpserve._wake_me_up_in_a_few(0.01)
+        e = self.assertRaises(OSError, os.open, 'test-fifo', os.O_RDONLY)
+        self.assertEqual(errno.EINTR, e.errno)
+        t.join()
+
+    def test_custom_callback_called(self):
+        called = []
+        def _sigusr1_called(sig, frame):
+            called.append(sig)
+            signal.signal(signal.SIGUSR1, signal.SIG_DFL)
+        cancel, t = lpserve._wake_me_up_in_a_few(0.01, _sigusr1_called)
+        time.sleep(0.1)
+        self.assertEqual([signal.SIGUSR1], called)
+        t.join()
+
+    def test_cancel_aborts_interrupt(self):
+        called = []
+        def _sigusr1_called(sig, frame):
+            called.append(sig, frame)
+        cancel, t = lpserve._wake_me_up_in_a_few(0.01)
+        cancel()
+        time.sleep(0.1)
+        # The signal should not have been fired, and we should have reset the
+        # signal handler
+        self.assertEqual([], called)
+        self.assertEqual(signal.SIG_DFL,
+                         signal.signal(signal.SIGUSR1, signal.SIG_DFL))
+        # Should have already been joined in cancel()
+        t.join()


Follow ups