← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] ~ilkeremrekoc/launchpad:add-exportvulnerabilityjob into launchpad:master

 

Answered! I have a few follow-up questions in places, but I will try to implement all of the suggestions.

Diff comments:

> diff --git a/lib/lp/bugs/model/exportvulnerabilityjob.py b/lib/lp/bugs/model/exportvulnerabilityjob.py
> new file mode 100644
> index 0000000..cecb478
> --- /dev/null
> +++ b/lib/lp/bugs/model/exportvulnerabilityjob.py
> @@ -0,0 +1,346 @@
> +# Copyright 2025 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +__all__ = [
> +    "ExportVulnerabilityJob",
> +]
> +
> +import io
> +import logging
> +import os
> +import tempfile
> +import zipfile
> +from datetime import timedelta
> +from typing import List, Optional, Tuple
> +
> +from zope.component import getUtility
> +from zope.interface import implementer, provider
> +from zope.security.proxy import removeSecurityProxy
> +
> +from lp.app.enums import InformationType
> +from lp.bugs.enums import VulnerabilityHandlerEnum
> +from lp.bugs.interfaces.cve import ICveSet
> +from lp.bugs.interfaces.vulnerabilityjob import (
> +    IExportVulnerabilityJob,
> +    IExportVulnerabilityJobSource,
> +    VulnerabilityJobException,
> +    VulnerabilityJobInProgress,
> +    VulnerabilityJobType,
> +)
> +from lp.bugs.model.bug import Bug as BugModel
> +from lp.bugs.model.cve import Cve as CveModel
> +from lp.bugs.model.vulnerability import Vulnerability
> +from lp.bugs.model.vulnerabilityjob import (
> +    VulnerabilityJob,
> +    VulnerabilityJobDerived,
> +)
> +from lp.bugs.scripts.soss.models import SOSSRecord
> +from lp.bugs.scripts.soss.sossexport import SOSSExporter
> +from lp.registry.interfaces.distribution import IDistributionSet
> +from lp.registry.model.distribution import Distribution
> +from lp.services.config import config
> +from lp.services.database.interfaces import IPrimaryStore, IStore
> +from lp.services.job.interfaces.job import JobStatus
> +from lp.services.job.model.job import Job
> +from lp.services.librarian.interfaces import ILibraryFileAliasSet
> +from lp.services.utils import utc_now
> +
> +DISTRIBUTION_NAME = "soss"
> +EXPIRATION_WEEK_INTERVAL = 1
> +logger = logging.getLogger(__name__)
> +
> +
> +@implementer(IExportVulnerabilityJob)
> +@provider(IExportVulnerabilityJobSource)
> +class ExportVulnerabilityJob(VulnerabilityJobDerived):
> +    class_job_type = VulnerabilityJobType.EXPORT_DATA
> +
> +    user_error_types = (VulnerabilityJobException,)
> +
> +    config = config.IExportVulnerabilityJobSource
> +
> +    def __init__(self, job):
> +        super().__init__(job)
> +        self.cve_set = getUtility(ICveSet)
> +        self.soss = getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME)

Ah, I see. I copied that part from the import script code and assumed we would change it later on, after the Ubuntu distro stuff is added in. I will work on this.

