← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~ines-almeida/launchpad:svt-refactor-exports into launchpad:master

 

Ines Almeida has proposed merging ~ines-almeida/launchpad:svt-refactor-exports into launchpad:master.

Commit message:
Refactor SVT exporter and record

These changes clean up code in preparation for enabling the UCT Handler for vulnerability exports. The main changes include removing unused and redundant parameters from the SVTExporter, renaming `to_yaml` to `to_str`, and making minor updates to the SVT interfaces to make them more explicit.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~ines-almeida/launchpad/+git/launchpad/+merge/493462

These changes do not alter the behavior of the existing code.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~ines-almeida/launchpad:svt-refactor-exports into launchpad:master.
diff --git a/lib/lp/bugs/model/exportvulnerabilityjob.py b/lib/lp/bugs/model/exportvulnerabilityjob.py
index 254b6e6..6da2147 100644
--- a/lib/lp/bugs/model/exportvulnerabilityjob.py
+++ b/lib/lp/bugs/model/exportvulnerabilityjob.py
@@ -17,7 +17,6 @@ from zope.component import getUtility
 from zope.interface import implementer, provider
 from zope.security.proxy import removeSecurityProxy
 
-from lp.app.enums import InformationType
 from lp.bugs.enums import VulnerabilityHandlerEnum
 from lp.bugs.interfaces.vulnerabilityjob import (
     IExportVulnerabilityJob,
@@ -27,14 +26,14 @@ from lp.bugs.interfaces.vulnerabilityjob import (
     VulnerabilityJobType,
 )
 from lp.bugs.model.bug import Bug as BugModel
-from lp.bugs.model.cve import Cve as CveModel
 from lp.bugs.model.vulnerability import HANDLER_DISTRIBUTION_MAP, Vulnerability
 from lp.bugs.model.vulnerabilityjob import (
     VulnerabilityJob,
     VulnerabilityJobDerived,
 )
-from lp.bugs.scripts.soss.models import SOSSRecord
 from lp.bugs.scripts.soss.sossexport import SOSSExporter
+from lp.bugs.scripts.svthandler import SVTExporter, SVTRecord
+from lp.bugs.scripts.uct.uctexport import UCTExporter
 from lp.registry.interfaces.distribution import IDistributionSet
 from lp.registry.model.distribution import Distribution
 from lp.services.config import config
@@ -48,8 +47,6 @@ EXPIRATION_WEEK_INTERVAL = 4
 PROCESSED_CVE_LOG_INTERVAL = 500
 logger = logging.getLogger(__name__)
 
-HANDLER_EXPORTER_MAP = {VulnerabilityHandlerEnum.SOSS: SOSSExporter}
-
 
 @implementer(IExportVulnerabilityJob)
 @provider(IExportVulnerabilityJobSource)
@@ -72,10 +69,6 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
         return self.metadata.get("request").get("sources")
 
     @property
-    def information_type(self):
-        return self.metadata.get("request").get("information_type")
-
-    @property
     def error_description(self):
         return self.metadata.get("result").get("error_description")
 
@@ -84,7 +77,6 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
         cls,
         handler,
         sources: Optional[List[str]] = None,
-        information_type=InformationType.PRIVATESECURITY.value,
     ):
         """Create a new `ExportVulnerabilityJob`.
 
@@ -114,7 +106,6 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
         metadata = {
             "request": {
                 "sources": sources if sources is not None else [],
-                "information_type": information_type,
             },
             "result": {
                 "error_description": [],
@@ -157,36 +148,35 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
         parts += ", metadata: %s" % self.metadata
         return "<%s>" % parts
 
-    def _get_exporter_to_record(
+    def _get_exporter(
         self,
         handler: VulnerabilityHandlerEnum,
-        information_type: InformationType = InformationType.PRIVATESECURITY,
-    ):
-        """Decide which parser and importer to use
+    ) -> SVTExporter:
+        """Decide which exporter to use
 
         :return: a tuple of (parser, importer) where parser is the function
         that gets a blob and returns a record and importer is the function that
         gets a record and imports it.
         """
 
-        exporter = HANDLER_EXPORTER_MAP.get(handler)
-
-        if not exporter:
-            exception = VulnerabilityJobException("Handler not found")
+        if handler == VulnerabilityHandlerEnum.SOSS:
+            return SOSSExporter()
+        elif handler == VulnerabilityHandlerEnum.UCT:
+            return UCTExporter()
+        else:
+            exception = VulnerabilityJobException(
+                f"Handler '{handler}' not found"
+            )
             self.notifyUserError(exception)
             raise exception
 
