[Merge] lp:~dkeeney/txaws/add-multipart-upload into lp:txaws
Zooko O'Whielacronx has proposed merging lp:~dkeeney/txaws/add-multipart-upload into lp:txaws.
Requested reviews:
txAWS Developers (txaws-dev)
Related bugs:
Bug #783801 in txAWS: "s3 multipart upload support"
https://bugs.launchpad.net/txaws/+bug/783801
For more details, see:
https://code.launchpad.net/~dkeeney/txaws/add-multipart-upload/+merge/76647
Okay, I read the patches on this branch: lp:~dkeeney/txaws/add-multipart-upload . The only problem I noticed is that the most recent changes -- http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/77 , http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/78 , and http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/79 -- don't have accompanying unit tests. From perusing the patches, it looks to me like this branch should be sufficient to support multipart upload to S3.
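
For other reviewers, here is a minimal sketch (untested; the access key, bucket, key, and file names are placeholders) of how the new API hangs together, using the MultipartManager added in txaws/s3/multipart.py and the same ~6 MB part size as bin/txaws-post-upload:

    import sys

    from txaws.credentials import AWSCredentials
    from txaws.reactor import reactor
    from txaws.service import AWSServiceRegion
    from txaws.s3.multipart import MultipartManager

    creds = AWSCredentials("<access-key>", "<secret-key>")
    client = AWSServiceRegion(creds=creds).get_s3_client()
    mgr = MultipartManager(client, "some-bucket", "some-key")
    source = open("some-big-file", "rb")

    def send_next(ign=None):
        # every part except the last must be at least 5 MB
        data = source.read(6000000)
        if data:
            d = mgr.send_part(data)
            d.addCallback(send_next)
        else:
            d = mgr.finish()
            d.addCallback(lambda ign: reactor.stop(exitStatus=0))
        return d

    def fail(reason):
        print reason.value
        reactor.stop(exitStatus=1)

    d = mgr.initialize()
    d.addCallback(send_next)
    d.addErrback(fail)
    sys.exit(reactor.run())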
--
https://code.launchpad.net/~dkeeney/txaws/add-multipart-upload/+merge/76647
Your team txAWS Developers is requested to review the proposed merge of lp:~dkeeney/txaws/add-multipart-upload into lp:txaws.
=== added file 'bin/txaws-delete-upload'
--- bin/txaws-delete-upload 1970-01-01 00:00:00 +0000
+++ bin/txaws-delete-upload 2011-09-22 20:23:28 +0000
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+"""
+%prog [options]
+"""
+
+import sys
+
+from txaws.credentials import AWSCredentials
+from txaws.script import parse_options
+from txaws.service import AWSServiceRegion
+from txaws.reactor import reactor
+
+def printResults(results):
+ #print results
+ return 0
+
+def printError(error):
+ print error.value
+ return 1
+
+def finish(return_code):
+ reactor.stop(exitStatus=return_code)
+
+options, args = parse_options(__doc__.strip())
+if options.bucket is None:
+ print "Error Message: A bucket name is required."
+ sys.exit(1)
+elif options.object_name is None:
+ print "Error Message: An object name is required."
+ sys.exit(1)
+elif options.uploadid is None:
+ print "Error Message: An uploadId (-u) is required."
+ sys.exit(1)
+creds = AWSCredentials(options.access_key, options.secret_key)
+region = AWSServiceRegion(
+ creds=creds, region=options.region, s3_uri=options.url)
+client = region.get_s3_client()
+
+d = client.abort_object(options.bucket, options.object_name, options.uploadid)
+d.addCallback(printResults)
+d.addErrback(printError)
+d.addCallback(finish)
+# We use a custom reactor so that we can return the exit status from
+# reactor.run().
+sys.exit(reactor.run())
=== added file 'bin/txaws-list-uploads'
--- bin/txaws-list-uploads 1970-01-01 00:00:00 +0000
+++ bin/txaws-list-uploads 2011-09-22 20:23:28 +0000
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+"""
+%prog [options]
+"""
+
+import sys
+
+from txaws.credentials import AWSCredentials
+from txaws.script import parse_options
+from txaws.service import AWSServiceRegion
+from txaws.reactor import reactor
+
+
+def printResults(results):
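+ # results is a list of (uploadId, initiated-date, key) tuples, as
+ # produced by S3Client._parse_mpupload_list below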
+ print "\nUploads:"
+ for upload in results:
+ print "%s\n %s (%s)" %(upload[0],upload[2],upload[1])
+ print "Total uploads: %s\n" % len(list(results))
+ return 0
+
+def printError(error):
+ print error.value
+ return 1
+
+def finish(return_code):
+ reactor.stop(exitStatus=return_code)
+
+options, args = parse_options(__doc__.strip())
+creds = AWSCredentials(options.access_key, options.secret_key)
+region = AWSServiceRegion(
+ creds=creds, region=options.region, s3_uri=options.url)
+client = region.get_s3_client()
+
+d = client.list_mpuploads(options.bucket)
+d.addCallback(printResults)
+d.addErrback(printError)
+d.addCallback(finish)
+# We use a custom reactor so that we can return the exit status from
+# reactor.run().
+sys.exit(reactor.run())
=== added file 'bin/txaws-post-upload'
--- bin/txaws-post-upload 1970-01-01 00:00:00 +0000
+++ bin/txaws-post-upload 2011-09-22 20:23:28 +0000
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+"""
+%prog [options]
+"""
+
+import os
+import sys
+import StringIO
+
+from txaws.credentials import AWSCredentials
+from txaws.script import parse_options
+from txaws.service import AWSServiceRegion
+from txaws.reactor import reactor
+from txaws.s3.multipart import MultipartManager
+
+def printResults(results):
+ print results
+ print 'next part num:', mgr.partNumber()
+ return 0
+
+def printError(error):
+ print error.value
+ return 1
+
+def finish(return_code):
+ reactor.stop(exitStatus=return_code)
+
+
+options, args = parse_options(__doc__.strip())
+if options.bucket is None:
+ print "Error Message: A bucket name is required."
+ sys.exit(1)
+
+filename = options.object_filename
+if filename:
+ options.object_name = os.path.basename(filename)
+ try:
+ options.object_data = open(filename,'rb')
+ except Exception, error:
+ print error
+ sys.exit(1)
+elif options.object_name is None:
+ print "Error Message: An object name is required."
+ sys.exit(1)
+else:
+ # turn input data into file-like obj
+ options.object_data = StringIO.StringIO(options.object_data)
+ options.object_data.seek(0)
+
+creds = AWSCredentials(options.access_key, options.secret_key)
+region = AWSServiceRegion(creds=creds, region=options.region,
+ s3_uri=options.url)
+client = region.get_s3_client()
+
+mgr = MultipartManager(client, options.bucket, options.object_name,
+ options.content_type)
+
+# send first block of data
+def startdata(uploadId):
+ if uploadId:
+ senddata()
+ else:
+ print 'uploadId not obtained'
+ reactor.stop(exitStatus=1)
+
+# send each block of data
+def senddata(ign=None):
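+ # S3 requires every part except the last to be at least 5 MB, so
+ # read the source in ~6 MB slugs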
+ tosend = options.object_data.read(6000000)
+ if tosend:
+ d = mgr.send_part(tosend)
+ d.addErrback(printError)
+ d.addCallback(senddata)
+ else:
+ d = mgr.finish()
+ d.addCallback(printResults)
+ d.addCallback(finish)
+ d.addErrback(printError)
+
+# start init
+d = mgr.initialize()
+d.addCallback(startdata)
+d.addErrback(printError)
+
+# We use a custom reactor so that we can return the exit status from
+# reactor.run().
+sys.exit(reactor.run())
=== modified file 'bin/txaws-put-object'
--- bin/txaws-put-object 2011-04-13 03:23:49 +0000
+++ bin/txaws-put-object 2011-09-22 20:23:28 +0000
@@ -33,7 +33,7 @@
if filename:
options.object_name = os.path.basename(filename)
try:
- options.object_data = open(filename).read()
+ options.object_data = open(filename,'rb').read()
except Exception, error:
print error
sys.exit(1)
=== modified file 'txaws/client/base.py'
--- txaws/client/base.py 2011-04-21 21:16:37 +0000
+++ txaws/client/base.py 2011-09-22 20:23:28 +0000
@@ -1,7 +1,12 @@
+<<<<<<< TREE
try:
from xml.etree.ElementTree import ParseError
except ImportError:
from xml.parsers.expat import ExpatError as ParseError
+=======
+
+from xml.parsers.expat import ExpatError
+>>>>>>> MERGE-SOURCE
from twisted.internet import reactor, ssl
from twisted.web import http
@@ -91,6 +96,7 @@
"""
contextFactory = None
scheme, host, port, path = parse(url)
+ #print >> sys.stderr, 'gp:'+str(path)
self.client = self.factory(url, *args, **kwds)
if scheme == 'https':
contextFactory = ssl.ClientContextFactory()
=== modified file 'txaws/s3/client.py'
--- txaws/s3/client.py 2011-04-14 19:50:30 +0000
+++ txaws/s3/client.py 2011-09-22 20:23:28 +0000
@@ -12,8 +12,13 @@
functionality in this wrapper.
"""
import mimetypes
+import time
from twisted.web.http import datetimeToString
+from twisted.web2 import stream, http_headers
+from twisted.web2 import http as web2_http
+from twisted.web2.client import http as web2_client_http
+from twisted.internet import protocol, reactor, ssl
from epsilon.extime import Time
@@ -23,11 +28,14 @@
Bucket, BucketItem, BucketListing, ItemOwner, RequestPayment)
from txaws.s3.exception import S3Error
from txaws.service import AWSServiceEndpoint, S3_ENDPOINT
-from txaws.util import XML, calculate_md5
+from txaws.util import XML, calculate_md5, parse
def s3_error_wrapper(error):
- error_wrapper(error, S3Error)
+ print '\n'
+ print str(error.__dict__)
+ print str(error)
+ #error_wrapper(error, S3Error)
class URLContext(object):
@@ -64,9 +72,15 @@
class S3Client(BaseClient):
"""A client for S3."""
- def __init__(self, creds=None, endpoint=None, query_factory=None):
+ def __init__(self, creds=None, endpoint=None, query_factory=None, streaming_query_factory=None):
if query_factory is None:
query_factory = Query
+
+ if streaming_query_factory is None:
+ streaming_query_factory = StreamingQuery
+
+ self.streaming_query_factory = streaming_query_factory
+
super(S3Client, self).__init__(creds, endpoint, query_factory)
def list_buckets(self):
@@ -257,6 +271,15 @@
bucket=bucket, object_name=object_name)
return query.submit()
+ def get_object_stream(self, bucket, object_name):
+ """
+ Get an object from a bucket, returning the body as a stream
+ rather than as a single string.
+ """
+ query = self.streaming_query_factory(
+ action="GET", creds=self.creds, endpoint=self.endpoint,
+ bucket=bucket, object_name=object_name)
+ return query.submit()
+
def head_object(self, bucket, object_name):
"""
Retrieve object metadata only.
@@ -278,6 +301,7 @@
bucket=bucket, object_name=object_name)
return query.submit()
+<<<<<<< TREE
def get_object_acl(self, bucket, object_name):
"""
Get the access control policy for an object.
@@ -320,6 +344,140 @@
"""
return RequestPayment.from_xml(xml_bytes).payer
+=======
+ def post_object_init(self, bucket, object_name, content_type=None,
+ metadata={}):
+ """
+ Starts a multipart upload to a bucket.
+ Any existing object of the same name will be replaced.
+
+ returns a Deferred that fires with the uploadId string
+ """
+ objectname_plus = object_name+'?uploads'
+ query = self.query_factory(
+ action="POST", creds=self.creds, endpoint=self.endpoint,
+ bucket=bucket, object_name=objectname_plus, data='',
+ content_type=content_type, metadata=metadata)
+ d = query.submit()
+ return d.addCallback(self._parse_post_init)
+
+ def _parse_post_init(self, xml_bytes):
+ """parse the response to post_object_init"""
+ root = XML(xml_bytes)
+ uploadId = root.findtext("UploadId")
+ return uploadId
+
+ def put_object_part(self,bucket,object_name,uploadId,data,partNumber,
+ content_type=None,metadata={}):
+ """
+ Continues a multipart upload to a bucket.
+ Sends another slug of data
+
+ returns a Deferred that fires with the response headers
+ """
+ assert uploadId, 'start multipart upload with a .post_object_init'
+ # this is backwards from docs, but it works
+ parms = 'partNumber='+str(partNumber)+'&uploadId='+uploadId
+ objectname_plus = object_name+'?'+parms
+ query = self.query_factory(
+ action="PUT", creds=self.creds, endpoint=self.endpoint,
+ bucket=bucket, object_name=objectname_plus, data=data,
+ content_type=content_type, metadata=metadata)
+ d = query.submit()
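+ # S3 returns the part's ETag in the response headers; each
+ # (partNumber, ETag) pair must be echoed back in post_object_finish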
+ d.addCallback(query.get_response_headers)
+ return d
+
+ def abort_object(self,bucket,object_name,uploadId,
+ content_type=None,metadata={}):
+ """
+ Aborts a multipart upload to a bucket.
+
+ returns deferred from query.submit
+ """
+ assert uploadId, 'cannot abort an unidentified upload'
+ objectname_plus = object_name+'?uploadId='+uploadId
+ query = self.query_factory(
+ action="DELETE", creds=self.creds, endpoint=self.endpoint,
+ bucket=bucket, object_name=objectname_plus, data='',
+ content_type=content_type, metadata=metadata)
+ d = query.submit()
+ return d
+
+ def post_object_finish(self, bucket, object_name, uploadId, partsList,
+ content_type=None, metadata={}):
+ """
+ Completes the multipart upload.
+
+ Can be slow, returns deferred
+ """
+ assert uploadId, 'start multipart upload with a .post_object_init'
+ # assemble xml
+ body = self._buildxmlPartsList(partsList)
+ #
+ objectname_plus = object_name+'?uploadId='+uploadId
+ query = self.query_factory(
+ action="POST", creds=self.creds, endpoint=self.endpoint,
+ bucket=bucket, object_name=objectname_plus, data=body,
+ content_type=content_type, metadata=metadata)
+ d = query.submit()
+ d.addCallback(self._parse_finish_response)
+ return d
+
+ def _buildxmlPartsList(self,partsList):
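+ # builds the CompleteMultipartUpload body, e.g.:
+ # <CompleteMultipartUpload>
+ # <Part>
+ # <PartNumber>1</PartNumber>
+ # <ETag>0cc175b9c0f1b6a831c399e269772661</ETag>
+ # </Part>
+ # ...
+ # </CompleteMultipartUpload>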
+ xml = []
+ partsList.sort(key=lambda p: p[0])
+ xml.append('<CompleteMultipartUpload>')
+ for pt in partsList:
+ xml.append('<Part>')
+ xmlp = ''.join(['<PartNumber>',pt[0],'</PartNumber>'])
+ xml.append(xmlp)
+ xmlp = ''.join(['<ETag>',pt[1],'</ETag>'])
+ xml.append(xmlp)
+ xml.append('</Part>')
+ xml.append('</CompleteMultipartUpload>')
+ return '\n'.join(xml)
+
+ def _parse_finish_response(self,xml_bytes):
+ """parse the response to post_object_init"""
+ root = XML(xml_bytes)
+ errorNode = root.find("Error")
+ uploadRes = root.findtext("Key")
+ if errorNode is not None:
+ error = errorNode.findtext('Message')
+ raise S3Error('Error: %s' % error)
+ if uploadRes:
+ return 'multipart upload complete %s' % uploadRes
+ raise S3Error(
+ 'multipart upload finish did not return valid page: \n%s' % xml_bytes)
+
+ def list_mpuploads(self, bucket, content_type=None, metadata={}):
+ """
+ Gets a list of started but not finished multipart uploads in a bucket.
+
+ returns a Deferred that fires with a list of
+ (uploadId, initiated, key) tuples
+ """
+ path = '?uploads'
+ query = self.query_factory(
+ action="GET", creds=self.creds, endpoint=self.endpoint,
+ bucket=bucket, object_name=path, data='',
+ content_type=content_type, metadata=metadata)
+ d = query.submit()
+ return d.addCallback(self._parse_mpupload_list)
+
+ def _parse_mpupload_list(self, xml_bytes):
+ """
+ Parse XML multipart upload list response.
+ """
+ root = XML(xml_bytes)
+ uploads = []
+ for uploads_data in root.findall("Upload"):
+ uploadId = uploads_data.findtext("UploadId")
+ key = uploads_data.findtext("Key")
+ initdate = uploads_data.findtext("Initiated")
+ uploads.append((uploadId,initdate,key))
+ return uploads
+
+>>>>>>> MERGE-SOURCE
class Query(BaseQuery):
"""A query for submission to the S3 service."""
@@ -417,4 +575,47 @@
d = self.get_page(
url_context.get_url(), method=self.action, postdata=self.data,
headers=self.get_headers())
- return d.addErrback(s3_error_wrapper)
+ return d
+ #return d.addErrback(s3_error_wrapper)
+
+class StreamingQuery(Query):
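+ """A Query variant that hands back the response body as a
+ twisted.web2 stream instead of buffering it all in memory."""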
+ def get_page(self, url, method="GET", postdata=None, headers=None,
+ agent=None, timeout=0, cookies=None,
+ followRedirect=True, redirectLimit=20, afterFoundGet=False):
+ scheme, host, port, path = parse(url)
+
+ postdata = stream.MemoryStream(postdata)
+
+ rawHeaders = {}
+
+ for name, value in headers.items():
+ rawHeaders[name] = [str(value)]
+
+ headers = http_headers.Headers(rawHeaders=rawHeaders)
+
+ request = web2_client_http.ClientRequest(method, url, headers, postdata)
+
+ client = protocol.ClientCreator(reactor, web2_client_http.HTTPClientProtocol)
+
+ if scheme == 'https':
+ contextFactory = ssl.ClientContextFactory()
+ deferred = client.connectSSL(host, port, contextFactory)
+ else:
+ deferred = client.connectTCP(host, port)
+
+ def connected(proto):
+ d = proto.submitRequest(request)
+ d.addCallback(handleResponse)
+
+ return d
+
+ def handleResponse(response):
+ if response.code not in (200, 204):
+ error = web2_http.HTTPError(response)
+ raise error
+
+ return response.stream
+
+ deferred.addCallback(connected)
+
+ return deferred
=== added file 'txaws/s3/multipart.py'
--- txaws/s3/multipart.py 1970-01-01 00:00:00 +0000
+++ txaws/s3/multipart.py 2011-09-22 20:23:28 +0000
@@ -0,0 +1,81 @@
+
+
+
+class MultipartManager(object):
+ """A client for S3."""
+
+ def __init__(self, client, bucket=None, object_name=None, content_type=None,
+ metadata={}):
+ self.client = client
+ self.object_name = object_name
+ self.bucket = bucket
+ self.content_type = content_type
+ self.metadata = metadata
+ self.uploadId = False
+ self.partTuples = []
+
+ def initialize(self):
+ """
+ Starts a multipart upload to a bucket.
+ Any existing object of the same name will be replaced.
+
+ returns a Deferred that fires with the uploadId
+ """
+ def saveUploadId(uploadId):
+ self.uploadId = uploadId
+ return uploadId
+ d = self.client.post_object_init(bucket=self.bucket,
+ object_name=self.object_name,
+ content_type=self.content_type,
+ metadata=self.metadata)
+ return d.addCallback(saveUploadId)
+
+ def partNumber(self):
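+ # S3 part numbers are 1-based; one tuple is recorded per part
+ # already sent, so the next number is len(partTuples) + 1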
+ return str(len(self.partTuples)+1)
+
+ def send_part(self, data):
+ """
+ Continues a multipart upload to a bucket.
+ Sends another slug of data
+
+ returns deferred from query.submit
+ """
+ assert self.uploadId, 'start the multipart upload with .initialize() first'
+ def process_headers(hdrs):
+ # save eTags and partNumbers for use by finish
+ etag = hdrs['etag'][0].strip('"')
+ partNum = self.partNumber()
+ tupl = (partNum,etag)
+ self.partTuples.append(tupl)
+ return hdrs
+ d = self.client.put_object_part(bucket=self.bucket,
+ object_name=self.object_name,
+ uploadId=self.uploadId, data=data,
+ partNumber=self.partNumber())
+ d.addCallback(process_headers)
+ return d
+
+ def abort(self):
+ """
+ Aborts the multipart upload.
+
+ Returns deferred
+ """
+ d = self.client.abort_object(bucket=self.bucket,
+ object_name=self.object_name,
+ uploadId=self.uploadId)
+ return d
+
+ def finish(self):
+ """
+ Completes the multipart upload.
+
+ Can be slow, returns deferred
+ """
+ d = self.client.post_object_finish(bucket=self.bucket,
+ object_name=self.object_name,
+ uploadId=self.uploadId,
+ partsList=self.partTuples)
+ return d
+
+
=== modified file 'txaws/script.py'
--- txaws/script.py 2009-11-22 21:53:54 +0000
+++ txaws/script.py 2011-09-22 20:23:28 +0000
@@ -32,6 +32,9 @@
parser.add_option(
"-c", "--content-type", dest="content_type",
help="content type of the object")
+ parser.add_option(
+ "-u", "--upload-id", dest="uploadid",
+ help="upload id of the object")
options, args = parser.parse_args()
if not (options.access_key and options.secret_key):
parser.error(