> +
> +    @property
> +    def sources(self):
> +        return self.metadata.get("request").get("sources")
> +
> +    @property
> +    def information_type(self):
> +        return self.metadata.get("request").get("information_type")
> +
> +    @property
> +    def error_description(self):
> +        return self.metadata.get("result").get("error_description")
> +
> +    @classmethod
> +    def create(
> +        cls,
> +        handler,
> +        sources: Optional[List[str]] = None,
> +        information_type=InformationType.PRIVATESECURITY.value,
> +    ):
> +        """Create a new `ExportVulnerabilityJob`.
> +
> +        :param handler: What handler to use for importing the data. Can be one
> +            of a group of predefined classes (SOSS, UCT, ...).
> +        :param sources: A list of sources to export from. Gets used depending
> +            on the handler.
> +        """
> +        store = IPrimaryStore(VulnerabilityJob)
> +
> +        vulnerability_job = store.find(
> +            VulnerabilityJob,
> +            VulnerabilityJob.job_id == Job.id,
> +            VulnerabilityJob.job_type == cls.class_job_type,
> +            VulnerabilityJob.handler == handler,
> +            Job._status.is_in(
> +                (JobStatus.WAITING, JobStatus.RUNNING, JobStatus.SUSPENDED)
> +            ),
> +        ).one()
> +
> +        if vulnerability_job is not None:
> +            raise VulnerabilityJobInProgress(cls(vulnerability_job))
> +
> +        # Schedule the initialization.
> +        metadata = {
> +            "request": {
> +                "sources": sources if sources is not None else [],
> +                "information_type": information_type,
> +            },
> +            "result": {
> +                "error_description": [],
> +                "succeeded": [],
> +                "failed": [],
> +            },
> +            "data": {
> +                "export_link": "",
> +            },
> +        }
> +
> +        vulnerability_job = VulnerabilityJob(
> +            handler, cls.class_job_type, metadata
> +        )
> +        store.add(vulnerability_job)
> +        derived_job = cls(vulnerability_job)
> +        derived_job.celeryRunOnCommit()
> +        IStore(VulnerabilityJob).flush()
> +        return derived_job
> +
> +    @classmethod
> +    def get(cls, handler):
> +        """See `IExportVulnerabilityJob`."""
> +        vulnerability_job = (
> +            IStore(VulnerabilityJob)
> +            .find(
> +                VulnerabilityJob,
> +                VulnerabilityJob.job_id == Job.id,
> +                VulnerabilityJob.job_type == cls.class_job_type,
> +                VulnerabilityJob.handler == handler,
> +            )
> +            .one()
> +        )
> +        return None if vulnerability_job is None else cls(vulnerability_job)
> +
> +    def __repr__(self):
> +        """Returns an informative representation of the job."""
> +        parts = "%s for" % self.__class__.__name__
> +        parts += " handler: %s" % self.handler
> +        parts += ", metadata: %s" % self.metadata
> +        return "<%s>" % parts
> +
> +    def _get_exporter_to_record(
> +        self,
> +        handler: VulnerabilityHandlerEnum,
> +        information_type: InformationType = InformationType.PRIVATESECURITY,
> +    ):
> +        """Decide which parser and importer to use
> +
> +        :return: a tuple of (parser, importer) where parser is the function
> +        that gets a blob and returns a record and importer is the function that
> +        gets a record and imports it.
> +        """
> +
> +        if handler == VulnerabilityHandlerEnum.SOSS:

You mean in a `{"handler": SOSSExporter}` kind of way? It would make sense, just want to make sure ahaha

> +            exporter = SOSSExporter(
> +                information_type=information_type
> +            ).to_record
> +        else:
> +            exception = VulnerabilityJobException("Handler not found")
> +            self.notifyUserError(exception)
> +            raise exception
> +
> +        return exporter
> +
> +    def _get_bug_and_vulnerability(
> +        self, cve: CveModel, distribution: Distribution
> +    ) -> Tuple[Optional[BugModel], Optional[Vulnerability]]:
> +
> +        vulnerability = cve.getDistributionVulnerability(distribution)
> +
> +        if not vulnerability:
> +            logger.debug(
> +                f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
> +                "does not have a vulnerability attached."
> +            )
> +            return None, None
> +
> +        bugs = vulnerability.bugs
> +
> +        if not bugs:
> +            logger.debug(
> +                f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
> +                "does not have a bug attached."
> +            )
> +            return None, None
> +
> +        if len(bugs) > 1:
> +            logger.debug(
> +                f"[ExportVulnerabilityJob] CVE-{cve.sequence} "
> +                "has more than one bug attached."
> +            )
> +            return None, None
> +
> +        bug = bugs[0]
> +
> +        return removeSecurityProxy(bug), vulnerability

