← Back to team overview

duplicity-team team mailing list archive

[Merge] lp:~mterry/duplicity/consolidate-tests into lp:duplicity

 

Michael Terry has proposed merging lp:~mterry/duplicity/consolidate-tests into lp:duplicity.

Requested reviews:
  duplicity-team (duplicity-team)

For more details, see:
https://code.launchpad.net/~mterry/duplicity/consolidate-tests/+merge/215986

Consolidate all the duplicity-running code in the test framework.

This takes the four test files [1] that run duplicity itself (i.e. the high-level functional test files) and consolidates their code.  Before, it was madness: each one had its own run_duplicity method, with its own slightly tweaked features.  Consolidating them should remove a lot of duplication.

[1] cleanuptest.py, finaltest.py, restarttest.py, and badupload.py
-- 
https://code.launchpad.net/~mterry/duplicity/consolidate-tests/+merge/215986
Your team duplicity-team is requested to review the proposed merge of lp:~mterry/duplicity/consolidate-tests into lp:duplicity.
=== modified file 'testing/helpers/helper.py'
--- testing/helpers/helper.py	2011-11-04 19:18:51 +0000
+++ testing/helpers/helper.py	2014-04-15 23:45:18 +0000
@@ -20,15 +20,20 @@
 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
 import os
+import time
+import unittest
+
 from duplicity import backend
 from duplicity import globals
 from duplicity import log
+from duplicity import pexpect
 
 sign_key = '56538CCF'
 sign_passphrase = 'test'
 encrypt_key1 = 'B5FA894F'
 encrypt_key2 = '9B736B2A'
 
+# TODO: remove this method
 def setup():
     """ setup for unit tests """
     log.setup()
@@ -36,11 +41,143 @@
     globals.print_statistics = 0
     backend.import_backends()
 
