← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/cveimport-requests-handles-gzip into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/cveimport-requests-handles-gzip into lp:launchpad.

Commit message:
Fix handling of CVE database URLs returning data with Content-Encoding: gzip.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/cveimport-requests-handles-gzip/+merge/347465

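I broke this in https://code.launchpad.net/~cjwatson/launchpad/cveimport-requests/+merge/347201: requests normally decompresses responses served with Content-Encoding: gzip automatically, so unconditionally gunzipping the downloaded body no longer worked.  We have to be a bit cunning to arrange that the file:// URL test in cve-update.txt still works, since a file:// URL pointing at a gzipped file might not be decompressed for us.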
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/cveimport-requests-handles-gzip into lp:launchpad.
=== modified file 'lib/lp/bugs/scripts/cveimport.py'
--- lib/lp/bugs/scripts/cveimport.py	2018-05-31 13:14:12 +0000
+++ lib/lp/bugs/scripts/cveimport.py	2018-06-05 17:07:09 +0000
@@ -8,7 +8,7 @@
 __metaclass__ = type
 
 import gzip
-import StringIO
+import io
 import time
 import xml.etree.cElementTree as cElementTree
 
@@ -198,7 +198,7 @@
         self.parser.add_option(
             "-u", "--cveurl", dest="cveurl",
             default=config.cveupdater.cve_db_url,
-            help="The URL for the gzipped XML CVE database.")
+            help="The URL for the XML CVE database.")
 
     def main(self):
         self.logger.info('Initializing...')
@@ -209,29 +209,8 @@
                 raise LaunchpadScriptFailure(
                     'Unable to open CVE database in %s'
                     % self.options.cvefile)
-
         elif self.options.cveurl is not None:
-            self.logger.info("Downloading CVE database from %s..." %
-                             self.options.cveurl)
-            proxies = {}
-            if config.launchpad.http_proxy:
-                proxies['http'] = config.launchpad.http_proxy
-                proxies['https'] = config.launchpad.http_proxy
-            try:
-                with override_timeout(config.cveupdater.timeout):
-                    # Command-line options are trusted, so allow file://
-                    # URLs to ease testing.
-                    response = urlfetch(
-                        self.options.cveurl, proxies=proxies, allow_file=True)
-            except requests.RequestException:
-                raise LaunchpadScriptFailure(
-                    'Unable to connect for CVE database %s'
-                    % self.options.cveurl)
-
-            cve_db_gz = response.content
-            self.logger.info("%d bytes downloaded." % len(cve_db_gz))
-            cve_db = gzip.GzipFile(
-                fileobj=StringIO.StringIO(cve_db_gz)).read()
+            cve_db = self.fetchCVEURL(self.options.cveurl)
         else:
             raise LaunchpadScriptFailure('No CVE database file or URL given.')
 
@@ -243,6 +222,27 @@
         self.logger.info('%d seconds to update database.'
                 % (finish_time - start_time))
 
+    def fetchCVEURL(self, url):
+        """Fetch CVE data from a URL, decompressing if necessary."""
+        self.logger.info("Downloading CVE database from %s..." % url)
+        try:
+            with override_timeout(config.cveupdater.timeout):
+                # Command-line options are trusted, so allow file://
+                # URLs to ease testing.
+                response = urlfetch(url, use_proxy=True, allow_file=True)
+        except requests.RequestException:
+            raise LaunchpadScriptFailure(
+                'Unable to connect for CVE database %s' % url)
+
+        cve_db = response.content
+        self.logger.info("%d bytes downloaded." % len(cve_db))
+        # requests will normally decompress this automatically, but that
+        # might not be the case if we're given a file:// URL to a gzipped
+        # file.
+        if cve_db[:2] == b'\037\213':  # gzip magic
+            cve_db = gzip.GzipFile(fileobj=io.BytesIO(cve_db)).read()
+        return cve_db
+
     def processCVEXML(self, cve_xml):
         """Process the CVE XML file.
 

=== added file 'lib/lp/bugs/scripts/tests/test_cveimport.py'
--- lib/lp/bugs/scripts/tests/test_cveimport.py	1970-01-01 00:00:00 +0000
+++ lib/lp/bugs/scripts/tests/test_cveimport.py	2018-06-05 17:07:09 +0000
@@ -0,0 +1,63 @@
+# Copyright 2018 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+import gzip
+import io
+
+import responses
+
+from lp.bugs.scripts.cveimport import CVEUpdater
+from lp.services.log.logger import DevNullLogger
+from lp.testing import TestCase
+
+
+class TestCVEUpdater(TestCase):
+
+    @responses.activate
+    def test_fetch_uncompressed(self):
+        # Fetching a URL returning uncompressed data works.
+        url = 'http://cve.example.com/allitems.xml'
+        body = b'<?xml version="1.0"?>'
+        responses.add(
+            'GET', url, headers={'Content-Type': 'text/xml'}, body=body)
+        cve_updater = CVEUpdater(
+            'cve-updater', test_args=[], logger=DevNullLogger())
+        self.assertEqual(body, cve_updater.fetchCVEURL(url))
+
+    @responses.activate
+    def test_fetch_content_encoding_gzip(self):
+        # Fetching a URL returning Content-Encoding: gzip works.
+        url = 'http://cve.example.com/allitems.xml.gz'
+        body = b'<?xml version="1.0"?>'
+        gzipped_body_file = io.BytesIO()
+        with gzip.GzipFile(fileobj=gzipped_body_file, mode='wb') as f:
+            f.write(body)
+        responses.add(
+            'GET', url,
+            headers={
+                'Content-Type': 'text/xml',
+                'Content-Encoding': 'gzip',
+            },
+            body=gzipped_body_file.getvalue())
+        cve_updater = CVEUpdater(
+            'cve-updater', test_args=[], logger=DevNullLogger())
+        self.assertEqual(body, cve_updater.fetchCVEURL(url))
+
+    @responses.activate
+    def test_fetch_gzipped(self):
+        # Fetching a URL returning gzipped data without Content-Encoding works.
+        url = 'http://cve.example.com/allitems.xml.gz'
+        body = b'<?xml version="1.0"?>'
+        gzipped_body_file = io.BytesIO()
+        with gzip.GzipFile(fileobj=gzipped_body_file, mode='wb') as f:
+            f.write(body)
+        responses.add(
+            'GET', url, headers={'Content-Type': 'application/x-gzip'},
+            body=gzipped_body_file.getvalue())
+        cve_updater = CVEUpdater(
+            'cve-updater', test_args=[], logger=DevNullLogger())
+        self.assertEqual(body, cve_updater.fetchCVEURL(url))


Follow ups