← Back to team overview

txaws-dev team mailing list archive

[Merge] lp:~ack/txaws/xss-hardening into lp:txaws

 

Alberto Donato has proposed merging lp:~ack/txaws/xss-hardening into lp:txaws.

Requested reviews:
  txAWS Committers (txaws-dev)
  txAWS Committers (txaws-dev)

For more details, see:
https://code.launchpad.net/~ack/txaws/xss-hardening/+merge/181562

Based on Chris' branch lp:~tribaal/txaws/xss-hardening, drops the cgi.escape as json content shoudn't be escaped.

It also adds the "X-Content-Type-Options: nosniff" header, to prevent browsers from guessing the content type, and use the one declared in the response (application/json).
-- 
https://code.launchpad.net/~ack/txaws/xss-hardening/+merge/181562
Your team txAWS Committers is requested to review the proposed merge of lp:~ack/txaws/xss-hardening into lp:txaws.
=== modified file 'txaws/server/resource.py'
--- txaws/server/resource.py	2012-05-09 14:21:39 +0000
+++ txaws/server/resource.py	2013-08-22 14:02:04 +0000
@@ -94,6 +94,8 @@
         def write_response(response):
             request.setHeader("Content-Length", str(len(response)))
             request.setHeader("Content-Type", self.content_type)
+            # Prevent browsers from trying to guess a different content type.
+            request.setHeader("X-Content-Type-Options", "nosniff")
             request.write(response)
             request.finish()
             return response
@@ -109,16 +111,15 @@
                     log.msg("status: %s message: %s" % (
                         status, safe_str(failure.value)))
 
-                bytes = failure.value.response
-                if bytes is None:
-                    bytes = self.dump_error(failure.value, request)
+                body = failure.value.response
+                if body is None:
+                    body = self.dump_error(failure.value, request)
             else:
                 log.err(failure)
-                bytes = safe_str(failure.value)
+                body = safe_str(failure.value)
                 status = 500
             request.setResponseCode(status)
-            request.write(bytes)
-            request.finish()
+            write_response(body)
 
         deferred.addCallback(write_response)
         deferred.addErrback(write_error)

=== modified file 'txaws/server/tests/test_resource.py'
--- txaws/server/tests/test_resource.py	2012-05-09 14:21:39 +0000
+++ txaws/server/tests/test_resource.py	2013-08-22 14:02:04 +0000
@@ -120,8 +120,7 @@
             "access_key_id": request.args["access_key"][0],
             "signature_method": "Hmacsha256",
             "signature_version": 2,
-            "signature": request.args["signature"][0],
-            }
+            "signature": request.args["signature"][0]}
         params = dict((k, v[-1]) for k, v in request.args.iteritems())
         raw = params.copy()
         raw.pop("signature")
@@ -153,6 +152,8 @@
             self.assertEqual("data", request.response)
             self.assertEqual("4", request.headers["Content-Length"])
             self.assertEqual("text/plain", request.headers["Content-Type"])
+            self.assertEqual(
+                "nosniff", request.headers["X-Content-Type-Options"])
             self.assertEqual(200, request.code)
 
         self.api.principal = TestPrincipal(creds)
@@ -428,6 +429,30 @@
 
         return self.api.handle(request).addCallback(check)
 
+    def test_handle_error_is_api_content_type(self):
+        """
+        If an error occurs while parsing the parameters, L{QueryAPI.handle}
+        responds with HTTP status 400, and the resulting response has a
+        Content-Type header matching the content type defined in the QueryAPI.
+        """
+        creds = AWSCredentials("access", "secret")
+        endpoint = AWSServiceEndpoint("http://uri";)
+        query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
+        query.sign()
+        query.params.pop("Action")
+        request = FakeRequest(query.params, endpoint)
+
+        def check(ignored):
+            errors = self.flushLoggedErrors()
+            self.assertEquals(0, len(errors))
+            self.assertEqual(400, request.code)
+            self.assertEqual(
+                self.api.content_type, request.headers['Content-Type'])
+            self.assertEqual(
+                "nosniff", request.headers["X-Content-Type-Options"])
+
+        return self.api.handle(request).addCallback(check)
+
     def test_handle_unicode_api_error(self):
         """
         If an L{APIError} contains a unicode message, L{QueryAPI} is able to


References