← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/buildmaster-twisted-agent into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/buildmaster-twisted-agent into lp:launchpad.

Commit message:
Convert BuilderSlave to twisted.web.client.Agent, causing it to use a connection pool rather than trying to download everything at once.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1584744 in Launchpad itself: "buildd-manager dispatches all downloads at once and can hit EMFILE"
  https://bugs.launchpad.net/launchpad/+bug/1584744

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/buildmaster-twisted-agent/+merge/295593

Convert BuilderSlave to twisted.web.client.Agent, causing it to use a connection pool rather than trying to download everything at once.  The old approach could overflow open-file limits.

We'll need to give this a good workout on dogfood.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/buildmaster-twisted-agent into lp:launchpad.
=== modified file 'lib/lp/buildmaster/interactor.py'
--- lib/lp/buildmaster/interactor.py	2015-02-17 05:38:37 +0000
+++ lib/lp/buildmaster/interactor.py	2016-05-24 14:41:49 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2014 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -13,9 +13,17 @@
 from urlparse import urlparse
 
 import transaction
-from twisted.internet import defer
+from twisted.internet import (
+    defer,
+    reactor as default_reactor,
+    )
+from twisted.internet.protocol import Protocol
 from twisted.web import xmlrpc
-from twisted.web.client import downloadPage
+from twisted.web.client import (
+    Agent,
+    HTTPConnectionPool,
+    ResponseDone,
+    )
 from zope.security.proxy import (
     isinstance as zope_isinstance,
     removeSecurityProxy,
@@ -46,6 +54,104 @@
     noisy = False


+class FileWritingProtocol(Protocol):
+    """A protocol that saves data to a file.
+
+    `finished` fires exactly once: with None after the complete body has
+    been written to `filename` and the file closed, or with a failure if
+    the file cannot be opened, written or closed, or if the body was not
+    delivered completely.
+    """
+
+    def __init__(self, finished, filename):
+        # NOTE(review): downloadPage also accepted a file-like object;
+        # this protocol only takes a filename -- confirm that no caller
+        # of getFile passes a file object.
+        self.finished = finished
+        self.filename = filename
+        self.file = None
+        # Set once `finished` has fired with a failure, so that later body
+        # data and connectionLost neither reopen the file (truncating it)
+        # nor fire `finished` a second time.
+        self.failed = False
+
+    def _closeQuietly(self):
+        # Best-effort close for error paths: a secondary close error must
+        # not mask the failure being reported.
+        if self.file is not None:
+            try:
+                self.file.close()
+            except IOError:
+                pass
+            self.file = None
+
+    def dataReceived(self, data):
+        if self.failed:
+            return
+        try:
+            if self.file is None:
+                self.file = open(self.filename, "wb")
+            self.file.write(data)
+        except IOError:
+            self._closeQuietly()
+            self.failed = True
+            # With no argument, errback() wraps the IOError being handled.
+            self.finished.errback()
+
+    def connectionLost(self, reason):
+        if self.failed:
+            # `finished` has already fired from dataReceived.
+            return
+        if not reason.check(ResponseDone):
+            # The body was cut short: report why, ignoring any error from
+            # closing the partial file.
+            self._closeQuietly()
+            self.failed = True
+            self.finished.errback(reason)
+            return
+        try:
+            if self.file is None:
+                # The body was empty; still create the (empty) file.
+                self.file = open(self.filename, "wb")
+            self.file.close()
+        except IOError:
+            self.finished.errback()
+        else:
+            self.finished.callback(None)
+        finally:
+            self.file = None
+
+
+_default_pool = None
+
+
+def default_pool(reactor=None):
+    """Return an `HTTPConnectionPool` for downloads from builder slaves.
+
+    One pool is created lazily and shared for the global reactor.  A pool
+    is bound to the reactor it was created with, so any other reactor
+    (e.g. one supplied by a test) gets a fresh, unshared pool instead.
+    """
+    global _default_pool
+    if reactor is None:
+        reactor = default_reactor
+    if reactor is default_reactor and _default_pool is not None:
+        return _default_pool
+    # Circular import.
+    from lp.buildmaster.manager import SlaveScanner
+    pool = HTTPConnectionPool(reactor)
+    # Short cached connection timeout to avoid potential weirdness with
+    # virtual builders that reboot frequently.
+    pool.cachedConnectionTimeout = SlaveScanner.SCAN_INTERVAL
+    # NOTE(review): HTTPConnectionPool only caps the idle persistent
+    # connections it caches per host (maxPersistentPerHost); it does not
+    # limit concurrent connections, so on its own it may not prevent
+    # EMFILE if every download is started at once -- confirm.
+    if reactor is default_reactor:
+        _default_pool = pool
+    return pool
+
+
 class BuilderSlave(object):
     """Add in a few useful methods for the XMLRPC slave.
 
@@ -58,7 +116,7 @@
     # many false positives in your test run and will most likely break
     # production.
 
-    def __init__(self, proxy, builder_url, vm_host, timeout, reactor):
+    def __init__(self, proxy, builder_url, vm_host, timeout, reactor, pool):
         """Initialize a BuilderSlave.
 
         :param proxy: An XML-RPC proxy, implementing 'callRemote'. It must
@@ -71,11 +129,16 @@
         self._file_cache_url = urlappend(builder_url, 'filecache')
         self._server = proxy
         self.timeout = timeout
+        if reactor is None:
+            reactor = default_reactor
         self.reactor = reactor
+        if pool is None:
+            pool = default_pool(reactor=reactor)
+        self.pool = pool
 
     @classmethod
     def makeBuilderSlave(cls, builder_url, vm_host, timeout, reactor=None,
-                         proxy=None):
+                         proxy=None, pool=None):
         """Create and return a `BuilderSlave`.
 
         :param builder_url: The URL of the slave buildd machine,
@@ -84,6 +147,7 @@
             here.
         :param reactor: Used by tests to override the Twisted reactor.
         :param proxy: Used By tests to override the xmlrpc.Proxy.
+        :param pool: Used by tests to override the HTTPConnectionPool.
         """
         rpc_url = urlappend(builder_url.encode('utf-8'), 'rpc')
         if proxy is None:
