← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] ~enriqueesanchz/launchpad:add-vulnerabilityjob-model into launchpad:master

 

Addressed the comments :)

Diff comments:

> diff --git a/lib/lp/bugs/model/importvulnerabilityjob.py b/lib/lp/bugs/model/importvulnerabilityjob.py
> new file mode 100644
> index 0000000..3753a7f
> --- /dev/null
> +++ b/lib/lp/bugs/model/importvulnerabilityjob.py
> @@ -0,0 +1,280 @@
> +# Copyright 2025 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +__all__ = [
> +    "ImportVulnerabilityJob",
> +]
> +
> +import logging
> +import re
> +
> +from zope.component import getUtility
> +from zope.interface import implementer, provider
> +
> +from lp.app.enums import InformationType
> +from lp.app.interfaces.launchpad import ILaunchpadCelebrities
> +from lp.bugs.enums import VulnerabilityHandlerEnum
> +from lp.bugs.interfaces.vulnerabilityjob import (
> +    IImportVulnerabilityJob,
> +    IImportVulnerabilityJobSource,
> +    VulnerabilityJobException,
> +    VulnerabilityJobInProgress,
> +    VulnerabilityJobType,
> +)
> +from lp.bugs.model.vulnerabilityjob import (
> +    VulnerabilityJob,
> +    VulnerabilityJobDerived,
> +)
> +from lp.bugs.scripts.soss.models import SOSSRecord
> +from lp.bugs.scripts.soss.sossimport import SOSSImporter
> +from lp.code.interfaces.githosting import IGitHostingClient
> +from lp.code.interfaces.gitlookup import IGitLookup
> +from lp.services.config import config
> +from lp.services.database.interfaces import IPrimaryStore, IStore
> +from lp.services.job.interfaces.job import JobStatus
> +from lp.services.job.model.job import Job
> +from lp.testing import person_logged_in
> +
> +CVE_PATTERN = re.compile(r"^CVE-\d{4}-\d+$")
> +logger = logging.getLogger(__name__)
> +
> +
> +@implementer(IImportVulnerabilityJob)
> +@provider(IImportVulnerabilityJobSource)
> +class ImportVulnerabilityJob(VulnerabilityJobDerived):
> +    class_job_type = VulnerabilityJobType.IMPORT_DATA
> +
> +    user_error_types = (VulnerabilityJobException,)
> +
> +    config = config.IImportVulnerabilityJobSource
> +
> +    @property
> +    def git_repository(self):
> +        return self.metadata.get("git_repository")
> +
> +    @property
> +    def git_ref(self):
> +        return self.metadata.get("git_ref")
> +
> +    @property
> +    def git_paths(self):
> +        return self.metadata.get("git_paths")
> +
> +    @property
> +    def information_type(self):
> +        return self.metadata.get("information_type")
> +
> +    @property
> +    def import_since_commit_sha1(self):
> +        return self.metadata.get("import_since_commit_sha1")
> +
> +    @property
> +    def error_description(self):
> +        return self.metadata.get("error_description")
> +
> +    @classmethod
> +    def create(
> +        cls,
> +        handler,
> +        git_repository,
> +        git_ref,
> +        git_paths,
> +        information_type,
> +        import_since_commit_sha1=False,
> +    ):
> +        """Create a new `ImportVulnerabilityJob`.
> +
> +        :param handler: What handler to use for importing the data. Can be one
> +             of a group of predefined classes (SOSS, UCT, ...).
> +        :param git_repository: Git repository to import from.
> +        :param git_ref: Git branch/tag to get data from.
> +        :param git_paths: List of relative directories within the repository to
> +            get data from.
> +        :param information_type: Whether imported data (bugs) should be private
> +            or public. Can be one of a group of predefined options (PUBLIC,
> +            PRIVATE_SECURITY...). If the source git repository is private, then
> +            the information_type needs to be private also.
> +        :param import_since_commit_sha1: Import data from files that were
> +            altered since the given commit_sha1
> +        """
> +        store = IPrimaryStore(VulnerabilityJob)
> +
> +        vulnerability_job = store.find(
> +            VulnerabilityJob,
> +            VulnerabilityJob.job_id == Job.id,
> +            VulnerabilityJob.job_type == cls.class_job_type,
> +            VulnerabilityJob.handler == handler,
> +        ).one()
> +
> +        if vulnerability_job is not None and (
> +            vulnerability_job.job.status == JobStatus.WAITING
> +            or vulnerability_job.job.status == JobStatus.RUNNING
> +        ):
> +            raise VulnerabilityJobInProgress(cls(vulnerability_job))
> +
> +        # Schedule the initialization.
> +        metadata = {

Agree, added :)

