launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #32963
[Merge] ~enriqueesanchz/launchpad:add-uct-import-handler into launchpad:master
Enrique Sánchez has proposed merging ~enriqueesanchz/launchpad:add-uct-import-handler into launchpad:master.
Commit message:
Add UCT handler to ImportVulnerabilityJob
This also adds that handler to the `Vulnerability.importData` endpoint.
Refactor handlers so they implement the same svthandler interfaces.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/492385
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-uct-import-handler into launchpad:master.
diff --git a/lib/lp/bugs/enums.py b/lib/lp/bugs/enums.py
index be1a968..e4b16cf 100644
--- a/lib/lp/bugs/enums.py
+++ b/lib/lp/bugs/enums.py
@@ -181,3 +181,13 @@ class VulnerabilityHandlerEnum(DBEnumeratedType):
exports.
""",
)
+
+ UCT = DBItem(
+ 2,
+ """
+ UCT Handler
+
+ Specific handler to use for UCT vulnerability data imports and
+ exports.
+ """,
+ )
diff --git a/lib/lp/bugs/model/importvulnerabilityjob.py b/lib/lp/bugs/model/importvulnerabilityjob.py
index 4ef6e08..c023c33 100644
--- a/lib/lp/bugs/model/importvulnerabilityjob.py
+++ b/lib/lp/bugs/model/importvulnerabilityjob.py
@@ -28,8 +28,11 @@ from lp.bugs.model.vulnerabilityjob import (
from lp.bugs.scripts.soss.models import SOSSRecord
from lp.bugs.scripts.soss.sossimport import SOSSImporter
from lp.code.errors import GitRepositoryScanFault
+from lp.bugs.scripts.uct.models import UCTRecord
+from lp.bugs.scripts.uct.uctimport import UCTImporter
from lp.code.interfaces.githosting import IGitHostingClient
from lp.code.interfaces.gitlookup import IGitLookup
+from lp.registry.interfaces.distribution import IDistributionSet
from lp.services.config import config
from lp.services.database.interfaces import IPrimaryStore, IStore
from lp.services.job.interfaces.job import JobStatus
@@ -172,10 +175,19 @@ class ImportVulnerabilityJob(VulnerabilityJobDerived):
"""
if handler == VulnerabilityHandlerEnum.SOSS:
- parser = SOSSRecord.from_yaml
+ distribution = removeSecurityProxy(
+ getUtility(IDistributionSet).getByName("soss")
+ )
+ parser = SOSSRecord
importer = SOSSImporter(
- information_type=information_type
- ).import_cve
+ distribution, information_type=information_type
+ )
+ elif handler == VulnerabilityHandlerEnum.UCT:
+ distribution = getUtility(IDistributionSet).getByName("ubuntu")
+ parser = UCTRecord
+ importer = UCTImporter(
+ distribution, information_type=information_type
+ )
else:
exception = VulnerabilityJobException("Handler not found")
self.notifyUserError(exception)
@@ -252,9 +264,9 @@ class ImportVulnerabilityJob(VulnerabilityJobDerived):
logger.debug(
f"[ImportVulnerabilityJob] Parsing {cve_sequence}"
)
- record = parser(blob)
+ record = parser.from_str(blob)
- bug, vulnerability = importer(record, cve_sequence)
+ bug, vulnerability = importer.from_record(record, cve_sequence)
if bug and vulnerability:
self.metadata["result"]["succeeded"].append(cve_sequence)
diff --git a/lib/lp/bugs/model/tests/test_vulnerability.py b/lib/lp/bugs/model/tests/test_vulnerability.py
index effa984..9444b38 100644
--- a/lib/lp/bugs/model/tests/test_vulnerability.py
+++ b/lib/lp/bugs/model/tests/test_vulnerability.py
@@ -709,6 +709,48 @@ class TestVulnerabilitySetImportData(TestCaseWithFactory):
)
self.assertEqual(naked_job.import_since_commit_sha1, None)
+ def test_importData_uct_handler(self):
+ """Test that we can create an ImportVulnerabilityJob using importData
+ method and the UCT handler.
+ """
+ self.useContext(feature_flags())
+ set_feature_flag(VULNERABILITY_IMPORT_ENABLED_FEATURE_FLAG, "true")
+
+ repo = self.factory.makeGitRepository(
+ owner=self.team, information_type=InformationType.USERDATA
+ )
+
+ ubuntu_team = getUtility(ILaunchpadCelebrities).ubuntu.owner
+ requester = self.factory.makePerson(member_of=(ubuntu_team,))
+
+ self.factory.makeGitRefs(
+ repository=repo,
+ paths=[self.git_ref],
+ )
+
+ handler = VulnerabilityHandlerEnum.UCT
+ with person_logged_in(self.requester):
+ getUtility(IVulnerabilitySet).importData(
+ requester,
+ handler,
+ repo,
+ self.git_ref,
+ self.git_paths,
+ self.information_type,
+ import_since_commit_sha1=None,
+ )
+
+ job = getUtility(IImportVulnerabilityJobSource).get(handler)
+ naked_job = removeSecurityProxy(job)
+ self.assertIsInstance(naked_job, ImportVulnerabilityJob)
+ self.assertEqual(naked_job.git_repository, repo.id)
+ self.assertEqual(naked_job.git_ref, self.git_ref)
+ self.assertEqual(naked_job.git_paths, self.git_paths)
+ self.assertEqual(
+ naked_job.information_type, self.information_type.value
+ )
+ self.assertEqual(naked_job.import_since_commit_sha1, None)
+
def test_importData_feature_flag_disabled(self):
"""Test that if the feature flag is disabled it raises the
NotImplementedError exception."""
diff --git a/lib/lp/bugs/model/vulnerability.py b/lib/lp/bugs/model/vulnerability.py
index c337b8f..b643eba 100644
--- a/lib/lp/bugs/model/vulnerability.py
+++ b/lib/lp/bugs/model/vulnerability.py
@@ -49,6 +49,7 @@ from lp.registry.interfaces.accesspolicy import (
IAccessArtifactGrantSource,
IAccessArtifactSource,
)
+from lp.registry.interfaces.distribution import IDistributionSet
from lp.registry.interfaces.role import IPersonRoles
from lp.registry.model.accesspolicy import reconcile_access_for_artifacts
from lp.registry.model.person import Person
@@ -415,10 +416,16 @@ class VulnerabilitySet:
# permissions to git repo
# Check requester's permissions to handler
- from lp.bugs.scripts.soss.sossimport import SOSSImporter
-
if handler == VulnerabilityHandlerEnum.SOSS:
- importer = SOSSImporter()
+ from lp.bugs.scripts.soss.sossimport import SOSSImporter
+
+ distribution = getUtility(IDistributionSet).getByName("soss")
+ importer = SOSSImporter(distribution)
+ elif handler == VulnerabilityHandlerEnum.UCT:
+ from lp.bugs.scripts.uct.uctimport import UCTImporter
+
+ distribution = getUtility(IDistributionSet).getByName("ubuntu")
+ importer = UCTImporter(distribution)
else:
raise NotFoundError(f"{handler} not found")
diff --git a/lib/lp/bugs/scripts/soss/models.py b/lib/lp/bugs/scripts/soss/models.py
index f8fe1a9..dc97731 100644
--- a/lib/lp/bugs/scripts/soss/models.py
+++ b/lib/lp/bugs/scripts/soss/models.py
@@ -9,6 +9,8 @@ from typing import Any, Dict, List, Optional
import yaml
from packaging.version import Version
+from lp.bugs.scripts.svthandler import SVTRecord
+
__all__ = [
"SOSSRecord",
]
@@ -18,7 +20,7 @@ VALID_CHANNEL_REGEX = re.compile(r"^(focal|jammy|noble):[^/]+/stable$")
@dataclass
-class SOSSRecord:
+class SOSSRecord(SVTRecord):
class PriorityEnum(Enum):
NEEDS_TRIAGE = "Needs-triage"
@@ -122,6 +124,10 @@ class SOSSRecord:
public_date: Optional[datetime] = None
@classmethod
+ def from_str(cls, string: str) -> "SOSSRecord":
+ return cls.from_yaml(string)
+
+ @classmethod
def from_yaml(cls, yaml_str: str) -> "SOSSRecord":
raw: Dict[str, Any] = yaml.safe_load(yaml_str)
if not isinstance(raw, dict):
diff --git a/lib/lp/bugs/scripts/soss/sossexport.py b/lib/lp/bugs/scripts/soss/sossexport.py
index 9a75eb6..adeae9f 100644
--- a/lib/lp/bugs/scripts/soss/sossexport.py
+++ b/lib/lp/bugs/scripts/soss/sossexport.py
@@ -18,6 +18,7 @@ from lp.bugs.scripts.soss.sossimport import (
PACKAGE_TYPE_MAP,
PRIORITY_ENUM_MAP,
)
+from lp.bugs.scripts.svthandler import SVTExporter
from lp.registry.model.distribution import Distribution
__all__ = [
@@ -34,7 +35,7 @@ PACKAGE_TYPE_MAP_REVERSE = {v: k for k, v in PACKAGE_TYPE_MAP.items()}
PACKAGE_STATUS_MAP_REVERSE = {v: k for k, v in PACKAGE_STATUS_MAP.items()}
-class SOSSExporter:
+class SOSSExporter(SVTExporter):
"""
SOSSExporter is used to export Launchpad Vulnerability data to SOSS CVE
files.
diff --git a/lib/lp/bugs/scripts/soss/sossimport.py b/lib/lp/bugs/scripts/soss/sossimport.py
index 9f59e92..b50b9db 100644
--- a/lib/lp/bugs/scripts/soss/sossimport.py
+++ b/lib/lp/bugs/scripts/soss/sossimport.py
@@ -13,7 +13,6 @@ from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
from lp.app.enums import InformationType
-from lp.app.errors import NotFoundError
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
from lp.bugs.enums import VulnerabilityStatus
from lp.bugs.interfaces.bug import CreateBugParams, IBugSet
@@ -28,7 +27,7 @@ from lp.bugs.model.bug import Bug as BugModel
from lp.bugs.model.cve import Cve as CveModel
from lp.bugs.model.vulnerability import Vulnerability
from lp.bugs.scripts.soss.models import SOSSRecord
-from lp.registry.interfaces.distribution import IDistributionSet
+from lp.bugs.scripts.svthandler import SVTImporter
from lp.registry.interfaces.externalpackage import ExternalPackageType
from lp.registry.interfaces.person import IPersonSet
from lp.registry.interfaces.role import IPersonRoles
@@ -78,13 +77,14 @@ PACKAGE_STATUS_MAP = {
DISTRIBUTION_NAME = "soss"
-class SOSSImporter:
+class SOSSImporter(SVTImporter):
"""
SOSSImporter is used to import SOSS CVE files to Launchpad database.
"""
def __init__(
self,
+ distribution: Distribution,
information_type: InformationType = InformationType.PROPRIETARY,
dry_run: bool = False,
) -> None:
@@ -97,13 +97,7 @@ class SOSSImporter:
self.vulnerability_set = getUtility(IVulnerabilitySet)
self.bug_set = getUtility(IBugSet)
self.cve_set = getUtility(ICveSet)
- self.soss = removeSecurityProxy(
- getUtility(IDistributionSet).getByName(DISTRIBUTION_NAME)
- )
-
- if self.soss is None:
- logger.error("[SOSSImporter] SOSS distribution not found")
- raise NotFoundError("SOSS distribution not found")
+ self.soss = distribution
def import_cve_from_file(
self, cve_path: str
@@ -115,10 +109,10 @@ class SOSSImporter:
with open(cve_path, encoding="utf-8") as file:
soss_record = SOSSRecord.from_yaml(file)
- bug, vulnerability = self.import_cve(soss_record, cve_sequence)
+ bug, vulnerability = self.from_record(soss_record, cve_sequence)
return bug, vulnerability
- def import_cve(
+ def from_record(
self, soss_record: SOSSRecord, cve_sequence: str
) -> Tuple[BugModel, Vulnerability]:
"""Import CVE from SOSS record."""
@@ -496,6 +490,7 @@ class SOSSImporter:
return packagetype, package
def checkUserPermissions(self, user):
+ """See `SVTImporter`."""
return SecurityAdminDistribution(self.soss).checkAuthenticated(
IPersonRoles(user)
)
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossexport.py b/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
index 6c1e901..07802dc 100644
--- a/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossexport.py
@@ -31,7 +31,7 @@ class TestSOSSExporter(TestCaseWithFactory):
)
self._makeCves()
- self.soss_importer = SOSSImporter()
+ self.soss_importer = SOSSImporter(self.soss)
self.soss_exporter = SOSSExporter()
def _makeCves(self):
@@ -71,7 +71,7 @@ class TestSOSSExporter(TestCaseWithFactory):
def test_to_record(self):
"""Test that imported and exported SOSSRecords match."""
soss_importer = SOSSImporter(
- information_type=InformationType.PROPRIETARY
+ self.soss, information_type=InformationType.PROPRIETARY
)
for file in self.sampledata.iterdir():
@@ -92,7 +92,7 @@ class TestSOSSExporter(TestCaseWithFactory):
"""Integration test that checks that cve files imported and exported
match."""
soss_importer = SOSSImporter(
- information_type=InformationType.PROPRIETARY
+ self.soss, information_type=InformationType.PROPRIETARY
)
for file in self.sampledata.iterdir():
diff --git a/lib/lp/bugs/scripts/soss/tests/test_sossimport.py b/lib/lp/bugs/scripts/soss/tests/test_sossimport.py
index ef21825..16e50f9 100644
--- a/lib/lp/bugs/scripts/soss/tests/test_sossimport.py
+++ b/lib/lp/bugs/scripts/soss/tests/test_sossimport.py
@@ -236,7 +236,7 @@ class TestSOSSImporter(TestCaseWithFactory):
file = self.sampledata / "CVE-2025-1979"
soss_importer = SOSSImporter(
- information_type=InformationType.PROPRIETARY
+ self.soss, information_type=InformationType.PROPRIETARY
)
bug, vulnerability = soss_importer.import_cve_from_file(file)
@@ -253,7 +253,7 @@ class TestSOSSImporter(TestCaseWithFactory):
def test_create_update_bug(self):
"""Test create and update a bug from a SOSS cve file"""
- bug = SOSSImporter()._create_bug(self.soss_record, self.cve)
+ bug = SOSSImporter(self.soss)._create_bug(self.soss_record, self.cve)
self._check_bug_fields(bug, self.bugtask_reference)
@@ -278,7 +278,7 @@ class TestSOSSImporter(TestCaseWithFactory):
self.soss_record.packages.pop(SOSSRecord.PackageTypeEnum.RUST)
bug = SOSSImporter(
- information_type=InformationType.PROPRIETARY
+ self.soss, information_type=InformationType.PROPRIETARY
)._update_bug(bug, self.soss_record, new_cve)
transaction.commit()
@@ -296,7 +296,7 @@ class TestSOSSImporter(TestCaseWithFactory):
def test_create_update_vulnerability(self):
"""Test create and update a vulnerability from a SOSS cve file"""
- soss_importer = SOSSImporter()
+ soss_importer = SOSSImporter(self.soss)
bug = soss_importer._create_bug(self.soss_record, self.cve)
vulnerability = soss_importer._create_vulnerability(
bug, self.soss_record, self.cve, self.soss
@@ -335,7 +335,7 @@ class TestSOSSImporter(TestCaseWithFactory):
def test_create_or_update_bugtasks(self):
"""Test update bugtasks"""
- soss_importer = SOSSImporter()
+ soss_importer = SOSSImporter(self.soss)
bug = soss_importer._create_bug(self.soss_record, self.cve)
self._check_bugtasks(
@@ -386,7 +386,7 @@ class TestSOSSImporter(TestCaseWithFactory):
def test_get_launchpad_cve(self):
"""Test get a cve from Launchpad"""
- soss_importer = SOSSImporter()
+ soss_importer = SOSSImporter(self.soss)
self.assertEqual(
soss_importer._get_launchpad_cve("2025-1979"), self.cve
)
@@ -394,12 +394,14 @@ class TestSOSSImporter(TestCaseWithFactory):
def test_make_bug_description(self):
"""Test make a bug description from a SOSSRecord"""
- description = SOSSImporter()._make_bug_description(self.soss_record)
+ description = SOSSImporter(self.soss)._make_bug_description(
+ self.soss_record
+ )
self.assertEqual(description, self.description)
def test_get_assignee(self):
"""Test get an assignee person from Launchpad"""
- soss_importer = SOSSImporter()
+ soss_importer = SOSSImporter(self.soss)
janitor = soss_importer._get_assignee("janitor")
self.assertEqual(janitor, self.janitor)
@@ -408,7 +410,7 @@ class TestSOSSImporter(TestCaseWithFactory):
def test_get_or_create_external_package(self):
"""Test create an ExternalPackage from SOSSRecord"""
- soss_importer = SOSSImporter()
+ soss_importer = SOSSImporter(self.soss)
cargo_pkg = soss_importer._get_or_create_external_package(
self.soss_record.packages[SOSSRecord.PackageTypeEnum.RUST][0],
@@ -432,12 +434,12 @@ class TestSOSSImporter(TestCaseWithFactory):
def test_prepare_cvss_data(self):
"""Test prepare the cvss json"""
- cvss = SOSSImporter()._prepare_cvss_data(self.soss_record)
+ cvss = SOSSImporter(self.soss)._prepare_cvss_data(self.soss_record)
self.assertEqual(cvss, self.cvss)
def test_validate_soss_record(self):
"""Test validate the SOSSRecord"""
- soss_importer = SOSSImporter()
+ soss_importer = SOSSImporter(self.soss)
valid = soss_importer._validate_soss_record(
self.soss_record, f"CVE-{self.cve.sequence}"
)
@@ -458,7 +460,7 @@ class TestSOSSImporter(TestCaseWithFactory):
self.assertEqual(valid, False)
def test_checkUserPermissions(self):
- soss_importer = SOSSImporter()
+ soss_importer = SOSSImporter(self.soss)
user = self.factory.makePerson()
self.assertEqual(soss_importer.checkUserPermissions(user), False)
diff --git a/lib/lp/bugs/scripts/svthandler.py b/lib/lp/bugs/scripts/svthandler.py
new file mode 100644
index 0000000..07efeca
--- /dev/null
+++ b/lib/lp/bugs/scripts/svthandler.py
@@ -0,0 +1,41 @@
+# Copyright 2025 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__all__ = [
+ "SVTRecord",
+ "SVTImporter",
+ "SVTExporter",
+]
+
+from lp.bugs.interfaces.bug import IBug
+from lp.bugs.interfaces.cve import ICve
+from lp.bugs.interfaces.vulnerability import IVulnerability
+from lp.registry.interfaces.distribution import IDistribution
+
+
+class SVTRecord:
+ """A dataclass that contains the exact same info as a cve file."""
+
+ def from_str(string: str) -> "SVTRecord":
+ """Parse a string and return a SVTRecord."""
+
+
+class SVTImporter:
+ def from_record(
+ record: SVTRecord, cve_sequence: str
+ ) -> (IBug, IVulnerability):
+ """Import a SVTRecord creating a bug and a vulnerability."""
+
+ def checkUserPermissions(user):
+ """Checks if the user has permissions to use this handler."""
+
+
+class SVTExporter:
+ def to_record(
+ lp_cve: ICve,
+ distribution: IDistribution,
+ bug: IBug,
+ vulnerability: IVulnerability,
+ ) -> SVTRecord:
+ """Export the bug and vulnerability related to a cve in a distribution
+ and return a SVTRecord."""
diff --git a/lib/lp/bugs/scripts/tests/test_uct.py b/lib/lp/bugs/scripts/tests/test_uct.py
index 8b40177..f4d66b8 100644
--- a/lib/lp/bugs/scripts/tests/test_uct.py
+++ b/lib/lp/bugs/scripts/tests/test_uct.py
@@ -35,135 +35,139 @@ TAG_SEPARATOR = UCTImporter.TAG_SEPARATOR
class TestUCTRecord(TestCase):
maxDiff = None
- def test_load_save(self):
- load_from = Path(__file__).parent / "sampledata" / "CVE-2022-23222"
- uct_record = UCTRecord.load(load_from)
- self.assertDictEqual(
- UCTRecord(
- parent_dir="sampledata",
- assigned_to="",
- bugs=[
- "https://github.com/mm2/Little-CMS/issues/29",
- "https://github.com/mm2/Little-CMS/issues/30",
- "https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=745471",
- ],
- cvss=[
- CVSS(
- authority="nvd",
- vector_string=(
- "CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H "
- "[7.8 HIGH]"
- ),
+ def setUp(self):
+ super().setUp()
+ self.record = UCTRecord(
+ parent_dir="sampledata",
+ assigned_to="",
+ bugs=[
+ "https://github.com/mm2/Little-CMS/issues/29",
+ "https://github.com/mm2/Little-CMS/issues/30",
+ "https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=745471",
+ ],
+ cvss=[
+ CVSS(
+ authority="nvd",
+ vector_string=(
+ "CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H "
+ "[7.8 HIGH]"
),
- ],
- candidate="CVE-2022-23222",
- crd=None,
- public_date_at_USN=datetime(
- 2022, 1, 14, 8, 15, tzinfo=timezone.utc
),
- public_date=datetime(2022, 1, 14, 8, 15, tzinfo=timezone.utc),
- description=(
- "kernel/bpf/verifier.c in the Linux kernel through "
- "5.15.14 allows local\nusers to gain privileges because "
- "of the availability of pointer arithmetic\nvia certain "
- "*_OR_NULL pointer types."
- ),
- discovered_by="tr3e wang",
- mitigation=(
- "seth-arnold> set kernel.unprivileged_bpf_disabled to 1"
- ),
- notes=(
- "sbeattie> Ubuntu 21.10 / 5.13+ kernels disable "
- "unprivileged BPF by default.\n kernels 5.8 and "
- "older are not affected, priority high is "
- "for\n 5.10 and 5.11 based kernels only"
- ),
- priority=UCTRecord.Priority.CRITICAL,
- references=["https://ubuntu.com/security/notices/USN-5368-1"],
- ubuntu_description=(
- "It was discovered that the BPF verifier in the Linux "
- "kernel did not\nproperly restrict pointer types in "
- "certain situations. A local attacker\ncould use this to "
- "cause a denial of service (system crash) or possibly\n"
- "execute arbitrary code."
- ),
- packages=[
- UCTRecord.Package(
- name="linux",
- statuses=[
- UCTRecord.SeriesPackageStatus(
- series="upstream",
- status=UCTRecord.PackageStatus.RELEASED,
- reason="5.17~rc1",
- priority=None,
- ),
- UCTRecord.SeriesPackageStatus(
- series="impish",
- status=UCTRecord.PackageStatus.RELEASED,
- reason="5.13.0-37.42",
- priority=UCTRecord.Priority.MEDIUM,
- ),
- UCTRecord.SeriesPackageStatus(
- series="devel",
- status=UCTRecord.PackageStatus.NOT_AFFECTED,
- reason="5.15.0-25.25",
- priority=UCTRecord.Priority.MEDIUM,
- ),
- ],
- priority=None,
- tags={"not-ue"},
- patches=[
- UCTRecord.Patch(
- patch_type="break-fix",
- entry=(
- "457f44363a8894135c85b7a9afd2bd8196db24ab "
- "c25b2ae136039ffa820c26138ed4a5e5f3ab3841|"
- "local-CVE-2022-23222-fix"
- ),
- ),
- UCTRecord.Patch(
- patch_type="upstream",
- entry=(
- "https://github.com/389ds/389-ds-base/commit/58dbf084a63e6dbbd999bf6a70475fad8255f26a (1.4.4)" # noqa: 501
- ),
- ),
- UCTRecord.Patch(
- patch_type="upstream",
- entry=(
- "https://github.com/389ds/389-ds-base/commit/2e5b526012612d1d6ccace46398bee679a730271" # noqa: 501
- ),
- ),
- ],
- ),
- UCTRecord.Package(
- name="linux-hwe",
- statuses=[
- UCTRecord.SeriesPackageStatus(
- series="upstream",
- status=UCTRecord.PackageStatus.RELEASED,
- reason="5.17~rc1",
- priority=None,
+ ],
+ candidate="CVE-2022-23222",
+ crd=None,
+ public_date_at_USN=datetime(
+ 2022, 1, 14, 8, 15, tzinfo=timezone.utc
+ ),
+ public_date=datetime(2022, 1, 14, 8, 15, tzinfo=timezone.utc),
+ description=(
+ "kernel/bpf/verifier.c in the Linux kernel through "
+ "5.15.14 allows local\nusers to gain privileges because "
+ "of the availability of pointer arithmetic\nvia certain "
+ "*_OR_NULL pointer types."
+ ),
+ discovered_by="tr3e wang",
+ mitigation=(
+ "seth-arnold> set kernel.unprivileged_bpf_disabled to 1"
+ ),
+ notes=(
+ "sbeattie> Ubuntu 21.10 / 5.13+ kernels disable "
+ "unprivileged BPF by default.\n kernels 5.8 and "
+ "older are not affected, priority high is "
+ "for\n 5.10 and 5.11 based kernels only"
+ ),
+ priority=UCTRecord.Priority.CRITICAL,
+ references=["https://ubuntu.com/security/notices/USN-5368-1"],
+ ubuntu_description=(
+ "It was discovered that the BPF verifier in the Linux "
+ "kernel did not\nproperly restrict pointer types in "
+ "certain situations. A local attacker\ncould use this to "
+ "cause a denial of service (system crash) or possibly\n"
+ "execute arbitrary code."
+ ),
+ packages=[
+ UCTRecord.Package(
+ name="linux",
+ statuses=[
+ UCTRecord.SeriesPackageStatus(
+ series="upstream",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="5.17~rc1",
+ priority=None,
+ ),
+ UCTRecord.SeriesPackageStatus(
+ series="impish",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="5.13.0-37.42",
+ priority=UCTRecord.Priority.MEDIUM,
+ ),
+ UCTRecord.SeriesPackageStatus(
+ series="devel",
+ status=UCTRecord.PackageStatus.NOT_AFFECTED,
+ reason="5.15.0-25.25",
+ priority=UCTRecord.Priority.MEDIUM,
+ ),
+ ],
+ priority=None,
+ tags={"not-ue"},
+ patches=[
+ UCTRecord.Patch(
+ patch_type="break-fix",
+ entry=(
+ "457f44363a8894135c85b7a9afd2bd8196db24ab "
+ "c25b2ae136039ffa820c26138ed4a5e5f3ab3841|"
+ "local-CVE-2022-23222-fix"
),
- UCTRecord.SeriesPackageStatus(
- series="impish",
- status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
- reason="",
- priority=None,
+ ),
+ UCTRecord.Patch(
+ patch_type="upstream",
+ entry=(
+ "https://github.com/389ds/389-ds-base/commit/58dbf084a63e6dbbd999bf6a70475fad8255f26a (1.4.4)" # noqa: 501
),
- UCTRecord.SeriesPackageStatus(
- series="devel",
- status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
- reason="",
- priority=None,
+ ),
+ UCTRecord.Patch(
+ patch_type="upstream",
+ entry=(
+ "https://github.com/389ds/389-ds-base/commit/2e5b526012612d1d6ccace46398bee679a730271" # noqa: 501
),
- ],
- priority=UCTRecord.Priority.HIGH,
- tags=set(),
- patches=[],
- ),
- ],
- global_tags={"cisa-kev"},
- ).__dict__,
+ ),
+ ],
+ ),
+ UCTRecord.Package(
+ name="linux-hwe",
+ statuses=[
+ UCTRecord.SeriesPackageStatus(
+ series="upstream",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="5.17~rc1",
+ priority=None,
+ ),
+ UCTRecord.SeriesPackageStatus(
+ series="impish",
+ status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
+ reason="",
+ priority=None,
+ ),
+ UCTRecord.SeriesPackageStatus(
+ series="devel",
+ status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
+ reason="",
+ priority=None,
+ ),
+ ],
+ priority=UCTRecord.Priority.HIGH,
+ tags=set(),
+ patches=[],
+ ),
+ ],
+ global_tags={"cisa-kev"},
+ )
+
+ def test_load_save(self):
+ load_from = Path(__file__).parent / "sampledata" / "CVE-2022-23222"
+ uct_record = UCTRecord.load(load_from)
+ self.assertDictEqual(
+ self.record.__dict__,
uct_record.__dict__,
)
@@ -263,6 +267,17 @@ class TestUCTRecord(TestCase):
)
self.assertEqual(load_from.read_text(), saved_to_path.read_text())
+ def test_from_str(self):
+ load_from = Path(__file__).parent / "sampledata" / "CVE-2022-23222"
+ with open(load_from) as f:
+ string = f.read()
+
+ record = UCTRecord.from_str(string)
+
+ # We are importing a blob, creating a temp file
+ self.record.parent_dir = "tmp"
+ self.assertEqual(record.__dict__, self.record.__dict__)
+
class TestCVE(TestCaseWithFactory):
layer = ZopelessDatabaseLayer
@@ -866,7 +881,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
],
global_tags={"cisa-kev"},
)
- self.importer = UCTImporter()
+ self.importer = UCTImporter(self.ubuntu)
self.exporter = UCTExporter()
def checkBug(self, bug: Bug, cve: CVE):
@@ -1064,36 +1079,35 @@ class TestUCTImporterExporter(TestCaseWithFactory):
def checkVulnerabilities(self, bug: Bug, cve: CVE):
vulnerabilities = bug.vulnerabilities
- self.assertEqual(len(cve.affected_distributions), len(vulnerabilities))
+ self.assertEqual(1, len(vulnerabilities))
vulnerabilities_by_distro = {
v.distribution: v for v in vulnerabilities
}
- for distro in cve.affected_distributions:
- self.assertIn(distro, vulnerabilities_by_distro)
- vulnerability = vulnerabilities_by_distro[distro]
-
- self.assertEqual(self.bug_importer, vulnerability.creator)
- self.assertEqual(self.lp_cve, vulnerability.cve)
- self.assertEqual(cve.status, vulnerability.status)
- self.assertEqual(cve.ubuntu_description, vulnerability.description)
- self.assertEqual(cve.notes, vulnerability.notes)
- self.assertEqual(cve.mitigation, vulnerability.mitigation)
- self.assertEqual(cve.importance, vulnerability.importance)
- self.assertEqual(
- InformationType.PUBLICSECURITY, vulnerability.information_type
- )
- self.assertEqual(
- cve.date_made_public, vulnerability.date_made_public
- )
- self.assertEqual(
- cve.date_notice_issued, vulnerability.date_notice_issued
- )
- self.assertEqual(
- cve.date_coordinated_release,
- vulnerability.date_coordinated_release,
- )
- self.assertEqual([bug], vulnerability.bugs)
+
+ distro = self.ubuntu
+ self.assertIn(distro, vulnerabilities_by_distro)
+ vulnerability = vulnerabilities_by_distro[distro]
+
+ self.assertEqual(self.bug_importer, vulnerability.creator)
+ self.assertEqual(self.lp_cve, vulnerability.cve)
+ self.assertEqual(cve.status, vulnerability.status)
+ self.assertEqual(cve.ubuntu_description, vulnerability.description)
+ self.assertEqual(cve.notes, vulnerability.notes)
+ self.assertEqual(cve.mitigation, vulnerability.mitigation)
+ self.assertEqual(cve.importance, vulnerability.importance)
+ self.assertEqual(
+ InformationType.PUBLICSECURITY, vulnerability.information_type
+ )
+ self.assertEqual(cve.date_made_public, vulnerability.date_made_public)
+ self.assertEqual(
+ cve.date_notice_issued, vulnerability.date_notice_issued
+ )
+ self.assertEqual(
+ cve.date_coordinated_release,
+ vulnerability.date_coordinated_release,
+ )
+ self.assertEqual([bug], vulnerability.bugs)
def checkLaunchpadCve(self, lp_cve: CveModel, cve: CVE):
cvss = defaultdict(list)
@@ -1139,7 +1153,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.assertEqual(expected.global_tags, actual.global_tags)
def test_create_bug(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
self.checkBug(bug, self.cve)
self.checkBugTasks(bug, self.cve)
@@ -1238,32 +1252,39 @@ class TestUCTImporterExporter(TestCaseWithFactory):
global_tags={"cisa-kev"},
)
lp_cve = self.factory.makeCVE(sequence="2022-1234")
- bug = self.importer.create_bug(cve, lp_cve)
+ bug, _ = self.importer.create_bug(cve, lp_cve)
self.checkBug(bug, cve)
self.checkBugTasks(bug, cve)
self.assertEqual([lp_cve], bug.cves)
def test_find_existing_bug(self):
self.assertIsNone(
- self.importer._find_existing_bug(self.cve, self.lp_cve)
+ self.importer._find_existing_bug(
+ self.cve, self.lp_cve, self.ubuntu
+ )
)
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
self.assertEqual(
- self.importer._find_existing_bug(self.cve, self.lp_cve), bug
+ self.importer._find_existing_bug(
+ self.cve, self.lp_cve, self.ubuntu
+ ),
+ bug,
)
def test_find_existing_bug_multiple_bugs(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
another_bug = self.factory.makeBug(bug.bugtasks[0].target)
- self.assertGreater(len(bug.vulnerabilities), 1)
+ self.assertEqual(len(bug.vulnerabilities), 1)
vulnerability = bug.vulnerabilities[0]
- vulnerability.unlinkBug(bug)
+
+ # Link another_bug so the same vulnerability has more than one bug
vulnerability.linkBug(another_bug)
self.assertRaises(
UCTImportError,
self.importer._find_existing_bug,
self.cve,
self.lp_cve,
+ self.ubuntu,
)
def test_update_bug_new_package(self):
@@ -1279,7 +1300,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
)
cve = self.cve
- bug = self.importer.create_bug(cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(cve, self.lp_cve)
cve.distro_packages.append(
CVE.DistroPackage(
@@ -1318,7 +1339,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
)
cve = self.cve
- bug = self.importer.create_bug(cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(cve, self.lp_cve)
cve.series_packages.append(
CVE.SeriesPackage(
@@ -1352,7 +1373,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
)
cve = self.cve
- bug = self.importer.create_bug(cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(cve, self.lp_cve)
cve.distro_packages.append(
CVE.DistroPackage(
@@ -1381,14 +1402,14 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkVulnerabilities(bug, cve)
def test_update_bug_assignee_changed(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
cve.assignee = self.factory.makePerson()
self.importer.update_bug(bug, cve, self.lp_cve)
self.checkBugTasks(bug, cve)
def test_update_bug_cve_importance_changed(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
self.assertNotEqual(cve.importance, BugTaskImportance.CRITICAL)
cve.importance = BugTaskImportance.CRITICAL
@@ -1396,7 +1417,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkVulnerabilities(bug, cve)
def test_update_bug_cve_status_changed(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
self.assertNotEqual(cve.status, VulnerabilityStatus.IGNORED)
cve.status = VulnerabilityStatus.IGNORED
@@ -1404,7 +1425,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkVulnerabilities(bug, cve)
def test_update_bug_package_importance_changed(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
self.assertNotEqual(
cve.distro_packages[0].importance, BugTaskImportance.CRITICAL
@@ -1422,7 +1443,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkBugTasks(bug, cve)
def test_update_bug_package_status_changed(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
self.assertNotEqual(
cve.series_packages[0].status, BugTaskStatus.DOESNOTEXIST
@@ -1438,7 +1459,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkBugTasks(bug, cve)
def test_update_bug_external_bugs_changed(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
# Add new URL
@@ -1452,7 +1473,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkBug(bug, cve)
def test_update_bug_global_tags_changed(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
cve.global_tags.add("another-tag")
@@ -1460,7 +1481,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkBug(bug, cve)
def test_update_bug_ubuntu_description_changed(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
cve.ubuntu_description += "new"
@@ -1468,7 +1489,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkBug(bug, cve)
def test_update_bug_references(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
# Add new URL
@@ -1482,7 +1503,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkBug(bug, cve)
def test_update_patch_urls(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
# Add new patch URL
@@ -1513,7 +1534,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkBug(bug, cve)
def test_update_break_fix(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
# Add new break_fix
@@ -1544,7 +1565,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.checkBug(bug, cve)
def test_update_tags(self):
- bug = self.importer.create_bug(self.cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(self.cve, self.lp_cve)
cve = self.cve
# Add new tags
@@ -1558,14 +1579,18 @@ class TestUCTImporterExporter(TestCaseWithFactory):
def test_import_cve(self):
self.importer.import_cve(self.cve)
self.assertIsNotNone(
- self.importer._find_existing_bug(self.cve, self.lp_cve)
+ self.importer._find_existing_bug(
+ self.cve, self.lp_cve, self.ubuntu
+ )
)
self.checkLaunchpadCve(self.lp_cve, self.cve)
def test_import_cve_dry_run(self):
- importer = UCTImporter(dry_run=True)
+ importer = UCTImporter(self.ubuntu, dry_run=True)
importer.import_cve(self.cve)
- self.assertIsNone(importer._find_existing_bug(self.cve, self.lp_cve))
+ self.assertIsNone(
+ importer._find_existing_bug(self.cve, self.lp_cve, self.ubuntu)
+ )
def test_naive_dates(self):
cve = self.cve
@@ -1574,7 +1599,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
cve.date_coordinated_release = cve.date_coordinated_release.replace(
tzinfo=None
)
- bug = self.importer.create_bug(cve, self.lp_cve)
+ bug, _ = self.importer.create_bug(cve, self.lp_cve)
for date in (
bug.vulnerabilities[0].date_made_public,
bug.vulnerabilities[0].date_notice_issued,
@@ -1591,13 +1616,17 @@ class TestUCTImporterExporter(TestCaseWithFactory):
def test_make_cve_from_bug(self):
self.importer.import_cve(self.cve)
- bug = self.importer._find_existing_bug(self.cve, self.lp_cve)
+ bug = self.importer._find_existing_bug(
+ self.cve, self.lp_cve, self.ubuntu
+ )
cve = self.exporter._make_cve_from_bug(bug)
self.checkCVE(self.cve, cve)
def test_export_bug_to_uct_file(self):
self.importer.import_cve(self.cve)
- bug = self.importer._find_existing_bug(self.cve, self.lp_cve)
+ bug = self.importer._find_existing_bug(
+ self.cve, self.lp_cve, self.ubuntu
+ )
output_dir = Path(self.makeTemporaryDirectory())
cve_path = self.exporter.export_bug_to_uct_file(bug.id, output_dir)
uct_record = UCTRecord.load(cve_path)
diff --git a/lib/lp/bugs/scripts/uct/models.py b/lib/lp/bugs/scripts/uct/models.py
index 0004ff2..6aa6013 100644
--- a/lib/lp/bugs/scripts/uct/models.py
+++ b/lib/lp/bugs/scripts/uct/models.py
@@ -3,6 +3,7 @@
import logging
import re
+import tempfile
from collections import OrderedDict, defaultdict
from datetime import datetime
from enum import Enum
@@ -28,6 +29,7 @@ from zope.schema.interfaces import InvalidURI
from lp.bugs.enums import VulnerabilityStatus
from lp.bugs.interfaces.bugtask import BugTaskImportance, BugTaskStatus
+from lp.bugs.scripts.svthandler import SVTRecord
from lp.registry.interfaces.distribution import IDistributionSet
from lp.registry.interfaces.distroseries import IDistroSeriesSet
from lp.registry.interfaces.person import IPersonSet
@@ -58,7 +60,7 @@ class CVSS(NamedTuple):
vector_string: str
-class UCTRecord:
+class UCTRecord(SVTRecord):
"""
UCTRecord represents a single CVE record (file) in the ubuntu-cve-tracker.
@@ -148,6 +150,13 @@ class UCTRecord:
return self.__dict__ == other.__dict__
@classmethod
+ def from_str(self, string: str) -> "UCTRecord":
+ with tempfile.NamedTemporaryFile("w") as fp:
+ fp.write(string)
+ fp.flush()
+ return self.load(Path(fp.name))
+
+ @classmethod
def load(cls, cve_path: Path) -> "UCTRecord":
"""
Create a `UCTRecord` instance from a file located at `cve_path`.
diff --git a/lib/lp/bugs/scripts/uct/uctexport.py b/lib/lp/bugs/scripts/uct/uctexport.py
index 188a566..dfad576 100644
--- a/lib/lp/bugs/scripts/uct/uctexport.py
+++ b/lib/lp/bugs/scripts/uct/uctexport.py
@@ -15,6 +15,7 @@ from lp.bugs.model.bug import Bug as BugModel
from lp.bugs.model.bugtask import BugTask
from lp.bugs.model.cve import Cve as CveModel
from lp.bugs.model.vulnerability import Vulnerability
+from lp.bugs.scripts.svthandler import SVTExporter
from lp.bugs.scripts.uct.models import CVE, CVSS
from lp.bugs.scripts.uct.uctimport import UCTImporter
from lp.registry.model.distributionsourcepackage import (
@@ -32,7 +33,7 @@ TAG_SEPARATOR = UCTImporter.TAG_SEPARATOR
logger = logging.getLogger(__name__)
-class UCTExporter:
+class UCTExporter(SVTExporter):
"""
`UCTExporter` is used to export LP Bugs, Vulnerabilities and Cve's to
UCT CVE files.
diff --git a/lib/lp/bugs/scripts/uct/uctimport.py b/lib/lp/bugs/scripts/uct/uctimport.py
index 7da27ab..5b50686 100644
--- a/lib/lp/bugs/scripts/uct/uctimport.py
+++ b/lib/lp/bugs/scripts/uct/uctimport.py
@@ -9,8 +9,10 @@ Launchpad.
For each entry in UCT we:
1. Create a Bug instance
-2. Create a Vulnerability instance for each affected distribution and link it
- to the bug
+2. Create a Vulnerability instance for Ubuntu and link it to the bug. Although
+fips, esm, etc. are modelled as separate distributions, we store that
+information in the bugtasks for each distro package, so a single Ubuntu
+Vulnerability is enough to hold all the information we need.
3. Create a Bug Task for each distribution/series package in the CVE entry
4. Update the statuses of Bug Tasks based on the information in the CVE entry
5. Update the information the related Launchpad's `Cve` model, if necessary
@@ -29,7 +31,7 @@ from collections import defaultdict
from datetime import timezone
from itertools import chain
from pathlib import Path
-from typing import Dict, List, Optional, Set
+from typing import Dict, List, Optional, Set, Tuple
import transaction
from zope.component import getUtility
@@ -48,9 +50,12 @@ from lp.bugs.model.bug import Bug as BugModel
from lp.bugs.model.bugtask import BugTask
from lp.bugs.model.cve import Cve as CveModel
from lp.bugs.model.vulnerability import Vulnerability
+from lp.bugs.scripts.svthandler import SVTImporter
from lp.bugs.scripts.uct.models import CVE, UCTRecord
+from lp.registry.interfaces.role import IPersonRoles
from lp.registry.model.distribution import Distribution
from lp.registry.model.person import Person
+from lp.registry.security import SecurityAdminDistribution
from lp.services.database.constants import UTC_NOW
__all__ = [
@@ -65,16 +70,23 @@ class UCTImportError(Exception):
pass
-class UCTImporter:
+class UCTImporter(SVTImporter):
"""
`UCTImporter` is used to import UCT CVE files to Launchpad database.
"""
TAG_SEPARATOR = "."
- def __init__(self, dry_run=False):
+ def __init__(
+ self,
+ ubuntu,
+ information_type=InformationType.PUBLICSECURITY,
+ dry_run=False,
+ ):
self.dry_run = dry_run
self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
+ self.ubuntu = ubuntu
+ self.information_type = information_type
def import_cve_from_file(self, cve_path: Path) -> None:
"""
@@ -87,7 +99,22 @@ class UCTImporter:
cve = CVE.make_from_uct_record(uct_record)
self.import_cve(cve)
- def import_cve(self, cve: CVE) -> None:
+ def from_record(
+ self, record: UCTRecord, cve_sequence: str
+ ) -> Optional[Tuple[BugModel, Vulnerability]]:
+ cve = CVE.make_from_uct_record(record)
+
+ if cve.sequence != cve_sequence:
+ logger.error(
+ "[SOSSImporter] CVE sequence mismatch: %s != %s",
+ cve.sequence,
+ cve_sequence,
+ )
+ return None, None
+
+ return self.import_cve(cve)
+
+ def import_cve(self, cve: CVE) -> Optional[Tuple[BugModel, Vulnerability]]:
"""
Import a `CVE` instance to Launchpad database.
@@ -101,7 +128,7 @@ class UCTImporter:
cve.sequence,
cve.sequence,
)
- return
+ return None, None
if not cve.series_packages:
logger.warning(
"%s: could not find any affected packages, aborting."
@@ -109,7 +136,7 @@ class UCTImporter:
cve.series_packages,
cve.sequence,
)
- return
+ return None, None
lp_cve: CveModel = removeSecurityProxy(
getUtility(ICveSet)[cve.sequence]
)
@@ -120,11 +147,11 @@ class UCTImporter:
cve.sequence,
cve.sequence,
)
- return
- bug = self._find_existing_bug(cve, lp_cve)
+ return None, None
+ bug = self._find_existing_bug(cve, lp_cve, self.ubuntu)
try:
if bug is None:
- bug = self.create_bug(cve, lp_cve)
+ bug, vulnerability = self.create_bug(cve, lp_cve)
logger.info(
"%s: created bug with ID: %s", cve.sequence, bug.id
)
@@ -134,7 +161,7 @@ class UCTImporter:
cve.sequence,
bug.id,
)
- self.update_bug(bug, cve, lp_cve)
+ bug, vulnerability = self.update_bug(bug, cve, lp_cve)
logger.info(
"%s: updated bug with ID: %s", cve.sequence, bug.id
)
@@ -153,6 +180,7 @@ class UCTImporter:
transaction.commit()
logger.info("%s was imported successfully", cve.sequence)
+ return bug, vulnerability
def create_bug(self, cve: CVE, lp_cve: CveModel) -> BugModel:
"""
@@ -165,15 +193,18 @@ class UCTImporter:
distro_package = cve.distro_packages[0]
# Create the bug
- bug: BugModel = getUtility(IBugSet).createBug(
- CreateBugParams(
- comment=self._make_bug_description(cve),
- title=cve.sequence,
- information_type=InformationType.PUBLICSECURITY,
- owner=self.bug_importer,
- target=distro_package.target,
- importance=distro_package.importance,
- cve=lp_cve,
+ bug: BugModel = removeSecurityProxy(
+ getUtility(IBugSet).createBug(
+ CreateBugParams(
+ comment=self._make_bug_description(cve),
+ title=cve.sequence,
+ information_type=self.information_type,
+ owner=self.bug_importer,
+ target=distro_package.target,
+ importance=distro_package.importance,
+ cve=lp_cve,
+ check_permissions=False,
+ )
)
)
@@ -206,11 +237,13 @@ class UCTImporter:
message=f"UCT CVE entry {cve.sequence}",
)
- # Create the Vulnerabilities
- for distribution in cve.affected_distributions:
- self._create_vulnerability(bug, cve, lp_cve, distribution)
-
- return bug
+ # The bug's bugtasks may target packages in several distributions
+ # (fips, esm, etc.), but the only real distribution is Ubuntu, so we
+ # create a single vulnerability, for Ubuntu.
+ vulnerability = self._create_vulnerability(
+ bug, cve, lp_cve, self.ubuntu
+ )
+ return bug, vulnerability
def update_bug(self, bug: BugModel, cve: CVE, lp_cve: CveModel) -> None:
"""
@@ -241,16 +274,13 @@ class UCTImporter:
self._update_break_fix(bug, cve.break_fix_data)
self._update_tags(bug, cve.global_tags, cve.distro_packages)
- # Update or add new Vulnerabilities
- vulnerabilities_by_distro = {
- v.distribution: v for v in bug.vulnerabilities
- }
- for distro in cve.affected_distributions:
- vulnerability = vulnerabilities_by_distro.get(distro)
- if vulnerability is None:
- self._create_vulnerability(bug, cve, lp_cve, distro)
- else:
- self._update_vulnerability(vulnerability, cve)
+ vulnerability = self._find_existing_vulnerability(lp_cve, self.ubuntu)
+ if vulnerability is None:
+ self._create_vulnerability(bug, cve, lp_cve, self.ubuntu)
+ else:
+ self._update_vulnerability(vulnerability, cve)
+
+ return bug, vulnerability
def _update_tags(
self, bug: BugModel, global_tags: Set, distro_packages: List
@@ -266,21 +296,36 @@ class UCTImporter:
bug.tags = tags
def _find_existing_bug(
- self, cve: CVE, lp_cve: CveModel
+ self,
+ cve: CVE,
+ lp_cve: CveModel,
+ distribution: Distribution,
) -> Optional[BugModel]:
- bug = None
- for vulnerability in lp_cve.vulnerabilities:
- if vulnerability.distribution in cve.affected_distributions:
- bugs = vulnerability.bugs
- if bugs:
- if bug and bugs[0] != bug:
- raise UCTImportError(
- "Multiple existing bugs are found "
- "for CVE {}".format(cve.sequence)
- )
- else:
- bug = bugs[0]
- return bug
+ """Find existing bug for the given CVE."""
+ vulnerability = self._find_existing_vulnerability(lp_cve, distribution)
+ if not vulnerability:
+ return None
+
+ bugs = vulnerability.bugs
+ if len(bugs) > 1:
+ raise UCTImportError(
+ "Multiple existing bugs found for CVE ",
+ cve.sequence,
+ )
+ if bugs:
+ return removeSecurityProxy(bugs[0])
+
+ return None
+
+ def _find_existing_vulnerability(
+ self, lp_cve: CveModel, distribution: Distribution
+ ) -> Optional[Vulnerability]:
+ """Find the existing vulnerability for the given distribution."""
+ if not lp_cve:
+ return None
+
+ vulnerability = lp_cve.getDistributionVulnerability(distribution)
+ return removeSecurityProxy(vulnerability)
def _create_bug_tasks(
self,
@@ -329,14 +374,16 @@ class UCTImporter:
:param distribution: a `Distribution` affected by the vulnerability
:return: a Vulnerability
"""
- vulnerability: Vulnerability = getUtility(IVulnerabilitySet).new(
- distribution=distribution,
- status=cve.status,
- importance=cve.importance,
- importance_explanation=cve.importance_explanation,
- creator=bug.owner,
- information_type=InformationType.PUBLICSECURITY,
- cve=lp_cve,
+ vulnerability: Vulnerability = removeSecurityProxy(
+ getUtility(IVulnerabilitySet).new(
+ distribution=distribution,
+ status=cve.status,
+ importance=cve.importance,
+ importance_explanation=cve.importance_explanation,
+ creator=bug.owner,
+ information_type=self.information_type,
+ cve=lp_cve,
+ )
)
self._update_vulnerability(vulnerability, cve)
@@ -555,3 +602,9 @@ class UCTImporter:
"""
lp_cve.setCVSSVectorForAuthority(cve.cvss)
lp_cve.discovered_by = cve.discovered_by
+
+ def checkUserPermissions(self, user):
+ """See `SVTImporter`."""
+ return SecurityAdminDistribution(self.ubuntu).checkAuthenticated(
+ IPersonRoles(user)
+ )
diff --git a/lib/lp/bugs/scripts/uctimport.py b/lib/lp/bugs/scripts/uctimport.py
index aed11c3..6e1dd39 100644
--- a/lib/lp/bugs/scripts/uctimport.py
+++ b/lib/lp/bugs/scripts/uctimport.py
@@ -1,6 +1,9 @@
import logging
from pathlib import Path
+from zope.component import getUtility
+
+from lp.app.interfaces.launchpad import ILaunchpadCelebrities
from lp.app.validators.cve import CVEREF_PATTERN
from lp.bugs.scripts.uct import UCTImporter
from lp.services.scripts.base import LaunchpadScript
@@ -58,6 +61,8 @@ class UCTImportScript(LaunchpadScript):
return
else:
cve_paths = [path]
- importer = UCTImporter(dry_run=self.options.dry_run)
+
+ ubuntu = getUtility(ILaunchpadCelebrities).ubuntu
+ importer = UCTImporter(ubuntu, dry_run=self.options.dry_run)
for cve_path in cve_paths:
importer.import_cve_from_file(cve_path)
diff --git a/lib/lp/bugs/tests/test_importvulnerabilityjob.py b/lib/lp/bugs/tests/test_importvulnerabilityjob.py
index aa1640c..ef4fa3b 100644
--- a/lib/lp/bugs/tests/test_importvulnerabilityjob.py
+++ b/lib/lp/bugs/tests/test_importvulnerabilityjob.py
@@ -57,7 +57,7 @@ class ImportVulnerabilityJobTests(TestCaseWithFactory):
repository=self.repository,
paths=("ref/heads/main", "ref/tags/v1.0"),
)
- self.cve_path = (
+ self.soss_cve_path = (
Path(__file__).parent
/ ".."
/ ".."
@@ -68,6 +68,16 @@ class ImportVulnerabilityJobTests(TestCaseWithFactory):
/ "sampledata"
/ "CVE-2025-1979"
)
+ self.uct_cve_path = (
+ Path(__file__).parent
+ / ".."
+ / ".."
+ / "bugs"
+ / "scripts"
+ / "tests"
+ / "sampledata"
+ / "CVE-2022-3219"
+ )
@property
def job_source(self):
@@ -339,9 +349,9 @@ class ImportVulnerabilityJobTests(TestCaseWithFactory):
)
self.assertEqual(naked_job.metadata, metadata)
- def test_run_import(self):
+ def test_run_import_soss(self):
"""Run ImportVulnerabilityJob."""
- with open(self.cve_path, encoding="utf-8") as file:
+ with open(self.soss_cve_path, encoding="utf-8") as file:
self.useFixture(
GitHostingFixture(
blob=file.read(),
@@ -379,6 +389,76 @@ class ImportVulnerabilityJobTests(TestCaseWithFactory):
},
)
+ def test_run_import_uct(self):
+ """Run ImportVulnerabilityJob with the UCT handler."""
+ with open(self.uct_cve_path, encoding="utf-8") as file:
+ self.useFixture(
+ GitHostingFixture(
+ blob=file.read(),
+ refs=self.refs,
+ diff_stats={"added": ["active/CVE-2022-3219"]},
+ )
+ )
+
+ cve = self.factory.makeCVE("2022-3219")
+ ubuntu = getUtility(ILaunchpadCelebrities).ubuntu
+ trusty = self.factory.makeDistroSeries(
+ distribution=ubuntu, name="trusty"
+ )
+ self.factory.makeDistroSeries(distribution=ubuntu, name="xenial")
+ self.factory.makeDistroSeries(distribution=ubuntu, name="bionic")
+ self.factory.makeDistroSeries(distribution=ubuntu, name="focal")
+ self.factory.makeDistroSeries(distribution=ubuntu, name="jammy")
+ self.factory.makeDistroSeries(distribution=ubuntu, name="kinetic")
+ self.factory.makeDistroSeries(distribution=ubuntu, name="devel")
+
+ ubuntu_esm = self.factory.makeDistribution(name="ubuntu-esm")
+ self.factory.makeDistroSeries(distribution=ubuntu_esm, name="trusty")
+ self.factory.makeDistroSeries(distribution=ubuntu_esm, name="xenial")
+ self.factory.makeDistroSeries(distribution=ubuntu_esm, name="bionic")
+ self.factory.makeDistroSeries(distribution=ubuntu_esm, name="focal")
+ self.factory.makeDistroSeries(distribution=ubuntu_esm, name="jammy")
+
+ release = self.factory.makeSourcePackageRelease(
+ sourcepackagename="gnupg", distroseries=trusty
+ )
+ self.factory.makeSourcePackagePublishingHistory(
+ distroseries=trusty, sourcepackagerelease=release
+ )
+ release2 = self.factory.makeSourcePackageRelease(
+ sourcepackagename="gnupg2", distroseries=trusty
+ )
+ self.factory.makeSourcePackagePublishingHistory(
+ distroseries=trusty, sourcepackagerelease=release2
+ )
+ transaction.commit()
+
+ job = self.job_source.create(
+ handler=VulnerabilityHandlerEnum.UCT,
+ git_repository=self.repository.id,
+ git_ref="ref/heads/main",
+ git_paths=["active"],
+ information_type=InformationType.PRIVATESECURITY.value,
+ import_since_commit_sha1=None,
+ )
+ job.run()
+
+ # Check that it created the bug and vulnerability
+ self.assertEqual(len(cve.bugs), 1)
+
+ admin = getUtility(ILaunchpadCelebrities).admin
+ with person_logged_in(admin):
+ self.assertEqual(len(list(cve.vulnerabilities)), 1)
+
+ self.assertEqual(
+ job.metadata.get("result"),
+ {
+ "succeeded": ["CVE-2022-3219"],
+ "failed": [],
+ "error_description": [],
+ },
+ )
+
def test_run_import_with_private_repo(self):
"""Run ImportVulnerabilityJob using a PRIVATESECURITY git
repository."""
@@ -391,7 +471,7 @@ class ImportVulnerabilityJobTests(TestCaseWithFactory):
repository=private_repo,
paths=("ref/heads/main", "ref/tags/v1.0"),
)
- with open(self.cve_path, encoding="utf-8") as file:
+ with open(self.soss_cve_path, encoding="utf-8") as file:
self.useFixture(
GitHostingFixture(
blob=file.read(),
@@ -448,7 +528,7 @@ class ImportVulnerabilityJobTests(TestCaseWithFactory):
repository=proprietary_repo,
paths=("ref/heads/main", "ref/tags/v1.0"),
)
- with open(self.cve_path, encoding="utf-8") as file:
+ with open(self.soss_cve_path, encoding="utf-8") as file:
self.useFixture(
GitHostingFixture(
blob=file.read(),
@@ -494,7 +574,7 @@ class ImportVulnerabilityJobTests(TestCaseWithFactory):
def test_run_import_with_wrong_git_paths(self):
"""Run ImportVulnerabilityJob with wrong git_paths."""
- with open(self.cve_path, encoding="utf-8") as file:
+ with open(self.soss_cve_path, encoding="utf-8") as file:
self.useFixture(
GitHostingFixture(
blob=file.read(),
@@ -559,7 +639,7 @@ class ImportVulnerabilityJobTests(TestCaseWithFactory):
def test_run_import_with_wrong_git_ref(self):
"""Run ImportVulnerabilityJob with wrong git_ref."""
- with open(self.cve_path, encoding="utf-8") as file:
+ with open(self.soss_cve_path, encoding="utf-8") as file:
self.useFixture(
GitHostingFixture(
blob=file.read(),
@@ -635,7 +715,7 @@ class ImportVulnerabilityJobTests(TestCaseWithFactory):
def test_run_import_with_import_since_commit_sha1(self):
"""Run ImportVulnerabilityJob using import_since_commit_sha1"""
- with open(self.cve_path, encoding="utf-8") as file:
+ with open(self.soss_cve_path, encoding="utf-8") as file:
self.useFixture(
GitHostingFixture(
blob=file.read(),
Follow ups