← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~enriqueesanchz/launchpad:fix-soss-export-security-proxy into launchpad:master

 

Enrique Sánchez has proposed merging ~enriqueesanchz/launchpad:fix-soss-export-security-proxy into launchpad:master.

Commit message:
Fix SOSSExport so that it does not use removeSecurityProxy
    
The `ExportVulnerabilityJob` will remove the security proxy before
calling `SOSSExport`.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/491949
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:fix-soss-export-security-proxy into launchpad:master.
diff --git a/lib/lp/bugs/scripts/soss/sossexport.py b/lib/lp/bugs/scripts/soss/sossexport.py
index 348d4d9..9a75eb6 100644
--- a/lib/lp/bugs/scripts/soss/sossexport.py
+++ b/lib/lp/bugs/scripts/soss/sossexport.py
@@ -7,24 +7,17 @@ from collections import defaultdict
 from datetime import datetime
 from typing import Dict, List, Optional
 
-from zope.component import getUtility
-from zope.security.proxy import removeSecurityProxy
-
 from lp.app.enums import InformationType
-from lp.app.errors import NotFoundError
-from lp.bugs.interfaces.cve import ICveSet
 from lp.bugs.model.bug import Bug as BugModel
 from lp.bugs.model.bugtask import BugTask
 from lp.bugs.model.cve import Cve as CveModel
 from lp.bugs.model.vulnerability import Vulnerability
 from lp.bugs.scripts.soss.models import SOSSRecord
 from lp.bugs.scripts.soss.sossimport import (
-    DISTRIBUTION_NAME,
     PACKAGE_STATUS_MAP,
     PACKAGE_TYPE_MAP,
     PRIORITY_ENUM_MAP,
 )
-from lp.registry.interfaces.distribution import IDistributionSet
 from lp.registry.model.distribution import Distribution
 
 __all__ = [
@@ -50,15 +43,8 @@ class SOSSExporter:
     def __init__(
         self,
         information_type: InformationType = InformationType.PROPRIETARY,
-        dry_run: bool = False,
     ) -> None:
-        self.dry_run = dry_run
-        self.cve_set = getUtility(ICveSet)
-        self.soss = getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME)
-
-        if self.soss is None:
-            logger.error("[SOSSExporter] SOSS distribution not found")
-            raise NotFoundError("SOSS distribution not found")
+        self.information_type = information_type
 
     def _get_packages(
         self, bugtasks: List[BugTask]
@@ -104,103 +90,68 @@ class SOSSExporter:
                 )
         return cvss_list
 
-    def to_record(self, cve_sequence: str) -> SOSSRecord:
+    def to_record(
+        self,
+        lp_cve: CveModel,
+        distribution: Distribution,
+        bug: BugModel,
+        vulnerability: Vulnerability,
+    ) -> SOSSRecord:
         """Return a SOSSRecord exporting Launchpad data for the specified
         cve_sequence.
         """
-        lp_cve = self._get_launchpad_cve(cve_sequence)
-        if lp_cve is None:
-            logger.error(f"[SOSSExporter] {cve_sequence} not found")
-            return None
-
-        bug = self._find_existing_bug(cve_sequence, lp_cve, self.soss)
-        if bug is None:
-            logger.error(f"[SOSSExporter] No bug found for {cve_sequence}")
-            return None
-
-        vulnerability = self._find_existing_vulnerability(lp_cve, self.soss)
-        if vulnerability is None:
-            logger.error(
-                f"[SOSSExporter] No vulnerability found for {cve_sequence}"
-            )
-            return None
+        self._validate_to_record_args(lp_cve, distribution, bug, vulnerability)
 
-        # Export bug
-        desc_parts = bug.description.rsplit("\n\nReferences:\n")
+        # Parse bug
+        desc_parts = bug.description.rsplit("\n\nReferences:\n", maxsplit=1)
         references = desc_parts[1].split("\n") if len(desc_parts) > 1 else []
 
-        # Export bug.bugtasks
+        # Parse bug.bugtasks
         packages = self._get_packages(bug.bugtasks)
         assigned_to = (
             bug.bugtasks[0].assignee.name if bug.bugtasks[0].assignee else ""
         )
 
-        # Export vulnerability
-        description = vulnerability.description
-        public_date = vulnerability.date_made_public
+        # Parse vulnerability
+        public_date = self._normalize_date_without_timezone(
+            vulnerability.date_made_public
+        )
         notes = vulnerability.notes.split("\n") if vulnerability.notes else []
         priority = SOSSRecord.PriorityEnum(
             PRIORITY_ENUM_MAP_REVERSE[vulnerability.importance]
         )
-        priority_reason = vulnerability.importance_explanation
-
-        # Export vulnerability.cvss
-        cvss = self._get_cvss(vulnerability.cvss)
-
-        candidate = f"CVE-{lp_cve.sequence}"
 
         return SOSSRecord(
             references=references,
             notes=notes,
             priority=priority,
-            priority_reason=priority_reason,
+            priority_reason=vulnerability.importance_explanation,
             assigned_to=assigned_to,
             packages=packages,
-            candidate=candidate,
-            description=description,
-            cvss=cvss,
-            public_date=self._normalize_date_without_timezone(public_date),
+            candidate=f"CVE-{lp_cve.sequence}",
+            description=vulnerability.description,
+            cvss=self._get_cvss(vulnerability.cvss),
+            public_date=public_date,
         )
 
