launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #20258
[Merge] lp:~wgrant/launchpad/invisible-person-api into lp:launchpad
William Grant has proposed merging lp:~wgrant/launchpad/invisible-person-api into lp:launchpad with lp:~wgrant/launchpad/invisible-person as a prerequisite.
Commit message:
Add getUsernameForSSO and setUsernameFromSSO API methods.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~wgrant/launchpad/invisible-person-api/+merge/292939
Add getUsernameForSSO and setUsernameFromSSO API methods.
SSO will soon allow creation of usernames for users without Launchpad accounts. This is implemented with invisible Person rows with Account.status == PLACEHOLDER.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/invisible-person-api into lp:launchpad.
=== modified file 'lib/lp/registry/browser/tests/test_person_webservice.py'
--- lib/lp/registry/browser/tests/test_person_webservice.py 2016-03-21 05:37:40 +0000
+++ lib/lp/registry/browser/tests/test_person_webservice.py 2016-04-26 13:09:52 +0000
@@ -13,7 +13,10 @@
TeamMembershipStatus,
)
from lp.registry.interfaces.teammembership import ITeamMembershipSet
-from lp.services.identity.interfaces.account import AccountStatus
+from lp.services.identity.interfaces.account import (
+ AccountStatus,
+ IAccountSet,
+ )
from lp.services.openid.model.openididentifier import OpenIdIdentifier
from lp.services.webapp import snapshot
from lp.services.webapp.interfaces import OAuthPermission
@@ -367,3 +370,114 @@
sca = getUtility(IPersonSet).getByName('software-center-agent')
response = self.getOrCreateSoftwareCenterCustomer(sca)
self.assertEqual(400, response.status)
+
+ def test_getUsernameForSSO(self):
+ # canonical-identity-provider (SSO) can get the username for an
+ # OpenID identifier suffix.
+ with admin_logged_in():
+ sso = getUtility(IPersonSet).getByName('ubuntu-sso')
+ existing = self.factory.makePerson(name='username')
+ taken_openid = (
+ existing.account.openid_identifiers.any().identifier)
+ webservice = webservice_for_person(
+ sso, permission=OAuthPermission.READ_PUBLIC)
+ response = webservice.named_get(
+ '/people', 'getUsernameForSSO',
+ openid_identifier=taken_openid, api_version='devel')
+ self.assertEqual(200, response.status)
+ self.assertEqual('username', response.jsonBody())
+
+ def test_getUsernameForSSO_nonexistent(self):
+ with admin_logged_in():
+ sso = getUtility(IPersonSet).getByName('ubuntu-sso')
+ webservice = webservice_for_person(
+ sso, permission=OAuthPermission.READ_PUBLIC)
+ response = webservice.named_get(
+ '/people', 'getUsernameForSSO',
+ openid_identifier='doesnotexist', api_version='devel')
+ self.assertEqual(200, response.status)
+ self.assertEqual(None, response.jsonBody())
+
+ def setUsernameFromSSO(self, user, openid_identifier, name,
+ dry_run=False):
+ webservice = webservice_for_person(
+ user, permission=OAuthPermission.WRITE_PRIVATE)
+ response = webservice.named_post(
+ '/people', 'setUsernameFromSSO',
+ openid_identifier=openid_identifier, name=name, dry_run=dry_run,
+ api_version='devel')
+ return response
+
+ def test_setUsernameFromSSO(self):
+ # canonical-identity-provider (SSO) can create a placeholder
+ # Person to give a username to a non-LP user.
+ with admin_logged_in():
+ sso = getUtility(IPersonSet).getByName('ubuntu-sso')
+ response = self.setUsernameFromSSO(sso, 'foo', 'bar')
+ self.assertEqual(200, response.status)
+ with admin_logged_in():
+ by_name = getUtility(IPersonSet).getByName('bar')
+ by_openid = getUtility(IPersonSet).getByOpenIDIdentifier(
+ u'http://testopenid.dev/+id/foo')
+ self.assertEqual(by_name, by_openid)
+ self.assertEqual(
+ AccountStatus.PLACEHOLDER, by_name.account_status)
+
+ def test_setUsernameFromSSO_dry_run(self):
+ # setUsernameFromSSO provides a dry run mode that performs all
+ # the checks but doesn't actually make changes. Useful for input
+ # validation in SSO.
+ with admin_logged_in():
+ sso = getUtility(IPersonSet).getByName('ubuntu-sso')
+ response = self.setUsernameFromSSO(sso, 'foo', 'bar', dry_run=True)
+ self.assertEqual(200, response.status)
+ with admin_logged_in():
+ self.assertIs(None, getUtility(IPersonSet).getByName('bar'))
+ self.assertRaises(
+ LookupError,
+ getUtility(IAccountSet).getByOpenIDIdentifier, u'foo')
+
+ def test_setUsernameFromSSO_is_restricted(self):
+ # The method may only be invoked by the ~ubuntu-sso celebrity
+ # user, as it is security-sensitive.
+ with admin_logged_in():
+ random = self.factory.makePerson()
+ response = self.setUsernameFromSSO(random, 'foo', 'bar')
+ self.assertEqual(401, response.status)
+
+ def test_setUsernameFromSSO_rejects_bad_input(self, dry_run=False):
+ # The method returns meaningful errors on bad input, so SSO can
+ # give advice to users.
+ # Check canonical-identity-provider before changing these!
+ with admin_logged_in():
+ sso = getUtility(IPersonSet).getByName('ubuntu-sso')
+ self.factory.makePerson(name='taken-name')
+ existing = self.factory.makePerson()
+ taken_openid = (
+ existing.account.openid_identifiers.any().identifier)
+
+ response = self.setUsernameFromSSO(
+ sso, 'foo', 'taken-name', dry_run=dry_run)
+ self.assertEqual(400, response.status)
+ self.assertEqual(
+ 'name: taken-name is already in use by another person or team.',
+ response.body)
+
+ response = self.setUsernameFromSSO(
+ sso, 'foo', 'private-name', dry_run=dry_run)
+ self.assertEqual(400, response.status)
+ self.assertEqual(
+ 'name: The name 'private-name' has been blocked by the '
+ 'Launchpad administrators. Contact Launchpad Support if you want '
+ 'to use this name.',
+ response.body)
+
+ response = self.setUsernameFromSSO(
+ sso, taken_openid, 'bar', dry_run=dry_run)
+ self.assertEqual(400, response.status)
+ self.assertEqual(
+ 'An account for that OpenID identifier already exists.',
+ response.body)
+
+ def test_setUsernameFromSSO_rejects_bad_input_in_dry_run(self):
+ self.test_setUsernameFromSSO_rejects_bad_input(dry_run=True)
=== modified file 'lib/lp/registry/errors.py'
--- lib/lp/registry/errors.py 2012-11-28 21:32:06 +0000
+++ lib/lp/registry/errors.py 2016-04-26 13:09:52 +0000
@@ -21,6 +21,7 @@
'NameAlreadyTaken',
'NoSuchDistroSeries',
'NoSuchSourcePackageName',
+ 'NotPlaceholderAccount',
'InclusiveTeamLinkageError',
'PPACreationError',
'PrivatePersonLinkageError',
@@ -61,6 +62,11 @@
@error_status(httplib.BAD_REQUEST)
+class NotPlaceholderAccount(Exception):
+ """A non-placeholder account already exists for that OpenID identifier."""
+
+
+@error_status(httplib.BAD_REQUEST)
class InvalidFilename(Exception):
"""An invalid filename was used as an attachment filename."""
=== modified file 'lib/lp/registry/interfaces/person.py'
--- lib/lp/registry/interfaces/person.py 2016-04-26 13:09:52 +0000
+++ lib/lp/registry/interfaces/person.py 2016-04-26 13:09:52 +0000
@@ -2223,6 +2223,51 @@
:param full_name: the full name of the user.
"""
+ @call_with(user=REQUEST_USER)
+ @operation_parameters(
+ openid_identifier=TextLine(
+ title=_("OpenID identifier suffix"), required=True))
+ @export_read_operation()
+ @operation_for_version("devel")
+ def getUsernameForSSO(user, openid_identifier):
+ """Restricted person creation API for SSO.
+
+ This method can only be called by the Ubuntu SSO service. It
+ finds the username for an account by OpenID identifier.
+
+ :param user: the `IPerson` performing the operation. Only the
+ ubuntu-sso celebrity is allowed.
+ :param openid_identifier: OpenID identifier suffix for the user.
+ This is *not* the full URL, just the unique suffix portion.
+ """
+
+ @call_with(user=REQUEST_USER)
+ @operation_parameters(
+ openid_identifier=TextLine(
+ title=_("OpenID identifier suffix"), required=True),
+ name=copy_field(IPerson['name']),
+ dry_run=Bool(_("Save changes")))
+ @export_write_operation()
+ @operation_for_version("devel")
+ def setUsernameFromSSO(user, openid_identifier, name, dry_run=False):
+ """Restricted person creation API for SSO.
+
+ This method can only be called by the Ubuntu SSO service. It
+ reserves a username for an account by OpenID identifier, as long as
+ the user has no Launchpad account.
+
+ :param user: the `IPerson` performing the operation. Only the
+ ubuntu-sso celebrity is allowed.
+ :param openid_identifier: OpenID identifier suffix for the user.
+ This is *not* the full URL, just the unique suffix portion.
+ :param name: the desired username.
+ :raises: `InvalidName` if the username doesn't meet character
+ constraints.
+ :raises: `NameAlreadyTaken` if the username is already in use.
+ :raises: `NotPlaceholderAccount` if the OpenID identifier has a
+ non-placeholder Launchpad account.
+ """
+
@call_with(teamowner=REQUEST_USER)
@rename_parameters_as(
teamdescription='team_description',
=== modified file 'lib/lp/registry/model/person.py'
--- lib/lp/registry/model/person.py 2016-04-26 13:09:52 +0000
+++ lib/lp/registry/model/person.py 2016-04-26 13:09:52 +0000
@@ -154,6 +154,7 @@
InvalidName,
JoinNotAllowed,
NameAlreadyTaken,
+ NotPlaceholderAccount,
PPACreationError,
TeamMembershipPolicyError,
)
@@ -3429,6 +3430,38 @@
trust_email=False)
return person
+ def getUsernameForSSO(self, user, openid_identifier):
+ """See `IPersonSet`."""
+ if user != getUtility(ILaunchpadCelebrities).ubuntu_sso:
+ raise Unauthorized()
+ try:
+ account = getUtility(IAccountSet).getByOpenIDIdentifier(
+ openid_identifier)
+ except LookupError:
+ return None
+ return IPerson(account).name
+
+ def setUsernameFromSSO(self, user, openid_identifier, name,
+ dry_run=False):
+ """See `IPersonSet`."""
+ if user != getUtility(ILaunchpadCelebrities).ubuntu_sso:
+ raise Unauthorized()
+ self._validateName(name)
+ try:
+ account = getUtility(IAccountSet).getByOpenIDIdentifier(
+ openid_identifier)
+ except LookupError:
+ if not dry_run:
+ person = self.createPlaceholderPerson(openid_identifier, name)
+ else:
+ if account.status != AccountStatus.PLACEHOLDER:
+ raise NotPlaceholderAccount(
+ "An account for that OpenID identifier already exists.")
+ if not dry_run:
+ account = removeSecurityProxy(account)
+ person = IPerson(account)
+ person.name = person.display_name = account.displayname = name
+
def newTeam(self, teamowner, name, display_name, teamdescription=None,
membership_policy=TeamMembershipPolicy.MODERATED,
defaultmembershipperiod=None, defaultrenewalperiod=None,
@@ -3507,9 +3540,7 @@
rationale=PersonCreationRationale.USERNAME_PLACEHOLDER,
comment="when setting a username in SSO", account=account)
- def _newPerson(self, name, displayname, hide_email_addresses,
- rationale, comment=None, registrant=None, account=None):
- """Create and return a new Person with the given attributes."""
+ def _validateName(self, name):
if not valid_name(name):
raise InvalidName(
"%s is not a valid name for a person." % name)
@@ -3520,6 +3551,11 @@
raise NameAlreadyTaken(
"The name '%s' is already taken." % name)
+ def _newPerson(self, name, displayname, hide_email_addresses,
+ rationale, comment=None, registrant=None, account=None):
+ """Create and return a new Person with the given attributes."""
+ self._validateName(name)
+
if not displayname:
displayname = name.capitalize()
=== modified file 'lib/lp/registry/tests/test_personset.py'
--- lib/lp/registry/tests/test_personset.py 2016-04-26 13:09:52 +0000
+++ lib/lp/registry/tests/test_personset.py 2016-04-26 13:09:52 +0000
@@ -17,6 +17,7 @@
from lp.code.tests.helpers import remove_all_sample_data_branches
from lp.registry.errors import (
+ NotPlaceholderAccount,
InvalidName,
NameAlreadyTaken,
)
@@ -41,6 +42,7 @@
AccountCreationRationale,
AccountStatus,
AccountSuspendedError,
+ IAccountSet,
)
from lp.services.identity.interfaces.emailaddress import (
EmailAddressAlreadyTaken,
@@ -67,6 +69,13 @@
from lp.testing.matchers import HasQueryCount
+def make_openid_identifier(account, identifier):
+ openid_identifier = OpenIdIdentifier()
+ openid_identifier.identifier = identifier
+ openid_identifier.account = account
+ return IStore(OpenIdIdentifier).add(openid_identifier)
+
+
class TestPersonSet(TestCaseWithFactory):
"""Test `IPersonSet`."""
layer = DatabaseFunctionalLayer
@@ -248,7 +257,7 @@
# Generate some valid test data.
self.account = self.makeAccount()
- self.identifier = self.makeOpenIdIdentifier(self.account, u'whatever')
+ self.identifier = make_openid_identifier(self.account, u'whatever')
self.person = self.makePerson(self.account)
self.email = self.makeEmailAddress(
email='whatever@xxxxxxxxxxx', person=self.person)
@@ -259,12 +268,6 @@
creation_rationale=AccountCreationRationale.UNKNOWN,
status=AccountStatus.ACTIVE))
- def makeOpenIdIdentifier(self, account, identifier):
- openid_identifier = OpenIdIdentifier()
- openid_identifier.identifier = identifier
- openid_identifier.account = account
- return self.store.add(openid_identifier)
-
def makePerson(self, account):
return self.store.add(Person(
name='acc%d' % account.id, account=account,
@@ -627,12 +630,6 @@
super(TestPersonSetGetOrCreateSoftwareCenterCustomer, self).setUp()
self.sca = getUtility(IPersonSet).getByName('software-center-agent')
- def makeOpenIdIdentifier(self, account, identifier):
- openid_identifier = OpenIdIdentifier()
- openid_identifier.identifier = identifier
- openid_identifier.account = account
- return IStore(OpenIdIdentifier).add(openid_identifier)
-
def test_restricted_to_sca(self):
# Only the software-center-agent celebrity can invoke this
# privileged method.
@@ -659,7 +656,7 @@
def test_finds_by_openid(self):
# A Person with the requested OpenID identifier is returned.
somebody = self.factory.makePerson()
- self.makeOpenIdIdentifier(somebody.account, u'somebody')
+ make_openid_identifier(somebody.account, u'somebody')
with person_logged_in(self.sca):
got = getUtility(IPersonSet).getOrCreateSoftwareCenterCustomer(
self.sca, u'somebody', 'somebody@xxxxxxxxxxx', 'Example')
@@ -692,7 +689,7 @@
somebody = self.factory.makePerson(
email='existing@xxxxxxxxxxx',
account_status=AccountStatus.NOACCOUNT)
- self.makeOpenIdIdentifier(somebody.account, u'somebody')
+ make_openid_identifier(somebody.account, u'somebody')
self.assertEqual(AccountStatus.NOACCOUNT, somebody.account.status)
with person_logged_in(self.sca):
got = getUtility(IPersonSet).getOrCreateSoftwareCenterCustomer(
@@ -725,7 +722,7 @@
def test_fails_if_account_is_suspended(self):
# Suspended accounts cannot be returned.
somebody = self.factory.makePerson()
- self.makeOpenIdIdentifier(somebody.account, u'somebody')
+ make_openid_identifier(somebody.account, u'somebody')
with admin_logged_in():
somebody.setAccountStatus(
AccountStatus.SUSPENDED, None, "Go away!")
@@ -740,7 +737,7 @@
# nor do we want to potentially compromise them with a bad email
# address.
somebody = self.factory.makePerson()
- self.makeOpenIdIdentifier(somebody.account, u'somebody')
+ make_openid_identifier(somebody.account, u'somebody')
with admin_logged_in():
somebody.setAccountStatus(
AccountStatus.DEACTIVATED, None, "Goodbye cruel world.")
@@ -749,3 +746,149 @@
NameAlreadyTaken,
getUtility(IPersonSet).getOrCreateSoftwareCenterCustomer,
self.sca, u'somebody', 'somebody@xxxxxxxxxxx', 'Example')
+
+
+class TestPersonGetUsernameForSSO(TestCaseWithFactory):
+
+ layer = DatabaseFunctionalLayer
+
+ def setUp(self):
+ super(TestPersonGetUsernameForSSO, self).setUp()
+ self.sso = getUtility(IPersonSet).getByName(u'ubuntu-sso')
+
+ def test_restricted_to_sca(self):
+ # Only the ubuntu-sso celebrity can invoke this
+ # privileged method.
+ target = self.factory.makePerson(name='username')
+ make_openid_identifier(target.account, u'openid')
+
+ def do_it():
+ return getUtility(IPersonSet).getUsernameForSSO(
+ getUtility(ILaunchBag).user, u'openid')
+ random = self.factory.makePerson()
+ admin = self.factory.makePerson(
+ member_of=[getUtility(IPersonSet).getByName(u'admins')])
+
+ # Anonymous, random or admin users can't invoke the method.
+ with anonymous_logged_in():
+ self.assertRaises(Unauthorized, do_it)
+ with person_logged_in(random):
+ self.assertRaises(Unauthorized, do_it)
+ with person_logged_in(admin):
+ self.assertRaises(Unauthorized, do_it)
+
+ with person_logged_in(self.sso):
+ self.assertEqual('username', do_it())
+
+
+class TestPersonSetUsernameFromSSO(TestCaseWithFactory):
+
+ layer = DatabaseFunctionalLayer
+
+ def setUp(self):
+ super(TestPersonSetUsernameFromSSO, self).setUp()
+ self.sso = getUtility(IPersonSet).getByName(u'ubuntu-sso')
+
+ def test_restricted_to_sca(self):
+ # Only the ubuntu-sso celebrity can invoke this
+ # privileged method.
+ def do_it():
+ getUtility(IPersonSet).setUsernameFromSSO(
+ getUtility(ILaunchBag).user, u'openid', u'username')
+ random = self.factory.makePerson()
+ admin = self.factory.makePerson(
+ member_of=[getUtility(IPersonSet).getByName(u'admins')])
+
+ # Anonymous, random or admin users can't invoke the method.
+ with anonymous_logged_in():
+ self.assertRaises(Unauthorized, do_it)
+ with person_logged_in(random):
+ self.assertRaises(Unauthorized, do_it)
+ with person_logged_in(admin):
+ self.assertRaises(Unauthorized, do_it)
+
+ with person_logged_in(self.sso):
+ do_it()
+
+ def test_creates_new_placeholder(self):
+ # If an unknown OpenID identifier and email address are
+ # provided, a new account is created with the given username and
+ # returned.
+ with person_logged_in(self.sso):
+ getUtility(IPersonSet).setUsernameFromSSO(
+ self.sso, u'openid', u'username')
+ person = getUtility(IPersonSet).getByName(u'username')
+ self.assertEqual(u'username', person.name)
+ self.assertEqual(u'username', person.displayname)
+ self.assertEqual(AccountStatus.PLACEHOLDER, person.account.status)
+ with admin_logged_in():
+ self.assertContentEqual(
+ [u'openid'],
+ [oid.identifier for oid in person.account.openid_identifiers])
+ self.assertContentEqual([], person.validatedemails)
+ self.assertContentEqual([], person.guessedemails)
+
+ def test_creates_new_placeholder_dry_run(self):
+ with person_logged_in(self.sso):
+ getUtility(IPersonSet).setUsernameFromSSO(
+ self.sso, u'openid', u'username', dry_run=True)
+ self.assertRaises(
+ LookupError,
+ getUtility(IAccountSet).getByOpenIDIdentifier, u'openid')
+ self.assertIs(None, getUtility(IPersonSet).getByName(u'username'))
+
+ def test_updates_existing_placeholder(self):
+ # An existing placeholder Person with the request OpenID
+ # identifier has its name updated.
+ getUtility(IPersonSet).setUsernameFromSSO(
+ self.sso, u'openid', u'username')
+ person = getUtility(IPersonSet).getByName(u'username')
+
+ # Another call for the same OpenID identifier updates the
+ # existing Person.
+ getUtility(IPersonSet).setUsernameFromSSO(
+ self.sso, u'openid', u'newsername')
+ self.assertEqual(u'newsername', person.name)
+ self.assertEqual(u'newsername', person.displayname)
+ self.assertEqual(AccountStatus.PLACEHOLDER, person.account.status)
+ with admin_logged_in():
+ self.assertContentEqual([], person.validatedemails)
+ self.assertContentEqual([], person.guessedemails)
+
+ def test_updates_existing_placeholder_dry_run(self):
+ getUtility(IPersonSet).setUsernameFromSSO(
+ self.sso, u'openid', u'username')
+ person = getUtility(IPersonSet).getByName(u'username')
+
+ getUtility(IPersonSet).setUsernameFromSSO(
+ self.sso, u'openid', u'newsername', dry_run=True)
+ self.assertEqual(u'username', person.name)
+
+ def test_validation(self, dry_run=False):
+ # An invalid username is rejected with an InvalidName exception.
+ self.assertRaises(
+ InvalidName,
+ getUtility(IPersonSet).setUsernameFromSSO,
+ self.sso, u'openid', u'username!!', dry_run=dry_run)
+ transaction.abort()
+
+ # A username that's already in use is rejected with a
+ # NameAlreadyTaken exception.
+ self.factory.makePerson(name='taken')
+ self.assertRaises(
+ NameAlreadyTaken,
+ getUtility(IPersonSet).setUsernameFromSSO,
+ self.sso, u'openid', u'taken', dry_run=dry_run)
+ transaction.abort()
+
+ # setUsernameFromSSO can't be used to set an OpenID
+ # identifier's username if a non-placeholder account exists.
+ somebody = self.factory.makePerson()
+ make_openid_identifier(somebody.account, u'openid-taken')
+ self.assertRaises(
+ NotPlaceholderAccount,
+ getUtility(IPersonSet).setUsernameFromSSO,
+ self.sso, u'openid-taken', u'username', dry_run=dry_run)
+
+ def test_validation_dry_run(self):
+ self.test_validation(dry_run=True)
Follow ups