launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #32974
[Merge] ~ilkeremrekoc/launchpad:add-eximport-endpoints into launchpad:master
İlker Emre Koç has proposed merging ~ilkeremrekoc/launchpad:add-eximport-endpoints into launchpad:master.
Commit message:
WIP: Add vulnerability job list endpoints
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~ilkeremrekoc/launchpad/+git/launchpad/+merge/492479
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~ilkeremrekoc/launchpad:add-eximport-endpoints into launchpad:master.
diff --git a/lib/lp/bugs/interfaces/vulnerability.py b/lib/lp/bugs/interfaces/vulnerability.py
index 9e83bef..80dda47 100644
--- a/lib/lp/bugs/interfaces/vulnerability.py
+++ b/lib/lp/bugs/interfaces/vulnerability.py
@@ -16,6 +16,7 @@ from lazr.restful.declarations import (
REQUEST_USER,
call_with,
collection_default_content,
+ export_read_operation,
export_write_operation,
exported,
exported_as_webservice_collection,
@@ -476,6 +477,47 @@ class IVulnerabilitySet(Interface):
data.
"""
+ @operation_parameters(
+ handler=Choice(
+ title=_("The handler to import or export vulnerability data"),
+ readonly=True,
+ required=True,
+ vocabulary=VulnerabilityHandlerEnum,
+ ),
+ )
+ @call_with(requester=REQUEST_USER)
+ @export_read_operation()
+ @operation_for_version("devel")
+ def getImports(
+ requester,
+ handler,
+ ):
+ """Return the latest 10 import jobs done and their statuses.
+
+ :param handler: Get the import jobs performed by this handler.
+ """
+
+ @operation_parameters(
+ handler=Choice(
+ title=_("The handler to import or export vulnerability data"),
+ readonly=True,
+ required=True,
+ vocabulary=VulnerabilityHandlerEnum,
+ ),
+ )
+ @call_with(requester=REQUEST_USER)
+ @export_read_operation()
+ @operation_for_version("devel")
+ def getExports(
+ requester,
+ handler,
+ ):
+ """Return the latest 10 export jobs done and link to their librarian
+ files.
+
+ :param handler: Get the export jobs performed by this handler.
+ """
+
@collection_default_content()
def empty_list():
"""Return an empty collection of vulnerabilities.
diff --git a/lib/lp/bugs/model/exportvulnerabilityjob.py b/lib/lp/bugs/model/exportvulnerabilityjob.py
index 6fdad6a..487ca57 100644
--- a/lib/lp/bugs/model/exportvulnerabilityjob.py
+++ b/lib/lp/bugs/model/exportvulnerabilityjob.py
@@ -28,7 +28,7 @@ from lp.bugs.interfaces.vulnerabilityjob import (
)
from lp.bugs.model.bug import Bug as BugModel
from lp.bugs.model.cve import Cve as CveModel
-from lp.bugs.model.vulnerability import Vulnerability
+from lp.bugs.model.vulnerability import HANDLER_DISTRIBUTION_MAP, Vulnerability
from lp.bugs.model.vulnerabilityjob import (
VulnerabilityJob,
VulnerabilityJobDerived,
@@ -49,8 +49,6 @@ logger = logging.getLogger(__name__)
HANDLER_EXPORTER_MAP = {VulnerabilityHandlerEnum.SOSS: SOSSExporter}
-HANDLER_DISTRIBUTION_MAP = {VulnerabilityHandlerEnum.SOSS: "soss"}
-
@implementer(IExportVulnerabilityJob)
@provider(IExportVulnerabilityJobSource)
diff --git a/lib/lp/bugs/model/tests/test_vulnerability.py b/lib/lp/bugs/model/tests/test_vulnerability.py
index effa984..244d161 100644
--- a/lib/lp/bugs/model/tests/test_vulnerability.py
+++ b/lib/lp/bugs/model/tests/test_vulnerability.py
@@ -2,6 +2,8 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for the vulnerability and related models."""
+from pathlib import Path
+
from fixtures import MockPatch
from storm.store import Store
from testtools.matchers import MatchesStructure
@@ -27,6 +29,7 @@ from lp.bugs.interfaces.vulnerability import (
VulnerabilityChange,
)
from lp.bugs.interfaces.vulnerabilityjob import (
+ IExportVulnerabilityJobSource,
IImportVulnerabilityJobSource,
VulnerabilityJobInProgress,
)
@@ -36,6 +39,7 @@ from lp.bugs.model.vulnerability import (
UnauthorizedVulnerabilityHandler,
)
from lp.bugs.model.vulnerabilitysubscription import VulnerabilitySubscription
+from lp.code.tests.helpers import GitHostingFixture
from lp.registry.enums import BugSharingPolicy, TeamMembershipPolicy
from lp.services.webapp.authorization import check_permission
from lp.testing import (
@@ -48,7 +52,7 @@ from lp.testing import (
set_feature_flag,
verifyObject,
)
-from lp.testing.layers import DatabaseFunctionalLayer
+from lp.testing.layers import DatabaseFunctionalLayer, LaunchpadZopelessLayer
from lp.testing.pages import webservice_for_person
@@ -1090,3 +1094,382 @@ class TestVulnerabilitySetWebService(TestCaseWithFactory):
},
set(response.body.decode().split("\n")),
)
+
+ def test_getImports_webservice_required_arguments_missing(self):
+ """Test that getImports webservice requires missing arguments."""
+ webservice = webservice_for_person(
+ self.requester,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.vulnerability_set_url,
+ "getImports",
+ )
+ self.assertEqual(400, response.status)
+ self.assertEqual(
+ {
+ "handler: Required input is missing.",
+ },
+ set(response.body.decode().split("\n")),
+ )
+
+ def test_getImports_webservice(self):
+ """Test that we can get Import jobs using the webservice."""
+ webservice = webservice_for_person(
+ self.requester,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.vulnerability_set_url,
+ "getImports",
+ handler=self.handler.title,
+ )
+ self.assertEqual(200, response.status)
+
+ def test_getExports_webservice_required_arguments_missing(self):
+ """Test that getExports webservice requires missing arguments."""
+ webservice = webservice_for_person(
+ self.requester,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.vulnerability_set_url,
+ "getExports",
+ )
+ self.assertEqual(400, response.status)
+ self.assertEqual(
+ {
+ "handler: Required input is missing.",
+ },
+ set(response.body.decode().split("\n")),
+ )
+
+ def test_getExports_webservice(self):
+ """Test that we can get Export jobs using the webservice."""
+
+ webservice = webservice_for_person(
+ self.requester,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.vulnerability_set_url,
+ "getExports",
+ handler=self.handler.title,
+ )
+ self.assertEqual(200, response.status)
+
+
+class TestVulnerabilityGetJobs(TestCaseWithFactory):
+ """Tests for collecting export vulnerability job info."""
+
+ layer = LaunchpadZopelessLayer
+
+ def setUp(self):
+ super().setUp()
+ self.requester = self.factory.makePerson()
+ self.handler = VulnerabilityHandlerEnum.SOSS
+
+ self.cve_path = (
+ Path(__file__).parent
+ / ".."
+ / ".."
+ / "scripts"
+ / "soss"
+ / "tests"
+ / "sampledata"
+ / "CVE-2025-1979"
+ )
+ self.repository = self.factory.makeGitRepository()
+ self.refs = self.factory.makeGitRefs(
+ repository=self.repository,
+ paths=("ref/heads/main", "ref/tags/v1.0"),
+ )
+
+ def _create_import_job(self):
+ """Helper to create a single ImportVulnerabilityJob."""
+ with open(self.cve_path, encoding="utf-8") as file:
+ self.useFixture(
+ GitHostingFixture(
+ blob=file.read(),
+ refs=self.refs,
+ diff_stats={"added": ["cves/CVE-2025-1979"]},
+ )
+ )
+
+ job = getUtility(IImportVulnerabilityJobSource).create(
+ handler=VulnerabilityHandlerEnum.SOSS,
+ git_repository=self.repository.id,
+ git_ref="ref/tags/v1.0",
+ git_paths=["cves"],
+ information_type=InformationType.PRIVATESECURITY.value,
+ )
+
+ return job
+
+ def _create_export_job(self):
+ """Helper to create a single ExportVulnerabilityJob.
+
+ Ensures an import job has already been completed before export.
+ """
+ import_job = self._create_import_job()
+
+ import_job.job.start()
+ import_job.run()
+ import_job.job.complete()
+
+ job = getUtility(IExportVulnerabilityJobSource).create(
+ handler=VulnerabilityHandlerEnum.SOSS,
+ information_type=InformationType.PRIVATESECURITY.value,
+ )
+
+ return job
+
+ def test_getImports(self):
+ """Test getImports function runs as expected."""
+ self.factory.makeCVE("2025-1979")
+ self.factory.makeDistribution(name="soss", owner=self.requester)
+
+ job = self._create_import_job()
+
+ job.run()
+
+ result = getUtility(IVulnerabilitySet).getImports(
+ self.requester,
+ self.handler,
+ )
+
+ self.assertEqual(1, len(result))
+ result = result[0]
+
+ naked_job = removeSecurityProxy(job)
+
+ self.assertEqual(result["request"], naked_job.metadata["request"])
+ self.assertEqual(result["result"], naked_job.metadata["result"])
+
+ job_details = {
+ "created_at": naked_job.job.date_created,
+ "started_at": naked_job.job.date_started,
+ "finished_at": naked_job.job.date_finished,
+ }
+
+ self.assertEqual(result["job_details"], job_details)
+
+ def test_getImports_full(self):
+ """Test retrieval of exactly 10 import jobs."""
+ self.factory.makeCVE("2025-1979")
+ self.factory.makeDistribution(name="soss", owner=self.requester)
+
+ jobs = []
+
+ for _ in range(10):
+ job = self._create_import_job()
+ jobs.append(job)
+ job.job.start()
+ job.run()
+ job.job.complete()
+
+ jobs = jobs[::-1] # Reverse since the results will be reversed
+
+ results = getUtility(IVulnerabilitySet).getImports(
+ self.requester,
+ self.handler,
+ )
+
+ self.assertEqual(10, len(results))
+
+ # Validate all results against jobs
+ for job, result in zip(jobs, results):
+ naked_job = removeSecurityProxy(job)
+
+ self.assertEqual(result["request"], naked_job.metadata["request"])
+ self.assertEqual(result["result"], naked_job.metadata["result"])
+
+ job_details = {
+ "created_at": naked_job.job.date_created,
+ "started_at": naked_job.job.date_started,
+ "finished_at": naked_job.job.date_finished,
+ }
+
+ self.assertEqual(result["job_details"], job_details)
+
+ def test_getImports_more_than_10(self):
+ """Test that only the last 10 import jobs are returned."""
+ self.factory.makeCVE("2025-1979")
+ self.factory.makeDistribution(name="soss", owner=self.requester)
+
+ jobs = []
+
+ for _ in range(11):
+ job = self._create_import_job()
+ jobs.append(job)
+ job.job.start()
+ job.run()
+ job.job.complete()
+
+ # Reverse and delete the first job since the results will use
+ # the last 10 jobs in reverse
+ jobs = jobs[-1::-1]
+
+ results = getUtility(IVulnerabilitySet).getImports(
+ self.requester,
+ self.handler,
+ )
+
+ self.assertEqual(10, len(results))
+
+ for job, result in zip(jobs, results):
+ naked_job = removeSecurityProxy(job)
+
+ self.assertEqual(result["request"], naked_job.metadata["request"])
+ self.assertEqual(result["result"], naked_job.metadata["result"])
+
+ job_details = {
+ "created_at": naked_job.job.date_created,
+ "started_at": naked_job.job.date_started,
+ "finished_at": naked_job.job.date_finished,
+ }
+
+ self.assertEqual(result["job_details"], job_details)
+
+ def test_getImports_unauthenticated(self):
+ """Test imports fail when requester lacks handler rights."""
+ self.factory.makeCVE("2025-1979")
+ self.factory.makeDistribution(name="soss")
+
+ job = self._create_import_job()
+
+ job.run()
+
+ self.assertRaisesWithContent(
+ UnauthorizedVulnerabilityHandler,
+ f"You don't have enough rights to use {self.handler}",
+ getUtility(IVulnerabilitySet).getImports,
+ self.requester,
+ self.handler,
+ )
+
+ def test_getExports(self):
+ """Test getExports function runs as expected."""
+ self.factory.makeCVE("2025-1979")
+ self.factory.makeDistribution(name="soss", owner=self.requester)
+
+ job = self._create_export_job()
+
+ job.run()
+
+ result = getUtility(IVulnerabilitySet).getExports(
+ self.requester,
+ self.handler,
+ )
+
+ self.assertEqual(1, len(result))
+ result = result[0]
+
+ naked_job = removeSecurityProxy(job)
+
+ self.assertEqual(result["request"], naked_job.metadata["request"])
+ self.assertEqual(result["result"], naked_job.metadata["result"])
+ self.assertEqual(result["data"], naked_job.metadata["data"])
+
+ job_details = {
+ "created_at": naked_job.job.date_created,
+ "started_at": naked_job.job.date_started,
+ "finished_at": naked_job.job.date_finished,
+ }
+
+ self.assertEqual(result["job_details"], job_details)
+
+ def test_getExports_full(self):
+ """Test retrieval of exactly 10 export jobs."""
+ self.factory.makeCVE("2025-1979")
+ self.factory.makeDistribution(name="soss", owner=self.requester)
+
+ jobs = []
+
+ for _ in range(10):
+ job = self._create_export_job()
+ jobs.append(job)
+ job.job.start()
+ job.run()
+ job.job.complete()
+
+ jobs = jobs[::-1] # Reverse since the results will be reversed
+
+ results = getUtility(IVulnerabilitySet).getExports(
+ self.requester,
+ self.handler,
+ )
+
+ self.assertEqual(10, len(results))
+
+ for job, result in zip(jobs, results):
+ naked_job = removeSecurityProxy(job)
+
+ self.assertEqual(result["request"], naked_job.metadata["request"])
+ self.assertEqual(result["result"], naked_job.metadata["result"])
+ self.assertEqual(result["data"], naked_job.metadata["data"])
+
+ job_details = {
+ "created_at": naked_job.job.date_created,
+ "started_at": naked_job.job.date_started,
+ "finished_at": naked_job.job.date_finished,
+ }
+
+ self.assertEqual(result["job_details"], job_details)
+
+ def test_getExports_more_than_10(self):
+ """Test that only the last 10 export jobs are returned."""
+ self.factory.makeCVE("2025-1979")
+ self.factory.makeDistribution(name="soss", owner=self.requester)
+
+ jobs = []
+
+ for _ in range(11):
+ job = self._create_export_job()
+ jobs.append(job)
+ job.job.start()
+ job.run()
+ job.job.complete()
+
+ # Reverse and delete the first job since the results will use
+ # the last 10 jobs in reverse
+ jobs = jobs[-1::-1]
+
+ results = getUtility(IVulnerabilitySet).getExports(
+ self.requester,
+ self.handler,
+ )
+
+ self.assertEqual(10, len(results))
+
+ for job, result in zip(jobs, results):
+ naked_job = removeSecurityProxy(job)
+
+ self.assertEqual(result["request"], naked_job.metadata["request"])
+ self.assertEqual(result["result"], naked_job.metadata["result"])
+ self.assertEqual(result["data"], naked_job.metadata["data"])
+
+ job_details = {
+ "created_at": naked_job.job.date_created,
+ "started_at": naked_job.job.date_started,
+ "finished_at": naked_job.job.date_finished,
+ }
+
+ self.assertEqual(result["job_details"], job_details)
+
+ def test_getExports_unauthenticated(self):
+ """Test exports fail when requester lacks handler rights."""
+ self.factory.makeCVE("2025-1979")
+ self.factory.makeDistribution(name="soss")
+
+ job = self._create_export_job()
+
+ job.run()
+
+ self.assertRaisesWithContent(
+ UnauthorizedVulnerabilityHandler,
+ f"You don't have enough rights to use {self.handler}",
+ getUtility(IVulnerabilitySet).getExports,
+ self.requester,
+ self.handler,
+ )
diff --git a/lib/lp/bugs/model/vulnerability.py b/lib/lp/bugs/model/vulnerability.py
index c337b8f..93acc60 100644
--- a/lib/lp/bugs/model/vulnerability.py
+++ b/lib/lp/bugs/model/vulnerability.py
@@ -5,6 +5,7 @@ __all__ = [
"get_vulnerability_privacy_filter",
"Vulnerability",
"VulnerabilitySet",
+ "HANDLER_DISTRIBUTION_MAP",
]
import operator
@@ -12,7 +13,7 @@ from datetime import timezone
from typing import Iterable
from storm.databases.postgres import JSON
-from storm.expr import SQL, Coalesce, Join, Or, Select
+from storm.expr import SQL, Coalesce, Desc, Join, Or, Select
from storm.locals import DateTime, Int, Reference, Unicode
from storm.store import Store
from zope.component import getUtility
@@ -49,6 +50,7 @@ from lp.registry.interfaces.accesspolicy import (
IAccessArtifactGrantSource,
IAccessArtifactSource,
)
+from lp.registry.interfaces.distribution import IDistributionSet
from lp.registry.interfaces.role import IPersonRoles
from lp.registry.model.accesspolicy import reconcile_access_for_artifacts
from lp.registry.model.person import Person
@@ -66,6 +68,8 @@ VULNERABILITY_IMPORT_ENABLED_FEATURE_FLAG = (
"bugs.vulnerability_import_export.enabled"
)
+HANDLER_DISTRIBUTION_MAP = {VulnerabilityHandlerEnum.SOSS: "soss"}
+
@implementer(IVulnerability, IBugLinkTarget)
class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
@@ -473,6 +477,107 @@ class VulnerabilitySet:
return None
+ def _get_jobs_info(self, vulnerability_jobs):
+ # Merge fields from Job and VulnerabilityJob.metadata
+ jobs_info = []
+ for vulnerability_job in vulnerability_jobs:
+ job = vulnerability_job.job
+ if not job:
+ continue
+
+ job_details = {
+ "created_at": job.date_created,
+ "started_at": job.date_started,
+ "finished_at": job.date_finished,
+ }
+ # breakpoint()
+ job_info = dict(
+ vulnerability_job.metadata or {}
+ ).copy() # copy so we don’t mutate the original
+
+ request = job_info.get("request")
+ if request and "information_type" in request:
+ info_type_number = request["information_type"]
+ try:
+ request["information_type"] = InformationType.items[
+ info_type_number
+ ].title
+ except (KeyError, IndexError):
+ request["information_type"] = ""
+
+ job_info["job_details"] = job_details
+ # breakpoint()
+ jobs_info.append(job_info)
+
+ return jobs_info
+
+ def getImports(
+ self,
+ requester,
+ handler,
+ ):
+ from lp.bugs.model.importvulnerabilityjob import ImportVulnerabilityJob
+ from lp.bugs.model.vulnerabilityjob import VulnerabilityJob
+ from lp.bugs.scripts.soss.sossimport import SOSSImporter
+
+ if handler == VulnerabilityHandlerEnum.SOSS:
+ importer = SOSSImporter()
+ else:
+ raise NotFoundError(f"{handler} not found")
+
+ if not importer.checkUserPermissions(requester):
+ raise UnauthorizedVulnerabilityHandler(
+ f"You don't have enough rights to use {handler}"
+ )
+
+ store = IStore(VulnerabilityJob)
+
+ vulnerability_jobs = (
+ store.find(
+ VulnerabilityJob,
+ VulnerabilityJob.job_type
+ == ImportVulnerabilityJob.class_job_type,
+ VulnerabilityJob.handler == handler,
+ )
+ .order_by(Desc(VulnerabilityJob.id))
+ .config(limit=10)
+ )
+
+ return self._get_jobs_info(vulnerability_jobs)
+
+ def getExports(self, requester, handler):
+ from lp.bugs.model.exportvulnerabilityjob import ExportVulnerabilityJob
+ from lp.bugs.model.vulnerabilityjob import VulnerabilityJob
+ from lp.bugs.scripts.soss.sossexport import SOSSExporter
+
+ if handler == VulnerabilityHandlerEnum.SOSS:
+ exporter = SOSSExporter()
+ distribution = getUtility(IDistributionSet).getByName(
+ HANDLER_DISTRIBUTION_MAP[handler]
+ )
+ else:
+ raise NotFoundError(f"{handler} not found")
+
+ if not exporter.checkUserPermissions(requester, distribution):
+ raise UnauthorizedVulnerabilityHandler(
+ f"You don't have enough rights to use {handler}"
+ )
+
+ store = IStore(VulnerabilityJob)
+
+ vulnerability_jobs = (
+ store.find(
+ VulnerabilityJob,
+ VulnerabilityJob.job_type
+ == ExportVulnerabilityJob.class_job_type,
+ VulnerabilityJob.handler == handler,
+ )
+ .order_by(Desc(VulnerabilityJob.id))
+ .config(limit=10)
+ )
+
+ return self._get_jobs_info(vulnerability_jobs)
+
def empty_list(self):
"""See `IVulnerabilitySet`."""
return []
diff --git a/lib/lp/bugs/scripts/soss/sossexport.py b/lib/lp/bugs/scripts/soss/sossexport.py
index a8659d5..07fde36 100644
--- a/lib/lp/bugs/scripts/soss/sossexport.py
+++ b/lib/lp/bugs/scripts/soss/sossexport.py
@@ -18,7 +18,9 @@ from lp.bugs.scripts.soss.sossimport import (
PACKAGE_TYPE_MAP,
PRIORITY_ENUM_MAP,
)
+from lp.registry.interfaces.role import IPersonRoles
from lp.registry.model.distribution import Distribution
+from lp.registry.security import SecurityAdminDistribution
__all__ = [
"SOSSExporter",
@@ -178,3 +180,8 @@ class SOSSExporter:
if date_obj and date_obj.tzinfo is not None:
return date_obj.replace(tzinfo=None)
return date_obj
+
+ def checkUserPermissions(self, user, distribution):
+ return SecurityAdminDistribution(distribution).checkAuthenticated(
+ IPersonRoles(user)
+ )
Follow ups