-def set_environ(varname, value):
-    if value is not None:
-        os.environ[varname] = value
-    else:
-        try:
-            del os.environ[varname]
-        except Exception:
-            pass
+
+class CmdError(Exception):
+    """Indicates an error running an external command"""
+    def __init__(self, code):
+        Exception.__init__(self, code)
+        self.exit_status = code
+
+
+class DuplicityTestCase(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.class_args = []
+        cls.backend_url = "file://testfiles/output"
+        cls.sign_key = sign_key
+        cls.sign_passphrase = sign_passphrase
+        cls.encrypt_key1 = encrypt_key1
+        cls.encrypt_key2 = encrypt_key2        
+        setup()
+
+    def setUp(self):
+        assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
+        assert not os.system("rm -rf testfiles/output testfiles/largefiles "
+                             "testfiles/restore_out testfiles/cache")
+        assert not os.system("mkdir testfiles/output testfiles/cache")
+
+        backend_inst = backend.get_backend(self.backend_url)
+        bl = backend_inst.list()
+        if bl:
+            backend_inst.delete(backend_inst.list())
+        backend_inst.close()
+
+        self.last_backup = None
+        self.set_environ('PASSPHRASE', self.sign_passphrase)
+        self.set_environ("SIGN_PASSPHRASE", self.sign_passphrase)
+
+    def tearDown(self):
+        self.set_environ("PASSPHRASE", None)
+        assert not os.system("rm -rf testfiles tempdir temp2.tar")
+
+    def set_environ(self, varname, value):
+        if value is not None:
+            os.environ[varname] = value
+        else:
+            try:
+                del os.environ[varname]
+            except Exception:
+                pass
+
+    def run_duplicity(self, options=[], current_time=None, fail=None,
+                      passphrase_input=[]):
+        """
+        Run duplicity binary with given arguments and options
+        """
+        # We run under setsid and take input from /dev/null (below) because
+        # this way we force a failure if duplicity tries to read from the
+        # console unexpectedly (like for gpg password or such).
+        cmd_list = ["setsid", "duplicity"]
+        cmd_list.extend(options)
+        cmd_list.extend(["-v0"])
+        cmd_list.extend(["--no-print-statistics"])
+        cmd_list.extend(["--allow-source-mismatch"])
+        cmd_list.extend(["--archive-dir=testfiles/cache"])
+        if current_time:
+            cmd_list.extend(["--current-time", current_time])
+        cmd_list.extend(self.class_args)
+        if fail:
+            cmd_list.extend(["--fail", str(fail)])
+        cmdline = " ".join(map(lambda x: '"%s"' % x, cmd_list))
+
+        if not passphrase_input:
+            cmdline += " < /dev/null"
+        child = pexpect.spawn('/bin/sh', ['-c', cmdline])
+        for passphrase in passphrase_input:
+            child.expect('passphrase.*:')
+            child.sendline(passphrase)
+        child.wait()
+        return_val = child.exitstatus
+
+        #print "Ran duplicity command: ", cmdline, "\n with return_val: ", child.exitstatus
+        if fail:
+            self.assertEqual(30, child.exitstatus)
+        elif return_val:
+            raise CmdError(child.exitstatus)
+
+    def backup(self, type, input_dir, options=[], **kwargs):
+        """Run duplicity backup to default directory"""
+        options = [type, input_dir, self.backend_url, "--volsize", "1"] + options
+        before_files = self.get_backend_files()
+
+        # If a chain ends with time X and the next full chain begins at time X,
+        # we may trigger an assert in collections.py.  If needed, sleep to
+        # avoid such problems
+        if self.last_backup == int(time.time()):
+            time.sleep(1)
+
+        result = self.run_duplicity(options=options, **kwargs)
+        self.last_backup = int(time.time())
+
+        after_files = self.get_backend_files()
+        return after_files - before_files
+
+    def restore(self, file_to_restore=None, time=None, options=[], **kwargs):
+        assert not os.system("rm -rf testfiles/restore_out")
+        options = [self.backend_url, "testfiles/restore_out"] + options
+        if file_to_restore:
+            options.extend(['--file-to-restore', file_to_restore])
+        if time:
+            options.extend(['--restore-time', str(time)])
+        self.run_duplicity(options=options, **kwargs)
+
+    def verify(self, dirname, file_to_verify=None, time=None, options=[],
+               **kwargs):
+        options = ["verify", self.backend_url, dirname] + options
+        if file_to_verify:
+            options.extend(['--file-to-restore', file_to_verify])
+        if time:
+            options.extend(['--restore-time', str(time)])
+        self.run_duplicity(options=options, **kwargs)
+
+    def cleanup(self, options=[]):
+        """
+        Run duplicity cleanup to default directory
+        """
+        options = ["cleanup", self.backend_url, "--force"] + options
+        self.run_duplicity(options=options)
+
+    def get_backend_files(self):
+        backend_inst = backend.get_backend(self.backend_url)
+        bl = backend_inst.list()
+        backend_inst.close()
+        return set(bl)
+
+    def make_largefiles(self, count=3, size=2):
+        """
+        Makes a number of large files in testfiles/largefiles that each are
+        the specified number of megabytes.
+        """
+        assert not os.system("mkdir testfiles/largefiles")
+        for n in range(count):
+            assert not os.system("dd if=/dev/urandom of=testfiles/largefiles/file%d bs=1024 count=%d > /dev/null 2>&1" % (n + 1, size * 1024))

=== modified file 'testing/tests/badupload.py'
--- testing/tests/badupload.py	2011-11-07 15:08:24 +0000
+++ testing/tests/badupload.py	2014-04-15 23:45:18 +0000
@@ -20,63 +20,24 @@
 # along with duplicity; if not, write to the Free Software Foundation,
 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
-import helper
-import os, unittest, sys
-
-helper.setup()
-
-# This can be changed to select the URL to use
-backend_url = 'file://testfiles/output'
-
-class CmdError(Exception):
-    """Indicates an error running an external command"""
-    return_val = -1
-    def __init__(self, return_val):
-        self.return_val = os.WEXITSTATUS(return_val)
-
-class BadUploadTest(unittest.TestCase):
+import unittest
+
+from helper import CmdError, DuplicityTestCase
+
+
+class BadUploadTest(DuplicityTestCase):
     """
     Test missing volume upload using duplicity binary
     """
-    def setUp(self):
-        assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
-
-    def tearDown(self):
-        assert not os.system("rm -rf testfiles tempdir temp2.tar")
-
-    def run_duplicity(self, arglist, options = []):
-        """
-        Run duplicity binary with given arguments and options
-        """
-        options.append("--archive-dir testfiles/cache")
-        cmd_list = ["duplicity"]
-        cmd_list.extend(options + ["--allow-source-mismatch"])
-        cmd_list.extend(arglist)
-        cmdline = " ".join(cmd_list)
-        if not os.environ.has_key('PASSPHRASE'):
-            os.environ['PASSPHRASE'] = 'foobar'
-        return_val = os.system(cmdline)
-        if return_val:
-            raise CmdError(return_val)
-
-    def backup(self, type, input_dir, options = []):
-        """Run duplicity backup to default directory"""
-        options = options[:]
-        if type == "full":
-            options.insert(0, 'full')
-        args = [input_dir, "'%s'" % backend_url]
-        self.run_duplicity(args, options)
-
     def test_missing_file(self):
         """
         Test basic lost file
         """
-        # we know we're going to fail this one, its forced
         try:
-            self.backup("full", "testfiles/dir1", options = ["--skip-volume 1"])
-            assert False # shouldn't get this far
+            self.backup("full", "testfiles/dir1", options=["--skip-volume=1"])
+            self.fail()
         except CmdError, e:
-            assert e.return_val == 44, e.return_val
+            self.assertEqual(e.exit_status, 44)
 
 if __name__ == "__main__":
     unittest.main()

=== modified file 'testing/tests/cleanuptest.py'
--- testing/tests/cleanuptest.py	2012-09-30 21:18:32 +0000
+++ testing/tests/cleanuptest.py	2014-04-15 23:45:18 +0000
@@ -19,125 +19,35 @@
 # along with duplicity; if not, write to the Free Software Foundation,
 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
-import helper
-import sys, os, unittest, time
-
-import duplicity.backend
-from duplicity import path
-
-helper.setup()
-
-# Extra arguments to be passed to duplicity
-other_args = ["-v0", "--no-print-statistics"]
-#other_args = []
-
-class CmdError(Exception):
-    """Indicates an error running an external command"""
-    pass
-
-class CleanupTest(unittest.TestCase):
+import unittest
+
+from helper import DuplicityTestCase
+
+
+class CleanupTest(DuplicityTestCase):
     """
     Test cleanup using duplicity binary
     """
-    def setUp(self):
-        assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
-        self.deltmp()
-
-    def tearDown(self):
-        assert not os.system("rm -rf testfiles tempdir temp2.tar")
-
-    def run_duplicity(self, arglist, options = [], current_time = None):
-        """
-        Run duplicity binary with given arguments and options
-        """
-        before_files = set(os.listdir("testfiles/output"))
-        options.append("--archive-dir testfiles/cache")
-        cmd_list = ["duplicity"]
-        cmd_list.extend(options + ["--allow-source-mismatch"])
-        if current_time:
-            cmd_list.append("--current-time %s" % (current_time,))
-        if other_args:
-            cmd_list.extend(other_args)
-        cmd_list.extend(arglist)
-        cmdline = " ".join(cmd_list)
-        #print "Running '%s'." % cmdline
-        if not os.environ.has_key('PASSPHRASE'):
-            os.environ['PASSPHRASE'] = 'foobar'
-#        print "CMD: %s" % cmdline
-        return_val = os.system(cmdline)
-        if return_val:
-            raise CmdError(return_val)
-        after_files = set(os.listdir("testfiles/output"))
-        return after_files - before_files
-
-    def backup(self, type, input_dir, options = [], current_time = None):
-        """
-        Run duplicity backup to default directory
-        """
-        options = options[:]
-        if type == "full":
-            options.insert(0, 'full')
-        args = [input_dir, "file://testfiles/output"]
-        new_files = self.run_duplicity(args, options, current_time)
-        # If a chain ends with time X and the next full chain begins at time X,
-        # we may trigger an assert in collections.py.  This way, we avoid
-        # such problems
-        time.sleep(1)
-        return new_files
-
-    def verify(self, dirname, file_to_verify = None, time = None, options = [],
-               current_time = None):
-        options = ["verify"] + options[:]
-        args = ["file://testfiles/output", dirname]
-        if file_to_verify:
-            options.extend(['--file-to-restore', file_to_verify])
-        if time:
-            options.extend(['--restore-time', str(time)])
-        return self.run_duplicity(args, options, current_time)
-
-    def cleanup(self, options = []):
-        """
-        Run duplicity cleanup to default directory
-        """
-        options = ["cleanup"] + options[:]
-        args = ["file://testfiles/output"]
-        return self.run_duplicity(args, options)
-
-    def deltmp(self):
-        """
-        Delete temporary directories
-        """
-        assert not os.system("rm -rf testfiles/output testfiles/cache")
-        assert not os.system("mkdir testfiles/output testfiles/cache")
-        backend = duplicity.backend.get_backend("file://testfiles/output")
-        bl = backend.list()
-        if bl:
-            backend.delete(backend.list())
-        backend.close()
-
     def test_cleanup_after_partial(self):
         """
         Regression test for https://bugs.launchpad.net/bugs/409593
         where duplicity deletes all the signatures during a cleanup
         after a failed backup.
         """
-        good_files = self.backup("full", "/bin", options = ["--vol 1"])
-        good_files |= self.backup("inc", "/bin", options = ["--vol 1"])
-        good_files |= self.backup("inc", "/bin", options = ["--vol 1"])
-        # we know we're going to fail these, they are forced
-        try:
-            self.backup("full", "/bin", options = ["--vol 1", "--fail 1"])
-            self.fail("Not supposed to reach this far")
-        except CmdError:
-            bad_files = set(os.listdir("testfiles/output"))
-            bad_files -= good_files
-            self.assertNotEqual(bad_files, set())
+        self.make_largefiles()
+        good_files = self.backup("full", "testfiles/largefiles")
+        good_files |= self.backup("inc", "testfiles/largefiles")
+        good_files |= self.backup("inc", "testfiles/largefiles")
+        self.backup("full", "testfiles/largefiles", fail=1)
+        bad_files = self.get_backend_files()
+        bad_files -= good_files
+        self.assertNotEqual(bad_files, set())
         # the cleanup should go OK
-        self.cleanup(options = ["--force"])
-        leftovers = set(os.listdir("testfiles/output"))
+        self.run_duplicity(options=["cleanup", self.backend_url, "--force"])
+        leftovers = self.get_backend_files()
         self.assertEqual(good_files, leftovers)
-        self.backup("inc", "/bin", options = ["--vol 1"])
-        self.verify("/bin")
+        self.backup("inc", "testfiles/largefiles")
+        self.verify("testfiles/largefiles")
 
     def test_remove_all_but_n(self):
         """
@@ -145,9 +55,8 @@
         """
         full1_files = self.backup("full", "testfiles/empty_dir")
         full2_files = self.backup("full", "testfiles/empty_dir")
-        self.run_duplicity(["file://testfiles/output"],
-                           ["remove-all-but-n", "1", "--force"])
-        leftovers = set(os.listdir("testfiles/output"))
+        self.run_duplicity(options=["remove-all-but-n", "1", self.backend_url, "--force"])
+        leftovers = self.get_backend_files()
         self.assertEqual(full2_files, leftovers)
 
     def test_remove_all_inc_of_but_n(self):
@@ -157,11 +66,9 @@
         full1_files = self.backup("full", "testfiles/empty_dir")
         inc1_files = self.backup("inc", "testfiles/empty_dir")
         full2_files = self.backup("full", "testfiles/empty_dir")
-        self.run_duplicity(["file://testfiles/output"],
-                           ["remove-all-inc-of-but-n-full", "1", "--force"])
-        leftovers = set(os.listdir("testfiles/output"))
+        self.run_duplicity(options=["remove-all-inc-of-but-n-full", "1", self.backend_url, "--force"])
+        leftovers = self.get_backend_files()
         self.assertEqual(full1_files | full2_files, leftovers)
 
-
 if __name__ == "__main__":
     unittest.main()

=== modified file 'testing/tests/finaltest.py'
--- testing/tests/finaltest.py	2014-02-22 16:03:22 +0000
+++ testing/tests/finaltest.py	2014-04-15 23:45:18 +0000
@@ -19,102 +19,20 @@
 # along with duplicity; if not, write to the Free Software Foundation,
 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
-import helper
-import sys, os, unittest
+import os
+import unittest
 
-import duplicity.backend
 from duplicity import path
-from duplicity import collections
-from duplicity import commandline
-from duplicity import globals
-from duplicity import pexpect
-
-helper.setup()
-
-# This can be changed to select the URL to use
-backend_url = "file://testfiles/output"
-
-# Extra arguments to be passed to duplicity
-other_args = ["-v0", "--no-print-statistics"]
-#other_args = ["--short-filenames"]
-#other_args = ["--ssh-command 'ssh -v'", "--scp-command 'scp -C'"]
-#other_args = ['--no-encryption']
-
-# If this is set to true, after each backup, verify contents
-verify = 1
-
-class CmdError(Exception):
-    """Indicates an error running an external command"""
-    pass
-
-class FinalTest:
+from helper import CmdError, DuplicityTestCase
+
+
+class FinalTest(DuplicityTestCase):
     """
     Test backup/restore using duplicity binary
     """
-    def run_duplicity(self, arglist, options = [], current_time = None,
-                      passphrase_input = None):
-        """Run duplicity binary with given arguments and options"""
-        options.append("--archive-dir testfiles/cache")
-        cmd_list = ["duplicity"]
-        cmd_list.extend(options + ["--allow-source-mismatch"])
-        if current_time:
-            cmd_list.append("--current-time %s" % (current_time,))
-        if other_args:
-            cmd_list.extend(other_args)
-        cmd_list.extend(arglist)
-        cmdline = " ".join(cmd_list)
-        #print "Running '%s'." % cmdline
-        if passphrase_input is None and not os.environ.has_key('PASSPHRASE'):
-            os.environ['PASSPHRASE'] = 'foobar'
-        (output, return_val) = pexpect.run(cmdline, withexitstatus=True,
-            events={'passphrase.*:': passphrase_input}) 
-        if return_val:
-            raise CmdError(return_val)
-
-    def backup(self, type, input_dir, options = [], **kwargs):
-        """Run duplicity backup to default directory"""
-        options = options[:]
-        if type == "full":
-            options.insert(0, 'full')
-        args = [input_dir, "'%s'" % backend_url]
-        self.run_duplicity(args, options, **kwargs)
-
-    def restore(self, file_to_restore = None, time = None, options = [],
-                **kwargs):
-        options = options[:] # just nip any mutability problems in bud
-        assert not os.system("rm -rf testfiles/restore_out")
-        args = ["'%s'" % backend_url, "testfiles/restore_out"]
-        if file_to_restore:
-            options.extend(['--file-to-restore', file_to_restore])
-        if time:
-            options.extend(['--restore-time', str(time)])
-        self.run_duplicity(args, options, **kwargs)
-
-    def verify(self, dirname, file_to_verify = None, time = None, options = [],
-               **kwargs):
-        options = ["verify"] + options[:]
-        args = ["'%s'" % backend_url, dirname]
-        if file_to_verify:
-            options.extend(['--file-to-restore', file_to_verify])
-        if time:
-            options.extend(['--restore-time', str(time)])
-        self.run_duplicity(args, options, **kwargs)
-
-    def deltmp(self):
-        """Delete temporary directories"""
-        assert not os.system("rm -rf testfiles/output "
-                             "testfiles/restore_out testfiles/cache")
-        assert not os.system("mkdir testfiles/output testfiles/cache")
-        backend = duplicity.backend.get_backend(backend_url)
-        bl = backend.list()
-        if bl:
-            backend.delete(backend.list())
-        backend.close()
-
     def runtest(self, dirlist, backup_options = [], restore_options = []):
         """Run backup/restore test on directories in dirlist"""
         assert len(dirlist) >= 1
-        self.deltmp()
 
         # Back up directories to local backend
         current_time = 100000
@@ -131,9 +49,8 @@
             current_time = 100000*(i + 1)
             self.restore(time = current_time, options = restore_options)
             self.check_same(dirname, "testfiles/restore_out")
-            if verify:
-                self.verify(dirname,
-                            time = current_time, options = restore_options)
+            self.verify(dirname,
+                        time = current_time, options = restore_options)
 
     def check_same(self, filename1, filename2):
         """Verify two filenames are the same"""
@@ -156,28 +73,25 @@
             self.restore(filename, time, options = restore_options)
             self.check_same('testfiles/%s/%s' % (dir, filename),
                             'testfiles/restore_out')
-            if verify:
-                self.verify('testfiles/%s/%s' % (dir, filename),
-                            file_to_verify = filename, time = time,
-                            options = restore_options)
+            self.verify('testfiles/%s/%s' % (dir, filename),
+                        file_to_verify = filename, time = time,
+                        options = restore_options)
 
     def test_asym_cycle(self):
         """Like test_basic_cycle but use asymmetric encryption and signing"""
-        backup_options = ["--encrypt-key " + helper.encrypt_key1,
-                          "--sign-key " + helper.sign_key]
-        restore_options = ["--encrypt-key " + helper.encrypt_key1,
-                           "--sign-key " + helper.sign_key]
-        helper.set_environ("SIGN_PASSPHRASE", helper.sign_passphrase)
+        backup_options = ["--encrypt-key", self.encrypt_key1,
+                          "--sign-key", self.sign_key]
+        restore_options = ["--encrypt-key", self.encrypt_key1,
+                           "--sign-key", self.sign_key]
         self.test_basic_cycle(backup_options = backup_options,
                               restore_options = restore_options)
 
     def test_asym_with_hidden_recipient_cycle(self):
         """Like test_basic_cycle but use asymmetric encryption (hidding key id) and signing"""
-        backup_options = ["--hidden-encrypt-key " + helper.encrypt_key1,
-                          "--sign-key " + helper.sign_key]
-        restore_options = ["--hidden-encrypt-key " + helper.encrypt_key1,
-                           "--sign-key " + helper.sign_key]
-        helper.set_environ("SIGN_PASSPHRASE", helper.sign_passphrase)
+        backup_options = ["--hidden-encrypt-key", self.encrypt_key1,
+                          "--sign-key", self.sign_key]
+        restore_options = ["--hidden-encrypt-key", self.encrypt_key1,
+                           "--sign-key", self.sign_key]
         self.test_basic_cycle(backup_options = backup_options,
                               restore_options = restore_options)
 
@@ -221,7 +135,6 @@
 
     def test_empty_restore(self):
         """Make sure error raised when restore doesn't match anything"""
-        self.deltmp()
         self.backup("full", "testfiles/dir1")
         self.assertRaises(CmdError, self.restore, "this_file_does_not_exist")
         self.backup("inc", "testfiles/empty_dir")
@@ -229,58 +142,40 @@
 
     def test_remove_older_than(self):
         """Test removing old backup chains"""
-        self.deltmp()
-        self.backup("full", "testfiles/dir1", current_time = 10000)
-        self.backup("inc", "testfiles/dir2", current_time = 20000)
-        self.backup("full", "testfiles/dir1", current_time = 30000)
-        self.backup("inc", "testfiles/dir3", current_time = 40000)
-
-        b = duplicity.backend.get_backend(backend_url)
-        commandline.set_archive_dir("testfiles/cache")
-        cs = collections.CollectionsStatus(b, globals.archive_dir).set_values()
-        assert len(cs.all_backup_chains) == 2, cs.all_backup_chains
-        assert cs.matched_chain_pair
-
-        self.run_duplicity(["--force", backend_url], options=["remove-older-than 35000"])
-        cs2 = collections.CollectionsStatus(b, globals.archive_dir).set_values()
-        assert len(cs2.all_backup_chains) == 1, cs.all_backup_chains
-        assert cs2.matched_chain_pair
-        chain = cs2.all_backup_chains[0]
-        assert chain.start_time == 30000, chain.start_time
-        assert chain.end_time == 40000, chain.end_time
+        first_chain = self.backup("full", "testfiles/dir1", current_time = 10000)
+        first_chain |= self.backup("inc", "testfiles/dir2", current_time = 20000)
+        second_chain = self.backup("full", "testfiles/dir1", current_time = 30000)
+        second_chain |= self.backup("inc", "testfiles/dir3", current_time = 40000)
+
+        self.assertEqual(self.get_backend_files(), first_chain | second_chain)
+
+        self.run_duplicity(options=["remove-older-than", "35000", "--force", self.backend_url])
+        self.assertEqual(self.get_backend_files(), second_chain)
 
         # Now check to make sure we can't delete only chain
-        self.run_duplicity(["--force", backend_url], options=["remove-older-than 50000"])
-        cs3 = collections.CollectionsStatus(b, globals.archive_dir).set_values()
-        assert len(cs3.all_backup_chains) == 1
-        assert cs3.matched_chain_pair
-        chain = cs3.all_backup_chains[0]
-        assert chain.start_time == 30000, chain.start_time
-        assert chain.end_time == 40000, chain.end_time
+        self.run_duplicity(options=["remove-older-than", "50000", "--force", self.backend_url])
+        self.assertEqual(self.get_backend_files(), second_chain)
 
     def test_piped_password(self):
         """Make sure that prompting for a password works"""
+        self.set_environ("PASSPHRASE", None)
         self.backup("full", "testfiles/empty_dir",
-                    passphrase_input="foobar\nfoobar\n")
-        self.restore(passphrase_input="foobar\n")
-
-class FinalTest1(FinalTest, unittest.TestCase):
-    def setUp(self):
-        assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
-
-    def tearDown(self):
-        assert not os.system("rm -rf testfiles tempdir temp2.tar")
-
-    globals.old_filenames = False
-
-class FinalTest2(FinalTest, unittest.TestCase):
-    def setUp(self):
-        assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
-
-    def tearDown(self):
-        assert not os.system("rm -rf testfiles tempdir temp2.tar")
-
-    globals.old_filenames = True
+                    passphrase_input=[self.sign_passphrase, self.sign_passphrase])
+        self.restore(passphrase_input=[self.sign_passphrase])
+
+
+class OldFilenamesFinalTest(FinalTest):
+    @classmethod
+    def setUpClass(cls):
+        super(OldFilenamesFinalTest, cls).setUpClass()
+        cls.class_args.extend(["--old-filenames"])
+
+
+class ShortFilenamesFinalTest(FinalTest):
+    @classmethod
+    def setUpClass(cls):
+        super(ShortFilenamesFinalTest, cls).setUpClass()
+        cls.class_args.extend(["--short-filenames"])
 
 if __name__ == "__main__":
     unittest.main()

