launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #33113
Re: [Merge] ~enriqueesanchz/launchpad:add-get-untriaged-cves into launchpad:master
Diff comments:
> diff --git a/lib/lp/bugs/interfaces/cve.py b/lib/lp/bugs/interfaces/cve.py
> index 4efd60c..dd341b4 100644
> --- a/lib/lp/bugs/interfaces/cve.py
> +++ b/lib/lp/bugs/interfaces/cve.py
> @@ -233,6 +241,66 @@ class ICveSet(Interface):
> def search(text):
> """Search the CVE database for matching CVE entries."""
>
> + def advancedSearch(
> + in_distribution: List = [],
> + not_in_distribution: List = [],
done
> + since: Optional[Datetime] = None,
> + limit: Optional[int] = None,
> + ) -> Iterator[str]:
> + """Return an iterator of cve sequences that matches the given filters.
> +
> + :param in_distribution: filter cves that have a vulnerability for all
> + of these distributions
> + :param not_in_distribution: filter cves that have no vulnerability for
> + any of these distributions
> + :param since: only updated cves after `since` will be returned.
> + :param limit: return `limit` cves at max.
> + """
> +
> + # in_distribution patched in lp.bugs.interfaces.webservice
> + # not_in_distribution patched in lp.bugs.interfaces.webservice
> + @operation_parameters(
> + in_distribution=List(
> + title=_("Distributions linked to the cve"),
> + value_type=Reference(schema=Interface),
> + required=False,
> + ),
> + not_in_distribution=List(
> + title=_("Distributions not linked to the cve"),
> + value_type=Reference(schema=Interface),
> + required=False,
> + ),
> + since=Datetime(
> + title=_("Minimum cve.datemodified"),
> + description=_("Ignore cves that are older than this."),
> + required=False,
> + ),
> + limit=Int(
> + title=_("Maximum number of cves to return"),
> + required=False,
> + ),
> + )
> + @export_operation_as("advancedSearch")
> + @call_with(requester=REQUEST_USER)
> + @export_read_operation()
> + @operation_for_version("devel")
> + def api_advancedSearch(
I did it as I saw that in other models, thought that was a convention. Tbh it could be something we were doing in old code.
Indeed I agree with you as I got confused when debugging some time ago as I didn't know we were adding `api_` to the method I was looking for.
So as I've only did this as I thought it was a convention, I'm changing the names to your suggestions :)
> + requester,
> + in_distribution: List = [],
> + not_in_distribution: List = [],
> + since: Optional[Datetime] = None,
Done.
> + limit: Optional[int] = None,
> + ) -> dict:
> + """Return cve sequences that matches the given filters.
> +
> + :param in_distribution: filter cves that have a vulnerability for all
> + of these distributions
> + :param not_in_distribution: filter cves that have no vulnerability for
> + any of these distributions
> + :param since: only updated cves after `since` will be returned.
> + :param limit: return `limit` cves at max.
> + """
> +
> def inText(text):
> """Find one or more Cve's by analysing the given text.
>
> diff --git a/lib/lp/bugs/model/cve.py b/lib/lp/bugs/model/cve.py
> index ce24b2d..a20df63 100644
> --- a/lib/lp/bugs/model/cve.py
> +++ b/lib/lp/bugs/model/cve.py
> @@ -256,6 +259,80 @@ class CveSet:
> .config(distinct=True)
> )
>
> + def advancedSearch(
> + self,
> + in_distribution=[],
> + not_in_distribution=[],
> + since=None,
> + limit=None,
I've set the default value to `config.launchpad.default_batch_size`.
> + ):
If there are more than limit cves that were modified at the same time, using the date to paginate is not useful. I've added an `OFFSET` parameter.
> + """See `ICveSet`."""
> +
> + in_distribution_id = [d.id for d in in_distribution]
> + not_in_distribution_id = [d.id for d in not_in_distribution]
> +
> + query = """
In today's meeting when you talked about this I first got confused and said that it was already sanitized. You understood that it was due to the API call but I was referring to the `execute()` method. Didn't talked more about that because I was confused.
It is mentioned in the `sqlvalues()` definition:
""" This is DEPRECATED in favour of passing parameters to SQL statements
using the second parameter to `cursor.execute` (normally via the Storm
query compiler), because it does not deal with escaping '%' characters
in strings.
"""
I've found a way to do an alternative query (with the same EXPLAIN) that does not use `FILTER` (using a workaround with `CASE` doing some sort of `if`, it was taking some more time in planning the query due to that, and it was also larger) but storm does also not implement `ANY` so I think it is fine to left this query as it is sanitized (spent too much maybe but it was worthy to learn more of storm).
I've also manually tried to do sql injection, it raises:
```
storm.database.InvalidTextRepresentation: invalid input syntax for type bigint: "5 AND 1 = 1"
```
I was not able to add this as a test because a cannot import that exception, not sure why.
> + SELECT c.sequence
> + FROM cve c
> + LEFT JOIN vulnerability v ON c.id = v.cve
> + WHERE (%s is NULL OR c.datemodified > %s)
> + GROUP BY c.id, c.sequence
> + HAVING
> + (
> + %s = 0
> + OR COUNT(DISTINCT v.distribution) FILTER (
> + WHERE v.distribution = ANY(%s)
> + ) = %s
> + )
> + AND COUNT(DISTINCT v.distribution) FILTER (
> + WHERE v.distribution = ANY(%s)
> + ) = 0
> + LIMIT %s;
I think ordering by date modified would be the best, done :)
> + """
> +
> + params = (
> + since,
> + since,
> + len(set(in_distribution_id)),
> + in_distribution_id,
> + len(set(in_distribution_id)),
> + not_in_distribution_id,
> + limit,
> + )
> +
> + store = IStore(Vulnerability)
> + result = store.execute(query, params)
> + for r in result:
> + yield r[0]
> +
> + def api_advancedSearch(
> + self,
> + requester,
> + in_distribution=[],
> + not_in_distribution=[],
> + since=None,
> + limit=None,
> + ):
> + """See `ICveSet`."""
> + if not requester:
> + raise Unauthorized(
> + "Only authenticated users can use this endpoint"
> + )
> +
> + for distro in in_distribution + not_in_distribution:
> + user = IPersonRoles(requester)
> + if not SecurityAdminDistribution(distro).checkAuthenticated(user):
> + raise Unauthorized(
> + f"Only security admins can use {distro.name} as a filter"
> + )
> +
> + cves = list(
> + self.advancedSearch(
> + in_distribution, not_in_distribution, since, limit
> + )
> + )
> + return {"cves": cves}
> +
> def inText(self, text):
> """See ICveSet."""
> # let's look for matching entries
> diff --git a/lib/lp/bugs/tests/test_cve.py b/lib/lp/bugs/tests/test_cve.py
> index 23fd423..27faf68 100644
> --- a/lib/lp/bugs/tests/test_cve.py
> +++ b/lib/lp/bugs/tests/test_cve.py
> @@ -112,6 +115,263 @@ class TestCveSet(TestCaseWithFactory):
> self.assertEqual(base, getUtility(ICveSet).getBugCveCount())
>
>
> +class TestCveSetAdvancedSearch(TestCaseWithFactory):
> + layer = DatabaseFunctionalLayer
> +
> + def setUp(self):
> + super().setUp()
> + self.cves = getUtility(ICveSet)
> +
> + def test_advancedSearch_default_arguments(self):
> + # All cves will be returned as there are not vulnerabilities created
> + result = self.cves.advancedSearch()
> + self.assertEqual(10, len(list(result)))
> +
> + def test_advancedSearch_in_distribution(self):
> + distribution1 = self.factory.makeDistribution()
> + distribution2 = self.factory.makeDistribution()
> +
> + # No result as there is no cve with a vulnerability for distribution1
> + result = self.cves.advancedSearch(in_distribution=[distribution1])
> + self.assertEqual(0, len(list(result)))
> +
> + cve = self.factory.makeCVE(sequence="2099-9876")
> + self.factory.makeVulnerability(distribution1, cve=cve)
> +
> + # There is 1 cve with a vulnerability for distribution
> + result = self.cves.advancedSearch(in_distribution=[distribution1])
> + cves = list(result)
> + self.assertEqual(1, len(cves))
> + self.assertEqual("2099-9876", cves[0])
> +
> + # No result as there is no cve with a vulnerability for distribution2
> + result = self.cves.advancedSearch(in_distribution=[distribution2])
> + self.assertEqual(0, len(list(result)))
> +
> + def test_advancedSearch_not_in_distribution(self):
> + distribution1 = self.factory.makeDistribution()
> + distribution2 = self.factory.makeDistribution()
> +
> + # No result as there is no cve with a vulnerability for distributions
> + result = self.cves.advancedSearch(
> + not_in_distribution=[distribution1, distribution2]
> + )
> + self.assertEqual(10, len(list(result)))
> +
> + cve = self.factory.makeCVE(sequence="2099-9876")
> +
> + # There are 11 cve without a vulnerability for distributions
> + result = self.cves.advancedSearch(
> + not_in_distribution=[distribution1, distribution2]
> + )
> + self.assertEqual(11, len(list(result)))
> +
> + self.factory.makeVulnerability(distribution1, cve=cve)
> +
> + # There are 10 cve without a vulnerability for any of distributions
> + result = self.cves.advancedSearch(
> + not_in_distribution=[distribution1, distribution2]
> + )
> + self.assertEqual(10, len(list(result)))
> +
> + def test_advancedSearch_both_in_distribution(self):
> + distribution1 = self.factory.makeDistribution()
> + distribution2 = self.factory.makeDistribution()
> + distribution3 = self.factory.makeDistribution()
> +
> + # No result as there are no cves that:
> + # - has a vulnerability for distribution1 and distribution2
> + # - has no vulnerability for distribution3
> + result = self.cves.advancedSearch(
> + in_distribution=[distribution1, distribution2],
> + not_in_distribution=[distribution3],
> + )
> + self.assertEqual(0, len(list(result)))
> +
> + cve = self.factory.makeCVE(sequence="2099-9876")
> + self.factory.makeVulnerability(distribution1, cve=cve)
> +
> + # No result as we only created the vulnerability for distribution1
> + result = self.cves.advancedSearch(
> + in_distribution=[distribution1, distribution2],
> + not_in_distribution=[distribution3],
> + )
> + self.assertEqual(0, len(list(result)))
> +
> + # 1 result as we created a vulnerability for distribution1 and
> + # distribution2
> + self.factory.makeVulnerability(distribution2, cve=cve)
> + result = self.cves.advancedSearch(
> + in_distribution=[distribution1, distribution2],
> + not_in_distribution=[distribution3],
> + )
> + cves = list(result)
> + self.assertEqual(1, len(cves))
> + self.assertEqual("2099-9876", cves[0])
> +
> + # No result as we created a vulnerability for distribution3 for the cve
> + self.factory.makeVulnerability(distribution3, cve=cve)
> + result = self.cves.advancedSearch(
> + in_distribution=[distribution1, distribution2],
> + not_in_distribution=[distribution3],
> + )
> + self.assertEqual(0, len(list(result)))
> +
> + def test_advancedSearch_since(self):
> + since = datetime.utcnow() - timedelta(days=1)
> + self.factory.makeCVE(sequence="2099-9876")
> +
> + result = self.cves.advancedSearch(since=since)
> + cves = list(result)
> + self.assertEqual(1, len(cves))
> + self.assertEqual("2099-9876", cves[0])
> +
> + def test_advancedSearch_limit(self):
> + result = self.cves.advancedSearch(limit=5)
> + self.assertEqual(5, len(list(result)))
> +
> +
> +class TestCveSetAdvancedSearchWebService(TestCaseWithFactory):
> + layer = DatabaseFunctionalLayer
> +
> + def setUp(self):
> + super().setUp()
> + self.person = self.factory.makePerson()
> + self.distribution1 = self.factory.makeDistribution(
> + owner=self.person, name="distribution1"
> + )
> + self.distribution2 = self.factory.makeDistribution(
> + owner=self.person, name="distribution2"
> + )
> +
> + self.api_base = "http://api.launchpad.test/devel"
> + self.cves = getUtility(ICveSet)
> + self.cves_url = self.api_base + api_url(self.cves)
> +
> + def test_advancedSearch_default_arguments(self):
> + webservice = webservice_for_person(
> + self.person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + )
> +
> + self.assertEqual(200, response.status)
> + self.assertEqual(10, len(response.jsonBody()["cves"]))
> +
> + def test_advancedSearch_in_distribution(self):
> + cve = self.factory.makeCVE(sequence="2099-9876")
> + self.factory.makeVulnerability(self.distribution1, cve=cve)
> +
> + in_distribution = [api_url(self.distribution1)]
> + webservice = webservice_for_person(
> + self.person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + in_distribution=in_distribution,
> + )
> +
> + self.assertEqual(200, response.status)
> + self.assertEqual(1, len(response.jsonBody()["cves"]))
> + self.assertEqual("2099-9876", response.jsonBody()["cves"][0])
> +
> + def test_advancedSearch_not_in_distribution(self):
> + not_in_distribution = [api_url(self.distribution1)]
> + webservice = webservice_for_person(
> + self.person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + not_in_distribution=not_in_distribution,
> + )
> +
> + self.assertEqual(200, response.status)
> + self.assertEqual(10, len(response.jsonBody()["cves"]))
> +
> + def test_advancedSearch_since(self):
> + self.factory.makeCVE(sequence="2099-9876")
> +
> + webservice = webservice_for_person(
> + self.person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + since=(datetime.utcnow() - timedelta(days=1)).isoformat(),
Nice, done :)
> + )
> +
> + self.assertEqual(200, response.status)
> + self.assertEqual(1, len(response.jsonBody()["cves"]))
> +
> + def test_advancedSearch_limit(self):
> + webservice = webservice_for_person(
> + self.person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + limit=5,
> + )
> +
> + self.assertEqual(200, response.status)
> + self.assertEqual(5, len(response.jsonBody()["cves"]))
> +
> + def test_advancedSearch_unauthenticated(self):
> + in_distribution = [api_url(self.distribution1)]
> +
> + webservice = webservice_for_person(
> + None,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + in_distribution=in_distribution,
> + )
> +
> + self.assertEqual(401, response.status)
> + self.assertEqual(
> + b"Only authenticated users can use this endpoint",
> + response.body,
> + )
> +
> + def test_advancedSearch_unauthorized(self):
> + in_distribution = [api_url(self.distribution1)]
> + person = self.factory.makePerson()
> +
> + webservice = webservice_for_person(
> + person,
> + permission=OAuthPermission.WRITE_PRIVATE,
> + default_api_version="devel",
> + )
> + response = webservice.named_get(
> + self.cves_url,
> + "advancedSearch",
> + in_distribution=in_distribution,
> + )
> +
> + self.assertEqual(401, response.status)
> + self.assertEqual(
> + b"Only security admins can use distribution1 as a filter",
> + response.body,
> + )
> +
> +
> class TestBugLinks(TestCaseWithFactory):
> layer = DatabaseFunctionalLayer
>
--
https://code.launchpad.net/~enriqueesanchz/launchpad/+git/launchpad/+merge/493853
Your team Launchpad code reviewers is requested to review the proposed merge of ~enriqueesanchz/launchpad:add-get-untriaged-cves into launchpad:master.
References