launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #33073
[Merge] ~ines-almeida/launchpad:uct-export-handler into launchpad:master
Ines Almeida has proposed merging ~ines-almeida/launchpad:uct-export-handler into launchpad:master with ~ines-almeida/launchpad:update-bugpresence-permissions as a prerequisite.
Commit message:
Add UCT handler as option for vulnerability exports
This includes adding missing methods to UCTRecord, along with other minor fixes that came up along the way while testing.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~ines-almeida/launchpad/+git/launchpad/+merge/493617
Still WIP
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~ines-almeida/launchpad:uct-export-handler into launchpad:master.
diff --git a/lib/lp/bugs/model/tests/test_vulnerability.py b/lib/lp/bugs/model/tests/test_vulnerability.py
index b7e940e..0e00d1c 100644
--- a/lib/lp/bugs/model/tests/test_vulnerability.py
+++ b/lib/lp/bugs/model/tests/test_vulnerability.py
@@ -1174,6 +1174,30 @@ class TestVulnerabilitySetExportData(TestCaseWithFactory):
self.assertIsInstance(naked_job, ExportVulnerabilityJob)
self.assertEqual(naked_job.handler, self.handler)
+ def test_exportData_multiple_handlers(self):
+ """Test that the feature flag allows to create ExportVulnerabilityJob
+ for multiple handlers.
+ """
+ self.useContext(feature_flags())
+ set_feature_flag(
+ VULNERABILITY_EXPORT_HANDLER_ENABLED_FEATURE_FLAG, "UCT SOSS"
+ )
+
+ ubuntu_team = getUtility(ILaunchpadCelebrities).ubuntu.owner
+ requester = self.factory.makePerson(member_of=(ubuntu_team,))
+ handler = VulnerabilityHandlerEnum.UCT
+ with person_logged_in(requester):
+ getUtility(IVulnerabilitySet).exportData(
+ requester,
+ handler,
+ None,
+ )
+
+ job = getUtility(IExportVulnerabilityJobSource).get(handler)
+ naked_job = removeSecurityProxy(job)
+ self.assertIsInstance(naked_job, ExportVulnerabilityJob)
+ self.assertEqual(naked_job.handler, handler)
+
def test_exportData_feature_flag_disabled(self):
"""Test that if the feature flag is disabled it raises the
NotImplementedError exception."""
diff --git a/lib/lp/bugs/model/vulnerability.py b/lib/lp/bugs/model/vulnerability.py
index fd5fba0..1d93b97 100644
--- a/lib/lp/bugs/model/vulnerability.py
+++ b/lib/lp/bugs/model/vulnerability.py
@@ -509,12 +509,19 @@ class VulnerabilitySet:
# Check requester's permissions to handler
from lp.bugs.scripts.soss.sossexport import SOSSExporter
+ from lp.bugs.scripts.uct.uctexport import UCTExporter
if handler == VulnerabilityHandlerEnum.SOSS:
distribution = getUtility(IDistributionSet).getByName(
HANDLER_DISTRIBUTION_MAP[handler]
)
exporter = SOSSExporter()
+
+ elif handler == VulnerabilityHandlerEnum.UCT:
+ distribution = getUtility(IDistributionSet).getByName(
+ HANDLER_DISTRIBUTION_MAP[handler]
+ )
+ exporter = UCTExporter()
else:
raise NotFoundError(f"{handler} not found")
diff --git a/lib/lp/bugs/scripts/uct/models.py b/lib/lp/bugs/scripts/uct/models.py
index 42331d0..8473ca2 100644
--- a/lib/lp/bugs/scripts/uct/models.py
+++ b/lib/lp/bugs/scripts/uct/models.py
@@ -8,6 +8,7 @@ from collections import OrderedDict, defaultdict
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
+from io import StringIO
from pathlib import Path
from typing import (
Any,
@@ -272,17 +273,11 @@ class UCTRecord(SVTRecord):
return entry
- def save(self, output_dir: Path) -> Path:
+ def to_str(self) -> str:
"""
- Save UCTRecord to a file in UCT format.
+ Export UCTRecord to a yaml str format.
"""
- if not output_dir.is_dir():
- raise ValueError(
- "{} does not exist or is not a directory", output_dir
- )
- output_path = output_dir / self.parent_dir / self.candidate
- output_path.parent.mkdir(exist_ok=True)
- output = open(str(output_path), "w")
+ output = StringIO()
if self.public_date_at_USN:
self._write_field(
"PublicDateAtUSN",
@@ -362,7 +357,20 @@ class UCTRecord(SVTRecord):
output,
)
- output.close()
+ return output.getvalue()
+
+ def save(self, output_dir: Path) -> Path:
+ """
+ Save UCTRecord to a file in UCT format.
+ """
+ if not output_dir.is_dir():
+ raise ValueError(
+ "{} does not exist or is not a directory", output_dir
+ )
+ output_path = output_dir / self.parent_dir / self.candidate
+ output_path.parent.mkdir(exist_ok=True)
+ yaml_content = self.to_str()
+ output_path.write_text(yaml_content)
return output_path
@classmethod
diff --git a/lib/lp/bugs/scripts/uct/tests/test_uct.py b/lib/lp/bugs/scripts/uct/tests/test_uct.py
index 7b022c0..c5c0ccd 100644
--- a/lib/lp/bugs/scripts/uct/tests/test_uct.py
+++ b/lib/lp/bugs/scripts/uct/tests/test_uct.py
@@ -278,6 +278,13 @@ class TestUCTRecord(TestCase):
self.record.parent_dir = "tmp"
self.assertEqual(record.__dict__, self.record.__dict__)
+ def test_to_str(self):
+ load_from = Path(__file__).parent / "sampledata" / "CVE-2022-23222"
+ with open(load_from) as f:
+ expected_record = f.read()
+ record_str = self.record.to_str()
+ self.assertEqual(expected_record, record_str)
+
class TestCVE(TestCaseWithFactory):
layer = ZopelessDatabaseLayer
@@ -736,6 +743,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
# Note: The ubuntu-esm distribution does not have any source packages
# published.
+ assignee = self.factory.makePerson()
self.lp_cve = self.factory.makeCVE("2022-23222")
self.cve = CVE(
sequence="CVE-2022-23222",
@@ -830,7 +838,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
],
importance=BugTaskImportance.MEDIUM,
status=VulnerabilityStatus.ACTIVE,
- assignee=self.factory.makePerson(),
+ assignee=assignee,
discovered_by="tr3e wang",
description="description",
ubuntu_description="ubuntu-description",
@@ -881,6 +889,127 @@ class TestUCTImporterExporter(TestCaseWithFactory):
],
global_tags={"cisa-kev"},
)
+
+ self.uct_record = UCTRecord(
+ assigned_to=assignee.name,
+ bugs=["https://github.com/mm2/Little-CMS/issues/29"],
+ candidate="CVE-2022-23222",
+ crd=datetime(2020, 1, 14, 8, 15, tzinfo=timezone.utc),
+ cvss=[
+ CVSS(
+ authority="nvd",
+ vector_string=(
+ "CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H "
+ "[7.8 HIGH]"
+ ),
+ ),
+ ],
+ description="description",
+ discovered_by="tr3e wang",
+ global_tags={"cisa-kev"},
+ mitigation="mitigation",
+ notes="author> text",
+ packages=[
+ UCTRecord.Package(
+ name=self.ubuntu_package.sourcepackagename.name,
+ statuses=[
+ UCTRecord.SeriesPackageStatus(
+ series="focal",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="released",
+ priority=UCTRecord.Priority.HIGH,
+ ),
+ UCTRecord.SeriesPackageStatus(
+ series="jammy",
+ status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
+ reason="does not exist",
+ priority=None,
+ ),
+ UCTRecord.SeriesPackageStatus(
+ series="devel",
+ status=UCTRecord.PackageStatus.NOT_AFFECTED,
+ reason="not affected",
+ priority=None,
+ ),
+ UCTRecord.SeriesPackageStatus(
+ series="upstream",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="fix released",
+ priority=UCTRecord.Priority.HIGH,
+ ),
+ ],
+ priority=UCTRecord.Priority.LOW,
+ tags={"review-break-fix"},
+ patches=[
+ UCTRecord.Patch(
+ patch_type="upstream",
+ entry=(
+ "https://github.com/389ds/389-ds-base/commit/123 (1.4.4)" # noqa: E501
+ ),
+ ),
+ UCTRecord.Patch(
+ patch_type="break-fix",
+ entry=(
+ "457f44363a8894135c85b7a9afd2bd8196db24ab "
+ "c25b2ae136039ffa820c26138ed4a5e5f3ab3841|"
+ "local-CVE-2022-23222-fix"
+ ),
+ ),
+ ],
+ ),
+ UCTRecord.Package(
+ name=self.esm_package.sourcepackagename.name,
+ statuses=[
+ UCTRecord.SeriesPackageStatus(
+ series="precise/esm",
+ status=UCTRecord.PackageStatus.IGNORED,
+ reason="ignored",
+ priority=None,
+ ),
+ UCTRecord.SeriesPackageStatus(
+ series="trusty/esm",
+ status=UCTRecord.PackageStatus.NEEDS_TRIAGE,
+ reason="needs triage",
+ priority=None,
+ ),
+ UCTRecord.SeriesPackageStatus(
+ series="upstream",
+ status=UCTRecord.PackageStatus.IGNORED,
+ reason="ignored",
+ priority=UCTRecord.Priority.LOW,
+ ),
+ ],
+ priority=None,
+ tags={"universe-binary"},
+ patches=[
+ UCTRecord.Patch(
+ patch_type="upstream",
+ entry=(
+ "https://github.com/389ds/389-ds-base/commit/456" # noqa: E501
+ ),
+ ),
+ UCTRecord.Patch(
+ patch_type="break-fix",
+ entry=(
+ "457f44363a8894135c85b7a9afd2bd8196db24ab "
+ "c25b2ae136039ffa820c26138ed4a5e5f3ab3841|"
+ "local-CVE-2022-23222-fix"
+ ),
+ ),
+ ],
+ ),
+ ],
+ parent_dir="active",
+ priority=UCTRecord.Priority.MEDIUM,
+ priority_explanation="",
+ public_date=datetime(2022, 1, 14, 8, 15, tzinfo=timezone.utc),
+ public_date_at_USN=datetime(
+ 2021, 1, 14, 8, 15, tzinfo=timezone.utc
+ ),
+ references=["https://ubuntu.com/security/notices/USN-5368-1"],
+ ubuntu_description="ubuntu-description",
+ )
+
self.importer = UCTImporter(self.ubuntu)
self.exporter = UCTExporter()
@@ -1624,14 +1753,10 @@ class TestUCTImporterExporter(TestCaseWithFactory):
def test_import_cve_from_file(self):
uct_record = self.cve.to_uct_record()
- import tempfile
- with tempfile.TemporaryDirectory() as tmpdir:
- cve_path = uct_record.save(Path(tmpdir))
- self.importer.import_cve_from_file(cve_path)
+ cve_path = uct_record.save(Path(self.makeTemporaryDirectory()))
+ bug, _ = self.importer.import_cve_from_file(cve_path)
- bug = self.importer._find_existing_bug(self.lp_cve, self.ubuntu)
- self.importer._find_existing_vulnerability(self.lp_cve, self.ubuntu)
self.checkBug(bug, self.cve)
self.checkVulnerabilities(bug, self.cve)
@@ -1666,3 +1791,12 @@ class TestUCTImporterExporter(TestCaseWithFactory):
)
self.assertEqual(bug, bug_copy)
self.assertEqual(vulnerability, vulnerability_copy)
+
+ def test_exporter_to_record(self):
+ """Test to_record returns expected UCTRecord"""
+ bug, vulnerability = self.importer.import_cve(self.cve)
+
+ uct_record = self.exporter.to_record(bug, vulnerability)
+
+ self.assertListEqual(self.uct_record.packages, uct_record.packages)
+ self.assertDictEqual(self.uct_record.__dict__, uct_record.__dict__)
diff --git a/lib/lp/bugs/scripts/uct/uctexport.py b/lib/lp/bugs/scripts/uct/uctexport.py
index dfad576..5b02917 100644
--- a/lib/lp/bugs/scripts/uct/uctexport.py
+++ b/lib/lp/bugs/scripts/uct/uctexport.py
@@ -13,17 +13,19 @@ from lp.bugs.interfaces.bug import IBugSet
from lp.bugs.interfaces.bugattachment import BugAttachmentType
from lp.bugs.model.bug import Bug as BugModel
from lp.bugs.model.bugtask import BugTask
-from lp.bugs.model.cve import Cve as CveModel
from lp.bugs.model.vulnerability import Vulnerability
from lp.bugs.scripts.svthandler import SVTExporter
-from lp.bugs.scripts.uct.models import CVE, CVSS
+from lp.bugs.scripts.uct.models import CVE, CVSS, UCTRecord
from lp.bugs.scripts.uct.uctimport import UCTImporter
+from lp.registry.interfaces.role import IPersonRoles
+from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
from lp.registry.model.distributionsourcepackage import (
DistributionSourcePackage,
)
from lp.registry.model.product import Product
from lp.registry.model.sourcepackage import SourcePackage
from lp.registry.model.sourcepackagename import SourcePackageName
+from lp.registry.security import SecurityAdminDistribution
__all__ = [
"UCTExporter",
@@ -43,6 +45,32 @@ class UCTExporter(SVTExporter):
description: str
references: List[str]
+ def to_record(
+ self,
+ bug: BugModel,
+ vulnerability: Vulnerability,
+ ) -> UCTRecord:
+ """
+ Export the bug and vulnerability related to a cve in a distribution
+ and return a `CVE` instance.
+
+ :param bug: `Bug` model
+ :param vulnerability: `Vulnerability` model
+ :return: `CVE` instance
+ """
+ if bug is None:
+ raise ValueError("Bug can't be None")
+ if vulnerability is None:
+ raise ValueError("Vulnerability can't be None")
+
+ cve = self._import_cve(bug, vulnerability)
+ return cve.to_uct_record()
+
+ def checkUserPermissions(self, user, distribution):
+ return SecurityAdminDistribution(distribution).checkAuthenticated(
+ IPersonRoles(user)
+ )
+
def export_bug_to_uct_file(
self, bug_id: int, output_dir: Path
) -> Optional[Path]:
@@ -85,13 +113,35 @@ class UCTExporter(SVTExporter):
raise ValueError(
f"Bug with ID: {bug.id} does not have vulnerabilities"
)
+
vulnerability: Vulnerability = vulnerabilities[0]
if not vulnerability.cve:
raise ValueError(
"Bug with ID: {} - vulnerability "
"is not linked to a CVE".format(bug.id)
)
- lp_cve: CveModel = vulnerability.cve
+
+ return self._import_cve(bug, vulnerability)
+
+ def _import_cve(
+ self,
+ bug: BugModel,
+ vulnerability: Vulnerability,
+ ) -> CVE:
+ """
+ Create a `CVE` instances from a `Bug` model and the related
+ Vulnerabilities and `Cve`.
+
+ `BugTasks` are converted to `CVE.DistroPackage` and `CVE.SeriesPackage`
+ objects.
+
+ Other `CVE` fields are populated from the information contained in the
+ `Bug`, its related Vulnerabilities and LP `Cve` model.
+
+ :param bug: `Bug` model to import
+ :param vulnerability: `Vulnerability` model
+ :return: `CVE` instance
+ """
parsed_description = self._parse_bug_description(bug.description)
@@ -207,9 +257,6 @@ class UCTExporter(SVTExporter):
)
)
- packages_by_name = {
- p.name: p for p in package_name_by_product.values()
- }
patch_urls = []
for attachment in bug.attachments:
if attachment.url:
@@ -226,10 +273,13 @@ class UCTExporter(SVTExporter):
):
continue
+ package_name = getUtility(ISourcePackageNameSet).queryByName(
+ attachment.title
+ )
for patch in attachment.vulnerability_patches:
patch_urls.append(
CVE.PatchURL(
- package_name=packages_by_name.get(attachment.title),
+ package_name=package_name,
type=patch["name"],
url=patch["value"],
notes=patch["comment"],
@@ -247,6 +297,8 @@ class UCTExporter(SVTExporter):
)
)
+ lp_cve = vulnerability.cve
+
return CVE(
sequence=f"CVE-{lp_cve.sequence}",
date_made_public=vulnerability.date_made_public,
diff --git a/lib/lp/bugs/scripts/uct/uctimport.py b/lib/lp/bugs/scripts/uct/uctimport.py
index 6c51041..dbc080b 100644
--- a/lib/lp/bugs/scripts/uct/uctimport.py
+++ b/lib/lp/bugs/scripts/uct/uctimport.py
@@ -88,7 +88,9 @@ class UCTImporter(SVTImporter):
self.ubuntu = ubuntu
self.information_type = information_type
- def import_cve_from_file(self, cve_path: Path) -> None:
+ def import_cve_from_file(
+ self, cve_path: Path
+ ) -> Tuple[BugModel, Vulnerability]:
"""
Import a UCT CVE record from a file located at `cve_path`.
@@ -97,7 +99,7 @@ class UCTImporter(SVTImporter):
logger.info("Importing %s", cve_path)
uct_record = UCTRecord.load(cve_path)
cve = CVE.make_from_uct_record(uct_record)
- self.import_cve(cve)
+ return self.import_cve(cve)
def from_record(
self, record: UCTRecord, cve_sequence: str
diff --git a/lib/lp/bugs/tests/test_exportvulnerabilityjob.py b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
index 1d8ea25..8408a40 100644
--- a/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
+++ b/lib/lp/bugs/tests/test_exportvulnerabilityjob.py
@@ -4,6 +4,7 @@
from pathlib import Path
import transaction
+from testscenarios.testcase import WithScenarios
from testtools.matchers import MatchesRegex
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
@@ -19,6 +20,7 @@ from lp.bugs.interfaces.vulnerabilityjob import (
)
from lp.bugs.model.exportvulnerabilityjob import ExportVulnerabilityJob
from lp.bugs.scripts.soss.sossimport import SOSSImporter
+from lp.bugs.scripts.uctimport import UCTImporter
from lp.services.features.testing import FeatureFixture
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.tests import block_on_job
@@ -27,26 +29,107 @@ from lp.testing import TestCaseWithFactory
from lp.testing.layers import CelerySlowJobLayer, LaunchpadZopelessLayer
-class ExportVulnerabilityJobTests(TestCaseWithFactory):
+class ExportVulnerabilityJobTests(WithScenarios, TestCaseWithFactory):
"""Test case for ImportVulnerabilityJob."""
layer = LaunchpadZopelessLayer
+ scenarios = [
+ ("soss", {"handler_name": "SOSS"}),
+ ("uct", {"handler_name": "UCT"}),
+ ]
+
def setUp(self):
super().setUp()
self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
self.cve_set = getUtility(ICveSet)
+ if self.handler_name == "SOSS":
+ self.handler = VulnerabilityHandlerEnum.SOSS
+ self.distribution = removeSecurityProxy(
+ self.factory.makeDistribution(
+ name="soss",
+ displayname="SOSS",
+ information_type=InformationType.PROPRIETARY,
+ )
+ )
+ elif self.handler_name == "UCT":
+ self.handler = VulnerabilityHandlerEnum.UCT
+ celebrities = getUtility(ILaunchpadCelebrities)
+ self.distribution = removeSecurityProxy(celebrities.ubuntu)
+
+ # Create objects to match the sample data
+ series_names = [
+ "artful",
+ "bionic",
+ "cosmic",
+ "dapper",
+ "disco",
+ "edgy",
+ "eoan",
+ "feisty",
+ "focal",
+ "groovy",
+ "gutsy",
+ "hardy",
+ "hirsute",
+ "impish",
+ "intrepid",
+ "jammy",
+ "jaunty",
+ "karmic",
+ "kinetic",
+ "lucid",
+ "maverick",
+ "natty",
+ "oneiric",
+ "precise",
+ "quantal",
+ "raring",
+ "saucy",
+ "trusty",
+ "utopic",
+ "vivid",
+ "wily",
+ "xenial",
+ "yakkety",
+ "zesty",
+ ]
+ for name in series_names:
+ self.factory.makeDistroSeries(
+ name=name,
+ distribution=self.distribution,
+ )
+
+ distroseries = self.factory.makeDistroSeries(
+ name="questing",
+ distribution=self.distribution,
+ )
+
+ package_names = [
+ "linux",
+ "linux-hwe",
+ "gnupg2",
+ "xine-ui",
+ "gnupg",
+ "gbrowse",
+ ]
+ for name in package_names:
+ sourcepackagename = self.factory.makeSourcePackageName(name)
+ self.factory.makeSourcePackage(
+ distroseries=distroseries,
+ publish=True,
+ sourcepackagename=sourcepackagename,
+ )
+
@property
def job_source(self):
return getUtility(IExportVulnerabilityJobSource)
def test_getOopsVars(self):
"""Test getOopsVars method."""
- handler = VulnerabilityHandlerEnum.SOSS
-
- job = self.job_source.create(handler)
+ job = self.job_source.create(self.handler)
vars = job.getOopsVars()
naked_job = removeSecurityProxy(job)
self.assertIn(("vulnerabilityjob_job_id", naked_job.id), vars)
@@ -57,8 +140,6 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
def test___repr__(self):
"""Test __repr__ method."""
- handler = VulnerabilityHandlerEnum.SOSS
-
metadata = {
"request": {
"sources": [],
@@ -73,11 +154,11 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
},
}
- job = self.job_source.create(handler)
+ job = self.job_source.create(self.handler)
expected = (
"<ExportVulnerabilityJob for "
- f"handler: {handler}, "
+ f"handler: {self.handler}, "
f"metadata: {metadata}"
">"
)
@@ -87,21 +168,19 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
"""If there's already a waiting/running ExportVulnerabilityJob for the
handler ExportVulnerabilityJob.create() raises an exception.
"""
- handler = VulnerabilityHandlerEnum.SOSS
-
# Job waiting status
- job = self.job_source.create(handler)
+ job = self.job_source.create(self.handler)
waiting_exception = self.assertRaises(
VulnerabilityJobInProgress,
self.job_source.create,
- handler,
+ self.handler,
)
self.assertEqual(job, waiting_exception.job)
# Job status from WAITING to RUNNING
job.start()
running_exception = self.assertRaises(
- VulnerabilityJobInProgress, self.job_source.create, handler
+ VulnerabilityJobInProgress, self.job_source.create, self.handler
)
self.assertEqual(job, running_exception.job)
@@ -109,14 +188,12 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
"""If there's already a completed ExportVulnerabilityJob for the
handler the job can be runned again.
"""
- handler = VulnerabilityHandlerEnum.SOSS
-
- job = self.job_source.create(handler)
+ job = self.job_source.create(self.handler)
job.start()
job.complete()
self.assertEqual(job.status, JobStatus.COMPLETED)
- job_duplicated = self.job_source.create(handler)
+ job_duplicated = self.job_source.create(self.handler)
job_duplicated.start()
job_duplicated.complete()
self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
@@ -125,14 +202,12 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
"""If there's a failed ExportVulnerabilityJob for the handler the job
can be runned again.
"""
- handler = VulnerabilityHandlerEnum.SOSS
-
- job = self.job_source.create(handler)
+ job = self.job_source.create(self.handler)
job.start()
job.fail()
self.assertEqual(job.status, JobStatus.FAILED)
- job_duplicated = self.job_source.create(handler)
+ job_duplicated = self.job_source.create(self.handler)
job_duplicated.start()
job_duplicated.complete()
self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
@@ -141,14 +216,12 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
"""If there's a suspended ExportVulnerabilityJob for the handler the
job can be runned again.
"""
- handler = VulnerabilityHandlerEnum.SOSS
-
- job = self.job_source.create(handler)
+ job = self.job_source.create(self.handler)
job.start()
job.suspend()
self.assertEqual(job.status, JobStatus.SUSPENDED)
- job_duplicated = self.job_source.create(handler)
+ job_duplicated = self.job_source.create(self.handler)
job_duplicated.start()
job_duplicated.complete()
self.assertEqual(job_duplicated.status, JobStatus.COMPLETED)
@@ -157,8 +230,6 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
"""Test that ExportVulnerabilityJob specified with arguments can
be gotten out again."""
- handler = VulnerabilityHandlerEnum.SOSS
-
metadata = {
"request": {
"sources": ["https://launchpad.net/ubuntu"],
@@ -174,12 +245,12 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
}
job = self.job_source.create(
- handler,
+ self.handler,
sources=["https://launchpad.net/ubuntu"],
)
naked_job = removeSecurityProxy(job)
- self.assertEqual(naked_job.handler, handler)
+ self.assertEqual(naked_job.handler, self.handler)
self.assertEqual(naked_job.metadata, metadata)
@@ -193,30 +264,24 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
vulnerabilities for this test to pass. If they ever get added, the
test will need to be updated to use a different approach.
"""
-
- self.factory.makeDistribution(name="soss")
-
- job = self.job_source.create(handler=VulnerabilityHandlerEnum.SOSS)
+ job = self.job_source.create(handler=self.handler)
self.assertRaisesWithContent(
VulnerabilityJobException, "No CVEs to export", job.run
)
- def _put_cve_in_soss(self):
+ def _create_test_cve(self):
self.factory.makePerson(name="octagalland")
- soss = removeSecurityProxy(
- self.factory.makeDistribution(
- name="soss",
- displayname="SOSS",
- information_type=InformationType.PROPRIETARY,
- )
- )
- sampledata = (
- Path(__file__).parent.parent / "scripts/soss/tests/sampledata"
- )
+ if self.handler_name == "SOSS":
+ sampledata_path = "scripts/soss/tests/sampledata"
+ importer = SOSSImporter(self.distribution)
- soss_importer = SOSSImporter(soss)
+ elif self.handler_name == "UCT":
+ sampledata_path = "scripts/uct/tests/sampledata"
+ importer = UCTImporter(self.distribution)
+
+ sampledata = Path(__file__).parent.parent / sampledata_path
imported_list = []
for file in sampledata.iterdir():
@@ -224,14 +289,14 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
if not self.cve_set[cve_sequence]:
self.factory.makeCVE(sequence=cve_sequence)
- bug, vulnerability = soss_importer.import_cve_from_file(file)
+ bug, vulnerability = importer.import_cve_from_file(file)
imported_list.append((cve_sequence, bug, vulnerability))
return imported_list
def test_run_export(self):
"""Run ExportVulnerabilityJob."""
- imported_list = self._put_cve_in_soss()
+ imported_list = self._create_test_cve()
export_link = "http://example.com/fake-url"
self.patch(
@@ -240,9 +305,7 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
lambda self: export_link,
)
- job = self.job_source.create(
- handler=VulnerabilityHandlerEnum.SOSS,
- )
+ job = self.job_source.create(handler=self.handler)
job.run()
@@ -275,13 +338,12 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
"""ExportVulnerabilityJob.get() returns the import job for the given
handler.
"""
- handler = VulnerabilityHandlerEnum.SOSS
# There is no job before creating it
- self.assertIs(None, self.job_source.get(handler))
+ self.assertIs(None, self.job_source.get(self.handler))
- job = self.job_source.create(handler)
- job_gotten = self.job_source.get(handler)
+ job = self.job_source.create(self.handler)
+ job_gotten = self.job_source.get(self.handler)
self.assertIsInstance(job, ExportVulnerabilityJob)
self.assertEqual(job, job_gotten)
@@ -289,45 +351,124 @@ class ExportVulnerabilityJobTests(TestCaseWithFactory):
def test_error_description_when_no_error(self):
"""The ExportVulnerabilityJob.error_description property returns
None when no error description is recorded."""
- handler = VulnerabilityHandlerEnum.SOSS
- job = self.job_source.create(handler)
+ job = self.job_source.create(self.handler)
self.assertEqual([], removeSecurityProxy(job).error_description)
def test_error_description_set_when_notifying_about_user_errors(self):
"""Test that error_description is set by notifyUserError()."""
- handler = VulnerabilityHandlerEnum.SOSS
- job = self.job_source.create(handler)
+ job = self.job_source.create(self.handler)
message = "This is an example message."
job.notifyUserError(VulnerabilityJobException(message))
self.assertEqual([message], removeSecurityProxy(job).error_description)
-class ExportVulnerabilityTestViaCelery(TestCaseWithFactory):
+class ExportVulnerabilityTestViaCelery(WithScenarios, TestCaseWithFactory):
layer = CelerySlowJobLayer
+ scenarios = [
+ ("soss", {"handler_name": "SOSS"}),
+ ("uct", {"handler_name": "UCT"}),
+ ]
+
def setUp(self):
super().setUp()
self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
self.cve_set = getUtility(ICveSet)
- def _put_cve_in_soss(self):
- self.factory.makePerson(name="octagalland")
- soss = removeSecurityProxy(
- self.factory.makeDistribution(
- name="soss",
- displayname="SOSS",
- information_type=InformationType.PROPRIETARY,
+ if self.handler_name == "SOSS":
+ self.handler = VulnerabilityHandlerEnum.SOSS
+ self.distribution = removeSecurityProxy(
+ self.factory.makeDistribution(
+ name="soss",
+ displayname="SOSS",
+ information_type=InformationType.PROPRIETARY,
+ )
)
- )
- sampledata = (
- Path(__file__).parent.parent / "scripts/soss/tests/sampledata"
- )
+ elif self.handler_name == "UCT":
+ self.handler = VulnerabilityHandlerEnum.UCT
+ celebrities = getUtility(ILaunchpadCelebrities)
+ self.distribution = removeSecurityProxy(celebrities.ubuntu)
+
+ # Create objects to match the sample data
+ series_names = [
+ "artful",
+ "bionic",
+ "cosmic",
+ "dapper",
+ "disco",
+ "edgy",
+ "eoan",
+ "feisty",
+ "focal",
+ "groovy",
+ "gutsy",
+ "hardy",
+ "hirsute",
+ "impish",
+ "intrepid",
+ "jammy",
+ "jaunty",
+ "karmic",
+ "kinetic",
+ "lucid",
+ "maverick",
+ "natty",
+ "oneiric",
+ "precise",
+ "quantal",
+ "raring",
+ "saucy",
+ "trusty",
+ "utopic",
+ "vivid",
+ "wily",
+ "xenial",
+ "yakkety",
+ "zesty",
+ ]
+ for name in series_names:
+ self.factory.makeDistroSeries(
+ name=name,
+ distribution=self.distribution,
+ )
+
+ distroseries = self.factory.makeDistroSeries(
+ name="questing",
+ distribution=self.distribution,
+ )
+
+ package_names = [
+ "linux",
+ "linux-hwe",
+ "gnupg2",
+ "xine-ui",
+ "gnupg",
+ "gbrowse",
+ ]
+ for name in package_names:
+ sourcepackagename = self.factory.makeSourcePackageName(name)
+ self.factory.makeSourcePackage(
+ distroseries=distroseries,
+ publish=True,
+ sourcepackagename=sourcepackagename,
+ )
+
+ def _create_test_cve(self):
+ self.factory.makePerson(name="octagalland")
+
+ if self.handler_name == "SOSS":
+ sampledata_path = "scripts/soss/tests/sampledata"
+ importer = SOSSImporter(self.distribution)
+
+ elif self.handler_name == "UCT":
+ sampledata_path = "scripts/uct/tests/sampledata"
+ importer = UCTImporter(self.distribution)
- soss_importer = SOSSImporter(soss)
+ sampledata = Path(__file__).parent.parent / sampledata_path
imported_list = []
for file in sampledata.iterdir():
@@ -337,7 +478,7 @@ class ExportVulnerabilityTestViaCelery(TestCaseWithFactory):
if not self.cve_set[cve_sequence]:
self.factory.makeCVE(sequence=cve_sequence)
- bug, vulnerability = soss_importer.import_cve_from_file(file)
+ bug, vulnerability = importer.import_cve_from_file(file)
imported_list.append((cve_sequence, bug, vulnerability))
return imported_list
@@ -351,21 +492,20 @@ class ExportVulnerabilityTestViaCelery(TestCaseWithFactory):
)
self.useFixture(fixture)
- imported_list = self._put_cve_in_soss()
+ imported_list = self._create_test_cve()
transaction.commit()
job_source = getUtility(IExportVulnerabilityJobSource)
- handler = VulnerabilityHandlerEnum.SOSS
with block_on_job():
- job_source.create(handler)
+ job_source.create(self.handler)
transaction.commit()
cve_names = [
f"CVE-{cve_sequence}" for cve_sequence, _, _ in imported_list
]
- job = job_source.get(handler)
+ job = job_source.get(self.handler)
naked_job_metadata = removeSecurityProxy(job.metadata)
naked_job_metadata["result"]["succeeded"].sort()
@@ -381,7 +521,7 @@ class ExportVulnerabilityTestViaCelery(TestCaseWithFactory):
"failed": [],
}
- self.assertEqual(handler, job.handler)
+ self.assertEqual(self.handler, job.handler)
self.assertEqual(metadata_request, naked_job_metadata["request"])
self.assertEqual(metadata_result, naked_job_metadata["result"])
Follow ups