I didn't dig deeply into this topic. I just added it around the bug because the bug.channel calls within the export script were hitting permission problems, while the vulnerability ones weren't. I will take a deeper look, but it is possible that we simply happen not to use any vulnerability data that is permission-bound, or that everything we do use has already been extracted.

> +
> +    def run(self):
> +        """See `IRunnableJob`."""
> +
> +        information_type = InformationType.items[self.information_type]
> +        export_to_record = self._get_exporter_to_record(
> +            self.context.handler,
> +            information_type,
> +        )
> +
> +        logger.debug(
> +            "[ExportVulnerabilityJob] Getting CVEs to export and storing "
> +            "them in Records"
> +        )
> +
> +        exported_cves: List[Tuple[SOSSRecord, str]] = []
> +        for cve in self.cve_set:

Yeah, I tried following the spec word for word, as I assumed there was something I may not know yet. It makes sense for it to go away.

I'm sure there are a bunch of ways to do it, the one you mentioned being one of the better ones ahaha. I will take a look at this.

> +
> +            bug, vulnerability = self._get_bug_and_vulnerability(
> +                cve, self.soss
> +            )
> +
> +            if not bug or not vulnerability:
> +                continue
> +
> +            try:
> +                record = export_to_record(cve, self.soss, bug, vulnerability)
> +            except ValueError as e:
> +                self.notifyUserError(e)
> +                continue
> +
> +            if record is None:

I assumed that if a record is None then the export script would handle the error reporting (as it was like that in the previous iteration of the export script), and we wouldn't need to be too verbose on it. But if it would be safer, we can log it and put it in the errors metadata as normal.

Also, I wasn't sure if I should add the ones that fail to be converted to records here since currently we go over all the CVEs in the database and as a result, the list would be enormous. Though this was mainly for the previous iteration of the export script, now that the job code handles the validation, I think it does make sense to add the failed CVEs here.

> +                continue
> +
> +            exported_cves.append((record, record.candidate))
> +
> +        if exported_cves == []:
> +            exception = VulnerabilityJobException("No CVEs to export")
> +            self.notifyUserError(exception)
> +            return

This is due to the problem we faced before, where Celery tasks do not return the metadata if they raise an exception. They do return the metadata when the job exits with a plain `return`, however.

