← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:buildmaster-fetch-in-thread into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:buildmaster-fetch-in-thread into launchpad:master.

Commit message:
Rewrite buildd-manager file fetching using threads

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1866868 in Launchpad itself: "buildd-manager frequently gets stuck and stops gathering files from builders"
  https://bugs.launchpad.net/launchpad/+bug/1866868

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/381668

In production, buildd-manager's single thread often seems to fail to keep up with incoming data (Canonical employees can look at https://portal.admin.canonical.com/C125027).  We speculate that this is because the sketchy use of blocking database calls in the reactor thread has finally caught up with us, so let's try using straightforward `requests` in a thread pool instead.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:buildmaster-fetch-in-thread into launchpad:master.
diff --git a/lib/lp/buildmaster/interactor.py b/lib/lp/buildmaster/interactor.py
index 4f809c4..e711862 100644
--- a/lib/lp/buildmaster/interactor.py
+++ b/lib/lp/buildmaster/interactor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -11,21 +11,22 @@ __all__ = [
 from collections import namedtuple
 import logging
 import os.path
+import sys
 import tempfile
+import traceback
 
+from requests import Session
+from requests_toolbelt.downloadutils import stream
+import six
 from six.moves.urllib.parse import urlparse
 import transaction
 from twisted.internet import (
     defer,
     reactor as default_reactor,
+    threads,
     )
-from twisted.internet.protocol import Protocol
+from twisted.python.threadpool import ThreadPool
 from twisted.web import xmlrpc
-from twisted.web.client import (
-    Agent,
-    HTTPConnectionPool,
-    ResponseDone,
-    )
 from zope.security.proxy import (
     isinstance as zope_isinstance,
     removeSecurityProxy,
@@ -55,94 +56,35 @@ class QuietQueryFactory(xmlrpc._QueryFactory):
     noisy = False
 
 
-class FileWritingProtocol(Protocol):
-    """A protocol that saves data to a file."""
-
-    def __init__(self, finished, file_to_write):
-        self.finished = finished
-        if isinstance(file_to_write, (bytes, unicode)):
-            self.filename = file_to_write
-            self.file = tempfile.NamedTemporaryFile(
-                mode="wb", prefix=os.path.basename(self.filename) + "_",
-                dir=os.path.dirname(self.filename), delete=False)
-        else:
-            self.filename = None
-            self.file = file_to_write
-
-    def dataReceived(self, data):
-        try:
-            self.file.write(data)
-        except IOError:
-            try:
-                self.file.close()
-            except IOError:
-                pass
-            self.file = None
-            self.finished.errback()
-
-    def connectionLost(self, reason):
-        try:
-            if self.file is not None:
-                self.file.close()
-            if self.filename is not None and reason.check(ResponseDone):
-                os.rename(self.file.name, self.filename)
-        except IOError:
-            self.finished.errback()
-        else:
-            if reason.check(ResponseDone):
-                self.finished.callback(None)
-            else:
-                self.finished.errback(reason)
-
-
-class LimitedHTTPConnectionPool(HTTPConnectionPool):
-    """A connection pool with an upper limit on open connections."""
-
-    # XXX cjwatson 2016-05-25: This actually only limits active connections,
-    # and doesn't count idle but open connections towards the limit; this is
-    # because it's very difficult to do the latter with HTTPConnectionPool's
-    # current design.  Users of this pool must therefore expect some
-    # additional file descriptors to be open for idle connections.
-
-    def __init__(self, reactor, limit, persistent=True):
-        super(LimitedHTTPConnectionPool, self).__init__(
-            reactor, persistent=persistent)
-        self._semaphore = defer.DeferredSemaphore(limit)
+_default_threadpool = None
+_default_threadpool_shutdown = None
 
-    def getConnection(self, key, endpoint):
-        d = self._semaphore.acquire()
-        d.addCallback(
-            lambda _: super(LimitedHTTPConnectionPool, self).getConnection(
-                key, endpoint))
-        return d
-
-    def _putConnection(self, key, connection):
-        super(LimitedHTTPConnectionPool, self)._putConnection(key, connection)
-        # Only release the semaphore in the next main loop iteration; if we
-        # release it here then the next request may start using this
-        # connection's parser before this request has quite finished with
-        # it.
-        self._reactor.callLater(0, self._semaphore.release)
-
-
-_default_pool = None
 
-
-def default_pool(reactor=None):
-    global _default_pool
+def default_threadpool(reactor=None):
+    global _default_threadpool, _default_threadpool_shutdown
     if reactor is None:
         reactor = default_reactor
-    if _default_pool is None:
-        # Circular import.
-        from lp.buildmaster.manager import SlaveScanner
-        # Short cached connection timeout to avoid potential weirdness with
-        # virtual builders that reboot frequently.
-        _default_pool = LimitedHTTPConnectionPool(
-            reactor, config.builddmaster.download_connections)
-        _default_pool.maxPersistentPerHost = (
-            config.builddmaster.idle_download_connections_per_builder)
-        _default_pool.cachedConnectionTimeout = SlaveScanner.SCAN_INTERVAL
-    return _default_pool
+    if _default_threadpool is None:
+        _default_threadpool = ThreadPool(
+            maxthreads=config.builddmaster.download_connections,
+            name=six.ensure_str('buildd-manager-requests'))
+        _default_threadpool.start()
+        shutdown_id = reactor.addSystemEventTrigger(
+            'during', 'shutdown', _default_threadpool.stop)
+        _default_threadpool_shutdown = (reactor, shutdown_id)
+    return _default_threadpool
+
+
+def shut_down_default_threadpool():
+    """Shut down the default threadpool.  Used in test cleanup."""
+    global _default_threadpool, _default_threadpool_shutdown
+    if _default_threadpool is not None:
+        _default_threadpool.stop()
+        _default_threadpool = None
+    if _default_threadpool_shutdown is not None:
+        reactor, shutdown_id = _default_threadpool_shutdown
+        reactor.removeSystemEventTrigger(shutdown_id)
+        _default_threadpool_shutdown = None
 
 
 class BuilderSlave(object):
@@ -157,7 +99,8 @@ class BuilderSlave(object):
     # many false positives in your test run and will most likely break
     # production.
 
-    def __init__(self, proxy, builder_url, vm_host, timeout, reactor, pool):
+    def __init__(self, proxy, builder_url, vm_host, timeout, reactor,
+                 threadpool):
         """Initialize a BuilderSlave.
 
         :param proxy: An XML-RPC proxy, implementing 'callRemote'. It must
@@ -173,13 +116,13 @@ class BuilderSlave(object):
         if reactor is None:
             reactor = default_reactor
         self.reactor = reactor
-        if pool is None:
-            pool = default_pool(reactor=reactor)
-        self.pool = pool
+        if threadpool is None:
+            threadpool = default_threadpool(reactor=reactor)
+        self.threadpool = threadpool
 
     @classmethod
     def makeBuilderSlave(cls, builder_url, vm_host, timeout, reactor=None,
-                         proxy=None, pool=None):
+                         proxy=None, threadpool=None):
         """Create and return a `BuilderSlave`.
 
         :param builder_url: The URL of the slave buildd machine,
@@ -188,7 +131,7 @@ class BuilderSlave(object):
             here.
         :param reactor: Used by tests to override the Twisted reactor.
         :param proxy: Used By tests to override the xmlrpc.Proxy.
-        :param pool: Used by tests to override the HTTPConnectionPool.
+        :param threadpool: Used by tests to override the ThreadPool.
         """
         rpc_url = urlappend(builder_url.encode('utf-8'), 'rpc')
         if proxy is None:
@@ -197,7 +140,8 @@ class BuilderSlave(object):
             server_proxy.queryFactory = QuietQueryFactory
         else:
             server_proxy = proxy
-        return cls(server_proxy, builder_url, vm_host, timeout, reactor, pool)
+        return cls(
+            server_proxy, builder_url, vm_host, timeout, reactor, threadpool)
 
     def _with_timeout(self, d, timeout=None):
         return cancel_on_timeout(d, timeout or self.timeout, self.reactor)
@@ -236,6 +180,7 @@ class BuilderSlave(object):
         """Get the URL for a file on the builder with a given SHA-1."""
         return urlappend(self._file_cache_url, sha1).encode('utf8')
 
+    @defer.inlineCallbacks
     def getFile(self, sha_sum, file_to_write, logger=None):
         """Fetch a file from the builder.
 
@@ -248,26 +193,52 @@ class BuilderSlave(object):
             errback with the error string.
         """
         file_url = self.getURL(sha_sum)
-        d = Agent(self.reactor, pool=self.pool).request("GET", file_url)
-
-        def got_response(response):
-            finished = defer.Deferred()
-            response.deliverBody(FileWritingProtocol(finished, file_to_write))
-            return finished

-        def log_success(result):
-            logger.info("Grabbed %s" % file_url)
-            return result
-
-        def log_failure(failure):
-            logger.info("Failed to grab %s: %s\n%s" % (
-                file_url, failure.getErrorMessage(), failure.getTraceback()))
-            return failure
+        def download():
+            # This runs in a worker thread, so it must not touch the
+            # reactor or the database; blocking I/O is fine here.
+            is_path = isinstance(file_to_write, six.string_types)
+            with Session() as session:
+                session.trust_env = False
+                response = session.get(
+                    file_url, timeout=self.timeout, stream=True)
+                try:
+                    response.raise_for_status()
+                    if is_path:
+                        f = tempfile.NamedTemporaryFile(
+                            mode="wb",
+                            prefix=os.path.basename(file_to_write) + "_",
+                            dir=os.path.dirname(file_to_write), delete=False)
+                    else:
+                        f = file_to_write
+                    try:
+                        stream.stream_response_to_file(response, path=f)
+                    except Exception:
+                        f.close()
+                        # Only remove the temporary file we created; never
+                        # unlink a file object that the caller handed us.
+                        if is_path:
+                            os.unlink(f.name)
+                        raise
+                    else:
+                        f.close()
+                        if is_path:
+                            os.rename(f.name, file_to_write)
+                finally:
+                    # Release the connection even if the download failed.
+                    response.close()

-        d.addCallback(got_response)
-        if logger is not None:
-            d.addCallbacks(log_success, log_failure)
-        return d
+        try:
+            yield threads.deferToThreadPool(
+                self.reactor, self.threadpool, download)
+            if logger is not None:
+                logger.info("Grabbed %s" % file_url)
+        except Exception as e:
+            if logger is not None:
+                logger.info("Failed to grab %s: %s\n%s" % (
+                    file_url, str(e),
+                    "".join(traceback.format_exception(*sys.exc_info()))))
+            raise

     def getFiles(self, files, logger=None):
         """Fetch many files from the builder.
diff --git a/lib/lp/buildmaster/tests/mock_slaves.py b/lib/lp/buildmaster/tests/mock_slaves.py
index 32633ea..441be0b 100644
--- a/lib/lp/buildmaster/tests/mock_slaves.py
+++ b/lib/lp/buildmaster/tests/mock_slaves.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Mock Build objects for tests soyuz buildd-system."""
@@ -313,14 +313,14 @@ class SlaveTestHelpers(fixtures.Fixture):
                 lambda: open(tachandler.logfile, 'r').readlines()))
         return tachandler
 
-    def getClientSlave(self, reactor=None, proxy=None, pool=None):
+    def getClientSlave(self, reactor=None, proxy=None, threadpool=None):
         """Return a `BuilderSlave` for use in testing.
 
         Points to a fixed URL that is also used by `BuilddSlaveTestSetup`.
         """
         return BuilderSlave.makeBuilderSlave(
             self.base_url, 'vmhost', config.builddmaster.socket_timeout,
-            reactor=reactor, proxy=proxy, pool=pool)
+            reactor=reactor, proxy=proxy, threadpool=threadpool)
 
     def makeCacheFile(self, tachandler, filename, contents=b'something'):
         """Make a cache file available on the remote slave.
diff --git a/lib/lp/buildmaster/tests/test_interactor.py b/lib/lp/buildmaster/tests/test_interactor.py
index 8514bb5..5ace1d6 100644
--- a/lib/lp/buildmaster/tests/test_interactor.py
+++ b/lib/lp/buildmaster/tests/test_interactor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Test BuilderInteractor features."""
@@ -16,13 +16,8 @@ import signal
 import tempfile
 
 from lpbuildd.slave import BuilderStatus
-from lpbuildd.tests.harness import BuilddSlaveTestSetup
 from six.moves import xmlrpc_client
-from testtools.matchers import (
-    ContainsAll,
-    HasLength,
-    MatchesDict,
-    )
+from testtools.matchers import ContainsAll
 from testtools.testcase import ExpectedException
 from testtools.twistedsupport import (
     assert_fails_with,
@@ -31,12 +26,10 @@ from testtools.twistedsupport import (
     SynchronousDeferredRunTest,
     )
 import treq
-from twisted.internet import (
-    defer,
-    reactor as default_reactor,
-    )
+from twisted.internet import defer
 from twisted.internet.task import Clock
 from twisted.python.failure import Failure
+from twisted.python.threadpool import ThreadPool
 from zope.security.proxy import removeSecurityProxy
 
 from lp.buildmaster.enums import (
@@ -49,7 +42,7 @@ from lp.buildmaster.interactor import (
     BuilderInteractor,
     BuilderSlave,
     extract_vitals_from_db,
-    LimitedHTTPConnectionPool,
+    shut_down_default_threadpool,
     )
 from lp.buildmaster.interfaces.builder import (
     BuildDaemonIsolationError,
@@ -123,6 +116,10 @@ class TestBuilderInteractor(TestCase):
 
     run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=10)
 
+    def setUp(self):
+        super(TestBuilderInteractor, self).setUp()
+        self.addCleanup(shut_down_default_threadpool)
+
     def test_extractBuildStatus_baseline(self):
         # extractBuildStatus picks the name of the build status out of a
         # dict describing the slave's status.
@@ -474,6 +471,7 @@ class TestSlave(TestCase):
     def setUp(self):
         super(TestSlave, self).setUp()
         self.slave_helper = self.useFixture(SlaveTestHelpers())
+        self.addCleanup(shut_down_default_threadpool)
 
     def test_abort(self):
         slave = self.slave_helper.getClientSlave()
@@ -673,8 +671,11 @@ class TestSlaveTimeouts(TestCase):
         self.slave_helper = self.useFixture(SlaveTestHelpers())
         self.clock = Clock()
         self.proxy = DeadProxy("url")
+        threadpool = ThreadPool()
+        threadpool.start()
+        self.addCleanup(threadpool.stop)
         self.slave = self.slave_helper.getClientSlave(
-            reactor=self.clock, proxy=self.proxy)
+            reactor=self.clock, proxy=self.proxy, threadpool=threadpool)
 
     def assertCancelled(self, d, timeout=None):
         self.clock.advance((timeout or config.builddmaster.socket_timeout) + 1)
@@ -726,7 +727,12 @@ class TestSlaveConnectionTimeouts(TestCase):
         # only the config value should.
         self.pushConfig('builddmaster', socket_timeout=180)
 
-        slave = self.slave_helper.getClientSlave(reactor=self.clock)
+        threadpool = ThreadPool()
+        threadpool.start()
+        self.addCleanup(threadpool.stop)
+
+        slave = self.slave_helper.getClientSlave(
+            reactor=self.clock, threadpool=threadpool)
         d = slave.echo()
         # Advance past the 30 second timeout.  The real reactor will
         # never call connectTCP() since we're not spinning it up.  This
@@ -750,6 +756,7 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
     def setUp(self):
         super(TestSlaveWithLibrarian, self).setUp()
         self.slave_helper = self.useFixture(SlaveTestHelpers())
+        self.addCleanup(shut_down_default_threadpool)
 
     def test_ensurepresent_librarian(self):
         # ensurepresent, when given an http URL for a file will download the
@@ -803,7 +810,6 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
             for sha1, local_file in files:
                 with open(local_file) as f:
                     self.assertEqual(content_map[sha1], f.read())
-            return slave.pool.closeCachedConnections()
 
         def finished_uploading(ignored):
             d = slave.getFiles(files)
@@ -831,10 +837,13 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
     def test_getFiles_open_connections(self):
         # getFiles honours the configured limit on active download
         # connections.
-        pool = LimitedHTTPConnectionPool(default_reactor, 2)
+        threadpool = ThreadPool(minthreads=1, maxthreads=2)
+        threadpool.start()
+        self.addCleanup(threadpool.stop)
+
         contents = [self.factory.getUniqueString() for _ in range(10)]
         self.slave_helper.getServerSlave()
-        slave = self.slave_helper.getClientSlave(pool=pool)
+        slave = self.slave_helper.getClientSlave(threadpool=threadpool)
         files = []
         content_map = {}
 
@@ -844,12 +853,8 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
             for sha1, local_file in files:
                 with open(local_file) as f:
                     self.assertEqual(content_map[sha1], f.read())
-            # Only two connections were used.
-            port = BuilddSlaveTestSetup().daemon_port
-            self.assertThat(
-                slave.pool._connections,
-                MatchesDict({("http", "localhost", port): HasLength(2)}))
-            return slave.pool.closeCachedConnections()
+            # Only two workers were used.
+            self.assertEqual(2, threadpool.workers)
 
         def finished_uploading(ignored):
             d = slave.getFiles(files)
@@ -884,7 +889,6 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
         yield slave.getFiles([(lf.content.sha1, os.fdopen(temp_fd, "w"))])
         with open(temp_name) as f:
             self.assertEqual('content', f.read())
-        yield slave.pool.closeCachedConnections()
 
     @defer.inlineCallbacks
     def test_getFiles_with_empty_file(self):
@@ -898,4 +902,3 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
         yield slave.getFiles([(empty_sha1, temp_name)])
         with open(temp_name) as f:
             self.assertEqual(b'', f.read())
-        yield slave.pool.closeCachedConnections()
diff --git a/lib/lp/buildmaster/tests/test_manager.py b/lib/lp/buildmaster/tests/test_manager.py
index 491d2d5..d814bab 100644
--- a/lib/lp/buildmaster/tests/test_manager.py
+++ b/lib/lp/buildmaster/tests/test_manager.py
@@ -2,7 +2,7 @@
 # NOTE: The first line above must stay first; do not move the copyright
 # notice to the top.  See http://www.python.org/dev/peps/pep-0263/.
 #
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for the renovated slave scanner aka BuilddManager."""
@@ -40,6 +40,7 @@ from lp.buildmaster.interactor import (
     BuilderInteractor,
     BuilderSlave,
     extract_vitals_from_db,
+    shut_down_default_threadpool,
     )
 from lp.buildmaster.interfaces.builder import (
     BuildDaemonIsolationError,
@@ -120,6 +121,7 @@ class TestSlaveScannerScan(TestCaseWithFactory):
         hoary = ubuntu.getSeries('hoary')
         self.test_publisher.setUpDefaultDistroSeries(hoary)
         self.test_publisher.addFakeChroots(db_only=True)
+        self.addCleanup(shut_down_default_threadpool)
 
     def _resetBuilder(self, builder):
         """Reset the given builder and its job."""
@@ -660,6 +662,10 @@ class TestSlaveScannerWithLibrarian(TestCaseWithFactory):
     layer = LaunchpadZopelessLayer
     run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20)
 
+    def setUp(self):
+        super(TestSlaveScannerWithLibrarian, self).setUp()
+        self.addCleanup(shut_down_default_threadpool)
+
     @defer.inlineCallbacks
     def test_end_to_end(self):
         # Test that SlaveScanner.scan() successfully finds, dispatches,
@@ -836,6 +842,10 @@ class TestSlaveScannerWithoutDB(TestCase):
 
     run_tests_with = AsynchronousDeferredRunTest
 
+    def setUp(self):
+        super(TestSlaveScannerWithoutDB, self).setUp()
+        self.addCleanup(shut_down_default_threadpool)
+
     def getScanner(self, builder_factory=None, interactor=None, slave=None,
                    behaviour=None):
         if builder_factory is None:
@@ -1028,6 +1038,7 @@ class TestCancellationChecking(TestCaseWithFactory):
         builder_name = BOB_THE_BUILDER_NAME
         self.builder = getUtility(IBuilderSet)[builder_name]
         self.builder.virtualized = True
+        self.addCleanup(shut_down_default_threadpool)
 
     @property
     def vitals(self):

Follow ups