← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] ~andrey-fedoseev/launchpad:uct-import into launchpad:master

 


Diff comments:

> diff --git a/lib/lp/bugs/scripts/tests/test_uctimport.py b/lib/lp/bugs/scripts/tests/test_uctimport.py
> new file mode 100644
> index 0000000..743eea9
> --- /dev/null
> +++ b/lib/lp/bugs/scripts/tests/test_uctimport.py
> @@ -0,0 +1,357 @@
> +#  Copyright 2022 Canonical Ltd.  This software is licensed under the
> +#  GNU Affero General Public License version 3 (see the file LICENSE).
> +import datetime
> +from pathlib import Path
> +
> +from dateutil.tz import tzutc

We normally use `pytz.UTC` instead at the moment, but I've been thinking of switching us over to `datetime.timezone.utc` since that's in the standard library.  I'd prefer either of those over introducing `dateutil.tz.tzutc`.

> +from zope.component import getUtility
> +
> +from lp.app.enums import InformationType
> +from lp.app.interfaces.launchpad import ILaunchpadCelebrities
> +from lp.bugs.enums import VulnerabilityStatus
> +from lp.bugs.interfaces.bugtask import (
> +    BugTaskImportance,
> +    BugTaskStatus,
> +    )
> +from lp.bugs.scripts.uctimport import (
> +    CVE,
> +    DistroSeriesPackageStatus,
> +    Note,
> +    Package,
> +    PackageStatus,
> +    Patch,
> +    Priority,
> +    UCTImporter,
> +    )
> +from lp.registry.interfaces.series import SeriesStatus
> +from lp.testing import TestCaseWithFactory
> +from lp.testing.layers import ZopelessDatabaseLayer
> +
> +
> +class TestUCTImporter(TestCaseWithFactory):
> +
> +    layer = ZopelessDatabaseLayer
> +
> +    def setUp(self, *args, **kwargs):
> +        super().setUp(*args, **kwargs)
> +        self.importer = UCTImporter()
> +
> +    def test_load_cve_from_file(self):
> +        cve_path = Path(__file__).parent / "sampledata" / "CVE-2022-23222"
> +        cve = self.importer.load_cve_from_file(cve_path)
> +        self.assertEqual(
> +            cve,
> +            CVE(
> +                assigned_to="",
> +                bugs=[
> +                    "https://github.com/mm2/Little-CMS/issues/29",
> +                    "https://github.com/mm2/Little-CMS/issues/30",
> +                    "https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=745471",
> +                ],
> +                cvss=[
> +                    {
> +                        "source": "nvd",
> +                        "vector": (
> +                            "CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H"
> +                        ),
> +                        "baseScore": "7.8",
> +                        "baseSeverity": "HIGH",
> +                    }
> +                ],
> +                candidate="CVE-2022-23222",
> +                date_made_public=datetime.datetime(
> +                    2022, 1, 14, 8, 15, tzinfo=tzutc()
> +                ),
> +                description=(
> +                    "kernel/bpf/verifier.c in the Linux kernel through "
> +                    "5.15.14 allows local\nusers to gain privileges because "
> +                    "of the availability of pointer arithmetic\nvia certain "
> +                    "*_OR_NULL pointer types."
> +                ),
> +                discovered_by="tr3e wang",
> +                mitigation=(
> +                    "seth-arnold> set kernel.unprivileged_bpf_disabled to 1"
> +                ),
> +                notes=[
> +                    Note(
> +                        author="sbeattie",
> +                        text=(
> +                            "Ubuntu 21.10 / 5.13+ kernels disable "
> +                            "unprivileged BPF by default.\nkernels 5.8 and "
> +                            "older are not affected, priority high is "
> +                            "for\n5.10 and 5.11 based kernels only"
> +                        ),
> +                    ),
> +                ],
> +                priority=Priority.CRITICAL,
> +                references=[
> +                    "https://ubuntu.com/security/notices/USN-5368-1"
> +                ],
> +                ubuntu_description=(
> +                    "It was discovered that the BPF verifier in the Linux "
> +                    "kernel did not\nproperly restrict pointer types in "
> +                    "certain situations. A local attacker\ncould use this to "
> +                    "cause a denial of service (system crash) or possibly\n"
> +                    "execute arbitrary code."
> +                ),
> +                packages=[
> +                    Package(
> +                        name="linux",
> +                        statuses=[
> +                            DistroSeriesPackageStatus(
> +                                distroseries="upstream",
> +                                status=PackageStatus.RELEASED,
> +                                reason="5.17~rc1",
> +                                priority=None,
> +                            ),
> +                            DistroSeriesPackageStatus(
> +                                distroseries="impish",
> +                                status=PackageStatus.RELEASED,
> +                                reason="5.13.0-37.42",
> +                                priority=Priority.MEDIUM,
> +                            ),
> +                            DistroSeriesPackageStatus(
> +                                distroseries="devel",
> +                                status=PackageStatus.NOT_AFFECTED,
> +                                reason="5.15.0-25.25",
> +                                priority=Priority.MEDIUM,
> +                            ),
> +                        ],
> +                        priority=None,
> +                        tags={"not-ue"},
> +                        patches=[
> +                            Patch(
> +                                patch_type="break-fix",
> +                                entry=(
> +                                    "457f44363a8894135c85b7a9afd2bd8196db24ab "
> +                                    "c25b2ae136039ffa820c26138ed4a5e5f3ab3841|"
> +                                    "local-CVE-2022-23222-fix"
> +                                ),
> +                            )
> +                        ],
> +                    ),
> +                    Package(
> +                        name="linux-hwe",
> +                        statuses=[
> +                            DistroSeriesPackageStatus(
> +                                distroseries="upstream",
> +                                status=PackageStatus.RELEASED,
> +                                reason="5.17~rc1",
> +                                priority=None,
> +                            ),
> +                            DistroSeriesPackageStatus(
> +                                distroseries="impish",
> +                                status=PackageStatus.DOES_NOT_EXIST,
> +                                reason="",
> +                                priority=None,
> +                            ),
> +                            DistroSeriesPackageStatus(
> +                                distroseries="devel",
> +                                status=PackageStatus.DOES_NOT_EXIST,
> +                                reason="",
> +                                priority=None,
> +                            ),
> +                        ],
> +                        priority=Priority.HIGH,
> +                        tags=set(),
> +                        patches=[],
> +                    ),
> +                ],
> +            ),
> +        )
> +
> +    def test_create_bug(self):
> +        celebrities = getUtility(ILaunchpadCelebrities)
> +        ubuntu = celebrities.ubuntu
> +        owner = celebrities.bug_importer
> +        supported_series = self.factory.makeDistroSeries(
> +            distribution=ubuntu, status=SeriesStatus.SUPPORTED
> +        )
> +        current_series = self.factory.makeDistroSeries(
> +            distribution=ubuntu, status=SeriesStatus.CURRENT
> +        )
> +        devel_series = self.factory.makeDistroSeries(
> +            distribution=ubuntu, status=SeriesStatus.DEVELOPMENT
> +        )
> +        dsp1 = self.factory.makeDistributionSourcePackage(distribution=ubuntu)
> +        dsp2 = self.factory.makeDistributionSourcePackage(distribution=ubuntu)
> +        lp_cve = self.factory.makeCVE("2022-23222")
> +
> +        for package in (dsp1, dsp2):
> +            for series in (supported_series, current_series, devel_series):
> +                self.factory.makeSourcePackagePublishingHistory(
> +                    distroseries=series,
> +                    sourcepackagerelease=self.factory.makeSourcePackageRelease(
> +                        distroseries=series,
> +                        sourcepackagename=package.sourcepackagename,
> +                    ),
> +                )
> +
> +        now = datetime.datetime.now(tzutc())
> +        cve = CVE(
> +            assigned_to="",
> +            bugs=[
> +                "https://github.com/mm2/Little-CMS/issues/29",
> +                "https://github.com/mm2/Little-CMS/issues/30",
> +                "https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=745471",
> +            ],
> +            cvss=[],
> +            candidate="CVE-2022-23222",
> +            date_made_public=now,
> +            description="description",
> +            discovered_by="tr3e wang",
> +            mitigation="mitigation",
> +            notes=[Note(author="author", text="text")],
> +            priority=Priority.MEDIUM,
> +            references=[
> +                "https://ubuntu.com/security/notices/USN-5368-1"
> +            ],
> +            ubuntu_description="ubuntu-description",
> +            packages=[
> +                Package(
> +                    name=dsp1.sourcepackagename.name,
> +                    statuses=[
> +                        DistroSeriesPackageStatus(
> +                            distroseries=supported_series.name,
> +                            status=PackageStatus.RELEASED,
> +                            reason="released",
> +                            priority=Priority.HIGH,
> +                        ),
> +                        DistroSeriesPackageStatus(
> +                            distroseries=current_series.name,
> +                            status=PackageStatus.DOES_NOT_EXIST,
> +                            reason="does not exist",
> +                            priority=None,
> +                        ),
> +                    ],
> +                    priority=Priority.LOW,
> +                    patches=[],
> +                    tags=set(),
> +                ),
> +                Package(
> +                    name=dsp2.sourcepackagename.name,
> +                    statuses=[
> +                        DistroSeriesPackageStatus(
> +                            distroseries=supported_series.name,
> +                            status=PackageStatus.NOT_AFFECTED,
> +                            reason="not affected",
> +                            priority=Priority.LOW,
> +                        ),
> +                        DistroSeriesPackageStatus(
> +                            distroseries=current_series.name,
> +                            status=PackageStatus.IGNORED,
> +                            reason="ignored",
> +                            priority=None,
> +                        ),
> +                        DistroSeriesPackageStatus(
> +                            distroseries="devel",
> +                            status=PackageStatus.NEEDS_TRIAGE,
> +                            reason="needs triage",
> +                            priority=None,
> +                        ),
> +                    ],
> +                    priority=None,
> +                    patches=[],
> +                    tags=set(),
> +                ),
> +            ],
> +        )
> +        bug, vulnerabilities = self.importer.create_bug(cve, lp_cve)
> +
> +        self.assertEqual(bug.title, "CVE-2022-23222")
> +        self.assertEqual(bug.description, "ubuntu-description")
> +        self.assertEqual(bug.owner, owner)
> +        self.assertEqual(bug.information_type, InformationType.PRIVATESECURITY)
> +
> +        messages = list(bug.messages)
> +        self.assertEqual(len(messages), 5)
> +
> +        message = messages.pop(0)
> +        self.assertEqual(message.owner, owner)
> +        self.assertEqual(message.text_contents, "description")
> +
> +        for external_bug_url in cve.bugs:
> +            message = messages.pop(0)
> +            self.assertEqual(message.text_contents, external_bug_url)
> +
> +        for reference in cve.references:
> +            message = messages.pop(0)
> +            self.assertEqual(message.text_contents, reference)
> +
> +        bug_tasks = bug.bugtasks
> +        # 7 bug tasks are supposed to be created:
> +        #   2 for distro packages
> +        #   5 for combinations of distroseries/package:
> +        #     2 for the first package (2 distro series)
> +        #     3 for the second package (3 distro series)
> +        self.assertEqual(len(bug_tasks), 7)
> +
> +        bug_tasks_by_target = {
> +            (t.distribution, t.distroseries, t.sourcepackagename): t
> +            for t in bug_tasks
> +        }
> +        t = bug_tasks_by_target.pop((ubuntu, None, dsp1.sourcepackagename))
> +        self.assertEqual(t.importance, BugTaskImportance.LOW)
> +        self.assertEqual(t.status, BugTaskStatus.NEW)
> +        self.assertEqual(t.status_explanation, None)
> +
> +        t = bug_tasks_by_target.pop((ubuntu, None, dsp2.sourcepackagename))
> +        self.assertEqual(t.importance, BugTaskImportance.MEDIUM)
> +        self.assertEqual(t.status, BugTaskStatus.UNKNOWN)
> +        self.assertEqual(t.status_explanation, None)
> +
> +        t = bug_tasks_by_target.pop(
> +            (None, supported_series, dsp1.sourcepackagename)
> +        )
> +        self.assertEqual(t.importance, BugTaskImportance.HIGH)
> +        self.assertEqual(t.status, BugTaskStatus.FIXRELEASED)
> +        self.assertEqual(t.status_explanation, "released")
> +
> +        t = bug_tasks_by_target.pop(
> +            (None, current_series, dsp1.sourcepackagename)
> +        )
> +        self.assertEqual(t.importance, BugTaskImportance.LOW)
> +        self.assertEqual(t.status, BugTaskStatus.DOESNOTEXIST)
> +        self.assertEqual(t.status_explanation, "does not exist")
> +
> +        t = bug_tasks_by_target.pop(
> +            (None, supported_series, dsp2.sourcepackagename)
> +        )
> +        self.assertEqual(t.importance, BugTaskImportance.LOW)
> +        self.assertEqual(t.status, BugTaskStatus.INVALID)
> +        self.assertEqual(t.status_explanation, "not affected")
> +
> +        t = bug_tasks_by_target.pop(
> +            (None, current_series, dsp2.sourcepackagename)
> +        )
> +        self.assertEqual(t.importance, BugTaskImportance.MEDIUM)
> +        self.assertEqual(t.status, BugTaskStatus.WONTFIX)
> +        self.assertEqual(t.status_explanation, "ignored")
> +
> +        t = bug_tasks_by_target.pop(
> +            (None, devel_series, dsp2.sourcepackagename)
> +        )
> +        self.assertEqual(t.importance, BugTaskImportance.MEDIUM)
> +        self.assertEqual(t.status, BugTaskStatus.UNKNOWN)
> +        self.assertEqual(t.status_explanation, "needs triage")
> +
> +        self.assertEqual(bug.cves, [lp_cve])
> +
> +        self.assertEqual(len(vulnerabilities), 1)
> +
> +        vulnerability = vulnerabilities[0]
> +        self.assertEqual(vulnerability.distribution, ubuntu)
> +        self.assertEqual(vulnerability.creator, owner)
> +        self.assertEqual(vulnerability.cve, lp_cve)
> +        self.assertEqual(
> +            vulnerability.status, VulnerabilityStatus.NEEDS_TRIAGE
> +        )
> +        self.assertEqual(vulnerability.description, "description")
> +        self.assertEqual(vulnerability.notes, "author> text")
> +        self.assertEqual(vulnerability.mitigation, "mitigation")
> +        self.assertEqual(vulnerability.importance, BugTaskImportance.MEDIUM)
> +        self.assertEqual(
> +            vulnerability.information_type, InformationType.PRIVATESECURITY
> +        )
> +        self.assertEqual(vulnerability.date_made_public, now)
> +        self.assertEqual(vulnerability.bugs, [bug])
> diff --git a/lib/lp/bugs/scripts/uctimport.py b/lib/lp/bugs/scripts/uctimport.py
> new file mode 100644
> index 0000000..09f8421
> --- /dev/null
> +++ b/lib/lp/bugs/scripts/uctimport.py
> @@ -0,0 +1,2246 @@
> +# Copyright 2022 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +"""A UCT (Ubuntu CVE Tracker) bug importer
> +
> +This code can import CVE summaries stored in UCT repository to bugs in
> +Launchpad.
> +
> +For each entry in UCT we:
> +
> +1. Create a Bug instance
> +2. Create a Vulnerability instance and link it to the bug (multiple
> +    Vulnerabilities may be created if the CVE entry covers multiple
> +    distributions)
> +3. Create a Bug Task for each package/distro-series in the CVE entry
> +4. Update the statuses of Bug Tasks based on the information in the CVE entry
> +"""
> +import codecs
> +from datetime import datetime
> +from enum import Enum
> +import glob
> +import logging
> +import math
> +import os
> +from pathlib import Path
> +import re
> +import sys
> +from typing import (
> +    Any,
> +    Dict,
> +    List,
> +    NamedTuple,
> +    Optional,
> +    Set,
> +    Tuple,
> +    )
> +
> +import dateutil.parser
> +import yaml
> +from zope.component import getUtility
> +
> +from lp.app.enums import InformationType
> +from lp.app.interfaces.launchpad import ILaunchpadCelebrities
> +from lp.bugs.enums import VulnerabilityStatus
> +from lp.bugs.interfaces.bug import (
> +    CreateBugParams,
> +    IBugSet,
> +    )
> +from lp.bugs.interfaces.bugtask import (
> +    BugTaskImportance,
> +    BugTaskStatus,
> +    IBugTaskSet,
> +    )
> +from lp.bugs.interfaces.cve import ICveSet
> +from lp.bugs.interfaces.vulnerability import IVulnerabilitySet
> +from lp.bugs.model.bug import Bug as BugModel
> +from lp.bugs.model.cve import Cve as CveModel
> +from lp.bugs.model.vulnerability import Vulnerability
> +from lp.registry.interfaces.distroseries import IDistroSeriesSet
> +from lp.registry.interfaces.series import SeriesStatus
> +from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
> +from lp.registry.model.distribution import Distribution
> +from lp.registry.model.distributionsourcepackage import (
> +    DistributionSourcePackage,
> +    )
> +from lp.registry.model.distroseries import DistroSeries
> +from lp.registry.model.sourcepackagename import SourcePackageName
> +from lp.services.messages.interfaces.message import IMessageSet
> +
> +
> +DEFAULT_LOGGER = logging.getLogger("lp.bugs.scripts.import")
> +
> +
> +class Priority(Enum):
> +    CRITICAL = "critical"
> +    HIGH = "high"
> +    MEDIUM = "medium"
> +    LOW = "low"
> +    UNTRIAGED = "untriaged"
> +    NEGLIGIBLE = "negligible"
> +
> +
> +class PackageStatus(Enum):
> +    IGNORED = "ignored"
> +    NEEDS_TRIAGE = "needs-triage"
> +    DOES_NOT_EXIST = "DNE"
> +    RELEASED = "released"
> +    NOT_AFFECTED = "not-affected"
> +    DEFERRED = "deferred"
> +    NEEDED = "needed"
> +    PENDING = "pending"
> +
> +
> +DistroSeriesPackageStatus = NamedTuple(
> +    "DistroSeriesPackageStatus",
> +    [
> +        ("distroseries", str),
> +        ("status", PackageStatus),
> +        ("reason", str),
> +        ("priority", Optional[Priority]),
> +    ],
> +)
> +
> +
> +Patch = NamedTuple(
> +    "Patch",
> +    [
> +        ("patch_type", str),
> +        ("entry", str),
> +    ],
> +)
> +
> +
> +Package = NamedTuple(
> +    "Package",
> +    [
> +        ("name", str),
> +        ("statuses", List[DistroSeriesPackageStatus]),
> +        ("priority", Optional[Priority]),
> +        ("tags", Set[str]),
> +        ("patches", List[Patch]),
> +    ],
> +)
> +
> +Note = NamedTuple(
> +    "Note",
> +    [
> +        ("author", str),
> +        ("text", str),
> +    ],
> +)
> +
> +
> +CVE = NamedTuple(
> +    "CVE",
> +    [
> +        ("assigned_to", str),
> +        ("bugs", List[str]),
> +        ("cvss", List[Dict[str, Any]]),
> +        ("candidate", str),
> +        ("date_made_public", Optional[datetime]),
> +        ("description", str),
> +        ("discovered_by", str),
> +        ("mitigation", str),
> +        ("notes", List[Note]),
> +        ("priority", Priority),
> +        ("references", List[str]),
> +        ("ubuntu_description", str),
> +        ("packages", List[Package]),
> +    ],
> +)
> +
> +PRIORITY_MAP = {
> +    Priority.CRITICAL: BugTaskImportance.CRITICAL,
> +    Priority.HIGH: BugTaskImportance.HIGH,
> +    Priority.MEDIUM: BugTaskImportance.MEDIUM,
> +    Priority.LOW: BugTaskImportance.LOW,
> +    Priority.UNTRIAGED: BugTaskImportance.UNDECIDED,  # TODO: confirm this
> +    Priority.NEGLIGIBLE: BugTaskImportance.WISHLIST,  # TODO: confirm this

Those seem correct.

> +}
> +
> +STATUS_MAP = {
> +    PackageStatus.IGNORED: BugTaskStatus.WONTFIX,
> +    PackageStatus.NEEDS_TRIAGE: BugTaskStatus.UNKNOWN,
> +    PackageStatus.DOES_NOT_EXIST: BugTaskStatus.DOESNOTEXIST,
> +    PackageStatus.RELEASED: BugTaskStatus.FIXRELEASED,
> +    PackageStatus.NOT_AFFECTED: BugTaskStatus.INVALID,
> +    # PackageStatus.DEFERRED: ...,  # TODO: fix this

This will need to stay commented out for now, since we haven't yet added a suitable `BugTaskStatus`.  I'd suggest that this import script should be prepared to explicitly skip (and log) things it doesn't know how to handle, for now; that's better than trying to add support for everything in the first pass.

> +    PackageStatus.NEEDED: BugTaskStatus.NEW,
> +    PackageStatus.PENDING: BugTaskStatus.FIXCOMMITTED,
> +}
> +
> +
> +def format_cve_notes(notes: List[Note]) -> str:
> +    return "\n".join(
> +        "{author}> {text}".format(author=note.author, text=note.text)
> +        for note in notes
> +    )
> +
> +
> +class UCTImporter:
> +    def __init__(self, logger: Optional[logging.Logger] = None) -> None:
> +        self.logger = logger or DEFAULT_LOGGER
> +
> +    def import_cve_from_file(self, cve_path: Path) -> None:
> +        cve = self.load_cve_from_file(cve_path)
> +        self.import_cve(cve)
> +
> +    @staticmethod
> +    def load_cve_from_file(cve_path: Path) -> CVE:
> +        """
> +        Load a `CVE` instance from data contained in `cve_path`.
> +
> +        The file is parsed to a dictionary using the code copied from
> +        `cve_lib` in `ubuntu-cve-tracker`.
> +
> +        A `CVE` instance is created from that dictionary, applying some data
> +        transformations along the way.
> +        """
> +
> +        cve_data = load_cve(str(cve_path))  # type: Dict[str, Any]
> +
> +        packages = []  # type: List[Package]
> +        tags = cve_data.pop("tags")  # type: Dict[str, Set[str]]
> +        patches = cve_data.pop(
> +            "patches"
> +        )  # type: Dict[str, List[Tuple[str, str]]]
> +        for package, statuses_dict in cve_data.pop("pkgs").items():
> +            statuses = []  # type: List[DistroSeriesPackageStatus]
> +            for distroseries, (status, reason) in statuses_dict.items():
> +                distroseries_priority = cve_data.pop(
> +                    "Priority_{package}_{distroseries}".format(
> +                        package=package,
> +                        distroseries=distroseries,
> +                    ),
> +                    None,
> +                )
> +                statuses.append(
> +                    DistroSeriesPackageStatus(
> +                        distroseries=distroseries,
> +                        status=PackageStatus(status),
> +                        reason=reason,
> +                        priority=(
> +                            Priority(distroseries_priority)
> +                            if distroseries_priority
> +                            else None
> +                        ),
> +                    )
> +                )
> +            package_priority = cve_data.pop(
> +                "Priority_{package}".format(package=package), None
> +            )
> +            packages.append(
> +                Package(
> +                    name=package,
> +                    statuses=statuses,
> +                    priority=(
> +                        Priority(package_priority)
> +                        if package_priority
> +                        else None
> +                    ),
> +                    tags=tags.pop(package, set()),
> +                    patches=[
> +                        Patch(patch_type=patch_type, entry=entry)
> +                        for patch_type, entry in patches.pop(package, [])
> +                    ],
> +                )
> +            )
> +
> +        crd = cve_data.pop("CRD", None)
> +        if crd == "unknown":
> +            crd = None
> +        public_date = cve_data.pop("PublicDate", None)
> +        if public_date == "unknown":
> +            public_date = None
> +        public_date_at_USN = cve_data.pop("PublicDateAtUSN", None)
> +        if public_date_at_USN == "unknown":
> +            public_date_at_USN = None
> +
> +        date_made_public = crd or public_date or public_date_at_USN
> +
> +        cve = CVE(
> +            assigned_to=cve_data.pop("Assigned-to").strip(),
> +            bugs=cve_data.pop("Bugs").strip().split("\n"),
> +            cvss=cve_data.pop("CVSS"),
> +            candidate=cve_data.pop("Candidate").strip(),
> +            date_made_public=dateutil.parser.parse(date_made_public)
> +            if date_made_public
> +            else None,
> +            description=cve_data.pop("Description").strip(),
> +            discovered_by=cve_data.pop("Discovered-by").strip(),
> +            mitigation=cve_data.pop("Mitigation", "").strip(),
> +            notes=[
> +                Note(author=author, text=text)
> +                for author, text in cve_data.pop("Notes")
> +            ],
> +            priority=Priority(cve_data.pop("Priority")),
> +            references=cve_data.pop("References").strip().split("\n"),
> +            ubuntu_description=cve_data.pop("Ubuntu-Description").strip(),
> +            packages=packages,
> +        )
> +
> +        # make sure all fields are consumed
> +        if cve_data:
> +            raise AssertionError(
> +                "not all fields are consumed: {}".format(cve_data)
> +            )
> +
> +        return cve
> +
> +    def import_cve(self, cve: CVE) -> None:
> +        lp_cve: CveModel = getUtility(ICveSet)[cve.candidate]
> +        if lp_cve is None:
> +            self.logger.warning(
> +                "Could not find the CVE in LP: %s", cve.candidate
> +            )
> +            return
> +        self.create_bug(cve, lp_cve)
> +
> +    def create_bug(
> +        self, cve: CVE, lp_cve: CveModel
> +    ) -> Tuple[Optional[BugModel], List[Vulnerability]]:
> +
> +        self.logger.debug("creating bug...")
> +
> +        affected_packages = []  # type: List[DistributionSourcePackage]
> +        affected_distro_series = []  # type: List[DistroSeries]
> +        affected_distributions = set()  # type: Set[Distribution]
> +        importances = {}
> +        statuses_with_explanations = {}
> +
> +        for cve_package in cve.packages:
> +            source_package_name = self.get_source_package_name(
> +                cve_package.name
> +            )
> +            if not source_package_name:
> +                continue
> +
> +            package_priority = cve_package.priority or cve.priority
> +            importances[source_package_name] = (
> +                PRIORITY_MAP[package_priority] if package_priority else None
> +            )
> +
> +            for cve_package_status in cve_package.statuses:
> +                distro_series = self.get_distro_series(
> +                    cve_package_status.distroseries
> +                )
> +                if not distro_series:
> +                    continue
> +
> +                if distro_series not in affected_distro_series:
> +                    affected_distro_series.append(distro_series)
> +
> +                affected_distributions.add(distro_series.distribution)
> +
> +                distro_package = DistributionSourcePackage(
> +                    distribution=distro_series.distribution,
> +                    sourcepackagename=source_package_name,
> +                )
> +                if distro_package not in affected_packages:
> +                    affected_packages.append(distro_package)
> +
> +                distro_series_package_priority = (
> +                    cve_package_status.priority or package_priority
> +                )
> +                key = (source_package_name, distro_series)
> +                importances[key] = (
> +                    PRIORITY_MAP[distro_series_package_priority]
> +                    if distro_series_package_priority
> +                    else None
> +                )
> +                statuses_with_explanations[key] = (
> +                    STATUS_MAP[cve_package_status.status],
> +                    cve_package_status.reason,
> +                )
> +
> +        if not affected_packages:
> +            self.logger.warning("Could not find any affected packages")
> +            return None, []
> +
> +        distro_package = affected_packages.pop(0)
> +        affected_distributions = {distro_package.distribution}
> +
> +        # Create the bug
> +        # TODO: confirm this. Should this be something like `team_security`?
> +        owner = getUtility(ILaunchpadCelebrities).bug_importer

Good question.  It'll show up as the reporter of the bug, but I think it's probably better for that to explicitly indicate that the bug was imported, so I'm fine with using the `bug-importer` celebrity here.

> +
> +        # TODO: is this correct?
> +        if cve.date_made_public:
> +            information_type = InformationType.PRIVATESECURITY
> +        else:
> +            information_type = InformationType.EMBARGOED

This feels like a dubious inference.  Anything in the lp:ubuntu-cve-tracker repository can't be all that embargoed by definition - for embargoed CVEs we wouldn't even be allowed to commit information about them to the public tracker repository, and entries would have to go in a private tracker repository instead.

> +
> +        bug = getUtility(IBugSet).createBug(
> +            CreateBugParams(
> +                description=cve.ubuntu_description,
> +                title=cve.candidate,
> +                information_type=information_type,
> +                owner=owner,
> +                msg=getUtility(IMessageSet).fromText(
> +                    "", cve.description, owner=owner
> +                ),
> +                target=distro_package,
> +                importance=importances[distro_package.sourcepackagename],
> +            )
> +        )  # type: BugModel
> +
> +        # Add links to external bug trackers
> +        for external_bug_url in cve.bugs:
> +            bug.newMessage(owner=owner, content=external_bug_url)
> +
> +        # Add references
> +        for reference in cve.references:
> +            bug.newMessage(owner=owner, content=reference)
> +
> +        # TODO: shall we store discovered_by?

This is supposed to be `Cve.discoverer`, but there may be a difficulty there since the `Cve` table should only be managed by syncing data from MITRE and not from ubuntu-cve-tracker, and I don't actually know whether MITRE has enough information for us to store this.  Skip this for now and leave a comment.

> +        # TODO: shall we store assigned_to? (this looks like a LP username)

`assigned_to` should end up as the bug task assignee.

> +        # TODO: shall we store cvss?

`Cve.cvss`, but may have a similar issue to `Cve.discoverer` as above.  Skip for now.

> +
> +        self.logger.info("Created bug with ID: %s", bug.id)
> +
> +        # Create bug tasks for distribution packages
> +        bug_task_set = getUtility(IBugTaskSet)
> +        for distro_package in affected_packages:
> +            bug_task_set.createTask(
> +                bug,
> +                owner,
> +                distro_package,
> +                importance=importances[distro_package.sourcepackagename],
> +            )
> +
> +        # Create bug tasks for distro series by adding nominations
> +        # This may create some extra bug tasks which we will delete later
> +        for distro_series in affected_distro_series:
> +            nomination = bug.addNomination(owner, distro_series)
> +            nomination.approve(owner)
> +
> +        # Set importance and status on distro series bug tasks
> +        # If the bug task's package/series isn't listed in the
> +        # CVE entry - delete it
> +        for bug_task in bug.bugtasks:
> +            distro_series = bug_task.distroseries
> +            if not distro_series:
> +                continue
> +            source_package_name = bug_task.sourcepackagename
> +            key = (source_package_name, distro_series)
> +            if key not in importances:
> +                # This combination of package/series is not present in the CVE
> +                # Delete it
> +                bug_task.delete(owner)
> +                continue
> +            bug_task.importance = importances[key]
> +            status, status_explanation = statuses_with_explanations[key]
> +            bug_task.transitionToStatus(status, owner)
> +            bug_task.status_explanation = status_explanation
> +
> +        # Link the bug to CVE
> +        bug.linkCVE(lp_cve, owner)
> +
> +        # Create the Vulnerabilities
> +        vulnerabilities = []
> +        for distribution in affected_distributions:
> +            vulnerabilities.append(
> +                self.create_vulnerability(bug, cve, lp_cve, distribution)
> +            )
> +
> +        return bug, vulnerabilities
> +
> +    def get_source_package_name(
> +        self, package_name: str
> +    ) -> Optional[SourcePackageName]:
> +        spn = getUtility(ISourcePackageNameSet).queryByName(package_name)
> +        if not spn:
> +            self.logger.warning("Could not find package: %s", package_name)
> +        return spn
> +
> +    def get_devel_series(
> +        self, distribution: Distribution
> +    ) -> Optional[DistroSeries]:
> +        for series in distribution.series:
> +            if series.status == SeriesStatus.FROZEN:
> +                return series
> +        for series in distribution.series:
> +            if series.status == SeriesStatus.DEVELOPMENT:
> +                return series
> +
> +    def get_distro_series(
> +        self, distro_series_name: str
> +    ) -> Optional[DistroSeries]:
> +        if "/" in distro_series_name:
> +            series_name, distro_name = distro_series_name.split("/", 1)
> +            if distro_name == "esm":
> +                # TODO: ESM needs special handling
> +                pass
> +            return
> +        else:
> +            series_name = distro_series_name
> +            distribution = getUtility(ILaunchpadCelebrities).ubuntu
> +            if series_name == "devel":
> +                distro_series = self.get_devel_series(distribution)
> +            else:
> +                distro_series = getUtility(IDistroSeriesSet).queryByName(
> +                    distribution, series_name
> +                )
> +        if not distro_series:
> +            self.logger.warning(
> +                "Could not find the distro series: %s", distro_series_name
> +            )
> +        return distro_series
> +
> +    def create_vulnerability(
> +        self,
> +        bug: BugModel,
> +        cve: CVE,
> +        lp_cve: CveModel,
> +        distribution: Distribution,
> +    ) -> Vulnerability:
> +        # TODO: is this correct?
> +        if cve.date_made_public:
> +            information_type = InformationType.PRIVATESECURITY
> +        else:
> +            information_type = InformationType.EMBARGOED

See above.

> +
> +        vulnerability = getUtility(IVulnerabilitySet).new(
> +            distribution=distribution,
> +            creator=bug.owner,
> +            cve=lp_cve,
> +            status=VulnerabilityStatus.NEEDS_TRIAGE,
> +            description=cve.description,
> +            notes=format_cve_notes(cve.notes),
> +            mitigation=cve.mitigation,
> +            importance=PRIORITY_MAP[cve.priority],
> +            information_type=information_type,
> +            date_made_public=cve.date_made_public,
> +        )  # type: Vulnerability
> +
> +        # TODO: is this correct?
> +        vulnerability.linkBug(bug, bug.owner)

Looks right.

> +
> +        self.logger.info("Create vulnerability with ID: %s", vulnerability)
> +
> +        return vulnerability
> +
> +
> +############################################################################
> +# The code below is copied from `cve_lib` module from `ubuntu-cve-tracker` #
> +############################################################################
> +
> +
> +def set_cve_dir(path):
> +    """Return a path with CVEs in it. Specifically:
> +    - if 'path' has CVEs in it, return path
> +    - if 'path' is a relative directory with no CVEs, see if UCT is defined
> +      and if so, see if 'UCT/path' has CVEs in it and return path
> +    """
> +    p = path
> +    found = False
> +    if len(glob.glob("%s/CVE-*" % path)) > 0:
> +        found = True
> +    elif not path.startswith("/") and "UCT" in os.environ:
> +        tmp = os.path.join(os.environ["UCT"], path)
> +        if len(glob.glob("%s/CVE-*" % tmp)) > 0:
> +            found = True
> +            p = tmp
> +            # print("INFO: using '%s'" % p, file=sys.stderr)
> +
> +    if not found:
> +        print(
> +            "WARN: could not find CVEs in '%s' (or relative to UCT)" % path,
> +            file=sys.stderr,
> +        )
> +    return p
> +
> +
> +if "UCT" in os.environ:
> +    active_dir = set_cve_dir(os.environ["UCT"] + "/active")
> +    retired_dir = set_cve_dir(os.environ["UCT"] + "/retired")
> +    ignored_dir = set_cve_dir(os.environ["UCT"] + "/ignored")
> +    embargoed_dir = os.environ["UCT"] + "/embargoed"
> +    meta_dir = os.path.join(os.environ["UCT"], "meta_lists")
> +    subprojects_dir = os.environ["UCT"] + "/subprojects"
> +else:
> +    active_dir = set_cve_dir("active")
> +    retired_dir = set_cve_dir("retired")
> +    ignored_dir = set_cve_dir("ignored")
> +    embargoed_dir = "embargoed"  # Intentionally not using set_cve_dir()
> +    meta_dir = os.path.join(
> +        os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
> +        "meta_lists",
> +    )
> +    subprojects_dir = "subprojects"
> +
> +PRODUCT_UBUNTU = "ubuntu"
> +
> +# common to all scripts
> +# these get populated by the contents of subprojects defined below
> +all_releases = []
> +eol_releases = []
> +external_releases = []
> +releases = []
> +devel_release = ""
> +
> +# known subprojects which are supported by cve_lib - in general each
> +# subproject is defined by the combination of a product and series as
> +# <product/series>.
> +#
> +# For each subproject, it is either internal (ie is part of this static
> +# dict) or external (found dynamically at runtime by
> +# load_external_subprojects()).
> +#
> +# eol specifies whether the subproject is now end-of-life.  packages
> +# specifies list of files containing the names of supported packages for the
> +# subproject. alias defines an alternate preferred name for the subproject
> +# (this is often used to support historical names for projects etc).
> +subprojects = {
> +    "stable-phone-overlay/vivid": {
> +        "eol": True,
> +        "packages": ["vivid-stable-phone-overlay-supported.txt"],
> +        "name": "Ubuntu Touch 15.04",
> +        "alias": "vivid/stable-phone-overlay",
> +    },
> +    "ubuntu-core/vivid": {
> +        "eol": True,
> +        "packages": ["vivid-ubuntu-core-supported.txt"],
> +        "name": "Ubuntu Core 15.04",
> +        "alias": "vivid/ubuntu-core",
> +    },
> +    "esm/precise": {
> +        "eol": True,
> +        "packages": ["precise-esm-supported.txt"],
> +        "name": "Ubuntu 12.04 ESM",
> +        "codename": "Precise Pangolin",
> +        "alias": "precise/esm",
> +        "ppa": "ubuntu-esm/esm",
> +        "parent": "ubuntu/precise",
> +        "description": (
> +            "Available with UA Infra or UA Desktop: "
> +            "https://ubuntu.com/advantage"
> +        ),
> +        "stamp": 1493521200,
> +    },
> +    "esm/trusty": {
> +        "eol": False,
> +        "packages": ["trusty-esm-supported.txt"],
> +        "name": "Ubuntu 14.04 ESM",
> +        "codename": "Trusty Tahr",
> +        "alias": "trusty/esm",
> +        "ppa": "ubuntu-esm/esm-infra-security",
> +        "parent": "ubuntu/trusty",
> +        "description": (
> +            "Available with UA Infra or UA Desktop: "
> +            "https://ubuntu.com/advantage"
> +        ),
> +        "stamp": 1556593200,
> +    },
> +    "esm-infra/xenial": {
> +        "eol": False,
> +        "components": ["main", "restricted"],
> +        "packages": ["esm-infra-xenial-supported.txt"],
> +        "name": "Ubuntu 16.04 ESM",
> +        "codename": "Xenial Xerus",
> +        "ppa": "ubuntu-esm/esm-infra-security",
> +        "parent": "ubuntu/xenial",
> +        "description": (
> +            "Available with UA Infra or UA Desktop: "
> +            "https://ubuntu.com/advantage"
> +        ),
> +        "stamp": 1618963200,
> +    },
> +    "fips/xenial": {
> +        "eol": False,
> +        "packages": ["fips-xenial-supported.txt"],
> +        "name": "Ubuntu 16.04 FIPS Certified",
> +        "codename": "Xenial Xerus",
> +        "ppa": "ubuntu-advantage/fips",
> +        "parent": "ubuntu/xenial",
> +        "description": "Available with UA ... https://ubuntu.com/advantage",
> +    },
> +    "fips/bionic": {
> +        "eol": False,
> +        "packages": ["fips-bionic-supported.txt"],
> +        "name": "Ubuntu 18.04 FIPS Certified",
> +        "codename": "Bionic Beaver",
> +        "ppa": "ubuntu-advantage/fips",
> +        "parent": "ubuntu/bionic",
> +        "description": "Available with UA ... https://ubuntu.com/advantage",
> +    },
> +    "fips/focal": {
> +        "eol": False,
> +        "packages": ["fips-focal-supported.txt"],
> +        "name": "Ubuntu 20.04 FIPS Certified",
> +        "codename": "Focal Fossa",
> +        "ppa": "ubuntu-advantage/fips",
> +        "parent": "ubuntu/bionic",
> +        "description": "Available with UA ... https://ubuntu.com/advantage",
> +    },
> +    "fips-updates/xenial": {
> +        "eol": False,
> +        "packages": ["fips-updates-xenial-supported.txt"],
> +        "name": "Ubuntu 16.04 FIPS Compliant",
> +        "codename": "Xenial Xerus",
> +        "ppa": "ubuntu-advantage/fips-updates",
> +        "parent": "ubuntu/xenial",
> +        "description": "Available with UA ... https://ubuntu.com/advantage",
> +    },
> +    "fips-updates/bionic": {
> +        "eol": False,
> +        "packages": ["fips-updates-bionic-supported.txt"],
> +        "name": "Ubuntu 18.04 FIPS Compliant",
> +        "codename": "Bionic Beaver",
> +        "ppa": "ubuntu-advantage/fips-updates",
> +        "parent": "ubuntu/bionic",
> +        "description": "Available with UA ... https://ubuntu.com/advantage",
> +    },
> +    "fips-updates/focal": {
> +        "eol": False,
> +        "packages": ["fips-updates-focal-supported.txt"],
> +        "name": "Ubuntu 20.04 FIPS Compliant",
> +        "codename": "Focal Fossa",
> +        "ppa": "ubuntu-advantage/fips-updates",
> +        "parent": "ubuntu/bionic",
> +        "description": "Available with UA ... https://ubuntu.com/advantage",
> +    },
> +    "ubuntu/warty": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 4.10",
> +        "codename": "Warty Warthog",
> +        "alias": "warty",
> +        "description": "Interim Release",
> +        "stamp": 1098748800,
> +    },
> +    "ubuntu/hoary": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 5.04",
> +        "codename": "Hoary Hedgehog",
> +        "alias": "hoary",
> +        "description": "Interim Release",
> +        "stamp": 1112918400,
> +    },
> +    "ubuntu/breezy": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 5.10",
> +        "codename": "Breezy Badger",
> +        "alias": "breezy",
> +        "description": "Interim Release",
> +        "stamp": 1129075200,
> +    },
> +    "ubuntu/dapper": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 6.06 LTS",
> +        "codename": "Dapper Drake",
> +        "alias": "dapper",
> +        "description": "Long Term Support",
> +        "stamp": 1149120000,
> +    },
> +    "ubuntu/edgy": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 6.10",
> +        "codename": "Edgy Eft",
> +        "alias": "edgy",
> +        "description": "Interim Release",
> +        "stamp": 1161864000,
> +    },
> +    "ubuntu/feisty": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 7.04",
> +        "codename": "Feisty Fawn",
> +        "alias": "feisty",
> +        "description": "Interim Release",
> +        "stamp": 1176984000,
> +    },
> +    "ubuntu/gutsy": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 7.10",
> +        "codename": "Gutsy Gibbon",
> +        "alias": "gutsy",
> +        "description": "Interim Release",
> +        "stamp": 1192708800,
> +    },
> +    "ubuntu/hardy": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 8.04 LTS",
> +        "codename": "Hardy Heron",
> +        "alias": "hardy",
> +        "description": "Long Term Support",
> +        "stamp": 1209038400,
> +    },
> +    "ubuntu/intrepid": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 8.10",
> +        "codename": "Intrepid Ibex",
> +        "alias": "intrepid",
> +        "description": "Interim Release",
> +        "stamp": 1225368000,
> +    },
> +    "ubuntu/jaunty": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 9.04",
> +        "codename": "Jaunty Jackalope",
> +        "alias": "jaunty",
> +        "description": "Interim Release",
> +        "stamp": 1240488000,
> +    },
> +    "ubuntu/karmic": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 9.10",
> +        "codename": "Karmic Koala",
> +        "alias": "karmic",
> +        "description": "Interim Release",
> +        "stamp": 1256817600,
> +    },
> +    "ubuntu/lucid": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 10.04 LTS",
> +        "codename": "Lucid Lynx",
> +        "alias": "lucid",
> +        "description": "Long Term Support",
> +        "stamp": 1272565800,
> +    },
> +    "ubuntu/maverick": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 10.10",
> +        "codename": "Maverick Meerkat",
> +        "alias": "maverick",
> +        "description": "Interim Release",
> +        "stamp": 1286706600,
> +    },
> +    "ubuntu/natty": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 11.04",
> +        "codename": "Natty Narwhal",
> +        "alias": "natty",
> +        "description": "Interim Release",
> +        "stamp": 1303822800,
> +    },
> +    "ubuntu/oneiric": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 11.10",
> +        "codename": "Oneiric Ocelot",
> +        "alias": "oneiric",
> +        "description": "Interim Release",
> +        "stamp": 1318446000,
> +    },
> +    "ubuntu/precise": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 12.04 LTS",
> +        "codename": "Precise Pangolin",
> +        "alias": "precise",
> +        "description": "Long Term Support",
> +        "stamp": 1335423600,
> +    },
> +    "ubuntu/quantal": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 12.10",
> +        "codename": "Quantal Quetzal",
> +        "alias": "quantal",
> +        "description": "Interim Release",
> +        "stamp": 1350547200,
> +    },
> +    "ubuntu/raring": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 13.04",
> +        "codename": "Raring Ringtail",
> +        "alias": "raring",
> +        "description": "Interim Release",
> +        "stamp": 1366891200,
> +    },
> +    "ubuntu/saucy": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 13.10",
> +        "codename": "Saucy Salamander",
> +        "alias": "saucy",
> +        "description": "Interim Release",
> +        "stamp": 1381993200,
> +    },
> +    "ubuntu/trusty": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 14.04 LTS",
> +        "codename": "Trusty Tahr",
> +        "alias": "trusty",
> +        "description": "Long Term Support",
> +        "stamp": 1397826000,
> +    },
> +    "ubuntu/utopic": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 14.10",
> +        "codename": "Utopic Unicorn",
> +        "alias": "utopic",
> +        "description": "Interim Release",
> +        "stamp": 1414083600,
> +    },
> +    "ubuntu/vivid": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 15.04",
> +        "codename": "Vivid Vervet",
> +        "alias": "vivid",
> +        "description": "Interim Release",
> +        "stamp": 1429027200,
> +    },
> +    "ubuntu/wily": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 15.10",
> +        "codename": "Wily Werewolf",
> +        "alias": "wily",
> +        "description": "Interim Release",
> +        "stamp": 1445518800,
> +    },
> +    "ubuntu/xenial": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 16.04 LTS",
> +        "codename": "Xenial Xerus",
> +        "alias": "xenial",
> +        "description": "Long Term Support",
> +        "stamp": 1461279600,
> +    },
> +    "ubuntu/yakkety": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 16.10",
> +        "codename": "Yakkety Yak",
> +        "alias": "yakkety",
> +        "description": "Interim Release",
> +        "stamp": 1476518400,
> +    },
> +    "ubuntu/zesty": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 17.04",
> +        "codename": "Zesty Zapus",
> +        "alias": "zesty",
> +        "description": "Interim Release",
> +        "stamp": 1492153200,
> +    },
> +    "ubuntu/artful": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 17.10",
> +        "codename": "Artful Aardvark",
> +        "alias": "artful",
> +        "description": "Interim Release",
> +        "stamp": 1508418000,
> +    },
> +    "ubuntu/bionic": {
> +        "eol": False,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 18.04 LTS",
> +        "codename": "Bionic Beaver",
> +        "alias": "bionic",
> +        "description": "Long Term Support",
> +        "stamp": 1524870000,
> +    },
> +    "ubuntu/cosmic": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 18.10",
> +        "codename": "Cosmic Cuttlefish",
> +        "alias": "cosmic",
> +        "description": "Interim Release",
> +        "stamp": 1540040400,
> +    },
> +    "ubuntu/disco": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 19.04",
> +        "codename": "Disco Dingo",
> +        "alias": "disco",
> +        "description": "Interim Release",
> +        "stamp": 1555581600,
> +    },
> +    "ubuntu/eoan": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 19.10",
> +        "codename": "Eoan Ermine",
> +        "alias": "eoan",
> +        "description": "Interim Release",
> +        "stamp": 1571234400,
> +    },
> +    "ubuntu/focal": {
> +        "eol": False,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 20.04 LTS",
> +        "codename": "Focal Fossa",
> +        "alias": "focal",
> +        "description": "Long Term Support",
> +        "stamp": 1587567600,
> +    },
> +    "ubuntu/groovy": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 20.10",
> +        "codename": "Groovy Gorilla",
> +        "alias": "groovy",
> +        "description": "Interim Release",
> +        "stamp": 1603288800,
> +    },
> +    "ubuntu/hirsute": {
> +        "eol": True,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 21.04",
> +        "codename": "Hirsute Hippo",
> +        "alias": "hirsute",
> +        "description": "Interim Release",
> +        "stamp": 1619049600,
> +    },
> +    "ubuntu/impish": {
> +        "eol": False,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 21.10",
> +        "codename": "Impish Indri",
> +        "alias": "impish",
> +        "description": "Interim Release",
> +        "stamp": 1634220000,
> +    },
> +    "ubuntu/jammy": {
> +        "eol": False,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 22.04 LTS",
> +        "codename": "Jammy Jellyfish",
> +        "alias": "jammy",
> +        "description": "Long Term Support",
> +        "stamp": 1650693600,
> +    },
> +    "ubuntu/kinetic": {
> +        "eol": False,
> +        "components": [
> +            "main",
> +            "restricted",
> +            "universe",
> +            "multiverse",
> +            "partner",
> +        ],
> +        "name": "Ubuntu 22.10",
> +        "codename": "Kinetic Kudu",
> +        "alias": "kinetic",
> +        "devel": True,  # there can be only one ⚔
> +        "description": "Interim Release",
> +    },
> +    "snap": {
> +        "eol": False,
> +        "packages": ["snap-supported.txt"],
> +    },
> +}
> +
> +
> +def product_series(rel):
> +    """Return the product,series tuple for rel."""
> +    series = ""
> +    parts = rel.split("/", 1)
> +    product = parts[0]
> +    if len(parts) == 2:
> +        series = parts[1]
> +    return product, series
> +
> +
> +# get the subproject details for rel along with
> +# it's canonical name, product and series
> +def get_subproject_details(rel):
> +    """Return the product,series,details tuple for rel."""
> +    canon, product, series, details = None, None, None, None
> +    try:
> +        details = subprojects[rel]
> +        product, series = product_series(rel)
> +        canon = product + "/" + series
> +    except (ValueError, KeyError):
> +        # look for alias
> +        for r in subprojects:
> +            try:
> +                if subprojects[r]["alias"] == rel:
> +                    product, series = product_series(r)
> +                    details = subprojects[r]
> +                    canon = product + "/" + series
> +                    break
> +            except KeyError:
> +                pass
> +            if details is not None:
> +                break
> +    return canon, product, series, details
> +
> +
> +def release_alias(rel):
> +    """Return the alias for rel or just rel if no alias is defined."""
> +    alias = rel
> +    _, _, _, details = get_subproject_details(rel)
> +    try:
> +        alias = details["alias"]
> +    except (KeyError, TypeError):
> +        pass
> +    return alias
> +
> +
> +def release_parent(rel):
> +    """Return the parent for rel or None if no parent is defined."""
> +    parent = None
> +    _, _, _, details = get_subproject_details(rel)
> +    try:
> +        parent = release_alias(details["parent"])
> +    except (KeyError, TypeError):
> +        pass
> +    return parent
> +
> +
> +def get_external_subproject_cve_dir(subproject):
> +    """Get the directory where CVE files are stored for the subproject.
> +
> +    Get the directory where CVE files are stored for a subproject. In
> +    general this is within the higher level project directory, not within
> +    the specific subdirectory for the particular series that defines this
> +    subproject.
> +
> +    """
> +    rel, product, _, _ = get_subproject_details(subproject)
> +    if rel not in external_releases:
> +        raise ValueError("%s is not an external subproject" % rel)
> +    # CVEs live in the product dir
> +    return os.path.join(subprojects_dir, product)
> +
> +
> +def get_external_subproject_dir(subproject):
> +    """Get the directory for the given external subproject."""
> +    rel, _, _, _ = get_subproject_details(subproject)
> +    if rel not in external_releases:
> +        raise ValueError("%s is not an external subproject" % rel)
> +    return os.path.join(subprojects_dir, rel)
> +
> +
> +def read_external_subproject_config(subproject):
> +    """Read and return the configuration for the given subproject."""
> +    sp_dir = get_external_subproject_dir(subproject)
> +    config_yaml = os.path.join(sp_dir, "config.yaml")
> +    with open(config_yaml) as cfg:
> +        return yaml.safe_load(cfg)
> +
> +
> +def find_files_recursive(path, name):
> +    """Return a list of all files under path with name."""
> +    matches = []
> +    for root, _, files in os.walk(path, followlinks=True):
> +        for f in files:
> +            if f == name:
> +                filepath = os.path.join(root, f)
> +                matches.append(filepath)
> +    return matches
> +
> +
> +def find_external_subproject_cves(cve):
> +    """
> +    Return the list of external subproject CVE snippets for the given CVE.
> +    """
> +    cves = []
> +    for rel in external_releases:
> +        # fallback to the series specific subdir rather than just the
> +        # top-level project directory even though this is preferred
> +        for d in [
> +            get_external_subproject_cve_dir(rel),
> +            get_external_subproject_dir(rel),
> +        ]:
> +            path = os.path.join(d, cve)
> +            if os.path.exists(path):
> +                cves.append(path)
> +    return cves
> +
> +
> +def load_external_subprojects():
> +    """Search for and load subprojects into the global subprojects dict.
> +
> +    Search for and load subprojects into the global subprojects dict.
> +
> +    A subproject is defined as a directory which resides within
> +    subprojects_dir and contains a supported.txt file. It can also contain
> +    a project.yml file which specifies configuration directives for the
> +    project as well as snippet CVE files. By convention, a subproject is
> +    usually defined as the combination of a product and series, ie:
> +
> +    esm-apps/focal
> +
> +    as such in this case there would expect to be within subprojects_dir a
> +    directory called esm-apps/ and within that a subdirectory called
> +    focal/. Inside this focal/ subdirectory a supported.txt file would list
> +    the packages which are supported by the esm-apps/focal subproject. By
> +    convention, snippet CVE files should reside within the esm-apps/
> +    project directory rather than the esm-apps/focal/ subdirectory to avoid
> +    unnecessary fragmentation across different subproject series.
> +
> +    """
> +    for supported_txt in find_files_recursive(
> +        subprojects_dir, "supported.txt"
> +    ):
> +        # rel name is the path component between subprojects/ and
> +        # /supported.txt
> +        rel = supported_txt[
> +            len(subprojects_dir) + 1:-len("supported.txt") - 1
> +        ]
> +        external_releases.append(rel)
> +        subprojects.setdefault(rel, {"packages": [], "eol": False})
> +        # an external subproject can append to an internal one
> +        subprojects[rel]["packages"].append(supported_txt)
> +        try:
> +            # use config to populate other parts of the
> +            # subproject settings
> +            config = read_external_subproject_config(rel)
> +            subprojects[rel].setdefault("ppa", config["ppa"])
> +            subprojects[rel].setdefault("name", config["name"])
> +            subprojects[rel].setdefault("description", config["description"])
> +            subprojects[rel].setdefault("parent", config["parent"])
> +        except Exception:
> +            pass
> +
> +
> +load_external_subprojects()
> +
> +for release in subprojects:
> +    details = subprojects[release]
> +    rel = release_alias(release)
> +    # prefer the alias name
> +    all_releases.append(rel)
> +    if details["eol"]:
> +        eol_releases.append(rel)
> +    if "devel" in details and details["devel"]:
> +        if devel_release != "":
> +            raise ValueError("there can be only one ⚔ devel")
> +        devel_release = rel
> +    # ubuntu specific releases
> +    product, series = product_series(release)
> +    if product == PRODUCT_UBUNTU:
> +        releases.append(rel)
> +
> +
> +VALID_TAGS = {
> +    "universe-binary": (
> +        "Binaries built from this source package are in universe and so are "
> +        "supported by the community. For more details see "
> +        "https://wiki.ubuntu.com/SecurityTeam/FAQ#Official_Support"
> +    ),
> +    "not-ue": (
> +        "This package is not directly supported by the Ubuntu Security Team"
> +    ),
> +    "apparmor": (
> +        "This vulnerability is mitigated in part by an AppArmor profile. "
> +        "For more details see "
> +        "https://wiki.ubuntu.com/Security/Features#apparmor"
> +    ),
> +    "stack-protector": (
> +        "This vulnerability is mitigated in part by the use of gcc's stack "
> +        "protector in Ubuntu. For more details see "
> +        "https://wiki.ubuntu.com/Security/Features#stack-protector"
> +    ),
> +    "fortify-source": (
> +        "This vulnerability is mitigated in part by the use of "
> +        "-D_FORTIFY_SOURCE=2 in Ubuntu. For more details see "
> +        "https://wiki.ubuntu.com/Security/Features#fortify-source"
> +    ),
> +    "symlink-restriction": (
> +        "This vulnerability is mitigated in part by the use of symlink "
> +        "restrictions in Ubuntu. For more details see "
> +        "https://wiki.ubuntu.com/Security/Features#symlink"
> +    ),
> +    "hardlink-restriction": (
> +        "This vulnerability is mitigated in part by the use of hardlink "
> +        "restrictions in Ubuntu. For more details see "
> +        "https://wiki.ubuntu.com/Security/Features#hardlink"
> +    ),
> +    "heap-protector": (
> +        "This vulnerability is mitigated in part by the use of GNU C Library "
> +        "heap protector in Ubuntu. For more details see "
> +        "https://wiki.ubuntu.com/Security/Features#heap-protector"
> +    ),
> +    "pie": (
> +        "This vulnerability is mitigated in part by the use of Position "
> +        "Independent Executables in Ubuntu. For more details see "
> +        "https://wiki.ubuntu.com/Security/Features#pie"
> +    ),
> +}
> +
> +# Possible CVE priorities
> +PRIORITIES = ["negligible", "low", "medium", "high", "critical"]
> +
> +NOTE_RE = re.compile(r"^\s+([A-Za-z0-9-]+)([>|]) *(.*)$")
> +
> +EXIT_FAIL = 1
> +EXIT_OKAY = 0
> +
> +# New CVE file format for release package field is:
> +# <product>[/<where or who>]_SOFTWARE[/<modifier>]: <status> [(<when>)]
> +# <product> is the Canonical product or supporting technology (eg, ‘esm-apps’
> +# or ‘snap’). ‘ubuntu’ is the implied product when ‘<product>/’ is omitted
> +# from the ‘<product>[/<where or who>]’ tuple (ie, where we might use
> +# ‘ubuntu/bionic_DEBSRCPKG’ for consistency, we continue to use
> +# ‘bionic_DEBSRCPKG’)
> +# <where or who> indicates where the software lives or in the case of snaps or
> +# other technologies with a concept of publishers, who the publisher is
> +# SOFTWARE is the name of the software as dictated by the product (eg, the deb
> +# source package, the name of the snap or the name of the software project
> +# <modifier> is an optional key for grouping collections of packages (eg,
> +# ‘melodic’ for the ROS Melodic release or ‘rocky’ for the OpenStack Rocky
> +# release)
> +# <status> indicates the statuses as defined in UCT (eg, needs-triage, needed,
> +# pending, released, etc)
> +# <when> indicates ‘when’ the software will be/was fixed when used with the
> +# ‘pending’ or ‘released’ status (eg, the source package version, snap
> +# revision, etc)
> +# e.g.: esm-apps/xenial_jackson-databind: released (2.4.2-3ubuntu0.1~esm2)
> +# e.g.: git/github.com/gogo/protobuf_gogoprotobuf: needs-triage
> +# This method should keep supporting existing current format:
> +# e.g.: bionic_jackson-databind: needs-triage
> +def parse_cve_release_package_field(
> +    cve, field, data, value, code, msg, linenum
> +):
> +    package = ""
> +    release = ""
> +    state = ""
> +    details = ""
> +    try:
> +        release, package = field.split("_", 1)
> +    except ValueError:
> +        msg += "%s: %d: bad field with '_': '%s'\n" % (cve, linenum, field)
> +        code = EXIT_FAIL
> +        return False, package, release, state, details, code, msg
> +
> +    try:
> +        info = value.split(" ", 1)
> +    except ValueError:
> +        msg += "%s: %d: missing state for '%s': '%s'\n" % (
> +            cve,
> +            linenum,
> +            field,
> +            value,
> +        )
> +        code = EXIT_FAIL
> +        return False, package, release, state, details, code, msg
> +
> +    state = info[0]
> +    if state == "":
> +        state = "needs-triage"
> +
> +    if len(info) < 2:
> +        details = ""
> +    else:
> +        details = info[1].strip()
> +
> +    if details.startswith("["):
> +        msg += "%s: %d: %s has details that starts with a bracket: '%s'\n" % (
> +            cve,
> +            linenum,
> +            field,
> +            details,
> +        )
> +        code = EXIT_FAIL
> +        return False, package, release, state, details, code, msg
> +
> +    if details.startswith("("):
> +        details = details[1:]
> +    if details.endswith(")"):
> +        details = details[:-1]
> +
> +    # Work-around for old-style of only recording released versions
> +    if details == "" and state[0] in ("0123456789"):
> +        details = state
> +        state = "released"
> +
> +    valid_states = [
> +        "needs-triage",
> +        "needed",
> +        "active",
> +        "pending",
> +        "released",
> +        "deferred",
> +        "DNE",
> +        "ignored",
> +        "not-affected",
> +    ]
> +    if state not in valid_states:
> +        msg += (
> +            "%s: %d: %s has unknown state: '%s' (valid states are: %s)\n"
> +            % (
> +                cve,
> +                linenum,
> +                field,
> +                state,
> +                " ".join(valid_states),
> +            )
> +        )
> +        code = EXIT_FAIL
> +        return False, package, release, state, details, code, msg
> +
> +    # Verify "released" kernels have version details
> +    # if state == 'released' and package in kernel_srcs and details == '':
> +    #    msg += "%s: %s_%s has state '%s' but lacks version note\n" % (
> +    #       cve, package, release, state
> +    #    )
> +    #    code = EXIT_FAIL
> +
> +    # Verify "active" states have an Assignee
> +    if state == "active" and data["Assigned-to"].strip() == "":
> +        msg += "%s: %d: %s has state '%s' but lacks 'Assigned-to'\n" % (
> +            cve,
> +            linenum,
> +            field,
> +            state,
> +        )
> +        code = EXIT_FAIL
> +        return False, package, release, state, details, code, msg
> +
> +    return True, package, release, state, details, code, msg
> +
> +
> +class NotesParser:
> +    def __init__(self):
> +        self.notes = list()
> +        self.user = None
> +        self.separator = None
> +        self.note = None
> +
> +    def parse_line(self, cve, line, linenum, code):
> +        msg = ""
> +        m = NOTE_RE.match(line)
> +        if m is not None:
> +            new_user = m.group(1)
> +            new_sep = m.group(2)
> +            new_note = m.group(3)
> +        else:
> +            # follow up comments should have 2 space indent and
> +            # an author
> +            if self.user is None:
> +                msg += "%s: %d: Note entry with no author: '%s'\n" % (
> +                    cve,
> +                    linenum,
> +                    line[1:],
> +                )
> +                code = EXIT_FAIL
> +            if not line.startswith("  "):
> +                msg += (
> +                    "%s: %d: Note continuations should be indented by "
> +                    "2 spaces: '%s'.\n" % (cve, linenum, line)
> +                )
> +                code = EXIT_FAIL
> +            new_user = self.user
> +            new_sep = self.separator
> +            new_note = line.strip()
> +        if self.user and self.separator and self.note:
> +            # if is different user, start a new note
> +            if new_user != self.user:
> +                self.notes.append((self.user, self.note))
> +                self.user = new_user
> +                self.note = new_note
> +                self.separator = new_sep
> +            elif new_sep != self.separator:
> +                # finish this note and start a new one since this has new
> +                # semantics
> +                self.notes.append((self.user, self.note))
> +                self.separator = new_sep
> +                self.note = new_note
> +            else:
> +                if self.separator == "|":
> +                    self.note = self.note + " " + new_note
> +                else:
> +                    assert self.separator == ">"
> +                    self.note = self.note + "\n" + new_note
> +        else:
> +            # this is the first note
> +            self.user = new_user
> +            self.separator = new_sep
> +            self.note = new_note
> +        return code, msg
> +
> +    def finalize(self):
> +        if self.user is not None and self.note is not None:
> +            # add last Note
> +            self.notes.append((self.user, self.note))
> +            self.user = None
> +            self.note = None
> +        notes = self.notes
> +        self.user = None
> +        self.separator = None
> +        self.notes = None
> +        return notes
> +
> +
> +def load_cve(cve, strict=False, srcmap=None):
> +    """Loads a given CVE into:
> +    dict( fields...
> +          'pkgs' -> dict(  pkg -> dict(  release ->  (state, details)   ) )
> +        )
> +    """
> +
> +    msg = ""
> +    code = EXIT_OKAY
> +    required_fields = [
> +        "Candidate",
> +        "PublicDate",
> +        "References",
> +        "Description",
> +        "Ubuntu-Description",
> +        "Notes",
> +        "Bugs",
> +        "Priority",
> +        "Discovered-by",
> +        "Assigned-to",
> +        "CVSS",
> +    ]
> +    extra_fields = ["CRD", "PublicDateAtUSN", "Mitigation"]
> +
> +    data = dict()
> +    # maps entries in data to their source line - if didn't supply one
> +    # create a local one to simplify the code
> +    if srcmap is None:
> +        srcmap = dict()
> +    srcmap.setdefault("pkgs", dict())
> +    srcmap.setdefault("tags", dict())
> +    data.setdefault("tags", dict())
> +    srcmap.setdefault("patches", dict())
> +    data.setdefault("patches", dict())
> +    affected = dict()
> +    lastfield = ""
> +    fields_seen = []
> +    if not os.path.exists(cve):
> +        raise ValueError("File does not exist: '%s'" % (cve))
> +    linenum = 0
> +    notes_parser = NotesParser()
> +    cvss_entries = []
> +
> +    cve_file = codecs.open(cve, encoding="utf-8")
> +
> +    for line in cve_file.readlines():
> +        line = line.rstrip()
> +        linenum += 1
> +
> +        # Ignore blank/commented lines
> +        if len(line) == 0 or line.startswith("#"):
> +            continue
> +        if line.startswith(" "):
> +            try:
> +                # parse Notes properly
> +                if lastfield == "Notes":
> +                    code, newmsg = notes_parser.parse_line(
> +                        cve, line, linenum, code
> +                    )
> +                    if code != EXIT_OKAY:
> +                        msg += newmsg
> +                elif "Patches_" in lastfield:
> +                    try:
> +                        _, pkg = lastfield.split("_", 1)
> +                        patch_type, entry = line.split(":", 1)
> +                        patch_type = patch_type.strip()
> +                        entry = entry.strip()
> +                        data["patches"][pkg].append((patch_type, entry))
> +                        srcmap["patches"][pkg].append((cve, linenum))
> +                    except Exception as e:
> +                        msg += (
> +                            "%s: %d: Failed to parse '%s' entry %s: %s\n"
> +                            % (
> +                                cve,
> +                                linenum,
> +                                lastfield,
> +                                line,
> +                                e,
> +                            )
> +                        )
> +                        code = EXIT_FAIL
> +                elif lastfield == "CVSS":
> +                    try:
> +                        cvss = dict()
> +                        result = re.search(
> +                            r" (.+)\: (\S+)( \[(.*) (.*)\])?", line
> +                        )
> +                        if result is None:
> +                            continue
> +                        cvss["source"] = result.group(1)
> +                        cvss["vector"] = result.group(2)
> +                        entry = parse_cvss(cvss["vector"])
> +                        if entry is None:
> +                            raise RuntimeError(
> +                                "Failed to parse_cvss() without raising "
> +                                "an exception."
> +                            )
> +                        if result.group(3):
> +                            cvss["baseScore"] = result.group(4)
> +                            cvss["baseSeverity"] = result.group(5)
> +
> +                        cvss_entries.append(cvss)
> +                        # CVSS in srcmap will be a tuple since this is the
> +                        # line where the CVSS block starts - so convert it
> +                        # to a dict first if needed
> +                        if type(srcmap["CVSS"]) is tuple:
> +                            srcmap["CVSS"] = dict()
> +                        srcmap["CVSS"].setdefault(
> +                            cvss["source"], (cve, linenum)
> +                        )
> +                    except Exception as e:
> +                        msg += "%s: %d: Failed to parse CVSS: %s\n" % (
> +                            cve,
> +                            linenum,
> +                            e,
> +                        )
> +                        code = EXIT_FAIL
> +                else:
> +                    data[lastfield] += "\n%s" % (line[1:])
> +            except KeyError as e:
> +                msg += "%s: %d: bad line '%s' (%s)\n" % (cve, linenum, line, e)
> +                code = EXIT_FAIL
> +            continue
> +
> +        try:
> +            field, value = line.split(":", 1)
> +        except ValueError as e:
> +            msg += "%s: %d: bad line '%s' (%s)\n" % (cve, linenum, line, e)
> +            code = EXIT_FAIL
> +            continue
> +
> +        lastfield = field = field.strip()
> +        if field in fields_seen:
> +            msg += "%s: %d: repeated field '%s'\n" % (cve, linenum, field)
> +            code = EXIT_FAIL
> +        else:
> +            fields_seen.append(field)
> +        value = value.strip()
> +        if field == "Candidate":
> +            data.setdefault(field, value)
> +            srcmap.setdefault(field, (cve, linenum))
> +            if (
> +                value != ""
> +                and not value.startswith("CVE-")
> +                and not value.startswith("UEM-")
> +                and not value.startswith("EMB-")
> +            ):
> +                msg += (
> +                    "%s: %d: unknown Candidate '%s' "
> +                    "(must be /(CVE|UEM|EMB)-/)\n"
> +                    % (
> +                        cve,
> +                        linenum,
> +                        value,
> +                    )
> +                )
> +                code = EXIT_FAIL
> +        elif "Priority" in field:
> +            # For now, throw away comments on Priority fields
> +            if " " in value:
> +                value = value.split()[0]
> +            if "Priority_" in field:
> +                try:
> +                    _, pkg = field.split("_", 1)
> +                except ValueError:
> +                    msg += "%s: %d: bad field with 'Priority_': '%s'\n" % (
> +                        cve,
> +                        linenum,
> +                        field,
> +                    )
> +                    code = EXIT_FAIL
> +                    continue
> +            data.setdefault(field, value)
> +            srcmap.setdefault(field, (cve, linenum))
> +            if value not in ["untriaged", "not-for-us"] + PRIORITIES:
> +                msg += "%s: %d: unknown Priority '%s'\n" % (
> +                    cve,
> +                    linenum,
> +                    value,
> +                )
> +                code = EXIT_FAIL
> +        elif "Patches_" in field:
> +            try:
> +                _, pkg = field.split("_", 1)
> +            except ValueError:
> +                msg += "%s: %d: bad field with 'Patches_': '%s'\n" % (
> +                    cve,
> +                    linenum,
> +                    field,
> +                )
> +                code = EXIT_FAIL
> +                continue
> +            # value should be empty
> +            if len(value) > 0:
> +                msg += "%s: %d: '%s' field should have no value\n" % (
> +                    cve,
> +                    linenum,
> +                    field,
> +                )
> +                code = EXIT_FAIL
> +                continue
> +            data["patches"].setdefault(pkg, list())
> +            srcmap["patches"].setdefault(pkg, list())
> +        elif "Tags_" in field:
> +            """These are processed into the "tags" hash"""
> +            try:
> +                _, pkg = field.split("_", 1)
> +            except ValueError:
> +                msg += "%s: %d: bad field with 'Tags_': '%s'\n" % (
> +                    cve,
> +                    linenum,
> +                    field,
> +                )
> +                code = EXIT_FAIL
> +                continue
> +            data["tags"].setdefault(pkg, set())
> +            srcmap["tags"].setdefault(pkg, (cve, linenum))
> +            for word in value.strip().split(" "):
> +                if word not in VALID_TAGS:
> +                    msg += "%s: %d: invalid tag '%s': '%s'\n" % (
> +                        cve,
> +                        linenum,
> +                        word,
> +                        field,
> +                    )
> +                    code = EXIT_FAIL
> +                    continue
> +                data["tags"][pkg].add(word)
> +        elif "_" in field:
> +            (
> +                success,
> +                pkg,
> +                rel,
> +                state,
> +                details,
> +                code,
> +                msg,
> +            ) = parse_cve_release_package_field(
> +                cve, field, data, value, code, msg, linenum
> +            )
> +            if not success:
> +                assert code == EXIT_FAIL
> +                continue
> +            canon, _, _, _ = get_subproject_details(rel)
> +            if canon is None and rel not in ["upstream", "devel"]:
> +                msg += "%s: %d: unknown entry '%s'\n" % (cve, linenum, rel)
> +                code = EXIT_FAIL
> +                continue
> +            affected.setdefault(pkg, dict())
> +            if rel in affected[pkg]:
> +                msg += (
> +                    "%s: %d: duplicate entry for '%s': original at line %d\n"
> +                    % (
> +                        cve,
> +                        linenum,
> +                        rel,
> +                        srcmap["pkgs"][pkg][rel][1],
> +                    )
> +                )
> +                code = EXIT_FAIL
> +                continue
> +            affected[pkg].setdefault(rel, [state, details])
> +            srcmap["pkgs"].setdefault(pkg, dict())
> +            srcmap["pkgs"][pkg].setdefault(rel, (cve, linenum))
> +        elif field not in required_fields + extra_fields:
> +            msg += "%s: %d: unknown field '%s'\n" % (cve, linenum, field)
> +            code = EXIT_FAIL
> +        else:
> +            data.setdefault(field, value)
> +            srcmap.setdefault(field, (cve, linenum))
> +
> +    cve_file.close()
> +
> +    data["Notes"] = notes_parser.finalize()
> +    data["CVSS"] = cvss_entries
> +
> +    # Check for required fields
> +    for field in required_fields:
> +        nonempty = ["Candidate"]
> +        if strict:
> +            nonempty += ["PublicDate"]
> +        # boilerplate files are special and can (should?) be empty
> +        if "boilerplate" in cve:
> +            nonempty = []
> +
> +        if field not in data or field not in fields_seen:
> +            msg += "%s: %d: missing field '%s'\n" % (cve, linenum, field)
> +            code = EXIT_FAIL
> +        elif field in nonempty and data[field].strip() == "":
> +            msg += "%s: %d: required field '%s' is empty\n" % (
> +                cve,
> +                linenum,
> +                field,
> +            )
> +            code = EXIT_FAIL
> +
> +    # Fill in defaults for missing fields
> +    if "Priority" not in data:
> +        data.setdefault("Priority", "untriaged")
> +        srcmap.setdefault("Priority", (cve, 1))
> +    # Perform override fields
> +    if "PublicDateAtUSN" in data:
> +        data["PublicDate"] = data["PublicDateAtUSN"]
> +        srcmap["PublicDate"] = srcmap["PublicDateAtUSN"]
> +    if (
> +        "CRD" in data
> +        and data["CRD"].strip() != ""
> +        and data["PublicDate"] != data["CRD"]
> +    ):
> +        if cve.startswith("embargoed"):
> +            print(
> +                "%s: %d: adjusting PublicDate to use CRD: %s"
> +                % (cve, linenum, data["CRD"]),
> +                file=sys.stderr,
> +            )
> +        data["PublicDate"] = data["CRD"]
> +        srcmap["PublicDate"] = srcmap["CRD"]
> +
> +    # entries need an upstream entry if any entries are from the internal
> +    # list of subprojects
> +    for pkg in affected:
> +        needs_upstream = False
> +        for rel in affected[pkg]:
> +            if rel not in external_releases:
> +                needs_upstream = True
> +        if needs_upstream and "upstream" not in affected[pkg]:
> +            msg += "%s: %d: missing upstream '%s'\n" % (cve, linenum, pkg)
> +            code = EXIT_FAIL
> +
> +    data["pkgs"] = affected
> +
> +    code, msg = load_external_subproject_cve_data(cve, data, srcmap, code, msg)
> +
> +    if code != EXIT_OKAY:
> +        raise ValueError(msg.strip())
> +    return data
> +
> +
> +def amend_external_subproject_pkg(cve, data, srcmap, amendments, code, msg):
> +    linenum = 0
> +    for line in amendments.splitlines():
> +        linenum += 1
> +        if len(line) == 0 or line.startswith("#") or line.startswith(" "):
> +            continue
> +        try:
> +            field, value = line.split(":", 1)
> +            field = field.strip()
> +            value = value.strip()
> +        except ValueError as e:
> +            msg += "%s: bad line '%s' (%s)\n" % (cve, line, e)
> +            code = EXIT_FAIL
> +            return code, msg
> +
> +        if "_" in field:
> +            (
> +                success,
> +                pkg,
> +                release,
> +                state,
> +                details,
> +                code,
> +                msg,
> +            ) = parse_cve_release_package_field(
> +                cve, field, data, value, code, msg, linenum
> +            )
> +            if not success:
> +                return code, msg
> +
> +            data.setdefault("pkgs", dict())
> +            data["pkgs"].setdefault(pkg, dict())
> +            srcmap["pkgs"].setdefault(pkg, dict())
> +            # override existing release info if it exists
> +            data["pkgs"][pkg][release] = [state, details]
> +            srcmap["pkgs"][pkg][release] = (cve, linenum)
> +
> +    return code, msg
> +
> +
> +def load_external_subproject_cve_data(cve, data, srcmap, code, msg):
> +    cve_id = os.path.basename(cve)
> +    for f in find_external_subproject_cves(cve_id):
> +        with codecs.open(f, "r", encoding="utf-8") as fp:
> +            amendments = fp.read()
> +            fp.close()
> +        code, msg = amend_external_subproject_pkg(
> +            f, data, srcmap, amendments, code, msg
> +        )
> +
> +    return code, msg
> +
> +
> +def parse_cvss(cvss):
> +    # parse a CVSS string into components suitable for MITRE / NVD JSON
> +    # format - assumes only the Base metric group from
> +    # https://www.first.org/cvss/specification-document since this is
> +    # mandatory - also validates by raising exceptions on errors
> +    metrics = {
> +        "attackVector": {
> +            "abbrev": "AV",
> +            "values": {
> +                "NETWORK": 0.85,
> +                "ADJACENT": 0.62,
> +                "LOCAL": 0.55,
> +                "PHYSICAL": 0.2,
> +            },
> +        },
> +        "attackComplexity": {
> +            "abbrev": "AC",
> +            "values": {"LOW": 0.77, "HIGH": 0.44},
> +        },
> +        "privilegesRequired": {
> +            "abbrev": "PR",
> +            "values": {
> +                "NONE": 0.85,
> +                # [ scope unchanged, changed ]
> +                "LOW": [0.62, 0.68],  # depends on scope
> +                "HIGH": [0.27, 0.5],
> +            },  # depends on scope
> +        },
> +        "userInteraction": {
> +            "abbrev": "UI",
> +            "values": {"NONE": 0.85, "REQUIRED": 0.62},
> +        },
> +        "scope": {"abbrev": "S", "values": {"UNCHANGED", "CHANGED"}},
> +        "confidentialityImpact": {
> +            "abbrev": "C",
> +            "values": {"HIGH": 0.56, "LOW": 0.22, "NONE": 0},
> +        },
> +        "integrityImpact": {
> +            "abbrev": "I",
> +            "values": {"HIGH": 0.56, "LOW": 0.22, "NONE": 0},
> +        },
> +        "availabilityImpact": {
> +            "abbrev": "A",
> +            "values": {"HIGH": 0.56, "LOW": 0.22, "NONE": 0},
> +        },
> +    }
> +    severities = {
> +        "NONE": 0.0,
> +        "LOW": 3.9,
> +        "MEDIUM": 6.9,
> +        "HIGH": 8.9,
> +        "CRITICAL": 10.0,
> +    }
> +    js = None
> +    # coerce cvss into a string
> +    cvss = str(cvss)
> +    for c in cvss.split("/"):
> +        elements = c.split(":")
> +        if len(elements) != 2:
> +            raise ValueError("Invalid CVSS element '%s'" % c)
> +        valid = False
> +        metric = elements[0]
> +        value = elements[1]
> +        if metric == "CVSS":
> +            if value == "3.0" or value == "3.1":
> +                js = {"baseMetricV3": {"cvssV3": {"version": value}}}
> +                valid = True
> +            else:
> +                raise ValueError(
> +                    "Unable to process CVSS version '%s' (we only support 3.x)"
> +                    % value
> +                )
> +        else:
> +            for m in metrics.keys():
> +                if metrics[m]["abbrev"] == metric:
> +                    for val in metrics[m]["values"]:
> +                        if val[0:1] == value:
> +                            js["baseMetricV3"]["cvssV3"][m] = val
> +                            valid = True
> +        if not valid:
> +            raise ValueError("Invalid CVSS elements '%s:%s'" % (metric, value))
> +    for m in metrics.keys():
> +        if m not in js["baseMetricV3"]["cvssV3"]:
> +            raise ValueError("Missing required CVSS base element %s" % m)
> +    # add vectorString
> +    js["baseMetricV3"]["cvssV3"]["vectorString"] = cvss
> +
> +    # now calculate CVSS scores
> +    iss = 1 - (
> +        (
> +            1
> +            - metrics["confidentialityImpact"]["values"][
> +                js["baseMetricV3"]["cvssV3"]["confidentialityImpact"]
> +            ]
> +        )
> +        * (
> +            1
> +            - metrics["integrityImpact"]["values"][
> +                js["baseMetricV3"]["cvssV3"]["integrityImpact"]
> +            ]
> +        )
> +        * (
> +            1
> +            - metrics["availabilityImpact"]["values"][
> +                js["baseMetricV3"]["cvssV3"]["availabilityImpact"]
> +            ]
> +        )
> +    )
> +    if js["baseMetricV3"]["cvssV3"]["scope"] == "UNCHANGED":
> +        impact = 6.42 * iss
> +    else:
> +        impact = 7.52 * (iss - 0.029) - 3.25 * pow(iss - 0.02, 15)
> +    attackVector = metrics["attackVector"]["values"][
> +        js["baseMetricV3"]["cvssV3"]["attackVector"]
> +    ]
> +    attackComplexity = metrics["attackComplexity"]["values"][
> +        js["baseMetricV3"]["cvssV3"]["attackComplexity"]
> +    ]
> +    privilegesRequired = metrics["privilegesRequired"]["values"][
> +        js["baseMetricV3"]["cvssV3"]["privilegesRequired"]
> +    ]
> +    # privilegesRequires could be a list if is LOW or HIGH (and then the
> +    # value depends on whether the scope is unchanged or not)
> +    if isinstance(privilegesRequired, list):
> +        if js["baseMetricV3"]["cvssV3"]["scope"] == "UNCHANGED":
> +            privilegesRequired = privilegesRequired[0]
> +        else:
> +            privilegesRequired = privilegesRequired[1]
> +    userInteraction = metrics["userInteraction"]["values"][
> +        js["baseMetricV3"]["cvssV3"]["userInteraction"]
> +    ]
> +    exploitability = (
> +        8.22
> +        * attackVector
> +        * attackComplexity
> +        * privilegesRequired
> +        * userInteraction
> +    )
> +    if impact <= 0:
> +        base_score = 0
> +    elif js["baseMetricV3"]["cvssV3"]["scope"] == "UNCHANGED":
> +        # use ceil and * 10 / 10 to get rounded up to nearest 10th decimal
> +        # (where rounded-up is say 0.01 -> 0.1)
> +        base_score = math.ceil(min(impact + exploitability, 10) * 10) / 10
> +    else:
> +        base_score = (
> +            math.ceil(min(1.08 * (impact + exploitability), 10) * 10) / 10
> +        )
> +    js["baseMetricV3"]["cvssV3"]["baseScore"] = base_score
> +    for severity in severities.keys():
> +        if base_score <= severities[severity]:
> +            js["baseMetricV3"]["cvssV3"]["baseSeverity"] = severity
> +            break
> +    # these use normal rounding to 1 decimal place
> +    js["baseMetricV3"]["exploitabilityScore"] = round(exploitability * 10) / 10
> +    js["baseMetricV3"]["impactScore"] = round(impact * 10) / 10
> +    return js


-- 
https://code.launchpad.net/~andrey-fedoseev/launchpad/+git/launchpad/+merge/425142
Your team Launchpad code reviewers is requested to review the proposed merge of ~andrey-fedoseev/launchpad:uct-import into launchpad:master.



References