launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #32905
Re: [Merge] ~enriqueesanchz/launchpad:add-vulnerabilityjob-model into launchpad:master
Addressed the comments :)
Diff comments:
> diff --git a/lib/lp/bugs/model/importvulnerabilityjob.py b/lib/lp/bugs/model/importvulnerabilityjob.py
> new file mode 100644
> index 0000000..3753a7f
> --- /dev/null
> +++ b/lib/lp/bugs/model/importvulnerabilityjob.py
> @@ -0,0 +1,280 @@
> +# Copyright 2025 Canonical Ltd. This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +__all__ = [
> + "ImportVulnerabilityJob",
> +]
> +
> +import logging
> +import re
> +
> +from zope.component import getUtility
> +from zope.interface import implementer, provider
> +
> +from lp.app.enums import InformationType
> +from lp.app.interfaces.launchpad import ILaunchpadCelebrities
> +from lp.bugs.enums import VulnerabilityHandlerEnum
> +from lp.bugs.interfaces.vulnerabilityjob import (
> + IImportVulnerabilityJob,
> + IImportVulnerabilityJobSource,
> + VulnerabilityJobException,
> + VulnerabilityJobInProgress,
> + VulnerabilityJobType,
> +)
> +from lp.bugs.model.vulnerabilityjob import (
> + VulnerabilityJob,
> + VulnerabilityJobDerived,
> +)
> +from lp.bugs.scripts.soss.models import SOSSRecord
> +from lp.bugs.scripts.soss.sossimport import SOSSImporter
> +from lp.code.interfaces.githosting import IGitHostingClient
> +from lp.code.interfaces.gitlookup import IGitLookup
> +from lp.services.config import config
> +from lp.services.database.interfaces import IPrimaryStore, IStore
> +from lp.services.job.interfaces.job import JobStatus
> +from lp.services.job.model.job import Job
> +from lp.testing import person_logged_in
> +
> +CVE_PATTERN = re.compile(r"^CVE-\d{4}-\d+$")
> +logger = logging.getLogger(__name__)
> +
> +
> +@implementer(IImportVulnerabilityJob)
> +@provider(IImportVulnerabilityJobSource)
> +class ImportVulnerabilityJob(VulnerabilityJobDerived):
> + class_job_type = VulnerabilityJobType.IMPORT_DATA
> +
> + user_error_types = (VulnerabilityJobException,)
> +
> + config = config.IImportVulnerabilityJobSource
> +
> + @property
> + def git_repository(self):
> + return self.metadata.get("git_repository")
> +
> + @property
> + def git_ref(self):
> + return self.metadata.get("git_ref")
> +
> + @property
> + def git_paths(self):
> + return self.metadata.get("git_paths")
> +
> + @property
> + def information_type(self):
> + return self.metadata.get("information_type")
> +
> + @property
> + def import_since_commit_sha1(self):
> + return self.metadata.get("import_since_commit_sha1")
> +
> + @property
> + def error_description(self):
> + return self.metadata.get("error_description")
> +
> + @classmethod
> + def create(
> + cls,
> + handler,
> + git_repository,
> + git_ref,
> + git_paths,
> + information_type,
> + import_since_commit_sha1=False,
> + ):
> + """Create a new `ImportVulnerabilityJob`.
> +
> + :param handler: What handler to use for importing the data. Can be one
> + of a group of predefined classes (SOSS, UCT, ...).
> + :param git_repository: Git repository to import from.
> + :param git_ref: Git branch/tag to get data from.
> + :param git_paths: List of relative directories within the repository to
> + get data from.
> + :param information_type: Whether imported data (bugs) should be private
> + or public. Can be one of a group of predefined options (PUBLIC,
> + PRIVATE_SECURITY...). If the source git repository is private, then
> + the information_type needs to be private also.
> + :param import_since_commit_sha1: Import data from files that were
> + altered since the given commit_sha1
> + """
> + store = IPrimaryStore(VulnerabilityJob)
> +
> + vulnerability_job = store.find(
> + VulnerabilityJob,
> + VulnerabilityJob.job_id == Job.id,
> + VulnerabilityJob.job_type == cls.class_job_type,
> + VulnerabilityJob.handler == handler,
> + ).one()
> +
> + if vulnerability_job is not None and (
> + vulnerability_job.job.status == JobStatus.WAITING
> + or vulnerability_job.job.status == JobStatus.RUNNING
> + ):
> + raise VulnerabilityJobInProgress(cls(vulnerability_job))
> +
> + # Schedule the initialization.
> + metadata = {
Agree, added :)
> + "git_repository": git_repository,
> + "git_ref": git_ref,
> + "git_paths": git_paths,
> + "information_type": information_type,
> + "import_since_commit_sha1": import_since_commit_sha1,
> + }
> +
> + vulnerability_job = VulnerabilityJob(
> + handler, cls.class_job_type, metadata
> + )
> + store.add(vulnerability_job)
> + derived_job = cls(vulnerability_job)
> + derived_job.celeryRunOnCommit()
> + IStore(VulnerabilityJob).flush()
> + return derived_job
> +
> + @classmethod
> + def get(cls, handler):
> + """See `IImportVulnerabilityJob`."""
> + vulnerability_job = (
> + IStore(VulnerabilityJob)
> + .find(
> + VulnerabilityJob,
> + VulnerabilityJob.job_id == Job.id,
> + VulnerabilityJob.job_type == cls.class_job_type,
> + VulnerabilityJob.handler == handler,
> + )
> + .one()
> + )
> + return None if vulnerability_job is None else cls(vulnerability_job)
> +
> + def __repr__(self):
> + """Returns an informative representation of the job."""
> + parts = "%s for" % self.__class__.__name__
> + parts += " handler: %s" % self.handler
> + parts += ", metadata: %s" % self.metadata
> + return "<%s>" % parts
> +
> + def _get_parser_importer(
> + self,
> + handler: VulnerabilityHandlerEnum,
> + information_type: InformationType,
> + ):
> + """Decide which parser and importer to use
> +
> + :return: a tuple of (parser, importer) where parser is the function
> + that gets a blob and returns a record and importer is the function that
> + gets a record and imports it.
> + """
> +
> + if handler == VulnerabilityHandlerEnum.SOSS:
> + parser = SOSSRecord.from_yaml
> + importer = SOSSImporter(
> + information_type=information_type
> + ).import_cve
> + else:
> + exception = VulnerabilityJobException("Handler not found")
This is done in self.notifyUserError
> + self.notifyUserError(exception)
> + raise exception
> +
> + return parser, importer
> +
> + def run(self):
> + """See `IRunnableJob`."""
> + self.metadata["result"] = {"succeeded": [], "failed": []}
I think it makes sense to improve it in a separate MP, maybe we should create a ticket for this.
> + admin = getUtility(ILaunchpadCelebrities).admin
> +
> + # InformationType is passed as a value as DBItem is not serializable
> + information_type = InformationType.items[self.information_type]
> + parser, importer = self._get_parser_importer(
> + self.context.handler, information_type
> + )
> +
> + # Get git repository
> + git_lookup = getUtility(IGitLookup)
> + repository = git_lookup.getByUrl(self.git_repository)
> + if not repository:
> + exception = VulnerabilityJobException("Git repository not found")
> + self.notifyUserError(exception)
> + raise exception
> +
> + # Get git reference
> + ref = repository.getRefByPath(self.git_ref)
> + if not ref:
> + exception = VulnerabilityJobException("Git ref not found")
> + self.notifyUserError(exception)
> + raise exception
> +
> + # turnip API call to get added/modified files
> + stats = getUtility(IGitHostingClient).getDiffStats(
> + path=self.git_repository,
> + old=self.import_since_commit_sha1,
> + new=ref.commit_sha1,
> + logger=logger,
> + )
> +
> + files = [*stats.get("added", ()), *stats.get("modified", ())]
> + for file in files:
> + # Check if files that changed are in the desired path
> + found_path = False
> + for path in self.git_paths:
> + if file.startswith(path):
> + found_path = True
> + break
> +
> + if not found_path:
> + logger.debug(
> + f"[ImportVulnerabilityJob] {file} is not in git_paths"
> + )
> + continue
> +
> + cve_sequence = file.rsplit("/", maxsplit=1)[-1]
> + if not CVE_PATTERN.match(cve_sequence):
> + logger.debug(
> + f"[ImportVulnerabilityJob] {cve_sequence} is not a CVE "
> + "sequence"
> + )
> + continue
> +
> + try:
> + logger.debug(f"[ImportVulnerabilityJob] Getting {file}")
> + blob = ref.getBlob(file)
The parser cares about that. `from_yaml` checks that the input is a yaml
> +
> + logger.debug(
> + f"[ImportVulnerabilityJob] Parsing {cve_sequence}"
> + )
> + record = parser(blob)
> +
> + # Logged as admin
> + with person_logged_in(admin):
> + bug, vulnerability = importer(record, cve_sequence)
> +
> + if bug and vulnerability:
> + self.metadata["result"]["succeeded"].append(cve_sequence)
> + else:
> + self.metadata["result"]["failed"].append(cve_sequence)
> + except Exception as e:
> + self.notifyUserError(e)
Currently I'm not using notifyUserError to send an email to the user as I did not define `getErrorRecipients`. I'm just using that function to log the error + add it to the `error_description` field in `metadata`.
Agree that doing that could be confusing, we can change it and make whatever we think that makes more sense.
> +
> + def notifyUserError(self, error):
Replied above ^
> + """Calls up and also saves the error text in this job's metadata.
> +
> + See `BaseRunnableJob`.
> + """
> + # This method is called when error is an instance of
> + # self.user_error_types.
> + super().notifyUserError(error)
> + error_description = self.metadata.get("error_description", [])
> + error_description.append(str(error))
> + self.metadata = dict(
> + self.metadata, error_description=error_description
> + )
> +
> + def getOopsVars(self):
> + """See `IRunnableJob`."""
> + vars = super().getOopsVars()
> + vars.extend(
> + [
> + ("vulnerabilityjob_job_id", self.context.id),
> + ("vulnerability_job_type", self.context.job_type.title),
> + ("handler", self.context.handler),
> + ]
> + )
> + return vars
> diff --git a/lib/lp/bugs/model/vulnerabilityjob.py b/lib/lp/bugs/model/vulnerabilityjob.py
> new file mode 100644
> index 0000000..80a0753
> --- /dev/null
> +++ b/lib/lp/bugs/model/vulnerabilityjob.py
> @@ -0,0 +1,102 @@
> +# Copyright 2025 Canonical Ltd. This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +__all__ = [
> + "VulnerabilityJob",
> + "VulnerabilityJobDerived",
> +]
> +
> +from lazr.delegates import delegate_to
> +from storm.databases.postgres import JSON
> +from storm.locals import And, Int, Reference
> +from zope.interface import implementer
> +
> +from lp.app.errors import NotFoundError
> +from lp.bugs.enums import VulnerabilityHandlerEnum
> +from lp.bugs.interfaces.vulnerabilityjob import (
> + IVulnerabilityJob,
> + VulnerabilityJobType,
> +)
> +from lp.services.database.enumcol import DBEnum
> +from lp.services.database.interfaces import IStore
> +from lp.services.database.stormbase import StormBase
> +from lp.services.job.model.job import EnumeratedSubclass, Job
> +from lp.services.job.runner import BaseRunnableJob
> +
> +
> +@implementer(IVulnerabilityJob)
> +class VulnerabilityJob(StormBase):
> + """Base class for jobs related to Vulnerabilities."""
> +
> + __storm_table__ = "VulnerabilityJob"
> +
> + id = Int(primary=True)
> +
> + handler = DBEnum(enum=VulnerabilityHandlerEnum, allow_none=False)
> +
> + job_type = DBEnum(enum=VulnerabilityJobType, allow_none=False)
> +
> + job_id = Int(name="job")
> + job = Reference(job_id, Job.id)
> +
> + metadata = JSON("json_data", allow_none=False)
> +
> + def __init__(self, handler, job_type, metadata):
> + super().__init__()
> + self.job = Job()
> + self.handler = handler
> + self.job_type = job_type
> + self.metadata = metadata
> +
> + def makeDerived(self):
> + return VulnerabilityJobDerived.makeSubclass(self)
> +
> +
> +@delegate_to(IVulnerabilityJob)
> +class VulnerabilityJobDerived(BaseRunnableJob, metaclass=EnumeratedSubclass):
> + """Abstract class for deriving from VulnerabilityJob."""
> +
> + def __init__(self, job):
> + self.context = job
> +
> + @classmethod
> + def get(cls, job_id):
> + """Get a job by id.
> +
> + :return: the VulnerabilityJob with the specified id, as
> + the current VulnerabilityJobDerived subclass.
> + :raises: NotFoundError if there is no job with the specified id,
> + or its job_type does not match the desired subclass.
> + """
> + job = VulnerabilityJob.get(job_id)
> + if job.job_type != cls.class_job_type:
> + raise NotFoundError(
> + "No object found with id %d and type %s"
> + % (job_id, cls.class_job_type.title)
> + )
> + return cls(job)
> +
> + @classmethod
> + def iterReady(cls):
> + """Iterate through all ready VulnerabilityJob."""
> + jobs = IStore(VulnerabilityJob).find(
> + VulnerabilityJob,
> + And(
> + VulnerabilityJob.job_type == cls.class_job_type,
> + VulnerabilityJob.job == Job.id,
Sure, preparing a follow up db patch.
> + Job.id.is_in(Job.ready_jobs),
> + ),
> + )
> + return (cls(job) for job in jobs)
> +
> + def getOopsVars(self):
> + """See `IRunnableJob`."""
> + vars = super().getOopsVars()
> + vars.extend(
> + [
> + ("vulnerabilityjob_job_id", self.context.id),
> + ("vulnerability_job_type", self.context.job_type.title),
> + ("handler", self.context.handler),
> + ]
> + )
> + return vars
--
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/491368
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-vulnerabilityjob-model into launchpad:master.
References