-        exporter_to_record = exporter(
-            information_type=information_type
-        ).to_record
-
-        return exporter_to_record
-
     def _get_distribution(self, handler) -> Distribution:
         distribution_name = HANDLER_DISTRIBUTION_MAP.get(handler)
 
         if not distribution_name:
-            exception = VulnerabilityJobException("Handler not found")
+            exception = VulnerabilityJobException(
+                f"Handler '{handler}' not found"
+            )
             self.notifyUserError(exception)
             raise exception
 
@@ -222,11 +212,7 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
     def run(self):
         """See `IRunnableJob`."""
 
-        information_type = InformationType.items[self.information_type]
-        export_to_record = self._get_exporter_to_record(
-            self.context.handler,
-            information_type,
-        )
+        exporter = self._get_exporter(self.context.handler)
         distribution = self._get_distribution(self.context.handler)
         distribution_vulnerability_set = (
             distribution.getVulnerabilitiesVisibleToUser(
@@ -242,9 +228,8 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
         # Log progress every PROCESSED_CVE_LOG_INTERVAL CVEs
         num_parsed_cves = 0
 
-        exported_cves: List[Tuple[SOSSRecord, str]] = []
+        exported_cves: List[Tuple[SVTRecord, str]] = []
         for vul in distribution_vulnerability_set:
-            cve: CveModel = vul.cve
 
             bug = self._get_bug(vul)
 
@@ -257,14 +242,14 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
                 continue
 
             try:
-                record = export_to_record(cve, distribution, bug, vul)
+                record = exporter.to_record(bug, vul)
             except ValueError as e:
                 self.notifyUserError(e)
                 continue
 
             if record is None:
                 exception = VulnerabilityJobException(
-                    f"CVE-{cve.sequence} couldn't be converted"
+                    f"CVE-{vul.cve.sequence} couldn't be converted"
                 )
                 self.notifyUserError(exception)
                 continue
@@ -283,7 +268,7 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
             f"Processed {num_parsed_cves} CVEs in total."
         )
 
-        if exported_cves == []:
+        if len(exported_cves) == 0:
             exception = VulnerabilityJobException("No CVEs to export")
             self.notifyUserError(exception)
             raise exception
@@ -303,7 +288,7 @@ class ExportVulnerabilityJob(VulnerabilityJobDerived):
                 file_path = os.path.join(temp_dir, cve_name)
 
                 with open(file_path, "w") as f:
-                    f.write(record.to_yaml())
+                    f.write(record.to_str())
 
             # Create a zip archive of the temp folder
             buf = io.BytesIO()
diff --git a/lib/lp/bugs/scripts/soss/models.py b/lib/lp/bugs/scripts/soss/models.py
index ca9d202..22445fc 100644
--- a/lib/lp/bugs/scripts/soss/models.py
+++ b/lib/lp/bugs/scripts/soss/models.py
@@ -218,5 +218,5 @@ class SOSSRecord(SVTRecord):
 
         return serialized
 
-    def to_yaml(self) -> str:
+    def to_str(self) -> str:
         return yaml.dump(self.to_dict(), sort_keys=False)
diff --git a/lib/lp/bugs/scripts/soss/sossexport.py b/lib/lp/bugs/scripts/soss/sossexport.py
index c158dd3..94a7f98 100644
--- a/lib/lp/bugs/scripts/soss/sossexport.py
+++ b/lib/lp/bugs/scripts/soss/sossexport.py
@@ -7,10 +7,8 @@ from collections import defaultdict
 from datetime import datetime
 from typing import Dict, List, Optional, Union
 
-from lp.app.enums import InformationType
 from lp.bugs.model.bug import Bug as BugModel
 from lp.bugs.model.bugtask import BugTask
-from lp.bugs.model.cve import Cve as CveModel
 from lp.bugs.model.vulnerability import Vulnerability
 from lp.bugs.scripts.soss.models import SOSSRecord
 from lp.bugs.scripts.soss.sossimport import (
@@ -20,7 +18,6 @@ from lp.bugs.scripts.soss.sossimport import (
 )
 from lp.bugs.scripts.svthandler import SVTExporter
 from lp.registry.interfaces.role import IPersonRoles
-from lp.registry.model.distribution import Distribution
 from lp.registry.security import SecurityAdminDistribution
 
 __all__ = [
@@ -43,12 +40,6 @@ class SOSSExporter(SVTExporter):
     files.
     """
 
-    def __init__(
-        self,
-        information_type: InformationType = InformationType.PROPRIETARY,
-    ) -> None:
-        self.information_type = information_type
-
     def _get_packages(
         self, bugtasks: List[BugTask]
     ) -> Dict[SOSSRecord.PackageTypeEnum, SOSSRecord.Package]:
@@ -95,15 +86,13 @@ class SOSSExporter(SVTExporter):
 
     def to_record(
         self,
-        lp_cve: CveModel,
-        distribution: Distribution,
         bug: BugModel,
         vulnerability: Vulnerability,
     ) -> SOSSRecord:
         """Return a SOSSRecord exporting Launchpad data for the specified
         cve_sequence.
         """
-        self._validate_to_record_args(lp_cve, distribution, bug, vulnerability)
+        self._validate_to_record_args(bug, vulnerability)
 
         # Parse bug
         desc_parts = bug.description.rsplit("\n\nReferences:\n", maxsplit=1)
@@ -131,7 +120,7 @@ class SOSSExporter(SVTExporter):
             priority_reason=vulnerability.importance_explanation,
             assigned_to=assigned_to,
             packages=packages,
-            candidate=f"CVE-{lp_cve.sequence}",
+            candidate=f"CVE-{vulnerability.cve.sequence}",
             description=vulnerability.description,
             cvss=self._get_cvss(vulnerability.cvss),
             public_date=public_date,
@@ -157,16 +146,12 @@ class SOSSExporter(SVTExporter):
 
     def _validate_to_record_args(
         self,
-        lp_cve: CveModel,
-        distribution: Distribution,
         bug: BugModel,
         vulnerability: Vulnerability,
     ):
         required_args = {
-            "Cve": lp_cve,
             "Bug": bug,
             "Vulnerability": vulnerability,
-            "Distribution": distribution,
         }
 
         for name, value in required_args.items():
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossexport.py b/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
index 07802dc..1752d8e 100644
--- a/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
@@ -79,12 +79,7 @@ class TestSOSSExporter(TestCaseWithFactory):
                 soss_record = SOSSRecord.from_yaml(f)
 
             bug, vulnerability = soss_importer.import_cve_from_file(file)
-
-            cve_sequence = file.name.lstrip("CVE-")
-            lp_cve = self.cve_set[cve_sequence]
-            exported = self.soss_exporter.to_record(
-                lp_cve, self.soss, bug, vulnerability
-            )
+            exported = self.soss_exporter.to_record(bug, vulnerability)
 
             self.assertEqual(soss_record, exported)
 
@@ -98,12 +93,7 @@ class TestSOSSExporter(TestCaseWithFactory):
         for file in self.sampledata.iterdir():
 
             bug, vulnerability = soss_importer.import_cve_from_file(file)
-
-            cve_sequence = file.name.lstrip("CVE-")
-            lp_cve = self.cve_set[cve_sequence]
-            exported = self.soss_exporter.to_record(
-                lp_cve, self.soss, bug, vulnerability
-            )
+            exported = self.soss_exporter.to_record(bug, vulnerability)
 
             with open(file) as f:
-                self.assertEqual(f.read(), exported.to_yaml())
+                self.assertEqual(f.read(), exported.to_str())
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py b/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py
index a4d0285..1353c99 100644
--- a/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py
@@ -342,19 +342,19 @@ class TestSOSSRecord(TestCase):
             self.soss_record_dict,
         )
 
-    def test_to_yaml(self):
+    def test_to_str(self):
         load_from = Path(__file__).parent / "sampledata" / "CVE-2025-1979"
         with open(load_from) as f:
             sample_data = f.read()
 
-        self.assertEqual(self.soss_record.to_yaml(), sample_data),
+        self.assertEqual(self.soss_record.to_str(), sample_data),
 
     def _verify_import_export_yaml(self, file):
         with open(file) as f:
             soss_record_read = f.read()
 
         soss_record = SOSSRecord.from_yaml(soss_record_read)
-        self.assertEqual(soss_record_read, soss_record.to_yaml())
+        self.assertEqual(soss_record_read, soss_record.to_str())
 
     def test_verify_import_export_yaml(self):
         files = self.get_sample_files()
diff --git a/lib/lp/bugs/scripts/svthandler.py b/lib/lp/bugs/scripts/svthandler.py
index 9e9d096..51efb2d 100644
--- a/lib/lp/bugs/scripts/svthandler.py
+++ b/lib/lp/bugs/scripts/svthandler.py
@@ -7,21 +7,28 @@ __all__ = [
     "SVTExporter",
 ]
 
+from dataclasses import dataclass
+
 from lp.bugs.interfaces.bug import IBug
-from lp.bugs.interfaces.cve import ICve
 from lp.bugs.interfaces.vulnerability import IVulnerability
-from lp.registry.interfaces.distribution import IDistribution
 
 
+@dataclass
 class SVTRecord:
     """A dataclass that contains the exact same info as a cve file."""
 
+    @classmethod
     def from_str(string: str) -> "SVTRecord":
         """Parse a string and return a SVTRecord."""
         raise NotImplementedError()
 
+    def to_str(self) -> str:
+        """Convert the SVTRecord to a string."""
+        raise NotImplementedError()
+
 
 class SVTImporter:
+
     def from_record(
         record: SVTRecord, cve_sequence: str
     ) -> (IBug, IVulnerability):
@@ -34,12 +41,15 @@ class SVTImporter:
 
 
 class SVTExporter:
+
     def to_record(
-        lp_cve: ICve,
-        distribution: IDistribution,
         bug: IBug,
         vulnerability: IVulnerability,
     ) -> SVTRecord:
         """Export the bug and vulnerability related to a cve in a distribution
         and return a SVTRecord."""
         raise NotImplementedError()
+
+    def checkUserPermissions(self, user, distribution):
+        """Checks if the user has permissions to use this handler."""
+        raise NotImplementedError()
diff --git a/lib/lp/bugs/scripts/uct/models.py b/lib/lp/bugs/scripts/uct/models.py
index e707b49..42331d0 100644
--- a/lib/lp/bugs/scripts/uct/models.py
+++ b/lib/lp/bugs/scripts/uct/models.py
@@ -5,6 +5,7 @@ import logging
 import re
 import tempfile
 from collections import OrderedDict, defaultdict
+from dataclasses import dataclass
 from datetime import datetime
 from enum import Enum
 from pathlib import Path
@@ -60,6 +61,7 @@ class CVSS(NamedTuple):
     vector_string: str
 
 
+@dataclass
 class UCTRecord(SVTRecord):
     """
     UCTRecord represents a single CVE record (file) in the ubuntu-cve-tracker.
@@ -104,45 +106,24 @@ class UCTRecord(SVTRecord):
         tags: Set[str]
         patches: List["UCTRecord.Patch"]
 
-    def __init__(
-        self,
-        parent_dir: str,
-        assigned_to: str,
-        bugs: List[str],
-        cvss: List[CVSS],
-        candidate: str,
-        crd: Optional[datetime],
-        public_date: Optional[datetime],
-        public_date_at_USN: Optional[datetime],
-        description: str,
-        discovered_by: str,
-        mitigation: Optional[str],
-        notes: str,
-        priority: Priority,
-        references: List[str],
-        ubuntu_description: str,
-        packages: List[Package],
-        global_tags: Set[str],
-        priority_explanation: str = "",
-    ):
-        self.parent_dir = parent_dir
-        self.assigned_to = assigned_to
-        self.bugs = bugs
-        self.cvss = cvss
-        self.candidate = candidate
-        self.crd = crd
-        self.public_date = public_date
-        self.public_date_at_USN = public_date_at_USN
-        self.description = description
-        self.discovered_by = discovered_by
-        self.mitigation = mitigation
-        self.notes = notes
-        self.priority = priority
-        self.priority_explanation = priority_explanation
-        self.references = references
-        self.ubuntu_description = ubuntu_description
-        self.packages = packages
-        self.global_tags = global_tags
+    parent_dir: str
+    assigned_to: str
+    bugs: List[str]
+    cvss: List[CVSS]
+    candidate: str
+    crd: Optional[datetime]
+    public_date: Optional[datetime]
+    public_date_at_USN: Optional[datetime]
+    description: str
+    discovered_by: str
+    mitigation: Optional[str]
+    notes: str
+    priority: "UCTRecord.Priority"
+    references: List[str]
+    ubuntu_description: str
+    packages: List["UCTRecord.Package"]
+    global_tags: Set[str]
+    priority_explanation: str = ""
 
     def __eq__(self, other):
         if not isinstance(other, UCTRecord):

Follow ups