@@ -92,7 +156,7 @@
             server_proxy.queryFactory = QuietQueryFactory
         else:
             server_proxy = proxy
-        return cls(server_proxy, builder_url, vm_host, timeout, reactor)
+        return cls(server_proxy, builder_url, vm_host, timeout, reactor, pool)
 
     def _with_timeout(self, d, timeout=None):
         return cancel_on_timeout(d, timeout or self.timeout, self.reactor)
@@ -138,10 +202,15 @@
             errback with the error string.
         """
         file_url = urlappend(self._file_cache_url, sha_sum).encode('utf8')
-        # If desired we can pass a param "timeout" here but let's leave
-        # it at the default value if it becomes obvious we need to
-        # change it.
-        return downloadPage(file_url, file_to_write, followRedirect=0)
+        d = Agent(self.reactor, pool=self.pool).request("GET", file_url)
+
+        def got_response(response):
+            finished = defer.Deferred()
+            response.deliverBody(FileWritingProtocol(finished, file_to_write))
+            return finished
+
+        d.addCallback(got_response)
+        return d
 
     def getFiles(self, files):
         """Fetch many files from the builder.

=== modified file 'lib/lp/buildmaster/tests/test_interactor.py'
--- lib/lp/buildmaster/tests/test_interactor.py	2015-11-04 14:30:09 +0000
+++ lib/lp/buildmaster/tests/test_interactor.py	2016-05-24 14:41:49 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Test BuilderInteractor features."""
@@ -789,6 +789,7 @@
             for sha1, local_file in files:
                 with open(local_file) as f:
                     self.assertEqual(content_map[sha1], f.read())
+            return slave.pool.closeCachedConnections()
 
         def finished_uploading(ignored):
             d = slave.getFiles(files)


Follow ups