launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #32890
Re: [Merge] ~enriqueesanchz/launchpad:add-soss-import-mapping into launchpad:master
Added a few comments
It will be great to see how this works with Celery.
Diff comments:
> diff --git a/lib/lp/bugs/interfaces/bug.py b/lib/lp/bugs/interfaces/bug.py
> index f1280a4..9a3c2a0 100644
> --- a/lib/lp/bugs/interfaces/bug.py
> +++ b/lib/lp/bugs/interfaces/bug.py
> @@ -115,6 +116,8 @@ class CreateBugParams:
> milestone=None,
> assignee=None,
> cve=None,
> + metadata=None,
> + check_permissions=None,
Can we have the default be a bool if that's its datatype?
> ):
> self.owner = owner
> self.title = title
> diff --git a/lib/lp/bugs/interfaces/vulnerability.py b/lib/lp/bugs/interfaces/vulnerability.py
> index 474309f..6ff2c1c 100644
> --- a/lib/lp/bugs/interfaces/vulnerability.py
> +++ b/lib/lp/bugs/interfaces/vulnerability.py
> @@ -97,6 +97,15 @@ class VulnerabilityChange(DBEnumeratedType):
> """,
> )
>
> + PRIVACY = DBItem(
Should it be `CVSS = DBItem`?
> + 6,
> + """
> + CVSS
> +
> + Common Vulnerability Scoring System
> + """,
> + )
> +
>
> class IVulnerabilityView(Interface):
> """`IVulnerability` attributes that require launchpad.View."""
> diff --git a/lib/lp/bugs/model/vulnerability.py b/lib/lp/bugs/model/vulnerability.py
> index d147171..21da533 100644
> --- a/lib/lp/bugs/model/vulnerability.py
> +++ b/lib/lp/bugs/model/vulnerability.py
> @@ -452,6 +459,10 @@ def get_vulnerability_privacy_filter(user):
> elif IPersonRoles.providedBy(user):
> user = user.person
>
> + role = IPersonRoles(user)
> + if role.in_bug_importer or role.in_admin:
> + return True
Let's discuss this at the daily sync:
We aren't considering embargoed data right now, but I think even admins shouldn't have access to things with the embargoed information_type, for example.
> +
> artifact_grant_query = Coalesce(
> ArrayIntersects(
> SQL("Vulnerability.access_grants"),
> diff --git a/lib/lp/bugs/scripts/soss/sossimport.py b/lib/lp/bugs/scripts/soss/sossimport.py
> new file mode 100644
> index 0000000..4695dcf
> --- /dev/null
> +++ b/lib/lp/bugs/scripts/soss/sossimport.py
> @@ -0,0 +1,464 @@
> +# Copyright 2025 Canonical Ltd. This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +"""A SOSS (SOSS CVE Tracker) bug importer"""
> +import logging
> +import os
> +from collections import defaultdict
> +from datetime import timezone
> +from typing import Dict, List, Optional, Tuple
> +
> +import transaction
> +from zope.component import getUtility
> +from zope.security.proxy import removeSecurityProxy
> +
> +from lp.app.enums import InformationType
> +from lp.app.interfaces.launchpad import ILaunchpadCelebrities
> +from lp.bugs.enums import VulnerabilityStatus
> +from lp.bugs.interfaces.bug import CreateBugParams, IBugSet
> +from lp.bugs.interfaces.bugtask import (
> + BugTaskImportance,
> + BugTaskStatus,
> + IBugTaskSet,
> +)
> +from lp.bugs.interfaces.cve import ICveSet
> +from lp.bugs.interfaces.vulnerability import IVulnerabilitySet
> +from lp.bugs.model.bug import Bug as BugModel
> +from lp.bugs.model.cve import Cve as CveModel
> +from lp.bugs.model.vulnerability import Vulnerability
> +from lp.bugs.scripts.soss.models import SOSSRecord
> +from lp.registry.interfaces.distribution import IDistributionSet
> +from lp.registry.interfaces.externalpackage import ExternalPackageType
> +from lp.registry.interfaces.person import IPersonSet
> +from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
> +from lp.registry.model.distribution import Distribution
> +from lp.registry.model.externalpackage import ExternalPackage
> +from lp.registry.model.person import Person
> +from lp.testing import person_logged_in
> +
> +__all__ = [
> + "SOSSImporter",
> +]
> +
> +logger = logging.getLogger(__name__)
> +
> +# Constants moved to module level with proper naming
> +PRIORITY_ENUM_MAP = {
> + SOSSRecord.PriorityEnum.NEEDS_TRIAGE: BugTaskImportance.UNDECIDED,
> + SOSSRecord.PriorityEnum.NEGLIGIBLE: BugTaskImportance.WISHLIST,
> + SOSSRecord.PriorityEnum.LOW: BugTaskImportance.LOW,
> + SOSSRecord.PriorityEnum.MEDIUM: BugTaskImportance.MEDIUM,
> + SOSSRecord.PriorityEnum.HIGH: BugTaskImportance.HIGH,
> + SOSSRecord.PriorityEnum.CRITICAL: BugTaskImportance.CRITICAL,
> +}
> +
> +PACKAGE_TYPE_MAP = {
> + SOSSRecord.PackageTypeEnum.UNPACKAGED: ExternalPackageType.GENERIC,
> + SOSSRecord.PackageTypeEnum.PYTHON: ExternalPackageType.PYTHON,
> + SOSSRecord.PackageTypeEnum.MAVEN: ExternalPackageType.MAVEN,
> + SOSSRecord.PackageTypeEnum.CONDA: ExternalPackageType.CONDA,
> + SOSSRecord.PackageTypeEnum.RUST: ExternalPackageType.CARGO,
> +}
> +
> +PACKAGE_STATUS_MAP = {
> + SOSSRecord.PackageStatusEnum.IGNORED: BugTaskStatus.WONTFIX,
> + SOSSRecord.PackageStatusEnum.NEEDS_TRIAGE: BugTaskStatus.UNKNOWN,
> + SOSSRecord.PackageStatusEnum.RELEASED: BugTaskStatus.FIXRELEASED,
> + SOSSRecord.PackageStatusEnum.NOT_AFFECTED: BugTaskStatus.INVALID,
> + SOSSRecord.PackageStatusEnum.DEFERRED: BugTaskStatus.DEFERRED,
> + SOSSRecord.PackageStatusEnum.NEEDED: BugTaskStatus.NEW,
> +}
> +
> +DISTRIBUTION_NAME = "soss"
> +
> +
> +class SOSSImporter:
> + """
> + SOSSImporter is used to import SOSS CVE files to Launchpad database.
> + """
> +
> + def __init__(
> + self,
> + information_type: InformationType = InformationType.PRIVATESECURITY,
> + dry_run: bool = False,
> + ) -> None:
> + self.information_type = information_type
> + self.dry_run = dry_run
> + self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
> + self.soss = getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME)
> + self.person_set = getUtility(IPersonSet)
> + self.source_package_name_set = getUtility(ISourcePackageNameSet)
> + self.bugtask_set = getUtility(IBugTaskSet)
> + self.vulnerability_set = getUtility(IVulnerabilitySet)
> + self.bug_set = getUtility(IBugSet)
> + self.cve_set = getUtility(ICveSet)
> +
> + def import_cve_from_file(
> + self, cve_path: str
> + ) -> Tuple[BugModel, Vulnerability]:
> + """Import CVE from file path."""
> + cve_sequence = os.path.basename(cve_path)
> + logger.info(f"Importing {cve_sequence}")
What do you think of adding a prefix to all logs created by this class so that it's easy to follow logs from an import?
It could just be:
[SOSSImporter] <log message>
Or something even more specific
> +
> + with open(cve_path, encoding="utf-8") as file:
> + soss_record = SOSSRecord.from_yaml(file)
> +
> + with person_logged_in(self.bug_importer):
> + bug, vulnerability = self.import_cve(soss_record, cve_sequence)
> +
> + return bug, vulnerability
> +
> + def import_cve(
> + self, soss_record: SOSSRecord, cve_sequence: str
> + ) -> Tuple[BugModel, Vulnerability]:
> + """Import CVE from SOSS record."""
> + if not self._validate_soss_record(soss_record, cve_sequence):
> + return None, None
> +
> + lp_cve = self._get_launchpad_cve(cve_sequence)
> + if lp_cve is None:
> + return None, None
> +
> + bug = self._find_existing_bug(soss_record, lp_cve, self.soss)
> + if not bug:
> + bug = self._create_bug(soss_record, lp_cve)
> + else:
> + bug = self._update_bug(bug, soss_record, lp_cve)
It would be great to keep track of the bugtasks and/or vulnerabilities we create vs update as part of this script
I see we log it in the actual functions - nice - but maybe we can keep track of it and add it as an output... Not sure, let's discuss together what this might be.
In any case, I think this looks good, and we can just add to it later
> +
> + vulnerability = self._find_existing_vulnerability(bug, self.soss)
> + if not vulnerability:
> + vulnerability = self._create_vulnerability(
> + bug, soss_record, lp_cve, self.soss
> + )
> + else:
> + vulnerability = self._update_vulnerability(
> + vulnerability, soss_record
> + )
> +
> + if not self.dry_run:
> + transaction.commit()
> + logger.info(f"Successfully committed changes for {cve_sequence}")
> +
> + return bug, vulnerability
> +
> + def _create_bug(
> + self, soss_record: SOSSRecord, lp_cve: CveModel
> + ) -> BugModel:
> + """
> + Create a Bug model based on the information contained in a
> + SOSSRecord.
> +
> + :param soss_record: SOSSRecord with information from a SOSS cve
> + :param lp_cve: Launchpad Cve model
> + """
> + packagetype, package = self._get_first_package_info(soss_record)
> + assignee = self._get_assignee(soss_record.assigned_to)
> +
> + externalpackage = self._create_external_package(package, packagetype)
> + metadata = {"repositories": package.repositories}
> +
> + # Create the bug, only first bugtask
> + bug, _ = self.bug_set.createBug(
> + CreateBugParams(
> + comment=self._make_bug_description(soss_record),
> + title=lp_cve.sequence,
> + information_type=self.information_type,
> + owner=self.bug_importer,
> + target=externalpackage,
> + status=PACKAGE_STATUS_MAP[package.status],
> + status_explanation=package.note,
> + assignee=assignee,
> + importance=PRIORITY_ENUM_MAP[soss_record.priority],
> + cve=lp_cve,
> + metadata=metadata,
> + check_permissions=False,
> + ),
> + notify_event=False,
> + )
> +
> + # Create next bugtasks
> + self._update_bugtasks(bug, soss_record)
> +
> + logger.info(f"Created bug with ID: {bug.id}")
> + return bug
> +
> + def _update_bug(
> + self, bug: BugModel, soss_record: SOSSRecord, lp_cve: CveModel
> + ) -> BugModel:
> + """
> + Update a Bug model with the information contained in a SOSSRecord.
> +
> + :param bug: Bug model to be updated
> + :param soss_record: SOSSRecord with information from a SOSS cve
> + :param lp_cve: Launchpad Cve model
> + """
> + bug.description = self._make_bug_description(soss_record)
This is the sort of thing we need to ask Security to define, but I'm happy to go with one approach and then refine it later.
> + bug.title = lp_cve.sequence
> + bug.transitionToInformationType(
> + self.information_type, self.bug_importer
> + )
> + self._update_bugtasks(bug, soss_record)
> +
> + logger.info(f"Updated Bug with ID: {bug.id}")
> + return bug
> +
> + def _create_vulnerability(
> + self,
> + bug: BugModel,
> + soss_record: SOSSRecord,
> + lp_cve: CveModel,
> + distribution: Distribution,
> + ) -> Vulnerability:
> + """
> + Create a Vulnerability instance based on the information from
> + the given SOSSRecord instance and link to the specified Bug
> + and LP's Cve model.
> +
> + :param bug: Bug model associated with the vulnerability
> + :param soss_record: SOSSRecord with information from a SOSS cve
> + :param lp_cve: Launchpad Cve model
> + :param distribution: a Distribution affected by the vulnerability
> + :return: a Vulnerability
> + """
> + vulnerability: Vulnerability = self.vulnerability_set.new(
> + distribution=distribution,
> + status=VulnerabilityStatus.NEEDS_TRIAGE,
> + importance=PRIORITY_ENUM_MAP[soss_record.priority],
> + creator=bug.owner,
> + information_type=self.information_type,
> + cve=lp_cve,
> + description=soss_record.description,
> + notes="\n".join(soss_record.notes),
> + mitigation=None,
> + importance_explanation=soss_record.priority_reason,
> + date_made_public=self._normalize_date_with_timezone(
> + soss_record.public_date
> + ),
> + date_notice_issued=None,
> + date_coordinated_release=None,
> + cvss=self._prepare_cvss_data(soss_record),
> + )
> + vulnerability.linkBug(bug, bug.owner)
> +
> + logger.info(
> + f"Created vulnerability with ID: {vulnerability.id} "
> + f"for {distribution.name}",
> + )
> +
> + return vulnerability
> +
> + def _update_vulnerability(
> + self, vulnerability: Vulnerability, soss_record: SOSSRecord
> + ) -> None:
> + """
> + Update a Vulnerability model with the information
> + contained in a SOSSRecord
> +
> + :param vulnerability: Vulnerability model to be updated
> + :param soss_record: SOSSRecord with information from a SOSS cve
> + """
> + vulnerability.status = VulnerabilityStatus.NEEDS_TRIAGE
> + vulnerability.description = soss_record.description
> + vulnerability.notes = "\n".join(soss_record.notes)
> + vulnerability.mitigation = None
> + vulnerability.importance = PRIORITY_ENUM_MAP[soss_record.priority]
> + vulnerability.importance_explanation = soss_record.priority_reason
> + vulnerability.date_made_public = self._normalize_date_with_timezone(
> + soss_record.public_date
> + )
> + vulnerability.date_notice_issued = None
> + vulnerability.date_coordinated_release = None
> + vulnerability.cvss = self._prepare_cvss_data(soss_record)
> +
> + logger.info(
> + f"Updated Vulnerability with ID: {vulnerability.id} "
> + f"for {vulnerability.distribution.name}",
> + )
> +
> + def _find_existing_bug(
> + self,
> + soss_record: SOSSRecord,
> + lp_cve: CveModel,
> + distribution: Distribution,
> + ) -> Optional[BugModel]:
> + """Find existing bug for the given CVE."""
> + for vulnerability in lp_cve.vulnerabilities:
> + if vulnerability.distribution == distribution:
> + bugs = vulnerability.bugs
> + if len(bugs) > 1:
> + raise ValueError(
> + "Multiple existing bugs found for CVE ",
> + soss_record.sequence,
> + )
> + if bugs:
> + return bugs[0]
> + return None
> +
> + def _find_existing_vulnerability(
> + self, bug: BugModel, distribution: Distribution
> + ) -> Optional[Vulnerability]:
> + """Find existing vulnerability for the current distribution"""
> + if not bug:
> + return None
> +
> + vulnerability = next(
> + (v for v in bug.vulnerabilities if v.distribution == distribution),
> + None,
> + )
> + return vulnerability
> +
> + def _update_bugtasks(self, bug: BugModel, soss_record: SOSSRecord) -> None:
> + """
> + Add bug tasks to the given Bug model based on the information
Maybe the name should say create_or_update? And this docstring should also mention why this both creates and updates.
> + from a SOSSRecord.
> +
> + This may be called multiple times, only new targets will be created.
> + Existing targets will be updated. Packages that were removed from the
> + SOSSRecord will be removed.
> +
> + :param bug: Bug model to be updated
> + :param packages: list of SOSSRecord.Packages from a SOSSRecord
> + """
> + packages: List[SOSSRecord.Package] = soss_record.packages.items()
> + assignee = self._get_assignee(soss_record.assigned_to)
> +
> + # Build a lookup dict for existing bug tasks
> + bugtask_by_target = {task.target: task for task in bug.bugtasks}
> +
> + for packagetype, package_list in packages:
> + for package in package_list:
> + target = self._create_external_package(package, packagetype)
get_or_create?
> + metadata = (
> + {"repositories": package.repositories}
> + if package.repositories
> + else None
> + )
> +
> + if target not in bugtask_by_target:
> + self.bugtask_set.createTask(
> + bug,
> + self.bug_importer,
> + target,
> + status=PACKAGE_STATUS_MAP[package.status],
> + importance=PRIORITY_ENUM_MAP[soss_record.priority],
> + assignee=assignee,
> + metadata=metadata,
> + )
> + else:
> + bugtask = bugtask_by_target[target]
> +
> + # This should not appear again, we use this to remove the
> + # not used bugtasks
> + bugtask_by_target.pop(target)
> +
> + bugtask.transitionToStatus(
> + PACKAGE_STATUS_MAP[package.status]
> + )
> + bugtask.transitionToImportance(
> + PRIORITY_ENUM_MAP[soss_record.priority]
> + )
> + # We always have rights to change assignees
> + bugtask.transitionToAssignee(assignee, validate=False)
> + bugtask.metadata = metadata
> +
> + # Remove bugtasks that were deleted from the record
> + for bugtask in bugtask_by_target.values():
> + bugtask.destroySelf()
> +
> + def _get_launchpad_cve(self, cve_sequence: str) -> Optional[CveModel]:
> + """Get CVE from Launchpad."""
> + lp_cve: CveModel = removeSecurityProxy(self.cve_set[cve_sequence])
> + if lp_cve is None:
> + logger.warning(
> + "%s: could not find the CVE in LP. Aborting. "
> + "%s was not imported.",
> + cve_sequence,
> + cve_sequence,
> + )
> + return lp_cve
> +
> + def _make_bug_description(self, soss_record: SOSSRecord) -> str:
> + """
> + Some SOSSRecord fields can't be mapped to Launchpad models.
> +
> + They are saved to bug description.
> +
> + :param soss_record: SOSSRecord with information from UCT
> + :return: bug description
> + """
> + parts = [soss_record.description] if soss_record.description else []
> + if soss_record.references:
> + parts.extend(["", "References:"])
> + parts.extend(soss_record.references)
> + return "\n".join(parts) if parts else "-"
> +
> + def _get_assignee(self, assigned_to: Optional[str]) -> Person:
> + """Get assignee person object if assigned_to is provided."""
> + if not assigned_to:
> + return None
> +
> + person = self.person_set.getByName(assigned_to)
> + if not person:
> + logger.warning(f"Assignee not found: {assigned_to}")
> +
> + return person
> +
> + def _create_external_package(
> + self,
> + package: SOSSRecord.Package,
> + packagetype: SOSSRecord.PackageTypeEnum,
> + ) -> ExternalPackage:
> + """Create external package for the given package."""
> + source_package_name = self.source_package_name_set.getOrCreateByName(
> + package.name
> + )
> + return self.soss.getExternalPackage(
> + name=source_package_name,
> + packagetype=PACKAGE_TYPE_MAP[packagetype],
> + channel=package.channel.value,
> + )
> +
> + def _prepare_cvss_data(self, soss_record: SOSSRecord) -> Dict:
> + """Prepare CVSS data from SOSS record."""
> + cvss_data = defaultdict(list)
> + for cvss in soss_record.cvss:
> + cvss_data[cvss.source].append(cvss.to_dict())
> + return dict(cvss_data)
> +
> + def _normalize_date_with_timezone(self, date_obj) -> Optional:
> + """Normalize date to UTC timezone if needed."""
> + if date_obj and date_obj.tzinfo is None:
> + return date_obj.replace(tzinfo=timezone.utc)
> + return date_obj
> +
> + def _validate_soss_record(
> + self, soss_record: SOSSRecord, cve_sequence: str
> + ) -> bool:
> + """Validate SOSS record before processing."""
> + if soss_record.candidate and soss_record.candidate != cve_sequence:
> + logger.warning(
> + "CVE sequence mismatch: %s != %s",
> + soss_record.candidate,
> + cve_sequence,
> + )
> + return False
> +
> + if not soss_record.packages:
> + logger.warning(
> + "%s: could not find any affected packages, aborting."
> + "%s was not imported.",
> + cve_sequence,
> + cve_sequence,
> + )
> + return False
> +
> + return True
> +
> + def _get_first_package_info(
> + self, soss_record: SOSSRecord
> + ) -> Tuple[SOSSRecord.PackageTypeEnum, SOSSRecord.Package]:
> + """Get first package type and package from SOSS record."""
> + first_item = next(iter(soss_record.packages.items()))
> + packagetype = first_item[0]
> + package = first_item[1][0]
> + return packagetype, package
> diff --git a/lib/lp/bugs/scripts/sossimport.py b/lib/lp/bugs/scripts/sossimport.py
> new file mode 100644
> index 0000000..1a905bf
> --- /dev/null
> +++ b/lib/lp/bugs/scripts/sossimport.py
> @@ -0,0 +1,82 @@
> +import logging
> +from pathlib import Path
> +
> +from lp.app.enums import InformationType
> +from lp.app.validators.cve import CVEREF_PATTERN
> +from lp.bugs.scripts.soss import SOSSImporter
> +from lp.services.scripts.base import LaunchpadScript
> +
> +logger = logging.getLogger(__name__)
> +
> +
> +class SOSSImportScript(LaunchpadScript):
> + """CLI for SOSSImport
Do we need to have this as a script? We can keep it if it makes things easier, just wondering if it's necessary to call these with celery
We do have a UCT import script similarly.
Is this just another entry point for the same functionality that Celery will call?
Or do you think we will just call this from the celery task itself?
> +
> + Command line options:
> + The filter option takes a glob-style pattern.
> + Example: `2007*` filters all CVEs from the year 2007.
> + """
> +
> + usage = "usage: %prog [options] PATH"
> + description = (
> + "Import bugs into Launchpad from CVE entries in soss-cve-tracker. "
> + "PATH is either path to a CVE file, or path to a directory "
> + "containing the CVE files."
> + )
> + loglevel = logging.INFO
> +
> + def add_my_options(self):
> + self.parser.add_option(
> + "--dry-run",
> + action="store_true",
> + dest="dry_run",
> + default=False,
> + help="Don't commit changes to the DB.",
> + )
> + self.parser.add_option(
> + "--filter",
> + action="store",
> + dest="filter",
> + default="*",
> + help="Apply given glob-style pattern to filter CVEs.",
> + )
> + self.parser.add_option(
> + "--information-type",
> + action="store",
> + dest="information_type",
> + default="private",
> + help=(
> + "The types used to control which users and teams can see the "
> + "bugs and vulnerabilities. Allowed values: public, private"
> + ),
> + )
> +
> + def main(self):
> + if len(self.args) != 1:
> + self.parser.error("Please specify a path to import")
> + path = Path(self.args[0])
> + if path.is_dir():
> + logger.info(
> + "Importing CVE files from directory: %s", path.resolve()
> + )
> + cve_paths = sorted(
> + p
> + for p in path.rglob("CVE-%s" % self.options.filter)
> + if p.is_file() and CVEREF_PATTERN.match(p.name)
> + )
> + if not cve_paths:
> + logger.warning("Could not find CVE files in %s", path)
> + return
> + else:
> + cve_paths = [path]
> +
> + information_type = InformationType.PRIVATESECURITY
> + if self.options.information_type == "public":
> + information_type = InformationType.PUBLICSECURITY
> +
> + logger.info(f"Bug/Vulnerability visibility: {information_type}")
> + importer = SOSSImporter(
> + information_type=information_type, dry_run=self.options.dry_run
> + )
> + for cve_path in cve_paths:
> + importer.import_cve_from_file(cve_path)
> diff --git a/lib/lp/bugs/scripts/tests/test_sossimport.py b/lib/lp/bugs/scripts/tests/test_sossimport.py
> new file mode 100644
> index 0000000..dc159a8
> --- /dev/null
> +++ b/lib/lp/bugs/scripts/tests/test_sossimport.py
> @@ -0,0 +1,139 @@
> +from pathlib import Path
> +
> +import transaction
> +
> +from lp.testing import TestCaseWithFactory
> +from lp.testing.layers import LaunchpadZopelessLayer
> +from lp.testing.script import run_script
> +
> +
> +class TestSOSSImportScript(TestCaseWithFactory):
> + """Test the TestSOSSImportScript class."""
> +
> + layer = LaunchpadZopelessLayer
> +
> + def setUp(self):
> + super().setUp()
> + self.factory.makeCVE(sequence="2025-1979")
> + self.soss = self.factory.makeDistribution(
> + name="soss", displayname="SOSS"
> + )
> + transaction.commit()
> +
> + self.sampledata = (
> + Path(__file__).parent / ".." / "soss" / "tests" / "sampledata"
> + )
> +
> + def test_no_path_given(self):
> + """TestSOSSImportScript errors when no path given"""
> + exit_code, out, err = run_script(
> + script="scripts/soss-import.py",
> + args=[],
> + )
> + self.assertEqual(2, exit_code)
> + self.assertEqual("", out)
> + self.assertEqual(
> + "Usage: soss-import.py [options] PATH\n\nsoss-import.py: "
> + "error: Please specify a path to import\n",
> + err,
> + )
> +
> + def test_load_from_file(self):
> + load_from = self.sampledata / "CVE-2025-1979"
Can you add docstrings to the tests so that it's easier to understand what they are testing?
> + exit_code, out, err = run_script(
> + script="scripts/soss-import.py",
> + args=[str(load_from), "-vvv"],
> + )
> + self.assertEqual(0, exit_code)
> + self.assertEqual("", out)
> + self.assertIn("Importing CVE-2025-1979", err)
> + self.assertIn("Created bug with ID:", err)
> + self.assertIn("Created vulnerability with ID:", err)
> + self.assertIn("commit", err)
I assumed `err` meant `error`, but this doesn't seem to be testing an error. Is it a log?
> +
> + def test_load_from_directory(self):
> + load_from = Path(__file__).parent / ".." / "uct"
> + exit_code, out, err = run_script(
> + script="scripts/soss-import.py",
> + args=[str(load_from), "-vvv"],
> + )
> + self.assertEqual(0, exit_code)
> + self.assertEqual("", out)
> + self.assertIn("Could not find CVE files in", err)
> +
> + load_from = self.sampledata
> + exit_code, out, err = run_script(
> + script="scripts/soss-import.py",
> + args=[str(load_from), "-vvv"],
> + )
> + self.assertEqual(0, exit_code)
> + self.assertEqual("", out)
> + self.assertIn("Aborting. CVE-2005-1544 was not imported", err)
> + self.assertIn("Aborting. CVE-2011-5000 was not imported", err)
> + self.assertIn("Aborting. CVE-2021-21300 was not imported", err)
> + self.assertIn("Importing CVE-2025-1979", err)
> + self.assertIn("commit", err)
> +
> + def test_dry_run_does_not_crash(self):
> + load_from = self.sampledata / "CVE-2025-1979"
> + exit_code, out, err = run_script(
> + script="scripts/soss-import.py",
> + args=[str(load_from), "--dry-run", "-vvv"],
> + )
> + self.assertEqual(0, exit_code)
> + self.assertEqual("", out)
> + self.assertIn("Importing CVE-2025-1979", err)
> + self.assertNotIn("commit", err)
> +
> + def test_filter_cve(self):
> + load_from = self.sampledata
> + exit_code, out, err = run_script(
> + script="scripts/soss-import.py",
> + args=[str(load_from), "--filter", "2005*"],
> + )
> + self.assertEqual(0, exit_code)
> + self.assertEqual("", out)
> + self.assertIn("CVE-2005-1544", err)
> + self.assertNotIn("CVE-2011-5000", err)
> + self.assertNotIn("CVE-2021-21300", err)
> + self.assertNotIn("CVE-2025-1979", err)
> +
> + exit_code, out, err = run_script(
> + script="scripts/soss-import.py",
> + args=[str(load_from), "--filter", "202*"],
> + )
> + self.assertEqual(0, exit_code)
> + self.assertEqual("", out)
> + self.assertNotIn("CVE-2005-1544", err)
> + self.assertNotIn("CVE-2011-5000", err)
> + self.assertIn("CVE-2021-21300", err)
> + self.assertIn("CVE-2025-1979", err)
> +
> + exit_code, out, err = run_script(
> + script="scripts/soss-import.py",
> + args=[str(load_from), "--filter", "20[02][15]*"],
> + )
> + self.assertEqual(0, exit_code)
> + self.assertEqual("", out)
> + self.assertIn("CVE-2005-1544", err)
> + self.assertNotIn("CVE-2011-5000", err)
> + self.assertIn("CVE-2021-21300", err)
> + self.assertIn("CVE-2025-1979", err)
> +
> + def test_information_type(self):
> + load_from = self.sampledata / "CVE-2025-1979"
> + exit_code, out, err = run_script(
> + script="scripts/soss-import.py",
> + args=[str(load_from), "--information-type", "public"],
> + )
> + self.assertEqual(0, exit_code)
> + self.assertEqual("", out)
> + self.assertIn("Bug/Vulnerability visibility: Public Security", err)
> +
> + exit_code, out, err = run_script(
> + script="scripts/soss-import.py",
> + args=[str(load_from), "--information-type", "private"],
> + )
> + self.assertEqual(0, exit_code)
> + self.assertEqual("", out)
> + self.assertIn("Bug/Vulnerability visibility: Private Security", err)
> diff --git a/scripts/soss-import.py b/scripts/soss-import.py
> new file mode 100755
> index 0000000..26a2131
> --- /dev/null
> +++ b/scripts/soss-import.py
> @@ -0,0 +1,11 @@
> +#!/usr/bin/python3 -S
> +#
> +# Copyright 2022 Canonical Ltd. This software is licensed under the
2025
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +import _pythonpath # noqa: F401
> +
> +from lp.bugs.scripts.sossimport import SOSSImportScript
> +
> +if __name__ == "__main__":
> + script = SOSSImportScript("lp.services.scripts.uctimport")
Should be `sossimport` instead of `uctimport`
> + script.run()
--
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/491173
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-soss-import-mapping into launchpad:master.
References