=== modified file 'testing/tests/restarttest.py'
--- testing/tests/restarttest.py	2013-11-16 02:23:46 +0000
+++ testing/tests/restarttest.py	2014-04-15 23:45:18 +0000
@@ -19,208 +19,48 @@
 # along with duplicity; if not, write to the Free Software Foundation,
 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
-import helper
-import sys, os, unittest
 import glob
+import os
 import subprocess
-
-import duplicity.backend
-from duplicity import path
-
-helper.setup()
-
-# This can be changed to select the URL to use
-backend_url = "file://testfiles/output"
-
-# Extra arguments to be passed to duplicity
-other_args = ["-v0", "--no-print-statistics"]
-#other_args = ["--short-filenames"]
-#other_args = ["--ssh-command 'ssh -v'", "--scp-command 'scp -C'"]
-
-# If this is set to true, after each backup, verify contents
-verify = 1
-
-class CmdError(Exception):
-    """Indicates an error running an external command"""
-    def __init__(self, code):
-        Exception.__init__(self, code)
-        self.exit_status = code
-
-class RestartTest(unittest.TestCase):
+import unittest
+
+from helper import DuplicityTestCase
+
+
+class RestartTest(DuplicityTestCase):
     """
     Test checkpoint/restart using duplicity binary
     """
