launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24556
[Merge] ~cjwatson/launchpad:buildmaster-fetch-in-thread into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:buildmaster-fetch-in-thread into launchpad:master.
Commit message:
Rewrite buildd-manager file fetching using threads
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #1866868 in Launchpad itself: "buildd-manager frequently gets stuck and stops gathering files from builders"
https://bugs.launchpad.net/launchpad/+bug/1866868
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/381668
In production, buildd-manager's single thread often seems to fail to keep up with incoming data (Canonical employees can look at https://portal.admin.canonical.com/C125027). We speculate that the sketchy use of blocking database calls in the reactor thread has finally caught up with us, so let's try using straightforward `requests` in a thread pool instead.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:buildmaster-fetch-in-thread into launchpad:master.
diff --git a/lib/lp/buildmaster/interactor.py b/lib/lp/buildmaster/interactor.py
index 4f809c4..e711862 100644
--- a/lib/lp/buildmaster/interactor.py
+++ b/lib/lp/buildmaster/interactor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
__metaclass__ = type
@@ -11,21 +11,22 @@ __all__ = [
from collections import namedtuple
import logging
import os.path
+import sys
import tempfile
+import traceback
+from requests import Session
+from requests_toolbelt.downloadutils import stream
+import six
from six.moves.urllib.parse import urlparse
import transaction
from twisted.internet import (
defer,
reactor as default_reactor,
+ threads,
)
-from twisted.internet.protocol import Protocol
+from twisted.python.threadpool import ThreadPool
from twisted.web import xmlrpc
-from twisted.web.client import (
- Agent,
- HTTPConnectionPool,
- ResponseDone,
- )
from zope.security.proxy import (
isinstance as zope_isinstance,
removeSecurityProxy,
@@ -55,94 +56,35 @@ class QuietQueryFactory(xmlrpc._QueryFactory):
noisy = False
-class FileWritingProtocol(Protocol):
- """A protocol that saves data to a file."""
-
- def __init__(self, finished, file_to_write):
- self.finished = finished
- if isinstance(file_to_write, (bytes, unicode)):
- self.filename = file_to_write
- self.file = tempfile.NamedTemporaryFile(
- mode="wb", prefix=os.path.basename(self.filename) + "_",
- dir=os.path.dirname(self.filename), delete=False)
- else:
- self.filename = None
- self.file = file_to_write
-
- def dataReceived(self, data):
- try:
- self.file.write(data)
- except IOError:
- try:
- self.file.close()
- except IOError:
- pass
- self.file = None
- self.finished.errback()
-
- def connectionLost(self, reason):
- try:
- if self.file is not None:
- self.file.close()
- if self.filename is not None and reason.check(ResponseDone):
- os.rename(self.file.name, self.filename)
- except IOError:
- self.finished.errback()
- else:
- if reason.check(ResponseDone):
- self.finished.callback(None)
- else:
- self.finished.errback(reason)
-
-
-class LimitedHTTPConnectionPool(HTTPConnectionPool):
- """A connection pool with an upper limit on open connections."""
-
- # XXX cjwatson 2016-05-25: This actually only limits active connections,
- # and doesn't count idle but open connections towards the limit; this is
- # because it's very difficult to do the latter with HTTPConnectionPool's
- # current design. Users of this pool must therefore expect some
- # additional file descriptors to be open for idle connections.
-
- def __init__(self, reactor, limit, persistent=True):
- super(LimitedHTTPConnectionPool, self).__init__(
- reactor, persistent=persistent)
- self._semaphore = defer.DeferredSemaphore(limit)
+_default_threadpool = None
+_default_threadpool_shutdown = None
- def getConnection(self, key, endpoint):
- d = self._semaphore.acquire()
- d.addCallback(
- lambda _: super(LimitedHTTPConnectionPool, self).getConnection(
- key, endpoint))
- return d
-
- def _putConnection(self, key, connection):
- super(LimitedHTTPConnectionPool, self)._putConnection(key, connection)
- # Only release the semaphore in the next main loop iteration; if we
- # release it here then the next request may start using this
- # connection's parser before this request has quite finished with
- # it.
- self._reactor.callLater(0, self._semaphore.release)
-
-
-_default_pool = None
-
-def default_pool(reactor=None):
- global _default_pool
+def default_threadpool(reactor=None):
+ global _default_threadpool, _default_threadpool_shutdown
if reactor is None:
reactor = default_reactor
- if _default_pool is None:
- # Circular import.
- from lp.buildmaster.manager import SlaveScanner
- # Short cached connection timeout to avoid potential weirdness with
- # virtual builders that reboot frequently.
- _default_pool = LimitedHTTPConnectionPool(
- reactor, config.builddmaster.download_connections)
- _default_pool.maxPersistentPerHost = (
- config.builddmaster.idle_download_connections_per_builder)
- _default_pool.cachedConnectionTimeout = SlaveScanner.SCAN_INTERVAL
- return _default_pool
+ if _default_threadpool is None:
+ _default_threadpool = ThreadPool(
+ maxthreads=config.builddmaster.download_connections,
+ name=six.ensure_str('buildd-manager-requests'))
+ _default_threadpool.start()
+ shutdown_id = reactor.addSystemEventTrigger(
+ 'during', 'shutdown', _default_threadpool.stop)
+ _default_threadpool_shutdown = (reactor, shutdown_id)
+ return _default_threadpool
+
+
+def shut_down_default_threadpool():
+ """Shut down the default threadpool. Used in test cleanup."""
+ global _default_threadpool, _default_threadpool_shutdown
+ if _default_threadpool is not None:
+ _default_threadpool.stop()
+ _default_threadpool = None
+ if _default_threadpool_shutdown is not None:
+ reactor, shutdown_id = _default_threadpool_shutdown
+ reactor.removeSystemEventTrigger(shutdown_id)
+ _default_threadpool_shutdown = None
class BuilderSlave(object):
@@ -157,7 +99,8 @@ class BuilderSlave(object):
# many false positives in your test run and will most likely break
# production.
- def __init__(self, proxy, builder_url, vm_host, timeout, reactor, pool):
+ def __init__(self, proxy, builder_url, vm_host, timeout, reactor,
+ threadpool):
"""Initialize a BuilderSlave.
:param proxy: An XML-RPC proxy, implementing 'callRemote'. It must
@@ -173,13 +116,13 @@ class BuilderSlave(object):
if reactor is None:
reactor = default_reactor
self.reactor = reactor
- if pool is None:
- pool = default_pool(reactor=reactor)
- self.pool = pool
+ if threadpool is None:
+ threadpool = default_threadpool(reactor=reactor)
+ self.threadpool = threadpool
@classmethod
def makeBuilderSlave(cls, builder_url, vm_host, timeout, reactor=None,
- proxy=None, pool=None):
+ proxy=None, threadpool=None):
"""Create and return a `BuilderSlave`.
:param builder_url: The URL of the slave buildd machine,
@@ -188,7 +131,7 @@ class BuilderSlave(object):
here.
:param reactor: Used by tests to override the Twisted reactor.
:param proxy: Used By tests to override the xmlrpc.Proxy.
- :param pool: Used by tests to override the HTTPConnectionPool.
+ :param threadpool: Used by tests to override the ThreadPool.
"""
rpc_url = urlappend(builder_url.encode('utf-8'), 'rpc')
if proxy is None:
@@ -197,7 +140,8 @@ class BuilderSlave(object):
server_proxy.queryFactory = QuietQueryFactory
else:
server_proxy = proxy
- return cls(server_proxy, builder_url, vm_host, timeout, reactor, pool)
+ return cls(
+ server_proxy, builder_url, vm_host, timeout, reactor, threadpool)
def _with_timeout(self, d, timeout=None):
return cancel_on_timeout(d, timeout or self.timeout, self.reactor)
@@ -236,6 +180,7 @@ class BuilderSlave(object):
"""Get the URL for a file on the builder with a given SHA-1."""
return urlappend(self._file_cache_url, sha1).encode('utf8')
+ @defer.inlineCallbacks
def getFile(self, sha_sum, file_to_write, logger=None):
"""Fetch a file from the builder.
@@ -248,26 +193,42 @@ class BuilderSlave(object):
errback with the error string.
"""
file_url = self.getURL(sha_sum)
- d = Agent(self.reactor, pool=self.pool).request("GET", file_url)
-
- def got_response(response):
- finished = defer.Deferred()
- response.deliverBody(FileWritingProtocol(finished, file_to_write))
- return finished
- def log_success(result):
- logger.info("Grabbed %s" % file_url)
- return result
-
- def log_failure(failure):
- logger.info("Failed to grab %s: %s\n%s" % (
- file_url, failure.getErrorMessage(), failure.getTraceback()))
- return failure
+ def download():
+ session = Session()
+ session.trust_env = False
+ response = session.get(file_url, timeout=self.timeout, stream=True)
+ response.raise_for_status()
+ if isinstance(file_to_write, six.string_types):
+ f = tempfile.NamedTemporaryFile(
+ mode="wb", prefix=os.path.basename(file_to_write) + "_",
+ dir=os.path.dirname(file_to_write), delete=False)
+ else:
+ f = file_to_write
+ try:
+ stream.stream_response_to_file(response, path=f)
+ except Exception:
+ f.close()
+ os.unlink(f.name)
+ raise
+ else:
+ f.close()
+ if isinstance(file_to_write, six.string_types):
+ os.rename(f.name, file_to_write)
- d.addCallback(got_response)
- if logger is not None:
- d.addCallbacks(log_success, log_failure)
- return d
+ try:
+ session = Session()
+ session.trust_env = False
+ yield threads.deferToThreadPool(
+ self.reactor, self.threadpool, download)
+ if logger is not None:
+ logger.info("Grabbed %s" % file_url)
+ except Exception as e:
+ if logger is not None:
+ logger.info("Failed to grab %s: %s\n%s" % (
+ file_url, str(e),
+ " ".join(traceback.format_exception(*sys.exc_info()))))
+ raise
def getFiles(self, files, logger=None):
"""Fetch many files from the builder.
diff --git a/lib/lp/buildmaster/tests/mock_slaves.py b/lib/lp/buildmaster/tests/mock_slaves.py
index 32633ea..441be0b 100644
--- a/lib/lp/buildmaster/tests/mock_slaves.py
+++ b/lib/lp/buildmaster/tests/mock_slaves.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Mock Build objects for tests soyuz buildd-system."""
@@ -313,14 +313,14 @@ class SlaveTestHelpers(fixtures.Fixture):
lambda: open(tachandler.logfile, 'r').readlines()))
return tachandler
- def getClientSlave(self, reactor=None, proxy=None, pool=None):
+ def getClientSlave(self, reactor=None, proxy=None, threadpool=None):
"""Return a `BuilderSlave` for use in testing.
Points to a fixed URL that is also used by `BuilddSlaveTestSetup`.
"""
return BuilderSlave.makeBuilderSlave(
self.base_url, 'vmhost', config.builddmaster.socket_timeout,
- reactor=reactor, proxy=proxy, pool=pool)
+ reactor=reactor, proxy=proxy, threadpool=threadpool)
def makeCacheFile(self, tachandler, filename, contents=b'something'):
"""Make a cache file available on the remote slave.
diff --git a/lib/lp/buildmaster/tests/test_interactor.py b/lib/lp/buildmaster/tests/test_interactor.py
index 8514bb5..5ace1d6 100644
--- a/lib/lp/buildmaster/tests/test_interactor.py
+++ b/lib/lp/buildmaster/tests/test_interactor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Test BuilderInteractor features."""
@@ -16,13 +16,8 @@ import signal
import tempfile
from lpbuildd.slave import BuilderStatus
-from lpbuildd.tests.harness import BuilddSlaveTestSetup
from six.moves import xmlrpc_client
-from testtools.matchers import (
- ContainsAll,
- HasLength,
- MatchesDict,
- )
+from testtools.matchers import ContainsAll
from testtools.testcase import ExpectedException
from testtools.twistedsupport import (
assert_fails_with,
@@ -31,12 +26,10 @@ from testtools.twistedsupport import (
SynchronousDeferredRunTest,
)
import treq
-from twisted.internet import (
- defer,
- reactor as default_reactor,
- )
+from twisted.internet import defer
from twisted.internet.task import Clock
from twisted.python.failure import Failure
+from twisted.python.threadpool import ThreadPool
from zope.security.proxy import removeSecurityProxy
from lp.buildmaster.enums import (
@@ -49,7 +42,7 @@ from lp.buildmaster.interactor import (
BuilderInteractor,
BuilderSlave,
extract_vitals_from_db,
- LimitedHTTPConnectionPool,
+ shut_down_default_threadpool,
)
from lp.buildmaster.interfaces.builder import (
BuildDaemonIsolationError,
@@ -123,6 +116,10 @@ class TestBuilderInteractor(TestCase):
run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=10)
+ def setUp(self):
+ super(TestBuilderInteractor, self).setUp()
+ self.addCleanup(shut_down_default_threadpool)
+
def test_extractBuildStatus_baseline(self):
# extractBuildStatus picks the name of the build status out of a
# dict describing the slave's status.
@@ -474,6 +471,7 @@ class TestSlave(TestCase):
def setUp(self):
super(TestSlave, self).setUp()
self.slave_helper = self.useFixture(SlaveTestHelpers())
+ self.addCleanup(shut_down_default_threadpool)
def test_abort(self):
slave = self.slave_helper.getClientSlave()
@@ -673,8 +671,11 @@ class TestSlaveTimeouts(TestCase):
self.slave_helper = self.useFixture(SlaveTestHelpers())
self.clock = Clock()
self.proxy = DeadProxy("url")
+ threadpool = ThreadPool()
+ threadpool.start()
+ self.addCleanup(threadpool.stop)
self.slave = self.slave_helper.getClientSlave(
- reactor=self.clock, proxy=self.proxy)
+ reactor=self.clock, proxy=self.proxy, threadpool=threadpool)
def assertCancelled(self, d, timeout=None):
self.clock.advance((timeout or config.builddmaster.socket_timeout) + 1)
@@ -726,7 +727,12 @@ class TestSlaveConnectionTimeouts(TestCase):
# only the config value should.
self.pushConfig('builddmaster', socket_timeout=180)
- slave = self.slave_helper.getClientSlave(reactor=self.clock)
+ threadpool = ThreadPool()
+ threadpool.start()
+ self.addCleanup(threadpool.stop)
+
+ slave = self.slave_helper.getClientSlave(
+ reactor=self.clock, threadpool=threadpool)
d = slave.echo()
# Advance past the 30 second timeout. The real reactor will
# never call connectTCP() since we're not spinning it up. This
@@ -750,6 +756,7 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
def setUp(self):
super(TestSlaveWithLibrarian, self).setUp()
self.slave_helper = self.useFixture(SlaveTestHelpers())
+ self.addCleanup(shut_down_default_threadpool)
def test_ensurepresent_librarian(self):
# ensurepresent, when given an http URL for a file will download the
@@ -803,7 +810,6 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
for sha1, local_file in files:
with open(local_file) as f:
self.assertEqual(content_map[sha1], f.read())
- return slave.pool.closeCachedConnections()
def finished_uploading(ignored):
d = slave.getFiles(files)
@@ -831,10 +837,13 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
def test_getFiles_open_connections(self):
# getFiles honours the configured limit on active download
# connections.
- pool = LimitedHTTPConnectionPool(default_reactor, 2)
+ threadpool = ThreadPool(minthreads=1, maxthreads=2)
+ threadpool.start()
+ self.addCleanup(threadpool.stop)
+
contents = [self.factory.getUniqueString() for _ in range(10)]
self.slave_helper.getServerSlave()
- slave = self.slave_helper.getClientSlave(pool=pool)
+ slave = self.slave_helper.getClientSlave(threadpool=threadpool)
files = []
content_map = {}
@@ -844,12 +853,8 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
for sha1, local_file in files:
with open(local_file) as f:
self.assertEqual(content_map[sha1], f.read())
- # Only two connections were used.
- port = BuilddSlaveTestSetup().daemon_port
- self.assertThat(
- slave.pool._connections,
- MatchesDict({("http", "localhost", port): HasLength(2)}))
- return slave.pool.closeCachedConnections()
+ # Only two workers were used.
+ self.assertEqual(2, threadpool.workers)
def finished_uploading(ignored):
d = slave.getFiles(files)
@@ -884,7 +889,6 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
yield slave.getFiles([(lf.content.sha1, os.fdopen(temp_fd, "w"))])
with open(temp_name) as f:
self.assertEqual('content', f.read())
- yield slave.pool.closeCachedConnections()
@defer.inlineCallbacks
def test_getFiles_with_empty_file(self):
@@ -898,4 +902,3 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
yield slave.getFiles([(empty_sha1, temp_name)])
with open(temp_name) as f:
self.assertEqual(b'', f.read())
- yield slave.pool.closeCachedConnections()
diff --git a/lib/lp/buildmaster/tests/test_manager.py b/lib/lp/buildmaster/tests/test_manager.py
index 491d2d5..d814bab 100644
--- a/lib/lp/buildmaster/tests/test_manager.py
+++ b/lib/lp/buildmaster/tests/test_manager.py
@@ -2,7 +2,7 @@
# NOTE: The first line above must stay first; do not move the copyright
# notice to the top. See http://www.python.org/dev/peps/pep-0263/.
#
-# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for the renovated slave scanner aka BuilddManager."""
@@ -40,6 +40,7 @@ from lp.buildmaster.interactor import (
BuilderInteractor,
BuilderSlave,
extract_vitals_from_db,
+ shut_down_default_threadpool,
)
from lp.buildmaster.interfaces.builder import (
BuildDaemonIsolationError,
@@ -120,6 +121,7 @@ class TestSlaveScannerScan(TestCaseWithFactory):
hoary = ubuntu.getSeries('hoary')
self.test_publisher.setUpDefaultDistroSeries(hoary)
self.test_publisher.addFakeChroots(db_only=True)
+ self.addCleanup(shut_down_default_threadpool)
def _resetBuilder(self, builder):
"""Reset the given builder and its job."""
@@ -660,6 +662,10 @@ class TestSlaveScannerWithLibrarian(TestCaseWithFactory):
layer = LaunchpadZopelessLayer
run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20)
+ def setUp(self):
+ super(TestSlaveScannerWithLibrarian, self).setUp()
+ self.addCleanup(shut_down_default_threadpool)
+
@defer.inlineCallbacks
def test_end_to_end(self):
# Test that SlaveScanner.scan() successfully finds, dispatches,
@@ -836,6 +842,10 @@ class TestSlaveScannerWithoutDB(TestCase):
run_tests_with = AsynchronousDeferredRunTest
+ def setUp(self):
+ super(TestSlaveScannerWithoutDB, self).setUp()
+ self.addCleanup(shut_down_default_threadpool)
+
def getScanner(self, builder_factory=None, interactor=None, slave=None,
behaviour=None):
if builder_factory is None:
@@ -1028,6 +1038,7 @@ class TestCancellationChecking(TestCaseWithFactory):
builder_name = BOB_THE_BUILDER_NAME
self.builder = getUtility(IBuilderSet)[builder_name]
self.builder.virtualized = True
+ self.addCleanup(shut_down_default_threadpool)
@property
def vitals(self):
Follow ups