← Back to team overview

txaws-dev team mailing list archive

[Merge] lp:~dkeeney/txaws/add-multipart-upload into lp:txaws

 

Zooko O'Whielacronx has proposed merging lp:~dkeeney/txaws/add-multipart-upload into lp:txaws.

Requested reviews:
  txAWS Developers (txaws-dev)
Related bugs:
  Bug #783801 in txAWS: "s3 multipart upload support"
  https://bugs.launchpad.net/txaws/+bug/783801

For more details, see:
https://code.launchpad.net/~dkeeney/txaws/add-multipart-upload/+merge/76647

Okay, I read the patches on this branch: lp:~dkeeney/txaws/add-multipart-upload . The only problem I noticed is that the most recent changes -- http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/77 , http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/78 , and http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/79 -- don't have accompanying unit tests. It looks like to me from perusing the patches that this should be sufficient to support multipart upload to S3.
-- 
https://code.launchpad.net/~dkeeney/txaws/add-multipart-upload/+merge/76647
Your team txAWS Developers is requested to review the proposed merge of lp:~dkeeney/txaws/add-multipart-upload into lp:txaws.
=== added file 'bin/txaws-delete-upload'
--- bin/txaws-delete-upload	1970-01-01 00:00:00 +0000
+++ bin/txaws-delete-upload	2011-09-22 20:23:28 +0000
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+"""
+%prog [options]
+"""
+
+import sys
+
+from txaws.credentials import AWSCredentials
+from txaws.script import parse_options
+from txaws.service import AWSServiceRegion
+from txaws.reactor import reactor
+
+def printResults(results):
+    #print results
+    return 0
+
+def printError(error):
+    print error.value
+    return 1
+
+def finish(return_code):
+    reactor.stop(exitStatus=return_code)
+
+options, args = parse_options(__doc__.strip())
+if options.bucket is None:
+    print "Error Message: A bucket name is required."
+    sys.exit(1)
+elif options.object_name is None:
+    print "Error Message: An object name is required."
+    sys.exit(1)
+elif options.uploadid is None:
+    print "Error Message: An uploadId (-u) is required."
+    sys.exit(1)
+creds = AWSCredentials(options.access_key, options.secret_key)
+region = AWSServiceRegion(
+    creds=creds, region=options.region, s3_uri=options.url)
+client = region.get_s3_client()
+
+d = client.abort_object(options.bucket, options.object_name, options.uploadid)
+d.addCallback(printResults)
+d.addErrback(printError)
+d.addCallback(finish)
+# We use a custom reactor so that we can return the exit status from
+# reactor.run().
+sys.exit(reactor.run())

=== added file 'bin/txaws-list-uploads'
--- bin/txaws-list-uploads	1970-01-01 00:00:00 +0000
+++ bin/txaws-list-uploads	2011-09-22 20:23:28 +0000
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+"""
+%prog [options]
+"""
+
+import sys
+
+from txaws.credentials import AWSCredentials
+from txaws.script import parse_options
+from txaws.service import AWSServiceRegion
+from txaws.reactor import reactor
+
+
+def printResults(results):
+    print "\nUploads:"
+    for upload in results:
+        print "%s\n  %s (%s)" %(upload[0],upload[2],upload[1])
+    print "Total uploads: %s\n" % len(list(results))
+    return 0
+
+def printError(error):
+    print error.value
+    return 1
+
+def finish(return_code):
+    reactor.stop(exitStatus=return_code)
+
+options, args = parse_options(__doc__.strip())
+creds = AWSCredentials(options.access_key, options.secret_key)
+region = AWSServiceRegion(
+    creds=creds, region=options.region, s3_uri=options.url)
+client = region.get_s3_client()
+
+d = client.list_mpuploads(options.bucket)
+d.addCallback(printResults)
+d.addErrback(printError)
+d.addCallback(finish)
+# We use a custom reactor so that we can return the exit status from
+# reactor.run().
+sys.exit(reactor.run())

