launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #22615
[Merge] lp:~cjwatson/launchpad/cveimport-requests-handles-gzip into lp:launchpad
Colin Watson has proposed merging lp:~cjwatson/launchpad/cveimport-requests-handles-gzip into lp:launchpad.
Commit message:
Fix handling of CVE database URLs returning data with Content-Encoding: gzip.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/cveimport-requests-handles-gzip/+merge/347465
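I broke this in https://code.launchpad.net/~cjwatson/launchpad/cveimport-requests/+merge/347201. The requests library normally decompresses responses served with Content-Encoding: gzip automatically, so the importer must no longer gunzip the downloaded data unconditionally. We have to be a bit cunning to arrange that the file:// URL test in cve-update.txt still works: the data is now decompressed only if it starts with the gzip magic bytes.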
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/cveimport-requests-handles-gzip into lp:launchpad.
=== modified file 'lib/lp/bugs/scripts/cveimport.py'
--- lib/lp/bugs/scripts/cveimport.py 2018-05-31 13:14:12 +0000
+++ lib/lp/bugs/scripts/cveimport.py 2018-06-05 17:07:09 +0000
@@ -8,7 +8,7 @@
__metaclass__ = type
import gzip
-import StringIO
+import io
import time
import xml.etree.cElementTree as cElementTree
@@ -198,7 +198,7 @@
self.parser.add_option(
"-u", "--cveurl", dest="cveurl",
default=config.cveupdater.cve_db_url,
- help="The URL for the gzipped XML CVE database.")
+ help="The URL for the XML CVE database.")
def main(self):
self.logger.info('Initializing...')
@@ -209,29 +209,8 @@
raise LaunchpadScriptFailure(
'Unable to open CVE database in %s'
% self.options.cvefile)
-
elif self.options.cveurl is not None:
- self.logger.info("Downloading CVE database from %s..." %
- self.options.cveurl)
- proxies = {}
- if config.launchpad.http_proxy:
- proxies['http'] = config.launchpad.http_proxy
- proxies['https'] = config.launchpad.http_proxy
- try:
- with override_timeout(config.cveupdater.timeout):
- # Command-line options are trusted, so allow file://
- # URLs to ease testing.
- response = urlfetch(
- self.options.cveurl, proxies=proxies, allow_file=True)
- except requests.RequestException:
- raise LaunchpadScriptFailure(
- 'Unable to connect for CVE database %s'
- % self.options.cveurl)
-
- cve_db_gz = response.content
- self.logger.info("%d bytes downloaded." % len(cve_db_gz))
- cve_db = gzip.GzipFile(
- fileobj=StringIO.StringIO(cve_db_gz)).read()
+ cve_db = self.fetchCVEURL(self.options.cveurl)
else:
raise LaunchpadScriptFailure('No CVE database file or URL given.')
@@ -243,6 +222,27 @@
self.logger.info('%d seconds to update database.'
% (finish_time - start_time))
+ def fetchCVEURL(self, url):
+ """Fetch CVE data from a URL, decompressing if necessary."""
+ self.logger.info("Downloading CVE database from %s..." % url)
+ try:
+ with override_timeout(config.cveupdater.timeout):
+ # Command-line options are trusted, so allow file://
+ # URLs to ease testing.
+ response = urlfetch(url, use_proxy=True, allow_file=True)
+ except requests.RequestException:
+ raise LaunchpadScriptFailure(
+ 'Unable to connect for CVE database %s' % url)
+
+ cve_db = response.content
+ self.logger.info("%d bytes downloaded." % len(cve_db))
+ # requests will normally decompress this automatically, but that
+ # might not be the case if we're given a file:// URL to a gzipped
+ # file.
+ if cve_db[:2] == b'\037\213': # gzip magic
+ cve_db = gzip.GzipFile(fileobj=io.BytesIO(cve_db)).read()
+ return cve_db
+
def processCVEXML(self, cve_xml):
"""Process the CVE XML file.
=== added file 'lib/lp/bugs/scripts/tests/test_cveimport.py'
--- lib/lp/bugs/scripts/tests/test_cveimport.py 1970-01-01 00:00:00 +0000
+++ lib/lp/bugs/scripts/tests/test_cveimport.py 2018-06-05 17:07:09 +0000
@@ -0,0 +1,63 @@
+# Copyright 2018 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+import gzip
+import io
+
+import responses
+
+from lp.bugs.scripts.cveimport import CVEUpdater
+from lp.services.log.logger import DevNullLogger
+from lp.testing import TestCase
+
+
+class TestCVEUpdater(TestCase):
+
+ @responses.activate
+ def test_fetch_uncompressed(self):
+ # Fetching a URL returning uncompressed data works.
+ url = 'http://cve.example.com/allitems.xml'
+ body = b'<?xml version="1.0"?>'
+ responses.add(
+ 'GET', url, headers={'Content-Type': 'text/xml'}, body=body)
+ cve_updater = CVEUpdater(
+ 'cve-updater', test_args=[], logger=DevNullLogger())
+ self.assertEqual(body, cve_updater.fetchCVEURL(url))
+
+ @responses.activate
+ def test_fetch_content_encoding_gzip(self):
+ # Fetching a URL returning Content-Encoding: gzip works.
+ url = 'http://cve.example.com/allitems.xml.gz'
+ body = b'<?xml version="1.0"?>'
+ gzipped_body_file = io.BytesIO()
+ with gzip.GzipFile(fileobj=gzipped_body_file, mode='wb') as f:
+ f.write(body)
+ responses.add(
+ 'GET', url,
+ headers={
+ 'Content-Type': 'text/xml',
+ 'Content-Encoding': 'gzip',
+ },
+ body=gzipped_body_file.getvalue())
+ cve_updater = CVEUpdater(
+ 'cve-updater', test_args=[], logger=DevNullLogger())
+ self.assertEqual(body, cve_updater.fetchCVEURL(url))
+
+ @responses.activate
+ def test_fetch_gzipped(self):
+ # Fetching a URL returning gzipped data without Content-Encoding works.
+ url = 'http://cve.example.com/allitems.xml.gz'
+ body = b'<?xml version="1.0"?>'
+ gzipped_body_file = io.BytesIO()
+ with gzip.GzipFile(fileobj=gzipped_body_file, mode='wb') as f:
+ f.write(body)
+ responses.add(
+ 'GET', url, headers={'Content-Type': 'application/x-gzip'},
+ body=gzipped_body_file.getvalue())
+ cve_updater = CVEUpdater(
+ 'cve-updater', test_args=[], logger=DevNullLogger())
+ self.assertEqual(body, cve_updater.fetchCVEURL(url))
Follow ups