launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #28872
[Merge] ~andrey-fedoseev/launchpad:uct-import-update-existing into launchpad:master
Andrey Fedoseev has proposed merging ~andrey-fedoseev/launchpad:uct-import-update-existing into launchpad:master.
Commit message:
Refactor the UCT import
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~andrey-fedoseev/launchpad/+git/launchpad/+merge/427275
This is an intermediate merge proposal.
I refactored the UCT import script in preparation for updating existing bugs and exporting CVEs back to the UCT flat file format.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~andrey-fedoseev/launchpad:uct-import-update-existing into launchpad:master.
diff --git a/lib/lp/bugs/scripts/tests/test_uctimport.py b/lib/lp/bugs/scripts/tests/test_uctimport.py
index 4b52c4d..4fbb17c 100644
--- a/lib/lp/bugs/scripts/tests/test_uctimport.py
+++ b/lib/lp/bugs/scripts/tests/test_uctimport.py
@@ -9,29 +9,21 @@ from lp.app.enums import InformationType
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
from lp.bugs.enums import VulnerabilityStatus
from lp.bugs.interfaces.bugtask import BugTaskImportance, BugTaskStatus
-from lp.bugs.scripts.uctimport import (
- CVE,
- DistroSeriesPackageStatus,
- Note,
- Package,
- PackageStatus,
- Patch,
- Priority,
- UCTImporter,
- load_cve_from_file,
-)
+from lp.bugs.scripts.uctimport import CVE, UCTImporter, UCTRecord
from lp.registry.interfaces.series import SeriesStatus
+from lp.registry.model.sourcepackage import SourcePackage
from lp.testing import TestCase, TestCaseWithFactory
from lp.testing.layers import ZopelessDatabaseLayer
-class TestLoadCVEFromFile(TestCase):
- def test_load_cve_from_file(self):
+class TestUCTRecord(TestCase):
+ def test_load(self):
cve_path = Path(__file__).parent / "sampledata" / "CVE-2022-23222"
- cve = load_cve_from_file(cve_path)
+ uct_record = UCTRecord.load(cve_path)
self.assertEqual(
- cve,
- CVE(
+ uct_record,
+ UCTRecord(
+ path=cve_path,
assigned_to="",
bugs=[
"https://github.com/mm2/Little-CMS/issues/29",
@@ -63,7 +55,7 @@ class TestLoadCVEFromFile(TestCase):
"seth-arnold> set kernel.unprivileged_bpf_disabled to 1"
),
notes=[
- Note(
+ UCTRecord.Note(
author="sbeattie",
text=(
"Ubuntu 21.10 / 5.13+ kernels disable "
@@ -73,7 +65,7 @@ class TestLoadCVEFromFile(TestCase):
),
),
],
- priority=Priority.CRITICAL,
+ priority=UCTRecord.Priority.CRITICAL,
references=["https://ubuntu.com/security/notices/USN-5368-1"],
ubuntu_description=(
"It was discovered that the BPF verifier in the Linux "
@@ -83,24 +75,24 @@ class TestLoadCVEFromFile(TestCase):
"execute arbitrary code."
),
packages=[
- Package(
+ UCTRecord.Package(
name="linux",
statuses=[
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries="devel",
- status=PackageStatus.NOT_AFFECTED,
+ status=UCTRecord.PackageStatus.NOT_AFFECTED,
reason="5.15.0-25.25",
- priority=Priority.MEDIUM,
+ priority=UCTRecord.Priority.MEDIUM,
),
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries="impish",
- status=PackageStatus.RELEASED,
+ status=UCTRecord.PackageStatus.RELEASED,
reason="5.13.0-37.42",
- priority=Priority.MEDIUM,
+ priority=UCTRecord.Priority.MEDIUM,
),
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries="upstream",
- status=PackageStatus.RELEASED,
+ status=UCTRecord.PackageStatus.RELEASED,
reason="5.17~rc1",
priority=None,
),
@@ -108,7 +100,7 @@ class TestLoadCVEFromFile(TestCase):
priority=None,
tags={"not-ue"},
patches=[
- Patch(
+ UCTRecord.Patch(
patch_type="break-fix",
entry=(
"457f44363a8894135c85b7a9afd2bd8196db24ab "
@@ -118,29 +110,29 @@ class TestLoadCVEFromFile(TestCase):
)
],
),
- Package(
+ UCTRecord.Package(
name="linux-hwe",
statuses=[
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries="devel",
- status=PackageStatus.DOES_NOT_EXIST,
+ status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
reason="",
priority=None,
),
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries="impish",
- status=PackageStatus.DOES_NOT_EXIST,
+ status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
reason="",
priority=None,
),
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries="upstream",
- status=PackageStatus.RELEASED,
+ status=UCTRecord.PackageStatus.RELEASED,
reason="5.17~rc1",
priority=None,
),
],
- priority=Priority.HIGH,
+ priority=UCTRecord.Priority.HIGH,
tags=set(),
patches=[],
),
@@ -149,6 +141,211 @@ class TestLoadCVEFromFile(TestCase):
)
+class TextCVE(TestCaseWithFactory):
+
+ layer = ZopelessDatabaseLayer
+
+ def setUp(self, *args, **kwargs):
+ super().setUp(*args, **kwargs)
+ celebrities = getUtility(ILaunchpadCelebrities)
+ ubuntu = celebrities.ubuntu
+ self.supported_series = self.factory.makeDistroSeries(
+ distribution=ubuntu, status=SeriesStatus.SUPPORTED
+ )
+ self.current_series = self.factory.makeDistroSeries(
+ distribution=ubuntu, status=SeriesStatus.CURRENT
+ )
+ self.devel_series = self.factory.makeDistroSeries(
+ distribution=ubuntu, status=SeriesStatus.DEVELOPMENT
+ )
+ self.dsp1 = self.factory.makeDistributionSourcePackage(
+ distribution=ubuntu
+ )
+ self.dsp2 = self.factory.makeDistributionSourcePackage(
+ distribution=ubuntu
+ )
+
+ def test_make_from_uct_record(self):
+ uct_record = UCTRecord(
+ path=Path("./active/CVE-2022-23222"),
+ assigned_to="assignee",
+ bugs=["https://github.com/mm2/Little-CMS/issues/29"],
+ cvss=[],
+ candidate="CVE-2022-23222",
+ date_made_public=datetime.datetime(
+ 2022, 1, 14, 8, 15, tzinfo=datetime.timezone.utc
+ ),
+ description="description",
+ discovered_by="tr3e wang",
+ mitigation="mitigation",
+ notes=[
+ UCTRecord.Note(
+ author="author",
+ text="text",
+ ),
+ ],
+ priority=UCTRecord.Priority.CRITICAL,
+ references=["https://ubuntu.com/security/notices/USN-5368-1"],
+ ubuntu_description="ubuntu-description",
+ packages=[
+ UCTRecord.Package(
+ name=self.dsp1.sourcepackagename.name,
+ statuses=[
+ UCTRecord.DistroSeriesPackageStatus(
+ distroseries=self.supported_series.name,
+ status=UCTRecord.PackageStatus.NOT_AFFECTED,
+ reason="reason 1",
+ priority=UCTRecord.Priority.MEDIUM,
+ ),
+ UCTRecord.DistroSeriesPackageStatus(
+ distroseries=self.current_series.name,
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="reason 2",
+ priority=UCTRecord.Priority.MEDIUM,
+ ),
+ UCTRecord.DistroSeriesPackageStatus(
+ distroseries="devel",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="reason 3",
+ priority=None,
+ ),
+ ],
+ priority=None,
+ tags={"not-ue"},
+ patches=[
+ UCTRecord.Patch(
+ patch_type="break-fix",
+ entry=(
+ "457f44363a8894135c85b7a9afd2bd8196db24ab "
+ "c25b2ae136039ffa820c26138ed4a5e5f3ab3841|"
+ "local-CVE-2022-23222-fix"
+ ),
+ )
+ ],
+ ),
+ UCTRecord.Package(
+ name=self.dsp2.sourcepackagename.name,
+ statuses=[
+ UCTRecord.DistroSeriesPackageStatus(
+ distroseries=self.supported_series.name,
+ status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
+ reason="",
+ priority=None,
+ ),
+ UCTRecord.DistroSeriesPackageStatus(
+ distroseries=self.current_series.name,
+ status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
+ reason="",
+ priority=None,
+ ),
+ UCTRecord.DistroSeriesPackageStatus(
+ distroseries="devel",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="",
+ priority=None,
+ ),
+ ],
+ priority=UCTRecord.Priority.HIGH,
+ tags=set(),
+ patches=[],
+ ),
+ ],
+ )
+ cve = CVE.make_from_uct_record(uct_record)
+ self.assertEqual(cve.sequence, "CVE-2022-23222")
+ self.assertEqual(
+ cve.date_made_public,
+ datetime.datetime(
+ 2022, 1, 14, 8, 15, tzinfo=datetime.timezone.utc
+ ),
+ )
+ self.assertEqual(
+ cve.distro_packages,
+ [
+ CVE.DistroPackage(
+ package=self.dsp1,
+ importance=BugTaskImportance.CRITICAL,
+ ),
+ CVE.DistroPackage(
+ package=self.dsp2,
+ importance=BugTaskImportance.HIGH,
+ ),
+ ],
+ )
+ self.assertEqual(
+ cve.series_packages,
+ [
+ CVE.SeriesPackage(
+ package=SourcePackage(
+ sourcepackagename=self.dsp1.sourcepackagename,
+ distroseries=self.supported_series,
+ ),
+ importance=BugTaskImportance.MEDIUM,
+ status=BugTaskStatus.INVALID,
+ status_explanation="reason 1",
+ ),
+ CVE.SeriesPackage(
+ package=SourcePackage(
+ sourcepackagename=self.dsp1.sourcepackagename,
+ distroseries=self.current_series,
+ ),
+ importance=BugTaskImportance.MEDIUM,
+ status=BugTaskStatus.FIXRELEASED,
+ status_explanation="reason 2",
+ ),
+ CVE.SeriesPackage(
+ package=SourcePackage(
+ sourcepackagename=self.dsp1.sourcepackagename,
+ distroseries=self.devel_series,
+ ),
+ importance=BugTaskImportance.CRITICAL,
+ status=BugTaskStatus.FIXRELEASED,
+ status_explanation="reason 3",
+ ),
+ CVE.SeriesPackage(
+ package=SourcePackage(
+ sourcepackagename=self.dsp2.sourcepackagename,
+ distroseries=self.supported_series,
+ ),
+ importance=BugTaskImportance.HIGH,
+ status=BugTaskStatus.DOESNOTEXIST,
+ status_explanation="",
+ ),
+ CVE.SeriesPackage(
+ package=SourcePackage(
+ sourcepackagename=self.dsp2.sourcepackagename,
+ distroseries=self.current_series,
+ ),
+ importance=BugTaskImportance.HIGH,
+ status=BugTaskStatus.DOESNOTEXIST,
+ status_explanation="",
+ ),
+ CVE.SeriesPackage(
+ package=SourcePackage(
+ sourcepackagename=self.dsp2.sourcepackagename,
+ distroseries=self.devel_series,
+ ),
+ importance=BugTaskImportance.HIGH,
+ status=BugTaskStatus.FIXRELEASED,
+ status_explanation="",
+ ),
+ ],
+ )
+ self.assertEqual(cve.importance, BugTaskImportance.CRITICAL)
+ self.assertEqual(cve.status, VulnerabilityStatus.ACTIVE)
+ self.assertEqual(cve.assigned_to, "assignee")
+ self.assertEqual(cve.description, "description")
+ self.assertEqual(cve.ubuntu_description, "ubuntu-description")
+ self.assertEqual(
+ cve.bug_urls, ["https://github.com/mm2/Little-CMS/issues/29"]
+ )
+ self.assertEqual(
+ cve.references, ["https://ubuntu.com/security/notices/USN-5368-1"]
+ )
+ self.assertEqual(cve.notes, "author> text")
+ self.assertEqual(cve.mitigation, "mitigation")
+
+
class TestUCTImporter(TestCaseWithFactory):
layer = ZopelessDatabaseLayer
@@ -186,7 +383,8 @@ class TestUCTImporter(TestCaseWithFactory):
)
now = datetime.datetime.now(datetime.timezone.utc)
- cve = CVE(
+ uct_record = UCTRecord(
+ path=Path("./ubuntu-cve-tracker/active/CVE-2022-23222"),
assigned_to=assignee.name,
bugs=[
"https://github.com/mm2/Little-CMS/issues/29",
@@ -199,49 +397,49 @@ class TestUCTImporter(TestCaseWithFactory):
description="description",
discovered_by="tr3e wang",
mitigation="mitigation",
- notes=[Note(author="author", text="text")],
- priority=Priority.MEDIUM,
+ notes=[UCTRecord.Note(author="author", text="text")],
+ priority=UCTRecord.Priority.MEDIUM,
references=["https://ubuntu.com/security/notices/USN-5368-1"],
ubuntu_description="ubuntu-description",
packages=[
- Package(
+ UCTRecord.Package(
name=dsp1.sourcepackagename.name,
statuses=[
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries=supported_series.name,
- status=PackageStatus.RELEASED,
+ status=UCTRecord.PackageStatus.RELEASED,
reason="released",
- priority=Priority.HIGH,
+ priority=UCTRecord.Priority.HIGH,
),
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries=current_series.name,
- status=PackageStatus.DOES_NOT_EXIST,
+ status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
reason="does not exist",
priority=None,
),
],
- priority=Priority.LOW,
+ priority=UCTRecord.Priority.LOW,
patches=[],
tags=set(),
),
- Package(
+ UCTRecord.Package(
name=dsp2.sourcepackagename.name,
statuses=[
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries=supported_series.name,
- status=PackageStatus.NOT_AFFECTED,
+ status=UCTRecord.PackageStatus.NOT_AFFECTED,
reason="not affected",
- priority=Priority.LOW,
+ priority=UCTRecord.Priority.LOW,
),
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries=current_series.name,
- status=PackageStatus.IGNORED,
+ status=UCTRecord.PackageStatus.IGNORED,
reason="ignored",
priority=None,
),
- DistroSeriesPackageStatus(
+ UCTRecord.DistroSeriesPackageStatus(
distroseries="devel",
- status=PackageStatus.NEEDS_TRIAGE,
+ status=UCTRecord.PackageStatus.NEEDS_TRIAGE,
reason="needs triage",
priority=None,
),
@@ -252,6 +450,7 @@ class TestUCTImporter(TestCaseWithFactory):
),
],
)
+ cve = CVE.make_from_uct_record(uct_record)
bug, vulnerabilities = self.importer.create_bug(cve, lp_cve)
self.assertEqual(bug.title, "CVE-2022-23222")
@@ -266,11 +465,11 @@ class TestUCTImporter(TestCaseWithFactory):
self.assertEqual(message.owner, owner)
self.assertEqual(message.text_contents, "description")
- for external_bug_url in cve.bugs:
+ for external_bug_url in uct_record.bugs:
message = messages.pop(0)
self.assertEqual(message.text_contents, external_bug_url)
- for reference in cve.references:
+ for reference in uct_record.references:
message = messages.pop(0)
self.assertEqual(message.text_contents, reference)
@@ -351,9 +550,7 @@ class TestUCTImporter(TestCaseWithFactory):
self.assertEqual(vulnerability.distribution, ubuntu)
self.assertEqual(vulnerability.creator, owner)
self.assertEqual(vulnerability.cve, lp_cve)
- self.assertEqual(
- vulnerability.status, VulnerabilityStatus.NEEDS_TRIAGE
- )
+ self.assertEqual(vulnerability.status, VulnerabilityStatus.ACTIVE)
self.assertEqual(vulnerability.description, "description")
self.assertEqual(vulnerability.notes, "author> text")
self.assertEqual(vulnerability.mitigation, "mitigation")
diff --git a/lib/lp/bugs/scripts/uctimport.py b/lib/lp/bugs/scripts/uctimport.py
index a625c3d..0db7c61 100644
--- a/lib/lp/bugs/scripts/uctimport.py
+++ b/lib/lp/bugs/scripts/uctimport.py
@@ -55,149 +55,349 @@ from lp.services.database.constants import UTC_NOW
from lp.services.messages.interfaces.message import IMessageSet
__all__ = [
- "Priority",
- "PackageStatus",
- "DistroSeriesPackageStatus",
- "Patch",
- "Package",
- "Note",
"CVE",
- "load_cve_from_file",
"UCTImporter",
+ "UCTRecord",
]
+from lp.services.propertycache import cachedproperty
-DEFAULT_LOGGER = logging.getLogger("lp.bugs.scripts.import")
+logger = logging.getLogger("lp.bugs.scripts.import")
-class Priority(Enum):
- CRITICAL = "critical"
- HIGH = "high"
- MEDIUM = "medium"
- LOW = "low"
- UNTRIAGED = "untriaged"
- NEGLIGIBLE = "negligible"
-
+class UCTRecord:
+ """
+ UCTRecord represents a single CVE record in the ubuntu-cve-tracker.
+ """
-class PackageStatus(Enum):
- IGNORED = "ignored"
- NEEDS_TRIAGE = "needs-triage"
- DOES_NOT_EXIST = "DNE"
- RELEASED = "released"
- NOT_AFFECTED = "not-affected"
- DEFERRED = "deferred"
- NEEDED = "needed"
- PENDING = "pending"
+ class Priority(Enum):
+ CRITICAL = "critical"
+ HIGH = "high"
+ MEDIUM = "medium"
+ LOW = "low"
+ UNTRIAGED = "untriaged"
+ NEGLIGIBLE = "negligible"
+
+ class PackageStatus(Enum):
+ IGNORED = "ignored"
+ NEEDS_TRIAGE = "needs-triage"
+ DOES_NOT_EXIST = "DNE"
+ RELEASED = "released"
+ NOT_AFFECTED = "not-affected"
+ DEFERRED = "deferred"
+ NEEDED = "needed"
+ PENDING = "pending"
+
+ DistroSeriesPackageStatus = NamedTuple(
+ "DistroSeriesPackageStatus",
+ (
+ ("distroseries", str),
+ ("status", PackageStatus),
+ ("reason", str),
+ ("priority", Optional[Priority]),
+ ),
+ )
+ Patch = NamedTuple(
+ "Patch",
+ (
+ ("patch_type", str),
+ ("entry", str),
+ ),
+ )
-DistroSeriesPackageStatus = NamedTuple(
- "DistroSeriesPackageStatus",
- [
- ("distroseries", str),
- ("status", PackageStatus),
- ("reason", str),
- ("priority", Optional[Priority]),
- ],
-)
+ Package = NamedTuple(
+ "Package",
+ (
+ ("name", str),
+ ("statuses", List[DistroSeriesPackageStatus]),
+ ("priority", Optional[Priority]),
+ ("tags", Set[str]),
+ ("patches", List[Patch]),
+ ),
+ )
+ Note = NamedTuple(
+ "Note",
+ (
+ ("author", str),
+ ("text", str),
+ ),
+ )
-Patch = NamedTuple(
- "Patch",
- [
- ("patch_type", str),
- ("entry", str),
- ],
-)
+ def __init__(
+ self,
+ path: Path,
+ assigned_to: str,
+ bugs: List[str],
+ cvss: List[Dict[str, Any]],
+ candidate: str,
+ date_made_public: Optional[datetime],
+ description: str,
+ discovered_by: str,
+ mitigation: Optional[str],
+ notes: List[Note],
+ priority: Priority,
+ references: List[str],
+ ubuntu_description: str,
+ packages: List[Package],
+ ):
+ self.path = path
+ self.assigned_to = assigned_to
+ self.bugs = bugs
+ self.cvss = cvss
+ self.candidate = candidate
+ self.date_made_public = date_made_public
+ self.description = description
+ self.discovered_by = discovered_by
+ self.mitigation = mitigation
+ self.notes = notes
+ self.priority = priority
+ self.references = references
+ self.ubuntu_description = ubuntu_description
+ self.packages = packages
+
+ def __eq__(self, other):
+ if not isinstance(other, UCTRecord):
+ raise ValueError("UCTRecord can only be compared to UCTRecord")
+ return self.__dict__ == other.__dict__
+
+ @classmethod
+ def pop_cve_property(
+ cls, cve_data: Dict[str, Any], field_name: str, required=True
+ ) -> Optional[Any]:
+ if required:
+ value = cve_data.pop(field_name)
+ else:
+ value = cve_data.pop(field_name, None)
+ if isinstance(value, str):
+ return value.strip()
+ return value
+
+ @classmethod
+ def load(cls, cve_path: Path) -> "UCTRecord":
+ """
+ Create a `UCTRecord` instance from a file located at `cve_path`.
+
+ The file is parsed to a dictionary using the code copied from
+ `cve_lib` in `ubuntu-cve-tracker`.
+ A `UCTRecord` instance is created from that dictionary,
+ applying some data transformations along the way.
+ """
+
+ cve_data = load_cve(str(cve_path)) # type: Dict[str, Any]
+
+ packages = []
+ tags = cls.pop_cve_property(
+ cve_data, "tags"
+ ) # type: Dict[str, Set[str]]
+ patches = cls.pop_cve_property(
+ cve_data, "patches"
+ ) # type: Dict[str, List[Tuple[str, str]]]
+ for package, statuses_dict in sorted(
+ cls.pop_cve_property(cve_data, "pkgs").items()
+ ):
+ statuses = []
+ for distroseries, (status, reason) in sorted(
+ statuses_dict.items()
+ ):
+ distroseries_priority = cls.pop_cve_property(
+ cve_data,
+ "Priority_{package}_{distroseries}".format(
+ package=package,
+ distroseries=distroseries,
+ ),
+ required=False,
+ )
+ statuses.append(
+ cls.DistroSeriesPackageStatus(
+ distroseries=distroseries,
+ status=cls.PackageStatus(status),
+ reason=reason,
+ priority=(
+ cls.Priority(distroseries_priority)
+ if distroseries_priority
+ else None
+ ),
+ )
+ )
+ package_priority = cls.pop_cve_property(
+ cve_data,
+ "Priority_{package}".format(package=package),
+ required=False,
+ )
+ packages.append(
+ cls.Package(
+ name=package,
+ statuses=statuses,
+ priority=(
+ cls.Priority(package_priority)
+ if package_priority
+ else None
+ ),
+ tags=tags.pop(package, set()),
+ patches=[
+ cls.Patch(patch_type=patch_type, entry=entry)
+ for patch_type, entry in patches.pop(package, [])
+ ],
+ )
+ )
+ crd = cls.pop_cve_property(cve_data, "CRD", required=False)
+ if crd == "unknown":
+ crd = None
+ public_date = cls.pop_cve_property(
+ cve_data, "PublicDate", required=False
+ )
+ if public_date == "unknown":
+ public_date = None
+ public_date_at_USN = cls.pop_cve_property(
+ cve_data, "PublicDateAtUSN", required=False
+ )
+ if public_date_at_USN == "unknown":
+ public_date_at_USN = None
+
+ date_made_public = crd or public_date or public_date_at_USN
+
+ entry = UCTRecord(
+ path=cve_path,
+ assigned_to=cls.pop_cve_property(cve_data, "Assigned-to"),
+ bugs=cls.pop_cve_property(cve_data, "Bugs").split("\n"),
+ cvss=cls.pop_cve_property(cve_data, "CVSS"),
+ candidate=cls.pop_cve_property(cve_data, "Candidate"),
+ date_made_public=(
+ dateutil.parser.parse(date_made_public)
+ if date_made_public
+ else None
+ ),
+ description=cls.pop_cve_property(cve_data, "Description"),
+ discovered_by=cls.pop_cve_property(cve_data, "Discovered-by"),
+ mitigation=cls.pop_cve_property(
+ cve_data, "Mitigation", required=False
+ ),
+ notes=[
+ cls.Note(author=author, text=text)
+ for author, text in cls.pop_cve_property(cve_data, "Notes")
+ ],
+ priority=cls.Priority(cls.pop_cve_property(cve_data, "Priority")),
+ references=cls.pop_cve_property(cve_data, "References").split(
+ "\n"
+ ),
+ ubuntu_description=cls.pop_cve_property(
+ cve_data, "Ubuntu-Description"
+ ),
+ packages=packages,
+ )
-Package = NamedTuple(
- "Package",
- [
- ("name", str),
- ("statuses", List[DistroSeriesPackageStatus]),
- ("priority", Optional[Priority]),
- ("tags", Set[str]),
- ("patches", List[Patch]),
- ],
-)
+ # make sure all fields are consumed
+ if cve_data:
+ raise AssertionError(
+ "not all fields are consumed: {}".format(cve_data)
+ )
-Note = NamedTuple(
- "Note",
- [
- ("author", str),
- ("text", str),
- ],
-)
+ return entry
-CVE = NamedTuple(
- "CVE",
- [
- ("assigned_to", str),
- ("bugs", List[str]),
- ("cvss", List[Dict[str, Any]]),
- ("candidate", str),
- ("date_made_public", Optional[datetime]),
- ("description", str),
- ("discovered_by", str),
- ("mitigation", Optional[str]),
- ("notes", List[Note]),
- ("priority", Priority),
- ("references", List[str]),
- ("ubuntu_description", str),
- ("packages", List[Package]),
- ],
-)
+class CVE:
+ DistroPackage = NamedTuple(
+ "DistroPackage",
+ (
+ ("package", DistributionSourcePackage),
+ ("importance", Optional[BugTaskImportance]),
+ ),
+ )
-class UCTImporter:
+ SeriesPackage = NamedTuple(
+ "SeriesPackage",
+ (
+ ("package", SourcePackage),
+ ("importance", Optional[BugTaskImportance]),
+ ("status", BugTaskStatus),
+ ("status_explanation", str),
+ ),
+ )
PRIORITY_MAP = {
- Priority.CRITICAL: BugTaskImportance.CRITICAL,
- Priority.HIGH: BugTaskImportance.HIGH,
- Priority.MEDIUM: BugTaskImportance.MEDIUM,
- Priority.LOW: BugTaskImportance.LOW,
- Priority.UNTRIAGED: BugTaskImportance.UNDECIDED,
- Priority.NEGLIGIBLE: BugTaskImportance.WISHLIST,
+ UCTRecord.Priority.CRITICAL: BugTaskImportance.CRITICAL,
+ UCTRecord.Priority.HIGH: BugTaskImportance.HIGH,
+ UCTRecord.Priority.MEDIUM: BugTaskImportance.MEDIUM,
+ UCTRecord.Priority.LOW: BugTaskImportance.LOW,
+ UCTRecord.Priority.UNTRIAGED: BugTaskImportance.UNDECIDED,
+ UCTRecord.Priority.NEGLIGIBLE: BugTaskImportance.WISHLIST,
}
- STATUS_MAP = {
- PackageStatus.IGNORED: BugTaskStatus.WONTFIX,
- PackageStatus.NEEDS_TRIAGE: BugTaskStatus.UNKNOWN,
- PackageStatus.DOES_NOT_EXIST: BugTaskStatus.DOESNOTEXIST,
- PackageStatus.RELEASED: BugTaskStatus.FIXRELEASED,
- PackageStatus.NOT_AFFECTED: BugTaskStatus.INVALID,
+ BUG_TASK_STATUS_MAP = {
+ UCTRecord.PackageStatus.IGNORED: BugTaskStatus.WONTFIX,
+ UCTRecord.PackageStatus.NEEDS_TRIAGE: BugTaskStatus.UNKNOWN,
+ UCTRecord.PackageStatus.DOES_NOT_EXIST: BugTaskStatus.DOESNOTEXIST,
+ UCTRecord.PackageStatus.RELEASED: BugTaskStatus.FIXRELEASED,
+ UCTRecord.PackageStatus.NOT_AFFECTED: BugTaskStatus.INVALID,
# we don't have a corresponding BugTaskStatus for this yet
# PackageStatus.DEFERRED: ...,
- PackageStatus.NEEDED: BugTaskStatus.NEW,
- PackageStatus.PENDING: BugTaskStatus.FIXCOMMITTED,
+ UCTRecord.PackageStatus.NEEDED: BugTaskStatus.NEW,
+ UCTRecord.PackageStatus.PENDING: BugTaskStatus.FIXCOMMITTED,
}
- def __init__(self, logger: Optional[logging.Logger] = None) -> None:
- self.logger = logger or DEFAULT_LOGGER
-
- def import_cve_from_file(self, cve_path: Path) -> None:
- cve = load_cve_from_file(cve_path)
- self.import_cve(cve)
+ VULNERABILITY_STATUS_MAP = {
+ "active": VulnerabilityStatus.ACTIVE,
+ "ignored": VulnerabilityStatus.IGNORED,
+ "retired": VulnerabilityStatus.RETIRED,
+ }
- def import_cve(self, cve: CVE) -> None:
- if cve.date_made_public is None:
- self.logger.warning(
- "The CVE does not have a publication date, is it embargoed?"
- )
- return
- lp_cve = getUtility(ICveSet)[cve.candidate] # type: CveModel
- if lp_cve is None:
- self.logger.warning(
- "Could not find the CVE in LP: %s", cve.candidate
- )
- return
- self.create_bug(cve, lp_cve)
+ def __init__(
+ self,
+ sequence: str,
+ date_made_public: Optional[datetime],
+ distro_packages: List[DistroPackage],
+ series_packages: List[SeriesPackage],
+ importance: BugTaskImportance,
+ status: VulnerabilityStatus,
+ assigned_to: str,
+ description: str,
+ ubuntu_description: str,
+ bug_urls: List[str],
+ references: List[str],
+ notes: str,
+ mitigation: str,
+ ):
+ self.sequence = sequence
+ self.date_made_public = date_made_public
+ self.distro_packages = distro_packages
+ self.series_packages = series_packages
+ self.importance = importance
+ self.status = status
+ self.assigned_to = assigned_to
+ self.description = description
+ self.ubuntu_description = ubuntu_description
+ self.bug_urls = bug_urls
+ self.references = references
+ self.notes = notes
+ self.mitigation = mitigation
+
+ @cachedproperty
+ def affected_distributions(self) -> Set[Distribution]:
+ return {p.package.distribution for p in self.distro_packages}
+
+ @cachedproperty
+ def affected_distro_series(self) -> Set[DistroSeries]:
+ return {p.package.distroseries for p in self.series_packages}
+
+ @classmethod
+ def infer_vulnerability_status(
+ cls, uct_record: UCTRecord
+ ) -> VulnerabilityStatus:
+ """
+ Infer vulnerability status based on the parent folder of the CVE file.
+ """
+ cve_folder_name = uct_record.path.absolute().parent.name
+ return cls.VULNERABILITY_STATUS_MAP.get(
+ cve_folder_name, VulnerabilityStatus.NEEDS_TRIAGE
+ )
- def create_bug(
- self, cve: CVE, lp_cve: CveModel
- ) -> Tuple[Optional[BugModel], List[Vulnerability]]:
- # Some `CVE` fields are not being used at the moment:
+ @classmethod
+ def make_from_uct_record(cls, uct_record: UCTRecord) -> "CVE":
+ # Some `UCTRecord` fields are not being used at the moment:
# - cve.discovered_by: This is supposed to be `Cve.discoverer` but
# there may be a difficulty there since the `Cve` table should only
# be managed by syncing data from MITRE and not from
@@ -205,117 +405,204 @@ class UCTImporter:
# - cve.cvss: `Cve.cvss`, but may have a similar issue to
# `Cve.discoverer` as above.
- self.logger.debug("creating bug...")
-
- affected_packages = [] # type: List[DistributionSourcePackage]
- affected_distro_series = [] # type: List[DistroSeries]
- affected_distributions = set() # type: Set[Distribution]
- importances = {}
- statuses_with_explanations = {}
+ distro_packages = []
+ series_packages = []
- for cve_package in cve.packages:
- source_package_name = self.get_source_package_name(
- cve_package.name
- )
+ spn_set = getUtility(ISourcePackageNameSet)
- package_priority = cve_package.priority or cve.priority
- importances[source_package_name] = (
- self.PRIORITY_MAP[package_priority]
+ for uct_package in uct_record.packages:
+ source_package_name = spn_set.getOrCreateByName(uct_package.name)
+ package_priority = uct_package.priority or uct_record.priority
+ package_importance = (
+ cls.PRIORITY_MAP[package_priority]
if package_priority
else None
)
- for cve_package_status in cve_package.statuses:
- distro_series = self.get_distro_series(
- cve_package_status.distroseries
+ for uct_package_status in uct_package.statuses:
+ distro_series = cls.get_distro_series(
+ uct_package_status.distroseries
)
if distro_series is None:
continue
- if cve_package_status.status not in self.STATUS_MAP:
- self.logger.warning(
+
+ if uct_package_status.status not in cls.BUG_TASK_STATUS_MAP:
+ logger.warning(
"Can't find a suitable bug task status for %s",
- cve_package_status.status,
+ uct_package_status.status,
)
continue
- if distro_series not in affected_distro_series:
- affected_distro_series.append(distro_series)
-
- affected_distributions.add(distro_series.distribution)
-
- distro_package = DistributionSourcePackage(
- distribution=distro_series.distribution,
- sourcepackagename=source_package_name,
+ distro_package = cls.DistroPackage(
+ package=DistributionSourcePackage(
+ distribution=distro_series.distribution,
+ sourcepackagename=source_package_name,
+ ),
+ importance=package_importance,
)
- if distro_package not in affected_packages:
- affected_packages.append(distro_package)
+ if distro_package not in distro_packages:
+ distro_packages.append(distro_package)
- distro_series_package_priority = (
- cve_package_status.priority or package_priority
- )
- series_package = SourcePackage(
- sourcepackagename=source_package_name,
- distroseries=distro_series,
+ series_package_priority = (
+ uct_package_status.priority or package_priority
)
- importances[series_package] = (
- self.PRIORITY_MAP[distro_series_package_priority]
- if distro_series_package_priority
+ series_package_importance = (
+ cls.PRIORITY_MAP[series_package_priority]
+ if series_package_priority
else None
)
- statuses_with_explanations[series_package] = (
- self.STATUS_MAP[cve_package_status.status],
- cve_package_status.reason,
+
+ series_packages.append(
+ cls.SeriesPackage(
+ package=SourcePackage(
+ sourcepackagename=source_package_name,
+ distroseries=distro_series,
+ ),
+ importance=series_package_importance,
+ status=cls.BUG_TASK_STATUS_MAP[
+ uct_package_status.status
+ ],
+ status_explanation=uct_package_status.reason,
+ )
+ )
+
+ return cls(
+ sequence=uct_record.candidate,
+ date_made_public=uct_record.date_made_public,
+ distro_packages=distro_packages,
+ series_packages=series_packages,
+ importance=cls.PRIORITY_MAP[uct_record.priority],
+ status=cls.infer_vulnerability_status(uct_record),
+ assigned_to=uct_record.assigned_to,
+ description=uct_record.description,
+ ubuntu_description=uct_record.ubuntu_description,
+ bug_urls=uct_record.bugs,
+ references=uct_record.references,
+ notes=cls.format_cve_notes(uct_record.notes),
+ mitigation=uct_record.mitigation,
+ )
+
+ @classmethod
+ def format_cve_notes(cls, notes: List[UCTRecord.Note]) -> str:
+ return "\n".join(
+ "{author}> {text}".format(author=note.author, text=note.text)
+ for note in notes
+ )
+
+ @classmethod
+ def get_devel_series(
+ cls, distribution: Distribution
+ ) -> Optional[DistroSeries]:
+ for series in distribution.series:
+ if series.status == SeriesStatus.FROZEN:
+ return series
+ for series in distribution.series:
+ if series.status == SeriesStatus.DEVELOPMENT:
+ return series
+
+ @classmethod
+ def get_distro_series(
+ cls, distro_series_name: str
+ ) -> Optional[DistroSeries]:
+ if "/" in distro_series_name:
+ series_name, distro_name = distro_series_name.split("/", 1)
+ if distro_name == "esm":
+ # TODO: ESM needs special handling
+ pass
+ return
+ else:
+ series_name = distro_series_name
+ distribution = getUtility(ILaunchpadCelebrities).ubuntu
+ if series_name == "devel":
+ distro_series = cls.get_devel_series(distribution)
+ else:
+ distro_series = getUtility(IDistroSeriesSet).queryByName(
+ distribution, series_name
)
+ if not distro_series:
+ logger.warning(
+ "Could not find the distro series: %s", distro_series_name
+ )
+ return distro_series
+
+
+class UCTImporter:
+ def import_cve_from_file(self, cve_path: Path) -> None:
+ uct_record = UCTRecord.load(cve_path)
+ cve = CVE.make_from_uct_record(uct_record)
+ self.import_cve(cve)
- if not affected_packages:
- self.logger.warning("Could not find any affected packages")
+ def import_cve(self, cve: CVE) -> None:
+ if cve.date_made_public is None:
+ logger.warning(
+ "The CVE does not have a publication date, is it embargoed?"
+ )
+ return
+ lp_cve = getUtility(ICveSet)[cve.sequence] # type: CveModel
+ if lp_cve is None:
+ logger.warning("Could not find the CVE in LP: %s", cve.sequence)
+ return
+ self.create_bug(cve, lp_cve)
+
+ def create_bug(
+ self, cve: CVE, lp_cve: CveModel
+ ) -> Tuple[Optional[BugModel], List[Vulnerability]]:
+
+ logger.debug("creating bug...")
+
+ if not cve.series_packages:
+ logger.warning("Could not find any affected packages")
return None, []
- distro_package = affected_packages.pop(0)
- affected_distributions = {distro_package.distribution}
+ distro_package = cve.distro_packages[0]
# Create the bug
owner = getUtility(ILaunchpadCelebrities).bug_importer
bug = getUtility(IBugSet).createBug(
CreateBugParams(
description=cve.ubuntu_description,
- title=cve.candidate,
+ title=cve.sequence,
information_type=InformationType.PUBLICSECURITY,
owner=owner,
msg=getUtility(IMessageSet).fromText(
"", cve.description, owner=owner
),
- target=distro_package,
- importance=importances[distro_package.sourcepackagename],
+ target=distro_package.package,
+ importance=distro_package.importance,
)
) # type: BugModel
# Add links to external bug trackers
- for external_bug_url in cve.bugs:
+ for external_bug_url in cve.bug_urls:
bug.newMessage(owner=owner, content=external_bug_url)
# Add references
for reference in cve.references:
bug.newMessage(owner=owner, content=reference)
- self.logger.info("Created bug with ID: %s", bug.id)
+ logger.info("Created bug with ID: %s", bug.id)
# Create bug tasks for distribution packages
bug_task_set = getUtility(IBugTaskSet)
- for distro_package in affected_packages:
+ for distro_package in cve.distro_packages[1:]:
bug_task_set.createTask(
bug,
owner,
- distro_package,
- importance=importances[distro_package.sourcepackagename],
+ distro_package.package,
+ importance=distro_package.importance,
)
# Create bug tasks for distro series by adding nominations
# This may create some extra bug tasks which we will delete later
- for distro_series in affected_distro_series:
+ for distro_series in cve.affected_distro_series:
nomination = bug.addNomination(owner, distro_series)
nomination.approve(owner)
+ series_packages_importance_and_status = {
+ p.package: (p.importance, p.status, p.status_explanation)
+ for p in cve.series_packages
+ }
+
# Set importance and status on distro series bug tasks
# If the bug task's package/series isn't listed in the
# CVE entry - delete it
@@ -328,15 +615,17 @@ class UCTImporter:
sourcepackagename=source_package_name,
distroseries=distro_series,
)
- if series_package not in importances:
+ if series_package not in series_packages_importance_and_status:
# This combination of package/series is not present in the CVE
# Delete it
bug_task.delete(owner)
continue
- bug_task.transitionToImportance(importances[series_package], owner)
- status, status_explanation = statuses_with_explanations[
- series_package
- ]
+ (
+ importance,
+ status,
+ status_explanation,
+ ) = series_packages_importance_and_status[series_package]
+ bug_task.transitionToImportance(importance, owner)
bug_task.transitionToStatus(status, owner)
bug_task.status_explanation = status_explanation
@@ -347,7 +636,7 @@ class UCTImporter:
for bug_task in bug.bugtasks:
bug_task.transitionToAssignee(assignee, validate=False)
else:
- self.logger.warning(
+ logger.warning(
"Could not find the assignee: %s", cve.assigned_to
)
@@ -360,12 +649,12 @@ class UCTImporter:
datechanged=UTC_NOW,
person=owner,
whatchanged="bug",
- message="UCT CVE entry {}".format(cve.candidate),
+ message="UCT CVE entry {}".format(cve.sequence),
)
# Create the Vulnerabilities
vulnerabilities = []
- for distribution in affected_distributions:
+ for distribution in cve.affected_distributions:
vulnerabilities.append(
self.create_vulnerability(bug, cve, lp_cve, distribution)
)
@@ -406,7 +695,7 @@ class UCTImporter:
distribution, series_name
)
if not distro_series:
- self.logger.warning(
+ logger.warning(
"Could not find the distro series: %s", distro_series_name
)
return distro_series
@@ -422,145 +711,17 @@ class UCTImporter:
distribution=distribution,
creator=bug.owner,
cve=lp_cve,
- status=VulnerabilityStatus.NEEDS_TRIAGE,
+ status=cve.status,
description=cve.description,
- notes=format_cve_notes(cve.notes),
+ notes=cve.notes,
mitigation=cve.mitigation,
- importance=self.PRIORITY_MAP[cve.priority],
+ importance=cve.importance,
information_type=InformationType.PUBLICSECURITY,
date_made_public=cve.date_made_public,
) # type: Vulnerability
vulnerability.linkBug(bug, bug.owner)
- self.logger.info("Create vulnerability with ID: %s", vulnerability)
+ logger.info("Create vulnerability with ID: %s", vulnerability)
return vulnerability
-
-
-def load_cve_from_file(cve_path: Path) -> CVE:
- """
- Load a `CVE` instance from data contained in `cve_path`.
-
- The file is parsed to a dictionary using the code copied from
- `cve_lib` in `ubuntu-cve-tracker`.
-
- A `CVE` instance is created from that dictionary, applying some data
- transformations along the way.
- """
-
- cve_data = load_cve(str(cve_path)) # type: Dict[str, Any]
-
- packages = [] # type: List[Package]
- tags = pop_cve_property(cve_data, "tags") # type: Dict[str, Set[str]]
- patches = pop_cve_property(
- cve_data, "patches"
- ) # type: Dict[str, List[Tuple[str, str]]]
- for package, statuses_dict in sorted(
- pop_cve_property(cve_data, "pkgs").items()
- ):
- statuses = [] # type: List[DistroSeriesPackageStatus]
- for distroseries, (status, reason) in sorted(statuses_dict.items()):
- distroseries_priority = pop_cve_property(
- cve_data,
- "Priority_{package}_{distroseries}".format(
- package=package,
- distroseries=distroseries,
- ),
- required=False,
- )
- statuses.append(
- DistroSeriesPackageStatus(
- distroseries=distroseries,
- status=PackageStatus(status),
- reason=reason,
- priority=(
- Priority(distroseries_priority)
- if distroseries_priority
- else None
- ),
- )
- )
- package_priority = pop_cve_property(
- cve_data,
- "Priority_{package}".format(package=package),
- required=False,
- )
- packages.append(
- Package(
- name=package,
- statuses=statuses,
- priority=(
- Priority(package_priority) if package_priority else None
- ),
- tags=tags.pop(package, set()),
- patches=[
- Patch(patch_type=patch_type, entry=entry)
- for patch_type, entry in patches.pop(package, [])
- ],
- )
- )
-
- crd = pop_cve_property(cve_data, "CRD", required=False)
- if crd == "unknown":
- crd = None
- public_date = pop_cve_property(cve_data, "PublicDate", required=False)
- if public_date == "unknown":
- public_date = None
- public_date_at_USN = pop_cve_property(
- cve_data, "PublicDateAtUSN", required=False
- )
- if public_date_at_USN == "unknown":
- public_date_at_USN = None
-
- date_made_public = crd or public_date or public_date_at_USN
-
- cve = CVE(
- assigned_to=pop_cve_property(cve_data, "Assigned-to"),
- bugs=pop_cve_property(cve_data, "Bugs").split("\n"),
- cvss=pop_cve_property(cve_data, "CVSS"),
- candidate=pop_cve_property(cve_data, "Candidate"),
- date_made_public=(
- dateutil.parser.parse(date_made_public)
- if date_made_public
- else None
- ),
- description=pop_cve_property(cve_data, "Description"),
- discovered_by=pop_cve_property(cve_data, "Discovered-by"),
- mitigation=pop_cve_property(cve_data, "Mitigation", required=False),
- notes=[
- Note(author=author, text=text)
- for author, text in pop_cve_property(cve_data, "Notes")
- ],
- priority=Priority(pop_cve_property(cve_data, "Priority")),
- references=pop_cve_property(cve_data, "References").split("\n"),
- ubuntu_description=pop_cve_property(cve_data, "Ubuntu-Description"),
- packages=packages,
- )
-
- # make sure all fields are consumed
- if cve_data:
- raise AssertionError(
- "not all fields are consumed: {}".format(cve_data)
- )
-
- return cve
-
-
-def pop_cve_property(
- cve_data: Dict[str, Any], field_name: str, required=True
-) -> Optional[Any]:
- if required:
- value = cve_data.pop(field_name)
- else:
- value = cve_data.pop(field_name, None)
- if isinstance(value, str):
- return value.strip()
- return value
-
-
-def format_cve_notes(notes: List[Note]) -> str:
- return "\n".join(
- "{author}> {text}".format(author=note.author, text=note.text)
- for note in notes
- )
Follow ups