
duplicity-team team mailing list archive

[Merge] lp:~prateek/duplicity/s3-glacier into lp:duplicity

 

someone1 has proposed merging lp:~prateek/duplicity/s3-glacier into lp:duplicity.

Commit message:
Fixes https://bugs.launchpad.net/duplicity/+bug/1039511 - Adds support for detecting when a file is stored in Glacier and initiating a restore back to S3. Also merges overlapping code in the boto backends.
Fixes https://bugs.launchpad.net/duplicity/+bug/1243246 - Adds an --s3_multipart_max_timeout option to limit how long duplicity waits for each part of a multipart upload to S3. Also adds debug messages reporting the approximate upload speed.

Requested reviews:
  duplicity-team (duplicity-team)
Related bugs:
  Bug #1039511 in Duplicity: "Support for Amazon Glacier"
  https://bugs.launchpad.net/duplicity/+bug/1039511
  Bug #1243246 in Duplicity: "Duplicity Hangs intermittently when using multiprocess uploads to S3"
  https://bugs.launchpad.net/duplicity/+bug/1243246

For more details, see:
https://code.launchpad.net/~prateek/duplicity/s3-glacier/+merge/207719

How this addresses bug 1039511:
If a file stored in S3 is found to be on Glacier, duplicity initiates a restore back to S3 and waits until the file is available before continuing the restore process.
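
Roughly, the restore path follows the sketch below. This is a simplified, standalone illustration using the boto API (boto >= 2.7.0); the bucket and file names are placeholders, and the actual implementation is pre_process_download() in the _boto_single.py diff further down.

    # Minimal sketch of the Glacier-to-S3 restore flow (boto >= 2.7.0 assumed).
    import time
    import boto

    conn = boto.connect_s3()
    bucket = conn.lookup('my-duplicity-bucket')                              # placeholder bucket
    key = list(bucket.list('duplicity-full.vol1.difftar.gpg'))[0]            # listing exposes storage_class

    if key.storage_class == "GLACIER":
        if not bucket.get_key(key.key).ongoing_restore:
            key.restore(days=1)      # stage a temporary copy back into S3
        while bucket.get_key(key.key).ongoing_restore:
            time.sleep(60)           # poll until the staged copy is available
    # The object can now be downloaded like any other S3 key.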

This branch also merges _boto_single and _boto_multi, since the majority of their code overlapped. Making the _boto_multi backend a subclass of the _boto_single backend means changes to the shared code only have to be made in one place.
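
The resulting shape of the multiprocessing backend is roughly the sketch below (class and module names are taken from this branch, assuming it is imported from within duplicity/backends; the upload body is elided):

    # _boto_multi.py now carries only the multipart-specific pieces.
    from _boto_single import BotoBackend as BotoSingleBackend

    class BotoBackend(BotoSingleBackend):
        def __init__(self, parsed_url):
            BotoSingleBackend.__init__(self, parsed_url)
            self._setup_pool()   # multiprocessing pool reused across uploads

        def upload(self, filename, key, headers=None):
            # Overrides the single set_contents_from_filename() call in the
            # base class with a chunked, parallel multipart upload (see diff).
            pass

Connection handling, retries, listing, and the new Glacier pre_process_download() hook now live once in _boto_single.py and are inherited by the multiprocessing backend.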


-- 
https://code.launchpad.net/~prateek/duplicity/s3-glacier/+merge/207719
Your team duplicity-team is requested to review the proposed merge of lp:~prateek/duplicity/s3-glacier into lp:duplicity.
=== modified file 'README'
--- README	2014-01-24 12:39:40 +0000
+++ README	2014-02-21 17:46:27 +0000
@@ -29,6 +29,7 @@
  * Boto 2.0 or later for single-processing S3 or GCS access (default)
  * Boto 2.1.1 or later for multi-processing S3 access
  * Python v2.6 or later for multi-processing S3 access
+ * Boto 2.7.0 or later for Glacier S3 access
 
 If you install from the source package, you will also need:
 

=== modified file 'bin/duplicity'
--- bin/duplicity	2014-02-05 02:57:01 +0000
+++ bin/duplicity	2014-02-21 17:46:27 +0000
@@ -735,6 +735,15 @@
             log.Progress(_('Processed volume %d of %d') % (cur_vol[0], num_vols),
                          cur_vol[0], num_vols)
 
+    if hasattr(globals.backend, 'pre_process_download'):
+        file_names = []
+        for backup_set in backup_setlist:
+            manifest = backup_set.get_manifest()
+            volumes = manifest.get_containing_volumes(index)
+            for vol_num in volumes:
+                file_names.append(backup_set.volume_name_dict[vol_num])
+        globals.backend.pre_process_download(file_names)
+
     fileobj_iters = map(get_fileobj_iter, backup_setlist)
     tarfiles = map(patchdir.TarFile_FromFileobjs, fileobj_iters)
     return patchdir.tarfiles2rop_iter(tarfiles, index)
@@ -1142,6 +1151,8 @@
                     local_missing = [] # don't download if we can't decrypt
             for fn in local_spurious:
                 remove_local(fn)
+            if hasattr(globals.backend, 'pre_process_download'):
+                globals.backend.pre_process_download(local_missing)
             for fn in local_missing:
                 copy_to_local(fn)
         else:

=== modified file 'duplicity/backends/_boto_multi.py'
--- duplicity/backends/_boto_multi.py	2014-01-13 15:54:13 +0000
+++ duplicity/backends/_boto_multi.py	2014-02-21 17:46:27 +0000
@@ -22,20 +22,20 @@
 
 import os
 import sys
-import time
 import threading
 import Queue
-
-import duplicity.backend
+import time
+import traceback
 
 from duplicity import globals
 from duplicity import log
 from duplicity.errors import * #@UnusedWildImport
-from duplicity.util import exception_traceback
-from duplicity.backend import retry
 from duplicity.filechunkio import FileChunkIO
 from duplicity import progress
 
+from _boto_single import BotoBackend as BotoSingleBackend
+from _boto_single import get_connection
+
 BOTO_MIN_VERSION = "2.1.1"
 
 # Multiprocessing is not supported on *BSD
@@ -61,100 +61,13 @@
     def run(self):
         while not self.finish:
             try:
-                args = self.queue.get(True, 1) 
+                args = self.queue.get(True, 1)
                 progress.report_transfer(args[0], args[1])
             except Queue.Empty, e:
                 pass
-            
-
-def get_connection(scheme, parsed_url):
-    try:
-        import boto
-        assert boto.Version >= BOTO_MIN_VERSION
-
-        from boto.s3.connection import S3Connection
-        assert hasattr(S3Connection, 'lookup')
-
-        # Newer versions of boto default to using
-        # virtual hosting for buckets as a result of
-        # upstream deprecation of the old-style access
-        # method by Amazon S3. This change is not
-        # backwards compatible (in particular with
-        # respect to upper case characters in bucket
-        # names); so we default to forcing use of the
-        # old-style method unless the user has
-        # explicitly asked us to use new-style bucket
-        # access.
-        #
-        # Note that if the user wants to use new-style
-        # buckets, we use the subdomain calling form
-        # rather than given the option of both
-        # subdomain and vhost. The reason being that
-        # anything addressable as a vhost, is also
-        # addressable as a subdomain. Seeing as the
-        # latter is mostly a convenience method of
-        # allowing browse:able content semi-invisibly
-        # being hosted on S3, the former format makes
-        # a lot more sense for us to use - being
-        # explicit about what is happening (the fact
-        # that we are talking to S3 servers).
-
-        try:
-            from boto.s3.connection import OrdinaryCallingFormat
-            from boto.s3.connection import SubdomainCallingFormat
-            cfs_supported = True
-            calling_format = OrdinaryCallingFormat()
-        except ImportError:
-            cfs_supported = False
-            calling_format = None
-
-        if globals.s3_use_new_style:
-            if cfs_supported:
-                calling_format = SubdomainCallingFormat()
-            else:
-                log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
-                               "requested, but does not seem to be supported by the "
-                               "boto library. Either you need to upgrade your boto "
-                               "library or duplicity has failed to correctly detect "
-                               "the appropriate support.",
-                               log.ErrorCode.boto_old_style)
-        else:
-            if cfs_supported:
-                calling_format = OrdinaryCallingFormat()
-            else:
-                calling_format = None
-
-    except ImportError:
-        log.FatalError("This backend (s3) requires boto library, version %s or later, "
-                       "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
-                       log.ErrorCode.boto_lib_too_old)
-
-    if scheme == 's3+http':
-        # Use the default Amazon S3 host.
-        conn = S3Connection(is_secure=(not globals.s3_unencrypted_connection))
-    else:
-        assert scheme == 's3'
-        conn = S3Connection(
-            host = parsed_url.hostname,
-            is_secure=(not globals.s3_unencrypted_connection))
-
-    if hasattr(conn, 'calling_format'):
-        if calling_format is None:
-            log.FatalError("It seems we previously failed to detect support for calling "
-                           "formats in the boto library, yet the support is there. This is "
-                           "almost certainly a duplicity bug.",
-                           log.ErrorCode.boto_calling_format)
-        else:
-            conn.calling_format = calling_format
-
-    else:
-        # Duplicity hangs if boto gets a null bucket name.
-        # HC: Caught a socket error, trying to recover
-        raise BackendException('Boto requires a bucket name.')
-    return conn
-
-
-class BotoBackend(duplicity.backend.Backend):
+
+
+class BotoBackend(BotoSingleBackend):
     """
     Backend for Amazon's Simple Storage System, (aka Amazon S3), though
     the use of the boto module, (http://code.google.com/p/boto/).
@@ -167,197 +80,29 @@
     """
 
     def __init__(self, parsed_url):
-        duplicity.backend.Backend.__init__(self, parsed_url)
-
-        from boto.s3.key import Key
-        from boto.s3.multipart import MultiPartUpload
-
-        # This folds the null prefix and all null parts, which means that:
-        #  //MyBucket/ and //MyBucket are equivalent.
-        #  //MyBucket//My///My/Prefix/ and //MyBucket/My/Prefix are equivalent.
-        self.url_parts = filter(lambda x: x != '', parsed_url.path.split('/'))
-
-        if self.url_parts:
-            self.bucket_name = self.url_parts.pop(0)
-        else:
-            # Duplicity hangs if boto gets a null bucket name.
-            # HC: Caught a socket error, trying to recover
-            raise BackendException('Boto requires a bucket name.')
-
-        self.scheme = parsed_url.scheme
-
-        self.key_class = Key
-
-        if self.url_parts:
-            self.key_prefix = '%s/' % '/'.join(self.url_parts)
-        else:
-            self.key_prefix = ''
-
-        self.straight_url = duplicity.backend.strip_auth_from_url(parsed_url)
-        self.parsed_url = parsed_url
-        self.resetConnection()
-
-    def resetConnection(self):
-        self.bucket = None
-        self.conn = get_connection(self.scheme, self.parsed_url)
-        self.bucket = self.conn.lookup(self.bucket_name)
-
-    def put(self, source_path, remote_filename=None):
-        from boto.s3.connection import Location
-        if globals.s3_european_buckets:
-            if not globals.s3_use_new_style:
-                log.FatalError("European bucket creation was requested, but not new-style "
-                               "bucket addressing (--s3-use-new-style)",
-                               log.ErrorCode.s3_bucket_not_style)
-        #Network glitch may prevent first few attempts of creating/looking up a bucket
-        for n in range(1, globals.num_retries+1):
-            if self.bucket:
-                break
-            if n > 1:
-                time.sleep(30)
-            try:
-                try:
-                    self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
-                except Exception, e:
-                    if "NoSuchBucket" in str(e):
-                        if globals.s3_european_buckets:
-                            self.bucket = self.conn.create_bucket(self.bucket_name,
-                                                                  location=Location.EU)
-                        else:
-                            self.bucket = self.conn.create_bucket(self.bucket_name)
-                    else:
-                        raise e
-            except Exception, e:
-                log.Warn("Failed to create bucket (attempt #%d) '%s' failed (reason: %s: %s)"
-                         "" % (n, self.bucket_name,
-                               e.__class__.__name__,
-                               str(e)))
-                self.resetConnection()
-
-        if not remote_filename:
-            remote_filename = source_path.get_filename()
-        key = self.key_prefix + remote_filename
-        for n in range(1, globals.num_retries+1):
-            if n > 1:
-                # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
-                time.sleep(10)
-
-            if globals.s3_use_rrs:
-                storage_class = 'REDUCED_REDUNDANCY'
-            else:
-                storage_class = 'STANDARD'
-            log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
-            try:
-                headers = {
-                    'Content-Type': 'application/octet-stream',
-                    'x-amz-storage-class': storage_class
-                }
-                self.upload(source_path.name, key, headers)
-                self.resetConnection()
-                return
-            except Exception, e:
-                log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
-                         "" % (self.straight_url,
-                               remote_filename,
-                               n,
-                               e.__class__.__name__,
-                               str(e)))
-                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
-                self.resetConnection()
-        log.Warn("Giving up trying to upload %s/%s after %d attempts" %
-                 (self.straight_url, remote_filename, globals.num_retries))
-        raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))
-
-    def get(self, remote_filename, local_path):
-        key = self.key_class(self.bucket)
-        key.key = self.key_prefix + remote_filename
-        for n in range(1, globals.num_retries+1):
-            if n > 1:
-                # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
-                time.sleep(10)
-            log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
-            try:
-                key.get_contents_to_filename(local_path.name)
-                local_path.setdata()
-                self.resetConnection()
-                return
-            except Exception, e:
-                log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
-                         "" % (self.straight_url,
-                               remote_filename,
-                               n,
-                               e.__class__.__name__,
-                               str(e)), 1)
-                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
-                self.resetConnection()
-        log.Warn("Giving up trying to download %s/%s after %d attempts" %
-                (self.straight_url, remote_filename, globals.num_retries))
-        raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))
-
-    def _list(self):
-        if not self.bucket:
-            raise BackendException("No connection to backend")
-
-        for n in range(1, globals.num_retries+1):
-            if n > 1:
-                # sleep before retry
-                time.sleep(30)
-            log.Info("Listing %s" % self.straight_url)
-            try:
-                return self._list_filenames_in_bucket()
-            except Exception, e:
-                log.Warn("List %s failed (attempt #%d, reason: %s: %s)"
-                         "" % (self.straight_url,
-                               n,
-                               e.__class__.__name__,
-                               str(e)), 1)
-                log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
-        log.Warn("Giving up trying to list %s after %d attempts" %
-                (self.straight_url, globals.num_retries))
-        raise BackendException("Error listng %s" % self.straight_url)
-
-    def _list_filenames_in_bucket(self):
-        # We add a 'd' to the prefix to make sure it is not null (for boto) and
-        # to optimize the listing of our filenames, which always begin with 'd'.
-        # This will cause a failure in the regression tests as below:
-        #   FAIL: Test basic backend operations
-        #   <tracback snipped>
-        #   AssertionError: Got list: []
-        #   Wanted: ['testfile']
-        # Because of the need for this optimization, it should be left as is.
-        #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'):
-        filename_list = []
-        for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'):
-            try:
-                filename = k.key.replace(self.key_prefix, '', 1)
-                filename_list.append(filename)
-                log.Debug("Listed %s/%s" % (self.straight_url, filename))
-            except AttributeError:
-                pass
-        return filename_list
-
-    def delete(self, filename_list):
-        for filename in filename_list:
-            self.bucket.delete_key(self.key_prefix + filename)
-            log.Debug("Deleted %s/%s" % (self.straight_url, filename))
-
-    @retry
-    def _query_file_info(self, filename, raise_errors=False):
-        try:
-            key = self.bucket.lookup(self.key_prefix + filename)
-            if key is None:
-                return {'size': -1}
-            return {'size': key.size}
-        except Exception, e:
-            log.Warn("Query %s/%s failed: %s"
-                     "" % (self.straight_url,
-                           filename,
-                           str(e)))
-            self.resetConnection()
-            if raise_errors:
-                raise e
-            else:
-                return {'size': None}
+        BotoSingleBackend.__init__(self, parsed_url)
+        self._setup_pool()
+
+    def _setup_pool(self):
+        number_of_procs = globals.s3_multipart_max_procs
+        if not number_of_procs:
+            number_of_procs = multiprocessing.cpu_count()
+
+        if getattr(self, '_pool', False):
+            log.Debug("A process pool already exists. Destroying previous pool.")
+            self._pool.terminate()
+            self._pool.join()
+            self._pool = None
+
+        log.Debug("Setting multipart boto backend process pool to %d processes" % number_of_procs)
+
+        self._pool = multiprocessing.Pool(processes=number_of_procs)
+
+    def close(self):
+        BotoSingleBackend.close(self)
+        log.Debug("Closing pool")
+        self._pool.terminate()
+        self._pool.join()
 
     def upload(self, filename, key, headers=None):
         chunk_size = globals.s3_multipart_chunk_size
@@ -379,7 +124,7 @@
 
         log.Debug("Uploading %d bytes in %d chunks" % (bytes, chunks))
 
-        mp = self.bucket.initiate_multipart_upload(key, headers)
+        mp = self.bucket.initiate_multipart_upload(key.key, headers)
 
         # Initiate a queue to share progress data between the pool
         # workers and a consumer thread, that will collect and report
@@ -389,57 +134,80 @@
             queue = manager.Queue()
             consumer = ConsumerThread(queue)
             consumer.start()
-
-        pool = multiprocessing.Pool(processes=chunks)
+        tasks = []
         for n in range(chunks):
-             params = [self.scheme, self.parsed_url, self.bucket_name, 
-                 mp.id, filename, n, chunk_size, globals.num_retries, 
-                 queue]
-             pool.apply_async(multipart_upload_worker, params)
-        pool.close()
-        pool.join()
+            params = [self.scheme, self.parsed_url, self.storage_uri, self.bucket_name,
+                      mp.id, filename, n, chunk_size, globals.num_retries,
+                      queue]
+            tasks.append(self._pool.apply_async(multipart_upload_worker, params))
+
+        log.Debug("Waiting for the pool to finish processing %s tasks" % len(tasks))
+        while tasks:
+            try:
+                tasks[0].wait(timeout=globals.s3_multipart_max_timeout)
+                if tasks[0].ready():
+                    if tasks[0].successful():
+                        del tasks[0]
+                    else:
+                        log.Debug("Part upload not successful, aborting multipart upload.")
+                        self._setup_pool()
+                        break
+                else:
+                    raise multiprocessing.TimeoutError
+            except multiprocessing.TimeoutError:
+                log.Debug("%s tasks did not finish by the specified timeout, aborting multipart upload and resetting pool." % len(tasks))
+                self._setup_pool()
+                break
+
+        log.Debug("Done waiting for the pool to finish processing")
 
         # Terminate the consumer thread, if any
         if globals.progress:
             consumer.finish = True
             consumer.join()
 
-        if len(mp.get_all_parts()) < chunks:
+        if len(tasks) > 0 or len(mp.get_all_parts()) < chunks:
             mp.cancel_upload()
             raise BackendException("Multipart upload failed. Aborted.")
 
         return mp.complete_upload()
 
 
-def multipart_upload_worker(scheme, parsed_url, bucket_name, multipart_id, filename,
-                            offset, bytes, num_retries, queue):
+def multipart_upload_worker(scheme, parsed_url, storage_uri, bucket_name, multipart_id,
+                            filename, offset, bytes, num_retries, queue):
     """
     Worker method for uploading a file chunk to S3 using multipart upload.
     Note that the file chunk is read into memory, so it's important to keep
     this number reasonably small.
     """
-    import traceback
 
     def _upload_callback(uploaded, total):
         worker_name = multiprocessing.current_process().name
         log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
         if not queue is None:
-            queue.put([uploaded, total]) # Push data to the consumer thread
+            queue.put([uploaded, total])  # Push data to the consumer thread
 
     def _upload(num_retries):
         worker_name = multiprocessing.current_process().name
         log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1))
         try:
-            conn = get_connection(scheme, parsed_url)
+            conn = get_connection(scheme, parsed_url, storage_uri)
             bucket = conn.lookup(bucket_name)
 
-            for mp in bucket.get_all_multipart_uploads():
+            for mp in bucket.list_multipart_uploads():
                 if mp.id == multipart_id:
                     with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
+                        start = time.time()
                         mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback,
-                                                    num_cb=max(2, 8 * bytes / (1024 * 1024))
-                                                ) # Max num of callbacks = 8 times x megabyte
+                                                 num_cb=max(2, 8 * bytes / (1024 * 1024))
+                                                 )  # Max num of callbacks = 8 times x megabyte
+                        end = time.time()
+                        log.Debug("{name}: Uploaded chunk {chunk} at roughly {speed} bytes/second".format(name=worker_name, chunk=offset+1, speed=(bytes/max(1, abs(end-start)))))
                     break
+            conn.close()
+            conn = None
+            bucket = None
+            del conn
         except Exception, e:
             traceback.print_exc()
             if num_retries:
@@ -452,6 +220,3 @@
         log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1))
 
     return _upload(num_retries)
-
-duplicity.backend.register_backend("s3", BotoBackend)
-duplicity.backend.register_backend("s3+http", BotoBackend)

=== modified file 'duplicity/backends/_boto_single.py'
--- duplicity/backends/_boto_single.py	2014-01-13 15:54:13 +0000
+++ duplicity/backends/_boto_single.py	2014-02-21 17:46:27 +0000
@@ -19,6 +19,7 @@
 # along with duplicity; if not, write to the Free Software Foundation,
 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
+import os
 import time
 
 import duplicity.backend
@@ -29,7 +30,90 @@
 from duplicity.backend import retry
 from duplicity import progress
 
-BOTO_MIN_VERSION = "2.0"
+BOTO_MIN_VERSION = "2.1.1"
+
+
+def get_connection(scheme, parsed_url, storage_uri):
+    try:
+        from boto.s3.connection import S3Connection
+        assert hasattr(S3Connection, 'lookup')
+
+        # Newer versions of boto default to using
+        # virtual hosting for buckets as a result of
+        # upstream deprecation of the old-style access
+        # method by Amazon S3. This change is not
+        # backwards compatible (in particular with
+        # respect to upper case characters in bucket
+        # names); so we default to forcing use of the
+        # old-style method unless the user has
+        # explicitly asked us to use new-style bucket
+        # access.
+        #
+        # Note that if the user wants to use new-style
+        # buckets, we use the subdomain calling form
+        # rather than given the option of both
+        # subdomain and vhost. The reason being that
+        # anything addressable as a vhost, is also
+        # addressable as a subdomain. Seeing as the
+        # latter is mostly a convenience method of
+        # allowing browse:able content semi-invisibly
+        # being hosted on S3, the former format makes
+        # a lot more sense for us to use - being
+        # explicit about what is happening (the fact
+        # that we are talking to S3 servers).
+
+        try:
+            from boto.s3.connection import OrdinaryCallingFormat
+            from boto.s3.connection import SubdomainCallingFormat
+            cfs_supported = True
+            calling_format = OrdinaryCallingFormat()
+        except ImportError:
+            cfs_supported = False
+            calling_format = None
+
+        if globals.s3_use_new_style:
+            if cfs_supported:
+                calling_format = SubdomainCallingFormat()
+            else:
+                log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
+                               "requested, but does not seem to be supported by the "
+                               "boto library. Either you need to upgrade your boto "
+                               "library or duplicity has failed to correctly detect "
+                               "the appropriate support.",
+                               log.ErrorCode.boto_old_style)
+        else:
+            if cfs_supported:
+                calling_format = OrdinaryCallingFormat()
+            else:
+                calling_format = None
+
+    except ImportError:
+        log.FatalError("This backend (s3) requires boto library, version %s or later, "
+                       "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
+                       log.ErrorCode.boto_lib_too_old)
+
+    if not parsed_url.hostname:
+        # Use the default host.
+        conn = storage_uri.connect(is_secure=(not globals.s3_unencrypted_connection))
+    else:
+        assert scheme == 's3'
+        conn = storage_uri.connect(host=parsed_url.hostname,
+                                   is_secure=(not globals.s3_unencrypted_connection))
+
+    if hasattr(conn, 'calling_format'):
+        if calling_format is None:
+            log.FatalError("It seems we previously failed to detect support for calling "
+                           "formats in the boto library, yet the support is there. This is "
+                           "almost certainly a duplicity bug.",
+                           log.ErrorCode.boto_calling_format)
+        else:
+            conn.calling_format = calling_format
+
+    else:
+        # Duplicity hangs if boto gets a null bucket name.
+        # HC: Caught a socket error, trying to recover
+        raise BackendException('Boto requires a bucket name.')
+    return conn
 
 
 class BotoBackend(duplicity.backend.Backend):
@@ -78,94 +162,22 @@
                                         parsed_url.path.lstrip('/')))
         self.storage_uri = boto.storage_uri(self.boto_uri_str)
         self.resetConnection()
+        self._listed_keys = {}
+
+    def close(self):
+        del self._listed_keys
+        self._listed_keys = {}
+        self.bucket = None
+        self.conn = None
+        del self.conn
 
     def resetConnection(self):
+        if getattr(self, 'conn', False):
+            self.conn.close()
         self.bucket = None
         self.conn = None
-
-        try:
-            from boto.s3.connection import S3Connection
-            from boto.s3.key import Key
-            assert hasattr(S3Connection, 'lookup')
-
-            # Newer versions of boto default to using
-            # virtual hosting for buckets as a result of
-            # upstream deprecation of the old-style access
-            # method by Amazon S3. This change is not
-            # backwards compatible (in particular with
-            # respect to upper case characters in bucket
-            # names); so we default to forcing use of the
-            # old-style method unless the user has
-            # explicitly asked us to use new-style bucket
-            # access.
-            #
-            # Note that if the user wants to use new-style
-            # buckets, we use the subdomain calling form
-            # rather than given the option of both
-            # subdomain and vhost. The reason being that
-            # anything addressable as a vhost, is also
-            # addressable as a subdomain. Seeing as the
-            # latter is mostly a convenience method of
-            # allowing browse:able content semi-invisibly
-            # being hosted on S3, the former format makes
-            # a lot more sense for us to use - being
-            # explicit about what is happening (the fact
-            # that we are talking to S3 servers).
-
-            try:
-                from boto.s3.connection import OrdinaryCallingFormat
-                from boto.s3.connection import SubdomainCallingFormat
-                cfs_supported = True
-                calling_format = OrdinaryCallingFormat()
-            except ImportError:
-                cfs_supported = False
-                calling_format = None
-
-            if globals.s3_use_new_style:
-                if cfs_supported:
-                    calling_format = SubdomainCallingFormat()
-                else:
-                    log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
-                                   "requested, but does not seem to be supported by the "
-                                   "boto library. Either you need to upgrade your boto "
-                                   "library or duplicity has failed to correctly detect "
-                                   "the appropriate support.",
-                                   log.ErrorCode.boto_old_style)
-            else:
-                if cfs_supported:
-                    calling_format = OrdinaryCallingFormat()
-                else:
-                    calling_format = None
-
-        except ImportError:
-            log.FatalError("This backend (s3) requires boto library, version %s or later, "
-                           "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
-                           log.ErrorCode.boto_lib_too_old)
-
-        if not self.parsed_url.hostname:
-            # Use the default host.
-            self.conn = self.storage_uri.connect(
-                is_secure=(not globals.s3_unencrypted_connection))
-        else:
-            assert self.scheme == 's3'
-            self.conn = self.storage_uri.connect(
-                host=self.parsed_url.hostname,
-                is_secure=(not globals.s3_unencrypted_connection))
-
-        if hasattr(self.conn, 'calling_format'):
-            if calling_format is None:
-                log.FatalError("It seems we previously failed to detect support for calling "
-                               "formats in the boto library, yet the support is there. This is "
-                               "almost certainly a duplicity bug.",
-                               log.ErrorCode.boto_calling_format)
-            else:
-                self.conn.calling_format = calling_format
-
-        else:
-            # Duplicity hangs if boto gets a null bucket name.
-            # HC: Caught a socket error, trying to recover
-            raise BackendException('Boto requires a bucket name.')
-
+        del self.conn
+        self.conn = get_connection(self.scheme, self.parsed_url, self.storage_uri)
         self.bucket = self.conn.lookup(self.bucket_name)
 
     def put(self, source_path, remote_filename=None):
@@ -181,6 +193,7 @@
                 break
             if n > 1:
                 time.sleep(30)
+                self.resetConnection()
             try:
                 try:
                     self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
@@ -198,7 +211,6 @@
                          "" % (n, self.bucket_name,
                                e.__class__.__name__,
                                str(e)))
-                self.resetConnection()
 
         if not remote_filename:
             remote_filename = source_path.get_filename()
@@ -215,14 +227,17 @@
                 storage_class = 'STANDARD'
             log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
             try:
-                key.set_contents_from_filename(source_path.name, {'Content-Type': 'application/octet-stream',
-                                                                  'x-amz-storage-class': storage_class},
-                                                cb=progress.report_transfer,
-                                                num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
-                                              ) # Max num of callbacks = 8 times x megabyte
-
-                key.close()
+                headers = {
+                    'Content-Type': 'application/octet-stream',
+                    'x-amz-storage-class': storage_class
+                }
+                upload_start = time.time()
+                self.upload(source_path.name, key, headers)
+                upload_end = time.time()
+                total_s = abs(upload_end-upload_start) or 1  # prevent a zero value!
+                rough_upload_speed = os.path.getsize(source_path.name)/total_s
                 self.resetConnection()
+                log.Debug("Uploaded %s/%s to %s Storage at roughly %f bytes/second" % (self.straight_url, remote_filename, storage_class, rough_upload_speed))
                 return
             except Exception, e:
                 log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
@@ -238,19 +253,20 @@
         raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))
 
     def get(self, remote_filename, local_path):
+        key_name = self.key_prefix + remote_filename
+        self.pre_process_download(remote_filename, wait=True)
+        key = self._listed_keys[key_name]
         for n in range(1, globals.num_retries+1):
             if n > 1:
                 # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
                 time.sleep(10)
             log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
             try:
-                key_name = self.key_prefix + remote_filename
-                key = self.bucket.get_key(key_name)
+                self.resetConnection()
                 if key is None:
                     raise BackendException("%s: key not found" % key_name)
                 key.get_contents_to_filename(local_path.name)
                 local_path.setdata()
-                self.resetConnection()
                 return
             except Exception, e:
                 log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
@@ -260,7 +276,7 @@
                                e.__class__.__name__,
                                str(e)), 1)
                 log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
-                self.resetConnection()
+
         log.Warn("Giving up trying to download %s/%s after %d attempts" %
                 (self.straight_url, remote_filename, globals.num_retries))
         raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))
@@ -273,6 +289,7 @@
             if n > 1:
                 # sleep before retry
                 time.sleep(30)
+                self.resetConnection()
             log.Info("Listing %s" % self.straight_url)
             try:
                 return self._list_filenames_in_bucket()
@@ -298,10 +315,11 @@
         # Because of the need for this optimization, it should be left as is.
         #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'):
         filename_list = []
-        for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'):
+        for k in self.bucket.list(prefix=self.key_prefix, delimiter='/'):
             try:
                 filename = k.key.replace(self.key_prefix, '', 1)
                 filename_list.append(filename)
+                self._listed_keys[k.key] = k
                 log.Debug("Listed %s/%s" % (self.straight_url, filename))
             except AttributeError:
                 pass
@@ -330,6 +348,53 @@
             else:
                 return {'size': None}
 
-duplicity.backend.register_backend("gs", BotoBackend)
-duplicity.backend.register_backend("s3", BotoBackend)
-duplicity.backend.register_backend("s3+http", BotoBackend)
+    def upload(self, filename, key, headers):
+            key.set_contents_from_filename(filename, headers,
+                                           cb=progress.report_transfer,
+                                           num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
+                                           )  # Max num of callbacks = 8 times x megabyte
+            key.close()
+
+    def pre_process_download(self, files_to_download, wait=False):
+        # Used primarily to move files in Glacier to S3
+        if isinstance(files_to_download, basestring):
+            files_to_download = [files_to_download]
+
+        for remote_filename in files_to_download:
+            success = False
+            for n in range(1, globals.num_retries+1):
+                if n > 1:
+                    # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
+                    time.sleep(10)
+                    self.resetConnection()
+                try:
+                    key_name = self.key_prefix + remote_filename
+                    if not self._listed_keys.get(key_name, False):
+                        self._listed_keys[key_name] = list(self.bucket.list(key_name))[0]
+                    key = self._listed_keys[key_name]
+
+                    if key.storage_class == "GLACIER":
+                        # We need to move the file out of glacier
+                        if not self.bucket.get_key(key.key).ongoing_restore:
+                            log.Info("File %s is in Glacier storage, restoring to S3" % remote_filename)
+                            key.restore(days=1)  # Shouldn't need this again after 1 day
+                        if wait:
+                            log.Info("Waiting for file %s to restore from Glacier" % remote_filename)
+                            while self.bucket.get_key(key.key).ongoing_restore:
+                                time.sleep(60)
+                                self.resetConnection()
+                            log.Info("File %s was successfully restored from Glacier" % remote_filename)
+                    success = True
+                    break
+                except Exception, e:
+                    log.Warn("Restoration from Glacier for file %s/%s failed (attempt #%d, reason: %s: %s)"
+                             "" % (self.straight_url,
+                                   remote_filename,
+                                   n,
+                                   e.__class__.__name__,
+                                   str(e)), 1)
+                    log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
+            if not success:
+                log.Warn("Giving up trying to restore %s/%s after %d attempts" %
+                        (self.straight_url, remote_filename, globals.num_retries))
+                raise BackendException("Error restoring %s/%s from Glacier to S3" % (self.straight_url, remote_filename))

=== modified file 'duplicity/backends/botobackend.py'
--- duplicity/backends/botobackend.py	2012-02-29 16:40:41 +0000
+++ duplicity/backends/botobackend.py	2014-02-21 17:46:27 +0000
@@ -20,13 +20,20 @@
 # along with duplicity; if not, write to the Free Software Foundation,
 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
+import duplicity.backend
 from duplicity import globals
 import sys
+from _boto_multi import BotoBackend as BotoMultiUploadBackend
+from _boto_single import BotoBackend as BotoSingleUploadBackend
 
 if globals.s3_use_multiprocessing:
-    if sys.version_info[:2] < (2,6):
-        print "Sorry, S3 multiprocessing requires version 2.5 or later of python"
+    if sys.version_info[:2] < (2, 6):
+        print "Sorry, S3 multiprocessing requires version 2.6 or later of python"
         sys.exit(1)
-    import _boto_multi
+    duplicity.backend.register_backend("gs", BotoMultiUploadBackend)
+    duplicity.backend.register_backend("s3", BotoMultiUploadBackend)
+    duplicity.backend.register_backend("s3+http", BotoMultiUploadBackend)
 else:
-    import _boto_single
+    duplicity.backend.register_backend("gs", BotoSingleUploadBackend)
+    duplicity.backend.register_backend("s3", BotoSingleUploadBackend)
+    duplicity.backend.register_backend("s3+http", BotoSingleUploadBackend)

=== modified file 'duplicity/commandline.py'
--- duplicity/commandline.py	2014-01-31 12:41:00 +0000
+++ duplicity/commandline.py	2014-02-21 17:46:27 +0000
@@ -495,6 +495,14 @@
     parser.add_option("--s3-multipart-chunk-size", type = "int", action = "callback", metavar = _("number"),
                       callback = lambda o, s, v, p: setattr(p.values, "s3_multipart_chunk_size", v * 1024 * 1024))
 
+    # Number of processes to set the Processor Pool to when uploading multipart
+    # uploads to S3. Use this to control the maximum simultaneous uploads to S3.
+    parser.add_option("--s3-multipart-max-procs", type="int", metavar=_("number"))
+
+    # Number of seconds to wait for each part of a multipart upload to S3. Use this
+    # to prevent hangups when doing a multipart upload to S3.
+    parser.add_option("--s3_multipart_max_timeout", type="int", metavar=_("number"))
+
     # Option to allow the s3/boto backend use the multiprocessing version.
     # By default it is off since it does not work for Python 2.4 or 2.5.
     if sys.version_info[:2] >= (2, 6):

=== modified file 'duplicity/globals.py'
--- duplicity/globals.py	2014-01-31 12:41:00 +0000
+++ duplicity/globals.py	2014-02-21 17:46:27 +0000
@@ -200,6 +200,12 @@
 # Minimum chunk size accepted by S3
 s3_multipart_minimum_chunk_size = 5 * 1024 * 1024
 
+# Maximum number of processes to use while doing a multipart upload to S3
+s3_multipart_max_procs = None
+
+# Maximum time to wait for a part to finish when doing a multipart upload to S3
+s3_multipart_max_timeout = None
+
 # Whether to use the full email address as the user name when
 # logging into an imap server. If false just the user name
 # part of the email address is used.