← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~jameinel/launchpad/twisted-close-on-failure into lp:launchpad

 

John A Meinel has proposed merging lp:~jameinel/launchpad/twisted-close-on-failure into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  #717345 Updates to SFTP server leak file handles in use_forking_server mode
  https://bugs.launchpad.net/bugs/717345

For more details, see:
https://code.launchpad.net/~jameinel/launchpad/twisted-close-on-failure/+merge/50067

This is the final step I've put together for bug #717345.

This changes the spawning code so that it directly handles the case where we fail to connect to a newly spawned child, which should decrease the chance of leaking file handles. With https://code.launchpad.net/~jameinel/launchpad/lp-serve-child-hangup/+merge/50055 also applied, I see that even after forcing the server to run out of handles for a while, it recovers and all remaining handles get closed.

I tested this by setting 'ulimit -n 100' and then running 'hammer_ssh.py --load=20'. The server starts running out of handles, but still serves what it can on the remaining connections. Once the load is gone, I can see all the remaining handles get closed, and the process is back to a clean state.

-- 
https://code.launchpad.net/~jameinel/launchpad/twisted-close-on-failure/+merge/50067
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/launchpad/twisted-close-on-failure into lp:launchpad.
=== modified file 'lib/lp/codehosting/sshserver/session.py'
--- lib/lp/codehosting/sshserver/session.py	2010-10-07 23:43:40 +0000
+++ lib/lp/codehosting/sshserver/session.py	2011-02-16 22:53:51 +0000
@@ -21,7 +21,8 @@
     interfaces,
     process,
     )
-from twisted.python import log
+from twisted.python import log, failure
+from twisted.internet.main import CONNECTION_LOST, CONNECTION_DONE
 
 from canonical.config import config
 from lp.codehosting import get_bzr_path
@@ -129,6 +130,7 @@
             log.err('Connection failed: %s' % (e,))
             raise
         if response.startswith("FAILURE"):
+            client_sock.close()
             raise RuntimeError('Failed to send message: %r' % (response,))
         return response, client_sock
 
@@ -154,36 +156,55 @@
         message.append('end\n')
         message = ''.join(message)
         response, sock = self._sendMessageToService(message)
-        if response.startswith('FAILURE'):
-            # TODO: Is there a better error to raise?
-            raise RuntimeError("Failed while sending message to forking "
-                "service. message: %r, failure: %r"
-                % (message, response))
-        ok, pid, path, tail = response.split('\n')
-        assert ok == 'ok'
-        assert tail == ''
-        pid = int(pid)
-        log.msg('Forking returned pid: %d, path: %s' % (pid, path))
+        try:
+            ok, pid, path, tail = response.split('\n')
+            assert ok == 'ok'
+            assert tail == ''
+            pid = int(pid)
+            log.msg('Forking returned pid: %d, path: %s' % (pid, path))
+        except:
+            sock.close()
+            raise
         return pid, path, sock
 
+    def _openHandleFailures(self, call_on_failure, path, flags, proc_class,
+                            reactor, child_fd):
+        """Open the given path, adding a cleanup as appropriate.
+
+        :param call_on_failure: A list holding (callback, args) tuples. We will
+            append new entries for things that we open
+        :param path: The path to open
+        :param flags: Flags to pass to os.open
+        :param proc_class: The ProcessWriter/ProcessReader class to wrap this
+            connection.
+        :param reactor: The Twisted reactor we are connecting to.
+        :param child_fd: The child file descriptor number passed to proc_class
+        """
+        fd = os.open(path, flags)
+        call_on_failure.append((os.close, fd))
+        p = proc_class(reactor, self, child_fd, fd)
+        # The ProcessReader/Writer now handles this fileno, don't close
+        # directly anymore
+        call_on_failure[-1] = (p.connectionLost, (None,))
+        self.pipes[child_fd] = p
+
     def _connectSpawnToReactor(self, reactor):
+        self._exiter = _WaitForExit(reactor, self, self.process_sock)
+        call_on_failure = [(self._exiter.connectionLost, (None,))]
         stdin_path = os.path.join(self._fifo_path, 'stdin')
         stdout_path = os.path.join(self._fifo_path, 'stdout')
         stderr_path = os.path.join(self._fifo_path, 'stderr')
-        child_stdin_fd = os.open(stdin_path, os.O_WRONLY)
-        self.pipes[0] = process.ProcessWriter(reactor, self, 0,
-                                              child_stdin_fd)
-        child_stdout_fd = os.open(stdout_path, os.O_RDONLY)
-        # forceReadHack=True ? Used in process.py doesn't seem to be needed
-        # here
-        self.pipes[1] = process.ProcessReader(reactor, self, 1,
-                                              child_stdout_fd)
-        child_stderr_fd = os.open(stderr_path, os.O_RDONLY)
-        self.pipes[2] = process.ProcessReader(reactor, self, 2,
-                                              child_stderr_fd)
-        # Note: _exiter forms a GC cycle, since it points to us, and we hold a
-        # reference to it
-        self._exiter = _WaitForExit(reactor, self, self.process_sock)
+        try:
+            self._openHandleFailures(call_on_failure, stdin_path, os.O_WRONLY,
+                process.ProcessWriter, reactor, 0)
+            self._openHandleFailures(call_on_failure, stdout_path, os.O_RDONLY,
+                process.ProcessReader, reactor, 1)
+            self._openHandleFailures(call_on_failure, stderr_path, os.O_RDONLY,
+                process.ProcessReader, reactor, 2)
+        except:
+            for func, args in call_on_failure:
+                func(*args)
+            raise
         self.pipes['exit'] = self._exiter
 
     def _getReason(self, status):
@@ -248,12 +269,14 @@
     # Implemented because ProcessWriter/ProcessReader want to call it
     # Copied from twisted.internet.Process
     def childConnectionLost(self, childFD, reason):
-        close = getattr(self.pipes[childFD], 'close', None)
-        if close is not None:
-            close()
-        else:
-            os.close(self.pipes[childFD].fileno())
-        del self.pipes[childFD]
+        pipe = self.pipes.get(childFD)
+        if pipe is not None:
+            close = getattr(pipe, 'close', None)
+            if close is not None:
+                close()
+            else:
+                os.close(self.pipes[childFD].fileno())
+            del self.pipes[childFD]
         try:
             self.proto.childConnectionLost(childFD)
         except: