← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~wgrant/launchpad/oauth-secret-checking into lp:launchpad

 

William Grant has proposed merging lp:~wgrant/launchpad/oauth-secret-checking into lp:launchpad.

Commit message:
Push OAuth secret checking down into the model, and add unit tests for check_oauth_signature.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~wgrant/launchpad/oauth-secret-checking/+merge/224761

This branch pushes OAuth secret validation down into the model, so we can switch to storing hashed rather than plaintext secrets.

I also added unit tests for the validation method, as it wasn't directly tested before.
-- 
https://code.launchpad.net/~wgrant/launchpad/oauth-secret-checking/+merge/224761
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/oauth-secret-checking into lp:launchpad.
=== modified file 'lib/lp/services/oauth/interfaces.py'
--- lib/lp/services/oauth/interfaces.py	2013-01-07 02:40:55 +0000
+++ lib/lp/services/oauth/interfaces.py	2014-06-27 08:21:59 +0000
@@ -83,6 +83,9 @@
         "desktop"). If the consumer is a specific web or desktop
         application, this is None.""")
 
+    def isSecretValid(secret):
+        """Check if a secret is valid for this consumer."""
+
     def newRequestToken():
         """Return a new `IOAuthRequestToken` with a random key and secret.
 
@@ -170,6 +173,9 @@
         description=_("A token may only be usable for a limited time, "
                       "after which it will expire."))
 