-    def _find_existing_bug(
+    def _validate_to_record_args(
         self,
-        cve_sequence: str,
         lp_cve: CveModel,
         distribution: Distribution,
-    ) -> Optional[BugModel]:
-        """Find existing bug for the given CVE."""
-        for vulnerability in lp_cve.vulnerabilities:
-            if vulnerability.distribution == distribution:
-                bugs = vulnerability.bugs
-                if len(bugs) > 1:
-                    raise ValueError(
-                        "Multiple existing bugs found for CVE ",
-                        cve_sequence,
-                    )
-                if bugs:
-                    return bugs[0]
-        return None
-
-    def _find_existing_vulnerability(
-        self, lp_cve: CveModel, distribution: Distribution
-    ) -> Optional[Vulnerability]:
-        """Find existing vulnerability for the current distribution"""
-        if not lp_cve:
-            return None
-
-        vulnerability = next(
-            (
-                v
-                for v in lp_cve.vulnerabilities
-                if v.distribution == distribution
-            ),
-            None,
-        )
-        return vulnerability
+        bug: BugModel,
+        vulnerability: Vulnerability,
+    ):
+        required_args = {
+            "Cve": lp_cve,
+            "Bug": bug,
+            "Vulnerability": vulnerability,
+            "Distribution": distribution,
+        }
 
-    def _get_launchpad_cve(self, cve_sequence: str) -> Optional[CveModel]:
-        """Get CVE from Launchpad."""
-        return removeSecurityProxy(self.cve_set[cve_sequence])
+        for name, value in required_args.items():
+            if value is None:
+                logger.error(f"[SOSSExporter] {name} can't be None")
+                raise ValueError(f"{name} can't be None")
 
     def _normalize_date_without_timezone(
         self, date_obj: datetime
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossexport.py b/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
index 02e813b..6c1e901 100644
--- a/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
@@ -1,6 +1,5 @@
 from pathlib import Path
 
-import transaction
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 
@@ -10,7 +9,7 @@ from lp.bugs.interfaces.cve import ICveSet
 from lp.bugs.scripts.soss import SOSSRecord
 from lp.bugs.scripts.soss.sossexport import SOSSExporter
 from lp.bugs.scripts.soss.sossimport import SOSSImporter
-from lp.testing import TestCaseWithFactory, person_logged_in
+from lp.testing import TestCaseWithFactory
 from lp.testing.layers import LaunchpadZopelessLayer
 
 
@@ -20,6 +19,9 @@ class TestSOSSExporter(TestCaseWithFactory):
 
     def setUp(self):
         super().setUp()
+        self.cve_set = getUtility(ICveSet)
+        self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
+
         self.sampledata = Path(__file__).parent / "sampledata"
         self.factory.makePerson(name="octagalland")
         self.soss = self.factory.makeDistribution(
@@ -27,21 +29,20 @@ class TestSOSSExporter(TestCaseWithFactory):
             displayname="SOSS",
             information_type=InformationType.PROPRIETARY,
         )
-        transaction.commit()
+        self._makeCves()
 
         self.soss_importer = SOSSImporter()
         self.soss_exporter = SOSSExporter()
 
-        self.cve_set = getUtility(ICveSet)
-        self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
-
-    def test_get_packages(self):
-        """Test get SOSSRecord.Package list from bugtasks."""
+    def _makeCves(self):
         for file in self.sampledata.iterdir():
             cve_sequence = file.name.lstrip("CVE-")
             if not self.cve_set[cve_sequence]:
                 self.factory.makeCVE(sequence=cve_sequence)
 
+    def test_get_packages(self):
+        """Test get SOSSRecord.Package list from bugtasks."""
+        for file in self.sampledata.iterdir():
             with open(file) as f:
                 soss_record = SOSSRecord.from_yaml(f)
 
@@ -74,17 +75,16 @@ class TestSOSSExporter(TestCaseWithFactory):
         )
 
         for file in self.sampledata.iterdir():
-            cve_sequence = file.name.lstrip("CVE-")
-            if not self.cve_set[cve_sequence]:
-                self.factory.makeCVE(sequence=cve_sequence)
-
             with open(file) as f:
                 soss_record = SOSSRecord.from_yaml(f)
 
             bug, vulnerability = soss_importer.import_cve_from_file(file)
 
-            with person_logged_in(self.bug_importer):
-                exported = self.soss_exporter.to_record(file.name)
+            cve_sequence = file.name.lstrip("CVE-")
+            lp_cve = self.cve_set[cve_sequence]
+            exported = self.soss_exporter.to_record(
+                lp_cve, self.soss, bug, vulnerability
+            )
 
             self.assertEqual(soss_record, exported)
 
@@ -96,14 +96,14 @@ class TestSOSSExporter(TestCaseWithFactory):
         )
 
         for file in self.sampledata.iterdir():
-            cve_sequence = file.name.lstrip("CVE-")
-            if not self.cve_set[cve_sequence]:
-                self.factory.makeCVE(sequence=cve_sequence)
 
             bug, vulnerability = soss_importer.import_cve_from_file(file)
 
-            with person_logged_in(self.bug_importer):
-                exported = self.soss_exporter.to_record(file.name)
+            cve_sequence = file.name.lstrip("CVE-")
+            lp_cve = self.cve_set[cve_sequence]
+            exported = self.soss_exporter.to_record(
+                lp_cve, self.soss, bug, vulnerability
+            )
 
             with open(file) as f:
                 self.assertEqual(f.read(), exported.to_yaml())