> +            "git_repository": git_repository,
> +            "git_ref": git_ref,
> +            "git_paths": git_paths,
> +            "information_type": information_type,
> +            "import_since_commit_sha1": import_since_commit_sha1,
> +        }
> +
> +        vulnerability_job = VulnerabilityJob(
> +            handler, cls.class_job_type, metadata
> +        )
> +        store.add(vulnerability_job)
> +        derived_job = cls(vulnerability_job)
> +        derived_job.celeryRunOnCommit()
> +        IStore(VulnerabilityJob).flush()
> +        return derived_job
> +
> +    @classmethod
> +    def get(cls, handler):
> +        """See `IImportVulnerabilityJob`."""
> +        vulnerability_job = (
> +            IStore(VulnerabilityJob)
> +            .find(
> +                VulnerabilityJob,
> +                VulnerabilityJob.job_id == Job.id,
> +                VulnerabilityJob.job_type == cls.class_job_type,
> +                VulnerabilityJob.handler == handler,
> +            )
> +            .one()
> +        )
> +        return None if vulnerability_job is None else cls(vulnerability_job)
> +
> +    def __repr__(self):
> +        """Returns an informative representation of the job."""
> +        parts = "%s for" % self.__class__.__name__
> +        parts += " handler: %s" % self.handler
> +        parts += ", metadata: %s" % self.metadata
> +        return "<%s>" % parts
> +
> +    def _get_parser_importer(
> +        self,
> +        handler: VulnerabilityHandlerEnum,
> +        information_type: InformationType,
> +    ):
> +        """Decide which parser and importer to use
> +
> +        :return: a tuple of (parser, importer) where parser is the function
> +        that gets a blob and returns a record and importer is the function that
> +        gets a record and imports it.
> +        """
> +
> +        if handler == VulnerabilityHandlerEnum.SOSS:
> +            parser = SOSSRecord.from_yaml
> +            importer = SOSSImporter(
> +                information_type=information_type
> +            ).import_cve
> +        else:
> +            exception = VulnerabilityJobException("Handler not found")

This is done in self.notifyUserError

> +            self.notifyUserError(exception)
> +            raise exception
> +
> +        return parser, importer
> +
> +    def run(self):
> +        """See `IRunnableJob`."""
> +        self.metadata["result"] = {"succeeded": [], "failed": []}

I think it makes sense to improve it in a separate MP; maybe we should create a ticket for this.

> +        admin = getUtility(ILaunchpadCelebrities).admin
> +
> +        # InformationType is passed as a value as DBItem is not serializable
> +        information_type = InformationType.items[self.information_type]
> +        parser, importer = self._get_parser_importer(
> +            self.context.handler, information_type
> +        )
> +
> +        # Get git repository
> +        git_lookup = getUtility(IGitLookup)
> +        repository = git_lookup.getByUrl(self.git_repository)
> +        if not repository:
> +            exception = VulnerabilityJobException("Git repository not found")
> +            self.notifyUserError(exception)
> +            raise exception
> +
> +        # Get git reference
> +        ref = repository.getRefByPath(self.git_ref)
> +        if not ref:
> +            exception = VulnerabilityJobException("Git ref not found")
> +            self.notifyUserError(exception)
> +            raise exception
> +
> +        # turnip API call to get added/modified files
> +        stats = getUtility(IGitHostingClient).getDiffStats(
> +            path=self.git_repository,
> +            old=self.import_since_commit_sha1,
> +            new=ref.commit_sha1,
> +            logger=logger,
> +        )
> +
> +        files = [*stats.get("added", ()), *stats.get("modified", ())]
> +        for file in files:
> +            # Check if files that changed are in the desired path
> +            found_path = False
> +            for path in self.git_paths:
> +                if file.startswith(path):
> +                    found_path = True
> +                    break
> +
> +            if not found_path:
> +                logger.debug(
> +                    f"[ImportVulnerabilityJob] {file} is not in git_paths"
> +                )
> +                continue
> +
> +            cve_sequence = file.rsplit("/", maxsplit=1)[-1]
> +            if not CVE_PATTERN.match(cve_sequence):
> +                logger.debug(
> +                    f"[ImportVulnerabilityJob] {cve_sequence} is not a CVE "
> +                    "sequence"
> +                )
> +                continue
> +
> +            try:
> +                logger.debug(f"[ImportVulnerabilityJob] Getting {file}")
> +                blob = ref.getBlob(file)

The parser takes care of that: `from_yaml` checks that the input is YAML.

> +
> +                logger.debug(
> +                    f"[ImportVulnerabilityJob] Parsing {cve_sequence}"
> +                )
> +                record = parser(blob)
> +
> +                # Logged as admin
> +                with person_logged_in(admin):
> +                    bug, vulnerability = importer(record, cve_sequence)
> +
> +                if bug and vulnerability:
> +                    self.metadata["result"]["succeeded"].append(cve_sequence)
> +                else:
> +                    self.metadata["result"]["failed"].append(cve_sequence)
> +            except Exception as e:
> +                self.notifyUserError(e)

Currently I'm not using `notifyUserError` to send an email to the user, as I did not define `getErrorRecipients`. I'm just using that function to log the error and add it to the `error_description` field in `metadata`.

I agree that doing that could be confusing; we can change it and do whatever we think makes the most sense.

