launchpad-reviewers team mailing list archive
[Merge] ~ruinedyourlife/launchpad:json-cve-parser into launchpad:master
Quentin Debhi has proposed merging ~ruinedyourlife/launchpad:json-cve-parser into launchpad:master.
Commit message:
Parse new CVE JSON format
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~ruinedyourlife/launchpad/+git/launchpad/+merge/477375
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~ruinedyourlife/launchpad:json-cve-parser into launchpad:master.
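For reviewers who have not seen the CVE Record Format 5.x yet: the new processCVEJSON() method below only reads a handful of fields (dataType, cveMetadata.cveId, plus the descriptions and references in the "cna" container). A minimal sketch of the record shape it expects — the values are illustrative, only the field names come from the code in this diff:

    {
        "dataType": "CVE_RECORD",
        "cveMetadata": {"cveId": "CVE-2024-0001"},
        "containers": {
            "cna": {
                "descriptions": [{"lang": "en", "value": "Example description"}],
                "references": [
                    {"url": "https://example.com/advisory", "name": "Advisory"}
                ]
            }
        }
    }

Records without an English description are skipped, and references are synced against the existing Cve rows (created or removed as needed).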
diff --git a/lib/lp/bugs/scripts/cveimport.py b/lib/lp/bugs/scripts/cveimport.py
index 464fe58..7c5d0c6 100644
--- a/lib/lp/bugs/scripts/cveimport.py
+++ b/lib/lp/bugs/scripts/cveimport.py
@@ -7,7 +7,13 @@ CVE's are fully registered in Launchpad."""
import gzip
import io
+import json
+import os
import time
+import zipfile
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from urllib.parse import urljoin
import defusedxml.cElementTree as cElementTree
import requests
@@ -204,9 +210,357 @@ class CVEUpdater(LaunchpadCronScript):
default=config.cveupdater.cve_db_url,
help="The URL for the XML CVE database.",
)
+ self.parser.add_option(
+ "--baselinecvedir",
+ dest="baselinecvedir",
+ default=None,
+ help="Directory containing CVE JSON files in year/group structure",
+ )
+ self.parser.add_option(
+ "--deltacvedir",
+ dest="deltacvedir",
+ default=None,
+ help="Directory containing delta CVE JSON files (flat structure)",
+ )
+ self.parser.add_option(
+ "-b",
+ "--baseline",
+ dest="baseline",
+ action="store_true",
+ default=False,
+ help="Download baseline full CVE data from GitHub releases",
+ )
+ self.parser.add_option(
+ "-d",
+ "--delta",
+ dest="delta",
+ action="store_true",
+ default=False,
+ help="Download and process hourly delta CVE data from GitHub "
+ "releases",
+ )
+
+ def construct_github_url(self, delta=False):
+ """Construct the GitHub release URL for CVE data.
+
+ :param delta: If True, construct URL for hourly delta, otherwise for
+ daily baseline
+ :return: tuple of (url, year)
+ """
+ now = datetime.now(timezone.utc)
+ date_str = now.strftime("%Y-%m-%d")
+ year = now.strftime("%Y")
+ hour = now.hour
+
+ base_url = config.cveupdater.github_cve_url
+
+ if delta:
+            # Just after midnight, fall back to yesterday's
+            # end_of_day file
+ if hour == 0:
+ yesterday = now - timedelta(days=1)
+ date_str = yesterday.strftime("%Y-%m-%d")
+ release_tag = f"cve_{date_str}_at_end_of_day"
+ filename = f"{date_str}_delta_CVEs_at_end_of_day.zip"
+ # If we're between 23:00-23:59, use the 2300Z file
+ elif hour == 23:
+ release_tag = f"cve_{date_str}_2300Z"
+ filename = f"{date_str}_delta_CVEs_at_2300Z.zip"
+ else:
+ # For all other hours, use the standard hourly format
+ hour_str = f"{hour:02d}00"
+ release_tag = f"cve_{date_str}_{hour_str}Z"
+ filename = f"{date_str}_delta_CVEs_at_{hour_str}Z.zip"
+ else:
+ release_tag = f"cve_{date_str}_0000Z"
+ filename = f"{date_str}_all_CVEs_at_midnight.zip.zip"
+
+ # Construct the full URL
+ url = urljoin(base_url, f"{release_tag}/{filename}")
+ return url, year
+
+ def process_delta_directory(self, delta_dir):
+ """Process a directory containing delta CVE JSON files.
+
+ Expected structure:
+ deltaCves/
+ CVE-XXXX-XXXX.json
+ CVE-XXXX-YYYY.json
+ ...
+
+ :param delta_dir: Path to the directory containing delta CVE files
+ :return: tuple of (processed_count, error_count)
+ """
+ total_processed = 0
+ total_errors = 0
+
+ delta_path = Path(delta_dir)
+ if not delta_path.exists():
+ raise LaunchpadScriptFailure(
+ f"Delta directory not found: {delta_dir}"
+ )
+
+ # process each CVE JSON file in the delta directory
+ for cve_file in sorted(delta_path.glob("CVE-*.json")):
+ try:
+ with open(cve_file) as f:
+ cve_data = json.load(f)
+
+ self.logger.debug(f"Processing delta {cve_file.name}")
+ self.processCVEJSON(cve_data)
+ total_processed += 1
+
+ # commit after each CVE to avoid large transactions
+ self.txn.commit()
+
+ except (OSError, json.JSONDecodeError) as e:
+ self.logger.error(
+ f"Error processing delta {cve_file}: {str(e)}"
+ )
+ total_errors += 1
+ continue
+
+ if total_processed % 10 == 0:
+ self.logger.info(
+ f"Processed {total_processed} delta CVE files..."
+ )
+
+ return total_processed, total_errors
+
+ def extract_github_zip(self, zip_content, delta=False):
+ """Extract the GitHub ZIP file to a temporary directory.
+
+ :param zip_content: The downloaded ZIP file content
+ :param delta: If True, expect delta structure, otherwise baseline
+ structure
+ :return: Path to the extracted directory containing CVE files
+ """
+ import shutil
+ import tempfile
+
+ # create a temporary directory
+ temp_dir = tempfile.mkdtemp(prefix="cve_import_")
+
+ try:
+ # write outer zip content to a temporary file
+ outer_zip_path = os.path.join(temp_dir, "downloaded.zip")
+ with open(outer_zip_path, "wb") as f:
+ f.write(zip_content)
+
+ # extract the outer zip file
+ with zipfile.ZipFile(outer_zip_path) as outer_zf:
+ if delta:
+                    # for delta, extract the deltaCves directory
+ members = [
+ m
+ for m in outer_zf.namelist()
+ if m.startswith("deltaCves/")
+ ]
+ outer_zf.extractall(temp_dir, members=members)
+ target_dir = os.path.join(temp_dir, "deltaCves")
+ else:
+ # for baseline, handle nested zip structure
+ outer_zf.extract("cves.zip", temp_dir)
+ inner_zip_path = os.path.join(temp_dir, "cves.zip")
+
+ with zipfile.ZipFile(inner_zip_path) as inner_zf:
+ inner_zf.extractall(temp_dir)
+
+ os.unlink(inner_zip_path)
+ target_dir = os.path.join(temp_dir, "cves")
+
+ os.unlink(outer_zip_path)
+
+ if not os.path.exists(target_dir):
+ raise LaunchpadScriptFailure(
+ f"Expected directory not found in ZIP: {target_dir}"
+ )
+
+ return target_dir
+
+ except Exception as e:
+ # clean up on any error
+ shutil.rmtree(temp_dir)
+ raise LaunchpadScriptFailure(
+ f"Failed to extract ZIP files: {str(e)}"
+ )
+
+ def process_json_directory(self, base_dir):
+ """Process a directory of CVE JSON files organized by year and groups.
+
+ Expected structure:
+ base_dir/
+ 1999/
+ 0xxx/
+ CVE-1999-0001.json
+ ...
+ 1xxx/
+ CVE-1999-1001.json
+ ...
+ 2024/
+ 0xxx/
+ CVE-2024-0001.json
+ ...
+ 1xxx/
+ CVE-2024-1001.json
+ ...
+
+ :param base_dir: Path to the base directory containing year folders
+ """
+ base_path = Path(base_dir)
+ total_processed = 0
+ total_errors = 0
+
+ # process each year directory
+ for year in sorted(base_path.glob("[0-9][0-9][0-9][0-9]")):
+ self.logger.info(f"Processing year {year.name}...")
+
+ # process each group directory (0xxx, 1xxx, etc)
+ for group in sorted(year.glob("[0-9]xxx")):
+ self.logger.info(f"Processing group {group.name}...")
+
+ # process each cve json file
+ for cve_file in sorted(group.glob("CVE-*.json")):
+ try:
+ with open(cve_file) as f:
+ cve_data = json.load(f)
+
+ self.logger.debug(f"Processing {cve_file.name}")
+ self.processCVEJSON(cve_data)
+ total_processed += 1
+
+ # commit after each cve to avoid large transactions
+ self.txn.commit()
+
+ except (OSError, json.JSONDecodeError) as e:
+ self.logger.error(
+ f"Error processing {cve_file}: {str(e)}"
+ )
+ total_errors += 1
+ continue
+
+ if total_processed % 100 == 0:
+ self.logger.info(
+ f"Processed {total_processed} CVE files..."
+ )
+
+ return total_processed, total_errors
def main(self):
self.logger.info("Initializing...")
+
+ # handle GitHub delta download case
+ if self.options.delta:
+ try:
+ url, _ = self.construct_github_url(delta=True)
+ self.logger.info(
+ f"Downloading delta CVE data from GitHub: {url}"
+ )
+
+ # download the ZIP file
+ response = self.fetchCVEURL(url)
+
+ # extract to temporary directory
+ temp_dir = self.extract_github_zip(response, delta=True)
+
+ try:
+ # process the extracted directory
+ total_processed, total_errors = (
+ self.process_delta_directory(temp_dir)
+ )
+ self.logger.info(
+ f"Processed {total_processed} delta CVE files "
+ f"({total_errors} errors)"
+ )
+ finally:
+ # clean up temporary directory
+ import shutil
+
+ shutil.rmtree(temp_dir)
+
+ return
+
+ except Exception as e:
+ raise LaunchpadScriptFailure(
+ f"Error processing GitHub delta CVE data: {str(e)}"
+ )
+
+ # handle local delta directory case
+ if self.options.deltacvedir is not None:
+ try:
+ start_time = time.time()
+ total_processed, total_errors = self.process_delta_directory(
+ self.options.deltacvedir
+ )
+ finish_time = time.time()
+
+ self.logger.info(
+ f"Processed {total_processed} delta CVE files "
+ f"({total_errors} errors) in "
+ f"{finish_time - start_time:.2f} seconds"
+ )
+ return
+
+ except Exception as e:
+ raise LaunchpadScriptFailure(
+ f"Error processing local delta CVE directory: {str(e)}"
+ )
+
+ # handle GitHub download case
+ if self.options.baseline:
+ try:
+ url, _ = self.construct_github_url()
+
+ # download the ZIP file
+ response = self.fetchCVEURL(url)
+
+ # extract to temporary directory
+ temp_dir = self.extract_github_zip(response)
+
+ try:
+ # process the extracted directory
+ total_processed, total_errors = (
+ self.process_json_directory(temp_dir)
+ )
+ self.logger.info(
+ f"Processed {total_processed} CVE files "
+ f"({total_errors} errors)"
+ )
+ finally:
+ # clean up temporary directory
+ import shutil
+
+ shutil.rmtree(temp_dir)
+
+ return
+
+ except Exception as e:
+ raise LaunchpadScriptFailure(
+ f"Error processing GitHub CVE data: {str(e)}"
+ )
+
+ # handle local JSON directory case
+ if self.options.baselinecvedir is not None:
+ try:
+ start_time = time.time()
+ total_processed, total_errors = self.process_json_directory(
+ self.options.baselinecvedir
+ )
+ finish_time = time.time()
+
+ self.logger.info(
+ f"Processed {total_processed} CVE files "
+ f"({total_errors} errors) in "
+ f"{finish_time - start_time:.2f} seconds"
+ )
+ return
+
+ except Exception as e:
+ raise LaunchpadScriptFailure(
+ f"Error processing JSON CVE directory: {str(e)}"
+ )
+
+ # existing XML handling
if self.options.cvefile is not None:
try:
with open(self.options.cvefile) as f:
@@ -220,7 +574,7 @@ class CVEUpdater(LaunchpadCronScript):
else:
raise LaunchpadScriptFailure("No CVE database file or URL given.")
- # Start analysing the data.
+ # start analysing the data
start_time = time.time()
self.logger.info("Processing CVE XML...")
self.processCVEXML(cve_db)
@@ -234,8 +588,8 @@ class CVEUpdater(LaunchpadCronScript):
self.logger.info("Downloading CVE database from %s..." % url)
try:
with override_timeout(config.cveupdater.timeout):
- # Command-line options are trusted, so allow file://
- # URLs to ease testing.
+ # command-line options are trusted, so allow file://
+ # URLs to ease testing
response = urlfetch(url, use_proxy=True, allow_file=True)
except requests.RequestException:
raise LaunchpadScriptFailure(
@@ -262,8 +616,110 @@
raise LaunchpadScriptFailure("No CVEs found in XML file.")
self.logger.info("Updating database...")
- # We use Looptuner to control the ideal number of CVEs
- # processed in each transaction, during at least 2 seconds.
+ # we use Looptuner to control the ideal number of CVEs
+ # processed in each transaction, during at least 2 seconds
loop = CveUpdaterTunableLoop(items, self.txn, self.logger)
loop_tuner = LoopTuner(loop, 2)
loop_tuner.run()
+
+ def processCVEJSON(self, cve_json):
+ """Process the CVE JSON data.
+
+ :param cve_json: The CVE JSON as a string or dict.
+ """
+ if isinstance(cve_json, str):
+ data = json.loads(cve_json)
+ else:
+ data = cve_json
+
+ if data.get("dataType") != "CVE_RECORD":
+ raise LaunchpadScriptFailure("Invalid CVE record format")
+
+ # process each CVE record
+ cve_metadata = data.get("cveMetadata", {})
+ containers = data.get("containers", {})
+ cna_data = containers.get("cna", {})
+
+ # get basic CVE information
+ sequence = cve_metadata.get("cveId", "").replace("CVE-", "")
+
+ # get description (required to be in English)
+ description = None
+ for desc in cna_data.get("descriptions", []):
+ if desc.get("lang", "").startswith("en"):
+ description = desc.get("value")
+ break
+
+ if not description:
+ self.logger.debug(f"No description for CVE-{sequence}")
+ return
+
+ # find or create CVE entry
+ cveset = getUtility(ICveSet)
+ cve = cveset[sequence]
+ if cve is None:
+ cve = cveset.new(sequence, description, CveStatus.ENTRY)
+ self.logger.info(f"CVE-{sequence} created")
+
+ # update CVE if needed
+ modified = False
+ if cve.description != description:
+ self.logger.info(f"CVE-{sequence} updated description")
+ cve.description = description
+ modified = True
+
+ # handle references
+ if self._handle_json_references(cna_data.get("references", []), cve):
+ modified = True
+
+ if modified:
+ notify(ObjectModifiedEvent(cve))
+
+ def _handle_json_references(self, references, cve):
+ """Handle references from the JSON format.
+
+ :param references: List of reference objects from JSON
+ :param cve: CVE database object
+ :return: True if references were modified
+ """
+ modified = False
+ old_references = set(cve.references)
+ new_references = set()
+
+ for ref in references:
+ url = ref.get("url")
+ source = "external" # default source
+ content = ref.get("name", "")
+
+ # look for existing reference
+ was_there_previously = False
+ for old_ref in old_references:
+ if (
+ old_ref.url == url
+ and old_ref.source == source
+ and old_ref.content == content
+ ):
+ was_there_previously = True
+ new_references.add(old_ref)
+ break
+
+ if not was_there_previously:
+ self.logger.info(
+ f"Creating new {source} reference for {cve.sequence}"
+ )
+ ref_obj = cve.createReference(source, content, url=url)
+ new_references.add(ref_obj)
+ modified = True
+
+ # remove old references not in new set
+ for ref in sorted(
+ old_references, key=lambda a: (a.source, a.content, a.url)
+ ):
+ if ref not in new_references:
+ self.logger.info(
+ f"Removing {ref.source} reference for {cve.sequence}"
+ )
+ cve.removeReference(ref)
+ modified = True
+
+ return modified
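For anyone wanting to sanity-check the release-naming scheme that construct_github_url() implements, here is a small illustrative sketch (the timestamp is made up; the tag and filename patterns are the ones used above, including the doubled ".zip.zip" suffix on the baseline asset):

    from datetime import datetime, timezone
    from urllib.parse import urljoin

    base_url = "https://github.com/CVEProject/cvelistV5/releases/download/"
    now = datetime(2024, 11, 20, 14, 5, tzinfo=timezone.utc)
    date_str = now.strftime("%Y-%m-%d")

    # Daily baseline: the midnight snapshot, an outer zip that nests cves.zip.
    print(urljoin(base_url,
                  f"cve_{date_str}_0000Z/{date_str}_all_CVEs_at_midnight.zip.zip"))
    # .../download/cve_2024-11-20_0000Z/2024-11-20_all_CVEs_at_midnight.zip.zip

    # Hourly delta for 14:00 UTC.
    print(urljoin(base_url,
                  f"cve_{date_str}_1400Z/{date_str}_delta_CVEs_at_1400Z.zip"))
    # .../download/cve_2024-11-20_1400Z/2024-11-20_delta_CVEs_at_1400Z.zip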
diff --git a/lib/lp/bugs/scripts/tests/test_cveimport.py b/lib/lp/bugs/scripts/tests/test_cveimport.py
index ebac876..3ccda57 100644
--- a/lib/lp/bugs/scripts/tests/test_cveimport.py
+++ b/lib/lp/bugs/scripts/tests/test_cveimport.py
@@ -1,14 +1,24 @@
-# Copyright 2018 Canonical Ltd. This software is licensed under the
+# Copyright 2024 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
import gzip
import io
+import json
+import shutil
+import tempfile
+from datetime import datetime, timezone
+from pathlib import Path
import responses
+from testtools.matchers import Contains
+from zope.component import getUtility
+from lp.bugs.interfaces.cve import CveStatus, ICveSet
from lp.bugs.scripts.cveimport import CVEUpdater
from lp.services.log.logger import DevNullLogger
+from lp.services.scripts.base import LaunchpadScriptFailure
from lp.testing import TestCase
+from lp.testing.layers import LaunchpadZopelessLayer
class TestCVEUpdater(TestCase):
@@ -65,3 +75,157 @@ class TestCVEUpdater(TestCase):
"cve-updater", test_args=[], logger=DevNullLogger()
)
self.assertEqual(body, cve_updater.fetchCVEURL(url))
+
+ layer = LaunchpadZopelessLayer
+
+ def setUp(self):
+ super().setUp()
+ self.temp_dir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.temp_dir)
+
+ def create_test_json_cve(
+ self, cve_id="2024-0001", description="Test description"
+ ):
+ """Helper to create a test CVE JSON file"""
+ cve_data = {
+ "dataType": "CVE_RECORD",
+ "cveMetadata": {"cveId": f"CVE-{cve_id}"},
+ "containers": {
+ "cna": {
+ "descriptions": [{"lang": "en", "value": description}],
+ "references": [
+ {
+                            "url": "http://example.com/ref1",
+ "name": "Reference 1",
+ }
+ ],
+ }
+ },
+ }
+ return cve_data
+
+ def make_updater(self, test_args=None):
+ """Helper to create a properly initialized CVEUpdater."""
+ if test_args is None:
+ test_args = []
+ updater = CVEUpdater(
+ "cve-updater", test_args=test_args, logger=DevNullLogger()
+ )
+ # Initialize just the database connection
+ updater._init_db(isolation="read_committed")
+ return updater
+
+ def test_process_json_directory(self):
+ """Test processing a directory of CVE JSON files."""
+ # Create test directory structure
+ base_dir = Path(self.temp_dir) / "cves"
+ year_dir = base_dir / "2024"
+ group_dir = year_dir / "0xxx"
+ group_dir.mkdir(parents=True)
+
+ # Create a test CVE file
+ cve_file = group_dir / "CVE-2024-0001.json"
+ cve_data = self.create_test_json_cve()
+ cve_file.write_text(json.dumps(cve_data))
+
+ # Process the directory using the script infrastructure
+ updater = self.make_updater([str(base_dir)])
+ processed, errors = updater.process_json_directory(str(base_dir))
+
+ # Verify results
+ self.assertEqual(1, processed)
+ self.assertEqual(0, errors)
+
+ # Verify CVE was created
+ cveset = getUtility(ICveSet)
+ cve = cveset["2024-0001"]
+ self.assertIsNotNone(cve)
+ self.assertEqual("Test description", cve.description)
+
+ def test_process_delta_directory(self):
+ """Test processing a directory of delta CVE files."""
+ # Create test delta directory
+ delta_dir = Path(self.temp_dir) / "deltaCves"
+ delta_dir.mkdir()
+
+ # Create a test delta CVE file
+ cve_file = delta_dir / "CVE-2024-0002.json"
+ cve_data = self.create_test_json_cve(
+ cve_id="2024-0002", description="Delta CVE"
+ )
+ cve_file.write_text(json.dumps(cve_data))
+
+ # Process the directory using the script infrastructure
+ updater = self.make_updater([str(delta_dir)])
+ processed, errors = updater.process_delta_directory(str(delta_dir))
+
+ # Verify results
+ self.assertEqual(1, processed)
+ self.assertEqual(0, errors)
+
+ # Verify CVE was created
+ cveset = getUtility(ICveSet)
+ cve = cveset["2024-0002"]
+ self.assertIsNotNone(cve)
+ self.assertEqual("Delta CVE", cve.description)
+
+ def test_construct_github_url(self):
+ """Test GitHub URL construction for different scenarios."""
+ updater = CVEUpdater(
+ "cve-updater", test_args=[], logger=DevNullLogger()
+ )
+
+ # Test baseline URL
+ url, year = updater.construct_github_url(delta=False)
+ expected = "_all_CVEs_at_midnight.zip"
+ self.assertThat(url, Contains(expected))
+ self.assertEqual(datetime.now(timezone.utc).strftime("%Y"), year)
+
+ # Test delta URL (normal hour)
+ url, _ = updater.construct_github_url(delta=True)
+ current_hour = datetime.now(timezone.utc).hour
+ if current_hour not in (0, 23):
+ expected = f"_delta_CVEs_at_{current_hour:02d}00Z.zip"
+ self.assertThat(url, Contains(expected))
+
+ def test_invalid_json_cve(self):
+ """Test handling of invalid CVE JSON data."""
+ updater = CVEUpdater(
+ "cve-updater", test_args=[], logger=DevNullLogger()
+ )
+
+ # Test invalid dataType
+ invalid_data = {
+ "dataType": "INVALID",
+ "cveMetadata": {"cveId": "CVE-2024-0003"},
+ }
+
+ self.assertRaises(
+ LaunchpadScriptFailure, updater.processCVEJSON, invalid_data
+ )
+
+ def test_update_existing_cve(self):
+ """Test updating an existing CVE with new data."""
+ # First create a CVE
+ original_desc = "Original description"
+ cveset = getUtility(ICveSet)
+
+ # Create initial CVE using a properly initialized updater
+ updater = self.make_updater()
+ cveset.new("2024-0004", original_desc, CveStatus.ENTRY)
+ updater.txn.commit()
+
+ # Create updated data
+ new_desc = "Updated description"
+ cve_data = self.create_test_json_cve(
+ cve_id="2024-0004", description=new_desc
+ )
+
+ # Process the update with a fresh updater
+ updater = self.make_updater()
+ updater.processCVEJSON(cve_data)
+ updater.txn.commit()
+
+ # Verify the update
+ updated_cve = cveset["2024-0004"]
+ self.assertEqual(new_desc, updated_cve.description)
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index f634d11..0d4b817 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -578,6 +578,9 @@ dbuser: cve
# datatype: string
cve_db_url: https://cve.mitre.org/data/downloads/allitems.xml.gz
+# datatype: string
+github_cve_url: https://github.com/CVEProject/cvelistV5/releases/download/
+
# datatype: integer
timeout: 30
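One subtle point about the new github_cve_url setting: construct_github_url() combines it with urljoin(), so the trailing slash in the default value is significant. A quick illustration (using the default value above):

    from urllib.parse import urljoin

    base = "https://github.com/CVEProject/cvelistV5/releases/download/"
    print(urljoin(base, "cve_2024-11-20_0000Z/file.zip"))
    # https://github.com/CVEProject/cvelistV5/releases/download/cve_2024-11-20_0000Z/file.zip

    # Without the trailing slash, urljoin replaces the last path segment:
    print(urljoin(base.rstrip("/"), "cve_2024-11-20_0000Z/file.zip"))
    # https://github.com/CVEProject/cvelistV5/releases/cve_2024-11-20_0000Z/file.zip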