← Back to team overview

duplicity-team team mailing list archive

Re: Python API

 

Hi All,

I hope that I'm not arriving too late to this particular party, but I wanted to throw my opinion out as well. I think that having a central duplicity API would be fantastic, though I also appreciate that creating anything to meet the needs of all of the different projects that use it would likely prove impossible.

I also appreciate that there are practical difficulties as well. As Ken notes:

<<  It's not really modular at this point.  There are a few classes, but
most activity is via straight function calls.  An API would be handy,
move the contents of duplicity-bin into duplicity module, make classes
as needed, and leave a thin shell as the executable.>>

When I was researching backends for Time Drive, I was originally drawn to duplicity for a number of reasons. One of those were it's very large number of backends, but an even larger reason was because it was written in Python. While I am a rather haphazard (and largely incompetent) programmer, I've always had good luck with Python. But having tried to directly build a pure python Gui on the duplicity classes, I found that getting things to work can involve some interesting acrobatics.

And it doesn't always make sense. While putting Time Drive's interface together (mostly just copying functions from duplicity-bin and fiddling with the outputs), I found that there are only a few instances that directly interacting with the duplicity classes was more effective than passing the calls through the command line. Those were:

  1. Retrieving the files in the archive and parsing the directory
     structure.
  2. Retrieving the list of snapshot times.
  3. Deleting backup sets and returning the operation results.

In other cases, such as backing up or restoring a file, using the command line interface was much faster and more convenient. It also has the additional benefit in that you can run multiple jobs at the same time without worrying about crashing the program or keeping track of/resetting the global variables.

This would change, though, if some of the non-modular parts could be reorganized. It would be brilliant to have a relatively high level API that the command line utility and a gui could leverage in a uniform way.

<<  Logging should be integral with the pure module approach, possibly even
opened up as a resource in the API.  We may not actually use it for user
output, but for debugging, its vital.>>

Agreed. I would love to a way of interacting with the log in a more cohesive way. I'm in the process of trying to implement a few simple logging methods to better catch duplicity errors and notify the user of problems at the moment, and if there were a way to cache or review a central duplicity log, that would be wonderful.

<<  Yes, a formal API would be good, as I said above.  I'm already doing
Epydoc on a regular basis and would like to continue.  We can use that
to help formalize an API.>>

In designing such an API, I would love to see a combination of both high level and low level methods. Considering the needs of a Gui client, the ability to start a backup operation, restore a file, delete a backup set, or combine (coalesce) a series of incremental snapshots would be highly convenient.

As a potential place to start discussions about what an API might look like and what needs it might fulfill, I've attached the python class that I use for Time Drive. From trial and error (the way I tend to do all development), there seem to be six methods that get used most consistently:

   * get_snapshot_list : Retrieves a lsit of the snapshots that are
     currently available in a given archive.
   * delete_backup : Deletes the backup sets specified by archive_url
     and backup_time
   * get_file_list : Retrieves a file_iterator associated with a
     specified archive url.
   * test_connection_url (which I need to overhaul) : Test the
     connection to an archive to ensure that it is valid.
   * restore_file : Restores a file specified by a relative_path to its
     destination
   * start_backup : Begin a new backup operation.

As noted above, additional methods to interact with the log/post updates to the user would also be nice. But even though those six methods cover Time Drive's needs fairly comprehensively, Time Drive is still a very young program. I'd be interested to hear what other things Michael and the others have used for their respective programs.

Cheers,

Rob Oakes
#!/usr/bin/env python
#
# Time Drive - based on duplicity - Encrypted bandwidth efficient backup.
#
# Copyright 2009 Rob Oakes	<LyX-Devel@oak-tree>
#
# Backend Methods that communicate with duplicity.  Because duplicity is
# a command line utility, it assumes that the entire environment will be 
# reset between each run.  The methods here have been slightly refactored so
# that multiple tasks can be performed within the same session.

import os
import os.path
import subprocess
import sys

