launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #29036
[Merge] ~andrey-fedoseev/launchpad:more-vulnerability-dates into launchpad:master
Andrey Fedoseev has proposed merging ~andrey-fedoseev/launchpad:more-vulnerability-dates into launchpad:master.
Commit message:
Add `Vulnerability.date_notice_issued` and `date_coordinated_release` fields
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~andrey-fedoseev/launchpad/+git/launchpad/+merge/428705
Update the UCT import/export scripts accordingly
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~andrey-fedoseev/launchpad:more-vulnerability-dates into launchpad:master.
diff --git a/lib/contrib/cve_lib.py b/lib/contrib/cve_lib.py
index 6672864..da211fd 100644
--- a/lib/contrib/cve_lib.py
+++ b/lib/contrib/cve_lib.py
@@ -11,6 +11,7 @@ import math
import os
import re
import sys
+from collections import OrderedDict
import yaml
@@ -1195,17 +1196,17 @@ def load_cve(cve, strict=False, srcmap=None):
]
extra_fields = ["CRD", "PublicDateAtUSN", "Mitigation"]
- data = dict()
+ data = OrderedDict()
# maps entries in data to their source line - if didn't supply one
# create a local one to simplify the code
if srcmap is None:
- srcmap = dict()
- srcmap.setdefault("pkgs", dict())
- srcmap.setdefault("tags", dict())
- data.setdefault("tags", dict())
- srcmap.setdefault("patches", dict())
- data.setdefault("patches", dict())
- affected = dict()
+ srcmap = OrderedDict()
+ srcmap.setdefault("pkgs", OrderedDict())
+ srcmap.setdefault("tags", OrderedDict())
+ data.setdefault("tags", OrderedDict())
+ srcmap.setdefault("patches", OrderedDict())
+ data.setdefault("patches", OrderedDict())
+ affected = OrderedDict()
lastfield = ""
fields_seen = []
if not os.path.exists(cve):
@@ -1254,7 +1255,7 @@ def load_cve(cve, strict=False, srcmap=None):
code = EXIT_FAIL
elif lastfield == "CVSS":
try:
- cvss = dict()
+ cvss = OrderedDict()
result = re.search(
r" (.+)\: (\S+)( \[(.*) (.*)\])?", line
)
@@ -1277,7 +1278,7 @@ def load_cve(cve, strict=False, srcmap=None):
# line where the CVSS block starts - so convert it
# to a dict first if needed
if type(srcmap["CVSS"]) is tuple:
- srcmap["CVSS"] = dict()
+ srcmap["CVSS"] = OrderedDict()
srcmap["CVSS"].setdefault(
cvss["source"], (cve, linenum)
)
@@ -1419,7 +1420,7 @@ def load_cve(cve, strict=False, srcmap=None):
msg += "%s: %d: unknown entry '%s'\n" % (cve, linenum, rel)
code = EXIT_FAIL
continue
- affected.setdefault(pkg, dict())
+ affected.setdefault(pkg, OrderedDict())
if rel in affected[pkg]:
msg += (
"%s: %d: duplicate entry for '%s': original at line %d\n"
@@ -1433,7 +1434,7 @@ def load_cve(cve, strict=False, srcmap=None):
code = EXIT_FAIL
continue
affected[pkg].setdefault(rel, [state, details])
- srcmap["pkgs"].setdefault(pkg, dict())
+ srcmap["pkgs"].setdefault(pkg, OrderedDict())
srcmap["pkgs"][pkg].setdefault(rel, (cve, linenum))
elif field not in required_fields + extra_fields:
msg += "%s: %d: unknown field '%s'\n" % (cve, linenum, field)
@@ -1539,9 +1540,9 @@ def amend_external_subproject_pkg(cve, data, srcmap, amendments, code, msg):
if not success:
return code, msg
- data.setdefault("pkgs", dict())
- data["pkgs"].setdefault(pkg, dict())
- srcmap["pkgs"].setdefault(pkg, dict())
+ data.setdefault("pkgs", OrderedDict())
+ data["pkgs"].setdefault(pkg, OrderedDict())
+ srcmap["pkgs"].setdefault(pkg, OrderedDict())
# override existing release info if it exists
data["pkgs"][pkg][release] = [state, details]
srcmap["pkgs"][pkg][release] = (cve, linenum)
diff --git a/lib/lp/bugs/interfaces/vulnerability.py b/lib/lp/bugs/interfaces/vulnerability.py
index 6fe9abc..4b935d5 100644
--- a/lib/lp/bugs/interfaces/vulnerability.py
+++ b/lib/lp/bugs/interfaces/vulnerability.py
@@ -264,6 +264,27 @@ class IVulnerabilityEditableAttributes(Interface):
as_of="devel",
)
+ date_notice_issued = exported(
+ Datetime(
+ title=_(
+ "Date when a security notice was issued for this "
+ "vulnerability."
+ ),
+ required=False,
+ readonly=False,
+ ),
+ as_of="devel",
+ )
+
+ date_coordinated_release = exported(
+ Datetime(
+ title=_("Coordinated Release Date."),
+ required=False,
+ readonly=False,
+ ),
+ as_of="devel",
+ )
+
class IVulnerabilityEdit(Interface):
"""`IVulnerability` attributes that require launchpad.Edit."""
@@ -297,6 +318,8 @@ class IVulnerabilitySet(Interface):
mitigation=None,
importance_explanation=None,
date_made_public=None,
+ date_notice_issued=None,
+ date_coordinated_release=None,
):
"""Return a new vulnerability.
@@ -312,6 +335,9 @@ class IVulnerabilitySet(Interface):
:param importance_explanation: Used to explain why our importance
differs from somebody else's CVSS score.
:param date_made_public: The date this vulnerability was made public.
+ :param date_coordinated_release: Date when a security notice was issued
+ for this vulnerability.
+ :param date_notice_issued: Coordinated Release Date.
"""
def findByIds(vulnerability_ids, visible_by_user=None):
diff --git a/lib/lp/bugs/model/vulnerability.py b/lib/lp/bugs/model/vulnerability.py
index 63b6e78..f564cd3 100644
--- a/lib/lp/bugs/model/vulnerability.py
+++ b/lib/lp/bugs/model/vulnerability.py
@@ -98,6 +98,12 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
date_made_public = DateTime(
name="date_made_public", tzinfo=pytz.UTC, allow_none=True
)
+ date_notice_issued = DateTime(
+ name="date_notice_issued", tzinfo=pytz.UTC, allow_none=True
+ )
+ date_coordinated_release = DateTime(
+ name="date_coordinated_release", tzinfo=pytz.UTC, allow_none=True
+ )
creator_id = Int(name="creator", allow_none=False)
creator = Reference(creator_id, "Person.id")
@@ -115,6 +121,8 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
mitigation=None,
importance_explanation=None,
date_made_public=None,
+ date_notice_issued=None,
+ date_coordinated_release=None,
):
super().__init__()
self.distribution = distribution
@@ -132,6 +140,8 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
self.mitigation = mitigation
self.importance_explanation = importance_explanation
self.date_made_public = date_made_public
+ self.date_notice_issued = date_notice_issued
+ self.date_coordinated_release = date_coordinated_release
self.date_created = UTC_NOW
@property
@@ -311,6 +321,8 @@ class VulnerabilitySet:
mitigation=None,
importance_explanation=None,
date_made_public=None,
+ date_notice_issued=None,
+ date_coordinated_release=None,
):
"""See `IVulnerabilitySet`."""
store = IStore(Vulnerability)
@@ -326,6 +338,8 @@ class VulnerabilitySet:
information_type=information_type,
importance_explanation=importance_explanation,
date_made_public=date_made_public,
+ date_notice_issued=date_notice_issued,
+ date_coordinated_release=date_coordinated_release,
)
store.add(vulnerability)
vulnerability._reconcileAccess()
diff --git a/lib/lp/bugs/scripts/tests/sampledata/CVE-2022-23222 b/lib/lp/bugs/scripts/tests/sampledata/CVE-2022-23222
index 6d665be..30574d6 100644
--- a/lib/lp/bugs/scripts/tests/sampledata/CVE-2022-23222
+++ b/lib/lp/bugs/scripts/tests/sampledata/CVE-2022-23222
@@ -14,7 +14,7 @@ Ubuntu-Description:
execute arbitrary code.
Notes:
sbeattie> Ubuntu 21.10 / 5.13+ kernels disable unprivileged BPF by default.
- sbeattie> kernels 5.8 and older are not affected, priority high is for
+ kernels 5.8 and older are not affected, priority high is for
5.10 and 5.11 based kernels only
Mitigation:
seth-arnold> set kernel.unprivileged_bpf_disabled to 1
diff --git a/lib/lp/bugs/scripts/tests/test_uctimport.py b/lib/lp/bugs/scripts/tests/test_uct.py
similarity index 63%
rename from lib/lp/bugs/scripts/tests/test_uctimport.py
rename to lib/lp/bugs/scripts/tests/test_uct.py
index d9aaa4b..b0348f5 100644
--- a/lib/lp/bugs/scripts/tests/test_uctimport.py
+++ b/lib/lp/bugs/scripts/tests/test_uct.py
@@ -13,8 +13,11 @@ from lp.bugs.enums import VulnerabilityStatus
from lp.bugs.interfaces.bugtask import BugTaskImportance, BugTaskStatus
from lp.bugs.model.bug import Bug
from lp.bugs.model.bugtask import BugTask
-from lp.bugs.scripts.uctimport import (
+from lp.bugs.model.cve import Cve as CveModel
+from lp.bugs.scripts.uct import (
CVE,
+ CVSS,
+ UCTExporter,
UCTImporter,
UCTImportError,
UCTRecord,
@@ -27,12 +30,15 @@ from lp.testing.layers import ZopelessDatabaseLayer
class TestUCTRecord(TestCase):
- def test_load(self):
- cve_path = Path(__file__).parent / "sampledata" / "CVE-2022-23222"
- uct_record = UCTRecord.load(cve_path)
- self.assertEqual(
+
+ maxDiff = None
+
+ def test_load_save(self):
+ load_from = Path(__file__).parent / "sampledata" / "CVE-2022-23222"
+ uct_record = UCTRecord.load(load_from)
+ self.assertDictEqual(
UCTRecord(
- path=cve_path,
+ parent_dir="sampledata",
assigned_to="",
bugs=[
"https://github.com/mm2/Little-CMS/issues/29",
@@ -40,17 +46,20 @@ class TestUCTRecord(TestCase):
"https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=745471",
],
cvss=[
- {
- "source": "nvd",
- "vector": (
- "CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H"
+ CVSS(
+ authority="nvd",
+ vector_string=(
+ "CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H "
+ "[7.8 HIGH]"
),
- "baseScore": "7.8",
- "baseSeverity": "HIGH",
- }
+ ),
],
candidate="CVE-2022-23222",
- date_made_public=datetime.datetime(
+ crd=None,
+ public_date_at_USN=datetime.datetime(
+ 2022, 1, 14, 8, 15, tzinfo=datetime.timezone.utc
+ ),
+ public_date=datetime.datetime(
2022, 1, 14, 8, 15, tzinfo=datetime.timezone.utc
),
description=(
@@ -63,17 +72,12 @@ class TestUCTRecord(TestCase):
mitigation=(
"seth-arnold> set kernel.unprivileged_bpf_disabled to 1"
),
- notes=[
- UCTRecord.Note(
- author="sbeattie",
- text=(
- "Ubuntu 21.10 / 5.13+ kernels disable "
- "unprivileged BPF by default.\nkernels 5.8 and "
- "older are not affected, priority high is "
- "for\n5.10 and 5.11 based kernels only"
- ),
- ),
- ],
+ notes=(
+ "sbeattie> Ubuntu 21.10 / 5.13+ kernels disable "
+ "unprivileged BPF by default.\n kernels 5.8 and "
+ "older are not affected, priority high is "
+ "for\n 5.10 and 5.11 based kernels only"
+ ),
priority=UCTRecord.Priority.CRITICAL,
references=["https://ubuntu.com/security/notices/USN-5368-1"],
ubuntu_description=(
@@ -87,23 +91,23 @@ class TestUCTRecord(TestCase):
UCTRecord.Package(
name="linux",
statuses=[
- UCTRecord.DistroSeriesPackageStatus(
- distroseries="devel",
- status=UCTRecord.PackageStatus.NOT_AFFECTED,
- reason="5.15.0-25.25",
- priority=UCTRecord.Priority.MEDIUM,
+ UCTRecord.SeriesPackageStatus(
+ series="upstream",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="5.17~rc1",
+ priority=None,
),
- UCTRecord.DistroSeriesPackageStatus(
- distroseries="impish",
+ UCTRecord.SeriesPackageStatus(
+ series="impish",
status=UCTRecord.PackageStatus.RELEASED,
reason="5.13.0-37.42",
priority=UCTRecord.Priority.MEDIUM,
),
- UCTRecord.DistroSeriesPackageStatus(
- distroseries="upstream",
- status=UCTRecord.PackageStatus.RELEASED,
- reason="5.17~rc1",
- priority=None,
+ UCTRecord.SeriesPackageStatus(
+ series="devel",
+ status=UCTRecord.PackageStatus.NOT_AFFECTED,
+ reason="5.15.0-25.25",
+ priority=UCTRecord.Priority.MEDIUM,
),
],
priority=None,
@@ -122,22 +126,22 @@ class TestUCTRecord(TestCase):
UCTRecord.Package(
name="linux-hwe",
statuses=[
- UCTRecord.DistroSeriesPackageStatus(
- distroseries="devel",
- status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
- reason="",
+ UCTRecord.SeriesPackageStatus(
+ series="upstream",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="5.17~rc1",
priority=None,
),
- UCTRecord.DistroSeriesPackageStatus(
- distroseries="impish",
+ UCTRecord.SeriesPackageStatus(
+ series="impish",
status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
reason="",
priority=None,
),
- UCTRecord.DistroSeriesPackageStatus(
- distroseries="upstream",
- status=UCTRecord.PackageStatus.RELEASED,
- reason="5.17~rc1",
+ UCTRecord.SeriesPackageStatus(
+ series="devel",
+ status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
+ reason="",
priority=None,
),
],
@@ -146,49 +150,79 @@ class TestUCTRecord(TestCase):
patches=[],
),
],
- ),
- uct_record,
+ ).__dict__,
+ uct_record.__dict__,
)
+ output_dir = Path(self.makeTemporaryDirectory())
+ saved_to_path = uct_record.save(output_dir)
+ self.assertEqual(
+ output_dir / "sampledata" / "CVE-2022-23222", saved_to_path
+ )
+ self.assertEqual(load_from.read_text(), saved_to_path.read_text())
-class TextCVE(TestCaseWithFactory):
+
+class TestCVE(TestCaseWithFactory):
layer = ZopelessDatabaseLayer
+ maxDiff = None
- def test_make_from_uct_record(self):
+ def setUp(self, *args, **kwargs):
+ super().setUp(*args, **kwargs)
celebrities = getUtility(ILaunchpadCelebrities)
ubuntu = celebrities.ubuntu
supported_series = self.factory.makeDistroSeries(
- distribution=ubuntu, status=SeriesStatus.SUPPORTED
+ distribution=ubuntu,
+ status=SeriesStatus.SUPPORTED,
+ name="focal",
)
current_series = self.factory.makeDistroSeries(
- distribution=ubuntu, status=SeriesStatus.CURRENT
+ distribution=ubuntu,
+ status=SeriesStatus.CURRENT,
+ name="jammy",
)
devel_series = self.factory.makeDistroSeries(
- distribution=ubuntu, status=SeriesStatus.DEVELOPMENT
+ distribution=ubuntu,
+ status=SeriesStatus.DEVELOPMENT,
+ name="kinetic",
+ )
+ product_1 = self.factory.makeProduct()
+ product_2 = self.factory.makeProduct()
+ dsp1 = self.factory.makeDistributionSourcePackage(
+ sourcepackagename=product_1.name, distribution=ubuntu
+ )
+ dsp2 = self.factory.makeDistributionSourcePackage(
+ sourcepackagename=product_2.name, distribution=ubuntu
)
- dsp1 = self.factory.makeDistributionSourcePackage(distribution=ubuntu)
- dsp2 = self.factory.makeDistributionSourcePackage(distribution=ubuntu)
assignee = self.factory.makePerson()
- uct_record = UCTRecord(
- path=Path("./active/CVE-2022-23222"),
+ self.uct_record = UCTRecord(
+ parent_dir="active",
assigned_to=assignee.name,
bugs=["https://github.com/mm2/Little-CMS/issues/29"],
- cvss=[],
+ cvss=[
+ CVSS(
+ authority="nvd",
+ vector_string=(
+ "CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H "
+ "[7.8 HIGH]"
+ ),
+ ),
+ ],
candidate="CVE-2022-23222",
- date_made_public=datetime.datetime(
+ crd=datetime.datetime(
+ 2020, 1, 14, 8, 15, tzinfo=datetime.timezone.utc
+ ),
+ public_date_at_USN=datetime.datetime(
+ 2021, 1, 14, 8, 15, tzinfo=datetime.timezone.utc
+ ),
+ public_date=datetime.datetime(
2022, 1, 14, 8, 15, tzinfo=datetime.timezone.utc
),
description="description",
discovered_by="tr3e wang",
mitigation="mitigation",
- notes=[
- UCTRecord.Note(
- author="author",
- text="text",
- ),
- ],
+ notes="author> text",
priority=UCTRecord.Priority.CRITICAL,
references=["https://ubuntu.com/security/notices/USN-5368-1"],
ubuntu_description="ubuntu-description",
@@ -196,55 +230,58 @@ class TextCVE(TestCaseWithFactory):
UCTRecord.Package(
name=dsp1.sourcepackagename.name,
statuses=[
- UCTRecord.DistroSeriesPackageStatus(
- distroseries=supported_series.name,
+ UCTRecord.SeriesPackageStatus(
+ series=supported_series.name,
status=UCTRecord.PackageStatus.NOT_AFFECTED,
reason="reason 1",
priority=UCTRecord.Priority.MEDIUM,
),
- UCTRecord.DistroSeriesPackageStatus(
- distroseries=current_series.name,
+ UCTRecord.SeriesPackageStatus(
+ series=current_series.name,
status=UCTRecord.PackageStatus.RELEASED,
reason="reason 2",
priority=UCTRecord.Priority.MEDIUM,
),
- UCTRecord.DistroSeriesPackageStatus(
- distroseries="devel",
+ UCTRecord.SeriesPackageStatus(
+ series="devel",
status=UCTRecord.PackageStatus.RELEASED,
reason="reason 3",
priority=None,
),
+ UCTRecord.SeriesPackageStatus(
+ series="upstream",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="reason 4",
+ priority=None,
+ ),
],
priority=None,
- tags={"not-ue"},
- patches=[
- UCTRecord.Patch(
- patch_type="break-fix",
- entry=(
- "457f44363a8894135c85b7a9afd2bd8196db24ab "
- "c25b2ae136039ffa820c26138ed4a5e5f3ab3841|"
- "local-CVE-2022-23222-fix"
- ),
- )
- ],
+ tags=set(),
+ patches=[],
),
UCTRecord.Package(
name=dsp2.sourcepackagename.name,
statuses=[
- UCTRecord.DistroSeriesPackageStatus(
- distroseries=supported_series.name,
+ UCTRecord.SeriesPackageStatus(
+ series=supported_series.name,
status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
reason="",
priority=None,
),
- UCTRecord.DistroSeriesPackageStatus(
- distroseries=current_series.name,
+ UCTRecord.SeriesPackageStatus(
+ series=current_series.name,
status=UCTRecord.PackageStatus.DOES_NOT_EXIST,
reason="",
priority=None,
),
- UCTRecord.DistroSeriesPackageStatus(
- distroseries="devel",
+ UCTRecord.SeriesPackageStatus(
+ series="devel",
+ status=UCTRecord.PackageStatus.RELEASED,
+ reason="",
+ priority=None,
+ ),
+ UCTRecord.SeriesPackageStatus(
+ series="upstream",
status=UCTRecord.PackageStatus.RELEASED,
reason="",
priority=None,
@@ -256,29 +293,29 @@ class TextCVE(TestCaseWithFactory):
),
],
)
- cve = CVE.make_from_uct_record(uct_record)
- self.assertEqual("CVE-2022-23222", cve.sequence)
- self.assertEqual(
- datetime.datetime(
+
+ self.cve = CVE(
+ sequence="CVE-2022-23222",
+ date_made_public=datetime.datetime(
2022, 1, 14, 8, 15, tzinfo=datetime.timezone.utc
),
- cve.date_made_public,
- )
- self.assertEqual(
- [
+ date_notice_issued=datetime.datetime(
+ 2021, 1, 14, 8, 15, tzinfo=datetime.timezone.utc
+ ),
+ date_coordinated_release=datetime.datetime(
+ 2020, 1, 14, 8, 15, tzinfo=datetime.timezone.utc
+ ),
+ distro_packages=[
CVE.DistroPackage(
package=dsp1,
- importance=BugTaskImportance.CRITICAL,
+ importance=None,
),
CVE.DistroPackage(
package=dsp2,
importance=BugTaskImportance.HIGH,
),
],
- cve.distro_packages,
- )
- self.assertEqual(
- [
+ series_packages=[
CVE.SeriesPackage(
package=SourcePackage(
sourcepackagename=dsp1.sourcepackagename,
@@ -302,7 +339,7 @@ class TextCVE(TestCaseWithFactory):
sourcepackagename=dsp1.sourcepackagename,
distroseries=devel_series,
),
- importance=BugTaskImportance.CRITICAL,
+ importance=None,
status=BugTaskStatus.FIXRELEASED,
status_explanation="reason 3",
),
@@ -311,7 +348,7 @@ class TextCVE(TestCaseWithFactory):
sourcepackagename=dsp2.sourcepackagename,
distroseries=supported_series,
),
- importance=BugTaskImportance.HIGH,
+ importance=None,
status=BugTaskStatus.DOESNOTEXIST,
status_explanation="",
),
@@ -320,7 +357,7 @@ class TextCVE(TestCaseWithFactory):
sourcepackagename=dsp2.sourcepackagename,
distroseries=current_series,
),
- importance=BugTaskImportance.HIGH,
+ importance=None,
status=BugTaskStatus.DOESNOTEXIST,
status_explanation="",
),
@@ -329,58 +366,97 @@ class TextCVE(TestCaseWithFactory):
sourcepackagename=dsp2.sourcepackagename,
distroseries=devel_series,
),
- importance=BugTaskImportance.HIGH,
+ importance=None,
status=BugTaskStatus.FIXRELEASED,
status_explanation="",
),
],
- cve.series_packages,
- )
- self.assertEqual(BugTaskImportance.CRITICAL, cve.importance)
- self.assertEqual(VulnerabilityStatus.ACTIVE, cve.status)
- self.assertEqual(assignee, cve.assignee)
- self.assertEqual("description", cve.description)
- self.assertEqual("ubuntu-description", cve.ubuntu_description)
- self.assertEqual(
- ["https://github.com/mm2/Little-CMS/issues/29"], cve.bug_urls
- )
- self.assertEqual(
- ["https://ubuntu.com/security/notices/USN-5368-1"], cve.references
+ upstream_packages=[
+ CVE.UpstreamPackage(
+ package=product_1,
+ importance=None,
+ status=BugTaskStatus.FIXRELEASED,
+ status_explanation="reason 4",
+ ),
+ CVE.SeriesPackage(
+ package=product_2,
+ importance=None,
+ status=BugTaskStatus.FIXRELEASED,
+ status_explanation="",
+ ),
+ ],
+ importance=BugTaskImportance.CRITICAL,
+ status=VulnerabilityStatus.ACTIVE,
+ assignee=assignee,
+ discovered_by="tr3e wang",
+ description="description",
+ ubuntu_description="ubuntu-description",
+ bug_urls=["https://github.com/mm2/Little-CMS/issues/29"],
+ references=["https://ubuntu.com/security/notices/USN-5368-1"],
+ notes="author> text",
+ mitigation="mitigation",
+ cvss=[
+ CVSS(
+ authority="nvd",
+ vector_string=(
+ "CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H "
+ "[7.8 HIGH]"
+ ),
+ ),
+ ],
)
- self.assertEqual("author> text", cve.notes)
- self.assertEqual("mitigation", cve.mitigation)
+
+ def test_make_from_uct_record(self):
+ cve = CVE.make_from_uct_record(self.uct_record)
+ self.assertDictEqual(self.cve.__dict__, cve.__dict__)
+
+ def test_to_uct_record(self):
+ uct_record = self.cve.to_uct_record()
+ self.assertListEqual(self.uct_record.packages, uct_record.packages)
+ self.assertDictEqual(self.uct_record.__dict__, uct_record.__dict__)
-class TestUCTImporter(TestCaseWithFactory):
+class TestUCTImporterExporter(TestCaseWithFactory):
+ maxDiff = None
layer = ZopelessDatabaseLayer
def setUp(self, *args, **kwargs):
super().setUp(*args, **kwargs)
celebrities = getUtility(ILaunchpadCelebrities)
self.ubuntu = celebrities.ubuntu
- self.esm = self.factory.makeDistribution("esm")
+ self.esm = self.factory.makeDistribution("ubuntu-esm")
self.bug_importer = celebrities.bug_importer
self.ubuntu_supported_series = self.factory.makeDistroSeries(
- distribution=self.ubuntu, status=SeriesStatus.SUPPORTED
+ distribution=self.ubuntu,
+ status=SeriesStatus.SUPPORTED,
+ name="focal",
)
self.ubuntu_current_series = self.factory.makeDistroSeries(
- distribution=self.ubuntu, status=SeriesStatus.CURRENT
+ distribution=self.ubuntu, status=SeriesStatus.CURRENT, name="jammy"
)
self.ubuntu_devel_series = self.factory.makeDistroSeries(
- distribution=self.ubuntu, status=SeriesStatus.DEVELOPMENT
+ distribution=self.ubuntu,
+ status=SeriesStatus.DEVELOPMENT,
+ name="kinetic",
)
self.esm_supported_series = self.factory.makeDistroSeries(
- distribution=self.esm, status=SeriesStatus.SUPPORTED
+ distribution=self.esm,
+ status=SeriesStatus.SUPPORTED,
+ name="precise",
)
self.esm_current_series = self.factory.makeDistroSeries(
- distribution=self.esm, status=SeriesStatus.CURRENT
+ distribution=self.esm,
+ status=SeriesStatus.CURRENT,
+ name="trusty",
)
+ self.product_1 = self.factory.makeProduct()
+ self.product_2 = self.factory.makeProduct()
self.ubuntu_package = self.factory.makeDistributionSourcePackage(
- distribution=self.ubuntu
+ sourcepackagename=self.product_1.name, distribution=self.ubuntu
)
self.esm_package = self.factory.makeDistributionSourcePackage(
- distribution=self.esm
+ sourcepackagename=self.product_2.name, distribution=self.esm
)
for series in (
self.ubuntu_supported_series,
@@ -404,10 +480,14 @@ class TestUCTImporter(TestCaseWithFactory):
)
self.lp_cve = self.factory.makeCVE("2022-23222")
- self.now = datetime.datetime.now(datetime.timezone.utc)
+ self.now = datetime.datetime.now(datetime.timezone.utc).replace(
+ microsecond=0
+ )
self.cve = CVE(
sequence="CVE-2022-23222",
date_made_public=self.now,
+ date_notice_issued=self.now,
+ date_coordinated_release=self.now,
distro_packages=[
CVE.DistroPackage(
package=self.ubuntu_package,
@@ -415,7 +495,7 @@ class TestUCTImporter(TestCaseWithFactory):
),
CVE.DistroPackage(
package=self.esm_package,
- importance=BugTaskImportance.MEDIUM,
+ importance=None,
),
],
series_packages=[
@@ -433,7 +513,7 @@ class TestUCTImporter(TestCaseWithFactory):
sourcepackagename=self.ubuntu_package.sourcepackagename, # noqa: E501
distroseries=self.ubuntu_current_series,
),
- importance=BugTaskImportance.LOW,
+ importance=None,
status=BugTaskStatus.DOESNOTEXIST,
status_explanation="does not exist",
),
@@ -442,7 +522,7 @@ class TestUCTImporter(TestCaseWithFactory):
sourcepackagename=self.ubuntu_package.sourcepackagename, # noqa: E501
distroseries=self.ubuntu_devel_series,
),
- importance=BugTaskImportance.LOW,
+ importance=None,
status=BugTaskStatus.INVALID,
status_explanation="not affected",
),
@@ -451,7 +531,7 @@ class TestUCTImporter(TestCaseWithFactory):
sourcepackagename=self.esm_package.sourcepackagename,
distroseries=self.esm_supported_series,
),
- importance=BugTaskImportance.MEDIUM,
+ importance=None,
status=BugTaskStatus.WONTFIX,
status_explanation="ignored",
),
@@ -460,22 +540,47 @@ class TestUCTImporter(TestCaseWithFactory):
sourcepackagename=self.esm_package.sourcepackagename,
distroseries=self.esm_current_series,
),
- importance=BugTaskImportance.MEDIUM,
+ importance=None,
status=BugTaskStatus.UNKNOWN,
status_explanation="needs triage",
),
],
+ upstream_packages=[
+ CVE.UpstreamPackage(
+ package=self.product_1,
+ importance=BugTaskImportance.HIGH,
+ status=BugTaskStatus.FIXRELEASED,
+ status_explanation="fix released",
+ ),
+ CVE.UpstreamPackage(
+ package=self.product_2,
+ importance=BugTaskImportance.LOW,
+ status=BugTaskStatus.WONTFIX,
+ status_explanation="ignored",
+ ),
+ ],
importance=BugTaskImportance.MEDIUM,
status=VulnerabilityStatus.ACTIVE,
assignee=self.factory.makePerson(),
+ discovered_by="",
description="description",
ubuntu_description="ubuntu-description",
bug_urls=["https://github.com/mm2/Little-CMS/issues/29"],
references=["https://ubuntu.com/security/notices/USN-5368-1"],
notes="author> text",
mitigation="mitigation",
+ cvss=[
+ CVSS(
+ authority="nvd",
+ vector_string=(
+ "CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H "
+ "[7.8 HIGH]"
+ ),
+ ),
+ ],
)
self.importer = UCTImporter()
+ self.exporter = UCTExporter()
def checkBug(self, bug: Bug, cve: CVE):
self.assertEqual(cve.sequence, bug.title)
@@ -483,10 +588,6 @@ class TestUCTImporter(TestCaseWithFactory):
self.assertEqual(InformationType.PUBLICSECURITY, bug.information_type)
expected_description = cve.description
- if cve.ubuntu_description:
- expected_description = "{}\n\nUbuntu-Description:\n{}".format(
- expected_description, cve.ubuntu_description
- )
if cve.references:
expected_description = "{}\n\nReferences:\n{}".format(
expected_description, "\n".join(cve.references)
@@ -501,19 +602,28 @@ class TestUCTImporter(TestCaseWithFactory):
bug_tasks = bug.bugtasks # type: List[BugTask]
self.assertEqual(
- len(cve.distro_packages) + len(cve.series_packages), len(bug_tasks)
+ len(cve.distro_packages)
+ + len(cve.series_packages)
+ + len(cve.upstream_packages),
+ len(bug_tasks),
)
bug_tasks_by_target = {t.target: t for t in bug_tasks}
+ package_importances = {}
+
for distro_package in cve.distro_packages:
self.assertIn(distro_package.package, bug_tasks_by_target)
t = bug_tasks_by_target[distro_package.package]
+ package_importance = distro_package.importance or cve.importance
+ package_importances[
+ distro_package.package.sourcepackagename.name
+ ] = package_importance
conjoined_primary = t.conjoined_primary
if conjoined_primary:
expected_importance = conjoined_primary.importance
expected_status = conjoined_primary.status
else:
- expected_importance = distro_package.importance
+ expected_importance = package_importance
expected_status = BugTaskStatus.NEW
self.assertEqual(expected_importance, t.importance)
self.assertEqual(expected_status, t.status)
@@ -522,12 +632,29 @@ class TestUCTImporter(TestCaseWithFactory):
for series_package in cve.series_packages:
self.assertIn(series_package.package, bug_tasks_by_target)
t = bug_tasks_by_target[series_package.package]
- self.assertEqual(series_package.importance, t.importance)
+ package_importance = package_importances[
+ series_package.package.sourcepackagename.name
+ ]
+ sp_importance = series_package.importance or package_importance
+ self.assertEqual(sp_importance, t.importance)
self.assertEqual(series_package.status, t.status)
self.assertEqual(
series_package.status_explanation, t.status_explanation
)
+ for upstream_package in cve.upstream_packages:
+ self.assertIn(upstream_package.package, bug_tasks_by_target)
+ t = bug_tasks_by_target[upstream_package.package]
+ package_importance = package_importances[
+ upstream_package.package.name
+ ]
+ sp_importance = upstream_package.importance or package_importance
+ self.assertEqual(sp_importance, t.importance)
+ self.assertEqual(upstream_package.status, t.status)
+ self.assertEqual(
+ upstream_package.status_explanation, t.status_explanation
+ )
+
for t in bug_tasks:
self.assertEqual(cve.assignee, t.assignee)
@@ -546,7 +673,7 @@ class TestUCTImporter(TestCaseWithFactory):
self.assertEqual(self.bug_importer, vulnerability.creator)
self.assertEqual(self.lp_cve, vulnerability.cve)
self.assertEqual(cve.status, vulnerability.status)
- self.assertEqual(cve.description, vulnerability.description)
+ self.assertEqual(cve.ubuntu_description, vulnerability.description)
self.assertEqual(cve.notes, vulnerability.notes)
self.assertEqual(cve.mitigation, vulnerability.mitigation)
self.assertEqual(cve.importance, vulnerability.importance)
@@ -556,8 +683,49 @@ class TestUCTImporter(TestCaseWithFactory):
self.assertEqual(
cve.date_made_public, vulnerability.date_made_public
)
+ self.assertEqual(
+ cve.date_notice_issued, vulnerability.date_notice_issued
+ )
+ self.assertEqual(
+ cve.date_coordinated_release,
+ vulnerability.date_coordinated_release,
+ )
self.assertEqual([bug], vulnerability.bugs)
+ def checkLaunchpadCve(self, lp_cve: CveModel, cve: CVE):
+ self.assertDictEqual(
+ {cvss.authority: cvss.vector_string for cvss in cve.cvss},
+ lp_cve.cvss,
+ )
+
+ def checkCVE(self, expected: CVE, actual: CVE):
+ self.assertEqual(expected.sequence, actual.sequence)
+ self.assertEqual(expected.date_made_public, actual.date_made_public)
+ self.assertEqual(
+ expected.date_notice_issued, actual.date_notice_issued
+ )
+ self.assertEqual(
+ expected.date_coordinated_release, actual.date_coordinated_release
+ )
+ self.assertListEqual(expected.distro_packages, actual.distro_packages)
+ self.assertListEqual(expected.series_packages, actual.series_packages)
+ self.assertListEqual(
+ expected.upstream_packages, actual.upstream_packages
+ )
+ self.assertEqual(expected.importance, actual.importance)
+ self.assertEqual(expected.status, actual.status)
+ self.assertEqual(expected.assignee, actual.assignee)
+ self.assertEqual(expected.discovered_by, actual.discovered_by)
+ self.assertEqual(expected.description, actual.description)
+ self.assertEqual(
+ expected.ubuntu_description, actual.ubuntu_description
+ )
+ self.assertListEqual(expected.bug_urls, actual.bug_urls)
+ self.assertListEqual(expected.references, actual.references)
+ self.assertEqual(expected.notes, actual.notes)
+ self.assertEqual(expected.mitigation, actual.mitigation)
+ self.assertListEqual(expected.cvss, actual.cvss)
+
def test_create_bug(self):
bug = self.importer.create_bug(self.cve, self.lp_cve)
@@ -578,11 +746,11 @@ class TestUCTImporter(TestCaseWithFactory):
def test_find_existing_bug(self):
self.assertIsNone(
- self.importer.find_existing_bug(self.cve, self.lp_cve)
+ self.importer._find_existing_bug(self.cve, self.lp_cve)
)
bug = self.importer.create_bug(self.cve, self.lp_cve)
self.assertEqual(
- self.importer.find_existing_bug(self.cve, self.lp_cve), bug
+ self.importer._find_existing_bug(self.cve, self.lp_cve), bug
)
def test_find_existing_bug_multiple_bugs(self):
@@ -594,7 +762,7 @@ class TestUCTImporter(TestCaseWithFactory):
vulnerability.linkBug(another_bug)
self.assertRaises(
UCTImportError,
- self.importer.find_existing_bug,
+ self.importer._find_existing_bug,
self.cve,
self.lp_cve,
)
@@ -802,19 +970,48 @@ class TestUCTImporter(TestCaseWithFactory):
def test_import_cve(self):
self.importer.import_cve(self.cve)
self.assertIsNotNone(
- self.importer.find_existing_bug(self.cve, self.lp_cve)
+ self.importer._find_existing_bug(self.cve, self.lp_cve)
)
+ self.checkLaunchpadCve(self.lp_cve, self.cve)
def test_import_cve_dry_run(self):
importer = UCTImporter(dry_run=True)
importer.import_cve(self.cve)
- self.assertIsNone(importer.find_existing_bug(self.cve, self.lp_cve))
+ self.assertIsNone(importer._find_existing_bug(self.cve, self.lp_cve))
- def test_naive_date_made_public(self):
+ def test_naive_dates(self):
cve = self.cve
cve.date_made_public = cve.date_made_public.replace(tzinfo=None)
- bug = self.importer.create_bug(cve, self.lp_cve)
- self.assertEqual(
- UTC,
- bug.vulnerabilities[0].date_made_public.tzinfo,
+ cve.date_notice_issued = cve.date_notice_issued.replace(tzinfo=None)
+ cve.date_coordinated_release = cve.date_coordinated_release.replace(
+ tzinfo=None
)
+ bug = self.importer.create_bug(cve, self.lp_cve)
+ for date in (
+ bug.vulnerabilities[0].date_made_public,
+ bug.vulnerabilities[0].date_notice_issued,
+ bug.vulnerabilities[0].date_coordinated_release,
+ ):
+ self.assertEqual(UTC, date.tzinfo)
+ self.importer.update_bug(bug, cve, self.lp_cve)
+ for date in (
+ bug.vulnerabilities[0].date_made_public,
+ bug.vulnerabilities[0].date_notice_issued,
+ bug.vulnerabilities[0].date_coordinated_release,
+ ):
+ self.assertEqual(UTC, date.tzinfo)
+
+ def test_make_cve_from_bug(self):
+ self.importer.import_cve(self.cve)
+ bug = self.importer._find_existing_bug(self.cve, self.lp_cve)
+ cve = self.exporter._make_cve_from_bug(bug)
+ self.checkCVE(self.cve, cve)
+
+ def test_export_bug_to_uct_file(self):
+ self.importer.import_cve(self.cve)
+ bug = self.importer._find_existing_bug(self.cve, self.lp_cve)
+ output_dir = Path(self.makeTemporaryDirectory())
+ cve_path = self.exporter.export_bug_to_uct_file(bug.id, output_dir)
+ uct_record = UCTRecord.load(cve_path)
+ cve = CVE.make_from_uct_record(uct_record)
+ self.checkCVE(self.cve, cve)
diff --git a/lib/lp/bugs/scripts/uct/__init__.py b/lib/lp/bugs/scripts/uct/__init__.py
new file mode 100644
index 0000000..da0635b
--- /dev/null
+++ b/lib/lp/bugs/scripts/uct/__init__.py
@@ -0,0 +1,6 @@
+# Copyright 2022 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from .models import CVE, CVSS, UCTRecord
+from .uctexport import UCTExporter
+from .uctimport import UCTImporter, UCTImportError
diff --git a/lib/lp/bugs/scripts/uctimport.py b/lib/lp/bugs/scripts/uct/models.py
similarity index 51%
rename from lib/lp/bugs/scripts/uctimport.py
rename to lib/lp/bugs/scripts/uct/models.py
index 235ea0d..4234fdd 100644
--- a/lib/lp/bugs/scripts/uctimport.py
+++ b/lib/lp/bugs/scripts/uct/models.py
@@ -1,51 +1,24 @@
-# Copyright 2022 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
+# Copyright 2022 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
-"""A UCT (Ubuntu CVE Tracker) bug importer
-
-This code can import CVE summaries stored in UCT repository to bugs in
-Launchpad.
-
-For each entry in UCT we:
-
-1. Create a Bug instance
-2. Create a Vulnerability instance and link it to the bug (multiple
- Vulnerabilities may be created if the CVE entry covers multiple
- distributions)
-3. Create a Bug Task for each package/distro-series in the CVE entry
-4. Update the statuses of Bug Tasks based on the information in the CVE entry
-"""
import logging
-from datetime import datetime, timezone
+from collections import OrderedDict, defaultdict
+from datetime import datetime
from enum import Enum
-from itertools import chain
from pathlib import Path
-from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple
+from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union
+from typing.io import TextIO
import dateutil.parser
-import transaction
from contrib.cve_lib import load_cve
from zope.component import getUtility
-from lp.app.enums import InformationType
-from lp.app.interfaces.launchpad import ILaunchpadCelebrities
from lp.bugs.enums import VulnerabilityStatus
-from lp.bugs.interfaces.bug import CreateBugParams, IBugSet
-from lp.bugs.interfaces.bugactivity import IBugActivitySet
-from lp.bugs.interfaces.bugtask import (
- BugTaskImportance,
- BugTaskStatus,
- IBugTaskSet,
-)
-from lp.bugs.interfaces.bugwatch import IBugWatchSet
-from lp.bugs.interfaces.cve import ICveSet
-from lp.bugs.interfaces.vulnerability import IVulnerabilitySet
-from lp.bugs.model.bug import Bug as BugModel
-from lp.bugs.model.bugtask import BugTask
-from lp.bugs.model.cve import Cve as CveModel
-from lp.bugs.model.vulnerability import Vulnerability
+from lp.bugs.interfaces.bugtask import BugTaskImportance, BugTaskStatus
+from lp.registry.interfaces.distribution import IDistributionSet
from lp.registry.interfaces.distroseries import IDistroSeriesSet
from lp.registry.interfaces.person import IPersonSet
+from lp.registry.interfaces.product import IProductSet
from lp.registry.interfaces.series import SeriesStatus
from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
from lp.registry.model.distribution import Distribution
@@ -54,24 +27,34 @@ from lp.registry.model.distributionsourcepackage import (
)
from lp.registry.model.distroseries import DistroSeries
from lp.registry.model.person import Person
+from lp.registry.model.product import Product
from lp.registry.model.sourcepackage import SourcePackage
-from lp.services.database.constants import UTC_NOW
+from lp.registry.model.sourcepackagename import SourcePackageName
+from lp.services.propertycache import cachedproperty
__all__ = [
"CVE",
- "UCTImporter",
+ "CVSS",
"UCTRecord",
- "UCTImportError",
]
-from lp.services.propertycache import cachedproperty
+logger = logging.getLogger(__name__)
-logger = logging.getLogger("lp.bugs.scripts.import")
+
+CVSS = NamedTuple(
+ "CVSS",
+ (
+ ("authority", str),
+ ("vector_string", str),
+ ),
+)
class UCTRecord:
"""
- UCTRecord represents a single CVE record in the ubuntu-cve-tracker.
+ UCTRecord represents a single CVE record (file) in the ubuntu-cve-tracker.
+
+ It contains exactly the same information as a UCT CVE record.
"""
class Priority(Enum):
@@ -92,10 +75,10 @@ class UCTRecord:
NEEDED = "needed"
PENDING = "pending"
- DistroSeriesPackageStatus = NamedTuple(
- "DistroSeriesPackageStatus",
+ SeriesPackageStatus = NamedTuple(
+ "SeriesPackageStatus",
(
- ("distroseries", str),
+ ("series", str),
("status", PackageStatus),
("reason", str),
("priority", Optional[Priority]),
@@ -114,44 +97,40 @@ class UCTRecord:
"Package",
(
("name", str),
- ("statuses", List[DistroSeriesPackageStatus]),
+ ("statuses", List[SeriesPackageStatus]),
("priority", Optional[Priority]),
("tags", Set[str]),
("patches", List[Patch]),
),
)
- Note = NamedTuple(
- "Note",
- (
- ("author", str),
- ("text", str),
- ),
- )
-
def __init__(
self,
- path: Path,
+ parent_dir: str,
assigned_to: str,
bugs: List[str],
- cvss: List[Dict[str, Any]],
+ cvss: List[CVSS],
candidate: str,
- date_made_public: Optional[datetime],
+ crd: Optional[datetime],
+ public_date: Optional[datetime],
+ public_date_at_USN: Optional[datetime],
description: str,
discovered_by: str,
mitigation: Optional[str],
- notes: List[Note],
+ notes: str,
priority: Priority,
references: List[str],
ubuntu_description: str,
packages: List[Package],
):
- self.path = path
+ self.parent_dir = parent_dir
self.assigned_to = assigned_to
self.bugs = bugs
self.cvss = cvss
self.candidate = candidate
- self.date_made_public = date_made_public
+ self.crd = crd
+ self.public_date = public_date
+ self.public_date_at_USN = public_date_at_USN
self.description = description
self.discovered_by = discovered_by
self.mitigation = mitigation
@@ -167,18 +146,6 @@ class UCTRecord:
return self.__dict__ == other.__dict__
@classmethod
- def pop_cve_property(
- cls, cve_data: Dict[str, Any], field_name: str, required=True
- ) -> Optional[Any]:
- if required:
- value = cve_data.pop(field_name)
- else:
- value = cve_data.pop(field_name, None)
- if isinstance(value, str):
- return value.strip()
- return value
-
- @classmethod
def load(cls, cve_path: Path) -> "UCTRecord":
"""
Create a `UCTRecord` instance from a file located at `cve_path`.
@@ -192,40 +159,38 @@ class UCTRecord:
cve_data = load_cve(str(cve_path)) # type: Dict[str, Any]
packages = []
- tags = cls.pop_cve_property(
+ tags = cls._pop_cve_property(
cve_data, "tags"
) # type: Dict[str, Set[str]]
- patches = cls.pop_cve_property(
+ patches = cls._pop_cve_property(
cve_data, "patches"
) # type: Dict[str, List[Tuple[str, str]]]
- for package, statuses_dict in sorted(
- cls.pop_cve_property(cve_data, "pkgs").items()
- ):
+ for package, statuses_dict in cls._pop_cve_property(
+ cve_data, "pkgs"
+ ).items():
statuses = []
- for distroseries, (status, reason) in sorted(
- statuses_dict.items()
- ):
- distroseries_priority = cls.pop_cve_property(
+ for series, (status, reason) in statuses_dict.items():
+ series_priority = cls._pop_cve_property(
cve_data,
- "Priority_{package}_{distroseries}".format(
+ "Priority_{package}_{series}".format(
package=package,
- distroseries=distroseries,
+ series=series,
),
required=False,
)
statuses.append(
- cls.DistroSeriesPackageStatus(
- distroseries=distroseries,
+ cls.SeriesPackageStatus(
+ series=series,
status=cls.PackageStatus(status),
reason=reason,
priority=(
- cls.Priority(distroseries_priority)
- if distroseries_priority
+ cls.Priority(series_priority)
+ if series_priority
else None
),
)
)
- package_priority = cls.pop_cve_property(
+ package_priority = cls._pop_cve_property(
cve_data,
"Priority_{package}".format(package=package),
required=False,
@@ -247,47 +212,59 @@ class UCTRecord:
)
)
- crd = cls.pop_cve_property(cve_data, "CRD", required=False)
+ crd = cls._pop_cve_property(cve_data, "CRD", required=False)
if crd == "unknown":
crd = None
- public_date = cls.pop_cve_property(
+ public_date = cls._pop_cve_property(
cve_data, "PublicDate", required=False
)
if public_date == "unknown":
public_date = None
- public_date_at_USN = cls.pop_cve_property(
+ public_date_at_USN = cls._pop_cve_property(
cve_data, "PublicDateAtUSN", required=False
)
if public_date_at_USN == "unknown":
public_date_at_USN = None
- date_made_public = crd or public_date or public_date_at_USN
+ cvss = []
+ for cvss_dict in cls._pop_cve_property(cve_data, "CVSS"):
+ cvss.append(
+ CVSS(
+ authority=cvss_dict["source"],
+ vector_string="{} [{} {}]".format(
+ cvss_dict["vector"],
+ cvss_dict["baseScore"],
+ cvss_dict["baseSeverity"],
+ ),
+ )
+ )
entry = UCTRecord(
- path=cve_path,
- assigned_to=cls.pop_cve_property(cve_data, "Assigned-to"),
- bugs=cls.pop_cve_property(cve_data, "Bugs").split("\n"),
- cvss=cls.pop_cve_property(cve_data, "CVSS"),
- candidate=cls.pop_cve_property(cve_data, "Candidate"),
- date_made_public=(
- dateutil.parser.parse(date_made_public)
- if date_made_public
+ parent_dir=cve_path.absolute().parent.name,
+ assigned_to=cls._pop_cve_property(cve_data, "Assigned-to"),
+ bugs=cls._pop_cve_property(cve_data, "Bugs").split("\n"),
+ cvss=cvss,
+ candidate=cls._pop_cve_property(cve_data, "Candidate"),
+ crd=dateutil.parser.parse(crd) if crd else None,
+ public_date=(
+ dateutil.parser.parse(public_date) if public_date else None
+ ),
+ public_date_at_USN=(
+ dateutil.parser.parse(public_date_at_USN)
+ if public_date_at_USN
else None
),
- description=cls.pop_cve_property(cve_data, "Description"),
- discovered_by=cls.pop_cve_property(cve_data, "Discovered-by"),
- mitigation=cls.pop_cve_property(
+ description=cls._pop_cve_property(cve_data, "Description"),
+ discovered_by=cls._pop_cve_property(cve_data, "Discovered-by"),
+ mitigation=cls._pop_cve_property(
cve_data, "Mitigation", required=False
),
- notes=[
- cls.Note(author=author, text=text)
- for author, text in cls.pop_cve_property(cve_data, "Notes")
- ],
- priority=cls.Priority(cls.pop_cve_property(cve_data, "Priority")),
- references=cls.pop_cve_property(cve_data, "References").split(
+ notes=cls._format_notes(cls._pop_cve_property(cve_data, "Notes")),
+ priority=cls.Priority(cls._pop_cve_property(cve_data, "Priority")),
+ references=cls._pop_cve_property(cve_data, "References").split(
"\n"
),
- ubuntu_description=cls.pop_cve_property(
+ ubuntu_description=cls._pop_cve_property(
cve_data, "Ubuntu-Description"
),
packages=packages,
@@ -301,8 +278,145 @@ class UCTRecord:
return entry
+ def save(self, output_dir: Path) -> Path:
+ """
+ Save UCTRecord to a file in UCT format.
+ """
+ if not output_dir.is_dir():
+ raise ValueError(
+ "{} does not exist or is not a directory", output_dir
+ )
+ output_path = output_dir / self.parent_dir / self.candidate
+ output_path.parent.mkdir(exist_ok=True)
+ output = open(str(output_path), "w")
+ if self.public_date_at_USN:
+ self._write_field(
+ "PublicDateAtUSN",
+ self._format_datetime(self.public_date_at_USN),
+ output,
+ )
+ self._write_field("Candidate", self.candidate, output)
+ if self.crd:
+ self._write_field("CRD", self._format_datetime(self.crd), output)
+ if self.public_date:
+ self._write_field(
+ "PublicDate", self._format_datetime(self.public_date), output
+ )
+ self._write_field("References", self.references, output)
+ self._write_field("Description", self.description.split("\n"), output)
+ self._write_field(
+ "Ubuntu-Description", self.ubuntu_description.split("\n"), output
+ )
+ self._write_field("Notes", self.notes.split("\n"), output)
+ self._write_field(
+ "Mitigation",
+ self.mitigation.split("\n") if self.mitigation else "",
+ output,
+ )
+ self._write_field("Bugs", self.bugs, output)
+ self._write_field("Priority", self.priority.value, output)
+ self._write_field("Discovered-by", self.discovered_by, output)
+ self._write_field("Assigned-to", self.assigned_to, output)
+ self._write_field(
+ "CVSS",
+ [
+ "{authority}: {vector_string}".format(**c._asdict())
+ for c in self.cvss
+ ],
+ output,
+ )
+ for package in self.packages:
+ output.write("\n")
+ patches = [
+ "{}: {}".format(patch.patch_type, patch.entry)
+ for patch in package.patches
+ ]
+ self._write_field(
+ "Patches_{}".format(package.name), patches, output
+ )
+ for status in package.statuses:
+ self._write_field(
+ "{}_{}".format(status.series, package.name),
+ (
+ "{} ({})".format(status.status.value, status.reason)
+ if status.reason
+ else status.status.value
+ ),
+ output,
+ )
+ if package.priority:
+ self._write_field(
+ "Priority_{}".format(package.name),
+ package.priority.value,
+ output,
+ )
+ for status in package.statuses:
+ if status.priority:
+ self._write_field(
+ "Priority_{}_{}".format(package.name, status.series),
+ status.priority.value,
+ output,
+ )
+
+ if package.tags:
+ self._write_field(
+ "Tags_{}".format(package.name),
+ " ".join(package.tags),
+ output,
+ )
+
+ output.close()
+ return output_path
+
+ @classmethod
+ def _pop_cve_property(
+ cls, cve_data: Dict[str, Any], field_name: str, required=True
+ ) -> Optional[Any]:
+ if required:
+ value = cve_data.pop(field_name)
+ else:
+ value = cve_data.pop(field_name, None)
+ if isinstance(value, str):
+ return value.strip()
+ return value
+
+ @classmethod
+ def _write_field(
+ cls, name: str, value: Union[str, List[str]], output: TextIO
+ ) -> None:
+ if isinstance(value, str):
+ if value:
+ output.write("{}: {}\n".format(name, value))
+ else:
+ output.write("{}:\n".format(name))
+ elif isinstance(value, list):
+ output.write("{}:\n".format(name))
+ for line in value:
+ output.write(" {}\n".format(line))
+ else:
+ raise AssertionError()
+
+ @classmethod
+ def _format_datetime(cls, dt: datetime) -> str:
+ return dt.strftime("%Y-%m-%d %H:%M:%S %Z")
+
+ @classmethod
+ def _format_notes(cls, notes: List[Tuple[str, str]]) -> str:
+ lines = []
+ for author, text in notes:
+ note_lines = text.split("\n")
+ lines.append("{}> {}".format(author, note_lines[0]))
+ for line in note_lines[1:]:
+ lines.append(" " + line)
+ return "\n".join(lines)
+
class CVE:
+ """
+ `CVE` represents UCT CVE information mapped to Launchpad data structures.
+
+ Do not confuse this with `Cve` database model.
+ """
DistroPackage = NamedTuple(
"DistroPackage",
@@ -322,6 +436,16 @@ class CVE:
),
)
+ UpstreamPackage = NamedTuple(
+ "UpstreamPackage",
+ (
+ ("package", Product),
+ ("importance", Optional[BugTaskImportance]),
+ ("status", BugTaskStatus),
+ ("status_explanation", str),
+ ),
+ )
+
PRIORITY_MAP = {
UCTRecord.Priority.CRITICAL: BugTaskImportance.CRITICAL,
UCTRecord.Priority.HIGH: BugTaskImportance.HIGH,
@@ -330,6 +454,7 @@ class CVE:
UCTRecord.Priority.UNTRIAGED: BugTaskImportance.UNDECIDED,
UCTRecord.Priority.NEGLIGIBLE: BugTaskImportance.WISHLIST,
}
+ PRIORITY_MAP_REVERSE = {v: k for k, v in PRIORITY_MAP.items()}
BUG_TASK_STATUS_MAP = {
UCTRecord.PackageStatus.IGNORED: BugTaskStatus.WONTFIX,
@@ -342,73 +467,82 @@ class CVE:
UCTRecord.PackageStatus.NEEDED: BugTaskStatus.NEW,
UCTRecord.PackageStatus.PENDING: BugTaskStatus.FIXCOMMITTED,
}
+ BUG_TASK_STATUS_MAP_REVERSE = {
+ v: k for k, v in BUG_TASK_STATUS_MAP.items()
+ }
VULNERABILITY_STATUS_MAP = {
"active": VulnerabilityStatus.ACTIVE,
"ignored": VulnerabilityStatus.IGNORED,
"retired": VulnerabilityStatus.RETIRED,
}
+ VULNERABILITY_STATUS_MAP_REVERSE = {
+ v: k for k, v in VULNERABILITY_STATUS_MAP.items()
+ }
def __init__(
self,
sequence: str,
date_made_public: Optional[datetime],
+ date_notice_issued: Optional[datetime],
+ date_coordinated_release: Optional[datetime],
distro_packages: List[DistroPackage],
series_packages: List[SeriesPackage],
+ upstream_packages: List[UpstreamPackage],
importance: BugTaskImportance,
status: VulnerabilityStatus,
assignee: Optional[Person],
+ discovered_by: str,
description: str,
ubuntu_description: str,
bug_urls: List[str],
references: List[str],
notes: str,
mitigation: str,
+ cvss: List[CVSS],
):
self.sequence = sequence
self.date_made_public = date_made_public
+ self.date_notice_issued = date_notice_issued
+ self.date_coordinated_release = date_coordinated_release
self.distro_packages = distro_packages
self.series_packages = series_packages
+ self.upstream_packages = upstream_packages
self.importance = importance
self.status = status
self.assignee = assignee
+ self.discovered_by = discovered_by
self.description = description
self.ubuntu_description = ubuntu_description
self.bug_urls = bug_urls
self.references = references
self.notes = notes
self.mitigation = mitigation
+ self.cvss = cvss
@classmethod
def make_from_uct_record(cls, uct_record: UCTRecord) -> "CVE":
- # Some `UCTRecord` fields are not being used at the moment:
- # - cve.discovered_by: This is supposed to be `Cve.discoverer` but
- # there may be a difficulty there since the `Cve` table should only
- # be managed by syncing data from MITRE and not from
- # ubuntu-cve-tracker
- # - cve.cvss: `Cve.cvss`, but may have a similar issue to
- # `Cve.discoverer` as above.
+ """
+ Create a `CVE` from a `UCTRecord`
+
+ This maps UCT CVE information to Launchpad data structures.
+ """
distro_packages = []
series_packages = []
+ upstream_packages = []
spn_set = getUtility(ISourcePackageNameSet)
for uct_package in uct_record.packages:
source_package_name = spn_set.getOrCreateByName(uct_package.name)
- package_priority = uct_package.priority or uct_record.priority
package_importance = (
- cls.PRIORITY_MAP[package_priority]
- if package_priority
+ cls.PRIORITY_MAP[uct_package.priority]
+ if uct_package.priority
else None
)
for uct_package_status in uct_package.statuses:
- distro_series = cls.get_distro_series(
- uct_package_status.distroseries
- )
- if distro_series is None:
- continue
if uct_package_status.status not in cls.BUG_TASK_STATUS_MAP:
logger.warning(
@@ -417,6 +551,34 @@ class CVE:
)
continue
+ series_package_importance = (
+ cls.PRIORITY_MAP[uct_package_status.priority]
+ if uct_package_status.priority
+ else None
+ )
+
+ if uct_package_status.series == "upstream":
+ product = cls.get_product(uct_package.name)
+ if product is None:
+ continue
+ upstream_packages.append(
+ cls.UpstreamPackage(
+ package=product,
+ importance=series_package_importance,
+ status=cls.BUG_TASK_STATUS_MAP[
+ uct_package_status.status
+ ],
+ status_explanation=uct_package_status.reason,
+ )
+ )
+ continue
+
+ distro_series = cls.get_distro_series(
+ uct_package_status.series
+ )
+ if distro_series is None:
+ continue
+
distro_package = cls.DistroPackage(
package=DistributionSourcePackage(
distribution=distro_series.distribution,
@@ -427,15 +589,6 @@ class CVE:
if distro_package not in distro_packages:
distro_packages.append(distro_package)
- series_package_priority = (
- uct_package_status.priority or package_priority
- )
- series_package_importance = (
- cls.PRIORITY_MAP[series_package_priority]
- if series_package_priority
- else None
- )
-
series_packages.append(
cls.SeriesPackage(
package=SourcePackage(
@@ -461,18 +614,131 @@ class CVE:
return cls(
sequence=uct_record.candidate,
- date_made_public=uct_record.date_made_public,
+ date_made_public=uct_record.public_date,
+ date_notice_issued=uct_record.public_date_at_USN,
+ date_coordinated_release=uct_record.crd,
distro_packages=distro_packages,
series_packages=series_packages,
+ upstream_packages=upstream_packages,
importance=cls.PRIORITY_MAP[uct_record.priority],
status=cls.infer_vulnerability_status(uct_record),
assignee=assignee,
+ discovered_by=uct_record.discovered_by,
description=uct_record.description,
ubuntu_description=uct_record.ubuntu_description,
bug_urls=uct_record.bugs,
references=uct_record.references,
- notes=cls.format_cve_notes(uct_record.notes),
+ notes=uct_record.notes,
mitigation=uct_record.mitigation,
+ cvss=uct_record.cvss,
+ )
+
+ def to_uct_record(self) -> UCTRecord:
+ """
+ Convert a `CVE` to a `UCTRecord`.
+
+ This maps Launchpad data structures to the format that UCT understands.
+ """
+ series_packages_by_name = defaultdict(
+ list
+ ) # type: Dict[SourcePackageName, List[CVE.SeriesPackage]]
+ for series_package in self.series_packages:
+ series_packages_by_name[
+ series_package.package.sourcepackagename
+ ].append(series_package)
+
+ packages_by_name = OrderedDict() # type: Dict[str, UCTRecord.Package]
+ processed_packages = set() # type: Set[SourcePackageName]
+ for distro_package in self.distro_packages:
+ spn = distro_package.package.sourcepackagename
+ if spn in processed_packages:
+ continue
+ processed_packages.add(spn)
+ statuses = [] # type: List[UCTRecord.SeriesPackageStatus]
+ for series_package in series_packages_by_name[spn]:
+ series = series_package.package.distroseries
+ if series.status == SeriesStatus.DEVELOPMENT:
+ series_name = "devel"
+ else:
+ series_name = series.name
+ distro_name = distro_package.package.distribution.name
+ if distro_name != "ubuntu":
+ if distro_name == "ubuntu-esm":
+ distro_name = "esm"
+ series_name = "{}/{}".format(series_name, distro_name)
+ statuses.append(
+ UCTRecord.SeriesPackageStatus(
+ series=series_name,
+ status=self.BUG_TASK_STATUS_MAP_REVERSE[
+ series_package.status
+ ],
+ reason=series_package.status_explanation,
+ priority=(
+ self.PRIORITY_MAP_REVERSE[
+ series_package.importance
+ ]
+ if series_package.importance
+ else None
+ ),
+ )
+ )
+
+ packages_by_name[spn.name] = UCTRecord.Package(
+ name=spn.name,
+ statuses=statuses,
+ priority=(
+ self.PRIORITY_MAP_REVERSE[distro_package.importance]
+ if distro_package.importance
+ else None
+ ),
+ tags=set(),
+ patches=[],
+ )
+
+ for upstream_package in self.upstream_packages:
+ status = UCTRecord.SeriesPackageStatus(
+ series="upstream",
+ status=self.BUG_TASK_STATUS_MAP_REVERSE[
+ upstream_package.status
+ ],
+ reason=upstream_package.status_explanation,
+ priority=(
+ self.PRIORITY_MAP_REVERSE[upstream_package.importance]
+ if upstream_package.importance
+ else None
+ ),
+ )
+ package_name = upstream_package.package.name
+ if package_name in packages_by_name:
+ packages_by_name[package_name].statuses.append(status)
+ else:
+ packages_by_name[package_name] = UCTRecord.Package(
+ name=package_name,
+ statuses=[status],
+ priority=None,
+ tags=set(),
+ patches=[],
+ )
+
+ return UCTRecord(
+ parent_dir=self.VULNERABILITY_STATUS_MAP_REVERSE.get(
+ self.status, ""
+ ),
+ assigned_to=self.assignee.name if self.assignee else "",
+ bugs=self.bug_urls,
+ cvss=self.cvss,
+ candidate=self.sequence,
+ crd=self.date_coordinated_release,
+ public_date=self.date_made_public,
+ public_date_at_USN=self.date_notice_issued,
+ description=self.description,
+ discovered_by=self.discovered_by,
+ mitigation=self.mitigation,
+ notes=self.notes,
+ priority=self.PRIORITY_MAP_REVERSE[self.importance],
+ references=self.references,
+ ubuntu_description=self.ubuntu_description,
+ packages=list(packages_by_name.values()),
)
@cachedproperty
@@ -490,16 +756,8 @@ class CVE:
"""
Infer vulnerability status based on the parent folder of the CVE file.
"""
- cve_folder_name = uct_record.path.absolute().parent.name
return cls.VULNERABILITY_STATUS_MAP.get(
- cve_folder_name, VulnerabilityStatus.NEEDS_TRIAGE
- )
-
- @classmethod
- def format_cve_notes(cls, notes: List[UCTRecord.Note]) -> str:
- return "\n".join(
- "{author}> {text}".format(author=note.author, text=note.text)
- for note in notes
+ uct_record.parent_dir, VulnerabilityStatus.NEEDS_TRIAGE
)
@classmethod
@@ -520,254 +778,29 @@ class CVE:
if "/" in distro_series_name:
series_name, distro_name = distro_series_name.split("/", 1)
if distro_name == "esm":
- # TODO: ESM needs special handling
- pass
- return
+ distro_name = "ubuntu-esm"
else:
+ distro_name = "ubuntu"
series_name = distro_series_name
- distribution = getUtility(ILaunchpadCelebrities).ubuntu
- if series_name == "devel":
- distro_series = cls.get_devel_series(distribution)
- else:
- distro_series = getUtility(IDistroSeriesSet).queryByName(
- distribution, series_name
- )
+ distribution = getUtility(IDistributionSet).getByName(distro_name)
+ if distribution is None:
+ logger.warning("Could not find the distribution: %s", distro_name)
+ return
+ if series_name == "devel":
+ distro_series = cls.get_devel_series(distribution)
+ else:
+ distro_series = getUtility(IDistroSeriesSet).queryByName(
+ distribution, series_name
+ )
if not distro_series:
logger.warning(
"Could not find the distro series: %s", distro_series_name
)
return distro_series
-
-class UCTImportError(Exception):
- pass
-
-
-class UCTImporter:
- def __init__(self, dry_run=False):
- self.dry_run = dry_run
- self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
-
- def import_cve_from_file(self, cve_path: Path) -> None:
- uct_record = UCTRecord.load(cve_path)
- cve = CVE.make_from_uct_record(uct_record)
- self.import_cve(cve)
-
- def import_cve(self, cve: CVE) -> None:
- if cve.date_made_public is None:
- logger.warning(
- "The CVE does not have a publication date, is it embargoed?"
- )
- return
- lp_cve = getUtility(ICveSet)[cve.sequence] # type: CveModel
- if lp_cve is None:
- logger.warning("Could not find the CVE in LP: %s", cve.sequence)
- return
- bug = self.find_existing_bug(cve, lp_cve)
- try:
- if bug is None:
- self.create_bug(cve, lp_cve)
- else:
- self.update_bug(bug, cve, lp_cve)
- except Exception:
- transaction.abort()
- raise
-
- if self.dry_run:
- logger.info("Dry-run mode enabled, all changes are reverted.")
- transaction.abort()
- else:
- transaction.commit()
-
- def find_existing_bug(
- self, cve: CVE, lp_cve: CveModel
- ) -> Optional[BugModel]:
- bug = None
- for vulnerability in lp_cve.vulnerabilities:
- if vulnerability.distribution in cve.affected_distributions:
- bugs = vulnerability.bugs
- if bugs:
- if bug and bugs[0] != bug:
- raise UCTImportError(
- "Multiple existing bugs are found "
- "for CVE {}".format(cve.sequence)
- )
- else:
- bug = bugs[0]
- return bug
-
- def create_bug(self, cve: CVE, lp_cve: CveModel) -> Optional[BugModel]:
-
- logger.debug("creating bug...")
-
- if not cve.series_packages:
- logger.warning("Could not find any affected packages")
- return None
-
- distro_package = cve.distro_packages[0]
-
- # Create the bug
- bug = getUtility(IBugSet).createBug(
- CreateBugParams(
- comment=self.make_bug_description(cve),
- title=cve.sequence,
- information_type=InformationType.PUBLICSECURITY,
- owner=self.bug_importer,
- target=distro_package.package,
- importance=distro_package.importance,
- cve=lp_cve,
- )
- ) # type: BugModel
-
- self.update_external_bug_urls(bug, cve.bug_urls)
-
- logger.info("Created bug with ID: %s", bug.id)
-
- self.create_bug_tasks(
- bug, cve.distro_packages[1:], cve.series_packages
- )
- self.update_statuses_and_importances(
- bug, cve.distro_packages[1:], cve.series_packages
- )
- self.assign_bug_tasks(bug, cve.assignee)
-
- # Make a note of the import in the activity log:
- getUtility(IBugActivitySet).new(
- bug=bug.id,
- datechanged=UTC_NOW,
- person=self.bug_importer,
- whatchanged="bug",
- message="UCT CVE entry {}".format(cve.sequence),
- )
-
- # Create the Vulnerabilities
- for distribution in cve.affected_distributions:
- self.create_vulnerability(bug, cve, lp_cve, distribution)
-
- return bug
-
- def update_bug(self, bug: BugModel, cve: CVE, lp_cve: CveModel) -> None:
- bug.description = self.make_bug_description(cve)
-
- self.create_bug_tasks(bug, cve.distro_packages, cve.series_packages)
- self.update_statuses_and_importances(
- bug, cve.distro_packages, cve.series_packages
- )
- self.assign_bug_tasks(bug, cve.assignee)
- self.update_external_bug_urls(bug, cve.bug_urls)
-
- # Update or add new Vulnerabilities
- vulnerabilities_by_distro = {
- v.distribution: v for v in bug.vulnerabilities
- }
- for distro in cve.affected_distributions:
- vulnerability = vulnerabilities_by_distro.get(distro)
- if vulnerability is None:
- self.create_vulnerability(bug, cve, lp_cve, distro)
- else:
- self.update_vulnerability(vulnerability, cve)
-
- def create_bug_tasks(
- self,
- bug: BugModel,
- distro_packages: List[CVE.DistroPackage],
- series_packages: List[CVE.SeriesPackage],
- ) -> None:
- bug_tasks = bug.bugtasks # type: List[BugTask]
- bug_task_by_target = {t.target: t for t in bug_tasks}
- bug_task_set = getUtility(IBugTaskSet)
- for target in (
- p.package for p in chain(distro_packages, series_packages)
- ):
- if target not in bug_task_by_target:
- bug_task_set.createTask(bug, self.bug_importer, target)
-
- def create_vulnerability(
- self,
- bug: BugModel,
- cve: CVE,
- lp_cve: CveModel,
- distribution: Distribution,
- ) -> Vulnerability:
- date_made_public = cve.date_made_public
- if date_made_public.tzinfo is None:
- date_made_public = date_made_public.replace(tzinfo=timezone.utc)
- vulnerability = getUtility(IVulnerabilitySet).new(
- distribution=distribution,
- creator=bug.owner,
- cve=lp_cve,
- status=cve.status,
- description=cve.description,
- notes=cve.notes,
- mitigation=cve.mitigation,
- importance=cve.importance,
- information_type=InformationType.PUBLICSECURITY,
- date_made_public=date_made_public,
- ) # type: Vulnerability
-
- vulnerability.linkBug(bug, bug.owner)
-
- logger.info("Created vulnerability with ID: %s", vulnerability.id)
-
- return vulnerability
-
- def update_vulnerability(
- self, vulnerability: Vulnerability, cve: CVE
- ) -> None:
- vulnerability.status = cve.status
- vulnerability.description = cve.description
- vulnerability.notes = cve.notes
- vulnerability.mitigation = cve.mitigation
- vulnerability.importance = cve.importance
- vulnerability.date_made_public = cve.date_made_public
-
- def assign_bug_tasks(
- self, bug: BugModel, assignee: Optional[Person]
- ) -> None:
- for bug_task in bug.bugtasks:
- bug_task.transitionToAssignee(assignee, validate=False)
-
- def update_statuses_and_importances(
- self,
- bug: BugModel,
- distro_packages: List[CVE.DistroPackage],
- series_packages: List[CVE.SeriesPackage],
- ) -> None:
- bug_tasks = bug.bugtasks # type: List[BugTask]
- bug_task_by_target = {t.target: t for t in bug_tasks}
-
- for dp in distro_packages:
- task = bug_task_by_target[dp.package]
- if task.importance != dp.importance:
- task.transitionToImportance(dp.importance, self.bug_importer)
-
- for sp in series_packages:
- task = bug_task_by_target[sp.package]
- if task.importance != sp.importance:
- task.transitionToImportance(sp.importance, self.bug_importer)
- if task.status != sp.status:
- task.transitionToStatus(sp.status, self.bug_importer)
- if task.status_explanation != sp.status_explanation:
- task.status_explanation = sp.status_explanation
-
- def update_external_bug_urls(
- self, bug: BugModel, bug_urls: List[str]
- ) -> None:
- bug_urls = set(bug_urls)
- for watch in bug.watches:
- if watch.url in bug_urls:
- bug_urls.remove(watch.url)
- else:
- watch.destroySelf()
- bug_watch_set = getUtility(IBugWatchSet)
- for external_bug_url in bug_urls:
- bug_watch_set.fromText(external_bug_url, bug, self.bug_importer)
-
- def make_bug_description(self, cve: CVE) -> str:
- parts = [cve.description]
- if cve.ubuntu_description:
- parts.extend(["", "Ubuntu-Description:", cve.ubuntu_description])
- if cve.references:
- parts.extend(["", "References:"])
- parts.extend(cve.references)
- return "\n".join(parts)
+ @classmethod
+ def get_product(cls, product_name: str) -> Optional[Product]:
+ product = getUtility(IProductSet).getByName(product_name)
+ if not product:
+ logger.warning("Could not find the product: %s", product_name)
+ return product
diff --git a/lib/lp/bugs/scripts/uct/uctexport.py b/lib/lp/bugs/scripts/uct/uctexport.py
new file mode 100644
index 0000000..b4c3b87
--- /dev/null
+++ b/lib/lp/bugs/scripts/uct/uctexport.py
@@ -0,0 +1,222 @@
+# Copyright 2022 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+import logging
+from collections import defaultdict
+from pathlib import Path
+from typing import List, NamedTuple, Optional
+
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.bugs.interfaces.bug import IBugSet
+from lp.bugs.model.bug import Bug as BugModel
+from lp.bugs.model.bugtask import BugTask
+from lp.bugs.model.cve import Cve as CveModel
+from lp.bugs.model.vulnerability import Vulnerability
+from lp.registry.model.distributionsourcepackage import (
+ DistributionSourcePackage,
+)
+from lp.registry.model.product import Product
+from lp.registry.model.sourcepackage import SourcePackage
+
+from .models import CVE, CVSS
+
+__all__ = [
+ "UCTExporter",
+]
+
+
+logger = logging.getLogger(__name__)
+
+
+class UCTExporter:
+ """
+ `UCTExporter` is used to export LP Bugs, Vulnerabilities and Cve's to
+ UCT CVE files.
+ """
+
+ ParsedDescription = NamedTuple(
+ "ParsedDescription",
+ (
+ ("description", str),
+ ("references", List[str]),
+ ),
+ )
+
+ def export_bug_to_uct_file(
+ self, bug_id: int, output_dir: Path
+ ) -> Optional[Path]:
+ """
+ Export a bug with the given bug_id as a
+ UCT CVE record file in the `output_dir`
+
+ :param bug_id: ID of the `Bug` model to be exported
+ :param output_dir: the directory where the exported file will be stored
+ :return: path to the exported file
+ """
+ bug = getUtility(IBugSet).get(bug_id)
+ if not bug:
+ logger.error("Could not find a bug with ID: %s", bug_id)
+ return
+ cve = self._make_cve_from_bug(bug)
+ uct_record = cve.to_uct_record()
+ save_to_path = uct_record.save(output_dir)
+ logger.info(
+ "Bug with ID: %s is exported to: %s", bug_id, str(save_to_path)
+ )
+ return save_to_path
+
+ def _make_cve_from_bug(self, bug: BugModel) -> CVE:
+ """
+ Create a `CVE` instances from a `Bug` model and the related
+ Vulnerabilities and `Cve`.
+
+ `BugTasks` are converted to `CVE.DistroPackage` and `CVE.SEriesPackage`
+ objects.
+
+ Other `CVE` fields are populated from the information contained in the
+ `Bug`, its related Vulnerabilities and LP `Cve` model.
+
+ :param bug: `Bug` model
+ :return: `CVE` instance
+ """
+ vulnerabilities = list(bug.vulnerabilities)
+ if not vulnerabilities:
+ raise ValueError(
+ "Bug with ID: {} does not have vulnerabilities".format(bug.id)
+ )
+ vulnerability = vulnerabilities[0] # type: Vulnerability
+ if not vulnerability.cve:
+ raise ValueError(
+ "Bug with ID: {} - vulnerability "
+ "is not linked to a CVE".format(bug.id)
+ )
+ lp_cve = vulnerability.cve # type: CveModel
+
+ parsed_description = self._parse_bug_description(bug.description)
+
+ bug_urls = []
+ for bug_watch in bug.watches:
+ bug_urls.append(bug_watch.url)
+
+ bug_tasks = list(bug.bugtasks) # type: List[BugTask]
+
+ cve_importance = vulnerability.importance
+ package_importances = {}
+
+ distro_packages = []
+ for bug_task in bug_tasks:
+ target = removeSecurityProxy(bug_task.target)
+ if not isinstance(target, DistributionSourcePackage):
+ continue
+ dp_importance = bug_task.importance
+ package_importances[target.sourcepackagename] = dp_importance
+ distro_packages.append(
+ CVE.DistroPackage(
+ package=target,
+ importance=(
+ dp_importance
+ if dp_importance != cve_importance
+ else None
+ ),
+ )
+ )
+
+ series_packages = []
+ for bug_task in bug_tasks:
+ target = removeSecurityProxy(bug_task.target)
+ if not isinstance(target, SourcePackage):
+ continue
+ sp_importance = bug_task.importance
+ package_importance = package_importances[target.sourcepackagename]
+ series_packages.append(
+ CVE.SeriesPackage(
+ package=target,
+ importance=(
+ sp_importance
+ if sp_importance != package_importance
+ else None
+ ),
+ status=bug_task.status,
+ status_explanation=bug_task.status_explanation,
+ )
+ )
+
+ upstream_packages = []
+ for bug_task in bug_tasks:
+ target = removeSecurityProxy(bug_task.target)
+ if not isinstance(target, Product):
+ continue
+ up_importance = bug_task.importance
+ package_importance = package_importances.get(target.name)
+ upstream_packages.append(
+ CVE.UpstreamPackage(
+ package=target,
+ importance=(
+ up_importance
+ if up_importance != package_importance
+ else None
+ ),
+ status=bug_task.status,
+ status_explanation=bug_task.status_explanation,
+ )
+ )
+
+ return CVE(
+ sequence="CVE-{}".format(lp_cve.sequence),
+ date_made_public=vulnerability.date_made_public,
+ date_notice_issued=vulnerability.date_notice_issued,
+ date_coordinated_release=vulnerability.date_coordinated_release,
+ distro_packages=distro_packages,
+ series_packages=series_packages,
+ upstream_packages=upstream_packages,
+ importance=cve_importance,
+ status=vulnerability.status,
+ assignee=bug_tasks[0].assignee,
+ discovered_by="", # TODO: fix this
+ description=parsed_description.description,
+ ubuntu_description=vulnerability.description,
+ bug_urls=bug_urls,
+ references=parsed_description.references,
+ notes=vulnerability.notes,
+ mitigation=vulnerability.mitigation,
+ cvss=[
+ CVSS(
+ authority=authority,
+ vector_string=vector_string,
+ )
+ for authority, vector_string in lp_cve.cvss.items()
+ ],
+ )
+
+ def _parse_bug_description(
+ self, bug_description: str
+ ) -> "ParsedDescription":
+ """
+ Some `CVE` fields can't be mapped to Launchpad models.
+ They are saved to bug description.
+
+ This method extract those fields from the bug description.
+
+ :param bug_description: bug description
+ :return: parsed description
+ """
+ field_values = defaultdict(list)
+ current_field = "description"
+ known_fields = {
+ "References:": "references",
+ }
+ lines = bug_description.split("\n")
+ for line in lines:
+ line = line.strip()
+ if not line:
+ continue
+ if line in known_fields:
+ current_field = known_fields[line]
+ continue
+ field_values[current_field].append(line)
+ return UCTExporter.ParsedDescription(
+ description="\n".join(field_values.get("description", [])),
+ references=field_values.get("references", []),
+ )
diff --git a/lib/lp/bugs/scripts/uct/uctimport.py b/lib/lp/bugs/scripts/uct/uctimport.py
new file mode 100644
index 0000000..3af3ad7
--- /dev/null
+++ b/lib/lp/bugs/scripts/uct/uctimport.py
@@ -0,0 +1,464 @@
+# Copyright 2022 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""A UCT (Ubuntu CVE Tracker) bug importer
+
+This code can import CVE summaries stored in UCT repository to bugs in
+Launchpad.
+
+For each entry in UCT we:
+
+1. Create a Bug instance
+2. Create a Vulnerability instance for each affected distribution and link it
+ to the bug
+3. Create a Bug Task for each distribution/series package in the CVE entry
+4. Update the statuses of Bug Tasks based on the information in the CVE entry
+5. Update the information the related Launchpad's `Cve` model, if necessary
+
+Three types of bug tags are created:
+
+1. Bug tasks with a distribution package as a target - they represent
+ importance of the package
+2. Bug tasks with distribution series packages as a target - they represent
+ importance and status of the package in a particular series
+3. Bug tasks with a product as a target - they represent importance and
+ status of the package in upstream.
+"""
+import logging
+from datetime import timezone
+from itertools import chain
+from pathlib import Path
+from typing import Dict, List, Optional
+
+import transaction
+from zope.component import getUtility
+
+from lp.app.enums import InformationType
+from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.bugs.interfaces.bug import CreateBugParams, IBugSet
+from lp.bugs.interfaces.bugactivity import IBugActivitySet
+from lp.bugs.interfaces.bugtask import BugTaskImportance, IBugTaskSet
+from lp.bugs.interfaces.bugwatch import IBugWatchSet
+from lp.bugs.interfaces.cve import ICveSet
+from lp.bugs.interfaces.vulnerability import IVulnerabilitySet
+from lp.bugs.model.bug import Bug as BugModel
+from lp.bugs.model.bugtask import BugTask
+from lp.bugs.model.cve import Cve as CveModel
+from lp.bugs.model.vulnerability import Vulnerability
+from lp.registry.model.distribution import Distribution
+from lp.registry.model.person import Person
+from lp.services.database.constants import UTC_NOW
+
+from .models import CVE, UCTRecord
+
+__all__ = [
+ "UCTImporter",
+ "UCTImportError",
+]
+
+logger = logging.getLogger(__name__)
+
+
+class UCTImportError(Exception):
+ pass
+
+
+class UCTImporter:
+ """
+ `UCTImporter` is used to import UCT CVE files to Launchpad database.
+ """
+
+ def __init__(self, dry_run=False):
+ self.dry_run = dry_run
+ self.bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
+
+ def import_cve_from_file(self, cve_path: Path) -> None:
+ """
+ Import a UCT CVE record from a file located at `cve_path`.
+
+ :param cve_path: path to the UCT CVE file
+ """
+ logger.info("Importing %s", cve_path)
+ uct_record = UCTRecord.load(cve_path)
+ cve = CVE.make_from_uct_record(uct_record)
+ self.import_cve(cve)
+ logger.info("%s was imported successfully", cve_path)
+
+ def import_cve(self, cve: CVE) -> None:
+ """
+ Import a `CVE` instance to Launchpad database.
+
+ :param cve: `CVE` with information from UCT
+ """
+ if cve.date_made_public is None:
+ logger.warning(
+ "%s does not have a publication date, "
+ "is it embargoed? Aborting.",
+ cve.sequence,
+ )
+ return
+ if not cve.series_packages:
+ logger.warning(
+ "%s: could not find any affected packages, aborting.",
+ cve.series_packages,
+ )
+ return
+ lp_cve = getUtility(ICveSet)[cve.sequence] # type: CveModel
+ if lp_cve is None:
+ logger.warning(
+ "%s: could not find the CVE in LP. Aborting.", cve.sequence
+ )
+ return
+ bug = self._find_existing_bug(cve, lp_cve)
+ try:
+ if bug is None:
+ bug = self.create_bug(cve, lp_cve)
+ logger.info(
+ "%s: created bug with ID: %s", cve.sequence, bug.id
+ )
+ else:
+ logging.info(
+ "%s: found existing bug with ID: %s",
+ cve.sequence,
+ bug.id,
+ )
+ self.update_bug(bug, cve, lp_cve)
+ logger.info(
+ "%s: updated bug with ID: %s", cve.sequence, bug.id
+ )
+ self._update_launchpad_cve(lp_cve, cve)
+ except Exception:
+ transaction.abort()
+ raise
+
+ if self.dry_run:
+ logger.info(
+ "%s: dry-run mode enabled, all changes are reverted.",
+ cve.sequence,
+ )
+ transaction.abort()
+ else:
+ transaction.commit()
+
+ def create_bug(self, cve: CVE, lp_cve: CveModel) -> BugModel:
+ """
+ Create a `Bug` model based on the information contained in a `CVE`.
+
+ :param cve: `CVE` with information from UCT
+ :param lp_cve: Launchpad `Cve` model
+ """
+
+ distro_package = cve.distro_packages[0]
+
+ # Create the bug
+ bug = getUtility(IBugSet).createBug(
+ CreateBugParams(
+ comment=self._make_bug_description(cve),
+ title=cve.sequence,
+ information_type=InformationType.PUBLICSECURITY,
+ owner=self.bug_importer,
+ target=distro_package.package,
+ importance=distro_package.importance,
+ cve=lp_cve,
+ )
+ ) # type: BugModel
+
+ self._update_external_bug_urls(bug, cve.bug_urls)
+
+ self._create_bug_tasks(
+ bug,
+ cve.distro_packages[1:],
+ cve.series_packages,
+ cve.upstream_packages,
+ )
+ self._update_statuses_and_importances(
+ bug,
+ cve.importance,
+ cve.distro_packages,
+ cve.series_packages,
+ cve.upstream_packages,
+ )
+ self._assign_bug_tasks(bug, cve.assignee)
+
+ # Make a note of the import in the activity log:
+ getUtility(IBugActivitySet).new(
+ bug=bug.id,
+ datechanged=UTC_NOW,
+ person=self.bug_importer,
+ whatchanged="bug",
+ message="UCT CVE entry {}".format(cve.sequence),
+ )
+
+ # Create the Vulnerabilities
+ for distribution in cve.affected_distributions:
+ self._create_vulnerability(bug, cve, lp_cve, distribution)
+
+ return bug
+
+ def update_bug(self, bug: BugModel, cve: CVE, lp_cve: CveModel) -> None:
+ """
+ Update a `Bug` model with the information contained in a `CVE`.
+
+ :param bug: `Bug` model to be updated
+ :param cve: `CVE` with information from UCT
+ :param lp_cve: Launchpad `Cve` model
+ """
+ bug.description = self._make_bug_description(cve)
+
+ self._create_bug_tasks(
+ bug,
+ cve.distro_packages,
+ cve.series_packages,
+ cve.upstream_packages,
+ )
+ self._update_statuses_and_importances(
+ bug,
+ cve.importance,
+ cve.distro_packages,
+ cve.series_packages,
+ cve.upstream_packages,
+ )
+ self._assign_bug_tasks(bug, cve.assignee)
+ self._update_external_bug_urls(bug, cve.bug_urls)
+
+ # Update or add new Vulnerabilities
+ vulnerabilities_by_distro = {
+ v.distribution: v for v in bug.vulnerabilities
+ }
+ for distro in cve.affected_distributions:
+ vulnerability = vulnerabilities_by_distro.get(distro)
+ if vulnerability is None:
+ self._create_vulnerability(bug, cve, lp_cve, distro)
+ else:
+ self._update_vulnerability(vulnerability, cve)
+
+ def _find_existing_bug(
+ self, cve: CVE, lp_cve: CveModel
+ ) -> Optional[BugModel]:
+ bug = None
+ for vulnerability in lp_cve.vulnerabilities:
+ if vulnerability.distribution in cve.affected_distributions:
+ bugs = vulnerability.bugs
+ if bugs:
+ if bug and bugs[0] != bug:
+ raise UCTImportError(
+ "Multiple existing bugs are found "
+ "for CVE {}".format(cve.sequence)
+ )
+ else:
+ bug = bugs[0]
+ return bug
+
+ def _create_bug_tasks(
+ self,
+ bug: BugModel,
+ distro_packages: List[CVE.DistroPackage],
+ series_packages: List[CVE.SeriesPackage],
+ upstream_packages: List[CVE.UpstreamPackage],
+ ) -> None:
+ """
+ Add bug tasks to the given `Bug` model based on the information
+ from a `CVE`.
+
+ `distro_packages` and `series_packages` from the `CVE`
+ are used as bug task targets.
+
+ This may be called multiple times, only new targets will be processed.
+
+ :param bug: `Bug` model to be updated
+ :param distro_packages: list of `DistroPackage`s from a `CVE`
+ :param series_packages: list of `SeriesPackage`s from a `CVE`
+ """
+ bug_tasks = bug.bugtasks # type: List[BugTask]
+ bug_task_by_target = {t.target: t for t in bug_tasks}
+ bug_task_set = getUtility(IBugTaskSet)
+ for target in (
+ p.package
+ for p in chain(distro_packages, series_packages, upstream_packages)
+ ):
+ if target not in bug_task_by_target:
+ bug_task_set.createTask(bug, self.bug_importer, target)
+
+ def _create_vulnerability(
+ self,
+ bug: BugModel,
+ cve: CVE,
+ lp_cve: CveModel,
+ distribution: Distribution,
+ ) -> Vulnerability:
+ """
+ Create a Vulnerability instance based on the information from
+ the given `CVE` instance and link to the specified `Bug`
+ and LP's `Cve` model.
+
+ :param bug: `Bug` model associated with the vulnerability
+ :param cve: `CVE` with information from UCT
+ :param lp_cve: Launchpad `Cve` model
+ :param distribution: a `Distribution` affected by the vulnerability
+ :return: a Vulnerability
+ """
+ vulnerability = getUtility(IVulnerabilitySet).new(
+ distribution=distribution,
+ status=cve.status,
+ importance=cve.importance,
+ creator=bug.owner,
+ information_type=InformationType.PUBLICSECURITY,
+ cve=lp_cve,
+ ) # type: Vulnerability
+ self._update_vulnerability(vulnerability, cve)
+
+ vulnerability.linkBug(bug, bug.owner)
+
+ logger.info(
+ "%s: created vulnerability with ID: %s for distribution: %s",
+ cve.sequence,
+ vulnerability.id,
+ distribution.name,
+ )
+
+ return vulnerability
+
+ def _update_vulnerability(
+ self, vulnerability: Vulnerability, cve: CVE
+ ) -> None:
+ """
+ Update a `Vulnerability` model with the information
+ contained in a `CVE`
+
+ :param vulnerability: `Vulnerability` model to be updated
+ :param cve: `CVE` with information from UCT
+ """
+ date_made_public = cve.date_made_public
+ if date_made_public.tzinfo is None:
+ date_made_public = date_made_public.replace(tzinfo=timezone.utc)
+ date_notice_issued = cve.date_notice_issued
+ if date_notice_issued.tzinfo is None:
+ date_notice_issued = date_notice_issued.replace(
+ tzinfo=timezone.utc
+ )
+ date_coordinated_release = cve.date_coordinated_release
+ if date_coordinated_release.tzinfo is None:
+ date_coordinated_release = date_coordinated_release.replace(
+ tzinfo=timezone.utc
+ )
+
+ vulnerability.status = cve.status
+ vulnerability.description = cve.ubuntu_description
+ vulnerability.notes = cve.notes
+ vulnerability.mitigation = cve.mitigation
+ vulnerability.importance = cve.importance
+ vulnerability.date_made_public = date_made_public
+ vulnerability.date_notice_issued = date_notice_issued
+ vulnerability.date_coordinated_release = date_coordinated_release
+
+ def _assign_bug_tasks(
+ self, bug: BugModel, assignee: Optional[Person]
+ ) -> None:
+ """
+ Assign all bug tasks from the given bug to the given assignee.
+
+ :param bug: `Bug` model to be updated
+ :param assignee: a person to be assigned (may be None)
+ """
+ for bug_task in bug.bugtasks:
+ bug_task.transitionToAssignee(assignee, validate=False)
+
+ def _update_statuses_and_importances(
+ self,
+ bug: BugModel,
+ cve_importance: BugTaskImportance,
+ distro_packages: List[CVE.DistroPackage],
+ series_packages: List[CVE.SeriesPackage],
+ upstream_packages: List[CVE.UpstreamPackage],
+ ) -> None:
+ """
+ Update statuses and importances of bug tasks according to the
+ information contained in `CVE`.
+
+ If a distro package doesn't have importance information, the
+ `cve_importance` is used instead.
+
+ If a series package doesn't have importance information, the
+ importance of the corresponding distro package is used instead.
+
+ :param bug: `Bug` model to be updated
+ :param cve_importance: `CVE` importance
+ :param distro_packages: list of `DistroPackage`s from a `CVE`
+ :param series_packages: list of `SeriesPackage`s from a `CVE`
+ """
+ bug_tasks = bug.bugtasks # type: List[BugTask]
+ bug_task_by_target = {t.target: t for t in bug_tasks}
+
+ package_importances = {} # type: Dict[str, BugTaskImportance]
+
+ for dp in distro_packages:
+ task = bug_task_by_target[dp.package]
+ dp_importance = dp.importance or cve_importance
+ package_importances[
+ dp.package.sourcepackagename.name
+ ] = dp_importance
+ if task.importance != dp_importance:
+ task.transitionToImportance(dp_importance, self.bug_importer)
+
+ for sp in chain(series_packages, upstream_packages):
+ task = bug_task_by_target[sp.package]
+ if isinstance(sp, CVE.SeriesPackage):
+ package_name = sp.package.sourcepackagename.name
+ elif isinstance(sp, CVE.UpstreamPackage):
+ package_name = sp.package.name
+ else:
+ raise AssertionError()
+ package_importance = package_importances[package_name]
+ sp_importance = sp.importance or package_importance
+ if task.importance != sp_importance:
+ task.transitionToImportance(sp_importance, self.bug_importer)
+ if task.status != sp.status:
+ task.transitionToStatus(sp.status, self.bug_importer)
+ if task.status_explanation != sp.status_explanation:
+ task.status_explanation = sp.status_explanation
+
+ def _update_external_bug_urls(
+ self, bug: BugModel, bug_urls: List[str]
+ ) -> None:
+ """
+ Save links to external bug trackers as bug watches.
+
+ :param bug: `Bug` model to be updated
+ :param bug_urls: links to external bug trackers
+ """
+ bug_urls = set(bug_urls)
+ for watch in bug.watches:
+ if watch.url in bug_urls:
+ bug_urls.remove(watch.url)
+ else:
+ watch.destroySelf()
+ bug_watch_set = getUtility(IBugWatchSet)
+ for external_bug_url in bug_urls:
+ bug_watch_set.fromText(external_bug_url, bug, self.bug_importer)
+
+ def _make_bug_description(self, cve: CVE) -> str:
+ """
+ Some `CVE` fields can't be mapped to Launchpad models.
+
+ They are saved to bug description.
+
+ :param cve: `CVE` with information from UCT
+ :return: bug description
+ """
+ parts = [cve.description]
+ if cve.references:
+ parts.extend(["", "References:"])
+ parts.extend(cve.references)
+ return "\n".join(parts)
+
+ def _update_launchpad_cve(self, lp_cve: CveModel, cve: CVE) -> None:
+ """
+ Update LP's `Cve` model based on the information contained in `CVE`.
+
+ :param lp_cve: LP's `CVE` model to be updated
+ :param cve: `CVE` with information from UCT
+ """
+ for cvss in cve.cvss:
+ lp_cve.setCVSSVectorForAuthority(
+ cvss.authority, cvss.vector_string
+ )
diff --git a/lib/lp/registry/model/distributionsourcepackage.py b/lib/lp/registry/model/distributionsourcepackage.py
index 6d31eab..fd9da20 100644
--- a/lib/lp/registry/model/distributionsourcepackage.py
+++ b/lib/lp/registry/model/distributionsourcepackage.py
@@ -141,6 +141,9 @@ class DistributionSourcePackage(
self.distribution = distribution
self.sourcepackagename = sourcepackagename
+ def __repr__(self):
+ return "<{} '{}'>".format(self.__class__.__name__, self.display_name)
+
@property
def name(self):
"""See `IDistributionSourcePackage`."""
diff --git a/scripts/uct-export.py b/scripts/uct-export.py
new file mode 100755
index 0000000..d65d224
--- /dev/null
+++ b/scripts/uct-export.py
@@ -0,0 +1,34 @@
+#!/usr/bin/python3 -S
+#
+# Copyright 2022 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+import _pythonpath # noqa: F401
+
+import logging
+from pathlib import Path
+
+from lp.bugs.scripts.uct import UCTExporter
+from lp.services.scripts.base import LaunchpadScript
+
+
+class UCTExportScript(LaunchpadScript):
+
+ usage = "usage: %prog [options] BUG_ID OUTPUT_DIR"
+ description = "Export bugs from to CVE entries used in ubuntu-cve-tracker."
+ loglevel = logging.INFO
+
+ def main(self):
+ if len(self.args) != 2:
+ self.parser.error(
+ "Please specify the bug ID and the output directory."
+ )
+
+ bug_id, output_dir = self.args
+
+ exporter = UCTExporter()
+ exporter.export_bug_to_uct_file(int(bug_id), Path(output_dir))
+
+
+if __name__ == "__main__":
+ script = UCTExportScript("lp.services.scripts.uctexport")
+ script.run()
diff --git a/scripts/uct-import.py b/scripts/uct-import.py
index 549f8d4..92d62d3 100755
--- a/scripts/uct-import.py
+++ b/scripts/uct-import.py
@@ -7,12 +7,13 @@ import _pythonpath # noqa: F401
import logging
from pathlib import Path
-from lp.bugs.scripts.uctimport import UCTImporter
+from lp.bugs.scripts.uct import UCTImporter
from lp.services.scripts.base import LaunchpadScript
class UCTImportScript(LaunchpadScript):
+ usage = "usage: %prog [options] CVE_PATH"
description = (
"Import bugs into Launchpad from CVE entries in ubuntu-cve-tracker."
)
diff --git a/setup.cfg b/setup.cfg
index ae66b51..a64253f 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -208,6 +208,9 @@ ignore =
# operators, at least for now.
W503,
W504
+per-file-ignores =
+ # Ignore unused imports in `__init__` files
+ */__init__.py: F401
[isort]
# database/* have some implicit relative imports.
Follow ups