← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~ilkeremrekoc/launchpad:add-eximport-endpoints into launchpad:master

 

İlker Emre Koç has proposed merging ~ilkeremrekoc/launchpad:add-eximport-endpoints into launchpad:master.

Commit message:
WIP: Add vulnerability job list endpoints

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~ilkeremrekoc/launchpad/+git/launchpad/+merge/492479
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~ilkeremrekoc/launchpad:add-eximport-endpoints into launchpad:master.
diff --git a/lib/lp/bugs/interfaces/vulnerability.py b/lib/lp/bugs/interfaces/vulnerability.py
index 9e83bef..80dda47 100644
--- a/lib/lp/bugs/interfaces/vulnerability.py
+++ b/lib/lp/bugs/interfaces/vulnerability.py
@@ -16,6 +16,7 @@ from lazr.restful.declarations import (
     REQUEST_USER,
     call_with,
     collection_default_content,
+    export_read_operation,
     export_write_operation,
     exported,
     exported_as_webservice_collection,
@@ -476,6 +477,47 @@ class IVulnerabilitySet(Interface):
             data.
         """
 
+    @operation_parameters(
+        handler=Choice(
+            title=_("The handler to import or export vulnerability data"),
+            readonly=True,
+            required=True,
+            vocabulary=VulnerabilityHandlerEnum,
+        ),
+    )
+    @call_with(requester=REQUEST_USER)
+    @export_read_operation()
+    @operation_for_version("devel")
+    def getImports(
+        requester,
+        handler,
+    ):
+        """Return details of the latest 10 import jobs, newest first.
+
+        :param handler: Get the import jobs performed by this handler.
+        """
+
+    @operation_parameters(
+        handler=Choice(
+            title=_("The handler to import or export vulnerability data"),
+            readonly=True,
+            required=True,
+            vocabulary=VulnerabilityHandlerEnum,
+        ),
+    )
+    @call_with(requester=REQUEST_USER)
+    @export_read_operation()
+    @operation_for_version("devel")
+    def getExports(
+        requester,
+        handler,
+    ):
+        """Return the latest 10 export jobs, newest first, with links to
+        their librarian files.
+
+        :param handler: Get the export jobs performed by this handler.
+        """
+
     @collection_default_content()
     def empty_list():
         """Return an empty collection of vulnerabilities.
diff --git a/lib/lp/bugs/model/exportvulnerabilityjob.py b/lib/lp/bugs/model/exportvulnerabilityjob.py
index 6fdad6a..487ca57 100644
--- a/lib/lp/bugs/model/exportvulnerabilityjob.py
+++ b/lib/lp/bugs/model/exportvulnerabilityjob.py
@@ -28,7 +28,7 @@ from lp.bugs.interfaces.vulnerabilityjob import (
 )
 from lp.bugs.model.bug import Bug as BugModel
 from lp.bugs.model.cve import Cve as CveModel
-from lp.bugs.model.vulnerability import Vulnerability
+from lp.bugs.model.vulnerability import HANDLER_DISTRIBUTION_MAP, Vulnerability
 from lp.bugs.model.vulnerabilityjob import (
     VulnerabilityJob,
     VulnerabilityJobDerived,
@@ -49,8 +49,6 @@ logger = logging.getLogger(__name__)
 
 HANDLER_EXPORTER_MAP = {VulnerabilityHandlerEnum.SOSS: SOSSExporter}
 
-HANDLER_DISTRIBUTION_MAP = {VulnerabilityHandlerEnum.SOSS: "soss"}
-
 
 @implementer(IExportVulnerabilityJob)
 @provider(IExportVulnerabilityJobSource)
diff --git a/lib/lp/bugs/model/tests/test_vulnerability.py b/lib/lp/bugs/model/tests/test_vulnerability.py
index effa984..244d161 100644
--- a/lib/lp/bugs/model/tests/test_vulnerability.py
+++ b/lib/lp/bugs/model/tests/test_vulnerability.py
@@ -2,6 +2,8 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for the vulnerability and related models."""
+from pathlib import Path
+
 from fixtures import MockPatch
 from storm.store import Store
 from testtools.matchers import MatchesStructure
@@ -27,6 +29,7 @@ from lp.bugs.interfaces.vulnerability import (
     VulnerabilityChange,
 )
 from lp.bugs.interfaces.vulnerabilityjob import (
+    IExportVulnerabilityJobSource,
     IImportVulnerabilityJobSource,
     VulnerabilityJobInProgress,
 )
@@ -36,6 +39,7 @@ from lp.bugs.model.vulnerability import (
     UnauthorizedVulnerabilityHandler,
 )
 from lp.bugs.model.vulnerabilitysubscription import VulnerabilitySubscription
+from lp.code.tests.helpers import GitHostingFixture
 from lp.registry.enums import BugSharingPolicy, TeamMembershipPolicy
 from lp.services.webapp.authorization import check_permission
 from lp.testing import (
@@ -48,7 +52,7 @@ from lp.testing import (
     set_feature_flag,
     verifyObject,
 )
-from lp.testing.layers import DatabaseFunctionalLayer
+from lp.testing.layers import DatabaseFunctionalLayer, LaunchpadZopelessLayer
 from lp.testing.pages import webservice_for_person
 
 
@@ -1090,3 +1094,382 @@ class TestVulnerabilitySetWebService(TestCaseWithFactory):
             },
             set(response.body.decode().split("\n")),
         )
+
+    def test_getImports_webservice_required_arguments_missing(self):
+        """Test that getImports returns 400 when `handler` is not supplied."""
+        webservice = webservice_for_person(
+            self.requester,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.vulnerability_set_url,
+            "getImports",
+        )
+        self.assertEqual(400, response.status)
+        self.assertEqual(
+            {
+                "handler: Required input is missing.",
+            },
+            set(response.body.decode().split("\n")),
+        )
+
+    def test_getImports_webservice(self):
+        """Test that getImports returns 200 when `handler` is supplied."""
+        webservice = webservice_for_person(
+            self.requester,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.vulnerability_set_url,
+            "getImports",
+            handler=self.handler.title,
+        )
+        self.assertEqual(200, response.status)
+
+    def test_getExports_webservice_required_arguments_missing(self):
+        """Test that getExports returns 400 when `handler` is not supplied."""
+        webservice = webservice_for_person(
+            self.requester,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.vulnerability_set_url,
+            "getExports",
+        )
+        self.assertEqual(400, response.status)
+        self.assertEqual(
+            {
+                "handler: Required input is missing.",
+            },
+            set(response.body.decode().split("\n")),
+        )
+
+    def test_getExports_webservice(self):
+        """Test that getExports returns 200 when `handler` is supplied."""
+
+        webservice = webservice_for_person(
+            self.requester,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.vulnerability_set_url,
+            "getExports",
+            handler=self.handler.title,
+        )
+        self.assertEqual(200, response.status)
+
+
+class TestVulnerabilityGetJobs(TestCaseWithFactory):
+    """Tests for listing import and export vulnerability jobs."""
+
+    layer = LaunchpadZopelessLayer
+
+    def setUp(self):
+        super().setUp()
+        self.requester = self.factory.makePerson()
+        self.handler = VulnerabilityHandlerEnum.SOSS
+
+        self.cve_path = (
+            Path(__file__).parent
+            / ".."
+            / ".."
+            / "scripts"
+            / "soss"
+            / "tests"
+            / "sampledata"
+            / "CVE-2025-1979"
+        )
+        self.repository = self.factory.makeGitRepository()
+        self.refs = self.factory.makeGitRefs(
+            repository=self.repository,
+            paths=("ref/heads/main", "ref/tags/v1.0"),
+        )
+
+    def _create_import_job(self):
+        """Helper to create a single ImportVulnerabilityJob."""
+        with open(self.cve_path, encoding="utf-8") as file:
+            self.useFixture(
+                GitHostingFixture(
+                    blob=file.read(),
+                    refs=self.refs,
+                    diff_stats={"added": ["cves/CVE-2025-1979"]},
+                )
+            )
+
+        job = getUtility(IImportVulnerabilityJobSource).create(
+            handler=VulnerabilityHandlerEnum.SOSS,
+            git_repository=self.repository.id,
+            git_ref="ref/tags/v1.0",
+            git_paths=["cves"],
+            information_type=InformationType.PRIVATESECURITY.value,
+        )
+
+        return job
+
+    def _create_export_job(self):
+        """Helper to create a single ExportVulnerabilityJob.
+
+        Ensures an import job has already been completed before export.
+        """
+        import_job = self._create_import_job()
+
+        import_job.job.start()
+        import_job.run()
+        import_job.job.complete()
+
+        job = getUtility(IExportVulnerabilityJobSource).create(
+            handler=VulnerabilityHandlerEnum.SOSS,
+            information_type=InformationType.PRIVATESECURITY.value,
+        )
+
+        return job
+
+    def test_getImports(self):
+        """Test getImports reports one job's metadata and timestamps."""
+        self.factory.makeCVE("2025-1979")
+        self.factory.makeDistribution(name="soss", owner=self.requester)
+
+        job = self._create_import_job()
+
+        job.run()
+
+        result = getUtility(IVulnerabilitySet).getImports(
+            self.requester,
+            self.handler,
+        )
+
+        self.assertEqual(1, len(result))
+        result = result[0]
+
+        naked_job = removeSecurityProxy(job)
+
+        self.assertEqual(result["request"], naked_job.metadata["request"])
+        self.assertEqual(result["result"], naked_job.metadata["result"])
+
+        job_details = {
+            "created_at": naked_job.job.date_created,
+            "started_at": naked_job.job.date_started,
+            "finished_at": naked_job.job.date_finished,
+        }
+
+        self.assertEqual(result["job_details"], job_details)
+
+    def test_getImports_full(self):
+        """Test retrieval of exactly 10 import jobs."""
+        self.factory.makeCVE("2025-1979")
+        self.factory.makeDistribution(name="soss", owner=self.requester)
+
+        jobs = []
+
+        for _ in range(10):
+            job = self._create_import_job()
+            jobs.append(job)
+            job.job.start()
+            job.run()
+            job.job.complete()
+
+        jobs = jobs[::-1]  # Newest first, to match the result order
+
+        results = getUtility(IVulnerabilitySet).getImports(
+            self.requester,
+            self.handler,
+        )
+
+        self.assertEqual(10, len(results))
+
+        # Validate all results against jobs
+        for job, result in zip(jobs, results):
+            naked_job = removeSecurityProxy(job)
+
+            self.assertEqual(result["request"], naked_job.metadata["request"])
+            self.assertEqual(result["result"], naked_job.metadata["result"])
+
+            job_details = {
+                "created_at": naked_job.job.date_created,
+                "started_at": naked_job.job.date_started,
+                "finished_at": naked_job.job.date_finished,
+            }
+
+            self.assertEqual(result["job_details"], job_details)
+
+    def test_getImports_more_than_10(self):
+        """Test that only the last 10 import jobs are returned."""
+        self.factory.makeCVE("2025-1979")
+        self.factory.makeDistribution(name="soss", owner=self.requester)
+
+        jobs = []
+
+        for _ in range(11):
+            job = self._create_import_job()
+            jobs.append(job)
+            job.job.start()
+            job.run()
+            job.job.complete()
+
+        # Newest first, to match the result order; zip() below stops after
+        # ten pairs, so the oldest job is never compared.
+        jobs = jobs[-1::-1]
+
+        results = getUtility(IVulnerabilitySet).getImports(
+            self.requester,
+            self.handler,
+        )
+
+        self.assertEqual(10, len(results))
+
+        for job, result in zip(jobs, results):
+            naked_job = removeSecurityProxy(job)
+
+            self.assertEqual(result["request"], naked_job.metadata["request"])
+            self.assertEqual(result["result"], naked_job.metadata["result"])
+
+            job_details = {
+                "created_at": naked_job.job.date_created,
+                "started_at": naked_job.job.date_started,
+                "finished_at": naked_job.job.date_finished,
+            }
+
+            self.assertEqual(result["job_details"], job_details)
+
+    def test_getImports_unauthenticated(self):
+        """Test imports fail when requester lacks handler rights."""
+        self.factory.makeCVE("2025-1979")
+        self.factory.makeDistribution(name="soss")
+
+        job = self._create_import_job()
+
+        job.run()
+
+        self.assertRaisesWithContent(
+            UnauthorizedVulnerabilityHandler,
+            f"You don't have enough rights to use {self.handler}",
+            getUtility(IVulnerabilitySet).getImports,
+            self.requester,
+            self.handler,
+        )
+
+    def test_getExports(self):
+        """Test getExports reports one job's metadata and timestamps."""
+        self.factory.makeCVE("2025-1979")
+        self.factory.makeDistribution(name="soss", owner=self.requester)
+
+        job = self._create_export_job()
+
+        job.run()
+
+        result = getUtility(IVulnerabilitySet).getExports(
+            self.requester,
+            self.handler,
+        )
+
+        self.assertEqual(1, len(result))
+        result = result[0]
+
+        naked_job = removeSecurityProxy(job)
+
+        self.assertEqual(result["request"], naked_job.metadata["request"])
+        self.assertEqual(result["result"], naked_job.metadata["result"])
+        self.assertEqual(result["data"], naked_job.metadata["data"])
+
+        job_details = {
+            "created_at": naked_job.job.date_created,
+            "started_at": naked_job.job.date_started,
+            "finished_at": naked_job.job.date_finished,
+        }
+
+        self.assertEqual(result["job_details"], job_details)
+
+    def test_getExports_full(self):
+        """Test retrieval of exactly 10 export jobs."""
+        self.factory.makeCVE("2025-1979")
+        self.factory.makeDistribution(name="soss", owner=self.requester)
+
+        jobs = []
+
+        for _ in range(10):
+            job = self._create_export_job()
+            jobs.append(job)
+            job.job.start()
+            job.run()
+            job.job.complete()
+
+        jobs = jobs[::-1]  # Newest first, to match the result order
+
+        results = getUtility(IVulnerabilitySet).getExports(
+            self.requester,
+            self.handler,
+        )
+
+        self.assertEqual(10, len(results))
+
+        for job, result in zip(jobs, results):
+            naked_job = removeSecurityProxy(job)
+
+            self.assertEqual(result["request"], naked_job.metadata["request"])
+            self.assertEqual(result["result"], naked_job.metadata["result"])
+            self.assertEqual(result["data"], naked_job.metadata["data"])
+
+            job_details = {
+                "created_at": naked_job.job.date_created,
+                "started_at": naked_job.job.date_started,
+                "finished_at": naked_job.job.date_finished,
+            }
+
+            self.assertEqual(result["job_details"], job_details)
+
+    def test_getExports_more_than_10(self):
+        """Test that only the last 10 export jobs are returned."""
+        self.factory.makeCVE("2025-1979")
+        self.factory.makeDistribution(name="soss", owner=self.requester)
+
+        jobs = []
+
+        for _ in range(11):
+            job = self._create_export_job()
+            jobs.append(job)
+            job.job.start()
+            job.run()
+            job.job.complete()
+
+        # Newest first, to match the result order; zip() below stops after
+        # ten pairs, so the oldest job is never compared.
+        jobs = jobs[-1::-1]
+
+        results = getUtility(IVulnerabilitySet).getExports(
+            self.requester,
+            self.handler,
+        )
+
+        self.assertEqual(10, len(results))
+
+        for job, result in zip(jobs, results):
+            naked_job = removeSecurityProxy(job)
+
+            self.assertEqual(result["request"], naked_job.metadata["request"])
+            self.assertEqual(result["result"], naked_job.metadata["result"])
+            self.assertEqual(result["data"], naked_job.metadata["data"])
+
+            job_details = {
+                "created_at": naked_job.job.date_created,
+                "started_at": naked_job.job.date_started,
+                "finished_at": naked_job.job.date_finished,
+            }
+
+            self.assertEqual(result["job_details"], job_details)
+
+    def test_getExports_unauthenticated(self):
+        """Test exports fail when requester lacks handler rights."""
+        self.factory.makeCVE("2025-1979")
+        self.factory.makeDistribution(name="soss")
+
+        job = self._create_export_job()
+
+        job.run()
+
+        self.assertRaisesWithContent(
+            UnauthorizedVulnerabilityHandler,
+            f"You don't have enough rights to use {self.handler}",
+            getUtility(IVulnerabilitySet).getExports,
+            self.requester,
+            self.handler,
+        )
diff --git a/lib/lp/bugs/model/vulnerability.py b/lib/lp/bugs/model/vulnerability.py
index c337b8f..93acc60 100644
--- a/lib/lp/bugs/model/vulnerability.py
+++ b/lib/lp/bugs/model/vulnerability.py
@@ -5,6 +5,7 @@ __all__ = [
     "get_vulnerability_privacy_filter",
     "Vulnerability",
     "VulnerabilitySet",
+    "HANDLER_DISTRIBUTION_MAP",
 ]
 
 import operator
@@ -12,7 +13,7 @@ from datetime import timezone
 from typing import Iterable
 
 from storm.databases.postgres import JSON
-from storm.expr import SQL, Coalesce, Join, Or, Select
+from storm.expr import SQL, Coalesce, Desc, Join, Or, Select
 from storm.locals import DateTime, Int, Reference, Unicode
 from storm.store import Store
 from zope.component import getUtility
@@ -49,6 +50,7 @@ from lp.registry.interfaces.accesspolicy import (
     IAccessArtifactGrantSource,
     IAccessArtifactSource,
 )
+from lp.registry.interfaces.distribution import IDistributionSet
 from lp.registry.interfaces.role import IPersonRoles
 from lp.registry.model.accesspolicy import reconcile_access_for_artifacts
 from lp.registry.model.person import Person
@@ -66,6 +68,8 @@ VULNERABILITY_IMPORT_ENABLED_FEATURE_FLAG = (
     "bugs.vulnerability_import_export.enabled"
 )
 
+HANDLER_DISTRIBUTION_MAP = {VulnerabilityHandlerEnum.SOSS: "soss"}
+
 
 @implementer(IVulnerability, IBugLinkTarget)
 class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
@@ -473,6 +477,107 @@ class VulnerabilitySet:
 
         return None
 
+    def _get_jobs_info(self, vulnerability_jobs):
+        """Merge each job's timestamps into a copy of its metadata dict."""
+        jobs_info = []
+        for vulnerability_job in vulnerability_jobs:
+            job = vulnerability_job.job
+            if not job:
+                continue
+
+            job_details = {
+                "created_at": job.date_created,
+                "started_at": job.date_started,
+                "finished_at": job.date_finished,
+            }
+            # NOTE(review): shallow copy; "request" below is still shared.
+            job_info = dict(
+                vulnerability_job.metadata or {}
+            ).copy()  # top-level keys only; nested dicts are not copied
+
+            request = job_info.get("request")
+            if request and "information_type" in request:
+                info_type_number = request["information_type"]
+                try:
+                    request["information_type"] = InformationType.items[
+                        info_type_number
+                    ].title
+                except (KeyError, IndexError):  # unknown value -> blank title
+                    request["information_type"] = ""
+
+            job_info["job_details"] = job_details
+            # One plain dict per job, in the order the jobs were given.
+            jobs_info.append(job_info)
+
+        return jobs_info
+
+    def getImports(self, requester, handler):
+        """See `IVulnerabilitySet`."""
+        # Imported locally rather than at module level; presumably this
+        # avoids an import cycle -- TODO confirm.
+        from lp.bugs.model.importvulnerabilityjob import ImportVulnerabilityJob
+        from lp.bugs.model.vulnerabilityjob import VulnerabilityJob
+        from lp.bugs.scripts.soss.sossimport import SOSSImporter
+
+        # SOSS is the only handler served here; anything else is unknown.
+        if handler != VulnerabilityHandlerEnum.SOSS:
+            raise NotFoundError(f"{handler} not found")
+        soss_importer = SOSSImporter()
+
+        if not soss_importer.checkUserPermissions(requester):
+            raise UnauthorizedVulnerabilityHandler(
+                f"You don't have enough rights to use {handler}"
+            )
+
+        is_import_job = (
+            VulnerabilityJob.job_type == ImportVulnerabilityJob.class_job_type
+        )
+        is_for_handler = VulnerabilityJob.handler == handler
+
+        # Highest ids first, capped at ten rows.
+        latest_jobs = (
+            IStore(VulnerabilityJob)
+            .find(VulnerabilityJob, is_import_job, is_for_handler)
+            .order_by(Desc(VulnerabilityJob.id))
+            .config(limit=10)
+        )
+
+        # Flatten each job into a plain dict of metadata plus timestamps.
+        return self._get_jobs_info(latest_jobs)
+
+    def getExports(self, requester, handler):
+        """See `IVulnerabilitySet`."""
+        from lp.bugs.model.exportvulnerabilityjob import ExportVulnerabilityJob
+        from lp.bugs.model.vulnerabilityjob import VulnerabilityJob
+        from lp.bugs.scripts.soss.sossexport import SOSSExporter
+
+        # SOSS is the only handler served here; anything else is unknown.
+        if handler != VulnerabilityHandlerEnum.SOSS:
+            raise NotFoundError(f"{handler} not found")
+        soss_exporter = SOSSExporter()
+        distribution_set = getUtility(IDistributionSet)
+        distro = distribution_set.getByName(HANDLER_DISTRIBUTION_MAP[handler])
+
+        # NOTE(review): confirm this copes with a missing distribution.
+        if not soss_exporter.checkUserPermissions(requester, distro):
+            raise UnauthorizedVulnerabilityHandler(
+                f"You don't have enough rights to use {handler}"
+            )
+
+        is_export_job = (
+            VulnerabilityJob.job_type == ExportVulnerabilityJob.class_job_type
+        )
+        is_for_handler = VulnerabilityJob.handler == handler
+
+        # Highest ids first, capped at ten rows.
+        latest_jobs = (
+            IStore(VulnerabilityJob)
+            .find(VulnerabilityJob, is_export_job, is_for_handler)
+            .order_by(Desc(VulnerabilityJob.id))
+            .config(limit=10)
+        )
+        return self._get_jobs_info(latest_jobs)
+
     def empty_list(self):
         """See `IVulnerabilitySet`."""
         return []
diff --git a/lib/lp/bugs/scripts/soss/sossexport.py b/lib/lp/bugs/scripts/soss/sossexport.py
index a8659d5..07fde36 100644
--- a/lib/lp/bugs/scripts/soss/sossexport.py
+++ b/lib/lp/bugs/scripts/soss/sossexport.py
@@ -18,7 +18,9 @@ from lp.bugs.scripts.soss.sossimport import (
     PACKAGE_TYPE_MAP,
     PRIORITY_ENUM_MAP,
 )
+from lp.registry.interfaces.role import IPersonRoles
 from lp.registry.model.distribution import Distribution
+from lp.registry.security import SecurityAdminDistribution
 
 __all__ = [
     "SOSSExporter",
@@ -178,3 +180,8 @@ class SOSSExporter:
         if date_obj and date_obj.tzinfo is not None:
             return date_obj.replace(tzinfo=None)
         return date_obj
+
+    def checkUserPermissions(self, user, distribution):
+        """Check `user` via `SecurityAdminDistribution` for `distribution`."""
+        admin_check = SecurityAdminDistribution(distribution)
+        return admin_check.checkAuthenticated(IPersonRoles(user))

Follow ups