from duplicity import backend
from duplicity import collections
from duplicity import commandline
from duplicity import diffdir
from duplicity import dup_time
from duplicity import file_naming
from duplicity import globals
from duplicity import gpg
from duplicity import log
from duplicity import path
import duplicity.backends
import duplicity.errors


def get_snapshot_list(archive_url, gnu_passphrase, sync_remote = True,
	S3_AccessId = None, S3_SecretKey = None, S3_BucketType = None):
	"""Retrieves a list of the snapshots that are currently available in a
	given archive."""
	
	if S3_AccessId != None and S3_SecretKey != None:
		set_AmazonS3(S3_AccessId, S3_SecretKey)
	
	# Set Global Variables for European Amazon S3 Buckets
	if (S3_BucketType != None)&(S3_BucketType == "European"):
		globals.s3_european_buckets = True
		globals.s3_use_new_style = True
	
	# Setup Log and Retrieve Appropriate Backend
	log.setup()
	duplicity.backend.import_backends()
	
	filelist_backend = backend.get_backend(archive_url)
	filelist_archive_cache = retrieve_archive_dir(archive_url)
	
	# Set global variables to appropriate values
	globals.archive_dir = filelist_archive_cache
	globals.gpg_profile = gpg.GPGProfile()
	globals.gpg_profile.passphrase = gnu_passphrase
	
	# Check to See if Local List Needs to Be Synced With Remote
	if sync_remote:
		sync_archive(filelist_backend, gnu_passphrase)
	
	col_stats = collections.CollectionsStatus(filelist_backend, 
		filelist_archive_cache).set_values()
		
	try:
		backup_list = col_stats.matched_chain_pair[1].to_log_info('')
	except:	
		cleanup_globals()
		return None
	
	cleanup_globals()
	filelist_backend.close()
	log.shutdown()
	
	if S3_AccessId != None and S3_SecretKey != None:
		unset_AmazonS3()
	
	return backup_list


def delete_backup(archive_url, gnu_passphrase, backup_time, S3_AccessId = None,
	S3_SecretKey = None, S3_BucketType = None):
	"""Deletes the backup sets specified by archive_url and backup_time."""
	
	log.setup()
	duplicity.backend.import_backends()
	
	if S3_AccessId != None and S3_SecretKey != None:
		set_AmazonS3(S3_AccessId, S3_SecretKey)
	
	# Set Global Variables for European Amazon S3 Buckets
	if (S3_BucketType != None)&(S3_BucketType == "European"):
		globals.s3_european_buckets = True
		globals.s3_use_new_style = True
	
	filelist_backend = backend.get_backend(archive_url)
	filelist_archive_cache = retrieve_archive_dir(archive_url)
	
	# Set global variables to appropriate values
	globals.archive_dir = filelist_archive_cache
	globals.gpg_profile = gpg.GPGProfile()
	globals.gpg_profile.passphrase = gnu_passphrase
	
	col_stats = collections.CollectionsStatus(filelist_backend,
		filelist_archive_cache).set_values()
	result_sets, backup_chain = retrieve_backup_sets(col_stats, backup_time)
	
	try:
		for backup_set in result_sets:
			backup_set.delete()
		col_stats.set_values(sig_chain_warning = None).cleanup_signatures()
		operation_success = True
	except:
		operation_success = False
	
	# Restore Values to Default, Unset Amazon S3 and Close Log
	cleanup_globals()
	if S3_AccessId != None and S3_SecretKey != None:
		unset_AmazonS3()
	filelist_backend.close()
	log.shutdown()
	
	return operation_success


def retrieve_backup_sets(col_stats, backup_time):
	"""Retreives the backup sets specified by archive_url and backup_time."""
	
	if not col_stats.all_backup_chains:
		result_sets = []
	else:
		backup_chain = col_stats.get_backup_chain_at_time(backup_time)
		assert backup_chain, col_stats.all_backup_chains
		result_sets = backup_chain.get_all_sets()
	
	# Return the list of result sets
	return result_sets, backup_chain


