launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #33093
[Merge] ~enriqueesanchz/launchpad:add-get-untriaged-cves into launchpad:master
Enrique Sánchez has proposed merging ~enriqueesanchz/launchpad:add-get-untriaged-cves into launchpad:master.
Commit message:
Add CveSet.advancedSearch
`advancedSearch` filters CVEs by whether or not they have a vulnerability
in the given distributions. This lets the SEC team find the CVEs that
still need to be triaged.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/493853
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-get-untriaged-cves into launchpad:master.
diff --git a/lib/lp/bugs/interfaces/cve.py b/lib/lp/bugs/interfaces/cve.py
index 4efd60c..dd341b4 100644
--- a/lib/lp/bugs/interfaces/cve.py
+++ b/lib/lp/bugs/interfaces/cve.py
@@ -9,16 +9,24 @@ __all__ = [
"ICveSet",
]
+from typing import Iterator, Optional
+
from lazr.enum import DBEnumeratedType, DBItem
from lazr.restful.declarations import (
+ REQUEST_USER,
+ call_with,
collection_default_content,
+ export_operation_as,
+ export_read_operation,
exported,
exported_as_webservice_collection,
exported_as_webservice_entry,
+ operation_for_version,
+ operation_parameters,
)
from lazr.restful.fields import CollectionField, Reference
from zope.interface import Attribute, Interface
-from zope.schema import Choice, Datetime, Dict, Int, Text, TextLine
+from zope.schema import Choice, Datetime, Dict, Int, List, Text, TextLine
from lp import _
from lp.app.validators.validation import valid_cve_sequence
@@ -233,6 +241,66 @@ class ICveSet(Interface):
def search(text):
"""Search the CVE database for matching CVE entries."""
+ def advancedSearch(
+ in_distribution: List = [],
+ not_in_distribution: List = [],
+ since: Optional[Datetime] = None,
+ limit: Optional[int] = None,
+ ) -> Iterator[str]:
+ """Return an iterator of the cve sequences that match the given filters.
+
+ Every filter is optional; the ones that are given must all hold.
+
+ :param in_distribution: only return cves that have a vulnerability for
+ every one of these distributions.
+ :param not_in_distribution: only return cves that have no vulnerability
+ for any of these distributions.
+ :param since: only return cves whose `datemodified` is strictly later
+ than this. None disables the filter.
+ :param limit: return at most `limit` cves. None means no limit.
+ :return: an iterator of cve sequence strings, e.g. "2099-9876".
+ """
+
+ # The value types of the in_distribution and not_in_distribution lists
+ # are placeholders here; both are patched to IDistribution in
+ # lp.bugs.interfaces.webservice.
+ @operation_parameters(
+ in_distribution=List(
+ title=_("Distributions linked to the cve"),
+ value_type=Reference(schema=Interface),
+ required=False,
+ ),
+ not_in_distribution=List(
+ title=_("Distributions not linked to the cve"),
+ value_type=Reference(schema=Interface),
+ required=False,
+ ),
+ since=Datetime(
+ title=_("Minimum cve.datemodified"),
+ description=_("Ignore cves that are older than this."),
+ required=False,
+ ),
+ limit=Int(
+ title=_("Maximum number of cves to return"),
+ required=False,
+ ),
+ )
+ @export_operation_as("advancedSearch")
+ @call_with(requester=REQUEST_USER)
+ @export_read_operation()
+ @operation_for_version("devel")
+ def api_advancedSearch(
+ requester,
+ in_distribution: List = [],
+ not_in_distribution: List = [],
+ since: Optional[Datetime] = None,
+ limit: Optional[int] = None,
+ ) -> dict:
+ """Return the cve sequences that match the given filters.
+
+ This is the webservice entry point, exported as `advancedSearch` in
+ the "devel" API version.
+
+ :param requester: the user making the request, supplied by the
+ webservice through `REQUEST_USER`.
+ :param in_distribution: only return cves that have a vulnerability for
+ every one of these distributions.
+ :param not_in_distribution: only return cves that have no vulnerability
+ for any of these distributions.
+ :param since: only return cves whose `datemodified` is strictly later
+ than this.
+ :param limit: return at most `limit` cves.
+ :return: a dict of the form `{"cves": [sequence, ...]}`.
+ :raises Unauthorized: if there is no requester, or if the requester is
+ not a security admin of every distribution used as a filter.
+ """
+
def inText(text):
"""Find one or more Cve's by analysing the given text.
diff --git a/lib/lp/bugs/interfaces/webservice.py b/lib/lp/bugs/interfaces/webservice.py
index e69d8e8..b409934 100644
--- a/lib/lp/bugs/interfaces/webservice.py
+++ b/lib/lp/bugs/interfaces/webservice.py
@@ -84,6 +84,7 @@ from lp.bugs.interfaces.structuralsubscription import (
)
from lp.bugs.interfaces.vulnerability import IVulnerability, IVulnerabilitySet
from lp.code.interfaces.branchmergeproposal import IBranchMergeProposal
+from lp.registry.interfaces.distribution import IDistribution
from lp.registry.interfaces.distributionsourcepackage import (
IDistributionSourcePackage,
)
@@ -163,3 +164,17 @@ patch_entry_return_type(
"addBugSubscriptionFilter",
IBugSubscriptionFilter,
)
+
+# ICveSet
+# api_advancedSearch declares its distribution lists with a generic
+# Interface value type; point them at IDistribution here.
+# NOTE(review): presumably done here to avoid a circular import -- confirm.
+patch_list_parameter_type(
+ ICveSet,
+ "api_advancedSearch",
+ "in_distribution",
+ Reference(schema=IDistribution),
+)
+patch_list_parameter_type(
+ ICveSet,
+ "api_advancedSearch",
+ "not_in_distribution",
+ Reference(schema=IDistribution),
+)
diff --git a/lib/lp/bugs/model/cve.py b/lib/lp/bugs/model/cve.py
index ce24b2d..a20df63 100644
--- a/lib/lp/bugs/model/cve.py
+++ b/lib/lp/bugs/model/cve.py
@@ -14,6 +14,7 @@ from storm.databases.postgres import JSON
from storm.locals import DateTime, Desc, Int, ReferenceSet, Store, Unicode
from zope.component import getUtility
from zope.interface import implementer
+from zope.security.interfaces import Unauthorized
from lp.app.validators.cve import CVEREF_PATTERN, valid_cve
from lp.bugs.interfaces.buglink import IBugLinkTarget
@@ -25,7 +26,9 @@ from lp.bugs.model.vulnerability import (
Vulnerability,
get_vulnerability_privacy_filter,
)
+from lp.registry.interfaces.role import IPersonRoles
from lp.registry.model.distribution import Distribution
+from lp.registry.security import SecurityAdminDistribution
from lp.services.database import bulk
from lp.services.database.constants import UTC_NOW
from lp.services.database.decoratedresultset import DecoratedResultSet
@@ -256,6 +259,80 @@ class CveSet:
.config(distinct=True)
)
+    def advancedSearch(
+        self,
+        in_distribution=None,
+        not_in_distribution=None,
+        since=None,
+        limit=None,
+    ):
+        """See `ICveSet`.
+
+        Yield the sequence of every cve that satisfies all given filters.
+
+        :param in_distribution: distributions that must all have a
+            vulnerability for the cve. None or empty disables the filter.
+        :param not_in_distribution: distributions that must not have any
+            vulnerability for the cve. None or empty disables the filter.
+        :param since: only cves with `datemodified` strictly later than
+            this are returned. None disables the filter.
+        :param limit: maximum number of cves to return. None means no
+            limit.
+        :return: a generator of cve sequence strings, ordered by cve id so
+            that a limited result is deterministic.
+        """
+        # Use None rather than a shared mutable list as the default, and
+        # deduplicate once: the "all of these" check compares a
+        # COUNT(DISTINCT ...) against the number of requested ids.
+        in_ids = sorted({d.id for d in in_distribution or ()})
+        not_in_ids = sorted({d.id for d in not_in_distribution or ()})
+
+        # ORDER BY makes LIMIT pick a stable subset; without it the rows
+        # that are kept are arbitrary.
+        query = """
+            SELECT c.sequence
+            FROM cve c
+            LEFT JOIN vulnerability v ON c.id = v.cve
+            WHERE (%s is NULL OR c.datemodified > %s)
+            GROUP BY c.id, c.sequence
+            HAVING
+                (
+                    %s = 0
+                    OR COUNT(DISTINCT v.distribution) FILTER (
+                        WHERE v.distribution = ANY(%s)
+                    ) = %s
+                )
+                AND COUNT(DISTINCT v.distribution) FILTER (
+                    WHERE v.distribution = ANY(%s)
+                ) = 0
+            ORDER BY c.id
+            LIMIT %s;
+        """
+
+        wanted = len(in_ids)
+        params = (
+            since,
+            since,
+            wanted,
+            in_ids,
+            wanted,
+            not_in_ids,
+            limit,
+        )
+
+        store = IStore(Vulnerability)
+        for row in store.execute(query, params):
+            yield row[0]
+
+    def api_advancedSearch(
+        self,
+        requester,
+        in_distribution=None,
+        not_in_distribution=None,
+        since=None,
+        limit=None,
+    ):
+        """See `ICveSet`.
+
+        Check that the requester may filter on the given distributions,
+        then run `advancedSearch` and wrap the result for the webservice.
+
+        :param requester: the user making the request.
+        :param in_distribution: distributions that must all have a
+            vulnerability for the cve. None is treated as empty.
+        :param not_in_distribution: distributions that must not have any
+            vulnerability for the cve. None is treated as empty.
+        :param since: passed through to `advancedSearch`.
+        :param limit: passed through to `advancedSearch`.
+        :return: a dict of the form `{"cves": [sequence, ...]}`.
+        :raises Unauthorized: if there is no requester, or if the
+            requester is not a security admin of every distribution used
+            as a filter.
+        """
+        if not requester:
+            raise Unauthorized(
+                "Only authenticated users can use this endpoint"
+            )
+
+        # Normalise to fresh lists: avoids shared mutable defaults and
+        # keeps the concatenation below from failing on None.
+        in_distribution = list(in_distribution or ())
+        not_in_distribution = list(not_in_distribution or ())
+
+        # The requester's roles do not depend on the distribution, so
+        # adapt once rather than on every iteration.
+        user = IPersonRoles(requester)
+        for distro in in_distribution + not_in_distribution:
+            if not SecurityAdminDistribution(distro).checkAuthenticated(user):
+                raise Unauthorized(
+                    f"Only security admins can use {distro.name} as a filter"
+                )
+
+        cves = list(
+            self.advancedSearch(
+                in_distribution, not_in_distribution, since, limit
+            )
+        )
+        return {"cves": cves}
+
def inText(self, text):
"""See ICveSet."""
# let's look for matching entries
diff --git a/lib/lp/bugs/tests/test_cve.py b/lib/lp/bugs/tests/test_cve.py
index 23fd423..27faf68 100644
--- a/lib/lp/bugs/tests/test_cve.py
+++ b/lib/lp/bugs/tests/test_cve.py
@@ -3,7 +3,7 @@
"""CVE related tests."""
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
from testtools.matchers import MatchesStructure
from testtools.testcase import ExpectedException
@@ -14,14 +14,17 @@ from lp.app.enums import InformationType
from lp.bugs.interfaces.bugtasksearch import BugTaskSearchParams
from lp.bugs.interfaces.cve import CveStatus, ICveSet
from lp.bugs.scripts.uct.models import CVSS
+from lp.services.webapp.interfaces import OAuthPermission
from lp.testing import (
TestCaseWithFactory,
admin_logged_in,
+ api_url,
login_person,
person_logged_in,
verifyObject,
)
from lp.testing.layers import DatabaseFunctionalLayer
+from lp.testing.pages import webservice_for_person
class TestCveSet(TestCaseWithFactory):
@@ -112,6 +115,263 @@ class TestCveSet(TestCaseWithFactory):
self.assertEqual(base, getUtility(ICveSet).getBugCveCount())
+class TestCveSetAdvancedSearch(TestCaseWithFactory):
+ # Tests for calling ICveSet.advancedSearch directly on the utility.
+ # The expected counts rely on 10 cves already being present in the test
+ # database before a test creates any of its own.
+ layer = DatabaseFunctionalLayer
+
+ def setUp(self):
+ super().setUp()
+ self.cves = getUtility(ICveSet)
+
+ def test_advancedSearch_default_arguments(self):
+ # With no filters, all 10 existing cves are returned
+ result = self.cves.advancedSearch()
+ self.assertEqual(10, len(list(result)))
+
+ def test_advancedSearch_in_distribution(self):
+ distribution1 = self.factory.makeDistribution()
+ distribution2 = self.factory.makeDistribution()
+
+ # No result as there is no cve with a vulnerability for distribution1
+ result = self.cves.advancedSearch(in_distribution=[distribution1])
+ self.assertEqual(0, len(list(result)))
+
+ cve = self.factory.makeCVE(sequence="2099-9876")
+ self.factory.makeVulnerability(distribution1, cve=cve)
+
+ # There is now 1 cve with a vulnerability for distribution1
+ result = self.cves.advancedSearch(in_distribution=[distribution1])
+ cves = list(result)
+ self.assertEqual(1, len(cves))
+ self.assertEqual("2099-9876", cves[0])
+
+ # No result as there is no cve with a vulnerability for distribution2
+ result = self.cves.advancedSearch(in_distribution=[distribution2])
+ self.assertEqual(0, len(list(result)))
+
+ def test_advancedSearch_not_in_distribution(self):
+ distribution1 = self.factory.makeDistribution()
+ distribution2 = self.factory.makeDistribution()
+
+ # All 10 existing cves match, as none of them has a vulnerability for
+ # either distribution
+ result = self.cves.advancedSearch(
+ not_in_distribution=[distribution1, distribution2]
+ )
+ self.assertEqual(10, len(list(result)))
+
+ cve = self.factory.makeCVE(sequence="2099-9876")
+
+ # There are now 11 cves without a vulnerability for the distributions
+ result = self.cves.advancedSearch(
+ not_in_distribution=[distribution1, distribution2]
+ )
+ self.assertEqual(11, len(list(result)))
+
+ self.factory.makeVulnerability(distribution1, cve=cve)
+
+ # Back to 10: the new cve has a vulnerability for distribution1, so it
+ # is excluded even though it has none for distribution2
+ result = self.cves.advancedSearch(
+ not_in_distribution=[distribution1, distribution2]
+ )
+ self.assertEqual(10, len(list(result)))
+
+ def test_advancedSearch_both_in_distribution(self):
+ # Combines in_distribution and not_in_distribution in one search
+ distribution1 = self.factory.makeDistribution()
+ distribution2 = self.factory.makeDistribution()
+ distribution3 = self.factory.makeDistribution()
+
+ # No result as there is no cve that:
+ # - has a vulnerability for distribution1 and distribution2
+ # - has no vulnerability for distribution3
+ result = self.cves.advancedSearch(
+ in_distribution=[distribution1, distribution2],
+ not_in_distribution=[distribution3],
+ )
+ self.assertEqual(0, len(list(result)))
+
+ cve = self.factory.makeCVE(sequence="2099-9876")
+ self.factory.makeVulnerability(distribution1, cve=cve)
+
+ # No result as we only created the vulnerability for distribution1
+ result = self.cves.advancedSearch(
+ in_distribution=[distribution1, distribution2],
+ not_in_distribution=[distribution3],
+ )
+ self.assertEqual(0, len(list(result)))
+
+ # 1 result as the cve now has a vulnerability for both distribution1
+ # and distribution2
+ self.factory.makeVulnerability(distribution2, cve=cve)
+ result = self.cves.advancedSearch(
+ in_distribution=[distribution1, distribution2],
+ not_in_distribution=[distribution3],
+ )
+ cves = list(result)
+ self.assertEqual(1, len(cves))
+ self.assertEqual("2099-9876", cves[0])
+
+ # No result as the cve now also has a vulnerability for distribution3
+ self.factory.makeVulnerability(distribution3, cve=cve)
+ result = self.cves.advancedSearch(
+ in_distribution=[distribution1, distribution2],
+ not_in_distribution=[distribution3],
+ )
+ self.assertEqual(0, len(list(result)))
+
+ def test_advancedSearch_since(self):
+ # Only the cve created in this test is expected to be newer than
+ # `since`; the 10 existing cves are expected to be older.
+ # NOTE(review): datetime.utcnow() returns a naive datetime and is
+ # deprecated since Python 3.12; `timezone` is already imported, so
+ # datetime.now(timezone.utc) may be preferable -- confirm against the
+ # type of cve.datemodified.
+ since = datetime.utcnow() - timedelta(days=1)
+ self.factory.makeCVE(sequence="2099-9876")
+
+ result = self.cves.advancedSearch(since=since)
+ cves = list(result)
+ self.assertEqual(1, len(cves))
+ self.assertEqual("2099-9876", cves[0])
+
+ def test_advancedSearch_limit(self):
+ # Only 5 of the 10 existing cves are returned
+ result = self.cves.advancedSearch(limit=5)
+ self.assertEqual(5, len(list(result)))
+
+
+class TestCveSetAdvancedSearchWebService(TestCaseWithFactory):
+ # Tests for the `advancedSearch` operation as exposed by the "devel"
+ # webservice. The expected counts rely on 10 cves already being present
+ # in the test database.
+ layer = DatabaseFunctionalLayer
+
+ def setUp(self):
+ super().setUp()
+ # self.person owns both distributions; the passing tests below use
+ # these distributions as filters while authenticated as self.person.
+ self.person = self.factory.makePerson()
+ self.distribution1 = self.factory.makeDistribution(
+ owner=self.person, name="distribution1"
+ )
+ self.distribution2 = self.factory.makeDistribution(
+ owner=self.person, name="distribution2"
+ )
+
+ self.api_base = "http://api.launchpad.test/devel"
+ self.cves = getUtility(ICveSet)
+ self.cves_url = self.api_base + api_url(self.cves)
+
+ def test_advancedSearch_default_arguments(self):
+ # With no filters, all 10 existing cves are returned
+ webservice = webservice_for_person(
+ self.person,
+ permission=OAuthPermission.WRITE_PRIVATE,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.cves_url,
+ "advancedSearch",
+ )
+
+ self.assertEqual(200, response.status)
+ self.assertEqual(10, len(response.jsonBody()["cves"]))
+
+ def test_advancedSearch_in_distribution(self):
+ # Only the cve with a vulnerability for distribution1 is returned
+ cve = self.factory.makeCVE(sequence="2099-9876")
+ self.factory.makeVulnerability(self.distribution1, cve=cve)
+
+ in_distribution = [api_url(self.distribution1)]
+ webservice = webservice_for_person(
+ self.person,
+ permission=OAuthPermission.WRITE_PRIVATE,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.cves_url,
+ "advancedSearch",
+ in_distribution=in_distribution,
+ )
+
+ self.assertEqual(200, response.status)
+ self.assertEqual(1, len(response.jsonBody()["cves"]))
+ self.assertEqual("2099-9876", response.jsonBody()["cves"][0])
+
+ def test_advancedSearch_not_in_distribution(self):
+ # None of the 10 existing cves has a vulnerability for distribution1,
+ # so all of them are returned
+ not_in_distribution = [api_url(self.distribution1)]
+ webservice = webservice_for_person(
+ self.person,
+ permission=OAuthPermission.WRITE_PRIVATE,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.cves_url,
+ "advancedSearch",
+ not_in_distribution=not_in_distribution,
+ )
+
+ self.assertEqual(200, response.status)
+ self.assertEqual(10, len(response.jsonBody()["cves"]))
+
+ def test_advancedSearch_since(self):
+ # Only the cve created in this test is expected to be newer than
+ # `since`
+ self.factory.makeCVE(sequence="2099-9876")
+
+ webservice = webservice_for_person(
+ self.person,
+ permission=OAuthPermission.WRITE_PRIVATE,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.cves_url,
+ "advancedSearch",
+ since=(datetime.utcnow() - timedelta(days=1)).isoformat(),
+ )
+
+ self.assertEqual(200, response.status)
+ self.assertEqual(1, len(response.jsonBody()["cves"]))
+
+ def test_advancedSearch_limit(self):
+ # Only 5 of the 10 existing cves are returned
+ webservice = webservice_for_person(
+ self.person,
+ permission=OAuthPermission.WRITE_PRIVATE,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.cves_url,
+ "advancedSearch",
+ limit=5,
+ )
+
+ self.assertEqual(200, response.status)
+ self.assertEqual(5, len(response.jsonBody()["cves"]))
+
+ def test_advancedSearch_unauthenticated(self):
+ # A webservice built for person None reaches the endpoint without a
+ # requester (presumably an anonymous request), which is rejected
+ in_distribution = [api_url(self.distribution1)]
+
+ webservice = webservice_for_person(
+ None,
+ permission=OAuthPermission.WRITE_PRIVATE,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.cves_url,
+ "advancedSearch",
+ in_distribution=in_distribution,
+ )
+
+ self.assertEqual(401, response.status)
+ self.assertEqual(
+ b"Only authenticated users can use this endpoint",
+ response.body,
+ )
+
+ def test_advancedSearch_unauthorized(self):
+ # A freshly created person with no relation to distribution1 cannot
+ # use it as a filter.
+ # NOTE(review): the requester is authenticated here, yet the status is
+ # 401 because the model raises Unauthorized; 403 may describe this
+ # case better -- confirm the intended contract.
+ in_distribution = [api_url(self.distribution1)]
+ person = self.factory.makePerson()
+
+ webservice = webservice_for_person(
+ person,
+ permission=OAuthPermission.WRITE_PRIVATE,
+ default_api_version="devel",
+ )
+ response = webservice.named_get(
+ self.cves_url,
+ "advancedSearch",
+ in_distribution=in_distribution,
+ )
+
+ self.assertEqual(401, response.status)
+ self.assertEqual(
+ b"Only security admins can use distribution1 as a filter",
+ response.body,
+ )
+
+
class TestBugLinks(TestCaseWithFactory):
layer = DatabaseFunctionalLayer
Follow ups