=== added file 'bin/txaws-post-upload'
--- bin/txaws-post-upload	1970-01-01 00:00:00 +0000
+++ bin/txaws-post-upload	2011-09-22 20:23:28 +0000
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+"""
+%prog [options]
+"""
+
+import os
+import sys
+import StringIO
+
+from txaws.credentials import AWSCredentials
+from txaws.script import parse_options
+from txaws.service import AWSServiceRegion
+from txaws.reactor import reactor
+from txaws.s3.multipart import MultipartManager
+
+def printResults(results):
+    print results
+    print 'next part num:', mgr.partNumber()
+    return 0
+
+def printError(error):
+    print error.value
+    return 1
+
+def finish(return_code):
+    reactor.stop(exitStatus=return_code)
+
+
+options, args = parse_options(__doc__.strip())
+if options.bucket is None:
+    print "Error Message: A bucket name is required."
+    sys.exit(1)
+
+filename = options.object_filename
+if filename:
+    options.object_name = os.path.basename(filename)
+    try:
+        options.object_data = open(filename,'rb')
+    except Exception, error:
+        print error
+        sys.exit(1)
+elif options.object_name is None:
+    print "Error Message: An object name is required."
+    sys.exit(1)
+else:
+    # turn input data into file-like obj
+    options.object_data = StringIO.StringIO(options.object_data)
+    options.object_data.seek(0)
+
+creds = AWSCredentials(options.access_key, options.secret_key)
+region = AWSServiceRegion(creds=creds, region=options.region,
+                          s3_uri=options.url)
+client = region.get_s3_client()
+
+mgr = MultipartManager(client, options.bucket, options.object_name,
+                       options.content_type)
+
+# send first block of data
+def startdata(uploadId):
+    if uploadId:
+        senddata()
+    else:
+        print 'uploadId not obtained'
+        reactor.stop(exitStatus=1)
+        
+# send each block of data
+def senddata(ign=None):
+    tosend = options.object_data.read(6000000)
+    if tosend:
+        d = mgr.send_part(tosend)
+        d.addErrback(printError)
+        d.addCallback(senddata)
+    else:
+        d = mgr.finish()
+        d.addCallback(printResults)
+        d.addCallback(finish)
+        d.addErrback(printError)
+
+# start init
+d = mgr.initialize()
+d.addCallback(startdata)
+d.addErrback(printError)
+
+# We use a custom reactor so that we can return the exit status from
+# reactor.run().
+sys.exit(reactor.run())

=== modified file 'bin/txaws-put-object'
--- bin/txaws-put-object	2011-04-13 03:23:49 +0000
+++ bin/txaws-put-object	2011-09-22 20:23:28 +0000
@@ -33,7 +33,7 @@
 if filename:
     options.object_name = os.path.basename(filename)
     try:
-        options.object_data = open(filename).read()
+        options.object_data = open(filename,'rb').read()
     except Exception, error:
         print error
         sys.exit(1)

=== modified file 'txaws/client/base.py'
--- txaws/client/base.py	2011-04-21 21:16:37 +0000
+++ txaws/client/base.py	2011-09-22 20:23:28 +0000
@@ -1,7 +1,12 @@
+<<<<<<< TREE
 try:
     from xml.etree.ElementTree import ParseError
 except ImportError:
     from xml.parsers.expat import ExpatError as ParseError
+=======
+
+from xml.parsers.expat import ExpatError
+>>>>>>> MERGE-SOURCE
 
 from twisted.internet import reactor, ssl
 from twisted.web import http
@@ -91,6 +96,7 @@
         """
         contextFactory = None
         scheme, host, port, path = parse(url)
+        #print >> sys.stderr, 'gp:'+str(path)
         self.client = self.factory(url, *args, **kwds)
         if scheme == 'https':
             contextFactory = ssl.ClientContextFactory()

=== modified file 'txaws/s3/client.py'
--- txaws/s3/client.py	2011-04-14 19:50:30 +0000
+++ txaws/s3/client.py	2011-09-22 20:23:28 +0000
@@ -12,8 +12,13 @@
 functionality in this wrapper.
 """
 import mimetypes
