launchpad-reviewers team mailing list archive

[Merge] ~ruinedyourlife/launchpad:json-cve-parser into launchpad:master

 

Quentin Debhi has proposed merging ~ruinedyourlife/launchpad:json-cve-parser into launchpad:master.

Commit message:
Parse new CVE JSON format

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~ruinedyourlife/launchpad/+git/launchpad/+merge/477375
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~ruinedyourlife/launchpad:json-cve-parser into launchpad:master.
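
For reviewers' context: the new processCVEJSON() method below consumes records
in the CVE JSON 5.x shape. A minimal sketch of the fields it reads follows; the
CVE ID, description text and reference URL are illustrative placeholders, not
values taken from the branch.

    # Sketch of the record shape processCVEJSON() expects (illustrative values).
    record = {
        "dataType": "CVE_RECORD",                   # anything else is rejected
        "cveMetadata": {"cveId": "CVE-2024-0001"},  # the "CVE-" prefix is stripped
        "containers": {
            "cna": {
                # the first description whose lang starts with "en" is used
                "descriptions": [
                    {"lang": "en", "value": "Example description"}
                ],
                # each reference contributes a url plus an optional name
                "references": [
                    {"url": "https://example.com/advisory", "name": "Advisory"}
                ],
            }
        },
    }
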
diff --git a/lib/lp/bugs/scripts/cveimport.py b/lib/lp/bugs/scripts/cveimport.py
index 464fe58..7c5d0c6 100644
--- a/lib/lp/bugs/scripts/cveimport.py
+++ b/lib/lp/bugs/scripts/cveimport.py
@@ -7,7 +7,12 @@ CVE's are fully registered in Launchpad."""
 
 import gzip
 import io
+import os
 import time
+import zipfile
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from urllib.parse import urljoin
 
 import defusedxml.cElementTree as cElementTree
 import requests
@@ -204,9 +209,361 @@ class CVEUpdater(LaunchpadCronScript):
             default=config.cveupdater.cve_db_url,
             help="The URL for the XML CVE database.",
         )
+        self.parser.add_option(
+            "--baselinecvedir",
+            dest="baselinecvedir",
+            default=None,
+            help="Directory containing CVE JSON files in year/group structure",
+        )
+        self.parser.add_option(
+            "--deltacvedir",
+            dest="deltacvedir",
+            default=None,
+            help="Directory containing delta CVE JSON files (flat structure)",
+        )
+        self.parser.add_option(
+            "-b",
+            "--baseline",
+            dest="baseline",
+            action="store_true",
+            default=False,
+            help="Download baseline full CVE data from GitHub releases",
+        )
+        self.parser.add_option(
+            "-d",
+            "--delta",
+            dest="delta",
+            action="store_true",
+            default=False,
+            help="Download and process hourly delta CVE data from GitHub "
+            "releases",
+        )
+
+    def construct_github_url(self, delta=False):
+        """Construct the GitHub release URL for CVE data.
+
+        :param delta: If True, construct URL for hourly delta, otherwise for
+            daily baseline
+        :return: tuple of (url, year)
+        """
+        now = datetime.now(timezone.utc)
+        date_str = now.strftime("%Y-%m-%d")
+        year = now.strftime("%Y")
+        hour = now.hour
+
+        base_url = config.cveupdater.github_cve_url
+
+        if delta:
+            # If we're the next day, we should try to get yesterday's
+            # end_of_day file
+            if hour == 0:
+                yesterday = now - timedelta(days=1)
+                date_str = yesterday.strftime("%Y-%m-%d")
+                release_tag = f"cve_{date_str}_at_end_of_day"
+                filename = f"{date_str}_delta_CVEs_at_end_of_day.zip"
+            # If we're between 23:00-23:59, use the 2300Z file
+            elif hour == 23:
+                release_tag = f"cve_{date_str}_2300Z"
+                filename = f"{date_str}_delta_CVEs_at_2300Z.zip"
+            else:
+                # For all other hours, use the standard hourly format
+                hour_str = f"{hour:02d}00"
+                release_tag = f"cve_{date_str}_{hour_str}Z"
+                filename = f"{date_str}_delta_CVEs_at_{hour_str}Z.zip"
+        else:
+            release_tag = f"cve_{date_str}_0000Z"
+            filename = f"{date_str}_all_CVEs_at_midnight.zip.zip"
+
+        # Construct the full URL
+        url = urljoin(base_url, f"{release_tag}/{filename}")
+        return url, year
+
+    def process_delta_directory(self, delta_dir):
+        """Process a directory containing delta CVE JSON files.
+
+        Expected structure:
+        deltaCves/
+            CVE-XXXX-XXXX.json
+            CVE-XXXX-YYYY.json
+            ...
+
+        :param delta_dir: Path to the directory containing delta CVE files
+        :return: tuple of (processed_count, error_count)
+        """
+        total_processed = 0
+        total_errors = 0
+
+        delta_path = Path(delta_dir)
+        if not delta_path.exists():
+            raise LaunchpadScriptFailure(
+                f"Delta directory not found: {delta_dir}"
+            )
+
+        # process each CVE JSON file in the delta directory
+        for cve_file in sorted(delta_path.glob("CVE-*.json")):
+            try:
+                with open(cve_file) as f:
+                    import json
+
+                    cve_data = json.load(f)
+
+                self.logger.debug(f"Processing delta {cve_file.name}")
+                self.processCVEJSON(cve_data)
+                total_processed += 1
+
+                # commit after each CVE to avoid large transactions
+                self.txn.commit()
+
+            except (OSError, json.JSONDecodeError) as e:
+                self.logger.error(
+                    f"Error processing delta {cve_file}: {str(e)}"
+                )
+                total_errors += 1
+                continue
+
+            if total_processed % 10 == 0:
+                self.logger.info(
+                    f"Processed {total_processed} delta CVE files..."
+                )
+
+        return total_processed, total_errors
+
+    def extract_github_zip(self, zip_content, delta=False):
+        """Extract the GitHub ZIP file to a temporary directory.
+
+        :param zip_content: The downloaded ZIP file content
+        :param delta: If True, expect delta structure, otherwise baseline
+            structure
+        :return: Path to the extracted directory containing CVE files
+        """
+        import shutil
+        import tempfile
+
+        # create a temporary directory
+        temp_dir = tempfile.mkdtemp(prefix="cve_import_")
+
+        try:
+            # write outer zip content to a temporary file
+            outer_zip_path = os.path.join(temp_dir, "downloaded.zip")
+            with open(outer_zip_path, "wb") as f:
+                f.write(zip_content)
+
+            # extract the outer zip file
+            with zipfile.ZipFile(outer_zip_path) as outer_zf:
+                if delta:
+                    # for delta, extract deltacves directory
+                    members = [
+                        m
+                        for m in outer_zf.namelist()
+                        if m.startswith("deltaCves/")
+                    ]
+                    outer_zf.extractall(temp_dir, members=members)
+                    target_dir = os.path.join(temp_dir, "deltaCves")
+                else:
+                    # for baseline, handle nested zip structure
+                    outer_zf.extract("cves.zip", temp_dir)
+                    inner_zip_path = os.path.join(temp_dir, "cves.zip")
+
+                    with zipfile.ZipFile(inner_zip_path) as inner_zf:
+                        inner_zf.extractall(temp_dir)
+
+                    os.unlink(inner_zip_path)
+                    target_dir = os.path.join(temp_dir, "cves")
+
+            os.unlink(outer_zip_path)
+
+            if not os.path.exists(target_dir):
+                raise LaunchpadScriptFailure(
+                    f"Expected directory not found in ZIP: {target_dir}"
+                )
+
+            return target_dir
+
+        except Exception as e:
+            # clean up on any error
+            shutil.rmtree(temp_dir)
+            raise LaunchpadScriptFailure(
+                f"Failed to extract ZIP files: {str(e)}"
+            )
+
+    def process_json_directory(self, base_dir):
+        """Process a directory of CVE JSON files organized by year and groups.
+
+        Expected structure:
+        base_dir/
+            1999/
+                0xxx/
+                    CVE-1999-0001.json
+                    ...
+                1xxx/
+                    CVE-1999-1001.json
+                    ...
+            2024/
+                0xxx/
+                    CVE-2024-0001.json
+                    ...
+                1xxx/
+                    CVE-2024-1001.json
+                    ...
+
+        :param base_dir: Path to the base directory containing year folders
+        """
+        base_path = Path(base_dir)
+        total_processed = 0
+        total_errors = 0
+
+        # process each year directory
+        for year in sorted(base_path.glob("[0-9][0-9][0-9][0-9]")):
+            self.logger.info(f"Processing year {year.name}...")
+
+            # process each group directory (0xxx, 1xxx, etc)
+            for group in sorted(year.glob("[0-9]xxx")):
+                self.logger.info(f"Processing group {group.name}...")
+
+                # process each cve json file
+                for cve_file in sorted(group.glob("CVE-*.json")):
+                    try:
+                        with open(cve_file) as f:
+                            import json
+
+                            cve_data = json.load(f)
+
+                        self.logger.debug(f"Processing {cve_file.name}")
+                        self.processCVEJSON(cve_data)
+                        total_processed += 1
+
+                        # commit after each cve to avoid large transactions
+                        self.txn.commit()
+
+                    except (OSError, json.JSONDecodeError) as e:
+                        self.logger.error(
+                            f"Error processing {cve_file}: {str(e)}"
+                        )
+                        total_errors += 1
+                        continue
+
+                    if total_processed % 100 == 0:
+                        self.logger.info(
+                            f"Processed {total_processed} CVE files..."
+                        )
+
+        return total_processed, total_errors
 
     def main(self):
         self.logger.info("Initializing...")
+
+        # handle GitHub delta download case
+        if self.options.delta:
+            try:
+                url, _ = self.construct_github_url(delta=True)
+                self.logger.info(
+                    f"Downloading delta CVE data from GitHub: {url}"
+                )
+
+                # download the ZIP file
+                response = self.fetchCVEURL(url)
+
+                # extract to temporary directory
+                temp_dir = self.extract_github_zip(response, delta=True)
+
+                try:
+                    # process the extracted directory
+                    total_processed, total_errors = (
+                        self.process_delta_directory(temp_dir)
+                    )
+                    self.logger.info(
+                        f"Processed {total_processed} delta CVE files "
+                        f"({total_errors} errors)"
+                    )
+                finally:
+                    # clean up temporary directory
+                    import shutil
+
+                    shutil.rmtree(temp_dir)
+
+                return
+
+            except Exception as e:
+                raise LaunchpadScriptFailure(
+                    f"Error processing GitHub delta CVE data: {str(e)}"
+                )
+
+        # handle local delta directory case
+        if self.options.deltacvedir is not None:
+            try:
+                start_time = time.time()
+                total_processed, total_errors = self.process_delta_directory(
+                    self.options.deltacvedir
+                )
+                finish_time = time.time()
+
+                self.logger.info(
+                    f"Processed {total_processed} delta CVE files "
+                    f"({total_errors} errors) in "
+                    f"{finish_time - start_time:.2f} seconds"
+                )
+                return
+
+            except Exception as e:
+                raise LaunchpadScriptFailure(
+                    f"Error processing local delta CVE directory: {str(e)}"
+                )
+
+        # handle GitHub download case
+        if self.options.baseline:
+            try:
+                url, _ = self.construct_github_url()
+
+                # download the ZIP file
+                response = self.fetchCVEURL(url)
+
+                # extract to temporary directory
+                temp_dir = self.extract_github_zip(response)
+
+                try:
+                    # process the extracted directory
+                    total_processed, total_errors = (
+                        self.process_json_directory(temp_dir)
+                    )
+                    self.logger.info(
+                        f"Processed {total_processed} CVE files "
+                        f"({total_errors} errors)"
+                    )
+                finally:
+                    # clean up temporary directory
+                    import shutil
+
+                    shutil.rmtree(temp_dir)
+
+                return
+
+            except Exception as e:
+                raise LaunchpadScriptFailure(
+                    f"Error processing GitHub CVE data: {str(e)}"
+                )
+
+        # handle local JSON directory case
+        if self.options.baselinecvedir is not None:
+            try:
+                start_time = time.time()
+                total_processed, total_errors = self.process_json_directory(
+                    self.options.baselinecvedir
+                )
+                finish_time = time.time()
+
+                self.logger.info(
+                    f"Processed {total_processed} CVE files "
+                    f"({total_errors} errors) in "
+                    f"{finish_time - start_time:.2f} seconds"
+                )
+                return
+
+            except Exception as e:
+                raise LaunchpadScriptFailure(
+                    f"Error processing JSON CVE directory: {str(e)}"
+                )
+
+        # existing XML handling
         if self.options.cvefile is not None:
             try:
                 with open(self.options.cvefile) as f:
@@ -220,7 +577,7 @@ class CVEUpdater(LaunchpadCronScript):
         else:
             raise LaunchpadScriptFailure("No CVE database file or URL given.")
 
-        # Start analysing the data.
+        # start analysing the data
         start_time = time.time()
         self.logger.info("Processing CVE XML...")
         self.processCVEXML(cve_db)
@@ -234,8 +591,8 @@ class CVEUpdater(LaunchpadCronScript):
         self.logger.info("Downloading CVE database from %s..." % url)
         try:
             with override_timeout(config.cveupdater.timeout):
-                # Command-line options are trusted, so allow file://
-                # URLs to ease testing.
+                # command-line options are trusted, so allow file://
+                # URLs to ease testing
                 response = urlfetch(url, use_proxy=True, allow_file=True)
         except requests.RequestException:
             raise LaunchpadScriptFailure(
@@ -262,8 +619,112 @@ class CVEUpdater(LaunchpadCronScript):
             raise LaunchpadScriptFailure("No CVEs found in XML file.")
         self.logger.info("Updating database...")
 
-        # We use Looptuner to control the ideal number of CVEs
-        # processed in each transaction, during at least 2 seconds.
+        # we use Looptuner to control the ideal number of CVEs
+        # processed in each transaction, during at least 2 seconds
         loop = CveUpdaterTunableLoop(items, self.txn, self.logger)
         loop_tuner = LoopTuner(loop, 2)
         loop_tuner.run()
+
+    def processCVEJSON(self, cve_json):
+        """Process the CVE JSON data.
+
+        :param cve_json: The CVE JSON as a string or dict.
+        """
+        if isinstance(cve_json, str):
+            import json
+
+            data = json.loads(cve_json)
+        else:
+            data = cve_json
+
+        if data.get("dataType") != "CVE_RECORD":
+            raise LaunchpadScriptFailure("Invalid CVE record format")
+
+        # process each CVE record
+        cve_metadata = data.get("cveMetadata", {})
+        containers = data.get("containers", {})
+        cna_data = containers.get("cna", {})
+
+        # get basic CVE information
+        sequence = cve_metadata.get("cveId", "").replace("CVE-", "")
+
+        # get description (required to be in English)
+        description = None
+        for desc in cna_data.get("descriptions", []):
+            if desc.get("lang", "").startswith("en"):
+                description = desc.get("value")
+                break
+
+        if not description:
+            self.logger.debug(f"No description for CVE-{sequence}")
+            return
+
+        # find or create CVE entry
+        cveset = getUtility(ICveSet)
+        cve = cveset[sequence]
+        if cve is None:
+            cve = cveset.new(sequence, description, CveStatus.ENTRY)
+            self.logger.info(f"CVE-{sequence} created")
+
+        # update CVE if needed
+        modified = False
+        if cve.description != description:
+            self.logger.info(f"CVE-{sequence} updated description")
+            cve.description = description
+            modified = True
+
+        # handle references
+        if self._handle_json_references(cna_data.get("references", []), cve):
+            modified = True
+
+        if modified:
+            notify(ObjectModifiedEvent(cve))
+
+    def _handle_json_references(self, references, cve):
+        """Handle references from the JSON format.
+
+        :param references: List of reference objects from JSON
+        :param cve: CVE database object
+        :return: True if references were modified
+        """
+        modified = False
+        old_references = set(cve.references)
+        new_references = set()
+
+        for ref in references:
+            url = ref.get("url")
+            source = "external"  # default source
+            content = ref.get("name", "")
+
+            # look for existing reference
+            was_there_previously = False
+            for old_ref in old_references:
+                if (
+                    old_ref.url == url
+                    and old_ref.source == source
+                    and old_ref.content == content
+                ):
+                    was_there_previously = True
+                    new_references.add(old_ref)
+                    break
+
+            if not was_there_previously:
+                self.logger.info(
+                    f"Creating new {source} reference for {cve.sequence}"
+                )
+                ref_obj = cve.createReference(source, content, url=url)
+                new_references.add(ref_obj)
+                modified = True
+
+        # remove old references not in new set
+        for ref in sorted(
+            old_references, key=lambda a: (a.source, a.content, a.url)
+        ):
+            if ref not in new_references:
+                self.logger.info(
+                    f"Removing {ref.source} reference for {cve.sequence}"
+                )
+                cve.removeReference(ref)
+                modified = True
+
+        return modified
diff --git a/lib/lp/bugs/scripts/tests/test_cveimport.py b/lib/lp/bugs/scripts/tests/test_cveimport.py
index ebac876..3ccda57 100644
--- a/lib/lp/bugs/scripts/tests/test_cveimport.py
+++ b/lib/lp/bugs/scripts/tests/test_cveimport.py
@@ -1,14 +1,24 @@
-# Copyright 2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2024 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 import gzip
 import io
+import json
+import shutil
+import tempfile
+from datetime import datetime, timezone
+from pathlib import Path
 
 import responses
+from testtools.matchers import Contains
+from zope.component import getUtility
 
+from lp.bugs.interfaces.cve import CveStatus, ICveSet
 from lp.bugs.scripts.cveimport import CVEUpdater
 from lp.services.log.logger import DevNullLogger
+from lp.services.scripts.base import LaunchpadScriptFailure
 from lp.testing import TestCase
+from lp.testing.layers import LaunchpadZopelessLayer
 
 
 class TestCVEUpdater(TestCase):
@@ -65,3 +75,157 @@ class TestCVEUpdater(TestCase):
             "cve-updater", test_args=[], logger=DevNullLogger()
         )
         self.assertEqual(body, cve_updater.fetchCVEURL(url))
+
+    layer = LaunchpadZopelessLayer
+
+    def setUp(self):
+        super().setUp()
+        self.temp_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.temp_dir)
+
+    def create_test_json_cve(
+        self, cve_id="2024-0001", description="Test description"
+    ):
+        """Helper to create a test CVE JSON file"""
+        cve_data = {
+            "dataType": "CVE_RECORD",
+            "cveMetadata": {"cveId": f"CVE-{cve_id}"},
+            "containers": {
+                "cna": {
+                    "descriptions": [{"lang": "en", "value": description}],
+                    "references": [
+                        {
+                            "url": "http://example.com/ref1",
+                            "name": "Reference 1",
+                        }
+                    ],
+                }
+            },
+        }
+        return cve_data
+
+    def make_updater(self, test_args=None):
+        """Helper to create a properly initialized CVEUpdater."""
+        if test_args is None:
+            test_args = []
+        updater = CVEUpdater(
+            "cve-updater", test_args=test_args, logger=DevNullLogger()
+        )
+        # Initialize just the database connection
+        updater._init_db(isolation="read_committed")
+        return updater
+
+    def test_process_json_directory(self):
+        """Test processing a directory of CVE JSON files."""
+        # Create test directory structure
+        base_dir = Path(self.temp_dir) / "cves"
+        year_dir = base_dir / "2024"
+        group_dir = year_dir / "0xxx"
+        group_dir.mkdir(parents=True)
+
+        # Create a test CVE file
+        cve_file = group_dir / "CVE-2024-0001.json"
+        cve_data = self.create_test_json_cve()
+        cve_file.write_text(json.dumps(cve_data))
+
+        # Process the directory using the script infrastructure
+        updater = self.make_updater([str(base_dir)])
+        processed, errors = updater.process_json_directory(str(base_dir))
+
+        # Verify results
+        self.assertEqual(1, processed)
+        self.assertEqual(0, errors)
+
+        # Verify CVE was created
+        cveset = getUtility(ICveSet)
+        cve = cveset["2024-0001"]
+        self.assertIsNotNone(cve)
+        self.assertEqual("Test description", cve.description)
+
+    def test_process_delta_directory(self):
+        """Test processing a directory of delta CVE files."""
+        # Create test delta directory
+        delta_dir = Path(self.temp_dir) / "deltaCves"
+        delta_dir.mkdir()
+
+        # Create a test delta CVE file
+        cve_file = delta_dir / "CVE-2024-0002.json"
+        cve_data = self.create_test_json_cve(
+            cve_id="2024-0002", description="Delta CVE"
+        )
+        cve_file.write_text(json.dumps(cve_data))
+
+        # Process the directory using the script infrastructure
+        updater = self.make_updater([str(delta_dir)])
+        processed, errors = updater.process_delta_directory(str(delta_dir))
+
+        # Verify results
+        self.assertEqual(1, processed)
+        self.assertEqual(0, errors)
+
+        # Verify CVE was created
+        cveset = getUtility(ICveSet)
+        cve = cveset["2024-0002"]
+        self.assertIsNotNone(cve)
+        self.assertEqual("Delta CVE", cve.description)
+
+    def test_construct_github_url(self):
+        """Test GitHub URL construction for different scenarios."""
+        updater = CVEUpdater(
+            "cve-updater", test_args=[], logger=DevNullLogger()
+        )
+
+        # Test baseline URL
+        url, year = updater.construct_github_url(delta=False)
+        expected = "_all_CVEs_at_midnight.zip"
+        self.assertThat(url, Contains(expected))
+        self.assertEqual(datetime.now(timezone.utc).strftime("%Y"), year)
+
+        # Test delta URL (normal hour)
+        url, _ = updater.construct_github_url(delta=True)
+        current_hour = datetime.now(timezone.utc).hour
+        if current_hour not in (0, 23):
+            expected = f"_delta_CVEs_at_{current_hour:02d}00Z.zip"
+            self.assertThat(url, Contains(expected))
+
+    def test_invalid_json_cve(self):
+        """Test handling of invalid CVE JSON data."""
+        updater = CVEUpdater(
+            "cve-updater", test_args=[], logger=DevNullLogger()
+        )
+
+        # Test invalid dataType
+        invalid_data = {
+            "dataType": "INVALID",
+            "cveMetadata": {"cveId": "CVE-2024-0003"},
+        }
+
+        self.assertRaises(
+            LaunchpadScriptFailure, updater.processCVEJSON, invalid_data
+        )
+
+    def test_update_existing_cve(self):
+        """Test updating an existing CVE with new data."""
+        # First create a CVE
+        original_desc = "Original description"
+        cveset = getUtility(ICveSet)
+
+        # Create initial CVE using a properly initialized updater
+        updater = self.make_updater()
+        cveset.new("2024-0004", original_desc, CveStatus.ENTRY)
+        updater.txn.commit()
+
+        # Create updated data
+        new_desc = "Updated description"
+        cve_data = self.create_test_json_cve(
+            cve_id="2024-0004", description=new_desc
+        )
+
+        # Process the update with a fresh updater
+        updater = self.make_updater()
+        updater.processCVEJSON(cve_data)
+        updater.txn.commit()
+
+        # Verify the update
+        updated_cve = cveset["2024-0004"]
+        self.assertEqual(new_desc, updated_cve.description)
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index f634d11..0d4b817 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -578,6 +578,9 @@ dbuser: cve
 # datatype: string
 cve_db_url: https://cve.mitre.org/data/downloads/allitems.xml.gz
 
+# datatype: string
+github_cve_url: https://github.com/CVEProject/cvelistV5/releases/download/
+
 # datatype: integer
 timeout: 30
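
As a usage note (not part of the diff): construct_github_url() joins this new
github_cve_url value with a release tag and filename via urljoin, so a daily
baseline URL is composed roughly as in the sketch below; the date is an
illustrative placeholder.

    # Rough sketch of how construct_github_url() builds a baseline URL from
    # the github_cve_url setting; the date here is a placeholder.
    from urllib.parse import urljoin

    base_url = "https://github.com/CVEProject/cvelistV5/releases/download/"
    release_tag = "cve_2024-12-10_0000Z"
    # double ".zip" matches the filename constructed in the branch
    filename = "2024-12-10_all_CVEs_at_midnight.zip.zip"

    url = urljoin(base_url, f"{release_tag}/{filename}")
    # -> .../download/cve_2024-12-10_0000Z/2024-12-10_all_CVEs_at_midnight.zip.zip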