← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~enriqueesanchz/launchpad:add-rejected-to-update-cve into launchpad:master

 

Enrique Sánchez has proposed merging ~enriqueesanchz/launchpad:add-rejected-to-update-cve into launchpad:master.

Commit message:
Add support for rejected CVEs to cveimport

- Set the status of imported rejected CVEs to CveStatus.REJECTED
- Set Cve.cvss from the CNA metrics of the imported record

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/494919
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-rejected-to-update-cve into launchpad:master.
diff --git a/lib/lp/bugs/interfaces/cve.py b/lib/lp/bugs/interfaces/cve.py
index a3037ec..8807fb5 100644
--- a/lib/lp/bugs/interfaces/cve.py
+++ b/lib/lp/bugs/interfaces/cve.py
@@ -74,6 +74,15 @@ class CveStatus(DBEnumeratedType):
         """,
     )
 
+    REJECTED = DBItem(
+        4,
+        """
+        Rejected
+
+        This CVE has been rejected or withdrawn by its CVE Numbering Authority.
+        """,
+    )
+
 
 @exported_as_webservice_entry(as_of="beta")
 class ICve(Interface):
diff --git a/lib/lp/bugs/model/cve.py b/lib/lp/bugs/model/cve.py
index 77cef96..050889d 100644
--- a/lib/lp/bugs/model/cve.py
+++ b/lib/lp/bugs/model/cve.py
@@ -65,18 +65,9 @@ class Cve(StormBase, BugLinkTargetMixin):
 
     date_made_public = DateTime(tzinfo=timezone.utc, allow_none=True)
     discovered_by = Unicode(allow_none=True)
-    _cvss = JSON(name="cvss", allow_none=True)
+    cvss = JSON(name="cvss", allow_none=True)
     metadata = JSON(name="metadata", allow_none=True)
 
-    @property
-    def cvss(self):
-        return self._cvss or {}
-
-    @cvss.setter
-    def cvss(self, value):
-        assert value is None or isinstance(value, dict)
-        self._cvss = value
-
     def __init__(
         self,
         sequence,
@@ -93,7 +84,7 @@ class Cve(StormBase, BugLinkTargetMixin):
         self.description = description
         self.date_made_public = date_made_public
         self.discovered_by = discovered_by
-        self._cvss = cvss
+        self.cvss = cvss
         self.metadata = metadata
 
     @property
diff --git a/lib/lp/bugs/scripts/cveimport.py b/lib/lp/bugs/scripts/cveimport.py
index 22ceaa3..759821c 100644
--- a/lib/lp/bugs/scripts/cveimport.py
+++ b/lib/lp/bugs/scripts/cveimport.py
@@ -35,6 +35,11 @@ from lp.services.timeout import override_timeout, urlfetch
 
 CVEDB_NS = "{http://cve.mitre.org/cve/downloads/1.0}"
 
+CVE_STATE_TO_STATUS = {
+    "PUBLISHED": CveStatus.ENTRY,
+    "REJECTED": CveStatus.REJECTED,
+}
+
 
 def getText(elem):
     """Get the text content of the given element"""
@@ -684,6 +689,9 @@ class CVEUpdater(LaunchpadCronScript):
 
         # process each CVE record
         cve_metadata = data.get("cveMetadata", {})
+        status = CVE_STATE_TO_STATUS.get(
+            cve_metadata.get("state", ""), CveStatus.ENTRY
+        )
         containers = data.get("containers", {})
         cna_data = containers.get("cna", {})
 
@@ -692,7 +700,12 @@ class CVEUpdater(LaunchpadCronScript):
 
         # get description (required to be in English)
         description = None
-        for desc in cna_data.get("descriptions", []):
+        if status == CveStatus.REJECTED:
+            description_key = "rejectedReasons"
+        else:
+            description_key = "descriptions"
+
+        for desc in cna_data.get(description_key, []):
             if desc.get("lang", "").startswith("en"):
                 description = desc.get("value")
                 break
@@ -708,7 +721,7 @@ class CVEUpdater(LaunchpadCronScript):
         cveset = getUtility(ICveSet)
         cve = cveset[sequence]
         if cve is None:
-            cve = cveset.new(sequence, description, CveStatus.ENTRY)
+            cve = cveset.new(sequence, description, status)
             self.logger.info(f"CVE-{sequence} created")
 
         # update CVE if needed
@@ -723,13 +736,22 @@ class CVEUpdater(LaunchpadCronScript):
             modified = True
 
         # Build metadata dict
-        metadata = {"affected": affected}
+        metadata = None
+        if affected:
+            metadata = {"affected": affected}
 
         # If anything changed, update cve.metadata
         if metadata != cve.metadata:
             cve.metadata = metadata
             modified = True
 
+        # Get CVSS metrics
+        cvss_list = cna_data.get("metrics", None)
+        # If anything changed, update cve.cvss
+        if cvss_list != cve.cvss:
+            cve.cvss = cvss_list
+            modified = True
+
         if modified:
             notify(ObjectModifiedEvent(cve))
 
diff --git a/lib/lp/bugs/scripts/tests/test_cveimport.py b/lib/lp/bugs/scripts/tests/test_cveimport.py
index 0ca501d..c17605d 100644
--- a/lib/lp/bugs/scripts/tests/test_cveimport.py
+++ b/lib/lp/bugs/scripts/tests/test_cveimport.py
@@ -107,6 +107,26 @@ class TestCVEUpdater(TestCase):
                             "name": "Reference 1",
                         }
                     ],
+                    "metrics": [
+                        {
+                            "cvssV3_0": {
+                                "version": "3.0",
+                                "baseScore": 7.3,
+                                "vectorString": (
+                                    "CVSS:3.0"
+                                    "/AV:N/AC:L/PR:N/UI:N/S:U/C:L/I:L/A:L"
+                                ),
+                                "baseSeverity": "HIGH",
+                            }
+                        },
+                        {
+                            "cvssV2_0": {
+                                "version": "2.0",
+                                "baseScore": 7.5,
+                                "vectorString": "AV:N/AC:L/Au:N/C:P/I:P/A:P",
+                            }
+                        },
+                    ],
                 }
             },
         }
@@ -243,6 +263,59 @@ class TestCVEUpdater(TestCase):
             expected = f"_delta_CVEs_at_{current_hour:02d}00Z.zip"
             self.assertThat(url, Contains(expected))
 
+    def test_processCVEJSON(self):
+        """Test handling of CVE JSON data."""
+        updater = CVEUpdater(
+            "cve-updater", test_args=[], logger=DevNullLogger()
+        )
+
+        test_cve = self.create_test_json_cve(
+            cve_id="2024-0003", description="Test CVE 2024-0003"
+        )
+        updater.processCVEJSON(test_cve)
+
+        # Verify CVE was created
+        cveset = getUtility(ICveSet)
+        cve = cveset["2024-0003"]
+
+        self.assertEqual("2024-0003", cve.sequence)
+        self.assertEqual("Test CVE 2024-0003", cve.description)
+        self.assertEqual(CveStatus.ENTRY, cve.status)
+
+        metrics = test_cve.get("containers").get("cna").get("metrics")
+        self.assertEqual(metrics, cve.cvss)
+        affected = test_cve.get("containers").get("cna").get("affected")
+        self.assertEqual({"affected": affected}, cve.metadata)
+
+    def test_processCVEJSON_rejected(self):
+        """Test handling of rejected CVE JSON data."""
+        updater = CVEUpdater(
+            "cve-updater", test_args=[], logger=DevNullLogger()
+        )
+
+        rejected_cve = {
+            "dataType": "CVE_RECORD",
+            "cveMetadata": {"cveId": "CVE-2024-0004", "state": "REJECTED"},
+            "containers": {
+                "cna": {
+                    "rejectedReasons": [
+                        {"lang": "en", "value": "This CVE has been rejected."}
+                    ],
+                }
+            },
+        }
+
+        updater.processCVEJSON(rejected_cve)
+
+        # Verify CVE was created
+        cveset = getUtility(ICveSet)
+        cve = cveset["2024-0004"]
+        self.assertEqual("2024-0004", cve.sequence)
+        self.assertEqual("This CVE has been rejected.", cve.description)
+        self.assertEqual(CveStatus.REJECTED, cve.status)
+        self.assertEqual(None, cve.cvss)
+        self.assertEqual(None, cve.metadata)
+
     def test_invalid_json_cve(self):
         """Test handling of invalid CVE JSON data."""
         updater = CVEUpdater(
diff --git a/lib/lp/bugs/tests/test_cve.py b/lib/lp/bugs/tests/test_cve.py
index 3abf98d..d333399 100644
--- a/lib/lp/bugs/tests/test_cve.py
+++ b/lib/lp/bugs/tests/test_cve.py
@@ -519,7 +519,7 @@ class TestCve(TestCaseWithFactory):
                 description="A critical vulnerability",
                 date_made_public=None,
                 discovered_by=None,
-                cvss={},
+                cvss=None,
                 metadata=None,
             ),
         )
@@ -567,27 +567,6 @@ class TestCve(TestCaseWithFactory):
             with ExpectedException(TypeError, "Expected datetime,.*"):
                 removeSecurityProxy(cve).date_made_public = invalid_value
 
-    def test_cve_cvss_invalid_values(self):
-        invalid_values = ["", "abcd", "2022-01-01", datetime.now()]
-        cve = self.factory.makeCVE(
-            sequence="2099-1234",
-            description="A critical vulnerability",
-            cvestate=CveStatus.CANDIDATE,
-        )
-        for invalid_value in invalid_values:
-            with ExpectedException(AssertionError):
-                removeSecurityProxy(cve).cvss = invalid_value
-
-    def test_cvss_value_returned_when_null(self):
-        cve = self.factory.makeCVE(
-            sequence="2099-1234",
-            description="A critical vulnerability",
-            cvestate=CveStatus.CANDIDATE,
-        )
-        cve = removeSecurityProxy(cve)
-        self.assertIsNone(cve._cvss)
-        self.assertEqual({}, cve.cvss)
-
     def test_getDistributionVulnerability(self):
         cve = self.factory.makeCVE(sequence="2099-1234")
         distribution = self.factory.makeDistribution(