-    def setUp(self):
-        self.class_args = []
-        assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
-        assert not os.system("rm -rf testfiles/output testfiles/largefiles "
-                             "testfiles/restore_out testfiles/cache")
-        assert not os.system("mkdir testfiles/output testfiles/cache")
-        backend = duplicity.backend.get_backend(backend_url)
-        bl = backend.list()
-        if bl:
-            backend.delete(backend.list())
-        backend.close()
-
-    def tearDown(self):
-        helper.set_environ("PASSPHRASE", None)
-        assert not os.system("rm -rf testfiles tempdir temp2.tar")
-
-    def run_duplicity(self, arglist, options = [], current_time = None):
-        """
-        Run duplicity binary with given arguments and options
-        """
-        options.append("--archive-dir testfiles/cache")
-        # We run under setsid and take input from /dev/null (below) because
-        # this way we force a failure if duplicity tries to read from the
-        # console (like for gpg password or such).
-        cmd_list = ["setsid", "duplicity"]
-        cmd_list.extend(options + ["--allow-source-mismatch"])
-        if current_time:
-            cmd_list.append("--current-time %s" % (current_time,))
-        if other_args:
-            cmd_list.extend(other_args)
-        cmd_list.extend(self.class_args)
-        cmd_list.extend(arglist)
-        cmd_list.extend(["<", "/dev/null"])
-        cmdline = " ".join(cmd_list)
-        #print "Running '%s'." % cmdline
-        helper.set_environ('PASSPHRASE', helper.sign_passphrase)
-#        print "CMD: %s" % cmdline
-        return_val = os.system(cmdline)
-        if return_val:
-            raise CmdError(os.WEXITSTATUS(return_val))
-
-    def backup(self, type, input_dir, options = [], current_time = None):
-        """Run duplicity backup to default directory"""
-        options = options[:]
-        if type == "full":
-            options.insert(0, 'full')
-        args = [input_dir, "'%s'" % backend_url]
-        self.run_duplicity(args, options, current_time)
-
-    def restore(self, file_to_restore = None, time = None, options = [],
-                current_time = None):
-        options = options[:] # just nip any mutability problems in bud
-        assert not os.system("rm -rf testfiles/restore_out")
-        args = ["'%s'" % backend_url, "testfiles/restore_out"]
-        if file_to_restore:
-            options.extend(['--file-to-restore', file_to_restore])
-        if time:
-            options.extend(['--restore-time', str(time)])
-        self.run_duplicity(args, options, current_time)
-
-    def verify(self, dirname, file_to_verify = None, time = None, options = [],
-               current_time = None):
-        options = ["verify"] + options[:]
-        args = ["'%s'" % backend_url, dirname]
-        if file_to_verify:
-            options.extend(['--file-to-restore', file_to_verify])
-        if time:
-            options.extend(['--restore-time', str(time)])
-        self.run_duplicity(args, options, current_time)
-
-    def runtest(self, dirlist, backup_options = [], restore_options = []):
-        """
-        Run backup/restore test on directories in dirlist
-        """
-        assert len(dirlist) >= 1
-
-        # Back up directories to local backend
-        current_time = 100000
-        self.backup("full", dirlist[0], current_time = current_time,
-                    options = backup_options)
-        for new_dir in dirlist[1:]:
-            current_time += 100000
-            self.backup("inc", new_dir, current_time = current_time,
-                        options = backup_options)
-
-        # Restore each and compare them
-        for i in range(len(dirlist)):
-            dirname = dirlist[i]
-            current_time = 100000*(i + 1)
-            self.restore(time = current_time, options = restore_options)
-            self.check_same(dirname, "testfiles/restore_out")
-            if verify:
-                self.verify(dirname,
-                            time = current_time, options = restore_options)
-
-    def make_largefiles(self, count=3, size=2):
-        """
-        Makes a number of large files in testfiles/largefiles that each are
-        the specified number of megabytes.
-        """
-        assert not os.system("mkdir testfiles/largefiles")
-        for n in range(count):
-            assert not os.system("dd if=/dev/urandom of=testfiles/largefiles/file%d bs=1024 count=%d > /dev/null 2>&1" % (n + 1, size * 1024))
-
-    def check_same(self, filename1, filename2):
-        """
-        Verify two filenames are the same
-        """
-        path1, path2 = path.Path(filename1), path.Path(filename2)
-        assert path1.compare_recursive(path2, verbose = 1)
-
     def test_basic_checkpoint_restart(self):
         """
         Test basic Checkpoint/Restart
         """
