launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #32938
[Merge] ~enriqueesanchz/launchpad:fix-soss-export-security-proxy into launchpad:master
Enrique Sánchez has proposed merging ~enriqueesanchz/launchpad:fix-soss-export-security-proxy into launchpad:master.
Commit message:
Fix SOSSExport do not use removeSecurityProxy
The `ExportVulnerabilityJob` will remove the security proxy before
calling `SOSSExport`.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/491949
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:fix-soss-export-security-proxy into launchpad:master.
diff --git a/lib/lp/bugs/scripts/soss/sossexport.py b/lib/lp/bugs/scripts/soss/sossexport.py
index 348d4d9..9a75eb6 100644
--- a/lib/lp/bugs/scripts/soss/sossexport.py
+++ b/lib/lp/bugs/scripts/soss/sossexport.py
@@ -7,24 +7,17 @@ from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional
-from zope.component import getUtility
-from zope.security.proxy import removeSecurityProxy
-
from lp.app.enums import InformationType
-from lp.app.errors import NotFoundError
-from lp.bugs.interfaces.cve import ICveSet
from lp.bugs.model.bug import Bug as BugModel
from lp.bugs.model.bugtask import BugTask
from lp.bugs.model.cve import Cve as CveModel
from lp.bugs.model.vulnerability import Vulnerability
from lp.bugs.scripts.soss.models import SOSSRecord
from lp.bugs.scripts.soss.sossimport import (
- DISTRIBUTION_NAME,
PACKAGE_STATUS_MAP,
PACKAGE_TYPE_MAP,
PRIORITY_ENUM_MAP,
)
-from lp.registry.interfaces.distribution import IDistributionSet
from lp.registry.model.distribution import Distribution
__all__ = [
@@ -50,15 +43,8 @@ class SOSSExporter:
def __init__(
self,
information_type: InformationType = InformationType.PROPRIETARY,
- dry_run: bool = False,
) -> None:
- self.dry_run = dry_run
- self.cve_set = getUtility(ICveSet)
- self.soss = getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME)
-
- if self.soss is None:
- logger.error("[SOSSExporter] SOSS distribution not found")
- raise NotFoundError("SOSS distribution not found")
+ self.information_type = information_type
def _get_packages(
self, bugtasks: List[BugTask]
@@ -104,103 +90,68 @@ class SOSSExporter:
)
return cvss_list
- def to_record(self, cve_sequence: str) -> SOSSRecord:
+ def to_record(
+ self,
+ lp_cve: CveModel,
+ distribution: Distribution,
+ bug: BugModel,
+ vulnerability: Vulnerability,
+ ) -> SOSSRecord:
"""Return a SOSSRecord exporting Launchpad data for the specified
cve_sequence.
"""
- lp_cve = self._get_launchpad_cve(cve_sequence)
- if lp_cve is None:
- logger.error(f"[SOSSExporter] {cve_sequence} not found")
- return None
-
- bug = self._find_existing_bug(cve_sequence, lp_cve, self.soss)
- if bug is None:
- logger.error(f"[SOSSExporter] No bug found for {cve_sequence}")
- return None
-
- vulnerability = self._find_existing_vulnerability(lp_cve, self.soss)
- if vulnerability is None:
- logger.error(
- f"[SOSSExporter] No vulnerability found for {cve_sequence}"
- )
- return None
+ self._validate_to_record_args(lp_cve, distribution, bug, vulnerability)
- # Export bug
- desc_parts = bug.description.rsplit("\n\nReferences:\n")
+ # Parse bug
+ desc_parts = bug.description.rsplit("\n\nReferences:\n", maxsplit=1)
references = desc_parts[1].split("\n") if len(desc_parts) > 1 else []
- # Export bug.bugtasks
+ # Parse bug.bugtasks
packages = self._get_packages(bug.bugtasks)
assigned_to = (
bug.bugtasks[0].assignee.name if bug.bugtasks[0].assignee else ""
)
- # Export vulnerability
- description = vulnerability.description
- public_date = vulnerability.date_made_public
+ # Parse vulnerability
+ public_date = self._normalize_date_without_timezone(
+ vulnerability.date_made_public
+ )
notes = vulnerability.notes.split("\n") if vulnerability.notes else []
priority = SOSSRecord.PriorityEnum(
PRIORITY_ENUM_MAP_REVERSE[vulnerability.importance]
)
- priority_reason = vulnerability.importance_explanation
-
- # Export vulnerability.cvss
- cvss = self._get_cvss(vulnerability.cvss)
-
- candidate = f"CVE-{lp_cve.sequence}"
return SOSSRecord(
references=references,
notes=notes,
priority=priority,
- priority_reason=priority_reason,
+ priority_reason=vulnerability.importance_explanation,
assigned_to=assigned_to,
packages=packages,
- candidate=candidate,
- description=description,
- cvss=cvss,
- public_date=self._normalize_date_without_timezone(public_date),
+ candidate=f"CVE-{lp_cve.sequence}",
+ description=vulnerability.description,
+ cvss=self._get_cvss(vulnerability.cvss),
+ public_date=public_date,
)
- def _find_existing_bug(
+ def _validate_to_record_args(
self,
- cve_sequence: str,
lp_cve: CveModel,
distribution: Distribution,
- ) -> Optional[BugModel]:
- """Find existing bug for the given CVE."""
- for vulnerability in lp_cve.vulnerabilities:
- if vulnerability.distribution == distribution:
- bugs = vulnerability.bugs
- if len(bugs) > 1:
- raise ValueError(
- "Multiple existing bugs found for CVE ",
- cve_sequence,
- )
- if bugs:
- return bugs[0]
- return None
-
- def _find_existing_vulnerability(
- self, lp_cve: CveModel, distribution: Distribution
- ) -> Optional[Vulnerability]:
- """Find existing vulnerability for the current distribution"""
- if not lp_cve:
- return None
-
- vulnerability = next(
- (
- v
- for v in lp_cve.vulnerabilities
- if v.distribution == distribution
- ),
- None,
- )
- return vulnerability
+ bug: BugModel,
+ vulnerability: Vulnerability,
+ ):
+ required_args = {
+ "Cve": lp_cve,
+ "Bug": bug,
+ "Vulnerability": vulnerability,
+ "Distribution": distribution,
+ }
- def _get_launchpad_cve(self, cve_sequence: str) -> Optional[CveModel]:
- """Get CVE from Launchpad."""
- return removeSecurityProxy(self.cve_set[cve_sequence])
+ for name, value in required_args.items():
+ if value is None:
+ logger.error(f"[SOSSExporter] {name} can't be None")
+ raise ValueError(f"{name} can't be None")
def _normalize_date_without_timezone(
self, date_obj: datetime
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossexport.py b/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
index 02e813b..6c1e901 100644
--- a/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
@@ -1,6 +1,5 @@
from pathlib import Path
-import transaction
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
@@ -10,7 +9,7 @@ from lp.bugs.interfaces.cve import ICveSet
from lp.bugs.scripts.soss import SOSSRecord
from lp.bugs.scripts.soss.sossexport import SOSSExporter
from lp.bugs.scripts.soss.sossimport import SOSSImporter
-from lp.testing import TestCaseWithFactory, person_logged_in
+from lp.testing import TestCaseWithFactory
from lp.testing.layers import LaunchpadZopelessLayer
@@ -20,6 +19,9 @@ class TestSOSSExporter(TestCaseWithFactory):
def setUp(self):
super().setUp()
+ self.cve_set = getUtility(ICveSet)
+ self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
+
self.sampledata = Path(__file__).parent / "sampledata"
self.factory.makePerson(name="octagalland")
self.soss = self.factory.makeDistribution(
@@ -27,21 +29,20 @@ class TestSOSSExporter(TestCaseWithFactory):
displayname="SOSS",
information_type=InformationType.PROPRIETARY,
)
- transaction.commit()
+ self._makeCves()
self.soss_importer = SOSSImporter()
self.soss_exporter = SOSSExporter()
- self.cve_set = getUtility(ICveSet)
- self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
-
- def test_get_packages(self):
- """Test get SOSSRecord.Package list from bugtasks."""
+ def _makeCves(self):
for file in self.sampledata.iterdir():
cve_sequence = file.name.lstrip("CVE-")
if not self.cve_set[cve_sequence]:
self.factory.makeCVE(sequence=cve_sequence)
+ def test_get_packages(self):
+ """Test get SOSSRecord.Package list from bugtasks."""
+ for file in self.sampledata.iterdir():
with open(file) as f:
soss_record = SOSSRecord.from_yaml(f)
@@ -74,17 +75,16 @@ class TestSOSSExporter(TestCaseWithFactory):
)
for file in self.sampledata.iterdir():
- cve_sequence = file.name.lstrip("CVE-")
- if not self.cve_set[cve_sequence]:
- self.factory.makeCVE(sequence=cve_sequence)
-
with open(file) as f:
soss_record = SOSSRecord.from_yaml(f)
bug, vulnerability = soss_importer.import_cve_from_file(file)
- with person_logged_in(self.bug_importer):
- exported = self.soss_exporter.to_record(file.name)
+ cve_sequence = file.name.lstrip("CVE-")
+ lp_cve = self.cve_set[cve_sequence]
+ exported = self.soss_exporter.to_record(
+ lp_cve, self.soss, bug, vulnerability
+ )
self.assertEqual(soss_record, exported)
@@ -96,14 +96,14 @@ class TestSOSSExporter(TestCaseWithFactory):
)
for file in self.sampledata.iterdir():
- cve_sequence = file.name.lstrip("CVE-")
- if not self.cve_set[cve_sequence]:
- self.factory.makeCVE(sequence=cve_sequence)
bug, vulnerability = soss_importer.import_cve_from_file(file)
- with person_logged_in(self.bug_importer):
- exported = self.soss_exporter.to_record(file.name)
+ cve_sequence = file.name.lstrip("CVE-")
+ lp_cve = self.cve_set[cve_sequence]
+ exported = self.soss_exporter.to_record(
+ lp_cve, self.soss, bug, vulnerability
+ )
with open(file) as f:
self.assertEqual(f.read(), exported.to_yaml())