launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #22207
[Merge] lp:~cjwatson/launchpad/reject-bad-ssh-keys into lp:launchpad
Colin Watson has proposed merging lp:~cjwatson/launchpad/reject-bad-ssh-keys into lp:launchpad.
Commit message:
Reject SSH public keys that Twisted can't load.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #230144 in Launchpad itself: "Launchpad shouldn't accept malformed ssh keys"
https://bugs.launchpad.net/launchpad/+bug/230144
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/reject-bad-ssh-keys/+merge/339445
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/reject-bad-ssh-keys into lp:launchpad.
=== modified file 'lib/lp/registry/browser/tests/test_person_webservice.py'
--- lib/lp/registry/browser/tests/test_person_webservice.py 2018-01-02 16:10:26 +0000
+++ lib/lp/registry/browser/tests/test_person_webservice.py 2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
__metaclass__ = type
@@ -562,21 +562,22 @@
with admin_logged_in():
person = removeSecurityProxy(self.factory.makePerson())
openid_id = person.account.openid_identifiers.any().identifier
- response = self.addSSHKeyForPerson(
- openid_id, 'ssh-rsa keydata comment')
+ full_key = self.factory.makeSSHKeyText()
+ _, keytext, comment = full_key.split(' ', 2)
+ response = self.addSSHKeyForPerson(openid_id, full_key)
self.assertEqual(200, response.status)
[key] = person.sshkeys
self.assertEqual(SSHKeyType.RSA, key.keytype)
- self.assertEqual('keydata', key.keytext)
- self.assertEqual('comment', key.comment)
+ self.assertEqual(keytext, key.keytext)
+ self.assertEqual(comment, key.comment)
def test_addSSHKeyFromSSO_dry_run(self):
with admin_logged_in():
person = removeSecurityProxy(self.factory.makePerson())
openid_id = person.account.openid_identifiers.any().identifier
response = self.addSSHKeyForPerson(
- openid_id, 'ssh-rsa keydata comment', dry_run=True)
+ openid_id, self.factory.makeSSHKeyText(), dry_run=True)
self.assertEqual(200, response.status)
self.assertEqual(0, person.sshkeys.count())
=== modified file 'lib/lp/registry/browser/tests/test_sshkey.py'
--- lib/lp/registry/browser/tests/test_sshkey.py 2017-06-01 12:58:53 +0000
+++ lib/lp/registry/browser/tests/test_sshkey.py 2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2010-2017 Canonical Ltd. This software is licensed under the
+# Copyright 2010-2018 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for GPG key on the web."""
@@ -51,7 +51,7 @@
with person_logged_in(person):
# Add the key for the user here,
# since we only care about testing removal.
- getUtility(ISSHKeySet).new(person, public_key)
+ getUtility(ISSHKeySet).new(person, public_key, check_key=False)
browser = setupBrowserFreshLogin(person)
browser.open(url)
browser.getControl('Remove').click()
=== modified file 'lib/lp/registry/interfaces/ssh.py'
--- lib/lp/registry/interfaces/ssh.py 2017-06-01 12:58:53 +0000
+++ lib/lp/registry/interfaces/ssh.py 2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""SSH key interfaces."""
@@ -89,16 +89,19 @@
class ISSHKeySet(Interface):
"""The set of SSHKeys."""
- def new(person, sshkey, send_notification=True, dry_run=False):
+ def new(person, sshkey, check_key=True, send_notification=True,
+ dry_run=False):
"""Create a new SSHKey pointing to the given Person.
:param person: The IPerson to add the ssh key to.
:param sshkey: The full ssh key text.
- :param send_notification: Set to False to supress sending the user an
+ :param check_key: Set to False to skip the check for whether the key
+ text can be deserialised by Twisted.
+ :param send_notification: Set to False to suppress sending the user an
email about the change.
- :param dry_run: Perform all the format and vaulnerability checks, but
- don't actually add the key. Causes the method to return None,
- rather than an instance of ISSHKey.
+ :param dry_run: Perform all the format checks, but don't actually
+ add the key. Causes the method to return None, rather than an
+ instance of ISSHKey.
"""
def getByID(id, default=None):
=== modified file 'lib/lp/registry/model/person.py'
--- lib/lp/registry/model/person.py 2017-09-28 13:06:10 +0000
+++ lib/lp/registry/model/person.py 2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Implementation classes for a Person."""
@@ -81,6 +81,7 @@
Store,
)
import transaction
+from twisted.conch.ssh.keys import Key
from zope.component import (
adapter,
getUtility,
@@ -3512,7 +3513,8 @@
raise NoSuchAccount("No account found for openid identifier '%s'"
% openid_identifier)
getUtility(ISSHKeySet).new(
- IPerson(account), key_text, False, dry_run=dry_run)
+ IPerson(account), key_text, send_notification=False,
+ dry_run=dry_run)
def deleteSSHKeyFromSSO(self, user, openid_identifier, key_text,
dry_run=False):
@@ -4123,9 +4125,17 @@
@implementer(ISSHKeySet)
class SSHKeySet:
- def new(self, person, sshkey, send_notification=True, dry_run=False):
+ def new(self, person, sshkey, check_key=True, send_notification=True,
+ dry_run=False):
keytype, keytext, comment = self._extract_ssh_key_components(sshkey)
+ if check_key:
+ try:
+ Key.fromString(sshkey)
+ except Exception as e:
+ raise SSHKeyAdditionError(
+ "Invalid SSH key data: '%s' (%s)" % (sshkey, e))
+
if send_notification:
person.security_field_changed(
"New SSH key added to your account.",
=== modified file 'lib/lp/registry/tests/test_personset.py'
--- lib/lp/registry/tests/test_personset.py 2018-01-02 16:10:26 +0000
+++ lib/lp/registry/tests/test_personset.py 2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for PersonSet."""
@@ -976,7 +976,7 @@
def test_restricted_to_sso(self):
# Only the ubuntu-sso celebrity can invoke this
# privileged method.
- key_text = 'ssh-rsa keytext keycomment'
+ key_text = self.factory.makeSSHKeyText()
target = self.factory.makePerson(name='username')
make_openid_identifier(target.account, u'openid')
@@ -998,22 +998,23 @@
self.assertEqual(None, do_it())
def test_adds_new_ssh_key(self):
- key_text = 'ssh-rsa keytext keycomment'
+ full_key = self.factory.makeSSHKeyText()
+ _, keytext, comment = full_key.split(' ', 2)
target = self.factory.makePerson(name='username')
make_openid_identifier(target.account, u'openid')
with person_logged_in(self.sso):
getUtility(IPersonSet).addSSHKeyFromSSO(
- self.sso, u'openid', key_text, False)
+ self.sso, u'openid', full_key, False)
with person_logged_in(target):
[key] = target.sshkeys
self.assertEqual(key.keytype, SSHKeyType.RSA)
- self.assertEqual(key.keytext, 'keytext')
- self.assertEqual(key.comment, 'keycomment')
+ self.assertEqual(key.keytext, keytext)
+ self.assertEqual(key.comment, comment)
def test_does_not_add_new_ssh_key_with_dry_run(self):
- key_text = 'ssh-rsa keytext keycomment'
+ key_text = self.factory.makeSSHKeyText()
target = self.factory.makePerson(name='username')
make_openid_identifier(target.account, u'openid')
=== modified file 'lib/lp/registry/tests/test_ssh.py'
--- lib/lp/registry/tests/test_ssh.py 2017-06-01 12:58:53 +0000
+++ lib/lp/registry/tests/test_ssh.py 2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2016-2017 Canonical Ltd. This software is licensed under the
+# Copyright 2016-2018 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for SSHKey."""
@@ -74,9 +74,10 @@
def test_sends_notification_by_default(self):
person = self.factory.makePerson()
+ key_text = self.factory.makeSSHKeyText(comment="comment")
with person_logged_in(person):
keyset = getUtility(ISSHKeySet)
- keyset.new(person, "ssh-rsa keytext comment")
+ keyset.new(person, key_text)
[email] = pop_notifications()
self.assertEqual(
email['Subject'], "New SSH key added to your account.")
@@ -89,7 +90,7 @@
person = self.factory.makePerson()
with person_logged_in(person):
keyset = getUtility(ISSHKeySet)
- keyset.new(person, "ssh-rsa keytext comment",
+ keyset.new(person, self.factory.makeSSHKeyText(),
send_notification=False)
self.assertEqual([], pop_notifications())
@@ -138,10 +139,8 @@
def test_can_add_new_key(self):
keyset = getUtility(ISSHKeySet)
person = self.factory.makePerson()
- keytype = 'ssh-rsa'
- keytext = 'ThisIsAFakeSSHKey'
- comment = 'This is a key comment.'
- full_key = ' '.join((keytype, keytext, comment))
+ full_key = self.factory.makeSSHKeyText()
+ keytype, keytext, comment = full_key.split(' ', 2)
with person_logged_in(person):
key = keyset.new(person, full_key)
self.assertEqual([key], list(person.sshkeys))
@@ -152,18 +151,28 @@
def test_new_raises_KeyAdditionError_on_bad_key_data(self):
person = self.factory.makePerson()
keyset = getUtility(ISSHKeySet)
- self.assertRaises(
+ self.assertRaisesWithContent(
SSHKeyAdditionError,
+ "Invalid SSH key data: 'thiskeyhasnospaces'",
keyset.new,
person, 'thiskeyhasnospaces'
)
- self.assertRaises(
+ self.assertRaisesWithContent(
SSHKeyAdditionError,
+ "Invalid SSH key type: 'bad_key_type'",
keyset.new,
person, 'bad_key_type keytext comment'
)
- self.assertRaises(
- SSHKeyAdditionError,
+ self.assertRaisesWithContent(
+ SSHKeyAdditionError,
+ "Invalid SSH key data: 'ssh-rsa badkeytext comment' "
+ "(Incorrect padding)",
+ keyset.new,
+ person, 'ssh-rsa badkeytext comment'
+ )
+ self.assertRaisesWithContent(
+ SSHKeyAdditionError,
+ "Invalid SSH key data: 'None'",
keyset.new,
person, None
)
=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py 2018-02-02 10:06:24 +0000
+++ lib/lp/testing/factory.py 2018-02-24 09:42:40 +0000
@@ -19,6 +19,7 @@
'remove_security_proxy_and_shout_at_engineer',
]
+import base64
from datetime import (
datetime,
timedelta,
@@ -50,6 +51,11 @@
from lazr.jobrunner.jobrunner import SuspendJobException
import pytz
from pytz import UTC
+from twisted.conch.ssh.common import (
+ MP,
+ NS,
+ )
+from twisted.conch.test import keydata
from twisted.python.util import mergeFunctionMetadata
from zope.component import (
ComponentLookupError,
@@ -4304,6 +4310,29 @@
return getUtility(IHWSubmissionDeviceSet).create(
device_driver_link, submission, parent, hal_device_id)
+ def makeSSHKeyText(self, key_type=SSHKeyType.RSA, comment=None):
+ """Create new SSH public key text.
+
+ :param key_type: If specified, the type of SSH key to generate. Must be
+ a member of SSHKeyType. If unspecified, SSHKeyType.RSA is used.
+ """
+ key_type_string = SSH_KEY_TYPE_TO_TEXT.get(key_type)
+ if key_type is None:
+ raise AssertionError(
+ "key_type must be a member of SSHKeyType, not %r" % key_type)
+ if key_type == SSHKeyType.RSA:
+ parameters = [MP(keydata.RSAData[param]) for param in ("e", "n")]
+ elif key_type == SSHKeyType.DSA:
+ parameters = [
+ MP(keydata.DSAData[param]) for param in ("p", "q", "g", "y")]
+ else:
+ raise AssertionError(
+ "key_type must be a member of SSHKeyType, not %r" % key_type)
+ key_text = base64.b64encode(NS(key_type_string) + b"".join(parameters))
+ if comment is None:
+ comment = self.getUniqueString()
+ return "%s %s %s" % (key_type_string, key_text, comment)
+
def makeSSHKey(self, person=None, key_type=SSHKeyType.RSA,
send_notification=True):
"""Create a new SSHKey.
@@ -4315,15 +4344,7 @@
"""
if person is None:
person = self.makePerson()
- key_type_string = SSH_KEY_TYPE_TO_TEXT.get(key_type)
- if key_type is None:
- raise AssertionError(
- "key_type must be a member of SSHKeyType, not %r" % key_type)
- public_key = "%s %s %s" % (
- key_type_string,
- self.getUniqueString(),
- self.getUniqueString(),
- )
+ public_key = self.makeSSHKeyText(key_type=key_type)
return getUtility(ISSHKeySet).new(
person, public_key, send_notification=send_notification)
Follow ups