def get_file_list(archive_url, gnu_passphrase, sync_remote = True, time = None,
	S3_AccessId = None, S3_SecretKey = None, S3_BucketType = None):
	"""
	Retrieve the file_list from the specified archive url.
	@param archive_url (string): valid duplicity archive url
	@param gnu_passphrase (string): the passphrase for the specified url
	@param sync_remote (bool): Default = True: Will first make sure that
		the local file list is in sync with the remote file list.
	@param time (string): Default = None: Specify the time from which to list
		files.  Date should be provided in the format year-month-date.
		Ex: 2009-04-10
	@param S3AccessId (string): Default = None: Amazon S3 Access Id.
	@param S3SecretKey (string): Default = None: Amazon S3 secret key.
	@rtype: path_iter
	@return: list of files contained in the archive.  Can be parsed
		with utils.ParseDirectoryStructure.
	"""
	
	log.setup()
	duplicity.backend.import_backends()
	
	# Set Amazon S3 AccessId and Secret Key (if applicable)
	if S3_AccessId != None and S3_SecretKey != None:
		set_AmazonS3(S3_AccessId, S3_SecretKey)
	
	# Set Global Variables for European Amazon S3 Buckets
	if (S3_BucketType != None)&(S3_BucketType == "European"):
		globals.s3_european_buckets = True
		globals.s3_use_new_style = True
	
	# Create a Duplicity Backend Object and Link to the Cache Directory
	filelist_backend = backend.get_backend(archive_url)
	filelist_archive_cache = retrieve_archive_dir(archive_url)
	
	# Set Appropriate Global Options
	globals.archive_dir = filelist_archive_cache
	globals.gpg_profile = gpg.GPGProfile()
	globals.gpg_profile.passphrase = gnu_passphrase
	
	# Check archive sync with remote, set snapshot from which to
	# list files, if applicable set Amazon S3 Access Key and Id
	
	if sync_remote:
		sync_archive(filelist_backend, gnu_passphrase)
	if time != None:
		globals.restore_time = dup_time.genstrtotime(time)
	
	# Retrieve Current Collection Status
	col_stats = collections.CollectionsStatus(filelist_backend, 
		filelist_archive_cache).set_values()
	
	# Fetch the Filelist and File Listing
	path_iter = list_current(col_stats)
	
	# Restore Values to Default and Close Log
	cleanup_globals()
	if S3_AccessId != None and S3_SecretKey != None:
		unset_AmazonS3()
		
	filelist_backend.close()
	log.shutdown()
	
	if path_iter == None:
		return None
	
	return path_iter


def test_connection_url(archive_url, S3_AccessId = None, 
	S3_SecretKey = None, S3_BucketType = None):
	"""Test the remote conneciton to ensure that it is valid"""
	
	log.setup()
	duplicity.backend.import_backends()
	
	# Set Appropriate Environment Variables for Amazon S3
	if S3_AccessId != None and S3_SecretKey != None:
		set_AmazonS3(S3_AccessId, S3_SecretKey)
	
	if (S3_BucketType != None)&(S3_BucketType == "European"):
		globals.s3_european_buckets = True
		globals.s3_use_new_style = True
	
	try:
		filelist_backend = backend.get_backend(str(archive_url))
		filelist_backend.list()
		tstResult = True
	except:
		tstResult = False
	
	# Unset Amazon S3 Environment Variables
	if S3_AccessId != None and S3_SecretKey != None:
		unset_AmazonS3()
		
	log.shutdown()
	return tstResult