> +
> +        # Create a temporary folder
> +        with tempfile.TemporaryDirectory() as temp_dir:
> +
> +            logger.debug(
> +                "[ExportVulnerabilityJob] Writing CVEs to temporary "
> +                f"directory {temp_dir}"
> +            )
> +
> +            for record, cve_name in exported_cves:
> +                file_path = os.path.join(temp_dir, cve_name)
> +
> +                with open(file_path, "w") as f:
> +
> +                    logger.debug(
> +                        "[ExportVulnerabilityJob] Writing CVE %s to file %s",
> +                        cve_name,
> +                        file_path,
> +                    )
> +
> +                    f.write(record.to_yaml())
> +
> +            # Create a zip archive of the temp folder
> +            buf = io.BytesIO()
> +            with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zipf:
> +                logger.debug(
> +                    "[ExportVulnerabilityJob] Compressing CVEs to zip"
> +                )
> +                for dirpath, _, filenames in os.walk(temp_dir):
> +                    for filename in filenames:
> +                        try:
> +                            logger.debug(
> +                                "[ExportVulnerabilityJob] Compressing file %s",

I assumed that the regular log level would be info or similar and that debug would need to be added whenever we wish to. If it is debug level by default, then I agree that it would be too verbose :')

> +                                filename,
> +                            )
> +                            filepath = os.path.join(dirpath, filename)
> +                            arcname = os.path.relpath(
> +                                filepath, temp_dir
> +                            )  # relative path inside zip
> +                            zipf.write(filepath, arcname)
> +                        except Exception as e:
> +                            logger.error(
> +                                f"Error adding file {filename} to zip: {e}"
> +                            )
> +                            self.metadata["result"]["failed"].append(filename)
> +                            self.notifyUserError(e)
> +                        else:
> +                            self.metadata["result"]["succeeded"].append(
> +                                filename
> +                            )
> +
> +        zip_name = f"exported_vulnerabilities_{self.context.id}.zip"
> +        contentType = "application/zip"
> +        file_size = buf.getbuffer().nbytes
> +
> +        expires = utc_now() + timedelta(weeks=EXPIRATION_WEEK_INTERVAL)
> +
> +        # Reset buffer position to the beginning since librarian will read it.
> +        buf.seek(0)
> +
> +        logger.debug("[ExportVulnerabilityJob] Writing zip to librarian")
> +        lfa = getUtility(ILibraryFileAliasSet).create(
> +            name=zip_name,
> +            size=file_size,
> +            file=buf,
> +            contentType=contentType,
> +            expires=expires,
> +        )
> +
> +        self.metadata["data"]["export_link"] = lfa.getURL()
> +
> +    def notifyUserError(self, error):
> +        """Calls up and also saves the error text in this job's metadata.
> +
> +        See `BaseRunnableJob`.
> +        """
> +        # This method is called when error is an instance of
> +        # self.user_error_types.
> +        super().notifyUserError(error)
> +        logger.error(error)
> +        error_description = self.metadata.get("result").get(
> +            "error_description", []
> +        )
> +        error_description.append(str(error))
> +        self.metadata["result"]["error_description"] = error_description
> +
> +    def getOopsVars(self):
> +        """See `IRunnableJob`."""
> +        vars = super().getOopsVars()
> +        vars.extend(
> +            [
> +                ("vulnerabilityjob_job_id", self.context.id),
> +                ("vulnerability_job_type", self.context.job_type.title),
> +                ("handler", self.context.handler),
> +            ]
> +        )
> +        return vars
> diff --git a/lib/lp/bugs/tests/sampledata/CVE-2025-1979 b/lib/lp/bugs/tests/sampledata/CVE-2025-1979
> new file mode 100644
> index 0000000..a562efa
> --- /dev/null
> +++ b/lib/lp/bugs/tests/sampledata/CVE-2025-1979
> @@ -0,0 +1,78 @@
> +References:

I wasn't sure whether it would be considered bad design if I just took it from the other dir. I don't like the duplication either, if you think getting it from the other folder would make sense, I would also be happy to delete these ^^

