← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:revert-buildmaster-fetch-in-thread into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:revert-buildmaster-fetch-in-thread into launchpad:master.

Commit message:
Revert rewrite of buildd-manager file fetching

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/383979

This rewrite resulted in a very large number of CancelledError failures, which on inspection with tcpdump seemed to be because buildd-manager was taking much too long to deal with TCP packets sent by builders.  The exact cause is unclear, but I think perhaps moving work off the reactor thread meant that proportionally more of the reactor thread was taken up with doing synchronous database work, and so counterintuitively it exacerbated the problem.  My secondary plan is to see if we can move the database work off the reactor thread instead.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:revert-buildmaster-fetch-in-thread into launchpad:master.
diff --git a/lib/lp/buildmaster/interactor.py b/lib/lp/buildmaster/interactor.py
index 64a54f3..4f809c4 100644
--- a/lib/lp/buildmaster/interactor.py
+++ b/lib/lp/buildmaster/interactor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -6,28 +6,26 @@ __metaclass__ = type
 __all__ = [
     'BuilderInteractor',
     'extract_vitals_from_db',
-    'shut_down_default_threadpool',
     ]
 
 from collections import namedtuple
 import logging
 import os.path
-import sys
 import tempfile
-import traceback
 
-from requests import Session
-from requests_toolbelt.downloadutils import stream
-import six
 from six.moves.urllib.parse import urlparse
 import transaction
 from twisted.internet import (
     defer,
     reactor as default_reactor,
-    threads,
     )
-from twisted.python.threadpool import ThreadPool
+from twisted.internet.protocol import Protocol
 from twisted.web import xmlrpc
