duplicity-team team mailing list archive
-
duplicity-team team
-
Mailing list archive
-
Message #02091
[Merge] lp:~mterry/duplicity/consolidate-tests into lp:duplicity
Michael Terry has proposed merging lp:~mterry/duplicity/consolidate-tests into lp:duplicity.
Requested reviews:
duplicity-team (duplicity-team)
For more details, see:
https://code.launchpad.net/~mterry/duplicity/consolidate-tests/+merge/215986
Consolidate all the duplicity-running code in the test framework.
This takes the four test files [1] that run duplicity itself (i.e. the high-level functional test files) and consolidates their code. Before, it was madness. Each one had its own run_duplicity method, with its own slightly tweaked features. This should reduce a lot of duplication.
[1] cleanuptest.py, finaltest.py, restarttest.py, and badupload.py
--
https://code.launchpad.net/~mterry/duplicity/consolidate-tests/+merge/215986
Your team duplicity-team is requested to review the proposed merge of lp:~mterry/duplicity/consolidate-tests into lp:duplicity.
=== modified file 'testing/helpers/helper.py'
--- testing/helpers/helper.py 2011-11-04 19:18:51 +0000
+++ testing/helpers/helper.py 2014-04-15 23:45:18 +0000
@@ -20,15 +20,20 @@
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import os
+import time
+import unittest
+
from duplicity import backend
from duplicity import globals
from duplicity import log
+from duplicity import pexpect
sign_key = '56538CCF'
sign_passphrase = 'test'
encrypt_key1 = 'B5FA894F'
encrypt_key2 = '9B736B2A'
+# TODO: remove this method
def setup():
""" setup for unit tests """
log.setup()
@@ -36,11 +41,143 @@
globals.print_statistics = 0
backend.import_backends()
-def set_environ(varname, value):
- if value is not None:
- os.environ[varname] = value
- else:
- try:
- del os.environ[varname]
- except Exception:
- pass
+
+class CmdError(Exception):
+ """Indicates an error running an external command"""
+ def __init__(self, code):
+ Exception.__init__(self, code)
+ self.exit_status = code
+
+
+class DuplicityTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.class_args = []
+ cls.backend_url = "file://testfiles/output"
+ cls.sign_key = sign_key
+ cls.sign_passphrase = sign_passphrase
+ cls.encrypt_key1 = encrypt_key1
+ cls.encrypt_key2 = encrypt_key2
+ setup()
+
+ def setUp(self):
+ assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
+ assert not os.system("rm -rf testfiles/output testfiles/largefiles "
+ "testfiles/restore_out testfiles/cache")
+ assert not os.system("mkdir testfiles/output testfiles/cache")
+
+ backend_inst = backend.get_backend(self.backend_url)
+ bl = backend_inst.list()
+ if bl:
+ backend_inst.delete(backend_inst.list())
+ backend_inst.close()
+
+ self.last_backup = None
+ self.set_environ('PASSPHRASE', self.sign_passphrase)
+ self.set_environ("SIGN_PASSPHRASE", self.sign_passphrase)
+
+ def tearDown(self):
+ self.set_environ("PASSPHRASE", None)
+ assert not os.system("rm -rf testfiles tempdir temp2.tar")
+
+ def set_environ(self, varname, value):
+ if value is not None:
+ os.environ[varname] = value
+ else:
+ try:
+ del os.environ[varname]
+ except Exception:
+ pass
+
+ def run_duplicity(self, options=[], current_time=None, fail=None,
+ passphrase_input=[]):
+ """
+ Run duplicity binary with given arguments and options
+ """
+ # We run under setsid and take input from /dev/null (below) because
+ # this way we force a failure if duplicity tries to read from the
+ # console unexpectedly (like for gpg password or such).
+ cmd_list = ["setsid", "duplicity"]
+ cmd_list.extend(options)
+ cmd_list.extend(["-v0"])
+ cmd_list.extend(["--no-print-statistics"])
+ cmd_list.extend(["--allow-source-mismatch"])
+ cmd_list.extend(["--archive-dir=testfiles/cache"])
+ if current_time:
+ cmd_list.extend(["--current-time", current_time])
+ cmd_list.extend(self.class_args)
+ if fail:
+ cmd_list.extend(["--fail", str(fail)])
+ cmdline = " ".join(map(lambda x: '"%s"' % x, cmd_list))
+
+ if not passphrase_input:
+ cmdline += " < /dev/null"
+ child = pexpect.spawn('/bin/sh', ['-c', cmdline])
+ for passphrase in passphrase_input:
+ child.expect('passphrase.*:')
+ child.sendline(passphrase)
+ child.wait()
+ return_val = child.exitstatus
+
+ #print "Ran duplicity command: ", cmdline, "\n with return_val: ", child.exitstatus
+ if fail:
+ self.assertEqual(30, child.exitstatus)
+ elif return_val:
+ raise CmdError(child.exitstatus)
+
+ def backup(self, type, input_dir, options=[], **kwargs):
+ """Run duplicity backup to default directory"""
+ options = [type, input_dir, self.backend_url, "--volsize", "1"] + options
+ before_files = self.get_backend_files()
+
+ # If a chain ends with time X and the next full chain begins at time X,
+ # we may trigger an assert in collections.py. If needed, sleep to
+ # avoid such problems
+ if self.last_backup == int(time.time()):
+ time.sleep(1)
+
+ result = self.run_duplicity(options=options, **kwargs)
+ self.last_backup = int(time.time())
+
+ after_files = self.get_backend_files()
+ return after_files - before_files
+
+ def restore(self, file_to_restore=None, time=None, options=[], **kwargs):
+ assert not os.system("rm -rf testfiles/restore_out")
+ options = [self.backend_url, "testfiles/restore_out"] + options
+ if file_to_restore:
+ options.extend(['--file-to-restore', file_to_restore])
+ if time:
+ options.extend(['--restore-time', str(time)])
+ self.run_duplicity(options=options, **kwargs)
+
+ def verify(self, dirname, file_to_verify=None, time=None, options=[],
+ **kwargs):
+ options = ["verify", self.backend_url, dirname] + options
+ if file_to_verify:
+ options.extend(['--file-to-restore', file_to_verify])
+ if time:
+ options.extend(['--restore-time', str(time)])
+ self.run_duplicity(options=options, **kwargs)
+
+ def cleanup(self, options=[]):
+ """
+ Run duplicity cleanup to default directory
+ """
+ options = ["cleanup", self.backend_url, "--force"] + options
+ self.run_duplicity(options=options)
+
+ def get_backend_files(self):
+ backend_inst = backend.get_backend(self.backend_url)
+ bl = backend_inst.list()
+ backend_inst.close()
+ return set(bl)
+
+ def make_largefiles(self, count=3, size=2):
+ """
+ Makes a number of large files in testfiles/largefiles that each are
+ the specified number of megabytes.
+ """
+ assert not os.system("mkdir testfiles/largefiles")
+ for n in range(count):
+ assert not os.system("dd if=/dev/urandom of=testfiles/largefiles/file%d bs=1024 count=%d > /dev/null 2>&1" % (n + 1, size * 1024))
=== modified file 'testing/tests/badupload.py'
--- testing/tests/badupload.py 2011-11-07 15:08:24 +0000
+++ testing/tests/badupload.py 2014-04-15 23:45:18 +0000
@@ -20,63 +20,24 @@
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-import helper
-import os, unittest, sys
-
-helper.setup()
-
-# This can be changed to select the URL to use
-backend_url = 'file://testfiles/output'
-
-class CmdError(Exception):
- """Indicates an error running an external command"""
- return_val = -1
- def __init__(self, return_val):
- self.return_val = os.WEXITSTATUS(return_val)
-
-class BadUploadTest(unittest.TestCase):
+import unittest
+
+from helper import CmdError, DuplicityTestCase
+
+
+class BadUploadTest(DuplicityTestCase):
"""
Test missing volume upload using duplicity binary
"""
- def setUp(self):
- assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
-
- def tearDown(self):
- assert not os.system("rm -rf testfiles tempdir temp2.tar")
-
- def run_duplicity(self, arglist, options = []):
- """
- Run duplicity binary with given arguments and options
- """
- options.append("--archive-dir testfiles/cache")
- cmd_list = ["duplicity"]
- cmd_list.extend(options + ["--allow-source-mismatch"])
- cmd_list.extend(arglist)
- cmdline = " ".join(cmd_list)
- if not os.environ.has_key('PASSPHRASE'):
- os.environ['PASSPHRASE'] = 'foobar'
- return_val = os.system(cmdline)
- if return_val:
- raise CmdError(return_val)
-
- def backup(self, type, input_dir, options = []):
- """Run duplicity backup to default directory"""
- options = options[:]
- if type == "full":
- options.insert(0, 'full')
- args = [input_dir, "'%s'" % backend_url]
- self.run_duplicity(args, options)
-
def test_missing_file(self):
"""
Test basic lost file
"""
- # we know we're going to fail this one, its forced
try:
- self.backup("full", "testfiles/dir1", options = ["--skip-volume 1"])
- assert False # shouldn't get this far
+ self.backup("full", "testfiles/dir1", options=["--skip-volume=1"])
+ self.fail()
except CmdError, e:
- assert e.return_val == 44, e.return_val
+ self.assertEqual(e.exit_status, 44)
if __name__ == "__main__":
unittest.main()
=== modified file 'testing/tests/cleanuptest.py'
--- testing/tests/cleanuptest.py 2012-09-30 21:18:32 +0000
+++ testing/tests/cleanuptest.py 2014-04-15 23:45:18 +0000
@@ -19,125 +19,35 @@
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-import helper
-import sys, os, unittest, time
-
-import duplicity.backend
-from duplicity import path
-
-helper.setup()
-
-# Extra arguments to be passed to duplicity
-other_args = ["-v0", "--no-print-statistics"]
-#other_args = []
-
-class CmdError(Exception):
- """Indicates an error running an external command"""
- pass
-
-class CleanupTest(unittest.TestCase):
+import unittest
+
+from helper import DuplicityTestCase
+
+
+class CleanupTest(DuplicityTestCase):
"""
Test cleanup using duplicity binary
"""
- def setUp(self):
- assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
- self.deltmp()
-
- def tearDown(self):
- assert not os.system("rm -rf testfiles tempdir temp2.tar")
-
- def run_duplicity(self, arglist, options = [], current_time = None):
- """
- Run duplicity binary with given arguments and options
- """
- before_files = set(os.listdir("testfiles/output"))
- options.append("--archive-dir testfiles/cache")
- cmd_list = ["duplicity"]
- cmd_list.extend(options + ["--allow-source-mismatch"])
- if current_time:
- cmd_list.append("--current-time %s" % (current_time,))
- if other_args:
- cmd_list.extend(other_args)
- cmd_list.extend(arglist)
- cmdline = " ".join(cmd_list)
- #print "Running '%s'." % cmdline
- if not os.environ.has_key('PASSPHRASE'):
- os.environ['PASSPHRASE'] = 'foobar'
-# print "CMD: %s" % cmdline
- return_val = os.system(cmdline)
- if return_val:
- raise CmdError(return_val)
- after_files = set(os.listdir("testfiles/output"))
- return after_files - before_files
-
- def backup(self, type, input_dir, options = [], current_time = None):
- """
- Run duplicity backup to default directory
- """
- options = options[:]
- if type == "full":
- options.insert(0, 'full')
- args = [input_dir, "file://testfiles/output"]
- new_files = self.run_duplicity(args, options, current_time)
- # If a chain ends with time X and the next full chain begins at time X,
- # we may trigger an assert in collections.py. This way, we avoid
- # such problems
- time.sleep(1)
- return new_files
-
- def verify(self, dirname, file_to_verify = None, time = None, options = [],
- current_time = None):
- options = ["verify"] + options[:]
- args = ["file://testfiles/output", dirname]
- if file_to_verify:
- options.extend(['--file-to-restore', file_to_verify])
- if time:
- options.extend(['--restore-time', str(time)])
- return self.run_duplicity(args, options, current_time)
-
- def cleanup(self, options = []):
- """
- Run duplicity cleanup to default directory
- """
- options = ["cleanup"] + options[:]
- args = ["file://testfiles/output"]
- return self.run_duplicity(args, options)
-
- def deltmp(self):
- """
- Delete temporary directories
- """
- assert not os.system("rm -rf testfiles/output testfiles/cache")
- assert not os.system("mkdir testfiles/output testfiles/cache")
- backend = duplicity.backend.get_backend("file://testfiles/output")
- bl = backend.list()
- if bl:
- backend.delete(backend.list())
- backend.close()
-
def test_cleanup_after_partial(self):
"""
Regression test for https://bugs.launchpad.net/bugs/409593
where duplicity deletes all the signatures during a cleanup
after a failed backup.
"""
- good_files = self.backup("full", "/bin", options = ["--vol 1"])
- good_files |= self.backup("inc", "/bin", options = ["--vol 1"])
- good_files |= self.backup("inc", "/bin", options = ["--vol 1"])
- # we know we're going to fail these, they are forced
- try:
- self.backup("full", "/bin", options = ["--vol 1", "--fail 1"])
- self.fail("Not supposed to reach this far")
- except CmdError:
- bad_files = set(os.listdir("testfiles/output"))
- bad_files -= good_files
- self.assertNotEqual(bad_files, set())
+ self.make_largefiles()
+ good_files = self.backup("full", "testfiles/largefiles")
+ good_files |= self.backup("inc", "testfiles/largefiles")
+ good_files |= self.backup("inc", "testfiles/largefiles")
+ self.backup("full", "testfiles/largefiles", fail=1)
+ bad_files = self.get_backend_files()
+ bad_files -= good_files
+ self.assertNotEqual(bad_files, set())
# the cleanup should go OK
- self.cleanup(options = ["--force"])
- leftovers = set(os.listdir("testfiles/output"))
+ self.run_duplicity(options=["cleanup", self.backend_url, "--force"])
+ leftovers = self.get_backend_files()
self.assertEqual(good_files, leftovers)
- self.backup("inc", "/bin", options = ["--vol 1"])
- self.verify("/bin")
+ self.backup("inc", "testfiles/largefiles")
+ self.verify("testfiles/largefiles")
def test_remove_all_but_n(self):
"""
@@ -145,9 +55,8 @@
"""
full1_files = self.backup("full", "testfiles/empty_dir")
full2_files = self.backup("full", "testfiles/empty_dir")
- self.run_duplicity(["file://testfiles/output"],
- ["remove-all-but-n", "1", "--force"])
- leftovers = set(os.listdir("testfiles/output"))
+ self.run_duplicity(options=["remove-all-but-n", "1", self.backend_url, "--force"])
+ leftovers = self.get_backend_files()
self.assertEqual(full2_files, leftovers)
def test_remove_all_inc_of_but_n(self):
@@ -157,11 +66,9 @@
full1_files = self.backup("full", "testfiles/empty_dir")
inc1_files = self.backup("inc", "testfiles/empty_dir")
full2_files = self.backup("full", "testfiles/empty_dir")
- self.run_duplicity(["file://testfiles/output"],
- ["remove-all-inc-of-but-n-full", "1", "--force"])
- leftovers = set(os.listdir("testfiles/output"))
+ self.run_duplicity(options=["remove-all-inc-of-but-n-full", "1", self.backend_url, "--force"])
+ leftovers = self.get_backend_files()
self.assertEqual(full1_files | full2_files, leftovers)
-
if __name__ == "__main__":
unittest.main()
=== modified file 'testing/tests/finaltest.py'
--- testing/tests/finaltest.py 2014-02-22 16:03:22 +0000
+++ testing/tests/finaltest.py 2014-04-15 23:45:18 +0000
@@ -19,102 +19,20 @@
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-import helper
-import sys, os, unittest
+import os
+import unittest
-import duplicity.backend
from duplicity import path
-from duplicity import collections
-from duplicity import commandline
-from duplicity import globals
-from duplicity import pexpect
-
-helper.setup()
-
-# This can be changed to select the URL to use
-backend_url = "file://testfiles/output"
-
-# Extra arguments to be passed to duplicity
-other_args = ["-v0", "--no-print-statistics"]
-#other_args = ["--short-filenames"]
-#other_args = ["--ssh-command 'ssh -v'", "--scp-command 'scp -C'"]
-#other_args = ['--no-encryption']
-
-# If this is set to true, after each backup, verify contents
-verify = 1
-
-class CmdError(Exception):
- """Indicates an error running an external command"""
- pass
-
-class FinalTest:
+from helper import CmdError, DuplicityTestCase
+
+
+class FinalTest(DuplicityTestCase):
"""
Test backup/restore using duplicity binary
"""
- def run_duplicity(self, arglist, options = [], current_time = None,
- passphrase_input = None):
- """Run duplicity binary with given arguments and options"""
- options.append("--archive-dir testfiles/cache")
- cmd_list = ["duplicity"]
- cmd_list.extend(options + ["--allow-source-mismatch"])
- if current_time:
- cmd_list.append("--current-time %s" % (current_time,))
- if other_args:
- cmd_list.extend(other_args)
- cmd_list.extend(arglist)
- cmdline = " ".join(cmd_list)
- #print "Running '%s'." % cmdline
- if passphrase_input is None and not os.environ.has_key('PASSPHRASE'):
- os.environ['PASSPHRASE'] = 'foobar'
- (output, return_val) = pexpect.run(cmdline, withexitstatus=True,
- events={'passphrase.*:': passphrase_input})
- if return_val:
- raise CmdError(return_val)
-
- def backup(self, type, input_dir, options = [], **kwargs):
- """Run duplicity backup to default directory"""
- options = options[:]
- if type == "full":
- options.insert(0, 'full')
- args = [input_dir, "'%s'" % backend_url]
- self.run_duplicity(args, options, **kwargs)
-
- def restore(self, file_to_restore = None, time = None, options = [],
- **kwargs):
- options = options[:] # just nip any mutability problems in bud
- assert not os.system("rm -rf testfiles/restore_out")
- args = ["'%s'" % backend_url, "testfiles/restore_out"]
- if file_to_restore:
- options.extend(['--file-to-restore', file_to_restore])
- if time:
- options.extend(['--restore-time', str(time)])
- self.run_duplicity(args, options, **kwargs)
-
- def verify(self, dirname, file_to_verify = None, time = None, options = [],
- **kwargs):
- options = ["verify"] + options[:]
- args = ["'%s'" % backend_url, dirname]
- if file_to_verify:
- options.extend(['--file-to-restore', file_to_verify])
- if time:
- options.extend(['--restore-time', str(time)])
- self.run_duplicity(args, options, **kwargs)
-
- def deltmp(self):
- """Delete temporary directories"""
- assert not os.system("rm -rf testfiles/output "
- "testfiles/restore_out testfiles/cache")
- assert not os.system("mkdir testfiles/output testfiles/cache")
- backend = duplicity.backend.get_backend(backend_url)
- bl = backend.list()
- if bl:
- backend.delete(backend.list())
- backend.close()
-
def runtest(self, dirlist, backup_options = [], restore_options = []):
"""Run backup/restore test on directories in dirlist"""
assert len(dirlist) >= 1
- self.deltmp()
# Back up directories to local backend
current_time = 100000
@@ -131,9 +49,8 @@
current_time = 100000*(i + 1)
self.restore(time = current_time, options = restore_options)
self.check_same(dirname, "testfiles/restore_out")
- if verify:
- self.verify(dirname,
- time = current_time, options = restore_options)
+ self.verify(dirname,
+ time = current_time, options = restore_options)
def check_same(self, filename1, filename2):
"""Verify two filenames are the same"""
@@ -156,28 +73,25 @@
self.restore(filename, time, options = restore_options)
self.check_same('testfiles/%s/%s' % (dir, filename),
'testfiles/restore_out')
- if verify:
- self.verify('testfiles/%s/%s' % (dir, filename),
- file_to_verify = filename, time = time,
- options = restore_options)
+ self.verify('testfiles/%s/%s' % (dir, filename),
+ file_to_verify = filename, time = time,
+ options = restore_options)
def test_asym_cycle(self):
"""Like test_basic_cycle but use asymmetric encryption and signing"""
- backup_options = ["--encrypt-key " + helper.encrypt_key1,
- "--sign-key " + helper.sign_key]
- restore_options = ["--encrypt-key " + helper.encrypt_key1,
- "--sign-key " + helper.sign_key]
- helper.set_environ("SIGN_PASSPHRASE", helper.sign_passphrase)
+ backup_options = ["--encrypt-key", self.encrypt_key1,
+ "--sign-key", self.sign_key]
+ restore_options = ["--encrypt-key", self.encrypt_key1,
+ "--sign-key", self.sign_key]
self.test_basic_cycle(backup_options = backup_options,
restore_options = restore_options)
def test_asym_with_hidden_recipient_cycle(self):
"""Like test_basic_cycle but use asymmetric encryption (hidding key id) and signing"""
- backup_options = ["--hidden-encrypt-key " + helper.encrypt_key1,
- "--sign-key " + helper.sign_key]
- restore_options = ["--hidden-encrypt-key " + helper.encrypt_key1,
- "--sign-key " + helper.sign_key]
- helper.set_environ("SIGN_PASSPHRASE", helper.sign_passphrase)
+ backup_options = ["--hidden-encrypt-key", self.encrypt_key1,
+ "--sign-key", self.sign_key]
+ restore_options = ["--hidden-encrypt-key", self.encrypt_key1,
+ "--sign-key", self.sign_key]
self.test_basic_cycle(backup_options = backup_options,
restore_options = restore_options)
@@ -221,7 +135,6 @@
def test_empty_restore(self):
"""Make sure error raised when restore doesn't match anything"""
- self.deltmp()
self.backup("full", "testfiles/dir1")
self.assertRaises(CmdError, self.restore, "this_file_does_not_exist")
self.backup("inc", "testfiles/empty_dir")
@@ -229,58 +142,40 @@
def test_remove_older_than(self):
"""Test removing old backup chains"""
- self.deltmp()
- self.backup("full", "testfiles/dir1", current_time = 10000)
- self.backup("inc", "testfiles/dir2", current_time = 20000)
- self.backup("full", "testfiles/dir1", current_time = 30000)
- self.backup("inc", "testfiles/dir3", current_time = 40000)
-
- b = duplicity.backend.get_backend(backend_url)
- commandline.set_archive_dir("testfiles/cache")
- cs = collections.CollectionsStatus(b, globals.archive_dir).set_values()
- assert len(cs.all_backup_chains) == 2, cs.all_backup_chains
- assert cs.matched_chain_pair
-
- self.run_duplicity(["--force", backend_url], options=["remove-older-than 35000"])
- cs2 = collections.CollectionsStatus(b, globals.archive_dir).set_values()
- assert len(cs2.all_backup_chains) == 1, cs.all_backup_chains
- assert cs2.matched_chain_pair
- chain = cs2.all_backup_chains[0]
- assert chain.start_time == 30000, chain.start_time
- assert chain.end_time == 40000, chain.end_time
+ first_chain = self.backup("full", "testfiles/dir1", current_time = 10000)
+ first_chain |= self.backup("inc", "testfiles/dir2", current_time = 20000)
+ second_chain = self.backup("full", "testfiles/dir1", current_time = 30000)
+ second_chain |= self.backup("inc", "testfiles/dir3", current_time = 40000)
+
+ self.assertEqual(self.get_backend_files(), first_chain | second_chain)
+
+ self.run_duplicity(options=["remove-older-than", "35000", "--force", self.backend_url])
+ self.assertEqual(self.get_backend_files(), second_chain)
# Now check to make sure we can't delete only chain
- self.run_duplicity(["--force", backend_url], options=["remove-older-than 50000"])
- cs3 = collections.CollectionsStatus(b, globals.archive_dir).set_values()
- assert len(cs3.all_backup_chains) == 1
- assert cs3.matched_chain_pair
- chain = cs3.all_backup_chains[0]
- assert chain.start_time == 30000, chain.start_time
- assert chain.end_time == 40000, chain.end_time
+ self.run_duplicity(options=["remove-older-than", "50000", "--force", self.backend_url])
+ self.assertEqual(self.get_backend_files(), second_chain)
def test_piped_password(self):
"""Make sure that prompting for a password works"""
+ self.set_environ("PASSPHRASE", None)
self.backup("full", "testfiles/empty_dir",
- passphrase_input="foobar\nfoobar\n")
- self.restore(passphrase_input="foobar\n")
-
-class FinalTest1(FinalTest, unittest.TestCase):
- def setUp(self):
- assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
-
- def tearDown(self):
- assert not os.system("rm -rf testfiles tempdir temp2.tar")
-
- globals.old_filenames = False
-
-class FinalTest2(FinalTest, unittest.TestCase):
- def setUp(self):
- assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
-
- def tearDown(self):
- assert not os.system("rm -rf testfiles tempdir temp2.tar")
-
- globals.old_filenames = True
+ passphrase_input=[self.sign_passphrase, self.sign_passphrase])
+ self.restore(passphrase_input=[self.sign_passphrase])
+
+
+class OldFilenamesFinalTest(FinalTest):
+ @classmethod
+ def setUpClass(cls):
+ super(OldFilenamesFinalTest, cls).setUpClass()
+ cls.class_args.extend(["--old-filenames"])
+
+
+class ShortFilenamesFinalTest(FinalTest):
+ @classmethod
+ def setUpClass(cls):
+ super(ShortFilenamesFinalTest, cls).setUpClass()
+ cls.class_args.extend(["--short-filenames"])
if __name__ == "__main__":
unittest.main()
=== modified file 'testing/tests/restarttest.py'
--- testing/tests/restarttest.py 2013-11-16 02:23:46 +0000
+++ testing/tests/restarttest.py 2014-04-15 23:45:18 +0000
@@ -19,208 +19,48 @@
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-import helper
-import sys, os, unittest
import glob
+import os
import subprocess
-
-import duplicity.backend
-from duplicity import path
-
-helper.setup()
-
-# This can be changed to select the URL to use
-backend_url = "file://testfiles/output"
-
-# Extra arguments to be passed to duplicity
-other_args = ["-v0", "--no-print-statistics"]
-#other_args = ["--short-filenames"]
-#other_args = ["--ssh-command 'ssh -v'", "--scp-command 'scp -C'"]
-
-# If this is set to true, after each backup, verify contents
-verify = 1
-
-class CmdError(Exception):
- """Indicates an error running an external command"""
- def __init__(self, code):
- Exception.__init__(self, code)
- self.exit_status = code
-
-class RestartTest(unittest.TestCase):
+import unittest
+
+from helper import DuplicityTestCase
+
+
+class RestartTest(DuplicityTestCase):
"""
Test checkpoint/restart using duplicity binary
"""
- def setUp(self):
- self.class_args = []
- assert not os.system("tar xzf testfiles.tar.gz > /dev/null 2>&1")
- assert not os.system("rm -rf testfiles/output testfiles/largefiles "
- "testfiles/restore_out testfiles/cache")
- assert not os.system("mkdir testfiles/output testfiles/cache")
- backend = duplicity.backend.get_backend(backend_url)
- bl = backend.list()
- if bl:
- backend.delete(backend.list())
- backend.close()
-
- def tearDown(self):
- helper.set_environ("PASSPHRASE", None)
- assert not os.system("rm -rf testfiles tempdir temp2.tar")
-
- def run_duplicity(self, arglist, options = [], current_time = None):
- """
- Run duplicity binary with given arguments and options
- """
- options.append("--archive-dir testfiles/cache")
- # We run under setsid and take input from /dev/null (below) because
- # this way we force a failure if duplicity tries to read from the
- # console (like for gpg password or such).
- cmd_list = ["setsid", "duplicity"]
- cmd_list.extend(options + ["--allow-source-mismatch"])
- if current_time:
- cmd_list.append("--current-time %s" % (current_time,))
- if other_args:
- cmd_list.extend(other_args)
- cmd_list.extend(self.class_args)
- cmd_list.extend(arglist)
- cmd_list.extend(["<", "/dev/null"])
- cmdline = " ".join(cmd_list)
- #print "Running '%s'." % cmdline
- helper.set_environ('PASSPHRASE', helper.sign_passphrase)
-# print "CMD: %s" % cmdline
- return_val = os.system(cmdline)
- if return_val:
- raise CmdError(os.WEXITSTATUS(return_val))
-
- def backup(self, type, input_dir, options = [], current_time = None):
- """Run duplicity backup to default directory"""
- options = options[:]
- if type == "full":
- options.insert(0, 'full')
- args = [input_dir, "'%s'" % backend_url]
- self.run_duplicity(args, options, current_time)
-
- def restore(self, file_to_restore = None, time = None, options = [],
- current_time = None):
- options = options[:] # just nip any mutability problems in bud
- assert not os.system("rm -rf testfiles/restore_out")
- args = ["'%s'" % backend_url, "testfiles/restore_out"]
- if file_to_restore:
- options.extend(['--file-to-restore', file_to_restore])
- if time:
- options.extend(['--restore-time', str(time)])
- self.run_duplicity(args, options, current_time)
-
- def verify(self, dirname, file_to_verify = None, time = None, options = [],
- current_time = None):
- options = ["verify"] + options[:]
- args = ["'%s'" % backend_url, dirname]
- if file_to_verify:
- options.extend(['--file-to-restore', file_to_verify])
- if time:
- options.extend(['--restore-time', str(time)])
- self.run_duplicity(args, options, current_time)
-
- def runtest(self, dirlist, backup_options = [], restore_options = []):
- """
- Run backup/restore test on directories in dirlist
- """
- assert len(dirlist) >= 1
-
- # Back up directories to local backend
- current_time = 100000
- self.backup("full", dirlist[0], current_time = current_time,
- options = backup_options)
- for new_dir in dirlist[1:]:
- current_time += 100000
- self.backup("inc", new_dir, current_time = current_time,
- options = backup_options)
-
- # Restore each and compare them
- for i in range(len(dirlist)):
- dirname = dirlist[i]
- current_time = 100000*(i + 1)
- self.restore(time = current_time, options = restore_options)
- self.check_same(dirname, "testfiles/restore_out")
- if verify:
- self.verify(dirname,
- time = current_time, options = restore_options)
-
- def make_largefiles(self, count=3, size=2):
- """
- Makes a number of large files in testfiles/largefiles that each are
- the specified number of megabytes.
- """
- assert not os.system("mkdir testfiles/largefiles")
- for n in range(count):
- assert not os.system("dd if=/dev/urandom of=testfiles/largefiles/file%d bs=1024 count=%d > /dev/null 2>&1" % (n + 1, size * 1024))
-
- def check_same(self, filename1, filename2):
- """
- Verify two filenames are the same
- """
- path1, path2 = path.Path(filename1), path.Path(filename2)
- assert path1.compare_recursive(path2, verbose = 1)
-
def test_basic_checkpoint_restart(self):
"""
Test basic Checkpoint/Restart
"""
- excludes = ["--exclude '**/output'",
- "--exclude '**/cache'",]
- # we know we're going to fail this one, its forced
- try:
- self.backup("full", "testfiles", options = ["--vol 1", "--fail 1"] + excludes)
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
- # this one should pass OK
- self.backup("full", "testfiles", options = excludes)
- self.verify("testfiles", options = excludes)
+ self.make_largefiles()
+ self.backup("full", "testfiles/largefiles", fail=1)
+ self.backup("full", "testfiles/largefiles")
+ self.verify("testfiles/largefiles")
def test_multiple_checkpoint_restart(self):
"""
Test multiple Checkpoint/Restart
"""
- excludes = ["--exclude '**/output'",
- "--exclude '**/cache'",]
self.make_largefiles()
- # we know we're going to fail these, they are forced
- try:
- self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 1"] + excludes)
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
- try:
- self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 2"] + excludes)
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
- try:
- self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 3"] + excludes)
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
- # this one should pass OK
- self.backup("full", "testfiles/largefiles", options = excludes)
- self.verify("testfiles/largefiles", options = excludes)
+ self.backup("full", "testfiles/largefiles", fail=1)
+ self.backup("full", "testfiles/largefiles", fail=2)
+ self.backup("full", "testfiles/largefiles", fail=3)
+ self.backup("full", "testfiles/largefiles")
+ self.verify("testfiles/largefiles")
def test_first_volume_failure(self):
"""
Test restart when no volumes are available on the remote.
Caused when duplicity fails before the first transfer.
"""
- excludes = ["--exclude '**/output'",
- "--exclude '**/cache'",]
- # we know we're going to fail these, they are forced
- try:
- self.backup("full", "testfiles", options = ["--vol 1", "--fail 1"] + excludes)
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
+ self.make_largefiles()
+ self.backup("full", "testfiles/largefiles", fail=1)
assert not os.system("rm testfiles/output/duplicity-full*difftar*")
- # this one should pass OK
- self.backup("full", "testfiles", options = excludes)
- self.verify("testfiles", options = excludes)
+ self.backup("full", "testfiles/largefiles")
+ self.verify("testfiles/largefiles")
def test_multi_volume_failure(self):
"""
@@ -229,15 +69,9 @@
fails the last queued transfer(s).
"""
self.make_largefiles()
- # we know we're going to fail these, they are forced
- try:
- self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 3"])
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
+ self.backup("full", "testfiles/largefiles", fail=3)
assert not os.system("rm testfiles/output/duplicity-full*vol[23].difftar*")
- # this one should pass OK
- self.backup("full", "testfiles/largefiles", options = ["--vol 1"])
+ self.backup("full", "testfiles/largefiles")
self.verify("testfiles/largefiles")
def test_restart_sign_and_encrypt(self):
@@ -246,15 +80,9 @@
https://bugs.launchpad.net/duplicity/+bug/946988
"""
self.make_largefiles()
- enc_opts = ["--sign-key " + helper.sign_key, "--encrypt-key " + helper.sign_key]
- # Force a failure partway through
- try:
- self.backup("full", "testfiles/largefiles", options = ["--vols 1", "--fail 2"] + enc_opts)
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
- # Now finish that backup
- self.backup("full", "testfiles/largefiles", options = enc_opts)
+ enc_opts = ["--sign-key", self.sign_key, "--encrypt-key", self.sign_key]
+ self.backup("full", "testfiles/largefiles", options=enc_opts, fail=2)
+ self.backup("full", "testfiles/largefiles", options=enc_opts)
self.verify("testfiles/largefiles")
def test_restart_sign_and_hidden_encrypt(self):
@@ -263,15 +91,9 @@
https://bugs.launchpad.net/duplicity/+bug/946988
"""
self.make_largefiles()
- enc_opts = ["--sign-key " + helper.sign_key, "--hidden-encrypt-key " + helper.sign_key]
- # Force a failure partway through
- try:
- self.backup("full", "testfiles/largefiles", options = ["--vols 1", "--fail 2"] + enc_opts)
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
- # Now finish that backup
- self.backup("full", "testfiles/largefiles", options = enc_opts)
+ enc_opts = ["--sign-key", self.sign_key, "--hidden-encrypt-key", self.sign_key]
+ self.backup("full", "testfiles/largefiles", options=enc_opts, fail=2)
+ self.backup("full", "testfiles/largefiles", options=enc_opts)
self.verify("testfiles/largefiles")
def test_last_file_missing_in_middle(self):
@@ -281,15 +103,9 @@
the file in the middle of the backup, with files following.
"""
self.make_largefiles()
- # we know we're going to fail, it's forced
- try:
- self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 3"])
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
+ self.backup("full", "testfiles/largefiles", fail=3)
assert not os.system("rm testfiles/largefiles/file2")
- # this one should pass OK
- self.backup("full", "testfiles/largefiles", options = ["--vol 1"])
+ self.backup("full", "testfiles/largefiles")
#TODO: we can't verify but we need to to check for other errors that might show up
# there should be 2 differences found, one missing file, one mtime change
#self.verify("testfiles/largefiles")
@@ -301,15 +117,9 @@
the file at the end of the backup, with no files following.
"""
self.make_largefiles()
- # we know we're going to fail, it's forced
- try:
- self.backup("full", "testfiles/largefiles", options = ["--vol 1", "--fail 6"])
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
+ self.backup("full", "testfiles/largefiles", fail=6)
assert not os.system("rm testfiles/largefiles/file3")
- # this one should pass OK
- self.backup("full", "testfiles/largefiles", options = ["--vol 1"])
+ self.backup("full", "testfiles/largefiles")
#TODO: we can't verify but we need to to check for other errors that might show up
# there should be 2 differences found, one missing file, one mtime change
#self.verify("testfiles/largefiles")
@@ -318,16 +128,9 @@
"""
Test restarting an incremental backup
"""
- # Make first normal full backup
+ self.make_largefiles()
self.backup("full", "testfiles/dir1")
- self.make_largefiles()
- # Force a failure partway through
- try:
- self.backup("inc", "testfiles/largefiles", options = ["--vols 1", "--fail 2"])
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
- # Now finish that incremental
+ self.backup("inc", "testfiles/largefiles", fail=2)
self.backup("inc", "testfiles/largefiles")
self.verify("testfiles/largefiles")
@@ -394,14 +197,15 @@
"""
source = 'testfiles/largefiles'
self.make_largefiles(count=1, size=1)
- self.backup("full", source, options=["--name=backup1"])
+ self.backup("full", source, options=["--volsize=5", "--name=backup1"])
# Fake an interruption
self.make_fake_second_volume("backup1")
# Add new file
assert not os.system("cp %s/file1 %s/newfile" % (source, source))
# 'restart' the backup
- self.backup("full", source, options=["--name=backup1"])
+ self.backup("full", source, options=["--volsize=5", "--name=backup1"])
# Confirm we actually resumed the previous backup
+ print os.listdir("testfiles/output")
self.assertEqual(len(os.listdir("testfiles/output")), 4)
# Now make sure everything is byte-for-byte the same once restored
self.restore()
@@ -414,11 +218,11 @@
"""
source = 'testfiles/largefiles'
self.make_largefiles(count=1, size=3)
- self.backup("full", source, options=["--vols 1", "--name=backup1"])
+ self.backup("full", source, options=["--name=backup1"])
# Fake an interruption
self.make_fake_second_volume("backup1")
# 'restart' the backup
- self.backup("full", source, options=["--vols 1", "--name=backup1"])
+ self.backup("full", source, options=["--name=backup1"])
# Now make sure everything is byte-for-byte the same once restored
self.restore()
assert not os.system("diff -r %s testfiles/restore_out" % source)
@@ -456,12 +260,7 @@
"""
source = 'testfiles/largefiles'
self.make_largefiles(count=5, size=1)
- # intentionally interrupt initial backup
- try:
- self.backup("full", source, options = ["--vol 1", "--fail 3"])
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
+ self.backup("full", source, fail=3)
# now delete the last volume on remote end and some source files
assert not os.system("rm testfiles/output/duplicity-full*vol3.difftar*")
assert not os.system("rm %s/file[2345]" % source)
@@ -480,12 +279,7 @@
"""
source = 'testfiles/largefiles'
self.make_largefiles(count=1)
- # intentionally interrupt initial backup
- try:
- self.backup("full", source, options = ["--vol 1", "--fail 2"])
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
+ self.backup("full", source, fail=2)
# now remove starting source data and make sure we add something after
assert not os.system("rm %s/*" % source)
assert not os.system("echo hello > %s/z" % source)
@@ -498,9 +292,10 @@
# Note that this class duplicates all the tests in RestartTest
class RestartTestWithoutEncryption(RestartTest):
- def setUp(self):
- RestartTest.setUp(self)
- self.class_args.extend(["--no-encryption"])
+ @classmethod
+ def setUpClass(cls):
+ super(RestartTestWithoutEncryption, cls).setUpClass()
+ cls.class_args.extend(["--no-encryption"])
def test_no_write_double_snapshot(self):
"""
@@ -510,13 +305,7 @@
https://launchpad.net/bugs/929067
"""
self.make_largefiles()
- # Start backup
- try:
- self.backup("full", "testfiles/largefiles", options = ["--fail 2", "--vols 1"])
- self.fail()
- except CmdError, e:
- self.assertEqual(30, e.exit_status)
- # Finish it
+ self.backup("full", "testfiles/largefiles", fail=2)
self.backup("full", "testfiles/largefiles")
# Now check sigtar
sigtars = glob.glob("testfiles/output/duplicity-full*.sigtar.gz")
Follow ups