launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #20782
[Merge] lp:~maxiberta/launchpad/named-auth-tokens-bulk-api into lp:launchpad
Maximiliano Bertacchini has proposed merging lp:~maxiberta/launchpad/named-auth-tokens-bulk-api into lp:launchpad with lp:~maxiberta/launchpad/named-auth-tokens-htaccess as a prerequisite.
Commit message:
Add API for bulk creation and revocation of named auth tokens.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~maxiberta/launchpad/named-auth-tokens-bulk-api/+merge/300016
Add API for bulk creation and revocation of named auth tokens.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~maxiberta/launchpad/named-auth-tokens-bulk-api into lp:launchpad.
=== modified file 'lib/lp/soyuz/interfaces/archive.py'
--- lib/lp/soyuz/interfaces/archive.py 2016-07-14 00:25:14 +0000
+++ lib/lp/soyuz/interfaces/archive.py 2016-07-14 00:25:14 +0000
@@ -2116,6 +2116,22 @@
"""
@operation_parameters(
+ names=List(
+ title=_("Authorization token names"),
+ value_type=TextLine(), required=True))
+ @export_write_operation()
+ @operation_for_version("devel")
+ def newNamedAuthTokens(names):
+ """Create named authorization tokens in bulk.
+
+ :param names: A list of token names.
+
+ :return: A dictionary of {name: {token, archive_url} where `name` is
+ a token name, `token` is the secret and `archive_url` is the
+ externally-usable archive URL including basic auth.
+ """
+
+ @operation_parameters(
name=TextLine(title=_("Authorization token name"), required=True))
@export_read_operation()
@operation_for_version("devel")
@@ -2146,12 +2162,24 @@
@export_write_operation()
@operation_for_version("devel")
def revokeNamedAuthToken(name):
- """Deactivates a named authorization token.
+ """Deactivate a named authorization token.
:param name: The identifier string for a token.
:raises NotFoundError: if no matching token could be found.
"""
+ @operation_parameters(
+ names=List(
+ title=_("Authorization token names"),
+ value_type=TextLine(), required=True))
+ @export_write_operation()
+ @operation_for_version("devel")
+ def revokeNamedAuthTokens(names):
+ """Deactivate named authorization tokens in bulk.
+
+ :param names: A list of token names.
+ """
+
class IArchiveAdmin(Interface):
"""Archive interface for operations restricted by commercial."""
=== modified file 'lib/lp/soyuz/model/archive.py'
--- lib/lp/soyuz/model/archive.py 2016-07-14 00:25:14 +0000
+++ lib/lp/soyuz/model/archive.py 2016-07-14 00:25:14 +0000
@@ -93,6 +93,7 @@
from lp.registry.model.teammembership import TeamParticipation
from lp.services.config import config
from lp.services.database.bulk import (
+ create,
load_referencing,
load_related,
)
@@ -1985,20 +1986,57 @@
else:
return archive_auth_token
- def getNamedAuthToken(self, name):
+ def newNamedAuthTokens(self, names, as_dict=True):
+ """See `IArchive`."""
+
+ if not getFeatureFlag(NAMED_AUTH_TOKEN_FEATURE_FLAG):
+ raise NamedAuthTokenFeatureDisabled()
+
+ # Bail if the archive isn't private
+ if not self.private:
+ raise ArchiveNotPrivate("Archive must be private.")
+
+ # Check for duplicate names.
+ token_set = getUtility(IArchiveAuthTokenSet)
+ active_tokens = token_set.getActiveNamedTokensForArchive(self)
+ active_tokens = removeSecurityProxy(active_tokens)
+ dup_tokens = active_tokens.find(ArchiveAuthToken.name.is_in(names))
+ dup_names = [token.name for token in dup_tokens]
+
+ values = [
+ (name, create_token(20), self) for name in names
+ if name not in dup_names]
+ tokens = create(
+ (ArchiveAuthToken.name, ArchiveAuthToken.token,
+ ArchiveAuthToken.archive), values, get_objects=True)
+
+ # Return all requested tokens, including duplicates.
+ tokens.extend(dup_tokens)
+ if as_dict:
+ return {token.name: token.asDict() for token in tokens}
+ else:
+ return tokens
+
+ def getNamedAuthToken(self, name, as_dict=True):
"""See `IArchive`."""
token_set = getUtility(IArchiveAuthTokenSet)
auth_token = token_set.getActiveNamedTokenForArchive(self, name)
if auth_token is not None:
- return auth_token.asDict()
+ if as_dict:
+ return auth_token.asDict()
+ else:
+ return auth_token
else:
raise NotFoundError(name)
- def getNamedAuthTokens(self):
+ def getNamedAuthTokens(self, as_dict=True):
"""See `IArchive`."""
token_set = getUtility(IArchiveAuthTokenSet)
auth_tokens = token_set.getActiveNamedTokensForArchive(self)
- return [auth_token.asDict() for auth_token in auth_tokens]
+ if as_dict:
+ return [auth_token.asDict() for auth_token in auth_tokens]
+ else:
+ return auth_tokens
def revokeNamedAuthToken(self, name):
"""See `IArchive`."""
@@ -2009,6 +2047,14 @@
else:
raise NotFoundError(name)
+ def revokeNamedAuthTokens(self, names):
+ """See `IArchive`."""
+ token_set = getUtility(IArchiveAuthTokenSet)
+ active_tokens = token_set.getActiveNamedTokensForArchive(self)
+ active_tokens = removeSecurityProxy(active_tokens)
+ tokens = active_tokens.find(ArchiveAuthToken.name.is_in(names))
+ tokens.set(date_deactivated=UTC_NOW)
+
def newSubscription(self, subscriber, registrant, date_expires=None,
description=None):
"""See `IArchive`."""
=== modified file 'lib/lp/soyuz/tests/test_archive.py'
--- lib/lp/soyuz/tests/test_archive.py 2016-07-14 00:25:14 +0000
+++ lib/lp/soyuz/tests/test_archive.py 2016-07-14 00:25:14 +0000
@@ -12,8 +12,10 @@
from pytz import UTC
from testtools.matchers import (
+ AllMatch,
DocTestMatches,
LessThan,
+ MatchesPredicate,
MatchesRegex,
MatchesStructure,
)
@@ -41,6 +43,7 @@
from lp.registry.interfaces.teammembership import TeamMembershipStatus
from lp.services.database.interfaces import IStore
from lp.services.database.sqlbase import sqlvalues
+from lp.services.features import getFeatureFlag
from lp.services.features.testing import FeatureFixture
from lp.services.job.interfaces.job import JobStatus
from lp.services.propertycache import (
@@ -85,7 +88,6 @@
RedirectedPocket,
VersionRequiresName,
)
-from lp.soyuz.interfaces.archiveauthtoken import IArchiveAuthTokenSet
from lp.soyuz.interfaces.archivepermission import IArchivePermissionSet
from lp.soyuz.interfaces.binarypackagebuild import BuildSetStatus
from lp.soyuz.interfaces.binarypackagename import IBinaryPackageNameSet
@@ -111,6 +113,7 @@
login,
login_person,
person_logged_in,
+ StormStatementRecorder,
RequestTimelineCollector,
TestCaseWithFactory,
)
@@ -1295,8 +1298,7 @@
def test_newNamedAuthToken_private_archive(self):
res = self.private_ppa.newNamedAuthToken(u"tokenname")
- token = getUtility(IArchiveAuthTokenSet).getActiveNamedTokenForArchive(
- self.private_ppa, u"tokenname")
+ token = self.private_ppa.getNamedAuthToken(u"tokenname", as_dict=False)
self.assertIsNotNone(token)
self.assertIsNone(token.person)
self.assertEqual("tokenname", token.name)
@@ -1324,15 +1326,44 @@
u"tokenname", u"somesecret", as_dict=False)
self.assertEqual(u"somesecret", token.token)
+ def test_newNamedAuthTokens_private_archive(self):
+ res = self.private_ppa.newNamedAuthTokens((u"name1", u"name2"))
+ tokens = self.private_ppa.getNamedAuthTokens(as_dict=False)
+ self.assertDictEqual({tok.name: tok.asDict() for tok in tokens}, res)
+
+ def test_newNamedAuthTokens_public_archive(self):
+ public_ppa = self.factory.makeArchive(private=False)
+ self.assertRaises(ArchiveNotPrivate,
+ public_ppa.newNamedAuthTokens, (u"name1", u"name2"))
+
+ def test_newNamedAuthTokens_duplicate_name(self):
+ self.private_ppa.newNamedAuthToken(u"tok1")
+ res = self.private_ppa.newNamedAuthTokens((u"tok1", u"tok2", u"tok3"))
+ tokens = self.private_ppa.getNamedAuthTokens(as_dict=False)
+ self.assertDictEqual({tok.name: tok.asDict() for tok in tokens}, res)
+
+ def test_newNamedAuthTokens_idempotent(self):
+ names = (u"name1", u"name2", u"name3", u"name4", u"name5")
+ res1 = self.private_ppa.newNamedAuthTokens(names)
+ res2 = self.private_ppa.newNamedAuthTokens(names)
+ self.assertEqual(res1, res2)
+
+ def test_newNamedAuthTokens_query_count(self):
+ # Preload feature flag so it is cached.
+ getFeatureFlag(NAMED_AUTH_TOKEN_FEATURE_FLAG)
+ with StormStatementRecorder() as recorder1:
+ self.private_ppa.newNamedAuthTokens((u"tok1"))
+ with StormStatementRecorder() as recorder2:
+ self.private_ppa.newNamedAuthTokens((u"tok1", u"tok2", u"tok3"))
+ self.assertThat(recorder2, HasQueryCount.byEquality(recorder1))
+
def test_getNamedAuthToken_with_no_token(self):
self.assertRaises(
NotFoundError, self.private_ppa.getNamedAuthToken, u"tokenname")
def test_getNamedAuthToken_with_token(self):
res = self.private_ppa.newNamedAuthToken(u"tokenname")
- self.assertEqual(
- self.private_ppa.getNamedAuthToken(u"tokenname"),
- res)
+ self.assertEqual(self.private_ppa.getNamedAuthToken(u"tokenname"), res)
def test_revokeNamedAuthToken_with_token(self):
token = self.private_ppa.newNamedAuthToken(u"tokenname", as_dict=False)
@@ -1343,6 +1374,40 @@
self.assertRaises(
NotFoundError, self.private_ppa.revokeNamedAuthToken, u"tokenname")
+ def test_revokeNamedAuthTokens(self):
+ names = (u"name1", u"name2", u"name3", u"name4", u"name5")
+ tokens = self.private_ppa.newNamedAuthTokens(names, as_dict=False)
+ self.assertThat(
+ tokens, AllMatch(MatchesPredicate(
+ lambda x: not x.date_deactivated, '%s is not active.')))
+ self.private_ppa.revokeNamedAuthTokens(names)
+ self.assertThat(
+ tokens, AllMatch(MatchesPredicate(
+ lambda x: x.date_deactivated, '%s is active.')))
+
+ def test_revokeNamedAuthTokens_with_previously_revoked_token(self):
+ names = (u"name1", u"name2", u"name3", u"name4", u"name5")
+ self.private_ppa.newNamedAuthTokens(names)
+ token1 = self.private_ppa.getNamedAuthToken(u"name1", as_dict=False)
+ token2 = self.private_ppa.getNamedAuthToken(u"name2", as_dict=False)
+
+ # Revoke token1.
+ deactivation_time_1 = datetime.now(UTC) - timedelta(seconds=90)
+ token1.date_deactivated = deactivation_time_1
+
+ # Revoke all tokens, including token1.
+ self.private_ppa.revokeNamedAuthTokens(names)
+
+ # Check that token1.date_deactivated has not changed.
+ self.assertEqual(deactivation_time_1, token1.date_deactivated)
+ self.assertLess(token1.date_deactivated, token2.date_deactivated)
+
+ def test_revokeNamedAuthTokens_idempotent(self):
+ names = (u"name1", u"name2", u"name3", u"name4", u"name5")
+ res1 = self.private_ppa.revokeNamedAuthTokens(names)
+ res2 = self.private_ppa.revokeNamedAuthTokens(names)
+ self.assertEqual(res1, res2)
+
def test_getNamedAuthToken_with_revoked_token(self):
self.private_ppa.newNamedAuthToken(u"tokenname")
self.private_ppa.revokeNamedAuthToken(u"tokenname")
Follow ups