txawsteam team mailing list archive
-
txawsteam team
-
Mailing list archive
-
Message #00069
[Merge] lp:~oubiwann/txaws/416109-arbitrary-endpoints into lp:txaws
Robert Collins has proposed merging lp:~oubiwann/txaws/416109-arbitrary-endpoints into lp:txaws.
--
https://code.launchpad.net/~oubiwann/txaws/416109-arbitrary-endpoints/+merge/10581
Your team txAWS Team is subscribed to branch lp:txaws.
=== modified file 'txaws/client/gui/gtk.py'
--- txaws/client/gui/gtk.py 2009-08-18 22:53:53 +0000
+++ txaws/client/gui/gtk.py 2009-08-20 19:14:32 +0000
@@ -8,7 +8,7 @@
import gobject
import gtk
-from txaws.credentials import AWSCredentials
+from txaws.ec2.service import EC2Service
__all__ = ['main']
@@ -27,10 +27,10 @@
# Nested import because otherwise we get 'reactor already installed'.
self.password_dialog = None
try:
- creds = AWSCredentials()
+ service = AWSService()
except ValueError:
- creds = self.from_gnomekeyring()
- self.create_client(creds)
+ service = self.from_gnomekeyring()
+ self.create_client(service)
menu = '''
<ui>
<menubar name="Menubar">
@@ -54,10 +54,10 @@
'/Menubar/Menu/Stop instances').props.parent
self.connect('popup-menu', self.on_popup_menu)
- def create_client(self, creds):
+ def create_client(self, service):
from txaws.ec2.client import EC2Client
- if creds is not None:
- self.client = EC2Client(creds=creds)
+ if service is not None:
+ self.client = EC2Client(service=service)
self.on_activate(None)
else:
# waiting on user entered credentials.
@@ -65,7 +65,7 @@
def from_gnomekeyring(self):
# Try for gtk gui specific credentials.
- creds = None
+ service = None
try:
items = gnomekeyring.find_items_sync(
gnomekeyring.ITEM_GENERIC_SECRET,
@@ -78,7 +78,7 @@
return None
else:
key_id, secret_key = items[0].secret.split(':')
- return AWSCredentials(access_key=key_id, secret_key=secret_key)
+ return EC2Service(access_key=key_id, secret_key=secret_key)
def show_a_password_dialog(self):
self.password_dialog = gtk.Dialog(
@@ -133,8 +133,8 @@
content = self.password_dialog.get_content_area()
key_id = content.get_children()[0].get_children()[1].get_text()
secret_key = content.get_children()[1].get_children()[1].get_text()
- creds = AWSCredentials(access_key=key_id, secret_key=secret_key)
- self.create_client(creds)
+ service = EC2Service(access_key=key_id, secret_key=secret_key)
+ self.create_client(service)
gnomekeyring.item_create_sync(
None,
gnomekeyring.ITEM_GENERIC_SECRET,
=== added file 'txaws/credentials.py'
--- txaws/credentials.py 1970-01-01 00:00:00 +0000
+++ txaws/credentials.py 2009-08-21 14:50:25 +0000
@@ -0,0 +1,42 @@
+# Copyright (C) 2009 Robert Collins <robertc@xxxxxxxxxxxxxxxxx>
+# Licenced under the txaws licence available at /LICENSE in the txaws source.
+
+"""Credentials for accessing AWS services."""
+
+import os
+
+from txaws.util import hmac_sha1
+
+
+__all__ = ['AWSCredentials']
+
+
+ENV_ACCESS_KEY = "AWS_ACCESS_KEY_ID"
+ENV_SECRET_KEY = "AWS_SECRET_ACCESS_KEY"
+
+
+class AWSCredentials(object):
+
+ def __init__(self, access_key="", secret_key=""):
+ """Create an AWSCredentials object.
+
+ @param access_key: The access key to use. If None the environment
+ variable AWS_ACCESS_KEY_ID is consulted.
+ @param secret_key: The secret key to use. If None the environment
+ variable AWS_SECRET_ACCESS_KEY is consulted.
+ """
+ self.access_key = access_key
+ self.secret_key = secret_key
+ if not self.access_key:
+ self.access_key = os.environ.get(ENV_ACCESS_KEY)
+ if not self.access_key:
+ raise ValueError("Could not find %s" % ENV_ACCESS_KEY)
+ # perform checks for secret key
+ if not self.secret_key:
+ self.secret_key = os.environ.get(ENV_SECRET_KEY)
+ if not self.secret_key:
+ raise ValueError("Could not find %s" % ENV_SECRET_KEY)
+
+ def sign(self, bytes):
+ """Sign some bytes."""
+ return hmac_sha1(self.secret_key, bytes)
=== removed file 'txaws/credentials.py'
--- txaws/credentials.py 2009-08-17 11:18:56 +0000
+++ txaws/credentials.py 1970-01-01 00:00:00 +0000
@@ -1,37 +0,0 @@
-# Copyright (C) 2009 Robert Collins <robertc@xxxxxxxxxxxxxxxxx>
-# Licenced under the txaws licence available at /LICENSE in the txaws source.
-
-"""Credentials for accessing AWS services."""
-
-import os
-
-from txaws.util import *
-
-
-__all__ = ['AWSCredentials']
-
-
-class AWSCredentials(object):
-
- def __init__(self, access_key=None, secret_key=None):
- """Create an AWSCredentials object.
-
- :param access_key: The access key to use. If None the environment
- variable AWS_ACCESS_KEY_ID is consulted.
- :param secret_key: The secret key to use. If None the environment
- variable AWS_SECRET_ACCESS_KEY is consulted.
- """
- self.secret_key = secret_key
- if self.secret_key is None:
- self.secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
- if self.secret_key is None:
- raise ValueError('Could not find AWS_SECRET_ACCESS_KEY')
- self.access_key = access_key
- if self.access_key is None:
- self.access_key = os.environ.get('AWS_ACCESS_KEY_ID')
- if self.access_key is None:
- raise ValueError('Could not find AWS_ACCESS_KEY_ID')
-
- def sign(self, bytes):
- """Sign some bytes."""
- return hmac_sha1(self.secret_key, bytes)
=== modified file 'txaws/ec2/client.py'
--- txaws/ec2/client.py 2009-08-21 03:26:35 +0000
+++ txaws/ec2/client.py 2009-08-21 14:50:25 +0000
@@ -8,7 +8,8 @@
from twisted.web.client import getPage
-from txaws import credentials
+from txaws.credentials import AWSCredentials
+from txaws.service import AWSServiceEndpoint
from txaws.util import iso8601time, XML
@@ -77,16 +78,16 @@
name_space = '{http://ec2.amazonaws.com/doc/2008-12-01/}'
- def __init__(self, creds=None, query_factory=None):
+ def __init__(self, creds=None, endpoint=None, query_factory=None):
"""Create an EC2Client.
- @param creds: Explicit credentials to use. If None, credentials are
- inferred as per txaws.credentials.AWSCredentials.
+ @param creds: User authentication credentials to use.
+ @param endpoint: The service URI.
+ @param query_factory: The class or function that produces a query
+ object for making requests to the EC2 service.
"""
- if creds is None:
- self.creds = credentials.AWSCredentials()
- else:
- self.creds = creds
+ self.creds = creds or AWSCredentials()
+ self.endpoint = endpoint or AWSServiceEndpoint()
if query_factory is None:
self.query_factory = Query
else:
@@ -177,7 +178,8 @@
instanceset = {}
for pos, instance_id in enumerate(instance_ids):
instanceset["InstanceId.%d" % (pos+1)] = instance_id
- q = self.query_factory('TerminateInstances', self.creds, instanceset)
+ q = self.query_factory('TerminateInstances', self.creds, self.endpoint,
+ instanceset)
d = q.submit()
return d.addCallback(self._parse_terminate_instances)
@@ -200,24 +202,24 @@
class Query(object):
"""A query that may be submitted to EC2."""
- def __init__(self, action, creds, other_params=None, time_tuple=None):
+ def __init__(self, action, creds, endpoint, other_params=None,
+ time_tuple=None):
"""Create a Query to submit to EC2."""
+ self.creds = creds
+ self.endpoint = endpoint
# Require params (2008-12-01 API):
# Version, SignatureVersion, SignatureMethod, Action, AWSAccessKeyId,
# Timestamp || Expires, Signature,
- self.params = {'Version': '2008-12-01',
+ self.params = {
+ 'Version': '2008-12-01',
'SignatureVersion': '2',
'SignatureMethod': 'HmacSHA1',
'Action': action,
- 'AWSAccessKeyId': creds.access_key,
+ 'AWSAccessKeyId': self.creds.access_key,
'Timestamp': iso8601time(time_tuple),
}
if other_params:
self.params.update(other_params)
- self.method = 'GET'
- self.host = 'ec2.amazonaws.com'
- self.uri = '/'
- self.creds = creds
def canonical_query_params(self):
"""Return the canonical query params (used in signing)."""
@@ -230,18 +232,19 @@
"""Encode a_string as per the canonicalisation encoding rules.
See the AWS dev reference page 90 (2008-12-01 version).
- :return: a_string encoded.
+ @return: a_string encoded.
"""
return quote(a_string, safe='~')
def signing_text(self):
"""Return the text to be signed when signing the query."""
- result = "%s\n%s\n%s\n%s" % (self.method, self.host, self.uri,
- self.canonical_query_params())
+ result = "%s\n%s\n%s\n%s" % (self.endpoint.method, self.endpoint.host,
+ self.endpoint.path,
+ self.canonical_query_params())
return result
def sign(self):
- """Sign this query using its built in credentials.
+ """Sign this query using its built in service.
This prepares it to be sent, and should be done as the last step before
submitting the query. Signing is done automatically - this is a public
@@ -256,9 +259,9 @@
def submit(self):
"""Submit this query.
- :return: A deferred from twisted.web.client.getPage
+ @return: A deferred from twisted.web.client.getPage
"""
self.sign()
- url = 'http://%s%s?%s' % (self.host, self.uri,
- self.canonical_query_params())
- return getPage(url, method=self.method)
+ url = "%s?%s" % (self.endpoint.get_uri(),
+ self.canonical_query_params())
+ return getPage(url, method=self.service.method)
=== modified file 'txaws/ec2/tests/test_client.py'
--- txaws/ec2/tests/test_client.py 2009-08-21 03:26:35 +0000
+++ txaws/ec2/tests/test_client.py 2009-08-21 14:50:25 +0000
@@ -7,6 +7,7 @@
from txaws.credentials import AWSCredentials
from txaws.ec2 import client
+from txaws.service import AWSServiceEndpoint, EC2_ENDPOINT_US
from txaws.tests import TXAWSTestCase
@@ -117,7 +118,7 @@
self.assertEquals(instance.ramdisk_id, "id4")
-class TestEC2Client(TXAWSTestCase):
+class EC2ClientTestCase(TXAWSTestCase):
def test_init_no_creds(self):
os.environ['AWS_SECRET_ACCESS_KEY'] = 'foo'
@@ -129,7 +130,7 @@
self.assertRaises(ValueError, client.EC2Client)
def test_init_explicit_creds(self):
- creds = 'foo'
+ creds = AWSCredentials("foo", "bar")
ec2 = client.EC2Client(creds=creds)
self.assertEqual(creds, ec2.creds)
@@ -162,7 +163,8 @@
def test_parse_reservation(self):
- ec2 = client.EC2Client(creds='foo')
+ creds = AWSCredentials("foo", "bar")
+ ec2 = client.EC2Client(creds=creds)
results = ec2._parse_instances(sample_describe_instances_result)
self.check_parsed_instances(results)
@@ -170,25 +172,31 @@
class StubQuery(object):
def __init__(stub, action, creds):
self.assertEqual(action, 'DescribeInstances')
- self.assertEqual('foo', creds)
+ self.assertEqual(creds.access_key, "foo")
+ self.assertEqual(creds.secret_key, "bar")
def submit(self):
return succeed(sample_describe_instances_result)
- ec2 = client.EC2Client(creds='foo', query_factory=StubQuery)
+ creds = AWSCredentials("foo", "bar")
+ ec2 = client.EC2Client(creds, query_factory=StubQuery)
d = ec2.describe_instances()
d.addCallback(self.check_parsed_instances)
return d
def test_terminate_instances(self):
class StubQuery(object):
- def __init__(stub, action, creds, other_params):
+ def __init__(stub, action, creds, endpoint, other_params):
self.assertEqual(action, 'TerminateInstances')
- self.assertEqual('foo', creds)
+ self.assertEqual(creds.access_key, "foo")
+ self.assertEqual(creds.secret_key, "bar")
self.assertEqual(
{'InstanceId.1': 'i-1234', 'InstanceId.2': 'i-5678'},
other_params)
def submit(self):
return succeed(sample_terminate_instances_result)
- ec2 = client.EC2Client(creds='foo', query_factory=StubQuery)
+ creds = AWSCredentials("foo", "bar")
+ endpoint = AWSServiceEndpoint(uri=EC2_ENDPOINT_US)
+ ec2 = client.EC2Client(creds=creds, endpoint=endpoint,
+ query_factory=StubQuery)
d = ec2.terminate_instances('i-1234', 'i-5678')
def check_transition(changes):
self.assertEqual([('i-1234', 'running', 'shutting-down'),
@@ -196,14 +204,15 @@
return d
-class TestQuery(TXAWSTestCase):
+class QueryTestCase(TXAWSTestCase):
def setUp(self):
TXAWSTestCase.setUp(self)
self.creds = AWSCredentials('foo', 'bar')
+ self.endpoint = AWSServiceEndpoint(uri=EC2_ENDPOINT_US)
def test_init_minimum(self):
- query = client.Query('DescribeInstances', self.creds)
+ query = client.Query('DescribeInstances', self.creds, self.endpoint)
self.assertTrue('Timestamp' in query.params)
del query.params['Timestamp']
self.assertEqual(
@@ -221,7 +230,7 @@
self.assertRaises(TypeError, client.Query, None)
def test_init_other_args_are_params(self):
- query = client.Query('DescribeInstances', self.creds,
+ query = client.Query('DescribeInstances', self.creds, self.endpoint,
{'InstanceId.0': '12345'},
time_tuple=(2007,11,12,13,14,15,0,0,0))
self.assertEqual(
@@ -235,7 +244,7 @@
query.params)
def test_sorted_params(self):
- query = client.Query('DescribeInstances', self.creds,
+ query = client.Query('DescribeInstances', self.creds, self.endpoint,
{'fun': 'games'},
time_tuple=(2007,11,12,13,14,15,0,0,0))
self.assertEqual([
@@ -251,16 +260,16 @@
def test_encode_unreserved(self):
all_unreserved = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
'abcdefghijklmnopqrstuvwxyz0123456789-_.~')
- query = client.Query('DescribeInstances', self.creds)
+ query = client.Query('DescribeInstances', self.creds, self.endpoint)
self.assertEqual(all_unreserved, query.encode(all_unreserved))
def test_encode_space(self):
"""This may be just 'url encode', but the AWS manual isn't clear."""
- query = client.Query('DescribeInstances', self.creds)
+ query = client.Query('DescribeInstances', self.creds, self.endpoint)
self.assertEqual('a%20space', query.encode('a space'))
def test_canonical_query(self):
- query = client.Query('DescribeInstances', self.creds,
+ query = client.Query('DescribeInstances', self.creds, self.endpoint,
{'fu n': 'g/ames', 'argwithnovalue':'',
'InstanceId.1': 'i-1234'},
time_tuple=(2007,11,12,13,14,15,0,0,0))
@@ -272,17 +281,17 @@
self.assertEqual(expected_query, query.canonical_query_params())
def test_signing_text(self):
- query = client.Query('DescribeInstances', self.creds,
+ query = client.Query('DescribeInstances', self.creds, self.endpoint,
time_tuple=(2007,11,12,13,14,15,0,0,0))
- signing_text = ('GET\nec2.amazonaws.com\n/\n'
+ signing_text = ('GET\n%s\n/\n' % self.endpoint.host +
'AWSAccessKeyId=foo&Action=DescribeInstances&'
'SignatureMethod=HmacSHA1&SignatureVersion=2&'
'Timestamp=2007-11-12T13%3A14%3A15Z&Version=2008-12-01')
self.assertEqual(signing_text, query.signing_text())
def test_sign(self):
- query = client.Query('DescribeInstances', self.creds,
+ query = client.Query('DescribeInstances', self.creds, self.endpoint,
time_tuple=(2007,11,12,13,14,15,0,0,0))
query.sign()
- self.assertEqual('4hEtLuZo9i6kuG3TOXvRQNOrE/U=',
+ self.assertEqual('JuCpwFA2H4OVF3Ql/lAQs+V6iMc=',
query.params['Signature'])
=== added file 'txaws/ec2/tests/test_service.py'
=== added file 'txaws/service.py'
--- txaws/service.py 1970-01-01 00:00:00 +0000
+++ txaws/service.py 2009-08-21 20:50:36 +0000
@@ -0,0 +1,94 @@
+# Copyright (C) 2009 Duncan McGreggor <duncan@xxxxxxxxxxxxx>
+# Copyright (C) 2009 Robert Collins <robertc@xxxxxxxxxxxxxxxxx>
+# Licenced under the txaws licence available at /LICENSE in the txaws source.
+
+import os
+
+from twisted.web.client import _parse
+
+
+
+__all__ = ["AWSServiceEndpoint", "AWSServiceRegion"]
+
+
+REGION_US = "US"
+REGION_EU = "EU"
+EC2_ENDPOINT_US = "https://us-east-1.ec2.amazonaws.com/"
+EC2_ENDPOINT_EU = "https://eu-west-1.ec2.amazonaws.com/"
+DEFAULT_PORT = 80
+
+
+class AWSServiceEndpoint(object):
+ """
+ @param uri: The URL for the service.
+ @param method: The HTTP method used when accessing a service.
+ """
+
+ def __init__(self, uri="", method="GET"):
+ self.host = ""
+ self.port = DEFAULT_PORT
+ self.path = "/"
+ self.method = method
+ self._parse_uri(uri)
+ if not self.scheme:
+ self.scheme = "http"
+
+ def _parse_uri(self, uri):
+ scheme, host, port, path = _parse(
+ str(uri), defaultPort=DEFAULT_PORT)
+ self.scheme = scheme
+ self.host = host
+ self.port = port
+ self.path = path
+
+ def set_path(self, path):
+ self.path = path
+
+ def get_uri(self):
+ """Get a URL representation of the service."""
+ uri = "%s://%s" % (self.scheme, self.host)
+ if self.port and self.port != DEFAULT_PORT:
+ uri = "%s:%s" % (uri, self.port)
+ return uri + self.path
+
+
+class AWSServiceRegion(object):
+ """
+ This object represents a collection of client factories that use the same
+ credentials. With Amazon, this collection is associated with a region
+ (e.g., US or EU).
+ """
+ def __init__(self, creds=None, region=REGION_US):
+ self.creds = creds
+ self._clients = {}
+ if region == REGION_US:
+ ec2_endpoint = EC2_ENDPOINT_US
+ elif region == REGION_EU:
+ ec2_endpoint = EC2_ENDPOINT_EU
+ self.ec2_endpoint = AWSServiceEndpoint(uri=ec2_endpoint)
+
+ def get_client(self, cls, *args, **kwds):
+ key = str(cls) + str(args) + str(kwds)
+ instance = self._clients.get(key)
+ if not instance:
+ instance = cls(*args, **kwds)
+ self._clients[key] = instance
+ return instance
+
+ def get_ec2_client(self, creds=None):
+ from txaws.ec2.client import EC2Client
+
+ if creds:
+ self.creds = creds
+ return self.get_client(EC2Client, creds=creds,
+ endpoint=self.ec2_endpoint, query_factory=None)
+
+ def get_s3_client(self):
+ raise NotImplementedError
+
+ def get_simpledb_client(self):
+ raise NotImplementedError
+
+ def get_sqs_client(self):
+ raise NotImplementedError
+
=== modified file 'txaws/storage/client.py'
--- txaws/storage/client.py 2009-08-20 12:15:12 +0000
+++ txaws/storage/client.py 2009-08-21 14:50:25 +0000
@@ -25,19 +25,18 @@
class S3Request(object):
def __init__(self, verb, bucket=None, object_name=None, data='',
- content_type=None,
- metadata={}, root_uri='https://s3.amazonaws.com', creds=None):
+ content_type=None, metadata={}, creds=None, endpoint=None):
self.verb = verb
self.bucket = bucket
self.object_name = object_name
self.data = data
self.content_type = content_type
self.metadata = metadata
- self.root_uri = root_uri
self.creds = creds
+ self.endpoint = endpoint or self.get_uri()
self.date = datetimeToString()
- def get_uri_path(self):
+ def get_path(self):
path = '/'
if self.bucket is not None:
path += self.bucket
@@ -46,7 +45,8 @@
return path
def get_uri(self):
- return self.root_uri + self.get_uri_path()
+ self.endpoint.set_path(self.get_path())
+ return self.endpoint.get_uri()
def get_headers(self):
headers = {'Content-Length': len(self.data),
@@ -66,7 +66,7 @@
return headers
def get_canonicalized_resource(self):
- return self.get_uri_path()
+ return self.get_path()
def get_canonicalized_amz_headers(self, headers):
result = ''
@@ -76,12 +76,12 @@
return ''.join('%s:%s\n' % (name, value) for name, value in headers)
def get_signature(self, headers):
- text = self.verb + '\n'
- text += headers.get('Content-MD5', '') + '\n'
- text += headers.get('Content-Type', '') + '\n'
- text += headers.get('Date', '') + '\n'
- text += self.get_canonicalized_amz_headers(headers)
- text += self.get_canonicalized_resource()
+ text = (self.verb + '\n' +
+ headers.get('Content-MD5', '') + '\n' +
+ headers.get('Content-Type', '') + '\n' +
+ headers.get('Date', '') + '\n' +
+ self.get_canonicalized_amz_headers(headers) +
+ self.get_canonicalized_resource())
return self.creds.sign(text)
def submit(self):
@@ -94,20 +94,21 @@
class S3(object):
- root_uri = 'https://s3.amazonaws.com/'
request_factory = S3Request
- def __init__(self, creds):
+ def __init__(self, creds, endpoint):
self.creds = creds
+ self.endpoint = endpoint
def make_request(self, *a, **kw):
"""
Create a request with the arguments passed in.
- This uses the request_factory attribute, adding the credentials to the
- arguments passed in.
+ This uses the request_factory attribute, adding the creds and endpoint
+ to the arguments passed in.
"""
- return self.request_factory(creds=self.creds, *a, **kw)
+ return self.request_factory(creds=self.creds, endpoint=self.endpoint,
+ *a, **kw)
def _parse_bucket_list(self, response):
"""
=== modified file 'txaws/storage/tests/test_client.py'
--- txaws/storage/tests/test_client.py 2009-08-20 12:15:12 +0000
+++ txaws/storage/tests/test_client.py 2009-08-21 14:50:25 +0000
@@ -5,6 +5,7 @@
from twisted.internet.defer import succeed
from txaws.credentials import AWSCredentials
+from txaws.service import AWSServiceEndpoint
from txaws.storage.client import S3, S3Request
from txaws.tests import TXAWSTestCase
from txaws.util import calculate_md5
@@ -18,10 +19,10 @@
return succeed('')
-class RequestTests(TXAWSTestCase):
+class RequestTestCase(TXAWSTestCase):
- creds = AWSCredentials(access_key='0PN5J17HBGZHT7JJ3X82',
- secret_key='uV3F3YluFJax1cknvbcGwgjvx4QpvB+leU8dUj2o')
+ creds = AWSCredentials(access_key='fookeyid', secret_key='barsecretkey')
+ endpoint = AWSServiceEndpoint("https://s3.amazonaws.com/")
def test_objectRequest(self):
"""
@@ -31,18 +32,22 @@
DIGEST = 'zhdB6gwvocWv/ourYUWMxA=='
request = S3Request('PUT', 'somebucket', 'object/name/here', DATA,
- content_type='text/plain', metadata={'foo': 'bar'})
+ content_type='text/plain', metadata={'foo': 'bar'},
+ creds=self.creds, endpoint=self.endpoint)
+ request.get_signature = lambda headers: "TESTINGSIG="
self.assertEqual(request.verb, 'PUT')
self.assertEqual(
request.get_uri(),
'https://s3.amazonaws.com/somebucket/object/name/here')
headers = request.get_headers()
self.assertNotEqual(headers.pop('Date'), '')
- self.assertEqual(headers,
- {'Content-Type': 'text/plain',
- 'Content-Length': len(DATA),
- 'Content-MD5': DIGEST,
- 'x-amz-meta-foo': 'bar'})
+ self.assertEqual(
+ headers, {
+ 'Authorization': 'AWS fookeyid:TESTINGSIG=',
+ 'Content-Type': 'text/plain',
+ 'Content-Length': len(DATA),
+ 'Content-MD5': DIGEST,
+ 'x-amz-meta-foo': 'bar'})
self.assertEqual(request.data, 'objectData')
def test_bucketRequest(self):
@@ -51,22 +56,27 @@
"""
DIGEST = '1B2M2Y8AsgTpgAmY7PhCfg=='
- request = S3Request('GET', 'somebucket')
+ request = S3Request('GET', 'somebucket', creds=self.creds,
+ endpoint=self.endpoint)
+ request.get_signature = lambda headers: "TESTINGSIG="
self.assertEqual(request.verb, 'GET')
self.assertEqual(
request.get_uri(), 'https://s3.amazonaws.com/somebucket')
headers = request.get_headers()
self.assertNotEqual(headers.pop('Date'), '')
- self.assertEqual(headers,
- {'Content-Length': 0,
- 'Content-MD5': DIGEST})
+ self.assertEqual(
+ headers, {
+ 'Authorization': 'AWS fookeyid:TESTINGSIG=',
+ 'Content-Length': 0,
+ 'Content-MD5': DIGEST})
self.assertEqual(request.data, '')
def test_submit(self):
"""
Submitting the request should invoke getPage correctly.
"""
- request = StubbedS3Request('GET', 'somebucket')
+ request = StubbedS3Request('GET', 'somebucket', creds=self.creds,
+ endpoint=self.endpoint)
def _postCheck(result):
self.assertEqual(result, '')
@@ -80,13 +90,14 @@
return request.submit().addCallback(_postCheck)
def test_authenticationTestCases(self):
- req = S3Request('GET', creds=self.creds)
- req.date = 'Wed, 28 Mar 2007 01:29:59 +0000'
+ request = S3Request('GET', creds=self.creds, endpoint=self.endpoint)
+ request.get_signature = lambda headers: "TESTINGSIG="
+ request.date = 'Wed, 28 Mar 2007 01:29:59 +0000'
- headers = req.get_headers()
+ headers = request.get_headers()
self.assertEqual(
headers['Authorization'],
- 'AWS 0PN5J17HBGZHT7JJ3X82:jF7L3z/FTV47vagZzhKupJ9oNig=')
+ 'AWS fookeyid:TESTINGSIG=')
class InertRequest(S3Request):
@@ -153,16 +164,18 @@
TXAWSTestCase.setUp(self)
self.creds = AWSCredentials(
access_key='accessKey', secret_key='secretKey')
- self.s3 = TestableS3(creds=self.creds)
+ self.endpoint = AWSServiceEndpoint()
+ self.s3 = TestableS3(creds=self.creds, endpoint=self.endpoint)
def test_make_request(self):
"""
- Test that make_request passes in the service credentials.
+ Test that make_request passes in the credentials object.
"""
marker = object()
def _cb(*a, **kw):
self.assertEqual(kw['creds'], self.creds)
+ self.assertEqual(kw['endpoint'], self.endpoint)
return marker
self.s3.request_factory = _cb
=== added file 'txaws/tests/test_credentials.py'
--- txaws/tests/test_credentials.py 1970-01-01 00:00:00 +0000
+++ txaws/tests/test_credentials.py 2009-08-21 14:50:25 +0000
@@ -0,0 +1,53 @@
+# Copyright (C) 2009 Robert Collins <robertc@xxxxxxxxxxxxxxxxx>
+# Licenced under the txaws licence available at /LICENSE in the txaws source.
+
+import os
+
+from twisted.trial.unittest import TestCase
+
+from txaws.credentials import AWSCredentials, ENV_ACCESS_KEY, ENV_SECRET_KEY
+from txaws.tests import TXAWSTestCase
+
+from txaws.tests import TXAWSTestCase
+
+
+class TestCredentials(TXAWSTestCase):
+
+ def setUp(self):
+ self.addCleanup(self.clean_environment)
+
+ def clean_environment(self):
+ if os.environ.has_key(ENV_ACCESS_KEY):
+ del os.environ[ENV_ACCESS_KEY]
+ if os.environ.has_key(ENV_SECRET_KEY):
+ del os.environ[ENV_SECRET_KEY]
+
+ def test_no_access_errors(self):
+ # Without anything in os.environ, AWSService() blows up
+ os.environ[ENV_SECRET_KEY] = "bar"
+ self.assertRaises(ValueError, AWSCredentials)
+
+ def test_no_secret_errors(self):
+ # Without anything in os.environ, AWSService() blows up
+ os.environ[ENV_ACCESS_KEY] = "foo"
+ self.assertRaises(ValueError, AWSCredentials)
+
+ def test_found_values_used(self):
+ os.environ[ENV_ACCESS_KEY] = "foo"
+ os.environ[ENV_SECRET_KEY] = "bar"
+ service = AWSCredentials()
+ self.assertEqual("foo", service.access_key)
+ self.assertEqual("bar", service.secret_key)
+ self.clean_environment()
+
+ def test_explicit_access_key(self):
+ os.environ[ENV_SECRET_KEY] = "foo"
+ service = AWSCredentials(access_key="bar")
+ self.assertEqual("foo", service.secret_key)
+ self.assertEqual("bar", service.access_key)
+
+ def test_explicit_secret_key(self):
+ os.environ[ENV_ACCESS_KEY] = "bar"
+ service = AWSCredentials(secret_key="foo")
+ self.assertEqual("foo", service.secret_key)
+ self.assertEqual("bar", service.access_key)
=== removed file 'txaws/tests/test_credentials.py'
--- txaws/tests/test_credentials.py 2009-08-20 12:15:12 +0000
+++ txaws/tests/test_credentials.py 1970-01-01 00:00:00 +0000
@@ -1,41 +0,0 @@
-# Copyright (C) 2009 Robert Collins <robertc@xxxxxxxxxxxxxxxxx>
-# Licenced under the txaws licence available at /LICENSE in the txaws source.
-
-import os
-
-from twisted.trial.unittest import TestCase
-
-from txaws.credentials import AWSCredentials
-from txaws.tests import TXAWSTestCase
-
-
-class TestCredentials(TXAWSTestCase):
-
- def test_no_access_errors(self):
- # Without anything in os.environ, AWSCredentials() blows up
- os.environ['AWS_SECRET_ACCESS_KEY'] = 'foo'
- self.assertRaises(Exception, AWSCredentials)
-
- def test_no_secret_errors(self):
- # Without anything in os.environ, AWSCredentials() blows up
- os.environ['AWS_ACCESS_KEY_ID'] = 'bar'
- self.assertRaises(Exception, AWSCredentials)
-
- def test_found_values_used(self):
- os.environ['AWS_SECRET_ACCESS_KEY'] = 'foo'
- os.environ['AWS_ACCESS_KEY_ID'] = 'bar'
- creds = AWSCredentials()
- self.assertEqual('foo', creds.secret_key)
- self.assertEqual('bar', creds.access_key)
-
- def test_explicit_access_key(self):
- os.environ['AWS_SECRET_ACCESS_KEY'] = 'foo'
- creds = AWSCredentials(access_key='bar')
- self.assertEqual('foo', creds.secret_key)
- self.assertEqual('bar', creds.access_key)
-
- def test_explicit_secret_key(self):
- os.environ['AWS_ACCESS_KEY_ID'] = 'bar'
- creds = AWSCredentials(secret_key='foo')
- self.assertEqual('foo', creds.secret_key)
- self.assertEqual('bar', creds.access_key)
=== added file 'txaws/tests/test_service.py'
--- txaws/tests/test_service.py 1970-01-01 00:00:00 +0000
+++ txaws/tests/test_service.py 2009-08-21 21:13:27 +0000
@@ -0,0 +1,105 @@
+# Copyright (C) 2009 Duncan McGreggor <duncan@xxxxxxxxxxxxx>
+# Licenced under the txaws licence available at /LICENSE in the txaws source.
+
+import os
+
+from txaws.credentials import AWSCredentials
+from txaws.ec2.client import EC2Client
+from txaws.service import AWSServiceEndpoint, AWSServiceRegion, EC2_ENDPOINT_US
+from txaws.tests import TXAWSTestCase
+
+class AWSServiceEndpointTestCase(TXAWSTestCase):
+
+ def setUp(self):
+ self.endpoint = AWSServiceEndpoint(uri="http://my.service/da_endpoint")
+
+ def test_simple_creation(self):
+ endpoint = AWSServiceEndpoint()
+ self.assertEquals(endpoint.scheme, "http")
+ self.assertEquals(endpoint.host, "")
+ self.assertEquals(endpoint.port, 80)
+ self.assertEquals(endpoint.path, "/")
+ self.assertEquals(endpoint.method, "GET")
+
+ def test_parse_uri(self):
+ self.assertEquals(self.endpoint.scheme, "http")
+ self.assertEquals(self.endpoint.host, "my.service")
+ self.assertEquals(self.endpoint.port, 80)
+ self.assertEquals(self.endpoint.path, "/da_endpoint")
+
+ def test_parse_uri_https_and_custom_port(self):
+ endpoint = AWSServiceEndpoint(uri="https://my.service:8080/endpoint")
+ self.assertEquals(endpoint.scheme, "https")
+ self.assertEquals(endpoint.host, "my.service")
+ self.assertEquals(endpoint.port, 8080)
+ self.assertEquals(endpoint.path, "/endpoint")
+
+ def test_custom_method(self):
+ endpoint = AWSServiceEndpoint(uri="http://service/endpoint",
+ method="PUT")
+ self.assertEquals(endpoint.method, "PUT")
+
+ def test_get_uri(self):
+ uri = self.endpoint.get_uri()
+ self.assertEquals(uri, "http://my.service/da_endpoint")
+
+ def test_get_uri_custom_port(self):
+ uri = "https://my.service:8080/endpoint"
+ endpoint = AWSServiceEndpoint(uri=uri)
+ new_uri = endpoint.get_uri()
+ self.assertEquals(new_uri, uri)
+
+ def test_set_path(self):
+ original_path = self.endpoint.path
+ self.endpoint.set_path("/newpath")
+ self.assertEquals(
+ self.endpoint.get_uri(),
+ "http://my.service/newpath")
+
+
+class AWSServiceRegionTestCase(TXAWSTestCase):
+
+ def setUp(self):
+ self.creds = AWSCredentials("foo", "bar")
+ self.region = AWSServiceRegion(creds=self.creds)
+
+ def test_simple_creation(self):
+ self.assertEquals(self.creds, self.region.creds)
+ self.assertEquals(self.region._clients, {})
+ self.assertEquals(self.region.ec2_endpoint.get_uri(), EC2_ENDPOINT_US)
+
+ def test_get_client_with_empty_cache(self):
+ key = str(EC2Client) + str(self.creds) + str(self.region.ec2_endpoint)
+ original_client = self.region._clients.get(key)
+ new_client = self.region.get_client(
+ EC2Client, self.creds, self.region.ec2_endpoint)
+ self.assertEquals(original_client, None)
+ self.assertNotEquals(original_client, new_client)
+ self.assertTrue(isinstance(new_client, EC2Client))
+
+ def test_get_client_from_cache(self):
+ client1 = self.region.get_client(
+ EC2Client, self.creds, self.region.ec2_endpoint)
+ client2 = self.region.get_client(
+ EC2Client, self.creds, self.region.ec2_endpoint)
+ self.assertTrue(isinstance(client1, EC2Client))
+ self.assertTrue(isinstance(client2, EC2Client))
+ self.assertEquals(client2, client2)
+
+ def test_get_ec2_client_from_cache(self):
+ client1 = self.region.get_ec2_client(self.creds)
+ client2 = self.region.get_ec2_client(self.creds)
+ self.assertEquals(self.creds, self.region.creds)
+ self.assertTrue(isinstance(client1, EC2Client))
+ self.assertTrue(isinstance(client2, EC2Client))
+ self.assertEquals(client2, client2)
+
+
+ def test_get_s3_client(self):
+ self.assertRaises(NotImplementedError, self.region.get_s3_client)
+
+ def test_get_simpledb_client(self):
+ self.assertRaises(NotImplementedError, self.region.get_simpledb_client)
+
+ def test_get_sqs_client(self):
+ self.assertRaises(NotImplementedError, self.region.get_sqs_client)
Follow ups