ubuntu-support-team team mailing list archive
-
ubuntu-support-team team
-
Mailing list archive
-
Message #00077
[Merge] ~ubuntu-support-team/software-properties:devel into software-properties:ubuntu/master
Dan Streetman has proposed merging ~ubuntu-support-team/software-properties:devel into software-properties:ubuntu/master.
Requested reviews:
Ubuntu Core Development Team (ubuntu-core-dev)
For more details, see:
https://code.launchpad.net/~ubuntu-support-team/software-properties/+git/software-properties/+merge/379928
--
Your team Ubuntu Support Team is subscribed to branch ~ubuntu-support-team/software-properties:devel.
diff --git a/add-apt-repository b/add-apt-repository
index ea5f7dc..fbef737 100755
--- a/add-apt-repository
+++ b/add-apt-repository
@@ -7,190 +7,284 @@ import os
import sys
import gettext
import locale
+import argparse
+import subprocess
-from softwareproperties.SoftwareProperties import SoftwareProperties, shortcut_handler
-from softwareproperties.shortcuts import ShortcutException
-import aptsources
-from aptsources.sourceslist import SourceEntry
-from optparse import OptionParser
+from softwareproperties.shortcuthandler import ShortcutException
+from softwareproperties.shortcuts import shortcut_handler
+from softwareproperties.ppa import PPAShortcutHandler
+from softwareproperties.cloudarchive import CloudArchiveShortcutHandler
+from softwareproperties.sourceslist import SourcesListShortcutHandler
+from softwareproperties.uri import URIShortcutHandler
+
+from aptsources.distro import get_distro
+from aptsources.sourceslist import (SourcesList, SourceEntry)
from gettext import gettext as _
-if __name__ == "__main__":
- # Force encoding to UTF-8 even in non-UTF-8 locales.
- sys.stdout = io.TextIOWrapper(
- sys.stdout.detach(), encoding="UTF-8", line_buffering=True)
-
- try:
- locale.setlocale(locale.LC_ALL, "")
- except:
- pass
- gettext.textdomain("software-properties")
- usage = """Usage: %prog <sourceline>
-
-%prog is a script for adding apt sources.list entries.
-It can be used to add any repository and also provides a shorthand
-syntax for adding a Launchpad PPA (Personal Package Archive)
-repository.
-
-<sourceline> - The apt repository source line to add. This is one of:
- a complete apt line in quotes,
- a repo url and areas in quotes (areas defaults to 'main')
- a PPA shortcut.
- a distro component
-
- Examples:
- apt-add-repository 'deb http://myserver/path/to/repo stable myrepo'
- apt-add-repository 'http://myserver/path/to/repo myrepo'
- apt-add-repository 'https://packages.medibuntu.org free non-free'
- apt-add-repository http://extras.ubuntu.com/ubuntu
- apt-add-repository ppa:user/repository
- apt-add-repository ppa:user/distro/repository
- apt-add-repository multiverse
-
-If --remove is given the tool will remove the given sourceline from your
-sources.list
-"""
- parser = OptionParser(usage)
- # FIXME: provide a --sources-list-file= option that
- # puts the line into a specific file in sources.list.d
- parser.add_option ("-m", "--massive-debug", action="store_true",
- dest="massive_debug", default=False,
- help=_("Print a lot of debug information to the command line"))
- parser.add_option("-r", "--remove", action="store_true",
- dest="remove", default=False,
- help=_("remove repository from sources.list.d directory"))
- parser.add_option("-s", "--enable-source", action="store_true",
- dest="enable_source", default=False,
- help=_("Allow downloading of the source packages from the repository"))
- parser.add_option("-y", "--yes", action="store_true",
- dest="assume_yes", default=False,
- help=_("Assume yes to all queries"))
- parser.add_option("-n", "--no-update", action="store_false",
- dest="update", default=True,
- help=_("Do not update package cache after adding"))
- parser.add_option("-u", "--update", action="store_true",
- dest="update", default=True,
- help=_("Update package cache after adding (legacy option)"))
- parser.add_option("-k", "--keyserver",
- dest="keyserver", default="",
- help=_("Legacy option, unused."))
-
- (options, args) = parser.parse_args()
-
- # We prefer to run apt-get update here. The built-in update support
- # does not have any progress, and only works for shortcuts. Moving
- # it to something like save() and using apt.progress.text would
- # solve the problem, but the new errors might cause problems with
- # the dbus server or other users of the API. Also, it's unclear
- # how good the text progress is or how to pass it best.
- update = options.update
- options.update = False
-
- if os.geteuid() != 0:
- print(_("Error: must run as root"))
- sys.exit(1)
-
- if len(args) == 0:
- print(_("Error: need a repository as argument"))
- sys.exit(1)
- elif len(args) > 1:
- print(_("Error: need a single repository as argument"))
- sys.exit(1)
-
- # force new ppa file to be 644 (LP: #399709)
- os.umask(0o022)
-
- # get the line
- line = args[0]
-
- # add it
- sp = SoftwareProperties(options=options)
- distro = aptsources.distro.get_distro()
- distro.get_sources(sp.sourceslist)
-
- # check if its a component that should be added/removed
- components = [comp.name for comp in distro.source_template.components]
- if line in components:
- if options.remove:
- if line in distro.enabled_comps:
- distro.disable_component(line)
- print(_("'%s' distribution component disabled for all sources.") % line)
+
+class AddAptRepository(object):
+ def __init__(self):
+ gettext.textdomain("software-properties")
+ self.distro = get_distro()
+ self.sourceslist = SourcesList()
+ self.distro.get_sources(self.sourceslist)
+
+ def parse_args(self, args):
+ description = "Only ONE of -P, -C, -U, -S, or old-style 'line' can be specified"
+
+ parser = argparse.ArgumentParser(description=description)
+ parser.add_argument("-d", "--debug", action="store_true",
+ help=_("Print debug"))
+ parser.add_argument("-r", "--remove", action="store_true",
+ help=_("Disable repository"))
+ parser.add_argument("-s", "--enable-source", action="count", default=0,
+ help=_("Allow downloading of the source packages from the repository"))
+ parser.add_argument("-c", "--component", action="append", default=[],
+ help=_("Components to use with the repository"))
+ parser.add_argument("-p", "--pocket",
+ help=_("Add entry for this pocket"))
+ parser.add_argument("-y", "--yes", action="store_true",
+ help=_("Assume yes to all queries"))
+ parser.add_argument("-n", "--no-update", dest="update", action="store_false",
+ help=_("Do not update package cache after adding"))
+ parser.add_argument("-u", "--update", action="store_true", default=True,
+ help=argparse.SUPPRESS)
+ parser.add_argument("-l", "--login", action="store_true",
+ help=_("Login to Launchpad."))
+ parser.add_argument("--dry-run", action="store_true",
+ help=_("Don't actually make any changes."))
+
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument("-P", "--ppa",
+ help=_("PPA to add"))
+ group.add_argument("-C", "--cloud",
+ help=_("Cloud Archive to add"))
+ group.add_argument("-U", "--uri",
+ help=_("Archive URI to add"))
+ group.add_argument("-S", "--sourceslist", nargs='+', default=[],
+ help=_("Full sources.list entry line to add"))
+ group.add_argument("line", nargs='*', default=[],
+ help=_("sources.list line to add (deprecated)"))
+
+ self.parser = parser
+ self.options = self.parser.parse_args(args)
+
+ @property
+ def dry_run(self):
+ return self.options.dry_run
+
+ @property
+ def enable_source(self):
+ return self.options.enable_source > 0
+
+ @property
+ def components(self):
+ return self.options.component
+
+ @property
+ def pocket(self):
+ return self.options.pocket
+
+ def is_components(self, comps):
+ if not comps:
+ return False
+ return set(comps.split()) <= set([comp.name for comp in self.distro.source_template.components])
+
+ def apt_update(self):
+ if self.options.update and not self.dry_run:
+ # We prefer to run apt-get update here. The built-in update support
+ # does not have any progress, and only works for shortcuts. Moving
+ # it to something like save() and using apt.progress.text would
+ # solve the problem, but the new errors might cause problems with
+ # the dbus server or other users of the API. Also, it's unclear
+ # how good the text progress is or how to pass it best.
+ subprocess.run(['apt-get', 'update'])
+
+ def prompt_user(self):
+ if self.dry_run:
+ print(_("DRY-RUN mode: no modifications will be made"))
+ return
+ if not self.options.yes and sys.stdin.isatty() and not "FORCE_ADD_APT_REPOSITORY" in os.environ:
+ try:
+ input(_("Press [ENTER] to continue or Ctrl-c to cancel."))
+ except KeyboardInterrupt:
+ print(_("Aborted."))
+ sys.exit(1)
+
+ def prompt_user_shortcut(self, shortcut):
+ '''Display more information about the shortcut / ppa info'''
+ print(_("Repository: '%s'") % shortcut.SourceEntry().line)
+ if shortcut.description:
+ print(_("Description:"))
+ print(shortcut.description)
+ if shortcut.web_link:
+ print(_("More info: %s") % shortcut.web_link)
+ if self.options.remove:
+ print(_("Removing repository."))
+ else:
+ print(_("Adding repository."))
+ self.prompt_user()
+
+ def change_components(self):
+ for c in self.components:
+ if self.options.remove:
+ self.distro.disable_component(c)
+ print(_("Removed component %s") % c)
else:
- print(_("'%s' distribution component is already disabled for all sources.") % line)
- sys.exit(0)
+ self.distro.enable_component(c)
+ print(_("Added component %s") % c)
+ if not self.dry_run:
+ self.sourceslist.save()
+
+ def _add_pocket(self):
+ sourceslist = [s for s in self.sourceslist if not s.invalid and not s.disabled]
+ for s in sourceslist:
+ # add new pocket ONLY for templated sources
+ if not s.template:
+ continue
+ # add new pocket ONLY for sources currently using -release
+ codename = self.distro.codename
+ if not s.dist in [codename, '%s-release' % codename]:
+ continue
+ new_dist = '%s-%s' % (codename, self.pocket)
+ new_entry = s._replace(dist=new_dist)
+ self.sourceslist.add_entry(new_entry, after=s)
+ print(_("Adding: %s") % str(new_entry).strip())
+
+ def _remove_pocket(self):
+ dist = '%s-%s' % (self.distro.codename, self.pocket)
+ for s in [s for s in self.sourceslist if not s.invalid and not s.disabled]:
+ if s.dist == dist:
+ s.set_enabled(False)
+ print(_("Disabled: %s") % str(s).strip())
+
+ def change_pocket(self):
+ if self.options.remove:
+ self._remove_pocket()
+ else:
+ self._add_pocket()
+ if not self.dry_run:
+ self.sourceslist.save()
+
+ def _enable_source(self):
+ for s in [s for s in self.sourceslist if not s.invalid]:
+ if s.type == self.distro.source_type:
+ s.set_enabled(True)
+ print(_("Enabled: %s") % str(s).strip())
+ if self.options.enable_source > 1:
+ # if -s passed twice, add new deb-src entries if needed for all deb entries
+ for s in [s for s in self.sourceslist if not s.invalid and not s.disabled]:
+ if s.type == self.distro.binary_type:
+ entry = s._replace(type=self.distro.source_type)
+ if not self.sourceslist.has_entry(entry):
+ self.sourceslist.add_entry(entry, after=s)
+ print(_("Added: %s") % str(entry).strip())
+
+ def _disable_source(self):
+ for s in [s for s in self.sourceslist if not s.invalid and not s.disabled]:
+ if s.type == self.distro.source_type:
+ s.set_enabled(False)
+ print(_("Disabled: %s") % str(s).strip())
+
+ def change_source(self):
+ if self.options.remove:
+ self._disable_source()
else:
- if line not in distro.enabled_comps:
- distro.enable_component(line)
- print(_("'%s' distribution component enabled for all sources.") % line)
+ self._enable_source()
+ if not self.dry_run:
+ self.sourceslist.save()
+
+ def global_change(self):
+ if self.components:
+ if self.options.remove:
+ print(_("Removing component(s) '%s' from all repositories.") % ', '.join(self.components))
else:
- print(_("'%s' distribution component is already enabled for all sources.") % line)
- sys.exit(0)
- sp.sourceslist.save()
- if update and not options.remove:
- os.execvp("apt-get", ["apt-get", "update"])
- sys.exit(0)
-
- # this wasn't a component name ('multiverse', 'backports'), so its either
- # a actual line to be added or a shortcut.
- try:
- shortcut = shortcut_handler(line)
- except ShortcutException as e:
- print(e)
- sys.exit(1)
-
- # display more information about the shortcut / ppa info
- if not options.assume_yes and shortcut.should_confirm():
- try:
- info = shortcut.info()
- except ShortcutException as e:
- print(e)
- sys.exit(1)
-
- print(" %s" % (info["description"] or ""))
- print(_(" More info: %s") % str(info["web_link"]))
- if (sys.stdin.isatty() and
- not "FORCE_ADD_APT_REPOSITORY" in os.environ):
- if options.remove:
- print(_("Press [ENTER] to continue or Ctrl-c to cancel removing it."))
+ print(_("Adding component(s) '%s' to all repositories.") % ', '.join(self.components))
+ if self.pocket:
+ if self.options.remove:
+ print(_("Removing pocket %s for all repositories.") % self.pocket)
else:
- print(_("Press [ENTER] to continue or Ctrl-c to cancel adding it."))
- try:
- sys.stdin.readline()
- except KeyboardInterrupt:
- print("\n")
- sys.exit(1)
+ print(_("Adding pocket %s for all repositories.") % self.pocket)
+ if self.enable_source:
+ if self.options.remove:
+ print(_("Disabling %s for all repositories.") % self.distro.source_type)
+ else:
+ print(_("Enabling %s for all repositories.") % self.distro.source_type)
+ self.prompt_user()
+ if self.components:
+ self.change_components()
+ if self.pocket:
+ self.change_pocket()
+ if self.enable_source:
+ self.change_source()
+ def main(self, args=sys.argv[1:]):
+ self.parse_args(args)
- if options.remove:
- try:
- (line, file) = shortcut.expand(
- sp.distro.codename, sp.distro.id.lower())
- except ShortcutException as e:
- print(e)
- sys.exit(1)
- deb_line = sp.expand_http_line(line)
- debsrc_line = 'deb-src' + deb_line[3:]
- deb_entry = SourceEntry(deb_line, file)
- debsrc_entry = SourceEntry(debsrc_line, file)
- try:
- sp.remove_source(deb_entry)
- except ValueError:
- print(_("Error: '%s' doesn't exist in a sourcelist file") % deb_line)
- try:
- sp.remove_source(debsrc_entry)
- except ValueError:
- print(_("Error: '%s' doesn't exist in a sourcelist file") % debsrc_line)
+ if not self.dry_run and os.geteuid() != 0:
+ print(_("Error: must run as root"))
+ return False
+
+ line = ' '.join(self.options.line)
+ if line == '-':
+ line = sys.stdin.readline().strip()
+
+ # if 'line' is only (valid) components, handle as if only -c was used with no line
+ if self.is_components(line):
+ self.options.component += line.split()
+ line = ''
+
+ if self.options.ppa:
+ source = self.options.ppa
+ if not ':' in source:
+ source = 'ppa:' + source
+ handler = PPAShortcutHandler
+ elif self.options.cloud:
+ source = self.options.cloud
+ if not ':' in source:
+ source = 'uca:' + source
+ handler = CloudArchiveShortcutHandler
+ elif self.options.uri:
+ source = self.options.uri
+ handler = URIShortcutHandler
+ elif self.options.sourceslist:
+ source = ' '.join(self.options.sourceslist)
+ handler = SourcesListShortcutHandler
+ elif line:
+ source = line
+ handler = shortcut_handler
+ elif any((self.enable_source, self.components, self.pocket)):
+ self.global_change()
+ self.apt_update()
+ return True
+ else:
+ print(_("Error: no actions requested."))
+ self.parser.print_help()
+ return False
- else:
try:
- if not sp.add_source_from_shortcut(shortcut, options.enable_source):
- print(_("Error: '%s' invalid") % line)
- sys.exit(1)
+ shortcut_params = {
+ 'login': self.options.login,
+ 'enable_source': self.enable_source,
+ 'dry_run': self.dry_run,
+ 'components': self.components,
+ 'pocket': self.pocket,
+ }
+ shortcut = handler(source, **shortcut_params)
except ShortcutException as e:
print(e)
- sys.exit(1)
+ return False
+
+ self.prompt_user_shortcut(shortcut)
+
+ if self.options.remove:
+ shortcut.remove()
+ else:
+ shortcut.add()
+
+ self.apt_update()
+ return True
- sp.sourceslist.save()
- if update and not options.remove:
- os.execvp("apt-get", ["apt-get", "update"])
- sys.exit(0)
+if __name__ == '__main__':
+ addaptrepo = AddAptRepository()
+ sys.exit(0 if addaptrepo.main() else 1)
diff --git a/debian/bash-completion/add-apt-repository b/debian/bash-completion/add-apt-repository
new file mode 100644
index 0000000..4a151e4
--- /dev/null
+++ b/debian/bash-completion/add-apt-repository
@@ -0,0 +1,93 @@
+# bash completion for add-apt-repository -*- shell-script -*-
+
+_add-apt-repository_components()
+{
+ if test -r /etc/lsb-release && grep -q Ubuntu /etc/lsb-release; then
+ COMPREPLY=( $(compgen -W 'main restricted universe multiverse main/debug' -- "$cur") )
+ elif test -r /etc/debian_version; then
+ COMPREPLY=( $(compgen -W 'main contrib free non-free' -- "$cur") )
+ fi
+}
+
+_add-apt-repository_ppa()
+{
+ : # nothing to do currently
+}
+
+_add-apt-repository_uca()
+{
+ : # add param to list valid UCAs for current release
+}
+
+_add-apt-repository_uri()
+{
+ COMPREPLY=( $(compgen -W 'http:// https:// mirror:// ftp:// file:/ copy:/ rsh:// ssh://' -- "$cur") )
+}
+
+_add-apt-repository_sourceslist()
+{
+ : # maybe add help to autofill full sources.list line
+}
+
+_add-apt-repository_help()
+{
+ "$1" --help | grep '^\s*-' | tr ',' '\n' | _parse_help -
+}
+
+_add-apt-repository()
+{
+ local cur prev words cword
+ _init_completion || return
+
+ if [[ $cur == -* ]]; then
+ COMPREPLY=( $(compgen -W "$(_add-apt-repository_help $1)" -- "$cur") )
+ return
+ fi
+
+ case $prev in
+ -p|--pocket)
+ COMPREPLY=( $(compgen -W 'release security updates proposed backports' -- "$cur") )
+ return
+ ;;
+ -c|--component)
+ _add-apt-repository_components
+ return
+ ;;
+ -P|--ppa)
+ _add-apt-repository_ppa
+ return
+ ;;
+ -C|--cloud)
+ _add-apt-repository_uca
+ return
+ ;;
+ -U|--uri)
+ _add-apt-repository_uri
+ return
+ ;;
+ -S|--sourceslist)
+ _add-apt-repository_sourceslist
+ return
+ ;;
+ esac
+
+ # check if last param was -S/--sourceslist,
+ # as it can accept multiple words
+ local i=$cword
+ while [[ $i -ge 0 ]]; do
+ case $words[$i] in
+ -S|--sourceslist)
+ _add-apt-repository_sourceslist
+ return
+ ;;
+ esac
+ i=$(( $i - 1 ))
+ done
+}
+
+complete -F _add-apt-repository add-apt-repository
+
+# also complete the alias
+complete -F _add-apt-repository apt-add-repository
+
+# ex: filetype=sh
diff --git a/debian/control b/debian/control
index 6998d5e..9f9441f 100644
--- a/debian/control
+++ b/debian/control
@@ -21,6 +21,7 @@ Build-Depends: dbus-x11 <!nocheck>,
python3-distro-info <!nocheck>,
python3-distutils-extra,
python3-gi <!nocheck>,
+ python3-launchpadlib,
python3-mock <!nocheck>,
python3-requests-unixsocket <!nocheck>,
python3-setuptools,
@@ -40,6 +41,7 @@ Depends: gpg,
python3-apt (>=
0.6.20ubuntu16),
python3-gi,
+ python3-launchpadlib,
${misc:Depends},
${python3:Depends}
Recommends: unattended-upgrades
diff --git a/debian/manpages/add-apt-repository.1 b/debian/manpages/add-apt-repository.1
index 3a195a4..7662cf8 100644
--- a/debian/manpages/add-apt-repository.1
+++ b/debian/manpages/add-apt-repository.1
@@ -4,53 +4,130 @@ add-apt-repository \- Adds a repository into the
/etc/apt/sources.list or /etc/apt/sources.list.d
or removes an existing one
.SH SYNOPSIS
-.B add-apt-repository \fI[OPTIONS]\fR \fIREPOSITORY\fR
+.B add-apt-repository \fI[OPTIONS]\fR \fI[LINE]\fR
.SH DESCRIPTION
.B add-apt-repository
is a script which adds an external APT repository to either
/etc/apt/sources.list or a file in /etc/apt/sources.list.d/
or removes an already existing repository.
-The options supported by add-apt-repository are:
-
+.SH OPTIONS
+Note that the \fB--ppa\fR, \fB--cloud\fR, \fB--uri\fR, \fB--sourceslist\fR, and \fBLINE\fR parameters are mutually exclusive;
+only one (or none) of them may be specified.
+.TP
.B -h, --help
Show help message and exit
-
-.B -m, --massive-debug
-Print a lot of debug information to the command line
-
+.TP
+.B -d, --debug
+Print debug information to the command line
+.TP
.B -r, --remove
-Remove the specified repository
+Remove the specified repository; this first will disable (comment out) the matching line(s),
+and then any modified file(s) under sources.list.d/ will be removed if they contain only empty and commented lines.
+Note that this performs differently when used with the \fI--enable-source\fR and/or \fI--component\fR parameters.
+Without either of those parameters, this removes the specified repository, including any \fBdeb-src\fR line(s), and all components.
+If \fI--enable-source\fR is used, this removes \fBonly\fR the 'deb-src' line(s).
+If \fI--component\fR is used, this removes \fBonly\fR the specified component(s), and only removes the repository if no components remain.
+
+If both \fI--enable-source\fR and \fI--component\fR are used with \fI--remove\fR, the actions are performed separately: the specified
+component(s) will be removed from both \fBdeb\fR and \fBdeb-src\fR lines, and \fBdeb-src\fR lines will be disabled.
+.TP
.B -y, --yes
Assume yes to all queries
-
-.B -u, --update
-After adding the repository, update the package cache with packages from this repository (avoids need to apt-get update)
-
-.B -k, --keyserver
-Use a custom keyserver URL instead of the default
-
+.TP
+.B -n, --no-update
+After adding the repository, do not update the package cache
+.TP
+.B -l, --login
+Login to Launchpad (this is only needed for private PPAs)
+.TP
.B -s, --enable-source
-Allow downloading of the source packages from the repository
-
-
-.SH REPOSITORY STRING
-\fIREPOSITORY\fR can be either a line that can be added directly to
-sources.list(5), in the form ppa:<user>/<ppa-name> for adding Personal
-Package Archives, or a distribution component to enable.
-
-In the first form, \fIREPOSITORY\fR will just be appended to
-/etc/apt/sources.list.
-
-In the second form, ppa:<user>/<ppa-name> will be expanded to the full deb line
-of the PPA and added into a new file in the /etc/apt/sources.list.d/
-directory.
-The GPG public key of the newly added PPA will also be downloaded and
-added to apt's keyring.
-
-In the third form, the given distribution component will be enabled for all
-sources.
+Allow downloading of the source packages from the repository; specifically, this adds and enables a 'deb-src' line for the repository.
+If this parameter is used without any repository, it will enable source for all currently existing repositories.
+.TP
+.B -c, --component
+Which component(s) should be used with the specified repository. If not specified, this will default to 'main'.
+This may be used multiple times to specify multiple components. If this is used without any repository, it will add the
+component(s) to all currently existing repositories.
+.TP
+.B -p, --pocket
+What pocket to use. Defaults to none, which is equivalent to the release pocket.
+.TP
+.B --dry-run
+Show what would be done, but don't make any changes
+.TP
+.B -P, --ppa
+Add an Ubuntu Launchpad Personal Package Archive.
+Must be in the format \fBppa:USER/PPA\fR, \fBUSER/PPA\fR, or \fBUSER\fR.
+The \fBUSER\fR parameter should be the Launchpad team or person that owns the PPA.
+The \fBPPA\fR parameter should be the name of the PPA; if not provided, it defaults to 'ppa'.
+The GPG public key of the PPA will also be downloaded and added to apt's keyring.
+To add a private PPA, you must also use the \fI--login\fR parameter, and of course you must also be subscribed to the private PPA.
+.TP
+.B -C, --cloud
+Add an Ubuntu Cloud Archive.
+Must be in the format \fBcloud-archive:CANAME\fR, \fBuca:CANAME\fR, or \fBCANAME\fR.
+The \fBCANAME\fR parameter should be the name of the Cloud Archive.
+The \fBCANAME\fR parameter may optionally be suffixed with the pocket, as either \fB-updates\fR or \fB-proposed\fR.
+If not specified, the pocket defaults to \fB-updates\fR.
+.TP
+.B -U, --uri
+Add an archive, specified as a single URI.
+If the URI provided is detected to be a PPA, this will operate as if the \fI--ppa\fR parameter was used.
+.TP
+.B -S, --sourceslist
+Add an archive, specified as a full source entry line in one-line sources.list format.
+It must follow the \fIONE-LINE-STYLE\fR format as described in the \fBsources.list\fR manpage.
+If the URI provided is detected to be a PPA, this will operate as if the \fI--ppa\fR parameter was used.
+
+.SH LINE
+\fILINE\fR is a deprecated method to specify the repository to add/remove, provided only for backwards compatibility.
+It can be specified in any of the supported formats: sources.list line, plain uri, ppa shortcut, or cloud-archive shortcut.
+It can also be specified as one or more valid component(s). The script will attempt to detect which format is provided.
+
+This is not recommended as the autodetection of which repository format is intended can be ambiguous, but older
+scripts may still use this method of specifying the repository.
+
+One special case of \fILINE\fR is providing the value \fB-\fR, which will then read the \fILINE\fR from stdin.
+
+.SH EXAMPLES
+.TP
+add-apt-repository -P ppa:user/repository
+.TP
+add-apt-repository -P user/repository
+.TP
+add-apt-repository -C cloud-archive:queens
+.TP
+add-apt-repository -C queens
+.TP
+add-apt-repository -S 'deb http://myserver/path/to/repo stable main'
+.TP
+add-apt-repository -U http://myserver/path/to/repo -c main
+.TP
+add-apt-repository -U https://packages.medibuntu.org -c free -c non-free
+.TP
+add-apt-repository -U http://extras.ubuntu.com/ubuntu
+.TP
+add-apt-repository -s
+.TP
+add-apt-repository -s -r
+.TP
+add-apt-repository -c universe
+.TP
+add-apt-repository -r -c multiverse
+
+.SH DEPRECATED EXAMPLES
+.TP
+add-apt-repository deb http://myserver/path/to/repo stable main
+.TP
+add-apt-repository http://myserver/path/to/repo main
+.TP
+add-apt-repository https://packages.medibuntu.org free non-free
+.TP
+add-apt-repository http://extras.ubuntu.com/ubuntu
+.TP
+add-apt-repository multiverse
.SH SEE ALSO
\fBsources.list\fR(5)
diff --git a/debian/software-properties-common.install b/debian/software-properties-common.install
index 3d204ed..90f11bf 100644
--- a/debian/software-properties-common.install
+++ b/debian/software-properties-common.install
@@ -5,3 +5,4 @@ usr/lib/software-properties
usr/share/dbus-1/
usr/share/locale/*
usr/share/polkit-1
+debian/bash-completion/add-apt-repository /usr/share/bash-completion/completions
diff --git a/debian/software-properties-common.links b/debian/software-properties-common.links
index 23d3a4c..54072b6 100644
--- a/debian/software-properties-common.links
+++ b/debian/software-properties-common.links
@@ -1,2 +1,3 @@
usr/bin/add-apt-repository usr/bin/apt-add-repository
usr/share/man/man1/add-apt-repository.1.gz usr/share/man/man1/apt-add-repository.1.gz
+usr/share/bash-completion/completions/add-apt-repository usr/share/bash-completion/completions/apt-add-repository
diff --git a/debian/tests/add-apt-repository b/debian/tests/add-apt-repository
deleted file mode 100755
index 7dab076..0000000
--- a/debian/tests/add-apt-repository
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/bin/sh
-set -e
-
-for locale in C.UTF-8 C
-do
- export LC_ALL=$locale
- echo LC_ALL=$locale test...
- rm -f /etc/apt/sources.list.d/xnox-ubuntu-nonvirt-*.list
- rm -f /etc/apt/trusted.gpg.d/xnox_ubuntu_nonvirt.gpg
- add-apt-repository ppa:xnox/nonvirt --yes --no-update
- [ -s /etc/apt/sources.list.d/xnox-ubuntu-nonvirt-*.list ]
- [ -s /etc/apt/trusted.gpg.d/xnox_ubuntu_nonvirt.gpg ]
- gpg -q --homedir $(mktemp -d) --no-default-keyring --keyring /etc/apt/trusted.gpg.d/xnox_ubuntu_nonvirt.gpg --fingerprint
-done
diff --git a/debian/tests/add-apt-repository-archive b/debian/tests/add-apt-repository-archive
new file mode 100755
index 0000000..2907391
--- /dev/null
+++ b/debian/tests/add-apt-repository-archive
@@ -0,0 +1,56 @@
+#!/usr/bin/python3
+
+import contextlib
+import os
+import sys
+import subprocess
+import tempfile
+
+from aptsources.distro import get_distro
+
+
+codename=get_distro().codename
+URI='http://fake.mirror.private.com/ubuntu'
+SOURCESLIST=f'deb {URI} {codename} main'
+SOURCESLISTFILE=f'/etc/apt/sources.list.d/archive_uri-http_fake_mirror_private_com_ubuntu-{codename}.list'
+
+def run_test(archive, param, yes, noupdate, remove, locale):
+ env = os.environ.copy()
+ if locale:
+ env['LC_ALL'] = locale
+
+ with contextlib.suppress(FileNotFoundError):
+ os.remove(SOURCESLISTFILE)
+
+ cmd = f'add-apt-repository {yes} {noupdate} {param} {archive}'
+ subprocess.check_call(cmd.split(), env=env)
+
+ if not os.path.exists(SOURCESLISTFILE) or os.path.getsize(SOURCESLISTFILE) == 0:
+ print("Missing/empty sources.list file: %s" % SOURCESLISTFILE)
+ sys.exit(1)
+
+ cmd = f'add-apt-repository {remove} {yes} {noupdate} {param} {archive}'
+ subprocess.check_call(cmd.split(), env=env)
+
+ if os.path.exists(SOURCESLISTFILE):
+ print("sources.list file not removed: %s" % SOURCESLISTFILE)
+ with open(SOURCESLISTFILE) as f:
+ print(f.read())
+ sys.exit(1)
+
+
+for PARAM in ['-U', '--uri', '']:
+ for YES in ['-y', '--yes']:
+ for NOUPDATE in ['-n', '--no-update', '']:
+ for REMOVE in ['-r', '--remove']:
+ for LOCALE in ['', 'C', 'C.UTF-8']:
+ run_test(URI, PARAM, YES, NOUPDATE, REMOVE, LOCALE)
+
+for PARAM in ['-S', '--sourceslist', '']:
+ for YES in ['-y', '--yes']:
+ for NOUPDATE in ['-n', '--no-update', '']:
+ for REMOVE in ['-r', '--remove']:
+ for LOCALE in ['', 'C', 'C.UTF-8']:
+ run_test(SOURCESLIST, PARAM, YES, NOUPDATE, REMOVE, LOCALE)
+
+sys.exit(0)
diff --git a/debian/tests/add-apt-repository-cloud b/debian/tests/add-apt-repository-cloud
new file mode 100755
index 0000000..6809234
--- /dev/null
+++ b/debian/tests/add-apt-repository-cloud
@@ -0,0 +1,59 @@
+#!/usr/bin/python3
+
+import contextlib
+import os
+import sys
+import subprocess
+
+from softwareproperties.cloudarchive import RELEASE_MAP
+from aptsources.distro import get_distro
+
+codename = get_distro().codename
+uca_releases = list(filter(lambda r: RELEASE_MAP[r] == codename, RELEASE_MAP.keys()))
+
+if not uca_releases:
+ print("No UCA releases available for this Ubuntu release")
+ sys.exit(77)
+
+def run_test(caname, uca, param, yes, noupdate, remove, locale):
+ env = os.environ.copy()
+ if locale:
+ env['LC_ALL'] = locale
+
+ SOURCESLISTFILE=f'/etc/apt/sources.list.d/cloudarchive-{caname}.list'
+
+ with contextlib.suppress(FileNotFoundError):
+ os.remove(SOURCESLISTFILE)
+
+ cmd = 'apt-get -q -y remove ubuntu-cloud-keyring'
+ subprocess.run(cmd.split(), stderr=subprocess.DEVNULL, env=env)
+
+ cmd = f'add-apt-repository {yes} {noupdate} {param} {uca}'
+ subprocess.check_call(cmd.split(), env=env)
+
+ if not os.path.exists(SOURCESLISTFILE) or os.path.getsize(SOURCESLISTFILE) == 0:
+ print("Missing/empty sources.list file: %s" % SOURCESLISTFILE)
+ sys.exit(1)
+
+ cmd = 'dpkg-query -l ubuntu-cloud-keyring'
+ subprocess.check_call(cmd.split(), env=env)
+
+ cmd = f'add-apt-repository {remove} {yes} {noupdate} {param} {uca}'
+ subprocess.check_call(cmd.split(), env=env)
+
+ if os.path.exists(SOURCESLISTFILE):
+ print("sources.list file not removed: %s" % SOURCESLISTFILE)
+ with open(SOURCESLISTFILE) as f:
+ print(f.read())
+ sys.exit(1)
+
+for CANAME in uca_releases:
+ for UCA in [f'{CANAME}', f'cloud-archive:{CANAME}', f'uca:{CANAME}']:
+ for PARAM in ['-C', '--cloud', '']:
+ for YES in ['-y', '--yes']:
+ for NOUPDATE in ['-n', '--no-update', '']:
+ for REMOVE in ['-r', '--remove']:
+ for LOCALE in ['', 'C', 'C.UTF-8']:
+ run_test(CANAME, UCA, PARAM, YES, NOUPDATE, REMOVE, LOCALE)
+
+sys.exit(0)
diff --git a/debian/tests/add-apt-repository-ppa b/debian/tests/add-apt-repository-ppa
new file mode 100755
index 0000000..06904ea
--- /dev/null
+++ b/debian/tests/add-apt-repository-ppa
@@ -0,0 +1,71 @@
+#!/usr/bin/python3
+
+import contextlib
+import os
+import sys
+import subprocess
+import tempfile
+
+from aptsources.distro import get_distro
+
+codename=get_distro().codename
+SOURCESLISTFILE=f'/etc/apt/sources.list.d/ubuntu-support-team-ubuntu-software-properties-autopkgtest-{codename}.list'
+TRUSTEDFILE='/etc/apt/trusted.gpg.d/ubuntu-support-team-ubuntu-software-properties-autopkgtest.gpg'
+PPANAME='ubuntu-support-team/software-properties-autopkgtest'
+
+def run_test(ppa, param, yes, noupdate, remove, locale):
+ env = os.environ.copy()
+ if locale:
+ env['LC_ALL'] = locale
+
+ with contextlib.suppress(FileNotFoundError):
+ os.remove(SOURCESLISTFILE)
+ with contextlib.suppress(FileNotFoundError):
+ os.remove(TRUSTEDFILE)
+
+ cmd = f'add-apt-repository {yes} {noupdate} {param} {ppa}'
+ try:
+ subprocess.check_call(cmd.split(), env=env)
+ except subprocess.CalledProcessError:
+ if not param and not ppa.startswith('ppa:'):
+ # When using 'line' instead of --ppa, the 'ppa:' prefix is required
+ return
+ raise
+
+ if not os.path.exists(SOURCESLISTFILE) or os.path.getsize(SOURCESLISTFILE) == 0:
+ print("Missing/empty sources.list file: %s" % SOURCESLISTFILE)
+ sys.exit(1)
+
+ if not os.path.exists(TRUSTEDFILE) or os.path.getsize(TRUSTEDFILE) == 0:
+ print("Missing/empty trusted.gpg file: %s" % TRUSTEDFILE)
+ sys.exit(1)
+
+ with tempfile.TemporaryDirectory() as homedir:
+ cmd = f'gpg -q --homedir {homedir} --no-default-keyring --keyring {TRUSTEDFILE} --fingerprint'
+ subprocess.check_call(cmd.split(), env=env)
+
+ cmd = f'add-apt-repository {remove} {yes} {noupdate} {param} {ppa}'
+ subprocess.check_call(cmd.split(), env=env)
+
+ if os.path.exists(SOURCESLISTFILE):
+ print("sources.list file not removed: %s" % SOURCESLISTFILE)
+ with open(SOURCESLISTFILE) as f:
+ print(f.read())
+ sys.exit(1)
+
+ if not os.path.exists(TRUSTEDFILE):
+ print("trusted.gpg should not have been removed, but it was: %s" % TRUSTEDFILE)
+ sys.exit(1)
+
+
+for PPAFORMAT in [f'{PPANAME}', f'{PPANAME}'.replace('/', '/ubuntu/')]:
+ for PPA in [f'{PPAFORMAT}', f'ppa:{PPAFORMAT}']:
+ for PARAM in ['-P', '--ppa', '']:
+ for YES in ['-y', '--yes']:
+ for NOUPDATE in ['-n', '--no-update', '']:
+ for REMOVE in ['-r', '--remove']:
+ for LOCALE in ['', 'C', 'C.UTF-8']:
+ run_test(PPA, PARAM, YES, NOUPDATE, REMOVE, LOCALE)
+
+sys.exit(0)
+
diff --git a/debian/tests/control b/debian/tests/control
index c90e3ef..ad74b04 100644
--- a/debian/tests/control
+++ b/debian/tests/control
@@ -11,6 +11,14 @@ Depends: dbus-x11,
xvfb,
@
-Tests: add-apt-repository
+Tests: add-apt-repository-ppa
Depends: gpg, software-properties-common
-Restrictions: needs-root, breaks-testbed
+Restrictions: needs-root, breaks-testbed, allow-stderr
+
+Tests: add-apt-repository-cloud
+Depends: software-properties-common
+Restrictions: needs-root, breaks-testbed, allow-stderr, skippable
+
+Tests: add-apt-repository-archive
+Depends: software-properties-common
+Restrictions: needs-root, breaks-testbed, allow-stderr
diff --git a/softwareproperties/SoftwareProperties.py b/softwareproperties/SoftwareProperties.py
index fae6d18..30c0289 100644
--- a/softwareproperties/SoftwareProperties.py
+++ b/softwareproperties/SoftwareProperties.py
@@ -60,19 +60,12 @@ import aptsources.distro
import softwareproperties
from .AptAuth import AptAuth
-from aptsources.sourceslist import SourcesList, SourceEntry
-from . import shortcuts
-from . import ppa
-from . import cloudarchive
+from aptsources.sourceslist import (SourcesList, SourceEntry)
+from softwareproperties.shortcuthandler import (ShortcutException, InvalidShortcutException)
+from softwareproperties.shortcuts import shortcut_handler
from gi.repository import Gio
-_SHORTCUT_FACTORIES = [
- ppa.shortcut_handler,
- cloudarchive.shortcut_handler,
- shortcuts.shortcut_handler,
-]
-
class SoftwareProperties(object):
@@ -678,31 +671,25 @@ class SoftwareProperties(object):
return os.path.splitext(os.path.basename(f))[0]
return None
- def check_and_add_key_for_whitelisted_channels(self, srcline):
- # This is maintained for any legacy callers
- return self.check_and_add_key_for_whitelisted_shortcut(shortcut_handler(srcline))
-
def check_and_add_key_for_whitelisted_shortcut(self, shortcut):
"""
helper that adds the gpg key of the channel to the apt
keyring *if* the channel is in the whitelist
/usr/share/app-install/channels or it is a public Launchpad PPA.
"""
- (srcline, _fname) = shortcut.expand(
- codename=self.distro.codename, distro=self.distro.id.lower())
+ srcline = shortcut.SourceEntry().line
channel = self._is_line_in_whitelisted_channel(srcline)
if channel:
keyp = "%s/%s.key" % (self.CHANNEL_PATH, channel)
self.add_key(keyp)
- cdata = (shortcut.add_key, {'keyserver': (self.options)})
+ cdata = (shortcut.add_key, {})
def addkey_func():
func, kwargs = cdata
msg = "Added key."
try:
- ret = func(**kwargs)
- if not ret:
- msg = "Failed to add key."
+ func(**kwargs)
+ ret = True
except Exception as e:
ret = False
msg = str(e)
@@ -736,9 +723,12 @@ class SoftwareProperties(object):
"""
Add a source for the given line.
"""
- return self.add_source_from_shortcut(
- shortcut=shortcut_handler(line.strip()),
- enable_source_code=enable_source_code)
+ try:
+ shortcut = shortcut_handler(line.strip())
+ except InvalidShortcutException:
+ return False
+
+ return self.add_source_from_shortcut(shortcut, enable_source_code)
def add_source_from_shortcut(self, shortcut, enable_source_code=False):
"""
@@ -746,8 +736,8 @@ class SoftwareProperties(object):
site is in whitelist or the shortcut implementer adds it.
"""
- (deb_line, file) = shortcut.expand(
- codename=self.distro.codename, distro=self.distro.id.lower())
+ deb_line = shortcut.SourceEntry().line
+ file = shortcut.sourceparts_file
deb_line = self.expand_http_line(deb_line)
debsrc_entry_type = 'deb-src' if enable_source_code else '# deb-src'
debsrc_line = debsrc_entry_type + deb_line[3:]
@@ -776,10 +766,10 @@ class SoftwareProperties(object):
worker.join(30)
if worker.isAlive():
# thread timed out.
- raise shortcuts.ShortcutException("Error: retrieving gpg key timed out.")
+ raise ShortcutException("Error: retrieving gpg key timed out.")
result, msg = self.myqueue.get()
if not result:
- raise shortcuts.ShortcutException(msg)
+ raise ShortcutException(msg)
if self.options and self.options.update:
import apt
@@ -866,14 +856,6 @@ class SoftwareProperties(object):
return "%s;%s;%s;" % (ver.package.shortname, ver.version,
ver.package.architecture())
-def shortcut_handler(shortcut):
- for factory in _SHORTCUT_FACTORIES:
- ret = factory(shortcut)
- if ret is not None:
- return ret
-
- raise shortcuts.ShortcutException("Unable to handle input '%s'" % shortcut)
-
if __name__ == "__main__":
sp = SoftwareProperties()
diff --git a/softwareproperties/cloudarchive.py b/softwareproperties/cloudarchive.py
index 476452d..9975a24 100644
--- a/softwareproperties/cloudarchive.py
+++ b/softwareproperties/cloudarchive.py
@@ -21,12 +21,17 @@
from __future__ import print_function
-import apt_pkg
import os
-import subprocess
+
from gettext import gettext as _
-from softwareproperties.shortcuts import ShortcutException
+from softwareproperties.shortcuthandler import (ShortcutHandler, ShortcutException,
+ InvalidShortcutException)
+from softwareproperties.sourceslist import SourcesListShortcutHandler
+from softwareproperties.uri import URIShortcutHandler
+
+from urllib.parse import urlparse
+
RELEASE_MAP = {
'folsom': 'precise',
@@ -46,98 +51,108 @@ RELEASE_MAP = {
'train': 'bionic',
'ussuri': 'bionic',
}
-MIRROR = "http://ubuntu-cloud.archive.canonical.com/ubuntu"
UCA = "Ubuntu Cloud Archive"
WEB_LINK = 'https://wiki.ubuntu.com/OpenStack/CloudArchive'
-APT_INSTALL_KEY = ['apt-get', '--quiet', '--assume-yes', 'install',
- 'ubuntu-cloud-keyring']
-
-ALIASES = {'tools-updates': 'tools'}
-for _r in RELEASE_MAP:
- ALIASES["%s-updates" % _r] = _r
-
-MAP = {
- 'tools': {
- 'sldfmt': '%(codename)s-updates/cloud-tools',
- 'description': UCA + " for cloud-tools (JuJu and MAAS)"},
- 'tools-proposed': {
- 'sldfmt': '%(codename)s-proposed/cloud-tools',
- 'description': UCA + " for cloud-tools (JuJu and MAAS) [proposed]"}
-}
-
-for _r in RELEASE_MAP:
- MAP[_r] = {
- 'sldfmt': '%(codename)s-updates/' + _r,
- 'description': UCA + ' for ' + 'OpenStack ' + _r.capitalize(),
- 'release': RELEASE_MAP[_r]}
- MAP[_r + "-proposed"] = {
- 'sldfmt': '%(codename)s-proposed/' + _r,
- 'description': UCA + ' for ' + 'OpenStack %s [proposed]' % _r.capitalize(),
- 'release': RELEASE_MAP[_r]}
-
-class CloudArchiveShortcutHandler(object):
- def __init__(self, shortcut):
- self.shortcut = shortcut
-
- prefix = "cloud-archive:"
-
- subs = {'shortcut': shortcut, 'prefix': prefix,
- 'ca_names': sorted(MAP.keys())}
- if not shortcut.startswith(prefix):
- raise ValueError(
- _("shortcut '%(shortcut)s' did not start with '%(prefix)s'")
- % subs)
-
- name_in = shortcut[len(prefix):]
- caname = ALIASES.get(name_in, name_in)
+UCA_ARCHIVE = "http://ubuntu-cloud.archive.canonical.com/ubuntu"
+UCA_PREFIXES = ['cloud-archive', 'uca']
+UCA_VALID_POCKETS = ['updates', 'proposed']
+UCA_DEFAULT_POCKET = UCA_VALID_POCKETS[0]
+
+
+class CloudArchiveShortcutHandler(ShortcutHandler):
+ def __init__(self, shortcut, **kwargs):
+ super(CloudArchiveShortcutHandler, self).__init__(shortcut, **kwargs)
+ self.caname = None
+
+ # one of these will set caname and pocket, and maybe _source_entry
+ if not any((self._match_uca(shortcut),
+ self._match_uri(shortcut),
+ self._match_sourceslist(shortcut))):
+ msg = (_("not a valid cloud-archive format: '%s'") % shortcut)
+ raise InvalidShortcutException(msg)
+
+ self.caname = self.caname.lower()
+
+ self._filebase = "cloudarchive-%s" % self.caname
+
+ self.pocket = self.pocket.lower()
+ if not self.pocket in UCA_VALID_POCKETS:
+ msg = (_("not a valid cloud-archive pocket: '%s'") % self.pocket)
+ raise ShortcutException(msg)
+
+ if not self.caname in RELEASE_MAP:
+ msg = (_("not a valid cloud-archive: '%s'") % self.caname)
+ raise ShortcutException(msg)
+
+ codename = RELEASE_MAP[self.caname]
+ validnames = (self.codename, os.getenv("CA_ALLOW_CODENAME"))
+ if codename not in validnames:
+ msg = (_("cloud-archive for %s only supported on %s") %
+ (self.caname.capitalize(), codename.capitalize()))
+ raise ShortcutException(msg)
+
+ self._description = f'{UCA} for OpenStack {self.caname.capitalize()}'
+ if self.pocket == 'proposed':
+ self._description += ' [proposed]'
+
+ if not self._source_entry:
+ dist = ('%s-%s/%s' % (codename, self.pocket, self.caname))
+ comps = ' '.join(self.components) or 'main'
+ line = ' '.join([self.distro.binary_type, UCA_ARCHIVE, dist, comps])
+ self._set_source_entry(line)
+
+ @property
+ def description(self):
+ return self._description
+
+ @property
+ def web_link(self):
+ return WEB_LINK
+
+ def _encode_filebase(self, suffix=None):
+ # ignore suffix
+ return super(CloudArchiveShortcutHandler, self)._encode_filebase()
+
+ def _match_uca(self, shortcut):
+ (prefix, _, uca) = shortcut.rpartition(':')
+ if not prefix.lower() in UCA_PREFIXES:
+ return False
- subs.update({'input_name': name_in})
- if caname not in MAP:
- raise ShortcutException(
- _("'%(input_name)s': not a valid cloud-archive name.\n"
- "Must be one of %(ca_names)s") % subs)
+ (caname, _, pocket) = uca.partition('-')
+ if not caname:
+ return False
self.caname = caname
- self._info = MAP[caname].copy()
- self._info['web_link' ] = WEB_LINK
-
- def info(self):
- return self._info
-
- def expand(self, codename, distro=None):
- if codename not in (MAP[self.caname]['release'],
- os.environ.get("CA_ALLOW_CODENAME")):
- raise ShortcutException(
- _("cloud-archive for %(os_release)s only supported on %(codename)s")
- % {'codename': MAP[self.caname]['release'],
- 'os_release': self.caname.capitalize()})
- dist = MAP[self.caname]['sldfmt'] % {'codename': codename}
- line = ' '.join(('deb', MIRROR, dist, 'main',))
- return (line, _fname_for_caname(self.caname))
-
- def should_confirm(self):
+ self.pocket = pocket or self.pocket or UCA_DEFAULT_POCKET
return True
- def add_key(self, keyserver=None):
- env = os.environ.copy()
- env['DEBIAN_FRONTEND'] = 'noninteractive'
+ def _match_uri(self, shortcut):
try:
- subprocess.check_call(args=APT_INSTALL_KEY, env=env)
- except subprocess.CalledProcessError:
+ return self._match_handler(URIShortcutHandler(shortcut))
+ except InvalidShortcutException:
+ return False
+
+ def _match_sourceslist(self, shortcut):
+ try:
+ return self._match_handler(SourcesListShortcutHandler(shortcut))
+ except InvalidShortcutException:
+ return False
+
+ def _match_handler(self, handler):
+ parsed = urlparse(handler.SourceEntry().uri)
+ if parsed.hostname != urlparse(UCA_ARCHIVE).hostname:
return False
- return True
+ (codename, _, caname) = handler.SourceEntry().dist.partition('/')
+ (codename, _, pocket) = codename.partition('-')
-def _fname_for_caname(caname):
- # caname is an entry in MAP ('tools' or 'tools-proposed')
- return os.path.join(
- apt_pkg.config.find_dir("Dir::Etc::sourceparts"),
- 'cloudarchive-%s.list' % caname)
+ if not all((codename, caname)):
+ return False
+ self.caname = caname
+ self.pocket = pocket or self.pocket or UCA_DEFAULT_POCKET
+
+ self._set_source_entry(handler.SourceEntry().line)
+ return True
-def shortcut_handler(shortcut):
- try:
- return CloudArchiveShortcutHandler(shortcut)
- except ValueError:
- return None
diff --git a/softwareproperties/gtk/DialogCacheOutdated.py b/softwareproperties/gtk/DialogCacheOutdated.py
index 5852897..9ee671e 100644
--- a/softwareproperties/gtk/DialogCacheOutdated.py
+++ b/softwareproperties/gtk/DialogCacheOutdated.py
@@ -81,9 +81,8 @@ class DialogCacheOutdated:
self._pdia.progressbar.set_fraction(perc / 100.0)
def on_pktask_finish(self, source, result, udata=(None,)):
- results = None
try:
- results = self._pktask.generic_finish(result)
+ self._pktask.generic_finish(result)
except Exception as e:
dialog = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.CANCEL, _("Error while refreshing cache"))
diff --git a/softwareproperties/gtk/SoftwarePropertiesGtk.py b/softwareproperties/gtk/SoftwarePropertiesGtk.py
index ae35ec0..a3021f6 100644
--- a/softwareproperties/gtk/SoftwarePropertiesGtk.py
+++ b/softwareproperties/gtk/SoftwarePropertiesGtk.py
@@ -1072,9 +1072,8 @@ class SoftwarePropertiesGtk(SoftwareProperties, SimpleGtkbuilderApp):
self.progress_bar.set_fraction(prog_value / 100.0)
def on_driver_changes_finish(self, source, result, installs_pending):
- results = None
try:
- results = self.pk_task.generic_finish(result)
+ self.pk_task.generic_finish(result)
except Exception as e:
self.on_driver_changes_revert()
error(self.window_main, _("Error while applying changes"), str(e))
diff --git a/softwareproperties/kde/.keep b/softwareproperties/kde/.keep
deleted file mode 100644
index e69de29..0000000
--- a/softwareproperties/kde/.keep
+++ /dev/null
diff --git a/softwareproperties/ppa.py b/softwareproperties/ppa.py
index 9ee8df8..28f2d2f 100644
--- a/softwareproperties/ppa.py
+++ b/softwareproperties/ppa.py
@@ -1,8 +1,9 @@
-# software-properties PPA support
+# software-properties PPA support, using launchpadlib
#
-# Copyright (c) 2004-2009 Canonical Ltd.
+# Copyright (c) 2019 Canonical Ltd.
#
-# Author: Michael Vogt <mvo@xxxxxxxxxx>
+# Original Author: Michael Vogt <mvo@xxxxxxxxxx>
+# Rewrite: Dan Streetman <ddstreet@xxxxxxxxxxxxx>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
@@ -19,457 +20,191 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
-from __future__ import print_function
-
-import apt_pkg
-import json
-import os
-import re
-import shutil
-import subprocess
-import tempfile
-import time
-
from gettext import gettext as _
-from threading import Thread
-
-from softwareproperties.shortcuts import ShortcutException
-
-try:
- import urllib.request
- from urllib.error import HTTPError, URLError
- import urllib.parse
- from http.client import HTTPException
- NEED_PYCURL = False
-except ImportError:
- NEED_PYCURL = True
- import pycurl
- HTTPError = pycurl.error
-
-
-SKS_KEYSERVER = 'https://keyserver.ubuntu.com/pks/lookup?op=get&options=mr&exact=on&search=0x%s'
-# maintained until 2015
-LAUNCHPAD_PPA_API = 'https://launchpad.net/api/devel/%s/+archive/%s'
-LAUNCHPAD_USER_API = 'https://launchpad.net/api/1.0/%s'
-LAUNCHPAD_USER_PPAS_API = 'https://launchpad.net/api/1.0/%s/ppas'
-LAUNCHPAD_DISTRIBUTION_API = 'https://launchpad.net/api/1.0/%s'
-LAUNCHPAD_DISTRIBUTION_SERIES_API = 'https://launchpad.net/api/1.0/%s/%s'
-# Specify to use the system default SSL store; change to a different path
-# to test with custom certificates.
-LAUNCHPAD_PPA_CERT = "/etc/ssl/certs/ca-certificates.crt"
-
-
-class CurlCallback:
- def __init__(self):
- self.contents = ''
-
- def body_callback(self, buf):
- self.contents = self.contents + buf
-
-
-class PPAException(Exception):
-
- def __init__(self, value, original_error=None):
- self.value = value
- self.original_error = original_error
-
- def __str__(self):
- return repr(self.value)
-
-
-def encode(s):
- return re.sub("[^a-zA-Z0-9_-]", "_", s)
-
-
-def get_info_from_https(url, accept_json, retry_delays=None):
- """Return the content from url.
- accept_json indicates that:
- a.) Send header Accept: 'application/json'
- b.) Instead of raw content, return json.loads(content)
- retry_delays is None or an iterator (including list or tuple)
- If it is None, no retries will be done.
- If it is an iterator, each value is number of seconds to delay before
- retrying. For example, retry_delays=(3,5) means to try up to 3
- times, with a 3s delay after first failure and 5s delay after second.
- Retries will not be done on 404."""
- func = _get_https_content_pycurl if NEED_PYCURL else _get_https_content_py3
- data = func(lp_url=url, accept_json=accept_json, retry_delays=retry_delays)
- if accept_json:
- return json.loads(data)
- else:
- return data
-
-
-def get_info_from_lp(lp_url):
- return get_info_from_https(lp_url, True)
-
-def get_ppa_info_from_lp(owner_name, ppa):
- if owner_name[0] != '~':
- owner_name = '~' + owner_name
- lp_url = LAUNCHPAD_PPA_API % (owner_name, ppa)
- return get_info_from_lp(lp_url)
-
-def series_valid_for_distro(distribution, series):
- lp_url = LAUNCHPAD_DISTRIBUTION_SERIES_API % (distribution, series)
- try:
- get_info_from_lp(lp_url)
- return True
- except PPAException:
- return False
-def get_current_series_from_lp(distribution):
- lp_url = LAUNCHPAD_DISTRIBUTION_API % distribution
- return os.path.basename(get_info_from_lp(lp_url)["current_series_link"])
+from launchpadlib.launchpad import Launchpad
+from lazr.restfulclient.errors import (NotFound, BadRequest, Unauthorized)
+from softwareproperties.shortcuthandler import (ShortcutHandler, ShortcutException,
+ InvalidShortcutException)
+from softwareproperties.sourceslist import SourcesListShortcutHandler
+from softwareproperties.uri import URIShortcutHandler
-def _get_https_content_py3(lp_url, accept_json, retry_delays=None):
- if retry_delays is None:
- retry_delays = []
+from urllib.parse import urlparse
- trynum = 0
- err = None
- sleep_waits = iter(retry_delays)
- headers = {"Accept": "application/json"} if accept_json else {}
- while True:
- trynum += 1
- try:
- request = urllib.request.Request(str(lp_url), headers=headers)
- lp_page = urllib.request.urlopen(request,
- cafile=LAUNCHPAD_PPA_CERT)
- return lp_page.read().decode("utf-8", "strict")
- except (HTTPException, URLError) as e:
- err = PPAException(
- "Error reading %s (%d tries): %s" % (lp_url, trynum, e.reason),
- e)
- # do not retry on 404. HTTPError is a subclass of URLError.
- if isinstance(e, HTTPError) and e.code == 404:
- break
- try:
- time.sleep(next(sleep_waits))
- except StopIteration:
- break
+PPA_URI_FORMAT = 'http://ppa.launchpad.net/{team}/{ppa}/ubuntu/'
+PRIVATE_PPA_URI_FORMAT = 'https://private-ppa.launchpad.net/{team}/{ppa}/ubuntu/'
+PPA_VALID_HOSTNAMES = [urlparse(PPA_URI_FORMAT).hostname, urlparse(PRIVATE_PPA_URI_FORMAT).hostname]
- raise err
+PPA_VALID_COMPS = ['main', 'main/debug']
-def _get_https_content_pycurl(lp_url, accept_json, retry_delays=None):
- # this is the fallback code for python2
- if retry_delays is None:
- retry_delays = []
+class PPAShortcutHandler(ShortcutHandler):
+ def __init__(self, shortcut, login=False, **kwargs):
+ super(PPAShortcutHandler, self).__init__(shortcut, **kwargs)
+ self._lp_anon = not login
+ self._signing_key_data = None
- trynum = 0
- sleep_waits = iter(retry_delays)
+ self._lp = None # LP object
+ self._lpteam = None # Person/Team LP object
+ self._lpppa = None # PPA Archive LP object
- while True:
- err_msg = None
- err = None
- trynum += 1
- try:
- callback = CurlCallback()
- curl = pycurl.Curl()
- curl.setopt(pycurl.SSL_VERIFYPEER, 1)
- curl.setopt(pycurl.SSL_VERIFYHOST, 2)
- curl.setopt(pycurl.FOLLOWLOCATION, 1)
- curl.setopt(pycurl.WRITEFUNCTION, callback.body_callback)
- if LAUNCHPAD_PPA_CERT:
- curl.setopt(pycurl.CAINFO, LAUNCHPAD_PPA_CERT)
- curl.setopt(pycurl.URL, str(lp_url))
- if accept_json:
- curl.setopt(pycurl.HTTPHEADER, ["Accept: application/json"])
- curl.perform()
- response = curl.getinfo(curl.RESPONSE_CODE)
- curl.close()
-
- if response != 200:
- err_msg = "response code %i" % response
- except pycurl.error as e:
- err_msg = str(e)
- err = e
-
- if err_msg is None:
- return callback.contents
+ # one of these will set teamname and ppaname, and maybe _source_entry
+ if not any((self._match_ppa(shortcut),
+ self._match_uri(shortcut),
+ self._match_sourceslist(shortcut))):
+ msg = (_("ERROR: '%s' is not a valid ppa format") % shortcut)
+ raise InvalidShortcutException(msg)
- try:
- time.sleep(next(sleep_waits))
- except StopIteration:
- break
-
- raise PPAException(
- "Error reading %s (%d tries): %s" % (lp_url, trynum, err_msg),
- original_error=err)
-
-
-def mangle_ppa_shortcut(shortcut):
- if ":" in shortcut:
- ppa_shortcut = shortcut.split(":")[1]
- else:
- ppa_shortcut = shortcut
- if ppa_shortcut.startswith("/"):
- ppa_shortcut = ppa_shortcut.lstrip("/")
- user = ppa_shortcut.split("/")[0]
- if (user[0] == "~"):
- user = user[1:]
- ppa_path_objs = ppa_shortcut.split("/")[1:]
- ppa_path = []
- if (len(ppa_path_objs) < 1):
- ppa_path = ['ubuntu', 'ppa']
- elif (len(ppa_path_objs) == 1):
- ppa_path.insert(0, "ubuntu")
- ppa_path.extend(ppa_path_objs)
- else:
- ppa_path = ppa_path_objs
- ppa = "~%s/%s" % (user, "/".join(ppa_path))
- return ppa
-
-def verify_keyid_is_v4(signing_key_fingerprint):
- """Verify that the keyid is a v4 fingerprint with at least 160bit"""
- return len(signing_key_fingerprint) >= 160/8
-
-
-class AddPPASigningKey(object):
- " thread class for adding the signing key in the background "
-
- def __init__(self, ppa_path, keyserver=None):
- self.ppa_path = ppa_path
- self._homedir = tempfile.mkdtemp()
-
- def __del__(self):
- shutil.rmtree(self._homedir)
-
- def gpg_cmd(self, args):
- cmd = "gpg -q --homedir %s --no-default-keyring --no-options --import --import-options %s" % (self._homedir, args)
- return subprocess.Popen(cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
-
- def _recv_key(self, ppa_info):
- signing_key_fingerprint = ppa_info["signing_key_fingerprint"]
- try:
- # double check that the signing key is a v4 fingerprint (160bit)
- if not verify_keyid_is_v4(signing_key_fingerprint):
- print("Error: signing key fingerprint '%s' too short" %
- signing_key_fingerprint)
- return False
- except TypeError:
- print("Error: signing key fingerprint does not exist")
- return False
+ self._filebase = "%s-ubuntu-%s" % (self.teamname, self.ppaname)
+ self._set_auth()
- return get_ppa_signing_key_data(ppa_info)
+ if not self._source_entry:
+ comps = self.components
+ if not comps:
+ comps = ['main']
+ if self._lpppa.publish_debug_symbols:
+ comps += ['main/debug']
- def _minimize_key(self, key):
- p = self.gpg_cmd("import-minimal,import-export")
- (minimal_key, _) = p.communicate(key.encode())
-
- if p.returncode != 0:
- return False
- return minimal_key
-
- def _get_fingerprints(self, key):
- fingerprints = []
- p = self.gpg_cmd("show-only --fingerprint --batch --with-colons")
- (output, _) = p.communicate(key)
- if p.returncode == 0:
- for line in output.decode('utf-8').splitlines():
- if line.startswith("fpr:"):
- fingerprints.append(line.split(":")[9])
- return fingerprints
-
- def _verify_fingerprint(self, key, expected_fingerprint):
- got_fingerprints = self._get_fingerprints(key)
- if len(got_fingerprints) != 1:
- print("Got '%s' fingerprints, expected only one" %
- len(got_fingerprints))
- return False
- got_fingerprint = got_fingerprints[0]
- if got_fingerprint != expected_fingerprint:
- print("Fingerprints do not match, not importing: '%s' != '%s'" % (
- expected_fingerprint, got_fingerprint))
+ uri_format = PRIVATE_PPA_URI_FORMAT if self.lpppa.private else PPA_URI_FORMAT
+ uri = uri_format.format(team=self.teamname, ppa=self.ppaname)
+ line = ('%s %s %s %s' % (self.binary_type, uri, self.dist, ' '.join(comps)))
+ self._set_source_entry(line)
+
+ @property
+ def lp(self):
+ if not self._lp:
+ if self._lp_anon:
+ login_func = Launchpad.login_anonymously
+ else:
+ login_func = Launchpad.login_with
+ self._lp = login_func("%s.%s" % (self.__module__, self.__class__.__name__),
+ service_root='production',
+ version='devel')
+ return self._lp
+
+ @property
+ def lpteam(self):
+ if not self._lpteam:
+ try:
+ self._lpteam = self.lp.people(self.teamname)
+ except NotFound:
+ msg = (_("ERROR: user/team '%s' not found (use --login if private)") % self.teamname)
+ raise ShortcutException(msg)
+ except Unauthorized:
+ msg = (_("ERROR: invalid user/team name '%s'") % self.teamname)
+ raise ShortcutException(msg)
+ return self._lpteam
+
+ @property
+ def lpppa(self):
+ if not self._lpppa:
+ try:
+ self._lpppa = self.lpteam.getPPAByName(name=self.ppaname)
+ except NotFound:
+ msg = (_("ERROR: ppa '%s/%s' not found (use --login if private)") %
+ (self.teamname, self.ppaname))
+ raise ShortcutException(msg)
+ except BadRequest:
+ msg = (_("ERROR: invalid ppa name '%s'") % self.ppaname)
+ raise ShortcutException(msg)
+ return self._lpppa
+
+ @property
+ def description(self):
+ return self.lpppa.description
+
+ @property
+ def web_link(self):
+ return self.lpppa.web_link
+
+ @property
+ def trustedparts_content(self):
+ if not self._signing_key_data:
+ key = self.lpppa.getSigningKeyData()
+ fingerprint = self.lpppa.signing_key_fingerprint
+
+ if not fingerprint:
+ msg = _("Warning: could not get PPA signing_key_fingerprint from LP, using anyway")
+ elif not fingerprint in self.fingerprints(key):
+ msg = (_("Fingerprints do not match, not importing: '%s' != '%s'") %
+ (fingerprint, ','.join(self.fingerprints(key))))
+ raise ShortcutException(msg)
+
+ self._signing_key_data = key
+ return self._signing_key_data
+
+ def _set_source_entry(self, line):
+ super(PPAShortcutHandler, self)._set_source_entry(line)
+
+ invalid_comps = set(self.SourceEntry().comps) - set(PPA_VALID_COMPS)
+ if invalid_comps:
+ print(_("Warning: components '%s' not valid for PPA") % ' '.join(invalid_comps))
+
+ def _match_ppa(self, shortcut):
+ (prefix, _, ppa) = shortcut.rpartition(':')
+ if not prefix.lower() == 'ppa':
return False
- return True
- def add_ppa_signing_key(self, ppa_path=None):
- """Query and add the corresponding PPA signing key.
+ (teamname, _, ppaname) = ppa.partition('/')
+ teamname = teamname.lstrip('~')
+ if '/' in ppaname:
+ (ubuntu, _, ppaname) = ppaname.partition('/')
+ if ubuntu.lower() != 'ubuntu':
+ # PPAs only support ubuntu
+ return False
+ if '/' in ppaname:
+ # Path is too long for valid ppa
+ return False
- The signing key fingerprint is obtained from the Launchpad PPA page,
- via a secure channel, so it can be trusted.
- """
- if ppa_path is None:
- ppa_path = self.ppa_path
+ self.teamname = teamname
+ self.ppaname = ppaname or 'ppa'
+ return True
+ def _match_uri(self, shortcut):
try:
- ppa_info = get_ppa_info(mangle_ppa_shortcut(ppa_path))
- except PPAException as e:
- print(e.value)
+ return self._match_handler(URIShortcutHandler(shortcut))
+ except InvalidShortcutException:
return False
+
+ def _match_sourceslist(self, shortcut):
try:
- signing_key_fingerprint = ppa_info["signing_key_fingerprint"]
- except IndexError:
- print("Error: can't find signing_key_fingerprint at %s" % ppa_path)
+ return self._match_handler(SourcesListShortcutHandler(shortcut))
+ except InvalidShortcutException:
return False
-
- # download the armored_key
- armored_key = self._recv_key(ppa_info)
- if not armored_key:
- return False
-
- trustedgpgd = apt_pkg.config.find_dir("Dir::Etc::trustedparts")
- apt_keyring = os.path.join(trustedgpgd, encode(ppa_info["reference"][1:]))
- minimal_key = self._minimize_key(armored_key)
- if not minimal_key:
+ def _match_handler(self, handler):
+ parsed = urlparse(handler.SourceEntry().uri)
+ if not parsed.hostname in PPA_VALID_HOSTNAMES:
return False
-
- if not self._verify_fingerprint(minimal_key, signing_key_fingerprint):
- return False
-
- with open('%s.gpg' % apt_keyring, 'wb') as f:
- f.write(minimal_key)
-
- return True
-
-class AddPPASigningKeyThread(Thread, AddPPASigningKey):
- # This class is legacy. There are no users inside the software-properties
- # codebase other than a test case. It was left in case there were outside
- # users. Internally, we've changed from having a class implement the
- # tread to explicitly launching a thread and invoking a method in it
- # see check_and_add_key_for_whitelisted_shortcut for how.
- def __init__(self, ppa_path, keyserver=None):
- Thread.__init__(self)
- AddPPASigningKey.__init__(self, ppa_path=ppa_path, keyserver=keyserver)
-
- def run(self):
- self.add_ppa_signing_key(self.ppa_path)
+ path = parsed.path.strip().strip('/').split('/')
+ if len(path) < 2:
+ return False
+ self.teamname = path[0]
+ self.ppaname = path[1]
+ self._username = handler.username
+ self._password = handler.password
-def _get_suggested_ppa_message(user, ppa_name):
- try:
- msg = []
- try:
- try:
- lp_user = get_info_from_lp(LAUNCHPAD_USER_API % user)
- except PPAException:
- return _("ERROR: '{user}' user or team does not exist.").format(user=user)
- lp_ppas = get_info_from_lp(LAUNCHPAD_USER_PPAS_API % user)
- entity_name = _("team") if lp_user["is_team"] else _("user")
- if lp_ppas["total_size"] > 0:
- # Translators: %(entity)s is either "team" or "user"
- msg.append(_("The %(entity)s named '%(user)s' has no PPA named '%(ppa)s'") % {
- 'entity' : entity_name,
- 'user' : user,
- 'ppa' : ppa_name})
- msg.append(_("Please choose from the following available PPAs:"))
- for ppa in lp_ppas["entries"]:
- msg.append(_(" * '%(name)s': %(displayname)s") % {
- 'name' : ppa["name"],
- 'displayname' : ppa["displayname"]})
- else:
- # Translators: %(entity)s is either "team" or "user"
- msg.append(_("The %(entity)s named '%(user)s' does not have any PPA") % {
- 'entity' : entity_name, 'user' : user})
- return '\n'.join(msg)
- except KeyError:
- return ''
- except ImportError:
- return _("Please check that the PPA name or format is correct.")
-
-
-def get_ppa_info(shortcut):
- user = shortcut.split("/")[0]
- ppa = "/".join(shortcut.split("/")[1:])
- try:
- ret = get_ppa_info_from_lp(user, ppa)
- ret["distribution"] = ret["distribution_link"].split('/')[-1]
- ret["owner"] = ret["owner_link"].split('/')[-1]
- return ret
- except (HTTPError, Exception):
- msg = []
- msg.append(_("Cannot add PPA: 'ppa:%s/%s'.") % (
- user, ppa))
-
- # If the PPA does not exist, then try to find if the user/team
- # exists. If it exists, list down the PPAs
- raise ShortcutException('\n'.join(msg) + "\n" +
- _get_suggested_ppa_message(user, ppa))
-
- except (ValueError, PPAException):
- raise ShortcutException(
- _("Cannot access PPA (%s) to get PPA information, "
- "please check your internet connection.") % \
- (LAUNCHPAD_PPA_API % (user, ppa)))
-
-
-def get_ppa_signing_key_data(info=None):
- """Return signing key data in armored ascii format for the provided ppa.
-
- If 'info' is a dictionary, it is assumed to be the result
- of 'get_ppa_info(ppa)'. If it is a string, it is assumed to
- be a ppa_path.
-
- Return value is a text string."""
- if isinstance(info, dict):
- link = info["self_link"]
- else:
- link = get_ppa_info(mangle_ppa_shortcut(info))["self_link"]
-
- return get_info_from_https(link + "?ws.op=getSigningKeyData",
- accept_json=True, retry_delays=(1, 2, 3))
-
-
-class PPAShortcutHandler(object):
- def __init__(self, shortcut):
- super(PPAShortcutHandler, self).__init__()
- try:
- self.shortcut = mangle_ppa_shortcut(shortcut)
- except:
- raise ShortcutException(_("ERROR: '{shortcut}' is not a valid ppa format")
- .format(shortcut=shortcut))
- info = get_ppa_info(self.shortcut)
-
- if "private" in info and info["private"]:
- raise ShortcutException(
- _("Adding private PPAs is not supported currently"))
-
- self._info = info
-
- def info(self):
- return self._info
-
- def expand(self, codename, distro=None):
- if (distro is not None
- and distro != self._info["distribution"]
- and not series_valid_for_distro(self._info["distribution"], codename)):
- # The requested PPA is for a foreign distribution. Guess that
- # the user wants that distribution's current series.
- # This only applies if the local distribution is not the same
- # distribution the remote PPA is associated with AND the local
- # codename is not equal to the PPA's series.
- # e.g. local:Foobar/xenial and ppa:Ubuntu/xenial will use 'xenial'
- # local:Foobar/fluffy and ppa:Ubuntu/xenial will use '$latest'
- codename = get_current_series_from_lp(self._info["distribution"])
- debline = "deb http://ppa.launchpad.net/%s/%s/%s %s main" % (
- self._info["owner"][1:], self._info["name"],
- self._info["distribution"], codename)
- sourceslistd = apt_pkg.config.find_dir("Dir::Etc::sourceparts")
- filename = os.path.join(sourceslistd, "%s-%s-%s-%s.list" % (
- encode(self._info["owner"][1:]), encode(self._info["distribution"]),
- encode(self._info["name"]), codename))
- return (debline, filename)
-
- def should_confirm(self):
+ self._set_source_entry(handler.SourceEntry().line)
return True
- def add_key(self, keyserver=None):
- apsk = AddPPASigningKey(self._info["reference"], keyserver=keyserver)
- return apsk.add_ppa_signing_key()
-
+ def _set_auth(self):
+ if self._lp_anon or not self.lpppa.private:
+ return
-def shortcut_handler(shortcut):
- if not shortcut.startswith("ppa:"):
- return None
- return PPAShortcutHandler(shortcut)
+ if self._username and self._password:
+ return
-
-if __name__ == "__main__":
- import sys
- ppa = mangle_ppa_shortcut(sys.argv[1])
- print(get_ppa_info(ppa))
+ for url in self.lp.me.getArchiveSubscriptionURLs():
+ parsed = urlparse(url)
+ if parsed.path.startswith(f'/{self.teamname}/{self.ppaname}/ubuntu'):
+ self._username = parsed.username
+ self._password = parsed.password
+ break
+ else:
+ msg = (_("Could not find PPA subscription for ppa:%s/%s, you may need to request access") %
+ (self.teamname, self.ppaname))
+ raise ShortcutException(msg)
diff --git a/softwareproperties/shortcuthandler.py b/softwareproperties/shortcuthandler.py
new file mode 100644
index 0000000..110691d
--- /dev/null
+++ b/softwareproperties/shortcuthandler.py
@@ -0,0 +1,623 @@
+# Copyright (c) 2019 Canonical Ltd.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+
+import os
+import re
+import apt_pkg
+import subprocess
+import tempfile
+
+from aptsources.distro import get_distro
+from aptsources.sourceslist import (SourceEntry, SourcesList)
+
+from contextlib import suppress
+
+from copy import copy
+
+from gettext import gettext as _
+
+from urllib.parse import urlparse
+
+
+GPG_KEYRING_CMD = 'gpg -q --no-options --no-default-keyring --batch --keyring %s'
+
+class ShortcutHandler(object):
+ '''Superclass for shortcut handler implementations.
+
+ This provides a way to take an apt repository reference, in various forms,
+ and write the specific apt configuration to local files. This also can
+ remove previously written configuration from local files.
+
+ This class and any subclasses should never modify any main apt configuration
+ files, only specifically named files in '.d' subdirs (e.g. sources.list.d, etc)
+ should be modified. The only exception to that rule is adding or removing
+ sourceslist lines or components of existing source entries.
+ '''
+ def __init__(self, shortcut, components=None, enable_source=False, codename=None, pocket=None, dry_run=False, **kwargs):
+ self.shortcut = shortcut
+ self.components = components or []
+ self.enable_source = enable_source
+ self.distro = get_distro()
+ self.codename = codename or self.distro.codename
+ self.pocket = pocket
+ self.dry_run = dry_run
+
+ # Subclasses should not directly reference _source_entry,
+ # use _set_source_entry() and SourceEntry()
+ self._source_entry = None
+
+ # Subclasses should directly set these fields, if appropriate
+ self._filebase = None
+ self._username = None
+ self._password = None
+
+    @classmethod
+    def is_valid_uri(cls, uri):
+        '''Return True if the uri has both a scheme and a netloc, else False'''
+        parsed = urlparse(uri)
+        return bool(parsed.scheme and parsed.netloc)
+
+ @classmethod
+ def uri_strip_auth(cls, uri):
+ '''Return the uri with the username and password stripped'''
+ parsed = urlparse(uri)
+ # urlparse doesn't have any great way to simply remove the auth data,
+ # so let's just strip everything to the left of '@'
+ return parsed._replace(netloc=parsed.netloc.rpartition('@')[2]).geturl()
+
+ @classmethod
+ def uri_insert_auth(cls, uri, username, password):
+ '''Return the uri with the username and password included'''
+ parsed = urlparse(cls.uri_strip_auth(uri))
+ netloc='%s:%s@%s' % (username, password, parsed.netloc)
+ return parsed._replace(netloc=netloc).geturl()
+
+    @classmethod
+    def fingerprints(cls, keys):
+        '''Return an array of fingerprint(s) for provided key(s).
+
+        The 'keys' parameter should be in text (str) or binary (bytes) format;
+        it is converted to bytes if needed, and then passed to the 'gpg' program.
+
+        If gpg exits with an error, or its output cannot be parsed, a warning
+        is printed and an empty array is returned.
+        '''
+        cmd = 'gpg -q --no-options --no-keyring --batch --with-colons'
+        # yes, --with-fingerprint twice, to print subkey fingerprints
+        cmd += ' --with-fingerprint' * 2
+        try:
+            with tempfile.TemporaryDirectory() as homedir:
+                cmd += f' --homedir {homedir}'
+                if not isinstance(keys, bytes):
+                    keys = keys.encode()
+                stdout = subprocess.run(cmd.split(), check=True, input=keys,
+                                        stdout=subprocess.PIPE).stdout.decode()
+        except subprocess.CalledProcessError as e:
+            print(_("Warning: gpg error while processing keys:\n%s") % e)
+            return []
+
+        try:
+            # gpg --with-colons fpr field puts fingerprint into (1-based) field 10
+            return [l.split(':')[9] for l in stdout.splitlines() if l.startswith('fpr')]
+        except IndexError:
+            # a short 'fpr' record fails the list index, which is IndexError
+            print(_("Warning: invalid gpg output:\n%s") % stdout)
+            return []
+
+    @property
+    def description(self):
+        '''Translated one-line summary of this archive's dist and components.'''
+        entry = self.SourceEntry()
+        # format after _() so gettext looks up the untranslated format string
+        return (_("Archive for codename: %s components: %s") %
+                (entry.dist, ','.join(entry.comps)))
+
+ @property
+ def web_link(self):
+ return self.archive_link
+
+ @property
+ def archive_link(self):
+ return self.SourceEntry().uri
+
+ @property
+ def dist(self):
+ if self.pocket:
+ return '%s-%s' % (self.codename, self.pocket)
+ return self.codename
+
+ @property
+ def binary_type(self):
+ '''Text indicating a binary-type SourceEntry.'''
+ return self.distro.binary_type
+
+ @property
+ def source_type(self):
+ '''Text indicating a source-type SourceEntry.'''
+ return self.distro.source_type
+
+ def SourceEntry(self, pkgtype=None):
+ '''Get the SourceEntry representing this archive/shortcut.
+
+ This should never include any authentication data; if required,
+ the username and password should only be available from the
+ username and password properties, as well as from the
+ netrcparts_content property.
+
+ If pkgtype is provided, it must be either binary_type or source_type,
+ in which case this returns a SourceEntry with the requested type.
+ If pkgtype is not specified, this returns a SourceEntry with an
+ implementation-dependent type (in most cases, implementations should
+ default to binary_type).
+
+ Note that the default SourceEntry will be returned without modification,
+ and the implementation will determine if it is enabled or disabled;
+ while the source-type SourceEntry will be enabled or disabled based on
+ self.enable_source. The binary-type SourceEntry will always be enabled.
+
+ The SourceEntry 'file' field should always be set to the value of
+ sourceparts_file.
+ '''
+ if not self._source_entry:
+ raise NotImplementedError('Implementation class did not set self._source_entry')
+ e = copy(self._source_entry)
+ if not pkgtype:
+ return e
+ if pkgtype == self.binary_type:
+ e.set_enabled(True)
+ e.type = self.binary_type
+ elif pkgtype == self.source_type:
+ e.set_enabled(self.enable_source)
+ e.type = self.source_type
+ else:
+ raise ValueError('Invalid pkgtype: %s' % pkgtype)
+ return SourceEntry(str(e), file=e.file)
+
+ @property
+ def username(self):
+ '''Return the username used for authentication
+
+ If authentication is used, return the username; otherwise return None.
+
+ By default, this returns the private variable self._username, which
+ defaults to None. Subclasses should override this method and/or
+ set self._username if they have authentication data.
+ '''
+ return self._username
+
+ @property
+ def password(self):
+ '''Return the password used for authentication
+
+ If authentication is used, return the password; otherwise return None.
+
+ By default, this returns the private variable self._password, which
+ defaults to None. Subclasses should override this method and/or
+ set self._password if they have authentication data.
+ '''
+ return self._password
+
+ def add(self):
+ '''Save all data for this shortcut to file(s).
+
+ This writes everything to the relevant files. By default, it
+ calls add_source(), add_key(), and add_login(). Subclasses
+ should override it if other actions are required.
+ '''
+ self.add_source()
+ self.add_key()
+ self.add_login()
+
+ def remove(self):
+ '''Remove all data for this shortcut from file(s).
+
+ This removes everything from the relevant files. By default, it
+ only calls remove_source() and remove_login(). Subclasses
+ should override it if other actions are required. Note that by
+ default it does not call remove_key().
+ '''
+ self.remove_source()
+ self.remove_login()
+
+ def add_source(self):
+ '''Add the apt SourceEntries.
+
+ This uses SourcesList to add the binary-type and source-type
+ SourceEntries.
+
+ If the SourceEntry matches a known apt template, this will ignore
+ the sourceparts_file and instead place the SourceEntries into
+ the main/default sources.list file. Otherwise, this will add
+ the SourceEntries into the sourceparts_file.
+
+ If either the binary-type or source-type entry exist in the current
+ SourcesList, the existing entries are updated instead of placing
+ the entries in the sourceparts_file.
+ '''
+ binentry = self.SourceEntry(self.binary_type)
+ srcentry = self.SourceEntry(self.source_type)
+ mode = self.sourceparts_mode
+
+ sourceslist = SourcesList()
+
+ count = len(sourceslist.list)
+ newentry = sourceslist.add_entry(binentry)
+ if count == len(sourceslist.list):
+ print(_("Found existing %s entry in %s") % (newentry.type, newentry.file))
+ if binentry.file != newentry.file:
+ # existing binentry, but not in file we were expecting, just update it
+ print(_("Updating existing entry instead of using %s") % binentry.file)
+ elif newentry.template:
+ # our SourceEntry matches a template; use default sources.list file
+ newentry.file = SourceEntry('').file
+ print(_("Archive has template, updating %s") % newentry.file)
+ elif binentry.disabled:
+ print(_("Adding disabled %s entry to %s") % (newentry.type, newentry.file))
+ else:
+ print(_("Adding %s entry to %s") % (newentry.type, newentry.file))
+ newentry.set_enabled(not binentry.disabled)
+ binentry = newentry
+
+ # Unless it already exists somewhere, add the srcentry right after the binentry
+ srcentry.file = binentry.file
+ count = len(sourceslist.list)
+ newentry = sourceslist.add_entry(srcentry, after=binentry)
+ if count == len(sourceslist.list):
+ print(_("Found existing %s entry in %s") % (newentry.type, newentry.file))
+ if srcentry.file != newentry.file:
+ # existing srcentry, but not in file we were expecting, just update it
+ print(_("Updating existing entry instead of using %s") % srcentry.file)
+ elif srcentry.disabled:
+ print(_("Adding disabled %s entry to %s") % (newentry.type, newentry.file))
+ else:
+ print(_("Adding %s entry to %s") % (newentry.type, newentry.file))
+ newentry.set_enabled(not srcentry.disabled)
+ srcentry = newentry
+
+ if not self.dry_run:
+ # If the file doesn't exist, create it so we can set the mode
+ for entryfile in set([binentry.file, srcentry.file]):
+ if not os.path.exists(entryfile):
+ with open(entryfile, 'w'):
+ os.chmod(entryfile, mode)
+ sourceslist.save()
+
+ def remove_source(self):
+ '''Remove the apt SourceEntries.
+
+ This uses SourcesList to remove the binary-type and source-type
+ SourceEntries.
+
+ This must disable the corresponding SourceEntries, from whatever file(s)
+ they are located in. This must not disable more than matches, e.g.
+ if the existing SourceEntry line contains more components this must
+ edit the existing line to remove this SourceEntry's component(s).
+
+ After disabling all matching SourceEntries, if the sourceparts_file is
+ empty or contains only invalid and/or disabled SourceEntries, this
+ may remove the sourceparts_file.
+ '''
+ sourceslist = SourcesList()
+
+ binentry = self.SourceEntry(self.binary_type)
+ srcentry = self.SourceEntry(self.source_type)
+ binentry.set_enabled(False)
+ srcentry.set_enabled(False)
+
+ # first, disable our entries
+ print(_("Disabling %s entry in %s") % (binentry.type, binentry.file))
+ sourceslist.add_entry(binentry)
+ print(_("Disabling %s entry in %s") % (srcentry.type, srcentry.file))
+ sourceslist.add_entry(srcentry)
+
+ file_entries = [s for s in sourceslist if s.file == self.sourceparts_file]
+ if not [e for e in file_entries if not e.invalid and not e.disabled]:
+ # no more valid/enabled entries in our file, remove them
+ for e in file_entries:
+ if not e.invalid:
+ print(_("Removing disabled %s entry from %s") % (e.type, e.file))
+ sourceslist.remove(e)
+
+ if not self.dry_run:
+ sourceslist.save(remove=True)
+
+ @property
+ def sourceparts_path(self):
+ '''Return result of apt_pkg.config.find_dir("Dir::Etc::sourceparts")'''
+ return apt_pkg.config.find_dir("Dir::Etc::sourceparts")
+
+ @property
+ def sourceparts_filename(self):
+ '''Get the sources.list.d filename, without the leading path.
+
+ By default, this combines the filebase with the codename, and uses an
+ extension of 'list'. This is different than the trustedparts or
+ netrcparts filenames, which use only the filebase plus extension.
+ '''
+ return self._filebase_to_filename('list', suffix=self.codename)
+
+ @property
+ def sourceparts_file(self):
+ '''Get the sources.list.d absolute-path filename.
+
+ Note that the add_source() function will not use this file if this shortcut's
+ SourceEntry matches a known apt template; instead the entries will be placed
+ in the main sources.list file. Also, if the SourceEntry already exists in
+ the SourcesList, it will be edited in place, instead of using this file.
+ See add_source() for more details.
+ '''
+ return self._filename_to_file(self.sourceparts_path, self.sourceparts_filename)
+
+ @property
+ def sourceparts_mode(self):
+ '''Mode of sourceparts file.
+
+ Note that add_source() will only use this mode if it creates a new file
+ for sourceparts_file; if the file already exists or if the SourceEntry is
+ saved in a different file, this mode is not used.
+ '''
+ return 0o644
+
+ def add_key(self):
+ '''Add the GPG key(s) corresponding to this repo.
+
+ By default, if self.trustedparts_content contains content,
+ and self.trustedparts_file points to a file, the key(s) will
+ be added to the file.
+
+ If the file does not yet exist, and self.trustedparts_mode is set,
+ the file will be created with that mode.
+ '''
+ if not all((self.trustedparts_file, self.trustedparts_content)):
+ return
+
+ dest = self.trustedparts_file
+ keys = self.trustedparts_content
+ if not isinstance(keys, bytes):
+ keys = keys.encode()
+ fp = self.fingerprints(keys)
+
+ print(_("Adding key to %s with fingerprint %s") % (dest, ','.join(fp)))
+
+ cmd = GPG_KEYRING_CMD % dest
+ action = "--import"
+ if not self.dry_run:
+ if not os.path.exists(dest) and self.trustedparts_mode:
+ with open(dest, 'wb'):
+ os.chmod(dest, self.trustedparts_mode)
+ try:
+ with tempfile.TemporaryDirectory() as homedir:
+ cmd += f" --homedir {homedir} {action}"
+ subprocess.run(cmd.split(), check=True, input=keys)
+ except subprocess.CalledProcessError as e:
+ raise ShortcutException(e)
+
+ def remove_key(self):
+ '''Remove the GPG key(s) corresponding to this repo.
+
+ By default, if self.trustedparts_content contains content,
+ and self.trustedparts_file points to a file, the key(s) will
+ be removed from the file.
+
+ If the file contains no more keys after removal, the file will
+ be removed.
+
+ This does not consider other files; multiple repositories may
+ use the same signing key. This only modifies/removes
+ self.trustedparts_file.
+ '''
+ if not all((self.trustedparts_file, self.trustedparts_content)):
+ return
+
+ dest = self.trustedparts_file
+ fp = self.fingerprints(self.trustedparts_content)
+
+ if not os.path.exists(dest):
+ return
+
+ print(_("Removing key from %s with fingerprint %s") % (dest, ','.join(fp)))
+
+ cmd = GPG_KEYRING_CMD % dest
+ action = "--delete-keys %s" % ' '.join(fp)
+ if not self.dry_run:
+ try:
+ with tempfile.TemporaryDirectory() as homedir:
+ cmd += f" --homedir {homedir} {action}"
+ subprocess.run(cmd.split(), check=True)
+ except subprocess.CalledProcessError as e:
+ raise ShortcutException(e)
+
+ with open(dest, 'rb') as f:
+ empty = not self.fingerprints(f.read())
+ if empty:
+ os.remove(dest)
+
+ @property
+ def trustedparts_path(self):
+ '''Return result of apt_pkg.config.find_dir("Dir::Etc::trustedparts")'''
+ return apt_pkg.config.find_dir("Dir::Etc::trustedparts")
+
+ @property
+ def trustedparts_filename(self):
+ '''Get the trusted.gpg.d filename, without the leading path.'''
+ return self._filebase_to_filename('gpg')
+
+ @property
+ def trustedparts_file(self):
+ '''Get the trusted.gpg.d absolute-path filename.'''
+ return self._filename_to_file(self.trustedparts_path, self.trustedparts_filename)
+
+ @property
+ def trustedparts_content(self):
+ '''Content to put into trusted.gpg.d file'''
+ return None
+
+ @property
+ def trustedparts_mode(self):
+ '''Mode of trustedparts file'''
+ return 0o644
+
+    def add_login(self):
+        '''Add the login credentials corresponding to this repo.
+
+        By default, if self.netrcparts_content contains content,
+        and self.netrcparts_file points to a file, the file will be
+        created and content placed into it.
+
+        If the file already exists, the content is appended, unless every
+        line of the content is already present in the file (ignoring
+        leading/trailing whitespace). Nothing is written if self.dry_run
+        is set.
+        '''
+        if not all((self.netrcparts_file, self.netrcparts_content)):
+            return
+
+        dest = self.netrcparts_file
+        content = self.netrcparts_content
+
+        newfile = not os.path.exists(dest)
+        finalchar = '\n'
+        if not newfile:
+            with open(dest, 'r') as f:
+                rawlines = f.readlines()
+            lines = [l.strip() for l in rawlines]
+            if rawlines:
+                # check the unstripped last line, so an existing trailing
+                # newline is seen; an empty file keeps the default and so
+                # gets no separator
+                finalchar = rawlines[-1][-1]
+            if all([l.strip() in lines for l in content.splitlines()]):
+                print(_("Authentication data already in %s") % dest)
+                return
+
+        print(_("Adding authentication data to %s") % dest)
+        if not self.dry_run:
+            if newfile and self.netrcparts_mode:
+                with open(dest, 'w'):
+                    os.chmod(dest, self.netrcparts_mode)
+            with open(dest, 'a') as f:
+                # we're appending; if the file doesn't end in \n, throw one in
+                if finalchar != '\n':
+                    f.write('\n')
+                f.write(content)
+
+    def remove_login(self):
+        '''Remove the login credentials corresponding to this repo.
+
+        By default, if self.netrcparts_content contains content,
+        and self.netrcparts_file points to a file, the content will
+        be removed from the file.
+
+        The remaining lines are written back in their original order.
+
+        If the file is empty (other than whitespace) after removal, the file
+        will be removed.
+
+        This does not consider other files; this only modifies/removes
+        self.netrcparts_file. Nothing is modified if self.dry_run is set.
+        '''
+        if not all((self.netrcparts_file, self.netrcparts_content)):
+            return
+
+        dest = self.netrcparts_file
+        content = set([l.strip() for l in self.netrcparts_content.splitlines()])
+
+        if not os.path.exists(dest):
+            return
+
+        with open(dest, 'r') as f:
+            filelines = [l.strip() for l in f.readlines()]
+        if not content.intersection(filelines):
+            print(_("Authentication data not contained in %s") % dest)
+        else:
+            print(_("Removing authentication data from %s") % dest)
+            if not self.dry_run:
+                # filter a list rather than subtracting sets, so the
+                # entries we keep are not reordered or de-duplicated
+                remaining = [l for l in filelines if l not in content]
+                with open(dest, 'w') as f:
+                    f.write('\n'.join(remaining))
+
+        if not self.dry_run:
+            with open(dest, 'r') as f:
+                empty = not f.read().strip()
+            if empty:
+                os.remove(dest)
+
+ @property
+ def netrcparts_path(self):
+ '''Return result of apt_pkg.config.find_dir("Dir::Etc::netrcparts")'''
+ return apt_pkg.config.find_dir("Dir::Etc::netrcparts")
+
+ @property
+ def netrcparts_filename(self):
+ '''Get the auth.conf.d filename, without the leading path.'''
+ return self._filebase_to_filename('conf')
+
+ @property
+ def netrcparts_file(self):
+ '''Get the auth.conf.d absolute-path filename.'''
+ return self._filename_to_file(self.netrcparts_path, self.netrcparts_filename)
+
+ @property
+ def netrcparts_content(self):
+ '''Content to put into auth.conf.d file
+
+ By default, if both username and password are set, this will return a proper
+ netrc-formatted line with the authentication information, including the
+ hostname and path.
+ '''
+ if not all((self.username, self.password)):
+ return None
+
+ hostname = urlparse(self.SourceEntry().uri).hostname
+ path = urlparse(self.SourceEntry().uri).path
+ return f'machine {hostname}{path} login {self.username} password {self.password}'
+
+ @property
+ def netrcparts_mode(self):
+ '''Mode of netrcparts file'''
+ return 0o600
+
+ def _set_source_entry(self, line):
+ '''Set the SourceEntry.
+
+ This should be called from subclasses to set the SourceEntry.
+ The SourceEntry file will be set to the sourceparts_file value.
+
+ The self.components, if any, will be added to the line's component(s).
+ '''
+ e = SourceEntry(line)
+ e.comps = list(set(e.comps) | set(self.components))
+ self._source_entry = SourceEntry(str(e), file=self.sourceparts_file)
+
+ def _encode_filebase(self, suffix=None):
+ base = self._filebase
+ if not base:
+ return None
+ if suffix:
+ base += '-%s' % suffix
+ return re.sub("[^a-z0-9_-]+", "_", base.lower())
+
+ def _filebase_to_filename(self, ext, suffix=None):
+ base = self._encode_filebase(suffix=suffix)
+ if not base:
+ return None
+ return '%s.%s' % (base, ext)
+
+ def _filename_to_file(self, path, name):
+ if not name:
+ return None
+ return os.path.join(path, name)
+
+
+class ShortcutException(Exception):
+ '''General Exception during shortcut processing.'''
+ pass
+
+
+class InvalidShortcutException(ShortcutException):
+ '''Invalid shortcut.
+
+ This should only be thrown from the constructor of a ShortcutHandler
+ subclass, and only to indicate that the provided shortcut is invalid
+ for that ShortcutHandler class.
+ '''
+ pass
+
+
+# vi: ts=4 expandtab
diff --git a/softwareproperties/shortcuts.py b/softwareproperties/shortcuts.py
index c5f246c..f49321d 100644
--- a/softwareproperties/shortcuts.py
+++ b/softwareproperties/shortcuts.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2013 Canonical Ltd.
+# Copyright (c) 2013-2019 Canonical Ltd.
#
# Author: Scott Moser <smoser@xxxxxxxxxx>
#
@@ -17,41 +17,31 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
-import aptsources.distro
from gettext import gettext as _
-_DEF_CODENAME = aptsources.distro.get_distro().codename
+from softwareproperties.cloudarchive import CloudArchiveShortcutHandler
+from softwareproperties.ppa import PPAShortcutHandler
+from softwareproperties.shortcuthandler import InvalidShortcutException
+from softwareproperties.sourceslist import SourcesListShortcutHandler
+from softwareproperties.uri import URIShortcutHandler
-class ShortcutHandler(object):
- # the defeault ShortcutHandler only handles actual apt lines.
- # ie, 'shortcut' here is a line like you'd find in /etc/apt/sources.list:
- # deb MIRROR RELEASE-POCKET COMPONENT
- def __init__(self, shortcut):
- self.shortcut = shortcut
+SHORTCUT_HANDLERS = [
+ PPAShortcutHandler,
+ CloudArchiveShortcutHandler,
+ SourcesListShortcutHandler,
+ URIShortcutHandler,
+]
- def add_key(self, keyserver=None):
- return True
- def expand(self, codename=None, distro=None):
- return (self.shortcut, None)
+def shortcut_handler(shortcut, **kwargs):
+ for handler in SHORTCUT_HANDLERS:
+ try:
+ return handler(shortcut, **kwargs)
+ except InvalidShortcutException:
+ pass
- def info(self):
- return {
- 'description': _("No description available for '%(shortcut)s'") %
- {'shortcut': self.shortcut},
- 'web_link': _("web link unavailable")}
+ raise InvalidShortcutException(_("Unable to handle input '%s'") % shortcut)
- def should_confirm(self):
- return False
-
-
-class ShortcutException(Exception):
- pass
-
-
-def shortcut_handler(shortcut):
- # this is the default shortcut handler, so it matches anything
- return ShortcutHandler(shortcut)
# vi: ts=4 expandtab
diff --git a/softwareproperties/sourceslist.py b/softwareproperties/sourceslist.py
new file mode 100644
index 0000000..407dc94
--- /dev/null
+++ b/softwareproperties/sourceslist.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2019 Canonical Ltd.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+
+from gettext import gettext as _
+
+from aptsources.sourceslist import SourceEntry
+
+from softwareproperties.shortcuthandler import (ShortcutHandler, InvalidShortcutException)
+
+from urllib.parse import urlparse
+
+
+SOURCESLIST_FILE_PREFIX = "archive_uri"
+
+class SourcesListShortcutHandler(ShortcutHandler):
+ def __init__(self, shortcut, ignore_line_comps=False, **kwargs):
+ super(SourcesListShortcutHandler, self).__init__(shortcut, **kwargs)
+
+ entry = SourceEntry(shortcut)
+ if entry.invalid:
+ raise InvalidShortcutException(_("Invalid sources.list line: '%s'") % shortcut)
+
+ uri = entry.uri
+ if not self.is_valid_uri(uri):
+ raise InvalidShortcutException(_("Invalid URI: '%s'") % uri)
+
+ # ignore_line_comps is used by URIShortcutHandler when no comps are provided
+ if not ignore_line_comps:
+ self.components = list(set(self.components) | set(entry.comps))
+
+ parsed = urlparse(uri)
+
+ self._username = parsed.username
+ self._password = parsed.password
+
+ entry.uri = self.uri_strip_auth(entry.uri)
+ # must set _filebase first; _set_source_entry uses it to set entry.file
+ self._filebase = f"{SOURCESLIST_FILE_PREFIX}-{entry.uri}"
+ self._set_source_entry(str(entry))
+
+
+# vi: ts=4 expandtab
diff --git a/softwareproperties/uri.py b/softwareproperties/uri.py
new file mode 100644
index 0000000..e02a9c3
--- /dev/null
+++ b/softwareproperties/uri.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2019 Canonical Ltd.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+
+from aptsources.distro import get_distro
+
+from softwareproperties.sourceslist import SourcesListShortcutHandler
+
+
+class URIShortcutHandler(SourcesListShortcutHandler):
+    '''Shortcut handler for a bare archive URI, optionally followed by components.
+
+    The shortcut is expanded into a full binary-type sources.list line, using
+    the 'codename' and 'pocket' keyword arguments (or the local distro's
+    codename), and then passed to SourcesListShortcutHandler.
+    '''
+    def __init__(self, shortcut, **kwargs):
+        (uri, _, comps) = shortcut.strip().partition(' ')
+
+        # can't use self.codename here, as we haven't called superclass constructor yet
+        distro = get_distro()
+        # codename may be passed explicitly as None; fall back to the distro
+        # codename in that case too, same as ShortcutHandler.__init__ does
+        codename = kwargs.get('codename') or distro.codename
+        pocket = kwargs.get('pocket')
+        dist = codename
+        if pocket:
+            dist = '%s-%s' % (dist, pocket)
+
+        line = ('%s %s %s %s' % (distro.binary_type, uri, dist, comps or 'main'))
+
+        super(URIShortcutHandler, self).__init__(line, ignore_line_comps=not comps, **kwargs)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/aptroot/etc/apt/apt.conf.d/.keep b/tests/aptroot/etc/apt/apt.conf.d/.keep
deleted file mode 100644
index e69de29..0000000
--- a/tests/aptroot/etc/apt/apt.conf.d/.keep
+++ /dev/null
diff --git a/tests/test_aptsources.py b/tests/test_aptsources.py
index 83f28a1..13043ff 100755
--- a/tests/test_aptsources.py
+++ b/tests/test_aptsources.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python3
from __future__ import print_function
diff --git a/tests/test_dbus.py b/tests/test_dbus.py
index 2505c1e..08c8890 100755
--- a/tests/test_dbus.py
+++ b/tests/test_dbus.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
from __future__ import print_function
@@ -7,6 +7,7 @@ from gi.repository import GLib, Gio
import apt_pkg
import aptsources.distro
+import aptsources.sourceslist
import dbus
import logging
@@ -62,9 +63,8 @@ def clear_apt_config():
if os.path.isfile(path):
os.unlink(path)
- if not os.path.exists(os.path.join(etc_apt, "apt.conf.d")):
- os.mkdir(os.path.join(etc_apt, "apt.conf.d"))
-
+ for d in ["apt.conf.d", "sources.list.d", "trusted.gpg.d", "auth.conf.d"]:
+ os.makedirs(os.path.join(etc_apt, d), exist_ok=True)
def create_sources_list():
s = get_test_source_line() + "\n"
@@ -147,7 +147,8 @@ class TestDBus(unittest.TestCase):
# keep track of signal emissions
self.sources_list_count = 0
self.distro_release = get_distro_release()
- self.sources_list_path = create_sources_list()
+ create_sources_list()
+ self._sourceslist = aptsources.sourceslist.SourcesList()
# create the client proxy
bus = dbus.SessionBus(private=True, mainloop=DBusGMainLoop())
proxy = bus.get_object("com.ubuntu.SoftwareProperties", "/")
@@ -164,10 +165,19 @@ class TestDBus(unittest.TestCase):
#print("_on_modified_sources_list")
self.sources_list_count += 1
+ @property
+ def sourceslist(self):
+ self._sourceslist.refresh()
+ return ''.join([str(e) for e in self._sourceslist])
+
+ @property
+ def enabled_sourceslist(self):
+ self._sourceslist.refresh()
+ return ''.join([str(e) for e in self._sourceslist
+ if not e.invalid and not e.disabled])
+
def _debug_sourceslist(self, text=""):
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- logging.debug("sourceslist: %s '%s'" % (text, sourceslist))
+ logging.debug("sourceslist: %s '%s'" % (text, self.sourceslist))
# this is an async call - give it a few seconds to catch up with what we expect
def _assert_eventually(self, prop, n):
@@ -181,62 +191,44 @@ class TestDBus(unittest.TestCase):
def test_enable_disable_component(self):
# ensure its not there
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertFalse("universe" in sourceslist)
+ self.assertNotIn("universe", self.sourceslist)
# enable
self.iface.EnableComponent("universe")
self._debug_sourceslist("2")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertTrue("universe" in sourceslist)
+ self.assertIn("universe", self.sourceslist)
# disable again
self.iface.DisableComponent("universe")
self._debug_sourceslist("3")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertFalse("universe" in sourceslist)
+ self.assertNotIn("universe", self.sourceslist)
self._assert_eventually("sources_list_count", 2)
def test_enable_enable_disable_source_code_sources(self):
# ensure its not there
self._debug_sourceslist("4")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertFalse("deb-src" in sourceslist)
+ self.assertNotIn('deb-src', self.enabled_sourceslist)
# enable
self.iface.EnableSourceCodeSources()
self._debug_sourceslist("5")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertTrue("deb-src" in sourceslist)
+ self.assertIn('deb-src', self.enabled_sourceslist)
# disable again
self.iface.DisableSourceCodeSources()
self._debug_sourceslist("6")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertFalse("deb-src" in sourceslist)
+ self.assertNotIn('deb-src', self.enabled_sourceslist)
self._assert_eventually("sources_list_count", 3)
def test_enable_child_source(self):
child_source = "%s-updates" % self.distro_release
# ensure its not there
self._debug_sourceslist("7")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertFalse(child_source in sourceslist)
+ self.assertNotIn(child_source, self.sourceslist)
# enable
self.iface.EnableChildSource(child_source)
self._debug_sourceslist("8")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertTrue(child_source in sourceslist)
+ self.assertIn(child_source, self.sourceslist)
# disable again
self.iface.DisableChildSource(child_source)
self._debug_sourceslist("9")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertFalse(child_source in sourceslist)
+ self.assertNotIn(child_source, self.sourceslist)
self._assert_eventually("sources_list_count", 2)
def test_toggle_source(self):
@@ -244,17 +236,13 @@ class TestDBus(unittest.TestCase):
source = get_test_source_line()
self.iface.ToggleSourceUse(source)
self._debug_sourceslist("10")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
primary_debline = "# deb %s" % PRIMARY_MIRROR
- self.assertTrue(primary_debline in sourceslist)
+ self.assertIn(primary_debline, self.sourceslist)
# to disable the line again, we need to match the new "#"
source = "# " + source
self.iface.ToggleSourceUse(source)
self._debug_sourceslist("11")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertFalse(primary_debline in sourceslist)
+ self.assertNotIn(primary_debline, self.sourceslist)
self._assert_eventually("sources_list_count", 2)
@@ -264,10 +252,8 @@ class TestDBus(unittest.TestCase):
source_new = "deb http://xxx/ %s" % self.distro_release
self.iface.ReplaceSourceEntry(source, source_new)
self._debug_sourceslist("11")
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertTrue(source_new in sourceslist)
- self.assertFalse(source in sourceslist)
+ self.assertIn(source_new, self.sourceslist)
+ self.assertNotIn(source, self.sourceslist)
self._assert_eventually("sources_list_count", 1)
self.iface.ReplaceSourceEntry(source_new, source)
self._assert_eventually("sources_list_count", 2)
@@ -278,19 +264,19 @@ class TestDBus(unittest.TestCase):
"aptroot", "etc", "popularity-contest.conf")
with open(popcon_p) as f:
popcon = f.read()
- self.assertTrue('PARTICIPATE="no"' in popcon)
+ self.assertIn('PARTICIPATE="no"', popcon)
# toggle
self.iface.SetPopconPariticipation(True)
with open(popcon_p) as f:
popcon = f.read()
- self.assertTrue('PARTICIPATE="yes"' in popcon)
- self.assertFalse('PARTICIPATE="no"' in popcon)
+ self.assertIn('PARTICIPATE="yes"', popcon)
+ self.assertNotIn('PARTICIPATE="no"', popcon)
# and back
self.iface.SetPopconPariticipation(False)
with open(popcon_p) as f:
popcon = f.read()
- self.assertFalse('PARTICIPATE="yes"' in popcon)
- self.assertTrue('PARTICIPATE="no"' in popcon)
+ self.assertNotIn('PARTICIPATE="yes"', popcon)
+ self.assertIn('PARTICIPATE="no"', popcon)
def test_updates_automation(self):
states = [UPDATE_INST_SEC, UPDATE_DOWNLOAD, UPDATE_NOTIFY]
@@ -301,19 +287,19 @@ class TestDBus(unittest.TestCase):
"10periodic")
with open(cfg) as f:
config = f.read()
- self.assertTrue('APT::Periodic::Unattended-Upgrade "1";' in config)
+ self.assertIn('APT::Periodic::Unattended-Upgrade "1";', config)
# download
self.iface.SetUpdateAutomationLevel(states[1])
with open(cfg) as f:
config = f.read()
- self.assertTrue('APT::Periodic::Unattended-Upgrade "0";' in config)
- self.assertTrue('APT::Periodic::Download-Upgradeable-Packages "1";' in config)
+ self.assertIn('APT::Periodic::Unattended-Upgrade "0";', config)
+ self.assertIn('APT::Periodic::Download-Upgradeable-Packages "1";', config)
# notify
self.iface.SetUpdateAutomationLevel(states[2])
with open(cfg) as f:
config = f.read()
- self.assertTrue('APT::Periodic::Unattended-Upgrade "0";' in config)
- self.assertTrue('APT::Periodic::Download-Upgradeable-Packages "0";' in config)
+ self.assertIn('APT::Periodic::Unattended-Upgrade "0";', config)
+ self.assertIn('APT::Periodic::Download-Upgradeable-Packages "0";', config)
def test_updates_interval(self):
# interval
@@ -329,30 +315,26 @@ class TestDBus(unittest.TestCase):
self.iface.SetUpdateInterval(1)
with open(cfg) as f:
config = f.read()
- self.assertTrue('APT::Periodic::Update-Package-Lists "1";' in config)
+ self.assertIn('APT::Periodic::Update-Package-Lists "1";', config)
self.iface.SetUpdateInterval(0)
with open(cfg) as f:
config = f.read()
- self.assertTrue('APT::Periodic::Update-Package-Lists "0";' in config)
+ self.assertIn('APT::Periodic::Update-Package-Lists "0";', config)
def test_add_remove_source_by_line(self):
# add invalid
res = self.iface.AddSourceFromLine("xxx")
self.assertFalse(res)
# add real
- s = "deb http//ppa.launchpad.net/ foo bar"
+ s = "deb http://ppa.launchpad.net/ foo bar"
self.iface.AddSourceFromLine(s)
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertTrue(s in sourceslist)
- self.assertTrue(s.replace("deb", "# deb-src") in sourceslist)
+ self.assertIn(s, self.sourceslist)
+ self.assertIn(s.replace("deb", "# deb-src"), self.sourceslist)
# remove again
self.iface.RemoveSource(s)
self.iface.RemoveSource(s.replace("deb", "deb-src"))
- with open(self.sources_list_path) as f:
- sourceslist = f.read()
- self.assertFalse(s in sourceslist)
- self.assertFalse(s.replace("deb", "# deb-src") in sourceslist)
+ self.assertNotIn(s, self.sourceslist)
+ self.assertNotIn(s.replace("deb", "# deb-src"), self.sourceslist)
self._assert_eventually("sources_list_count", 4)
def test_add_gpg_key(self):
diff --git a/tests/test_lp.py b/tests/test_lp.py
deleted file mode 100755
index a732ab0..0000000
--- a/tests/test_lp.py
+++ /dev/null
@@ -1,167 +0,0 @@
-#!/usr/bin/python
-
-import apt_pkg
-
-import os
-import unittest
-import sys
-sys.path.insert(0, "..")
-
-from mock import patch
-
-import softwareproperties.ppa
-from softwareproperties.ppa import (
- AddPPASigningKeyThread,
- mangle_ppa_shortcut,
- verify_keyid_is_v4,
- )
-
-
-MOCK_PPA_INFO={
- "displayname": "PPA for Michael Vogt",
- "web_link": "https://launchpad.net/~mvo/+archive/ppa",
- "signing_key_fingerprint": "019A25FED88F961763935D7F129196470EB12F05",
- "name": "ppa",
- 'distribution_link': 'https://launchpad.net/api/1.0/ubuntu',
- 'owner_link': 'https://launchpad.net/api/1.0/~mvo',
- 'reference': '~mvo/ubuntu/ppa',
- 'self_link': 'https://launchpad.net/api/devel/~mvo/+archive/ubuntu/ppa',
- }
-
-MOCK_KEY="""
------BEGIN PGP PUBLIC KEY BLOCK-----
-Version: SKS 1.1.6
-Comment: Hostname: keyserver.ubuntu.com
-
-mI0ESXP67wEEAN2m3xWkAP0p1erHbJx1wYBCL6tLqWXESx1BmF0htLzdD9lfsUYiNs+Zgg3w
-uU0PrQIcqZtyTESh514tw3KQ+OAK2I0a2XJR99lXPksiKoxaOOsr0pTVWDYuIlfV3yfmXvnK
-FZSmaMjjKuqQbCwZe8Ev7yry9Gh9pM5Y87MbNT05ABEBAAG0HkxhdW5jaHBhZCBQUEEgZm9y
-IE1pY2hhZWwgVm9ndIi2BBMBAgAgBQJJc/rvAhsDBgsJCAcDAgQVAggDBBYCAwECHgECF4AA
-CgkQEpGWRw6xLwVofAP/YyU3YykXbr8p7wRp1EpFlDmtbPlFXp00gt4Cqlu2AWVOkwkVoMRQ
-Ncb7wog2Z6u7KyUhD8pgC2FEL0+FQjyNemv7D0OYBG+6DLdjtRsv0CumLdWFmviU96j3OcwT
-G2GkIC/eB2maTrV/vj7vlZ0Qe/T1NL6XLpr0A6Rg6JAtkFM=
-=SMbJ
------END PGP PUBLIC KEY BLOCK-----
-"""
-
-MOCK_SECOND_KEY="""
------BEGIN PGP PUBLIC KEY BLOCK-----
-Version: SKS 1.1.6
-Comment: Hostname: keyserver.ubuntu.com
-
-mI0ESX34EgEEAOTzplZO3TXmb9dRLu7kOuIEia21e4gwQ/RQe+LD7HdhikcETjf2Ruu0mn6S
-sgPLL+duhKxmv6ZciLUgkk0qEDCZuR6BPxdgAIwqmQmFipcv6UTMQitRPUa9WlPU37Qg+joL
-cTBUdamnVq+yJhLmnuO44UWAty85nNJzDd29gxqXABEBAAG0LUxhdW5jaHBhZCBQUEEgZm9y
-INCU0LzQuNGC0YDQuNC5INCb0LXQtNC60L7Qsoi2BBMBAgAgBQJJffgSAhsDBgsJCAcDAgQV
-AggDBBYCAwECHgECF4AACgkQFXlR/kAx0oeuSwQAuhhgWgeeG3F9XMYDqgJShzMSeQOLMKBq
-6mNFEL1sDhRdbinf7rwuQFXDSSNCj8/PLa3DF/u09tAm6CTi10iwxxbXf16pTq21gxCA3/xS
-fszv352yZpcN85MD5aozqv7qUCGOQ9Gey7JzgD7L4wMEjyRScVjx1chfLgyapdj822E=
-=pdql
------END PGP PUBLIC KEY BLOCK-----
-"""
-
-class LaunchpadPPATestCase(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- for k in apt_pkg.config.keys():
- apt_pkg.config.clear(k)
- apt_pkg.init()
-
- @unittest.skipUnless(
- "TEST_ONLINE" in os.environ,
- "skipping online tests unless TEST_ONLINE environment variable is set")
- @unittest.skipUnless(
- sys.version_info[0] > 2,
- "pycurl doesn't raise SSL exceptions anymore it seems")
- def test_ppa_info_from_lp(self):
- # use correct data
- info = softwareproperties.ppa.get_ppa_info_from_lp("mvo", "ppa")
- self.assertNotEqual(info, {})
- self.assertEqual(info["name"], "ppa")
- # use empty CERT file
- softwareproperties.ppa.LAUNCHPAD_PPA_CERT = "/dev/null"
- with self.assertRaises(Exception):
- softwareproperties.ppa.get_ppa_info_from_lp("mvo", "ppa")
-
- def test_mangle_ppa_shortcut(self):
- self.assertEqual("~mvo/ubuntu/ppa", mangle_ppa_shortcut("ppa:mvo"))
- self.assertEqual(
- "~mvo/ubuntu/compiz", mangle_ppa_shortcut("ppa:mvo/compiz"))
- self.assertEqual(
- "~mvo/ubuntu-rtm/compiz",
- mangle_ppa_shortcut("ppa:mvo/ubuntu-rtm/compiz"))
-
- def test_mangle_ppa_shortcut_leading_slash(self):
- # Test for LP: #1426933
- self.assertEqual("~gottcode/ubuntu/gcppa",
- mangle_ppa_shortcut("ppa:/gottcode/gcppa"))
-
- def test_mangle_ppa_supports_no_ppa_colon_prefix(self):
- """mangle_ppa should also support input without 'ppa:'."""
- self.assertEqual("~mvo/ubuntu/ppa", mangle_ppa_shortcut("~mvo/ppa"))
-
-
-class AddPPASigningKeyTestCase(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- for k in apt_pkg.config.keys():
- apt_pkg.config.clear(k)
- apt_pkg.init()
- cls.trustedgpg = os.path.join(
- os.path.dirname(__file__), "aptroot", "etc", "apt", "trusted.gpg.d")
- try:
- os.makedirs(cls.trustedgpg)
- except:
- pass
-
- def setUp(self):
- self.t = AddPPASigningKeyThread("~mvo/ubuntu/ppa")
-
- @patch("softwareproperties.ppa.get_ppa_info_from_lp")
- @patch("softwareproperties.ppa.subprocess")
- def test_fingerprint_len_check(self, mock_subprocess, mock_get_ppa_info):
- """Test that short keyids (<160bit) are rejected"""
- mock_ppa_info = MOCK_PPA_INFO.copy()
- mock_ppa_info["signing_key_fingerprint"] = "0EB12F05"
- mock_get_ppa_info.return_value = mock_ppa_info
- # do it
- res = self.t.add_ppa_signing_key("~mvo/ubuntu/ppa")
- self.assertFalse(res)
- self.assertFalse(mock_subprocess.Popen.called)
- self.assertFalse(mock_subprocess.call.called)
-
- @patch("softwareproperties.ppa.get_ppa_info_from_lp")
- @patch("softwareproperties.ppa.get_info_from_https")
- def test_add_ppa_signing_key_wrong_fingerprint(self, mock_https, mock_get_ppa_info):
- mock_get_ppa_info.return_value = MOCK_PPA_INFO
- mock_https.return_value = MOCK_SECOND_KEY
- res = self.t.add_ppa_signing_key("~mvo/ubuntu/ppa")
- self.assertFalse(res)
-
- @patch("softwareproperties.ppa.get_ppa_info_from_lp")
- @patch("softwareproperties.ppa.get_info_from_https")
- def test_add_ppa_signing_key_multiple_fingerprints(self, mock_https, mock_get_ppa_info):
- mock_get_ppa_info.return_value = MOCK_PPA_INFO
- mock_https.return_value = '\n'.join([MOCK_KEY, MOCK_SECOND_KEY])
- res = self.t.add_ppa_signing_key("~mvo/ubuntu/ppa")
- self.assertFalse(res)
-
- @patch("softwareproperties.ppa.get_ppa_info_from_lp")
- @patch("softwareproperties.ppa.get_info_from_https")
- @patch("apt_pkg.config")
- def test_add_ppa_signing_key_ok(self, mock_config, mock_https, mock_get_ppa_info):
- mock_get_ppa_info.return_value = MOCK_PPA_INFO
- mock_https.return_value = MOCK_KEY
- mock_config.find_dir.return_value = self.trustedgpg
- res = self.t.add_ppa_signing_key("~mvo/ubuntu/ppa")
- self.assertTrue(res)
-
- def test_verify_keyid_is_v4(self):
- keyid = "0EB12F05"
- self.assertFalse(verify_keyid_is_v4(keyid))
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tests/test_pyflakes.py b/tests/test_pyflakes.py
index b066e2d..83aef4a 100755
--- a/tests/test_pyflakes.py
+++ b/tests/test_pyflakes.py
@@ -3,7 +3,6 @@ import os
import subprocess
import unittest
-@unittest.skip("It's not clean")
class TestPyflakesClean(unittest.TestCase):
""" ensure that the tree is pyflakes clean """
diff --git a/tests/test_shortcuts.py b/tests/test_shortcuts.py
index 1338e25..bf6651d 100644
--- a/tests/test_shortcuts.py
+++ b/tests/test_shortcuts.py
@@ -1,34 +1,106 @@
-#!/usr/bin/python
+#!/usr/bin/python3
import apt
import unittest
import sys
-try:
- from urllib.request import urlopen
- from urllib.error import HTTPError, URLError
-except ImportError:
- from urllib2 import HTTPError, URLError, urlopen
-try:
- from http.client import HTTPException
-except ImportError:
- from httplib import HTTPException
+import os
+
+from aptsources.distro import get_distro
+from aptsources.sourceslist import SourceEntry
+from contextlib import contextmanager
+from http.client import HTTPException
+from launchpadlib.launchpad import Launchpad
+from mock import (patch, Mock)
+from urllib.request import urlopen
+from urllib.error import URLError
sys.path.insert(0, "..")
-from softwareproperties.SoftwareProperties import shortcut_handler
-from softwareproperties.shortcuts import ShortcutException
-from mock import patch
+from softwareproperties.sourceslist import SourcesListShortcutHandler
+from softwareproperties.uri import URIShortcutHandler
+from softwareproperties.cloudarchive import CloudArchiveShortcutHandler
+from softwareproperties.ppa import PPAShortcutHandler
+from softwareproperties.shortcuthandler import InvalidShortcutException
+from softwareproperties.shortcuts import shortcut_handler
+
+
+DISTRO = get_distro()
+CODENAME = DISTRO.codename
+
+# These must match the ppa used in the VALID_PPAS
+PPA_LINE = f"deb http://ppa.launchpad.net/ddstreet/ppa/ubuntu/ {CODENAME} main"
+PPA_FILEBASE = "ddstreet-ubuntu-ppa"
+PPA_SOURCEFILE = f"{PPA_FILEBASE}-{CODENAME}.list"
+PPA_TRUSTEDFILE = f"{PPA_FILEBASE}.gpg"
+PPA_NETRCFILE = f"{PPA_FILEBASE}.conf"
+
+PRIVATE_PPA_PASSWORD = "thisisnotarealpassword"
+PRIVATE_PPA_LINE = f"deb https://private-ppa.launchpad.net/ddstreet/ppa/ubuntu/ {CODENAME} main"
+PRIVATE_PPA_NETRCCONTENT = f"machine private-ppa.launchpad.net/ddstreet/ppa/ubuntu/ login ddstreet password {PRIVATE_PPA_PASSWORD}"
+PRIVATE_PPA_SUBSCRIPTION_URLS = [f"https://ddstreet:{PRIVATE_PPA_PASSWORD}@private-ppa.launchpad.net/ddstreet/ppa/ubuntu/"]
+
+# These must match the uca used in VALID_UCAS
+UCA_CANAME = "train"
+UCA_ARCHIVE = "http://ubuntu-cloud.archive.canonical.com/ubuntu"
+UCA_LINE = f"deb {UCA_ARCHIVE} bionic-updates/{UCA_CANAME} main"
+UCA_LINE_PROPOSED = f"deb {UCA_ARCHIVE} bionic-proposed/{UCA_CANAME} main"
+UCA_FILEBASE = f"cloudarchive-{UCA_CANAME}"
+UCA_SOURCEFILE = f"{UCA_FILEBASE}.list"
+CA_ALLOW_CODENAME = "bionic"
+
+# These must match the uri used in VALID_URIS
+URI = "http://fake.mirror.private.com/ubuntu"
+URI_FILEBASE = "archive_uri-http_fake_mirror_private_com_ubuntu"
+URI_SOURCEFILE = f"{URI_FILEBASE}-{CODENAME}.list"
+
+VALID_LINES = [f"deb {URI} bionic main"]
+VALID_URIS = [URI]
+VALID_PPAS = ["ppa:ddstreet", "ppa:~ddstreet", "ppa:ddstreet/ppa", "ppa:~ddstreet/ppa", "ppa:ddstreet/ubuntu/ppa", "ppa:~ddstreet/ubuntu/ppa"]
+VALID_UCAS = [f"cloud-archive:{UCA_CANAME}", f"cloud-archive:{UCA_CANAME}-updates", f"cloud-archive:{UCA_CANAME}-proposed", f"uca:{UCA_CANAME}", f"uca:{UCA_CANAME}-updates", f"uca:{UCA_CANAME}-proposed"]
+VALID_ALL = VALID_LINES + VALID_URIS + VALID_PPAS + VALID_UCAS
+
+INVALID_LINES = ["xxx invalid deb line"]
+INVALID_URIS = ["invalid"]
+INVALID_PPAS = ["ppainvalid:ddstreet", "ppa:ddstreet/ubuntu/ppa/invalid"]
+INVALID_UCAS = [f"cloud-invalid:{UCA_CANAME}", "cloud-archive:"]
+INVALID_ALL = INVALID_LINES + INVALID_URIS + INVALID_PPAS + INVALID_UCAS
def has_network():
try:
- network = urlopen("https://launchpad.net/")
- network
+ with urlopen("https://launchpad.net/"):
+ pass
except (URLError, HTTPException):
return False
return True
+def mock_login_with(*args, **kwargs):  # stand-in for Launchpad.login_with (patched in by test_shortcut_private_ppa)
+    _lp = Launchpad.login_anonymously(*args, **kwargs)  # anonymous login is used in place of an authenticated one
+    lp = Mock(wraps=_lp)
+
+    lp.me = Mock()  # fake "logged-in user" object
+    lp.me.name = 'ddstreet'
+    lp.me.getArchiveSubscriptionURLs = lambda: PRIVATE_PPA_SUBSCRIPTION_URLS
+
+    def mock_getPPAByName(_team, name):
+        _ppa = _team.getPPAByName(name=name)
+        ppa = Mock(wraps=_ppa)
+        ppa.signing_key_fingerprint = _ppa.signing_key_fingerprint  # copy the wrapped ppa's fingerprint onto the mock
+        ppa.private = True  # always report the ppa as private, whatever the wrapped object says
+        return ppa
+
+    def mock_people(teamname):
+        _team = _lp.people(teamname)
+        team = Mock(wraps=_team)
+        team.getPPAByName = lambda name: mock_getPPAByName(_team, name)
+        return team
+
+    lp.people = mock_people
+    return lp
+
+
class ShortcutsTestcase(unittest.TestCase):
+ enable_source = False
@classmethod
def setUpClass(cls):
@@ -42,37 +114,133 @@ class ShortcutsTestcase(unittest.TestCase):
apt.apt_pkg.config.set("Dir::Etc", "etc/apt")
apt.apt_pkg.config.set("Dir::Etc::sourcelist", "sources.list")
apt.apt_pkg.config.set("Dir::Etc::sourceparts", "sources.list.d")
+ apt.apt_pkg.config.set("Dir::Etc::trustedparts", "trusted.gpg.d")
+ apt.apt_pkg.config.set("Dir::Etc::netrcparts", "auth.conf.d")
+
+ def create_handler(self, line, handler, *args, **kwargs):
+ return handler(line, *args, enable_source=self.enable_source, **kwargs)
- def test_shortcut_none(self):
- line = "deb http://ubuntu.com/ubuntu trusty main"
- handler = shortcut_handler(line)
- self.assertEqual((line, None), handler.expand())
+    def create_handlers(self, line, handler, *args, **kwargs):
+        # accept one handler class or a list of them; shortcut_handler is always tried last
+        candidates = list(handler) if isinstance(handler, list) else [handler]
+        candidates.append(shortcut_handler)
+        return [self.create_handler(line, cls, *args, **kwargs) for cls in candidates]
+
+    def check_shortcut(self, shortcut, line, sourcefile=None, trustedfile=None, netrcfile=None,
+                       sourceparts=apt.apt_pkg.config.find_dir("Dir::Etc::sourceparts"),  # NOTE(review): these three defaults are evaluated once, when the method is defined, i.e. before setUpClass changes the apt config - confirm that is intended
+                       trustedparts=apt.apt_pkg.config.find_dir("Dir::Etc::trustedparts"),
+                       netrcparts=apt.apt_pkg.config.find_dir("Dir::Etc::netrcparts"),
+                       trustedcontent=False, netrccontent=None):  # assert the handler's source line, file paths, key and netrc properties
+        self.assertEqual(shortcut.SourceEntry().line, line)
+
+        self.assertEqual(shortcut.sourceparts_path, sourceparts)
+        if sourcefile:  # file-name checks only run when the caller supplies an expected name
+            self.assertEqual(shortcut.SourceEntry().file, os.path.join(sourceparts, sourcefile))
+            self.assertEqual(shortcut.sourceparts_filename, sourcefile)
+            self.assertEqual(shortcut.sourceparts_file, os.path.join(sourceparts, sourcefile))
+
+        binentry = SourceEntry(line)
+        binentry.type = DISTRO.binary_type
+        self.assertEqual(shortcut.SourceEntry(shortcut.binary_type), binentry)
+
+        srcentry = SourceEntry(line)
+        srcentry.type = DISTRO.source_type
+        srcentry.set_enabled(self.enable_source)  # expected source entry is enabled only when the test class sets enable_source
+        self.assertEqual(shortcut.SourceEntry(shortcut.source_type), srcentry)
+
+        self.assertEqual(shortcut.trustedparts_path, trustedparts)
+        if trustedfile:
+            self.assertEqual(shortcut.trustedparts_filename, trustedfile)
+            self.assertEqual(shortcut.trustedparts_file, os.path.join(trustedparts, trustedfile))
+
+        # Checking the actual gpg key content is too much work; only its presence or absence is checked.
+        if trustedcontent:
+            self.assertIsNotNone(shortcut.trustedparts_content)
+        else:
+            self.assertIsNone(shortcut.trustedparts_content)
+
+        self.assertEqual(shortcut.netrcparts_path, netrcparts)
+        if netrcfile:
+            self.assertEqual(shortcut.netrcparts_filename, netrcfile)
+            self.assertEqual(shortcut.netrcparts_file, os.path.join(netrcparts, netrcfile))
+
+        self.assertEqual(shortcut.netrcparts_content, netrccontent)
+
+ def test_shortcut_sourceslist(self):
+ for line in VALID_LINES:
+ for shortcut in self.create_handlers(line, SourcesListShortcutHandler):
+ self.check_shortcut(shortcut, line)
+
+ def test_shortcut_uri(self):
+ for uri in VALID_URIS:
+ line = f"deb {uri} {CODENAME} main"
+ for shortcut in self.create_handlers(uri, URIShortcutHandler):
+ self.check_shortcut(shortcut, line, sourcefile=URI_SOURCEFILE)
@unittest.skipUnless(has_network(), "requires network")
def test_shortcut_ppa(self):
- line = "ppa:mvo"
- handler = shortcut_handler(line)
- self.assertEqual(
- ('deb http://ppa.launchpad.net/mvo/ppa/ubuntu trusty main',
- '/etc/apt/sources.list.d/mvo-ubuntu-ppa-trusty.list'),
- handler.expand("trusty", distro="ubuntu"))
+ for ppa in VALID_PPAS:
+ for shortcut in self.create_handlers(ppa, PPAShortcutHandler):
+ self.check_shortcut(shortcut, PPA_LINE,
+ sourcefile=PPA_SOURCEFILE,
+ trustedfile=PPA_TRUSTEDFILE,
+ netrcfile=PPA_NETRCFILE,
+ trustedcontent=True)
@unittest.skipUnless(has_network(), "requires network")
+    def test_shortcut_private_ppa(self):
+        # These are the same checks as for the public ppa, but login=True makes the handler use the mocked lp instance.
+        # This *does not* verify that a real private ppa works; that must be tested manually.
+        with patch('launchpadlib.launchpad.Launchpad.login_with', new=mock_login_with):
+            for ppa in VALID_PPAS:
+                for shortcut in self.create_handlers(ppa, PPAShortcutHandler, login=True):
+                    self.check_shortcut(shortcut, PRIVATE_PPA_LINE,
+                                        sourcefile=PPA_SOURCEFILE,
+                                        trustedfile=PPA_TRUSTEDFILE,
+                                        netrcfile=PPA_NETRCFILE,
+                                        trustedcontent=True,
+                                        netrccontent=PRIVATE_PPA_NETRCCONTENT)
+
+    @contextmanager
+    def ca_allow_codename(self, codename):  # temporarily set CA_ALLOW_CODENAME=codename for the with-body
+        key = "CA_ALLOW_CODENAME"
+        orig = os.environ.get(key)  # None when the variable was not set beforehand
+        try:
+            os.environ[key] = codename
+            yield
+        finally:
+            if orig is not None:  # restore any previous value, including an empty string
+                os.environ[key] = orig
+            else:
+                os.environ.pop(key, None)
+
def test_shortcut_cloudarchive(self):
- line = "cloud-archive:folsom"
- handler = shortcut_handler(line)
- self.assertEqual(
- ('deb http://ubuntu-cloud.archive.canonical.com/ubuntu '\
- 'precise-updates/folsom main',
- '/etc/apt/sources.list.d/cloudarchive-folsom.list'),
- handler.expand("precise", distro="ubuntu"))
-
- def test_shortcut_exception(self):
- with self.assertRaises(ShortcutException):
- with patch('softwareproperties.ppa.get_ppa_info_from_lp',
- side_effect=lambda *args: HTTPError("url", 404, "not found", [], None)):
- shortcut_handler("ppa:mvo")
+ for uca in VALID_UCAS:
+ line = UCA_LINE_PROPOSED if 'proposed' in uca else UCA_LINE
+ with self.ca_allow_codename(CA_ALLOW_CODENAME):
+ for shortcut in self.create_handlers(uca, CloudArchiveShortcutHandler):
+ self.check_shortcut(shortcut, line, sourcefile=UCA_SOURCEFILE)
+
+ def check_invalid_shortcut(self, handler, shortcut):
+ msg = "'%s' should have rejected '%s'" % (handler, shortcut)
+ with self.assertRaises(InvalidShortcutException, msg=msg):
+ self.create_handler(shortcut, handler)
+
+    def test_shortcut_invalid(self):  # each specific handler must reject invalid input and the other handlers' valid input
+        for s in INVALID_ALL + VALID_URIS + VALID_PPAS + VALID_UCAS:
+            self.check_invalid_shortcut(SourcesListShortcutHandler, s)
+        for s in INVALID_ALL + VALID_LINES + VALID_PPAS + VALID_UCAS:
+            self.check_invalid_shortcut(URIShortcutHandler, s)
+        for s in INVALID_ALL + VALID_LINES + VALID_URIS + VALID_UCAS:
+            self.check_invalid_shortcut(PPAShortcutHandler, s)
+        for s in INVALID_ALL + VALID_LINES + VALID_URIS + VALID_PPAS:
+            self.check_invalid_shortcut(CloudArchiveShortcutHandler, s)
+        for s in INVALID_ALL:  # the generic shortcut_handler is only checked against the invalid shortcuts
+            self.check_invalid_shortcut(shortcut_handler, s)
+
+class EnableSourceShortcutsTestcase(ShortcutsTestcase):
+ enable_source = True
if __name__ == "__main__":
diff --git a/tests/test_sp.py b/tests/test_sp.py
index b372f66..f6bc8fc 100644
--- a/tests/test_sp.py
+++ b/tests/test_sp.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python3
# -*- coding: utf-8 -*-
import apt_pkg
Follow ups