
launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/explicit-proxy-product-release-finder into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/explicit-proxy-product-release-finder into lp:launchpad.

Commit message:
Convert the product release finder to urlfetch.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/explicit-proxy-product-release-finder/+merge/348564
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/explicit-proxy-product-release-finder into lp:launchpad.
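
In outline, the branch moves the tarball download from urllib.urlretrieve to lp.services.timeout.urlfetch, streaming the body into a temporary file under a configured timeout. A minimal sketch of the new calling pattern (download_release is a hypothetical helper name; the logging, Content-Length check, and librarian upload in the real code are omitted):

    import tempfile

    from lp.services.config import config
    from lp.services.timeout import default_timeout, urlfetch

    def download_release(url):
        # Everything inside this block shares the product release finder's
        # configured timeout (productreleasefinder.timeout, 300s by default).
        with default_timeout(config.productreleasefinder.timeout):
            with tempfile.TemporaryFile(prefix="product-release-finder") as fp:
                # urlfetch honours Launchpad's proxy configuration and writes
                # the response body to fp instead of buffering it in memory.
                urlfetch(url, trust_env=False, use_proxy=True, output_file=fp)
                fp.seek(0)
                return fp.read()
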
=== modified file 'lib/lp/registry/scripts/productreleasefinder/finder.py'
--- lib/lp/registry/scripts/productreleasefinder/finder.py	2014-06-03 10:43:24 +0000
+++ lib/lp/registry/scripts/productreleasefinder/finder.py	2018-06-26 20:55:58 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -12,11 +12,12 @@
 import mimetypes
 import os
 import re
-import urllib
+import tempfile
 import urlparse
 
 from cscvs.dircompare import path
 import pytz
+import requests
 from zope.component import getUtility
 
 from lp.app.validators.name import invalid_name_pattern
@@ -32,12 +33,17 @@
 from lp.registry.model.productseries import ProductSeries
 from lp.registry.scripts.productreleasefinder.filter import FilterPattern
 from lp.registry.scripts.productreleasefinder.hose import Hose
+from lp.services.config import config
 from lp.services.database import (
     read_transaction,
     write_transaction,
     )
 from lp.services.database.interfaces import IStore
 from lp.services.librarian.model import LibraryFileAlias
+from lp.services.timeout import (
+    default_timeout,
+    urlfetch,
+    )
 
 
 processors = '|'.join([
@@ -104,8 +110,9 @@
 
     def findReleases(self):
         """Scan for new releases in all products."""
-        for product_name, filters in self.getFilters():
-            self.handleProduct(product_name, filters)
+        with default_timeout(config.productreleasefinder.timeout):
+            for product_name, filters in self.getFilters():
+                self.handleProduct(product_name, filters)
 
     @read_transaction
     def getFilters(self):
@@ -220,21 +227,27 @@
             mimetype = 'application/octet-stream'
 
         self.log.info("Downloading %s", url)
-        try:
-            local, headers = urllib.urlretrieve(url)
-            stat = os.stat(local)
-        except IOError:
-            self.log.error("Download of %s failed", url)
-            raise
-        except OSError:
-            self.log.error("Unable to stat downloaded file")
-            raise
+        with tempfile.TemporaryFile(prefix="product-release-finder") as fp:
+            try:
+                response = urlfetch(
+                    url, trust_env=False, use_proxy=True, output_file=fp)
+                # XXX cjwatson 2018-06-26: This will all change with
+                # requests 3.x.  See:
+                #   https://blog.petrzemek.net/2018/04/22/
+                expected_length = response.headers.get("Content-Length")
+                if expected_length is not None:
+                    actual_length = response.raw.tell()
+                    expected_length = int(expected_length)
+                    if actual_length < expected_length:
+                        raise IOError(
+                            "Incomplete read: got %d, expected %d" %
+                            (actual_length, expected_length))
+            except (IOError, requests.RequestException):
+                self.log.exception("Download of %s failed", url)
+                raise
+            stat = os.fstat(fp.fileno())
+            fp.seek(0)
 
-        try:
-            fp = open(local, 'r')
-            os.unlink(local)
             self.addReleaseTarball(product_name, series_name, version,
                                    filename, stat.st_size, fp, mimetype)
             file_names.add(filename)
-        finally:
-            fp.close()

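The Content-Length check above compensates for the fact that requests (before 3.x) does not raise on a truncated body, which is what the XXX comment refers to. A standalone sketch of the same check using plain requests; fetch_checked is a hypothetical helper and the chunk size is arbitrary:

    import requests

    def fetch_checked(url, fp):
        # Stream the body into fp, then fail loudly if fewer bytes arrived
        # than the server advertised in Content-Length.
        response = requests.get(url, stream=True)
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=8192):
            fp.write(chunk)
        expected_length = response.headers.get("Content-Length")
        if expected_length is not None:
            actual_length = response.raw.tell()
            if actual_length < int(expected_length):
                raise IOError(
                    "Incomplete read: got %d, expected %d"
                    % (actual_length, int(expected_length)))
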
=== modified file 'lib/lp/registry/tests/test_prf_finder.py'
--- lib/lp/registry/tests/test_prf_finder.py	2018-01-02 10:54:31 +0000
+++ lib/lp/registry/tests/test_prf_finder.py	2018-06-26 20:55:58 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 import logging
@@ -8,6 +8,7 @@
 import tempfile
 import unittest
 
+import responses
 import transaction
 from zope.component import getUtility
 from zope.interface.verify import verifyObject
@@ -219,22 +220,29 @@
     def create_tarball(self, file_name):
         """create a release tarball for testing"""
         file_path = os.path.join(self.release_root, file_name)
-        try:
-            fp = open(file_path, 'w')
+        with open(file_path, 'w') as fp:
             fp.write('foo')
-        finally:
-            fp.close()
         return file_path, file_name
 
+    def add_tarball_response(self, file_path):
+        def callback(request):
+            with open(file_path, 'rb') as f:
+                file_size = os.fstat(f.fileno()).st_size
+                return 200, {'Content-Length': str(file_size)}, f.read()
+
+        url = 'http://example.com/' + file_path.lstrip('/')
+        responses.add_callback('GET', url, callback)
+        return url
+
     def setUp(self):
         switch_dbuser(config.productreleasefinder.dbuser)
         self.release_root = tempfile.mkdtemp()
-        self.release_url = 'file://' + self.release_root
 
     def tearDown(self):
         shutil.rmtree(self.release_root, ignore_errors=True)
         reset_logging()
 
+    @responses.activate
     def test_handleRelease(self):
         ztm = self.layer.txn
         logging.basicConfig(level=logging.CRITICAL)
@@ -243,7 +251,8 @@
         file_path, file_name = self.create_tarball(
             'evolution-42.0.orig.tar.gz')
         file_names = set()
-        prf.handleRelease('evolution', 'trunk', file_path, file_names)
+        url = self.add_tarball_response(file_path)
+        prf.handleRelease('evolution', 'trunk', url, file_names)
         self.assertTrue(file_name in file_names)
         self.assertFalse(alt_file_name in file_names)
 
@@ -271,6 +280,7 @@
             bound = field.bind(fileinfo)
             bound.validate(bound.get(fileinfo))
 
+    @responses.activate
     def test_handleReleaseWithExistingRelease(self):
         # Test that handleRelease() can add a file release to an
         # existing ProductRelease.
@@ -289,7 +299,8 @@
         prf = ProductReleaseFinder(ztm, logging.getLogger())
         file_path, file_name = self.create_tarball('evolution-2.1.6.tar.gz')
         file_names = prf.getReleaseFileNames('evolution')
-        prf.handleRelease('evolution', 'trunk', file_path, file_names)
+        url = self.add_tarball_response(file_path)
+        prf.handleRelease('evolution', 'trunk', url, file_names)
 
         # verify that we now have files attached to the release:
         evo = getUtility(IProductSet).getByName('evolution')
@@ -297,6 +308,7 @@
         release = trunk.getRelease('2.1.6')
         self.assertEqual(release.files.count(), 1)
 
+    @responses.activate
     def test_handleReleaseTwice(self):
         # Test that handleRelease() handles the case where a tarball
         # has already been attached to the ProductRelease.  We do this
@@ -306,13 +318,15 @@
         prf = ProductReleaseFinder(ztm, logging.getLogger())
         file_path, file_name = self.create_tarball('evolution-42.0.tar.gz')
         file_names = prf.getReleaseFileNames('evolution')
-        prf.handleRelease('evolution', 'trunk', file_path, file_names)
-        prf.handleRelease('evolution', 'trunk', file_path, file_names)
+        url = self.add_tarball_response(file_path)
+        prf.handleRelease('evolution', 'trunk', url, file_names)
+        prf.handleRelease('evolution', 'trunk', url, file_names)
         evo = getUtility(IProductSet).getByName('evolution')
         trunk = evo.getSeries('trunk')
         release = trunk.getRelease('42.0')
         self.assertEqual(release.files.count(), 1)
 
+    @responses.activate
     def test_handleReleaseTwice_multiple_series(self):
         # Series can have overlaping release file globs, but versions
         # are unique to a project. A file is uploaded to a release only
@@ -322,14 +336,17 @@
         prf = ProductReleaseFinder(ztm, logging.getLogger())
         file_path, file_name = self.create_tarball('evolution-1.2.3.tar.gz')
         file_names = prf.getReleaseFileNames('evolution')
-        prf.handleRelease('evolution', 'trunk', file_path, file_names)
+        url = self.add_tarball_response(file_path)
+        prf.handleRelease('evolution', 'trunk', url, file_names)
         file_path, file_name = self.create_tarball('evolution-1.2.3.tar.gz')
-        prf.handleRelease('evolution', '1.0', file_path, file_names)
+        url = self.add_tarball_response(file_path)
+        prf.handleRelease('evolution', '1.0', url, file_names)
         product = getUtility(IProductSet).getByName('evolution')
         release = product.getMilestone('1.2.3').product_release
         self.assertEqual(release.files.count(), 1)
 
-    def test_handleRelease_alternate_verstion(self):
+    @responses.activate
+    def test_handleRelease_alternate_version(self):
         """Verify that tar.gz and tar.bz2 versions are both uploaded."""
         ztm = self.layer.txn
         logging.basicConfig(level=logging.CRITICAL)
@@ -338,8 +355,10 @@
         alt_file_path, alt_file_name = self.create_tarball(
             'evolution-45.0.tar.bz2')
         file_names = prf.getReleaseFileNames('evolution')
-        prf.handleRelease('evolution', 'trunk', file_path, file_names)
-        prf.handleRelease('evolution', 'trunk', alt_file_path, file_names)
+        url = self.add_tarball_response(file_path)
+        prf.handleRelease('evolution', 'trunk', url, file_names)
+        alt_url = self.add_tarball_response(alt_file_path)
+        prf.handleRelease('evolution', 'trunk', alt_url, file_names)
         evo = getUtility(IProductSet).getByName('evolution')
         trunk = evo.getSeries('trunk')
         release = trunk.getRelease('45.0')
@@ -365,12 +384,38 @@
         fp.write('foo')
         fp.close()
 
-        url = self.release_url + '/evolution420.tar.gz'
+        url = 'file://' + self.release_root + '/evolution420.tar.gz'
         file_names = prf.getReleaseFileNames('evolution')
         prf.handleRelease('evolution', 'trunk', url, file_names)
         self.assertEqual(
             "Unable to parse version from %s\n" % url, output.getvalue())
 
+    @responses.activate
+    def test_handleRelease_short_response(self):
+        # handleRelease raises an exception on short responses.
+        ztm = self.layer.txn
+        logging.basicConfig(level=logging.CRITICAL)
+        prf = ProductReleaseFinder(ztm, logging.getLogger())
+        file_path, file_name = self.create_tarball(
+            'evolution-42.0.orig.tar.gz')
+        file_size = os.stat(file_path).st_size
+        file_names = set()
+
+        def callback(request):
+            with open(file_path, 'rb') as f:
+                file_size = os.fstat(f.fileno()).st_size
+                return 200, {'Content-Length': str(file_size + 1)}, f.read()
+
+        url = 'http://example.com/' + file_path.lstrip('/')
+        responses.add_callback('GET', url, callback)
+        with self.assertRaises(IOError) as ctx:
+            prf.handleRelease('evolution', 'trunk', url, file_names)
+        self.assertEqual(
+            'Incomplete read: got %d, expected %d' % (
+                file_size, file_size + 1),
+            str(ctx.exception))
+        self.assertNotIn(file_name, file_names)
+
 
 class ExtractVersionTestCase(unittest.TestCase):
     """Verify that release version names are correctly extracted."""

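The tests now serve tarballs over stubbed HTTP via the responses library instead of handing local file paths to handleRelease. A minimal sketch of the callback pattern used by add_tarball_response; demo is a hypothetical function and the URL and body are illustrative:

    import requests
    import responses

    @responses.activate
    def demo():
        body = b'fake tarball contents'

        def callback(request):
            # responses expects a (status, headers, body) tuple; advertising
            # Content-Length lets the caller detect truncated downloads.
            return 200, {'Content-Length': str(len(body))}, body

        responses.add_callback('GET', 'http://example.com/foo.tar.gz', callback)
        resp = requests.get('http://example.com/foo.tar.gz')
        assert resp.content == body
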
=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf	2018-06-26 14:50:04 +0000
+++ lib/lp/services/config/schema-lazr.conf	2018-06-26 20:55:58 +0000
@@ -1540,6 +1540,11 @@
 # datatype: string
 dbuser: productreleasefinder
 
+# The time in seconds that the product release finder will allow for
+# downloading release tarballs.
+# datatype: integer
+timeout: 300
+
 
 [profiling]
 # When set to True, the user is allowed to request profiles be run for

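The new key is read at runtime through lp.services.config and handed to default_timeout in findReleases; a minimal sketch of the lookup (the value shown is the schema default):

    from lp.services.config import config

    # 300 seconds unless overridden in the instance configuration.
    timeout = config.productreleasefinder.timeout
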
=== modified file 'lib/lp/services/tests/test_timeout.py'
--- lib/lp/services/tests/test_timeout.py	2018-06-05 01:50:30 +0000
+++ lib/lp/services/tests/test_timeout.py	2018-06-26 20:55:58 +0000
@@ -430,6 +430,30 @@
             headers=ContainsDict({'Content-Length': Equals(8)}),
             content=Equals('Success.')))
 
+    def test_urlfetch_writes_to_output_file(self):
+        """If given an output_file, urlfetch writes to it."""
+        sock, http_server_url = self.make_test_socket()
+        sock.listen(1)
+
+        def success_result():
+            (client_sock, client_addr) = sock.accept()
+            client_sock.sendall(dedent("""\
+                HTTP/1.0 200 Ok
+                Content-Type: text/plain
+                Content-Length: 8
+
+                Success."""))
+            client_sock.close()
+
+        t = threading.Thread(target=success_result)
+        t.start()
+        output_path = self.useFixture(TempDir()).join('out')
+        with open(output_path, 'wb+') as f:
+            urlfetch(http_server_url, output_file=f)
+            f.seek(0)
+            self.assertEqual(b'Success.', f.read())
+        t.join()
+
     def test_xmlrpc_transport(self):
         """ Another use case for timeouts is communicating with external
         systems using XMLRPC.  In order to allow timeouts using XMLRPC we

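Per the updated docstring, output_file also accepts a filesystem path rather than an open file object. A minimal usage sketch, assuming the caller has already established a default timeout; the URL and destination path are illustrative:

    from lp.services.timeout import default_timeout, urlfetch

    with default_timeout(60.0):
        # Streams the response body straight to the named file on disk.
        urlfetch('http://example.com/foo.tar.gz',
                 output_file='/tmp/foo.tar.gz')
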
=== modified file 'lib/lp/services/timeout.py'
--- lib/lp/services/timeout.py	2018-06-22 22:07:42 +0000
+++ lib/lp/services/timeout.py	2018-06-26 20:55:58 +0000
@@ -41,6 +41,7 @@
 from requests.packages.urllib3.exceptions import ClosedPoolError
 from requests.packages.urllib3.poolmanager import PoolManager
 from requests_file import FileAdapter
+from requests_toolbelt.downloadutils import stream
 from six import reraise
 
 from lp.services.config import config
@@ -324,7 +325,7 @@
 
     @with_timeout(cleanup='cleanup')
     def fetch(self, url, trust_env=None, use_proxy=False, allow_file=False,
-              **request_kwargs):
+              output_file=None, **request_kwargs):
         """Fetch the URL using a custom HTTP handler supporting timeout.
 
         :param url: The URL to fetch.
@@ -334,6 +335,8 @@
         :param use_proxy: If True, use Launchpad's configured proxy.
         :param allow_file: If True, allow file:// URLs.  (Be careful to only
             pass this if the URL is trusted.)
+        :param output_file: If not None, download the response content to
+            this file object or path.
         :param request_kwargs: Additional keyword arguments passed on to
             `Session.request`.
         """
@@ -351,10 +354,16 @@
             request_kwargs.setdefault("proxies", {})
             request_kwargs["proxies"]["http"] = config.launchpad.http_proxy
             request_kwargs["proxies"]["https"] = config.launchpad.http_proxy
+        if output_file is not None:
+            request_kwargs["stream"] = True
         response = self.session.request(url=url, **request_kwargs)
         response.raise_for_status()
-        # Make sure the content has been consumed before returning.
-        response.content
+        if output_file is None:
+            # Make sure the content has been consumed before returning.
+            response.content
+        else:
+            # Download the content to the given file.
+            stream.stream_response_to_file(response, path=output_file)
         # The responses library doesn't persist cookies in the session
         # (https://github.com/getsentry/responses/issues/80).  Work around
         # this.

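stream_response_to_file comes from requests_toolbelt and consumes the streamed response in chunks, so large downloads never sit wholly in memory. A standalone sketch of the pattern fetch() now delegates to; the URL and path are illustrative:

    import requests
    from requests_toolbelt.downloadutils import stream

    response = requests.get('http://example.com/big-file.tar.gz', stream=True)
    response.raise_for_status()
    # Accepts either a filesystem path or an open binary file object.
    stream.stream_response_to_file(response, path='big-file.tar.gz')
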