-        excludes = ["--exclude '**/output'",
-                    "--exclude '**/cache'",]
-        # we know we're going to fail this one, its forced
-        try:
-            self.backup("full", "testfiles", options = ["--vol 1", "--fail 1"] + excludes)
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
-        # this one should pass OK
-        self.backup("full", "testfiles", options = excludes)
-        self.verify("testfiles", options = excludes)
+        self.make_largefiles()
+        self.backup("full", "testfiles/largefiles", fail=1)
+        self.backup("full", "testfiles/largefiles")
+        self.verify("testfiles/largefiles")
 
     def test_multiple_checkpoint_restart(self):
         """
         Test multiple Checkpoint/Restart
         """
-        excludes = ["--exclude '**/output'",
-                    "--exclude '**/cache'",]
         self.make_largefiles()
-        # we know we're going to fail these, they are forced
-        try:
-            self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 1"] + excludes)
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
-        try:
-            self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 2"] + excludes)
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
-        try:
-            self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 3"] + excludes)
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
-        # this one should pass OK
-        self.backup("full", "testfiles/largefiles", options = excludes)
-        self.verify("testfiles/largefiles", options = excludes)
+        self.backup("full", "testfiles/largefiles", fail=1)
+        self.backup("full", "testfiles/largefiles", fail=2)
+        self.backup("full", "testfiles/largefiles", fail=3)
+        self.backup("full", "testfiles/largefiles")
+        self.verify("testfiles/largefiles")
 
     def test_first_volume_failure(self):
         """
         Test restart when no volumes are available on the remote.
         Caused when duplicity fails before the first transfer.
         """