+from twisted.web.client import (
+    Agent,
+    HTTPConnectionPool,
+    ResponseDone,
+    )
 from zope.security.proxy import (
     isinstance as zope_isinstance,
     removeSecurityProxy,
@@ -57,35 +55,94 @@ class QuietQueryFactory(xmlrpc._QueryFactory):
     noisy = False
 
 
-_default_threadpool = None
-_default_threadpool_shutdown = None
+class FileWritingProtocol(Protocol):
+    """A protocol that saves data to a file."""
+
+    def __init__(self, finished, file_to_write):
+        self.finished = finished
+        if isinstance(file_to_write, (bytes, unicode)):
+            self.filename = file_to_write
+            self.file = tempfile.NamedTemporaryFile(
+                mode="wb", prefix=os.path.basename(self.filename) + "_",
+                dir=os.path.dirname(self.filename), delete=False)
+        else:
+            self.filename = None
+            self.file = file_to_write
+
+    def dataReceived(self, data):
+        try:
+            self.file.write(data)
+        except IOError:
+            try:
+                self.file.close()
+            except IOError:
+                pass
+            self.file = None
+            self.finished.errback()
+
+    def connectionLost(self, reason):
+        try:
+            if self.file is not None:
+                self.file.close()
+            if self.filename is not None and reason.check(ResponseDone):
+                os.rename(self.file.name, self.filename)
+        except IOError:
+            self.finished.errback()
+        else:
+            if reason.check(ResponseDone):
+                self.finished.callback(None)
+            else:
+                self.finished.errback(reason)
+
+
+class LimitedHTTPConnectionPool(HTTPConnectionPool):
+    """A connection pool with an upper limit on open connections."""
+
+    # XXX cjwatson 2016-05-25: This actually only limits active connections,
+    # and doesn't count idle but open connections towards the limit; this is
+    # because it's very difficult to do the latter with HTTPConnectionPool's
+    # current design.  Users of this pool must therefore expect some
+    # additional file descriptors to be open for idle connections.
+
+    def __init__(self, reactor, limit, persistent=True):
+        super(LimitedHTTPConnectionPool, self).__init__(
+            reactor, persistent=persistent)
+        self._semaphore = defer.DeferredSemaphore(limit)
 
+    def getConnection(self, key, endpoint):
+        d = self._semaphore.acquire()
+        d.addCallback(
+            lambda _: super(LimitedHTTPConnectionPool, self).getConnection(
+                key, endpoint))
+        return d
+
+    def _putConnection(self, key, connection):
+        super(LimitedHTTPConnectionPool, self)._putConnection(key, connection)
+        # Only release the semaphore in the next main loop iteration; if we
+        # release it here then the next request may start using this
+        # connection's parser before this request has quite finished with
+        # it.
+        self._reactor.callLater(0, self._semaphore.release)
+
+
+_default_pool = None
 
-def default_threadpool(reactor=None):
-    global _default_threadpool, _default_threadpool_shutdown
+
+def default_pool(reactor=None):
+    global _default_pool
     if reactor is None:
         reactor = default_reactor
-    if _default_threadpool is None:
-        _default_threadpool = ThreadPool(
-            maxthreads=config.builddmaster.download_connections,
-            name=six.ensure_str('buildd-manager-requests'))
-        _default_threadpool.start()
-        shutdown_id = reactor.addSystemEventTrigger(
-            'during', 'shutdown', _default_threadpool.stop)
-        _default_threadpool_shutdown = (reactor, shutdown_id)
-    return _default_threadpool
-
-
-def shut_down_default_threadpool():
-    """Shut down the default threadpool.  Used in test cleanup."""
-    global _default_threadpool, _default_threadpool_shutdown
-    if _default_threadpool is not None:
-        _default_threadpool.stop()
-        _default_threadpool = None
-    if _default_threadpool_shutdown is not None:
-        reactor, shutdown_id = _default_threadpool_shutdown
-        reactor.removeSystemEventTrigger(shutdown_id)
-        _default_threadpool_shutdown = None
+    if _default_pool is None:
+        # Circular import.
+        from lp.buildmaster.manager import SlaveScanner
+        # Short cached connection timeout to avoid potential weirdness with
+        # virtual builders that reboot frequently.
+        _default_pool = LimitedHTTPConnectionPool(
+            reactor, config.builddmaster.download_connections)
+        _default_pool.maxPersistentPerHost = (
+            config.builddmaster.idle_download_connections_per_builder)
+        _default_pool.cachedConnectionTimeout = SlaveScanner.SCAN_INTERVAL
+    return _default_pool
 
 
 class BuilderSlave(object):
@@ -100,8 +157,7 @@ class BuilderSlave(object):
     # many false positives in your test run and will most likely break
     # production.
 
-    def __init__(self, proxy, builder_url, vm_host, timeout, reactor,
-                 threadpool):
+    def __init__(self, proxy, builder_url, vm_host, timeout, reactor, pool):
         """Initialize a BuilderSlave.
 
         :param proxy: An XML-RPC proxy, implementing 'callRemote'. It must
@@ -117,13 +173,13 @@ class BuilderSlave(object):
         if reactor is None:
             reactor = default_reactor
         self.reactor = reactor
-        if threadpool is None:
-            threadpool = default_threadpool(reactor=reactor)
-        self.threadpool = threadpool
+        if pool is None:
+            pool = default_pool(reactor=reactor)
+        self.pool = pool
 
     @classmethod
     def makeBuilderSlave(cls, builder_url, vm_host, timeout, reactor=None,
-                         proxy=None, threadpool=None):
+                         proxy=None, pool=None):
         """Create and return a `BuilderSlave`.
 
         :param builder_url: The URL of the slave buildd machine,
@@ -132,7 +188,7 @@ class BuilderSlave(object):
             here.
         :param reactor: Used by tests to override the Twisted reactor.
         :param proxy: Used By tests to override the xmlrpc.Proxy.
-        :param threadpool: Used by tests to override the ThreadPool.
+        :param pool: Used by tests to override the HTTPConnectionPool.
         """
         rpc_url = urlappend(builder_url.encode('utf-8'), 'rpc')
         if proxy is None:
@@ -141,8 +197,7 @@ class BuilderSlave(object):
             server_proxy.queryFactory = QuietQueryFactory
         else:
             server_proxy = proxy
-        return cls(
-            server_proxy, builder_url, vm_host, timeout, reactor, threadpool)
+        return cls(server_proxy, builder_url, vm_host, timeout, reactor, pool)
 
     def _with_timeout(self, d, timeout=None):
         return cancel_on_timeout(d, timeout or self.timeout, self.reactor)
@@ -181,7 +236,6 @@ class BuilderSlave(object):
         """Get the URL for a file on the builder with a given SHA-1."""
         return urlappend(self._file_cache_url, sha1).encode('utf8')
 
-    @defer.inlineCallbacks
     def getFile(self, sha_sum, file_to_write, logger=None):
         """Fetch a file from the builder.
 
@@ -194,42 +248,26 @@ class BuilderSlave(object):
             errback with the error string.
         """
         file_url = self.getURL(sha_sum)
+        d = Agent(self.reactor, pool=self.pool).request("GET", file_url)
 
-        def download():
-            session = Session()
-            session.trust_env = False
-            response = session.get(file_url, timeout=self.timeout, stream=True)
-            response.raise_for_status()
-            if isinstance(file_to_write, six.string_types):
-                f = tempfile.NamedTemporaryFile(
-                    mode="wb", prefix=os.path.basename(file_to_write) + "_",
-                    dir=os.path.dirname(file_to_write), delete=False)
-            else:
-                f = file_to_write
-            try:
-                stream.stream_response_to_file(response, path=f)
-            except Exception:
-                f.close()
-                os.unlink(f.name)
-                raise
-            else:
-                f.close()
-                if isinstance(file_to_write, six.string_types):
-                    os.rename(f.name, file_to_write)
+        def got_response(response):
+            finished = defer.Deferred()
+            response.deliverBody(FileWritingProtocol(finished, file_to_write))
+            return finished
 
-        try:
-            session = Session()
-            session.trust_env = False
-            yield threads.deferToThreadPool(
-                self.reactor, self.threadpool, download)
-            if logger is not None:
-                logger.info("Grabbed %s" % file_url)
-        except Exception as e:
-            if logger is not None:
-                logger.info("Failed to grab %s: %s\n%s" % (
-                    file_url, str(e),
-                    " ".join(traceback.format_exception(*sys.exc_info()))))
-            raise
+        def log_success(result):
+            logger.info("Grabbed %s" % file_url)
+            return result
+
+        def log_failure(failure):
+            logger.info("Failed to grab %s: %s\n%s" % (
+                file_url, failure.getErrorMessage(), failure.getTraceback()))
+            return failure
+
+        d.addCallback(got_response)
+        if logger is not None:
+            d.addCallbacks(log_success, log_failure)
+        return d
 
     def getFiles(self, files, logger=None):
         """Fetch many files from the builder.
diff --git a/lib/lp/buildmaster/tests/mock_slaves.py b/lib/lp/buildmaster/tests/mock_slaves.py
index 441be0b..32633ea 100644
--- a/lib/lp/buildmaster/tests/mock_slaves.py
+++ b/lib/lp/buildmaster/tests/mock_slaves.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Mock Build objects for tests soyuz buildd-system."""
@@ -313,14 +313,14 @@ class SlaveTestHelpers(fixtures.Fixture):
                 lambda: open(tachandler.logfile, 'r').readlines()))
         return tachandler
 
-    def getClientSlave(self, reactor=None, proxy=None, threadpool=None):
+    def getClientSlave(self, reactor=None, proxy=None, pool=None):
         """Return a `BuilderSlave` for use in testing.
 
         Points to a fixed URL that is also used by `BuilddSlaveTestSetup`.
         """
         return BuilderSlave.makeBuilderSlave(
             self.base_url, 'vmhost', config.builddmaster.socket_timeout,
-            reactor=reactor, proxy=proxy, threadpool=threadpool)
+            reactor=reactor, proxy=proxy, pool=pool)
 
     def makeCacheFile(self, tachandler, filename, contents=b'something'):
         """Make a cache file available on the remote slave.
diff --git a/lib/lp/buildmaster/tests/test_interactor.py b/lib/lp/buildmaster/tests/test_interactor.py
index 5ace1d6..8514bb5 100644
--- a/lib/lp/buildmaster/tests/test_interactor.py
+++ b/lib/lp/buildmaster/tests/test_interactor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Test BuilderInteractor features."""
@@ -16,8 +16,13 @@ import signal
 import tempfile
 
 from lpbuildd.slave import BuilderStatus
+from lpbuildd.tests.harness import BuilddSlaveTestSetup
 from six.moves import xmlrpc_client
-from testtools.matchers import ContainsAll
+from testtools.matchers import (
+    ContainsAll,
+    HasLength,
+    MatchesDict,
+    )
 from testtools.testcase import ExpectedException
 from testtools.twistedsupport import (
     assert_fails_with,
@@ -26,10 +31,12 @@ from testtools.twistedsupport import (
     SynchronousDeferredRunTest,
     )
 import treq
-from twisted.internet import defer
+from twisted.internet import (
+    defer,
+    reactor as default_reactor,
+    )
 from twisted.internet.task import Clock
 from twisted.python.failure import Failure
-from twisted.python.threadpool import ThreadPool
 from zope.security.proxy import removeSecurityProxy
 
 from lp.buildmaster.enums import (
@@ -42,7 +49,7 @@ from lp.buildmaster.interactor import (
     BuilderInteractor,
     BuilderSlave,
     extract_vitals_from_db,
-    shut_down_default_threadpool,
+    LimitedHTTPConnectionPool,
     )
 from lp.buildmaster.interfaces.builder import (
     BuildDaemonIsolationError,
@@ -116,10 +123,6 @@ class TestBuilderInteractor(TestCase):
 
     run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=10)
 
-    def setUp(self):
-        super(TestBuilderInteractor, self).setUp()
-        self.addCleanup(shut_down_default_threadpool)
-
     def test_extractBuildStatus_baseline(self):
         # extractBuildStatus picks the name of the build status out of a
         # dict describing the slave's status.
@@ -471,7 +474,6 @@ class TestSlave(TestCase):
     def setUp(self):
         super(TestSlave, self).setUp()
         self.slave_helper = self.useFixture(SlaveTestHelpers())
-        self.addCleanup(shut_down_default_threadpool)
 
     def test_abort(self):
         slave = self.slave_helper.getClientSlave()
@@ -671,11 +673,8 @@ class TestSlaveTimeouts(TestCase):
         self.slave_helper = self.useFixture(SlaveTestHelpers())
         self.clock = Clock()
         self.proxy = DeadProxy("url")
-        threadpool = ThreadPool()
-        threadpool.start()
-        self.addCleanup(threadpool.stop)
         self.slave = self.slave_helper.getClientSlave(
-            reactor=self.clock, proxy=self.proxy, threadpool=threadpool)
+            reactor=self.clock, proxy=self.proxy)
 
     def assertCancelled(self, d, timeout=None):
         self.clock.advance((timeout or config.builddmaster.socket_timeout) + 1)
@@ -727,12 +726,7 @@ class TestSlaveConnectionTimeouts(TestCase):
         # only the config value should.
         self.pushConfig('builddmaster', socket_timeout=180)
 
-        threadpool = ThreadPool()
-        threadpool.start()
-        self.addCleanup(threadpool.stop)
-
-        slave = self.slave_helper.getClientSlave(
-            reactor=self.clock, threadpool=threadpool)
+        slave = self.slave_helper.getClientSlave(reactor=self.clock)
         d = slave.echo()
         # Advance past the 30 second timeout.  The real reactor will
         # never call connectTCP() since we're not spinning it up.  This
@@ -756,7 +750,6 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
     def setUp(self):
         super(TestSlaveWithLibrarian, self).setUp()
         self.slave_helper = self.useFixture(SlaveTestHelpers())
-        self.addCleanup(shut_down_default_threadpool)
 
     def test_ensurepresent_librarian(self):
         # ensurepresent, when given an http URL for a file will download the
@@ -810,6 +803,7 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
             for sha1, local_file in files:
                 with open(local_file) as f:
                     self.assertEqual(content_map[sha1], f.read())
+            return slave.pool.closeCachedConnections()
 
         def finished_uploading(ignored):
             d = slave.getFiles(files)
@@ -837,13 +831,10 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
     def test_getFiles_open_connections(self):
         # getFiles honours the configured limit on active download
         # connections.
-        threadpool = ThreadPool(minthreads=1, maxthreads=2)
-        threadpool.start()
-        self.addCleanup(threadpool.stop)
-
+        pool = LimitedHTTPConnectionPool(default_reactor, 2)
         contents = [self.factory.getUniqueString() for _ in range(10)]
         self.slave_helper.getServerSlave()
-        slave = self.slave_helper.getClientSlave(threadpool=threadpool)
+        slave = self.slave_helper.getClientSlave(pool=pool)
         files = []
         content_map = {}
 
@@ -853,8 +844,12 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
             for sha1, local_file in files:
                 with open(local_file) as f:
                     self.assertEqual(content_map[sha1], f.read())
-            # Only two workers were used.
-            self.assertEqual(2, threadpool.workers)
+            # Only two connections were used.
+            port = BuilddSlaveTestSetup().daemon_port
+            self.assertThat(
+                slave.pool._connections,
+                MatchesDict({("http", "localhost", port): HasLength(2)}))
+            return slave.pool.closeCachedConnections()
 
         def finished_uploading(ignored):
             d = slave.getFiles(files)
@@ -889,6 +884,7 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
         yield slave.getFiles([(lf.content.sha1, os.fdopen(temp_fd, "w"))])
         with open(temp_name) as f:
             self.assertEqual('content', f.read())
+        yield slave.pool.closeCachedConnections()
 
     @defer.inlineCallbacks
     def test_getFiles_with_empty_file(self):
@@ -902,3 +898,4 @@ class TestSlaveWithLibrarian(TestCaseWithFactory):
         yield slave.getFiles([(empty_sha1, temp_name)])
         with open(temp_name) as f:
             self.assertEqual(b'', f.read())
+        yield slave.pool.closeCachedConnections()
diff --git a/lib/lp/buildmaster/tests/test_manager.py b/lib/lp/buildmaster/tests/test_manager.py
index d814bab..491d2d5 100644
--- a/lib/lp/buildmaster/tests/test_manager.py
+++ b/lib/lp/buildmaster/tests/test_manager.py
@@ -2,7 +2,7 @@
 # NOTE: The first line above must stay first; do not move the copyright
 # notice to the top.  See http://www.python.org/dev/peps/pep-0263/.
 #
-# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for the renovated slave scanner aka BuilddManager."""
@@ -40,7 +40,6 @@ from lp.buildmaster.interactor import (
     BuilderInteractor,
     BuilderSlave,
     extract_vitals_from_db,
-    shut_down_default_threadpool,
     )
 from lp.buildmaster.interfaces.builder import (
     BuildDaemonIsolationError,
@@ -121,7 +120,6 @@ class TestSlaveScannerScan(TestCaseWithFactory):
         hoary = ubuntu.getSeries('hoary')
         self.test_publisher.setUpDefaultDistroSeries(hoary)
         self.test_publisher.addFakeChroots(db_only=True)
-        self.addCleanup(shut_down_default_threadpool)
 
     def _resetBuilder(self, builder):
         """Reset the given builder and its job."""
@@ -662,10 +660,6 @@ class TestSlaveScannerWithLibrarian(TestCaseWithFactory):
     layer = LaunchpadZopelessLayer
     run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20)
 
-    def setUp(self):
-        super(TestSlaveScannerWithLibrarian, self).setUp()
-        self.addCleanup(shut_down_default_threadpool)
-
     @defer.inlineCallbacks
     def test_end_to_end(self):
         # Test that SlaveScanner.scan() successfully finds, dispatches,
@@ -842,10 +836,6 @@ class TestSlaveScannerWithoutDB(TestCase):
 
     run_tests_with = AsynchronousDeferredRunTest
 
-    def setUp(self):
-        super(TestSlaveScannerWithoutDB, self).setUp()
-        self.addCleanup(shut_down_default_threadpool)
-
     def getScanner(self, builder_factory=None, interactor=None, slave=None,
                    behaviour=None):
         if builder_factory is None:
@@ -1038,7 +1028,6 @@ class TestCancellationChecking(TestCaseWithFactory):
         builder_name = BOB_THE_BUILDER_NAME
         self.builder = getUtility(IBuilderSet)[builder_name]
         self.builder.virtualized = True
-        self.addCleanup(shut_down_default_threadpool)
 
     @property
     def vitals(self):
diff --git a/lib/lp/oci/tests/test_ocirecipebuildbehaviour.py b/lib/lp/oci/tests/test_ocirecipebuildbehaviour.py
index ee18740..8c084c1 100644
--- a/lib/lp/oci/tests/test_ocirecipebuildbehaviour.py
+++ b/lib/lp/oci/tests/test_ocirecipebuildbehaviour.py
@@ -41,10 +41,7 @@ from lp.buildmaster.enums import (
     BuildBaseImageType,
     BuildStatus,
     )
-from lp.buildmaster.interactor import (
-    BuilderInteractor,
-    shut_down_default_threadpool,
-    )
+from lp.buildmaster.interactor import BuilderInteractor
 from lp.buildmaster.interfaces.builder import (
     BuildDaemonError,
     CannotBuild,
@@ -114,7 +111,7 @@ class MakeOCIBuildMixin:
         builder.processor = job.build.processor
         slave = self.useFixture(SlaveTestHelpers()).getClientSlave()
         job.setBuilder(builder, slave)
-        self.addCleanup(shut_down_default_threadpool)
+        self.addCleanup(slave.pool.closeCachedConnections)
 
         # Taken from test_archivedependencies.py
         for component_name in ("main", "universe"):
@@ -210,7 +207,7 @@ class TestAsyncOCIRecipeBuildBehaviour(MakeOCIBuildMixin, TestCaseWithFactory):
                         Equals(b"Basic " + base64.b64encode(
                             b"admin-launchpad.test:admin-secret"))]),
                     b"Content-Type": MatchesListwise([
-                        Equals(b"application/json"),
+                        Equals(b"application/json; charset=UTF-8"),
                         ]),
                     }),
                 "content": AfterPreprocessing(json.loads, MatchesDict({
diff --git a/lib/lp/snappy/model/snapbuildbehaviour.py b/lib/lp/snappy/model/snapbuildbehaviour.py
index c34534e..cbcba1d 100644
--- a/lib/lp/snappy/model/snapbuildbehaviour.py
+++ b/lib/lp/snappy/model/snapbuildbehaviour.py
@@ -1,4 +1,4 @@
-# Copyright 2015-2020 Canonical Ltd.  This software is licensed under the
+# Copyright 2015-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """An `IBuildFarmJobBehaviour` for `SnapBuild`.
@@ -17,15 +17,12 @@ __all__ = [
 import base64
 import time
 
-from requests import Session
 from six.moves.urllib.parse import (
     urlsplit,
     urlunsplit,
     )
-from twisted.internet import (
-    defer,
-    threads,
-    )
+import treq
+from twisted.internet import defer
 from zope.component import adapter
 from zope.interface import implementer
 from zope.security.proxy import removeSecurityProxy
@@ -42,6 +39,7 @@ from lp.registry.interfaces.series import SeriesStatus
 from lp.services.config import config
 from lp.services.features import getFeatureFlag
 from lp.services.twistedsupport import cancel_on_timeout
+from lp.services.twistedsupport.treq import check_status
 from lp.snappy.interfaces.snap import (
     SNAP_SNAPCRAFT_CHANNEL_FEATURE_FLAG,
     SnapBuildArchiveOwnerMismatch,
@@ -103,14 +101,13 @@ class SnapProxyMixin:
         auth_string = '{}:{}'.format(admin_username, secret).strip()
         auth_header = b'Basic ' + base64.b64encode(auth_string)
 
-        session = Session()
-        session.trust_env = False
-        response = yield threads.deferToThreadPool(
-            self._slave.reactor, self._slave.threadpool, session.post,
+        response = yield treq.post(
             url, headers={'Authorization': auth_header},
-            json={'username': proxy_username})
-        response.raise_for_status()
-        token = response.json()
+            json={'username': proxy_username},
+            reactor=self._slave.reactor,
+            pool=self._slave.pool)
+        response = yield check_status(response)
+        token = yield treq.json_content(response)
         defer.returnValue(token)
 
 
diff --git a/lib/lp/snappy/tests/test_snapbuildbehaviour.py b/lib/lp/snappy/tests/test_snapbuildbehaviour.py
index dfee042..f851008 100644
--- a/lib/lp/snappy/tests/test_snapbuildbehaviour.py
+++ b/lib/lp/snappy/tests/test_snapbuildbehaviour.py
@@ -1,4 +1,4 @@
-# Copyright 2015-2020 Canonical Ltd.  This software is licensed under the
+# Copyright 2015-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Test snap package build behaviour."""
@@ -57,7 +57,6 @@ from lp.buildmaster.enums import (
     BuildBaseImageType,
     BuildStatus,
     )
-from lp.buildmaster.interactor import shut_down_default_threadpool
 from lp.buildmaster.interfaces.builder import CannotBuild
 from lp.buildmaster.interfaces.buildfarmjobbehaviour import (
     IBuildFarmJobBehaviour,
@@ -315,7 +314,7 @@ class TestAsyncSnapBuildBehaviour(TestSnapBuildBehaviourBase):
         builder.processor = job.build.processor
         slave = self.useFixture(SlaveTestHelpers()).getClientSlave()
         job.setBuilder(builder, slave)
-        self.addCleanup(shut_down_default_threadpool)
+        self.addCleanup(slave.pool.closeCachedConnections)
         return job
 
     @defer.inlineCallbacks
@@ -357,7 +356,7 @@ class TestAsyncSnapBuildBehaviour(TestSnapBuildBehaviourBase):
                         Equals(b"Basic " + base64.b64encode(
                             b"admin-launchpad.test:admin-secret"))]),
                     b"Content-Type": MatchesListwise([
-                        Equals(b"application/json"),
+                        Equals(b"application/json; charset=UTF-8"),
                         ]),
                     }),
                 "content": AfterPreprocessing(json.loads, MatchesDict({