txawsteam team mailing list archive - Message #00047
[Merge] lp:~statik/txaws/here-have-some-s4 into lp:txaws
Elliot Murphy has proposed merging lp:~statik/txaws/here-have-some-s4 into lp:txaws.
Requested reviews:
txAWS Team (txawsteam)
The Ubuntu One team would like to contribute the S4 (Simple Storage Service Simulator) code that we use as a stub to test against when developing software that relies on S3.
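For instance, a test suite can spin up an in-memory S4 and point any S3
client at it. A rough sketch (mirroring what the bundled testing helpers
in this branch do):

    from twisted.internet import reactor
    from twisted.web import server
    from txaws.s4 import s4

    root = s4.Root()  # in-memory fake S3; default credentials aws_key/aws_secret
    port = reactor.listenTCP(0, server.Site(root)).getHost().port
    # ...point boto or the bundled contrib/S3.py at localhost:port...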
--
https://code.launchpad.net/~statik/txaws/here-have-some-s4/+merge/10388
Your team txAWS Team is subscribed to branch lp:txaws.
=== modified file 'README'
--- README 2009-04-26 08:32:36 +0000
+++ README 2009-08-19 14:36:56 +0000
@@ -7,6 +7,10 @@
* The epsilon python package (python-epsilon on Debian or similar systems)
+* The S4 test server depends on boto (python-boto on Debian or similar
+  systems). This dependency should go away in favor of using txaws
+  infrastructure (S4 was originally developed separately from txaws).
+
Things present here
-------------------
=== added directory 'txaws/s4'
=== added file 'txaws/s4/README'
--- txaws/s4/README 1970-01-01 00:00:00 +0000
+++ txaws/s4/README 2009-08-19 14:36:56 +0000
@@ -0,0 +1,51 @@
+S4 - an S3 storage system stub
+==============================
+
+The server comes with some sample scripts that show how to use it.
+
+Using twistd
+------------
+
+To start: ./start-s4.sh
+To stop: ./stop-s4.sh
+
+The sample S4.tac defaults to port 8080. If you want to change that, create your own S4.tac.
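+
+For example (assuming the server is running on the default port 8080), you
+can talk to it with the bundled contrib library; the default credentials
+are aws_key / aws_secret:
+
+    from txaws.s4.contrib import S3
+    conn = S3.AWSAuthConnection("aws_key", "aws_secret", is_secure=False,
+                                server="localhost", port=8080,
+                                calling_format=S3.CallingFormat.PATH)
+    print conn.get("size", "100").object.data   # "0" * 100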
+
+For tests or inside another script
+----------------------------------
+
+See txaws.s4.testing.testcase.S4TestCase and txaws/s4/tests/test_S4.py for examples.
+
+All tests run on a random unused port.
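+
+A minimal sketch (using the testing helpers shipped in this branch):
+
+    from txaws.s4.testing.testcase import S4TestCase, defer_to_thread
+
+    class TestMyClient(S4TestCase):
+        @defer_to_thread
+        def test_roundtrip(self):
+            s3 = self.connect_ok()   # boto connection to the local S4
+            bucket = s3.create_bucket("scratch")
+            bucket.new_key("sample").set_contents_from_string("hola mundo")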
+
+
+
+Notes:
+======
+Based on Twisted.
+Storage is in memory.
+It's not optimal by any means; it's just for testing other code.
+For now, it just implements REST PUT and GET.
+It comes with a default /test/ bucket already created, and a /size/ bucket with virtual objects as large as their names (i.e., /size/100 == "0"*100).
+
=== added file 'txaws/s4/S4.tac'
--- txaws/s4/S4.tac 1970-01-01 00:00:00 +0000
+++ txaws/s4/S4.tac 2009-08-19 14:36:56 +0000
@@ -0,0 +1,74 @@
+# -*- python -*-
+# Copyright 2008-2009 Canonical Ltd.
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+from __future__ import with_statement
+
+import os
+import sys
+import logging
+
+import twisted.web.server
+from twisted.internet import reactor
+from twisted.application import internet, service
+
+from utils import get_arbitrary_port
+from ubuntuone.config import config
+
+logger = logging.getLogger("UbuntuOne.S4")
+logger.setLevel(config.general.log_level)
+log_folder = config.general.log_folder
+log_filename = config.s_four.log_filename
+if log_folder is not None and log_filename is not None:
+ if not os.access(log_folder, os.F_OK):
+ os.mkdir(log_folder)
+ s = logging.FileHandler(os.path.join(log_folder, log_filename))
+else:
+ s = logging.StreamHandler(sys.stderr)
+s.setFormatter(logging.Formatter(config.general.log_format))
+logger.addHandler(s)
+
+from s4 import s4
+
+if config.s_four.storagepath:
+ storedir = os.path.join(config.root, config.s_four.storagepath)
+else:
+ storedir = os.path.join(config.root, "tmp", "s4storage")
+if not os.path.exists(storedir):
+ logger.debug("creating S4 storage directory %s" % storedir)
+ os.mkdir(storedir)
+application = service.Application('s4')
+root = s4.Root(storagedir=storedir)
+# make sure "the bucket" is created
+root._add_bucket(config.api_server.s3_bucket)
+site = twisted.web.server.Site(root)
+
+port = os.getenv('S4PORT', config.aws_s3.port)
+if port:
+ port = int(port)
+# test again in case the initial value was "0" as a string
+if not port:
+ port = get_arbitrary_port()
+
+with open(os.path.join(config.root, "tmp", "s4.port"), "w") as s4pf:
+ s4pf.write("%d\n" % port)
+
+internet.TCPServer(port, site).setServiceParent(
+ service.IServiceCollection(application))
=== added file 'txaws/s4/__init__.py'
--- txaws/s4/__init__.py 1970-01-01 00:00:00 +0000
+++ txaws/s4/__init__.py 2009-08-19 14:36:56 +0000
@@ -0,0 +1,1 @@
+""" S4 - a S3 storage system stub """
=== added directory 'txaws/s4/contrib'
=== added file 'txaws/s4/contrib/S3.py'
--- txaws/s4/contrib/S3.py 1970-01-01 00:00:00 +0000
+++ txaws/s4/contrib/S3.py 2009-08-19 14:36:56 +0000
@@ -0,0 +1,627 @@
+#!/usr/bin/env python
+
+# This software code is made available "AS IS" without warranties of any
+# kind. You may copy, display, modify and redistribute the software
+# code either by itself or as incorporated into your code; provided that
+# you do not remove any proprietary notices. Your use of this software
+# code is at your own risk and you waive any claim against Amazon
+# Digital Services, Inc. or its affiliates with respect to your use of
+# this software code. (c) 2006-2007 Amazon Digital Services, Inc. or its
+# affiliates.
+
+import base64
+import hmac
+import httplib
+import re
+import sha
+import sys
+import time
+import urllib
+import urlparse
+import xml.sax
+
+DEFAULT_HOST = 's3.amazonaws.com'
+PORTS_BY_SECURITY = { True: 443, False: 80 }
+METADATA_PREFIX = 'x-amz-meta-'
+AMAZON_HEADER_PREFIX = 'x-amz-'
+
+# generates the aws canonical string for the given parameters
+def canonical_string(method, bucket="", key="", query_args={}, headers={}, expires=None):
+ interesting_headers = {}
+ for header_key in headers:
+ lk = header_key.lower()
+ if lk in ['content-md5', 'content-type', 'date'] or lk.startswith(AMAZON_HEADER_PREFIX):
+ interesting_headers[lk] = headers[header_key].strip()
+
+ # these keys get empty strings if they don't exist
+ if not interesting_headers.has_key('content-type'):
+ interesting_headers['content-type'] = ''
+ if not interesting_headers.has_key('content-md5'):
+ interesting_headers['content-md5'] = ''
+
+ # just in case someone used this. it's not necessary in this lib.
+ if interesting_headers.has_key('x-amz-date'):
+ interesting_headers['date'] = ''
+
+ # if you're using expires for query string auth, then it trumps date
+ # (and x-amz-date)
+ if expires:
+ interesting_headers['date'] = str(expires)
+
+ sorted_header_keys = interesting_headers.keys()
+ sorted_header_keys.sort()
+
+ buf = "%s\n" % method
+ for header_key in sorted_header_keys:
+ if header_key.startswith(AMAZON_HEADER_PREFIX):
+ buf += "%s:%s\n" % (header_key, interesting_headers[header_key])
+ else:
+ buf += "%s\n" % interesting_headers[header_key]
+
+ # append the bucket if it exists
+ if bucket != "":
+ buf += "/%s" % bucket
+
+ # add the key. even if it doesn't exist, add the slash
+ buf += "/%s" % urllib.quote_plus(key)
+
+ # handle special query string arguments
+
+ if query_args.has_key("acl"):
+ buf += "?acl"
+ elif query_args.has_key("torrent"):
+ buf += "?torrent"
+ elif query_args.has_key("logging"):
+ buf += "?logging"
+ elif query_args.has_key("location"):
+ buf += "?location"
+
+ return buf
+
+# computes the base64'ed hmac-sha hash of the canonical string and the secret
+# access key, optionally urlencoding the result
+def encode(aws_secret_access_key, str, urlencode=False):
+ b64_hmac = base64.encodestring(hmac.new(aws_secret_access_key, str, sha).digest()).strip()
+ if urlencode:
+ return urllib.quote_plus(b64_hmac)
+ else:
+ return b64_hmac
+
+def merge_meta(headers, metadata):
+ final_headers = headers.copy()
+ for k in metadata.keys():
+ final_headers[METADATA_PREFIX + k] = metadata[k]
+
+ return final_headers
+
+# builds the query arg string
+def query_args_hash_to_string(query_args):
+ query_string = ""
+ pairs = []
+ for k, v in query_args.items():
+ piece = k
+ if v != None:
+ piece += "=%s" % urllib.quote_plus(str(v))
+ pairs.append(piece)
+
+ return '&'.join(pairs)
+
+
+class CallingFormat:
+ PATH = 1
+ SUBDOMAIN = 2
+ VANITY = 3
+
+ def build_url_base(protocol, server, port, bucket, calling_format):
+ url_base = '%s://' % protocol
+
+ if bucket == '':
+ url_base += server
+ elif calling_format == CallingFormat.SUBDOMAIN:
+ url_base += "%s.%s" % (bucket, server)
+ elif calling_format == CallingFormat.VANITY:
+ url_base += bucket
+ else:
+ url_base += server
+
+ url_base += ":%s" % port
+
+ if (bucket != '') and (calling_format == CallingFormat.PATH):
+ url_base += "/%s" % bucket
+
+ return url_base
+
+ build_url_base = staticmethod(build_url_base)
+
+
+
+class Location:
+ DEFAULT = None
+ EU = 'EU'
+
+
+
+class AWSAuthConnection:
+ def __init__(self, aws_access_key_id, aws_secret_access_key, is_secure=True,
+ server=DEFAULT_HOST, port=None, calling_format=CallingFormat.SUBDOMAIN):
+
+ if not port:
+ port = PORTS_BY_SECURITY[is_secure]
+
+ self.aws_access_key_id = aws_access_key_id
+ self.aws_secret_access_key = aws_secret_access_key
+ self.is_secure = is_secure
+ self.server = server
+ self.port = port
+ self.calling_format = calling_format
+
+ def create_bucket(self, bucket, headers={}):
+ return Response(self._make_request('PUT', bucket, '', {}, headers))
+
+ def create_located_bucket(self, bucket, location=Location.DEFAULT, headers={}):
+ if location == Location.DEFAULT:
+ body = ""
+ else:
+ body = "<CreateBucketConstraint><LocationConstraint>" + \
+ location + \
+ "</LocationConstraint></CreateBucketConstraint>"
+ return Response(self._make_request('PUT', bucket, '', {}, headers, body))
+
+ def check_bucket_exists(self, bucket):
+ return self._make_request('HEAD', bucket, '', {}, {})
+
+ def list_bucket(self, bucket, options={}, headers={}):
+ return ListBucketResponse(self._make_request('GET', bucket, '', options, headers))
+
+ def delete_bucket(self, bucket, headers={}):
+ return Response(self._make_request('DELETE', bucket, '', {}, headers))
+
+ def put(self, bucket, key, object, headers={}):
+ if not isinstance(object, S3Object):
+ object = S3Object(object)
+
+ return Response(
+ self._make_request(
+ 'PUT',
+ bucket,
+ key,
+ {},
+ headers,
+ object.data,
+ object.metadata))
+
+ def get(self, bucket, key, headers={}):
+ return GetResponse(
+ self._make_request('GET', bucket, key, {}, headers))
+
+ def delete(self, bucket, key, headers={}):
+ return Response(
+ self._make_request('DELETE', bucket, key, {}, headers))
+
+ def get_bucket_logging(self, bucket, headers={}):
+ return GetResponse(self._make_request('GET', bucket, '', { 'logging': None }, headers))
+
+ def put_bucket_logging(self, bucket, logging_xml_doc, headers={}):
+ return Response(self._make_request('PUT', bucket, '', { 'logging': None }, headers, logging_xml_doc))
+
+ def get_bucket_acl(self, bucket, headers={}):
+ return self.get_acl(bucket, '', headers)
+
+ def get_acl(self, bucket, key, headers={}):
+ return GetResponse(
+ self._make_request('GET', bucket, key, { 'acl': None }, headers))
+
+ def put_bucket_acl(self, bucket, acl_xml_document, headers={}):
+ return self.put_acl(bucket, '', acl_xml_document, headers)
+
+ def put_acl(self, bucket, key, acl_xml_document, headers={}):
+ return Response(
+ self._make_request(
+ 'PUT',
+ bucket,
+ key,
+ { 'acl': None },
+ headers,
+ acl_xml_document))
+
+ def list_all_my_buckets(self, headers={}):
+ return ListAllMyBucketsResponse(self._make_request('GET', '', '', {}, headers))
+
+ def get_bucket_location(self, bucket):
+ return LocationResponse(self._make_request('GET', bucket, '', {'location' : None}))
+
+ # end public methods
+
+ def _make_request(self, method, bucket='', key='', query_args={}, headers={}, data='', metadata={}):
+
+ server = ''
+ if bucket == '':
+ server = self.server
+ elif self.calling_format == CallingFormat.SUBDOMAIN:
+ server = "%s.%s" % (bucket, self.server)
+ elif self.calling_format == CallingFormat.VANITY:
+ server = bucket
+ else:
+ server = self.server
+
+ path = ''
+
+ if (bucket != '') and (self.calling_format == CallingFormat.PATH):
+ path += "/%s" % bucket
+
+ # add the slash after the bucket regardless
+ # the key will be appended if it is non-empty
+ path += "/%s" % urllib.quote_plus(key)
+
+
+ # build the path_argument string
+ # add the ? in all cases since
+ # signature and credentials follow path args
+ if len(query_args):
+ path += "?" + query_args_hash_to_string(query_args)
+
+ is_secure = self.is_secure
+ host = "%s:%d" % (server, self.port)
+ while True:
+ if (is_secure):
+ connection = httplib.HTTPSConnection(host)
+ else:
+ connection = httplib.HTTPConnection(host)
+
+            final_headers = merge_meta(headers, metadata)
+ # add auth header
+ self._add_aws_auth_header(final_headers, method, bucket, key, query_args)
+
+ connection.request(method, path, data, final_headers)
+ resp = connection.getresponse()
+ if resp.status < 300 or resp.status >= 400:
+ return resp
+ # handle redirect
+ location = resp.getheader('location')
+ if not location:
+ return resp
+ # (close connection)
+ resp.read()
+ scheme, host, path, params, query, fragment \
+ = urlparse.urlparse(location)
+ if scheme == "http": is_secure = True
+ elif scheme == "https": is_secure = False
+ else: raise invalidURL("Not http/https: " + location)
+ if query: path += "?" + query
+ # retry with redirect
+
+ def _add_aws_auth_header(self, headers, method, bucket, key, query_args):
+ if not headers.has_key('Date'):
+ headers['Date'] = time.strftime("%a, %d %b %Y %X GMT", time.gmtime())
+
+ c_string = canonical_string(method, bucket, key, query_args, headers)
+ headers['Authorization'] = \
+ "AWS %s:%s" % (self.aws_access_key_id, encode(self.aws_secret_access_key, c_string))
+
+
+class QueryStringAuthGenerator:
+ # by default, expire in 1 minute
+ DEFAULT_EXPIRES_IN = 60
+
+ def __init__(self, aws_access_key_id, aws_secret_access_key, is_secure=True,
+ server=DEFAULT_HOST, port=None, calling_format=CallingFormat.SUBDOMAIN):
+
+ if not port:
+ port = PORTS_BY_SECURITY[is_secure]
+
+ self.aws_access_key_id = aws_access_key_id
+ self.aws_secret_access_key = aws_secret_access_key
+ if (is_secure):
+ self.protocol = 'https'
+ else:
+ self.protocol = 'http'
+
+ self.is_secure = is_secure
+ self.server = server
+ self.port = port
+ self.calling_format = calling_format
+ self.__expires_in = QueryStringAuthGenerator.DEFAULT_EXPIRES_IN
+ self.__expires = None
+
+ # for backwards compatibility with older versions
+ self.server_name = "%s:%s" % (self.server, self.port)
+
+ def set_expires_in(self, expires_in):
+ self.__expires_in = expires_in
+ self.__expires = None
+
+ def set_expires(self, expires):
+ self.__expires = expires
+ self.__expires_in = None
+
+ def create_bucket(self, bucket, headers={}):
+ return self.generate_url('PUT', bucket, '', {}, headers)
+
+ def list_bucket(self, bucket, options={}, headers={}):
+ return self.generate_url('GET', bucket, '', options, headers)
+
+ def delete_bucket(self, bucket, headers={}):
+ return self.generate_url('DELETE', bucket, '', {}, headers)
+
+ def put(self, bucket, key, object, headers={}):
+ if not isinstance(object, S3Object):
+ object = S3Object(object)
+
+ return self.generate_url(
+ 'PUT',
+ bucket,
+ key,
+ {},
+ merge_meta(headers, object.metadata))
+
+ def get(self, bucket, key, headers={}):
+ return self.generate_url('GET', bucket, key, {}, headers)
+
+ def delete(self, bucket, key, headers={}):
+ return self.generate_url('DELETE', bucket, key, {}, headers)
+
+ def get_bucket_logging(self, bucket, headers={}):
+ return self.generate_url('GET', bucket, '', { 'logging': None }, headers)
+
+ def put_bucket_logging(self, bucket, logging_xml_doc, headers={}):
+ return self.generate_url('PUT', bucket, '', { 'logging': None }, headers)
+
+ def get_bucket_acl(self, bucket, headers={}):
+ return self.get_acl(bucket, '', headers)
+
+ def get_acl(self, bucket, key='', headers={}):
+ return self.generate_url('GET', bucket, key, { 'acl': None }, headers)
+
+ def put_bucket_acl(self, bucket, acl_xml_document, headers={}):
+ return self.put_acl(bucket, '', acl_xml_document, headers)
+
+ # don't really care what the doc is here.
+ def put_acl(self, bucket, key, acl_xml_document, headers={}):
+ return self.generate_url('PUT', bucket, key, { 'acl': None }, headers)
+
+ def list_all_my_buckets(self, headers={}):
+ return self.generate_url('GET', '', '', {}, headers)
+
+ def make_bare_url(self, bucket, key=''):
+        full_url = self.generate_url('GET', bucket, key)
+ return full_url[:full_url.index('?')]
+
+ def generate_url(self, method, bucket='', key='', query_args={}, headers={}):
+ expires = 0
+ if self.__expires_in != None:
+ expires = int(time.time() + self.__expires_in)
+ elif self.__expires != None:
+ expires = int(self.__expires)
+ else:
+ raise "Invalid expires state"
+
+ canonical_str = canonical_string(method, bucket, key, query_args, headers, expires)
+ encoded_canonical = encode(self.aws_secret_access_key, canonical_str)
+
+ url = CallingFormat.build_url_base(self.protocol, self.server, self.port, bucket, self.calling_format)
+
+ url += "/%s" % urllib.quote_plus(key)
+
+ query_args['Signature'] = encoded_canonical
+ query_args['Expires'] = expires
+ query_args['AWSAccessKeyId'] = self.aws_access_key_id
+
+ url += "?%s" % query_args_hash_to_string(query_args)
+
+ return url
+
+
+class S3Object:
+ def __init__(self, data, metadata={}):
+ self.data = data
+ self.metadata = metadata
+
+class Owner:
+ def __init__(self, id='', display_name=''):
+ self.id = id
+ self.display_name = display_name
+
+class ListEntry:
+ def __init__(self, key='', last_modified=None, etag='', size=0, storage_class='', owner=None):
+ self.key = key
+ self.last_modified = last_modified
+ self.etag = etag
+ self.size = size
+ self.storage_class = storage_class
+ self.owner = owner
+
+class CommonPrefixEntry:
+    def __init__(self, prefix=''):
+ self.prefix = prefix
+
+class Bucket:
+ def __init__(self, name='', creation_date=''):
+ self.name = name
+ self.creation_date = creation_date
+
+class Response:
+ def __init__(self, http_response):
+ self.http_response = http_response
+ # you have to do this read, even if you don't expect a body.
+ # otherwise, the next request fails.
+ self.body = http_response.read()
+ if http_response.status >= 300 and self.body:
+ self.message = self.body
+ else:
+ self.message = "%03d %s" % (http_response.status, http_response.reason)
+
+
+
+class ListBucketResponse(Response):
+ def __init__(self, http_response):
+ Response.__init__(self, http_response)
+ if http_response.status < 300:
+ handler = ListBucketHandler()
+ xml.sax.parseString(self.body, handler)
+ self.entries = handler.entries
+ self.common_prefixes = handler.common_prefixes
+ self.name = handler.name
+ self.marker = handler.marker
+ self.prefix = handler.prefix
+ self.is_truncated = handler.is_truncated
+ self.delimiter = handler.delimiter
+ self.max_keys = handler.max_keys
+ self.next_marker = handler.next_marker
+ else:
+ self.entries = []
+
+class ListAllMyBucketsResponse(Response):
+ def __init__(self, http_response):
+ Response.__init__(self, http_response)
+ if http_response.status < 300:
+ handler = ListAllMyBucketsHandler()
+ xml.sax.parseString(self.body, handler)
+ self.entries = handler.entries
+ else:
+ self.entries = []
+
+class GetResponse(Response):
+ def __init__(self, http_response):
+ Response.__init__(self, http_response)
+ response_headers = http_response.msg # older pythons don't have getheaders
+ metadata = self.get_aws_metadata(response_headers)
+ self.object = S3Object(self.body, metadata)
+
+ def get_aws_metadata(self, headers):
+ metadata = {}
+ for hkey in headers.keys():
+ if hkey.lower().startswith(METADATA_PREFIX):
+ metadata[hkey[len(METADATA_PREFIX):]] = headers[hkey]
+ del headers[hkey]
+
+ return metadata
+
+class LocationResponse(Response):
+ def __init__(self, http_response):
+ Response.__init__(self, http_response)
+ if http_response.status < 300:
+ handler = LocationHandler()
+ xml.sax.parseString(self.body, handler)
+ self.location = handler.location
+
+class ListBucketHandler(xml.sax.ContentHandler):
+ def __init__(self):
+ self.entries = []
+ self.curr_entry = None
+ self.curr_text = ''
+ self.common_prefixes = []
+ self.curr_common_prefix = None
+ self.name = ''
+ self.marker = ''
+ self.prefix = ''
+ self.is_truncated = False
+ self.delimiter = ''
+ self.max_keys = 0
+ self.next_marker = ''
+ self.is_echoed_prefix_set = False
+
+ def startElement(self, name, attrs):
+ if name == 'Contents':
+ self.curr_entry = ListEntry()
+ elif name == 'Owner':
+ self.curr_entry.owner = Owner()
+ elif name == 'CommonPrefixes':
+ self.curr_common_prefix = CommonPrefixEntry()
+
+
+ def endElement(self, name):
+ if name == 'Contents':
+ self.entries.append(self.curr_entry)
+ elif name == 'CommonPrefixes':
+ self.common_prefixes.append(self.curr_common_prefix)
+ elif name == 'Key':
+ self.curr_entry.key = self.curr_text
+ elif name == 'LastModified':
+ self.curr_entry.last_modified = self.curr_text
+ elif name == 'ETag':
+ self.curr_entry.etag = self.curr_text
+ elif name == 'Size':
+ self.curr_entry.size = int(self.curr_text)
+ elif name == 'ID':
+ self.curr_entry.owner.id = self.curr_text
+ elif name == 'DisplayName':
+ self.curr_entry.owner.display_name = self.curr_text
+ elif name == 'StorageClass':
+ self.curr_entry.storage_class = self.curr_text
+ elif name == 'Name':
+ self.name = self.curr_text
+ elif name == 'Prefix' and self.is_echoed_prefix_set:
+ self.curr_common_prefix.prefix = self.curr_text
+ elif name == 'Prefix':
+ self.prefix = self.curr_text
+ self.is_echoed_prefix_set = True
+ elif name == 'Marker':
+ self.marker = self.curr_text
+ elif name == 'IsTruncated':
+ self.is_truncated = self.curr_text == 'true'
+ elif name == 'Delimiter':
+ self.delimiter = self.curr_text
+ elif name == 'MaxKeys':
+ self.max_keys = int(self.curr_text)
+ elif name == 'NextMarker':
+ self.next_marker = self.curr_text
+
+ self.curr_text = ''
+
+ def characters(self, content):
+ self.curr_text += content
+
+
+class ListAllMyBucketsHandler(xml.sax.ContentHandler):
+ def __init__(self):
+ self.entries = []
+ self.curr_entry = None
+ self.curr_text = ''
+
+ def startElement(self, name, attrs):
+ if name == 'Bucket':
+ self.curr_entry = Bucket()
+
+ def endElement(self, name):
+ if name == 'Name':
+ self.curr_entry.name = self.curr_text
+ elif name == 'CreationDate':
+ self.curr_entry.creation_date = self.curr_text
+ elif name == 'Bucket':
+ self.entries.append(self.curr_entry)
+
+ def characters(self, content):
+ self.curr_text = content
+
+
+class LocationHandler(xml.sax.ContentHandler):
+ def __init__(self):
+ self.location = None
+ self.state = 'init'
+
+ def startElement(self, name, attrs):
+ if self.state == 'init':
+ if name == 'LocationConstraint':
+ self.state = 'tag_location'
+ self.location = ''
+ else: self.state = 'bad'
+ else: self.state = 'bad'
+
+ def endElement(self, name):
+ if self.state == 'tag_location' and name == 'LocationConstraint':
+ self.state = 'done'
+ else: self.state = 'bad'
+
+ def characters(self, content):
+ if self.state == 'tag_location':
+ self.location += content
+
+if __name__=="__main__":
+ keys = raw_input("Enter access and secret key (separated by a space): ")
+ access_key, secret_key = keys.split(" ")
+ s3 = AWSAuthConnection(access_key, secret_key)
+ bucket = "test_s3_lib"
+ m = s3.put(bucket, "sample", "hola mundo", {"Content-Type":"text/lame"})
+ print m.http_response.status, m.http_response.reason
+ print m.http_response.getheaders()
+ print m.body
=== added file 'txaws/s4/contrib/__init__.py'
=== added file 'txaws/s4/s4.py'
--- txaws/s4/s4.py 1970-01-01 00:00:00 +0000
+++ txaws/s4/s4.py 2009-08-19 14:36:56 +0000
@@ -0,0 +1,744 @@
+# Copyright 2009 Canonical Ltd.
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+""" S4 - a S3 storage system stub
+
+This module implementes a stub for Amazons S3 storage system.
+
+Not all functionality is provided, just enough to test the client.
+
+"""
+from __future__ import with_statement
+
+import os
+import hmac
+import time
+import base64
+import logging
+import hashlib
+from urllib import urlencode
+
+# cPickle would be faster here, but working around its relative
+# imports issues for this module requires extra hacking
+import pickle
+
+from boto.utils import canonical_string as canonical_path_string
+# pylint: disable-msg=W0611
+from boto.s3.connection import OrdinaryCallingFormat as CallingFormat
+
+from twisted.web import server, resource, error, http
+from twisted.internet import reactor, interfaces
+# pylint and zope dont work
+# pylint: disable-msg=E0611
+# pylint: disable-msg=F0401
+from zope.interface import implements
+
+# xml namespace response header required
+XMLNS = "http://s3.amazonaws.com/doc/2006-03-01"
+
+AWS_DEFAULT_ACCESS_KEY_ID = 'aws_key'
+AWS_DEFAULT_SECRET_ACCESS_KEY = 'aws_secret'
+AMAZON_HEADER_PREFIX = 'x-amz-'
+AMAZON_META_PREFIX = "x-amz-meta-"
+
+BLOCK_SIZE = 2**16
+
+S4_STATE_FILE = ".s4_state"
+
+logger = logging.getLogger('UbuntuOne.S4')
+
+# pylint: disable-msg=W0403
+import s4_xml
+
+class S4StorageException(Exception):
+ """ exception raised when S4 backend store runs into trouble """
+
+class FakeContent(object):
+ """A content that can be accesed by slicing but will never exists in memory
+ """
+ def __init__(self, char, size):
+ """Create the content as char*size."""
+ self.char = char
+ self.size = size
+
+ def __getitem__(self, slice):
+ """Get a piece of the content."""
+ size = min(slice.stop, self.size) - slice.start
+ return self.char*size
+
+ def hexdigest(self):
+ """Send a fake hexdigest. For big contents this takes too much time
+ to calculate, so we just fake it."""
+ block_size = BLOCK_SIZE
+ start = 0
+ data = self[start:start+block_size]
+ md5calc = hashlib.md5()
+ md5calc.update(data)
+ return md5calc.hexdigest()
+
+ def __len__(self):
+ """The size."""
+ return self.size
+
+class ContentProducer(object):
+ """A content producer used to stream big data."""
+ implements(interfaces.IPullProducer)
+
+ def __init__(self, request, content, buffer_size=BLOCK_SIZE):
+ """Create a producer for request, that produces content."""
+ self.request = request
+ self.content = content
+ self.buffer_size = buffer_size
+ self.position = 0
+ self.paused = False
+
+ def startProducing(self):
+ """IPullProducer api."""
+ self.request.registerProducer(self, streaming=False)
+
+ def finished(self):
+ """Called to finish the request after producing."""
+ self.request.unregisterProducer()
+ self.request.finish()
+
+ def resumeProducing(self):
+ """IPullProducer api."""
+ if self.position > len(self.content):
+ self.finished()
+ return
+
+ data = self.content[self.position:self.position+self.buffer_size]
+
+ self.position += self.buffer_size
+ self.request.write(data)
+
+ def stopProducing(self):
+ """IPullProducer api."""
+ pass
+
+
+def canonical_string(method, bucket="", key="", query_args=None, headers=None):
+ """ compatibility S3 canonical string calculator for cases where passing in
+ a bucket name, key anme and a hash of query args is easier than using an
+ S3 path """
+ path = []
+ if bucket:
+ path.append("/%s" % bucket)
+ path.append("/%s" % key)
+ if query_args:
+ path.append("?%s" % urlencode(query_args))
+ path = "".join(path)
+ if headers is None:
+ headers = {}
+ return canonical_path_string(method=method, path=path, headers=headers)
+
+def encode(secret_key, data):
+ """base64encoded digest of data using secret_key"""
+ encoded = hmac.new(secret_key, data, hashlib.sha1).digest()
+ return base64.encodestring(encoded).strip()
+
+def parse_range_header(range):
+ """modeled after twisted.web.static.File._parseRangeHeader()"""
+    if '=' in range:
+        range_type, value = range.split('=', 1)
+    else:
+        raise ValueError("Invalid range header, no '='")
+    if range_type != 'bytes':
+        raise ValueError("Invalid range header, must be a 'bytes' range")
+    raw_ranges = [chunk.strip() for chunk in value.split(',')]
+ ranges = []
+ for current_range in raw_ranges:
+ if '-' not in current_range:
+ raise ValueError("Illegal byte range: %r" % current_range)
+ begin, end = current_range.split('-')
+ if begin:
+ begin = int(begin)
+ else:
+ begin = None
+ if end:
+ end = int(end)
+ else:
+ end = None
+ ranges.append((begin, end))
+ return ranges
+
+class _ListResult(resource.Resource):
+ """ base class for bulding lists of amazon results """
+ isLeaf = True
+ def __init__(self):
+ resource.Resource.__init__(self)
+ def add_headers(self, request, content):
+ """ add standard headers to an amazon list result page reply """
+ request.setHeader("x-amz-id-2", str(request))
+ request.setHeader("x-amz-request-id", str(request))
+ request.setHeader("Content-Type", "text/xml")
+ request.setHeader("Content-Length", str(len(content)))
+
+
+class ListAllMyBucketsResult(_ListResult):
+ """ builds the result for list all buckets call """
+ def __init__(self, buckets, owner=None):
+ _ListResult.__init__(self)
+ self.buckets = buckets
+ if owner:
+ self.owner = owner
+ else:
+ self.owner = dict(id = 0, name = "fakeuser")
+
+ def render_GET(self, request):
+ """ render request for a GET listing """
+ lambr = s4_xml.ListAllMyBucketsResult(self.owner, self.buckets)
+ content = s4_xml.to_XML(lambr)
+ self.add_headers(request, content)
+ return content
+
+class ListBucketResult(_ListResult):
+ """ encapsulates a list of items in a bucket """
+ def __init__(self, bucket):
+ _ListResult.__init__(self)
+ self.bucket = bucket
+
+ def render_GET(self, request):
+ """ Render response for a GET listing """
+ # pylint: disable-msg=W0631
+ children = self.bucket.bucket_children.copy()
+ prefix = request.args.get("prefix", "")
+ if prefix:
+ children = dict([x for x in children.iteritems()
+ if x[0].startswith(prefix[0])])
+ maxkeys = request.args.get("max-keys", 0)
+ if maxkeys:
+ maxkeys = int(maxkeys[0])
+ ck = children.keys()[:maxkeys]
+ children = dict([x for x in children.iteritems() if x[0] in ck])
+ lbr = s4_xml.ListBucketResult(self.bucket, children)
+ s4_xml.add_props(lbr, Prefix=prefix, MaxKeys=maxkeys)
+ content = s4_xml.to_XML(lbr)
+ self.add_headers(request, content)
+ return content
+
+class BasicS3Object(object):
+ """ Basic S3 object class that takes care of contents and properties """
+ owner_id = 0
+ owner = "fakeuser"
+
+    def __init__(self, name, contents, content_type="binary/octet-stream",
+ content_md5=None):
+ self.name = name
+ self.content_type = content_type
+ self.contents = contents
+ if content_md5:
+ if isinstance(content_md5, str):
+ self._etag = content_md5
+ else:
+ self._etag = content_md5.hexdigest()
+ else:
+ self._etag = hashlib.md5(contents).hexdigest()
+ self._date = time.asctime()
+ self._meta = {}
+
+ def __getstate__(self):
+ d = self.__dict__.copy()
+ del d["children"]
+ return d
+
+ def get_etag(self):
+ " build an ETag value. Extra quites are mandated by standards "
+ return '"%s"' % self._etag
+ def set_date(self, datestr):
+ """ set the object's time """
+ self._date = datestr
+ def get_date(self):
+ """ get the object's time """
+ return self._date
+ def get_size(self):
+ """ returns size of object's contents """
+ return len(self.contents)
+ def get_owner(self):
+ """ query object's owner """
+ return self.owner
+ def get_owner_id(self):
+ """ query object's owner id """
+ return self.owner_id
+ def set_meta(self, name, val):
+ """ set metadata value for object """
+ m = self._meta.setdefault(name, [])
+ m.append(val)
+ def iter_meta(self):
+ """ iterate over object's metadata """
+ for k, vals in self._meta.iteritems():
+ for v in vals:
+ yield k, v
+ def delete(self):
+ """ clear storage used by object """
+ self.contents = None
+
+class S3Object(BasicS3Object, resource.Resource):
+ """ Storage Object
+    These objects store the data and metadata
+ """
+ isLeaf = True
+
+ def __init__(self, *args, **kw):
+ BasicS3Object.__init__(self, *args, **kw)
+ resource.Resource.__init__(self)
+
+ def _render(self, request):
+ """render the response for a GET or HEAD request on this object"""
+ request.setHeader("x-amz-id-2", str(request))
+ request.setHeader("x-amz-request-id", str(request))
+ request.setHeader("Content-Type", self.content_type)
+ request.setHeader("ETag", self._etag)
+ for k, v in self.iter_meta():
+ request.setHeader("%s%s" % (AMAZON_META_PREFIX, k), v)
+ range = request.getHeader("Range")
+ size = len(self.contents)
+ if request.method == 'HEAD':
+ request.setHeader("Content-Length", size)
+ return ""
+ if range:
+ ranges = parse_range_header(range)
+ length = 0
+ if len(ranges)==1:
+ begin, end = ranges[0]
+ if begin is None:
+ request.setResponseCode(
+ http.REQUESTED_RANGE_NOT_SATISFIABLE)
+ return ''
+ if not end:
+ end = size
+ elif end < size:
+ end += 1
+ if begin >= size:
+ request.setResponseCode(
+ http.REQUESTED_RANGE_NOT_SATISFIABLE)
+ request.setHeader(
+ 'content-range', 'bytes */%d' % size)
+ return ''
+ else:
+ request.setHeader(
+ 'content-range',
+ 'bytes %d-%d/%d' % (begin, end-1, size))
+ length = (end - begin)
+ request.setHeader("Content-Length", length)
+ request.setResponseCode(http.PARTIAL_CONTENT)
+ contents = self.contents[begin:end]
+ else:
+ # multiple ranges should be returned in a multipart response
+ request.setResponseCode(
+ http.REQUESTED_RANGE_NOT_SATISFIABLE)
+ return ''
+
+ else:
+ request.setHeader("Content-Length", str(size))
+ contents = self.contents
+
+ producer = ContentProducer(request, contents)
+ producer.startProducing()
+ return server.NOT_DONE_YET
+ render_GET = _render
+ render_HEAD = _render
+
+class UploadS3Object(resource.Resource):
+ """ Class for handling uploads
+
+ It handles the render_PUT method to update the bucket with the data
+ """
+ isLeaf = True
+ def __init__(self, bucket, name):
+ resource.Resource.__init__(self)
+ self.bucket = bucket
+ self.name = name
+
+ def render_PUT(self, request):
+ """accept the incoming data for a PUT request"""
+ data = request.content.read()
+ content_type = request.getHeader("Content-Type")
+ content_md5 = request.getHeader("Content-MD5")
+ if content_md5: # check if the data is good
+ header_md5 = base64.decodestring(content_md5)
+ data_md5 = hashlib.md5(data)
+ assert (data_md5.digest() == header_md5), "md5 check failed!"
+ content_md5 = data_md5
+ child = S3Object(self.name, data, content_type, content_md5)
+ date = request.getHeader("Date")
+ if not date:
+ date = time.ctime()
+ child.set_date(date)
+ for k, v in request.getAllHeaders().items():
+ if k.startswith(AMAZON_META_PREFIX):
+ child.set_meta(k[len(AMAZON_META_PREFIX):], v)
+ self.bucket.bucket_children[ self.name ] = child
+ request.setHeader("ETag", child.get_etag())
+ logger.debug("created object bucket=%s name=%s size=%d" % (
+ self.bucket, self.name, len(data)))
+ return ""
+
+
+class EmptyPage(resource.Resource):
+ """ return Ok/empty document """
+ isLeaf = True
+ def __init__(self, retcode=http.OK, headers=None, body=""):
+ resource.Resource.__init__(self)
+ self._retcode = retcode
+ self._headers = headers
+ self._body = body
+
+ def render(self, request):
+ """ override the render method to return an empty document """
+ request.setHeader("x-amz-id-2", str(request))
+ request.setHeader("x-amz-request-id", str(request))
+ request.setHeader("Content-Type", "text/html")
+ request.setHeader("Connection", "close")
+ if self._headers:
+ for h, v in self._headers.items():
+ request.setHeader(h, v)
+ request.setResponseCode(self._retcode)
+ return self._body
+
+def ErrorPage(http_code, code, message, path, with_body=True):
+ """ helper function that renders an Amazon error response xml page """
+ err = s4_xml.AmazonError(code, message, path)
+ body = s4_xml.to_XML(err)
+ body_size = str(len(body))
+ if not with_body:
+ body = ""
+ logger.info("returning error page %s [%s]%s for %s" % (
+ http_code, code, message, path))
+ return EmptyPage(http_code, headers={
+ "Content-Type": "text/xml",
+ "Content-Length": body_size,
+ }, body=body)
+
+# pylint: disable-msg=C0321
+class Bucket(resource.Resource):
+ """ Storage Bucket
+
+ Buckets hold objects with data and receive uploads in case of PUT
+ """
+ def __init__(self, name):
+ resource.Resource.__init__(self)
+ # cant use children, resource already has that name
+ # and it would work as a cache
+ self.bucket_children = {}
+ self._name = name
+ self._date = time.time()
+
+ def get_name(self):
+ """ returns this bucket's name """
+ return self._name
+ def __len__(self):
+ """ returns how many objects are in this bucket """
+ return len(self.bucket_children)
+ def iter_children(self):
+ """ iterator that returns each children objects """
+ for (key, val) in self.bucket_children.iteritems():
+ yield key, val
+ def delete(self):
+ """ clean up internal state to prepare bucket for deletion """
+ pass
+ def _get_state_file(self, rootdir, check=True):
+ """ builds the pathname of the state file """
+ state_file = os.path.join(rootdir, "%s%s" % (self._name, S4_STATE_FILE))
+ if check and not os.path.exists(state_file):
+ return None
+ return state_file
+ def _save(self, rootdir):
+ """ saves the state of a bucket """
+ state_file = self._get_state_file(rootdir, check=False)
+ data = dict(
+ name = self._name,
+ date = self._date,
+ objects = dict([ x for x in self.bucket_children.iteritems() ])
+ )
+ with open(state_file, "wb") as state_fd:
+ pickle.dump(data, state_fd)
+ logger.debug("saved bucket '%s' in file '%s'" % (
+ self._name, state_file))
+ return
+ def _load(self, rootdir):
+ """ loads a saved bucket state """
+ state_file = self._get_state_file(rootdir)
+ if not state_file:
+ return
+ with open(state_file, "rb") as state_fd:
+ data = pickle.load(state_fd)
+ assert (self._name == data["name"]), \
+ "can not load bucket with different name"
+ self._date = data["date"]
+ self.bucket_children = data["objects"]
+ return
+
+ def getChild(self, name, request):
+ """get the next object down the chain"""
+ # avoid recursion into the key names
+ # (which can contain / as a valid char!)
+ if name and request.postpath:
+ name = os.path.join(*((name,)+tuple(request.postpath)))
+ assert (name), "Wrong call stack for name='%s'" % (name,)
+ if request.method == "PUT":
+ child = UploadS3Object(self, name)
+ elif request.method in ("GET", "HEAD") :
+ child = self.bucket_children.get(name, None)
+ elif request.method == "DELETE":
+ child = self.bucket_children.get(name, None)
+ if child is None: # delete unknown object
+ return EmptyPage(http.NO_CONTENT)
+ child.delete()
+ del self.bucket_children[name]
+ return EmptyPage(http.NO_CONTENT)
+ else:
+ logger.error("UNHANDLED request method %s" % request.method)
+ return ErrorPage(http.BAD_REQUEST, "BadRequest",
+ "Your '%s' request is invalid" % request.method,
+ request.path)
+ if child is None:
+ return ErrorPage(http.NOT_FOUND, "NoSuchKey",
+ "The specified key does not exist.",
+ request.path, with_body=(request.method!="HEAD"))
+ return child
+
+class DiscardBucket(Bucket):
+ """A bucket that will just discard all data as it arrives."""
+
+ def getChild(self, name, request):
+ """accept uploads and discard them."""
+ if request.method == "PUT":
+ return self
+ else:
+ return ErrorPage(http.NOT_FOUND, "NoSuchKey",
+ "The specified key does not exist.",
+ request.path)
+
+ def render_PUT(self, request):
+ """accept the incoming data for a PUT request"""
+ # we need to compute a correct md5/etag to send back to the client
+ etag = hashlib.md5()
+        # it looks like this loop should deadlock with the client code that
+        # writes the data, but render_PUT isn't called until the streamer has
+        # delivered all of it; Python memory usage stays constant, and it works.
+ while True:
+ data = request.content.read(BLOCK_SIZE)
+ if not data:
+ break
+ etag.update(data)
+ request.setHeader("ETag", '"%s"' % etag.hexdigest())
+ return ""
+
+class SizeBucket(Bucket):
+ """ SizeBucket
+
+ Fakes contents and always returns an object with size = int(objname)
+ """
+
+ def getChild(self, name, request):
+ """get the next object down the chain"""
+ try:
+ fake = FakeContent("0", int(name))
+ o = S3Object(name, fake, "text/plain", fake.hexdigest())
+ return o
+ except ValueError:
+ return "this buckets requires integer named objects"
+
+
+class Root(resource.Resource):
+ """ Site Root
+
+ handles all the requests.
+ on initialization it configures some default buckets
+ """
+ owner_id = 0
+ owner = "fakeuser"
+
+ def __init__(self, storagedir=None, allow_default_access=True):
+ resource.Resource.__init__(self)
+
+ self.auth = {}
+ if allow_default_access:
+ self.auth[ AWS_DEFAULT_ACCESS_KEY_ID ] = \
+ AWS_DEFAULT_SECRET_ACCESS_KEY
+ self.fail_next = {}
+ self.buckets = dict(
+ size = SizeBucket("size"),
+ discard = DiscardBucket("discard"))
+
+ self._rootdir = storagedir
+ if self._rootdir:
+ self._load()
+
+ def _add_bucket(self, name):
+ """ create a new bucket """
+ if self.buckets.has_key(name):
+ return self.buckets[name]
+ bucket = Bucket(name)
+ self.buckets[name] = bucket
+ if self._rootdir:
+ bucket._save(self._rootdir)
+ self._save()
+ return bucket
+
+ def _get_state_file(self, check=True):
+ """ locate the saved state file on disk """
+ assert self._rootdir, "S4 storage has not been initialized"
+ state_file = os.path.join(self._rootdir, S4_STATE_FILE)
+ if check and not os.path.exists(state_file):
+ return None
+ return state_file
+ def _load(self):
+ "load a saved bucket list state from disk "
+ state_file = self._get_state_file()
+ if not state_file:
+ return
+ data = dict(buckets=[])
+ with open(state_file, "rb") as state_fd:
+ data = pickle.load(state_fd)
+ self.owner_id = data["owner_id"]
+ self.owner = data["owner"]
+ for bucket_name in data["buckets"]:
+ bucket = Bucket(bucket_name)
+ bucket._load(self._rootdir)
+ self.buckets[bucket_name] = bucket
+ self._save(with_buckets=False)
+ return
+ def _save(self, with_buckets=True):
+ """ save current state to disk """
+ state_file = self._get_state_file(check=False)
+ data = dict(
+ owner = self.owner,
+ owner_id = self.owner_id,
+ buckets = [ x for x in self.buckets.keys()
+ if x not in ("size", "discard")],
+ )
+ with open(state_file, "wb") as state_fd:
+ pickle.dump(data, state_fd)
+ logger.debug("saved state file %s" % state_file)
+ if not with_buckets:
+ return
+ for bucket_name in data["buckets"]:
+ bucket = self.buckets[bucket_name]
+ bucket._save(self._rootdir)
+ return
+ def fail_next_put(self, error=http.INTERNAL_SERVER_ERROR,
+ message="Internal Server Error"):
+ """
+ Force next PUT request to return an error
+ """
+ logger.debug("will fail next put with %d (%s)", error, message)
+ self.fail_next['PUT'] = error, message
+
+ def fail_next_get(self, error=http.INTERNAL_SERVER_ERROR,
+ message="Internal Server Error"):
+ """
+ Force next GET request to return an error
+ """
+ logger.debug("will fail next get with %d (%s)", error, message)
+ self.fail_next['GET'] = error, message
+
+ def getChild(self, name, request):
+ """get the next object down the resource path"""
+ if not self.check_auth( request ):
+ return ErrorPage(http.FORBIDDEN, "InvalidSecurity",
+ "The provided security credentials are not valid.",
+ request.path)
+ if request.method in self.fail_next:
+ err, message = self.fail_next.pop(request.method)
+ return error.ErrorPage(err, message, message)
+ if request.path == "/" and request.method == "GET":
+ # this is a getallbuckets call
+ return ListAllMyBucketsResult(self.buckets.values())
+
+ # need to record when things change and save bucket state
+ if self._rootdir and name and request.method in ("PUT", "DELETE"):
+ def save_state(result, self, name, method):
+ """ callback for when rendering is finished """
+ bucket = self.buckets[name]
+ return bucket._save(self._rootdir)
+ _defer = request.notifyFinish()
+ _defer.addCallback(save_state, self, name, request.method)
+
+ bucket = self.buckets.get(name, None)
+ # if we operate on a key, pass control
+ if request.postpath and request.postpath[0]:
+ if bucket is None:
+ # bucket does not exist, yet we attempt operation on
+ # an object from that bucket
+ return ErrorPage(http.NOT_FOUND, "InvalidBucketName",
+ "The specified bucket is not valid",
+ request.path)
+ return bucket
+
+ # these are operations that are happening on a bucket and
+ # which are better handled from the root handler
+
+ # we're asked to list a bucket
+ if request.method in ("GET", "HEAD"):
+ if bucket is None:
+ return ErrorPage(http.NOT_FOUND, "NoSuchBucket",
+ "The specified bucket does not exist.",
+ request.path)
+ return ListBucketResult(bucket)
+ # bucket creation. if bucket already exists, noop
+ elif request.method == "PUT":
+ if bucket is None:
+ bucket = self._add_bucket(name)
+ return EmptyPage()
+ # we're asked to delete a bucket
+ elif request.method == "DELETE":
+ if len(bucket): # non-empty buckets can not be deleted
+ return ErrorPage(http.CONFLICT, "BucketNotEmpty",
+ "The bucket you tried to delete is not empty.",
+ request.path)
+ bucket.delete()
+ del self.buckets[name]
+ if self._rootdir:
+ self._save(with_buckets=False)
+ return EmptyPage(http.NO_CONTENT,
+ headers=dict(Location=request.path))
+ else:
+ return ErrorPage(http.BAD_REQUEST, "BadRequest",
+ "Your '%s' request is invalid" % request.method,
+ request.path)
+ return bucket
+
+ def check_auth(self, request):
+ """ Validates key/secret """
+ auth_str = request.getHeader('Authorization')
+        if not auth_str or not auth_str.startswith("AWS "):
+ return False
+ access_key, signature = auth_str[4:].split(":")
+ if not access_key in self.auth:
+ return False
+ secret_key = self.auth[ access_key ]
+ headers = request.getAllHeaders()
+ c_string = canonical_path_string(
+ request.method, request.path, headers)
+ if encode(secret_key, c_string) != signature:
+ return False
+ return True
+
+
+if __name__ == "__main__":
+ root = Root()
+ site = server.Site(root)
+ reactor.listenTCP(8808, site)
+ reactor.run()
=== added file 'txaws/s4/s4_xml.py'
--- txaws/s4/s4_xml.py 1970-01-01 00:00:00 +0000
+++ txaws/s4/s4_xml.py 2009-08-19 14:36:56 +0000
@@ -0,0 +1,155 @@
+# Copyright 2008-2009 Canonical Ltd.
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+""" build XML responses that mimic the behavior of the real S3 """
+
+from StringIO import StringIO
+from xml.etree.ElementTree import Element, ElementTree
+
+XMLNS = "http://s3.amazonaws.com/doc/2006-03-01"
+
+# <?xml version="1.0" encoding="UTF-8"?>
+def to_XML(elem):
+ """ renders an xml element to a text/xml page """
+ s = StringIO()
+ s.write("""<?xml version="1.0" encoding="UTF-8"?>\n""")
+ tree = ElementTree(elem)
+ tree.write(s)
+ return s.getvalue()
+
+def add_props(elem, **kw):
+ """ add subnodes to a xml node based on a dictionary """
+ for (key, val) in kw.iteritems():
+ prop = Element(key)
+ prop.tail = "\n"
+ if val is None:
+ val = ""
+ elif isinstance(val, bool):
+ val = str(val).lower()
+ elif not isinstance(val, str):
+ val = str(val)
+ prop.text = val
+ elem.append(prop)
+
+# <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01">
+# <Name>bucket</Name>
+# <Prefix>prefix</Prefix>
+# <Marker>marker</Marker>
+# <MaxKeys>max-keys</MaxKeys>
+# <IsTruncated>false</IsTruncated>
+# <Contents>
+# <Key>object</Key>
+# <LastModified>date</LastModified>
+# <ETag>etag</ETag>
+# <Size>size</Size>
+# <StorageClass>STANDARD</StorageClass>
+# <Owner>
+# <ID>owner_id</ID>
+# <DisplayName>owner_name</DisplayName>
+# </Owner>
+# </Contents>
+# ...
+# </ListBucketResult>
+def ListBucketResult(bucket, children):
+ """ builds the xml tree corresponding to a bucket listing """
+ root = Element("ListBucketResult", dict(xmlns=XMLNS))
+ root.tail = root.text = "\n"
+ add_props(root, **dict(
+ Name = bucket.get_name(),
+ IsTruncated = False,
+ Marker = 0,
+ ))
+ for (obname, ob) in children.iteritems():
+ contents = Element("Contents")
+ add_props(contents, **dict(
+ Key = obname,
+ LastModified = ob.get_date(),
+ ETag = ob.get_etag(),
+ Size = ob.get_size(),
+ StorageClass = "STANDARD",
+ ))
+ owner = Element("Owner")
+ add_props(owner, **dict(
+ ID = ob.get_owner_id(),
+ DisplayName = ob.get_owner(), ))
+ contents.append(owner)
+ root.append(contents)
+ return root
+
+# <Error>
+# <Code>NoSuchKey</Code>
+# <Message>The resource you requested does not exist</Message>
+# <Resource>/mybucket/myfoto.jpg</Resource>
+# <RequestId>4442587FB7D0A2F9</RequestId>
+# </Error>
+def AmazonError(code, message, resource, req_id=""):
+ """ builds xml tree corresponding to an Amazon error xml page """
+ root = Element("Error")
+ root.tail = root.text = "\n"
+ add_props(root, **dict(
+ Code = code,
+ Message = message,
+ Resource = resource,
+ RequestId = req_id))
+ return root
+
+# <ListAllMyBucketsResult xmlns="http://doc.s3.amazonaws.com/2006-03-01">
+# <Owner>
+# <ID>user_id</ID>
+# <DisplayName>display_name</DisplayName>
+# </Owner>
+# <Buckets>
+# <Bucket>
+# <Name>bucket_name</Name>
+# <CreationDate>date</CreationDate>
+# </Bucket>
+# ...
+# </Buckets>
+# </ListAllMyBucketsResult>
+def ListAllMyBucketsResult(owner, buckets):
+ """ builds xml tree corresponding to an Amazon list all buckets """
+ root = Element("ListAllMyBucketsResult", dict(xmlns=XMLNS))
+ root.tail = root.text = "\n"
+ xml_owner = Element("Owner")
+ add_props(xml_owner, **dict(
+ ID = owner["id"],
+ DisplayName = owner["name"] ))
+ root.append(xml_owner)
+ xml_buckets = Element("Buckets")
+ for bucket in buckets:
+ b = Element("Bucket")
+ add_props(b, **dict(
+ Name = bucket._name,
+ CreationDate = bucket._date))
+ xml_buckets.append(b)
+ root.append(xml_buckets)
+ return root
+
+if __name__ == '__main__':
+ # pylint: disable-msg=W0403
+ # pylint: disable-msg=E0611
+ from s4 import Bucket
+ bucket = Bucket("test-bucket")
+    lbr = ListBucketResult(bucket, {})
+ print to_XML(lbr)
+ print
+
+
=== added directory 'txaws/s4/testing'
=== added file 'txaws/s4/testing/__init__.py'
=== added file 'txaws/s4/testing/testcase.py'
--- txaws/s4/testing/testcase.py 1970-01-01 00:00:00 +0000
+++ txaws/s4/testing/testcase.py 2009-08-19 14:36:56 +0000
@@ -0,0 +1,132 @@
+# Copyright 2008-2009 Canonical Ltd.
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+"""Test case for S4 test server"""
+
+import os
+import tempfile
+import shutil
+
+from twisted.web import server
+from twisted.internet import reactor
+from twisted.trial.unittest import TestCase as TwistedTestCase
+
+from txaws.s4 import s4
+from boto.s3 import connection
+
+# pylint: disable-msg=W0201
+class S4TestCase(TwistedTestCase):
+ """ Base class for testing S4
+
+ This class takes care of starting a server instance for all S4 tests
+
+ As S4 is based on twisted, we inherit from TwistedTestCase.
+ As our tests are blocking, we decorate them with 'blocking_test' to
+ handle that.
+ """
+ s3 = None
+ logfile = None
+ storagedir = None
+ active = False
+ def setUp(self):
+ """Setup method."""
+ if not self.active:
+ self.start_server()
+
+ def tearDown(self):
+ """ tear down end testcase method """
+ # dirty hack to force closing all the cruft boto might be
+ # leaving around
+ if self.s3:
+            # the list copy is intentional, working around s3._cache.__iter__ breakage
+ for key in [x for x in self.s3._cache]:
+ self.s3._cache[key].close()
+ self.s3._cache[key] = None
+ self.s3 = None
+ self.stop_server()
+
+ def connect_ok(self, access=s4.AWS_DEFAULT_ACCESS_KEY_ID,
+ secret=s4.AWS_DEFAULT_SECRET_ACCESS_KEY):
+ """ Get a valid connection to S3 (actually, to S4) """
+ if self.s3:
+ return self.s3
+ s3 = connection.S3Connection(access, secret, is_secure=False,
+ host="localhost", port=self.port,
+ calling_format=s4.CallingFormat())
+        # don't let boto do its braindead retrying for us
+ s3.num_retries = 0
+ # Need to keep track of this connection
+ self.s3 = s3
+ return s3
+
+ @property
+ def port(self):
+ """The port."""
+ return self.conn.getHost().port
+
+ def start_server(self, persistent=False):
+ """ start the S4 listening server """
+ if self.active:
+ return
+ if persistent:
+ if not self.storagedir:
+ self.storagedir = tempfile.mkdtemp(
+ prefix="test-s4-boto-", suffix="-cache")
+ root = s4.Root(storagedir=self.storagedir)
+ else:
+ root = s4.Root()
+ self.site = server.Site(root)
+ self.active = True
+ self.conn = reactor.listenTCP(0, self.site)
+
+ def stop_server(self):
+ """ stop the S4 listening server """
+ self.active = False
+ self.conn.stopListening()
+ if self.storagedir and os.path.exists(self.storagedir):
+ shutil.rmtree(self.storagedir, ignore_errors=True)
+ self.storagedir = None
+
+ def restart_server(self, persistent=False):
+ """ restarts the S4 listening server """
+ self.stop_server()
+ self.start_server(persistent=persistent)
+
+
+from twisted.internet import threads
+from twisted.python.util import mergeFunctionMetadata
+
+def defer_to_thread(function):
+ """Run in a thread and return a Deferred that fires when done."""
+ def decorated(*args, **kwargs):
+ """Run in a thread and return a Deferred that fires when done."""
+ return threads.deferToThread(function, *args, **kwargs)
+ return mergeFunctionMetadata(function, decorated)
+
+def skip_test(reason):
+ """ tag a testcase to be skipped by the test runner """
+    def deco(f, *args, **kw):
+        """ testcase decorator """
+        f.skip = reason
+        return f
+    return deco
+
+
=== added directory 'txaws/s4/tests'
=== added file 'txaws/s4/tests/__init__.py'
=== added file 'txaws/s4/tests/test_S4.py'
--- txaws/s4/tests/test_S4.py 1970-01-01 00:00:00 +0000
+++ txaws/s4/tests/test_S4.py 2009-08-19 14:36:56 +0000
@@ -0,0 +1,194 @@
+# Copyright 2008 Canonical Ltd.
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+"""Unit tests for S4 test server"""
+
+import time
+import unittest
+
+from txaws.s4.testing.testcase import S4TestCase, defer_to_thread
+from boto.exception import S3ResponseError, BotoServerError
+
+class TestBasicObjectManipulation(S4TestCase):
+ """Tests for basic object manipulation."""
+
+ def _get_sample_key(self, s3, content, content_type=None):
+ """ cerate a new bucket and return a sample key from content """
+ bname = "test-%.2f" % time.time()
+ bucket = s3.create_bucket(bname)
+ key = bucket.new_key("sample")
+ if content_type:
+ key.content_type = content_type
+ key.set_contents_from_string(content)
+ return key
+
+ @defer_to_thread
+ def test_get(self):
+ """ Get one object """
+
+ s3 = self.connect_ok()
+ size = 30
+ b = s3.get_bucket("size")
+ m = b.get_key(str(size))
+
+ body = m.get_contents_as_string()
+ self.assertEquals(body, "0"*size)
+ self.assertEquals(m.size, size)
+ self.assertEquals(m.content_type, "text/plain")
+
+ @defer_to_thread
+ def test_get_range(self):
+ """Get part of one object"""
+
+ s3 = self.connect_ok()
+ content = '0123456789'
+ key = self._get_sample_key(s3, content)
+ size = len(content)
+
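+ # HTTP Range is inclusive at both ends: "bytes=a-b" returns
+ # b - a + 1 bytes, and "bytes=a-" means everything from offset a on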
+ def _get_range(range_start, range_size=None):
+ """test range get for various ranges"""
+ if range_size:
+ range_header = {"Range" : "bytes=%s-%s" % (
+ range_start, range_start + range_size - 1 )}
+ else:
+ range_header = {"Range" : "bytes=%s-" % (range_start,)}
+ range_size = size - range_start
+ key.open_read(headers=range_header)
+ self.assertEquals(key.size, range_size)
+ self.assertEquals(key.resp.status, 206)
+ ret = key.read()
+ body = content[range_start:range_start+range_size]
+ self.assertEquals(ret, body)
+ key.close()
+ # get a test range
+ range_size = 5
+ range_start = 2
+ _get_range(range_start)
+ _get_range(range_start, range_size)
+
+ @defer_to_thread
+ def test_get_multiple_range(self):
+ """Get part of one object"""
+
+ s3 = self.connect_ok()
+ content = '0123456789'
+ size = len(content)
+ key = self._get_sample_key(s3, content)
+ range_header = {"Range" : "bytes=0-1,5-6,9-" }
+ exc = self.assertRaises(S3ResponseError, key.open_read,
+ headers=range_header)
+ self.assertEquals(exc.status, 416)
+ key.close()
+
+ @defer_to_thread
+ def test_get_illegal_range(self):
+ """make sure first-byte-pos is present"""
+
+ s3 = self.connect_ok()
+ content = '0123456789'
+ size = len(content)
+ key = self._get_sample_key(s3, content)
+ range_header = {"Range" : "bytes=-1" }
+ exc = self.assertRaises(S3ResponseError, key.open_read,
+ headers=range_header)
+ self.assertEquals(exc.status, 416)
+ key.close()
+
+ @defer_to_thread
+ def test_get_404(self):
+ """ Try to get an object thats not there, expect 404 """
+
+ s3 = self.connect_ok()
+ bname = "test-%.2f" % time.time()
+ bucket = s3.create_bucket(bname)
+ # this does not create a key on the server side yet
+ key = bucket.new_key(bname)
+ # ... which is why we should get errors when attempting to read it
+ exc = self.assertRaises(S3ResponseError, key.open_read)
+ self.assertEquals(key.resp.status, 404)
+ self.assertEquals(exc.status, 404)
+
+ @defer_to_thread
+ def test_get_403(self):
+ """ Try to get an object with invalid credentials """
+ s3 = self.connect_ok(secret="bad secret")
+ exc = self.assertRaises(S3ResponseError, s3.get_bucket, "size")
+ self.assertEquals(exc.status, 403)
+
+
+ @defer_to_thread
+ def test_discarded(self):
+ """ put an object, get a 404 """
+ s3 = self.connect_ok()
+ bucket = s3.get_bucket("discard")
+ key = bucket.new_key("sample")
+ message = "Hello World!"
+ key.content_type = "text/lame"
+ key.set_contents_from_string(message)
+ exc = self.assertRaises(S3ResponseError, key.read)
+ self.assertEquals(exc.status, 404)
+
+ @defer_to_thread
+ def test_put(self):
+ """ put an object, get it back """
+ s3 = self.connect_ok()
+
+ message = "Hello World!"
+ key = self._get_sample_key(s3, message, "text/lame")
+ for x in range(1, 10):
+ body = key.get_contents_as_string()
+ self.assertEquals(body, message*x)
+ key.set_contents_from_string(message*(x+1))
+ self.assertEquals(key.content_type, "text/lame")
+
+ @defer_to_thread
+ def test_fail_next(self):
+ """ Test whether fail_next_put works """
+ s3 = self.connect_ok()
+ message = "Hello World!"
+ key = self._get_sample_key(s3, message, "text/lamest")
+
+ # dirty poking at our own internals, but it works...
+ self.site.resource.fail_next_put()
+
+ exc = self.assertRaises(BotoServerError, key.set_contents_from_string,
+ message)
+ self.assertEquals(exc.status, 500)
+ # next one should work
+ key.set_contents_from_string(message*2)
+ body = key.get_contents_as_string()
+ self.assertEquals(body, message*2)
+
+ # now test the get fail
+ self.site.resource.fail_next_get()
+ key.set_contents_from_string(message*3)
+ exc = self.assertRaises(BotoServerError, key.read)
+ self.assertEquals(exc.status, 500)
+ # next get should work
+ body = key.get_contents_as_string()
+ self.assertEquals(body, message*3)
+
+def test_suite():
+ """Used by the rest runner to find the tests in this module"""
+ return unittest.TestLoader().loadTestsFromName(__name__)
+
+if __name__ == "__main__":
+ unittest.main()
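The special buckets exercised above belong to the server itself: /size/
serves virtual objects whose body is "0" repeated as many times as the key
name says, and /discard/ accepts writes but drops the data. A sketch of
poking at both from a plain boto session, assuming an S4 instance is
already running (e.g. via start-s4.sh) on the sample S4.tac default port
8080:

    from boto.s3 import connection
    from txaws.s4 import s4

    s3 = connection.S3Connection(
        s4.AWS_DEFAULT_ACCESS_KEY_ID, s4.AWS_DEFAULT_SECRET_ACCESS_KEY,
        is_secure=False, host="localhost", port=8080,
        calling_format=s4.CallingFormat())

    # /size/N is a virtual object containing "0" * N
    key = s3.get_bucket("size").get_key("100")
    assert key.get_contents_as_string() == "0" * 100

    # /discard/ swallows whatever is written to it, so a later
    # GET of the same key comes back as 404
    discard_key = s3.get_bucket("discard").new_key("gone")
    discard_key.set_contents_from_string("this will not be stored")
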
=== added file 'txaws/s4/tests/test_boto.py'
--- txaws/s4/tests/test_boto.py 1970-01-01 00:00:00 +0000
+++ txaws/s4/tests/test_boto.py 2009-08-19 14:36:56 +0000
@@ -0,0 +1,275 @@
+#!/usr/bin/python
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+#
+# test s4 implementation using the python-boto client
+
+"""
+imported (from boto) unit tests for the S3Connection
+"""
+import unittest
+
+import os
+import time
+import tempfile
+
+from StringIO import StringIO
+
+from txaws.s4.testing.testcase import S4TestCase, defer_to_thread, skip_test
+from boto.exception import S3PermissionsError
+
+# pylint: disable-msg=C0111
+class S3ConnectionTest(S4TestCase):
+ def _get_bucket(self, s3conn):
+ # create a new, empty bucket
+ bucket_name = 'test-%.3f' % time.time()
+ bucket = s3conn.create_bucket(bucket_name)
+ # now try a get_bucket call and see if it's really there
+ bucket = s3conn.get_bucket(bucket_name)
+ return bucket
+
+ @defer_to_thread
+ def test_basic(self):
+ T1 = 'This is a test of file upload and download'
+ s3conn = self.connect_ok()
+
+ all_buckets = s3conn.get_all_buckets()
+ bucket = self._get_bucket(s3conn)
+ all_buckets = s3conn.get_all_buckets()
+ self.failUnless(bucket.name in [x.name for x in all_buckets])
+ # bucket should be empty now
+ self.failUnlessEqual(bucket.get_key("missing"), None)
+ all = bucket.get_all_keys()
+ self.failUnlessEqual(len(all), 0)
+ # create a new key and store its contents from a string
+ k = bucket.new_key()
+ k.name = 'foobar'
+ k.set_contents_from_string(T1)
+ fp = StringIO()
+ # now get the contents from s3 to a local file
+ k.get_contents_to_file(fp)
+ # check to make sure content read from s3 is identical to original
+ self.failUnlessEqual(T1, fp.getvalue())
+ bucket.delete_key(k)
+ self.failUnlessEqual(bucket.get_key(k.name), None)
+
+ @defer_to_thread
+ def test_lookup(self):
+ T1 = 'This is a test of file upload and download'
+ T2 = 'This is a second string to test file upload and download'
+ s3conn = self.connect_ok()
+ bucket = self._get_bucket(s3conn)
+ # create a new key and store its contents from a string
+ k = bucket.new_key()
+ # test a few variations on get_all_keys - first load some data
+ # for the first one, let's override the content type
+ (fd, fname) = tempfile.mkstemp()
+ os.write(fd, T1)
+ os.close(fd)
+ phony_mimetype = 'application/x-boto-test'
+ headers = {'Content-Type': phony_mimetype}
+ k.name = 'foo/bar'
+ k.set_contents_from_string(T1, headers)
+ k.name = 'foo/bas'
+ k.set_contents_from_filename(fname)
+ k.name = 'foo/bat'
+ k.set_contents_from_string(T1)
+ k.name = 'fie/bar'
+ k.set_contents_from_string(T1)
+ k.name = 'fie/bas'
+ k.set_contents_from_string(T1)
+ k.name = 'fie/bat'
+ k.set_contents_from_string(T1)
+ # try resetting the contents to another value
+ md5 = k.md5
+ k.set_contents_from_string(T2)
+ self.failIfEqual(k.md5, md5)
+ os.unlink(fname)
+ all = bucket.get_all_keys()
+ self.failUnlessEqual(len(all), 6)
+ rs = bucket.get_all_keys(prefix='foo')
+ self.failUnlessEqual(len(rs), 3)
+ rs = bucket.get_all_keys(maxkeys=5)
+ self.failUnlessEqual(len(rs), 5)
+ # test the lookup method
+ k = bucket.lookup('foo/bar')
+ self.failUnless(isinstance(k, bucket.key_class))
+ self.failUnlessEqual(k.content_type, phony_mimetype)
+ k = bucket.lookup('notthere')
+ self.failUnlessEqual(k, None)
+
+ @defer_to_thread
+ def test_metadata(self):
+ T1 = 'This is a test of file upload and download'
+ s3conn = self.connect_ok()
+ bucket = self._get_bucket(s3conn)
+ # try some metadata stuff
+ k = bucket.new_key()
+ k.name = 'has_metadata'
+ mdkey1 = 'meta1'
+ mdval1 = 'This is the first metadata value'
+ k.set_metadata(mdkey1, mdval1)
+ mdkey2 = 'meta2'
+ mdval2 = 'This is the second metadata value'
+ k.set_metadata(mdkey2, mdval2)
+ k.set_contents_from_string(T1)
+ k = bucket.lookup('has_metadata')
+ self.failUnlessEqual(k.get_metadata(mdkey1), mdval1)
+ self.failUnlessEqual(k.get_metadata(mdkey2), mdval2)
+ k = bucket.new_key()
+ k.name = 'has_metadata'
+ k.get_contents_as_string()
+ self.failUnlessEqual(k.get_metadata(mdkey1), mdval1)
+ self.failUnlessEqual(k.get_metadata(mdkey2), mdval2)
+ bucket.delete_key(k)
+ # try a key with a funny character
+ rs = bucket.get_all_keys()
+ num_keys = len(rs)
+ k = bucket.new_key()
+ k.name = 'testnewline\n'
+ k.set_contents_from_string('This is a test')
+ rs = bucket.get_all_keys()
+ self.failUnlessEqual(len(rs), num_keys + 1)
+ bucket.delete_key(k)
+ rs = bucket.get_all_keys()
+ self.failUnlessEqual(len(rs), num_keys)
+
+ # tests removing objects from the store
+ @defer_to_thread
+ def test_cleanup(self):
+ s3conn = self.connect_ok()
+ bucket = self._get_bucket(s3conn)
+ for x in range(10):
+ k = bucket.new_key()
+ k.name = "foo%d" % x
+ k.set_contents_from_string("test %d" % x)
+ all = bucket.get_all_keys()
+ # now delete all keys in bucket
+ for k in all:
+ bucket.delete_key(k)
+ # now delete bucket
+ s3conn.delete_bucket(bucket)
+
+ @defer_to_thread
+ def test_connection(self):
+ s3conn = self.connect_ok()
+ bucket = self._get_bucket(s3conn)
+ all_buckets = s3conn.get_all_buckets()
+ size_bucket = s3conn.get_bucket("size")
+ discard_bucket = s3conn.get_bucket("discard")
+
+ @defer_to_thread
+ def test_persistence(self):
+ # pylint: disable-msg=W0631
+ # first, stop the server and restart it in persistent mode
+ self.restart_server(persistent=True)
+ s3conn = self.connect_ok()
+ for bcount in range(1, 5):
+ bucket = self._get_bucket(s3conn)
+ for kcount in range(1, 5):
+ k = bucket.new_key()
+ k.name = "bucket-%d-key-%d" % (bcount, kcount)
+ k.set_contents_from_string(
+ "This is key %d from bucket %d (%s)" %(
+ kcount, bcount, bucket.name))
+ k.set_metadata("bcount", bcount)
+ k.set_metadata("kcount", kcount)
+ # now get a list of all the buckets and objects in the store
+ all_buckets = s3conn.get_all_buckets()
+ all_objects = {}
+ for x in all_buckets:
+ if x.name in ["size", "discard"]:
+ continue
+ objset = all_objects.setdefault(x.name, set())
+ bucket = s3conn.get_bucket(x.name)
+ for obj in bucket.get_all_keys():
+ objset.add(obj)
+ # XXX: test metadata
+ # now stop the S4Server and restart it
+ self.restart_server(persistent=True)
+ new_buckets = s3conn.get_all_buckets()
+ self.failUnlessEqual(
+ set([x.name for x in all_buckets]),
+ set([x.name for x in new_buckets]) )
+ new_objects = {}
+ for x in new_buckets:
+ if x.name in ["size", "discard"]:
+ continue
+ objset = new_objects.setdefault(x.name, set())
+ bucket = s3conn.get_bucket(x.name)
+ for obj in bucket.get_all_keys():
+ objset.add(obj)
+ # XXX: test metadata
+ # test the newobjects
+ self.failUnlessEqual(
+ set(all_objects.keys()),
+ set(new_objects.keys()) )
+ for key in all_objects.keys():
+ self.failUnlessEqual(
+ set([x.name for x in all_objects[key]]),
+ set([x.name for x in new_objects[key]]) )
+
+ @defer_to_thread
+ def test_size_bucket(self):
+ s3conn = self.connect_ok()
+ bucket = s3conn.get_bucket("size")
+ all_keys = bucket.get_all_keys()
+ self.failUnlessEqual(all_keys, [])
+ for size in range(1, 10**7, 10000):
+ k = bucket.get_key(str(size))
+ self.failUnlessEqual(size, k.size)
+ # try to read in the last key (should be the biggest)
+ size = 0
+ k.open("r")
+ for chunk in k:
+ size += len(chunk)
+ self.failUnlessEqual(size, k.size)
+
+ @skip_test("S4 does not have this functionality yet")
+ @defer_to_thread
+ def test_acl(self):
+ s3conn = self.connect_ok()
+ bucket = self._get_bucket(s3conn)
+ # try some acl stuff
+ bucket.set_acl('public-read')
+ policy = bucket.get_acl()
+ assert len(policy.acl.grants) == 2
+ bucket.set_acl('private')
+ policy = bucket.get_acl()
+ assert len(policy.acl.grants) == 1
+ k = bucket.lookup('foo/bar')
+ k.set_acl('public-read')
+ policy = k.get_acl()
+ assert len(policy.acl.grants) == 2
+ k.set_acl('private')
+ policy = k.get_acl()
+ assert len(policy.acl.grants) == 1
+ # try the convenience methods for grants
+ bucket.add_user_grant(
+ 'FULL_CONTROL',
+ 'c1e724fbfa0979a4448393c59a8c055011f739b6d102fb37a65f26414653cd67')
+ self.failUnlessRaises(S3PermissionsError, bucket.add_email_grant,
+ 'foobar', 'foo@xxxxxxx')
+
+if __name__ == '__main__':
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(S3ConnectionTest))
+ unittest.TextTestRunner(verbosity=2).run(suite)
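The embedding pattern these tests rely on is small enough to lift into any
Twisted process that wants a throwaway S3 endpoint; a minimal sketch using
the same calls as S4TestCase.start_server():

    from twisted.web import server
    from twisted.internet import reactor

    from txaws.s4 import s4

    # an in-memory S4 root on a kernel-assigned free port; passing 0
    # to listenTCP is exactly how the tests above avoid port collisions
    site = server.Site(s4.Root())
    conn = reactor.listenTCP(0, site)
    print "S4 listening on port %d" % conn.getHost().port
    reactor.run()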