launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #29297
[Merge] ~andrey-fedoseev/launchpad:uct-import-export-patches into launchpad:master
Andrey Fedoseev has proposed merging ~andrey-fedoseev/launchpad:uct-import-export-patches into launchpad:master with ~andrey-fedoseev/launchpad:bug-attachment-url as a prerequisite.
Commit message:
UCT import/export: handle patch URLs
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~andrey-fedoseev/launchpad/+git/launchpad/+merge/431263
Patch URLs are imported as `BugAttachment`s that reference a URL instead of uploaded file content.
The attachment title consists of the following parts, joined by "/" (e.g. "linux/upstream/1.5.4"):
1. package name, e.g. "linux"
2. patch type, e.g. "upstream" or "other"
3. optional notes, e.g. "1.5.4"
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~andrey-fedoseev/launchpad:uct-import-export-patches into launchpad:master.
diff --git a/lib/lp/bugs/scripts/tests/sampledata/CVE-2022-23222 b/lib/lp/bugs/scripts/tests/sampledata/CVE-2022-23222
index 30574d6..95a74f0 100644
--- a/lib/lp/bugs/scripts/tests/sampledata/CVE-2022-23222
+++ b/lib/lp/bugs/scripts/tests/sampledata/CVE-2022-23222
@@ -30,6 +30,8 @@ CVSS:
Patches_linux:
break-fix: 457f44363a8894135c85b7a9afd2bd8196db24ab c25b2ae136039ffa820c26138ed4a5e5f3ab3841|local-CVE-2022-23222-fix
+ upstream: https://github.com/389ds/389-ds-base/commit/58dbf084a63e6dbbd999bf6a70475fad8255f26a (1.4.4)
+ upstream: https://github.com/389ds/389-ds-base/commit/2e5b526012612d1d6ccace46398bee679a730271
upstream_linux: released (5.17~rc1)
impish_linux: released (5.13.0-37.42)
devel_linux: not-affected (5.15.0-25.25)
diff --git a/lib/lp/bugs/scripts/tests/test_uct.py b/lib/lp/bugs/scripts/tests/test_uct.py
index 00bc2c8..04580b8 100644
--- a/lib/lp/bugs/scripts/tests/test_uct.py
+++ b/lib/lp/bugs/scripts/tests/test_uct.py
@@ -120,7 +120,19 @@ class TestUCTRecord(TestCase):
"c25b2ae136039ffa820c26138ed4a5e5f3ab3841|"
"local-CVE-2022-23222-fix"
),
- )
+ ),
+ UCTRecord.Patch(
+ patch_type="upstream",
+ entry=(
+ "https://github.com/389ds/389-ds-base/commit/58dbf084a63e6dbbd999bf6a70475fad8255f26a (1.4.4)" # noqa: 501
+ ),
+ ),
+ UCTRecord.Patch(
+ patch_type="upstream",
+ entry=(
+ "https://github.com/389ds/389-ds-base/commit/2e5b526012612d1d6ccace46398bee679a730271" # noqa: 501
+ ),
+ ),
],
),
UCTRecord.Package(
@@ -260,7 +272,22 @@ class TestCVE(TestCaseWithFactory):
],
priority=None,
tags=set(),
- patches=[],
+ patches=[
+ UCTRecord.Patch(
+ patch_type="upstream",
+ entry=(
+ "https://github.com/389ds/389-ds-base/"
+ "commit/123 (1.4.4)"
+ ),
+ ),
+ UCTRecord.Patch(
+ patch_type="upstream",
+ entry=(
+ "https://github.com/389ds/389-ds-base/"
+ "commit/456"
+ ),
+ ),
+ ],
),
UCTRecord.Package(
name=dsp2.sourcepackagename.name,
@@ -417,6 +444,20 @@ class TestCVE(TestCaseWithFactory):
),
),
],
+ patch_urls=[
+ CVE.PatchURL(
+ package_name=dsp1.sourcepackagename,
+ type="upstream",
+ url="https://github.com/389ds/389-ds-base/" "commit/123",
+ notes="1.4.4",
+ ),
+ CVE.PatchURL(
+ package_name=dsp1.sourcepackagename,
+ type="upstream",
+ url="https://github.com/389ds/389-ds-base/" "commit/456",
+ notes=None,
+ ),
+ ],
)
def test_make_from_uct_record(self):
@@ -428,6 +469,40 @@ class TestCVE(TestCaseWithFactory):
self.assertListEqual(self.uct_record.packages, uct_record.packages)
self.assertDictEqual(self.uct_record.__dict__, uct_record.__dict__)
+ def test_get_patches(self):
+ spn = self.factory.makeSourcePackageName()
+ self.assertListEqual(
+ [
+ CVE.PatchURL(
+ package_name=spn,
+ url="https://github.com/repo/1",
+ type="upstream",
+ notes=None,
+ ),
+ CVE.PatchURL(
+ package_name=spn,
+ url="https://github.com/repo/2",
+ type="upstream",
+ notes="1.2.3",
+ ),
+ ],
+ list(
+ CVE.get_patch_urls(
+ spn,
+ [
+ UCTRecord.Patch("break-fix", "- -"),
+ UCTRecord.Patch(
+ "upstream", "https://github.com/repo/1"
+ ),
+ UCTRecord.Patch(
+ "upstream", "https://github.com/repo/2 (1.2.3)"
+ ),
+ UCTRecord.Patch("other", "foo"),
+ ],
+ )
+ ),
+ )
+
class TestUCTImporterExporter(TestCaseWithFactory):
@@ -610,6 +685,20 @@ class TestUCTImporterExporter(TestCaseWithFactory):
),
),
],
+ patch_urls=[
+ CVE.PatchURL(
+ package_name=self.ubuntu_package.sourcepackagename,
+ type="upstream",
+ url="https://github.com/389ds/389-ds-base/" "commit/123",
+ notes="1.4.4",
+ ),
+ CVE.PatchURL(
+ package_name=self.ubuntu_package.sourcepackagename,
+ type="upstream",
+ url="https://github.com/389ds/389-ds-base/" "commit/456",
+ notes=None,
+ ),
+ ],
)
self.importer = UCTImporter()
self.exporter = UCTExporter()
@@ -630,6 +719,8 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.assertEqual(len(cve.bug_urls), len(watches))
self.assertEqual(sorted(cve.bug_urls), sorted(w.url for w in watches))
+ self.checkBugAttachments(bug, cve)
+
def checkBugTasks(self, bug: Bug, cve: CVE):
bug_tasks = bug.bugtasks # type: List[BugTask]
@@ -690,6 +781,20 @@ class TestUCTImporterExporter(TestCaseWithFactory):
for t in bug_tasks:
self.assertEqual(cve.assignee, t.assignee)
+ def checkBugAttachments(self, bug: Bug, cve: CVE):
+ attachments_by_url = {a.url: a for a in bug.attachments if a.url}
+ for patch_url in cve.patch_urls:
+ self.assertIn(patch_url.url, attachments_by_url)
+ attachment = attachments_by_url[patch_url.url]
+ expected_title = "{}/{}".format(
+ patch_url.package_name.name, patch_url.type
+ )
+ if patch_url.notes:
+ expected_title = "{}/{}".format(
+ expected_title, patch_url.notes
+ )
+ self.assertEqual(expected_title, attachment.title)
+
def checkVulnerabilities(self, bug: Bug, cve: CVE):
vulnerabilities = bug.vulnerabilities
@@ -758,6 +863,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.assertEqual(expected.notes, actual.notes)
self.assertEqual(expected.mitigation, actual.mitigation)
self.assertListEqual(expected.cvss, actual.cvss)
+ self.assertListEqual(expected.patch_urls, actual.patch_urls)
def test_create_bug(self):
bug = self.importer.create_bug(self.cve, self.lp_cve)
@@ -769,7 +875,7 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.assertEqual([self.lp_cve], bug.cves)
activities = list(bug.activity)
- self.assertEqual(4, len(activities))
+ self.assertEqual(6, len(activities))
import_bug_activity = activities[-1]
self.assertEqual(self.bug_importer, import_bug_activity.person)
self.assertEqual("bug", import_bug_activity.whatchanged)
@@ -1005,6 +1111,22 @@ class TestUCTImporterExporter(TestCaseWithFactory):
self.importer.update_bug(bug, cve, self.lp_cve)
self.checkBug(bug, cve)
+ def test_update_patch_urls(self):
+ bug = self.importer.create_bug(self.cve, self.lp_cve)
+ cve = self.cve
+
+ # Add new patch URL
+ cve.patch_urls.append(
+ CVE.PatchURL(
+ package_name=cve.distro_packages[0].package_name,
+ type="upstream",
+ url="https://github.com/123",
+ notes=None,
+ )
+ )
+ self.importer.update_bug(bug, cve, self.lp_cve)
+ self.checkBug(bug, cve)
+
def test_import_cve(self):
self.importer.import_cve(self.cve)
self.assertIsNotNone(
diff --git a/lib/lp/bugs/scripts/uct/models.py b/lib/lp/bugs/scripts/uct/models.py
index b7b7f6e..a63c805 100644
--- a/lib/lp/bugs/scripts/uct/models.py
+++ b/lib/lp/bugs/scripts/uct/models.py
@@ -2,16 +2,29 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
import logging
+import re
from collections import OrderedDict, defaultdict
from datetime import datetime
from enum import Enum
from pathlib import Path
-from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union
+from typing import (
+ Any,
+ Dict,
+ Iterable,
+ List,
+ NamedTuple,
+ Optional,
+ Set,
+ Tuple,
+ Union,
+)
from typing.io import TextIO
import dateutil.parser
from contrib.cve_lib import load_cve
from zope.component import getUtility
+from zope.schema import URI
+from zope.schema.interfaces import InvalidURI
from lp.bugs.enums import VulnerabilityStatus
from lp.bugs.interfaces.bugtask import BugTaskImportance, BugTaskStatus
@@ -448,6 +461,21 @@ class CVE:
),
)
+ PatchURL = NamedTuple(
+ "PatchURL",
+ (
+ ("package_name", SourcePackageName),
+ ("type", str),
+ ("url", str),
+ ("notes", Optional[str]),
+ ),
+ )
+
+ # Example:
+ # https://github.com/389ds/389-ds-base/commit/123 (1.4.4)
+ # https://github.com/389ds/389-ds-base/commit/345
+ PATCH_URL_RE = re.compile(r"^(?P<url>.+?)(\s+\((?P<notes>.+)\))?$")
+
PRIORITY_MAP = {
UCTRecord.Priority.CRITICAL: BugTaskImportance.CRITICAL,
UCTRecord.Priority.HIGH: BugTaskImportance.HIGH,
@@ -502,6 +530,7 @@ class CVE:
notes: str,
mitigation: str,
cvss: List[CVSS],
+ patch_urls: Optional[List[PatchURL]] = None,
):
self.sequence = sequence
self.date_made_public = date_made_public
@@ -521,6 +550,7 @@ class CVE:
self.notes = notes
self.mitigation = mitigation
self.cvss = cvss
+ self.patch_urls = patch_urls or [] # type: List[CVE.PatchURL]
@classmethod
def make_from_uct_record(cls, uct_record: UCTRecord) -> "CVE":
@@ -532,6 +562,7 @@ class CVE:
distro_packages = []
series_packages = []
+ patch_urls = []
spn_set = getUtility(ISourcePackageNameSet)
@@ -541,6 +572,11 @@ class CVE:
for uct_package in uct_record.packages:
source_package_name = spn_set.getOrCreateByName(uct_package.name)
+
+ patch_urls.extend(
+ cls.get_patch_urls(source_package_name, uct_package.patches)
+ )
+
package_importance = (
cls.PRIORITY_MAP[uct_package.priority]
if uct_package.priority
@@ -660,6 +696,7 @@ class CVE:
notes=uct_record.notes,
mitigation=uct_record.mitigation,
cvss=uct_record.cvss,
+ patch_urls=patch_urls,
)
def to_uct_record(self) -> UCTRecord:
@@ -749,6 +786,17 @@ class CVE:
patches=[],
)
+ for patch_url in self.patch_urls:
+ entry = patch_url.url
+ if patch_url.notes:
+ entry = "{} ({})".format(entry, patch_url.notes)
+ packages_by_name[patch_url.package_name.name].patches.append(
+ UCTRecord.Patch(
+ patch_type=patch_url.type,
+ entry=entry,
+ )
+ )
+
return UCTRecord(
parent_dir=self.VULNERABILITY_STATUS_MAP_REVERSE.get(
self.status, ""
@@ -826,3 +874,33 @@ class CVE:
"Could not find the distro series: %s", distro_series_name
)
return distro_series
+
+ @classmethod
+ def get_patch_urls(
+ cls,
+ source_package_name: SourcePackageName,
+ patches: List[UCTRecord.Patch],
+ ) -> Iterable[PatchURL]:
+ for patch in patches:
+ if patch.patch_type == "break-fix":
+ continue
+ match = cls.PATCH_URL_RE.match(patch.entry)
+ if not match:
+ logger.warning(
+ "Could not parse the patch entry: %s", patch.entry
+ )
+ continue
+
+ try:
+ url = URI().fromUnicode(match.groupdict()["url"])
+ except InvalidURI:
+ logger.error("Invalid patch URL: %s", patch.entry)
+ continue
+
+ notes = match.groupdict().get("notes")
+ yield cls.PatchURL(
+ package_name=source_package_name,
+ type=patch.patch_type,
+ url=url,
+ notes=notes,
+ )
diff --git a/lib/lp/bugs/scripts/uct/uctexport.py b/lib/lp/bugs/scripts/uct/uctexport.py
index d4b5b45..8a1dd27 100644
--- a/lib/lp/bugs/scripts/uct/uctexport.py
+++ b/lib/lp/bugs/scripts/uct/uctexport.py
@@ -2,6 +2,7 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
import logging
+import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, NamedTuple, Optional
@@ -10,6 +11,7 @@ from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
from lp.bugs.interfaces.bug import IBugSet
+from lp.bugs.interfaces.bugattachment import BugAttachmentType
from lp.bugs.model.bug import Bug as BugModel
from lp.bugs.model.bugtask import BugTask
from lp.bugs.model.cve import Cve as CveModel
@@ -44,6 +46,13 @@ class UCTExporter:
),
)
+ # Example:
+ # linux/upstream
+ # linux/upstream/5.19.1
+ BUG_ATTACHMENT_TITLE_RE = re.compile(
+ "^(?P<package_name>.+?)/(?P<patch_type>.+?)(/(?P<notes>.+?))?$"
+ )
+
def export_bug_to_uct_file(
self, bug_id: int, output_dir: Path
) -> Optional[Path]:
@@ -192,6 +201,31 @@ class UCTExporter:
)
)
+ packages_by_name = {
+ p.name: p for p in package_name_by_product.values()
+ }
+ patch_urls = []
+ for attachment in bug.attachments:
+ if (
+ not attachment.url
+ or not attachment.type == BugAttachmentType.PATCH
+ ):
+ continue
+ title_match = self.BUG_ATTACHMENT_TITLE_RE.match(attachment.title)
+ if not title_match:
+ continue
+ spn = packages_by_name.get(title_match.groupdict()["package_name"])
+ if not spn:
+ continue
+ patch_urls.append(
+ CVE.PatchURL(
+ package_name=spn,
+ type=title_match.groupdict()["patch_type"],
+ url=attachment.url,
+ notes=title_match.groupdict().get("notes"),
+ )
+ )
+
return CVE(
sequence="CVE-{}".format(lp_cve.sequence),
date_made_public=vulnerability.date_made_public,
@@ -217,6 +251,7 @@ class UCTExporter:
)
for authority, vector_string in lp_cve.cvss.items()
],
+ patch_urls=patch_urls,
)
def _parse_bug_description(
diff --git a/lib/lp/bugs/scripts/uct/uctimport.py b/lib/lp/bugs/scripts/uct/uctimport.py
index 12bc601..5bc8411 100644
--- a/lib/lp/bugs/scripts/uct/uctimport.py
+++ b/lib/lp/bugs/scripts/uct/uctimport.py
@@ -38,6 +38,7 @@ from lp.app.enums import InformationType
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
from lp.bugs.interfaces.bug import CreateBugParams, IBugSet
from lp.bugs.interfaces.bugactivity import IBugActivitySet
+from lp.bugs.interfaces.bugattachment import BugAttachmentType
from lp.bugs.interfaces.bugtask import BugTaskImportance, IBugTaskSet
from lp.bugs.interfaces.bugwatch import IBugWatchSet
from lp.bugs.interfaces.cve import ICveSet
@@ -166,6 +167,7 @@ class UCTImporter:
) # type: BugModel
self._update_external_bug_urls(bug, cve.bug_urls)
+ self._update_patches(bug, cve.patch_urls)
self._create_bug_tasks(
bug,
@@ -222,6 +224,7 @@ class UCTImporter:
)
self._assign_bug_tasks(bug, cve.assignee)
self._update_external_bug_urls(bug, cve.bug_urls)
+ self._update_patches(bug, cve.patch_urls)
# Update or add new Vulnerabilities
vulnerabilities_by_distro = {
@@ -429,6 +432,29 @@ class UCTImporter:
for external_bug_url in bug_urls:
bug_watch_set.fromText(external_bug_url, bug, self.bug_importer)
+ def _update_patches(self, bug: BugModel, patch_urls: List[CVE.PatchURL]):
+ attachments_by_url = {a.url: a for a in bug.attachments if a.url}
+ for patch_url in patch_urls:
+ title = "{}/{}".format(patch_url.package_name.name, patch_url.type)
+ if patch_url.notes:
+ title = "{}/{}".format(title, patch_url.notes)
+ if patch_url.url in attachments_by_url:
+ attachment = removeSecurityProxy(
+ attachments_by_url[patch_url.url]
+ )
+ attachment.title = title
+ attachment.type = BugAttachmentType.PATCH
+ else:
+ bug.addAttachment(
+ owner=bug.owner,
+ data=None,
+ comment=None,
+ filename=None,
+ url=patch_url.url,
+ is_patch=True,
+ description=title,
+ )
+
def _make_bug_description(self, cve: CVE) -> str:
"""
Some `CVE` fields can't be mapped to Launchpad models.