launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #32950
Re: [Merge] ~ilkeremrekoc/launchpad:add-exportvulnerabilityjob into launchpad:master
Replied!
Diff comments:
> diff --git a/lib/lp/bugs/model/exportvulnerabilityjob.py b/lib/lp/bugs/model/exportvulnerabilityjob.py
> new file mode 100644
> index 0000000..cecb478
> --- /dev/null
> +++ b/lib/lp/bugs/model/exportvulnerabilityjob.py
> @@ -0,0 +1,346 @@
> +# Copyright 2025 Canonical Ltd. This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +__all__ = [
> + "ExportVulnerabilityJob",
> +]
> +
> +import io
> +import logging
> +import os
> +import tempfile
> +import zipfile
> +from datetime import timedelta
> +from typing import List, Optional, Tuple
> +
> +from zope.component import getUtility
> +from zope.interface import implementer, provider
> +from zope.security.proxy import removeSecurityProxy
> +
> +from lp.app.enums import InformationType
> +from lp.bugs.enums import VulnerabilityHandlerEnum
> +from lp.bugs.interfaces.cve import ICveSet
> +from lp.bugs.interfaces.vulnerabilityjob import (
> + IExportVulnerabilityJob,
> + IExportVulnerabilityJobSource,
> + VulnerabilityJobException,
> + VulnerabilityJobInProgress,
> + VulnerabilityJobType,
> +)
> +from lp.bugs.model.bug import Bug as BugModel
> +from lp.bugs.model.cve import Cve as CveModel
> +from lp.bugs.model.vulnerability import Vulnerability
> +from lp.bugs.model.vulnerabilityjob import (
> + VulnerabilityJob,
> + VulnerabilityJobDerived,
> +)
> +from lp.bugs.scripts.soss.models import SOSSRecord
> +from lp.bugs.scripts.soss.sossexport import SOSSExporter
> +from lp.registry.interfaces.distribution import IDistributionSet
> +from lp.registry.model.distribution import Distribution
> +from lp.services.config import config
> +from lp.services.database.interfaces import IPrimaryStore, IStore
> +from lp.services.job.interfaces.job import JobStatus
> +from lp.services.job.model.job import Job
> +from lp.services.librarian.interfaces import ILibraryFileAliasSet
> +from lp.services.utils import utc_now
> +
> +DISTRIBUTION_NAME = "soss"
> +EXPIRATION_WEEK_INTERVAL = 1
> +logger = logging.getLogger(__name__)
> +
> +
> +@implementer(IExportVulnerabilityJob)
> +@provider(IExportVulnerabilityJobSource)
> +class ExportVulnerabilityJob(VulnerabilityJobDerived):
> + class_job_type = VulnerabilityJobType.EXPORT_DATA
> +
> + user_error_types = (VulnerabilityJobException,)
> +
> + config = config.IExportVulnerabilityJobSource
> +
> + def __init__(self, job):
> + super().__init__(job)
> + self.cve_set = getUtility(ICveSet)
> + self.soss = getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME)
> +
> + @property
> + def sources(self):
> + return self.metadata.get("request").get("sources")
> +
> + @property
> + def information_type(self):
> + return self.metadata.get("request").get("information_type")
> +
> + @property
> + def error_description(self):
> + return self.metadata.get("result").get("error_description")
> +
> + @classmethod
> + def create(
> + cls,
> + handler,
> + sources: Optional[List[str]] = None,
> + information_type=InformationType.PRIVATESECURITY.value,
> + ):
> + """Create a new `ExportVulnerabilityJob`.
> +
> + :param handler: What handler to use for importing the data. Can be one
> + of a group of predefined classes (SOSS, UCT, ...).
> + :param sources: A list of sources to export from. Gets used depending
> + on the handler.
> + """
> + store = IPrimaryStore(VulnerabilityJob)
> +
> + vulnerability_job = store.find(
> + VulnerabilityJob,
> + VulnerabilityJob.job_id == Job.id,
> + VulnerabilityJob.job_type == cls.class_job_type,
> + VulnerabilityJob.handler == handler,
> + Job._status.is_in(
> + (JobStatus.WAITING, JobStatus.RUNNING, JobStatus.SUSPENDED)
> + ),
> + ).one()
> +
> + if vulnerability_job is not None:
> + raise VulnerabilityJobInProgress(cls(vulnerability_job))
> +
> + # Schedule the initialization.
> + metadata = {
> + "request": {
> + "sources": sources if sources is not None else [],
> + "information_type": information_type,
> + },
> + "result": {
> + "error_description": [],
> + "succeeded": [],
> + "failed": [],
> + },
> + "data": {
> + "export_link": "",
> + },
> + }
> +
> + vulnerability_job = VulnerabilityJob(
> + handler, cls.class_job_type, metadata
> + )
> + store.add(vulnerability_job)
> + derived_job = cls(vulnerability_job)
> + derived_job.celeryRunOnCommit()
> + IStore(VulnerabilityJob).flush()
> + return derived_job
> +
> + @classmethod
> + def get(cls, handler):
> + """See `IExportVulnerabilityJob`."""
> + vulnerability_job = (
> + IStore(VulnerabilityJob)
> + .find(
> + VulnerabilityJob,
> + VulnerabilityJob.job_id == Job.id,
> + VulnerabilityJob.job_type == cls.class_job_type,
> + VulnerabilityJob.handler == handler,
> + )
> + .one()
> + )
> + return None if vulnerability_job is None else cls(vulnerability_job)
> +
> + def __repr__(self):
> + """Returns an informative representation of the job."""
> + parts = "%s for" % self.__class__.__name__
> + parts += " handler: %s" % self.handler
> + parts += ", metadata: %s" % self.metadata
> + return "<%s>" % parts
> +
> + def _get_exporter_to_record(
> + self,
> + handler: VulnerabilityHandlerEnum,
> + information_type: InformationType = InformationType.PRIVATESECURITY,
> + ):
> + """Decide which parser and importer to use
> +
> + :return: a tuple of (parser, importer) where parser is the function
> + that gets a blob and returns a record and importer is the function that
> + gets a record and imports it.
> + """
> +
> + if handler == VulnerabilityHandlerEnum.SOSS:
yes!
> + exporter = SOSSExporter(
> + information_type=information_type
> + ).to_record
> + else:
> + exception = VulnerabilityJobException("Handler not found")
> + self.notifyUserError(exception)
> + raise exception
> +
> + return exporter
> +
> + def _get_bug_and_vulnerability(
> + self, cve: CveModel, distribution: Distribution
> + ) -> Tuple[Optional[BugModel], Optional[Vulnerability]]:
> +
> + vulnerability = cve.getDistributionVulnerability(distribution)
> +
> + if not vulnerability:
> + logger.debug(
> + f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
> + "does not have a vulnerability attached."
> + )
> + return None, None
> +
> + bugs = vulnerability.bugs
> +
> + if not bugs:
> + logger.debug(
> + f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
> + "does not have a bug attached."
> + )
> + return None, None
> +
> + if len(bugs) > 1:
> + logger.debug(
> + f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
> + "has more than one bug attached."
> + )
> + return None, None
> +
> + bug = bugs[0]
> +
> + return removeSecurityProxy(bug), vulnerability
nice thanks!
> +
> + def run(self):
> + """See `IRunnableJob`."""
> +
> + information_type = InformationType.items[self.information_type]
> + export_to_record = self._get_exporter_to_record(
> + self.context.handler,
> + information_type,
> + )
> +
> + logger.debug(
> + "[ExportVulnerabilityJob] Getting CVEs to export and storing "
> + "them in Records"
> + )
> +
> + exported_cves: List[Tuple[SOSSRecord, str]] = []
> + for cve in self.cve_set:
<3
> +
> + bug, vulnerability = self._get_bug_and_vulnerability(
> + cve, self.soss
> + )
> +
> + if not bug or not vulnerability:
> + continue
> +
> + try:
> + record = export_to_record(cve, self.soss, bug, vulnerability)
> + except ValueError as e:
> + self.notifyUserError(e)
> + continue
> +
> + if record is None:
Nice, thanks! Sorry for the iterations.
I was only referring to adding that CVE to the metadata :)
> + continue
> +
> + exported_cves.append((record, record.candidate))
> +
> + if exported_cves == []:
> + exception = VulnerabilityJobException("No CVEs to export")
> + self.notifyUserError(exception)
> + return
I think that only happens in testing; we can check it :)
> +
> + # Create a temporary folder
> + with tempfile.TemporaryDirectory() as temp_dir:
> +
> + logger.debug(
> + "[ExportVulnerabilityJob] Writing CVEs to temporary "
> + f"directory {temp_dir}"
> + )
> +
> + for record, cve_name in exported_cves:
> + file_path = os.path.join(temp_dir, cve_name)
> +
> + with open(file_path, "w") as f:
> +
> + logger.debug(
> + "[ExportVulnerabilityJob] Writing CVE %s to file %s",
> + cve_name,
> + file_path,
> + )
> +
> + f.write(record.to_yaml())
> +
> + # Create a zip archive of the temp folder
> + buf = io.BytesIO()
> + with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zipf:
> + logger.debug(
> + "[ExportVulnerabilityJob] Compressing CVEs to zip"
> + )
> + for dirpath, _, filenames in os.walk(temp_dir):
> + for filename in filenames:
> + try:
> + logger.debug(
> + "[ExportVulnerabilityJob] Compressing file %s",
> + filename,
> + )
> + filepath = os.path.join(dirpath, filename)
> + arcname = os.path.relpath(
> + filepath, temp_dir
> + ) # relative path inside zip
> + zipf.write(filepath, arcname)
> + except Exception as e:
> + logger.error(
> + f"Error adding file {filename} to zip: {e}"
> + )
> + self.metadata["result"]["failed"].append(filename)
> + self.notifyUserError(e)
> + else:
> + self.metadata["result"]["succeeded"].append(
> + filename
> + )
> +
> + zip_name = f"exported_vulnerabilities_{self.context.id}.zip"
> + contentType = "application/zip"
> + file_size = buf.getbuffer().nbytes
> +
> + expires = utc_now() + timedelta(weeks=EXPIRATION_WEEK_INTERVAL)
> +
> + # Reset buffer position to the beginning since librarian will read it.
> + buf.seek(0)
> +
> + logger.debug("[ExportVulnerabilityJob] Writing zip to librarian")
> + lfa = getUtility(ILibraryFileAliasSet).create(
> + name=zip_name,
> + size=file_size,
> + file=buf,
> + contentType=contentType,
> + expires=expires,
> + )
> +
> + self.metadata["data"]["export_link"] = lfa.getURL()
> +
> + def notifyUserError(self, error):
> + """Calls up and also saves the error text in this job's metadata.
> +
> + See `BaseRunnableJob`.
> + """
> + # This method is called when error is an instance of
> + # self.user_error_types.
> + super().notifyUserError(error)
> + logger.error(error)
> + error_description = self.metadata.get("result").get(
> + "error_description", []
> + )
> + error_description.append(str(error))
> + self.metadata["result"]["error_description"] = error_description
> +
> + def getOopsVars(self):
> + """See `IRunnableJob`."""
> + vars = super().getOopsVars()
> + vars.extend(
> + [
> + ("vulnerabilityjob_job_id", self.context.id),
> + ("vulnerability_job_type", self.context.job_type.title),
> + ("handler", self.context.handler),
> + ]
> + )
> + return vars
> diff --git a/lib/lp/bugs/tests/sampledata/CVE-2025-1979 b/lib/lp/bugs/tests/sampledata/CVE-2025-1979
> new file mode 100644
> index 0000000..a562efa
> --- /dev/null
> +++ b/lib/lp/bugs/tests/sampledata/CVE-2025-1979
> @@ -0,0 +1,78 @@
> +References:
Yep, I'm also not sure about this; we can ask others for their opinions. But I lean towards preferring that rather than duplicating. I'm doing that in the `importvulnerabilityjob` tests.
> +- https://github.com/ray-project/ray/commit/64a2e4010522d60b90c389634f24df77b603d85d
> +- https://github.com/ray-project/ray/issues/50266
> +- https://github.com/ray-project/ray/pull/50409
> +- https://security.snyk.io/vuln/SNYK-PYTHON-RAY-8745212
> +- https://ubuntu.com/security/notices/SSN-148-1.json?show_hidden=true
> +Notes:
> +- This is a sample soss cve with all the fields filled for testing
> +- sample note 2
> +Priority: Low
> +Priority-Reason: 'Unrealistic exploitation scenario. Logs are stored locally and not
> + transferred between agents, so local log access is the only conceivable method to
> + view the password for the redis instance (i.e., no possibility of MitM to access
> + the logs). Given the requirement for priviledged system access to access log files
> + the real "danger" posed by the vulnerability is quite low, and that is reflected
> + in this priority assignment. '
> +Assigned-To: janitor
> +Packages:
> + conda:
> + - Name: ray
> + Channel: jammy:1.17.0/stable
> + Repositories:
> + - nvidia-pb3-python-stable-local
> + Status: not-affected
> + Note: 2.22.0+soss.1
> + maven:
> + - Name: vllm
> + Channel: noble:0.7.3/stable
> + Repositories:
> + - soss-src-stable-local
> + Status: needs-triage
> + Note: ''
> + python:
> + - Name: pyyaml
> + Channel: jammy:2.22.0/stable
> + Repositories:
> + - nvidia-pb3-python-stable-local
> + Status: not-affected
> + Note: ''
> + - Name: ray
> + Channel: jammy:2.22.0/stable
> + Repositories:
> + - nvidia-pb3-python-stable-local
> + Status: released
> + Note: 2.22.0+soss.1
> + rust:
> + - Name: ray
> + Channel: focal:0.27.0/stable
> + Repositories:
> + - nvidia-pb3-python-stable-local
> + Status: deferred
> + Note: 2.22.0+soss.1
> + unpackaged:
> + - Name: vllm
> + Channel: noble:0.7.3/stable
> + Repositories:
> + - soss-src-stable-local
> + Status: needed
> + Note: ''
> +Candidate: CVE-2025-1979
> +Description: "Versions of the package ray before 2.43.0 are vulnerable to Insertion\
> + \ of Sensitive Information into Log File where the redis password is being logged\
> + \ in the standard logging. If the redis password is passed as an argument, it will\
> + \ be logged and could potentially leak the password.\r\rThis is only exploitable\
> + \ if:\r\r1) Logging is enabled;\r\r2) Redis is using password authentication;\r\r\
> + 3) Those logs are accessible to an attacker, who can reach that redis instance.\r\
> + \r**Note:**\r\rIt is recommended that anyone who is running in this configuration\
> + \ should update to the latest version of Ray, then rotate their redis password."
> +CVSS:
> +- source: report@xxxxxxx
> + vector: CVSS:3.1/AV:L/AC:H/PR:L/UI:N/S:C/C:H/I:L/A:N
> + baseScore: 6.4
> + baseSeverity: MEDIUM
> +- source: security-advisories@xxxxxxxxxx
> + vector: CVSS:3.1/AV:A/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H
> + baseScore: 9.0
> + baseSeverity: CRITICAL
> +PublicDate: '2025-03-06T05:15:16.213'
> diff --git a/lib/lp/bugs/tests/test_exportvulnerabilityjob.py b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
> new file mode 100644
> index 0000000..11355cc
> --- /dev/null
> +++ b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
> @@ -0,0 +1,404 @@
> +# Copyright 2025 Canonical Ltd. This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +from pathlib import Path
> +
> +import transaction
> +from testtools.matchers import MatchesRegex
> +from zope.component import getUtility
> +from zope.security.proxy import removeSecurityProxy
> +
> +from lp.app.enums import InformationType
> +from lp.app.interfaces.launchpad import ILaunchpadCelebrities
> +from lp.bugs.enums import VulnerabilityHandlerEnum
> +from lp.bugs.interfaces.cve import ICveSet
> +from lp.bugs.interfaces.vulnerabilityjob import (
> + IExportVulnerabilityJobSource,
> + VulnerabilityJobException,
> + VulnerabilityJobInProgress,
> +)
> +from lp.bugs.model.exportvulnerabilityjob import ExportVulnerabilityJob
> +from lp.bugs.scripts.soss.sossimport import SOSSImporter
> +from lp.services.features.testing import FeatureFixture
> +from lp.services.job.interfaces.job import JobStatus
> +from lp.services.job.tests import block_on_job
> +from lp.services.librarian.model import LibraryFileAlias
> +from lp.testing import TestCaseWithFactory, person_logged_in
> +from lp.testing.layers import CeleryJobLayer, LaunchpadZopelessLayer
> +
> +
> +class ExportVulnerabilityJobTests(TestCaseWithFactory):
> + """Test case for ImportVulnerabilityJob."""
> +
> + layer = LaunchpadZopelessLayer
> +
> + def setUp(self):
> + super().setUp()
> + self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
> +
> + self.cve_set = getUtility(ICveSet)
> +
> + @property
> + def job_source(self):
> + return getUtility(IExportVulnerabilityJobSource)
> +
> + def test_getOopsVars(self):
> + """Test getOopsVars method."""
> + handler = VulnerabilityHandlerEnum.SOSS
> +
> + job = self.job_source.create(handler)
> + vars = job.getOopsVars()
> + naked_job = removeSecurityProxy(job)
> + self.assertIn(("vulnerabilityjob_job_id", naked_job.id), vars)
> + self.assertIn(
> + ("vulnerability_job_type", naked_job.job_type.title), vars
> + )
> + self.assertIn(("handler", naked_job.handler), vars)
> +
> + def test___repr__(self):
> + """Test __repr__ method."""
> + handler = VulnerabilityHandlerEnum.SOSS
> + information_type = InformationType.PRIVATESECURITY.value
> +
> + metadata = {
> + "request": {
> + "sources": [],
> + "information_type": information_type,
> + },
> + "result": {
> + "error_description": [],
> + "succeeded": [],
> + "failed": [],
> + },
> + "data": {
> + "export_link": "",
> + },
> + }
> +
> + job = self.job_source.create(
> + handler, information_type=information_type
> + )
> +
> + expected = (
> + "<ExportVulnerabilityJob for "
> + f"handler: {handler}, "
> + f"metadata: {metadata}"
> + ">"
> + )
> + self.assertEqual(expected, repr(job))
> +
> + def test_create_with_existing_in_progress_job(self):
> + """If there's already a waiting/running ExportVulnerabilityJob for the
> + handler ExportVulnerabilityJob.create() raises an exception.
> + """
> + handler = VulnerabilityHandlerEnum.SOSS
> +
> + # Job waiting status
> + job = self.job_source.create(handler)
> + waiting_exception = self.assertRaises(
> + VulnerabilityJobInProgress,
> + self.job_source.create,
> + handler,
> + )
> + self.assertEqual(job, waiting_exception.job)
> +
> + # Job status from WAITING to RUNNING
> + job.start()
> + running_exception = self.assertRaises(
> + VulnerabilityJobInProgress, self.job_source.create, handler
> + )
> + self.assertEqual(job, running_exception.job)
> +
> + def test_create_with_existing_completed_job(self):
> + """If there's already a completed ExportVulnerabilityJob for the
> + handler the job can be runned again.
> + """
> + handler = VulnerabilityHandlerEnum.SOSS
> +
> + job = self.job_source.create(handler)
> + job.start()
> + job.complete()
> + self.assertEqual(job.status, JobStatus.COMPLETED)
> +
> + job_duplicated = self.job_source.create(handler)
> + job_duplicated.start()
> + job_duplicated.complete()
> + self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
> +
> + def test_create_with_existing_failed_job(self):
> + """If there's a failed ExportVulnerabilityJob for the handler the job
> + can be runned again.
> + """
> + handler = VulnerabilityHandlerEnum.SOSS
> +
> + job = self.job_source.create(handler)
> + job.start()
> + job.fail()
> + self.assertEqual(job.status, JobStatus.FAILED)
> +
> + job_duplicated = self.job_source.create(handler)
> + job_duplicated.start()
> + job_duplicated.complete()
> + self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
> +
> + def test_arguments(self):
> + """Test that ExportVulnerabilityJob specified with arguments can
> + be gotten out again."""
> +
> + handler = VulnerabilityHandlerEnum.SOSS
> + information_type = InformationType.PRIVATESECURITY.value
> +
> + metadata = {
> + "request": {
> + "sources": ["https://launchpad.net/ubuntu"],
> + "information_type": information_type,
> + },
> + "result": {
> + "error_description": [],
> + "succeeded": [],
> + "failed": [],
> + },
> + "data": {
> + "export_link": "",
> + },
> + }
> +
> + job = self.job_source.create(
> + handler,
> + sources=["https://launchpad.net/ubuntu"],
> + information_type=information_type,
> + )
> +
> + naked_job = removeSecurityProxy(job)
> + self.assertEqual(naked_job.handler, handler)
> +
> + self.assertEqual(naked_job.metadata, metadata)
> +
> + def test_run_with_no_CVEs(self):
> + """
> + Run ExportVulnerabilityJob but with no CVE that also has its bug and
> + vulnerability.
> +
> + Note: Since we use the database to look for CVE sequence names, the
> + dev environment's sample database must have no CVEs with bugs and
> + vulnerabilities for this test to pass. If they ever get added, the
> + test will need to be updated to use a different approach.
> + """
> +
> + self.factory.makeDistribution(name="soss")
> +
> + job = self.job_source.create(handler=VulnerabilityHandlerEnum.SOSS)
> +
> + # Run the job as bug_importer since normal users don't have permission
> + # This is to ensure we are actually testing the "no CVEs" case
> + # instead of hitting a permission hiding the CVEs.
> + with person_logged_in(self.bug_importer):
nice
> + job.run()
> +
> + self.assertEqual(
> + job.metadata.get("result"),
> + {
> + "succeeded": [],
> + "failed": [],
> + "error_description": ["No CVEs to export"],
> + },
> + )
> +
> + def _put_cve_in_soss(self):
> + self.factory.makePerson(name="octagalland")
> + self.factory.makeDistribution(
> + name="soss",
> + displayname="SOSS",
> + information_type=InformationType.PROPRIETARY,
> + )
> +
> + sampledata = Path(__file__).parent / "sampledata"
> +
> + soss_importer = SOSSImporter()
> +
> + imported_list = []
> + for file in sampledata.iterdir():
> + cve_sequence = file.name.lstrip("CVE-")
> + if not self.cve_set[cve_sequence]:
> + self.factory.makeCVE(sequence=cve_sequence)
> +
> + bug, vulnerability = soss_importer.import_cve_from_file(file)
> + imported_list.append((cve_sequence, bug, vulnerability))
> +
> + return imported_list
> +
> + def test_run_export(self):
> + """Run ExportVulnerabilityJob."""
> + imported_list = self._put_cve_in_soss()
> + information_type = InformationType.PRIVATESECURITY.value
> + export_link = "http://example.com/fake-url"
> +
> + self.patch(
> + LibraryFileAlias,
> + "getURL",
> + lambda self: export_link,
> + )
> +
> + job = self.job_source.create(
> + handler=VulnerabilityHandlerEnum.SOSS,
> + information_type=information_type,
> + )
> +
> + with person_logged_in(self.bug_importer):
> + job.run()
> +
> + cve_names = [
> + f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list
> + ]
> +
> + naked_job_metadata = removeSecurityProxy(job.metadata)
> + naked_job_metadata["result"]["succeeded"].sort()
> + cve_names.sort()
> +
> + self.assertEqual(
> + naked_job_metadata,
> + {
> + "request": {
> + "sources": [],
> + "information_type": information_type,
> + },
> + "result": {
> + "error_description": [],
> + "succeeded": cve_names,
> + "failed": [],
> + },
> + "data": {
> + "export_link": export_link,
> + },
> + },
> + )
> +
> + def test_get(self):
> + """ExportVulnerabilityJob.get() returns the import job for the given
> + handler.
> + """
> + handler = VulnerabilityHandlerEnum.SOSS
> +
> + # There is no job before creating it
> + self.assertIs(None, self.job_source.get(handler))
> +
> + job = self.job_source.create(handler)
> + job_gotten = self.job_source.get(handler)
> +
> + self.assertIsInstance(job, ExportVulnerabilityJob)
> + self.assertEqual(job, job_gotten)
> +
> + def test_error_description_when_no_error(self):
> + """The ExportVulnerabilityJob.error_description property returns
> + None when no error description is recorded."""
> + handler = VulnerabilityHandlerEnum.SOSS
> + information_type = InformationType.PRIVATESECURITY.value
> +
> + job = self.job_source.create(
> + handler,
> + information_type=information_type,
> + )
> + self.assertEqual([], removeSecurityProxy(job).error_description)
> +
> + def test_error_description_set_when_notifying_about_user_errors(self):
> + """Test that error_description is set by notifyUserError()."""
> + handler = VulnerabilityHandlerEnum.SOSS
> + information_type = InformationType.PRIVATESECURITY.value
> +
> + job = self.job_source.create(
> + handler,
> + information_type=information_type,
> + )
> + message = "This is an example message."
> + job.notifyUserError(VulnerabilityJobException(message))
> + self.assertEqual([message], removeSecurityProxy(job).error_description)
> +
> +
> +class ExportVulnerabilityTestViaCelery(TestCaseWithFactory):
> + layer = CeleryJobLayer
> +
> + def setUp(self):
> + super().setUp()
> +
> + self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
> + self.cve_set = getUtility(ICveSet)
> +
> + def _put_cve_in_soss(self):
> + self.factory.makePerson(name="octagalland")
> + self.factory.makeDistribution(
> + name="soss",
> + displayname="SOSS",
> + information_type=InformationType.PROPRIETARY,
> + )
> +
> + sampledata = Path(__file__).parent / "sampledata"
> +
> + soss_importer = SOSSImporter()
> +
> + imported_list = []
> + for file in sampledata.iterdir():
> + cve_sequence = (
> + file.name[4:] if file.name.startswith("CVE-") else file.name
> + )
> + if not self.cve_set[cve_sequence]:
> + self.factory.makeCVE(sequence=cve_sequence)
> +
> + bug, vulnerability = soss_importer.import_cve_from_file(file)
> + imported_list.append((cve_sequence, bug, vulnerability))
> +
> + return imported_list
> +
> + def test_job(self):
> + """Job runs via Celery."""
> + fixture = FeatureFixture(
> + {
> + "jobs.celery.enabled_classes": "ExportVulnerabilityJob",
> + }
> + )
> + self.useFixture(fixture)
> +
> + imported_list = self._put_cve_in_soss()
> + transaction.commit()
> +
> + job_source = getUtility(IExportVulnerabilityJobSource)
> +
> + handler = VulnerabilityHandlerEnum.SOSS
> + information_type = InformationType.PRIVATESECURITY.value
> + with block_on_job():
> + job_source.create(
> + handler,
> + information_type=information_type,
> + )
> + transaction.commit()
> +
> + cve_names = [
> + f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list
> + ]
> +
> + job = job_source.get(handler)
> +
> + naked_job_metadata = removeSecurityProxy(job.metadata)
> + naked_job_metadata["result"]["succeeded"].sort()
> + cve_names.sort()
> +
> + metadata_request = {
> + "sources": [],
> + "information_type": information_type,
> + }
> +
> + metadata_result = {
> + "error_description": [],
> + "succeeded": cve_names,
> + "failed": [],
> + }
> +
> + self.assertEqual(handler, job.handler)
> + self.assertEqual(metadata_request, naked_job_metadata["request"])
> + self.assertEqual(metadata_result, naked_job_metadata["result"])
> +
> + self.assertThat(
> + naked_job_metadata["data"]["export_link"],
> + MatchesRegex(
> + r".*exported_vulnerabilities_[0-9]+\.zip$",
> + ),
> + )
--
https://code.launchpad.net/~ilkeremrekoc/launchpad/+git/launchpad/+merge/492056
Your team Launchpad code reviewers is requested to review the proposed merge of ~ilkeremrekoc/launchpad:add-exportvulnerabilityjob into launchpad:master.
References