> +- https://github.com/ray-project/ray/commit/64a2e4010522d60b90c389634f24df77b603d85d
> +- https://github.com/ray-project/ray/issues/50266
> +- https://github.com/ray-project/ray/pull/50409
> +- https://security.snyk.io/vuln/SNYK-PYTHON-RAY-8745212
> +- https://ubuntu.com/security/notices/SSN-148-1.json?show_hidden=true
> +Notes:
> +- This is a sample soss cve with all the fields filled for testing
> +- sample note 2
> +Priority: Low
> +Priority-Reason: 'Unrealistic exploitation scenario. Logs are stored locally and not
> +  transferred between agents, so local log access is the only conceivable method to
> +  view the password for the redis instance (i.e., no possibility of MitM to access
> +  the logs). Given the requirement for priviledged system access to access log files
> +  the real "danger" posed by the vulnerability is quite low, and that is reflected
> +  in this priority assignment. '
> +Assigned-To: janitor
> +Packages:
> +  conda:
> +  - Name: ray
> +    Channel: jammy:1.17.0/stable
> +    Repositories:
> +    - nvidia-pb3-python-stable-local
> +    Status: not-affected
> +    Note: 2.22.0+soss.1
> +  maven:
> +  - Name: vllm
> +    Channel: noble:0.7.3/stable
> +    Repositories:
> +    - soss-src-stable-local
> +    Status: needs-triage
> +    Note: ''
> +  python:
> +  - Name: pyyaml
> +    Channel: jammy:2.22.0/stable
> +    Repositories:
> +    - nvidia-pb3-python-stable-local
> +    Status: not-affected
> +    Note: ''
> +  - Name: ray
> +    Channel: jammy:2.22.0/stable
> +    Repositories:
> +    - nvidia-pb3-python-stable-local
> +    Status: released
> +    Note: 2.22.0+soss.1
> +  rust:
> +  - Name: ray
> +    Channel: focal:0.27.0/stable
> +    Repositories:
> +    - nvidia-pb3-python-stable-local
> +    Status: deferred
> +    Note: 2.22.0+soss.1
> +  unpackaged:
> +  - Name: vllm
> +    Channel: noble:0.7.3/stable
> +    Repositories:
> +    - soss-src-stable-local
> +    Status: needed
> +    Note: ''
> +Candidate: CVE-2025-1979
> +Description: "Versions of the package ray before 2.43.0 are vulnerable to Insertion\
> +  \ of Sensitive Information into Log File where the redis password is being logged\
> +  \ in the standard logging. If the redis password is passed as an argument, it will\
> +  \ be logged and could potentially leak the password.\r\rThis is only exploitable\
> +  \ if:\r\r1) Logging is enabled;\r\r2) Redis is using password authentication;\r\r\
> +  3) Those logs are accessible to an attacker, who can reach that redis instance.\r\
> +  \r**Note:**\r\rIt is recommended that anyone who is running in this configuration\
> +  \ should update to the latest version of Ray, then rotate their redis password."
> +CVSS:
> +- source: report@xxxxxxx
> +  vector: CVSS:3.1/AV:L/AC:H/PR:L/UI:N/S:C/C:H/I:L/A:N
> +  baseScore: 6.4
> +  baseSeverity: MEDIUM
> +- source: security-advisories@xxxxxxxxxx
> +  vector: CVSS:3.1/AV:A/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H
> +  baseScore: 9.0
> +  baseSeverity: CRITICAL
> +PublicDate: '2025-03-06T05:15:16.213'
> diff --git a/lib/lp/bugs/tests/test_exportvulnerabilityjob.py b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
> new file mode 100644
> index 0000000..11355cc
> --- /dev/null
> +++ b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
> @@ -0,0 +1,404 @@
> +# Copyright 2025 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +from pathlib import Path
> +
> +import transaction
> +from testtools.matchers import MatchesRegex
> +from zope.component import getUtility
> +from zope.security.proxy import removeSecurityProxy
> +
> +from lp.app.enums import InformationType
> +from lp.app.interfaces.launchpad import ILaunchpadCelebrities
> +from lp.bugs.enums import VulnerabilityHandlerEnum
> +from lp.bugs.interfaces.cve import ICveSet
> +from lp.bugs.interfaces.vulnerabilityjob import (
> +    IExportVulnerabilityJobSource,
> +    VulnerabilityJobException,
> +    VulnerabilityJobInProgress,
> +)
> +from lp.bugs.model.exportvulnerabilityjob import ExportVulnerabilityJob
> +from lp.bugs.scripts.soss.sossimport import SOSSImporter
> +from lp.services.features.testing import FeatureFixture
> +from lp.services.job.interfaces.job import JobStatus
> +from lp.services.job.tests import block_on_job
> +from lp.services.librarian.model import LibraryFileAlias
> +from lp.testing import TestCaseWithFactory, person_logged_in
> +from lp.testing.layers import CeleryJobLayer, LaunchpadZopelessLayer
> +
> +
> +class ExportVulnerabilityJobTests(TestCaseWithFactory):
> +    """Test case for ImportVulnerabilityJob."""
> +
> +    layer = LaunchpadZopelessLayer
> +
> +    def setUp(self):
> +        super().setUp()
> +        self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
> +
> +        self.cve_set = getUtility(ICveSet)
> +
> +    @property
> +    def job_source(self):
> +        return getUtility(IExportVulnerabilityJobSource)
> +
> +    def test_getOopsVars(self):
> +        """Test getOopsVars method."""
> +        handler = VulnerabilityHandlerEnum.SOSS
> +
> +        job = self.job_source.create(handler)
> +        vars = job.getOopsVars()
> +        naked_job = removeSecurityProxy(job)
> +        self.assertIn(("vulnerabilityjob_job_id", naked_job.id), vars)
> +        self.assertIn(
> +            ("vulnerability_job_type", naked_job.job_type.title), vars
> +        )
> +        self.assertIn(("handler", naked_job.handler), vars)
> +
> +    def test___repr__(self):
> +        """Test __repr__ method."""
> +        handler = VulnerabilityHandlerEnum.SOSS
> +        information_type = InformationType.PRIVATESECURITY.value
> +
> +        metadata = {
> +            "request": {
> +                "sources": [],
> +                "information_type": information_type,
> +            },
> +            "result": {
> +                "error_description": [],
> +                "succeeded": [],
> +                "failed": [],
> +            },
> +            "data": {
> +                "export_link": "",
> +            },
> +        }
> +
> +        job = self.job_source.create(
> +            handler, information_type=information_type
> +        )
> +
> +        expected = (
> +            "<ExportVulnerabilityJob for "
> +            f"handler: {handler}, "
> +            f"metadata: {metadata}"
> +            ">"
> +        )
> +        self.assertEqual(expected, repr(job))
> +
> +    def test_create_with_existing_in_progress_job(self):
> +        """If there's already a waiting/running ExportVulnerabilityJob for the
> +        handler ExportVulnerabilityJob.create() raises an exception.
> +        """
> +        handler = VulnerabilityHandlerEnum.SOSS
> +
> +        # Job waiting status
> +        job = self.job_source.create(handler)
> +        waiting_exception = self.assertRaises(
> +            VulnerabilityJobInProgress,
> +            self.job_source.create,
> +            handler,
> +        )
> +        self.assertEqual(job, waiting_exception.job)
> +
> +        # Job status from WAITING to RUNNING
> +        job.start()
> +        running_exception = self.assertRaises(
> +            VulnerabilityJobInProgress, self.job_source.create, handler
> +        )
> +        self.assertEqual(job, running_exception.job)
> +
> +    def test_create_with_existing_completed_job(self):
> +        """If there's already a completed ExportVulnerabilityJob for the
> +        handler the job can be runned again.
> +        """
> +        handler = VulnerabilityHandlerEnum.SOSS
> +
> +        job = self.job_source.create(handler)
> +        job.start()
> +        job.complete()
> +        self.assertEqual(job.status, JobStatus.COMPLETED)
> +
> +        job_duplicated = self.job_source.create(handler)
> +        job_duplicated.start()
> +        job_duplicated.complete()
> +        self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
> +
> +    def test_create_with_existing_failed_job(self):
> +        """If there's a failed ExportVulnerabilityJob for the handler the job
> +        can be runned again.
> +        """
> +        handler = VulnerabilityHandlerEnum.SOSS
> +
> +        job = self.job_source.create(handler)
> +        job.start()
> +        job.fail()
> +        self.assertEqual(job.status, JobStatus.FAILED)
> +
> +        job_duplicated = self.job_source.create(handler)
> +        job_duplicated.start()
> +        job_duplicated.complete()
> +        self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
> +
> +    def test_arguments(self):
> +        """Test that ExportVulnerabilityJob specified with arguments can
> +        be gotten out again."""
> +
> +        handler = VulnerabilityHandlerEnum.SOSS
> +        information_type = InformationType.PRIVATESECURITY.value
> +
> +        metadata = {
> +            "request": {
> +                "sources": ["https://launchpad.net/ubuntu"],
> +                "information_type": information_type,
> +            },
> +            "result": {
> +                "error_description": [],
> +                "succeeded": [],
> +                "failed": [],
> +            },
> +            "data": {
> +                "export_link": "",
> +            },
> +        }
> +
> +        job = self.job_source.create(
> +            handler,
> +            sources=["https://launchpad.net/ubuntu"],
> +            information_type=information_type,
> +        )
> +
> +        naked_job = removeSecurityProxy(job)
> +        self.assertEqual(naked_job.handler, handler)
> +
> +        self.assertEqual(naked_job.metadata, metadata)
> +
> +    def test_run_with_no_CVEs(self):
> +        """
> +        Run ExportVulnerabilityJob but with no CVE that also has its bug and
> +        vulnerability.
> +
> +        Note: Since we use the database to look for CVE sequence names, the

We aren't right now, at least not with the sample data in the database. The problem is that there is sample CVE data in the dev database, which is what this part is trying to guard against.

> +        dev environment's sample database must have no CVEs with bugs and
> +        vulnerabilities for this test to pass. If they ever get added, the
> +        test will need to be updated to use a different approach.
> +        """
> +
> +        self.factory.makeDistribution(name="soss")
> +
> +        job = self.job_source.create(handler=VulnerabilityHandlerEnum.SOSS)
> +
> +        # Run the job as bug_importer since normal users don't have permission
> +        # This is to ensure we are actually testing the "no CVEs" case
> +        # instead of hitting a permission hiding the CVEs.
> +        with person_logged_in(self.bug_importer):

