[Merge] lp:~prateek/duplicity/s3-glacier into lp:duplicity
someone1 has proposed merging lp:~prateek/duplicity/s3-glacier into lp:duplicity.
Commit message:
Fixes https://bugs.launchpad.net/duplicity/+bug/1039511 - Adds support for detecting when a file is stored on Glacier and initiating a restore to S3. Also merges overlapping code in the boto backends.
Fixes https://bugs.launchpad.net/duplicity/+bug/1243246 - Adds an --s3_multipart_max_timeout option to limit the maximum time to wait for each part of a chunked upload to S3. Also adds a debug message that reports the rough upload speed.
Requested reviews:
duplicity-team (duplicity-team)
Related bugs:
Bug #1039511 in Duplicity: "Support for Amazon Glacier"
https://bugs.launchpad.net/duplicity/+bug/1039511
Bug #1243246 in Duplicity: "Duplicity Hangs intermittently when using multiprocess uploads to S3"
https://bugs.launchpad.net/duplicity/+bug/1243246
For more details, see:
https://code.launchpad.net/~prateek/duplicity/s3-glacier/+merge/207719
How this addresses bug 1039511:
If a file located in S3 is found to be on Glacier, duplicity initiates a restore to S3 and waits until the file is available before continuing the restore process.
This branch also merges _boto_single and _boto_multi, since the majority of their code overlaps. Making _boto_multi a subclass of _boto_single means changes to shared code only need to be made in one place, which keeps future updates simpler.
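Below is a minimal sketch of that Glacier handling, assuming a boto (2.7.0 or later) Bucket object. The function name and polling interval are illustrative rather than the exact duplicity API; the real logic lives in pre_process_download() in the _boto_single.py hunk further down.

import time

def restore_key_from_glacier(bucket, key_name, poll_interval=60):
    key = bucket.get_key(key_name)
    if key.storage_class == "GLACIER":
        # Start a restore to S3 unless one is already in progress.
        if not key.ongoing_restore:
            key.restore(days=1)  # the restored copy only needs to live briefly
        # Poll until S3 reports that the restore has completed.
        while bucket.get_key(key_name).ongoing_restore:
            time.sleep(poll_interval)
    return bucket.get_key(key_name)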
--
https://code.launchpad.net/~prateek/duplicity/s3-glacier/+merge/207719
Your team duplicity-team is requested to review the proposed merge of lp:~prateek/duplicity/s3-glacier into lp:duplicity.
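The --s3_multipart_max_timeout option mentioned in the commit message works by waiting on each part-upload task with a timeout and aborting the multipart upload if a part stalls or fails. A rough sketch of that wait loop, with upload_part and part_args standing in for the real worker function and its per-chunk argument tuples (both hypothetical names):

import multiprocessing

def upload_parts(upload_part, part_args, max_timeout, procs=None):
    # One worker per CPU unless told otherwise (mirrors --s3-multipart-max-procs).
    pool = multiprocessing.Pool(processes=procs or multiprocessing.cpu_count())
    tasks = [pool.apply_async(upload_part, args) for args in part_args]
    while tasks:
        # wait() returns when the task finishes or the timeout expires.
        tasks[0].wait(timeout=max_timeout)
        if tasks[0].ready() and tasks[0].successful():
            del tasks[0]  # this part uploaded cleanly; move on to the next
        else:
            # Either the part failed or it exceeded max_timeout: tear the
            # pool down so no worker is left hanging, then give up.
            pool.terminate()
            pool.join()
            raise RuntimeError("part upload failed or timed out; aborting multipart upload")
    pool.close()
    pool.join()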
=== modified file 'README'
--- README 2014-01-24 12:39:40 +0000
+++ README 2014-02-21 17:46:27 +0000
@@ -29,6 +29,7 @@
* Boto 2.0 or later for single-processing S3 or GCS access (default)
* Boto 2.1.1 or later for multi-processing S3 access
* Python v2.6 or later for multi-processing S3 access
+ * Boto 2.7.0 or later for Glacier S3 access
If you install from the source package, you will also need:
=== modified file 'bin/duplicity'
--- bin/duplicity 2014-02-05 02:57:01 +0000
+++ bin/duplicity 2014-02-21 17:46:27 +0000
@@ -735,6 +735,15 @@
log.Progress(_('Processed volume %d of %d') % (cur_vol[0], num_vols),
cur_vol[0], num_vols)
+ if hasattr(globals.backend, 'pre_process_download'):
+ file_names = []
+ for backup_set in backup_setlist:
+ manifest = backup_set.get_manifest()
+ volumes = manifest.get_containing_volumes(index)
+ for vol_num in volumes:
+ file_names.append(backup_set.volume_name_dict[vol_num])
+ globals.backend.pre_process_download(file_names)
+
fileobj_iters = map(get_fileobj_iter, backup_setlist)
tarfiles = map(patchdir.TarFile_FromFileobjs, fileobj_iters)
return patchdir.tarfiles2rop_iter(tarfiles, index)
@@ -1142,6 +1151,8 @@
local_missing = [] # don't download if we can't decrypt
for fn in local_spurious:
remove_local(fn)
+ if hasattr(globals.backend, 'pre_process_download'):
+ globals.backend.pre_process_download(local_missing)
for fn in local_missing:
copy_to_local(fn)
else:
=== modified file 'duplicity/backends/_boto_multi.py'
--- duplicity/backends/_boto_multi.py 2014-01-13 15:54:13 +0000
+++ duplicity/backends/_boto_multi.py 2014-02-21 17:46:27 +0000
@@ -22,20 +22,20 @@
import os
import sys
-import time
import threading
import Queue
-
-import duplicity.backend
+import time
+import traceback
from duplicity import globals
from duplicity import log
from duplicity.errors import * #@UnusedWildImport
-from duplicity.util import exception_traceback
-from duplicity.backend import retry
from duplicity.filechunkio import FileChunkIO
from duplicity import progress
+from _boto_single import BotoBackend as BotoSingleBackend
+from _boto_single import get_connection
+
BOTO_MIN_VERSION = "2.1.1"
# Multiprocessing is not supported on *BSD
@@ -61,100 +61,13 @@
def run(self):
while not self.finish:
try:
- args = self.queue.get(True, 1)
+ args = self.queue.get(True, 1)
progress.report_transfer(args[0], args[1])
except Queue.Empty, e:
pass
-
-
-def get_connection(scheme, parsed_url):
- try:
- import boto
- assert boto.Version >= BOTO_MIN_VERSION
-
- from boto.s3.connection import S3Connection
- assert hasattr(S3Connection, 'lookup')
-
- # Newer versions of boto default to using
- # virtual hosting for buckets as a result of
- # upstream deprecation of the old-style access
- # method by Amazon S3. This change is not
- # backwards compatible (in particular with
- # respect to upper case characters in bucket
- # names); so we default to forcing use of the
- # old-style method unless the user has
- # explicitly asked us to use new-style bucket
- # access.
- #
- # Note that if the user wants to use new-style
- # buckets, we use the subdomain calling form
- # rather than given the option of both
- # subdomain and vhost. The reason being that
- # anything addressable as a vhost, is also
- # addressable as a subdomain. Seeing as the
- # latter is mostly a convenience method of
- # allowing browse:able content semi-invisibly
- # being hosted on S3, the former format makes
- # a lot more sense for us to use - being
- # explicit about what is happening (the fact
- # that we are talking to S3 servers).
-
- try:
- from boto.s3.connection import OrdinaryCallingFormat
- from boto.s3.connection import SubdomainCallingFormat
- cfs_supported = True
- calling_format = OrdinaryCallingFormat()
- except ImportError:
- cfs_supported = False
- calling_format = None
-
- if globals.s3_use_new_style:
- if cfs_supported:
- calling_format = SubdomainCallingFormat()
- else:
- log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
- "requested, but does not seem to be supported by the "
- "boto library. Either you need to upgrade your boto "
- "library or duplicity has failed to correctly detect "
- "the appropriate support.",
- log.ErrorCode.boto_old_style)
- else:
- if cfs_supported:
- calling_format = OrdinaryCallingFormat()
- else:
- calling_format = None
-
- except ImportError:
- log.FatalError("This backend (s3) requires boto library, version %s or later, "
- "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
- log.ErrorCode.boto_lib_too_old)
-
- if scheme == 's3+http':
- # Use the default Amazon S3 host.
- conn = S3Connection(is_secure=(not globals.s3_unencrypted_connection))
- else:
- assert scheme == 's3'
- conn = S3Connection(
- host = parsed_url.hostname,
- is_secure=(not globals.s3_unencrypted_connection))
-
- if hasattr(conn, 'calling_format'):
- if calling_format is None:
- log.FatalError("It seems we previously failed to detect support for calling "
- "formats in the boto library, yet the support is there. This is "
- "almost certainly a duplicity bug.",
- log.ErrorCode.boto_calling_format)
- else:
- conn.calling_format = calling_format
-
- else:
- # Duplicity hangs if boto gets a null bucket name.
- # HC: Caught a socket error, trying to recover
- raise BackendException('Boto requires a bucket name.')
- return conn
-
-
-class BotoBackend(duplicity.backend.Backend):
+
+
+class BotoBackend(BotoSingleBackend):
"""
Backend for Amazon's Simple Storage System, (aka Amazon S3), though
the use of the boto module, (http://code.google.com/p/boto/).
@@ -167,197 +80,29 @@
"""
def __init__(self, parsed_url):
- duplicity.backend.Backend.__init__(self, parsed_url)
-
- from boto.s3.key import Key
- from boto.s3.multipart import MultiPartUpload
-
- # This folds the null prefix and all null parts, which means that:
- # //MyBucket/ and //MyBucket are equivalent.
- # //MyBucket//My///My/Prefix/ and //MyBucket/My/Prefix are equivalent.
- self.url_parts = filter(lambda x: x != '', parsed_url.path.split('/'))
-
- if self.url_parts:
- self.bucket_name = self.url_parts.pop(0)
- else:
- # Duplicity hangs if boto gets a null bucket name.
- # HC: Caught a socket error, trying to recover
- raise BackendException('Boto requires a bucket name.')
-
- self.scheme = parsed_url.scheme
-
- self.key_class = Key
-
- if self.url_parts:
- self.key_prefix = '%s/' % '/'.join(self.url_parts)
- else:
- self.key_prefix = ''
-
- self.straight_url = duplicity.backend.strip_auth_from_url(parsed_url)
- self.parsed_url = parsed_url
- self.resetConnection()
-
- def resetConnection(self):
- self.bucket = None
- self.conn = get_connection(self.scheme, self.parsed_url)
- self.bucket = self.conn.lookup(self.bucket_name)
-
- def put(self, source_path, remote_filename=None):
- from boto.s3.connection import Location
- if globals.s3_european_buckets:
- if not globals.s3_use_new_style:
- log.FatalError("European bucket creation was requested, but not new-style "
- "bucket addressing (--s3-use-new-style)",
- log.ErrorCode.s3_bucket_not_style)
- #Network glitch may prevent first few attempts of creating/looking up a bucket
- for n in range(1, globals.num_retries+1):
- if self.bucket:
- break
- if n > 1:
- time.sleep(30)
- try:
- try:
- self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
- except Exception, e:
- if "NoSuchBucket" in str(e):
- if globals.s3_european_buckets:
- self.bucket = self.conn.create_bucket(self.bucket_name,
- location=Location.EU)
- else:
- self.bucket = self.conn.create_bucket(self.bucket_name)
- else:
- raise e
- except Exception, e:
- log.Warn("Failed to create bucket (attempt #%d) '%s' failed (reason: %s: %s)"
- "" % (n, self.bucket_name,
- e.__class__.__name__,
- str(e)))
- self.resetConnection()
-
- if not remote_filename:
- remote_filename = source_path.get_filename()
- key = self.key_prefix + remote_filename
- for n in range(1, globals.num_retries+1):
- if n > 1:
- # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
- time.sleep(10)
-
- if globals.s3_use_rrs:
- storage_class = 'REDUCED_REDUNDANCY'
- else:
- storage_class = 'STANDARD'
- log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
- try:
- headers = {
- 'Content-Type': 'application/octet-stream',
- 'x-amz-storage-class': storage_class
- }
- self.upload(source_path.name, key, headers)
- self.resetConnection()
- return
- except Exception, e:
- log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
- "" % (self.straight_url,
- remote_filename,
- n,
- e.__class__.__name__,
- str(e)))
- log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
- self.resetConnection()
- log.Warn("Giving up trying to upload %s/%s after %d attempts" %
- (self.straight_url, remote_filename, globals.num_retries))
- raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))
-
- def get(self, remote_filename, local_path):
- key = self.key_class(self.bucket)
- key.key = self.key_prefix + remote_filename
- for n in range(1, globals.num_retries+1):
- if n > 1:
- # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
- time.sleep(10)
- log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
- try:
- key.get_contents_to_filename(local_path.name)
- local_path.setdata()
- self.resetConnection()
- return
- except Exception, e:
- log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
- "" % (self.straight_url,
- remote_filename,
- n,
- e.__class__.__name__,
- str(e)), 1)
- log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
- self.resetConnection()
- log.Warn("Giving up trying to download %s/%s after %d attempts" %
- (self.straight_url, remote_filename, globals.num_retries))
- raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))
-
- def _list(self):
- if not self.bucket:
- raise BackendException("No connection to backend")
-
- for n in range(1, globals.num_retries+1):
- if n > 1:
- # sleep before retry
- time.sleep(30)
- log.Info("Listing %s" % self.straight_url)
- try:
- return self._list_filenames_in_bucket()
- except Exception, e:
- log.Warn("List %s failed (attempt #%d, reason: %s: %s)"
- "" % (self.straight_url,
- n,
- e.__class__.__name__,
- str(e)), 1)
- log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
- log.Warn("Giving up trying to list %s after %d attempts" %
- (self.straight_url, globals.num_retries))
- raise BackendException("Error listng %s" % self.straight_url)
-
- def _list_filenames_in_bucket(self):
- # We add a 'd' to the prefix to make sure it is not null (for boto) and
- # to optimize the listing of our filenames, which always begin with 'd'.
- # This will cause a failure in the regression tests as below:
- # FAIL: Test basic backend operations
- # <tracback snipped>
- # AssertionError: Got list: []
- # Wanted: ['testfile']
- # Because of the need for this optimization, it should be left as is.
- #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'):
- filename_list = []
- for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'):
- try:
- filename = k.key.replace(self.key_prefix, '', 1)
- filename_list.append(filename)
- log.Debug("Listed %s/%s" % (self.straight_url, filename))
- except AttributeError:
- pass
- return filename_list
-
- def delete(self, filename_list):
- for filename in filename_list:
- self.bucket.delete_key(self.key_prefix + filename)
- log.Debug("Deleted %s/%s" % (self.straight_url, filename))
-
- @retry
- def _query_file_info(self, filename, raise_errors=False):
- try:
- key = self.bucket.lookup(self.key_prefix + filename)
- if key is None:
- return {'size': -1}
- return {'size': key.size}
- except Exception, e:
- log.Warn("Query %s/%s failed: %s"
- "" % (self.straight_url,
- filename,
- str(e)))
- self.resetConnection()
- if raise_errors:
- raise e
- else:
- return {'size': None}
+ BotoSingleBackend.__init__(self, parsed_url)
+ self._setup_pool()
+
+ def _setup_pool(self):
+ number_of_procs = globals.s3_multipart_max_procs
+ if not number_of_procs:
+ number_of_procs = multiprocessing.cpu_count()
+
+ if getattr(self, '_pool', False):
+ log.Debug("A process pool already exists. Destroying previous pool.")
+ self._pool.terminate()
+ self._pool.join()
+ self._pool = None
+
+ log.Debug("Setting multipart boto backend process pool to %d processes" % number_of_procs)
+
+ self._pool = multiprocessing.Pool(processes=number_of_procs)
+
+ def close(self):
+ BotoSingleBackend.close(self)
+ log.Debug("Closing pool")
+ self._pool.terminate()
+ self._pool.join()
def upload(self, filename, key, headers=None):
chunk_size = globals.s3_multipart_chunk_size
@@ -379,7 +124,7 @@
log.Debug("Uploading %d bytes in %d chunks" % (bytes, chunks))
- mp = self.bucket.initiate_multipart_upload(key, headers)
+ mp = self.bucket.initiate_multipart_upload(key.key, headers)
# Initiate a queue to share progress data between the pool
# workers and a consumer thread, that will collect and report
@@ -389,57 +134,80 @@
queue = manager.Queue()
consumer = ConsumerThread(queue)
consumer.start()
-
- pool = multiprocessing.Pool(processes=chunks)
+ tasks = []
for n in range(chunks):
- params = [self.scheme, self.parsed_url, self.bucket_name,
- mp.id, filename, n, chunk_size, globals.num_retries,
- queue]
- pool.apply_async(multipart_upload_worker, params)
- pool.close()
- pool.join()
+ params = [self.scheme, self.parsed_url, self.storage_uri, self.bucket_name,
+ mp.id, filename, n, chunk_size, globals.num_retries,
+ queue]
+ tasks.append(self._pool.apply_async(multipart_upload_worker, params))
+
+ log.Debug("Waiting for the pool to finish processing %s tasks" % len(tasks))
+ while tasks:
+ try:
+ tasks[0].wait(timeout=globals.s3_multipart_max_timeout)
+ if tasks[0].ready():
+ if tasks[0].successful():
+ del tasks[0]
+ else:
+ log.Debug("Part upload not successful, aborting multipart upload.")
+ self._setup_pool()
+ break
+ else:
+ raise multiprocessing.TimeoutError
+ except multiprocessing.TimeoutError:
+ log.Debug("%s tasks did not finish by the specified timeout, aborting multipart upload and resetting pool." % len(tasks))
+ self._setup_pool()
+ break
+
+ log.Debug("Done waiting for the pool to finish processing")
# Terminate the consumer thread, if any
if globals.progress:
consumer.finish = True
consumer.join()
- if len(mp.get_all_parts()) < chunks:
+ if len(tasks) > 0 or len(mp.get_all_parts()) < chunks:
mp.cancel_upload()
raise BackendException("Multipart upload failed. Aborted.")
return mp.complete_upload()
-def multipart_upload_worker(scheme, parsed_url, bucket_name, multipart_id, filename,
- offset, bytes, num_retries, queue):
+def multipart_upload_worker(scheme, parsed_url, storage_uri, bucket_name, multipart_id,
+ filename, offset, bytes, num_retries, queue):
"""
Worker method for uploading a file chunk to S3 using multipart upload.
Note that the file chunk is read into memory, so it's important to keep
this number reasonably small.
"""
- import traceback
def _upload_callback(uploaded, total):
worker_name = multiprocessing.current_process().name
log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
if not queue is None:
- queue.put([uploaded, total]) # Push data to the consumer thread
+ queue.put([uploaded, total]) # Push data to the consumer thread
def _upload(num_retries):
worker_name = multiprocessing.current_process().name
log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1))
try:
- conn = get_connection(scheme, parsed_url)
+ conn = get_connection(scheme, parsed_url, storage_uri)
bucket = conn.lookup(bucket_name)
- for mp in bucket.get_all_multipart_uploads():
+ for mp in bucket.list_multipart_uploads():
if mp.id == multipart_id:
with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
+ start = time.time()
mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback,
- num_cb=max(2, 8 * bytes / (1024 * 1024))
- ) # Max num of callbacks = 8 times x megabyte
+ num_cb=max(2, 8 * bytes / (1024 * 1024))
+ ) # Max num of callbacks = 8 times x megabyte
+ end = time.time()
+ log.Debug("{name}: Uploaded chunk {chunk} at roughly {speed} bytes/second".format(name=worker_name, chunk=offset+1, speed=(bytes/max(1, abs(end-start)))))
break
+ conn.close()
+ conn = None
+ bucket = None
+ del conn
except Exception, e:
traceback.print_exc()
if num_retries:
@@ -452,6 +220,3 @@
log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1))
return _upload(num_retries)
-
-duplicity.backend.register_backend("s3", BotoBackend)
-duplicity.backend.register_backend("s3+http", BotoBackend)
=== modified file 'duplicity/backends/_boto_single.py'
--- duplicity/backends/_boto_single.py 2014-01-13 15:54:13 +0000
+++ duplicity/backends/_boto_single.py 2014-02-21 17:46:27 +0000
@@ -19,6 +19,7 @@
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+import os
import time
import duplicity.backend
@@ -29,7 +30,90 @@
from duplicity.backend import retry
from duplicity import progress
-BOTO_MIN_VERSION = "2.0"
+BOTO_MIN_VERSION = "2.1.1"
+
+
+def get_connection(scheme, parsed_url, storage_uri):
+ try:
+ from boto.s3.connection import S3Connection
+ assert hasattr(S3Connection, 'lookup')
+
+ # Newer versions of boto default to using
+ # virtual hosting for buckets as a result of
+ # upstream deprecation of the old-style access
+ # method by Amazon S3. This change is not
+ # backwards compatible (in particular with
+ # respect to upper case characters in bucket
+ # names); so we default to forcing use of the
+ # old-style method unless the user has
+ # explicitly asked us to use new-style bucket
+ # access.
+ #
+ # Note that if the user wants to use new-style
+ # buckets, we use the subdomain calling form
+ # rather than given the option of both
+ # subdomain and vhost. The reason being that
+ # anything addressable as a vhost, is also
+ # addressable as a subdomain. Seeing as the
+ # latter is mostly a convenience method of
+ # allowing browse:able content semi-invisibly
+ # being hosted on S3, the former format makes
+ # a lot more sense for us to use - being
+ # explicit about what is happening (the fact
+ # that we are talking to S3 servers).
+
+ try:
+ from boto.s3.connection import OrdinaryCallingFormat
+ from boto.s3.connection import SubdomainCallingFormat
+ cfs_supported = True
+ calling_format = OrdinaryCallingFormat()
+ except ImportError:
+ cfs_supported = False
+ calling_format = None
+
+ if globals.s3_use_new_style:
+ if cfs_supported:
+ calling_format = SubdomainCallingFormat()
+ else:
+ log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
+ "requested, but does not seem to be supported by the "
+ "boto library. Either you need to upgrade your boto "
+ "library or duplicity has failed to correctly detect "
+ "the appropriate support.",
+ log.ErrorCode.boto_old_style)
+ else:
+ if cfs_supported:
+ calling_format = OrdinaryCallingFormat()
+ else:
+ calling_format = None
+
+ except ImportError:
+ log.FatalError("This backend (s3) requires boto library, version %s or later, "
+ "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
+ log.ErrorCode.boto_lib_too_old)
+
+ if not parsed_url.hostname:
+ # Use the default host.
+ conn = storage_uri.connect(is_secure=(not globals.s3_unencrypted_connection))
+ else:
+ assert scheme == 's3'
+ conn = storage_uri.connect(host=parsed_url.hostname,
+ is_secure=(not globals.s3_unencrypted_connection))
+
+ if hasattr(conn, 'calling_format'):
+ if calling_format is None:
+ log.FatalError("It seems we previously failed to detect support for calling "
+ "formats in the boto library, yet the support is there. This is "
+ "almost certainly a duplicity bug.",
+ log.ErrorCode.boto_calling_format)
+ else:
+ conn.calling_format = calling_format
+
+ else:
+ # Duplicity hangs if boto gets a null bucket name.
+ # HC: Caught a socket error, trying to recover
+ raise BackendException('Boto requires a bucket name.')
+ return conn
class BotoBackend(duplicity.backend.Backend):
@@ -78,94 +162,22 @@
parsed_url.path.lstrip('/')))
self.storage_uri = boto.storage_uri(self.boto_uri_str)
self.resetConnection()
+ self._listed_keys = {}
+
+ def close(self):
+ del self._listed_keys
+ self._listed_keys = {}
+ self.bucket = None
+ self.conn = None
+ del self.conn
def resetConnection(self):
+ if getattr(self, 'conn', False):
+ self.conn.close()
self.bucket = None
self.conn = None
-
- try:
- from boto.s3.connection import S3Connection
- from boto.s3.key import Key
- assert hasattr(S3Connection, 'lookup')
-
- # Newer versions of boto default to using
- # virtual hosting for buckets as a result of
- # upstream deprecation of the old-style access
- # method by Amazon S3. This change is not
- # backwards compatible (in particular with
- # respect to upper case characters in bucket
- # names); so we default to forcing use of the
- # old-style method unless the user has
- # explicitly asked us to use new-style bucket
- # access.
- #
- # Note that if the user wants to use new-style
- # buckets, we use the subdomain calling form
- # rather than given the option of both
- # subdomain and vhost. The reason being that
- # anything addressable as a vhost, is also
- # addressable as a subdomain. Seeing as the
- # latter is mostly a convenience method of
- # allowing browse:able content semi-invisibly
- # being hosted on S3, the former format makes
- # a lot more sense for us to use - being
- # explicit about what is happening (the fact
- # that we are talking to S3 servers).
-
- try:
- from boto.s3.connection import OrdinaryCallingFormat
- from boto.s3.connection import SubdomainCallingFormat
- cfs_supported = True
- calling_format = OrdinaryCallingFormat()
- except ImportError:
- cfs_supported = False
- calling_format = None
-
- if globals.s3_use_new_style:
- if cfs_supported:
- calling_format = SubdomainCallingFormat()
- else:
- log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
- "requested, but does not seem to be supported by the "
- "boto library. Either you need to upgrade your boto "
- "library or duplicity has failed to correctly detect "
- "the appropriate support.",
- log.ErrorCode.boto_old_style)
- else:
- if cfs_supported:
- calling_format = OrdinaryCallingFormat()
- else:
- calling_format = None
-
- except ImportError:
- log.FatalError("This backend (s3) requires boto library, version %s or later, "
- "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
- log.ErrorCode.boto_lib_too_old)
-
- if not self.parsed_url.hostname:
- # Use the default host.
- self.conn = self.storage_uri.connect(
- is_secure=(not globals.s3_unencrypted_connection))
- else:
- assert self.scheme == 's3'
- self.conn = self.storage_uri.connect(
- host=self.parsed_url.hostname,
- is_secure=(not globals.s3_unencrypted_connection))
-
- if hasattr(self.conn, 'calling_format'):
- if calling_format is None:
- log.FatalError("It seems we previously failed to detect support for calling "
- "formats in the boto library, yet the support is there. This is "
- "almost certainly a duplicity bug.",
- log.ErrorCode.boto_calling_format)
- else:
- self.conn.calling_format = calling_format
-
- else:
- # Duplicity hangs if boto gets a null bucket name.
- # HC: Caught a socket error, trying to recover
- raise BackendException('Boto requires a bucket name.')
-
+ del self.conn
+ self.conn = get_connection(self.scheme, self.parsed_url, self.storage_uri)
self.bucket = self.conn.lookup(self.bucket_name)
def put(self, source_path, remote_filename=None):
@@ -181,6 +193,7 @@
break
if n > 1:
time.sleep(30)
+ self.resetConnection()
try:
try:
self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
@@ -198,7 +211,6 @@
"" % (n, self.bucket_name,
e.__class__.__name__,
str(e)))
- self.resetConnection()
if not remote_filename:
remote_filename = source_path.get_filename()
@@ -215,14 +227,17 @@
storage_class = 'STANDARD'
log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
try:
- key.set_contents_from_filename(source_path.name, {'Content-Type': 'application/octet-stream',
- 'x-amz-storage-class': storage_class},
- cb=progress.report_transfer,
- num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
- ) # Max num of callbacks = 8 times x megabyte
-
- key.close()
+ headers = {
+ 'Content-Type': 'application/octet-stream',
+ 'x-amz-storage-class': storage_class
+ }
+ upload_start = time.time()
+ self.upload(source_path.name, key, headers)
+ upload_end = time.time()
+ total_s = abs(upload_end-upload_start) or 1 # prevent a zero value!
+ rough_upload_speed = os.path.getsize(source_path.name)/total_s
self.resetConnection()
+ log.Debug("Uploaded %s/%s to %s Storage at roughly %f bytes/second" % (self.straight_url, remote_filename, storage_class, rough_upload_speed))
return
except Exception, e:
log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
@@ -238,19 +253,20 @@
raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))
def get(self, remote_filename, local_path):
+ key_name = self.key_prefix + remote_filename
+ self.pre_process_download(remote_filename, wait=True)
+ key = self._listed_keys[key_name]
for n in range(1, globals.num_retries+1):
if n > 1:
# sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
time.sleep(10)
log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
try:
- key_name = self.key_prefix + remote_filename
- key = self.bucket.get_key(key_name)
+ self.resetConnection()
if key is None:
raise BackendException("%s: key not found" % key_name)
key.get_contents_to_filename(local_path.name)
local_path.setdata()
- self.resetConnection()
return
except Exception, e:
log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
@@ -260,7 +276,7 @@
e.__class__.__name__,
str(e)), 1)
log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
- self.resetConnection()
+
log.Warn("Giving up trying to download %s/%s after %d attempts" %
(self.straight_url, remote_filename, globals.num_retries))
raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))
@@ -273,6 +289,7 @@
if n > 1:
# sleep before retry
time.sleep(30)
+ self.resetConnection()
log.Info("Listing %s" % self.straight_url)
try:
return self._list_filenames_in_bucket()
@@ -298,10 +315,11 @@
# Because of the need for this optimization, it should be left as is.
#for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'):
filename_list = []
- for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'):
+ for k in self.bucket.list(prefix=self.key_prefix, delimiter='/'):
try:
filename = k.key.replace(self.key_prefix, '', 1)
filename_list.append(filename)
+ self._listed_keys[k.key] = k
log.Debug("Listed %s/%s" % (self.straight_url, filename))
except AttributeError:
pass
@@ -330,6 +348,53 @@
else:
return {'size': None}
-duplicity.backend.register_backend("gs", BotoBackend)
-duplicity.backend.register_backend("s3", BotoBackend)
-duplicity.backend.register_backend("s3+http", BotoBackend)
+ def upload(self, filename, key, headers):
+ key.set_contents_from_filename(filename, headers,
+ cb=progress.report_transfer,
+ num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
+ ) # Max num of callbacks = 8 times x megabyte
+ key.close()
+
+ def pre_process_download(self, files_to_download, wait=False):
+ # Used primarily to move files in Glacier to S3
+ if isinstance(files_to_download, basestring):
+ files_to_download = [files_to_download]
+
+ for remote_filename in files_to_download:
+ success = False
+ for n in range(1, globals.num_retries+1):
+ if n > 1:
+ # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
+ time.sleep(10)
+ self.resetConnection()
+ try:
+ key_name = self.key_prefix + remote_filename
+ if not self._listed_keys.get(key_name, False):
+ self._listed_keys[key_name] = list(self.bucket.list(key_name))[0]
+ key = self._listed_keys[key_name]
+
+ if key.storage_class == "GLACIER":
+ # We need to move the file out of glacier
+ if not self.bucket.get_key(key.key).ongoing_restore:
+ log.Info("File %s is in Glacier storage, restoring to S3" % remote_filename)
+ key.restore(days=1) # Shouldn't need this again after 1 day
+ if wait:
+ log.Info("Waiting for file %s to restore from Glacier" % remote_filename)
+ while self.bucket.get_key(key.key).ongoing_restore:
+ time.sleep(60)
+ self.resetConnection()
+ log.Info("File %s was successfully restored from Glacier" % remote_filename)
+ success = True
+ break
+ except Exception, e:
+ log.Warn("Restoration from Glacier for file %s/%s failed (attempt #%d, reason: %s: %s)"
+ "" % (self.straight_url,
+ remote_filename,
+ n,
+ e.__class__.__name__,
+ str(e)), 1)
+ log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
+ if not success:
+ log.Warn("Giving up trying to restore %s/%s after %d attempts" %
+ (self.straight_url, remote_filename, globals.num_retries))
+ raise BackendException("Error restoring %s/%s from Glacier to S3" % (self.straight_url, remote_filename))
=== modified file 'duplicity/backends/botobackend.py'
--- duplicity/backends/botobackend.py 2012-02-29 16:40:41 +0000
+++ duplicity/backends/botobackend.py 2014-02-21 17:46:27 +0000
@@ -20,13 +20,20 @@
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+import duplicity.backend
from duplicity import globals
import sys
+from _boto_multi import BotoBackend as BotoMultiUploadBackend
+from _boto_single import BotoBackend as BotoSingleUploadBackend
if globals.s3_use_multiprocessing:
- if sys.version_info[:2] < (2,6):
- print "Sorry, S3 multiprocessing requires version 2.5 or later of python"
+ if sys.version_info[:2] < (2, 6):
+ print "Sorry, S3 multiprocessing requires version 2.6 or later of python"
sys.exit(1)
- import _boto_multi
+ duplicity.backend.register_backend("gs", BotoMultiUploadBackend)
+ duplicity.backend.register_backend("s3", BotoMultiUploadBackend)
+ duplicity.backend.register_backend("s3+http", BotoMultiUploadBackend)
else:
- import _boto_single
+ duplicity.backend.register_backend("gs", BotoSingleUploadBackend)
+ duplicity.backend.register_backend("s3", BotoSingleUploadBackend)
+ duplicity.backend.register_backend("s3+http", BotoSingleUploadBackend)
=== modified file 'duplicity/commandline.py'
--- duplicity/commandline.py 2014-01-31 12:41:00 +0000
+++ duplicity/commandline.py 2014-02-21 17:46:27 +0000
@@ -495,6 +495,14 @@
parser.add_option("--s3-multipart-chunk-size", type = "int", action = "callback", metavar = _("number"),
callback = lambda o, s, v, p: setattr(p.values, "s3_multipart_chunk_size", v * 1024 * 1024))
+ # Number of processes to set the Processor Pool to when uploading multipart
+ # uploads to S3. Use this to control the maximum simultaneous uploads to S3.
+ parser.add_option("--s3-multipart-max-procs", type="int", metavar=_("number"))
+
+ # Number of seconds to wait for each part of a multipart upload to S3. Use this
+ # to prevent hangups when doing a multipart upload to S3.
+ parser.add_option("--s3_multipart_max_timeout", type="int", metavar=_("number"))
+
# Option to allow the s3/boto backend use the multiprocessing version.
# By default it is off since it does not work for Python 2.4 or 2.5.
if sys.version_info[:2] >= (2, 6):
=== modified file 'duplicity/globals.py'
--- duplicity/globals.py 2014-01-31 12:41:00 +0000
+++ duplicity/globals.py 2014-02-21 17:46:27 +0000
@@ -200,6 +200,12 @@
# Minimum chunk size accepted by S3
s3_multipart_minimum_chunk_size = 5 * 1024 * 1024
+# Maximum number of processes to use while doing a multipart upload to S3
+s3_multipart_max_procs = None
+
+# Maximum time to wait for a part to finish when doing a multipart upload to S3
+s3_multipart_max_timeout = None
+
# Whether to use the full email address as the user name when
# logging into an imap server. If false just the user name
# part of the email address is used.