← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/reject-bad-ssh-keys into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/reject-bad-ssh-keys into lp:launchpad.

Commit message:
Reject SSH public keys that Twisted can't load.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #230144 in Launchpad itself: "Launchpad shouldn't accept malformed ssh keys"
  https://bugs.launchpad.net/launchpad/+bug/230144

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/reject-bad-ssh-keys/+merge/339445
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/reject-bad-ssh-keys into lp:launchpad.
=== modified file 'lib/lp/registry/browser/tests/test_person_webservice.py'
--- lib/lp/registry/browser/tests/test_person_webservice.py	2018-01-02 16:10:26 +0000
+++ lib/lp/registry/browser/tests/test_person_webservice.py	2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -562,21 +562,22 @@
         with admin_logged_in():
             person = removeSecurityProxy(self.factory.makePerson())
             openid_id = person.account.openid_identifiers.any().identifier
-        response = self.addSSHKeyForPerson(
-            openid_id, 'ssh-rsa keydata comment')
+        full_key = self.factory.makeSSHKeyText()
+        _, keytext, comment = full_key.split(' ', 2)
+        response = self.addSSHKeyForPerson(openid_id, full_key)
 
         self.assertEqual(200, response.status)
         [key] = person.sshkeys
         self.assertEqual(SSHKeyType.RSA, key.keytype)
-        self.assertEqual('keydata', key.keytext)
-        self.assertEqual('comment', key.comment)
+        self.assertEqual(keytext, key.keytext)
+        self.assertEqual(comment, key.comment)
 
     def test_addSSHKeyFromSSO_dry_run(self):
         with admin_logged_in():
             person = removeSecurityProxy(self.factory.makePerson())
             openid_id = person.account.openid_identifiers.any().identifier
         response = self.addSSHKeyForPerson(
-            openid_id, 'ssh-rsa keydata comment', dry_run=True)
+            openid_id, self.factory.makeSSHKeyText(), dry_run=True)
 
         self.assertEqual(200, response.status)
         self.assertEqual(0, person.sshkeys.count())

=== modified file 'lib/lp/registry/browser/tests/test_sshkey.py'
--- lib/lp/registry/browser/tests/test_sshkey.py	2017-06-01 12:58:53 +0000
+++ lib/lp/registry/browser/tests/test_sshkey.py	2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2010-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2010-2018 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for GPG key on the web."""
@@ -51,7 +51,7 @@
         with person_logged_in(person):
             # Add the key for the user here,
             # since we only care about testing removal.
-            getUtility(ISSHKeySet).new(person, public_key)
+            getUtility(ISSHKeySet).new(person, public_key, check_key=False)
             browser = setupBrowserFreshLogin(person)
             browser.open(url)
             browser.getControl('Remove').click()

=== modified file 'lib/lp/registry/interfaces/ssh.py'
--- lib/lp/registry/interfaces/ssh.py	2017-06-01 12:58:53 +0000
+++ lib/lp/registry/interfaces/ssh.py	2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """SSH key interfaces."""
@@ -89,16 +89,19 @@
 class ISSHKeySet(Interface):
     """The set of SSHKeys."""
 
-    def new(person, sshkey, send_notification=True, dry_run=False):
+    def new(person, sshkey, check_key=True, send_notification=True,
+            dry_run=False):
         """Create a new SSHKey pointing to the given Person.
 
         :param person: The IPerson to add the ssh key to.
         :param sshkey: The full ssh key text.
-        :param send_notification: Set to False to supress sending the user an
+        :param check_key: Set to False to skip the check for whether the key
+            text can be deserialised by Twisted.
+        :param send_notification: Set to False to suppress sending the user an
             email about the change.
-        :param dry_run: Perform all the format and vaulnerability checks, but
-            don't actually add the key. Causes the method to return None,
-            rather than an instance of ISSHKey.
+        :param dry_run: Perform all the format checks, but don't actually
+            add the key. Causes the method to return None, rather than an
+            instance of ISSHKey.
         """
 
     def getByID(id, default=None):

=== modified file 'lib/lp/registry/model/person.py'
--- lib/lp/registry/model/person.py	2017-09-28 13:06:10 +0000
+++ lib/lp/registry/model/person.py	2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Implementation classes for a Person."""
@@ -81,6 +81,7 @@
     Store,
     )
 import transaction