+    def isSecretValid(secret):
+        """Check if a secret is valid for this token."""
+
 
 class IOAuthAccessToken(IOAuthToken):
     """A token used by a consumer to access protected resources in LP.

=== modified file 'lib/lp/services/oauth/model.py'
--- lib/lp/services/oauth/model.py	2013-06-20 05:50:00 +0000
+++ lib/lp/services/oauth/model.py	2014-06-27 08:21:59 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2014 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -171,6 +171,10 @@
         """See `IOAuthConsumer`."""
         return self._integrated_desktop_match_group(1)
 
+    def isSecretValid(self, secret):
+        """See `IOAuthConsumer`."""
+        return secret == self.secret
+
     def newRequestToken(self):
         """See `IOAuthConsumer`."""
         key, secret = create_token_key_and_secret(table=OAuthRequestToken)
@@ -249,6 +253,10 @@
         now = datetime.now(pytz.timezone('UTC'))
         return self.date_expires is not None and self.date_expires <= now
 
+    def isSecretValid(self, secret):
+        """See `IOAuthToken`."""
+        return secret == self.secret
+
     def checkNonceAndTimestamp(self, nonce, timestamp):
         """See `IOAuthAccessToken`."""
         timestamp = float(timestamp)
@@ -334,6 +342,10 @@
         expires = self.date_created + timedelta(hours=REQUEST_TOKEN_VALIDITY)
         return expires <= now
 
+    def isSecretValid(self, secret):
+        """See `IOAuthToken`."""
+        return secret == self.secret
+
     def review(self, user, permission, context=None, date_expires=None):
         """See `IOAuthRequestToken`."""
         if self.is_reviewed:

=== modified file 'lib/lp/services/oauth/tests/test_tokens.py'
--- lib/lp/services/oauth/tests/test_tokens.py	2012-12-26 01:32:19 +0000
+++ lib/lp/services/oauth/tests/test_tokens.py	2014-06-27 08:21:59 +0000
@@ -146,6 +146,11 @@
         self.assertEquals(
             None, self.consumer.getRequestToken("no-such-token"))
 
+    def test_isSecretValid(self):
+        token = self.consumer.newRequestToken()
+        self.assertTrue(token.isSecretValid(token.secret))
+        self.assertFalse(token.isSecretValid(token.secret + 'a'))
+
     def test_token_review(self):
         request_token = self.consumer.newRequestToken()
 
@@ -352,6 +357,13 @@
         login_person(access_token.person)
         try_to_set()
 
+    def test_isSecretValid(self):
+        request_token = self.consumer.newRequestToken()
+        request_token.review(self.person, OAuthPermission.WRITE_PRIVATE)
+        token = request_token.createAccessToken()
+        self.assertTrue(token.isSecretValid(token.secret))
+        self.assertFalse(token.isSecretValid(token.secret + 'a'))
+
     def test_get_access_tokens_for_person(self):
         """It's possible to get a person's access tokens."""
         person = self.factory.makePerson()

=== modified file 'lib/lp/services/webapp/authentication.py'
--- lib/lp/services/webapp/authentication.py	2013-04-10 08:36:30 +0000
+++ lib/lp/services/webapp/authentication.py	2014-06-27 08:21:59 +0000
@@ -297,13 +297,13 @@
         request.response.setStatus(400)
         return False
 
-    if token is not None:
-        token_secret = token.secret
-    else:
-        token_secret = ''
-    expected_signature = "&".join([consumer.secret, token_secret])
-    if expected_signature != authorization.get('oauth_signature'):
-        request.unauthorized(OAUTH_CHALLENGE)
-        return False
-
-    return True
+    # The signature is a consumer secret and a token secret joined by &.
+    # If there's no token, the token secret is the empty string.
+    sig_bits = authorization.get('oauth_signature', '').split('&')
+    if (len(sig_bits) == 2
+        and consumer.isSecretValid(sig_bits[0])
+        and ((token is None and sig_bits[1] == '')
+             or (token is not None and token.isSecretValid(sig_bits[1])))):
+        return True
+    request.unauthorized(OAUTH_CHALLENGE)
+    return False

=== modified file 'lib/lp/services/webapp/tests/test_authentication.py'
--- lib/lp/services/webapp/tests/test_authentication.py	2012-02-28 04:24:19 +0000
+++ lib/lp/services/webapp/tests/test_authentication.py	2014-06-27 08:21:59 +0000
@@ -10,7 +10,12 @@
 
 from contrib.oauth import OAuthRequest
 
-from lp.testing import TestCaseWithFactory
+from lp.services.webapp.authentication import check_oauth_signature
+from lp.services.webapp.servers import LaunchpadTestRequest
+from lp.testing import (
+    TestCase,
+    TestCaseWithFactory,
+    )
 from lp.testing.layers import (
     DatabaseFunctionalLayer,
     LaunchpadFunctionalLayer,
@@ -22,9 +27,7 @@
     )
 
 
-class TestOAuthParsing(TestCaseWithFactory):
-
-    layer = DatabaseFunctionalLayer
+class TestOAuthParsing(TestCase):
 
     def test_split_oauth(self):
         # OAuth headers are parsed correctly: see bug 314507.
@@ -47,6 +50,50 @@
             {'oauth_consumer_key': 'justtesting'})
 
 
+class TestCheckOAuthSignature(TestCaseWithFactory):
+
+    layer = DatabaseFunctionalLayer
+
+    def makeRequest(self, signature, method='PLAINTEXT'):
+        form = {
+            'oauth_signature_method': method, 'oauth_signature': signature}
+        return LaunchpadTestRequest(form=form)
+
+    def test_valid(self):
+        token = self.factory.makeOAuthAccessToken()
+        request = self.makeRequest('&%s' % token.secret)
+        self.assertTrue(check_oauth_signature(request, token.consumer, token))
+
+    def test_bad_secret(self):
+        token = self.factory.makeOAuthAccessToken()
+        request = self.makeRequest('&%slol' % token.secret)
+        self.assertFalse(check_oauth_signature(request, token.consumer, token))
+        self.assertEqual(401, request.response.getStatus())
+
+    def test_valid_no_token(self):
+        token = self.factory.makeOAuthAccessToken()
+        request = self.makeRequest('&')
+        self.assertTrue(check_oauth_signature(request, token.consumer, None))
+
+    def test_bad_secret_no_token(self):
+        token = self.factory.makeOAuthAccessToken()
+        request = self.makeRequest('&lol')
+        self.assertFalse(check_oauth_signature(request, token.consumer, None))
+        self.assertEqual(401, request.response.getStatus())
+
+    def test_not_plaintext(self):
+        token = self.factory.makeOAuthAccessToken()
+        request = self.makeRequest('&lol', method='HMAC-SHA1')
+        self.assertFalse(check_oauth_signature(request, token.consumer, token))
+        self.assertEqual(400, request.response.getStatus())
+
+    def test_bad_signature_format(self):
+        token = self.factory.makeOAuthAccessToken()
+        request = self.makeRequest('lol')
+        self.assertFalse(check_oauth_signature(request, token.consumer, token))
+        self.assertEqual(401, request.response.getStatus())
+
+
 def test_suite():
     suite = unittest.TestLoader().loadTestsFromName(__name__)
     suite.addTest(LayeredDocFileSuite(


Follow ups