launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #32889
[Merge] ~enriqueesanchz/launchpad:add-soss-import-mapping into launchpad:master
Enrique Sánchez has proposed merging ~enriqueesanchz/launchpad:add-soss-import-mapping into launchpad:master.
Commit message:
Add sossimport.py script
Add ExternalPackageType.GENERIC
Add CVSS field to vulnerability
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/491173
Add the script that will enable us to import SOSS vulnerability data to fit LP188. Also made some model changes to support all the spec fields.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-soss-import-mapping into launchpad:master.
diff --git a/lib/lp/bugs/configure.zcml b/lib/lp/bugs/configure.zcml
index 9eafebb..b0bec51 100644
--- a/lib/lp/bugs/configure.zcml
+++ b/lib/lp/bugs/configure.zcml
@@ -254,7 +254,9 @@
subscribe
getConjoinedPrimary
findSimilarBugs
- getContributorInfo"/>
+ getContributorInfo
+ metadata
+ "/>
<require
permission="launchpad.Delete"
interface="lp.bugs.interfaces.bugtask.IBugTaskDelete"/>
@@ -295,6 +297,7 @@
status_explanation
targetname
title
+ metadata
"/>
</class>
<adapter
diff --git a/lib/lp/bugs/interfaces/bug.py b/lib/lp/bugs/interfaces/bug.py
index f1280a4..9a3c2a0 100644
--- a/lib/lp/bugs/interfaces/bug.py
+++ b/lib/lp/bugs/interfaces/bug.py
@@ -104,6 +104,7 @@ class CreateBugParams:
description=None,
msg=None,
status=None,
+ status_explanation=None,
datecreated=None,
information_type=None,
subscribers=(),
@@ -115,6 +116,8 @@ class CreateBugParams:
milestone=None,
assignee=None,
cve=None,
+ metadata=None,
+ check_permissions=None,
):
self.owner = owner
self.title = title
@@ -122,6 +125,7 @@ class CreateBugParams:
self.description = description
self.msg = msg
self.status = status
+ self.status_explanation = status_explanation
self.datecreated = datecreated
self.information_type = information_type
self.subscribers = subscribers
@@ -133,6 +137,8 @@ class CreateBugParams:
self.milestone = milestone
self.assignee = assignee
self.cve = cve
+ self.metadata = metadata
+ self.check_permissions = check_permissions
class BugNameField(ContentNameField):
diff --git a/lib/lp/bugs/interfaces/bugtask.py b/lib/lp/bugs/interfaces/bugtask.py
index 9d8a36c..9f1e473 100644
--- a/lib/lp/bugs/interfaces/bugtask.py
+++ b/lib/lp/bugs/interfaces/bugtask.py
@@ -1208,6 +1208,7 @@ class IBugTaskSet(Interface):
importance=None,
assignee=None,
milestone=None,
+ metadata=None,
):
"""Create a bug task on a bug and return it.
diff --git a/lib/lp/bugs/interfaces/vulnerability.py b/lib/lp/bugs/interfaces/vulnerability.py
index 474309f..6ff2c1c 100644
--- a/lib/lp/bugs/interfaces/vulnerability.py
+++ b/lib/lp/bugs/interfaces/vulnerability.py
@@ -15,7 +15,7 @@ from lazr.enum import DBEnumeratedType, DBItem
from lazr.restful.declarations import exported, exported_as_webservice_entry
from lazr.restful.fields import CollectionField, Reference
from zope.interface import Interface
-from zope.schema import Bool, Choice, Datetime, Int, TextLine
+from zope.schema import Bool, Choice, Datetime, Int, List, TextLine
from lp import _
from lp.app.enums import InformationType
@@ -97,6 +97,15 @@ class VulnerabilityChange(DBEnumeratedType):
""",
)
+ PRIVACY = DBItem(
+ 6,
+ """
+ CVSS
+
+ Common Vulnerability Scoring System
+ """,
+ )
+
class IVulnerabilityView(Interface):
"""`IVulnerability` attributes that require launchpad.View."""
@@ -303,6 +312,15 @@ class IVulnerabilityEditableAttributes(Interface):
as_of="devel",
)
+ cvss = exported(
+ List(
+ title=_("List of Common Vulnerability Scoring System."),
+ required=False,
+ readonly=False,
+ ),
+ as_of="devel",
+ )
+
class IVulnerabilityEdit(Interface):
"""`IVulnerability` attributes that require launchpad.Edit."""
@@ -338,6 +356,7 @@ class IVulnerabilitySet(Interface):
date_made_public=None,
date_notice_issued=None,
date_coordinated_release=None,
+ cvss=None,
):
"""Return a new vulnerability.
@@ -356,6 +375,7 @@ class IVulnerabilitySet(Interface):
:param date_coordinated_release: Date when a security notice was issued
for this vulnerability.
:param date_notice_issued: Coordinated Release Date.
+ :param cvss: Common vulnerability scroring system.
"""
def findByIds(vulnerability_ids, visible_by_user=None):
diff --git a/lib/lp/bugs/model/bug.py b/lib/lp/bugs/model/bug.py
index e303731..2d3f90a 100644
--- a/lib/lp/bugs/model/bug.py
+++ b/lib/lp/bugs/model/bug.py
@@ -249,6 +249,8 @@ def snapshot_bug_params(bug_params):
"milestone",
"assignee",
"cve",
+ "metadata",
+ "check_permissions",
],
)
@@ -3320,7 +3322,11 @@ class BugSet:
# Create the initial task on the specified target. This also
# reconciles access policies for this bug based on that target.
getUtility(IBugTaskSet).createTask(
- bug, params.owner, params.target, status=params.status
+ bug,
+ params.owner,
+ params.target,
+ status=params.status,
+ metadata=params.metadata,
)
if params.subscribe_owner:
@@ -3415,7 +3421,7 @@ class BugSet:
bug.markUserAffected(bug.owner)
if params.cve is not None:
- bug.linkCVE(params.cve, params.owner)
+ bug.linkCVE(params.cve, params.owner, params.check_permissions)
# Populate the creation event.
if params.filed_by is None:
diff --git a/lib/lp/bugs/model/bugtask.py b/lib/lp/bugs/model/bugtask.py
index 998b572..c3fb364 100644
--- a/lib/lp/bugs/model/bugtask.py
+++ b/lib/lp/bugs/model/bugtask.py
@@ -1908,6 +1908,7 @@ class BugTaskSet:
importance=None,
assignee=None,
milestone=None,
+ metadata=None,
):
"""See `IBugTaskSet`."""
if status is None:
@@ -1939,6 +1940,7 @@ class BugTaskSet:
importance,
assignee,
milestone,
+ metadata,
)
for key in target_keys
]
@@ -1958,6 +1960,7 @@ class BugTaskSet:
BugTask.importance,
BugTask.assignee,
BugTask.milestone,
+ BugTask.metadata,
),
values,
get_objects=True,
@@ -1986,6 +1989,7 @@ class BugTaskSet:
importance=None,
assignee=None,
milestone=None,
+ metadata=None,
):
"""See `IBugTaskSet`."""
# Create tasks for accepted nominations if this is a source
@@ -2012,6 +2016,7 @@ class BugTaskSet:
importance=importance,
assignee=assignee,
milestone=milestone,
+ metadata=metadata,
)
return [task for task in tasks if task.target == target][0]
diff --git a/lib/lp/bugs/model/vulnerability.py b/lib/lp/bugs/model/vulnerability.py
index d147171..21da533 100644
--- a/lib/lp/bugs/model/vulnerability.py
+++ b/lib/lp/bugs/model/vulnerability.py
@@ -11,6 +11,7 @@ import operator
from datetime import timezone
from typing import Iterable
+from storm.databases.postgres import JSON
from storm.expr import SQL, Coalesce, Join, Or, Select
from storm.locals import DateTime, Int, Reference, Unicode
from storm.store import Store
@@ -74,6 +75,8 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
mitigation = Unicode(name="mitigation", allow_none=True)
+ cvss = JSON(name="cvss", allow_none=True)
+
importance = DBEnum(
name="importance",
allow_none=False,
@@ -127,6 +130,7 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
date_made_public=None,
date_notice_issued=None,
date_coordinated_release=None,
+ cvss=None,
):
super().__init__()
self.distribution = distribution
@@ -147,6 +151,7 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
self.date_notice_issued = date_notice_issued
self.date_coordinated_release = date_coordinated_release
self.date_created = UTC_NOW
+ self.cvss = cvss
@property
def private(self):
@@ -339,6 +344,7 @@ class VulnerabilitySet:
date_made_public=None,
date_notice_issued=None,
date_coordinated_release=None,
+ cvss=None,
):
"""See `IVulnerabilitySet`."""
store = IStore(Vulnerability)
@@ -356,6 +362,7 @@ class VulnerabilitySet:
date_made_public=date_made_public,
date_notice_issued=date_notice_issued,
date_coordinated_release=date_coordinated_release,
+ cvss=cvss,
)
store.add(vulnerability)
vulnerability._reconcileAccess()
@@ -452,6 +459,10 @@ def get_vulnerability_privacy_filter(user):
elif IPersonRoles.providedBy(user):
user = user.person
+ role = IPersonRoles(user)
+ if role.in_bug_importer or role.in_admin:
+ return True
+
artifact_grant_query = Coalesce(
ArrayIntersects(
SQL("Vulnerability.access_grants"),
diff --git a/lib/lp/bugs/scripts/soss/__init__.py b/lib/lp/bugs/scripts/soss/__init__.py
index db91701..cc2ebdf 100644
--- a/lib/lp/bugs/scripts/soss/__init__.py
+++ b/lib/lp/bugs/scripts/soss/__init__.py
@@ -2,3 +2,4 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
from lp.bugs.scripts.soss.models import SOSSRecord # noqa: F401
+from lp.bugs.scripts.soss.sossimport import SOSSImporter # noqa: F401
diff --git a/lib/lp/bugs/scripts/soss/sossimport.py b/lib/lp/bugs/scripts/soss/sossimport.py
new file mode 100644
index 0000000..4695dcf
--- /dev/null
+++ b/lib/lp/bugs/scripts/soss/sossimport.py
@@ -0,0 +1,464 @@
+# Copyright 2025 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""A SOSS (SOSS CVE Tracker) bug importer"""
+import logging
+import os
+from collections import defaultdict
+from datetime import timezone
+from typing import Dict, List, Optional, Tuple
+
+import transaction
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.app.enums import InformationType
+from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.bugs.enums import VulnerabilityStatus
+from lp.bugs.interfaces.bug import CreateBugParams, IBugSet
+from lp.bugs.interfaces.bugtask import (
+ BugTaskImportance,
+ BugTaskStatus,
+ IBugTaskSet,
+)
+from lp.bugs.interfaces.cve import ICveSet
+from lp.bugs.interfaces.vulnerability import IVulnerabilitySet
+from lp.bugs.model.bug import Bug as BugModel
+from lp.bugs.model.cve import Cve as CveModel
+from lp.bugs.model.vulnerability import Vulnerability
+from lp.bugs.scripts.soss.models import SOSSRecord
+from lp.registry.interfaces.distribution import IDistributionSet
+from lp.registry.interfaces.externalpackage import ExternalPackageType
+from lp.registry.interfaces.person import IPersonSet
+from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
+from lp.registry.model.distribution import Distribution
+from lp.registry.model.externalpackage import ExternalPackage
+from lp.registry.model.person import Person
+from lp.testing import person_logged_in
+
+__all__ = [
+ "SOSSImporter",
+]
+
+logger = logging.getLogger(__name__)
+
+# Constants moved to module level with proper naming
+PRIORITY_ENUM_MAP = {
+ SOSSRecord.PriorityEnum.NEEDS_TRIAGE: BugTaskImportance.UNDECIDED,
+ SOSSRecord.PriorityEnum.NEGLIGIBLE: BugTaskImportance.WISHLIST,
+ SOSSRecord.PriorityEnum.LOW: BugTaskImportance.LOW,
+ SOSSRecord.PriorityEnum.MEDIUM: BugTaskImportance.MEDIUM,
+ SOSSRecord.PriorityEnum.HIGH: BugTaskImportance.HIGH,
+ SOSSRecord.PriorityEnum.CRITICAL: BugTaskImportance.CRITICAL,
+}
+
+PACKAGE_TYPE_MAP = {
+ SOSSRecord.PackageTypeEnum.UNPACKAGED: ExternalPackageType.GENERIC,
+ SOSSRecord.PackageTypeEnum.PYTHON: ExternalPackageType.PYTHON,
+ SOSSRecord.PackageTypeEnum.MAVEN: ExternalPackageType.MAVEN,
+ SOSSRecord.PackageTypeEnum.CONDA: ExternalPackageType.CONDA,
+ SOSSRecord.PackageTypeEnum.RUST: ExternalPackageType.CARGO,
+}
+
+PACKAGE_STATUS_MAP = {
+ SOSSRecord.PackageStatusEnum.IGNORED: BugTaskStatus.WONTFIX,
+ SOSSRecord.PackageStatusEnum.NEEDS_TRIAGE: BugTaskStatus.UNKNOWN,
+ SOSSRecord.PackageStatusEnum.RELEASED: BugTaskStatus.FIXRELEASED,
+ SOSSRecord.PackageStatusEnum.NOT_AFFECTED: BugTaskStatus.INVALID,
+ SOSSRecord.PackageStatusEnum.DEFERRED: BugTaskStatus.DEFERRED,
+ SOSSRecord.PackageStatusEnum.NEEDED: BugTaskStatus.NEW,
+}
+
+DISTRIBUTION_NAME = "soss"
+
+
+class SOSSImporter:
+ """
+ SOSSImporter is used to import SOSS CVE files to Launchpad database.
+ """
+
+ def __init__(
+ self,
+ information_type: InformationType = InformationType.PRIVATESECURITY,
+ dry_run: bool = False,
+ ) -> None:
+ self.information_type = information_type
+ self.dry_run = dry_run
+ self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
+ self.soss = getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME)
+ self.person_set = getUtility(IPersonSet)
+ self.source_package_name_set = getUtility(ISourcePackageNameSet)
+ self.bugtask_set = getUtility(IBugTaskSet)
+ self.vulnerability_set = getUtility(IVulnerabilitySet)
+ self.bug_set = getUtility(IBugSet)
+ self.cve_set = getUtility(ICveSet)
+
+ def import_cve_from_file(
+ self, cve_path: str
+ ) -> Tuple[BugModel, Vulnerability]:
+ """Import CVE from file path."""
+ cve_sequence = os.path.basename(cve_path)
+ logger.info(f"Importing {cve_sequence}")
+
+ with open(cve_path, encoding="utf-8") as file:
+ soss_record = SOSSRecord.from_yaml(file)
+
+ with person_logged_in(self.bug_importer):
+ bug, vulnerability = self.import_cve(soss_record, cve_sequence)
+
+ return bug, vulnerability
+
+ def import_cve(
+ self, soss_record: SOSSRecord, cve_sequence: str
+ ) -> Tuple[BugModel, Vulnerability]:
+ """Import CVE from SOSS record."""
+ if not self._validate_soss_record(soss_record, cve_sequence):
+ return None, None
+
+ lp_cve = self._get_launchpad_cve(cve_sequence)
+ if lp_cve is None:
+ return None, None
+
+ bug = self._find_existing_bug(soss_record, lp_cve, self.soss)
+ if not bug:
+ bug = self._create_bug(soss_record, lp_cve)
+ else:
+ bug = self._update_bug(bug, soss_record, lp_cve)
+
+ vulnerability = self._find_existing_vulnerability(bug, self.soss)
+ if not vulnerability:
+ vulnerability = self._create_vulnerability(
+ bug, soss_record, lp_cve, self.soss
+ )
+ else:
+ vulnerability = self._update_vulnerability(
+ vulnerability, soss_record
+ )
+
+ if not self.dry_run:
+ transaction.commit()
+ logger.info(f"Successfully committed changes for {cve_sequence}")
+
+ return bug, vulnerability
+
+ def _create_bug(
+ self, soss_record: SOSSRecord, lp_cve: CveModel
+ ) -> BugModel:
+ """
+ Create a Bug model based on the information contained in a
+ SOSSRecord.
+
+ :param soss_record: SOSSRecord with information from a SOSS cve
+ :param lp_cve: Launchpad Cve model
+ """
+ packagetype, package = self._get_first_package_info(soss_record)
+ assignee = self._get_assignee(soss_record.assigned_to)
+
+ externalpackage = self._create_external_package(package, packagetype)
+ metadata = {"repositories": package.repositories}
+
+ # Create the bug, only first bugtask
+ bug, _ = self.bug_set.createBug(
+ CreateBugParams(
+ comment=self._make_bug_description(soss_record),
+ title=lp_cve.sequence,
+ information_type=self.information_type,
+ owner=self.bug_importer,
+ target=externalpackage,
+ status=PACKAGE_STATUS_MAP[package.status],
+ status_explanation=package.note,
+ assignee=assignee,
+ importance=PRIORITY_ENUM_MAP[soss_record.priority],
+ cve=lp_cve,
+ metadata=metadata,
+ check_permissions=False,
+ ),
+ notify_event=False,
+ )
+
+ # Create next bugtasks
+ self._update_bugtasks(bug, soss_record)
+
+ logger.info(f"Created bug with ID: {bug.id}")
+ return bug
+
+ def _update_bug(
+ self, bug: BugModel, soss_record: SOSSRecord, lp_cve: CveModel
+ ) -> BugModel:
+ """
+ Update a Bug model with the information contained in a SOSSRecord.
+
+ :param bug: Bug model to be updated
+ :param soss_record: SOSSRecord with information from a SOSS cve
+ :param lp_cve: Launchpad Cve model
+ """
+ bug.description = self._make_bug_description(soss_record)
+ bug.title = lp_cve.sequence
+ bug.transitionToInformationType(
+ self.information_type, self.bug_importer
+ )
+ self._update_bugtasks(bug, soss_record)
+
+ logger.info(f"Updated Bug with ID: {bug.id}")
+ return bug
+
+ def _create_vulnerability(
+ self,
+ bug: BugModel,
+ soss_record: SOSSRecord,
+ lp_cve: CveModel,
+ distribution: Distribution,
+ ) -> Vulnerability:
+ """
+ Create a Vulnerability instance based on the information from
+ the given SOSSRecord instance and link to the specified Bug
+ and LP's Cve model.
+
+ :param bug: Bug model associated with the vulnerability
+ :param soss_record: SOSSRecord with information from a SOSS cve
+ :param lp_cve: Launchpad Cve model
+ :param distribution: a Distribution affected by the vulnerability
+ :return: a Vulnerability
+ """
+ vulnerability: Vulnerability = self.vulnerability_set.new(
+ distribution=distribution,
+ status=VulnerabilityStatus.NEEDS_TRIAGE,
+ importance=PRIORITY_ENUM_MAP[soss_record.priority],
+ creator=bug.owner,
+ information_type=self.information_type,
+ cve=lp_cve,
+ description=soss_record.description,
+ notes="\n".join(soss_record.notes),
+ mitigation=None,
+ importance_explanation=soss_record.priority_reason,
+ date_made_public=self._normalize_date_with_timezone(
+ soss_record.public_date
+ ),
+ date_notice_issued=None,
+ date_coordinated_release=None,
+ cvss=self._prepare_cvss_data(soss_record),
+ )
+ vulnerability.linkBug(bug, bug.owner)
+
+ logger.info(
+ f"Created vulnerability with ID: {vulnerability.id} "
+ f"for {distribution.name}",
+ )
+
+ return vulnerability
+
+ def _update_vulnerability(
+ self, vulnerability: Vulnerability, soss_record: SOSSRecord
+ ) -> None:
+ """
+ Update a Vulnerability model with the information
+ contained in a SOSSRecord
+
+ :param vulnerability: Vulnerability model to be updated
+ :param soss_record: SOSSRecord with information from a SOSS cve
+ """
+ vulnerability.status = VulnerabilityStatus.NEEDS_TRIAGE
+ vulnerability.description = soss_record.description
+ vulnerability.notes = "\n".join(soss_record.notes)
+ vulnerability.mitigation = None
+ vulnerability.importance = PRIORITY_ENUM_MAP[soss_record.priority]
+ vulnerability.importance_explanation = soss_record.priority_reason
+ vulnerability.date_made_public = self._normalize_date_with_timezone(
+ soss_record.public_date
+ )
+ vulnerability.date_notice_issued = None
+ vulnerability.date_coordinated_release = None
+ vulnerability.cvss = self._prepare_cvss_data(soss_record)
+
+ logger.info(
+ f"Updated Vulnerability with ID: {vulnerability.id} "
+ f"for {vulnerability.distribution.name}",
+ )
+
+ def _find_existing_bug(
+ self,
+ soss_record: SOSSRecord,
+ lp_cve: CveModel,
+ distribution: Distribution,
+ ) -> Optional[BugModel]:
+ """Find existing bug for the given CVE."""
+ for vulnerability in lp_cve.vulnerabilities:
+ if vulnerability.distribution == distribution:
+ bugs = vulnerability.bugs
+ if len(bugs) > 1:
+ raise ValueError(
+ "Multiple existing bugs found for CVE ",
+ soss_record.sequence,
+ )
+ if bugs:
+ return bugs[0]
+ return None
+
+ def _find_existing_vulnerability(
+ self, bug: BugModel, distribution: Distribution
+ ) -> Optional[Vulnerability]:
+ """Find existing vulnerability for the current distribution"""
+ if not bug:
+ return None
+
+ vulnerability = next(
+ (v for v in bug.vulnerabilities if v.distribution == distribution),
+ None,
+ )
+ return vulnerability
+
+ def _update_bugtasks(self, bug: BugModel, soss_record: SOSSRecord) -> None:
+ """
+ Add bug tasks to the given Bug model based on the information
+ from a SOSSRecord.
+
+ This may be called multiple times, only new targets will be created.
+ Existing targets will be updated. Packages that were removed from the
+ SOSSRecord will be removed.
+
+ :param bug: Bug model to be updated
+ :param packages: list of SOSSRecord.Packages from a SOSSRecord
+ """
+ packages: List[SOSSRecord.Package] = soss_record.packages.items()
+ assignee = self._get_assignee(soss_record.assigned_to)
+
+ # Build a lookup dict for existing bug tasks
+ bugtask_by_target = {task.target: task for task in bug.bugtasks}
+
+ for packagetype, package_list in packages:
+ for package in package_list:
+ target = self._create_external_package(package, packagetype)
+ metadata = (
+ {"repositories": package.repositories}
+ if package.repositories
+ else None
+ )
+
+ if target not in bugtask_by_target:
+ self.bugtask_set.createTask(
+ bug,
+ self.bug_importer,
+ target,
+ status=PACKAGE_STATUS_MAP[package.status],
+ importance=PRIORITY_ENUM_MAP[soss_record.priority],
+ assignee=assignee,
+ metadata=metadata,
+ )
+ else:
+ bugtask = bugtask_by_target[target]
+
+ # This should not appear again, we use this to remove the
+ # not used bugtasks
+ bugtask_by_target.pop(target)
+
+ bugtask.transitionToStatus(
+ PACKAGE_STATUS_MAP[package.status]
+ )
+ bugtask.transitionToImportance(
+ PRIORITY_ENUM_MAP[soss_record.priority]
+ )
+ # We always have rights to change assignees
+ bugtask.transitionToAssignee(assignee, validate=False)
+ bugtask.metadata = metadata
+
+ # Remove bugtasks that were deleted from the record
+ for bugtask in bugtask_by_target.values():
+ bugtask.destroySelf()
+
+ def _get_launchpad_cve(self, cve_sequence: str) -> Optional[CveModel]:
+ """Get CVE from Launchpad."""
+ lp_cve: CveModel = removeSecurityProxy(self.cve_set[cve_sequence])
+ if lp_cve is None:
+ logger.warning(
+ "%s: could not find the CVE in LP. Aborting. "
+ "%s was not imported.",
+ cve_sequence,
+ cve_sequence,
+ )
+ return lp_cve
+
+ def _make_bug_description(self, soss_record: SOSSRecord) -> str:
+ """
+ Some SOSSRecord fields can't be mapped to Launchpad models.
+
+ They are saved to bug description.
+
+ :param soss_record: SOSSRecord with information from UCT
+ :return: bug description
+ """
+ parts = [soss_record.description] if soss_record.description else []
+ if soss_record.references:
+ parts.extend(["", "References:"])
+ parts.extend(soss_record.references)
+ return "\n".join(parts) if parts else "-"
+
+ def _get_assignee(self, assigned_to: Optional[str]) -> Person:
+ """Get assignee person object if assigned_to is provided."""
+ if not assigned_to:
+ return None
+
+ person = self.person_set.getByName(assigned_to)
+ if not person:
+ logger.warning(f"Assignee not found: {assigned_to}")
+
+ return person
+
+ def _create_external_package(
+ self,
+ package: SOSSRecord.Package,
+ packagetype: SOSSRecord.PackageTypeEnum,
+ ) -> ExternalPackage:
+ """Create external package for the given package."""
+ source_package_name = self.source_package_name_set.getOrCreateByName(
+ package.name
+ )
+ return self.soss.getExternalPackage(
+ name=source_package_name,
+ packagetype=PACKAGE_TYPE_MAP[packagetype],
+ channel=package.channel.value,
+ )
+
+ def _prepare_cvss_data(self, soss_record: SOSSRecord) -> Dict:
+ """Prepare CVSS data from SOSS record."""
+ cvss_data = defaultdict(list)
+ for cvss in soss_record.cvss:
+ cvss_data[cvss.source].append(cvss.to_dict())
+ return dict(cvss_data)
+
+ def _normalize_date_with_timezone(self, date_obj) -> Optional:
+ """Normalize date to UTC timezone if needed."""
+ if date_obj and date_obj.tzinfo is None:
+ return date_obj.replace(tzinfo=timezone.utc)
+ return date_obj
+
+ def _validate_soss_record(
+ self, soss_record: SOSSRecord, cve_sequence: str
+ ) -> bool:
+ """Validate SOSS record before processing."""
+ if soss_record.candidate and soss_record.candidate != cve_sequence:
+ logger.warning(
+ "CVE sequence mismatch: %s != %s",
+ soss_record.candidate,
+ cve_sequence,
+ )
+ return False
+
+ if not soss_record.packages:
+ logger.warning(
+ "%s: could not find any affected packages, aborting."
+ "%s was not imported.",
+ cve_sequence,
+ cve_sequence,
+ )
+ return False
+
+ return True
+
+ def _get_first_package_info(
+ self, soss_record: SOSSRecord
+ ) -> Tuple[SOSSRecord.PackageTypeEnum, SOSSRecord.Package]:
+ """Get first package type and package from SOSS record."""
+ first_item = next(iter(soss_record.packages.items()))
+ packagetype = first_item[0]
+ package = first_item[1][0]
+ return packagetype, package
diff --git a/lib/lp/bugs/scripts/soss/tests/sampledata/CVE-2021-21300 b/lib/lp/bugs/scripts/soss/tests/sampledata/CVE-2021-21300
index 0a68496..22ae13d 100644
--- a/lib/lp/bugs/scripts/soss/tests/sampledata/CVE-2021-21300
+++ b/lib/lp/bugs/scripts/soss/tests/sampledata/CVE-2021-21300
@@ -2,7 +2,7 @@ References: []
Notes: []
Priority: Needs-triage
Priority-Reason: ''
-Assigned-To: ''
+Assigned-To: octagalland
Packages:
conda:
- Name: git
diff --git a/lib/lp/bugs/scripts/soss/tests/sampledata/CVE-2025-1979-full b/lib/lp/bugs/scripts/soss/tests/sampledata/CVE-2025-1979
similarity index 98%
rename from lib/lp/bugs/scripts/soss/tests/sampledata/CVE-2025-1979-full
rename to lib/lp/bugs/scripts/soss/tests/sampledata/CVE-2025-1979
index c8d8b2c..5d90588 100644
--- a/lib/lp/bugs/scripts/soss/tests/sampledata/CVE-2025-1979-full
+++ b/lib/lp/bugs/scripts/soss/tests/sampledata/CVE-2025-1979
@@ -14,7 +14,7 @@ Priority-Reason: 'Unrealistic exploitation scenario. Logs are stored locally and
the logs). Given the requirement for priviledged system access to access log files
the real "danger" posed by the vulnerability is quite low, and that is reflected
in this priority assignment. '
-Assigned-To: octagalland
+Assigned-To: janitor
Packages:
unpackaged:
- Name: vllm
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossimport.py b/lib/lp/bugs/scripts/soss/tests/test_sossimport.py
new file mode 100644
index 0000000..e8f8612
--- /dev/null
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossimport.py
@@ -0,0 +1,440 @@
+from datetime import datetime
+from pathlib import Path
+
+import transaction
+from zope.component import getUtility
+
+from lp.app.enums import InformationType
+from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.bugs.interfaces.bugtask import BugTaskImportance, BugTaskStatus
+from lp.bugs.scripts.soss import SOSSRecord
+from lp.bugs.scripts.soss.sossimport import SOSSImporter
+from lp.registry.interfaces.externalpackage import ExternalPackageType
+from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
+from lp.testing import TestCaseWithFactory, person_logged_in
+from lp.testing.layers import LaunchpadZopelessLayer
+
+
+class TestSOSSImporter(TestCaseWithFactory):
+
+ layer = LaunchpadZopelessLayer
+
+ def setUp(self):
+ super().setUp()
+ self.sampledata = Path(__file__).parent / "sampledata"
+ self.file = self.sampledata / "CVE-2025-1979"
+
+ with open(self.file, encoding="utf-8") as file:
+ self.soss_record = SOSSRecord.from_yaml(file)
+
+ self.cve = self.factory.makeCVE(sequence="2025-1979")
+ self.soss = self.factory.makeDistribution(
+ name="soss", displayname="SOSS"
+ )
+ transaction.commit()
+
+ self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
+ self.janitor = getUtility(ILaunchpadCelebrities).janitor
+ self.source_package_name_set = getUtility(ISourcePackageNameSet)
+
+ # Set up references
+ self.description = (
+ "Versions of the package ray before 2.43.0 are vulnerable to "
+ "Insertion of Sensitive Information into Log File where the redis "
+ "password is being logged in the standard logging. If the redis "
+ "password is passed as an argument, it will be logged and could "
+ "potentially leak the password.\r\rThis is only exploitable if:"
+ "\r\r1) Logging is enabled;\r\r2) Redis is using password "
+ "authentication;\r\r3) Those logs are accessible to an attacker, "
+ "who can reach that redis instance.\r\r**Note:**\r\rIt is "
+ "recommended that anyone who is running in this configuration "
+ "should update to the latest version of Ray, then rotate their "
+ "redis password.\n\n"
+ "References:\n"
+ "https://github.com/ray-project/ray/commit/"
+ "64a2e4010522d60b90c389634f24df77b603d85d\n"
+ "https://github.com/ray-project/ray/issues/50266\n"
+ "https://github.com/ray-project/ray/pull/50409\n"
+ "https://security.snyk.io/vuln/SNYK-PYTHON-RAY-8745212\n"
+ "https://ubuntu.com/security/notices/SSN-148-1.json"
+ "?show_hidden=true"
+ )
+
+ # Set up reference for bugtasks
+ pyyaml = self.source_package_name_set.getOrCreateByName("pyyaml")
+ ray = self.source_package_name_set.getOrCreateByName("ray")
+ vllm = self.source_package_name_set.getOrCreateByName("vllm")
+
+ self.bugtask_reference = [
+ (
+ self.soss.getExternalPackage(
+ name=pyyaml,
+ packagetype=ExternalPackageType.PYTHON,
+ channel=("jammy:2.22.0", "stable"),
+ ),
+ BugTaskStatus.INVALID,
+ {"repositories": ["nvidia-pb3-python-stable-local"]},
+ ),
+ (
+ self.soss.getExternalPackage(
+ name=ray,
+ packagetype=ExternalPackageType.PYTHON,
+ channel=("jammy:2.22.0", "stable"),
+ ),
+ BugTaskStatus.FIXRELEASED,
+ {"repositories": ["nvidia-pb3-python-stable-local"]},
+ ),
+ (
+ self.soss.getExternalPackage(
+ name=ray,
+ packagetype=ExternalPackageType.CONDA,
+ channel=("jammy:1.17.0", "stable"),
+ ),
+ BugTaskStatus.INVALID,
+ {"repositories": ["nvidia-pb3-python-stable-local"]},
+ ),
+ (
+ self.soss.getExternalPackage(
+ name=ray,
+ packagetype=ExternalPackageType.CARGO,
+ channel=("focal:0.27.0", "stable"),
+ ),
+ BugTaskStatus.DEFERRED,
+ {"repositories": ["nvidia-pb3-python-stable-local"]},
+ ),
+ (
+ self.soss.getExternalPackage(
+ name=vllm,
+ packagetype=ExternalPackageType.GENERIC,
+ channel=("noble:0.7.3", "stable"),
+ ),
+ BugTaskStatus.NEW,
+ {"repositories": ["soss-src-stable-local"]},
+ ),
+ (
+ self.soss.getExternalPackage(
+ name=vllm,
+ packagetype=ExternalPackageType.MAVEN,
+ channel=("noble:0.7.3", "stable"),
+ ),
+ BugTaskStatus.UNKNOWN,
+ {"repositories": ["soss-src-stable-local"]},
+ ),
+ ]
+
+ self.cvss = {
+ "report@xxxxxxx": [
+ {
+ "source": "report@xxxxxxx",
+ "vector": "CVSS:3.1/AV:L/AC:H/PR:L/UI:N/S:C/C:H/I:L/A:N",
+ "baseScore": 6.4,
+ "baseSeverity": "MEDIUM",
+ }
+ ],
+ "security-advisories@xxxxxxxxxx": [
+ {
+ "source": "security-advisories@xxxxxxxxxx",
+ "vector": "CVSS:3.1/AV:A/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H",
+ "baseScore": 9.0,
+ "baseSeverity": "CRITICAL",
+ }
+ ],
+ }
+
+ self.importance_explanation = (
+ "Unrealistic exploitation scenario. Logs are stored locally "
+ "and not transferred between agents, so local log access is "
+ "the only conceivable method to view the password for the "
+ "redis instance (i.e., no possibility of MitM to access the "
+ "logs). Given the requirement for priviledged system access "
+ 'to access log files the real "danger" posed by the '
+ "vulnerability is quite low, and that is reflected in this "
+ "priority assignment. "
+ )
+
+ self.notes = (
+ "This is a sample soss cve with all the fields filled for "
+ "testing\nsample note 2"
+ )
+
+ def _check_bugtasks(
+ self, bugtasks, bugtask_reference, importance, assignee
+ ):
+ self.assertEqual(len(bugtasks), len(bugtask_reference))
+
+ for i, (target, status, metadata) in enumerate(bugtask_reference):
+ self.assertEqual(bugtasks[i].target, target)
+ self.assertEqual(bugtasks[i].status, status)
+ self.assertEqual(bugtasks[i].importance, importance)
+ self.assertEqual(bugtasks[i].assignee, assignee)
+ self.assertEqual(bugtasks[i].metadata, metadata)
+
+ def _check_bug_fields(self, bug, bugtask_reference):
+ """Helper function to check the imported bug"""
+ self.assertEqual(bug.description, self.description)
+ self.assertEqual(bug.title, self.cve.sequence)
+ self.assertEqual(bug.information_type, InformationType.PRIVATESECURITY)
+ self.assertEqual(bug.owner, self.bug_importer)
+
+ self._check_bugtasks(
+ bug.bugtasks,
+ self.bugtask_reference,
+ BugTaskImportance.LOW,
+ self.janitor,
+ )
+
+ def _check_vulnerability_fields(self, vulnerability, bug):
+ """Helper function to check the imported vulnerability"""
+ self.assertEqual(vulnerability.distribution, self.soss)
+ self.assertEqual(
+ vulnerability.date_created.date(), datetime.now().date()
+ )
+ self.assertEqual(
+ vulnerability.date_made_public,
+ datetime.fromisoformat("2025-03-06T05:15:16.213+00:00"),
+ )
+ self.assertEqual(vulnerability.date_notice_issued, None)
+ self.assertEqual(vulnerability.date_coordinated_release, None)
+ self.assertEqual(
+ vulnerability.information_type, InformationType.PRIVATESECURITY
+ )
+ self.assertEqual(vulnerability.importance, BugTaskImportance.LOW)
+ self.assertEqual(
+ vulnerability.importance_explanation,
+ self.importance_explanation,
+ )
+ self.assertEqual(vulnerability.creator, self.bug_importer)
+ self.assertEqual(
+ vulnerability.notes,
+ self.notes,
+ )
+ self.assertEqual(vulnerability.mitigation, None)
+ self.assertEqual(vulnerability.cve, self.cve)
+
+ self.assertEqual(vulnerability.cvss, self.cvss)
+
+ self.assertEqual(len(vulnerability.bugs), 1)
+ self.assertEqual(vulnerability.bugs[0], bug)
+
+ def test_import_cve_from_file(self):
+ """Test import a SOSS cve from file"""
+ file = self.sampledata / "CVE-2025-1979"
+
+ soss_importer = SOSSImporter(
+ information_type=InformationType.PRIVATESECURITY
+ )
+ bug, vulnerability = soss_importer.import_cve_from_file(file)
+
+ # Check bug fields
+ self._check_bug_fields(bug, self.bugtask_reference)
+
+ # Check vulnerability
+ self._check_vulnerability_fields(vulnerability, bug)
+
+ def test_create_update_bug(self):
+ """Test create and update a bug from a SOSS cve file"""
+ with person_logged_in(self.bug_importer):
+ bug = SOSSImporter()._create_bug(self.soss_record, self.cve)
+
+ self._check_bug_fields(bug, self.bugtask_reference)
+
+ # Modify the soss_record and check that the bug changed
+ new_cve = self.factory.makeCVE("2025-1234")
+
+ self.soss_record.description = "New sample description"
+ new_description = (
+ f"{self.soss_record.description}\n\n"
+ "References:\n"
+ "https://github.com/ray-project/ray/commit/"
+ "64a2e4010522d60b90c389634f24df77b603d85d\n"
+ "https://github.com/ray-project/ray/issues/50266\n"
+ "https://github.com/ray-project/ray/pull/50409\n"
+ "https://security.snyk.io/vuln/SNYK-PYTHON-RAY-8745212\n"
+ "https://ubuntu.com/security/notices/SSN-148-1.json"
+ "?show_hidden=true"
+ )
+
+ self.soss_record.packages.pop(SOSSRecord.PackageTypeEnum.UNPACKAGED)
+ self.soss_record.packages.pop(SOSSRecord.PackageTypeEnum.MAVEN)
+ self.soss_record.packages.pop(SOSSRecord.PackageTypeEnum.RUST)
+
+ bug = SOSSImporter(
+ information_type=InformationType.PUBLICSECURITY
+ )._update_bug(bug, self.soss_record, new_cve)
+ transaction.commit()
+
+ # Check bug fields
+ self.assertEqual(bug.description, new_description)
+ self.assertEqual(bug.title, new_cve.sequence)
+ self.assertEqual(bug.information_type, InformationType.PUBLICSECURITY)
+
+ # Check bugtasks
+ bugtasks = bug.bugtasks
+ bugtask_reference = self.bugtask_reference[:3]
+ self._check_bugtasks(
+ bugtasks, bugtask_reference, BugTaskImportance.LOW, self.janitor
+ )
+
+ def test_create_update_vulnerability(self):
+ """Test create and update a vulnerability from a SOSS cve file"""
+ soss_importer = SOSSImporter()
+ with person_logged_in(self.bug_importer):
+ bug = soss_importer._create_bug(self.soss_record, self.cve)
+ vulnerability = soss_importer._create_vulnerability(
+ bug, self.soss_record, self.cve, self.soss
+ )
+
+ self.assertEqual(vulnerability.distribution, self.soss)
+ self.assertEqual(
+ vulnerability.date_created.date(), datetime.now().date()
+ )
+ self.assertEqual(
+ vulnerability.date_made_public,
+ datetime.fromisoformat("2025-03-06T05:15:16.213+00:00"),
+ )
+ self.assertEqual(vulnerability.date_notice_issued, None)
+ self.assertEqual(vulnerability.date_coordinated_release, None)
+ self.assertEqual(
+ vulnerability.information_type, InformationType.PRIVATESECURITY
+ )
+ self.assertEqual(vulnerability.importance, BugTaskImportance.LOW)
+ self.assertEqual(
+ vulnerability.importance_explanation,
+ self.importance_explanation,
+ )
+ self.assertEqual(vulnerability.creator, self.bug_importer)
+ self.assertEqual(
+ vulnerability.notes,
+ self.notes,
+ )
+ self.assertEqual(vulnerability.mitigation, None)
+ self.assertEqual(vulnerability.cve, self.cve)
+
+ self.assertEqual(vulnerability.cvss, self.cvss)
+
+ self.assertEqual(len(vulnerability.bugs), 1)
+ self.assertEqual(vulnerability.bugs[0], bug)
+
+ def test_update_bugtasks(self):
+ """Test update bugtasks"""
+ soss_importer = SOSSImporter()
+ with person_logged_in(self.bug_importer):
+ bug = soss_importer._create_bug(self.soss_record, self.cve)
+
+ self._check_bugtasks(
+ bug.bugtasks,
+ self.bugtask_reference,
+ BugTaskImportance.LOW,
+ self.janitor,
+ )
+
+ # Update soss_record and check that the bugtasks change
+ self.soss_record.assigned_to = "bug-importer"
+ self.soss_record.priority = SOSSRecord.PriorityEnum.HIGH
+
+ # Remove 2 packages from the soss_record
+ self.soss_record.packages.pop(SOSSRecord.PackageTypeEnum.PYTHON)
+
+ # Modify a package
+ self.soss_record.packages[SOSSRecord.PackageTypeEnum.CONDA] = (
+ SOSSRecord.Package(
+ name="aaa",
+ channel=SOSSRecord.Channel(value="noble:4.23.1/stable"),
+ repositories=["test-repo"],
+ status=SOSSRecord.PackageStatusEnum.DEFERRED,
+ note="test note",
+ ),
+ )
+ # Modify its bugtask_reference
+ self.bugtask_reference[2] = (
+ self.soss.getExternalPackage(
+ name=self.source_package_name_set.getOrCreateByName("aaa"),
+ packagetype=ExternalPackageType.CONDA,
+ channel=("noble:4.23.1", "stable"),
+ ),
+ BugTaskStatus.DEFERRED,
+ {"repositories": ["test-repo"]},
+ )
+
+ soss_importer._update_bugtasks(bug, self.soss_record)
+ transaction.commit()
+
+ self._check_bugtasks(
+ bug.bugtasks,
+ self.bugtask_reference[2:],
+ BugTaskImportance.HIGH,
+ self.bug_importer,
+ )
+
+ def test_get_launchpad_cve(self):
+ """Test get a cve from Launchpad"""
+ soss_importer = SOSSImporter()
+ self.assertEqual(
+ soss_importer._get_launchpad_cve("2025-1979"), self.cve
+ )
+ self.assertEqual(soss_importer._get_launchpad_cve("2000-1111"), None)
+
+ def test_make_bug_description(self):
+ """Test make a bug description from a SOSSRecord"""
+ description = SOSSImporter()._make_bug_description(self.soss_record)
+ self.assertEqual(description, self.description)
+
+ def test_get_assignee(self):
+ """Test get an assignee person from Launchpad"""
+ soss_importer = SOSSImporter()
+
+ janitor = soss_importer._get_assignee("janitor")
+ self.assertEqual(janitor, self.janitor)
+ nonexistent = soss_importer._get_assignee("nonexistent")
+ self.assertEqual(nonexistent, None)
+
+ def test_create_external_package(self):
+ """Test create an ExternalPackage from SOSSRecord"""
+ soss_importer = SOSSImporter()
+
+ cargo_pkg = soss_importer._create_external_package(
+ self.soss_record.packages[SOSSRecord.PackageTypeEnum.RUST][0],
+ SOSSRecord.PackageTypeEnum.RUST,
+ )
+ self.assertEqual(cargo_pkg, self.bugtask_reference[3][0])
+
+ generic_pkg = soss_importer._create_external_package(
+ self.soss_record.packages[SOSSRecord.PackageTypeEnum.UNPACKAGED][
+ 0
+ ],
+ SOSSRecord.PackageTypeEnum.UNPACKAGED,
+ )
+ self.assertEqual(generic_pkg, self.bugtask_reference[4][0])
+
+ maven_pkg = soss_importer._create_external_package(
+ self.soss_record.packages[SOSSRecord.PackageTypeEnum.MAVEN][0],
+ SOSSRecord.PackageTypeEnum.MAVEN,
+ )
+ self.assertEqual(maven_pkg, self.bugtask_reference[5][0])
+
+ def test_prepare_cvss_data(self):
+ """Test prepare the cvss json"""
+ cvss = SOSSImporter()._prepare_cvss_data(self.soss_record)
+ self.assertEqual(cvss, self.cvss)
+
+ def test_validate_soss_record(self):
+ """Test validate the SOSSRecord"""
+ soss_importer = SOSSImporter()
+ valid = soss_importer._validate_soss_record(
+ self.soss_record, f"CVE-{self.cve.sequence}"
+ )
+ self.assertEqual(valid, True)
+
+ # SOSSRecord without packages is not valid
+ self.soss_record.packages = {}
+ valid = soss_importer._validate_soss_record(
+ self.soss_record, f"CVE-{self.cve.sequence}"
+ )
+ self.assertEqual(valid, False)
+
+ # SOSSRecord with candidate != sequence is not valid
+ self.soss_record.candidate = "nonvalid"
+ valid = soss_importer._validate_soss_record(
+ self.soss_record, f"CVE-{self.cve.sequence}"
+ )
+ self.assertEqual(valid, False)
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py b/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py
index 1fdf48b..0f6d3dc 100644
--- a/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossrecord.py
@@ -45,7 +45,7 @@ class TestSOSSRecord(TestCase):
"vulnerability is quite low, and that is reflected in this "
"priority assignment. "
),
- assigned_to="octagalland",
+ assigned_to="janitor",
packages={
SOSSRecord.PackageTypeEnum.UNPACKAGED: [
SOSSRecord.Package(
@@ -157,7 +157,7 @@ class TestSOSSRecord(TestCase):
"vulnerability is quite low, and that is reflected in this "
"priority assignment. "
),
- "Assigned-To": "octagalland",
+ "Assigned-To": "janitor",
"Packages": {
"unpackaged": [
{
@@ -271,7 +271,7 @@ class TestSOSSRecord(TestCase):
)
def test_from_yaml(self):
- load_from = Path(__file__).parent / "sampledata" / "CVE-2025-1979-full"
+ load_from = Path(__file__).parent / "sampledata" / "CVE-2025-1979"
soss_record = None
with open(load_from) as f:
@@ -286,7 +286,7 @@ class TestSOSSRecord(TestCase):
)
def test_to_yaml(self):
- load_from = Path(__file__).parent / "sampledata" / "CVE-2025-1979-full"
+ load_from = Path(__file__).parent / "sampledata" / "CVE-2025-1979"
with open(load_from) as f:
sample_data = f.read()
diff --git a/lib/lp/bugs/scripts/sossimport.py b/lib/lp/bugs/scripts/sossimport.py
new file mode 100644
index 0000000..1a905bf
--- /dev/null
+++ b/lib/lp/bugs/scripts/sossimport.py
@@ -0,0 +1,82 @@
+import logging
+from pathlib import Path
+
+from lp.app.enums import InformationType
+from lp.app.validators.cve import CVEREF_PATTERN
+from lp.bugs.scripts.soss import SOSSImporter
+from lp.services.scripts.base import LaunchpadScript
+
+logger = logging.getLogger(__name__)
+
+
+class SOSSImportScript(LaunchpadScript):
+ """CLI for SOSSImport
+
+ Command line options:
+ The filter option takes a glob-style pattern.
+ Example: `2007*` filters all CVEs from the year 2007.
+ """
+
+ usage = "usage: %prog [options] PATH"
+ description = (
+ "Import bugs into Launchpad from CVE entries in soss-cve-tracker. "
+ "PATH is either path to a CVE file, or path to a directory "
+ "containing the CVE files."
+ )
+ loglevel = logging.INFO
+
+ def add_my_options(self):
+ self.parser.add_option(
+ "--dry-run",
+ action="store_true",
+ dest="dry_run",
+ default=False,
+ help="Don't commit changes to the DB.",
+ )
+ self.parser.add_option(
+ "--filter",
+ action="store",
+ dest="filter",
+ default="*",
+ help="Apply given glob-style pattern to filter CVEs.",
+ )
+ self.parser.add_option(
+ "--information-type",
+ action="store",
+ dest="information_type",
+ default="private",
+ help=(
+ "The types used to control which users and teams can see the "
+ "bugs and vulnerabilities. Allowed values: public, private"
+ ),
+ )
+
+ def main(self):
+ if len(self.args) != 1:
+ self.parser.error("Please specify a path to import")
+ path = Path(self.args[0])
+ if path.is_dir():
+ logger.info(
+ "Importing CVE files from directory: %s", path.resolve()
+ )
+ cve_paths = sorted(
+ p
+ for p in path.rglob("CVE-%s" % self.options.filter)
+ if p.is_file() and CVEREF_PATTERN.match(p.name)
+ )
+ if not cve_paths:
+ logger.warning("Could not find CVE files in %s", path)
+ return
+ else:
+ cve_paths = [path]
+
+ information_type = InformationType.PRIVATESECURITY
+ if self.options.information_type == "public":
+ information_type = InformationType.PUBLICSECURITY
+
+ logger.info(f"Bug/Vulnerability visibility: {information_type}")
+ importer = SOSSImporter(
+ information_type=information_type, dry_run=self.options.dry_run
+ )
+ for cve_path in cve_paths:
+ importer.import_cve_from_file(cve_path)
diff --git a/lib/lp/bugs/scripts/tests/test_sossimport.py b/lib/lp/bugs/scripts/tests/test_sossimport.py
new file mode 100644
index 0000000..dc159a8
--- /dev/null
+++ b/lib/lp/bugs/scripts/tests/test_sossimport.py
@@ -0,0 +1,139 @@
+from pathlib import Path
+
+import transaction
+
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import LaunchpadZopelessLayer
+from lp.testing.script import run_script
+
+
+class TestSOSSImportScript(TestCaseWithFactory):
+ """Test the TestSOSSImportScript class."""
+
+ layer = LaunchpadZopelessLayer
+
+ def setUp(self):
+ super().setUp()
+ self.factory.makeCVE(sequence="2025-1979")
+ self.soss = self.factory.makeDistribution(
+ name="soss", displayname="SOSS"
+ )
+ transaction.commit()
+
+ self.sampledata = (
+ Path(__file__).parent / ".." / "soss" / "tests" / "sampledata"
+ )
+
+ def test_no_path_given(self):
+ """TestSOSSImportScript errors when no path given"""
+ exit_code, out, err = run_script(
+ script="scripts/soss-import.py",
+ args=[],
+ )
+ self.assertEqual(2, exit_code)
+ self.assertEqual("", out)
+ self.assertEqual(
+ "Usage: soss-import.py [options] PATH\n\nsoss-import.py: "
+ "error: Please specify a path to import\n",
+ err,
+ )
+
+ def test_load_from_file(self):
+ load_from = self.sampledata / "CVE-2025-1979"
+ exit_code, out, err = run_script(
+ script="scripts/soss-import.py",
+ args=[str(load_from), "-vvv"],
+ )
+ self.assertEqual(0, exit_code)
+ self.assertEqual("", out)
+ self.assertIn("Importing CVE-2025-1979", err)
+ self.assertIn("Created bug with ID:", err)
+ self.assertIn("Created vulnerability with ID:", err)
+ self.assertIn("commit", err)
+
+ def test_load_from_directory(self):
+ load_from = Path(__file__).parent / ".." / "uct"
+ exit_code, out, err = run_script(
+ script="scripts/soss-import.py",
+ args=[str(load_from), "-vvv"],
+ )
+ self.assertEqual(0, exit_code)
+ self.assertEqual("", out)
+ self.assertIn("Could not find CVE files in", err)
+
+ load_from = self.sampledata
+ exit_code, out, err = run_script(
+ script="scripts/soss-import.py",
+ args=[str(load_from), "-vvv"],
+ )
+ self.assertEqual(0, exit_code)
+ self.assertEqual("", out)
+ self.assertIn("Aborting. CVE-2005-1544 was not imported", err)
+ self.assertIn("Aborting. CVE-2011-5000 was not imported", err)
+ self.assertIn("Aborting. CVE-2021-21300 was not imported", err)
+ self.assertIn("Importing CVE-2025-1979", err)
+ self.assertIn("commit", err)
+
+ def test_dry_run_does_not_crash(self):
+ load_from = self.sampledata / "CVE-2025-1979"
+ exit_code, out, err = run_script(
+ script="scripts/soss-import.py",
+ args=[str(load_from), "--dry-run", "-vvv"],
+ )
+ self.assertEqual(0, exit_code)
+ self.assertEqual("", out)
+ self.assertIn("Importing CVE-2025-1979", err)
+ self.assertNotIn("commit", err)
+
+ def test_filter_cve(self):
+ load_from = self.sampledata
+ exit_code, out, err = run_script(
+ script="scripts/soss-import.py",
+ args=[str(load_from), "--filter", "2005*"],
+ )
+ self.assertEqual(0, exit_code)
+ self.assertEqual("", out)
+ self.assertIn("CVE-2005-1544", err)
+ self.assertNotIn("CVE-2011-5000", err)
+ self.assertNotIn("CVE-2021-21300", err)
+ self.assertNotIn("CVE-2025-1979", err)
+
+ exit_code, out, err = run_script(
+ script="scripts/soss-import.py",
+ args=[str(load_from), "--filter", "202*"],
+ )
+ self.assertEqual(0, exit_code)
+ self.assertEqual("", out)
+ self.assertNotIn("CVE-2005-1544", err)
+ self.assertNotIn("CVE-2011-5000", err)
+ self.assertIn("CVE-2021-21300", err)
+ self.assertIn("CVE-2025-1979", err)
+
+ exit_code, out, err = run_script(
+ script="scripts/soss-import.py",
+ args=[str(load_from), "--filter", "20[02][15]*"],
+ )
+ self.assertEqual(0, exit_code)
+ self.assertEqual("", out)
+ self.assertIn("CVE-2005-1544", err)
+ self.assertNotIn("CVE-2011-5000", err)
+ self.assertIn("CVE-2021-21300", err)
+ self.assertIn("CVE-2025-1979", err)
+
+ def test_information_type(self):
+ load_from = self.sampledata / "CVE-2025-1979"
+ exit_code, out, err = run_script(
+ script="scripts/soss-import.py",
+ args=[str(load_from), "--information-type", "public"],
+ )
+ self.assertEqual(0, exit_code)
+ self.assertEqual("", out)
+ self.assertIn("Bug/Vulnerability visibility: Public Security", err)
+
+ exit_code, out, err = run_script(
+ script="scripts/soss-import.py",
+ args=[str(load_from), "--information-type", "private"],
+ )
+ self.assertEqual(0, exit_code)
+ self.assertEqual("", out)
+ self.assertIn("Bug/Vulnerability visibility: Private Security", err)
diff --git a/lib/lp/registry/interfaces/externalpackage.py b/lib/lp/registry/interfaces/externalpackage.py
index 8a8ef57..d0116f7 100644
--- a/lib/lp/registry/interfaces/externalpackage.py
+++ b/lib/lp/registry/interfaces/externalpackage.py
@@ -108,9 +108,18 @@ class ExternalPackageType(DBEnumeratedType):
""",
)
- SNAP = DBItem(
+ GENERIC = DBItem(
1,
"""
+ Generic
+
+ Generic external package
+ """,
+ )
+
+ SNAP = DBItem(
+ 2,
+ """
Snap
Snap external package
@@ -118,7 +127,7 @@ class ExternalPackageType(DBEnumeratedType):
)
CHARM = DBItem(
- 2,
+ 3,
"""
Charm
@@ -127,7 +136,7 @@ class ExternalPackageType(DBEnumeratedType):
)
ROCK = DBItem(
- 3,
+ 4,
"""
Rock
@@ -136,7 +145,7 @@ class ExternalPackageType(DBEnumeratedType):
)
PYTHON = DBItem(
- 4,
+ 5,
"""
Python
@@ -145,7 +154,7 @@ class ExternalPackageType(DBEnumeratedType):
)
CONDA = DBItem(
- 5,
+ 6,
"""
Conda
@@ -154,7 +163,7 @@ class ExternalPackageType(DBEnumeratedType):
)
CARGO = DBItem(
- 6,
+ 7,
"""
Cargo
@@ -163,7 +172,7 @@ class ExternalPackageType(DBEnumeratedType):
)
MAVEN = DBItem(
- 7,
+ 8,
"""
Maven
diff --git a/lib/lp/registry/model/externalpackage.py b/lib/lp/registry/model/externalpackage.py
index 0022f2b..d874ca6 100644
--- a/lib/lp/registry/model/externalpackage.py
+++ b/lib/lp/registry/model/externalpackage.py
@@ -168,6 +168,11 @@ class ExternalPackage(
"""See `IBugTarget`."""
return self.distribution.bug_reported_acknowledgement
+ def getBugSummaryContextWhereClause(self):
+ """See `IBugSummaryDimension`."""
+ # 2025-08-06 TODO: add support for BugSummary
+ return False
+
def _getOfficialTagClause(self):
"""See `IBugTarget`."""
return self.distribution._getOfficialTagClause()
diff --git a/lib/lp/registry/model/externalpackageseries.py b/lib/lp/registry/model/externalpackageseries.py
index c18088a..b312ba4 100644
--- a/lib/lp/registry/model/externalpackageseries.py
+++ b/lib/lp/registry/model/externalpackageseries.py
@@ -192,6 +192,11 @@ class ExternalPackageSeries(
"""See `IBugTarget`."""
return self.distribution.bug_reported_acknowledgement
+ def getBugSummaryContextWhereClause(self):
+ """See `IBugSummaryDimension`."""
+ # 2025-08-06 TODO: add support for BugSummary
+ return False
+
def _getOfficialTagClause(self):
"""See `IBugTarget`."""
return self.distroseries._getOfficialTagClause()
diff --git a/scripts/soss-import.py b/scripts/soss-import.py
new file mode 100755
index 0000000..26a2131
--- /dev/null
+++ b/scripts/soss-import.py
@@ -0,0 +1,11 @@
+#!/usr/bin/python3 -S
+#
+# Copyright 2022 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+import _pythonpath # noqa: F401
+
+from lp.bugs.scripts.sossimport import SOSSImportScript
+
+if __name__ == "__main__":
+ script = SOSSImportScript("lp.services.scripts.uctimport")
+ script.run()
Follow ups