← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~enriqueesanchz/launchpad:add-get-untriaged-cves into launchpad:master

 

Enrique Sánchez has proposed merging ~enriqueesanchz/launchpad:add-get-untriaged-cves into launchpad:master.

Commit message:
Add CveSet.advancedSearch
    
`advancedSearch` allows to filter cves if they are present or not in a
distribution (as a vulnerability). This allows the SEC team to search
cves to triage them.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/493853
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-get-untriaged-cves into launchpad:master.
diff --git a/lib/lp/bugs/interfaces/cve.py b/lib/lp/bugs/interfaces/cve.py
index 4efd60c..dd341b4 100644
--- a/lib/lp/bugs/interfaces/cve.py
+++ b/lib/lp/bugs/interfaces/cve.py
@@ -9,16 +9,24 @@ __all__ = [
     "ICveSet",
 ]
 
+from typing import Iterator, Optional
+
 from lazr.enum import DBEnumeratedType, DBItem
 from lazr.restful.declarations import (
+    REQUEST_USER,
+    call_with,
     collection_default_content,
+    export_operation_as,
+    export_read_operation,
     exported,
     exported_as_webservice_collection,
     exported_as_webservice_entry,
+    operation_for_version,
+    operation_parameters,
 )
 from lazr.restful.fields import CollectionField, Reference
 from zope.interface import Attribute, Interface
-from zope.schema import Choice, Datetime, Dict, Int, Text, TextLine
+from zope.schema import Choice, Datetime, Dict, Int, List, Text, TextLine
 
 from lp import _
 from lp.app.validators.validation import valid_cve_sequence
@@ -233,6 +241,66 @@ class ICveSet(Interface):
     def search(text):
         """Search the CVE database for matching CVE entries."""
 
