← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~enriqueesanchz/launchpad:add-import-export-endpoints into launchpad:master

 

Enrique Sánchez has proposed merging ~enriqueesanchz/launchpad:add-import-export-endpoints into launchpad:master.

Commit message:
Fix SOSSImport find vulnerability and bug helpers
    
Helpers will be able to return the existing vulnerability and bug
without using person_logged_in.
Add Cve.getDistributionVulnerability.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/491770
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-import-export-endpoints into launchpad:master.
diff --git a/lib/lp/bugs/interfaces/bug.py b/lib/lp/bugs/interfaces/bug.py
index 634be7e..caef426 100644
--- a/lib/lp/bugs/interfaces/bug.py
+++ b/lib/lp/bugs/interfaces/bug.py
@@ -115,6 +115,7 @@ class CreateBugParams:
         importance=None,
         milestone=None,
         assignee=None,
+        validate_assignee=True,
         cve=None,
         metadata=None,
         check_permissions=True,
@@ -136,6 +137,7 @@ class CreateBugParams:
         self.importance = importance
         self.milestone = milestone
         self.assignee = assignee
+        self.validate_assignee = validate_assignee
         self.cve = cve
         self.metadata = metadata
         self.check_permissions = check_permissions
diff --git a/lib/lp/bugs/interfaces/cve.py b/lib/lp/bugs/interfaces/cve.py
index 204dcd7..4efd60c 100644
--- a/lib/lp/bugs/interfaces/cve.py
+++ b/lib/lp/bugs/interfaces/cve.py
@@ -189,6 +189,9 @@ class ICve(Interface):
     def setCVSSVectorForAuthority(authority, vector_string):
         """Set the CVSS vector string from an authority."""
 
+    def getDistributionVulnerability(self, distribution):
+        """Return the linked vulnerability for the given distribution."""
+
     def getVulnerabilitiesVisibleToUser(user):
         """Return the linked vulnerabilities visible to the given user."""
 
diff --git a/lib/lp/bugs/model/bug.py b/lib/lp/bugs/model/bug.py
index 2d3f90a..d48b166 100644
--- a/lib/lp/bugs/model/bug.py
+++ b/lib/lp/bugs/model/bug.py
@@ -248,6 +248,7 @@ def snapshot_bug_params(bug_params):
             "importance",
             "milestone",
             "assignee",
+            "validate_assignee",
             "cve",
             "metadata",
             "check_permissions",
@@ -3337,7 +3338,9 @@ class BugSet:
 
         bug_task = bug.default_bugtask
         if params.assignee:
-            bug_task.transitionToAssignee(params.assignee)
+            bug_task.transitionToAssignee(
+                params.assignee, validate=params.validate_assignee
+            )
         if params.importance:
             bug_task.transitionToImportance(params.importance, params.owner)
         if params.milestone:
diff --git a/lib/lp/bugs/model/cve.py b/lib/lp/bugs/model/cve.py
index 3bf1194..ce24b2d 100644
--- a/lib/lp/bugs/model/cve.py
+++ b/lib/lp/bugs/model/cve.py
@@ -115,6 +115,18 @@ class Cve(StormBase, BugLinkTargetMixin):
             sorted(bulk.load(Bug, bug_ids), key=operator.attrgetter("id"))
         )
 