-        excludes = ["--exclude '**/output'",
-                    "--exclude '**/cache'",]
-        # we know we're going to fail these, they are forced
-        try:
-            self.backup("full", "testfiles", options = ["--vol 1", "--fail 1"] + excludes)
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
+        self.make_largefiles()
+        self.backup("full", "testfiles/largefiles", fail=1)
         assert not os.system("rm testfiles/output/duplicity-full*difftar*")
-        # this one should pass OK
-        self.backup("full", "testfiles", options = excludes)
-        self.verify("testfiles", options = excludes)
+        self.backup("full", "testfiles/largefiles")
+        self.verify("testfiles/largefiles")
 
     def test_multi_volume_failure(self):
         """
@@ -229,15 +69,9 @@
         fails the last queued transfer(s).
         """
         self.make_largefiles()
-        # we know we're going to fail these, they are forced
-        try:
-            self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 3"])
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
+        self.backup("full", "testfiles/largefiles", fail=3)
         assert not os.system("rm testfiles/output/duplicity-full*vol[23].difftar*")
-        # this one should pass OK
-        self.backup("full", "testfiles/largefiles", options = ["--vol 1"])
+        self.backup("full", "testfiles/largefiles")
         self.verify("testfiles/largefiles")
 
     def test_restart_sign_and_encrypt(self):