+    def advancedSearch(
+        in_distribution: List = [],
+        not_in_distribution: List = [],
+        since: Optional[Datetime] = None,
+        limit: Optional[int] = None,
+    ) -> Iterator[str]:
+        """Return an iterator of cve sequences that matches the given filters.
+
+        :param in_distribution: filter cves that have a vulnerability for all
+            of these distributions
+        :param not_in_distribution: filter cves that have no vulnerability for
+            any of these distributions
+        :param since: only updated cves after `since` will be returned.
+        :param limit: return `limit` cves at max.
+        """
+
+    # in_distribution patched in lp.bugs.interfaces.webservice
+    # not_in_distribution patched in lp.bugs.interfaces.webservice
+    @operation_parameters(
+        in_distribution=List(
+            title=_("Distributions linked to the cve"),
+            value_type=Reference(schema=Interface),
+            required=False,
+        ),
+        not_in_distribution=List(
+            title=_("Distributions not linked to the cve"),
+            value_type=Reference(schema=Interface),
+            required=False,
+        ),
+        since=Datetime(
+            title=_("Minimum cve.datemodified"),
+            description=_("Ignore cves that are older than this."),
+            required=False,
+        ),
+        limit=Int(
+            title=_("Maximum number of cves to return"),
+            required=False,
+        ),
+    )
+    @export_operation_as("advancedSearch")
+    @call_with(requester=REQUEST_USER)
+    @export_read_operation()
+    @operation_for_version("devel")
+    def api_advancedSearch(
+        requester,
+        in_distribution: List = [],
+        not_in_distribution: List = [],
+        since: Optional[Datetime] = None,
+        limit: Optional[int] = None,
+    ) -> dict:
+        """Return cve sequences that matches the given filters.
+
+        :param in_distribution: filter cves that have a vulnerability for all
+            of these distributions
+        :param not_in_distribution: filter cves that have no vulnerability for
+            any of these distributions
+        :param since: only updated cves after `since` will be returned.
+        :param limit: return `limit` cves at max.
+        """
+
     def inText(text):
         """Find one or more Cve's by analysing the given text.
 
diff --git a/lib/lp/bugs/interfaces/webservice.py b/lib/lp/bugs/interfaces/webservice.py
index e69d8e8..b409934 100644
--- a/lib/lp/bugs/interfaces/webservice.py
+++ b/lib/lp/bugs/interfaces/webservice.py
@@ -84,6 +84,7 @@ from lp.bugs.interfaces.structuralsubscription import (
 )
 from lp.bugs.interfaces.vulnerability import IVulnerability, IVulnerabilitySet
 from lp.code.interfaces.branchmergeproposal import IBranchMergeProposal
+from lp.registry.interfaces.distribution import IDistribution
 from lp.registry.interfaces.distributionsourcepackage import (
     IDistributionSourcePackage,
 )
@@ -163,3 +164,17 @@ patch_entry_return_type(
     "addBugSubscriptionFilter",
     IBugSubscriptionFilter,
 )
+
+# ICve
+patch_list_parameter_type(
+    ICveSet,
+    "api_advancedSearch",
+    "in_distribution",
+    Reference(schema=IDistribution),
+)
+patch_list_parameter_type(
+    ICveSet,
+    "api_advancedSearch",
+    "not_in_distribution",
+    Reference(schema=IDistribution),
+)
diff --git a/lib/lp/bugs/model/cve.py b/lib/lp/bugs/model/cve.py
index ce24b2d..a20df63 100644
--- a/lib/lp/bugs/model/cve.py
+++ b/lib/lp/bugs/model/cve.py
@@ -14,6 +14,7 @@ from storm.databases.postgres import JSON
 from storm.locals import DateTime, Desc, Int, ReferenceSet, Store, Unicode
 from zope.component import getUtility
 from zope.interface import implementer
+from zope.security.interfaces import Unauthorized
 
 from lp.app.validators.cve import CVEREF_PATTERN, valid_cve
 from lp.bugs.interfaces.buglink import IBugLinkTarget
@@ -25,7 +26,9 @@ from lp.bugs.model.vulnerability import (
     Vulnerability,
     get_vulnerability_privacy_filter,
 )
+from lp.registry.interfaces.role import IPersonRoles
 from lp.registry.model.distribution import Distribution
+from lp.registry.security import SecurityAdminDistribution
 from lp.services.database import bulk
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.decoratedresultset import DecoratedResultSet
@@ -256,6 +259,80 @@ class CveSet:
             .config(distinct=True)
         )
 
+    def advancedSearch(
+        self,
+        in_distribution=[],
+        not_in_distribution=[],
+        since=None,
+        limit=None,
+    ):
+        """See `ICveSet`."""
+
+        in_distribution_id = [d.id for d in in_distribution]
+        not_in_distribution_id = [d.id for d in not_in_distribution]
+
+        query = """
+                SELECT c.sequence
+                FROM cve c
+                LEFT JOIN vulnerability v ON c.id = v.cve
+                WHERE (%s is NULL OR c.datemodified > %s)
+                GROUP BY c.id, c.sequence
+                HAVING
+                    (
+                        %s = 0
+                        OR COUNT(DISTINCT v.distribution) FILTER (
+                            WHERE v.distribution = ANY(%s)
+                        ) = %s
+                    )
+                    AND COUNT(DISTINCT v.distribution) FILTER (
+                        WHERE v.distribution = ANY(%s)
+                    ) = 0
+                LIMIT %s;
+        """
+
+        params = (
+            since,
+            since,
+            len(set(in_distribution_id)),
+            in_distribution_id,
+            len(set(in_distribution_id)),
+            not_in_distribution_id,
+            limit,
+        )
+
+        store = IStore(Vulnerability)
+        result = store.execute(query, params)
+        for r in result:
+            yield r[0]
+
+    def api_advancedSearch(
+        self,
+        requester,
+        in_distribution=[],
+        not_in_distribution=[],
+        since=None,
+        limit=None,
+    ):
+        """See `ICveSet`."""
+        if not requester:
+            raise Unauthorized(
+                "Only authenticated users can use this endpoint"
+            )
+
+        for distro in in_distribution + not_in_distribution:
+            user = IPersonRoles(requester)
+            if not SecurityAdminDistribution(distro).checkAuthenticated(user):
+                raise Unauthorized(
+                    f"Only security admins can use {distro.name} as a filter"
+                )
+
+        cves = list(
+            self.advancedSearch(
+                in_distribution, not_in_distribution, since, limit
+            )
+        )
+        return {"cves": cves}
+
     def inText(self, text):
         """See ICveSet."""
         # let's look for matching entries
diff --git a/lib/lp/bugs/tests/test_cve.py b/lib/lp/bugs/tests/test_cve.py
index 23fd423..27faf68 100644
--- a/lib/lp/bugs/tests/test_cve.py
+++ b/lib/lp/bugs/tests/test_cve.py
@@ -3,7 +3,7 @@
 
 """CVE related tests."""
 
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 
 from testtools.matchers import MatchesStructure
 from testtools.testcase import ExpectedException
@@ -14,14 +14,17 @@ from lp.app.enums import InformationType
 from lp.bugs.interfaces.bugtasksearch import BugTaskSearchParams
 from lp.bugs.interfaces.cve import CveStatus, ICveSet
 from lp.bugs.scripts.uct.models import CVSS
+from lp.services.webapp.interfaces import OAuthPermission
 from lp.testing import (
     TestCaseWithFactory,
     admin_logged_in,
+    api_url,
     login_person,
     person_logged_in,
     verifyObject,
 )
 from lp.testing.layers import DatabaseFunctionalLayer
+from lp.testing.pages import webservice_for_person
 
 
 class TestCveSet(TestCaseWithFactory):
@@ -112,6 +115,263 @@ class TestCveSet(TestCaseWithFactory):
         self.assertEqual(base, getUtility(ICveSet).getBugCveCount())
 
 
+class TestCveSetAdvancedSearch(TestCaseWithFactory):
+    layer = DatabaseFunctionalLayer
+
+    def setUp(self):
+        super().setUp()
+        self.cves = getUtility(ICveSet)
+
+    def test_advancedSearch_default_arguments(self):
+        # All cves will be returned as there are not vulnerabilities created
+        result = self.cves.advancedSearch()
+        self.assertEqual(10, len(list(result)))
+
+    def test_advancedSearch_in_distribution(self):
+        distribution1 = self.factory.makeDistribution()
+        distribution2 = self.factory.makeDistribution()
+
+        # No result as there is no cve with a vulnerability for distribution1
+        result = self.cves.advancedSearch(in_distribution=[distribution1])
+        self.assertEqual(0, len(list(result)))
+
+        cve = self.factory.makeCVE(sequence="2099-9876")
+        self.factory.makeVulnerability(distribution1, cve=cve)
+
+        # There is 1 cve with a vulnerability for distribution
+        result = self.cves.advancedSearch(in_distribution=[distribution1])
+        cves = list(result)
+        self.assertEqual(1, len(cves))
+        self.assertEqual("2099-9876", cves[0])
+
+        # No result as there is no cve with a vulnerability for distribution2
+        result = self.cves.advancedSearch(in_distribution=[distribution2])
+        self.assertEqual(0, len(list(result)))
+
+    def test_advancedSearch_not_in_distribution(self):
+        distribution1 = self.factory.makeDistribution()
+        distribution2 = self.factory.makeDistribution()
+
+        # No result as there is no cve with a vulnerability for distributions
+        result = self.cves.advancedSearch(
+            not_in_distribution=[distribution1, distribution2]
+        )
+        self.assertEqual(10, len(list(result)))
+
+        cve = self.factory.makeCVE(sequence="2099-9876")
+
+        # There are 11 cve without a vulnerability for distributions
+        result = self.cves.advancedSearch(
+            not_in_distribution=[distribution1, distribution2]
+        )
+        self.assertEqual(11, len(list(result)))
+
+        self.factory.makeVulnerability(distribution1, cve=cve)
+
+        # There are 10 cve without a vulnerability for any of distributions
+        result = self.cves.advancedSearch(
+            not_in_distribution=[distribution1, distribution2]
+        )
+        self.assertEqual(10, len(list(result)))
+
+    def test_advancedSearch_both_in_distribution(self):
+        distribution1 = self.factory.makeDistribution()
+        distribution2 = self.factory.makeDistribution()
+        distribution3 = self.factory.makeDistribution()
+
+        # No result as there are no cves that:
+        #   - has a vulnerability for distribution1 and distribution2
+        #   - has no vulnerability for distribution3
+        result = self.cves.advancedSearch(
+            in_distribution=[distribution1, distribution2],
+            not_in_distribution=[distribution3],
+        )
+        self.assertEqual(0, len(list(result)))
+
+        cve = self.factory.makeCVE(sequence="2099-9876")
+        self.factory.makeVulnerability(distribution1, cve=cve)
+
+        # No result as we only created the vulnerability for distribution1
+        result = self.cves.advancedSearch(
+            in_distribution=[distribution1, distribution2],
+            not_in_distribution=[distribution3],
+        )
+        self.assertEqual(0, len(list(result)))
+
+        # 1 result as we created a vulnerability for distribution1 and
+        # distribution2
+        self.factory.makeVulnerability(distribution2, cve=cve)
+        result = self.cves.advancedSearch(
+            in_distribution=[distribution1, distribution2],
+            not_in_distribution=[distribution3],
+        )
+        cves = list(result)
+        self.assertEqual(1, len(cves))
+        self.assertEqual("2099-9876", cves[0])
+
+        # No result as we created a vulnerability for distribution3 for the cve
+        self.factory.makeVulnerability(distribution3, cve=cve)
+        result = self.cves.advancedSearch(
+            in_distribution=[distribution1, distribution2],
+            not_in_distribution=[distribution3],
+        )
+        self.assertEqual(0, len(list(result)))
+
+    def test_advancedSearch_since(self):
+        since = datetime.utcnow() - timedelta(days=1)
+        self.factory.makeCVE(sequence="2099-9876")
+
+        result = self.cves.advancedSearch(since=since)
+        cves = list(result)
+        self.assertEqual(1, len(cves))
+        self.assertEqual("2099-9876", cves[0])
+
+    def test_advancedSearch_limit(self):
+        result = self.cves.advancedSearch(limit=5)
+        self.assertEqual(5, len(list(result)))
+
+
+class TestCveSetAdvancedSearchWebService(TestCaseWithFactory):
+    layer = DatabaseFunctionalLayer
+
+    def setUp(self):
+        super().setUp()
+        self.person = self.factory.makePerson()
+        self.distribution1 = self.factory.makeDistribution(
+            owner=self.person, name="distribution1"
+        )
+        self.distribution2 = self.factory.makeDistribution(
+            owner=self.person, name="distribution2"
+        )
+
+        self.api_base = "http://api.launchpad.test/devel";
+        self.cves = getUtility(ICveSet)
+        self.cves_url = self.api_base + api_url(self.cves)
+
+    def test_advancedSearch_default_arguments(self):
+        webservice = webservice_for_person(
+            self.person,
+            permission=OAuthPermission.WRITE_PRIVATE,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.cves_url,
+            "advancedSearch",
+        )
+
+        self.assertEqual(200, response.status)
+        self.assertEqual(10, len(response.jsonBody()["cves"]))
+
+    def test_advancedSearch_in_distribution(self):
+        cve = self.factory.makeCVE(sequence="2099-9876")
+        self.factory.makeVulnerability(self.distribution1, cve=cve)
+
+        in_distribution = [api_url(self.distribution1)]
+        webservice = webservice_for_person(
+            self.person,
+            permission=OAuthPermission.WRITE_PRIVATE,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.cves_url,
+            "advancedSearch",
+            in_distribution=in_distribution,
+        )
+
+        self.assertEqual(200, response.status)
+        self.assertEqual(1, len(response.jsonBody()["cves"]))
+        self.assertEqual("2099-9876", response.jsonBody()["cves"][0])
+
+    def test_advancedSearch_not_in_distribution(self):
+        not_in_distribution = [api_url(self.distribution1)]
+        webservice = webservice_for_person(
+            self.person,
+            permission=OAuthPermission.WRITE_PRIVATE,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.cves_url,
+            "advancedSearch",
+            not_in_distribution=not_in_distribution,
+        )
+
+        self.assertEqual(200, response.status)
+        self.assertEqual(10, len(response.jsonBody()["cves"]))
+
+    def test_advancedSearch_since(self):
+        self.factory.makeCVE(sequence="2099-9876")
+
+        webservice = webservice_for_person(
+            self.person,
+            permission=OAuthPermission.WRITE_PRIVATE,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.cves_url,
+            "advancedSearch",
+            since=(datetime.utcnow() - timedelta(days=1)).isoformat(),
+        )
+
+        self.assertEqual(200, response.status)
+        self.assertEqual(1, len(response.jsonBody()["cves"]))
+
+    def test_advancedSearch_limit(self):
+        webservice = webservice_for_person(
+            self.person,
+            permission=OAuthPermission.WRITE_PRIVATE,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.cves_url,
+            "advancedSearch",
+            limit=5,
+        )
+
+        self.assertEqual(200, response.status)
+        self.assertEqual(5, len(response.jsonBody()["cves"]))
+
+    def test_advancedSearch_unauthenticated(self):
+        in_distribution = [api_url(self.distribution1)]
+
+        webservice = webservice_for_person(
+            None,
+            permission=OAuthPermission.WRITE_PRIVATE,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.cves_url,
+            "advancedSearch",
+            in_distribution=in_distribution,
+        )
+
+        self.assertEqual(401, response.status)
+        self.assertEqual(
+            b"Only authenticated users can use this endpoint",
+            response.body,
+        )
+
+    def test_advancedSearch_unauthorized(self):
+        in_distribution = [api_url(self.distribution1)]
+        person = self.factory.makePerson()
+
+        webservice = webservice_for_person(
+            person,
+            permission=OAuthPermission.WRITE_PRIVATE,
+            default_api_version="devel",
+        )
+        response = webservice.named_get(
+            self.cves_url,
+            "advancedSearch",
+            in_distribution=in_distribution,
+        )
+
+        self.assertEqual(401, response.status)
+        self.assertEqual(
+            b"Only security admins can use distribution1 as a filter",
+            response.body,
+        )
+
+
 class TestBugLinks(TestCaseWithFactory):
     layer = DatabaseFunctionalLayer
 

Follow ups