+from twisted.conch.ssh.keys import Key
 from zope.component import (
     adapter,
     getUtility,
@@ -3512,7 +3513,8 @@
             raise NoSuchAccount("No account found for openid identifier '%s'"
                                 % openid_identifier)
         getUtility(ISSHKeySet).new(
-            IPerson(account), key_text, False, dry_run=dry_run)
+            IPerson(account), key_text, send_notification=False,
+            dry_run=dry_run)
 
     def deleteSSHKeyFromSSO(self, user, openid_identifier, key_text,
                             dry_run=False):
@@ -4123,9 +4125,17 @@
 @implementer(ISSHKeySet)
 class SSHKeySet:
 
-    def new(self, person, sshkey, send_notification=True, dry_run=False):
+    def new(self, person, sshkey, check_key=True, send_notification=True,
+            dry_run=False):
         keytype, keytext, comment = self._extract_ssh_key_components(sshkey)
 
+        if check_key:
+            try:
+                Key.fromString(sshkey)
+            except Exception as e:
+                raise SSHKeyAdditionError(
+                    "Invalid SSH key data: '%s' (%s)" % (sshkey, e))
+
         if send_notification:
             person.security_field_changed(
                 "New SSH key added to your account.",

=== modified file 'lib/lp/registry/tests/test_personset.py'
--- lib/lp/registry/tests/test_personset.py	2018-01-02 16:10:26 +0000
+++ lib/lp/registry/tests/test_personset.py	2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for PersonSet."""
@@ -976,7 +976,7 @@
     def test_restricted_to_sso(self):
         # Only the ubuntu-sso celebrity can invoke this
         # privileged method.
-        key_text = 'ssh-rsa keytext keycomment'
+        key_text = self.factory.makeSSHKeyText()
         target = self.factory.makePerson(name='username')
         make_openid_identifier(target.account, u'openid')
 
@@ -998,22 +998,23 @@
             self.assertEqual(None, do_it())
 
     def test_adds_new_ssh_key(self):
-        key_text = 'ssh-rsa keytext keycomment'
+        full_key = self.factory.makeSSHKeyText()
+        _, keytext, comment = full_key.split(' ', 2)
         target = self.factory.makePerson(name='username')
         make_openid_identifier(target.account, u'openid')
 
         with person_logged_in(self.sso):
             getUtility(IPersonSet).addSSHKeyFromSSO(
-                self.sso, u'openid', key_text, False)
+                self.sso, u'openid', full_key, False)
 
         with person_logged_in(target):
             [key] = target.sshkeys
             self.assertEqual(key.keytype, SSHKeyType.RSA)
-            self.assertEqual(key.keytext, 'keytext')
-            self.assertEqual(key.comment, 'keycomment')
+            self.assertEqual(key.keytext, keytext)
+            self.assertEqual(key.comment, comment)
 
     def test_does_not_add_new_ssh_key_with_dry_run(self):
-        key_text = 'ssh-rsa keytext keycomment'
+        key_text = self.factory.makeSSHKeyText()
         target = self.factory.makePerson(name='username')
         make_openid_identifier(target.account, u'openid')
 

=== modified file 'lib/lp/registry/tests/test_ssh.py'
--- lib/lp/registry/tests/test_ssh.py	2017-06-01 12:58:53 +0000
+++ lib/lp/registry/tests/test_ssh.py	2018-02-24 09:42:40 +0000
@@ -1,4 +1,4 @@
-# Copyright 2016-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2016-2018 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for SSHKey."""
@@ -74,9 +74,10 @@
 
     def test_sends_notification_by_default(self):
         person = self.factory.makePerson()
+        key_text = self.factory.makeSSHKeyText(comment="comment")
         with person_logged_in(person):
             keyset = getUtility(ISSHKeySet)
-            keyset.new(person, "ssh-rsa keytext comment")
+            keyset.new(person, key_text)
             [email] = pop_notifications()
         self.assertEqual(
             email['Subject'], "New SSH key added to your account.")
@@ -89,7 +90,7 @@
         person = self.factory.makePerson()
         with person_logged_in(person):
             keyset = getUtility(ISSHKeySet)
-            keyset.new(person, "ssh-rsa keytext comment",
+            keyset.new(person, self.factory.makeSSHKeyText(),
                        send_notification=False)
             self.assertEqual([], pop_notifications())
 
@@ -138,10 +139,8 @@
     def test_can_add_new_key(self):
         keyset = getUtility(ISSHKeySet)
         person = self.factory.makePerson()
-        keytype = 'ssh-rsa'
-        keytext = 'ThisIsAFakeSSHKey'
-        comment = 'This is a key comment.'
-        full_key = ' '.join((keytype, keytext, comment))
+        full_key = self.factory.makeSSHKeyText()
+        keytype, keytext, comment = full_key.split(' ', 2)
         with person_logged_in(person):
             key = keyset.new(person, full_key)
             self.assertEqual([key], list(person.sshkeys))
@@ -152,18 +151,28 @@
     def test_new_raises_KeyAdditionError_on_bad_key_data(self):
         person = self.factory.makePerson()
         keyset = getUtility(ISSHKeySet)
-        self.assertRaises(
+        self.assertRaisesWithContent(
             SSHKeyAdditionError,
+            "Invalid SSH key data: 'thiskeyhasnospaces'",
             keyset.new,
             person, 'thiskeyhasnospaces'
         )
-        self.assertRaises(
+        self.assertRaisesWithContent(
             SSHKeyAdditionError,
+            "Invalid SSH key type: 'bad_key_type'",
             keyset.new,
             person, 'bad_key_type keytext comment'
         )
-        self.assertRaises(
-            SSHKeyAdditionError,
+        self.assertRaisesWithContent(
+            SSHKeyAdditionError,
+            "Invalid SSH key data: 'ssh-rsa badkeytext comment' "
+            "(Incorrect padding)",
+            keyset.new,
+            person, 'ssh-rsa badkeytext comment'
+        )
+        self.assertRaisesWithContent(
+            SSHKeyAdditionError,
+            "Invalid SSH key data: 'None'",
             keyset.new,
             person, None
         )

=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py	2018-02-02 10:06:24 +0000
+++ lib/lp/testing/factory.py	2018-02-24 09:42:40 +0000
@@ -19,6 +19,7 @@
     'remove_security_proxy_and_shout_at_engineer',
     ]
 
+import base64
 from datetime import (
     datetime,
     timedelta,
@@ -50,6 +51,11 @@
 from lazr.jobrunner.jobrunner import SuspendJobException
 import pytz
 from pytz import UTC
+from twisted.conch.ssh.common import (
+    MP,
+    NS,
+    )
+from twisted.conch.test import keydata
 from twisted.python.util import mergeFunctionMetadata
 from zope.component import (
     ComponentLookupError,
@@ -4304,6 +4310,29 @@
         return getUtility(IHWSubmissionDeviceSet).create(
             device_driver_link, submission, parent, hal_device_id)
 
+    def makeSSHKeyText(self, key_type=SSHKeyType.RSA, comment=None):
+        """Create new SSH public key text.
+
+        :param key_type: If specified, the type of SSH key to generate. Must be
+            a member of SSHKeyType. If unspecified, SSHKeyType.RSA is used.
+        """
+        key_type_string = SSH_KEY_TYPE_TO_TEXT.get(key_type)
+        if key_type is None:
+            raise AssertionError(
+                "key_type must be a member of SSHKeyType, not %r" % key_type)
+        if key_type == SSHKeyType.RSA:
+            parameters = [MP(keydata.RSAData[param]) for param in ("e", "n")]
+        elif key_type == SSHKeyType.DSA:
+            parameters = [
+                MP(keydata.DSAData[param]) for param in ("p", "q", "g", "y")]
+        else:
+            raise AssertionError(
+                "key_type must be a member of SSHKeyType, not %r" % key_type)
+        key_text = base64.b64encode(NS(key_type_string) + b"".join(parameters))
+        if comment is None:
+            comment = self.getUniqueString()
+        return "%s %s %s" % (key_type_string, key_text, comment)
+
     def makeSSHKey(self, person=None, key_type=SSHKeyType.RSA,
                    send_notification=True):
         """Create a new SSHKey.
@@ -4315,15 +4344,7 @@
         """
         if person is None:
             person = self.makePerson()
-        key_type_string = SSH_KEY_TYPE_TO_TEXT.get(key_type)
-        if key_type is None:
-            raise AssertionError(
-                "key_type must be a member of SSHKeyType, not %r" % key_type)
-        public_key = "%s %s %s" % (
-            key_type_string,
-            self.getUniqueString(),
-            self.getUniqueString(),
-            )
+        public_key = self.makeSSHKeyText(key_type=key_type)
         return getUtility(ISSHKeySet).new(
             person, public_key, send_notification=send_notification)
 


Follow ups