+    def getDistributionVulnerability(self, distribution):
+        """See `ICve`."""
+        return (
+            Store.of(self)
+            .find(
+                Vulnerability,
+                Vulnerability.cve == self,
+                Vulnerability.distribution_id == distribution.id,
+            )
+            .one()
+        )
+
     def getVulnerabilitiesVisibleToUser(self, user):
         """See `ICve`."""
         vulnerabilities = Store.of(self).find(
diff --git a/lib/lp/bugs/scripts/soss/sossimport.py b/lib/lp/bugs/scripts/soss/sossimport.py
index e0026e1..bdc004c 100644
--- a/lib/lp/bugs/scripts/soss/sossimport.py
+++ b/lib/lp/bugs/scripts/soss/sossimport.py
@@ -37,7 +37,6 @@ from lp.registry.model.distribution import Distribution
 from lp.registry.model.externalpackage import ExternalPackage
 from lp.registry.model.person import Person
 from lp.registry.security import SecurityAdminDistribution
-from lp.testing import person_logged_in
 
 __all__ = [
     "SOSSImporter",
@@ -86,7 +85,7 @@ class SOSSImporter:
 
     def __init__(
         self,
-        information_type: InformationType = InformationType.PRIVATESECURITY,
+        information_type: InformationType = InformationType.PROPRIETARY,
         dry_run: bool = False,
     ) -> None:
         self.information_type = information_type
@@ -114,9 +113,7 @@ class SOSSImporter:
         with open(cve_path, encoding="utf-8") as file:
             soss_record = SOSSRecord.from_yaml(file)
 
-        with person_logged_in(self.bug_importer):
-            bug, vulnerability = self.import_cve(soss_record, cve_sequence)
-
+        bug, vulnerability = self.import_cve(soss_record, cve_sequence)
         return bug, vulnerability
 
     def import_cve(
@@ -184,6 +181,7 @@ class SOSSImporter:
                 status=PACKAGE_STATUS_MAP[package.status],
                 status_explanation=package.note,
                 assignee=assignee,
+                validate_assignee=False,
                 importance=PRIORITY_ENUM_MAP[soss_record.priority],
                 cve=lp_cve,
                 metadata=metadata,
@@ -265,7 +263,7 @@ class SOSSImporter:
 
     def _update_vulnerability(
         self, vulnerability: Vulnerability, soss_record: SOSSRecord
-    ) -> None:
+    ) -> Vulnerability:
         """
         Update a Vulnerability model with the information
         contained in a SOSSRecord
@@ -290,6 +288,7 @@ class SOSSImporter:
             "[SOSSImporter] Updated Vulnerability with ID: "
             f"{vulnerability.id} for {vulnerability.distribution.name}",
         )
+        return vulnerability
 
     def _find_existing_bug(
         self,
@@ -298,16 +297,19 @@ class SOSSImporter:
         distribution: Distribution,
     ) -> Optional[BugModel]:
         """Find existing bug for the given CVE."""
-        for vulnerability in lp_cve.vulnerabilities:
-            if vulnerability.distribution == distribution:
-                bugs = vulnerability.bugs
-                if len(bugs) > 1:
-                    raise ValueError(
-                        "Multiple existing bugs found for CVE ",
-                        soss_record.sequence,
-                    )
-                if bugs:
-                    return bugs[0]
+        vulnerability = self._find_existing_vulnerability(lp_cve, distribution)
+        if not vulnerability:
+            return None
+
+        bugs = vulnerability.bugs
+        if len(bugs) > 1:
+            raise ValueError(
+                "Multiple existing bugs found for CVE ",
+                soss_record.sequence,
+            )
+        if bugs:
+            return bugs[0]
+
         return None
 
     def _find_existing_vulnerability(
@@ -317,15 +319,7 @@ class SOSSImporter:
         if not lp_cve:
             return None
 
-        vulnerability = next(
-            (
-                v
-                for v in lp_cve.vulnerabilities
-                if v.distribution == distribution
-            ),
-            None,
-        )
-        return vulnerability
+        return lp_cve.getDistributionVulnerability(distribution)
 
     def _create_or_update_bugtasks(
         self, bug: BugModel, soss_record: SOSSRecord
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossimport.py b/lib/lp/bugs/scripts/soss/tests/test_sossimport.py
index a79e5b2..ef21825 100644
--- a/lib/lp/bugs/scripts/soss/tests/test_sossimport.py
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossimport.py
@@ -11,7 +11,7 @@ from lp.bugs.scripts.soss import SOSSRecord
 from lp.bugs.scripts.soss.sossimport import SOSSImporter
 from lp.registry.interfaces.externalpackage import ExternalPackageType
 from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
-from lp.testing import TestCaseWithFactory, person_logged_in
+from lp.testing import TestCaseWithFactory
 from lp.testing.layers import LaunchpadZopelessLayer
 
 
@@ -33,6 +33,7 @@ class TestSOSSImporter(TestCaseWithFactory):
             name="soss",
             displayname="SOSS",
             owner=self.owner,
+            information_type=InformationType.PROPRIETARY,
         )
         transaction.commit()
 
@@ -187,7 +188,7 @@ class TestSOSSImporter(TestCaseWithFactory):
         """Helper function to check the imported bug"""
         self.assertEqual(bug.description, self.description)
         self.assertEqual(bug.title, self.cve.sequence)
-        self.assertEqual(bug.information_type, InformationType.PRIVATESECURITY)
+        self.assertEqual(bug.information_type, InformationType.PROPRIETARY)
         self.assertEqual(bug.owner, self.bug_importer)
 
         self._check_bugtasks(
@@ -210,7 +211,7 @@ class TestSOSSImporter(TestCaseWithFactory):
         self.assertEqual(vulnerability.date_notice_issued, None)
         self.assertEqual(vulnerability.date_coordinated_release, None)
         self.assertEqual(
-            vulnerability.information_type, InformationType.PRIVATESECURITY
+            vulnerability.information_type, InformationType.PROPRIETARY
         )
         self.assertEqual(vulnerability.importance, BugTaskImportance.LOW)
         self.assertEqual(
@@ -235,7 +236,7 @@ class TestSOSSImporter(TestCaseWithFactory):
         file = self.sampledata / "CVE-2025-1979"
 
         soss_importer = SOSSImporter(
-            information_type=InformationType.PRIVATESECURITY
+            information_type=InformationType.PROPRIETARY
         )
         bug, vulnerability = soss_importer.import_cve_from_file(file)
 
@@ -245,10 +246,14 @@ class TestSOSSImporter(TestCaseWithFactory):
         # Check vulnerability
         self._check_vulnerability_fields(vulnerability, bug)
 
+        # Import again to check that it doesn't create new objects
+        bug_copy, vulnerability_copy = soss_importer.import_cve_from_file(file)
+        self.assertEqual(bug, bug_copy)
+        self.assertEqual(vulnerability, vulnerability_copy)
+
     def test_create_update_bug(self):
         """Test create and update a bug from a SOSS cve file"""
-        with person_logged_in(self.bug_importer):
-            bug = SOSSImporter()._create_bug(self.soss_record, self.cve)
+        bug = SOSSImporter()._create_bug(self.soss_record, self.cve)
 
         self._check_bug_fields(bug, self.bugtask_reference)
 
@@ -273,14 +278,14 @@ class TestSOSSImporter(TestCaseWithFactory):
         self.soss_record.packages.pop(SOSSRecord.PackageTypeEnum.RUST)
 
         bug = SOSSImporter(
-            information_type=InformationType.PUBLICSECURITY
+            information_type=InformationType.PROPRIETARY
         )._update_bug(bug, self.soss_record, new_cve)
         transaction.commit()
 
         # Check bug fields
         self.assertEqual(bug.description, new_description)
         self.assertEqual(bug.title, new_cve.sequence)
-        self.assertEqual(bug.information_type, InformationType.PUBLICSECURITY)
+        self.assertEqual(bug.information_type, InformationType.PROPRIETARY)
 
         # Check bugtasks
         bugtasks = bug.bugtasks
@@ -292,11 +297,10 @@ class TestSOSSImporter(TestCaseWithFactory):
     def test_create_update_vulnerability(self):
         """Test create and update a vulnerability from a SOSS cve file"""
         soss_importer = SOSSImporter()
-        with person_logged_in(self.bug_importer):
-            bug = soss_importer._create_bug(self.soss_record, self.cve)
-            vulnerability = soss_importer._create_vulnerability(
-                bug, self.soss_record, self.cve, self.soss
-            )
+        bug = soss_importer._create_bug(self.soss_record, self.cve)
+        vulnerability = soss_importer._create_vulnerability(
+            bug, self.soss_record, self.cve, self.soss
+        )
 
         self.assertEqual(vulnerability.distribution, self.soss)
         self.assertEqual(
@@ -309,7 +313,7 @@ class TestSOSSImporter(TestCaseWithFactory):
         self.assertEqual(vulnerability.date_notice_issued, None)
         self.assertEqual(vulnerability.date_coordinated_release, None)
         self.assertEqual(
-            vulnerability.information_type, InformationType.PRIVATESECURITY
+            vulnerability.information_type, InformationType.PROPRIETARY
         )
         self.assertEqual(vulnerability.importance, BugTaskImportance.LOW)
         self.assertEqual(
@@ -332,8 +336,7 @@ class TestSOSSImporter(TestCaseWithFactory):
     def test_create_or_update_bugtasks(self):
         """Test update bugtasks"""
         soss_importer = SOSSImporter()
-        with person_logged_in(self.bug_importer):
-            bug = soss_importer._create_bug(self.soss_record, self.cve)
+        bug = soss_importer._create_bug(self.soss_record, self.cve)
 
         self._check_bugtasks(
             bug.bugtasks,
diff --git a/lib/lp/bugs/tests/test_bug.py b/lib/lp/bugs/tests/test_bug.py
index d4265f9..3bc678c 100644
--- a/lib/lp/bugs/tests/test_bug.py
+++ b/lib/lp/bugs/tests/test_bug.py
@@ -334,6 +334,26 @@ class TestBugCreation(TestCaseWithFactory):
             assignee=person_2,
         )
 
+    def test_CreateBugParams_accepts_validate_asignee_changes(self):
+        # createBug() will accept any assignee change if validate_assignee
+        # is False.
+        person = self.factory.makePerson()
+        person_2 = self.factory.makePerson()
+        target = self.factory.makeProduct()
+        # Setting the target's bug supervisor means that
+        # canTransitionToAssignee() will return False for `person` if
+        # another Person is passed as `assignee`.
+        with person_logged_in(target.owner):
+            target.bug_supervisor = target.owner
+
+        bug = self.createBug(
+            owner=person,
+            target=target,
+            assignee=person_2,
+            validate_assignee=False,
+        )
+        self.assertEqual(person_2, bug.default_bugtask.assignee)
+
     def test_CreateBugParams_rejects_not_allowed_milestone_changes(self):
         # createBug() will reject any importance value passed by users
         # who don't have the right to set the milestone.
diff --git a/lib/lp/bugs/tests/test_cve.py b/lib/lp/bugs/tests/test_cve.py
index 601c672..23fd423 100644
--- a/lib/lp/bugs/tests/test_cve.py
+++ b/lib/lp/bugs/tests/test_cve.py
@@ -10,11 +10,13 @@ from testtools.testcase import ExpectedException
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 
+from lp.app.enums import InformationType
 from lp.bugs.interfaces.bugtasksearch import BugTaskSearchParams
 from lp.bugs.interfaces.cve import CveStatus, ICveSet
 from lp.bugs.scripts.uct.models import CVSS
 from lp.testing import (
     TestCaseWithFactory,
+    admin_logged_in,
     login_person,
     person_logged_in,
     verifyObject,
@@ -357,3 +359,27 @@ class TestCve(TestCaseWithFactory):
             },
             unproxied_cve.cvss,
         )
+
+    def test_getDistributionVulnerability(self):
+        cve = self.factory.makeCVE(sequence="2099-1234")
+        distribution = self.factory.makeDistribution(
+            information_type=InformationType.PROPRIETARY
+        )
+        vulnerability = self.factory.makeVulnerability(
+            distribution=distribution,
+            cve=cve,
+            information_type=InformationType.PROPRIETARY,
+        )
+
+        # getDistributionVulnerability returns the vulnerability even though
+        # we are not logged in
+        self.assertEqual(
+            vulnerability, cve.getDistributionVulnerability(distribution)
+        )
+
+        # As we are not logged in as a user, cve.vulnerabilities is empty
+        self.assertEqual(len(list(cve.vulnerabilities)), 0)
+
+        # Admin can see the PROPRIETARY vulnerability
+        with admin_logged_in():
+            self.assertEqual(vulnerability, cve.vulnerabilities[0])