txaws-dev team mailing list archive
-
txaws-dev team
-
Mailing list archive
-
Message #00007
[Merge] lp:~therve/txaws/old-signature into lp:txaws
Thomas Herve has proposed merging lp:~therve/txaws/old-signature into lp:txaws.
Requested reviews:
txAWS Developers (txaws-dev)
Related bugs:
#580544 Support SignatureVersion 1
https://bugs.launchpad.net/bugs/580544
--
https://code.launchpad.net/~therve/txaws/old-signature/+merge/25335
Your team txAWS Developers is requested to review the proposed merge of lp:~therve/txaws/old-signature into lp:txaws.
=== modified file 'txaws/credentials.py'
--- txaws/credentials.py 2009-11-05 22:19:12 +0000
+++ txaws/credentials.py 2010-05-14 14:49:22 +0000
@@ -44,3 +44,5 @@
return hmac_sha256(self.secret_key, bytes)
elif hash_type == "sha1":
return hmac_sha1(self.secret_key, bytes)
+ else:
+ raise RuntimeError("Unsupported hash type: '%s'" % hash_type)
=== modified file 'txaws/ec2/client.py'
--- txaws/ec2/client.py 2010-03-24 18:26:32 +0000
+++ txaws/ec2/client.py 2010-05-14 14:49:22 +0000
@@ -764,7 +764,6 @@
self.params = {
"Version": api_version,
"SignatureVersion": "2",
- "SignatureMethod": "HmacSHA256",
"Action": self.action,
"AWSAccessKeyId": self.creds.access_key,
"Timestamp": iso8601time(time_tuple),
@@ -794,18 +793,38 @@
self.get_canonical_query_params())
return result
+ def old_signing_text(self):
+ """Return the text needed for signing using SignatureVersion 1."""
+ result = []
+ lower_cmp = lambda x, y: cmp(x[0].lower(), y[0].lower())
+ for key, value in sorted(self.params.items(), cmp=lower_cmp):
+ result.append("%s%s" % (key, value))
+ return "".join(result)
+
def sorted_params(self):
"""Return the query parameters sorted appropriately for signing."""
return sorted(self.params.items())
- def sign(self):
+ def sign(self, hash_type="sha256"):
"""Sign this query using its built in credentials.
+ @param hash_type: if the SignatureVersion is 2, specify the type of
+ hash to use, either "sha1" or "sha256". It defaults to the latter.
+
This prepares it to be sent, and should be done as the last step before
submitting the query. Signing is done automatically - this is a public
method to facilitate testing.
"""
- self.params["Signature"] = self.creds.sign(self.signing_text())
+ version = self.params["SignatureVersion"]
+ if version == "1":
+ self.params["Signature"] = self.creds.sign(
+ self.old_signing_text(), "sha1")
+ elif version == "2":
+ self.params["SignatureMethod"] = "Hmac%s" % hash_type.upper()
+ self.params["Signature"] = self.creds.sign(
+ self.signing_text(), hash_type)
+ else:
+ raise RuntimeError("Unsupported SignatureVersion: '%s'" % version)
def submit(self):
"""Submit this query.
=== modified file 'txaws/ec2/tests/test_client.py'
--- txaws/ec2/tests/test_client.py 2010-03-29 17:16:46 +0000
+++ txaws/ec2/tests/test_client.py 2010-05-14 14:49:22 +0000
@@ -1417,7 +1417,6 @@
query.params,
{"AWSAccessKeyId": "foo",
"Action": "DescribeInstances",
- "SignatureMethod": "HmacSHA256",
"SignatureVersion": "2",
"Version": "2008-12-01"})
@@ -1431,7 +1430,6 @@
{"AWSAccessKeyId": "foo",
"Action": "DescribeInstances",
"InstanceId.0": "12345",
- "SignatureMethod": "HmacSHA256",
"SignatureVersion": "2",
"Timestamp": "2007-11-12T13:14:15Z",
"Version": "2008-12-01"})
@@ -1444,7 +1442,6 @@
self.assertEqual([
("AWSAccessKeyId", "foo"),
("Action", "DescribeInstances"),
- ("SignatureMethod", "HmacSHA256"),
("SignatureVersion", "2"),
("Timestamp", "2007-11-12T13:14:15Z"),
("Version", "2008-12-01"),
@@ -1475,7 +1472,7 @@
time_tuple=(2007,11,12,13,14,15,0,0,0))
expected_query = ("AWSAccessKeyId=foo&Action=DescribeInstances"
"&InstanceId.1=i-1234"
- "&SignatureMethod=HmacSHA256&SignatureVersion=2&"
+ "&SignatureVersion=2&"
"Timestamp=2007-11-12T13%3A14%3A15Z&Version=2008-12-01&"
"argwithnovalue=&fu%20n=g%2Fames")
self.assertEqual(expected_query, query.get_canonical_query_params())
@@ -1486,10 +1483,20 @@
endpoint=self.endpoint, time_tuple=(2007,11,12,13,14,15,0,0,0))
signing_text = ("GET\n%s\n/\n" % self.endpoint.host +
"AWSAccessKeyId=foo&Action=DescribeInstances&"
- "SignatureMethod=HmacSHA256&SignatureVersion=2&"
+ "SignatureVersion=2&"
"Timestamp=2007-11-12T13%3A14%3A15Z&Version=2008-12-01")
self.assertEqual(signing_text, query.signing_text())
+ def test_old_signing_text(self):
+ query = client.Query(
+ action="DescribeInstances", creds=self.creds,
+ endpoint=self.endpoint, time_tuple=(2007,11,12,13,14,15,0,0,0),
+ other_params={"SignatureVersion": "1"})
+ signing_text = (
+ "ActionDescribeInstancesAWSAccessKeyIdfooSignatureVersion1"
+ "Timestamp2007-11-12T13:14:15ZVersion2008-12-01")
+ self.assertEqual(signing_text, query.old_signing_text())
+
def test_sign(self):
query = client.Query(
action="DescribeInstances", creds=self.creds,
@@ -1499,6 +1506,22 @@
self.assertEqual("aDmLr0Ktjsmt17UJD/EZf6DrfKWT1JW0fq2FDUCOPic=",
query.params["Signature"])
+ def test_old_sign(self):
+ query = client.Query(
+ action="DescribeInstances", creds=self.creds,
+ endpoint=self.endpoint, time_tuple=(2007,11,12,13,14,15,0,0,0),
+ other_params={"SignatureVersion": "1"})
+ query.sign()
+ self.assertEqual(
+ "MBKyHoxqCd/lBQLVkCZYpwAtNJg=", query.params["Signature"])
+
+ def test_unsupported_sign(self):
+ query = client.Query(
+ action="DescribeInstances", creds=self.creds,
+ endpoint=self.endpoint, time_tuple=(2007,11,12,13,14,15,0,0,0),
+ other_params={"SignatureVersion": "0"})
+ self.assertRaises(RuntimeError, query.sign)
+
def test_submit_400(self):
"""A 4xx response status from EC2 should raise a txAWS EC2Error."""
status = 400