launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #33109
Re: [Merge] ~enriqueesanchz/launchpad:add-get-untriaged-cves into launchpad:master
Looking good, left a few comments
Diff comments:
> diff --git a/lib/lp/bugs/interfaces/cve.py b/lib/lp/bugs/interfaces/cve.py
> index 4efd60c..dd341b4 100644
> --- a/lib/lp/bugs/interfaces/cve.py
> +++ b/lib/lp/bugs/interfaces/cve.py
> @@ -233,6 +241,66 @@ class ICveSet(Interface):
> def search(text):
> """Search the CVE database for matching CVE entries."""
>
> + def advancedSearch(
> + in_distribution: List = [],
> + not_in_distribution: List = [],
I'm always wary of having mutable vars as input variables. I'd go for `None` and then do `if None --> []`
> + since: Optional[Datetime] = None,
> + limit: Optional[int] = None,
> + ) -> Iterator[str]:
> + """Return an iterator of cve sequences that matches the given filters.
> +
> + :param in_distribution: filter cves that have a vulnerability for all
> + of these distributions
> + :param not_in_distribution: filter cves that have no vulnerability for
> + any of these distributions
> + :param since: only updated cves after `since` will be returned.
> + :param limit: return `limit` cves at max.
> + """
> +
> + # in_distribution patched in lp.bugs.interfaces.webservice
> + # not_in_distribution patched in lp.bugs.interfaces.webservice
> + @operation_parameters(
> + in_distribution=List(
> + title=_("Distributions linked to the cve"),
> + value_type=Reference(schema=Interface),
> + required=False,
> + ),
> + not_in_distribution=List(
> + title=_("Distributions not linked to the cve"),
> + value_type=Reference(schema=Interface),
> + required=False,
> + ),
> + since=Datetime(
> + title=_("Minimum cve.datemodified"),
> + description=_("Ignore cves that are older than this."),
> + required=False,
> + ),
> + limit=Int(
> + title=_("Maximum number of cves to return"),
> + required=False,
> + ),
> + )
> + @export_operation_as("advancedSearch")
> + @call_with(requester=REQUEST_USER)
> + @export_read_operation()
> + @operation_for_version("devel")
> + def api_advancedSearch(
Can we just name it `advancedSearch`? And the aux method above perhaps 'getFilteredCVEs' or something else? I think naming the endpoint `advancedSearch` but not the function will be confusing when getting back to this in the future
> + requester,
> + in_distribution: List = [],
> + not_in_distribution: List = [],
> + since: Optional[Datetime] = None,
IMO since could easily refer to date created, date modified or even date reported, so I'd go for a clearer name
Perhaps 'modified_since'?
> + limit: Optional[int] = None,
> + ) -> dict:
> + """Return cve sequences that matches the given filters.
> +
> + :param in_distribution: filter cves that have a vulnerability for all
> + of these distributions
> + :param not_in_distribution: filter cves that have no vulnerability for
> + any of these distributions
> + :param since: only updated cves after `since` will be returned.
> + :param limit: return `limit` cves at max.
> + """
> +
> def inText(text):
> """Find one or more Cve's by analysing the given text.
>
> diff --git a/lib/lp/bugs/model/cve.py b/lib/lp/bugs/model/cve.py
> index ce24b2d..a20df63 100644
> --- a/lib/lp/bugs/model/cve.py
> +++ b/lib/lp/bugs/model/cve.py
> @@ -256,6 +259,80 @@ class CveSet:
> .config(distinct=True)
> )
>
> + def advancedSearch(
> + self,
> + in_distribution=[],
> + not_in_distribution=[],
> + since=None,
> + limit=None,
Should we enforce a default limit?
I was curious to see if we did something similar elsewhere and I found https://launchpad.net/+apidoc/devel.html#pillars --> it seems that if None, we set a default batch size. I like that since it makes the request more conscious and less strain in the DB
> + ):
On a similar topic, should we consider any sort of pagination or offset? This I couldn't find in existing examples.
Might be more of a future improvement
Thinking about it, if results were sorted by date modified, the date modified could be used to paginate
> + """See `ICveSet`."""
> +
> + in_distribution_id = [d.id for d in in_distribution]
> + not_in_distribution_id = [d.id for d in not_in_distribution]
> +
> + query = """
Although I like a good SQL query, and we do some of these in the code, I'd be leaning towards using storm here.
If not, I'd at least use something like `sqlvalues` - part of the advantages of ORMs is that they are safer against SQL injection. In this case in specific, this won't really change much, the variables are ints and datetime which are validated in the API call. But as this query grows (which will eventually happen), the next person might add a text field and not sanitize because all others aren't, so I'd go for sanitizing regardless.
I'd still go for storm if it's not too hard to create the query there. You perhaps wouldn't need that mapping to lists of IDs with storm either
> + SELECT c.sequence
> + FROM cve c
> + LEFT JOIN vulnerability v ON c.id = v.cve
> + WHERE (%s is NULL OR c.datemodified > %s)
> + GROUP BY c.id, c.sequence
> + HAVING
> + (
> + %s = 0
> + OR COUNT(DISTINCT v.distribution) FILTER (
> + WHERE v.distribution = ANY(%s)
> + ) = %s
> + )
> + AND COUNT(DISTINCT v.distribution) FILTER (
> + WHERE v.distribution = ANY(%s)
> + ) = 0
> + LIMIT %s;
I think we should order the result on something, either id or date. Perhaps date modified or date created?
> + """
> +
> + params = (
> + since,
> + since,
> + len(set(in_distribution_id)),
> + in_distribution_id,
> + len(set(in_distribution_id)),
> + not_in_distribution_id,
> + limit,
> + )
> +
> + store = IStore(Vulnerability)
> + result = store.execute(query, params)
> + for r in result:
> + yield r[0]
> +
> + def api_advancedSearch(
> + self,
> + requester,
> + in_distribution=[],
> + not_in_distribution=[],
> + since=None,
> + limit=None,
> + ):
> + """See `ICveSet`."""
> + if not requester:
> + raise Unauthorized(
> + "Only authenticated users can use this endpoint"
> + )
> +
> + for distro in in_distribution + not_in_distribution:
> + user = IPersonRoles(requester)
> + if not SecurityAdminDistribution(distro).checkAuthenticated(user):
> + raise Unauthorized(
> + f"Only security admins can use {distro.name} as a filter"
> + )
> +
> + cves = list(
> + self.advancedSearch(
> + in_distribution, not_in_distribution, since, limit
> + )
> + )
> + return {"cves": cves}
> +
> def inText(self, text):
> """See ICveSet."""
> # let's look for matching entries
> diff --git a/lib/lp/bugs/tests/test_cve.py b/lib/lp/bugs/tests/test_cve.py
> index 23fd423..27faf68 100644
> --- a/lib/lp/bugs/tests/test_cve.py
> +++ b/lib/lp/bugs/tests/test_cve.py
> @@ -112,6 +115,263 @@ class TestCveSet(TestCaseWithFactory):
> self.assertEqual(base, getUtility(ICveSet).getBugCveCount())
>
>
> +class TestCveSetAdvancedSearch(TestCaseWithFactory):
> + layer = DatabaseFunctionalLayer
> +
> + def setUp(self):
> + super().setUp()
> + self.cves = getUtility(ICveSet)
> +
> + def test_advancedSearch_default_arguments(self):
> + # All cves will be returned as there are not vulnerabilities created
> + result = self.cves.advancedSearch()
> + self.assertEqual(10, len(list(result)))
> +
> + def test_advancedSearch_in_distribution(self):
> + distribution1 = self.factory.makeDistribution()
> + distribution2 = self.factory.makeDistribution()
> +
> + # No result as there is no cve with a vulnerability for distribution1
> + result = self.cves.advancedSearch(in_distribution=[distribution1])
> + self.assertEqual(0, len(list(result)))
> +
> + cve = self.factory.makeCVE(sequence="2099-9876")
> + self.factory.makeVulnerability(distribution1, cve=cve)
> +
> + # There is 1 cve with a vulnerability for distribution
> + result = self.cves.advancedSearch(in_distribution=[distribution1])
> + cves = list(result)
> + self.assertEqual(1, len(cves))
> + self.assertEqual("2099-9876", cves[0])
> +
> + # No result as there is no cve with a vulnerability for distribution2
> + result = self.cves.advancedSearch(in_distribution=[distribution2])
> + self.assertEqual(0, len(list(result)))
> +
> + def test_advancedSearch_not_in_distribution(self):
> + distribution1 = self.factory.makeDistribution()
> + distribution2 = self.factory.makeDistribution()
> +
> + # No result as there is no cve with a vulnerability for distributions
> + result = self.cves.advancedSearch(
> + not_in_distribution=[distribution1, distribution2]
> + )
> + self.assertEqual(10, len(list(result)))
> +
> + cve = self.factory.makeCVE(sequence="2099-9876")
> +
> + # There are 11 cve without a vulnerability for distributions
> + result = self.cves.advancedSearch(
> + not_in_distribution=[distribution1, distribution2]
> + )
> + self.assertEqual(11, len(list(result)))
> +
> + self.factory.makeVulnerability(distribution1, cve=cve)
> +
> + # There are 10 cve without a vulnerability for any of distributions
> + result = self.cves.advancedSearch(
> + not_in_distribution=[distribution1, distribution2]
> + )
> + self.assertEqual(10, len(list(result)))
> +
> + def test_advancedSearch_both_in_distribution(self):
> + distribution1 = self.factory.makeDistribution()
> + distribution2 = self.factory.makeDistribution()
> + distribution3 = self.factory.makeDistribution()
> +
> + # No result as there are no cves that:
> + # - has a vulnerability for distribution1 and distribution2
> + # - has no vulnerability for distribution3
> + result = self.cves.advancedSearch(
> + in_distribution=[distribution1, distribution2],
> + not_in_distribution=[distribution3],
> + )
> + self.assertEqual(0, len(list(result)))
> +
> + cve = self.factory.makeCVE(sequence="2099-9876")
> + self.factory.makeVulnerability(distribution1, cve=cve)
> +
> + # No result as we only created the vulnerability for distribution1
> + result = self.cves.advancedSearch(
> + in_distribution=[distribution1, distribution2],
> + not_in_distribution=[distribution3],
> + )
> + self.assertEqual(0, len(list(result)))
> +
> + # 1 result as we created a vulnerability for distribution1 and
> + # distribution2
> + self.factory.makeVulnerability(distribution2, cve=cve)
> + result = self.cves.advancedSearch(
> + in_distribution=[distribution1, distribution2],
> + not_in_distribution=[distribution3],
> + )
> + cves = list(result)
> + self.assertEqual(1, len(cves))
> + self.assertEqual("2099-9876", cves[0])
> +
> + # No result as we created a vulnerability for distribution3 for the cve
> + self.factory.makeVulnerability(distribution3, cve=cve)
> + result = self.cves.advancedSearch(
> + in_distribution=[distribution1, distribution2],
> + not_in_distribution=[distribution3],
> + )
> + self.assertEqual(0, len(list(result)))
> +
> + def test_advancedSearch_since(self):
> + since = datetime.utcnow() - timedelta(days=1)
> + self.factory.makeCVE(sequence="2099-9876")
> +
> + result = self.cves.advancedSearch(since=since)
> + cves = list(result)
> + self.assertEqual(1, len(cves))
> + self.assertEqual("2099-9876", cves[0])
> +
> + def test_advancedSearch_limit(self):
> + result = self.cves.advancedSearch(limit=5)
> + self.assertEqual(5, len(list(result)))
> +
> +
> +class TestCveSetAdvancedSearchWebService(TestCaseWithFactory):
Nice test coverage!
> + layer = DatabaseFunctionalLayer
> +
> + def setUp(self):
> + super().setUp()
> + self.person = self.factory.makePerson()
> + self.distribution1 = self.factory.makeDistribution(
> + owner=self.person, name="distribution1"
> + )
> + self.distribution2 = self.factory.makeDistribution(
> + owner=self.person, name="distribution2"
> + )
> +
> + self.api_base = "http://api.launchpad.test/devel"
> + self.cves = getUtility(ICveSet)
> + self.cves_url = self.api_base + api_url(self.cves)
> +
> + def test_advancedSearch_default_arguments(self):
> + webservice = webservice_for_person(
> + self.person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + )
> +
> + self.assertEqual(200, response.status)
> + self.assertEqual(10, len(response.jsonBody()["cves"]))
> +
> + def test_advancedSearch_in_distribution(self):
> + cve = self.factory.makeCVE(sequence="2099-9876")
> + self.factory.makeVulnerability(self.distribution1, cve=cve)
> +
> + in_distribution = [api_url(self.distribution1)]
> + webservice = webservice_for_person(
> + self.person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + in_distribution=in_distribution,
> + )
> +
> + self.assertEqual(200, response.status)
> + self.assertEqual(1, len(response.jsonBody()["cves"]))
> + self.assertEqual("2099-9876", response.jsonBody()["cves"][0])
> +
> + def test_advancedSearch_not_in_distribution(self):
> + not_in_distribution = [api_url(self.distribution1)]
> + webservice = webservice_for_person(
> + self.person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + not_in_distribution=not_in_distribution,
> + )
> +
> + self.assertEqual(200, response.status)
> + self.assertEqual(10, len(response.jsonBody()["cves"]))
> +
> + def test_advancedSearch_since(self):
> + self.factory.makeCVE(sequence="2099-9876")
> +
> + webservice = webservice_for_person(
> + self.person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + since=(datetime.utcnow() - timedelta(days=1)).isoformat(),
I'd consider adding a quick test that a regular date string also works - I think that's the case - (and/or that a string that's not a date doesn't)
> + )
> +
> + self.assertEqual(200, response.status)
> + self.assertEqual(1, len(response.jsonBody()["cves"]))
> +
> + def test_advancedSearch_limit(self):
> + webservice = webservice_for_person(
> + self.person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + limit=5,
> + )
> +
> + self.assertEqual(200, response.status)
> + self.assertEqual(5, len(response.jsonBody()["cves"]))
> +
> + def test_advancedSearch_unauthenticated(self):
> + in_distribution = [api_url(self.distribution1)]
> +
> + webservice = webservice_for_person(
> + None,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + in_distribution=in_distribution,
> + )
> +
> + self.assertEqual(401, response.status)
> + self.assertEqual(
> + b"Only authenticated users can use this endpoint",
> + response.body,
> + )
> +
> + def test_advancedSearch_unauthorized(self):
> + in_distribution = [api_url(self.distribution1)]
> + person = self.factory.makePerson()
> +
> + webservice = webservice_for_person(
> + person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + in_distribution=in_distribution,
> + )
> +
> + self.assertEqual(401, response.status)
> + self.assertEqual(
> + b"Only security admins can use distribution1 as a filter",
> + response.body,
> + )
> +
> +
> class TestBugLinks(TestCaseWithFactory):
> layer = DatabaseFunctionalLayer
>
--
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/493853
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-get-untriaged-cves into launchpad:master.
References