def restore_file(relative_path, archive_url, gnu_passphrase, destination, 
	restore_time = None, S3_AccessId = None, S3_SecretKey = None, S3_BucketType = None):
	"""Restores the file specified by relative_path to destination"""
	
	# Set the Amazon AccessId and Secret Key (if applicable)
	if S3_AccessId != None and S3_SecretKey != None:
		set_AmazonS3(S3_AccessId, S3_SecretKey)
	
	set_Passphrase(gnu_passphrase)
	
	# Set Global Variables for European Amazon S3 Buckets
	if (S3_BucketType != None)&(S3_BucketType == "European"):
		S3_Bucket = "--s3-european-buckets --s3-use-new-style "
	else:
		S3_Bucket = ""
	
	if restore_time != None:
		timeCmd = "--restore-time '" + restore_time + "' "
		backupCmd = "duplicity " + S3_Bucket + timeCmd + "--file-to-restore '" + relative_path + "' '" + archive_url + "' '" + destination + '/' + os.path.basename(str(relative_path)) + "'"
	else:
		backupCmd = "duplicity " + S3_Bucket + "--file-to-restore '" + relative_path + "' '" + archive_url + "' '" + destination + '/' + os.path.basename(str(relative_path)) + "'"
	
	if S3_AccessId != None and S3_SecretKey != None:
		set_AmazonS3(S3_AccessId, S3_SecretKey)
		
	result = _execute_output(str(backupCmd))

	if S3_AccessId != None and S3_SecretKey != None:
		unset_AmazonS3()

	return result


def start_backup(path, archive_url, gnu_passphrase, ExcludeList = None, 
	FullBackupInterval = None, S3_AccessId = None, S3_SecretKey = None, 
	S3_BucketType = None):
	"""Begins a new backup operation for the specified path and archive url.
	Supports exclusions (ExcludeList, Default = None) and setting how often 
	a new full backup should be made (FullBackupInterval, Default = None)"""

	if ExcludeList != None:
		exclude = ""
		for item, itemtype in ExcludeList:
			if itemtype == 1:
				exclude = exclude + "--exclude-regexp " + "'" + item + "' "
			if itemtype == 2 or itemtype == 3:
				exclude = exclude + "--exclude " + "'" + item + "' "
	
	if (FullBackupInterval != None)&(FullBackupInterval > 0):
		backupInterval = "--full-if-older-than " + str(FullBackupInterval) + "D "
	else:
		backupInterval = ""
		
	if (S3_BucketType != None)&(S3_BucketType == "European"):
		S3_Bucket = "--s3-european-buckets --s3-use-new-style "
	else:
		S3_Bucket = ""

	if gnu_passphrase is None:
		backupCmd = "duplicity " + backupInterval + "--no-encryption " + exclude + "'" + path.rstrip() + "'" + " '" + archive_url + "'"
	else:
		set_Passphrase(gnu_passphrase)
		backupCmd = "duplicity " + backupInterval + exclude + "'" + path.rstrip() + "'" + " '" + archive_url + "'"
	
	if S3_AccessId != None and S3_SecretKey != None:
		set_AmazonS3(S3_AccessId, S3_SecretKey)

	result = _execute_output(str(backupCmd))
	
	if S3_AccessId != None and S3_SecretKey != None:
		unset_AmazonS3()
	
	if gnu_passphrase != None:
		unset_Passphrase()

	return result


def cleanup_globals():
	"""Returns global variables to default states after action has finished."""
	
	globals.gpg_profile = None
	globals.backend = None
	globals.archive_dir = os.path.expandvars("$XDG_CACHE_HOME/duplicity")
	globals.restore_time = None
	globals.s3_use_new_style = False
	globals.s3_european_buckets = False


def retrieve_archive_dir(archive_url):
	"""
	@param archive_url: URL to archive
	@returns: Path to the directory where the local archive cache is stored.
	"""
	base_url = os.path.expandvars("$XDG_CACHE_HOME/duplicity")
	backup_name = commandline.generate_default_backup_name(archive_url)
	archive_expanded_url = commandline.expand_fn(os.path.join(base_url, backup_name))
	
	if not os.path.exists(archive_expanded_url):
		"""Check archive dir and construct path"""
		try:
			os.makedirs(archive_expanded_url)
		except:
			pass
	
	archive_dir = path.Path(archive_expanded_url)
	
	if not archive_dir.isdir():
		log.FatalError(_("Specified archive directory '%s' does not exist, "
			"or is not a directory") % (archive_dir.name,),
			log.ErrorCode.bad_archive_dir)
	
	return archive_dir