> +
> +    def notifyUserError(self, error):

Replied above ^

> +        """Calls up and also saves the error text in this job's metadata.
> +
> +        See `BaseRunnableJob`.
> +        """
> +        # This method is called when error is an instance of
> +        # self.user_error_types.
> +        super().notifyUserError(error)
> +        error_description = self.metadata.get("error_description", [])
> +        error_description.append(str(error))
> +        self.metadata = dict(
> +            self.metadata, error_description=error_description
> +        )
> +
> +    def getOopsVars(self):
> +        """See `IRunnableJob`."""
> +        vars = super().getOopsVars()
> +        vars.extend(
> +            [
> +                ("vulnerabilityjob_job_id", self.context.id),
> +                ("vulnerability_job_type", self.context.job_type.title),
> +                ("handler", self.context.handler),
> +            ]
> +        )
> +        return vars
> diff --git a/lib/lp/bugs/model/vulnerabilityjob.py b/lib/lp/bugs/model/vulnerabilityjob.py
> new file mode 100644
> index 0000000..80a0753
> --- /dev/null
> +++ b/lib/lp/bugs/model/vulnerabilityjob.py
> @@ -0,0 +1,102 @@
> +# Copyright 2025 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +__all__ = [
> +    "VulnerabilityJob",
> +    "VulnerabilityJobDerived",
> +]
> +
> +from lazr.delegates import delegate_to
> +from storm.databases.postgres import JSON
> +from storm.locals import And, Int, Reference
> +from zope.interface import implementer
> +
> +from lp.app.errors import NotFoundError
> +from lp.bugs.enums import VulnerabilityHandlerEnum
> +from lp.bugs.interfaces.vulnerabilityjob import (
> +    IVulnerabilityJob,
> +    VulnerabilityJobType,
> +)
> +from lp.services.database.enumcol import DBEnum
> +from lp.services.database.interfaces import IStore
> +from lp.services.database.stormbase import StormBase
> +from lp.services.job.model.job import EnumeratedSubclass, Job
> +from lp.services.job.runner import BaseRunnableJob
> +
> +
> +@implementer(IVulnerabilityJob)
> +class VulnerabilityJob(StormBase):
> +    """Base class for jobs related to Vulnerabilities."""
> +
> +    __storm_table__ = "VulnerabilityJob"
> +
> +    id = Int(primary=True)
> +
> +    handler = DBEnum(enum=VulnerabilityHandlerEnum, allow_none=False)
> +
> +    job_type = DBEnum(enum=VulnerabilityJobType, allow_none=False)
> +
> +    job_id = Int(name="job")
> +    job = Reference(job_id, Job.id)
> +
> +    metadata = JSON("json_data", allow_none=False)
> +
> +    def __init__(self, handler, job_type, metadata):
> +        super().__init__()
> +        self.job = Job()
> +        self.handler = handler
> +        self.job_type = job_type
> +        self.metadata = metadata
> +
> +    def makeDerived(self):
> +        return VulnerabilityJobDerived.makeSubclass(self)
> +
> +
> +@delegate_to(IVulnerabilityJob)
> +class VulnerabilityJobDerived(BaseRunnableJob, metaclass=EnumeratedSubclass):
> +    """Abstract class for deriving from VulnerabilityJob."""
> +
> +    def __init__(self, job):
> +        self.context = job
> +
> +    @classmethod
> +    def get(cls, job_id):
> +        """Get a job by id.
> +
> +        :return: the VulnerabilityJob with the specified id, as
> +                 the current VulnerabilityJobDerived subclass.
> +        :raises: NotFoundError if there is no job with the specified id,
> +                 or its job_type does not match the desired subclass.
> +        """
> +        job = VulnerabilityJob.get(job_id)
> +        if job.job_type != cls.class_job_type:
> +            raise NotFoundError(
> +                "No object found with id %d and type %s"
> +                % (job_id, cls.class_job_type.title)
> +            )
> +        return cls(job)
> +
> +    @classmethod
> +    def iterReady(cls):
> +        """Iterate through all ready VulnerabilityJob."""
> +        jobs = IStore(VulnerabilityJob).find(
> +            VulnerabilityJob,
> +            And(
> +                VulnerabilityJob.job_type == cls.class_job_type,
> +                VulnerabilityJob.job == Job.id,

Sure, I'm preparing a follow-up DB patch.

> +                Job.id.is_in(Job.ready_jobs),
> +            ),
> +        )
> +        return (cls(job) for job in jobs)
> +
> +    def getOopsVars(self):
> +        """See `IRunnableJob`."""
> +        vars = super().getOopsVars()
> +        vars.extend(
> +            [
> +                ("vulnerabilityjob_job_id", self.context.id),
> +                ("vulnerability_job_type", self.context.job_type.title),
> +                ("handler", self.context.handler),
> +            ]
> +        )
> +        return vars


-- 
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/491368
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-vulnerabilityjob-model into launchpad:master.



References