That is quite possible. It was initially throwing an error without it, so I assumed it was needed; but if it shouldn't be needed, I should check. As you said, it may even be fixed ^^

> +            job.run()
> +
> +        self.assertEqual(
> +            job.metadata.get("result"),
> +            {
> +                "succeeded": [],
> +                "failed": [],
> +                "error_description": ["No CVEs to export"],
> +            },
> +        )
> +
> +    def _put_cve_in_soss(self):
> +        self.factory.makePerson(name="octagalland")
> +        self.factory.makeDistribution(
> +            name="soss",
> +            displayname="SOSS",
> +            information_type=InformationType.PROPRIETARY,
> +        )
> +
> +        sampledata = Path(__file__).parent / "sampledata"
> +
> +        soss_importer = SOSSImporter()
> +
> +        imported_list = []
> +        for file in sampledata.iterdir():
> +            cve_sequence = file.name.lstrip("CVE-")
> +            if not self.cve_set[cve_sequence]:
> +                self.factory.makeCVE(sequence=cve_sequence)
> +
> +            bug, vulnerability = soss_importer.import_cve_from_file(file)
> +            imported_list.append((cve_sequence, bug, vulnerability))
> +
> +        return imported_list
> +
> +    def test_run_export(self):
> +        """Run ExportVulnerabilityJob."""
> +        imported_list = self._put_cve_in_soss()
> +        information_type = InformationType.PRIVATESECURITY.value
> +        export_link = "http://example.com/fake-url"
> +
> +        self.patch(
> +            LibraryFileAlias,
> +            "getURL",
> +            lambda self: export_link,
> +        )
> +
> +        job = self.job_source.create(
> +            handler=VulnerabilityHandlerEnum.SOSS,
> +            information_type=information_type,
> +        )
> +
> +        with person_logged_in(self.bug_importer):
> +            job.run()
> +
> +        cve_names = [
> +            f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list
> +        ]
> +
> +        naked_job_metadata = removeSecurityProxy(job.metadata)
> +        naked_job_metadata["result"]["succeeded"].sort()
> +        cve_names.sort()
> +
> +        self.assertEqual(
> +            naked_job_metadata,
> +            {
> +                "request": {
> +                    "sources": [],
> +                    "information_type": information_type,
> +                },
> +                "result": {
> +                    "error_description": [],
> +                    "succeeded": cve_names,
> +                    "failed": [],
> +                },
> +                "data": {
> +                    "export_link": export_link,
> +                },
> +            },
> +        )
> +
> +    def test_get(self):
> +        """ExportVulnerabilityJob.get() returns the import job for the given
> +        handler.
> +        """
> +        handler = VulnerabilityHandlerEnum.SOSS
> +
> +        # There is no job before creating it
> +        self.assertIs(None, self.job_source.get(handler))
> +
> +        job = self.job_source.create(handler)
> +        job_gotten = self.job_source.get(handler)
> +
> +        self.assertIsInstance(job, ExportVulnerabilityJob)
> +        self.assertEqual(job, job_gotten)
> +
> +    def test_error_description_when_no_error(self):
> +        """The ExportVulnerabilityJob.error_description property returns
> +        None when no error description is recorded."""
> +        handler = VulnerabilityHandlerEnum.SOSS
> +        information_type = InformationType.PRIVATESECURITY.value
> +
> +        job = self.job_source.create(
> +            handler,
> +            information_type=information_type,
> +        )
> +        self.assertEqual([], removeSecurityProxy(job).error_description)
> +
> +    def test_error_description_set_when_notifying_about_user_errors(self):
> +        """Test that error_description is set by notifyUserError()."""
> +        handler = VulnerabilityHandlerEnum.SOSS
> +        information_type = InformationType.PRIVATESECURITY.value
> +
> +        job = self.job_source.create(
> +            handler,
> +            information_type=information_type,
> +        )
> +        message = "This is an example message."
> +        job.notifyUserError(VulnerabilityJobException(message))
> +        self.assertEqual([message], removeSecurityProxy(job).error_description)
> +
> +
> +class ExportVulnerabilityTestViaCelery(TestCaseWithFactory):
> +    layer = CeleryJobLayer
> +
> +    def setUp(self):
> +        super().setUp()
> +
> +        self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
> +        self.cve_set = getUtility(ICveSet)
> +
> +    def _put_cve_in_soss(self):
> +        self.factory.makePerson(name="octagalland")
> +        self.factory.makeDistribution(
> +            name="soss",
> +            displayname="SOSS",
> +            information_type=InformationType.PROPRIETARY,
> +        )
> +
> +        sampledata = Path(__file__).parent / "sampledata"
> +
> +        soss_importer = SOSSImporter()
> +
> +        imported_list = []
> +        for file in sampledata.iterdir():
> +            cve_sequence = (
> +                file.name[4:] if file.name.startswith("CVE-") else file.name
> +            )
> +            if not self.cve_set[cve_sequence]:
> +                self.factory.makeCVE(sequence=cve_sequence)
> +
> +            bug, vulnerability = soss_importer.import_cve_from_file(file)
> +            imported_list.append((cve_sequence, bug, vulnerability))
> +
> +        return imported_list
> +
> +    def test_job(self):
> +        """Job runs via Celery."""
> +        fixture = FeatureFixture(
> +            {
> +                "jobs.celery.enabled_classes": "ExportVulnerabilityJob",
> +            }
> +        )
> +        self.useFixture(fixture)
> +
> +        imported_list = self._put_cve_in_soss()
> +        transaction.commit()
> +
> +        job_source = getUtility(IExportVulnerabilityJobSource)
> +
> +        handler = VulnerabilityHandlerEnum.SOSS
> +        information_type = InformationType.PRIVATESECURITY.value
> +        with block_on_job():
> +            job_source.create(
> +                handler,
> +                information_type=information_type,
> +            )
> +            transaction.commit()
> +
> +        cve_names = [
> +            f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list
> +        ]
> +
> +        job = job_source.get(handler)
> +
> +        naked_job_metadata = removeSecurityProxy(job.metadata)
> +        naked_job_metadata["result"]["succeeded"].sort()
> +        cve_names.sort()
> +
> +        metadata_request = {
> +            "sources": [],
> +            "information_type": information_type,
> +        }
> +
> +        metadata_result = {
> +            "error_description": [],
> +            "succeeded": cve_names,
> +            "failed": [],
> +        }
> +
> +        self.assertEqual(handler, job.handler)
> +        self.assertEqual(metadata_request, naked_job_metadata["request"])
> +        self.assertEqual(metadata_result, naked_job_metadata["result"])
> +
> +        self.assertThat(
> +            naked_job_metadata["data"]["export_link"],
> +            MatchesRegex(
> +                r".*exported_vulnerabilities_[0-9]+\.zip$",
> +            ),
> +        )


-- 
https://code.launchpad.net/~ilkeremrekoc/launchpad/+git/launchpad/+merge/492056
Your team Launchpad code reviewers is requested to review the proposed merge of ~ilkeremrekoc/launchpad:add-exportvulnerabilityjob into launchpad:master.



References