+import time
 
 from twisted.web.http import datetimeToString
+from twisted.web2 import stream, http_headers
+from twisted.web2 import http as web2_http
+from twisted.web2.client import http as web2_client_http
+from twisted.internet import protocol, reactor, ssl
 
 from epsilon.extime import Time
 
@@ -23,11 +28,14 @@
     Bucket, BucketItem, BucketListing, ItemOwner, RequestPayment)
 from txaws.s3.exception import S3Error
 from txaws.service import AWSServiceEndpoint, S3_ENDPOINT
-from txaws.util import XML, calculate_md5
+from txaws.util import XML, calculate_md5, parse
 
 
 def s3_error_wrapper(error):
-    error_wrapper(error, S3Error)
+    print '\n'
+    print str(error.__dict__)
+    print str(error)
+    #error_wrapper(error, S3Error)
 
 
 class URLContext(object):
@@ -64,9 +72,15 @@
 class S3Client(BaseClient):
     """A client for S3."""
 
-    def __init__(self, creds=None, endpoint=None, query_factory=None):
+    def __init__(self, creds=None, endpoint=None, query_factory=None, streaming_query_factory=None):
         if query_factory is None:
             query_factory = Query
+
+        if streaming_query_factory is None:
+            streaming_query_factory = StreamingQuery
+
+        self.streaming_query_factory = streaming_query_factory
+
         super(S3Client, self).__init__(creds, endpoint, query_factory)
 
     def list_buckets(self):
@@ -257,6 +271,15 @@
             bucket=bucket, object_name=object_name)
         return query.submit()
 
+    def get_object_stream(self, bucket, object_name):
+        """
+        Get an object from a bucket.
+        """
+        query = self.streaming_query_factory(
+            action="GET", creds=self.creds, endpoint=self.endpoint,
+            bucket=bucket, object_name=object_name)
+        return query.submit()
+
     def head_object(self, bucket, object_name):
         """
         Retrieve object metadata only.
@@ -278,6 +301,7 @@
             bucket=bucket, object_name=object_name)
         return query.submit()
 
+<<<<<<< TREE
     def get_object_acl(self, bucket, object_name):
         """
         Get the access control policy for an object.
