← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] ~enriqueesanchz/launchpad:add-soss-import-mapping into launchpad:master

 

Thank you! I have addressed the comments.

Diff comments:

> diff --git a/lib/lp/bugs/interfaces/bug.py b/lib/lp/bugs/interfaces/bug.py
> index f1280a4..9a3c2a0 100644
> --- a/lib/lp/bugs/interfaces/bug.py
> +++ b/lib/lp/bugs/interfaces/bug.py
> @@ -115,6 +116,8 @@ class CreateBugParams:
>          milestone=None,
>          assignee=None,
>          cve=None,
> +        metadata=None,
> +        check_permissions=None,

done

>      ):
>          self.owner = owner
>          self.title = title
> diff --git a/lib/lp/bugs/interfaces/vulnerability.py b/lib/lp/bugs/interfaces/vulnerability.py
> index 474309f..6ff2c1c 100644
> --- a/lib/lp/bugs/interfaces/vulnerability.py
> +++ b/lib/lp/bugs/interfaces/vulnerability.py
> @@ -97,6 +97,15 @@ class VulnerabilityChange(DBEnumeratedType):
>          """,
>      )
>  
> +    PRIVACY = DBItem(

good catch, thanks! :)

> +        6,
> +        """
> +        CVSS
> +
> +        Common Vulnerability Scoring System
> +        """,
> +    )
> +
>  
>  class IVulnerabilityView(Interface):
>      """`IVulnerability` attributes that require launchpad.View."""
> diff --git a/lib/lp/bugs/model/vulnerability.py b/lib/lp/bugs/model/vulnerability.py
> index d147171..21da533 100644
> --- a/lib/lp/bugs/model/vulnerability.py
> +++ b/lib/lp/bugs/model/vulnerability.py
> @@ -452,6 +459,10 @@ def get_vulnerability_privacy_filter(user):
>      elif IPersonRoles.providedBy(user):
>          user = user.person
>  
> +    role = IPersonRoles(user)
> +    if role.in_bug_importer or role.in_admin:
> +        return True

Right, done!

> +
>      artifact_grant_query = Coalesce(
>          ArrayIntersects(
>              SQL("Vulnerability.access_grants"),
> diff --git a/lib/lp/bugs/scripts/soss/sossimport.py b/lib/lp/bugs/scripts/soss/sossimport.py
> new file mode 100644
> index 0000000..4695dcf
> --- /dev/null
> +++ b/lib/lp/bugs/scripts/soss/sossimport.py
> @@ -0,0 +1,464 @@
> +#  Copyright 2025 Canonical Ltd.  This software is licensed under the
> +#  GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +"""A SOSS (SOSS CVE Tracker) bug importer"""
> +import logging
> +import os
> +from collections import defaultdict
> +from datetime import timezone
> +from typing import Dict, List, Optional, Tuple
> +
> +import transaction
> +from zope.component import getUtility
> +from zope.security.proxy import removeSecurityProxy
> +
> +from lp.app.enums import InformationType
> +from lp.app.interfaces.launchpad import ILaunchpadCelebrities
> +from lp.bugs.enums import VulnerabilityStatus
> +from lp.bugs.interfaces.bug import CreateBugParams, IBugSet
> +from lp.bugs.interfaces.bugtask import (
> +    BugTaskImportance,
> +    BugTaskStatus,
> +    IBugTaskSet,
> +)
> +from lp.bugs.interfaces.cve import ICveSet
> +from lp.bugs.interfaces.vulnerability import IVulnerabilitySet
> +from lp.bugs.model.bug import Bug as BugModel
> +from lp.bugs.model.cve import Cve as CveModel
> +from lp.bugs.model.vulnerability import Vulnerability
> +from lp.bugs.scripts.soss.models import SOSSRecord
> +from lp.registry.interfaces.distribution import IDistributionSet
> +from lp.registry.interfaces.externalpackage import ExternalPackageType
> +from lp.registry.interfaces.person import IPersonSet
> +from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
> +from lp.registry.model.distribution import Distribution
> +from lp.registry.model.externalpackage import ExternalPackage
> +from lp.registry.model.person import Person
> +from lp.testing import person_logged_in
> +
> +__all__ = [
> +    "SOSSImporter",
> +]
> +
> +logger = logging.getLogger(__name__)
> +
> +# Constants moved to module level with proper naming
> +PRIORITY_ENUM_MAP = {
> +    SOSSRecord.PriorityEnum.NEEDS_TRIAGE: BugTaskImportance.UNDECIDED,
> +    SOSSRecord.PriorityEnum.NEGLIGIBLE: BugTaskImportance.WISHLIST,
> +    SOSSRecord.PriorityEnum.LOW: BugTaskImportance.LOW,
> +    SOSSRecord.PriorityEnum.MEDIUM: BugTaskImportance.MEDIUM,
> +    SOSSRecord.PriorityEnum.HIGH: BugTaskImportance.HIGH,
> +    SOSSRecord.PriorityEnum.CRITICAL: BugTaskImportance.CRITICAL,
> +}
> +
> +PACKAGE_TYPE_MAP = {
> +    SOSSRecord.PackageTypeEnum.UNPACKAGED: ExternalPackageType.GENERIC,
> +    SOSSRecord.PackageTypeEnum.PYTHON: ExternalPackageType.PYTHON,
> +    SOSSRecord.PackageTypeEnum.MAVEN: ExternalPackageType.MAVEN,
> +    SOSSRecord.PackageTypeEnum.CONDA: ExternalPackageType.CONDA,
> +    SOSSRecord.PackageTypeEnum.RUST: ExternalPackageType.CARGO,
> +}
> +
> +PACKAGE_STATUS_MAP = {
> +    SOSSRecord.PackageStatusEnum.IGNORED: BugTaskStatus.WONTFIX,
> +    SOSSRecord.PackageStatusEnum.NEEDS_TRIAGE: BugTaskStatus.UNKNOWN,
> +    SOSSRecord.PackageStatusEnum.RELEASED: BugTaskStatus.FIXRELEASED,
> +    SOSSRecord.PackageStatusEnum.NOT_AFFECTED: BugTaskStatus.INVALID,
> +    SOSSRecord.PackageStatusEnum.DEFERRED: BugTaskStatus.DEFERRED,
> +    SOSSRecord.PackageStatusEnum.NEEDED: BugTaskStatus.NEW,
> +}
> +
> +DISTRIBUTION_NAME = "soss"
> +
> +
> +class SOSSImporter:
> +    """
> +    SOSSImporter is used to import SOSS CVE files to Launchpad database.
> +    """
> +
> +    def __init__(
> +        self,
> +        information_type: InformationType = InformationType.PRIVATESECURITY,
> +        dry_run: bool = False,
> +    ) -> None:
> +        self.information_type = information_type
> +        self.dry_run = dry_run
> +        self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
> +        self.soss = getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME)
> +        self.person_set = getUtility(IPersonSet)
> +        self.source_package_name_set = getUtility(ISourcePackageNameSet)
> +        self.bugtask_set = getUtility(IBugTaskSet)
> +        self.vulnerability_set = getUtility(IVulnerabilitySet)
> +        self.bug_set = getUtility(IBugSet)
> +        self.cve_set = getUtility(ICveSet)
> +
> +    def import_cve_from_file(
> +        self, cve_path: str
> +    ) -> Tuple[BugModel, Vulnerability]:
> +        """Import CVE from file path."""
> +        cve_sequence = os.path.basename(cve_path)
> +        logger.info(f"Importing {cve_sequence}")

Done, `fmt="[SOSSImporter] %(asctime)s %(levelname)s %(message)s"`

> +
> +        with open(cve_path, encoding="utf-8") as file:
> +            soss_record = SOSSRecord.from_yaml(file)
> +
> +        with person_logged_in(self.bug_importer):
> +            bug, vulnerability = self.import_cve(soss_record, cve_sequence)
> +
> +        return bug, vulnerability
> +
> +    def import_cve(
> +        self, soss_record: SOSSRecord, cve_sequence: str
> +    ) -> Tuple[BugModel, Vulnerability]:
> +        """Import CVE from SOSS record."""
> +        if not self._validate_soss_record(soss_record, cve_sequence):
> +            return None, None
> +
> +        lp_cve = self._get_launchpad_cve(cve_sequence)
> +        if lp_cve is None:
> +            return None, None
> +
> +        bug = self._find_existing_bug(soss_record, lp_cve, self.soss)
> +        if not bug:
> +            bug = self._create_bug(soss_record, lp_cve)
> +        else:
> +            bug = self._update_bug(bug, soss_record, lp_cve)

I think it is better to add this later, but I agree with you.

> +
> +        vulnerability = self._find_existing_vulnerability(bug, self.soss)
> +        if not vulnerability:
> +            vulnerability = self._create_vulnerability(
> +                bug, soss_record, lp_cve, self.soss
> +            )
> +        else:
> +            vulnerability = self._update_vulnerability(
> +                vulnerability, soss_record
> +            )
> +
> +        if not self.dry_run:
> +            transaction.commit()
> +            logger.info(f"Successfully committed changes for {cve_sequence}")
> +
> +        return bug, vulnerability
> +
> +    def _create_bug(
> +        self, soss_record: SOSSRecord, lp_cve: CveModel
> +    ) -> BugModel:
> +        """
> +        Create a Bug model based on the information contained in a
> +        SOSSRecord.
> +
> +        :param soss_record: SOSSRecord with information from a SOSS cve
> +        :param lp_cve: Launchpad Cve model
> +        """
> +        packagetype, package = self._get_first_package_info(soss_record)
> +        assignee = self._get_assignee(soss_record.assigned_to)
> +
> +        externalpackage = self._create_external_package(package, packagetype)
> +        metadata = {"repositories": package.repositories}
> +
> +        # Create the bug, only first bugtask
> +        bug, _ = self.bug_set.createBug(
> +            CreateBugParams(
> +                comment=self._make_bug_description(soss_record),
> +                title=lp_cve.sequence,
> +                information_type=self.information_type,
> +                owner=self.bug_importer,
> +                target=externalpackage,
> +                status=PACKAGE_STATUS_MAP[package.status],
> +                status_explanation=package.note,
> +                assignee=assignee,
> +                importance=PRIORITY_ENUM_MAP[soss_record.priority],
> +                cve=lp_cve,
> +                metadata=metadata,
> +                check_permissions=False,
> +            ),
> +            notify_event=False,
> +        )
> +
> +        # Create next bugtasks
> +        self._update_bugtasks(bug, soss_record)
> +
> +        logger.info(f"Created bug with ID: {bug.id}")
> +        return bug
> +
> +    def _update_bug(
> +        self, bug: BugModel, soss_record: SOSSRecord, lp_cve: CveModel
> +    ) -> BugModel:
> +        """
> +        Update a Bug model with the information contained in a SOSSRecord.
> +
> +        :param bug: Bug model to be updated
> +        :param soss_record: SOSSRecord with information from a SOSS cve
> +        :param lp_cve: Launchpad Cve model
> +        """
> +        bug.description = self._make_bug_description(soss_record)

:+1:

> +        bug.title = lp_cve.sequence
> +        bug.transitionToInformationType(
> +            self.information_type, self.bug_importer
> +        )
> +        self._update_bugtasks(bug, soss_record)
> +
> +        logger.info(f"Updated Bug with ID: {bug.id}")
> +        return bug
> +
> +    def _create_vulnerability(
> +        self,
> +        bug: BugModel,
> +        soss_record: SOSSRecord,
> +        lp_cve: CveModel,
> +        distribution: Distribution,
> +    ) -> Vulnerability:
> +        """
> +        Create a Vulnerability instance based on the information from
> +        the given SOSSRecord instance and link to the specified Bug
> +        and LP's Cve model.
> +
> +        :param bug: Bug model associated with the vulnerability
> +        :param soss_record: SOSSRecord with information from a SOSS cve
> +        :param lp_cve: Launchpad Cve model
> +        :param distribution: a Distribution affected by the vulnerability
> +        :return: a Vulnerability
> +        """
> +        vulnerability: Vulnerability = self.vulnerability_set.new(
> +            distribution=distribution,
> +            status=VulnerabilityStatus.NEEDS_TRIAGE,
> +            importance=PRIORITY_ENUM_MAP[soss_record.priority],
> +            creator=bug.owner,
> +            information_type=self.information_type,
> +            cve=lp_cve,
> +            description=soss_record.description,
> +            notes="\n".join(soss_record.notes),
> +            mitigation=None,
> +            importance_explanation=soss_record.priority_reason,
> +            date_made_public=self._normalize_date_with_timezone(
> +                soss_record.public_date
> +            ),
> +            date_notice_issued=None,
> +            date_coordinated_release=None,
> +            cvss=self._prepare_cvss_data(soss_record),
> +        )
> +        vulnerability.linkBug(bug, bug.owner)
> +
> +        logger.info(
> +            f"Created vulnerability with ID: {vulnerability.id} "
> +            f"for {distribution.name}",
> +        )
> +
> +        return vulnerability
> +
> +    def _update_vulnerability(
> +        self, vulnerability: Vulnerability, soss_record: SOSSRecord
> +    ) -> None:
> +        """
> +        Update a Vulnerability model with the information
> +        contained in a SOSSRecord
> +
> +        :param vulnerability: Vulnerability model to be updated
> +        :param soss_record: SOSSRecord with information from a SOSS cve
> +        """
> +        vulnerability.status = VulnerabilityStatus.NEEDS_TRIAGE
> +        vulnerability.description = soss_record.description
> +        vulnerability.notes = "\n".join(soss_record.notes)
> +        vulnerability.mitigation = None
> +        vulnerability.importance = PRIORITY_ENUM_MAP[soss_record.priority]
> +        vulnerability.importance_explanation = soss_record.priority_reason
> +        vulnerability.date_made_public = self._normalize_date_with_timezone(
> +            soss_record.public_date
> +        )
> +        vulnerability.date_notice_issued = None
> +        vulnerability.date_coordinated_release = None
> +        vulnerability.cvss = self._prepare_cvss_data(soss_record)
> +
> +        logger.info(
> +            f"Updated Vulnerability with ID: {vulnerability.id} "
> +            f"for {vulnerability.distribution.name}",
> +        )
> +
> +    def _find_existing_bug(
> +        self,
> +        soss_record: SOSSRecord,
> +        lp_cve: CveModel,
> +        distribution: Distribution,
> +    ) -> Optional[BugModel]:
> +        """Find existing bug for the given CVE."""
> +        for vulnerability in lp_cve.vulnerabilities:
> +            if vulnerability.distribution == distribution:
> +                bugs = vulnerability.bugs
> +                if len(bugs) > 1:
> +                    raise ValueError(
> +                        "Multiple existing bugs found for CVE ",
> +                        soss_record.sequence,
> +                    )
> +                if bugs:
> +                    return bugs[0]
> +        return None
> +
> +    def _find_existing_vulnerability(
> +        self, bug: BugModel, distribution: Distribution
> +    ) -> Optional[Vulnerability]:
> +        """Find existing vulnerability for the current distribution"""
> +        if not bug:
> +            return None
> +
> +        vulnerability = next(
> +            (v for v in bug.vulnerabilities if v.distribution == distribution),
> +            None,
> +        )
> +        return vulnerability
> +
> +    def _update_bugtasks(self, bug: BugModel, soss_record: SOSSRecord) -> None:
> +        """
> +        Add bug tasks to the given Bug model based on the information
> +        from a SOSSRecord.
> +
> +        This may be called multiple times, only new targets will be created.
> +        Existing targets will be updated. Packages that were removed from the
> +        SOSSRecord will be removed.
> +
> +        :param bug: Bug model to be updated
> +        :param packages: list of SOSSRecord.Packages from a SOSSRecord
> +        """
> +        packages: List[SOSSRecord.Package] = soss_record.packages.items()
> +        assignee = self._get_assignee(soss_record.assigned_to)
> +
> +        # Build a lookup dict for existing bug tasks
> +        bugtask_by_target = {task.target: task for task in bug.bugtasks}
> +
> +        for packagetype, package_list in packages:
> +            for package in package_list:
> +                target = self._create_external_package(package, packagetype)

done

> +                metadata = (
> +                    {"repositories": package.repositories}
> +                    if package.repositories
> +                    else None
> +                )
> +
> +                if target not in bugtask_by_target:
> +                    self.bugtask_set.createTask(
> +                        bug,
> +                        self.bug_importer,
> +                        target,
> +                        status=PACKAGE_STATUS_MAP[package.status],
> +                        importance=PRIORITY_ENUM_MAP[soss_record.priority],
> +                        assignee=assignee,
> +                        metadata=metadata,
> +                    )
> +                else:
> +                    bugtask = bugtask_by_target[target]
> +
> +                    # This should not appear again, we use this to remove the
> +                    # not used bugtasks
> +                    bugtask_by_target.pop(target)
> +
> +                    bugtask.transitionToStatus(
> +                        PACKAGE_STATUS_MAP[package.status]
> +                    )
> +                    bugtask.transitionToImportance(
> +                        PRIORITY_ENUM_MAP[soss_record.priority]
> +                    )
> +                    # We always have rights to change assignees
> +                    bugtask.transitionToAssignee(assignee, validate=False)
> +                    bugtask.metadata = metadata
> +
> +        # Remove bugtasks that were deleted from the record
> +        for bugtask in bugtask_by_target.values():
> +            bugtask.destroySelf()
> +
> +    def _get_launchpad_cve(self, cve_sequence: str) -> Optional[CveModel]:
> +        """Get CVE from Launchpad."""
> +        lp_cve: CveModel = removeSecurityProxy(self.cve_set[cve_sequence])
> +        if lp_cve is None:
> +            logger.warning(
> +                "%s: could not find the CVE in LP. Aborting. "
> +                "%s was not imported.",
> +                cve_sequence,
> +                cve_sequence,
> +            )
> +        return lp_cve
> +
> +    def _make_bug_description(self, soss_record: SOSSRecord) -> str:
> +        """
> +        Some SOSSRecord fields can't be mapped to Launchpad models.
> +
> +        They are saved to bug description.
> +
> +        :param soss_record: SOSSRecord with information from UCT
> +        :return: bug description
> +        """
> +        parts = [soss_record.description] if soss_record.description else []
> +        if soss_record.references:
> +            parts.extend(["", "References:"])
> +            parts.extend(soss_record.references)
> +        return "\n".join(parts) if parts else "-"
> +
> +    def _get_assignee(self, assigned_to: Optional[str]) -> Person:
> +        """Get assignee person object if assigned_to is provided."""
> +        if not assigned_to:
> +            return None
> +
> +        person = self.person_set.getByName(assigned_to)
> +        if not person:
> +            logger.warning(f"Assignee not found: {assigned_to}")
> +
> +        return person
> +
> +    def _create_external_package(
> +        self,
> +        package: SOSSRecord.Package,
> +        packagetype: SOSSRecord.PackageTypeEnum,
> +    ) -> ExternalPackage:
> +        """Create external package for the given package."""
> +        source_package_name = self.source_package_name_set.getOrCreateByName(
> +            package.name
> +        )
> +        return self.soss.getExternalPackage(
> +            name=source_package_name,
> +            packagetype=PACKAGE_TYPE_MAP[packagetype],
> +            channel=package.channel.value,
> +        )
> +
> +    def _prepare_cvss_data(self, soss_record: SOSSRecord) -> Dict:
> +        """Prepare CVSS data from SOSS record."""
> +        cvss_data = defaultdict(list)
> +        for cvss in soss_record.cvss:
> +            cvss_data[cvss.source].append(cvss.to_dict())
> +        return dict(cvss_data)
> +
> +    def _normalize_date_with_timezone(self, date_obj) -> Optional:
> +        """Normalize date to UTC timezone if needed."""
> +        if date_obj and date_obj.tzinfo is None:
> +            return date_obj.replace(tzinfo=timezone.utc)
> +        return date_obj
> +
> +    def _validate_soss_record(
> +        self, soss_record: SOSSRecord, cve_sequence: str
> +    ) -> bool:
> +        """Validate SOSS record before processing."""
> +        if soss_record.candidate and soss_record.candidate != cve_sequence:
> +            logger.warning(
> +                "CVE sequence mismatch: %s != %s",
> +                soss_record.candidate,
> +                cve_sequence,
> +            )
> +            return False
> +
> +        if not soss_record.packages:
> +            logger.warning(
> +                "%s: could not find any affected packages, aborting."
> +                "%s was not imported.",
> +                cve_sequence,
> +                cve_sequence,
> +            )
> +            return False
> +
> +        return True
> +
> +    def _get_first_package_info(
> +        self, soss_record: SOSSRecord
> +    ) -> Tuple[SOSSRecord.PackageTypeEnum, SOSSRecord.Package]:
> +        """Get first package type and package from SOSS record."""
> +        first_item = next(iter(soss_record.packages.items()))
> +        packagetype = first_item[0]
> +        package = first_item[1][0]
> +        return packagetype, package
> diff --git a/lib/lp/bugs/scripts/sossimport.py b/lib/lp/bugs/scripts/sossimport.py
> new file mode 100644
> index 0000000..1a905bf
> --- /dev/null
> +++ b/lib/lp/bugs/scripts/sossimport.py
> @@ -0,0 +1,82 @@
> +import logging
> +from pathlib import Path
> +
> +from lp.app.enums import InformationType
> +from lp.app.validators.cve import CVEREF_PATTERN
> +from lp.bugs.scripts.soss import SOSSImporter
> +from lp.services.scripts.base import LaunchpadScript
> +
> +logger = logging.getLogger(__name__)
> +
> +
> +class SOSSImportScript(LaunchpadScript):
> +    """CLI for SOSSImport

