launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #25932
[Merge] ~pappacena/turnip:py3-http-auth-flow into turnip:master
Thiago F. Pappacena has proposed merging ~pappacena/turnip:py3-http-auth-flow into turnip:master.
Commit message:
Making HTTP auth flow compatible with python3
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~pappacena/turnip/+git/turnip/+merge/395807
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/turnip:py3-http-auth-flow into turnip:master.
diff --git a/turnip/pack/http.py b/turnip/pack/http.py
index f138c77..3e627d0 100644
--- a/turnip/pack/http.py
+++ b/turnip/pack/http.py
@@ -7,6 +7,7 @@ from __future__ import (
unicode_literals,
)
+import base64
import io
import json
import os.path
@@ -24,12 +25,14 @@ from openid.extensions.sreg import (
SRegRequest,
SRegResponse,
)
+import paste.auth.cookie
from paste.auth.cookie import (
AuthCookieSigner,
decode as decode_cookie,
encode as encode_cookie,
)
import six
+import time
from twisted.internet import (
defer,
error,
@@ -476,18 +479,53 @@ class CGitScriptResource(twcgi.CGIScript):
twcgi.CGIScript.runProcess(self, env, request, *args, **kwargs)
+class TurnipAuthCookieSigner(AuthCookieSigner):
+ def auth(self, cookie):
+ """
+ Authenticate the cookie using the signature, verify that it
+ has not expired, and return the cookie's content.
+
+ This was adapted from the original AuthCookieSigner to make it Python 3
+ compatible, since paste.auth.* is slated for deprecation according to
+ https://pythonpaste.readthedocs.io/en/latest/future.html#to-deprecate.
+ """
+ _signature_size = paste.auth.cookie._signature_size
+ _header_size = paste.auth.cookie._header_size
+ sha1 = paste.auth.cookie.sha1
+ hmac = paste.auth.cookie.hmac
+ make_time = paste.auth.cookie.make_time
+ decode = base64.decodestring(
+ cookie.replace(b"_", b"/").replace(b"~", b"="))
+ signature = decode[:_signature_size]
+ expires = decode[_signature_size:_header_size]
+ content = decode[_header_size:]
+ if signature == hmac.new(self.secret, content, sha1).digest():
+ if int(expires) > int(make_time(time.time())):
+ return content
+ else:
+ # This is the normal case of an expired cookie; just
+ # don't bother doing anything here.
+ pass
+ else:
+ # This case can happen if the server is restarted with a
+ # different secret; or if the user's IP address changed
+ # due to a proxy. However, it could also be a break-in
+ # attempt -- so should it be reported?
+ pass
+
+
class BaseHTTPAuthResource(resource.Resource):
"""Base HTTP resource for OpenID authentication handling."""
session_var = 'turnip.session'
- cookie_name = 'TURNIP_COOKIE'
+ cookie_name = b'TURNIP_COOKIE'
anonymous_id = '+launchpad-anonymous'
def __init__(self, root):
resource.Resource.__init__(self)
self.root = root
if root.cgit_secret is not None:
- self.signer = AuthCookieSigner(root.cgit_secret)
+ self.signer = TurnipAuthCookieSigner(root.cgit_secret)
else:
self.signer = None
@@ -497,14 +535,14 @@ class BaseHTTPAuthResource(resource.Resource):
if cookie is not None:
content = self.signer.auth(cookie)
if content:
- return json.loads(decode_cookie(content))
+ return json.loads(decode_cookie(six.ensure_text(content)))
return {}
def _putSession(self, request, session):
if self.signer is not None:
content = self.signer.sign(encode_cookie(json.dumps(session)))
- cookie = '%s=%s; Path=/; secure;' % (self.cookie_name, content)
- request.setHeader(b'Set-Cookie', cookie.encode('UTF-8'))
+ cookie = b'%s=%s; Path=/; secure;' % (self.cookie_name, content)
+ request.setHeader(b'Set-Cookie', cookie)
def _setErrorCode(self, request, code=http.INTERNAL_SERVER_ERROR):
request.setResponseCode(code)
@@ -534,7 +572,8 @@ class HTTPAuthLoginResource(BaseHTTPAuthResource):
wrong.
"""
session = self._getSession(request)
- query = {k: v[-1] for k, v in request.args.items()}
+ query = {six.ensure_text(k): six.ensure_text(v[-1])
+ for k, v in request.args.items()}
response = self._makeConsumer(session).complete(
query, query['openid.return_to'])
if response.status == consumer.SUCCESS:
@@ -551,7 +590,7 @@ class HTTPAuthLoginResource(BaseHTTPAuthResource):
session['identity_url'] = response.identity_url
session['user'] = sreg_info['nickname']
self._putSession(request, session)
- request.redirect(query['back_to'])
+ request.redirect(six.ensure_binary(query['back_to']))
return b''
elif response.status == consumer.FAILURE:
log.msg('OpenID response: FAILURE: %s' % response.message)
diff --git a/turnip/pack/tests/test_http.py b/turnip/pack/tests/test_http.py
index 86a5c95..18c5893 100644
--- a/turnip/pack/tests/test_http.py
+++ b/turnip/pack/tests/test_http.py
@@ -8,10 +8,13 @@ from __future__ import (
)
from io import BytesIO
+import json
import os
from fixtures import TempDir
import six
+from openid.consumer import consumer
+from paste.auth.cookie import encode as encode_cookie
from testtools import TestCase
from testtools.deferredruntest import AsynchronousDeferredRunTest
from twisted.internet import (
@@ -31,7 +34,10 @@ from turnip.pack import (
http,
)
from turnip.pack.helpers import encode_packet
-from turnip.pack.http import get_protocol_version_from_request
+from turnip.pack.http import (
+ get_protocol_version_from_request,
+ HTTPAuthLoginResource,
+ )
from turnip.pack.tests.fake_servers import FakeVirtInfoService
from turnip.tests.compat import mock
from turnip.version_info import version_info
@@ -44,6 +50,7 @@ class LessDummyRequest(requesthelper.DummyRequest):
def __init__(self, *args, **kwargs):
super(LessDummyRequest, self).__init__(*args, **kwargs)
self.content = BytesIO()
+ self.cookies = {}
@property
def value(self):
@@ -68,6 +75,9 @@ class LessDummyRequest(requesthelper.DummyRequest):
def getClientAddress(self):
return IPv4Address('TCP', '127.0.0.1', '80')
+ def getCookie(self, name):
+ return self.cookies.get(name)
+
class AuthenticatedLessDummyRequest(LessDummyRequest):
def getUser(self):
@@ -304,6 +314,69 @@ class TestSmartHTTPCommandResource(ErrorTestMixin, TestCase):
self.request.value)
+class TestHTTPAuthLoginResource(TestCase):
+ """Unit tests for login resource."""
+ def setUp(self):
+ super(TestHTTPAuthLoginResource, self).setUp()
+ self.root = FakeRoot(self.useFixture(TempDir()).path)
+ self.root.cgit_secret = b'dont-tell-anyone shuuu'
+
+ def getResourceInstance(self, mock_response):
+ resource = HTTPAuthLoginResource(self.root)
+ resource._makeConsumer = mock.Mock()
+ resource._makeConsumer.return_value.complete.return_value = (
+ mock_response)
+ return resource
+
+ def test_render_GET_success(self):
+ response = mock.Mock()
+ response.status = consumer.SUCCESS
+ response.identity_url = 'http://lp.test/XopAlqp'
+ response.getSignedNS.return_value = {
+ 'nickname': 'pappacena', 'country': 'BR'
+ }
+
+ request = LessDummyRequest([''])
+ request.method = b'GET'
+ request.path = b'/example'
+ request.args = {
+ b'openid.return_to': [b'https://return.to.test'],
+ b'back_to': [b'https://return.to.test']
+ }
+
+ resource = self.getResourceInstance(response)
+ self.assertEqual(b'', resource.render_GET(request))
+ encoded_cookie = resource.signer.sign(encode_cookie(json.dumps({
+ 'identity_url': response.identity_url,
+ 'user': 'pappacena'
+ })))
+ expected_cookie = b'TURNIP_COOKIE=%s; Path=/; secure;' % encoded_cookie
+ self.assertEqual({
+ b'Set-Cookie': [expected_cookie],
+ b'Location': [b'https://return.to.test']
+ }, dict(request.responseHeaders.getAllRawHeaders()))
+
+ def test_getSession(self):
+ response = mock.Mock()
+ request = LessDummyRequest([''])
+ request.method = b'GET'
+ request.path = b'/example'
+ request.args = {
+ b'openid.return_to': [b'https://return.to.test'],
+ b'back_to': [b'https://return.to.test']
+ }
+
+ resource = self.getResourceInstance(response)
+ cookie_data = {
+ 'identity_url': 'http://localhost',
+ 'user': 'pappacena'}
+ cookie_content = resource.signer.sign(
+ encode_cookie(json.dumps(cookie_data)))
+ request.cookies[resource.cookie_name] = cookie_content
+
+ self.assertEqual(cookie_data, resource._getSession(request))
+
+
class TestHTTPAuthRootResource(TestCase):
run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=5)