@@ -246,15 +80,9 @@
         https://bugs.launchpad.net/duplicity/+bug/946988
         """
         self.make_largefiles()
-        enc_opts = ["--sign-key " + helper.sign_key, "--encrypt-key " + helper.sign_key]
-        # Force a failure partway through
-        try:
-            self.backup("full", "testfiles/largefiles", options = ["--vols 1", "--fail 2"] + enc_opts)
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
-        # Now finish that backup
-        self.backup("full", "testfiles/largefiles", options = enc_opts)
+        enc_opts = ["--sign-key", self.sign_key, "--encrypt-key", self.sign_key]
+        self.backup("full", "testfiles/largefiles", options=enc_opts, fail=2)
+        self.backup("full", "testfiles/largefiles", options=enc_opts)
         self.verify("testfiles/largefiles")
 
     def test_restart_sign_and_hidden_encrypt(self):
@@ -263,15 +91,9 @@
         https://bugs.launchpad.net/duplicity/+bug/946988
         """
         self.make_largefiles()
-        enc_opts = ["--sign-key " + helper.sign_key, "--hidden-encrypt-key " + helper.sign_key]
-        # Force a failure partway through
-        try:
-            self.backup("full", "testfiles/largefiles", options = ["--vols 1", "--fail 2"] + enc_opts)
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
-        # Now finish that backup
-        self.backup("full", "testfiles/largefiles", options = enc_opts)
+        enc_opts = ["--sign-key", self.sign_key, "--hidden-encrypt-key", self.sign_key]
+        self.backup("full", "testfiles/largefiles", options=enc_opts, fail=2)
+        self.backup("full", "testfiles/largefiles", options=enc_opts)
         self.verify("testfiles/largefiles")
 
     def test_last_file_missing_in_middle(self):
