
[Merge] lp:~juan-f/duplicity/progress into lp:duplicity

 

Juan A. Moya Vicén has proposed merging lp:~juan-f/duplicity/progress into lp:duplicity.

Requested reviews:
  duplicity-team (duplicity-team): new feature

For more details, see:
https://code.launchpad.net/~juan-f/duplicity/progress/+merge/144944

For some time now, people have been asking for a progress bar in duplicity.
There is even a script that works around the issue by parsing the log to estimate the progress status
( https://github.com/quentin/Duplicity-progress ), but it does not give enough feedback and the estimation is rather crude.

I have developed a set of heuristics that gather information from the deltas and the transfer rates of the backend to forecast the percentage of progress, the estimated remaining time and the average speed, for both full and incremental backup uploads.

The current implementation works for the boto backend, but porting the other backends to use this feature would be quite easy (we can discuss the details if interested).
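
To give an idea of the porting effort: a backend only needs to report its
written bytes through progress.report_transfer(bytecount, totalbytes) while
uploading, which is what the boto backend does via its upload callback. A
minimal sketch, assuming a hypothetical backend whose transfer call is named
_send_bytes() (not any real backend's API):

    import os
    from duplicity import progress

    def put(self, source_path, remote_filename=None):
        total = os.path.getsize(source_path.name)
        sent = 0
        f = open(source_path.name, 'rb')
        try:
            while True:
                chunk = f.read(64 * 1024)
                if not chunk:
                    break
                self._send_bytes(chunk)  # hypothetical backend transfer call
                sent += len(chunk)
                # Report the cumulative bytes written for the current volume
                progress.report_transfer(sent, total)
        finally:
            f.close()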

The algorithm is activated by the --progress command line flag and performs a first-pass dry run to collect evidence about all the deltas. It then triggers the real upload, while a background thread statistically estimates the ratios of change and compression of the data in/out, and uses these ratios to forecast the remaining time and the percentage of completion.
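
In essence, the estimator keeps running means of two ratios and projects the
remaining time on a (1 - x) / x curve. A stripped-down sketch of the idea,
with names simplified from the patch and the running-variance terms left out:

    # ratio of delta bytes vs. the size of the changing files (dry-run evidence)
    change_ratio = raw_delta_bytes / float(changed_file_bytes)
    # ratio of bytes actually sent vs. raw delta bytes (compression + encryption)
    compress_ratio = sent_bytes / float(raw_delta_bytes)

    # estimated fraction of completion, clamped to [0, 1]
    fraction = change_ratio * compress_ratio * changed_file_bytes / float(total_changed_file_bytes)
    fraction = max(0.0, min(fraction, 1.0))

    # remaining time projected on a (1 - x) / x curve
    # elapsed_seconds: wall-clock seconds spent uploading so far
    if fraction > 0:
        eta_seconds = (1.0 - fraction) / fraction * elapsed_seconds

The real code additionally adds one standard deviation of each ratio (tracked
with Knuth's online variance algorithm) to keep the estimate an approximate
upper bound, and only lets the percentage move forward so the figures look
consistent to the human eye.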

The progress data is logged every 3 seconds by default, or at the interval set with the --progress-rate flag.
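
For example, with the boto backend (bucket name hypothetical):

    duplicity --progress --progress-rate 5 /home/me s3+http://mybucket/backup

prints, following the format in log.py, status lines shaped like:

    231.5MB 00:12:35 [1.2MB/s] [==========>                              ] 27% ETA 33min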

Enjoy!
-- 
https://code.launchpad.net/~juan-f/duplicity/progress/+merge/144944
Your team duplicity-team is requested to review the proposed merge of lp:~juan-f/duplicity/progress into lp:duplicity.
=== modified file 'bin/duplicity'
--- bin/duplicity	2013-01-06 18:12:52 +0000
+++ bin/duplicity	2013-01-25 13:57:26 +0000
@@ -29,6 +29,8 @@
 
 import getpass, gzip, os, sys, time, types
 import traceback, platform, statvfs, resource, re
+import threading
+from datetime import datetime
 
 pwd = os.path.abspath(os.path.dirname(sys.argv[0]))
 if os.path.exists(os.path.join(pwd, "../duplicity")):
@@ -57,11 +59,11 @@
 from duplicity import tempdir
 from duplicity import asyncscheduler
 from duplicity import util
+from duplicity import progress
 
 # If exit_val is not None, exit with given value at end.
 exit_val = None
 
-
 def get_passphrase(n, action, for_signing = False):
     """
     Check to make sure passphrase is indeed needed, then get
@@ -307,6 +309,7 @@
             tdp.delete()
         return putsize
 
+
     def validate_encryption_settings(backup_set, manifest):
         """
         When restarting a backup, we have no way to verify that the current
@@ -368,6 +371,11 @@
 
     io_scheduler = asyncscheduler.AsyncScheduler(globals.async_concurrency)
     async_waiters = []
+    # If --progress option is given, initiate a background thread that will
+    # periodically report progress to the Log.
+    if globals.progress:
+        progress.progress_thread = progress.LogProgressThread()
+        progress.progress_thread.start()
 
     while not at_end:
         # set up iterator
@@ -420,6 +428,11 @@
     # Upload the collection summary.
     #bytes_written += write_manifest(mf, backup_type, backend)
 
+    # Terminate the background thread now, if any
+    if globals.progress:
+        progress.progress_thread.finished = True
+        progress.progress_thread.join()
+
     return bytes_written
 
 
@@ -495,6 +508,17 @@
     @rtype: void
     @return: void
     """
+    if globals.progress:
+        progress.tracker = progress.ProgressTracker()
+        # Fake a backup to compute the total number of bytes to transfer
+        tarblock_iter = diffdir.DirFull(globals.select)
+        dummy_backup(tarblock_iter)
+        # Store computed stats to compute progress later
+        progress.tracker.set_evidence(diffdir.stats)
+        # Reinit the globals.select iterator, so
+        # the core of duplicity can rescan the paths
+        commandline.set_selection()
+
     if globals.dry_run:
         tarblock_iter = diffdir.DirFull(globals.select)
         bytes_written = dummy_backup(tarblock_iter)
@@ -566,6 +590,19 @@
             time.sleep(2)
             dup_time.setcurtime()
             assert dup_time.curtime != dup_time.prevtime, "time not moving forward at appropriate pace - system clock issues?"
+
+    if globals.progress:
+        progress.tracker = progress.ProgressTracker()
+        # Fake a backup to compute the total number of bytes to transfer
+        tarblock_iter = diffdir.DirDelta(globals.select,
+                                         sig_chain.get_fileobjs())
+        dummy_backup(tarblock_iter)
+        # Store computed stats to compute progress later
+        progress.tracker.set_evidence(diffdir.stats)
+        # Reinit the globals.select iterator, so
+        # the core of duplicity can rescan the paths
+        commandline.set_selection()
+
     if globals.dry_run:
         tarblock_iter = diffdir.DirDelta(globals.select,
                                          sig_chain.get_fileobjs())

=== modified file 'bin/duplicity.1'
--- bin/duplicity.1	2013-01-18 15:21:08 +0000
+++ bin/duplicity.1	2013-01-25 13:57:26 +0000
@@ -644,6 +644,20 @@
 the new filename format.
 
 .TP
+.B --progress
+When selected, duplicity will output the current upload progress and estimated
+upload time. To measure the changes, it will perform a first dry run before a
+full or incremental backup, and then run the real operation, estimating the
+real upload progress.
+
+.TP
+.BI "--progress_rate " number
+Sets the update rate at which duplicity will output the upload progress
+messages (requires
+.B --progress
+option). Default is to print the status every 3 seconds.
+
+.TP
 .BI "--rename " "orig new"
 Treats the path
 .I orig

=== modified file 'duplicity/backend.py'
--- duplicity/backend.py	2013-01-10 19:04:39 +0000
+++ duplicity/backend.py	2013-01-25 13:57:26 +0000
@@ -39,6 +39,7 @@
 from duplicity import globals
 from duplicity import log
 from duplicity import urlparse_2_5 as urlparser
+from duplicity import progress
 
 from duplicity.util import exception_traceback
 

=== modified file 'duplicity/backends/_boto_multi.py'
--- duplicity/backends/_boto_multi.py	2012-02-05 17:38:19 +0000
+++ duplicity/backends/_boto_multi.py	2013-01-25 13:57:26 +0000
@@ -23,6 +23,8 @@
 import os
 import sys
 import time
+import threading
+import Queue
 
 import duplicity.backend
 
@@ -32,6 +34,7 @@
 from duplicity.util import exception_traceback
 from duplicity.backend import retry
 from duplicity.filechunkio import FileChunkIO
+from duplicity import progress
 
 BOTO_MIN_VERSION = "1.6a"
 
@@ -43,6 +46,27 @@
     import multiprocessing
 
 
+class ConsumerThread(threading.Thread):
+    """
+    A background thread that collects the written-bytes counts from all
+    the pool workers and reports them to the progress module.
+    Wakes up every second to check for termination.
+    """
+    def __init__(self, queue):
+        super(ConsumerThread, self).__init__()
+        self.daemon = True
+        self.finish = False
+        self.queue = queue
+
+    def run(self):
+        while not self.finish:
+            try:
+                args = self.queue.get(True, 1) 
+                progress.report_transfer(args[0], args[1])
+            except Queue.Empty, e:
+                pass
+            
+
 def get_connection(scheme, parsed_url):
     try:
         import boto
@@ -357,14 +381,29 @@
 
         mp = self.bucket.initiate_multipart_upload(key, headers)
 
+        # Initiate a queue to share progress data between the pool
+        # workers and a consumer thread, that will collect and report
+        queue = None
+        if globals.progress:
+            manager = multiprocessing.Manager()
+            queue = manager.Queue()
+            consumer = ConsumerThread(queue)
+            consumer.start()
+
         pool = multiprocessing.Pool(processes=chunks)
         for n in range(chunks):
              params = [self.scheme, self.parsed_url, self.bucket_name, 
-                 mp.id, filename, n, chunk_size, globals.num_retries]
+                 mp.id, filename, n, chunk_size, globals.num_retries, 
+                 queue]
              pool.apply_async(multipart_upload_worker, params)
         pool.close()
         pool.join()
 
+        # Terminate the consumer thread, if any
+        if globals.progress:
+            consumer.finish = True
+            consumer.join()
+
         if len(mp.get_all_parts()) < chunks:
             mp.cancel_upload()
             raise BackendException("Multipart upload failed. Aborted.")
@@ -373,7 +412,7 @@
 
 
 def multipart_upload_worker(scheme, parsed_url, bucket_name, multipart_id, filename,
-                            offset, bytes, num_retries):
+                            offset, bytes, num_retries, queue):
     """
     Worker method for uploading a file chunk to S3 using multipart upload.
     Note that the file chunk is read into memory, so it's important to keep
@@ -384,6 +423,8 @@
     def _upload_callback(uploaded, total):
         worker_name = multiprocessing.current_process().name
         log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
+        if not queue is None:
+            queue.put([uploaded, total]) # Push data to the consumer thread
 
     def _upload(num_retries):
         worker_name = multiprocessing.current_process().name
@@ -395,7 +436,9 @@
             for mp in bucket.get_all_multipart_uploads():
                 if mp.id == multipart_id:
                     with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
-                        mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback)
+                        mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback,
+                                                    num_cb=max(2, 8 * bytes / (1024 * 1024))
+                                                ) # Max num of callbacks = 8 times x megabyte
                     break
         except Exception, e:
             traceback.print_exc()

=== modified file 'duplicity/backends/_boto_single.py'
--- duplicity/backends/_boto_single.py	2011-11-25 17:47:57 +0000
+++ duplicity/backends/_boto_single.py	2013-01-25 13:57:26 +0000
@@ -27,6 +27,7 @@
 from duplicity.errors import * #@UnusedWildImport
 from duplicity.util import exception_traceback
 from duplicity.backend import retry
+from duplicity import progress
 
 BOTO_MIN_VERSION = "1.6a"
 
@@ -200,6 +201,7 @@
             remote_filename = source_path.get_filename()
         key = self.key_class(self.bucket)
         key.key = self.key_prefix + remote_filename
+
         for n in range(1, globals.num_retries+1):
             if n > 1:
                 # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
@@ -212,7 +214,11 @@
             log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
             try:
                 key.set_contents_from_filename(source_path.name, {'Content-Type': 'application/octet-stream',
-                                                                  'x-amz-storage-class': storage_class})
+                                                                  'x-amz-storage-class': storage_class},
+                                                cb=progress.report_transfer,
+                                                num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
+                                              ) # Max num of callbacks = 8 times x megabyte
+
                 key.close()
                 self.resetConnection()
                 return

=== modified file 'duplicity/commandline.py'
--- duplicity/commandline.py	2013-01-18 15:17:55 +0000
+++ duplicity/commandline.py	2013-01-25 13:57:26 +0000
@@ -431,6 +431,12 @@
                       callback=lambda o, s, v, p: (setattr(p.values, o.dest, True),
                                                    old_fn_deprecation(s)))
 
+    # Used to display the progress for the full and incremental backup operations
+    parser.add_option("--progress", action="store_true")
+
+    # Used to control the progress option update rate in seconds. Default: prints every 3 seconds
+    parser.add_option("--progress-rate", type="int", metavar=_("number"))
+
     # option to trigger Pydev debugger
     parser.add_option("--pydevd", action="store_true")
 

=== modified file 'duplicity/diffdir.py'
--- duplicity/diffdir.py	2013-01-06 18:12:52 +0000
+++ duplicity/diffdir.py	2013-01-25 13:57:26 +0000
@@ -27,15 +27,16 @@
 the second, the ROPath iterator is put into tar block form.
 """
 
-import cStringIO, types
+import cStringIO, types, math
 from duplicity import statistics
 from duplicity import util
 from duplicity.path import * #@UnusedWildImport
 from duplicity.lazy import * #@UnusedWildImport
+from duplicity import progress
 
-# A StatsObj will be written to this from DirDelta_WriteSig only.
+# A StatsObj will be written to this from DirDelta and DirDelta_WriteSig.
 stats = None
-
+tracker = None
 
 class DiffDirException(Exception):
     pass
@@ -82,7 +83,7 @@
     else:
         sig_iter = sigtar2path_iter(dirsig_fileobj_list)
     delta_iter = get_delta_iter(path_iter, sig_iter)
-    if globals.dry_run:
+    if globals.dry_run or (globals.progress and not progress.tracker.has_collected_evidence()):
         return DummyBlockIter(delta_iter)
     else:
         return DeltaTarBlockIter(delta_iter)
@@ -363,7 +364,7 @@
     else:
         sig_path_iter = sigtar2path_iter(sig_infp_list)
     delta_iter = get_delta_iter(path_iter, sig_path_iter, newsig_outfp)
-    if globals.dry_run:
+    if globals.dry_run or (globals.progress and not progress.tracker.has_collected_evidence()):
         return DummyBlockIter(delta_iter)
     else:
         return DeltaTarBlockIter(delta_iter)

=== modified file 'duplicity/globals.py'
--- duplicity/globals.py	2013-01-10 19:04:39 +0000
+++ duplicity/globals.py	2013-01-25 13:57:26 +0000
@@ -241,3 +241,11 @@
 
 # Renames (--rename)
 rename = {}
+
+# When selected, triggers a dry-run before a full or incremental to compute
+# changes, then runs the real operation and keeps track of the real progress
+progress = False
+
+# Controls the upload progress messages refresh rate. Default: update every
+# 3 seconds
+progress_rate = 3

=== modified file 'duplicity/log.py'
--- duplicity/log.py	2012-06-21 19:13:45 +0000
+++ duplicity/log.py	2013-01-25 13:57:26 +0000
@@ -26,6 +26,7 @@
 import os
 import sys
 import logging
+import datetime
 
 MIN = 0
 ERROR = 0
@@ -100,6 +101,7 @@
     synchronous_upload_done = 13
     asynchronous_upload_done = 14
     skipping_socket = 15
+    upload_progress = 16
 
 def Info(s, code=InfoCode.generic, extra=None):
     """Shortcut used for info messages (verbosity 5)."""
@@ -113,6 +115,83 @@
         controlLine = '%d' % current
     Log(s, INFO, InfoCode.progress, controlLine)
 
+def _ElapsedSecs2Str(secs):
+    tdelta = datetime.timedelta(seconds=secs)
+    hours,rem = divmod(tdelta.seconds, 3600)
+    minutes,seconds = divmod(rem, 60)
+    fmt = ""
+    if tdelta.days > 0:
+        fmt = "%dd," % (tdelta.days)
+    fmt = "%s%02d:%02d:%02d" % (fmt, hours, minutes, seconds)
+    return fmt
+
+def _RemainingSecs2Str(secs):
+    tdelta = datetime.timedelta(seconds=secs)
+    hours,rem = divmod(tdelta.seconds, 3600)
+    minutes,seconds = divmod(rem, 60)
+    fmt = ""
+    if tdelta.days > 0:
+        fmt = "%dd" % (tdelta.days)
+        if hours > 0:
+            fmt = "%s %dh" % (fmt, hours)
+        if minutes > 0:
+            fmt = "%s %dmin" % (fmt, minutes)
+    elif hours > 0:
+        fmt = "%dh" % hours
+        if minutes > 0:
+            fmt = "%s %dmin" % (fmt, minutes)
+    elif minutes > 5:
+        fmt = "%dmin" % minutes
+    elif minutes > 0:
+        fmt = "%dmin" % minutes
+        if seconds >= 30:
+            fmt = "%s 30sec" % fmt
+    elif seconds > 45:
+        fmt = "< 1min"
+    elif seconds > 30:
+        fmt = "< 45sec"
+    elif seconds > 15:
+        fmt = "< 30sec"
+    else:
+        fmt = "%dsec" % seconds
+    return fmt
+
+def TransferProgress(progress, eta, changed_bytes, elapsed, speed, stalled):
+    """Shortcut used for upload progress messages (verbosity 5)."""
+    dots = int(0.4 * progress) # int(40.0 * progress / 100.0) -- for 40 chars
+    data_amount = float(changed_bytes) / 1024.0
+    data_scale = "KB"
+    if data_amount > 1000.0:
+        data_amount /= 1024.0
+        data_scale = "MB"
+    if data_amount > 1000.0:
+        data_amount /= 1024.0
+        data_scale = "GB"
+    if stalled:
+        eta_str = "Stalled!" 
+        speed_amount = 0
+        speed_scale = "B"
+    else:
+        eta_str = _RemainingSecs2Str(eta)
+        speed_amount = float(speed) / 1024.0
+        speed_scale = "KB"
+        if speed_amount > 1000.0:
+            speed_amount /= 1024.0
+            speed_scale = "MB"
+        if speed_amount > 1000.0:
+            speed_amount /= 1024.0
+            speed_scale = "GB"
+    s = "%.1f%s %s [%.1f%s/s] [%s>%s] %d%% ETA %s" % (data_amount, data_scale,
+                                                            _ElapsedSecs2Str(elapsed), 
+                                                            speed_amount, speed_scale, 
+                                                            '='*dots, ' '*(40-dots), 
+                                                            progress, 
+                                                            eta_str
+                                                          )
+
+    controlLine = "%d %d %d %d %d" % (changed_bytes, elapsed, progress, eta, stalled)
+    Log(s, INFO, InfoCode.upload_progress, controlLine)
+
 def PrintCollectionStatus(col_stats, force_print=False):
     """Prints a collection status to the log"""
     Log(str(col_stats), 8, InfoCode.collection_status,

=== added file 'duplicity/progress.py'
--- duplicity/progress.py	1970-01-01 00:00:00 +0000
+++ duplicity/progress.py	2013-01-25 13:57:26 +0000
@@ -0,0 +1,253 @@
+# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
+#
+# Copyright 2002 Ben Escoto <ben@xxxxxxxxxxx>
+# Copyright 2007 Kenneth Loafman <kenneth@xxxxxxxxxxx>
+#
+# This file is part of duplicity.
+#
+# Duplicity is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# Duplicity is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with duplicity; if not, write to the Free Software Foundation,
+# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# @author: Juan Antonio Moya Vicen <juan@xxxxxxxxxxxxxxxx>
+#
+"""
+Functions to compute the progress of compressing & uploading files.
+The heuristics try to infer the ratio between the amount of data collected
+by the deltas and the total size of the changing files. They also infer the
+compression and encryption ratio of the raw deltas before sending them to
+the backend.
+With the inferred ratios, the heuristics estimate the percentage of completion
+and the time left to transfer all the (yet unknown) amount of data to send.
+This is a forecast based on gathered evidence.
+"""
+
+
+import math
+import threading
+import time
+from datetime import datetime, timedelta
+from duplicity import globals
+from duplicity import log
+
+def import_non_local(name, custom_name=None):
+    """
+    This function is needed to play a trick, as there exists a local
+    "collections" module named the same as a standard library module
+    """
+    import imp, sys
+
+    custom_name = custom_name or name
+
+    f, pathname, desc = imp.find_module(name, sys.path[1:])
+    module = imp.load_module(custom_name, f, pathname, desc)
+    f.close()
+
+    return module
+
+"""
+Import non-local module, use a custom name to differentiate it from local
+This name is only used internally for identifying the module. We decide
+the name in the local scope by assigning it to the variable sys_collections.
+"""
+sys_collections = import_non_local('collections','sys_collections')
+
+
+
+tracker = None
+progress_thread = None
+
+class ProgressTracker():
+    
+    def __init__(self):
+        self.total_stats = None
+        self.nsteps = 0
+        self.start_time = None
+        self.change_mean_ratio = 0.0
+        self.change_r_estimation = 0.0
+        self.compress_mean_ratio = 0.0
+        self.compress_r_estimation = 0.0
+        self.progress_estimation = 0.0
+        self.time_estimation = 0
+        self.total_bytecount = 0
+        self.last_total_bytecount = 0
+        self.last_bytecount = 0
+        self.stall_last_time = None
+        self.last_time = None
+        self.elapsed_sum = timedelta()
+        self.speed = 0.0
+        self.transfers = sys_collections.deque()
+    
+    def has_collected_evidence(self):
+        """
+        Returns true if the first-pass dry run has already been performed and
+        its statistics stored, so that progress can be estimated
+        """
+        return (not self.total_stats is None)
+    
+    def log_upload_progress(self):
+        """
+        Approximate and evolving method of computing the upload progress
+        """
+        if not globals.progress or not self.has_collected_evidence():
+            return
+
+        current_time = datetime.now()
+        if self.start_time is None:
+            self.start_time = current_time
+        if not self.last_time is None:
+            elapsed = (current_time - self.last_time)
+        else:
+            elapsed = timedelta()
+        self.last_time = current_time
+    
+        # Detect (and report) a stall if no data has changed for more than 5 seconds
+        if self.stall_last_time is None:
+            self.stall_last_time = current_time
+        if (current_time - self.stall_last_time).seconds > max(5, 2 * globals.progress_rate):
+            log.TransferProgress(100.0 * self.progress_estimation, 
+                                    self.time_estimation, self.total_bytecount, 
+                                    (current_time - self.start_time).seconds,
+                                    self.speed, 
+                                    True
+                                )
+            return
+    
+        self.nsteps += 1
+    
+        """
+        Compute the ratio of information being written for deltas vs. file sizes.
+        Uses Knuth's online variance algorithm to estimate an approximate upper
+        bound on the % of completion. The progress is estimated from the current
+        bytes written vs. the total bytes to change, as estimated by the first
+        dry run. The weight is the ratio of changing data (delta) against the
+        total file sizes (a pessimistic estimation).
+        """
+        from duplicity import diffdir
+        changes = diffdir.stats.NewFileSize + diffdir.stats.ChangedFileSize
+        total_changes = self.total_stats.NewFileSize + self.total_stats.ChangedFileSize
+        if changes == 0 or total_changes == 0:
+            return
+    
+        # Snapshot current values for progress
+        last_progress_estimation = self.progress_estimation
+        
+        # Compute ratio of changes
+        change_ratio = diffdir.stats.RawDeltaSize / float(changes)
+        change_delta = change_ratio - self.change_mean_ratio
+        self.change_mean_ratio += change_delta / float(self.nsteps) # mean cumulated ratio
+        self.change_r_estimation += change_delta * (change_ratio - self.change_mean_ratio)
+        change_sigma = math.sqrt(math.fabs(self.change_r_estimation / float(self.nsteps)))
+    
+        # Compute ratio of compression of the deltas
+        compress_ratio = self.total_bytecount / float(diffdir.stats.RawDeltaSize)
+        compress_delta = compress_ratio - self.compress_mean_ratio
+        self.compress_mean_ratio += compress_delta / float(self.nsteps) # mean cumulated ratio
+        self.compress_r_estimation += compress_delta * (compress_ratio - self.compress_mean_ratio)
+        compress_sigma = math.sqrt(math.fabs(self.compress_r_estimation / float(self.nsteps)))
+    
+        # Combine 2 statistically independent variables (ratios) optimistically
+        self.progress_estimation = (self.change_mean_ratio * self.compress_mean_ratio 
+                                        + change_sigma + compress_sigma) * float(changes) / float(total_changes)
+        self.progress_estimation = max(0.0, min(self.progress_estimation, 1.0))
+    
+
+        """
+        Estimate the remaining time as a projection of the elapsed time, fit to a [(1 - x) / x] curve
+        """
+        self.elapsed_sum += elapsed # As sum of timedeltas, so as to avoid clock skew in long runs (adding also microseconds)
+        projection = 1.0
+        if self.progress_estimation > 0:
+            projection = (1.0 - self.progress_estimation) / self.progress_estimation
+        self.time_estimation = long(projection * float(self.elapsed_sum.total_seconds()))
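+        # e.g. 25% done after 5 minutes of upload: projection = 3.0, so ~15 minutes remain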
+    
+        # Apply values only when monotonic, so the estimates look more consistent to the human eye
+        if self.progress_estimation < last_progress_estimation:
+            self.progress_estimation = last_progress_estimation
+    
+        """
+        Compute Exponential Moving Average of speed as bytes/sec of the last 30 probes
+        """
+        self.transfers.append(float(self.total_bytecount - self.last_total_bytecount) / float(elapsed.total_seconds()))
+        self.last_total_bytecount = self.total_bytecount
+        if len(self.transfers) > 30:
+            self.transfers.popleft()
+        self.speed = 0.0
+        for x in self.transfers:
+            self.speed = 0.3 * x + 0.7 * self.speed
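+        # The newest probe weighs 0.3; each older probe's weight decays by a factor of 0.7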
+
+        log.TransferProgress(100.0 * self.progress_estimation, 
+                                self.time_estimation, 
+                                self.total_bytecount, 
+                                (current_time - self.start_time).seconds, 
+                                self.speed,
+                                False
+                            )
+    
+    
+    def annotate_written_bytes(self, bytecount):
+        """
+        Annotate the number of bytes that have been added/changed since the last
+        time this function was called.
+        The bytecount parameter is the cumulative number of bytes written so far
+        for the current volume
+        """
+        changing = max(bytecount - self.last_bytecount, 0)
+        self.total_bytecount += long(changing) # Annotate only changing bytes since last probe
+        self.last_bytecount = bytecount
+        if changing > 0:
+            self.stall_last_time = datetime.now()
+    
+    def set_evidence(self, stats):
+        """
+        Stores the collected statistics from a first-pass dry-run, to use this
+        information later so as to estimate progress
+        """
+        self.total_stats = stats
+    
+    def total_elapsed_seconds(self):
+        """
+        Elapsed seconds since the first call to log_upload_progress method
+        """
+        return (datetime.now() - self.start_time).seconds
+    
+
+def report_transfer(bytecount, totalbytes):
+    """
+    Method to call tracker.annotate_written_bytes from outside
+    the class, and to offer the "function(long, long)" signature
+    which is handy to pass as callback
+    """
+    global tracker
+    global progress_thread
+    if not progress_thread is None and not tracker is None:
+        tracker.annotate_written_bytes(bytecount)
+
+
+class LogProgressThread(threading.Thread):
+    """
+    Background thread that reports progress to the log, 
+    every --progress-rate seconds 
+    """
+    def __init__(self):
+        super(LogProgressThread, self).__init__()
+        self.setDaemon(True)
+        self.finished = False
+
+    def run(self):
+        global tracker
+        if not globals.dry_run and globals.progress and tracker.has_collected_evidence():
+            while not self.finished:
+                tracker.log_upload_progress()
+                time.sleep(globals.progress_rate)
+            log.TransferProgress(100.0, 0, tracker.total_bytecount, tracker.total_elapsed_seconds(), tracker.speed, False)
+

