← Back to team overview

txaws-dev team mailing list archive

[Merge] lp:~therve/txaws/old-signature into lp:txaws

 

Thomas Herve has proposed merging lp:~therve/txaws/old-signature into lp:txaws.

Requested reviews:
  txAWS Developers (txaws-dev)
Related bugs:
  #580544 Support SignatureVersion 1
  https://bugs.launchpad.net/bugs/580544

-- 
https://code.launchpad.net/~therve/txaws/old-signature/+merge/25335
Your team txAWS Developers is requested to review the proposed merge of lp:~therve/txaws/old-signature into lp:txaws.
=== modified file 'txaws/credentials.py'
--- txaws/credentials.py	2009-11-05 22:19:12 +0000
+++ txaws/credentials.py	2010-05-14 14:49:22 +0000
@@ -44,3 +44,5 @@
             return hmac_sha256(self.secret_key, bytes)
         elif hash_type == "sha1":
             return hmac_sha1(self.secret_key, bytes)
+        else:
+            raise RuntimeError("Unsupported hash type: '%s'" % hash_type)

=== modified file 'txaws/ec2/client.py'
--- txaws/ec2/client.py	2010-03-24 18:26:32 +0000
+++ txaws/ec2/client.py	2010-05-14 14:49:22 +0000
@@ -764,7 +764,6 @@
         self.params = {
             "Version": api_version,
             "SignatureVersion": "2",
-            "SignatureMethod": "HmacSHA256",
             "Action": self.action,
             "AWSAccessKeyId": self.creds.access_key,
             "Timestamp": iso8601time(time_tuple),
@@ -794,18 +793,38 @@
                                      self.get_canonical_query_params())
         return result
 
+    def old_signing_text(self):
+        """Return the text needed for signing using SignatureVersion 1."""
+        result = []
+        lower_cmp = lambda x, y: cmp(x[0].lower(), y[0].lower())
+        for key, value in sorted(self.params.items(), cmp=lower_cmp):
+            result.append("%s%s" % (key, value))
+        return "".join(result)
+
     def sorted_params(self):
         """Return the query parameters sorted appropriately for signing."""
         return sorted(self.params.items())
 
-    def sign(self):
+    def sign(self, hash_type="sha256"):
         """Sign this query using its built in credentials.
 
+        @param hash_type: if the SignatureVersion is 2, specify the type of
+            hash to use, either "sha1" or "sha256". It defaults to the latter.
+
         This prepares it to be sent, and should be done as the last step before
         submitting the query. Signing is done automatically - this is a public
         method to facilitate testing.
         """
-        self.params["Signature"] = self.creds.sign(self.signing_text())
+        version = self.params["SignatureVersion"]
+        if version == "1":
+            self.params["Signature"] = self.creds.sign(
+                self.old_signing_text(), "sha1")
+        elif version == "2":
+            self.params["SignatureMethod"] = "Hmac%s" % hash_type.upper()
+            self.params["Signature"] = self.creds.sign(
+                self.signing_text(), hash_type)
+        else:
+            raise RuntimeError("Unsupported SignatureVersion: '%s'" % version)
 
     def submit(self):
         """Submit this query.

=== modified file 'txaws/ec2/tests/test_client.py'
--- txaws/ec2/tests/test_client.py	2010-03-29 17:16:46 +0000
+++ txaws/ec2/tests/test_client.py	2010-05-14 14:49:22 +0000
@@ -1417,7 +1417,6 @@
             query.params,
             {"AWSAccessKeyId": "foo",
              "Action": "DescribeInstances",
-             "SignatureMethod": "HmacSHA256",
              "SignatureVersion": "2",
              "Version": "2008-12-01"})
 
@@ -1431,7 +1430,6 @@
             {"AWSAccessKeyId": "foo",
              "Action": "DescribeInstances",
              "InstanceId.0": "12345",
-             "SignatureMethod": "HmacSHA256",
              "SignatureVersion": "2",
              "Timestamp": "2007-11-12T13:14:15Z",
              "Version": "2008-12-01"})
@@ -1444,7 +1442,6 @@
         self.assertEqual([
             ("AWSAccessKeyId", "foo"),
             ("Action", "DescribeInstances"),
-            ("SignatureMethod", "HmacSHA256"),
             ("SignatureVersion", "2"),
             ("Timestamp", "2007-11-12T13:14:15Z"),
             ("Version", "2008-12-01"),
@@ -1475,7 +1472,7 @@
             time_tuple=(2007,11,12,13,14,15,0,0,0))
         expected_query = ("AWSAccessKeyId=foo&Action=DescribeInstances"
             "&InstanceId.1=i-1234"
-            "&SignatureMethod=HmacSHA256&SignatureVersion=2&"
+            "&SignatureVersion=2&"
             "Timestamp=2007-11-12T13%3A14%3A15Z&Version=2008-12-01&"
             "argwithnovalue=&fu%20n=g%2Fames")
         self.assertEqual(expected_query, query.get_canonical_query_params())
@@ -1486,10 +1483,20 @@
             endpoint=self.endpoint, time_tuple=(2007,11,12,13,14,15,0,0,0))
         signing_text = ("GET\n%s\n/\n" % self.endpoint.host +
             "AWSAccessKeyId=foo&Action=DescribeInstances&"
-            "SignatureMethod=HmacSHA256&SignatureVersion=2&"
+            "SignatureVersion=2&"
             "Timestamp=2007-11-12T13%3A14%3A15Z&Version=2008-12-01")
         self.assertEqual(signing_text, query.signing_text())
 
+    def test_old_signing_text(self):
+        query = client.Query(
+            action="DescribeInstances", creds=self.creds,
+            endpoint=self.endpoint, time_tuple=(2007,11,12,13,14,15,0,0,0),
+            other_params={"SignatureVersion": "1"})
+        signing_text = (
+            "ActionDescribeInstancesAWSAccessKeyIdfooSignatureVersion1"
+            "Timestamp2007-11-12T13:14:15ZVersion2008-12-01")
+        self.assertEqual(signing_text, query.old_signing_text())
+
     def test_sign(self):
         query = client.Query(
             action="DescribeInstances", creds=self.creds,
@@ -1499,6 +1506,22 @@
         self.assertEqual("aDmLr0Ktjsmt17UJD/EZf6DrfKWT1JW0fq2FDUCOPic=",
             query.params["Signature"])
 
+    def test_old_sign(self):
+        query = client.Query(
+            action="DescribeInstances", creds=self.creds,
+            endpoint=self.endpoint, time_tuple=(2007,11,12,13,14,15,0,0,0),
+            other_params={"SignatureVersion": "1"})
+        query.sign()
+        self.assertEqual(
+            "MBKyHoxqCd/lBQLVkCZYpwAtNJg=", query.params["Signature"])
+
+    def test_unsupported_sign(self):
+        query = client.Query(
+            action="DescribeInstances", creds=self.creds,
+            endpoint=self.endpoint, time_tuple=(2007,11,12,13,14,15,0,0,0),
+            other_params={"SignatureVersion": "0"})
+        self.assertRaises(RuntimeError, query.sign)
+
     def test_submit_400(self):
         """A 4xx response status from EC2 should raise a txAWS EC2Error."""
         status = 400