launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #33056
[Merge] ~ines-almeida/launchpad:svt-refactor-exports into launchpad:master
Ines Almeida has proposed merging ~ines-almeida/launchpad:svt-refactor-exports into launchpad:master.
Commit message:
Refactor SVT exporter and record
These changes clean up code in preparation to enable the UCT Handler for vulnerability exports. The main changes included removing unused and redundant parameters from the SVTExporter, renaming `to_yaml` to `to_str` and minor updates in the SVT interfaces to make it more explicit
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~ines-almeida/launchpad/+git/launchpad/+merge/493462
These changes don't change any behavior of the current code
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~ines-almeida/launchpad:svt-refactor-exports into launchpad:master.
diff --git a/lib/lp/bugs/model/exportvulnerabilityjob.py b/lib/lp/bugs/model/exportvulnerabilityjob.py
index 254b6e6..6da2147 100644
--- a/lib/lp/bugs/model/exportvulnerabilityjob.py
+++ b/lib/lp/bugs/model/exportvulnerabilityjob.py
@@ -17,7 +17,6 @@ from zope.component import getUtility
from zope.interface import implementer, provider
from zope.security.proxy import removeSecurityProxy
-from lp.app.enums import InformationType
from lp.bugs.enums import VulnerabilityHandlerEnum
from lp.bugs.interfaces.vulnerabilityjob import (
IExportVulnerabilityJob,
@@ -27,14 +26,14 @@ from lp.bugs.interfaces.vulnerabilityjob import (
VulnerabilityJobType,
)
from lp.bugs.model.bug import Bug as BugModel
-from lp.bugs.model.cve import Cve as CveModel
from lp.bugs.model.vulnerability import HANDLER_DISTRIBUTION_MAP, Vulnerability
from lp.bugs.model.vulnerabilityjob import (
VulnerabilityJob,
VulnerabilityJobDerived,
)
-from lp.bugs.scripts.soss.models import SOSSRecord
from lp.bugs.scripts.soss.sossexport import SOSSExporter
+from lp.bugs.scripts.svthandler import SVTExporter, SVTRecord
+from lp.bugs.scripts.uct.uctexport import UCTExporter
from lp.registry.interfaces.distribution import IDistributionSet
from lp.registry.model.distribution import Distribution
from lp.services.config import config
@@ -48,8 +47,6 @@ EXPIRATION_WEEK_INTERVAL = 4
PROCESSED_CVE_LOG_INTERVAL = 500
logger = logging.getLogger(__name__)
-HANDLER_EXPORTER_MAP = {VulnerabilityHandlerEnum.SOSS: SOSSExporter}
-
@implementer(IExportVulnerabilityJob)
@provider(IExportVulnerabilityJobSource)
@@ -72,10 +69,6 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
return self.metadata.get("request").get("sources")
@property
- def information_type(self):
- return self.metadata.get("request").get("information_type")
-
- @property
def error_description(self):
return self.metadata.get("result").get("error_description")
@@ -84,7 +77,6 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
cls,
handler,
sources: Optional[List[str]] = None,
- information_type=InformationType.PRIVATESECURITY.value,
):
"""Create a new `ExportVulnerabilityJob`.
@@ -114,7 +106,6 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
metadata = {
"request": {
"sources": sources if sources is not None else [],
- "information_type": information_type,
},
"result": {
"error_description": [],
@@ -157,36 +148,35 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
parts += ", metadata: %s" % self.metadata
return "<%s>" % parts
- def _get_exporter_to_record(
+ def _get_exporter(
self,
handler: VulnerabilityHandlerEnum,
- information_type: InformationType = InformationType.PRIVATESECURITY,
- ):
- """Decide which parser and importer to use
+ ) -> SVTExporter:
+ """Decide which exporter to use
:return: a tuple of (parser, importer) where parser is the function
that gets a blob and returns a record and importer is the function that
gets a record and imports it.
"""
- exporter = HANDLER_EXPORTER_MAP.get(handler)
-
- if not exporter:
- exception = VulnerabilityJobException("Handler not found")
+ if handler == VulnerabilityHandlerEnum.SOSS:
+ return SOSSExporter()
+ elif handler == VulnerabilityHandlerEnum.UCT:
+ return UCTExporter()
+ else:
+ exception = VulnerabilityJobException(
+ f"Handler '{handler}' not found"
+ )
self.notifyUserError(exception)
raise exception
- exporter_to_record = exporter(
- information_type=information_type
- ).to_record
-
- return exporter_to_record
-
def _get_distribution(self, handler) -> Distribution:
distribution_name = HANDLER_DISTRIBUTION_MAP.get(handler)
if not distribution_name:
- exception = VulnerabilityJobException("Handler not found")
+ exception = VulnerabilityJobException(
+ f"Handler '{handler}' not found"
+ )
self.notifyUserError(exception)
raise exception
@@ -222,11 +212,7 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
def run(self):
"""See `IRunnableJob`."""
- information_type = InformationType.items[self.information_type]
- export_to_record = self._get_exporter_to_record(
- self.context.handler,
- information_type,
- )
+ exporter = self._get_exporter(self.context.handler)
distribution = self._get_distribution(self.context.handler)
distribution_vulnerability_set = (
distribution.getVulnerabilitiesVisibleToUser(
@@ -242,9 +228,8 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
# Log progress every PROCESSED_CVE_LOG_INTERVAL CVEs
num_parsed_cves = 0
- exported_cves: List[Tuple[SOSSRecord, str]] = []
+ exported_cves: List[Tuple[SVTRecord, str]] = []
for vul in distribution_vulnerability_set:
- cve: CveModel = vul.cve
bug = self._get_bug(vul)
@@ -257,14 +242,14 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
continue
try:
- record = export_to_record(cve, distribution, bug, vul)
+ record = exporter.to_record(bug, vul)
except ValueError as e:
self.notifyUserError(e)
continue
if record is None:
exception = VulnerabilityJobException(
- f"CVE-{cve.sequence} couldn't be converted"
+ f"CVE-{vul.cve.sequence} couldn't be converted"
)
self.notifyUserError(exception)
continue
@@ -283,7 +268,7 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
f"Processed {num_parsed_cves} CVEs in total."
)
- if exported_cves == []:
+ if len(exported_cves) == 0:
exception = VulnerabilityJobException("No CVEs to export")
self.notifyUserError(exception)
raise exception
@@ -303,7 +288,7 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
file_path = os.path.join(temp_dir, cve_name)
with open(file_path, "w") as f:
- f.write(record.to_yaml())
+ f.write(record.to_str())
# Create a zip archive of the temp folder
buf = io.BytesIO()
diff --git a/lib/lp/bugs/scripts/soss/models.py b/lib/lp/bugs/scripts/soss/models.py
index ca9d202..22445fc 100644
--- a/lib/lp/bugs/scripts/soss/models.py
+++ b/lib/lp/bugs/scripts/soss/models.py
@@ -218,5 +218,5 @@ class SOSSRecord(SVTRecord):
return serialized
- def to_yaml(self) -> str:
+ def to_str(self) -> str:
return yaml.dump(self.to_dict(), sort_keys=False)
diff --git a/lib/lp/bugs/scripts/soss/sossexport.py b/lib/lp/bugs/scripts/soss/sossexport.py
index c158dd3..94a7f98 100644
--- a/lib/lp/bugs/scripts/soss/sossexport.py
+++ b/lib/lp/bugs/scripts/soss/sossexport.py
@@ -7,10 +7,8 @@ from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Union
-from lp.app.enums import InformationType
from lp.bugs.model.bug import Bug as BugModel
from lp.bugs.model.bugtask import BugTask
-from lp.bugs.model.cve import Cve as CveModel
from lp.bugs.model.vulnerability import Vulnerability
from lp.bugs.scripts.soss.models import SOSSRecord
from lp.bugs.scripts.soss.sossimport import (
@@ -20,7 +18,6 @@ from lp.bugs.scripts.soss.sossimport import (
)
from lp.bugs.scripts.svthandler import SVTExporter
from lp.registry.interfaces.role import IPersonRoles
-from lp.registry.model.distribution import Distribution
from lp.registry.security import SecurityAdminDistribution
__all__ = [
@@ -43,12 +40,6 @@ class SOSSExporter(SVTExporter):
files.
"""
- def __init__(
- self,
- information_type: InformationType = InformationType.PROPRIETARY,
- ) -> None:
- self.information_type = information_type
-
def _get_packages(
self, bugtasks: List[BugTask]
) -> Dict[SOSSRecord.PackageTypeEnum, SOSSRecord.Package]:
@@ -95,15 +86,13 @@ class SOSSExporter(SVTExporter):
def to_record(
self,
- lp_cve: CveModel,
- distribution: Distribution,
bug: BugModel,
vulnerability: Vulnerability,
) -> SOSSRecord:
"""Return a SOSSRecord exporting Launchpad data for the specified
cve_sequence.
"""
- self._validate_to_record_args(lp_cve, distribution, bug, vulnerability)
+ self._validate_to_record_args(bug, vulnerability)
# Parse bug
desc_parts = bug.description.rsplit("\n\nReferences:\n", maxsplit=1)
@@ -131,7 +120,7 @@ class SOSSExporter(SVTExporter):
priority_reason=vulnerability.importance_explanation,
assigned_to=assigned_to,
packages=packages,
- candidate=f"CVE-{lp_cve.sequence}",
+ candidate=f"CVE-{vulnerability.cve.sequence}",
description=vulnerability.description,
cvss=self._get_cvss(vulnerability.cvss),
public_date=public_date,
@@ -157,16 +146,12 @@ class SOSSExporter(SVTExporter):
def _validate_to_record_args(
self,
- lp_cve: CveModel,
- distribution: Distribution,
bug: BugModel,
vulnerability: Vulnerability,
):
required_args = {
- "Cve": lp_cve,
"Bug": bug,
"Vulnerability": vulnerability,
- "Distribution": distribution,
}
for name, value in required_args.items():
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossexport.py b/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
index 07802dc..1752d8e 100644
--- a/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
@@ -79,12 +79,7 @@ class TestSOSSExporter(TestCaseWithFactory):
soss_record = SOSSRecord.from_yaml(f)
bug, vulnerability = soss_importer.import_cve_from_file(file)
-
- cve_sequence = file.name.lstrip("CVE-")
- lp_cve = self.cve_set[cve_sequence]
- exported = self.soss_exporter.to_record(
- lp_cve, self.soss, bug, vulnerability
- )
+ exported = self.soss_exporter.to_record(bug, vulnerability)
self.assertEqual(soss_record, exported)
@@ -98,12 +93,7 @@ class TestSOSSExporter(TestCaseWithFactory):
for file in self.sampledata.iterdir():
bug, vulnerability = soss_importer.import_cve_from_file(file)
-
- cve_sequence = file.name.lstrip("CVE-")
- lp_cve = self.cve_set[cve_sequence]
- exported = self.soss_exporter.to_record(
- lp_cve, self.soss, bug, vulnerability
- )
+ exported = self.soss_exporter.to_record(bug, vulnerability)
with open(file) as f:
- self.assertEqual(f.read(), exported.to_yaml())
+ self.assertEqual(f.read(), exported.to_str())
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py b/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py
index a4d0285..1353c99 100644
--- a/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py
@@ -342,19 +342,19 @@ class TestSOSSRecord(TestCase):
self.soss_record_dict,
)
- def test_to_yaml(self):
+ def test_to_str(self):
load_from = Path(__file__).parent / "sampledata" / "CVE-2025-1979"
with open(load_from) as f:
sample_data = f.read()
- self.assertEqual(self.soss_record.to_yaml(), sample_data),
+ self.assertEqual(self.soss_record.to_str(), sample_data),
def _verify_import_export_yaml(self, file):
with open(file) as f:
soss_record_read = f.read()
soss_record = SOSSRecord.from_yaml(soss_record_read)
- self.assertEqual(soss_record_read, soss_record.to_yaml())
+ self.assertEqual(soss_record_read, soss_record.to_str())
def test_verify_import_export_yaml(self):
files = self.get_sample_files()
diff --git a/lib/lp/bugs/scripts/svthandler.py b/lib/lp/bugs/scripts/svthandler.py
index 9e9d096..51efb2d 100644
--- a/lib/lp/bugs/scripts/svthandler.py
+++ b/lib/lp/bugs/scripts/svthandler.py
@@ -7,21 +7,28 @@ __all__ = [
"SVTExporter",
]
+from dataclasses import dataclass
+
from lp.bugs.interfaces.bug import IBug
-from lp.bugs.interfaces.cve import ICve
from lp.bugs.interfaces.vulnerability import IVulnerability
-from lp.registry.interfaces.distribution import IDistribution
+@dataclass
class SVTRecord:
"""A dataclass that contains the exact same info as a cve file."""
+ @classmethod
def from_str(string: str) -> "SVTRecord":
"""Parse a string and return a SVTRecord."""
raise NotImplementedError()
+ def to_str(self) -> str:
+ """Convert the SVTRecord to a string."""
+ raise NotImplementedError()
+
class SVTImporter:
+
def from_record(
record: SVTRecord, cve_sequence: str
) -> (IBug, IVulnerability):
@@ -34,12 +41,15 @@ class SVTImporter:
class SVTExporter:
+
def to_record(
- lp_cve: ICve,
- distribution: IDistribution,
bug: IBug,
vulnerability: IVulnerability,
) -> SVTRecord:
"""Export the bug and vulnerability related to a cve in a distribution
and return a SVTRecord."""
raise NotImplementedError()
+
+ def checkUserPermissions(self, user, distribution):
+ """Checks if the user has permissions to use this handler."""
+ raise NotImplementedError()
diff --git a/lib/lp/bugs/scripts/uct/models.py b/lib/lp/bugs/scripts/uct/models.py
index e707b49..42331d0 100644
--- a/lib/lp/bugs/scripts/uct/models.py
+++ b/lib/lp/bugs/scripts/uct/models.py
@@ -5,6 +5,7 @@ import logging
import re
import tempfile
from collections import OrderedDict, defaultdict
+from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
@@ -60,6 +61,7 @@ class CVSS(NamedTuple):
vector_string: str
+@dataclass
class UCTRecord(SVTRecord):
"""
UCTRecord represents a single CVE record (file) in the ubuntu-cve-tracker.
@@ -104,45 +106,24 @@ class UCTRecord(SVTRecord):
tags: Set[str]
patches: List["UCTRecord.Patch"]
- def __init__(
- self,
- parent_dir: str,
- assigned_to: str,
- bugs: List[str],
- cvss: List[CVSS],
- candidate: str,
- crd: Optional[datetime],
- public_date: Optional[datetime],
- public_date_at_USN: Optional[datetime],
- description: str,
- discovered_by: str,
- mitigation: Optional[str],
- notes: str,
- priority: Priority,
- references: List[str],
- ubuntu_description: str,
- packages: List[Package],
- global_tags: Set[str],
- priority_explanation: str = "",
- ):
- self.parent_dir = parent_dir
- self.assigned_to = assigned_to
- self.bugs = bugs
- self.cvss = cvss
- self.candidate = candidate
- self.crd = crd
- self.public_date = public_date
- self.public_date_at_USN = public_date_at_USN
- self.description = description
- self.discovered_by = discovered_by
- self.mitigation = mitigation
- self.notes = notes
- self.priority = priority
- self.priority_explanation = priority_explanation
- self.references = references
- self.ubuntu_description = ubuntu_description
- self.packages = packages
- self.global_tags = global_tags
+ parent_dir: str
+ assigned_to: str
+ bugs: List[str]
+ cvss: List[CVSS]
+ candidate: str
+ crd: Optional[datetime]
+ public_date: Optional[datetime]
+ public_date_at_USN: Optional[datetime]
+ description: str
+ discovered_by: str
+ mitigation: Optional[str]
+ notes: str
+ priority: "UCTRecord.Priority"
+ references: List[str]
+ ubuntu_description: str
+ packages: List["UCTRecord.Package"]
+ global_tags: Set[str]
+ priority_explanation: str = ""
def __eq__(self, other):
if not isinstance(other, UCTRecord):
Follow ups