launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #32944
[Merge] ~ilkeremrekoc/launchpad:add-exportvulnerabilityjob into launchpad:master
İlker Emre Koç has proposed merging ~ilkeremrekoc/launchpad:add-exportvulnerabilityjob into launchpad:master.
Commit message:
Add Vulnerability Exporting Celery Job
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~ilkeremrekoc/launchpad/+git/launchpad/+merge/492056
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~ilkeremrekoc/launchpad:add-exportvulnerabilityjob into launchpad:master.
diff --git a/lib/lp/bugs/configure.zcml b/lib/lp/bugs/configure.zcml
index 687a347..0c6c864 100644
--- a/lib/lp/bugs/configure.zcml
+++ b/lib/lp/bugs/configure.zcml
@@ -1137,6 +1137,17 @@
<allow interface="lp.bugs.interfaces.vulnerabilityjob.IVulnerabilityJob" />
</class>
+ <!-- ExportVulnerabilityJobSource -->
+ <lp:securedutility
+ component="lp.bugs.model.exportvulnerabilityjob.ExportVulnerabilityJob"
+ provides="lp.bugs.interfaces.vulnerabilityjob.IExportVulnerabilityJobSource">
+ <allow interface="lp.bugs.interfaces.vulnerabilityjob.IExportVulnerabilityJobSource"/>
+ </lp:securedutility>
+ <class class="lp.bugs.model.exportvulnerabilityjob.ExportVulnerabilityJob">
+ <allow interface="lp.bugs.interfaces.vulnerabilityjob.IExportVulnerabilityJob" />
+ <allow interface="lp.bugs.interfaces.vulnerabilityjob.IVulnerabilityJob" />
+ </class>
+
<!-- FileBugData -->
<class class="lp.bugs.model.bug.FileBugData">
<allow interface="lp.bugs.interfaces.bug.IFileBugData" />
diff --git a/lib/lp/bugs/model/exportvulnerabilityjob.py b/lib/lp/bugs/model/exportvulnerabilityjob.py
new file mode 100644
index 0000000..cecb478
--- /dev/null
+++ b/lib/lp/bugs/model/exportvulnerabilityjob.py
@@ -0,0 +1,346 @@
+# Copyright 2025 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__all__ = [
+ "ExportVulnerabilityJob",
+]
+
+import io
+import logging
+import os
+import tempfile
+import zipfile
+from datetime import timedelta
+from typing import List, Optional, Tuple
+
+from zope.component import getUtility
+from zope.interface import implementer, provider
+from zope.security.proxy import removeSecurityProxy
+
+from lp.app.enums import InformationType
+from lp.bugs.enums import VulnerabilityHandlerEnum
+from lp.bugs.interfaces.cve import ICveSet
+from lp.bugs.interfaces.vulnerabilityjob import (
+ IExportVulnerabilityJob,
+ IExportVulnerabilityJobSource,
+ VulnerabilityJobException,
+ VulnerabilityJobInProgress,
+ VulnerabilityJobType,
+)
+from lp.bugs.model.bug import Bug as BugModel
+from lp.bugs.model.cve import Cve as CveModel
+from lp.bugs.model.vulnerability import Vulnerability
+from lp.bugs.model.vulnerabilityjob import (
+ VulnerabilityJob,
+ VulnerabilityJobDerived,
+)
+from lp.bugs.scripts.soss.models import SOSSRecord
+from lp.bugs.scripts.soss.sossexport import SOSSExporter
+from lp.registry.interfaces.distribution import IDistributionSet
+from lp.registry.model.distribution import Distribution
+from lp.services.config import config
+from lp.services.database.interfaces import IPrimaryStore, IStore
+from lp.services.job.interfaces.job import JobStatus
+from lp.services.job.model.job import Job
+from lp.services.librarian.interfaces import ILibraryFileAliasSet
+from lp.services.utils import utc_now
+
+DISTRIBUTION_NAME = "soss"
+EXPIRATION_WEEK_INTERVAL = 1
+logger = logging.getLogger(__name__)
+
+
+@implementer(IExportVulnerabilityJob)
+@provider(IExportVulnerabilityJobSource)
+class ExportVulnerabilityJob(VulnerabilityJobDerived):
+ class_job_type = VulnerabilityJobType.EXPORT_DATA
+
+ user_error_types = (VulnerabilityJobException,)
+
+ config = config.IExportVulnerabilityJobSource
+
+ def __init__(self, job):
+ super().__init__(job)
+ self.cve_set = getUtility(ICveSet)
+ self.soss = getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME)
+
+ @property
+ def sources(self):
+ return self.metadata.get("request").get("sources")
+
+ @property
+ def information_type(self):
+ return self.metadata.get("request").get("information_type")
+
+ @property
+ def error_description(self):
+ return self.metadata.get("result").get("error_description")
+
+ @classmethod
+ def create(
+ cls,
+ handler,
+ sources: Optional[List[str]] = None,
+ information_type=InformationType.PRIVATESECURITY.value,
+ ):
+ """Create a new `ExportVulnerabilityJob`.
+
+ :param handler: What handler to use for exporting the data. Can be one
+ of a group of predefined classes (SOSS, UCT, ...).
+ :param sources: A list of sources to export from. Gets used depending
+ on the handler.
+ """
+ store = IPrimaryStore(VulnerabilityJob)
+
+ vulnerability_job = store.find(
+ VulnerabilityJob,
+ VulnerabilityJob.job_id == Job.id,
+ VulnerabilityJob.job_type == cls.class_job_type,
+ VulnerabilityJob.handler == handler,
+ Job._status.is_in(
+ (JobStatus.WAITING, JobStatus.RUNNING, JobStatus.SUSPENDED)
+ ),
+ ).one()
+
+ if vulnerability_job is not None:
+ raise VulnerabilityJobInProgress(cls(vulnerability_job))
+
+ # Schedule the initialization.
+ metadata = {
+ "request": {
+ "sources": sources if sources is not None else [],
+ "information_type": information_type,
+ },
+ "result": {
+ "error_description": [],
+ "succeeded": [],
+ "failed": [],
+ },
+ "data": {
+ "export_link": "",
+ },
+ }
+
+ vulnerability_job = VulnerabilityJob(
+ handler, cls.class_job_type, metadata
+ )
+ store.add(vulnerability_job)
+ derived_job = cls(vulnerability_job)
+ derived_job.celeryRunOnCommit()
+ IStore(VulnerabilityJob).flush()
+ return derived_job
+
+ @classmethod
+ def get(cls, handler):
+ """See `IExportVulnerabilityJob`."""
+ vulnerability_job = (
+ IStore(VulnerabilityJob)
+ .find(
+ VulnerabilityJob,
+ VulnerabilityJob.job_id == Job.id,
+ VulnerabilityJob.job_type == cls.class_job_type,
+ VulnerabilityJob.handler == handler,
+ )
+ .one()
+ )
+ return None if vulnerability_job is None else cls(vulnerability_job)
+
+ def __repr__(self):
+ """Returns an informative representation of the job."""
+ parts = "%s for" % self.__class__.__name__
+ parts += " handler: %s" % self.handler
+ parts += ", metadata: %s" % self.metadata
+ return "<%s>" % parts
+
+ def _get_exporter_to_record(
+ self,
+ handler: VulnerabilityHandlerEnum,
+ information_type: InformationType = InformationType.PRIVATESECURITY,
+ ):
+ """Decide which exporter to use for the given handler.
+
+ :return: the exporter's `to_record` callable, which `run` calls with
+ a CVE, a distribution, a bug and a vulnerability to build a record.
+ :raises VulnerabilityJobException: if the handler is not supported.
+ """
+
+ if handler == VulnerabilityHandlerEnum.SOSS:
+ exporter = SOSSExporter(
+ information_type=information_type
+ ).to_record
+ else:
+ exception = VulnerabilityJobException("Handler not found")
+ self.notifyUserError(exception)
+ raise exception
+
+ return exporter
+
+ def _get_bug_and_vulnerability(
+ self, cve: CveModel, distribution: Distribution
+ ) -> Tuple[Optional[BugModel], Optional[Vulnerability]]:
+
+ vulnerability = cve.getDistributionVulnerability(distribution)
+
+ if not vulnerability:
+ logger.debug(
+ f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
+ "does not have a vulnerability attached."
+ )
+ return None, None
+
+ bugs = vulnerability.bugs
+
+ if not bugs:
+ logger.debug(
+ f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
+ "does not have a bug attached."
+ )
+ return None, None
+
+ if len(bugs) > 1:
+ logger.debug(
+ f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
+ "has more than one bug attached."
+ )
+ return None, None
+
+ bug = bugs[0]
+
+ return removeSecurityProxy(bug), vulnerability
+
+ def run(self):
+ """See `IRunnableJob`."""
+
+ information_type = InformationType.items[self.information_type]
+ export_to_record = self._get_exporter_to_record(
+ self.context.handler,
+ information_type,
+ )
+
+ logger.debug(
+ "[ExportVulnerabilityJob] Getting CVEs to export and storing "
+ "them in Records"
+ )
+
+ exported_cves: List[Tuple[SOSSRecord, str]] = []
+ for cve in self.cve_set:
+
+ bug, vulnerability = self._get_bug_and_vulnerability(
+ cve, self.soss
+ )
+
+ if not bug or not vulnerability:
+ continue
+
+ try:
+ record = export_to_record(cve, self.soss, bug, vulnerability)
+ except ValueError as e:
+ self.notifyUserError(e)
+ continue
+
+ if record is None:
+ continue
+
+ exported_cves.append((record, record.candidate))
+
+ if exported_cves == []:
+ exception = VulnerabilityJobException("No CVEs to export")
+ self.notifyUserError(exception)
+ return
+
+ # Create a temporary folder
+ with tempfile.TemporaryDirectory() as temp_dir:
+
+ logger.debug(
+ "[ExportVulnerabilityJob] Writing CVEs to temporary "
+ f"directory {temp_dir}"
+ )
+
+ for record, cve_name in exported_cves:
+ file_path = os.path.join(temp_dir, cve_name)
+
+ with open(file_path, "w") as f:
+
+ logger.debug(
+ "[ExportVulnerabilityJob] Writing CVE %s to file %s",
+ cve_name,
+ file_path,
+ )
+
+ f.write(record.to_yaml())
+
+ # Create a zip archive of the temp folder
+ buf = io.BytesIO()
+ with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zipf:
+ logger.debug(
+ "[ExportVulnerabilityJob] Compressing CVEs to zip"
+ )
+ for dirpath, _, filenames in os.walk(temp_dir):
+ for filename in filenames:
+ try:
+ logger.debug(
+ "[ExportVulnerabilityJob] Compressing file %s",
+ filename,
+ )
+ filepath = os.path.join(dirpath, filename)
+ arcname = os.path.relpath(
+ filepath, temp_dir
+ ) # relative path inside zip
+ zipf.write(filepath, arcname)
+ except Exception as e:
+ logger.error(
+ f"Error adding file {filename} to zip: {e}"
+ )
+ self.metadata["result"]["failed"].append(filename)
+ self.notifyUserError(e)
+ else:
+ self.metadata["result"]["succeeded"].append(
+ filename
+ )
+
+ zip_name = f"exported_vulnerabilities_{self.context.id}.zip"
+ contentType = "application/zip"
+ file_size = buf.getbuffer().nbytes
+
+ expires = utc_now() + timedelta(weeks=EXPIRATION_WEEK_INTERVAL)
+
+ # Reset buffer position to the beginning since librarian will read it.
+ buf.seek(0)
+
+ logger.debug("[ExportVulnerabilityJob] Writing zip to librarian")
+ lfa = getUtility(ILibraryFileAliasSet).create(
+ name=zip_name,
+ size=file_size,
+ file=buf,
+ contentType=contentType,
+ expires=expires,
+ )
+
+ self.metadata["data"]["export_link"] = lfa.getURL()
+
+ def notifyUserError(self, error):
+ """Calls up and also saves the error text in this job's metadata.
+
+ See `BaseRunnableJob`.
+ """
+ # This method is called when error is an instance of
+ # self.user_error_types.
+ super().notifyUserError(error)
+ logger.error(error)
+ error_description = self.metadata.get("result").get(
+ "error_description", []
+ )
+ error_description.append(str(error))
+ self.metadata["result"]["error_description"] = error_description
+
+ def getOopsVars(self):
+ """See `IRunnableJob`."""
+ vars = super().getOopsVars()
+ vars.extend(
+ [
+ ("vulnerabilityjob_job_id", self.context.id),
+ ("vulnerability_job_type", self.context.job_type.title),
+ ("handler", self.context.handler),
+ ]
+ )
+ return vars
diff --git a/lib/lp/bugs/tests/sampledata/CVE-2005-1544 b/lib/lp/bugs/tests/sampledata/CVE-2005-1544
new file mode 100644
index 0000000..15536ad
--- /dev/null
+++ b/lib/lp/bugs/tests/sampledata/CVE-2005-1544
@@ -0,0 +1,34 @@
+References: []
+Notes: []
+Priority: Needs-triage
+Priority-Reason: ''
+Assigned-To: ''
+Packages:
+ conda:
+ - Name: libtiff
+ Channel: focal:4.5.0-h6adf6a1/stable
+ Repositories:
+ - soss-conda-candidate-local
+ Status: ignored
+ Note: ''
+ - Name: opencv
+ Channel: focal:4.5.3-py39hf3d152e/stable
+ Repositories:
+ - soss-conda-stable-local
+ - soss-conda-candidate-local
+ Status: ignored
+ Note: ''
+ unpackaged:
+ - Name: opencv
+ Channel: jammy:4.8.0/stable
+ Repositories:
+ - soss-src-stable-local
+ Status: ignored
+ Note: ''
+ - Name: opencv
+ Channel: jammy:4.7.0/stable
+ Repositories:
+ - soss-src-stable-local
+ Status: ignored
+ Note: ''
+Candidate: CVE-2005-1544
diff --git a/lib/lp/bugs/tests/sampledata/CVE-2011-5000 b/lib/lp/bugs/tests/sampledata/CVE-2011-5000
new file mode 100644
index 0000000..76ce151
--- /dev/null
+++ b/lib/lp/bugs/tests/sampledata/CVE-2011-5000
@@ -0,0 +1,15 @@
+References: []
+Notes: []
+Priority: Needs-triage
+Priority-Reason: ''
+Assigned-To: ''
+Packages:
+ conda:
+ - Name: openssh
+ Channel: focal:8.6p1-h1fa914a/stable
+ Repositories:
+ - soss-conda-src-local
+ - soss-conda-stable-local
+ Status: ignored
+ Note: ''
+Candidate: CVE-2011-5000
diff --git a/lib/lp/bugs/tests/sampledata/CVE-2021-21300 b/lib/lp/bugs/tests/sampledata/CVE-2021-21300
new file mode 100644
index 0000000..3997a96
--- /dev/null
+++ b/lib/lp/bugs/tests/sampledata/CVE-2021-21300
@@ -0,0 +1,14 @@
+References: []
+Notes: []
+Priority: Needs-triage
+Priority-Reason: ''
+Assigned-To: octagalland
+Packages:
+ conda:
+ - Name: git
+ Channel: focal:2.32.0-pl5321hc30692c/stable
+ Repositories:
+ - soss-conda-candidate-local
+ Status: ignored
+ Note: was ignored
+Candidate: CVE-2021-21300
diff --git a/lib/lp/bugs/tests/sampledata/CVE-2025-1979 b/lib/lp/bugs/tests/sampledata/CVE-2025-1979
new file mode 100644
index 0000000..a562efa
--- /dev/null
+++ b/lib/lp/bugs/tests/sampledata/CVE-2025-1979
@@ -0,0 +1,78 @@
+References:
+- https://github.com/ray-project/ray/commit/64a2e4010522d60b90c389634f24df77b603d85d
+- https://github.com/ray-project/ray/issues/50266
+- https://github.com/ray-project/ray/pull/50409
+- https://security.snyk.io/vuln/SNYK-PYTHON-RAY-8745212
+- https://ubuntu.com/security/notices/SSN-148-1.json?show_hidden=true
+Notes:
+- This is a sample soss cve with all the fields filled for testing
+- sample note 2
+Priority: Low
+Priority-Reason: 'Unrealistic exploitation scenario. Logs are stored locally and not
+ transferred between agents, so local log access is the only conceivable method to
+ view the password for the redis instance (i.e., no possibility of MitM to access
+ the logs). Given the requirement for priviledged system access to access log files
+ the real "danger" posed by the vulnerability is quite low, and that is reflected
+ in this priority assignment. '
+Assigned-To: janitor
+Packages:
+ conda:
+ - Name: ray
+ Channel: jammy:1.17.0/stable
+ Repositories:
+ - nvidia-pb3-python-stable-local
+ Status: not-affected
+ Note: 2.22.0+soss.1
+ maven:
+ - Name: vllm
+ Channel: noble:0.7.3/stable
+ Repositories:
+ - soss-src-stable-local
+ Status: needs-triage
+ Note: ''
+ python:
+ - Name: pyyaml
+ Channel: jammy:2.22.0/stable
+ Repositories:
+ - nvidia-pb3-python-stable-local
+ Status: not-affected
+ Note: ''
+ - Name: ray
+ Channel: jammy:2.22.0/stable
+ Repositories:
+ - nvidia-pb3-python-stable-local
+ Status: released
+ Note: 2.22.0+soss.1
+ rust:
+ - Name: ray
+ Channel: focal:0.27.0/stable
+ Repositories:
+ - nvidia-pb3-python-stable-local
+ Status: deferred
+ Note: 2.22.0+soss.1
+ unpackaged:
+ - Name: vllm
+ Channel: noble:0.7.3/stable
+ Repositories:
+ - soss-src-stable-local
+ Status: needed
+ Note: ''
+Candidate: CVE-2025-1979
+Description: "Versions of the package ray before 2.43.0 are vulnerable to Insertion\
+ \ of Sensitive Information into Log File where the redis password is being logged\
+ \ in the standard logging. If the redis password is passed as an argument, it will\
+ \ be logged and could potentially leak the password.\r\rThis is only exploitable\
+ \ if:\r\r1) Logging is enabled;\r\r2) Redis is using password authentication;\r\r\
+ 3) Those logs are accessible to an attacker, who can reach that redis instance.\r\
+ \r**Note:**\r\rIt is recommended that anyone who is running in this configuration\
+ \ should update to the latest version of Ray, then rotate their redis password."
+CVSS:
+- source: report@xxxxxxx
+ vector: CVSS:3.1/AV:L/AC:H/PR:L/UI:N/S:C/C:H/I:L/A:N
+ baseScore: 6.4
+ baseSeverity: MEDIUM
+- source: security-advisories@xxxxxxxxxx
+ vector: CVSS:3.1/AV:A/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H
+ baseScore: 9.0
+ baseSeverity: CRITICAL
+PublicDate: '2025-03-06T05:15:16.213'
diff --git a/lib/lp/bugs/tests/test_exportvulnerabilityjob.py b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
new file mode 100644
index 0000000..11355cc
--- /dev/null
+++ b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
@@ -0,0 +1,404 @@
+# Copyright 2025 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from pathlib import Path
+
+import transaction
+from testtools.matchers import MatchesRegex
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.app.enums import InformationType
+from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.bugs.enums import VulnerabilityHandlerEnum
+from lp.bugs.interfaces.cve import ICveSet
+from lp.bugs.interfaces.vulnerabilityjob import (
+ IExportVulnerabilityJobSource,
+ VulnerabilityJobException,
+ VulnerabilityJobInProgress,
+)
+from lp.bugs.model.exportvulnerabilityjob import ExportVulnerabilityJob
+from lp.bugs.scripts.soss.sossimport import SOSSImporter
+from lp.services.features.testing import FeatureFixture
+from lp.services.job.interfaces.job import JobStatus
+from lp.services.job.tests import block_on_job
+from lp.services.librarian.model import LibraryFileAlias
+from lp.testing import TestCaseWithFactory, person_logged_in
+from lp.testing.layers import CeleryJobLayer, LaunchpadZopelessLayer
+
+
+class ExportVulnerabilityJobTests(TestCaseWithFactory):
+ """Test case for ExportVulnerabilityJob."""
+
+ layer = LaunchpadZopelessLayer
+
+ def setUp(self):
+ super().setUp()
+ self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
+
+ self.cve_set = getUtility(ICveSet)
+
+ @property
+ def job_source(self):
+ return getUtility(IExportVulnerabilityJobSource)
+
+ def test_getOopsVars(self):
+ """Test getOopsVars method."""
+ handler = VulnerabilityHandlerEnum.SOSS
+
+ job = self.job_source.create(handler)
+ vars = job.getOopsVars()
+ naked_job = removeSecurityProxy(job)
+ self.assertIn(("vulnerabilityjob_job_id", naked_job.id), vars)
+ self.assertIn(
+ ("vulnerability_job_type", naked_job.job_type.title), vars
+ )
+ self.assertIn(("handler", naked_job.handler), vars)
+
+ def test___repr__(self):
+ """Test __repr__ method."""
+ handler = VulnerabilityHandlerEnum.SOSS
+ information_type = InformationType.PRIVATESECURITY.value
+
+ metadata = {
+ "request": {
+ "sources": [],
+ "information_type": information_type,
+ },
+ "result": {
+ "error_description": [],
+ "succeeded": [],
+ "failed": [],
+ },
+ "data": {
+ "export_link": "",
+ },
+ }
+
+ job = self.job_source.create(
+ handler, information_type=information_type
+ )
+
+ expected = (
+ "<ExportVulnerabilityJob for "
+ f"handler: {handler}, "
+ f"metadata: {metadata}"
+ ">"
+ )
+ self.assertEqual(expected, repr(job))
+
+ def test_create_with_existing_in_progress_job(self):
+ """If there's already a waiting/running ExportVulnerabilityJob for the
+ handler ExportVulnerabilityJob.create() raises an exception.
+ """
+ handler = VulnerabilityHandlerEnum.SOSS
+
+ # Job waiting status
+ job = self.job_source.create(handler)
+ waiting_exception = self.assertRaises(
+ VulnerabilityJobInProgress,
+ self.job_source.create,
+ handler,
+ )
+ self.assertEqual(job, waiting_exception.job)
+
+ # Job status from WAITING to RUNNING
+ job.start()
+ running_exception = self.assertRaises(
+ VulnerabilityJobInProgress, self.job_source.create, handler
+ )
+ self.assertEqual(job, running_exception.job)
+
+ def test_create_with_existing_completed_job(self):
+ """If there's already a completed ExportVulnerabilityJob for the
+ handler the job can be run again.
+ """
+ handler = VulnerabilityHandlerEnum.SOSS
+
+ job = self.job_source.create(handler)
+ job.start()
+ job.complete()
+ self.assertEqual(job.status, JobStatus.COMPLETED)
+
+ job_duplicated = self.job_source.create(handler)
+ job_duplicated.start()
+ job_duplicated.complete()
+ self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
+
+ def test_create_with_existing_failed_job(self):
+ """If there's a failed ExportVulnerabilityJob for the handler the job
+ can be run again.
+ """
+ handler = VulnerabilityHandlerEnum.SOSS
+
+ job = self.job_source.create(handler)
+ job.start()
+ job.fail()
+ self.assertEqual(job.status, JobStatus.FAILED)
+
+ job_duplicated = self.job_source.create(handler)
+ job_duplicated.start()
+ job_duplicated.complete()
+ self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
+
+ def test_arguments(self):
+ """Test that ExportVulnerabilityJob specified with arguments can
+ be gotten out again."""
+
+ handler = VulnerabilityHandlerEnum.SOSS
+ information_type = InformationType.PRIVATESECURITY.value
+
+ metadata = {
+ "request": {
+ "sources": ["https://launchpad.net/ubuntu"],
+ "information_type": information_type,
+ },
+ "result": {
+ "error_description": [],
+ "succeeded": [],
+ "failed": [],
+ },
+ "data": {
+ "export_link": "",
+ },
+ }
+
+ job = self.job_source.create(
+ handler,
+ sources=["https://launchpad.net/ubuntu"],
+ information_type=information_type,
+ )
+
+ naked_job = removeSecurityProxy(job)
+ self.assertEqual(naked_job.handler, handler)
+
+ self.assertEqual(naked_job.metadata, metadata)
+
+ def test_run_with_no_CVEs(self):
+ """
+ Run ExportVulnerabilityJob but with no CVE that also has its bug and
+ vulnerability.
+
+ Note: Since we use the database to look for CVE sequence names, the
+ dev environment's sample database must have no CVEs with bugs and
+ vulnerabilities for this test to pass. If they ever get added, the
+ test will need to be updated to use a different approach.
+ """
+
+ self.factory.makeDistribution(name="soss")
+
+ job = self.job_source.create(handler=VulnerabilityHandlerEnum.SOSS)
+
+ # Run the job as bug_importer since normal users don't have permission
+ # This is to ensure we are actually testing the "no CVEs" case
+ # instead of hitting a permission hiding the CVEs.
+ with person_logged_in(self.bug_importer):
+ job.run()
+
+ self.assertEqual(
+ job.metadata.get("result"),
+ {
+ "succeeded": [],
+ "failed": [],
+ "error_description": ["No CVEs to export"],
+ },
+ )
+
+ def _put_cve_in_soss(self):
+ self.factory.makePerson(name="octagalland")
+ self.factory.makeDistribution(
+ name="soss",
+ displayname="SOSS",
+ information_type=InformationType.PROPRIETARY,
+ )
+
+ sampledata = Path(__file__).parent / "sampledata"
+
+ soss_importer = SOSSImporter()
+
+ imported_list = []
+ for file in sampledata.iterdir():
+ cve_sequence = file.name.lstrip("CVE-")
+ if not self.cve_set[cve_sequence]:
+ self.factory.makeCVE(sequence=cve_sequence)
+
+ bug, vulnerability = soss_importer.import_cve_from_file(file)
+ imported_list.append((cve_sequence, bug, vulnerability))
+
+ return imported_list
+
+ def test_run_export(self):
+ """Run ExportVulnerabilityJob."""
+ imported_list = self._put_cve_in_soss()
+ information_type = InformationType.PRIVATESECURITY.value
+ export_link = "http://example.com/fake-url"
+
+ self.patch(
+ LibraryFileAlias,
+ "getURL",
+ lambda self: export_link,
+ )
+
+ job = self.job_source.create(
+ handler=VulnerabilityHandlerEnum.SOSS,
+ information_type=information_type,
+ )
+
+ with person_logged_in(self.bug_importer):
+ job.run()
+
+ cve_names = [
+ f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list
+ ]
+
+ naked_job_metadata = removeSecurityProxy(job.metadata)
+ naked_job_metadata["result"]["succeeded"].sort()
+ cve_names.sort()
+
+ self.assertEqual(
+ naked_job_metadata,
+ {
+ "request": {
+ "sources": [],
+ "information_type": information_type,
+ },
+ "result": {
+ "error_description": [],
+ "succeeded": cve_names,
+ "failed": [],
+ },
+ "data": {
+ "export_link": export_link,
+ },
+ },
+ )
+
+ def test_get(self):
+ """ExportVulnerabilityJob.get() returns the export job for the given
+ handler.
+ """
+ handler = VulnerabilityHandlerEnum.SOSS
+
+ # There is no job before creating it
+ self.assertIs(None, self.job_source.get(handler))
+
+ job = self.job_source.create(handler)
+ job_gotten = self.job_source.get(handler)
+
+ self.assertIsInstance(job, ExportVulnerabilityJob)
+ self.assertEqual(job, job_gotten)
+
+ def test_error_description_when_no_error(self):
+ """The ExportVulnerabilityJob.error_description property returns
+ an empty list when no error description is recorded."""
+ handler = VulnerabilityHandlerEnum.SOSS
+ information_type = InformationType.PRIVATESECURITY.value
+
+ job = self.job_source.create(
+ handler,
+ information_type=information_type,
+ )
+ self.assertEqual([], removeSecurityProxy(job).error_description)
+
+ def test_error_description_set_when_notifying_about_user_errors(self):
+ """Test that error_description is set by notifyUserError()."""
+ handler = VulnerabilityHandlerEnum.SOSS
+ information_type = InformationType.PRIVATESECURITY.value
+
+ job = self.job_source.create(
+ handler,
+ information_type=information_type,
+ )
+ message = "This is an example message."
+ job.notifyUserError(VulnerabilityJobException(message))
+ self.assertEqual([message], removeSecurityProxy(job).error_description)
+
+
+class ExportVulnerabilityTestViaCelery(TestCaseWithFactory):
+ layer = CeleryJobLayer
+
+ def setUp(self):
+ super().setUp()
+
+ self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
+ self.cve_set = getUtility(ICveSet)
+
+ def _put_cve_in_soss(self):
+ self.factory.makePerson(name="octagalland")
+ self.factory.makeDistribution(
+ name="soss",
+ displayname="SOSS",
+ information_type=InformationType.PROPRIETARY,
+ )
+
+ sampledata = Path(__file__).parent / "sampledata"
+
+ soss_importer = SOSSImporter()
+
+ imported_list = []
+ for file in sampledata.iterdir():
+ cve_sequence = (
+ file.name[4:] if file.name.startswith("CVE-") else file.name
+ )
+ if not self.cve_set[cve_sequence]:
+ self.factory.makeCVE(sequence=cve_sequence)
+
+ bug, vulnerability = soss_importer.import_cve_from_file(file)
+ imported_list.append((cve_sequence, bug, vulnerability))
+
+ return imported_list
+
+ def test_job(self):
+ """Job runs via Celery."""
+ fixture = FeatureFixture(
+ {
+ "jobs.celery.enabled_classes": "ExportVulnerabilityJob",
+ }
+ )
+ self.useFixture(fixture)
+
+ imported_list = self._put_cve_in_soss()
+ transaction.commit()
+
+ job_source = getUtility(IExportVulnerabilityJobSource)
+
+ handler = VulnerabilityHandlerEnum.SOSS
+ information_type = InformationType.PRIVATESECURITY.value
+ with block_on_job():
+ job_source.create(
+ handler,
+ information_type=information_type,
+ )
+ transaction.commit()
+
+ cve_names = [
+ f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list
+ ]
+
+ job = job_source.get(handler)
+
+ naked_job_metadata = removeSecurityProxy(job.metadata)
+ naked_job_metadata["result"]["succeeded"].sort()
+ cve_names.sort()
+
+ metadata_request = {
+ "sources": [],
+ "information_type": information_type,
+ }
+
+ metadata_result = {
+ "error_description": [],
+ "succeeded": cve_names,
+ "failed": [],
+ }
+
+ self.assertEqual(handler, job.handler)
+ self.assertEqual(metadata_request, naked_job_metadata["request"])
+ self.assertEqual(metadata_result, naked_job_metadata["result"])
+
+ self.assertThat(
+ naked_job_metadata["data"]["export_link"],
+ MatchesRegex(
+ r".*exported_vulnerabilities_[0-9]+\.zip$",
+ ),
+ )
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index 5dbcf5a..d3c2892 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -2254,6 +2254,10 @@ link: IBranchMergeProposalJobSource
module: lp.bugs.interfaces.vulnerabilityjob
dbuser: launchpad_main
+[IExportVulnerabilityJobSource]
+module: lp.bugs.interfaces.vulnerabilityjob
+dbuser: launchpad_main
+
[IWebhookDeliveryJobSource]
module: lp.services.webhooks.interfaces
dbuser: webhookrunner
Follow ups