def list_current(col_stats):
	"""
	Returns a list of the files that are currently in the archive.
	The list is generated by examining the signature only.
	@type col_stats: CollectionStatus object
	@param col_stats: collection status
	
	@rtype: path_iter (duplicity class)
	@return: path_iter which contains the contents of the specified file collection
	"""
	sig_chain = check_sig_chain(col_stats)
	if not sig_chain:
		log.Notice(_("No signature data found, unable to list files."))
		return None
	time = globals.restore_time # if None, will use latest
	path_iter = diffdir.get_combined_path_iter(sig_chain.get_fileobjs(time))
	
	return path_iter


def check_sig_chain(col_stats):
	"""
	Get last signature chain for inc backup, or None if none avaialble.
	
	@type col_stats: CollectionStatus object
	@param col_stats: collection status
	"""
	if not col_stats.matched_chain_pair:
		if globals.incremental:
			log.FatalError(_("Fatal Error: Unable to start incremental backup.  "
				"Old signatures not found and incremental specified"),
				log.ErrorCode.inc_without_sigs)
	
		else:
			log.Warn(_("No signatures found, switching to full backup."))
		return None
	return col_stats.matched_chain_pair[0]
	

def sync_archive(backend, gnu_passphrase):
	"""
	Synchronize local archive manifest file and sig chains to remote archives.
	Copy missing files from remote to local as needed to make sure the local
	archive is synchronized to remote storage.
	
	@type backend: backend class
	
	@type gnu_passphrase: string
	@param gnu_passphrase: the encryption key to the archive
	
	@type encryption: Boolean (True/False)
	@param encryption: whether the archive is encrypted or not.  Provided by the settings.
	
	@rtype: void
	@return: void
	"""
	suffixes = [".g", ".gpg", ".z", ".gz"]
	
	def get_metafiles(filelist):
		"""
		Return metafiles of interest from the file list.
		Files of interest are:
		  sigtar - signature files
		  manifest - signature files
		Files excluded are:
		  non-duplicity files
		  duplicity partial files
		  
		@rtype: list
		@return: list of duplicity metadata files
		"""
		metafiles = {}
		need_passphrase = False
		for fn in filelist:
			pr = file_naming.parse(fn)
			if not pr:
				continue
			if pr.partial:
				continue
			if pr.encrypted:
				need_passphrase = True
			if pr.type in ["full-sig", "new-sig"] or pr.manifest:
				base, ext = os.path.splitext(fn)
				if ext in suffixes:
					metafiles[base] = fn
				else:
					metafiles[fn] = fn
		return metafiles, need_passphrase
		
	def copy_raw(src_iter, filename):
		"""
		Copy data from src_iter to file at fn
		"""
		block_size = 128 * 1024
		file = open(filename, "wb")
		while True:
			try:
				data = src_iter.next(block_size).data
			except StopIteration:
				break
			file.write(data)
		file.close()
	
	def resolve_basename(fn, backend):
		"""
		@return: (parsedresult, local_name, remote_name)
		"""
		pr = file_naming.parse(fn)
		if pr.manifest:
			suffix = file_naming.get_suffix(globals.encryption, False)
		else:
			suffix = file_naming.get_suffix(globals.encryption, not globals.encryption)
		rem_name = fn + suffix
		
		if pr.manifest:
			suffix = file_naming.get_suffix(False, False)
		else:
			suffix = file_naming.get_suffix(False, True)
		loc_name = fn + suffix
		
		return (pr, loc_name, rem_name)
		
	def remove_local(fn):
		pr, loc_name, rem_name = resolve_basename(fn, backend)
		
		del_name = globals.archive_dir.append(loc_name).name
		log.Notice(_("Deleting local %s (not authoritative at backend).") % del_name)
		os.unlink(del_name)
	
	def copy_to_local(fn, backend):
		"""
		Copy remote file fn to local cache.
		"""
		class Block:
			"""
			Data block to return from SrcIter
			"""
			def __init__(self, data):
				self.data = data
		
		class SrcIter:
			"""
			Iterate over source and return Block of data.
			"""
			def __init__(self, fileobj):
				self.fileobj = fileobj
				
			def next(self, size):
				try:
					res = Block(self.fileobj.read(size))
				except:
					log.FatalError(_("Failed to read %s: %s") %
						(self.fileobj.name, sys.exc_info()),
						log.ErrorCode.generic)
				
				if not res.data:
					self.fileobj.close()
					raise StopIteration
				return res
			
			def get_footer(self):
				return ""
			
		log.Notice(_("Copying %s to local cache.") % fn)
		
		pr, loc_name, rem_name = resolve_basename(fn, backend)
			
		fileobj = backend.get_fileobj_read(rem_name)
		src_iter = SrcIter(fileobj)
		if pr.manifest:
			copy_raw(src_iter, globals.archive_dir.append(loc_name).name)
		else:
			gpg.GzipWriteFile(src_iter, globals.archive_dir.append(loc_name).name, 
				size = sys.maxint)
	
	# get remote metafile list
	remlist = backend.list()
	remote_metafiles, rem_needpass = get_metafiles(remlist)
	
	# get local metafile list
	loclist = globals.archive_dir.listdir()
	local_metafiles, loc_needpass = get_metafiles(loclist)
	
	if rem_needpass or loc_needpass:
		globals.gpg_profile.passphrase = gnu_passphrase
	
	# we have the list of metafiles on both sides. remote is always
	# authoritative. figure out which are local spurious (should not
	# be there) and missing (should be there but are not).
	local_keys = local_metafiles.keys()
	remote_keys = remote_metafiles.keys()
	
	local_missing = []
	local_spurious = []
	
	for key in remote_keys:
		if not key in local_keys:
			local_missing.append(key)
	
	for key in local_keys:
		if not key in remote_keys:
			local_spurious.append(key)
			
	# finally finish the process
	if not local_missing and not local_spurious:
		log.Notice(_("Local and Remote metadata are synchronized, no sync needed."))
	else:
		local_missing.sort()
		local_spurious.sort()
		if not globals.dry_run:
			log.Notice(_("Synchronizing remote metadata to local cache..."))
			for fn in local_spurious:
				remove_local(fn)
			for fn in local_missing:
				copy_to_local(fn, backend)
		else:
			if local_missing:
				log.Notice(_("Sync would copy the following from remote to local:")
					+ "\n" + "\n".join(local_missing))
			if local_spurious:
				log.Notice(_("Sync would remove the following spurious local files:")
					+ "\n" + "\n".join(local_spurious))

