launchpad-reviewers team mailing list archive

[Merge] lp:~spiv/launchpad/haproxy-for-twisted-services into lp:launchpad

 

Andrew Bennetts has proposed merging lp:~spiv/launchpad/haproxy-for-twisted-services into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  #702024 Restarting codehosting SSH server drops existing connections
  https://bugs.launchpad.net/bugs/702024

For more details, see:
https://code.launchpad.net/~spiv/launchpad/haproxy-for-twisted-services/+merge/46996

This enhances the codehosting SSH daemon to:

 * not exit immediately upon SIGTERM (or SIGINT), but instead stop listening on the SSH port immediately and postpone exiting until all existing connections have finished;
 * provide an HTTP service that returns status code 200 (OK) normally, and 503 (Service Unavailable) after SIGTERM/INT.
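From the frontend's side, the status service above boils down to a yes/no probe. Here is a minimal sketch of such a probe, written against the Python 3 standard library rather than the branch's Twisted code. The names is_accepting_connections and FakeStatusHandler are hypothetical, and treating only a 200 as "available" is an assumption of this sketch, not a statement of how HAProxy itself decides.

```python
import http.client
from http.server import BaseHTTPRequestHandler


def is_accepting_connections(host, port, timeout=2.0):
    """Return True only if HEAD / on the status port answers 200.

    A 503, a refused connection, or a timeout all count as "do not send
    new connections here" (an assumption of this sketch).
    """
    conn = http.client.HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("HEAD", "/")
        return conn.getresponse().status == 200
    except (OSError, http.client.HTTPException):
        return False
    finally:
        conn.close()


class FakeStatusHandler(BaseHTTPRequestHandler):
    """Stand-in for the daemon's status service, for local experiments only."""

    # Flipped to True to imitate a daemon that has received SIGTERM.
    shutting_down = False

    def do_HEAD(self):
        self.send_response(503 if self.shutting_down else 200)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()

    def log_message(self, format, *args):
        # Keep the demo quiet.
        pass
```

To try it, serve FakeStatusHandler from an http.server.HTTPServer bound to a free local port, probe it, flip FakeStatusHandler.shutting_down, and probe again.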

The idea is that we can use multiple instances of this SSH daemon plus a frontend like HAProxy to provide graceful server restarts, in the same sort of way we restart the lpnet services:
 * SIGTERM some fraction of the codehosting instances
 * Those instances will stop listening on their port
 * The frontend will observe (via the HTTP status service) that those services are no longer to be used for new connections
 * When all outstanding connections on those instances have finished they will stop, and then can be restarted (with new configs or code or whatever)
 * repeat this process for the other instances
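The steps above can be sketched as a small, purely in-memory simulation. It is a toy model, not Launchpad or HAProxy code: Instance, pick_backend and rolling_restart are hypothetical names, and "waiting for connections to finish" is modelled by closing them by hand.

```python
class Instance:
    """Toy model of one codehosting SSH daemon (hypothetical)."""

    def __init__(self, name, open_connections=0):
        self.name = name
        self.open_connections = open_connections
        self.draining = False
        self.running = True

    def status_code(self):
        # What the HTTP status service would report.
        return 503 if self.draining else 200

    def sigterm(self):
        # Stop accepting new connections but keep existing ones alive.
        self.draining = True
        self._maybe_exit()

    def close_connection(self):
        self.open_connections -= 1
        self._maybe_exit()

    def _maybe_exit(self):
        # The process only exits once it is draining and idle.
        if self.draining and self.open_connections == 0:
            self.running = False

    def restart(self):
        assert not self.running, "restart only after the old process exited"
        self.draining = False
        self.running = True


def pick_backend(instances):
    """Frontend routing: first running instance that reports 200, else None."""
    for inst in instances:
        if inst.running and inst.status_code() == 200:
            return inst
    return None


def rolling_restart(instances, batch_size=1):
    """Drain and restart instances batch by batch; return a list of events."""
    events = []
    for start in range(0, len(instances), batch_size):
        batch = instances[start:start + batch_size]
        for inst in batch:
            inst.sigterm()
        for inst in batch:
            while inst.open_connections > 0:
                # New connections arriving now must go to another instance.
                target = pick_backend(instances)
                events.append(("routed", target.name if target else None))
                inst.close_connection()
            inst.restart()
            events.append(("restarted", inst.name))
    return events
```

With batch_size smaller than the number of instances there is always at least one backend answering 200, which is the point of restarting only a fraction at a time.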

The code itself is fairly small: it adds some helpers in lp.services.twistedsupport.gracefulshutdown, then uses them in sftp.tac.  As a followup I'll apply these helpers to other tac files too (librarian, poppy, maybe others?).

The significant part of the HTTP service is the status code (200 vs. 503), but it also emits some basic information about the number of connections and their addresses for the benefit of humans looking at the service.
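For anyone who wants to script against that human-readable body, here is a rough sketch of a parser. It assumes the layout produced by render_GET in the diff below (a state line, a blank line, an "N connections: " line, a blank line, then one peer per line), and that the first line carries the state string. parse_status_body is a hypothetical helper, and the peer lines are treated as opaque text since their exact form is whatever str(transport.getPeer()) yields.

```python
def parse_status_body(body):
    """Split the plain-text status body into (state, count, peers).

    Assumes the layout '%s\\n\\n%d connections: \\n\\n%s\\n' used by
    render_GET; the text is for humans, so this may break if it changes.
    """
    lines = body.split("\n")
    state = lines[0]
    # Third line looks like "2 connections: "; the count is its first word.
    count = int(lines[2].split()[0])
    # Everything after the second blank line is one peer per line.
    peers = [line for line in lines[4:] if line]
    return state, count, peers
```

Since the status code is the real interface, anything automated should prefer it and use the body only for diagnostics.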

There's a first guess at likely production config changes at <lp:~spiv/lp-production-configs/codehost-codehost>: basically we need to allocate ports (and OOPS prefixes) for each instance, just like we do for the lpnet* instances.  We'll also need to arrange for HAProxy or whatever frontend to be deployed too.

Some of this code is fairly generic, and perhaps should be contributed to upstream (Twisted).

We should obviously take care to make sure this works nicely on qastaging before putting this on production!


-- 
https://code.launchpad.net/~spiv/launchpad/haproxy-for-twisted-services/+merge/46996
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~spiv/launchpad/haproxy-for-twisted-services into lp:launchpad.
=== modified file 'daemons/sftp.tac'
--- daemons/sftp.tac	2010-10-20 18:43:29 +0000
+++ daemons/sftp.tac	2011-01-20 23:34:52 +0000
@@ -6,6 +6,7 @@
 # or similar.  Refer to the twistd(1) man page for details.
 
 from twisted.application import service
+from twisted.protocols.policies import TimeoutFactory
 
 from canonical.config import config
 from canonical.launchpad.daemons import readyservice
@@ -14,10 +15,32 @@
     ACCESS_LOG_NAME, get_key_path, LOG_NAME, make_portal, OOPS_CONFIG_SECTION,
     PRIVATE_KEY_FILE, PUBLIC_KEY_FILE)
 from lp.services.sshserver.service import SSHService
+from lp.services.twistedsupport.gracefulshutdown import (
+    ConnTrackingFactoryWrapper, ShutdownCleanlyService, OrderedMultiService,
+    make_web_status_service)
 
 
 # Construct an Application that has the codehosting SSH server.
 application = service.Application('sftponly')
+
+ordered_services = OrderedMultiService()
+ordered_services.setServiceParent(application)
+
+tracked_factories = set()
+
+web_svc = make_web_status_service(
+    config.codehosting.web_status_port, tracked_factories)
+web_svc.setServiceParent(ordered_services)
+
+shutdown_cleanly_svc = ShutdownCleanlyService(tracked_factories)
+shutdown_cleanly_svc.setServiceParent(ordered_services)
+
+def ssh_factory_decorator(factory):
+    f = TimeoutFactory(factory, timeoutPeriod=config.codehosting.idle_timeout)
+    f = ConnTrackingFactoryWrapper(f)
+    tracked_factories.add(f)
+    return f
+
 svc = SSHService(
     portal=make_portal(),
     private_key_path=get_key_path(PRIVATE_KEY_FILE),
@@ -27,9 +50,9 @@
     access_log=ACCESS_LOG_NAME,
     access_log_path=config.codehosting.access_log,
     strport=config.codehosting.port,
-    idle_timeout=config.codehosting.idle_timeout,
+    factory_decorator=ssh_factory_decorator,
     banner=config.codehosting.banner)
-svc.setServiceParent(application)
+svc.setServiceParent(shutdown_cleanly_svc)
 
 # Service that announces when the daemon is ready
 readyservice.ReadyService().setServiceParent(application)

=== modified file 'lib/canonical/config/schema-lazr.conf'
--- lib/canonical/config/schema-lazr.conf	2011-01-05 05:09:24 +0000
+++ lib/canonical/config/schema-lazr.conf	2011-01-20 23:34:52 +0000
@@ -455,6 +455,13 @@
 # generate a diff for a merge proposal (in the UpdatePreviewDiffJob).
 update_preview_diff_ready_timeout: 15
 
+# Web status port
+#
+# An HTTP service that will have status 200 OK when the service is available
+# for more connections, and 503 Service Unavailable when it is in the process
+# of shutting down and so should not receive any more connections.
+web_status_port = tcp:8022
+
 
 [codeimport]
 # Where the Bazaar imports are stored.
@@ -1640,7 +1647,7 @@
 # See [error_reports].
 copy_to_zlog: False
 
-# The codehosting access log location. Information such as connection, SSH
+# The poppy access log location. Information such as connection, SSH
 # login and session start times will be logged here.
 access_log: /tmp/poppy-access.log
 

=== modified file 'lib/lp/codehosting/tests/test_acceptance.py'
--- lib/lp/codehosting/tests/test_acceptance.py	2010-10-21 03:22:06 +0000
+++ lib/lp/codehosting/tests/test_acceptance.py	2011-01-20 23:34:52 +0000
@@ -12,6 +12,7 @@
 import subprocess
 import sys
 import unittest
+import urllib2
 import xmlrpclib
 
 import bzrlib.branch
@@ -685,6 +686,16 @@
         self.assertTrue(
             message in last_line, '%r not in %r' % (message, last_line))
 
+    def test_web_status_available(self):
+        # There is an HTTP service that reports whether the SSH server is
+        # available for new connections.
+        # Munge the config value in strport format into a URL.
+        self.assertEqual('tcp:', config.codehosting.web_status_port[:4])
+        port = int(config.codehosting.web_status_port[4:])
+        web_status_url = 'http://localhost:%d/' % port
+        urllib2.urlopen(web_status_url)
+
+
 
 def make_server_tests(base_suite, servers):
     from lp.codehosting.tests.helpers import (

=== modified file 'lib/lp/services/sshserver/service.py'
--- lib/lp/services/sshserver/service.py	2010-08-20 20:31:18 +0000
+++ lib/lp/services/sshserver/service.py	2011-01-20 23:34:52 +0000
@@ -23,7 +23,6 @@
 from twisted.conch.ssh.keys import Key
 from twisted.conch.ssh.transport import SSHServerTransport
 from twisted.internet import defer
-from twisted.protocols.policies import TimeoutFactory
 from zope.event import notify
 
 from lp.services.sshserver import (
@@ -128,7 +127,7 @@
 
     def __init__(self, portal, private_key_path, public_key_path,
                  oops_configuration, main_log, access_log,
-                 access_log_path, strport='tcp:22', idle_timeout=3600,
+                 access_log_path, strport='tcp:22', factory_decorator=None,
                  banner=None):
         """Construct an SSH service.
 
@@ -145,18 +144,16 @@
         :param access_log_path: The path to the access log file.
         :param strport: The port to run the server on, expressed in Twisted's
             "strports" mini-language. Defaults to 'tcp:22'.
-        :param idle_timeout: The number of seconds to wait before killing a
-            connection that isn't doing anything. Defaults to 3600.
         :param banner: An announcement printed to users when they connect.
             By default, announce nothing.
         """
-        ssh_factory = TimeoutFactory(
-            Factory(
-                portal,
-                private_key=Key.fromFile(private_key_path),
-                public_key=Key.fromFile(public_key_path),
-                banner=banner),
-            timeoutPeriod=idle_timeout)
+        ssh_factory = Factory(
+            portal,
+            private_key=Key.fromFile(private_key_path),
+            public_key=Key.fromFile(public_key_path),
+            banner=banner)
+        if factory_decorator is not None:
+            ssh_factory = factory_decorator(ssh_factory)
         self.service = strports.service(strport, ssh_factory)
         self._oops_configuration = oops_configuration
         self._main_log = main_log

=== added file 'lib/lp/services/twistedsupport/gracefulshutdown.py'
--- lib/lp/services/twistedsupport/gracefulshutdown.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/twistedsupport/gracefulshutdown.py	2011-01-20 23:34:52 +0000
@@ -0,0 +1,111 @@
+# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Utilities for graceful shutdown of Twisted services."""
+
+__metaclass__ = type
+__all__ = [
+    'ConnTrackingFactoryWrapper',
+    'ShutdownCleanlyService',
+    'ServerAvailableResource',
+    'OrderedMultiService',
+    ]
+
+
+from twisted.application import service, strports
+from twisted.protocols.policies import WrappingFactory
+from twisted.internet.defer import (
+    Deferred, gatherResults, maybeDeferred, inlineCallbacks)
+from twisted.web import resource, server
+from zope.interface import implements
+
+
+class ConnTrackingFactoryWrapper(WrappingFactory):
+
+    allConnectionsGone = None
+
+    def isAvailable(self):
+        return self.allConnectionsGone is None
+
+    def stopFactory(self):
+        WrappingFactory.stopFactory(self)
+        self.allConnectionsGone = Deferred()
+        if len(self.protocols) == 0:
+            self.allConnectionsGone.callback(None)
+
+    def unregisterProtocol(self, p):
+        WrappingFactory.unregisterProtocol(self, p)
+        if len(self.protocols) == 0:
+            if self.allConnectionsGone is not None:
+                self.allConnectionsGone.callback(None)
+
+
+class ShutdownCleanlyService(service.MultiService):
+
+    def __init__(self, factories):
+        self.factories = factories
+        service.MultiService.__init__(self)
+
+    def stopService(self):
+        d = maybeDeferred(service.MultiService.stopService, self)
+        return d.addCallback(self._cbServicesStopped)
+
+    def _cbServicesStopped(self, ignored):
+        return gatherResults([f.allConnectionsGone for f in self.factories])
+
+
+class ServerAvailableResource(resource.Resource):
+
+    def __init__(self, tracked_factories):
+        resource.Resource.__init__(self)
+        self.tracked_factories = tracked_factories
+
+    def _render_common(self, request):
+        state = 'available'
+        for tracked in self.tracked_factories:
+            if not tracked.isAvailable():
+                state = 'unavailable'
+        # 200 while accepting new connections; 503 once shutdown has begun.
+        code = 200 if state == 'available' else 503
+        request.setResponseCode(code)
+        request.setHeader('Content-Type', 'text/plain')
+        return state
+
+    def render_GET(self, request):
+        state = self._render_common(request)
+        # Generate a bit of text for humans' benefit.
+        tracked_connections = set()
+        for tracked in self.tracked_factories:
+            tracked_connections.update(tracked.protocols)
+        return '%s\n\n%d connections: \n\n%s\n' % (
+            state, len(tracked_connections),
+            '\n'.join(
+                [str(c.transport.getPeer()) for c in tracked_connections]))
+
+    def render_HEAD(self, request):
+        self._render_common(request)
+        return ''
+
+
+class OrderedMultiService(service.MultiService):
+    """
+    A service that starts services in the order they are attached, and stops
+    them in reverse order (waiting for each to stop before stopping the next).
+    """
+
+    implements(service.IServiceCollection)
+
+    @inlineCallbacks
+    def stopService(self):
+        service.Service.stopService(self) # intentionally skip MultiService.stopService
+        while self.services:
+            svc = self.services.pop()
+            yield maybeDeferred(svc.stopService)
+
+
+def make_web_status_service(strport, tracking_factories):
+    server_available_resource = ServerAvailableResource(tracking_factories)
+    web_root = resource.Resource()
+    web_root.putChild('', server_available_resource)
+    web_factory = server.Site(web_root)
+    return strports.service(strport, web_factory)

=== added file 'lib/lp/services/twistedsupport/tests/test_gracefulshutdown.py'
--- lib/lp/services/twistedsupport/tests/test_gracefulshutdown.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/twistedsupport/tests/test_gracefulshutdown.py	2011-01-20 23:34:52 +0000
@@ -0,0 +1,171 @@
+# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for our graceful daemon shutdown support."""
+
+__metaclass__ = type
+
+from lp.testing import TestCase
+
+from lp.services.twistedsupport import gracefulshutdown
+
+from twisted.application import service
+from twisted.internet.defer import Deferred
+from twisted.internet.protocol import Factory, Protocol
+from twisted.web import http
+
+
+class TestConnTrackingFactoryWrapper(TestCase):
+
+    def test_isAvailable_initial_state(self):
+        ctf = gracefulshutdown.ConnTrackingFactoryWrapper(Factory())
+        self.assertTrue(ctf.isAvailable())
+
+    def test_allConnectionsGone_when_no_connections(self):
+        ctf = gracefulshutdown.ConnTrackingFactoryWrapper(Factory())
+        self.was_fired = False
+        self.assertTrue(ctf.isAvailable())
+        ctf.stopFactory()
+        self.assertFalse(ctf.isAvailable())
+        def cb(ignored):
+            self.was_fired = True
+        ctf.allConnectionsGone.addCallback(cb)
+        self.assertTrue(self.was_fired)
+
+    def test_allConnectionsGone_when_exactly_one_connection(self):
+        ctf = gracefulshutdown.ConnTrackingFactoryWrapper(Factory())
+        # Make one connection
+        p = Protocol()
+        ctf.registerProtocol(p)
+        ctf.stopFactory()
+        self.was_fired = False
+        def cb(ignored):
+            self.was_fired = True
+        ctf.allConnectionsGone.addCallback(cb)
+        self.assertFalse(self.was_fired)
+        ctf.unregisterProtocol(p)
+        self.assertTrue(self.was_fired)
+
+    def test_allConnectionsGone_when_more_than_one_connection(self):
+        ctf = gracefulshutdown.ConnTrackingFactoryWrapper(Factory())
+        # Make two connections
+        p1 = Protocol()
+        p2 = Protocol()
+        ctf.registerProtocol(p1)
+        ctf.registerProtocol(p2)
+        ctf.stopFactory()
+        self.was_fired = False
+        def cb(ignored):
+            self.was_fired = True
+        ctf.allConnectionsGone.addCallback(cb)
+        self.assertFalse(self.was_fired)
+        ctf.unregisterProtocol(p1)
+        self.assertFalse(self.was_fired)
+        ctf.unregisterProtocol(p2)
+        self.assertTrue(self.was_fired)
+
+    def test_unregisterProtocol_before_stopFactory(self):
+        ctf = gracefulshutdown.ConnTrackingFactoryWrapper(Factory())
+        p = Protocol()
+        ctf.registerProtocol(p)
+        ctf.unregisterProtocol(p) # No error raised.
+
+
+class TestServerAvailableResource(TestCase):
+
+    def make_dummy_http_request(self):
+        return http.Request('fake channel', True)
+
+    def test_200_when_available(self):
+        ctf = gracefulshutdown.ConnTrackingFactoryWrapper(Factory())
+        r = gracefulshutdown.ServerAvailableResource([ctf])
+        request = self.make_dummy_http_request()
+        r.render_HEAD(request)
+        self.assertEqual(200, request.code)
+        # GET works too
+        request = self.make_dummy_http_request()
+        r.render_GET(request)
+        self.assertEqual(200, request.code)
+
+    def test_503_after_shutdown_starts(self):
+        ctf = gracefulshutdown.ConnTrackingFactoryWrapper(Factory())
+        r = gracefulshutdown.ServerAvailableResource([ctf])
+        ctf.stopFactory()
+        request = self.make_dummy_http_request()
+        r.render_HEAD(request)
+        self.assertEqual(503, request.code)
+        # GET works too
+        request = self.make_dummy_http_request()
+        r.render_GET(request)
+        self.assertEqual(503, request.code)
+
+
+class TestService(service.Service):
+
+    def __init__(self, name, call_log):
+        self.setName(name)
+        self._call_log = call_log
+
+    def startService(self):
+        self._call_log.append(('startService', self.name))
+
+    def stopService(self):
+        self._call_log.append(('stopService', self.name))
+
+
+class ServiceWithAsyncStop(service.Service):
+    def __init__(self):
+        self.stop_called = False
+        self.stopDeferred = Deferred()
+    def stopService(self):
+        self.stop_called = True
+        return self.stopDeferred
+
+
+class TestOrderedMultiService(TestCase):
+
+    def test_startService_starts_services_in_the_order_they_were_added(self):
+        oms = gracefulshutdown.OrderedMultiService()
+        call_log = []
+        service1 = TestService('svc one', call_log)
+        service1.setServiceParent(oms)
+        service2 = TestService('svc two', call_log)
+        service2.setServiceParent(oms)
+        oms.startService()
+        self.assertEqual(
+            [('startService', 'svc one'), ('startService', 'svc two')],
+            call_log)
+        
+    def test_stopService_stops_in_reverse_order(self):
+        oms = gracefulshutdown.OrderedMultiService()
+        call_log = []
+        service1 = TestService('svc one', call_log)
+        service1.setServiceParent(oms)
+        service2 = TestService('svc two', call_log)
+        service2.setServiceParent(oms)
+        oms.startService()
+        del call_log[:]
+        d = oms.stopService()
+        self.assertEqual(
+            [('stopService', 'svc two'), ('stopService', 'svc one')],
+            call_log)
+        
+    def test_services_are_stopped_in_series_not_parallel(self):
+        oms = gracefulshutdown.OrderedMultiService()
+        service1 = ServiceWithAsyncStop()
+        service1.setServiceParent(oms)
+        service2 = ServiceWithAsyncStop()
+        service2.setServiceParent(oms)
+        oms.startService()
+        self.all_stopped = False
+        def cb_all_stopped(ignored):
+            self.all_stopped = True
+        oms.stopService().addCallback(cb_all_stopped)
+        self.assertFalse(self.all_stopped)
+        self.assertFalse(service1.stop_called)
+        self.assertTrue(service2.stop_called)
+        service2.stopDeferred.callback(None)
+        self.assertFalse(self.all_stopped)
+        self.assertTrue(service1.stop_called)
+        service1.stopDeferred.callback(None)
+        self.assertTrue(self.all_stopped)