As discussed, saving this in another branch since we don't need it (at least for now). Will remove also its tests.

> +
> +    Command line options:
> +    The filter option takes a glob-style pattern.
> +    Example: `2007*` filters all CVEs from the year 2007.
> +    """
> +
> +    usage = "usage: %prog [options] PATH"
> +    description = (
> +        "Import bugs into Launchpad from CVE entries in soss-cve-tracker. "
> +        "PATH is either path to a CVE file, or path to a directory "
> +        "containing the CVE files."
> +    )
> +    loglevel = logging.INFO
> +
> +    def add_my_options(self):
> +        self.parser.add_option(
> +            "--dry-run",
> +            action="store_true",
> +            dest="dry_run",
> +            default=False,
> +            help="Don't commit changes to the DB.",
> +        )
> +        self.parser.add_option(
> +            "--filter",
> +            action="store",
> +            dest="filter",
> +            default="*",
> +            help="Apply given glob-style pattern to filter CVEs.",
> +        )
> +        self.parser.add_option(
> +            "--information-type",
> +            action="store",
> +            dest="information_type",
> +            default="private",
> +            help=(
> +                "The types used to control which users and teams can see the "
> +                "bugs and vulnerabilities. Allowed values: public, private"
> +            ),
> +        )
> +
> +    def main(self):
> +        if len(self.args) != 1:
> +            self.parser.error("Please specify a path to import")
> +        path = Path(self.args[0])
> +        if path.is_dir():
> +            logger.info(
> +                "Importing CVE files from directory: %s", path.resolve()
> +            )
> +            cve_paths = sorted(
> +                p
> +                for p in path.rglob("CVE-%s" % self.options.filter)
> +                if p.is_file() and CVEREF_PATTERN.match(p.name)
> +            )
> +            if not cve_paths:
> +                logger.warning("Could not find CVE files in %s", path)
> +                return
> +        else:
> +            cve_paths = [path]
> +
> +        information_type = InformationType.PRIVATESECURITY
> +        if self.options.information_type == "public":
> +            information_type = InformationType.PUBLICSECURITY
> +
> +        logger.info(f"Bug/Vulnerability visibility: {information_type}")
> +        importer = SOSSImporter(
> +            information_type=information_type, dry_run=self.options.dry_run
> +        )
> +        for cve_path in cve_paths:
> +            importer.import_cve_from_file(cve_path)
> diff --git a/lib/lp/bugs/scripts/tests/test_sossimport.py b/lib/lp/bugs/scripts/tests/test_sossimport.py
> new file mode 100644
> index 0000000..dc159a8
> --- /dev/null
> +++ b/lib/lp/bugs/scripts/tests/test_sossimport.py
> @@ -0,0 +1,139 @@
> +from pathlib import Path
> +
> +import transaction
> +
> +from lp.testing import TestCaseWithFactory
> +from lp.testing.layers import LaunchpadZopelessLayer
> +from lp.testing.script import run_script
> +
> +
> +class TestSOSSImportScript(TestCaseWithFactory):
> +    """Test the TestSOSSImportScript class."""
> +
> +    layer = LaunchpadZopelessLayer
> +
> +    def setUp(self):
> +        super().setUp()
> +        self.factory.makeCVE(sequence="2025-1979")
> +        self.soss = self.factory.makeDistribution(
> +            name="soss", displayname="SOSS"
> +        )
> +        transaction.commit()
> +
> +        self.sampledata = (
> +            Path(__file__).parent / ".." / "soss" / "tests" / "sampledata"
> +        )
> +
> +    def test_no_path_given(self):
> +        """TestSOSSImportScript errors when no path given"""
> +        exit_code, out, err = run_script(
> +            script="scripts/soss-import.py",
> +            args=[],
> +        )
> +        self.assertEqual(2, exit_code)
> +        self.assertEqual("", out)
> +        self.assertEqual(
> +            "Usage: soss-import.py [options] PATH\n\nsoss-import.py: "
> +            "error: Please specify a path to import\n",
> +            err,
> +        )
> +
> +    def test_load_from_file(self):
> +        load_from = self.sampledata / "CVE-2025-1979"

removed the file as discussed

> +        exit_code, out, err = run_script(
> +            script="scripts/soss-import.py",
> +            args=[str(load_from), "-vvv"],
> +        )
> +        self.assertEqual(0, exit_code)
> +        self.assertEqual("", out)
> +        self.assertIn("Importing CVE-2025-1979", err)
> +        self.assertIn("Created bug with ID:", err)
> +        self.assertIn("Created vulnerability with ID:", err)
> +        self.assertIn("commit", err)

err is the error output. Removed the file as discussed.

> +
> +    def test_load_from_directory(self):
> +        load_from = Path(__file__).parent / ".." / "uct"
> +        exit_code, out, err = run_script(
> +            script="scripts/soss-import.py",
> +            args=[str(load_from), "-vvv"],
> +        )
> +        self.assertEqual(0, exit_code)
> +        self.assertEqual("", out)
> +        self.assertIn("Could not find CVE files in", err)
> +
> +        load_from = self.sampledata
> +        exit_code, out, err = run_script(
> +            script="scripts/soss-import.py",
> +            args=[str(load_from), "-vvv"],
> +        )
> +        self.assertEqual(0, exit_code)
> +        self.assertEqual("", out)
> +        self.assertIn("Aborting. CVE-2005-1544 was not imported", err)
> +        self.assertIn("Aborting. CVE-2011-5000 was not imported", err)
> +        self.assertIn("Aborting. CVE-2021-21300 was not imported", err)
> +        self.assertIn("Importing CVE-2025-1979", err)
> +        self.assertIn("commit", err)
> +
> +    def test_dry_run_does_not_crash(self):
> +        load_from = self.sampledata / "CVE-2025-1979"
> +        exit_code, out, err = run_script(
> +            script="scripts/soss-import.py",
> +            args=[str(load_from), "--dry-run", "-vvv"],
> +        )
> +        self.assertEqual(0, exit_code)
> +        self.assertEqual("", out)
> +        self.assertIn("Importing CVE-2025-1979", err)
> +        self.assertNotIn("commit", err)
> +
> +    def test_filter_cve(self):
> +        load_from = self.sampledata
> +        exit_code, out, err = run_script(
> +            script="scripts/soss-import.py",
> +            args=[str(load_from), "--filter", "2005*"],
> +        )
> +        self.assertEqual(0, exit_code)
> +        self.assertEqual("", out)
> +        self.assertIn("CVE-2005-1544", err)
> +        self.assertNotIn("CVE-2011-5000", err)
> +        self.assertNotIn("CVE-2021-21300", err)
> +        self.assertNotIn("CVE-2025-1979", err)
> +
> +        exit_code, out, err = run_script(
> +            script="scripts/soss-import.py",
> +            args=[str(load_from), "--filter", "202*"],
> +        )
> +        self.assertEqual(0, exit_code)
> +        self.assertEqual("", out)
> +        self.assertNotIn("CVE-2005-1544", err)
> +        self.assertNotIn("CVE-2011-5000", err)
> +        self.assertIn("CVE-2021-21300", err)
> +        self.assertIn("CVE-2025-1979", err)
> +
> +        exit_code, out, err = run_script(
> +            script="scripts/soss-import.py",
> +            args=[str(load_from), "--filter", "20[02][15]*"],
> +        )
> +        self.assertEqual(0, exit_code)
> +        self.assertEqual("", out)
> +        self.assertIn("CVE-2005-1544", err)
> +        self.assertNotIn("CVE-2011-5000", err)
> +        self.assertIn("CVE-2021-21300", err)
> +        self.assertIn("CVE-2025-1979", err)
> +
> +    def test_information_type(self):
> +        load_from = self.sampledata / "CVE-2025-1979"
> +        exit_code, out, err = run_script(
> +            script="scripts/soss-import.py",
> +            args=[str(load_from), "--information-type", "public"],
> +        )
> +        self.assertEqual(0, exit_code)
> +        self.assertEqual("", out)
> +        self.assertIn("Bug/Vulnerability visibility: Public Security", err)
> +
> +        exit_code, out, err = run_script(
> +            script="scripts/soss-import.py",
> +            args=[str(load_from), "--information-type", "private"],
> +        )
> +        self.assertEqual(0, exit_code)
> +        self.assertEqual("", out)
> +        self.assertIn("Bug/Vulnerability visibility: Private Security", err)
> diff --git a/scripts/soss-import.py b/scripts/soss-import.py
> new file mode 100755
> index 0000000..26a2131
> --- /dev/null
> +++ b/scripts/soss-import.py
> @@ -0,0 +1,11 @@
> +#!/usr/bin/python3 -S
> +#
> +# Copyright 2022 Canonical Ltd.  This software is licensed under the

removed the file as discussed

> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +import _pythonpath  # noqa: F401
> +
> +from lp.bugs.scripts.sossimport import SOSSImportScript
> +
> +if __name__ == "__main__":
> +    script = SOSSImportScript("lp.services.scripts.uctimport")

removed the file as discussed

> +    script.run()


-- 
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/491173
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-soss-import-mapping into launchpad:master.



References