def _execute(cmd, callback = None, user_data = None):
	ret_val = 0
	
	if callback is None:
		ret_val = os.system(cmd)
	else:
		pipe = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,)
		
		while True:
			line = pipe.stdout.readline()
			if len( line ) == 0:
				break
			callback( line.strip(), user_data)
			
		pipe.stdout.close()
		if pipe.wait() is 0:
			ret_val = 0
	
	return ret_val

def _execute_output(cmd, callback = None, user_data = None ):
	output = ''
	pipe = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,)
		
	while True:
		line = pipe.stdout.readline()
		if len( line ) == 0:
			break
		output = output + line
		if not callback is None:
			callback( line.strip(), user_data )
	
	pipe.stdout.close()
	if pipe.wait() is 0:
		ret_val = 0

	return output


def set_AmazonS3(S3_AccessId, S3_SecretKey):
	os.environ["AWS_ACCESS_KEY_ID"] = str(S3_AccessId)
	os.environ["AWS_SECRET_ACCESS_KEY"] = str(S3_SecretKey)

def unset_AmazonS3():
	os.unsetenv("AWS_ACCESS_KEY_ID")
	os.unsetenv("AWS_SECRET_ACCESS_KEY")

def set_Passphrase(passphrase):
	os.putenv("PASSPHRASE", passphrase)
	
def unset_Passphrase():
	os.unsetenv("PASSPHRASE")
	

References