launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #20726
[Merge] lp:~maxiberta/launchpad/named-auth-tokens into lp:launchpad
Maximiliano Bertacchini has proposed merging lp:~maxiberta/launchpad/named-auth-tokens into lp:launchpad.
Commit message:
Add new named ArchiveAuthToken API.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~maxiberta/launchpad/named-auth-tokens/+merge/299432
Add new named ArchiveAuthToken API. This is required for UA customer delivery of kernel livepatches.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~maxiberta/launchpad/named-auth-tokens into lp:launchpad.
=== modified file 'lib/lp/soyuz/interfaces/archive.py'
--- lib/lp/soyuz/interfaces/archive.py 2016-01-26 15:47:37 +0000
+++ lib/lp/soyuz/interfaces/archive.py 2016-07-07 14:30:49 +0000
@@ -151,6 +151,12 @@
"""Raised when creating an archive subscription for a public archive."""
+@error_status(httplib.FORBIDDEN)
+class ArchiveNotOwner(Exception):
+ """Raised when the user is not the owner or a member of the owner team
+ of an archive."""
+
+
@error_status(httplib.BAD_REQUEST)
class NoTokensForTeams(Exception):
"""Raised when creating a token for a team, rather than a person."""
@@ -312,6 +318,12 @@
self._fmt % {'processor': processor.name})
+@error_status(httplib.BAD_REQUEST)
+class DuplicateTokenName(Exception):
+ """Raised when creating a named token and an active token for this archive
+ with this name already exists."""
+
+
class IArchivePublic(IPrivacy, IHasOwner):
"""An Archive interface for publicly available operations."""
# Most of this stuff should really be on View, but it's needed for
@@ -526,12 +538,12 @@
"""
def newAuthToken(person, token=None, date_created=None):
- """Create a new authorisation token.
+ """Create a new authorization token.
- :param person: An IPerson whom this token is for
+ :param person: An IPerson whom this token is for.
:param token: Optional unicode text to use as the token. One will be
- generated if not given
- :param date_created: Optional, defaults to now
+ generated if not given.
+ :param date_created: Optional, defaults to now.
:return: A new IArchiveAuthToken
"""
@@ -1324,7 +1336,7 @@
@operation_returns_collection_of(Interface)
@export_read_operation()
def getQueueAdminsForComponent(component_name):
- """Return `IArchivePermission` records for authorised queue admins.
+ """Return `IArchivePermission` records for authorized queue admins.
:param component_name: An `IComponent` or textual name for the
component.
@@ -1377,7 +1389,7 @@
@export_read_operation()
@operation_for_version("devel")
def getQueueAdminsForPocket(pocket, distroseries=None):
- """Return `IArchivePermission` records for authorised queue admins.
+ """Return `IArchivePermission` records for authorized queue admins.
:param pocket: A `PackagePublishingPocket`.
:param distroseries: An optional `IDistroSeries`.
@@ -2062,6 +2074,7 @@
:return: a `IArchiveDependency` object targeted to the context
`IArchive` requiring 'dependency' `IArchive`.
"""
+
@operation_parameters(
dependency=Reference(schema=Interface, required=True),
# Really IArchive
@@ -2074,6 +2087,57 @@
:param dependency: is an `IArchive` object.
"""
+ @operation_parameters(
+ name=TextLine(title=_("Authorization token name"), required=True))
+ @export_write_operation()
+ @operation_for_version("devel")
+ def newNamedAuthToken(name, token=None, date_created=None):
+ """Create a new named authorization token.
+
+ :param name: An identifier string for this token.
+ :param token: Optional unicode text to use as the token. One will be
+ generated if not given.
+ :param date_created: Optional, defaults to now.
+
+ :return: A dictionary where the value of `token` is the secret and
+ the value of `archive_url` is the externally-usable archive URL
+ including basic auth.
+ """
+
+ @operation_parameters(
+ name=TextLine(title=_("Authorization token name"), required=True))
+ @export_read_operation()
+ @operation_for_version("devel")
+ def getNamedAuthToken(name):
+ """Return a named authorization token for a given archive and name.
+
+ :param name: The identifier string for a token.
+
+ :return: A dictionary where the value of `token` is the secret and
+ the value of `archive_url` is the externally-usable archive URL
+ including basic auth.
+ """
+
+ @export_read_operation()
+ @operation_for_version("devel")
+ def getNamedAuthTokens():
+ """Return a list of named authorization tokens for a given archive.
+
+ :return: A list of dictionaries where the value of `token` is the
+ secret and the value of `archive_url` is the externally-usable
+ archive URL including basic auth.
+ """
+
+ @operation_parameters(
+ name=TextLine(title=_("Authorization token name"), required=True))
+ @export_write_operation()
+ @operation_for_version("devel")
+ def revokeNamedAuthToken(name):
+ """Deactivates a named authorization token.
+
+ :param name: The identifier string for a token.
+ """
+
class IArchiveAdmin(Interface):
"""Archive interface for operations restricted by commercial."""
=== modified file 'lib/lp/soyuz/interfaces/archiveauthtoken.py'
--- lib/lp/soyuz/interfaces/archiveauthtoken.py 2013-01-07 02:40:55 +0000
+++ lib/lp/soyuz/interfaces/archiveauthtoken.py 2016-07-07 14:30:49 +0000
@@ -27,16 +27,16 @@
class IArchiveAuthTokenView(Interface):
- """Interface for Archive Authorisation Tokens requiring launchpad.View."""
+ """Interface for Archive Authorization Tokens requiring launchpad.View."""
id = Int(title=_('ID'), required=True, readonly=True)
archive = Reference(
IArchive, title=_("Archive"), required=True, readonly=True,
- description=_("The archive for this authorisation token."))
+ description=_("The archive for this authorization token."))
person = Reference(
- IPerson, title=_("Person"), required=True, readonly=True,
- description=_("The person for this authorisation token."))
+ IPerson, title=_("Person"), required=False, readonly=True,
+ description=_("The person for this authorization token."))
person_id = Attribute('db person value')
date_created = Datetime(
@@ -56,9 +56,19 @@
description=_(
"External archive URL including basic auth for this person"))
- def deactivate(self):
+ name = TextLine(
+ title=_("Name"), required=False, readonly=True,
+ description=_("The name for this named authorization token."))
+
+ def deactivate():
"""Deactivate the token by setting date_deactivated to UTC_NOW."""
+ def as_dict():
+ """Returns a dictionary where the value of `token` is the secret and
+ the value of `archive_url` is the externally-usable archive URL
+ including basic auth.
+ """
+
class IArchiveAuthTokenEdit(Interface):
"""Interface for Archive Auth Tokens requiring launchpad.Edit."""
@@ -99,3 +109,18 @@
:param person: The person to which the token corresponds.
:return An object conforming to IArchiveAuthToken or None.
"""
+
+ def getActiveNamedTokenForArchive(archive, name):
+ """Retrieve an active named token for the given archive and name.
+
+ :param archive: The archive to which the token corresponds.
+ :param name: The name of a named authorization token.
+ :return: An object conforming to IArchiveAuthToken or None.
+ """
+
+ def getActiveNamedTokensForArchive(archive):
+ """Retrieve all active named tokens for the given archive.
+
+ :param archive: The archive to which the tokens correspond.
+ :return: A result set containing `IArchiveAuthToken`s.
+ """
=== modified file 'lib/lp/soyuz/model/archive.py'
--- lib/lp/soyuz/model/archive.py 2016-03-14 23:42:45 +0000
+++ lib/lp/soyuz/model/archive.py 2016-07-07 14:30:49 +0000
@@ -140,6 +140,7 @@
ArchiveAlreadyDeleted,
ArchiveDependencyError,
ArchiveDisabled,
+ ArchiveNotOwner,
ArchiveNotPrivate,
CannotCopy,
CannotModifyArchiveProcessor,
@@ -149,6 +150,7 @@
CannotUploadToSeries,
ComponentNotFound,
default_name_by_purpose,
+ DuplicateTokenName,
FULL_COMPONENT_SUPPORT,
IArchive,
IArchiveSet,
@@ -1948,6 +1950,53 @@
IStore(ArchiveAuthToken).add(archive_auth_token)
return archive_auth_token
+ def newNamedAuthToken(self, name, token=None, date_created=None):
+ """See `IArchive`."""
+
+ # Bail if the archive isn't private
+ if not self.private:
+ raise ArchiveNotPrivate("Archive must be private.")
+
+ if self.getNamedAuthToken(name) is not None:
+ raise DuplicateTokenName(
+ "An active token with name %s for archive %s already exists." %
+ (name, self.displayname))
+
+ # Now onto the actual token creation:
+ if token is None:
+ token = create_token(20)
+ archive_auth_token = ArchiveAuthToken()
+ archive_auth_token.archive = self
+ archive_auth_token.name = name
+ archive_auth_token.token = token
+ if date_created is not None:
+ archive_auth_token.date_created = date_created
+ IStore(ArchiveAuthToken).add(archive_auth_token)
+ return archive_auth_token.as_dict()
+
+ def getNamedAuthToken(self, name):
+ """See `IArchive`."""
+ token_set = getUtility(IArchiveAuthTokenSet)
+ archive_auth_token = token_set.getActiveNamedTokenForArchive(self, name)
+ if archive_auth_token is not None:
+ return archive_auth_token.as_dict()
+
+ def getNamedAuthTokens(self):
+ """See `IArchive`."""
+ token_set = getUtility(IArchiveAuthTokenSet)
+ archive_auth_tokens = token_set.getActiveNamedTokensForArchive(self)
+ return [archive_auth_token.as_dict()
+ for archive_auth_token in archive_auth_tokens]
+
+ def revokeNamedAuthToken(self, name):
+ """See `IArchive`."""
+ token_set = getUtility(IArchiveAuthTokenSet)
+ archive_auth_token = token_set.getActiveNamedTokenForArchive(self, name)
+ if archive_auth_token is not None:
+ archive_auth_token.deactivate()
+ else:
+ raise NotFoundError(name)
+
def newSubscription(self, subscriber, registrant, date_expires=None,
description=None):
"""See `IArchive`."""
=== modified file 'lib/lp/soyuz/model/archiveauthtoken.py'
--- lib/lp/soyuz/model/archiveauthtoken.py 2015-10-21 09:37:08 +0000
+++ lib/lp/soyuz/model/archiveauthtoken.py 2016-07-07 14:30:49 +0000
@@ -39,7 +39,7 @@
archive_id = Int(name='archive', allow_none=False)
archive = Reference(archive_id, 'Archive.id')
- person_id = Int(name='person', allow_none=False)
+ person_id = Int(name='person', allow_none=True)
person = Reference(person_id, 'Person.id')
date_created = DateTime(
@@ -50,6 +50,8 @@
token = Unicode(name='token', allow_none=False)
+ name = Unicode(name='name', allow_none=True)
+
def deactivate(self):
"""See `IArchiveAuthTokenSet`."""
self.date_deactivated = UTC_NOW
@@ -59,9 +61,12 @@
"""Return a custom archive url for basic authentication."""
normal_url = URI(self.archive.archive_url)
auth_url = normal_url.replace(
- userinfo="%s:%s" % (self.person.name, self.token))
+ userinfo="%s:%s" % (self.name or self.person.name, self.token))
return str(auth_url)
+ def as_dict(self):
+ return {"token": self.token, "archive_url": self.archive_url}
+
@implementer(IArchiveAuthTokenSet)
class ArchiveAuthTokenSet:
@@ -88,8 +93,18 @@
def getActiveTokenForArchiveAndPerson(self, archive, person):
"""See `IArchiveAuthTokenSet`."""
store = Store.of(archive)
- return store.find(
- ArchiveAuthToken,
- ArchiveAuthToken.archive == archive,
- ArchiveAuthToken.person == person,
- ArchiveAuthToken.date_deactivated == None).one()
+ return self.getByArchive(archive).find(
+ ArchiveAuthToken.person == person).one()
+
+ def getActiveNamedTokenForArchive(self, archive, name):
+ """See `IArchiveAuthTokenSet`."""
+ store = Store.of(archive)
+ return self.getByArchive(archive).find(
+ ArchiveAuthToken.name == name).one()
+
+ def getActiveNamedTokensForArchive(self, archive):
+ """See `IArchiveAuthTokenSet`."""
+ store = Store.of(archive)
+ return self.getByArchive(archive).find(
+ ArchiveAuthToken.name != None)
+
=== modified file 'lib/lp/soyuz/templates/person-archive-subscription.pt'
--- lib/lp/soyuz/templates/person-archive-subscription.pt 2012-03-01 18:17:56 +0000
+++ lib/lp/soyuz/templates/person-archive-subscription.pt 2016-07-07 14:30:49 +0000
@@ -50,7 +50,7 @@
<div id="regenerate_token" class="portlet" style="clear:both">
<h2>Reset password</h2>
<p>If you believe the security of your password for this access
- has been compromised, you reset your password. After you've
+ has been compromised, you should reset your password. After you've
requested a new password, you'll see new "sources.list" entries
on this page. You'll need to update them on your computer.
</p>
=== modified file 'lib/lp/soyuz/tests/test_archive.py'
--- lib/lp/soyuz/tests/test_archive.py 2016-04-07 00:04:42 +0000
+++ lib/lp/soyuz/tests/test_archive.py 2016-07-07 14:30:49 +0000
@@ -65,11 +65,13 @@
from lp.soyuz.interfaces.archive import (
ArchiveDependencyError,
ArchiveDisabled,
+ ArchiveNotPrivate,
CannotCopy,
CannotModifyArchiveProcessor,
CannotUploadToPocket,
CannotUploadToPPA,
CannotUploadToSeries,
+ DuplicateTokenName,
IArchiveSet,
InsufficientUploadRights,
InvalidPocketForPartnerArchive,
@@ -80,6 +82,8 @@
RedirectedPocket,
VersionRequiresName,
)
+
+from lp.soyuz.interfaces.archiveauthtoken import IArchiveAuthTokenSet
from lp.soyuz.interfaces.archivepermission import IArchivePermissionSet
from lp.soyuz.interfaces.binarypackagebuild import BuildSetStatus
from lp.soyuz.interfaces.binarypackagename import IBinaryPackageNameSet
@@ -1257,11 +1261,11 @@
self.private_ppa.newSubscription(self.joe, owner)
def test_getAuthToken_with_no_token(self):
- token = self.private_ppa.getAuthToken(self.joe)
- self.assertEqual(token, None)
+ self.assertIsNone(self.private_ppa.getAuthToken(self.joe))
def test_getAuthToken_with_token(self):
token = self.private_ppa.newAuthToken(self.joe)
+ self.assertIsNone(token.name)
self.assertEqual(self.private_ppa.getAuthToken(self.joe), token)
def test_getArchiveSubscriptionURL(self):
@@ -1269,6 +1273,63 @@
token = self.private_ppa.getAuthToken(self.joe)
self.assertEqual(token.archive_url, url)
+ def test_newNamedAuthToken_private_archive(self):
+ result = self.private_ppa.newNamedAuthToken(u"tokenname", u"somesecret")
+ token = getUtility(IArchiveAuthTokenSet).getActiveNamedTokenForArchive(
+ self.private_ppa, u"tokenname")
+ self.assertIsNone(token.person)
+ self.assertEqual("tokenname", token.name)
+ self.assertEqual("somesecret", token.token)
+ self.assertIn("//%s:%s@" % (token.name, token.token), token.archive_url)
+ self.assertDictEqual(
+ {"token": token.token, "archive_url": token.archive_url},
+ result
+ )
+
+ def test_newNamedAuthToken_public_archive(self):
+ public_ppa = self.factory.makeArchive(private=False)
+ self.assertRaises(ArchiveNotPrivate,
+ public_ppa.newNamedAuthToken, u"tokenname", u"somesecret")
+
+ def test_newNamedAuthToken_duplicate_name(self):
+ self.private_ppa.newNamedAuthToken(u"tokenname", u"somesecret1")
+ self.assertRaises(DuplicateTokenName,
+ self.private_ppa.newNamedAuthToken, u"tokenname", u"somesecret2")
+
+ def test_getNamedAuthToken_with_no_token(self):
+ self.assertIsNone(self.private_ppa.getNamedAuthToken(u"tokenname"))
+
+ def test_getNamedAuthToken_with_token(self):
+ result = self.private_ppa.newNamedAuthToken(u"tokenname", u"somesecret")
+ self.assertEqual(
+ result,
+ self.private_ppa.getNamedAuthToken(u"tokenname"))
+
+ def test_revokeNamedAuthToken_with_token(self):
+ self.private_ppa.newNamedAuthToken(u"tokenname", u"somesecret")
+ token = getUtility(IArchiveAuthTokenSet).getActiveNamedTokenForArchive(
+ self.private_ppa, u"tokenname")
+ self.private_ppa.revokeNamedAuthToken(u"tokenname")
+ self.assertIsNotNone(token.date_deactivated)
+
+ def test_revokeNamedAuthToken_with_no_token(self):
+ self.assertRaises(
+ NotFoundError, self.private_ppa.revokeNamedAuthToken, u"tokenname")
+
+ def test_getNamedAuthToken_with_revoked_token(self):
+ self.private_ppa.newNamedAuthToken(u"tokenname", u"somesecret")
+ self.private_ppa.revokeNamedAuthToken(u"tokenname")
+ self.assertIsNone(self.private_ppa.getNamedAuthToken(u"tokenname"))
+
+ def test_getNamedAuthTokens(self):
+ res1 = self.private_ppa.newNamedAuthToken(u"tokenname1", u"somesecret1")
+ res2 = self.private_ppa.newNamedAuthToken(u"tokenname2", u"somesecret2")
+ self.private_ppa.newNamedAuthToken(u"tokenname3", u"somesecret3")
+ self.private_ppa.revokeNamedAuthToken(u"tokenname3")
+ self.assertContentEqual(
+ [res1, res2],
+ self.private_ppa.getNamedAuthTokens())
+
class TestGetBinaryPackageRelease(TestCaseWithFactory):
"""Ensure that getBinaryPackageRelease works as expected."""
@@ -3505,9 +3566,6 @@
layer = LaunchpadFunctionalLayer
- def assertDictEqual(self, one, two):
- self.assertContentEqual(one.items(), two.items())
-
def test_cprov_build_counters_in_sampledata(self):
cprov_archive = getUtility(IPersonSet).getByName("cprov").archive
expected_counters = {
=== modified file 'lib/lp/testing/__init__.py'
--- lib/lp/testing/__init__.py 2015-11-08 01:05:24 +0000
+++ lib/lp/testing/__init__.py 2016-07-07 14:30:49 +0000
@@ -619,7 +619,7 @@
self.assertIsNot(
None, pattern.search(normalise_whitespace(text)), text)
- def assertIsInstance(self, instance, assert_class):
+ def assertIsInstance(self, instance, assert_class, msg=None):
"""Assert that an instance is an instance of assert_class.
instance and assert_class have the same semantics as the parameters
Follow ups