@@ -281,15 +103,9 @@
         the file in the middle of the backup, with files following.
         """
         self.make_largefiles()
-        # we know we're going to fail, it's forced
-        try:
-            self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 3"])
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
+        self.backup("full", "testfiles/largefiles", fail=3)
         assert not os.system("rm testfiles/largefiles/file2")
-        # this one should pass OK
-        self.backup("full", "testfiles/largefiles", options = ["--vol 1"])
+        self.backup("full", "testfiles/largefiles")
         #TODO: we can't verify but we need to to check for other errors that might show up
         # there should be 2 differences found, one missing file, one mtime change
         #self.verify("testfiles/largefiles")
@@ -301,15 +117,9 @@
         the file at the end of the backup, with no files following.
         """
         self.make_largefiles()
-        # we know we're going to fail, it's forced
-        try:
-            self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 6"])
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
+        self.backup("full", "testfiles/largefiles", fail=6)
         assert not os.system("rm testfiles/largefiles/file3")
-        # this one should pass OK
-        self.backup("full", "testfiles/largefiles", options = ["--vol 1"])
+        self.backup("full", "testfiles/largefiles")
         #TODO: we can't verify but we need to to check for other errors that might show up
         # there should be 2 differences found, one missing file, one mtime change
         #self.verify("testfiles/largefiles")
@@ -318,16 +128,9 @@
         """
         Test restarting an incremental backup
         """
-        # Make first normal full backup
+        self.make_largefiles()
         self.backup("full", "testfiles/dir1")
-        self.make_largefiles()
-        # Force a failure partway through
-        try:
-            self.backup("inc", "testfiles/largefiles", options = ["--vols 1", "--fail 2"])
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
-        # Now finish that incremental
+        self.backup("inc", "testfiles/largefiles", fail=2)
         self.backup("inc", "testfiles/largefiles")
         self.verify("testfiles/largefiles")
 
@@ -394,14 +197,15 @@
         """
         source = 'testfiles/largefiles'
         self.make_largefiles(count=1, size=1)
-        self.backup("full", source, options=["--name=backup1"])
+        self.backup("full", source, options=["--volsize=5", "--name=backup1"])
         # Fake an interruption
         self.make_fake_second_volume("backup1")
         # Add new file
         assert not os.system("cp %s/file1 %s/newfile" % (source, source))
         # 'restart' the backup
-        self.backup("full", source, options=["--name=backup1"])
+        self.backup("full", source, options=["--volsize=5", "--name=backup1"])
         # Confirm we actually resumed the previous backup
+        print os.listdir("testfiles/output")
         self.assertEqual(len(os.listdir("testfiles/output")), 4)
         # Now make sure everything is byte-for-byte the same once restored
         self.restore()
@@ -414,11 +218,11 @@
         """
         source = 'testfiles/largefiles'
         self.make_largefiles(count=1, size=3)
-        self.backup("full", source, options=["--vols 1", "--name=backup1"])
+        self.backup("full", source, options=["--name=backup1"])
         # Fake an interruption
         self.make_fake_second_volume("backup1")
         # 'restart' the backup
-        self.backup("full", source, options=["--vols 1", "--name=backup1"])
+        self.backup("full", source, options=["--name=backup1"])
         # Now make sure everything is byte-for-byte the same once restored
         self.restore()
         assert not os.system("diff -r %s testfiles/restore_out" % source)
@@ -456,12 +260,7 @@
         """
         source = 'testfiles/largefiles'
         self.make_largefiles(count=5, size=1)
-        # intentionally interrupt initial backup
-        try:
-            self.backup("full", source, options = ["--vol 1", "--fail 3"])
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
+        self.backup("full", source, fail=3)
         # now delete the last volume on remote end and some source files
         assert not os.system("rm testfiles/output/duplicity-full*vol3.difftar*")
         assert not os.system("rm %s/file[2345]" % source)
@@ -480,12 +279,7 @@
         """
         source = 'testfiles/largefiles'
         self.make_largefiles(count=1)
-        # intentionally interrupt initial backup
-        try:
-            self.backup("full", source, options = ["--vol 1", "--fail 2"])
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
+        self.backup("full", source, fail=2)
         # now remove starting source data and make sure we add something after
         assert not os.system("rm %s/*" % source)
         assert not os.system("echo hello > %s/z" % source)
@@ -498,9 +292,10 @@
 
 # Note that this class duplicates all the tests in RestartTest
 class RestartTestWithoutEncryption(RestartTest):
-    def setUp(self):
-        RestartTest.setUp(self)
-        self.class_args.extend(["--no-encryption"])
+    @classmethod
+    def setUpClass(cls):
+        super(RestartTestWithoutEncryption, cls).setUpClass()
+        cls.class_args.extend(["--no-encryption"])
 
     def test_no_write_double_snapshot(self):
         """
@@ -510,13 +305,7 @@
         https://launchpad.net/bugs/929067
         """
         self.make_largefiles()
-        # Start backup
-        try:
-            self.backup("full", "testfiles/largefiles", options = ["--fail 2", "--vols 1"])
-            self.fail()
-        except CmdError, e:
-            self.assertEqual(30, e.exit_status)
-        # Finish it
+        self.backup("full", "testfiles/largefiles", fail=2)
         self.backup("full", "testfiles/largefiles")
         # Now check sigtar
         sigtars = glob.glob("testfiles/output/duplicity-full*.sigtar.gz")


Follow ups