@@ -320,6 +344,140 @@
         """
         return RequestPayment.from_xml(xml_bytes).payer
 
+=======
+    def post_object_init(self, bucket, object_name, content_type=None,
+                         metadata={}):
+        """
+        Starts a multipart upload to a bucket.
+        Any existing object of the same name will be replaced.
+        
+        returns a string uploadId
+        """
+        objectname_plus = object_name+'?uploads'
+        query = self.query_factory(
+            action="POST", creds=self.creds, endpoint=self.endpoint,
+            bucket=bucket, object_name=objectname_plus, data='',
+            content_type=content_type, metadata=metadata)
+        d = query.submit()
+        return d.addCallback(self._parse_post_init)
+    
+    def _parse_post_init(self, xml_bytes):
+        """parse the response to post_object_init"""
+        root = XML(xml_bytes)
+        uploadId = root.findtext("UploadId")
+        return uploadId
+        
+    def put_object_part(self,bucket,object_name,uploadId,data,partNumber,
+                        content_type=None,metadata={}):
+        """
+        Continues a multipart upload to a bucket.
+        Sends another slug of data
+        
+        returns deferred from query.submit
+        """
+        assert(uploadId,'start multipart upload with a .post_object_init')
+        # this is backwards from docs, but it works
+        parms = 'partNumber='+str(partNumber)+'&uploadId='+uploadId
+        objectname_plus = object_name+'?'+parms
+        query = self.query_factory(
+            action="PUT", creds=self.creds, endpoint=self.endpoint,
+            bucket=bucket, object_name=objectname_plus, data=data,
+            content_type=content_type, metadata=metadata)
+        d = query.submit()
+        d.addCallback(query.get_response_headers)
+        return d
+            
+    def abort_object(self,bucket,object_name,uploadId,
+                        content_type=None,metadata={}):
+        """
+        Aborts a multipart upload to a bucket.
+        
+        returns deferred from query.submit
+        """
+        assert(uploadId,'cannot abort an unidentified upload')
+        objectname_plus = object_name+'?uploadId='+uploadId
+        query = self.query_factory(
+            action="DELETE", creds=self.creds, endpoint=self.endpoint,
+            bucket=bucket, object_name=objectname_plus, data='',
+            content_type=content_type, metadata=metadata)
+        d = query.submit()
+        return d
+                
+    def post_object_finish(self, bucket, object_name, uploadId, partsList,
+                           content_type=None, metadata={}):
+        """
+        Completes the multipart upload.
+        
+        Can be slow, returns deferred
+        """
+        assert(uploadId,'start multipart upload with a .post_object_init')
+        # assemble xml
+        body = self._buildxmlPartsList(partsList)
+        #
+        objectname_plus = object_name+'?uploadId='+uploadId
+        query = self.query_factory(
+            action="POST", creds=self.creds, endpoint=self.endpoint,
+            bucket=bucket, object_name=objectname_plus, data=body,
+            content_type=content_type, metadata=metadata)
+        d = query.submit()
+        d.addCallback(self._parse_finish_response)
+        return d
+        
+    def _buildxmlPartsList(self,partsList):
+        xml = []
+        partsList.sort(key=lambda p: p[0])
+        xml.append('<CompleteMultipartUpload>')
+        for pt in partsList:
+            xml.append('<Part>')
+            xmlp = ''.join(['<PartNumber>',pt[0],'</PartNumber>'])
+            xml.append(xmlp)
+            xmlp = ''.join(['<ETag>',pt[1],'</ETag>'])
+            xml.append(xmlp)
+            xml.append('</Part>')
+        xml.append('</CompleteMultipartUpload>')
+        return '\n'.join(xml)
+
+    def _parse_finish_response(self,xml_bytes):
+        """parse the response to post_object_init"""
+        root = XML(xml_bytes)
+        errorNode = root.find("Error")
+        uploadRes = root.findtext("Key")
+        if errorNode:
+            error = errorNode.findtext('Message')
+            raise txaws.s3.exception.S3Error('Error: %s'%error)
+        if uploadRes:
+            return 'multipart upload complete %s'%uploadRes
+        raise txaws.s3.exception.S3Error(
+            'multipart upload finish did not return valid page: \n%s'%xml_bytes)
+        
+    def list_mpuploads(self, bucket, content_type=None, metadata={}):
+        """
+        Gets a list of started but not finished multipart uploads in a bucket.
+        
+        returns a list
+        """
+        path = '?uploads'
+        query = self.query_factory(
+            action="GET", creds=self.creds, endpoint=self.endpoint,
+            bucket=bucket, object_name=path, data='',
+            content_type=content_type, metadata=metadata)
+        d = query.submit()
+        return d.addCallback(self._parse_mpupload_list)
+    
+    def _parse_mpupload_list(self, xml_bytes):
+        """
+        Parse XML multipart upload list response.
+        """
+        root = XML(xml_bytes)
+        uploads = []
+        for uploads_data in root.findall("Upload"):
+            uploadId = uploads_data.findtext("UploadId")
+            key = uploads_data.findtext("Key")
+            initdate = uploads_data.findtext("Initiated")
+            uploads.append((uploadId,initdate,key))
+        return uploads
+    
+>>>>>>> MERGE-SOURCE
 
 class Query(BaseQuery):
     """A query for submission to the S3 service."""
@@ -417,4 +575,47 @@
         d = self.get_page(
             url_context.get_url(), method=self.action, postdata=self.data,
             headers=self.get_headers())
-        return d.addErrback(s3_error_wrapper)
+        return d
+        #return d.addErrback(s3_error_wrapper)
+
+class StreamingQuery(Query):
+    def get_page(self, url, method="GET", postdata=None, headers=None,
+                    agent=None, timeout=0, cookies=None,
+                    followRedirect=True, redirectLimit=20, afterFoundGet=False):
+        scheme, host, port, path = parse(url)
+
+        postdata = stream.MemoryStream(postdata)
+
+        rawHeaders = {}
+
+        for name, value in headers.items():
+            rawHeaders[name] = [str(value)]
+
+        headers = http_headers.Headers(rawHeaders=rawHeaders)
+
+        request = web2_client_http.ClientRequest(method, url, headers, None)
+
+        client = protocol.ClientCreator(reactor, web2_client_http.HTTPClientProtocol)
+
+        if scheme == 'https':
+            contextFactory = ssl.ClientContextFactory()
+            deferred = client.connectSSL(host, port, contextFactory)
+        else:
+            deferred = client.connectTCP(host, port)
+
+        def connected(proto):
+            d = proto.submitRequest(request)
+            d.addCallback(handleResponse)
+
+            return d
+
+        def handleResponse(response):
+            if not response.code in (200, 204):
+                error = web2_http.HTTPError(response)
+                raise error
+
+            return response.stream
+
+        deferred.addCallback(connected)
+
+        return deferred

=== added file 'txaws/s3/multipart.py'
--- txaws/s3/multipart.py	1970-01-01 00:00:00 +0000
+++ txaws/s3/multipart.py	2011-09-22 20:23:28 +0000
@@ -0,0 +1,81 @@
+
+
+
+class MultipartManager(object):
+    """A client for S3."""
+
+    def __init__(self, client, bucket=None, object_name=None, content_type=None,
+                 metadata={}):
+        self.client = client
+        self.object_name = object_name
+        self.bucket = bucket
+        self.content_type = content_type
+        self.metadata = metadata
+        self.uploadId = False
+        self.partTuples = []
+
+    def initialize(self):
+        """
+        Starts a multipart upload to a bucket.
+        Any existing object of the same name will be replaced.
+        
+        returns a deferred
+        """
+        def saveUploadId(uploadId):
+            self.uploadId = uploadId
+            return uploadId
+        d = self.client.post_object_init(bucket=self.bucket,
+                                         object_name=self.object_name,
+                                         content_type=self.content_type,
+                                         metadata=self.metadata)
+        return d.addCallback(saveUploadId)
+    
+    def partNumber(self):
+        return str(len(self.partTuples)+1)
+        
+    def send_part(self, data):
+        """
+        Continues a multipart upload to a bucket.
+        Sends another slug of data
+        
+        returns deferred from query.submit
+        """
+        assert(self.uploadId,'start multipart upload with a .post_object_init')
+        def process_headers(hdrs):
+            # save eTags and partNumbers for use by finish
+            etag = hdrs['etag'][0].strip('""')
+            partNum = self.partNumber()
+            tupl = (partNum,etag)
+            self.partTuples.append(tupl)
+            return hdrs
+        d = self.client.put_object_part(bucket=self.bucket,
+                                        object_name=self.object_name,
+                                        uploadId=self.uploadId, data=data,
+                                        partNumber=self.partNumber())
+        d.addCallback(process_headers)
+        return d
+            
+    def abort(self):
+        """
+        Aborts the multipart upload.
+        
+        Returns deferred
+        """
+        d = self.client.abort_object(bucket=self.bucket,
+                                     object_name=self.object_name,
+                                     uploadId=self.uploadId)
+        return d
+    
+    def finish(self):
+        """
+        Completes the multipart upload.
+        
+        Can be slow, returns deferred
+        """
+        d = self.client.post_object_finish(bucket=self.bucket,
+                                           object_name=self.object_name,
+                                           uploadId=self.uploadId,
+                                           partsList=self.partTuples)
+        return d
+        
+    

=== modified file 'txaws/script.py'
--- txaws/script.py	2009-11-22 21:53:54 +0000
+++ txaws/script.py	2011-09-22 20:23:28 +0000
@@ -32,6 +32,9 @@
     parser.add_option(
         "-c", "--content-type", dest="content_type",
         help="content type of the object")
+    parser.add_option(
+        "-u", "--upload-id", dest="uploadid",
+        help="upload id of the object")
     options, args = parser.parse_args()
     if not (options.access_key and options.secret_key):
         parser.error(