launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #20491
[Merge] lp:~thomir/launchpad/devel-add-ssh-delete-key-api into lp:launchpad
Thomi Richards has proposed merging lp:~thomir/launchpad/devel-add-ssh-delete-key-api into lp:launchpad with lp:~thomir/launchpad/devel-add-ssh-add-key-api as a prerequisite.
Commit message:
Add restricted API method to delete an SSH key for a person.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~thomir/launchpad/devel-add-ssh-delete-key-api/+merge/295665
Add restricted API method to delete an SSH key for a person.
One thing I'm not happy about in this branch is the fact that the exception raised when deleting an SSH key if the key is invalid is called 'SSHKeyAdditionError' - it was a poor name to begin with, it should just be 'InvalidSSHKey' or similar. I could change it everywhere in the launchpad codebase, but I'm not sure it's worth the change. Thoughts?
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~thomir/launchpad/devel-add-ssh-delete-key-api into lp:launchpad.
=== modified file 'lib/lp/registry/browser/tests/test_person_webservice.py'
--- lib/lp/registry/browser/tests/test_person_webservice.py 2016-05-25 05:33:39 +0000
+++ lib/lp/registry/browser/tests/test_person_webservice.py 2016-05-25 05:33:39 +0000
@@ -613,3 +613,92 @@
openid_identifier=openid_id, key_text='ssh-rsa foo bar',
dry_run=False, api_version='devel')
self.assertEqual(401, response.status)
+
+ def deleteSSHKeyForPersonFromSSO(self, openid_identifier, key_text,
+ dry_run=False):
+ with admin_logged_in():
+ sso = getUtility(IPersonSet).getByName('ubuntu-sso')
+ webservice = webservice_for_person(
+ sso, permission=OAuthPermission.WRITE_PRIVATE)
+ return webservice.named_post(
+ '/people', 'deleteSSHKeyForPersonFromSSO',
+ openid_identifier=openid_identifier, key_text=key_text,
+ dry_run=dry_run, api_version='devel')
+
+ def test_deleteSSHKeyForPersonFromSSO_nonexistant(self, dry_run=False):
+ response = self.deleteSSHKeyForPersonFromSSO(
+ 'doesnotexist', 'sdf', dry_run)
+ self.assertEqual(400, response.status)
+ self.assertEqual(
+ "No account found for openid identifier 'doesnotexist'",
+ response.body)
+
+ def test_deleteSSHKeyForPersonFromSSO_nonexistant_dry_run(self):
+ self.test_deleteSSHKeyForPersonFromSSO_nonexistant(True)
+
+ def test_deleteSSHKeyForPersonFromSSO_rejects_bad_key_data(self,
+ dry_run=False):
+ with admin_logged_in():
+ person = self.factory.makePerson()
+ openid_id = person.account.openid_identifiers.any().identifier
+ response = self.deleteSSHKeyForPersonFromSSO(
+ openid_id, 'bad_data', dry_run)
+ self.assertEqual(400, response.status)
+ self.assertEqual(
+ "Invalid SSH key data: 'bad_data'",
+ response.body)
+
+ def test_deleteSSHKeyForPersonFromSSO_rejects_bad_key_data_dry_run(self):
+ self.test_deleteSSHKeyForPersonFromSSO_rejects_bad_key_data(True)
+
+ def test_deleteSSHKeyForPersonFromSSO_rejects_bad_key_type(self,
+ dry_run=False):
+ with admin_logged_in():
+ person = self.factory.makePerson()
+ openid_id = person.account.openid_identifiers.any().identifier
+ response = self.deleteSSHKeyForPersonFromSSO(
+ openid_id, 'foo keydata comment', dry_run)
+ self.assertEqual(400, response.status)
+ self.assertEqual(
+ "Invalid SSH key type: 'foo'",
+ response.body)
+
+ def test_deleteSSHKeyForPersonFromSSO_rejects_bad_key_type_dry_run(self):
+ self.test_deleteSSHKeyForPersonFromSSO_rejects_bad_key_type(True)
+
+ def test_deleteSSHKeyForPersonFromSSO_works(self):
+ with admin_logged_in():
+ person = removeSecurityProxy(self.factory.makePerson())
+ key = self.factory.makeSSHKey(person)
+ openid_id = person.account.openid_identifiers.any().identifier
+ response = self.deleteSSHKeyForPersonFromSSO(
+ openid_id, key.getFullKeyText())
+
+ self.assertEqual(200, response.status)
+ self.assertEqual(0, person.sshkeys.count())
+
+ def test_deleteSSHKeyForPersonFromSSO_works_dry_run(self):
+ with admin_logged_in():
+ person = removeSecurityProxy(self.factory.makePerson())
+ key = self.factory.makeSSHKey(person)
+ openid_id = person.account.openid_identifiers.any().identifier
+ response = self.deleteSSHKeyForPersonFromSSO(
+ openid_id, key.getFullKeyText())
+
+ self.assertEqual(200, response.status)
+ self.assertEqual(1, person.sshkeys.count())
+
+ def test_deleteSSHKeyForPersonFromSSO_is_restricted(self, dry_run=False):
+ with admin_logged_in():
+ target = self.factory.makePerson()
+ openid_id = target.account.openid_identifiers.any().identifier
+ webservice = webservice_for_person(
+ target, permission=OAuthPermission.WRITE_PRIVATE)
+ response = webservice.named_post(
+ '/people', 'deleteSSHKeyForPersonFromSSO',
+ openid_identifier=openid_id, key_text='ssh-rsa foo bar',
+ dry_run=dry_run, api_version='devel')
+ self.assertEqual(401, response.status)
+
+ def test_deleteSSHKeyForPersonFromSSO_is_restricted_dry_run(self):
+ self.test_deleteSSHKeyForPersonFromSSO_is_restricted(True)
=== modified file 'lib/lp/registry/interfaces/person.py'
--- lib/lp/registry/interfaces/person.py 2016-05-25 05:33:39 +0000
+++ lib/lp/registry/interfaces/person.py 2016-05-25 05:33:39 +0000
@@ -2316,6 +2316,33 @@
compromised key.
"""
+ @call_with(user=REQUEST_USER)
+ @operation_parameters(
+ openid_identifier=TextLine(
+ title=_("OpenID identifier suffix"), required=True),
+ key_text=TextLine(
+ title=_("SSH key text"), required=True),
+ dry_run=Bool(_("Don't save changes")))
+ @export_write_operation()
+ @operation_for_version("devel")
+ def deleteSSHKeyForPersonFromSSO(user, openid_identifier, key_text,
+ dry_run=False):
+ """Restricted SSH key deletion API for SSO.
+
+ This method can only be called by the Ubuntu SSO service. It deletes an
+ SSH key from the account identified by 'openid_identifier' based on the
+ 'key_text' parameter.
+
+ :param user: the `IPerson` performing the operation. Only the
+ ubuntu-sso celebrity is allowed.
+ :param openid_identifier: OpenID identifier suffix for the user.
+ This is *not* the full URL, just the unique suffix portion.
+ :param key_text: The full text of the SSH Key.
+ :raises NoSuchAccount: If the openid_identifier specified does not
+ match any account.
+ :raises KeyAdditionError: If the key text is invalid.
+ """
+
@call_with(teamowner=REQUEST_USER)
@rename_parameters_as(
teamdescription='team_description',
=== modified file 'lib/lp/registry/interfaces/ssh.py'
--- lib/lp/registry/interfaces/ssh.py 2016-05-25 05:33:39 +0000
+++ lib/lp/registry/interfaces/ssh.py 2016-05-25 05:33:39 +0000
@@ -9,6 +9,7 @@
'ISSHKey',
'ISSHKeySet',
'SSH_KEY_TYPE_TO_TEXT',
+ 'SSH_TEXT_TO_KEY_TYPE',
'SSHKeyAdditionError',
'SSHKeyCompromisedError',
'SSHKeyType',
@@ -60,6 +61,9 @@
}
+SSH_TEXT_TO_KEY_TYPE = {v: k for k, v in SSH_KEY_TYPE_TO_TEXT.items()}
+
+
class ISSHKey(Interface):
"""SSH public key"""
@@ -106,6 +110,14 @@
def getByPeople(people):
"""Return SSHKey object associated to the people provided."""
+ def getByPersonAndKeyText(person, sshkey):
+ """Get an SSH key for a person with a specific key text.
+
+ :param person: The person who owns the key.
+ :param sshkey: The full ssh key text.
+ :raises SSHKeyAdditionError: If 'sshkey' is invalid.
+ """
+
@error_status(httplib.BAD_REQUEST)
class SSHKeyAdditionError(Exception):
=== modified file 'lib/lp/registry/model/person.py'
--- lib/lp/registry/model/person.py 2016-05-25 05:33:39 +0000
+++ lib/lp/registry/model/person.py 2016-05-25 05:33:39 +0000
@@ -209,6 +209,7 @@
SSHKeyCompromisedError,
SSHKeyType,
SSH_KEY_TYPE_TO_TEXT,
+ SSH_TEXT_TO_KEY_TYPE,
)
from lp.registry.interfaces.teammembership import (
IJoinTeamEvent,
@@ -3484,6 +3485,26 @@
getUtility(ISSHKeySet).new(
IPerson(account), key_text, False, dry_run=dry_run)
+ def deleteSSHKeyForPersonFromSSO(self, user, openid_identifier, key_text,
+ dry_run=False):
+ """See `IPersonSet`."""
+ if user != getUtility(ILaunchpadCelebrities).ubuntu_sso:
+ raise Unauthorized()
+ try:
+ account = getUtility(IAccountSet).getByOpenIDIdentifier(
+ openid_identifier)
+ except LookupError:
+ raise NoSuchAccount("No account found for openid identifier '%s'"
+ % openid_identifier)
+ keys = getUtility(ISSHKeySet).getByPersonAndKeyText(
+ IPerson(account),
+ key_text)
+ if not dry_run:
+ # ISSHKeySet does not restrict the same SSH key being added
+ # multiple times, so make sure we delte them all:
+ for key in keys:
+ key.destroySelf()
+
def newTeam(self, teamowner, name, display_name, teamdescription=None,
membership_policy=TeamMembershipPolicy.MODERATED,
defaultmembershipperiod=None, defaultrenewalperiod=None,
@@ -4074,13 +4095,7 @@
class SSHKeySet:
def new(self, person, sshkey, send_notification=True, dry_run=False):
- try:
- kind, keytext, comment = sshkey.split(' ', 2)
- except (ValueError, AttributeError):
- raise SSHKeyAdditionError("Invalid SSH key data: '%s'" % sshkey)
-
- if not (kind and keytext and comment):
- raise SSHKeyAdditionError("Invalid SSH key data: '%s'" % sshkey)
+ keytype, keytext, comment = self._extract_ssh_key_components(sshkey)
process = subprocess.Popen(
'/usr/bin/ssh-vulnkey -', shell=True, stdin=subprocess.PIPE,
@@ -4090,13 +4105,6 @@
raise SSHKeyCompromisedError(
"This key cannot be added as it is known to be compromised.")
- if kind == 'ssh-rsa':
- keytype = SSHKeyType.RSA
- elif kind == 'ssh-dss':
- keytype = SSHKeyType.DSA
- else:
- raise SSHKeyAdditionError("Invalid SSH key type: '%s'" % kind)
-
if send_notification:
person.security_field_changed(
"New SSH key added to your account.",
@@ -4118,6 +4126,27 @@
SSHKey.person IN %s
""" % sqlvalues([person.id for person in people]))
+ def getByPersonAndKeyText(self, person, sshkey):
+ keytype, keytext, comment = self._extract_ssh_key_components(sshkey)
+ return IStore(SSHKey).find(
+ SSHKey,
+ person=person, keytype=keytype, keytext=keytext, comment=comment)
+
+ def _extract_ssh_key_components(self, sshkey):
+ try:
+ kind, keytext, comment = sshkey.split(' ', 2)
+ except (ValueError, AttributeError):
+ raise SSHKeyAdditionError("Invalid SSH key data: '%s'" % sshkey)
+
+ if not (kind and keytext and comment):
+ raise SSHKeyAdditionError("Invalid SSH key data: '%s'" % sshkey)
+
+ keytype = SSH_TEXT_TO_KEY_TYPE.get(kind)
+ if keytype is None:
+ raise SSHKeyAdditionError(
+ "Invalid SSH key type: '%s'" % kind)
+ return keytype, keytext, comment
+
@implementer(IWikiName)
class WikiName(SQLBase, HasOwnerMixin):
=== modified file 'lib/lp/registry/tests/test_personset.py'
--- lib/lp/registry/tests/test_personset.py 2016-05-25 05:33:39 +0000
+++ lib/lp/registry/tests/test_personset.py 2016-05-25 05:33:39 +0000
@@ -29,7 +29,10 @@
PersonCreationRationale,
TeamEmailAddressError,
)
-from lp.registry.interfaces.ssh import SSHKeyType
+from lp.registry.interfaces.ssh import (
+ SSHKeyAdditionError,
+ SSHKeyType,
+ )
from lp.registry.model.codeofconduct import SignedCodeOfConduct
from lp.registry.model.person import Person
from lp.services.database.interfaces import (
@@ -987,9 +990,86 @@
with person_logged_in(target):
self.assertEqual(0, target.sshkeys.count())
- def test_raises_with_noonexisting_account(self):
+ def test_raises_with_nonexisting_account(self):
with person_logged_in(self.sso):
self.assertRaises(
NoSuchAccount,
getUtility(IPersonSet).addSSHKeyForPersonFromSSO,
self.sso, u'doesnotexist', 'ssh-rsa key comment', True)
+
+
+class TestPersonDeleteSSHKeyFromSSO(TestCaseWithFactory):
+
+ layer = DatabaseFunctionalLayer
+
+ def setUp(self):
+ super(TestPersonDeleteSSHKeyFromSSO, self).setUp()
+ self.sso = getUtility(IPersonSet).getByName(u'ubuntu-sso')
+
+ def test_restricted_to_sso(self):
+ # Only the ubuntu-sso celebrity can invoke this
+ # privileged method.
+ target = self.factory.makePerson(name='username')
+ with person_logged_in(target):
+ key = self.factory.makeSSHKey(target)
+ key_text = key.getFullKeyText()
+ make_openid_identifier(target.account, u'openid')
+
+ def do_it():
+ return getUtility(IPersonSet).deleteSSHKeyForPersonFromSSO(
+ getUtility(ILaunchBag).user, u'openid', key_text, False)
+ random = self.factory.makePerson()
+ admin = self.factory.makePerson(
+ member_of=[getUtility(IPersonSet).getByName(u'admins')])
+
+ # Anonymous, random or admin users can't invoke the method.
+ with anonymous_logged_in():
+ self.assertRaises(Unauthorized, do_it)
+ with person_logged_in(random):
+ self.assertRaises(Unauthorized, do_it)
+ with person_logged_in(admin):
+ self.assertRaises(Unauthorized, do_it)
+ with person_logged_in(self.sso):
+ self.assertEqual(None, do_it())
+
+ def test_deletes_ssh_key(self):
+ target = self.factory.makePerson(name='username')
+ with person_logged_in(target):
+ key = self.factory.makeSSHKey(target)
+ make_openid_identifier(target.account, u'openid')
+
+ with person_logged_in(self.sso):
+ getUtility(IPersonSet).deleteSSHKeyForPersonFromSSO(
+ self.sso, u'openid', key.getFullKeyText(), False)
+
+ with person_logged_in(target):
+ self.assertEqual(0, target.sshkeys.count())
+
+ def test_does_not_delete_ssh_key_with_dry_run(self):
+ target = self.factory.makePerson(name='username')
+ with person_logged_in(target):
+ key = self.factory.makeSSHKey(target)
+ make_openid_identifier(target.account, u'openid')
+
+ with person_logged_in(self.sso):
+ getUtility(IPersonSet).deleteSSHKeyForPersonFromSSO(
+ self.sso, u'openid', key.getFullKeyText(), True)
+
+ with person_logged_in(target):
+ self.assertEqual([key], list(target.sshkeys))
+
+ def test_raises_with_nonexisting_account(self):
+ with person_logged_in(self.sso):
+ self.assertRaises(
+ NoSuchAccount,
+ getUtility(IPersonSet).deleteSSHKeyForPersonFromSSO,
+ self.sso, u'doesnotexist', 'ssh-rsa key comment', False)
+
+ def test_raises_with_bad_key_type(self):
+ target = self.factory.makePerson(name='username')
+ make_openid_identifier(target.account, u'openid')
+ with person_logged_in(self.sso):
+ self.assertRaises(
+ SSHKeyAdditionError,
+ getUtility(IPersonSet).deleteSSHKeyForPersonFromSSO,
+ self.sso, u'openid', 'badtype key comment', False)
=== modified file 'lib/lp/registry/tests/test_ssh.py'
--- lib/lp/registry/tests/test_ssh.py 2016-05-25 05:33:39 +0000
+++ lib/lp/registry/tests/test_ssh.py 2016-05-25 05:33:39 +0000
@@ -10,6 +10,7 @@
from lp.registry.interfaces.ssh import (
ISSHKeySet,
+ SSHKeyAdditionError,
SSHKeyCompromisedError,
SSHKeyType,
)
@@ -98,3 +99,35 @@
for key in (VULNERABLE_DSA_KEY, VULNERABLE_RSA_KEY):
self.assertRaises(SSHKeyCompromisedError,
keyset.new, person, key,)
+
+ def test_getByPersonAndKeyText_retrieves_target_key(self):
+ person = self.factory.makePerson()
+ with person_logged_in(person):
+ key = self.factory.makeSSHKey(person)
+ keytext = key.getFullKeyText()
+
+ results = getUtility(ISSHKeySet).getByPersonAndKeyText(
+ person, keytext)
+ self.assertEqual([key], list(results))
+
+ def test_getByPersonAndKeyText_raises_on_invalid_key_type(self):
+ person = self.factory.makePerson()
+ with person_logged_in(person):
+ invalid_keytext = 'foo bar baz'
+ keyset = getUtility(ISSHKeySet)
+ self.assertRaises(
+ SSHKeyAdditionError,
+ keyset.getByPersonAndKeyText,
+ person, invalid_keytext
+ )
+
+ def test_getByPersonAndKeyText_raises_on_invalid_key_data(self):
+ person = self.factory.makePerson()
+ with person_logged_in(person):
+ invalid_keytext = 'glorp!'
+ keyset = getUtility(ISSHKeySet)
+ self.assertRaises(
+ SSHKeyAdditionError,
+ keyset.getByPersonAndKeyText,
+ person, invalid_keytext
+ )
=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py 2016-05-23 02:43:48 +0000
+++ lib/lp/testing/factory.py 2016-05-25 05:33:39 +0000
@@ -220,6 +220,7 @@
from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
from lp.registry.interfaces.ssh import (
ISSHKeySet,
+ SSH_KEY_TYPE_TO_TEXT,
SSHKeyType,
)
from lp.registry.model.commercialsubscription import CommercialSubscription
@@ -4284,7 +4285,7 @@
return getUtility(IHWSubmissionDeviceSet).create(
device_driver_link, submission, parent, hal_device_id)
- def makeSSHKey(self, person=None, key_type=None):
+ def makeSSHKey(self, person=None, key_type=SSHKeyType.RSA):
"""Create a new SSHKey.
:param person: If specified, the person to attach the key to. If
@@ -4294,11 +4295,8 @@
"""
if person is None:
person = self.makePerson()
- if key_type is None or key_type == SSHKeyType.RSA:
- key_type_string = 'ssh-rsa'
- elif key_type == SSHKeyType.DSA:
- key_type_string = 'ssh-dss'
- else:
+ key_type_string = SSH_KEY_TYPE_TO_TEXT.get(key_type)
+ if key_type is None:
raise AssertionError(
"key_type must be a member of SSHKeyType, not %r" % key_type)
public_key = "%s %s %s" % (
Follow ups