canonical-ubuntu-qa team mailing list archive
-
canonical-ubuntu-qa team
-
Mailing list archive
-
Message #00793
[Merge] ~paride/autopkgtest-cloud:pre-commit-black-isort into autopkgtest-cloud:master
Paride Legovini has proposed merging ~paride/autopkgtest-cloud:pre-commit-black-isort into autopkgtest-cloud:master.
Commit message:
Apply black and isort and add them to CI.
Requested reviews:
Canonical's Ubuntu QA (canonical-ubuntu-qa)
For more details, see:
https://code.launchpad.net/~paride/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/445542
--
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~paride/autopkgtest-cloud:pre-commit-black-isort into autopkgtest-cloud:master.
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 3ad872e..0735120 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -14,3 +14,11 @@ repos:
hooks:
- id: yamllint
args: [--strict]
+ - repo: https://github.com/ambv/black
+ rev: 23.3.0
+ hooks:
+ - id: black
+ - repo: https://github.com/pycqa/isort
+ rev: 5.12.0
+ hooks:
+ - id: isort
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/cleanup-instances b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/cleanup-instances
index 14caca8..727634e 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/cleanup-instances
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/cleanup-instances
@@ -8,7 +8,8 @@ import subprocess
import time
from urllib.error import HTTPError
-import novaclient.client, novaclient.exceptions
+import novaclient.client
+import novaclient.exceptions
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
from keystoneauth1 import session
@@ -116,7 +117,8 @@ for instance in nova.servers.list():
if not instance.name.endswith("-" + our_hostname):
logging.debug(
- "instance %s is managed by different worker, ignoring", instance.name
+ "instance %s is managed by different worker, ignoring",
+ instance.name,
)
continue
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/cleanup-lxd b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/cleanup-lxd
index 9b3d376..5e9724b 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/cleanup-lxd
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/cleanup-lxd
@@ -1,10 +1,10 @@
#!/usr/bin/python3
+import datetime
import glob
import json
import os
import subprocess
import sys
-import datetime
MINIMUM_AGE_MINS = 60
@@ -18,13 +18,17 @@ def check_remote(remote):
containers = json.loads(
subprocess.check_output(["lxc", "list", "-fjson", remote + ":"])
)
- containers = [c for c in containers if c["name"].startswith("autopkgtest-lxd-")]
+ containers = [
+ c for c in containers if c["name"].startswith("autopkgtest-lxd-")
+ ]
containers = sorted(
containers, key=lambda c: parse_lxd_time(c["created_at"]), reverse=True
)
# Keep as many containers as we have services
- to_keep = len(glob.glob(f"/etc/systemd/system/autopkgtest@{remote}-*.service"))
+ to_keep = len(
+ glob.glob(f"/etc/systemd/system/autopkgtest@{remote}-*.service")
+ )
if to_keep < len(containers) and os.getenv("DEBUG"):
print(
@@ -36,7 +40,10 @@ def check_remote(remote):
if now - parse_lxd_time(container["created_at"]) >= datetime.timedelta(
minutes=MINIMUM_AGE_MINS
):
- print(f"{remote}:{container['name']} is old - deleting", file=sys.stderr)
+ print(
+ f"{remote}:{container['name']} is old - deleting",
+ file=sys.stderr,
+ )
subprocess.check_call(
["lxc", "delete", "--force", f"{remote}:{container['name']}"]
)
@@ -46,7 +53,9 @@ def main():
if not os.path.exists("/usr/bin/lxc"):
return 0
- remotes = json.loads(subprocess.check_output(["lxc", "remote", "list", "-fjson"]))
+ remotes = json.loads(
+ subprocess.check_output(["lxc", "remote", "list", "-fjson"])
+ )
for remote in remotes:
if not remote.startswith("lxd-armhf"):
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/copy-security-group b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/copy-security-group
index 779a556..e9d5c2f 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/copy-security-group
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/copy-security-group
@@ -7,97 +7,115 @@ Copies SOURCE to NAME, after deleting any existing groups called NAME.
If --delete-only is given, it only deletes existing groups called NAME.
"""
-import os
import argparse
+import os
-
-from keystoneauth1.identity import v2, v3
from keystoneauth1 import session
+from keystoneauth1.identity import v2, v3
from neutronclient.v2_0 import client
-
# Members in a security group rule that cannot be copied.
-RULE_MEMBERS_IGNORE = ["id", "tags", "updated_at",
- "created_at", "revision_number",
- "project_id", "tenant_id", ]
+RULE_MEMBERS_IGNORE = [
+ "id",
+ "tags",
+ "updated_at",
+ "created_at",
+ "revision_number",
+ "project_id",
+ "tenant_id",
+]
def main():
- parser = argparse.ArgumentParser(description='Copy security groups')
- parser.add_argument('name', metavar='NAME', help='name of security group')
- parser.add_argument('--source', default='default',
- help='source group to copy from')
- parser.add_argument('--delete-only', action='store_true',
- help='only delete group')
+ parser = argparse.ArgumentParser(description="Copy security groups")
+ parser.add_argument("name", metavar="NAME", help="name of security group")
+ parser.add_argument(
+ "--source", default="default", help="source group to copy from"
+ )
+ parser.add_argument(
+ "--delete-only", action="store_true", help="only delete group"
+ )
args = parser.parse_args()
# we get called from ExecStartPre of lxd units too (where
# copy-security-group isn't required), just bail out if that's the case
- if 'lxd' in args.name:
+ if "lxd" in args.name:
return
- if os.environ.get('OS_IDENTITY_API_VERSION') == '3':
- auth = v3.Password(auth_url=os.environ['OS_AUTH_URL'],
- username=os.environ['OS_USERNAME'],
- password=os.environ['OS_PASSWORD'],
- project_name=os.environ['OS_PROJECT_NAME'],
- user_domain_name=os.environ['OS_USER_DOMAIN_NAME'],
- project_domain_name=os.environ['OS_PROJECT_DOMAIN_NAME'])
+ if os.environ.get("OS_IDENTITY_API_VERSION") == "3":
+ auth = v3.Password(
+ auth_url=os.environ["OS_AUTH_URL"],
+ username=os.environ["OS_USERNAME"],
+ password=os.environ["OS_PASSWORD"],
+ project_name=os.environ["OS_PROJECT_NAME"],
+ user_domain_name=os.environ["OS_USER_DOMAIN_NAME"],
+ project_domain_name=os.environ["OS_PROJECT_DOMAIN_NAME"],
+ )
else:
auth = v2.Password(
- auth_url=os.environ['OS_AUTH_URL'],
- username=os.environ['OS_USERNAME'],
- password=os.environ['OS_PASSWORD'],
- tenant_name=os.environ['OS_TENANT_NAME'])
+ auth_url=os.environ["OS_AUTH_URL"],
+ username=os.environ["OS_USERNAME"],
+ password=os.environ["OS_PASSWORD"],
+ tenant_name=os.environ["OS_TENANT_NAME"],
+ )
sess = session.Session(auth=auth)
- neutron = client.Client(session=sess,
- tenant_name=os.environ.get("OS_TENANT_NAME"),
- region_name=os.environ["OS_REGION_NAME"])
+ neutron = client.Client(
+ session=sess,
+ tenant_name=os.environ.get("OS_TENANT_NAME"),
+ region_name=os.environ["OS_REGION_NAME"],
+ )
# Find the source group - crashes if it does not exists
- source = [g for g in neutron.list_security_groups()
- ['security_groups'] if g['name'] == args.source][0]
+ source = [
+ g
+ for g in neutron.list_security_groups()["security_groups"]
+ if g["name"] == args.source
+ ][0]
- description = "copy {} of {} ({})".format(args.name, args.source,
- source['description'])
+ description = "copy {} of {} ({})".format(
+ args.name, args.source, source["description"]
+ )
# Delete any existing group with the same name
- existing_groups = [g for g in
- neutron.list_security_groups()['security_groups']
- if g['name'] == args.name]
- existing_ports = neutron.list_ports()['ports']
+ existing_groups = [
+ g
+ for g in neutron.list_security_groups()["security_groups"]
+ if g["name"] == args.name
+ ]
+ existing_ports = neutron.list_ports()["ports"]
for target in existing_groups:
print("Deleting existing group", target)
for port in existing_ports:
- if target['id'] in port['security_groups']:
- print("Deleting port in group:", target['id'])
+ if target["id"] in port["security_groups"]:
+ print("Deleting port in group:", target["id"])
try:
- neutron.delete_port(port['id'])
+ neutron.delete_port(port["id"])
except Exception as e:
print("Could not delete port:", e)
- neutron.delete_security_group(target['id'])
+ neutron.delete_security_group(target["id"])
if not args.delete_only:
print("Creating", description)
target = neutron.create_security_group(
- {'security_group': {'name': args.name,
- 'description': description}}
+ {"security_group": {"name": args.name, "description": description}}
)["security_group"]
for rule in target["security_group_rules"]:
neutron.delete_security_group_rule(rule["id"])
for rule in source["security_group_rules"]:
- rule = {k: v for k, v in rule.items()
- if v is not None and
- k not in RULE_MEMBERS_IGNORE}
+ rule = {
+ k: v
+ for k, v in rule.items()
+ if v is not None and k not in RULE_MEMBERS_IGNORE
+ }
rule["security_group_id"] = target["id"]
print("Copying rule", rule)
- neutron.create_security_group_rule({'security_group_rule': rule})
+ neutron.create_security_group_rule({"security_group_rule": rule})
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/create-nova-image-with-proposed-package b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/create-nova-image-with-proposed-package
index d20bbc2..641f771 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/create-nova-image-with-proposed-package
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/create-nova-image-with-proposed-package
@@ -14,34 +14,39 @@
#
# This creates a new image proposed-<source>/<original image base name>.
-import sys
import os
-import keystoneauth1.loading
-import glanceclient
-from glanceclient.common import utils
import re
-import tempfile
import subprocess
+import sys
+import tempfile
+
+import glanceclient
+import keystoneauth1.loading
+from glanceclient.common import utils
def get_glance():
- '''Return glance client object'''
-
- loader = keystoneauth1.loading.get_plugin_loader('password')
- auth = loader.load_from_options(auth_url=os.environ['OS_AUTH_URL'],
- username=os.environ['OS_USERNAME'],
- password=os.environ['OS_PASSWORD'],
- tenant_name=os.environ['OS_TENANT_NAME'])
+ """Return glance client object"""
+
+ loader = keystoneauth1.loading.get_plugin_loader("password")
+ auth = loader.load_from_options(
+ auth_url=os.environ["OS_AUTH_URL"],
+ username=os.environ["OS_USERNAME"],
+ password=os.environ["OS_PASSWORD"],
+ tenant_name=os.environ["OS_TENANT_NAME"],
+ )
session = keystoneauth1.session.Session(auth=auth)
- return glanceclient.Client('2', session=session, region_name=os.environ['NOVA_REGION'])
+ return glanceclient.Client(
+ "2", session=session, region_name=os.environ["NOVA_REGION"]
+ )
def find_latest_image(img_re):
- '''find latest image that matches given RE'''
+ """find latest image that matches given RE"""
latest = None
for img in glance.images.list():
- if img.status == 'active' and image_re.search(img.name):
+ if img.status == "active" and image_re.search(img.name):
if latest is None or img.created_at > latest.created_at:
latest = img
if not latest:
@@ -52,12 +57,25 @@ def find_latest_image(img_re):
def setup_image(image_path, source):
# get a chroot shell into the image
- img_shell = subprocess.Popen(['sudo', '-n', 'mount-image-callback', '--system-mounts', '--system-resolvconf',
- image_path, 'chroot', '_MOUNTPOINT_', '/bin/sh'],
- stdin=subprocess.PIPE)
+ img_shell = subprocess.Popen(
+ [
+ "sudo",
+ "-n",
+ "mount-image-callback",
+ "--system-mounts",
+ "--system-resolvconf",
+ image_path,
+ "chroot",
+ "_MOUNTPOINT_",
+ "/bin/sh",
+ ],
+ stdin=subprocess.PIPE,
+ )
# find and install proposed binaries for source
- img_shell.stdin.write(('''
+ img_shell.stdin.write(
+ (
+ """
set -e
echo '* Creating policy-rc.d'
printf '#!/bin/sh\\nexit 101\\n' > /usr/sbin/policy-rc.d
@@ -79,7 +97,10 @@ DEBIAN_FRONTEND=noninteractive apt-get install -y $SRCS
echo '* Cleaning up'
apt-get clean
rm -f /etc/machine-id /usr/sbin/policy-rc.d
- ''' % {'src': source}).encode())
+ """
+ % {"src": source}
+ ).encode()
+ )
img_shell.stdin.close()
img_shell.wait()
@@ -90,7 +111,9 @@ rm -f /etc/machine-id /usr/sbin/policy-rc.d
#
if len(sys.argv) != 3:
- sys.stderr.write('Usage: %s <image RE> <proposed source package name>\n' % sys.argv[0])
+ sys.stderr.write(
+ "Usage: %s <image RE> <proposed source package name>\n" % sys.argv[0]
+ )
sys.exit(1)
image_re = re.compile(sys.argv[1])
@@ -98,15 +121,21 @@ source = sys.argv[2]
glance = get_glance()
latest = find_latest_image(image_re)
-print('* Downloading image %s (UUID: %s)...' % (latest.name, latest.id))
-workdir = tempfile.TemporaryDirectory(prefix='make-image-with-proposed-package.')
-img = os.path.join(workdir.name, 'image')
+print("* Downloading image %s (UUID: %s)..." % (latest.name, latest.id))
+workdir = tempfile.TemporaryDirectory(
+ prefix="make-image-with-proposed-package."
+)
+img = os.path.join(workdir.name, "image")
utils.save_image(glance.images.data(latest.id), img)
setup_image(img, source)
-newimg_name = 'proposed-%s/%s' % (source, os.path.basename(latest.name))
-newimg = glance.images.create(name=newimg_name, disk_format=latest.disk_format, container_format=latest.container_format)
-print('* Uploading new image %s (UUID: %s)...' % (newimg.name, newimg.id))
-with open(img, 'rb') as f:
+newimg_name = "proposed-%s/%s" % (source, os.path.basename(latest.name))
+newimg = glance.images.create(
+ name=newimg_name,
+ disk_format=latest.disk_format,
+ container_format=latest.container_format,
+)
+print("* Uploading new image %s (UUID: %s)..." % (newimg.name, newimg.id))
+with open(img, "rb") as f:
glance.images.upload(newimg.id, f)
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp
index f6dda5a..9ff9df2 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp
@@ -3,17 +3,21 @@
import logging
import optparse
-import sys
import re
+import sys
import urllib.parse
+
import amqplib.client_0_8 as amqp
def filter_amqp(options, host, queue_name, regex):
url_parts = urllib.parse.urlsplit(host, allow_fragments=False)
- filter_re = re.compile(regex.encode('UTF-8'))
- amqp_con = amqp.Connection(url_parts.hostname, userid=url_parts.username,
- password=url_parts.password)
+ filter_re = re.compile(regex.encode("UTF-8"))
+ amqp_con = amqp.Connection(
+ url_parts.hostname,
+ userid=url_parts.username,
+ password=url_parts.password,
+ )
ch = amqp_con.channel()
while True:
@@ -21,7 +25,7 @@ def filter_amqp(options, host, queue_name, regex):
if r is None:
break
if isinstance(r.body, str):
- body = r.body.encode('UTF-8')
+ body = r.body.encode("UTF-8")
else:
body = r.body
logging.debug("queue item: %s (not deleting)", body)
@@ -35,23 +39,35 @@ def filter_amqp(options, host, queue_name, regex):
def main():
parser = optparse.OptionParser(
- usage="usage: %prog [options] amqp://user:pass@host queue_name regex")
+ usage="usage: %prog [options] amqp://user:pass@host queue_name regex"
+ )
parser.add_option(
- "-n", "--dry-run", default=False, action="store_true",
- help="only show the operations that would be performed")
+ "-n",
+ "--dry-run",
+ default=False,
+ action="store_true",
+ help="only show the operations that would be performed",
+ )
parser.add_option(
- "-v", "--verbose", default=False, action="store_true",
- help="additionally show queue items that are not removed")
+ "-v",
+ "--verbose",
+ default=False,
+ action="store_true",
+ help="additionally show queue items that are not removed",
+ )
opts, args = parser.parse_args()
- logging.basicConfig(level=logging.DEBUG if opts.verbose else logging.INFO,
- format="%(asctime)s - %(message)s")
+ logging.basicConfig(
+ level=logging.DEBUG if opts.verbose else logging.INFO,
+ format="%(asctime)s - %(message)s",
+ )
if len(args) != 3:
parser.error("Need to specify host, queue and regex")
filter_amqp(opts, args[0], args[1], args[2])
-if __name__ == '__main__':
+
+if __name__ == "__main__":
main()
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream
index 965fa10..9e49803 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream
@@ -1,35 +1,43 @@
#!/usr/bin/python3
# Filter out all but the latest request for a given upstream PR
-import dateutil.parser
-import distro_info
import json
import logging
import optparse
import os
-import sys
import re
+import sys
import urllib.parse
-import amqplib.client_0_8 as amqp
-
from collections import defaultdict
+import amqplib.client_0_8 as amqp
+import dateutil.parser
+import distro_info
+
UDI = distro_info.UbuntuDistroInfo()
ALL_UBUNTU_RELEASES = UDI.all
SUPPORTED_UBUNTU_RELEASES = sorted(
set(UDI.supported() + UDI.supported_esm()), key=ALL_UBUNTU_RELEASES.index
)
+
def filter_amqp(options, host):
url_parts = urllib.parse.urlsplit(host, allow_fragments=False)
- amqp_con = amqp.Connection(url_parts.hostname, userid=url_parts.username,
- password=url_parts.password)
- dry_run = '[dry-run] ' if options.dry_run else ''
+ amqp_con = amqp.Connection(
+ url_parts.hostname,
+ userid=url_parts.username,
+ password=url_parts.password,
+ )
+ dry_run = "[dry-run] " if options.dry_run else ""
- queues = (f'debci-upstream-{release}-{arch}' for release in SUPPORTED_UBUNTU_RELEASES for arch in ('amd64', 'arm64', 'armhf', 'i386', 'ppc64el', 's390x'))
+ queues = (
+ f"debci-upstream-{release}-{arch}"
+ for release in SUPPORTED_UBUNTU_RELEASES
+ for arch in ("amd64", "arm64", "armhf", "i386", "ppc64el", "s390x")
+ )
for queue_name in queues:
ch = amqp_con.channel()
- logging.debug('Looking at %s', queue_name)
+ logging.debug("Looking at %s", queue_name)
seen = defaultdict(dict)
while True:
try:
@@ -38,52 +46,72 @@ def filter_amqp(options, host):
(code, _, _, _) = e.args
if code != 404:
raise
- logging.debug(f'No such queue {queue_name}')
+ logging.debug(f"No such queue {queue_name}")
break
if r is None:
break
if not isinstance(r.body, str):
- body = r.body.decode('UTF-8')
+ body = r.body.decode("UTF-8")
else:
body = r.body
- (pkg, params) = body.split(' ', 1)
+ (pkg, params) = body.split(" ", 1)
params_j = json.loads(params)
- submit_time = dateutil.parser.parse(params_j['submit-time'])
- pr = [val.split('=', 1)[1] for val in params_j['env'] if val.startswith('UPSTREAM_PULL_REQUEST')][0]
+ submit_time = dateutil.parser.parse(params_j["submit-time"])
+ pr = [
+ val.split("=", 1)[1]
+ for val in params_j["env"]
+ if val.startswith("UPSTREAM_PULL_REQUEST")
+ ][0]
try:
(delivery_tag, old_submit_time) = seen[pkg][pr]
if old_submit_time <= submit_time:
- logging.info(f'{dry_run}We have seen PR {pr} in {queue_name} before: acking the previous request')
+ logging.info(
+ f"{dry_run}We have seen PR {pr} in {queue_name} before: acking the previous request"
+ )
if not options.dry_run:
- ch.basic_ack(delivery_tag) # delivery tag, the old one NOT r.delivery_tag!
+ ch.basic_ack(
+ delivery_tag
+ ) # delivery tag, the old one NOT r.delivery_tag!
del seen[pkg][pr]
except KeyError:
pass
finally:
- logging.debug(f'Recording {pkg}/{pr} for {queue_name}')
+ logging.debug(f"Recording {pkg}/{pr} for {queue_name}")
seen[pkg][pr] = (r.delivery_tag, submit_time)
def main():
parser = optparse.OptionParser(
- usage="usage: %prog [options] amqp://user:pass@host queue_name regex")
+ usage="usage: %prog [options] amqp://user:pass@host queue_name regex"
+ )
parser.add_option(
- "-n", "--dry-run", default=False, action="store_true",
- help="only show the operations that would be performed")
+ "-n",
+ "--dry-run",
+ default=False,
+ action="store_true",
+ help="only show the operations that would be performed",
+ )
parser.add_option(
- "-v", "--verbose", default=False, action="store_true",
- help="additionally show queue items that are not removed")
+ "-v",
+ "--verbose",
+ default=False,
+ action="store_true",
+ help="additionally show queue items that are not removed",
+ )
opts, args = parser.parse_args()
- logging.basicConfig(level=logging.DEBUG if opts.verbose else logging.INFO,
- format="%(asctime)s - %(message)s")
+ logging.basicConfig(
+ level=logging.DEBUG if opts.verbose else logging.INFO,
+ format="%(asctime)s - %(message)s",
+ )
- user = os.environ['RABBIT_USER']
- password = os.environ['RABBIT_PASSWORD']
- host = os.environ['RABBIT_HOST']
- uri = f'amqp://{user}:{password}@{host}'
+ user = os.environ["RABBIT_USER"]
+ password = os.environ["RABBIT_PASSWORD"]
+ host = os.environ["RABBIT_HOST"]
+ uri = f"amqp://{user}:{password}@{host}"
filter_amqp(opts, uri)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
main()
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/metrics b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/metrics
index e7c552f..f11d3ca 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/metrics
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/metrics
@@ -1,12 +1,12 @@
#!/usr/bin/python3
-from gi.repository import GLib, Gio
-from influxdb import InfluxDBClient
-
import json
import os
import subprocess
+from gi.repository import Gio, GLib
+from influxdb import InfluxDBClient
+
SYSTEM_BUS = Gio.bus_get_sync(Gio.BusType.SYSTEM)
INFLUXDB_CONTEXT = os.environ["INFLUXDB_CONTEXT"]
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/retry-github-test b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/retry-github-test
index e47ecc2..4e258a6 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/retry-github-test
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/retry-github-test
@@ -1,40 +1,62 @@
#!/usr/bin/python3
-'''Retry a GitHub PR test request to autopkgtest.ubuntu.com'''
+"""Retry a GitHub PR test request to autopkgtest.ubuntu.com"""
-import os
-import sys
+import argparse
import hmac
import json
-import urllib.request
+import os
+import sys
import urllib.error
-import argparse
+import urllib.request
-p = argparse.ArgumentParser(description='Retry a GitHub PR test request to autopkgtest.ubuntu.com')
-p.add_argument('pr_api_url',
- help='GitHub PR API URL (e. g. https://api.github.com/repos/JoeDev/coolproj/pulls/1')
-p.add_argument('test_url',
- help='autopkgtest URL (https://autopkgtest.ubuntu.com/request.cgi?release=xenial&arch=i386&...)')
-p.add_argument('secret_file', type=argparse.FileType('rb'),
- help='Path to the GitHub secret for this test web hook')
+p = argparse.ArgumentParser(
+ description="Retry a GitHub PR test request to autopkgtest.ubuntu.com"
+)
+p.add_argument(
+ "pr_api_url",
+ help="GitHub PR API URL (e. g. https://api.github.com/repos/JoeDev/coolproj/pulls/1",
+)
+p.add_argument(
+ "test_url",
+ help="autopkgtest URL (https://autopkgtest.ubuntu.com/request.cgi?release=xenial&arch=i386&...)",
+)
+p.add_argument(
+ "secret_file",
+ type=argparse.FileType("rb"),
+ help="Path to the GitHub secret for this test web hook",
+)
args = p.parse_args()
with urllib.request.urlopen(args.pr_api_url) as f:
api_info = json.loads(f.read().decode())
-payload = json.dumps({'action': 'synchronize',
- 'number': os.path.basename(args.pr_api_url),
- 'pull_request': {'statuses_url': api_info['statuses_url'],
- 'base': api_info['base']}})
+payload = json.dumps(
+ {
+ "action": "synchronize",
+ "number": os.path.basename(args.pr_api_url),
+ "pull_request": {
+ "statuses_url": api_info["statuses_url"],
+ "base": api_info["base"],
+ },
+ }
+)
payload = payload.encode()
-payload_sig = hmac.new(args.secret_file.read().strip(), payload, 'sha1').hexdigest()
+payload_sig = hmac.new(
+ args.secret_file.read().strip(), payload, "sha1"
+).hexdigest()
-req = urllib.request.Request(args.test_url, data=payload,
- headers={'X-Hub-Signature': 'sha1=' + payload_sig,
- 'Content-Type': 'application/json'})
+req = urllib.request.Request(
+ args.test_url,
+ data=payload,
+ headers={
+ "X-Hub-Signature": "sha1=" + payload_sig,
+ "Content-Type": "application/json",
+ },
+)
try:
with urllib.request.urlopen(req) as f:
print(f.read().decode())
except urllib.error.HTTPError as e:
- sys.stderr.write('Request failed with code %i: %s' % (e.code, e.msg))
- sys.stderr.write(e.fp.read().decode('UTF-8', 'replace'))
+ sys.stderr.write("Request failed with code %i: %s" % (e.code, e.msg))
+ sys.stderr.write(e.fp.read().decode("UTF-8", "replace"))
sys.exit(1)
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest
index a09ad42..f59de0e 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest
@@ -3,12 +3,12 @@
# Imported from lp:ubuntu-archive-scripts, lightly modified to not rely on a
# britney config file, to be used for administration or testing.
-from datetime import datetime
-import os
-import sys
import argparse
import json
+import os
+import sys
import urllib.parse
+from datetime import datetime
import amqplib.client_0_8 as amqp
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/seed-new-release b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/seed-new-release
index 656c5f7..6be9c96 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/seed-new-release
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/seed-new-release
@@ -5,29 +5,32 @@
# succeeded in the previous release.
from __future__ import print_function
-import os
-import sys
+
import argparse
+import os
import sqlite3
+import sys
import time
import swiftclient
def srchash(src):
- if src.startswith('lib'):
+ if src.startswith("lib"):
return src[:4]
else:
return src[0]
def connect_swift():
- return swiftclient.Connection(authurl=os.environ['OS_AUTH_URL'],
- user=os.environ['OS_USERNAME'],
- key=os.environ['OS_PASSWORD'],
- tenant_name=os.environ['OS_TENANT_NAME'],
- os_options={'region_name': os.environ['OS_REGION_NAME']},
- auth_version='2.0')
+ return swiftclient.Connection(
+ authurl=os.environ["OS_AUTH_URL"],
+ user=os.environ["OS_USERNAME"],
+ key=os.environ["OS_PASSWORD"],
+ tenant_name=os.environ["OS_TENANT_NAME"],
+ os_options={"region_name": os.environ["OS_REGION_NAME"]},
+ auth_version="2.0",
+ )
def copy_result(rel_path, old_release, new_release):
@@ -36,14 +39,16 @@ def copy_result(rel_path, old_release, new_release):
from_path = old_release + rel_path
to_path = new_release + rel_path
try:
- print('Getting %s' % from_path)
- headers, contents = swift_con.get_object('autopkgtest-' + old_release, from_path)
+ print("Getting %s" % from_path)
+ headers, contents = swift_con.get_object(
+ "autopkgtest-" + old_release, from_path
+ )
except swiftclient.exceptions.ClientException:
- print('Could not get %s - not found' % rel_path)
+ print("Could not get %s - not found" % rel_path)
return
headers_to_copy = {}
- for hdr in 'content-encoding', 'content-type':
+ for hdr in "content-encoding", "content-type":
try:
headers_to_copy[hdr] = headers[hdr]
except KeyError:
@@ -52,76 +57,109 @@ def copy_result(rel_path, old_release, new_release):
for retry in range(10):
try:
if not args.dry_run:
- print('Putting %s' % from_path)
- swiftclient.put_object(swift_con.url, token=swift_con.token,
- container='autopkgtest-' + new_release,
- name=to_path,
- contents=contents,
- content_length=headers['content-length'],
- headers=headers_to_copy)
+ print("Putting %s" % from_path)
+ swiftclient.put_object(
+ swift_con.url,
+ token=swift_con.token,
+ container="autopkgtest-" + new_release,
+ name=to_path,
+ contents=contents,
+ content_length=headers["content-length"],
+ headers=headers_to_copy,
+ )
break
- except (IOError, AttributeError, swiftclient.exceptions.ClientException) as e:
- print('Error connecting to swift, re-connecting in %is: %s' % (5 * retry, e))
+ except (
+ IOError,
+ AttributeError,
+ swiftclient.exceptions.ClientException,
+ ) as e:
+ print(
+ "Error connecting to swift, re-connecting in %is: %s"
+ % (5 * retry, e)
+ )
time.sleep(5 * retry)
swift_con = connect_swift()
else:
- print('Repeated failure to connect to swift')
+ print("Repeated failure to connect to swift")
sys.exit(1)
ap = argparse.ArgumentParser()
-ap.add_argument('old_release')
-ap.add_argument('new_release')
-ap.add_argument('results_db', help='path to autopkgtest.db')
-ap.add_argument('-d', '--dry-run', action='store_true',
- help="Doesn't copy results to new container.")
+ap.add_argument("old_release")
+ap.add_argument("new_release")
+ap.add_argument("results_db", help="path to autopkgtest.db")
+ap.add_argument(
+ "-d",
+ "--dry-run",
+ action="store_true",
+ help="Doesn't copy results to new container.",
+)
args = ap.parse_args()
# connect to Swift
swift_con = connect_swift()
# create new container
-swift_con.put_container('autopkgtest-' + args.new_release,
- headers={'X-Container-Read': '.rlistings,.r:*'})
+swift_con.put_container(
+ "autopkgtest-" + args.new_release,
+ headers={"X-Container-Read": ".rlistings,.r:*"},
+)
# read existing names (needs multiple batches)
existing = set()
-last = ''
+last = ""
while True:
- print('Getting existing results starting with "%s"' % last)
- batch = [i['name'] for i in swift_con.get_container('autopkgtest-' + args.new_release, marker=last)[1]]
- if not batch:
- break
- last = batch[-1]
- existing.update(batch)
+ print('Getting existing results starting with "%s"' % last)
+ batch = [
+ i["name"]
+ for i in swift_con.get_container(
+ "autopkgtest-" + args.new_release, marker=last
+ )[1]
+ ]
+ if not batch:
+ break
+ last = batch[-1]
+ existing.update(batch)
# get passing result per package/arch from database
db_con = sqlite3.connect(args.results_db)
-for (package, arch, run_id) in db_con.execute(
- "SELECT package, arch, MAX(run_id) "
- "FROM test, result "
- "WHERE test.id = result.test_id AND release = '%s' "
- " AND (exitcode = 0 OR exitcode = 2 "
- " OR triggers = 'migration-reference/0') "
- "GROUP BY package, arch" % args.old_release):
-
- for file in 'artifacts.tar.gz', 'result.tar', 'log.gz':
- path = '/%s/%s/%s/%s/%s' % (arch, srchash(package), package, run_id, file)
+for package, arch, run_id in db_con.execute(
+ "SELECT package, arch, MAX(run_id) "
+ "FROM test, result "
+ "WHERE test.id = result.test_id AND release = '%s' "
+ " AND (exitcode = 0 OR exitcode = 2 "
+ " OR triggers = 'migration-reference/0') "
+ "GROUP BY package, arch" % args.old_release
+):
+ for file in "artifacts.tar.gz", "result.tar", "log.gz":
+ path = "/%s/%s/%s/%s/%s" % (
+ arch,
+ srchash(package),
+ package,
+ run_id,
+ file,
+ )
if args.new_release + path in existing:
- print('%s%s already exists, skipping' % (args.old_release, path))
+ print("%s%s already exists, skipping" % (args.old_release, path))
continue
copy_result(path, args.old_release, args.new_release)
-for (package, arch, run_id) in db_con.execute(
- "SELECT package, arch, MAX(run_id) "
- "FROM test, result "
- "WHERE test.id = result.test_id AND release = '%s' "
- " AND triggers = 'migration-reference/0' "
- "GROUP BY package, arch" % args.old_release):
-
- for file in 'artifacts.tar.gz', 'result.tar', 'log.gz':
- path = '/%s/%s/%s/%s/%s' % (arch, srchash(package), package, run_id, file)
+for package, arch, run_id in db_con.execute(
+ "SELECT package, arch, MAX(run_id) "
+ "FROM test, result "
+ "WHERE test.id = result.test_id AND release = '%s' "
+ " AND triggers = 'migration-reference/0' "
+ "GROUP BY package, arch" % args.old_release
+):
+ for file in "artifacts.tar.gz", "result.tar", "log.gz":
+ path = "/%s/%s/%s/%s/%s" % (
+ arch,
+ srchash(package),
+ package,
+ run_id,
+ file,
+ )
if args.new_release + path in existing:
- print('%s%s already exists, skipping' % (args.old_release, path))
+ print("%s%s already exists, skipping" % (args.old_release, path))
continue
copy_result(path, args.old_release, args.new_release)
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock
index dd4b1c8..442e914 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock
@@ -34,7 +34,9 @@ def amqp_lock(name):
password=os.environ["RABBIT_PASSWORD"],
)
channel = amqp_con.channel()
- channel.queue_declare(name, arguments={"args.queue.x-single-active-consumer": True})
+ channel.queue_declare(
+ name, arguments={"args.queue.x-single-active-consumer": True}
+ )
channel.basic_publish(amqp.Message(""), routing_key=name)
consumer_tag = channel.basic_consume(queue=name, callback=callback)
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
index 4ca5f88..1bbcbae 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
@@ -5,34 +5,33 @@
# Requirements: python3-amqplib python3-swiftclient python3-influxdb
# Requirements for running autopkgtest from git: python3-debian libdpkg-perl
-import os
-import sys
-import time
import argparse
import configparser
-import subprocess
+import fnmatch
+import hashlib
+import json
import logging
-import tempfile
+import os
+import random
+import re
import shutil
import signal
-import json
-import urllib.request
-import re
-import hashlib
-import random
-import fnmatch
import socket
+import subprocess
+import sys
+import tempfile
+import time
+import urllib.request
+from urllib.error import HTTPError
import amqplib.client_0_8 as amqp
import distro_info
import swiftclient
import systemd.journal
-
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
-from urllib.error import HTTPError
-ALL_RELEASES = distro_info.UbuntuDistroInfo().get_all(result='object')
+ALL_RELEASES = distro_info.UbuntuDistroInfo().get_all(result="object")
try:
INFLUXDB_CONTEXT = os.environ["INFLUXDB_CONTEXT"]
@@ -59,8 +58,8 @@ cfg = None
swift_creds = {}
exit_requested = None
running_test = False
-status_exchange_name = 'teststatus.fanout'
-complete_exchange_name = 'testcomplete.fanout'
+status_exchange_name = "teststatus.fanout"
+complete_exchange_name = "testcomplete.fanout"
amqp_con = None
systemd_logging_handler = systemd.journal.JournalHandler()
hostname = socket.gethostname()
@@ -79,71 +78,95 @@ FAIL_CODES = (4, 6, 12, 14, 20)
# Note that you get *three* tries, and you need to fail in this way every time
# for the failure to be converted from a tmpfail to a real one.
-FAIL_STRINGS = ['Kernel panic - not syncing:',
- 'Freezing execution.',
- 'Out of memory: Kill process',
- 'error: you need to load the kernel first.',
- 'Faulting instruction address:',
- 'Call Trace:']
-TEMPORARY_TEST_FAIL_STRINGS = ['Could not connect to ftpmaster.internal:80',
- 'Cannot initiate the connection to ppa.launchpad.net:80',
- 'Failed to fetch http://ftpmaster.internal/',
- '" failed with stderr "error: Get https://0.0.0.0/1.0/operations/',
- 'RecursionError: maximum recursion depth exceeded in comparison', # #1908506
- 'Temporary failure resolving \'archive.ubuntu.com\'',
- 'Temporary failure resolving \'ports.ubuntu.com\'',
- 'Temporary failure resolving \'ftpmaster.internal\'',
- 'Temporary failure in name resolution',
- 'Unable to connect to ftpmaster.internal:http:',
- '/tmp/autopkgtest-run-wrapper: command not found', # #1896466
- ': error cleaning up:',
- ' has modification time '] # clock skew, LP: #1880839
+FAIL_STRINGS = [
+ "Kernel panic - not syncing:",
+ "Freezing execution.",
+ "Out of memory: Kill process",
+ "error: you need to load the kernel first.",
+ "Faulting instruction address:",
+ "Call Trace:",
+]
+TEMPORARY_TEST_FAIL_STRINGS = [
+ "Could not connect to ftpmaster.internal:80",
+ "Cannot initiate the connection to ppa.launchpad.net:80",
+ "Failed to fetch http://ftpmaster.internal/",
+ '" failed with stderr "error: Get https://0.0.0.0/1.0/operations/',
+ "RecursionError: maximum recursion depth exceeded in comparison", # #1908506
+ "Temporary failure resolving 'archive.ubuntu.com'",
+ "Temporary failure resolving 'ports.ubuntu.com'",
+ "Temporary failure resolving 'ftpmaster.internal'",
+ "Temporary failure in name resolution",
+ "Unable to connect to ftpmaster.internal:http:",
+ "/tmp/autopkgtest-run-wrapper: command not found", # #1896466
+ ": error cleaning up:",
+ " has modification time ",
+] # clock skew, LP: #1880839
# If we repeatedly time out when installing, there's probably a problem with
# one of the packages' maintainer scripts.
-FAIL_STRINGS_REGEX = [r'timed out on command.*apt-get install',
- r'Removing (?:ubuntu-minimal|netplan\.io)']
+FAIL_STRINGS_REGEX = [
+ r"timed out on command.*apt-get install",
+ r"Removing (?:ubuntu-minimal|netplan\.io)",
+]
# Some packages can provoke specific breakage. For most packages, this would be
# a sign of infrastructure trouble, but for these we should play it safe and
# consider these to be regressions. If they *are* infrastructure problems,
# we'll have to retry them.
-FAIL_PKG_STRINGS = {'systemd*': ['timed out waiting for testbed to reboot',
- 'Timed out on waiting for ssh connection',
- 'Temporary failure resolving',
- 'VirtSubproc.Timeout',
- 'ERROR: testbed failure: testbed auxverb failed with exit code 255',
- 'ERROR: testbed failure: rules extract failed with exit code 100 (apt failure)'],
- 'linux-*': ['timed out waiting for testbed to reboot',
- 'Timed out on waiting for ssh connection',
- 'ERROR: testbed failure: testbed auxverb failed',
- '/bin/bash: No such file or directory'],
- 'libnfs': ['timed out waiting for testbed to reboot',
- 'Timed out on waiting for ssh connection',
- 'ERROR: testbed failure: testbed auxverb failed'],
- 'cluster-glue': ['timed out waiting for testbed to reboot'],
- 'lxc': ['Error starting container'],
- 'nplan': ['VirtSubproc.Timeout'],
- 'netplan.io': ['VirtSubproc.Timeout',
- 'Timed out on waiting for ssh connection',
- 'Temporary failure resolving'],
- 'dnspython': ['dns.exception.Timeout'],
- 'pcapfix': ['Cannot open input file: No such file or directory'],
- 'makedumpfile': ['This does not look like a tar archive',
- 'Timed out on waiting for ssh connection'],
- 'kdump-tools': ['This does not look like a tar archive',
- 'Timed out on waiting for ssh connection'],
- 'llvm-toolchain-*': ['clang: error: unable to execute command: Segmentation fault (core dumped)']}
+FAIL_PKG_STRINGS = {
+ "systemd*": [
+ "timed out waiting for testbed to reboot",
+ "Timed out on waiting for ssh connection",
+ "Temporary failure resolving",
+ "VirtSubproc.Timeout",
+ "ERROR: testbed failure: testbed auxverb failed with exit code 255",
+ "ERROR: testbed failure: rules extract failed with exit code 100 (apt failure)",
+ ],
+ "linux-*": [
+ "timed out waiting for testbed to reboot",
+ "Timed out on waiting for ssh connection",
+ "ERROR: testbed failure: testbed auxverb failed",
+ "/bin/bash: No such file or directory",
+ ],
+ "libnfs": [
+ "timed out waiting for testbed to reboot",
+ "Timed out on waiting for ssh connection",
+ "ERROR: testbed failure: testbed auxverb failed",
+ ],
+ "cluster-glue": ["timed out waiting for testbed to reboot"],
+ "lxc": ["Error starting container"],
+ "nplan": ["VirtSubproc.Timeout"],
+ "netplan.io": [
+ "VirtSubproc.Timeout",
+ "Timed out on waiting for ssh connection",
+ "Temporary failure resolving",
+ ],
+ "dnspython": ["dns.exception.Timeout"],
+ "pcapfix": ["Cannot open input file: No such file or directory"],
+ "makedumpfile": [
+ "This does not look like a tar archive",
+ "Timed out on waiting for ssh connection",
+ ],
+ "kdump-tools": [
+ "This does not look like a tar archive",
+ "Timed out on waiting for ssh connection",
+ ],
+ "llvm-toolchain-*": [
+ "clang: error: unable to execute command: Segmentation fault (core dumped)"
+ ],
+}
# Exemptions from TEMPORARY_TEST_FAIL_STRINGS / FAIL_{PKG_,}STRINGS
# Adding dbconfig-common here is a hack of sorts LP: #2001714
OK_PKG_STRINGS = {
- 'dbconfig-common': ['Temporary failure in name resolution'],
- 'debspawn': ['Temporary failure resolving \'archive.ubuntu.com\'',
- 'Temporary failure resolving \'ports.ubuntu.com\'',
- 'Temporary failure resolving \'ftpmaster.internal\''],
- 'dnspython': ['Temporary failure in name resolution'],
- 'systemd*': ['Temporary failure in name resolution']
+ "dbconfig-common": ["Temporary failure in name resolution"],
+ "debspawn": [
+ "Temporary failure resolving 'archive.ubuntu.com'",
+ "Temporary failure resolving 'ports.ubuntu.com'",
+ "Temporary failure resolving 'ftpmaster.internal'",
+ ],
+ "dnspython": ["Temporary failure in name resolution"],
+ "systemd*": ["Temporary failure in name resolution"],
}
@@ -186,9 +209,9 @@ def getglob(d, glob, default=None):
def term_handler(signum, frame):
- '''SIGTERM handler, for clean exit after current test'''
+ """SIGTERM handler, for clean exit after current test"""
- logging.info('Caught SIGTERM, requesting exit')
+ logging.info("Caught SIGTERM, requesting exit")
global exit_requested
exit_requested = 0
if not running_test:
@@ -196,9 +219,9 @@ def term_handler(signum, frame):
def hup_handler(signum, frame):
- '''SIGHUP handler, for restarting after current test'''
+ """SIGHUP handler, for restarting after current test"""
- logging.info('Caught SIGHUP, requesting restart')
+ logging.info("Caught SIGHUP, requesting restart")
global exit_requested
exit_requested = 10
if not running_test:
@@ -206,24 +229,37 @@ def hup_handler(signum, frame):
def parse_args():
- '''Parse command line and return argparse.args object'''
+ """Parse command line and return argparse.args object"""
parser = argparse.ArgumentParser()
- parser.add_argument('-c', '--config',
- default=os.path.join(my_path, 'worker.conf'),
- help='configuration file (default: %(default)s)')
- parser.add_argument('-d', '--debug', action='store_true', default=False,
- help='enable debug logging')
- parser.add_argument('-v', '--variable', metavar='KEY=VALUE',
- action='append', default=[],
- help='define additional variable for given config file')
+ parser.add_argument(
+ "-c",
+ "--config",
+ default=os.path.join(my_path, "worker.conf"),
+ help="configuration file (default: %(default)s)",
+ )
+ parser.add_argument(
+ "-d",
+ "--debug",
+ action="store_true",
+ default=False,
+ help="enable debug logging",
+ )
+ parser.add_argument(
+ "-v",
+ "--variable",
+ metavar="KEY=VALUE",
+ action="append",
+ default=[],
+ help="define additional variable for given config file",
+ )
return parser.parse_args()
def read_per_package_configs(cfg):
def read_per_package_file(filename):
out = set()
- with open(filename, 'r') as f:
+ with open(filename, "r") as f:
entries = {
line.strip()
for line in f.readlines()
@@ -243,9 +279,10 @@ def read_per_package_configs(cfg):
finally:
out.add(f"{package}/{arch}/{release}")
return out
+
global big_packages, long_tests, never_run
- dir = cfg.get('autopkgtest', 'per_package_config_dir').strip()
+ dir = cfg.get("autopkgtest", "per_package_config_dir").strip()
big_packages = read_per_package_file(os.path.join(dir, "big_packages"))
long_tests = read_per_package_file(os.path.join(dir, "long_tests"))
@@ -253,14 +290,24 @@ def read_per_package_configs(cfg):
def request_matches_per_package(package, arch, release, s):
- return (any(fnmatch.fnmatchcase(f"{package}/{arch}/{release}", entry) for entry in s) or
- any(fnmatch.fnmatchcase(f"{package}/all/{release}", entry) for entry in s) or
- any(fnmatch.fnmatchcase(f"{package}/{arch}/all", entry) for entry in s) or
- any(fnmatch.fnmatchcase(f"{package}/all/all", entry) for entry in s))
+ return (
+ any(
+ fnmatch.fnmatchcase(f"{package}/{arch}/{release}", entry)
+ for entry in s
+ )
+ or any(
+ fnmatch.fnmatchcase(f"{package}/all/{release}", entry)
+ for entry in s
+ )
+ or any(
+ fnmatch.fnmatchcase(f"{package}/{arch}/all", entry) for entry in s
+ )
+ or any(fnmatch.fnmatchcase(f"{package}/all/all", entry) for entry in s)
+ )
def process_output_dir(dir, pkgname, code, triggers):
- '''Post-process output directory'''
+ """Post-process output directory"""
files = set(os.listdir(dir))
@@ -268,57 +315,70 @@ def process_output_dir(dir, pkgname, code, triggers):
# In failure cases where we don't know the version, write 'unknown' out as
# the version, so that frontends (e.g. autopkgtest-web, or britney) can
# display the result.
- if code in FAIL_CODES and 'testpkg-version' not in files:
- logging.warning('Code %d returned and no testpkg-version - returning "unknown" for %s' % (code, pkgname))
- with open(os.path.join(dir, 'testpkg-version'), 'w') as testpkg_version:
- testpkg_version.write('%s unknown' % pkgname)
- files.add('testpkg-version')
+ if code in FAIL_CODES and "testpkg-version" not in files:
+ logging.warning(
+ 'Code %d returned and no testpkg-version - returning "unknown" for %s'
+ % (code, pkgname)
+ )
+ with open(
+ os.path.join(dir, "testpkg-version"), "w"
+ ) as testpkg_version:
+ testpkg_version.write("%s unknown" % pkgname)
+ files.add("testpkg-version")
# we might need to fake testinfo.json up too, depending on how
# autopkgtest failed. britney uses this to associate results with
# requests
- if 'testinfo.json' not in files and triggers:
- logging.warning('...testinfo.json is missing too, faking one up')
- triggers = ' '.join(triggers)
- with open(os.path.join(dir, 'testinfo.json'), 'w') as testinfo:
- d = {'custom_environment':
- ['ADT_TEST_TRIGGERS=%s' % triggers]}
+ if "testinfo.json" not in files and triggers:
+ logging.warning("...testinfo.json is missing too, faking one up")
+ triggers = " ".join(triggers)
+ with open(os.path.join(dir, "testinfo.json"), "w") as testinfo:
+ d = {"custom_environment": ["ADT_TEST_TRIGGERS=%s" % triggers]}
json.dump(d, testinfo, indent=True)
- files.add('testinfo.json')
+ files.add("testinfo.json")
- with open(os.path.join(dir, 'testpkg-version'), 'r') as tpv:
+ with open(os.path.join(dir, "testpkg-version"), "r") as tpv:
testpkg_version = tpv.read().split()[1]
try:
- with open(os.path.join(dir, 'duration'), 'r') as dur:
+ with open(os.path.join(dir, "duration"), "r") as dur:
duration = dur.read()
except FileNotFoundError:
duration = None
try:
- with open(os.path.join(dir, 'requester'), 'r') as req:
+ with open(os.path.join(dir, "requester"), "r") as req:
requester = req.read()
except FileNotFoundError:
requester = None
# these are small and we need only these for gating and indexing
- resultfiles = ['exitcode']
+ resultfiles = ["exitcode"]
# these might not be present in infrastructure failure cases
- for f in ['testbed-packages', 'testpkg-version', 'duration', 'testinfo.json', 'requester', 'summary']:
+ for f in [
+ "testbed-packages",
+ "testpkg-version",
+ "duration",
+ "testinfo.json",
+ "requester",
+ "summary",
+ ]:
if f in files:
resultfiles.append(f)
- subprocess.check_call(['tar', 'cf', 'result.tar'] + resultfiles, cwd=dir)
+ subprocess.check_call(["tar", "cf", "result.tar"] + resultfiles, cwd=dir)
# compress main log file, for direct access
- subprocess.check_call(['gzip', '-9', os.path.join(dir, 'log')])
- files.discard('log')
+ subprocess.check_call(["gzip", "-9", os.path.join(dir, "log")])
+ files.discard("log")
# the readable-by file, if present, needs to stay intact and be uploaded
# to the container as is, as it's used for ACL
- files.discard('readable-by')
+ files.discard("readable-by")
if files:
# tar up all other artifacts
- subprocess.check_call(['tar', '-czf', 'artifacts.tar.gz'] + list(files), cwd=dir)
+ subprocess.check_call(
+ ["tar", "-czf", "artifacts.tar.gz"] + list(files), cwd=dir
+ )
for f in files:
path = os.path.join(dir, f)
if os.path.isdir(path):
@@ -333,58 +393,71 @@ def series_to_version(series):
versions = [x.version for x in ALL_RELEASES if x.series == series]
if len(versions) < 1:
return None
- return versions[0].strip(' LTS')
+ return versions[0].strip(" LTS")
+
def i386_cross_series(series):
# the first version where i386 is a partiel architecture and only cross
# testing is done is 20.04
- return series_to_version(series) > '19.10'
+ return series_to_version(series) > "19.10"
def host_arch(release, architecture):
- if architecture != 'i386':
+ if architecture != "i386":
return architecture
if not i386_cross_series(release):
return architecture
- return 'amd64'
+ return "amd64"
def subst(s, big_package, release, architecture, hostarch, pkgname):
subst = {
- 'RELEASE': release,
- 'ARCHITECTURE': architecture,
- 'HOSTARCH': hostarch,
- 'PACKAGENAME': pkgname,
- 'PACKAGESIZE': cfg.get('virt',
- big_package and 'package_size_big' or 'package_size_default'),
- 'TIMESTAMP': time.strftime('%Y%m%d-%H%M%S'),
- 'HOSTNAME': hostname,
+ "RELEASE": release,
+ "ARCHITECTURE": architecture,
+ "HOSTARCH": hostarch,
+ "PACKAGENAME": pkgname,
+ "PACKAGESIZE": cfg.get(
+ "virt",
+ big_package and "package_size_big" or "package_size_default",
+ ),
+ "TIMESTAMP": time.strftime("%Y%m%d-%H%M%S"),
+ "HOSTNAME": hostname,
}
for i in args.variable:
- k, v = i.split('=', 1)
+ k, v = i.split("=", 1)
subst[k] = v
for k, v in subst.items():
- s = s.replace('$' + k, v)
+ s = s.replace("$" + k, v)
return s
-def send_status_info(queue, release, architecture, pkgname, params, out_dir, running, duration, private=False):
- '''Send status and logtail to status queue'''
+def send_status_info(
+ queue,
+ release,
+ architecture,
+ pkgname,
+ params,
+ out_dir,
+ running,
+ duration,
+ private=False,
+):
+ """Send status and logtail to status queue"""
if not queue:
return
if private:
- pkgname = 'private-test'
+ pkgname = "private-test"
params = {}
- logtail = 'Running private test'
+ logtail = "Running private test"
else:
# print('status_info:', release, architecture, pkgname, out_dir, running)
try:
- with open(os.path.join(out_dir, 'log'), 'rb') as f:
+ with open(os.path.join(out_dir, "log"), "rb") as f:
try:
f.seek(-2000, os.SEEK_END)
# throw away the first line as we almost surely cut that out in
@@ -393,32 +466,52 @@ def send_status_info(queue, release, architecture, pkgname, params, out_dir, run
except IOError:
# file is smaller than 2000 bytes? okay
pass
- logtail = f.read().decode('UTF-8', errors='replace')
+ logtail = f.read().decode("UTF-8", errors="replace")
except (IOError, OSError) as e:
- logtail = 'Cannot read log file: %s' % e
+ logtail = "Cannot read log file: %s" % e
+
+ msg = json.dumps(
+ {
+ "release": release,
+ "architecture": architecture,
+ "package": pkgname,
+ "running": running,
+ "params": params,
+ "duration": duration,
+ "logtail": logtail,
+ }
+ )
+ queue.basic_publish(
+ amqp.Message(msg, delivery_mode=2), status_exchange_name, ""
+ )
- msg = json.dumps({'release': release,
- 'architecture': architecture,
- 'package': pkgname,
- 'running': running,
- 'params': params,
- 'duration': duration,
- 'logtail': logtail})
- queue.basic_publish(amqp.Message(msg, delivery_mode=2), status_exchange_name, '')
-def call_autopkgtest(argv, release, architecture, pkgname, params, out_dir, start_time, private=False):
- '''Call autopkgtest and regularly send status/logtail to status_exchange_name
+def call_autopkgtest(
+ argv,
+ release,
+ architecture,
+ pkgname,
+ params,
+ out_dir,
+ start_time,
+ private=False,
+):
+ """Call autopkgtest and regularly send status/logtail to status_exchange_name
Return exit code.
- '''
+ """
# set up status AMQP exchange
global amqp_con
status_amqp = amqp_con.channel()
- status_amqp.access_request('/data', active=True, read=False, write=True)
- status_amqp.exchange_declare(status_exchange_name, 'fanout', durable=False, auto_delete=True)
+ status_amqp.access_request("/data", active=True, read=False, write=True)
+ status_amqp.exchange_declare(
+ status_exchange_name, "fanout", durable=False, auto_delete=True
+ )
- null_fd = open('/dev/null', 'w')
- autopkgtest = subprocess.Popen(argv, stdout=null_fd, stderr=subprocess.STDOUT)
+ null_fd = open("/dev/null", "w")
+ autopkgtest = subprocess.Popen(
+ argv, stdout=null_fd, stderr=subprocess.STDOUT
+ )
# FIXME: Use autopkgtest.wait(timeout=10) once moving to Python 3
# only send status update every 10s, but check if program has finished every 1s
status_update_counter = 0
@@ -426,31 +519,49 @@ def call_autopkgtest(argv, release, architecture, pkgname, params, out_dir, star
time.sleep(1)
status_update_counter = (status_update_counter + 1) % 10
if status_update_counter == 0:
- send_status_info(status_amqp, release, architecture, pkgname,
- params, out_dir, True, int(time.time() - start_time),
- private)
+ send_status_info(
+ status_amqp,
+ release,
+ architecture,
+ pkgname,
+ params,
+ out_dir,
+ True,
+ int(time.time() - start_time),
+ private,
+ )
ret = autopkgtest.wait()
- send_status_info(status_amqp, release, architecture, pkgname, params,
- out_dir, False, int(time.time() - start_time),
- private)
+ send_status_info(
+ status_amqp,
+ release,
+ architecture,
+ pkgname,
+ params,
+ out_dir,
+ False,
+ int(time.time() - start_time),
+ private,
+ )
return ret
def log_contents(out_dir):
try:
- with open(os.path.join(out_dir, 'log'),
- encoding='utf-8',
- errors='surrogateescape') as f:
+ with open(
+ os.path.join(out_dir, "log"),
+ encoding="utf-8",
+ errors="surrogateescape",
+ ) as f:
return f.read()
except IOError as e:
- logging.error('Could not read log file: %s' % str(e))
+ logging.error("Could not read log file: %s" % str(e))
return ""
def cleanup_and_sleep(out_dir):
- '''Empty the output dir for the next run, otherwise autopkgtest complains'''
+ """Empty the output dir for the next run, otherwise autopkgtest complains"""
shutil.rmtree(out_dir)
os.mkdir(out_dir)
running_test = False
@@ -459,7 +570,7 @@ def cleanup_and_sleep(out_dir):
def request(msg):
- '''Callback for AMQP queue request'''
+ """Callback for AMQP queue request"""
# Cleanup extras
for extra in list(systemd_logging_handler._extra.keys()):
@@ -474,13 +585,15 @@ def request(msg):
private = False
# FIXME: make this more elegant
- fields = msg.delivery_info['routing_key'].split('-')
+ fields = msg.delivery_info["routing_key"].split("-")
if len(fields) == 4:
release, architecture = fields[2:4]
elif len(fields) == 3:
release, architecture = fields[1:3]
else:
- raise NotImplementedError('cannot parse queue name %s' % msg.delivery_info['routing_key'])
+ raise NotImplementedError(
+ "cannot parse queue name %s" % msg.delivery_info["routing_key"]
+ )
systemd_logging_handler._extra["ADT_RELEASE"] = release
systemd_logging_handler._extra["ADT_ARCH"] = architecture
@@ -488,7 +601,7 @@ def request(msg):
body = msg.body
if isinstance(body, bytes):
try:
- body = msg.body.decode('UTF-8')
+ body = msg.body.decode("UTF-8")
except UnicodeDecodeError as e:
logging.error('Bad encoding in request "%s": %s', msg.body, e)
return
@@ -505,27 +618,36 @@ def request(msg):
logging.error('Received invalid request format "%s"', body)
return
- if not re.match('[a-zA-Z0-9.+-]+$', pkgname):
- logging.error('Request contains invalid package name, dropping: "%s"', body)
+ if not re.match("[a-zA-Z0-9.+-]+$", pkgname):
+ logging.error(
+ 'Request contains invalid package name, dropping: "%s"', body
+ )
msg.channel.basic_ack(msg.delivery_tag)
return
systemd_logging_handler._extra["ADT_PACKAGE"] = pkgname
systemd_logging_handler._extra["ADT_PARAMS"] = str(params)
- logging.info('Received request for package %s on %s/%s; params: %s',
- pkgname, release, architecture, params)
+ logging.info(
+ "Received request for package %s on %s/%s; params: %s",
+ pkgname,
+ release,
+ architecture,
+ params,
+ )
- current_region = os.environ.get('REGION')
+ current_region = os.environ.get("REGION")
# build autopkgtest command line
- work_dir = tempfile.mkdtemp(prefix='autopkgtest-work.')
+ work_dir = tempfile.mkdtemp(prefix="autopkgtest-work.")
try:
- out_dir = os.path.join(work_dir, 'out')
+ out_dir = os.path.join(work_dir, "out")
- if request_matches_per_package(pkgname, architecture, release, never_run):
- logging.warning('Marked to never run, ignoring')
+ if request_matches_per_package(
+ pkgname, architecture, release, never_run
+ ):
+ logging.warning("Marked to never run, ignoring")
dont_run = True
# these will be written later on
@@ -535,56 +657,83 @@ def request(msg):
os.makedirs(out_dir)
# now let's fake up a log file
- with open(os.path.join(out_dir, 'log'), 'w') as log:
- log.write('This package is marked to never run. To get the entry removed, contact a member of the Ubuntu Release or Canonical Ubuntu QA team.')
+ with open(os.path.join(out_dir, "log"), "w") as log:
+ log.write(
+ "This package is marked to never run. To get the entry removed, contact a member of the Ubuntu Release or Canonical Ubuntu QA team."
+ )
triggers = None
# a json file containing the env
- if 'triggers' in params:
- triggers = ' '.join(params['triggers'])
- with open(os.path.join(out_dir, 'testinfo.json'), 'w') as testinfo:
- d = {'custom_environment':
- ['ADT_TEST_TRIGGERS=%s' % triggers]}
+ if "triggers" in params:
+ triggers = " ".join(params["triggers"])
+ with open(
+ os.path.join(out_dir, "testinfo.json"), "w"
+ ) as testinfo:
+ d = {
+ "custom_environment": [
+ "ADT_TEST_TRIGGERS=%s" % triggers
+ ]
+ }
json.dump(d, testinfo, indent=True)
# and the testpackage version (pkgname blacklisted)
# XXX: replace "blacklisted" here, but needs changes in
# proposed-migration and hints
- with open(os.path.join(out_dir, 'testpkg-version'), 'w') as testpkg_version:
- testpkg_version.write('%s blacklisted' % pkgname)
+ with open(
+ os.path.join(out_dir, "testpkg-version"), "w"
+ ) as testpkg_version:
+ testpkg_version.write("%s blacklisted" % pkgname)
- container = 'autopkgtest-' + release
- big_pkg = request_matches_per_package(pkgname,
- architecture,
- release,
- big_packages)
+ container = "autopkgtest-" + release
+ big_pkg = request_matches_per_package(
+ pkgname, architecture, release, big_packages
+ )
- autopkgtest_checkout = cfg.get('autopkgtest', 'checkout_dir').strip()
+ autopkgtest_checkout = cfg.get("autopkgtest", "checkout_dir").strip()
if autopkgtest_checkout:
- argv = [os.path.join(autopkgtest_checkout, 'runner', 'autopkgtest')]
+ argv = [
+ os.path.join(autopkgtest_checkout, "runner", "autopkgtest")
+ ]
else:
- argv = ['autopkgtest']
- argv += ['--output-dir', out_dir, '--timeout-copy=6000']
+ argv = ["autopkgtest"]
+ argv += ["--output-dir", out_dir, "--timeout-copy=6000"]
- if i386_cross_series(release) and architecture == 'i386':
- argv += ['-a', 'i386']
+ if i386_cross_series(release) and architecture == "i386":
+ argv += ["-a", "i386"]
- c = cfg.get('autopkgtest', 'extra_args')
+ c = cfg.get("autopkgtest", "extra_args")
if c:
argv += c.strip().split()
- c = cfg.get('autopkgtest', 'setup_command').strip()
+ c = cfg.get("autopkgtest", "setup_command").strip()
if c:
- c = subst(c, big_pkg, release, architecture, host_arch(release, architecture), pkgname)
- argv += ['--setup-commands', c]
- c = cfg.get('autopkgtest', 'setup_command2').strip()
+ c = subst(
+ c,
+ big_pkg,
+ release,
+ architecture,
+ host_arch(release, architecture),
+ pkgname,
+ )
+ argv += ["--setup-commands", c]
+ c = cfg.get("autopkgtest", "setup_command2").strip()
if c:
- c = subst(c, big_pkg, release, architecture, host_arch(release, architecture), pkgname)
- argv += ['--setup-commands', c]
-
- if 'triggers' in params and 'qemu-efi-noacpi/0' in params['triggers']:
- if architecture == 'arm64':
- argv += ['--setup-commands', '/home/ubuntu/autopkgtest-cloud/worker-config-production/qemu-efi-noacpi.sh']
+ c = subst(
+ c,
+ big_pkg,
+ release,
+ architecture,
+ host_arch(release, architecture),
+ pkgname,
+ )
+ argv += ["--setup-commands", c]
+
+ if "triggers" in params and "qemu-efi-noacpi/0" in params["triggers"]:
+ if architecture == "arm64":
+ argv += [
+ "--setup-commands",
+ "/home/ubuntu/autopkgtest-cloud/worker-config-production/qemu-efi-noacpi.sh",
+ ]
else:
# these will be written later on
code = 99
@@ -592,43 +741,69 @@ def request(msg):
os.makedirs(out_dir)
# fake a log file
- with open(os.path.join(out_dir, 'log'), 'w') as log:
- log.write('Not running due to invalid trigger: qemu-efi-noacpi/0 is arm64 only')
+ with open(os.path.join(out_dir, "log"), "w") as log:
+ log.write(
+ "Not running due to invalid trigger: qemu-efi-noacpi/0 is arm64 only"
+ )
dont_run = True
# and the testpackage version (invalid trigger with a reason)
- with open(os.path.join(out_dir, 'testpkg-version'), 'w') as testpkg_version:
- testpkg_version.write('invalid trigger: qemu-efi-noacpi/0 is arm64 only')
-
- if 'ppas' in params and params['ppas']:
- for ppa in params['ppas']:
+ with open(
+ os.path.join(out_dir, "testpkg-version"), "w"
+ ) as testpkg_version:
+ testpkg_version.write(
+ "invalid trigger: qemu-efi-noacpi/0 is arm64 only"
+ )
+
+ if "ppas" in params and params["ppas"]:
+ for ppa in params["ppas"]:
try:
- (ppacreds, _, ppaurl) = ppa.rpartition('@')
- (ppaurl, _, fingerprint) = ppaurl.partition(':')
- (ppacreds_user, ppacreds_pass) = ppacreds.split(':') if ppacreds else (None, None)
- (ppauser, ppaname) = ppaurl.split('/')
+ (ppacreds, _, ppaurl) = ppa.rpartition("@")
+ (ppaurl, _, fingerprint) = ppaurl.partition(":")
+ (ppacreds_user, ppacreds_pass) = (
+ ppacreds.split(":") if ppacreds else (None, None)
+ )
+ (ppauser, ppaname) = ppaurl.split("/")
except ValueError:
- logging.error('Invalid PPA specification, must be [user:token@]lpuser/ppa_name[:fingerprint]')
+ logging.error(
+ "Invalid PPA specification, must be [user:token@]lpuser/ppa_name[:fingerprint]"
+ )
msg.channel.basic_ack(msg.delivery_tag)
return
if fingerprint:
- logging.debug('Request states that PPA user %s, name %s has GPG fingerprint %s' % (ppauser, ppaname, fingerprint))
+ logging.debug(
+ "Request states that PPA user %s, name %s has GPG fingerprint %s"
+ % (ppauser, ppaname, fingerprint)
+ )
else:
# Private PPAs require the fingerprint passed through the
# request as we can't use the LP API to fetch it.
if ppacreds_user:
- logging.error('Invalid PPA specification, GPG fingerprint required for private PPAs')
+ logging.error(
+ "Invalid PPA specification, GPG fingerprint required for private PPAs"
+ )
msg.channel.basic_ack(msg.delivery_tag)
return
for retry in range(5):
try:
- f = urllib.request.urlopen('https://api.launchpad.net/1.0/~%s/+archive/ubuntu/%s' % (ppauser, ppaname))
- contents = f.read().decode('UTF-8')
+ f = urllib.request.urlopen(
+ "https://api.launchpad.net/1.0/~%s/+archive/ubuntu/%s"
+ % (ppauser, ppaname)
+ )
+ contents = f.read().decode("UTF-8")
f.close()
- fingerprint = json.loads(contents)['signing_key_fingerprint']
- logging.debug('PPA user %s, name %s has GPG fingerprint %s' % (ppauser, ppaname, fingerprint))
+ fingerprint = json.loads(contents)[
+ "signing_key_fingerprint"
+ ]
+ logging.debug(
+ "PPA user %s, name %s has GPG fingerprint %s"
+ % (ppauser, ppaname, fingerprint)
+ )
except (IOError, ValueError, KeyError) as e:
- logging.error('Cannot get PPA information: "%s". Consuming the request - it will be left dangling; retry once the problem is resolved.' % e)
+ logging.error(
+ 'Cannot get PPA information: "%s". Consuming the request - it will be left dangling; retry once the problem is resolved.'
+ % e
+ )
msg.channel.basic_ack(msg.delivery_tag)
return
except HTTPError as e:
@@ -636,155 +811,217 @@ def request(msg):
# few times.
if e.code != 503:
raise
- logging.warning('Got error 503 from launchpad API')
+ logging.warning("Got error 503 from launchpad API")
time.sleep(10)
else:
break
else:
- logging.error('Cannot contact Launchpad to get PPA information. Consuming the request - it will be left dangling; retry once the problem is resolved.')
+ logging.error(
+ "Cannot contact Launchpad to get PPA information. Consuming the request - it will be left dangling; retry once the problem is resolved."
+ )
msg.channel.basic_ack(msg.delivery_tag)
return
if ppacreds_user:
# Any run with at least one private PPA needs to be private.
private = True
- ppaprefix = 'https://%s:%s@private-' % (ppacreds_user, ppacreds_pass)
+ ppaprefix = "https://%s:%s@private-" % (
+ ppacreds_user,
+ ppacreds_pass,
+ )
else:
- ppaprefix = 'http://'
+ ppaprefix = "http://"
# add GPG key
- argv += ['--setup-commands', 'apt-key adv --keyserver keyserver.ubuntu.com --recv-key ' + fingerprint]
+ argv += [
+ "--setup-commands",
+ "apt-key adv --keyserver keyserver.ubuntu.com --recv-key "
+ + fingerprint,
+ ]
# add apt source
- argv += ['--setup-commands', 'REL=$(sed -rn "/^(deb|deb-src) .*(ubuntu.com|ftpmaster)/ { s/^[^ ]+ +(\[.*\] *)?[^ ]* +([^ -]+) +.*$/\\2/p; q }" /etc/apt/sources.list); '
- 'echo "deb %(prefix)sppa.launchpad.net/%(u)s/%(p)s/ubuntu $REL main" > /etc/apt/sources.list.d/autopkgtest-%(u)s-%(p)s.list; '
- 'echo "deb-src %(prefix)sppa.launchpad.net/%(u)s/%(p)s/ubuntu $REL main" >> /etc/apt/sources.list.d/autopkgtest-%(u)s-%(p)s.list;' %
- {'prefix': ppaprefix, 'u': ppauser, 'p': ppaname}]
+ argv += [
+ "--setup-commands",
+ 'REL=$(sed -rn "/^(deb|deb-src) .*(ubuntu.com|ftpmaster)/ { s/^[^ ]+ +(\[.*\] *)?[^ ]* +([^ -]+) +.*$/\\2/p; q }" /etc/apt/sources.list); '
+ 'echo "deb %(prefix)sppa.launchpad.net/%(u)s/%(p)s/ubuntu $REL main" > /etc/apt/sources.list.d/autopkgtest-%(u)s-%(p)s.list; '
+ 'echo "deb-src %(prefix)sppa.launchpad.net/%(u)s/%(p)s/ubuntu $REL main" >> /etc/apt/sources.list.d/autopkgtest-%(u)s-%(p)s.list;'
+ % {"prefix": ppaprefix, "u": ppauser, "p": ppaname},
+ ]
# put results into separate container, named by the last PPA
- container += '-%s-%s' % (ppauser, ppaname)
+ container += "-%s-%s" % (ppauser, ppaname)
# only install the triggering package from -proposed, rest from -release
# this provides better isolation between -proposed packages; but only do
# that for Ubuntu itself, not for things from git, PPAs, etc.
# also skip that for the kernel as the linux vs. linux-meta split always
# screws up the apt pinning
- if cfg.get('virt', 'args') != 'null':
- if 'test-git' not in params and 'test-bzr' not in params and ('ppas' not in params or 'all-proposed' in params):
- pocket_arg = '--apt-pocket=proposed'
- if 'all-proposed' not in params and not pkgname.startswith('linux'):
- trigs = ['src:' + t.split('/', 1)[0] for t in params.get('triggers', []) if t not in ('migration-reference/0', 'qemu-efi-noacpi/0')]
+ if cfg.get("virt", "args") != "null":
+ if (
+ "test-git" not in params
+ and "test-bzr" not in params
+ and ("ppas" not in params or "all-proposed" in params)
+ ):
+ pocket_arg = "--apt-pocket=proposed"
+ if "all-proposed" not in params and not pkgname.startswith(
+ "linux"
+ ):
+ trigs = [
+ "src:" + t.split("/", 1)[0]
+ for t in params.get("triggers", [])
+ if t
+ not in ("migration-reference/0", "qemu-efi-noacpi/0")
+ ]
if trigs:
- pocket_arg += '=' + ','.join(trigs)
+ pocket_arg += "=" + ",".join(trigs)
else:
pocket_arg = ""
if pocket_arg:
argv.append(pocket_arg)
- argv.append('--apt-upgrade')
+ argv.append("--apt-upgrade")
# determine which test to run
- if 'test-git' in params:
- testargs = ['--no-built-binaries', params['test-git']]
- elif 'build-git' in params:
- testargs = [params['build-git']]
- elif 'test-bzr' in params:
- checkout_dir = os.path.join(work_dir, 'checkout')
- subprocess.check_call(['bzr', 'checkout', '--lightweight', params['test-bzr'], checkout_dir])
- testargs = ['--no-built-binaries', checkout_dir]
+ if "test-git" in params:
+ testargs = ["--no-built-binaries", params["test-git"]]
+ elif "build-git" in params:
+ testargs = [params["build-git"]]
+ elif "test-bzr" in params:
+ checkout_dir = os.path.join(work_dir, "checkout")
+ subprocess.check_call(
+ [
+ "bzr",
+ "checkout",
+ "--lightweight",
+ params["test-bzr"],
+ checkout_dir,
+ ]
+ )
+ testargs = ["--no-built-binaries", checkout_dir]
else:
testargs = [pkgname]
argv += testargs
if args.debug:
- argv.append('--debug')
- argv.append('--timeout-short=300')
- if request_matches_per_package(pkgname, architecture, release, long_tests):
- argv.append('--timeout-copy=40000')
- argv.append('--timeout-test=40000')
- argv.append('--timeout-build=40000')
+ argv.append("--debug")
+ argv.append("--timeout-short=300")
+ if request_matches_per_package(
+ pkgname, architecture, release, long_tests
+ ):
+ argv.append("--timeout-copy=40000")
+ argv.append("--timeout-test=40000")
+ argv.append("--timeout-build=40000")
elif big_pkg:
- argv.append('--timeout-copy=20000')
- argv.append('--timeout-test=20000')
- argv.append('--timeout-build=20000')
+ argv.append("--timeout-copy=20000")
+ argv.append("--timeout-test=20000")
+ argv.append("--timeout-build=20000")
else:
- argv.append('--timeout-copy=20000')
- argv.append('--timeout-build=20000')
+ argv.append("--timeout-copy=20000")
+ argv.append("--timeout-build=20000")
- for e in params.get('env', []):
- argv.append('--env=%s' % e)
+ for e in params.get("env", []):
+ argv.append("--env=%s" % e)
triggers = None
- if 'triggers' in params:
- triggers = ' '.join(params['triggers'])
- argv.append('--env=ADT_TEST_TRIGGERS=%s' % triggers)
+ if "triggers" in params:
+ triggers = " ".join(params["triggers"])
+ argv.append("--env=ADT_TEST_TRIGGERS=%s" % triggers)
# want to run against a non-default kernel?
- for t in params['triggers']:
- if t.startswith('linux-meta'):
- totest = t.split('/')[0].replace('linux-meta', 'linux')
+ for t in params["triggers"]:
+ if t.startswith("linux-meta"):
+ totest = t.split("/")[0].replace("linux-meta", "linux")
# XXX: this is all legacy code guessing the package name to
# install from the series and source. We are moving to a
# consistent Provides: on the first flavour meta package.
# Generated by replacing -meta with -image and -headers.
- flavor = t.split('/')[0].replace('linux-meta', '')
+ flavor = t.split("/")[0].replace("linux-meta", "")
# HWE kernels have their official release name in the binary package names.
- if flavor.startswith('-hwe'):
+ if flavor.startswith("-hwe"):
ubuntu_version = series_to_version(release)
if ubuntu_version:
- flavor = flavor.replace('-hwe', '-hwe-' + ubuntu_version, 1)
+ flavor = flavor.replace(
+ "-hwe", "-hwe-" + ubuntu_version, 1
+ )
# OEM kernels have their official release name in the binary
# package names.
# The source package names are of the form linux-meta-oem-XX.YY
# where XX and YY are kernel version numbers. The binary
# package names are linux-image-oem-MM.NN where MM and NN are
# Ubuntu release numbers (i.e. year and month).
- elif any(flavor.startswith(x) for x in ('-oem-{}'.format(n) for n in range(5, 10))):
+ elif any(
+ flavor.startswith(x)
+ for x in ("-oem-{}".format(n) for n in range(5, 10))
+ ):
ubuntu_version = series_to_version(release)
if ubuntu_version:
- flavor = '-oem-{}'.format(ubuntu_version)
- elif flavor == '-ti-omap4':
+ flavor = "-oem-{}".format(ubuntu_version)
+ elif flavor == "-ti-omap4":
# yay consistency
- argv += ['--setup-commands', 'apt-get install -y linux-omap4']
+ argv += [
+ "--setup-commands",
+ "apt-get install -y linux-omap4",
+ ]
# escape dots in flavor
totest = re.escape(totest)
flavor = re.escape(flavor)
- if release == 'precise' and architecture == 'armhf' and flavor == '':
+ if (
+ release == "precise"
+ and architecture == "armhf"
+ and flavor == ""
+ ):
# no linux-image-generic in precise/armhf yet
- argv += ['--setup-commands', 'apt-get install -y linux-image-omap linux-headers-omap']
+ argv += [
+ "--setup-commands",
+ "apt-get install -y linux-image-omap linux-headers-omap",
+ ]
else:
- argv += ['--setup-commands',
- ('apt-get install -y ^kernel-testing--%(t)s--full--preferred$ || ' +
- 'apt-get install -y ^linux-image%(f)s$ ^linux-headers%(f)s$ || ' +
- 'apt-get install -y ^linux-image-generic%(f)s$ ^linux-headers-generic%(f)s$') %
- {'f': flavor, 't': totest}]
- argv += ['--setup-commands',
- ('apt-get install -y ^kernel-testing--%(t)s--modules-extra--preferred$ || ' +
- 'apt-get install -y ^linux-modules-extra%(f)s$ || :') %
- {'f': flavor, 't': totest}]
+ argv += [
+ "--setup-commands",
+ (
+ "apt-get install -y ^kernel-testing--%(t)s--full--preferred$ || "
+ + "apt-get install -y ^linux-image%(f)s$ ^linux-headers%(f)s$ || "
+ + "apt-get install -y ^linux-image-generic%(f)s$ ^linux-headers-generic%(f)s$"
+ )
+ % {"f": flavor, "t": totest},
+ ]
+ argv += [
+ "--setup-commands",
+ (
+ "apt-get install -y ^kernel-testing--%(t)s--modules-extra--preferred$ || "
+ + "apt-get install -y ^linux-modules-extra%(f)s$ || :"
+ )
+ % {"f": flavor, "t": totest},
+ ]
break
- if 'testname' in params:
- argv.append('--testname=%s' % params['testname'])
+ if "testname" in params:
+ argv.append("--testname=%s" % params["testname"])
- argv.append('--')
- argv += subst(cfg.get('virt', 'args'), big_pkg,
- release, architecture,
- host_arch(release, architecture),
- pkgname).split()
+ argv.append("--")
+ argv += subst(
+ cfg.get("virt", "args"),
+ big_pkg,
+ release,
+ architecture,
+ host_arch(release, architecture),
+ pkgname,
+ ).split()
- if 'swiftuser' in params:
+ if "swiftuser" in params:
private = True
elif private:
# Some combination already marked the run as private, but no
# swiftuser user has been specified. This is not valid, as otherwise
# no one would be realistically able to read back the results.
- logging.error('Private autopkgtest run detected but no swiftuser identity provided.')
+ logging.error(
+ "Private autopkgtest run detected but no swiftuser identity provided."
+ )
msg.channel.basic_ack(msg.delivery_tag)
return
if private:
- container = 'private-{}'.format(container)
+ container = "private-{}".format(container)
# run autopkgtest; retry up to three times on tmpfail issues
if not dont_run:
@@ -794,13 +1031,21 @@ def request(msg):
num_failures = 0
for retry in range(3):
retry_start_time = time.time()
- logging.info('Running %s', ' '.join(argv))
- code = call_autopkgtest(argv, release, architecture, pkgname, params, out_dir, start_time,
- private)
+ logging.info("Running %s", " ".join(argv))
+ code = call_autopkgtest(
+ argv,
+ release,
+ architecture,
+ pkgname,
+ params,
+ out_dir,
+ start_time,
+ private,
+ )
is_failure = code in FAIL_CODES
files = set(os.listdir(out_dir))
- is_unknown_version = 'testpkg-version' not in files
+ is_unknown_version = "testpkg-version" not in files
retrying = "Retrying in 5 minutes... " if retry < 2 else ""
@@ -808,146 +1053,231 @@ def request(msg):
# this is an 'unknown' result; try three times but fail
# properly after that (do not tmpfail)
contents = log_contents(out_dir)
- logging.warning("Test run failed with no version. %sLog follows:",
- retrying)
+ logging.warning(
+ "Test run failed with no version. %sLog follows:",
+ retrying,
+ )
logging.error(contents)
- submit_metric(architecture, code, pkgname, current_region, True, release)
+ submit_metric(
+ architecture,
+ code,
+ pkgname,
+ current_region,
+ True,
+ release,
+ )
cleanup_and_sleep(out_dir)
elif is_failure:
contents = log_contents(out_dir)
- temp_fails = [s for s in (set(TEMPORARY_TEST_FAIL_STRINGS)
- - set(getglob(OK_PKG_STRINGS, pkgname, [])))
- if s in contents]
+ temp_fails = [
+ s
+ for s in (
+ set(TEMPORARY_TEST_FAIL_STRINGS)
+ - set(getglob(OK_PKG_STRINGS, pkgname, []))
+ )
+ if s in contents
+ ]
if temp_fails:
- logging.warning('Saw %s in log, which is a sign of a temporary failure.',
- ' and '.join(temp_fails))
- logging.warning('%sLog follows:', retrying)
+ logging.warning(
+ "Saw %s in log, which is a sign of a temporary failure.",
+ " and ".join(temp_fails),
+ )
+ logging.warning("%sLog follows:", retrying)
logging.error(contents)
if retry < 2:
- submit_metric(architecture, code, pkgname, current_region, True, release)
+ submit_metric(
+ architecture,
+ code,
+ pkgname,
+ current_region,
+ True,
+ release,
+ )
cleanup_and_sleep(out_dir)
else:
break
elif code == 16 or code < 0:
contents = log_contents(out_dir)
if exit_requested is not None:
- logging.warning('Testbed failure and exit %i requested. Log follows:', exit_requested)
+ logging.warning(
+ "Testbed failure and exit %i requested. Log follows:",
+ exit_requested,
+ )
logging.error(contents)
sys.exit(exit_requested)
# Get the package-specific string for triggers too, since they might have broken the run
- trigs = [t.split('/', 1)[0] for t in params.get('triggers', [])]
- fail_trigs = [j for i in [getglob(FAIL_PKG_STRINGS, trig, []) for trig in trigs] for j in i]
+ trigs = [
+ t.split("/", 1)[0] for t in params.get("triggers", [])
+ ]
+ fail_trigs = [
+ j
+ for i in [
+ getglob(FAIL_PKG_STRINGS, trig, [])
+ for trig in trigs
+ ]
+ for j in i
+ ]
# Or if all-proposed, just give up and accept everything
- fail_all_proposed = [j for i in FAIL_PKG_STRINGS.values() for j in i]
-
- allowed_fail_strings = set(FAIL_STRINGS +
- getglob(FAIL_PKG_STRINGS, pkgname, []) +
- fail_trigs +
- (fail_all_proposed if 'all-proposed' in params else [])) \
- - set(getglob(OK_PKG_STRINGS, pkgname, []))
-
- fails = [s for s in allowed_fail_strings if s in contents] + \
- [s for s in FAIL_STRINGS_REGEX if re.search(s, contents)]
+ fail_all_proposed = [
+ j for i in FAIL_PKG_STRINGS.values() for j in i
+ ]
+
+ allowed_fail_strings = set(
+ FAIL_STRINGS
+ + getglob(FAIL_PKG_STRINGS, pkgname, [])
+ + fail_trigs
+ + (
+ fail_all_proposed
+ if "all-proposed" in params
+ else []
+ )
+ ) - set(getglob(OK_PKG_STRINGS, pkgname, []))
+
+ fails = [
+ s for s in allowed_fail_strings if s in contents
+ ] + [
+ s for s in FAIL_STRINGS_REGEX if re.search(s, contents)
+ ]
if fails:
num_failures += 1
- logging.warning('Saw %s in log, which is a sign of a real (not tmp) failure - seen %d so far',
- ' and '.join(fails), num_failures)
- logging.warning('Testbed failure. %sLog follows:', retrying)
+ logging.warning(
+ "Saw %s in log, which is a sign of a real (not tmp) failure - seen %d so far",
+ " and ".join(fails),
+ num_failures,
+ )
+ logging.warning(
+ "Testbed failure. %sLog follows:", retrying
+ )
logging.error(contents)
if retry < 2:
- submit_metric(architecture, code, pkgname, current_region, True, release)
+ submit_metric(
+ architecture,
+ code,
+ pkgname,
+ current_region,
+ True,
+ release,
+ )
cleanup_and_sleep(out_dir)
else: # code == 0, no retry needed
break
else:
if num_failures >= 3:
- logging.warning('Three fails in a row - considering this a failure rather than tmpfail')
+ logging.warning(
+ "Three fails in a row - considering this a failure rather than tmpfail"
+ )
code = 4
else:
# 2022-07-05 what code is passed to submit_metric in this code path?
- submit_metric(architecture, code, pkgname, current_region, False, release)
- logging.error('Three tmpfails in a row, aborting worker. Log follows:')
+ submit_metric(
+ architecture,
+ code,
+ pkgname,
+ current_region,
+ False,
+ release,
+ )
+ logging.error(
+ "Three tmpfails in a row, aborting worker. Log follows:"
+ )
logging.error(log_contents(out_dir))
sys.exit(99)
duration = int(time.time() - retry_start_time)
- logging.info('autopkgtest exited with code %i', code)
- submit_metric(architecture, code, pkgname, current_region, False, release)
+ logging.info("autopkgtest exited with code %i", code)
+ submit_metric(
+ architecture, code, pkgname, current_region, False, release
+ )
if code == 1:
- logging.error('autopkgtest exited with unexpected error code 1')
+ logging.error("autopkgtest exited with unexpected error code 1")
sys.exit(1)
- with open(os.path.join(out_dir, 'exitcode'), 'w') as f:
- f.write('%i\n' % code)
- with open(os.path.join(out_dir, 'duration'), 'w') as f:
- f.write('%u\n' % duration)
-
- if 'requester' in params:
- with open(os.path.join(out_dir, 'requester'), 'w') as f:
- f.write('%s\n' % params['requester'])
-
- if 'readable-by' in params:
- with open(os.path.join(out_dir, 'readable-by'), 'w') as f:
- if isinstance(params['readable-by'], list):
- f.write('\n'.join(params['readable-by']))
+ with open(os.path.join(out_dir, "exitcode"), "w") as f:
+ f.write("%i\n" % code)
+ with open(os.path.join(out_dir, "duration"), "w") as f:
+ f.write("%u\n" % duration)
+
+ if "requester" in params:
+ with open(os.path.join(out_dir, "requester"), "w") as f:
+ f.write("%s\n" % params["requester"])
+
+ if "readable-by" in params:
+ with open(os.path.join(out_dir, "readable-by"), "w") as f:
+ if isinstance(params["readable-by"], list):
+ f.write("\n".join(params["readable-by"]))
else:
- f.write('%s\n' % params['readable-by'])
+ f.write("%s\n" % params["readable-by"])
- (testpkg_version, duration, requester) = process_output_dir(out_dir,
- pkgname,
- code,
- params.get('triggers', []))
+ (testpkg_version, duration, requester) = process_output_dir(
+ out_dir, pkgname, code, params.get("triggers", [])
+ )
# If two tests for the same package with different triggers finish at the
# same second, we get collisions with just the timestamp; disambiguate with
# the hashed params. We append a '@' which is a nice delimiter for querying
# runs in swift.
- run_id = '%s_%s@' % (
- time.strftime('%Y%m%d_%H%M%S', time.gmtime()),
- hashlib.sha1(body.encode('UTF-8')).hexdigest()[:5])
- if pkgname.startswith('lib'):
+ run_id = "%s_%s@" % (
+ time.strftime("%Y%m%d_%H%M%S", time.gmtime()),
+ hashlib.sha1(body.encode("UTF-8")).hexdigest()[:5],
+ )
+ if pkgname.startswith("lib"):
prefix = pkgname[:4]
else:
prefix = pkgname[0]
- swift_dir = os.path.join(release, architecture, prefix, pkgname, run_id)
+ swift_dir = os.path.join(
+ release, architecture, prefix, pkgname, run_id
+ )
# publish results into swift
- logging.info('Putting results into swift %s %s', container, swift_dir)
+ logging.info("Putting results into swift %s %s", container, swift_dir)
# create it if it does not exist yet
swift_con = swiftclient.Connection(**swift_creds)
try:
swift_con.get_container(container, limit=1)
except swiftclient.exceptions.ClientException:
- logging.info('container %s does not exist, creating it', container)
+ logging.info("container %s does not exist, creating it", container)
if private:
# private result, share only with swiftuser
- swift_con.put_container(container, headers={'X-Container-Read': '*:%s' % params['swiftuser']})
+ swift_con.put_container(
+ container,
+ headers={"X-Container-Read": "*:%s" % params["swiftuser"]},
+ )
else:
# make it publicly readable
- swift_con.put_container(container, headers={'X-Container-Read': '.rlistings,.r:*'})
+ swift_con.put_container(
+ container, headers={"X-Container-Read": ".rlistings,.r:*"}
+ )
# wait until it exists
timeout = 50
while timeout > 0:
try:
swift_con.get_container(container, limit=1)
- logging.debug('newly created container %s exists now', container)
+ logging.debug(
+ "newly created container %s exists now", container
+ )
break
except swiftclient.exceptions.ClientException:
- logging.debug('newly created container %s does not exist yet, continuing poll', container)
+ logging.debug(
+ "newly created container %s does not exist yet, continuing poll",
+ container,
+ )
time.sleep(1)
timeout -= 1
else:
- logging.error('timed out waiting for newly created container %s', container)
+ logging.error(
+ "timed out waiting for newly created container %s",
+ container,
+ )
sys.exit(1)
for f in os.listdir(out_dir):
path = os.path.join(out_dir, f)
- with open(path, 'rb') as fd:
- if path.endswith('log.gz'):
- content_type = 'text/plain; charset=UTF-8'
- headers = {'Content-Encoding': 'gzip'}
+ with open(path, "rb") as fd:
+ if path.endswith("log.gz"):
+ content_type = "text/plain; charset=UTF-8"
+ headers = {"Content-Encoding": "gzip"}
else:
content_type = None
headers = None
@@ -956,18 +1286,23 @@ def request(msg):
for retry in reversed(range(5)):
try:
# swift_con.put_object() is missing the name kwarg
- swiftclient.put_object(swift_con.url, token=swift_con.token,
- container=container,
- name=os.path.join(swift_dir, f),
- contents=fd,
- content_type=content_type,
- headers=headers,
- content_length=os.path.getsize(path))
+ swiftclient.put_object(
+ swift_con.url,
+ token=swift_con.token,
+ container=container,
+ name=os.path.join(swift_dir, f),
+ contents=fd,
+ content_type=content_type,
+ headers=headers,
+ content_length=os.path.getsize(path),
+ )
break
except swiftclient.exceptions.ClientException as e:
if retry > 0:
- logging.info('Failed to upload %s to swift (%s), retrying in %s seconds...' %
- (path, str(e), sleep_time))
+ logging.info(
+ "Failed to upload %s to swift (%s), retrying in %s seconds..."
+ % (path, str(e), sleep_time)
+ )
time.sleep(sleep_time)
sleep_time *= 2
continue
@@ -980,73 +1315,87 @@ def request(msg):
global amqp_con
complete_amqp = amqp_con.channel()
- complete_amqp.access_request('/complete', active=True, read=False, write=True)
- complete_amqp.exchange_declare(complete_exchange_name, 'fanout', durable=True, auto_delete=False)
- complete_msg = json.dumps ({'architecture': architecture,
- 'container': container,
- 'duration': duration,
- 'exitcode': code,
- 'package': pkgname,
- 'testpkg_version': testpkg_version,
- 'release': release,
- 'requester': requester,
- 'swift_dir': swift_dir,
- 'triggers': triggers})
- complete_amqp.basic_publish(amqp.Message(complete_msg, delivery_mode=2),
- complete_exchange_name, '')
-
- logging.info('Acknowledging request %s' % body)
+ complete_amqp.access_request(
+ "/complete", active=True, read=False, write=True
+ )
+ complete_amqp.exchange_declare(
+ complete_exchange_name, "fanout", durable=True, auto_delete=False
+ )
+ complete_msg = json.dumps(
+ {
+ "architecture": architecture,
+ "container": container,
+ "duration": duration,
+ "exitcode": code,
+ "package": pkgname,
+ "testpkg_version": testpkg_version,
+ "release": release,
+ "requester": requester,
+ "swift_dir": swift_dir,
+ "triggers": triggers,
+ }
+ )
+ complete_amqp.basic_publish(
+ amqp.Message(complete_msg, delivery_mode=2), complete_exchange_name, ""
+ )
+
+ logging.info("Acknowledging request %s" % body)
msg.channel.basic_ack(msg.delivery_tag)
running_test = False
def amqp_connect(cfg, callback):
- '''Connect to AMQP host using given configuration
+ """Connect to AMQP host using given configuration
Connect "callback" to queues for all configured releases and
architectures.
Return queue object.
- '''
+ """
global amqp_con
- logging.info('Connecting to AMQP server %s', os.environ['RABBIT_HOST'])
- amqp_con = amqp.Connection(os.environ['RABBIT_HOST'],
- userid=os.environ['RABBIT_USER'],
- password=os.environ['RABBIT_PASSWORD'],
- confirm_publish=True)
+ logging.info("Connecting to AMQP server %s", os.environ["RABBIT_HOST"])
+ amqp_con = amqp.Connection(
+ os.environ["RABBIT_HOST"],
+ userid=os.environ["RABBIT_USER"],
+ password=os.environ["RABBIT_PASSWORD"],
+ confirm_publish=True,
+ )
queue = amqp_con.channel()
# avoids greedy grabbing of the entire queue while being too busy
queue.basic_qos(0, 1, True)
- arch_str = cfg.get('autopkgtest', 'architectures')
- arch_str = subst(arch_str, 'n/a', 'n/a', 'n/a', 'n/a', 'n/a')
+ arch_str = cfg.get("autopkgtest", "architectures")
+ arch_str = subst(arch_str, "n/a", "n/a", "n/a", "n/a", "n/a")
arches = arch_str.split()
if not arches:
- my_arch = subprocess.check_output(['dpkg', '--print-architecture'],
- universal_newlines=True).strip()
- logging.info('No architectures in configuration, defaulting to %s', my_arch)
+ my_arch = subprocess.check_output(
+ ["dpkg", "--print-architecture"], universal_newlines=True
+ ).strip()
+ logging.info(
+ "No architectures in configuration, defaulting to %s", my_arch
+ )
arches = [my_arch]
# avoid preferring the same architecture on all workers
queues = []
- contexts = ['', 'huge-', 'ppa-']
+ contexts = ["", "huge-", "ppa-"]
# crude way to not allow upstream tests to monopolise resources - only 50%
# of workers will take them
if random.randint(1, 100) < 50:
- contexts += ['upstream-']
+ contexts += ["upstream-"]
- for release in cfg.get('autopkgtest', 'releases').split():
+ for release in cfg.get("autopkgtest", "releases").split():
for context in contexts:
for arch in arches:
- queue_name = 'debci-%s%s-%s' % (context, release, arch)
+ queue_name = "debci-%s%s-%s" % (context, release, arch)
queues.append(queue_name)
random.shuffle(queues)
for queue_name in queues:
- logging.info('Setting up and listening to AMQP queue %s', queue_name)
+ logging.info("Setting up and listening to AMQP queue %s", queue_name)
queue.queue_declare(queue_name, durable=True, auto_delete=False)
queue.basic_consume(queue=queue_name, callback=request)
@@ -1054,7 +1403,7 @@ def amqp_connect(cfg, callback):
def main():
- '''Main program'''
+ """Main program"""
global cfg, args, swift_creds
@@ -1065,42 +1414,47 @@ def main():
# load configuration
cfg = configparser.ConfigParser(
- {'setup_command': '', 'setup_command2': '',
- 'checkout_dir': '',
- 'package_size_default': '', 'package_size_big': '',
- 'extra_args': ''},
- allow_no_value=True)
+ {
+ "setup_command": "",
+ "setup_command2": "",
+ "checkout_dir": "",
+ "package_size_default": "",
+ "package_size_big": "",
+ "extra_args": "",
+ },
+ allow_no_value=True,
+ )
cfg.read(args.config)
- logging.basicConfig(level=(args.debug and logging.DEBUG or logging.INFO),
- format='%(levelname)s: %(message)s',
- handlers=[systemd_logging_handler])
+ logging.basicConfig(
+ level=(args.debug and logging.DEBUG or logging.INFO),
+ format="%(levelname)s: %(message)s",
+ handlers=[systemd_logging_handler],
+ )
- auth_version = os.environ['SWIFT_AUTH_VERSION']
+ auth_version = os.environ["SWIFT_AUTH_VERSION"]
- if auth_version == '2':
+ if auth_version == "2":
swift_creds = {
- 'authurl': os.environ['SWIFT_AUTH_URL'],
- 'user': os.environ['SWIFT_USERNAME'],
- 'key': os.environ['SWIFT_PASSWORD'],
- 'tenant_name': os.environ['SWIFT_TENANT'],
- 'os_options': {
- 'region_name': os.environ['SWIFT_REGION']
- },
- 'auth_version': os.environ['SWIFT_AUTH_VERSION']
+ "authurl": os.environ["SWIFT_AUTH_URL"],
+ "user": os.environ["SWIFT_USERNAME"],
+ "key": os.environ["SWIFT_PASSWORD"],
+ "tenant_name": os.environ["SWIFT_TENANT"],
+ "os_options": {"region_name": os.environ["SWIFT_REGION"]},
+ "auth_version": os.environ["SWIFT_AUTH_VERSION"],
}
else: # 3
swift_creds = {
- 'authurl': os.environ['SWIFT_AUTH_URL'],
- 'user': os.environ['SWIFT_USERNAME'],
- 'key': os.environ['SWIFT_PASSWORD'],
- 'os_options': {
- 'region_name': os.environ['SWIFT_REGION'],
- 'project_domain_name': os.environ['SWIFT_PROJECT_DOMAIN_NAME'],
- 'project_name': os.environ['SWIFT_PROJECT_NAME'],
- 'user_domain_name': os.environ['SWIFT_USER_DOMAIN_NAME']
- },
- 'auth_version': auth_version
+ "authurl": os.environ["SWIFT_AUTH_URL"],
+ "user": os.environ["SWIFT_USERNAME"],
+ "key": os.environ["SWIFT_PASSWORD"],
+ "os_options": {
+ "region_name": os.environ["SWIFT_REGION"],
+ "project_domain_name": os.environ["SWIFT_PROJECT_DOMAIN_NAME"],
+ "project_name": os.environ["SWIFT_PROJECT_NAME"],
+ "user_domain_name": os.environ["SWIFT_USER_DOMAIN_NAME"],
+ },
+ "auth_version": auth_version,
}
# ensure that we can connect to swift
@@ -1112,15 +1466,17 @@ def main():
# process queues forever
try:
while exit_requested is None:
- logging.info('Waiting for and processing AMQP requests')
+ logging.info("Waiting for and processing AMQP requests")
queue.wait()
except IOError:
if exit_requested is None:
raise
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
if exit_requested:
- logging.info('Exiting with %i due to queued exit request' % exit_requested)
+ logging.info(
+ "Exiting with %i due to queued exit request" % exit_requested
+ )
sys.exit(exit_requested)
diff --git a/charms/focal/autopkgtest-cloud-worker/lib/systemd.py b/charms/focal/autopkgtest-cloud-worker/lib/systemd.py
index b83828f..6a87cbd 100644
--- a/charms/focal/autopkgtest-cloud-worker/lib/systemd.py
+++ b/charms/focal/autopkgtest-cloud-worker/lib/systemd.py
@@ -1,19 +1,20 @@
-#pylint: disable=missing-function-docstring
+# pylint: disable=missing-function-docstring
import os
import shutil
+from collections import defaultdict
from textwrap import dedent
-from collections import defaultdict
-from gi.repository import GLib, Gio
+from gi.repository import Gio, GLib
from lib.utils import UbuntuRelease
-
SYSTEM_BUS = Gio.bus_get_sync(Gio.BusType.SYSTEM)
def get_unit_names(region, arch, ns):
if arch == "amd64":
- unit_names = ["autopkgtest@{}-{}.service".format(region, n) for n in ns]
+ unit_names = [
+ "autopkgtest@{}-{}.service".format(region, n) for n in ns
+ ]
else:
unit_names = [
"autopkgtest@{}-{}-{}.service".format(region, arch, n) for n in ns
@@ -222,7 +223,7 @@ def update_lxd_dropins(arch, ip, n):
pass
with open(
- os.path.join(dropindir, "autopkgtest-lxd-remote.conf"), "w"
+ os.path.join(dropindir, "autopkgtest-lxd-remote.conf"), "w"
) as f:
remote_unit = "autopkgtest-lxd-remote@lxd-{}-{}.service".format(
arch, ip
diff --git a/charms/focal/autopkgtest-cloud-worker/lib/utils.py b/charms/focal/autopkgtest-cloud-worker/lib/utils.py
index c7fce49..29ae3f4 100644
--- a/charms/focal/autopkgtest-cloud-worker/lib/utils.py
+++ b/charms/focal/autopkgtest-cloud-worker/lib/utils.py
@@ -1,14 +1,15 @@
-#pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring
+# pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring
import os
import pwd
import subprocess
from functools import total_ordering
+
from charmhelpers.core.hookenv import log
from distro_info import UbuntuDistroInfo
-#class UnixUser(object):
-class UnixUser():
+# class UnixUser(object):
+class UnixUser:
def __init__(self, username):
self.username = username
pwnam = pwd.getpwnam(username)
@@ -46,7 +47,7 @@ def install_autodep8(location):
def pull(repository):
- """ This will do a sort of git fetch origin && git reset --hard origin/master """
+ """This will do a sort of git fetch origin && git reset --hard origin/master"""
origin = [
remote for remote in repository.remotes if remote.name == "origin"
][0]
@@ -61,8 +62,8 @@ def pull(repository):
@total_ordering
-#class UbuntuRelease(object):
-class UbuntuRelease():
+# class UbuntuRelease(object):
+class UbuntuRelease:
all_releases = UbuntuDistroInfo().all
def __init__(self, release):
diff --git a/charms/focal/autopkgtest-cloud-worker/reactive/autopkgtest_cloud_worker.py b/charms/focal/autopkgtest-cloud-worker/reactive/autopkgtest_cloud_worker.py
index 3dc4625..12b0b66 100644
--- a/charms/focal/autopkgtest-cloud-worker/reactive/autopkgtest_cloud_worker.py
+++ b/charms/focal/autopkgtest-cloud-worker/reactive/autopkgtest_cloud_worker.py
@@ -1,35 +1,33 @@
-#pylint: disable=missing-module-docstring,missing-function-docstring
+# pylint: disable=missing-module-docstring,missing-function-docstring
+import glob
+import os
+import socket
+import subprocess
+from textwrap import dedent
+
+import pygit2
+import yaml
+from charmhelpers.core.hookenv import (
+ charm_dir,
+ config,
+ log,
+ storage_get,
+ storage_list,
+)
from charms.layer import status
from charms.reactive import (
+ clear_flag,
+ hook,
+ not_unless,
+ set_flag,
when,
when_all,
when_any,
when_not,
when_not_all,
- clear_flag,
- set_flag,
- hook,
- not_unless,
)
from charms.reactive.relations import endpoint_from_flag
-from charmhelpers.core.hookenv import (
- charm_dir,
- config,
- log,
- storage_get,
- storage_list,
-)
-from utils import install_autodep8, UnixUser
-
-from textwrap import dedent
-
-import glob
-import os
-import pygit2
-import socket
-import subprocess
-import yaml
-
+from utils import UnixUser, install_autodep8
AUTOPKGTEST_LOCATION = os.path.expanduser("~ubuntu/autopkgtest")
AUTOPKGTEST_CLOUD_LOCATION = os.path.expanduser("~ubuntu/autopkgtest-cloud")
@@ -45,8 +43,10 @@ AUTODEP8_CLONE_LOCATION = (
"https://git.launchpad.net/~ubuntu-release/+git/autodep8"
)
-AUTOPKGTEST_PER_PACKAGE_CLONE_LOCATION = "https://git.launchpad.net/~ubuntu-release/autopkgtest" + \
- "-cloud/+git/autopkgtest-package-configs"
+AUTOPKGTEST_PER_PACKAGE_CLONE_LOCATION = (
+ "https://git.launchpad.net/~ubuntu-release/autopkgtest"
+ + "-cloud/+git/autopkgtest-package-configs"
+)
RABBITMQ_CRED_PATH = os.path.expanduser("~ubuntu/rabbitmq.cred")
@@ -240,10 +240,9 @@ def clear_rabbitmq():
@when("config.changed.nova-rcs")
def update_nova_rcs():
- from tarfile import TarFile
- from io import BytesIO
-
import base64
+ from io import BytesIO
+ from tarfile import TarFile
rctar = config().get("nova-rcs")
@@ -398,7 +397,7 @@ def write_v2_config():
def write_swift_config():
with open(
- os.path.expanduser("~ubuntu/swift-password.cred"), "w"
+ os.path.expanduser("~ubuntu/swift-password.cred"), "w"
) as swift_password_file:
for key in config():
if key.startswith("swift") and config()[key] is not None:
@@ -481,8 +480,12 @@ def write_worker_config():
with open(conf_file, "r") as cf:
conf_data = cf.read()
with open(conf_file, "w") as cf:
- cf.write(conf_data.replace(config().get("mirror"), "http://us.ports.ubuntu.com/ubuntu-ports/"))
-
+ cf.write(
+ conf_data.replace(
+ config().get("mirror"),
+ "http://us.ports.ubuntu.com/ubuntu-ports/",
+ )
+ )
for region in nworkers_yaml:
for arch in nworkers_yaml[region]:
diff --git a/charms/focal/autopkgtest-cloud-worker/tests/10-deploy b/charms/focal/autopkgtest-cloud-worker/tests/10-deploy
index 06cac38..26c9786 100755
--- a/charms/focal/autopkgtest-cloud-worker/tests/10-deploy
+++ b/charms/focal/autopkgtest-cloud-worker/tests/10-deploy
@@ -1,25 +1,28 @@
#!/usr/bin/python3
+import unittest
+
import amulet
import requests
-import unittest
class TestCharm(unittest.TestCase):
def setUp(self):
self.d = amulet.Deployment()
- self.d.add('autopkgtest-cloud-worker')
- self.d.expose('autopkgtest-cloud-worker')
+ self.d.add("autopkgtest-cloud-worker")
+ self.d.expose("autopkgtest-cloud-worker")
self.d.setup(timeout=900)
self.d.sentry.wait()
- self.unit = self.d.sentry['autopkgtest-cloud-worker'][0]
+ self.unit = self.d.sentry["autopkgtest-cloud-worker"][0]
def test_service(self):
# test we can access over http
- page = requests.get('http://{}'.format(self.unit.info['public-address']))
+ page = requests.get(
+ "http://{}".format(self.unit.info["public-address"])
+ )
self.assertEqual(page.status_code, 200)
# Now you can use self.d.sentry[SERVICE][UNIT] to address each of the units and perform
# more in-depth steps. Each self.d.sentry[SERVICE][UNIT] has the following methods:
@@ -31,5 +34,5 @@ class TestCharm(unittest.TestCase):
# - .relation(relation, service:rel) - Get relation data from return service
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
diff --git a/charms/focal/autopkgtest-web/reactive/autopkgtest_web.py b/charms/focal/autopkgtest-web/reactive/autopkgtest_web.py
index fc87317..08ec944 100644
--- a/charms/focal/autopkgtest-web/reactive/autopkgtest_web.py
+++ b/charms/focal/autopkgtest-web/reactive/autopkgtest_web.py
@@ -1,21 +1,20 @@
+import glob
+import os
+import shutil
+import subprocess
+from textwrap import dedent
+
+from charmhelpers.core.hookenv import charm_dir, config
from charms.layer import status
from charms.reactive import (
+ clear_flag,
+ hook,
+ set_flag,
when,
when_all,
when_any,
when_not,
- set_flag,
- clear_flag,
- hook,
)
-from charmhelpers.core.hookenv import charm_dir, config
-
-from textwrap import dedent
-
-import glob
-import os
-import shutil
-import subprocess
AUTOPKGTEST_CLOUD_CONF = os.path.expanduser("~ubuntu/autopkgtest-cloud.conf")
GITHUB_SECRETS_PATH = os.path.expanduser("~ubuntu/github-secrets.json")
@@ -124,7 +123,7 @@ def initially_configure_website(website):
"config.set.hostname",
"config.set.storage-url-internal",
"website.available",
- "autopkgtest-web.website-initially-configured"
+ "autopkgtest-web.website-initially-configured",
)
def set_up_web_config(apache):
webcontrol_dir = os.path.join(charm_dir(), "webcontrol")
@@ -243,8 +242,9 @@ def clear_github_secrets():
pass
-@when_all("config.changed.swift-web-credentials",
- "config.set.swift-web-credentials")
+@when_all(
+ "config.changed.swift-web-credentials", "config.set.swift-web-credentials"
+)
def write_swift_web_credentials():
swift_credentials = config().get("swift-web-credentials")
@@ -351,7 +351,9 @@ def symlink_public_db():
shutil.chown(publicdir, user="ubuntu", group="ubuntu")
os.symlink(
os.path.join(publicdir, "autopkgtest.db"),
- os.path.join(charm_dir(), "webcontrol", "static", "autopkgtest.db"),
+ os.path.join(
+ charm_dir(), "webcontrol", "static", "autopkgtest.db"
+ ),
)
set_flag("autopkgtest-web.public-db-symlinked")
except FileExistsError:
diff --git a/charms/focal/autopkgtest-web/webcontrol/amqp-status-collector b/charms/focal/autopkgtest-web/webcontrol/amqp-status-collector
index 66a9ed4..6b6bbfa 100755
--- a/charms/focal/autopkgtest-web/webcontrol/amqp-status-collector
+++ b/charms/focal/autopkgtest-web/webcontrol/amqp-status-collector
@@ -2,21 +2,20 @@
# Pick up running tests, their status and logtail from the "teststatus" fanout
# queue, and regularly write it into /run/running.json
-import os
+import configparser
import json
+import logging
+import os
import socket
import time
-import configparser
import urllib.parse
-import logging
import amqplib.client_0_8 as amqp
-exchange_name = 'teststatus.fanout'
-running_name = os.path.join(os.path.sep,
- 'run',
- 'amqp-status-collector',
- 'running.json')
+exchange_name = "teststatus.fanout"
+running_name = os.path.join(
+ os.path.sep, "run", "amqp-status-collector", "running.json"
+)
running_name_new = "{}.new".format(running_name)
# package -> runhash -> release -> arch -> (params, duration, logtail)
@@ -25,21 +24,24 @@ last_update = 0
def amqp_connect():
- '''Connect to AMQP server'''
+ """Connect to AMQP server"""
cp = configparser.ConfigParser()
- cp.read(os.path.expanduser('~ubuntu/autopkgtest-cloud.conf'))
- amqp_uri = cp['amqp']['uri']
+ cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
+ amqp_uri = cp["amqp"]["uri"]
parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
- amqp_con = amqp.Connection(parts.hostname, userid=parts.username,
- password=parts.password)
- logging.info('Connected to AMQP server at %s@%s' % (parts.username, parts.hostname))
+ amqp_con = amqp.Connection(
+ parts.hostname, userid=parts.username, password=parts.password
+ )
+ logging.info(
+ "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)
+ )
return amqp_con
def update_output(amqp_channel, force_update=False):
- '''Update report'''
+ """Update report"""
global last_update
@@ -48,60 +50,70 @@ def update_output(amqp_channel, force_update=False):
if not force_update and now - last_update < 10:
return
- with open(running_name_new, 'w', encoding='utf-8') as f:
+ with open(running_name_new, "w", encoding="utf-8") as f:
json.dump(running_tests, f)
os.rename(running_name_new, running_name)
def process_message(msg):
- '''Process AMQP status message, update running_tests'''
+ """Process AMQP status message, update running_tests"""
body = msg.body
if isinstance(body, bytes):
- body = body.decode('UTF-8', errors='replace')
+ body = body.decode("UTF-8", errors="replace")
info = json.loads(body)
- runhash = ''
- params = info.get('params', {})
+ runhash = ""
+ params = info.get("params", {})
for p in sorted(params):
- runhash += '%s_%s;' % (p, params[p])
-
- if info['running']:
- running_tests.setdefault(info['package'], {}).setdefault(
- runhash, {}).setdefault(
- info['release'], {})[info['architecture']] = (params, info.get('duration', 0), info['logtail'])
+ runhash += "%s_%s;" % (p, params[p])
+
+ if info["running"]:
+ running_tests.setdefault(info["package"], {}).setdefault(
+ runhash, {}
+ ).setdefault(info["release"], {})[info["architecture"]] = (
+ params,
+ info.get("duration", 0),
+ info["logtail"],
+ )
else:
try:
- del running_tests[info['package']][runhash][info['release']][info['architecture']]
+ del running_tests[info["package"]][runhash][info["release"]][
+ info["architecture"]
+ ]
# prune empty dicts
- if not running_tests[info['package']][runhash][info['release']]:
- del running_tests[info['package']][runhash][info['release']]
- if not running_tests[info['package']][runhash]:
- del running_tests[info['package']][runhash]
- if not running_tests[info['package']]:
- del running_tests[info['package']]
+ if not running_tests[info["package"]][runhash][info["release"]]:
+ del running_tests[info["package"]][runhash][info["release"]]
+ if not running_tests[info["package"]][runhash]:
+ del running_tests[info["package"]][runhash]
+ if not running_tests[info["package"]]:
+ del running_tests[info["package"]]
except KeyError:
pass
- update_output(msg.channel, not info['running'])
+ update_output(msg.channel, not info["running"])
#
# main
#
-logging.basicConfig(level=('DEBUG' in os.environ and logging.DEBUG or logging.INFO))
+logging.basicConfig(
+ level=("DEBUG" in os.environ and logging.DEBUG or logging.INFO)
+)
amqp_con = amqp_connect()
status_ch = amqp_con.channel()
-status_ch.access_request('/data', active=True, read=True, write=False)
-status_ch.exchange_declare(exchange_name, 'fanout', durable=False, auto_delete=True)
-queue_name = 'running-listener-%s' % socket.getfqdn()
+status_ch.access_request("/data", active=True, read=True, write=False)
+status_ch.exchange_declare(
+ exchange_name, "fanout", durable=False, auto_delete=True
+)
+queue_name = "running-listener-%s" % socket.getfqdn()
status_ch.queue_declare(queue_name, durable=False, auto_delete=True)
status_ch.queue_bind(queue_name, exchange_name, queue_name)
-logging.info('Listening to requests on %s' % queue_name)
-status_ch.basic_consume('', callback=process_message, no_ack=True)
+logging.info("Listening to requests on %s" % queue_name)
+status_ch.basic_consume("", callback=process_message, no_ack=True)
while status_ch.callbacks:
status_ch.wait()
diff --git a/charms/focal/autopkgtest-web/webcontrol/browse.cgi b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
index 56f27c3..efdc9b7 100755
--- a/charms/focal/autopkgtest-web/webcontrol/browse.cgi
+++ b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
@@ -2,20 +2,20 @@
"""Browse autopkgtest results"""
+import configparser
+import json
import os
+import re
import sqlite3
-import json
-import configparser
import urllib.parse
-import re
-import distro_info
from collections import OrderedDict
-
-from werkzeug.middleware.proxy_fix import ProxyFix
from wsgiref.handlers import CGIHandler
+
+import distro_info
import flask
+from werkzeug.middleware.proxy_fix import ProxyFix
-app = flask.Flask('browse')
+app = flask.Flask("browse")
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
db_con = None
swift_container_url = None
@@ -31,20 +31,24 @@ def init_config():
global db_con, swift_container_url
cp = configparser.ConfigParser()
- cp.read(os.path.expanduser('~ubuntu/autopkgtest-cloud.conf'))
+ cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
- db_con = sqlite3.connect('file:%s?mode=ro' % cp['web']['database_ro'], uri=True)
+ db_con = sqlite3.connect(
+ "file:%s?mode=ro" % cp["web"]["database_ro"], uri=True
+ )
try:
- url = cp['web']['ExternalURL']
+ url = cp["web"]["ExternalURL"]
except KeyError:
- url = cp['web']['SwiftURL']
- swift_container_url = os.path.join(url, 'autopkgtest-%s')
+ url = cp["web"]["SwiftURL"]
+ swift_container_url = os.path.join(url, "autopkgtest-%s")
def get_test_id(release, arch, src):
c = db_con.cursor()
- c.execute('SELECT id FROM test WHERE release=? AND arch=? AND package=?',
- (release, arch, src))
+ c.execute(
+ "SELECT id FROM test WHERE release=? AND arch=? AND package=?",
+ (release, arch, src),
+ )
try:
return c.fetchone()[0]
except TypeError:
@@ -55,77 +59,90 @@ def render(template, code=200, **kwargs):
# sort the values passed in, so that releases are in the right order
try:
release_arches = OrderedDict()
- for k in sorted(kwargs['release_arches'], key=ALL_UBUNTU_RELEASES.index):
- release_arches[k] = kwargs['release_arches'][k]
- kwargs['release_arches'] = release_arches
+ for k in sorted(
+ kwargs["release_arches"], key=ALL_UBUNTU_RELEASES.index
+ ):
+ release_arches[k] = kwargs["release_arches"][k]
+ kwargs["release_arches"] = release_arches
except KeyError:
pass
try:
- kwargs['releases'] = sorted(kwargs['releases'], key=ALL_UBUNTU_RELEASES.index)
+ kwargs["releases"] = sorted(
+ kwargs["releases"], key=ALL_UBUNTU_RELEASES.index
+ )
except KeyError:
pass
- return (flask.render_template(template,
- base_url=flask.url_for('index_root'),
- static_url=flask.url_for('static', filename='/'),
- **kwargs), code)
+ return (
+ flask.render_template(
+ template,
+ base_url=flask.url_for("index_root"),
+ static_url=flask.url_for("static", filename="/"),
+ **kwargs
+ ),
+ code,
+ )
def human_date(run_id):
- return re.sub(r'(\d\d\d\d)(\d\d)(\d\d)_(\d\d)(\d\d)(\d\d).*',
- r'\1-\2-\3 \4:\5:\6 UTC',
- run_id)
+ return re.sub(
+ r"(\d\d\d\d)(\d\d)(\d\d)_(\d\d)(\d\d)(\d\d).*",
+ r"\1-\2-\3 \4:\5:\6 UTC",
+ run_id,
+ )
def human_sec(secs):
- return '%ih %02im %02is' % (secs // 3600, (secs % 3600) // 60, secs % 60)
+ return "%ih %02im %02is" % (secs // 3600, (secs % 3600) // 60, secs % 60)
def human_exitcode(code):
if code in (0, 2):
- return 'pass'
+ return "pass"
if code in (4, 6, 12):
- return 'fail'
+ return "fail"
if code in (8,):
- return 'neutral'
+ return "neutral"
if code == 99:
- return 'denylisted'
+ return "denylisted"
if code == 16:
- return 'tmpfail'
+ return "tmpfail"
if code == 20:
- return 'error'
- return 'exit code %i' % code
+ return "error"
+ return "exit code %i" % code
def srchash(src):
- if src.startswith('lib'):
+ if src.startswith("lib"):
return src[:4]
else:
return src[0]
def get_release_arches():
- '''Determine available releases and architectures
+ """Determine available releases and architectures
Return release → [arch] dict.
- '''
+ """
release_arches = {}
releases = []
- for row in db_con.execute('SELECT DISTINCT release from test'):
+ for row in db_con.execute("SELECT DISTINCT release from test"):
if row[0] in SUPPORTED_UBUNTU_RELEASES:
releases.append(row[0])
for r in releases:
- for row in db_con.execute('SELECT DISTINCT arch from test WHERE release=?', (r,)):
+ for row in db_con.execute(
+ "SELECT DISTINCT arch from test WHERE release=?", (r,)
+ ):
release_arches.setdefault(r, []).append(row[0])
return release_arches
def get_queue_info():
- '''Return information about queued tests
+ """Return information about queued tests
Return (releases, arches, context -> release -> arch -> (queue_size, [requests])).
- '''
+ """
- with open('/var/lib/cache-amqp/queued.json', 'r') as json_file:
+ with open("/var/lib/cache-amqp/queued.json", "r") as json_file:
queue_info_j = json.load(json_file)
releases = queue_info_j["releases"]
@@ -139,24 +156,27 @@ def get_queue_info():
for arch in queues[context][release]:
requests = queues[context][release][arch]["requests"]
size = queues[context][release][arch]["size"]
- ctx.setdefault(context, {}).setdefault(release, {})[arch] = (size, requests)
+ ctx.setdefault(context, {}).setdefault(release, {})[
+ arch
+ ] = (size, requests)
return (releases, arches, ctx)
def get_source_versions(db, release):
- '''Return srcpkg → current_version mapping for given release'''
+ """Return srcpkg → current_version mapping for given release"""
srcs = {}
- for (pkg, ver) in db.execute('SELECT package, version '
- 'FROM current_version '
- 'WHERE release = ?', (release, )):
+ for pkg, ver in db.execute(
+ "SELECT package, version " "FROM current_version " "WHERE release = ?",
+ (release,),
+ ):
srcs[pkg] = ver
return srcs
def success_count_for_release_and_arch(db, release, arch, src_versions):
- '''Return number of packages with tests that pass'''
+ """Return number of packages with tests that pass"""
count = 0
@@ -165,15 +185,17 @@ def success_count_for_release_and_arch(db, release, arch, src_versions):
# but succeeded for a trigger that is not published), don't count it as
# success
cur_pkg = None
- for (pkg, triggers, code) in db.execute(
- 'SELECT test.package, triggers, exitcode '
- 'FROM test, result, current_version '
- 'WHERE test.id == result.test_id AND test.release=? AND arch=? '
- ' AND test.package = current_version.package '
- ' AND test.release = current_version.release '
- ' AND result.version = current_version.version '
- ' AND (exitcode = 0 OR exitcode = 2 OR exitcode = 8) '
- 'ORDER BY test.package, run_id DESC', (release, arch)):
+ for pkg, triggers, code in db.execute(
+ "SELECT test.package, triggers, exitcode "
+ "FROM test, result, current_version "
+ "WHERE test.id == result.test_id AND test.release=? AND arch=? "
+ " AND test.package = current_version.package "
+ " AND test.release = current_version.release "
+ " AND result.version = current_version.version "
+ " AND (exitcode = 0 OR exitcode = 2 OR exitcode = 8) "
+ "ORDER BY test.package, run_id DESC",
+ (release, arch),
+ ):
# start of a new package block?
if pkg != cur_pkg:
# logging.debug('new package start: %s [%s] %i', pkg, triggers, code)
@@ -187,7 +209,7 @@ def success_count_for_release_and_arch(db, release, arch, src_versions):
# logging.debug('considered result: %s [%s] %i', pkg, triggers, code)
# weed out non-current triggers
for trigger in triggers.split():
- src, ver = trigger.split('/')
+ src, ver = trigger.split("/")
# it can happen that src_versions does not have a trigger source
# pacakge if that trigger source got removed in the final release
if src_versions.get(src) != ver:
@@ -203,103 +225,147 @@ def success_count_for_release_and_arch(db, release, arch, src_versions):
return count
-@app.route('/')
+@app.route("/")
def index_root():
- letters = list('abcdefghijklmnopqrstuvwxyz')
- indexes = letters + ['lib' + l for l in letters]
+ letters = list("abcdefghijklmnopqrstuvwxyz")
+ indexes = letters + ["lib" + l for l in letters]
indexes.sort()
recent = []
- for row in db_con.execute('SELECT exitcode, package, release, arch, triggers '
- 'FROM result, test '
- 'WHERE test.id == result.test_id '
- 'ORDER BY run_id DESC '
- 'LIMIT 15'):
+ for row in db_con.execute(
+ "SELECT exitcode, package, release, arch, triggers "
+ "FROM result, test "
+ "WHERE test.id == result.test_id "
+ "ORDER BY run_id DESC "
+ "LIMIT 15"
+ ):
hc = human_exitcode(row[0])
- res = hc if 'code' not in hc else 'fail'
+ res = hc if "code" not in hc else "fail"
recent.append((res, row[1], row[2], row[3], row[4]))
- return render('browse-home.html',
- indexes=indexes,
- recent_runs=recent)
+ return render("browse-home.html", indexes=indexes, recent_runs=recent)
# backwards-compatible path with debci that specifies the source hash
-@app.route('/packages/<_>/<package>')
-@app.route('/packages/<package>')
+@app.route("/packages/<_>/<package>")
+@app.route("/packages/<package>")
def package_overview(package, _=None):
results = {}
arches = set()
- for row in db_con.execute('SELECT MAX(run_id), exitcode, release, arch '
- 'FROM test, result '
- 'WHERE package = ? AND test.id = result.test_id '
- 'GROUP BY release, arch', (package,)):
+ for row in db_con.execute(
+ "SELECT MAX(run_id), exitcode, release, arch "
+ "FROM test, result "
+ "WHERE package = ? AND test.id = result.test_id "
+ "GROUP BY release, arch",
+ (package,),
+ ):
arches.add(row[3])
results.setdefault(row[2], {})[row[3]] = human_exitcode(row[1])
- return render('browse-package.html',
- package=package,
- releases=[release for release in results.keys() if release in SUPPORTED_UBUNTU_RELEASES],
- arches=sorted(arches),
- results=results,
- title_suffix='- %s' % package)
+ return render(
+ "browse-package.html",
+ package=package,
+ releases=[
+ release
+ for release in results.keys()
+ if release in SUPPORTED_UBUNTU_RELEASES
+ ],
+ arches=sorted(arches),
+ results=results,
+ title_suffix="- %s" % package,
+ )
# backwards-compatible path with debci that specifies the source hash
-@app.route('/packages/<_>/<package>/<release>/<arch>')
-@app.route('/packages/<package>/<release>/<arch>')
+@app.route("/packages/<_>/<package>/<release>/<arch>")
+@app.route("/packages/<package>/<release>/<arch>")
def package_release_arch(package, release, arch, _=None):
test_id = get_test_id(release, arch, package)
if test_id is None:
- return render('browse-error.html', error='Package does not exist', code=404)
+ return render(
+ "browse-error.html", error="Package does not exist", code=404
+ )
seen = set()
results = []
- for row in db_con.execute('SELECT run_id, version, triggers, duration, exitcode, requester FROM result '
- 'WHERE test_id=? '
- 'ORDER BY run_id DESC', (test_id,)):
- requester = row[5] if row[5] else '-'
+ for row in db_con.execute(
+ "SELECT run_id, version, triggers, duration, exitcode, requester FROM result "
+ "WHERE test_id=? "
+ "ORDER BY run_id DESC",
+ (test_id,),
+ ):
+ requester = row[5] if row[5] else "-"
code = human_exitcode(row[4])
- identifier = (row[1], row[2]) # Version + triggers uniquely identifies this result
- show_retry = code != 'pass' and identifier not in seen
+ identifier = (
+ row[1],
+ row[2],
+ ) # Version + triggers uniquely identifies this result
+ show_retry = code != "pass" and identifier not in seen
seen.add(identifier)
- url = os.path.join(swift_container_url % release, release, arch, srchash(package), package, row[0])
- results.append((row[1], row[2], human_date(row[0]), human_sec(row[3]), requester, code, url, show_retry))
-
- return render('browse-results.html',
- package=package,
- release=release,
- arch=arch,
- package_results=results,
- title_suffix='- %s/%s/%s' % (package, release, arch))
-
-
-@app.route('/running')
+ url = os.path.join(
+ swift_container_url % release,
+ release,
+ arch,
+ srchash(package),
+ package,
+ row[0],
+ )
+ results.append(
+ (
+ row[1],
+ row[2],
+ human_date(row[0]),
+ human_sec(row[3]),
+ requester,
+ code,
+ url,
+ show_retry,
+ )
+ )
+
+ return render(
+ "browse-results.html",
+ package=package,
+ release=release,
+ arch=arch,
+ package_results=results,
+ title_suffix="- %s/%s/%s" % (package, release, arch),
+ )
+
+
+@app.route("/running")
def running():
(releases, arches, queue_info) = get_queue_info()
queue_lengths = {}
for c in queue_info:
for r in releases:
for a in arches:
- (queue_length, queue_items) = queue_info.get(c, {}).get(r, {}).get(a, (0, []))
- queue_lengths.setdefault(c, {}).setdefault(r, {})[a] = queue_length
+ (queue_length, queue_items) = (
+ queue_info.get(c, {}).get(r, {}).get(a, (0, []))
+ )
+ queue_lengths.setdefault(c, {}).setdefault(r, {})[
+ a
+ ] = queue_length
try:
- with open('/run/amqp-status-collector/running.json') as f:
+ with open("/run/amqp-status-collector/running.json") as f:
# package -> runhash -> release -> arch -> (params, duration, logtail)
running_info = json.load(f)
except FileNotFoundError:
running_info = {}
- return render('browse-running.html',
- contexts=queue_info.keys(),
- releases=releases,
- arches=arches,
- queue_info=queue_info,
- queue_lengths=queue_lengths,
- running=running_info)
+ return render(
+ "browse-running.html",
+ contexts=queue_info.keys(),
+ releases=releases,
+ arches=arches,
+ queue_info=queue_info,
+ queue_lengths=queue_lengths,
+ running=running_info,
+ )
-@app.route('/queue_size.json')
+
+@app.route("/queue_size.json")
def queuesize_json():
out = {}
queue_info = get_queue_info()[2]
@@ -311,7 +377,8 @@ def queuesize_json():
queue_info[context][release][arch] = len(queue_items)
return json.dumps(queue_info, indent=2)
-@app.route('/queues.json')
+
+@app.route("/queues.json")
def queues_json():
out = {}
queue_info = get_queue_info()[2]
@@ -321,26 +388,29 @@ def queues_json():
for arch in queue_info[context][release]:
(queue_size, queue_items) = queue_info[context][release][arch]
queue_info[context][release][arch] = queue_items
- return flask.Response(json.dumps(queue_info, indent=2), mimetype='application/json')
+ return flask.Response(
+ json.dumps(queue_info, indent=2), mimetype="application/json"
+ )
-@app.route('/testlist')
+@app.route("/testlist")
def testlist():
# geneate index → [(package, version)] map
indexed_pkgs = {}
- for row in db_con.execute('SELECT package, MAX(version) '
- 'FROM test, result '
- 'WHERE id == test_id '
- 'GROUP BY package '
- 'ORDER BY package'):
+ for row in db_con.execute(
+ "SELECT package, MAX(version) "
+ "FROM test, result "
+ "WHERE id == test_id "
+ "GROUP BY package "
+ "ORDER BY package"
+ ):
# strip off epoch
- v = row[1][row[1].find(':') + 1:]
+ v = row[1][row[1].find(":") + 1 :]
indexed_pkgs.setdefault(srchash(row[0]), []).append((row[0], v))
- return render('browse-testlist.html',
- indexed_pkgs=indexed_pkgs)
+ return render("browse-testlist.html", indexed_pkgs=indexed_pkgs)
-@app.route('/statistics')
+@app.route("/statistics")
def statistics():
release_arches = get_release_arches()
@@ -352,26 +422,30 @@ def statistics():
data[release][arch] = {}
# number of packages with tests
- for (release, arch, numpkgs) in db_con.execute(
- 'SELECT release, arch, COUNT(DISTINCT package) '
- 'FROM test '
- 'GROUP BY release, arch'):
+ for release, arch, numpkgs in db_con.execute(
+ "SELECT release, arch, COUNT(DISTINCT package) "
+ "FROM test "
+ "GROUP BY release, arch"
+ ):
if numpkgs > 1:
try:
- data[release][arch]['numpkgs'] = numpkgs
+ data[release][arch]["numpkgs"] = numpkgs
except KeyError:
pass
# number of passed/failed test runs
- for (release, arch, key, numruns) in db_con.execute(
- 'SELECT release, arch, '
- ' CASE WHEN exitcode IN (0,2,8) '
- ' THEN \'passruns\' ELSE \'failruns\' END exit, '
- ' COUNT(run_id) '
- 'FROM result LEFT JOIN test ON result.test_id=test.id '
- 'GROUP BY release, arch, exit'):
+ for release, arch, key, numruns in db_con.execute(
+ "SELECT release, arch, "
+ " CASE WHEN exitcode IN (0,2,8) "
+ " THEN 'passruns' ELSE 'failruns' END exit, "
+ " COUNT(run_id) "
+ "FROM result LEFT JOIN test ON result.test_id=test.id "
+ "GROUP BY release, arch, exit"
+ ):
try:
- data[release][arch][key] = data[release][arch].get(key, 0) + numruns
+ data[release][arch][key] = (
+ data[release][arch].get(key, 0) + numruns
+ )
except KeyError:
pass
@@ -379,14 +453,18 @@ def statistics():
for release in release_arches:
sources = get_source_versions(db_con, release)
for arch in release_arches[release]:
- data[release][arch]['numpkgspass'] = success_count_for_release_and_arch(db_con, release, arch, sources)
+ data[release][arch][
+ "numpkgspass"
+ ] = success_count_for_release_and_arch(
+ db_con, release, arch, sources
+ )
- return render('browse-statistics.html',
- release_arches=release_arches,
- data=data)
+ return render(
+ "browse-statistics.html", release_arches=release_arches, data=data
+ )
-if __name__ == '__main__':
- app.config['DEBUG'] = True
+if __name__ == "__main__":
+ app.config["DEBUG"] = True
init_config()
CGIHandler().run(app)
diff --git a/charms/focal/autopkgtest-web/webcontrol/cache-amqp b/charms/focal/autopkgtest-web/webcontrol/cache-amqp
index c32ea40..1f72747 100755
--- a/charms/focal/autopkgtest-web/webcontrol/cache-amqp
+++ b/charms/focal/autopkgtest-web/webcontrol/cache-amqp
@@ -5,10 +5,10 @@ import configparser
import json
import logging
import os
-import time
import sqlite3
import sys
import tempfile
+import time
import urllib.parse
import amqplib.client_0_8 as amqp
@@ -116,9 +116,10 @@ class AutopkgtestQueueContents:
params = json.loads(req[1])
else:
params = {}
- if (params.get('readable-by', False) or
- params.get('swiftuser', False)):
- r = 'private job'
+ if params.get("readable-by", False) or params.get(
+ "swiftuser", False
+ ):
+ r = "private job"
res.append(r)
except (ValueError, IndexError):
logging.error('Received invalid request format "%s"', r)
@@ -155,7 +156,11 @@ class AutopkgtestQueueContents:
# ubuntu test requests use context-less name (backwards compatibility)
queue_name = "debci-%s-%s" % (release, arch)
else:
- queue_name = "debci-%s-%s-%s" % (context, release, arch)
+ queue_name = "debci-%s-%s-%s" % (
+ context,
+ release,
+ arch,
+ )
try:
requests = self.get_queue_requests(queue_name)
except AMQPChannelException as e:
diff --git a/charms/focal/autopkgtest-web/webcontrol/download-all-results b/charms/focal/autopkgtest-web/webcontrol/download-all-results
index ab0a1e3..0ae5c4e 100755
--- a/charms/focal/autopkgtest-web/webcontrol/download-all-results
+++ b/charms/focal/autopkgtest-web/webcontrol/download-all-results
@@ -10,21 +10,21 @@
# notification of completed jobs, in case of bugs or network outages etc, this
# script can be used to find any results which were missed and insert them.
-import os
-import sys
+import configparser
+import http
+import io
+import json
import logging
+import os
import sqlite3
-import io
+import sys
import tarfile
-import json
-import configparser
-import urllib.parse
import time
-import http
+import urllib.parse
+from urllib.request import urlopen
from distro_info import UbuntuDistroInfo
from helpers.utils import get_test_id, init_db
-from urllib.request import urlopen
LOGGER = logging.getLogger(__name__)
@@ -46,7 +46,7 @@ def list_remote_container(container_url):
try:
resp = urlopen(url)
except http.client.RemoteDisconnected as e:
- LOGGER.debug('Got disconnected, sleeping')
+ LOGGER.debug("Got disconnected, sleeping")
time.sleep(5)
continue
else:
@@ -134,7 +134,9 @@ def fetch_one_result(url):
# KeyError means the file is not there, i.e. there isn't a human
# requester
try:
- requester = tar.extractfile("requester").read().decode().strip()
+ requester = (
+ tar.extractfile("requester").read().decode().strip()
+ )
except KeyError as e:
requester = ""
except (KeyError, ValueError, tarfile.TarError) as e:
@@ -203,7 +205,9 @@ def fetch_container(release, container_url):
LOGGER.debug("Need to download %d items", len(need_to_fetch))
for run_id in need_to_fetch:
- fetch_one_result(os.path.join(container_url, known_results[run_id]))
+ fetch_one_result(
+ os.path.join(container_url, known_results[run_id])
+ )
except urllib.error.HTTPError as e:
if e.code == 401:
LOGGER.warning(f"Couldn't access {container_url} - doesn't exist?")
@@ -221,7 +225,9 @@ if __name__ == "__main__":
LOGGER.addHandler(ch)
releases = list(
- set(UbuntuDistroInfo().supported() + UbuntuDistroInfo().supported_esm())
+ set(
+ UbuntuDistroInfo().supported() + UbuntuDistroInfo().supported_esm()
+ )
)
releases.sort(key=UbuntuDistroInfo().all.index)
diff --git a/charms/focal/autopkgtest-web/webcontrol/download-results b/charms/focal/autopkgtest-web/webcontrol/download-results
index b8d4188..419af47 100755
--- a/charms/focal/autopkgtest-web/webcontrol/download-results
+++ b/charms/focal/autopkgtest-web/webcontrol/download-results
@@ -7,11 +7,10 @@ import os
import socket
import sqlite3
import urllib.parse
-
-from helpers.utils import get_test_id, init_db
from urllib.request import urlopen
import amqplib.client_0_8 as amqp
+from helpers.utils import get_test_id, init_db
EXCHANGE_NAME = "testcomplete.fanout"
@@ -86,7 +85,15 @@ def process_message(msg, db_con):
c = db_con.cursor()
c.execute(
"INSERT INTO result VALUES (?, ?, ?, ?, ?, ?, ?)",
- (test_id, run_id, version, triggers, duration, exitcode, requester),
+ (
+ test_id,
+ run_id,
+ version,
+ triggers,
+ duration,
+ exitcode,
+ requester,
+ ),
)
db_con.commit()
except sqlite3.IntegrityError:
diff --git a/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py b/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py
index ec74d8f..6fbb09c 100644
--- a/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py
+++ b/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py
@@ -1,89 +1,103 @@
-'''
+"""
utilities for autopkgtest-web webcontrol
-'''
-#pylint: disable=protected-access
+"""
+# pylint: disable=protected-access
import logging
import os
import sqlite3
def setup_key(app, path):
- '''Create or load app.secret_key for cookie encryption.'''
+ """Create or load app.secret_key for cookie encryption."""
try:
- with open(path, 'rb') as f:
+ with open(path, "rb") as f:
app.secret_key = f.read()
except FileNotFoundError:
key = os.urandom(24)
- with open(path, 'wb') as f:
+ with open(path, "wb") as f:
os.fchmod(f.fileno(), 0o600)
f.write(key)
app.secret_key = key
+
def init_db(path):
- '''Create DB if it does not exist, and connect to it'''
+ """Create DB if it does not exist, and connect to it"""
db = sqlite3.connect(path)
c = db.cursor()
try:
- c.execute('PRAGMA journal_mode = WAL')
- c.execute('CREATE TABLE IF NOT EXISTS test ('
- ' id INTEGER PRIMARY KEY, '
- ' release CHAR[20], '
- ' arch CHAR[20], '
- ' package char[120])')
- c.execute('CREATE TABLE IF NOT EXISTS result ('
- ' test_id INTEGER, '
- ' run_id CHAR[30], '
- ' version VARCHAR[200], '
- ' triggers TEXT, '
- ' duration INTEGER, '
- ' exitcode INTEGER, '
- ' requester TEXT, '
- ' PRIMARY KEY(test_id, run_id), '
- ' FOREIGN KEY(test_id) REFERENCES test(id))')
+ c.execute("PRAGMA journal_mode = WAL")
+ c.execute(
+ "CREATE TABLE IF NOT EXISTS test ("
+ " id INTEGER PRIMARY KEY, "
+ " release CHAR[20], "
+ " arch CHAR[20], "
+ " package char[120])"
+ )
+ c.execute(
+ "CREATE TABLE IF NOT EXISTS result ("
+ " test_id INTEGER, "
+ " run_id CHAR[30], "
+ " version VARCHAR[200], "
+ " triggers TEXT, "
+ " duration INTEGER, "
+ " exitcode INTEGER, "
+ " requester TEXT, "
+ " PRIMARY KEY(test_id, run_id), "
+ " FOREIGN KEY(test_id) REFERENCES test(id))"
+ )
# /packages/<name> mostly benefits from the index on package (0.8s -> 0.01s),
# but adding the other fields improves it a further 50% to 0.005s.
- c.execute('CREATE UNIQUE INDEX IF NOT EXISTS test_package_uix ON test('
- ' package, release, arch)')
- c.execute('CREATE INDEX IF NOT EXISTS result_run_ix ON result('
- ' run_id desc)')
+ c.execute(
+ "CREATE UNIQUE INDEX IF NOT EXISTS test_package_uix ON test("
+ " package, release, arch)"
+ )
+ c.execute(
+ "CREATE INDEX IF NOT EXISTS result_run_ix ON result("
+ " run_id desc)"
+ )
db.commit()
- logging.debug('database %s created', path)
+ logging.debug("database %s created", path)
except sqlite3.OperationalError as e:
- if 'already exists' not in str(e):
+ if "already exists" not in str(e):
raise
- logging.debug('database %s already exists', path)
+ logging.debug("database %s already exists", path)
return db
+
def get_test_id(db_con, release, arch, src):
- '''
+ """
get id of test
- '''
+ """
if not get_test_id._cache:
# prime the cache with all test IDs; much more efficient than doing
# thousands of individual queries
c = db_con.cursor()
- c.execute('SELECT * FROM test')
+ c.execute("SELECT * FROM test")
while True:
row = c.fetchone()
if row is None:
break
- get_test_id._cache[row[1] + '/' + row[2] + '/' + row[3]] = row[0]
+ get_test_id._cache[row[1] + "/" + row[2] + "/" + row[3]] = row[0]
- cache_idx = release + '/' + arch + '/' + src
+ cache_idx = release + "/" + arch + "/" + src
try:
return get_test_id._cache[cache_idx]
except KeyError:
# create new ID
c = db_con.cursor()
try:
- c.execute('INSERT INTO test VALUES (NULL, ?, ?, ?)', (release, arch, src))
+ c.execute(
+ "INSERT INTO test VALUES (NULL, ?, ?, ?)", (release, arch, src)
+ )
except sqlite3.IntegrityError:
# our cache got out of date in the meantime
- c.execute("SELECT id from test where release " + \
- "= ? and arch = ? and package = ?",
- (release, arch, src))
+ c.execute(
+ "SELECT id from test where release "
+ + "= ? and arch = ? and package = ?",
+ (release, arch, src),
+ )
test_id = c.fetchone()[0]
else:
test_id = c.lastrowid
diff --git a/charms/focal/autopkgtest-web/webcontrol/private_results/app.py b/charms/focal/autopkgtest-web/webcontrol/private_results/app.py
index 95cec50..ee7b6d0 100644
--- a/charms/focal/autopkgtest-web/webcontrol/private_results/app.py
+++ b/charms/focal/autopkgtest-web/webcontrol/private_results/app.py
@@ -1,22 +1,27 @@
"""Test Result Fetcher Flask App"""
+import configparser
+import logging
import os
import sys
-import logging
-import swiftclient
-import configparser
-
from html import escape
-from flask import (Flask, Response, request, session, redirect,
- render_template_string)
+
+import swiftclient
+from flask import (
+ Flask,
+ Response,
+ redirect,
+ render_template_string,
+ request,
+ session,
+)
from flask_openid import OpenID
from werkzeug.middleware.proxy_fix import ProxyFix
-sys.path.append('..')
+sys.path.append("..")
from helpers.utils import setup_key
from request.submit import Submit
-
HTML = """
<!doctype html>
<html>
@@ -45,8 +50,9 @@ def swift_get_object(connection, container, path):
try:
_, contents = connection.get_object(container, path)
except swiftclient.exceptions.ClientException as e:
- logging.error('Failed to fetch %s from container (%s)' %
- (path, str(e)))
+ logging.error(
+ "Failed to fetch %s from container (%s)" % (path, str(e))
+ )
return None
return contents
@@ -57,60 +63,58 @@ def validate_user_path(connection, container, nick, path):
allowed_file = swift_get_object(connection, container, path)
if not allowed_file:
return False
- allowed = allowed_file.decode('utf-8').splitlines()
+ allowed = allowed_file.decode("utf-8").splitlines()
# Check if user is allowed
# (separate step not to do unnecessary LP API calls)
if nick in allowed:
return True
# Check if user is allowed via team membership
for entity in allowed:
- (code, response) = Submit.lp_request('~%s/participants' % entity, {})
+ (code, response) = Submit.lp_request("~%s/participants" % entity, {})
if code != 200:
- logging.error('Unable to validate user %s (%s)' % (nick, code))
+ logging.error("Unable to validate user %s (%s)" % (nick, code))
return False
- for e in response.get('entries', []):
- if e.get('name') == nick:
+ for e in response.get("entries", []):
+ if e.get("name") == nick:
return True
return False
# Initialize app
-PATH = os.path.join(os.getenv('TMPDIR', '/tmp'), 'autopkgtest_webcontrol')
+PATH = os.path.join(os.getenv("TMPDIR", "/tmp"), "autopkgtest_webcontrol")
os.makedirs(PATH, exist_ok=True)
-app = Flask('private-results')
+app = Flask("private-results")
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
# Keep secret persistent between CGI invocations
-secret_path = os.path.join(PATH, 'secret_key')
+secret_path = os.path.join(PATH, "secret_key")
setup_key(app, secret_path)
-oid = OpenID(app, os.path.join(PATH, 'openid'), safe_roots=[])
+oid = OpenID(app, os.path.join(PATH, "openid"), safe_roots=[])
# Load configuration
cfg = configparser.ConfigParser()
-cfg.read(os.path.expanduser('~/swift-web-credentials.conf'))
+cfg.read(os.path.expanduser("~/swift-web-credentials.conf"))
# Build swift credentials
-auth_url = cfg.get('swift', 'auth_url')
-if '/v2.0' in auth_url:
+auth_url = cfg.get("swift", "auth_url")
+if "/v2.0" in auth_url:
swift_creds = {
- 'authurl': auth_url,
- 'user': cfg.get('swift', 'username'),
- 'key': cfg.get('swift', 'password'),
- 'tenant_name': cfg.get('swift', 'tenant'),
- 'os_options': {
- 'region_name': cfg.get('swift', 'region_name')
- },
- 'auth_version': '2.0'
+ "authurl": auth_url,
+ "user": cfg.get("swift", "username"),
+ "key": cfg.get("swift", "password"),
+ "tenant_name": cfg.get("swift", "tenant"),
+ "os_options": {"region_name": cfg.get("swift", "region_name")},
+ "auth_version": "2.0",
}
else:
swift_creds = {
- 'authurl': auth_url,
- 'user': cfg.get('swift', 'username'),
- 'key': cfg.get('swift', 'password'),
- 'os_options': {
- 'region_name': cfg.get('swift', 'region_name'),
- 'project_domain_name': cfg.get('swift', 'project_domain_name'),
- 'project_name': cfg.get('swift', 'project_name'),
- 'user_domain_name': cfg.get('swift', 'user_domain_name')
- },
- 'auth_version': '3'
+ "authurl": auth_url,
+ "user": cfg.get("swift", "username"),
+ "key": cfg.get("swift", "password"),
+ "os_options": {
+ "region_name": cfg.get("swift", "region_name"),
+ "project_domain_name": cfg.get("swift", "project_domain_name"),
+ "project_name": cfg.get("swift", "project_name"),
+ "user_domain_name": cfg.get("swift", "user_domain_name"),
+ },
+ "auth_version": "3",
}
# Connect to swift
connection = swiftclient.Connection(**swift_creds)
@@ -120,58 +124,62 @@ connection = swiftclient.Connection(**swift_creds)
# Flask routes
#
-@app.route('/', methods=['GET'])
+
+@app.route("/", methods=["GET"])
def index_root():
"""Handle the main index root, just pure informational."""
return render_template_string(
- HTML, content='Please provide the path to the private result.')
+ HTML, content="Please provide the path to the private result."
+ )
-@app.route('/<container>/<series>/<arch>/<group>/<src>/<runid>/<file>',
- methods=['GET'])
+@app.route(
+ "/<container>/<series>/<arch>/<group>/<src>/<runid>/<file>",
+ methods=["GET"],
+)
def index_result(container, series, arch, group, src, runid, file):
"""Handle all GET requests for private tests."""
session.permanent = True
- session['next'] = escape(request.url)
- if not container.startswith('private-'):
+ session["next"] = escape(request.url)
+ if not container.startswith("private-"):
return render_template_string(
- HTML, content='Limited to private results only.')
- nick = session.get('nickname')
+ HTML, content="Limited to private results only."
+ )
+ nick = session.get("nickname")
if nick:
# Authenticated via SSO, so that's a start
parent_path = os.path.join(series, arch, group, src, runid)
object_path = os.path.join(parent_path, file)
- acl_path = os.path.join(parent_path, 'readable-by')
+ acl_path = os.path.join(parent_path, "readable-by")
if not validate_user_path(connection, container, nick, acl_path):
return render_template_string(HTML, content=DENIED), 403
# We can pull the result now
result = swift_get_object(connection, container, object_path)
if result is None:
return render_template_string(HTML, content=DENIED), 403
- if file.endswith('.gz'):
- content_type = 'text/plain; charset=UTF-8'
- headers = {'Content-Encoding': 'gzip'}
+ if file.endswith(".gz"):
+ content_type = "text/plain; charset=UTF-8"
+ headers = {"Content-Encoding": "gzip"}
return Response(result, content_type=content_type, headers=headers)
else:
return result
else:
# XXX: render_template_string urlencodes its context values, so it's
# not really possible to have 'nested HTML' rendered properly.
- return HTML.replace("{{ content }}",
- render_template_string(LOGIN, **session))
+ return HTML.replace(
+ "{{ content }}", render_template_string(LOGIN, **session)
+ )
-@app.route('/login', methods=['GET', 'POST'])
+@app.route("/login", methods=["GET", "POST"])
@oid.loginhandler
def login():
"""Initiate OpenID login."""
- if 'nickname' in session:
+ if "nickname" in session:
return redirect(oid.get_next_url())
- if 'next' in request.form:
- return oid.try_login(
- 'https://login.ubuntu.com/',
- ask_for=['nickname'])
- return redirect('/private-results')
+ if "next" in request.form:
+ return oid.try_login("https://login.ubuntu.com/", ask_for=["nickname"])
+ return redirect("/private-results")
@oid.after_login
@@ -184,7 +192,7 @@ def identify(resp):
return redirect(oid.get_next_url())
-@app.route('/logout')
+@app.route("/logout")
def logout():
"""Clear user session, logging them out."""
session.clear()
diff --git a/charms/focal/autopkgtest-web/webcontrol/publish-db b/charms/focal/autopkgtest-web/webcontrol/publish-db
index a44b04a..f35c9e8 100755
--- a/charms/focal/autopkgtest-web/webcontrol/publish-db
+++ b/charms/focal/autopkgtest-web/webcontrol/publish-db
@@ -61,8 +61,10 @@ def init_db(path, path_current, path_rw):
if "duplicate column name" not in str(e2):
raise
logging.debug("table already exists")
- db.execute("CREATE INDEX IF NOT EXISTS current_version_pocket_ix "
- "ON current_version(pocket, component)")
+ db.execute(
+ "CREATE INDEX IF NOT EXISTS current_version_pocket_ix "
+ "ON current_version(pocket, component)"
+ )
try:
db.execute(
@@ -144,7 +146,7 @@ def get_sources(db_con, release):
"VALUES (:url, :timestamp) "
"ON CONFLICT (url) DO "
"UPDATE SET timestamp = :timestamp",
- {'url': url, 'timestamp': last_modified}
+ {"url": url, "timestamp": last_modified},
)
db_con.execute(
@@ -164,11 +166,11 @@ def get_sources(db_con, release):
"UPDATE SET pocket = :pocket, "
"component = :component, version = :version",
{
- 'release': release,
- 'pocket': pocket,
- 'component': component,
- 'package': section["Package"],
- 'version': section["Version"],
+ "release": release,
+ "pocket": pocket,
+ "component": component,
+ "package": section["Package"],
+ "version": section["Version"],
},
)
db_con.commit()
diff --git a/charms/focal/autopkgtest-web/webcontrol/request-test.cgi b/charms/focal/autopkgtest-web/webcontrol/request-test.cgi
index 009f014..06ffe36 100755
--- a/charms/focal/autopkgtest-web/webcontrol/request-test.cgi
+++ b/charms/focal/autopkgtest-web/webcontrol/request-test.cgi
@@ -3,5 +3,5 @@
from request.app import app
-if __name__ == '__main__':
- app.run(host='0.0.0.0', debug=True)
+if __name__ == "__main__":
+ app.run(host="0.0.0.0", debug=True)
diff --git a/charms/focal/autopkgtest-web/webcontrol/request.cgi b/charms/focal/autopkgtest-web/webcontrol/request.cgi
index 53fc4a5..8dcf2b5 100755
--- a/charms/focal/autopkgtest-web/webcontrol/request.cgi
+++ b/charms/focal/autopkgtest-web/webcontrol/request.cgi
@@ -3,8 +3,9 @@
"""Run request app as CGI script """
from wsgiref.handlers import CGIHandler
+
from request.app import app
-if __name__ == '__main__':
- app.config['DEBUG'] = True
+if __name__ == "__main__":
+ app.config["DEBUG"] = True
CGIHandler().run(app)
diff --git a/charms/focal/autopkgtest-web/webcontrol/request/app.py b/charms/focal/autopkgtest-web/webcontrol/request/app.py
index 15c5b5c..8654ec2 100644
--- a/charms/focal/autopkgtest-web/webcontrol/request/app.py
+++ b/charms/focal/autopkgtest-web/webcontrol/request/app.py
@@ -1,25 +1,21 @@
"""Test Request Flask App"""
-import os
-import logging
import hmac
import json
+import logging
+import os
from collections import ChainMap
from html import escape as _escape
-from flask import Flask, request, session, redirect
+from flask import Flask, redirect, request, session
from flask_openid import OpenID
-from werkzeug.middleware.proxy_fix import ProxyFix
-
from helpers.utils import setup_key
from request.submit import Submit
-
+from werkzeug.middleware.proxy_fix import ProxyFix
# map multiple GET vars to AMQP JSON request parameter list
-MULTI_ARGS = {'trigger': 'triggers',
- 'ppa': 'ppas',
- 'env': 'env'}
+MULTI_ARGS = {"trigger": "triggers", "ppa": "ppas", "env": "env"}
-EMPTY = ''
+EMPTY = ""
HTML = """
<!doctype html>
@@ -64,33 +60,40 @@ def check_github_sig(request):
See https://developer.github.com/webhooks/securing/
"""
# load key
- keyfile = os.path.expanduser('~/github-secrets.json')
- package = request.args.get('package')
+ keyfile = os.path.expanduser("~/github-secrets.json")
+ package = request.args.get("package")
try:
with open(keyfile) as f:
keymap = json.load(f)
- key = keymap[package].encode('ASCII')
+ key = keymap[package].encode("ASCII")
except (IOError, ValueError, KeyError, UnicodeEncodeError) as e:
- logging.error('Failed to load GitHub key for package %s: %s', package, e)
+ logging.error(
+ "Failed to load GitHub key for package %s: %s", package, e
+ )
return False
- sig_sha1 = request.headers.get('X-Hub-Signature', '')
- payload_sha1 = 'sha1=' + hmac.new(key, request.data, 'sha1').hexdigest()
+ sig_sha1 = request.headers.get("X-Hub-Signature", "")
+ payload_sha1 = "sha1=" + hmac.new(key, request.data, "sha1").hexdigest()
if hmac.compare_digest(sig_sha1, payload_sha1):
return True
- logging.error('check_github_sig: signature mismatch! received: %s calculated: %s',
- sig_sha1, payload_sha1)
+ logging.error(
+ "check_github_sig: signature mismatch! received: %s calculated: %s",
+ sig_sha1,
+ payload_sha1,
+ )
return False
def invalid(message, code=400):
"""Return message and HTTP error code for an invalid request and log it"""
- if 'nickname' in session:
+ if "nickname" in session:
html = LOGOUT.format(**session)
else:
- html = ''
- html += '<p>You submitted an invalid request: %s</p>' % maybe_escape(str(message))
- logging.error('Request failed with %i: %s', code, message)
+ html = ""
+ html += "<p>You submitted an invalid request: %s</p>" % maybe_escape(
+ str(message)
+ )
+ logging.error("Request failed with %i: %s", code, message)
return HTML.format(html), code
@@ -100,28 +103,31 @@ def maybe_escape(value):
# Initialize app
-PATH = os.path.join(os.path.sep, 'run', 'autopkgtest_webcontrol')
+PATH = os.path.join(os.path.sep, "run", "autopkgtest_webcontrol")
os.makedirs(PATH, exist_ok=True)
-app = Flask('request')
+app = Flask("request")
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
# keep secret persistent between CGI invocations
-secret_path = os.path.join(PATH, 'secret_key')
+secret_path = os.path.join(PATH, "secret_key")
setup_key(app, secret_path)
-oid = OpenID(app, os.path.join(PATH, 'openid'), safe_roots=[])
+oid = OpenID(app, os.path.join(PATH, "openid"), safe_roots=[])
#
# Flask routes
#
-@app.route('/', methods=['GET', 'POST'])
+
+@app.route("/", methods=["GET", "POST"])
def index_root():
"""Handle all GET requests."""
session.permanent = True
- session['next'] = maybe_escape(request.url)
- nick = maybe_escape(session.get('nickname'))
+ session["next"] = maybe_escape(request.url)
+ nick = maybe_escape(session.get("nickname"))
- params = {maybe_escape(k): maybe_escape(v) for k, v in request.args.items()}
+ params = {
+ maybe_escape(k): maybe_escape(v) for k, v in request.args.items()
+ }
# convert multiple GET args into lists
for getarg, paramname in MULTI_ARGS.items():
try:
@@ -134,61 +140,82 @@ def index_root():
# split "VAR1=value;VAR2=value" --env arguments, as some frameworks don't
# allow multipe "env="
- if 'env' in params:
+ if "env" in params:
splitenv = []
- for e in params['env']:
- splitenv += e.split(';')
- params['env'] = splitenv
+ for e in params["env"]:
+ splitenv += e.split(";")
+ params["env"] = splitenv
# request from github?
- if b'api.github.com' in request.data:
+ if b"api.github.com" in request.data:
if not check_github_sig(request):
- return invalid('GitHub signature verification failed', 403)
+ return invalid("GitHub signature verification failed", 403)
- if request.headers.get('X-GitHub-Event') == 'ping':
- return HTML.format('<p>OK</p>')
+ if request.headers.get("X-GitHub-Event") == "ping":
+ return HTML.format("<p>OK</p>")
github_params = request.get_json()
- if github_params.get('action') not in ['opened', 'synchronize']:
- return HTML.format('<p>GitHub PR action %s is not relevant for testing</p>' %
- github_params.get('action', '<none>'))
+ if github_params.get("action") not in ["opened", "synchronize"]:
+ return HTML.format(
+ "<p>GitHub PR action %s is not relevant for testing</p>"
+ % github_params.get("action", "<none>")
+ )
s = Submit()
try:
- params.setdefault('env', []).append('UPSTREAM_PULL_REQUEST=%i' % int(github_params['number']))
- statuses_url = github_params['pull_request']['statuses_url']
- params['env'].append('GITHUB_STATUSES_URL=' + statuses_url)
+ params.setdefault("env", []).append(
+ "UPSTREAM_PULL_REQUEST=%i" % int(github_params["number"])
+ )
+ statuses_url = github_params["pull_request"]["statuses_url"]
+ params["env"].append("GITHUB_STATUSES_URL=" + statuses_url)
# support autopkgtests in upstream repos, set build-git URL to the
# PR clone URL if not given
- if 'build-git' not in params:
- params['build-git'] = '%s#refs/pull/%s/head' % (
- github_params['pull_request']['base']['repo']['clone_url'],
- github_params['number'])
+ if "build-git" not in params:
+ params["build-git"] = "%s#refs/pull/%s/head" % (
+ github_params["pull_request"]["base"]["repo"]["clone_url"],
+ github_params["number"],
+ )
s.validate_git_request(**params)
except (ValueError, TypeError) as e:
return invalid(e)
except KeyError as e:
- return invalid('Missing field in JSON data: %s' % e)
+ return invalid("Missing field in JSON data: %s" % e)
- s.send_amqp_request(context='upstream', **params)
+ s.send_amqp_request(context="upstream", **params)
# write status file for pending test
- os.makedirs(os.path.join(PATH, 'github-pending'), exist_ok=True)
- with open(os.path.join(PATH, 'github-pending', '%s-%s-%s-%s-%s' %
- (params['release'], params['arch'],
- params['package'], github_params['number'],
- os.path.basename(statuses_url))), 'w') as f:
+ os.makedirs(os.path.join(PATH, "github-pending"), exist_ok=True)
+ with open(
+ os.path.join(
+ PATH,
+ "github-pending",
+ "%s-%s-%s-%s-%s"
+ % (
+ params["release"],
+ params["arch"],
+ params["package"],
+ github_params["number"],
+ os.path.basename(statuses_url),
+ ),
+ ),
+ "w",
+ ) as f:
f.write(json.dumps(params))
# tell GitHub that the test is pending
- status = {'state': 'pending',
- 'context': '%s-%s' % (params['release'], params['arch']),
- 'description': 'autopkgtest running',
- 'target_url':
- 'http://autopkgtest.ubuntu.com/running#pkg-' + params['package']}
- s.post_json(statuses_url, status,
- os.path.expanduser('~/github-status-credentials.txt'),
- params['package'])
+ status = {
+ "state": "pending",
+ "context": "%s-%s" % (params["release"], params["arch"]),
+ "description": "autopkgtest running",
+ "target_url": "http://autopkgtest.ubuntu.com/running#pkg-"
+ + params["package"],
+ }
+ s.post_json(
+ statuses_url,
+ status,
+ os.path.expanduser("~/github-status-credentials.txt"),
+ params["package"],
+ )
success = SUCCESS.format(
EMPTY.join(ROW.format(key, val) for key, val in params.items())
@@ -197,53 +224,55 @@ def index_root():
# distro request? Require SSO auth and validate_distro_request()
elif nick:
- params['requester'] = nick
+ params["requester"] = nick
s = Submit()
try:
s.validate_distro_request(**params)
except (ValueError, TypeError) as e:
return invalid(e)
- if params.get('delete'):
- del params['delete']
- if params.get('ppas'):
- count = s.unsend_amqp_request(context='ppa', **params)
+ if params.get("delete"):
+ del params["delete"]
+ if params.get("ppas"):
+ count = s.unsend_amqp_request(context="ppa", **params)
else:
count = s.unsend_amqp_request(**params)
- return HTML.format(LOGOUT +
- "<p>Deleted {} requests</p>".format(count)).format(
- **ChainMap(session, params))
+ return HTML.format(
+ LOGOUT + "<p>Deleted {} requests</p>".format(count)
+ ).format(**ChainMap(session, params))
- if params.get('ppas'):
- s.send_amqp_request(context='ppa', **params)
+ if params.get("ppas"):
+ s.send_amqp_request(context="ppa", **params)
else:
s.send_amqp_request(**params)
# add link to result page for Ubuntu results
- if not params.get('ppas'):
- url = 'https://autopkgtest.ubuntu.com/packages/{}/{}/{}'.format(
- params['package'], params['release'], params['arch'])
- params['Result history'] = '<a href="{}">{}</a>'.format(url, url)
+ if not params.get("ppas"):
+ url = "https://autopkgtest.ubuntu.com/packages/{}/{}/{}".format(
+ params["package"], params["release"], params["arch"]
+ )
+ params["Result history"] = '<a href="{}">{}</a>'.format(url, url)
success = SUCCESS.format(
- EMPTY.join(ROW.format(key, val) for key, val in sorted(params.items()))
+ EMPTY.join(
+ ROW.format(key, val) for key, val in sorted(params.items())
+ )
)
return HTML.format(LOGOUT + success).format(
- **ChainMap(session, params))
+ **ChainMap(session, params)
+ )
else:
return HTML.format(LOGIN).format(**session), 403
-@app.route('/login', methods=['GET', 'POST'])
+@app.route("/login", methods=["GET", "POST"])
@oid.loginhandler
def login():
"""Initiate OpenID login."""
- if 'nickname' in session:
+ if "nickname" in session:
return redirect(oid.get_next_url())
- if 'next' in request.form:
- return oid.try_login(
- 'https://login.ubuntu.com/',
- ask_for=['nickname'])
- return redirect('/')
+ if "next" in request.form:
+ return oid.try_login("https://login.ubuntu.com/", ask_for=["nickname"])
+ return redirect("/")
@oid.after_login
@@ -256,7 +285,7 @@ def identify(resp):
return redirect(oid.get_next_url())
-@app.route('/logout')
+@app.route("/logout")
def logout():
"""Clear user session, logging them out."""
session.clear()
diff --git a/charms/focal/autopkgtest-web/webcontrol/request/submit.py b/charms/focal/autopkgtest-web/webcontrol/request/submit.py
index 0e35b3f..bc283a7 100644
--- a/charms/focal/autopkgtest-web/webcontrol/request/submit.py
+++ b/charms/focal/autopkgtest-web/webcontrol/request/submit.py
@@ -3,63 +3,69 @@
Author: Martin Pitt <martin.pitt@xxxxxxxxxx>
"""
-import os
-import json
-import re
import base64
-import logging
import configparser
+import json
+import logging
+import os
+import re
import sqlite3
-import urllib.request
import urllib.parse
-from urllib.error import HTTPError
+import urllib.request
from datetime import datetime
+from urllib.error import HTTPError
import amqplib.client_0_8 as amqp
from distro_info import UbuntuDistroInfo
# Launchpad REST API base
-LP = 'https://api.launchpad.net/1.0/'
-NAME = re.compile('^[a-z0-9][a-z0-9.+-]+$')
-VERSION = re.compile('^[a-zA-Z0-9.+:~-]+$')
+LP = "https://api.launchpad.net/1.0/"
+NAME = re.compile("^[a-z0-9][a-z0-9.+-]+$")
+VERSION = re.compile("^[a-zA-Z0-9.+:~-]+$")
# allowed values are rather conservative, expand if/when needed
-ENV = re.compile(r'^[a-zA-Z][a-zA-Z0-9_]+=[a-zA-Z0-9.:~/ -=]*$')
+ENV = re.compile(r"^[a-zA-Z][a-zA-Z0-9_]+=[a-zA-Z0-9.:~/ -=]*$")
# URL and optional branch name
-GIT = re.compile(r'^https?://[a-zA-Z0-9._/~+-]+(#[a-zA-Z0-9._/-]+)?$')
+GIT = re.compile(r"^https?://[a-zA-Z0-9._/~+-]+(#[a-zA-Z0-9._/-]+)?$")
-ALLOWED_TEAMS = ['canonical-kernel-distro-team']
+ALLOWED_TEAMS = ["canonical-kernel-distro-team"]
# not teams
-ALLOWED_USERS_PERPACKAGE = {'snapcraft': ['snappy-m-o']}
+ALLOWED_USERS_PERPACKAGE = {"snapcraft": ["snappy-m-o"]}
class Submit:
def __init__(self):
cp = configparser.ConfigParser()
- cp.read(os.path.expanduser('~ubuntu/autopkgtest-cloud.conf'))
+ cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
# read valid releases and architectures from DB
- self.db_con = sqlite3.connect('file:%s?mode=ro' % cp['web']['database_ro'], uri=True)
- self.releases = set(UbuntuDistroInfo().supported() + UbuntuDistroInfo().supported_esm())
- logging.debug('Valid releases: %s' % self.releases)
+ self.db_con = sqlite3.connect(
+ "file:%s?mode=ro" % cp["web"]["database_ro"], uri=True
+ )
+ self.releases = set(
+ UbuntuDistroInfo().supported() + UbuntuDistroInfo().supported_esm()
+ )
+ logging.debug("Valid releases: %s" % self.releases)
self.architectures = set()
c = self.db_con.cursor()
- c.execute('SELECT DISTINCT arch from test')
+ c.execute("SELECT DISTINCT arch from test")
while True:
row = c.fetchone()
if row is None:
break
self.architectures.add(row[0])
- logging.debug('Valid architectures: %s' % self.architectures)
+ logging.debug("Valid architectures: %s" % self.architectures)
# dissect AMQP URL
- self.amqp_creds = urllib.parse.urlsplit(cp['amqp']['uri'],
- allow_fragments=False)
- assert self.amqp_creds.scheme == 'amqp'
- logging.debug('AMQP credentials: %s' % repr(self.amqp_creds))
-
- def validate_distro_request(self, release, arch, package, triggers,
- requester, ppas=[], **kwargs):
+ self.amqp_creds = urllib.parse.urlsplit(
+ cp["amqp"]["uri"], allow_fragments=False
+ )
+ assert self.amqp_creds.scheme == "amqp"
+ logging.debug("AMQP credentials: %s" % repr(self.amqp_creds))
+
+ def validate_distro_request(
+ self, release, arch, package, triggers, requester, ppas=[], **kwargs
+ ):
"""Validate package and triggers for a distro test request
'package' is a single source package name. 'triggers' has the format
@@ -73,93 +79,127 @@ class Submit:
can_upload_any_trigger = False
try:
- if kwargs['delete'] != '1':
- raise ValueError('Invalid delete value')
- del kwargs['delete']
+ if kwargs["delete"] != "1":
+ raise ValueError("Invalid delete value")
+ del kwargs["delete"]
except KeyError:
pass
try:
- if kwargs['all-proposed'] != '1':
- raise ValueError('Invalid all-proposed value')
- del kwargs['all-proposed']
+ if kwargs["all-proposed"] != "1":
+ raise ValueError("Invalid all-proposed value")
+ del kwargs["all-proposed"]
except KeyError:
pass
try:
- if not kwargs['readable-by']:
- raise ValueError('Invalid readable-by value')
- del kwargs['readable-by']
+ if not kwargs["readable-by"]:
+ raise ValueError("Invalid readable-by value")
+ del kwargs["readable-by"]
except KeyError:
pass
# no other kwargs supported
if kwargs:
- raise ValueError('Invalid argument %s' % list(kwargs)[0])
+ raise ValueError("Invalid argument %s" % list(kwargs)[0])
if release not in self.releases:
- raise ValueError('Unknown release ' + release)
+ raise ValueError("Unknown release " + release)
if arch not in self.architectures:
- raise ValueError('Unknown architecture ' + arch)
+ raise ValueError("Unknown architecture " + arch)
for ppa in ppas:
if not self.is_valid_ppa(ppa):
- raise ValueError('Unknown PPA ' + ppa)
+ raise ValueError("Unknown PPA " + ppa)
# allow kernel tests for EOL vivid
- skip_result_check = (release == 'vivid' and triggers and triggers[0].startswith('linux'))
- if not self.is_valid_package_with_results(None if (ppas or skip_result_check) else release, arch, package):
- raise ValueError('Package %s does not have any test results' %
- package)
-
- if 'migration-reference/0' in triggers:
+ skip_result_check = (
+ release == "vivid" and triggers and triggers[0].startswith("linux")
+ )
+ if not self.is_valid_package_with_results(
+ None if (ppas or skip_result_check) else release, arch, package
+ ):
+ raise ValueError(
+ "Package %s does not have any test results" % package
+ )
+
+ if "migration-reference/0" in triggers:
if len(triggers) != 1:
- raise ValueError('Cannot use additional triggers with migration-reference/0')
+ raise ValueError(
+ "Cannot use additional triggers with migration-reference/0"
+ )
if ppas:
- raise ValueError('Cannot use PPAs with migration-reference/0')
+ raise ValueError("Cannot use PPAs with migration-reference/0")
if "all-proposed" in kwargs:
- raise ValueError('Cannot use "all-proposed" with migration-reference/0')
+ raise ValueError(
+ 'Cannot use "all-proposed" with migration-reference/0'
+ )
for trigger in triggers:
try:
- trigsrc, trigver = trigger.split('/')
+ trigsrc, trigver = trigger.split("/")
except ValueError:
- raise ValueError('Malformed trigger, must be srcpackage/version')
+ raise ValueError(
+ "Malformed trigger, must be srcpackage/version"
+ )
# Debian Policy 5.6.1 and 5.6.12
if not NAME.match(trigsrc) or not VERSION.match(trigver):
- raise ValueError('Malformed trigger')
+ raise ValueError("Malformed trigger")
# Special snowflake
- if trigger in ('qemu-efi-noacpi/0', 'migration-reference/0'):
+ if trigger in ("qemu-efi-noacpi/0", "migration-reference/0"):
continue
if ppas:
- if not self.is_valid_package_version(release, trigsrc, trigver,
- ppas and ppas[-1] or None):
- raise ValueError('%s is not published in PPA %s %s' %
- (trigger, ppas[-1], release))
+ if not self.is_valid_package_version(
+ release, trigsrc, trigver, ppas and ppas[-1] or None
+ ):
+ raise ValueError(
+ "%s is not published in PPA %s %s"
+ % (trigger, ppas[-1], release)
+ )
# PPAs don't have components, so we need to determine it from the
# Ubuntu archive
- trigsrc_component = self.is_valid_package_version(release, trigsrc, None) or 'main'
+ trigsrc_component = (
+ self.is_valid_package_version(release, trigsrc, None)
+ or "main"
+ )
else:
- trigsrc_component = self.is_valid_package_version(release, trigsrc, trigver)
+ trigsrc_component = self.is_valid_package_version(
+ release, trigsrc, trigver
+ )
if not trigsrc_component:
- raise ValueError('%s is not published in %s' % (trigger, release))
+ raise ValueError(
+ "%s is not published in %s" % (trigger, release)
+ )
- can_upload_any_trigger = can_upload_any_trigger or \
- self.can_upload(requester, release, trigsrc_component, trigsrc)
+ can_upload_any_trigger = can_upload_any_trigger or self.can_upload(
+ requester, release, trigsrc_component, trigsrc
+ )
if ppas:
- package_component = self.is_valid_package_version(release, package, None) or 'main'
+ package_component = (
+ self.is_valid_package_version(release, package, None) or "main"
+ )
else:
- package_component = self.is_valid_package_version(release, package, None)
+ package_component = self.is_valid_package_version(
+ release, package, None
+ )
if not package_component:
- raise ValueError('%s is not published in %s' % (package, release))
+ raise ValueError(
+ "%s is not published in %s" % (package, release)
+ )
# verify that requester can upload package or trigsrc
- if not self.can_upload(requester, release, package_component, package) and \
- not can_upload_any_trigger and \
- requester not in ALLOWED_USERS_PERPACKAGE.get(package, []) and \
- not self.in_allowed_team(requester, package):
- raise ValueError('You are not allowed to upload %s or %s to '
- 'Ubuntu, thus you are not allowed to use this '
- 'service.' % (package, trigsrc))
-
- def validate_git_request(self, release, arch, package, ppas=[], env=[], **kwargs):
+ if (
+ not self.can_upload(requester, release, package_component, package)
+ and not can_upload_any_trigger
+ and requester not in ALLOWED_USERS_PERPACKAGE.get(package, [])
+ and not self.in_allowed_team(requester, package)
+ ):
+ raise ValueError(
+ "You are not allowed to upload %s or %s to "
+ "Ubuntu, thus you are not allowed to use this "
+ "service." % (package, trigsrc)
+ )
+
+ def validate_git_request(
+ self, release, arch, package, ppas=[], env=[], **kwargs
+ ):
"""Validate parameters for an upstream git test request
Supported kwargs:
@@ -172,43 +212,53 @@ class Submit:
otherwise return.
"""
if release not in self.releases:
- raise ValueError('Unknown release ' + release)
+ raise ValueError("Unknown release " + release)
if arch not in self.architectures:
- raise ValueError('Unknown architecture ' + arch)
+ raise ValueError("Unknown architecture " + arch)
if not NAME.match(package):
- raise ValueError('Malformed package')
+ raise ValueError("Malformed package")
if not ppas:
- raise ValueError('Must specify at least one PPA (to associate results with)')
+ raise ValueError(
+ "Must specify at least one PPA (to associate results with)"
+ )
for ppa in ppas:
if not self.is_valid_ppa(ppa):
- raise ValueError('Unknown PPA ' + ppa)
+ raise ValueError("Unknown PPA " + ppa)
for e in env:
if not ENV.match(e):
- raise ValueError('Invalid environment variable format "%s"' % e)
+ raise ValueError(
+ 'Invalid environment variable format "%s"' % e
+ )
# we should only be called in this mode
- assert 'build-git' in kwargs
- if not GIT.match(kwargs['build-git']):
- raise ValueError('Malformed build-git')
- if 'testname' in kwargs and not NAME.match(kwargs['testname']):
- raise ValueError('Malformed testname')
+ assert "build-git" in kwargs
+ if not GIT.match(kwargs["build-git"]):
+ raise ValueError("Malformed build-git")
+ if "testname" in kwargs and not NAME.match(kwargs["testname"]):
+ raise ValueError("Malformed testname")
- unsupported_keys = set(kwargs.keys()) - {'build-git', 'testname'}
+ unsupported_keys = set(kwargs.keys()) - {"build-git", "testname"}
if unsupported_keys:
- raise ValueError('Unsupported arguments: %s' % ' '.join(unsupported_keys))
+ raise ValueError(
+ "Unsupported arguments: %s" % " ".join(unsupported_keys)
+ )
- def unsend_amqp_request(self, release, arch, package, context=None, **params):
+ def unsend_amqp_request(
+ self, release, arch, package, context=None, **params
+ ):
"""Remove an autopkgtest AMQP request"""
if context:
- queue = 'debci-%s-%s-%s' % (context, release, arch)
+ queue = "debci-%s-%s-%s" % (context, release, arch)
else:
- queue = 'debci-%s-%s' % (release, arch)
+ queue = "debci-%s-%s" % (release, arch)
count = 0
- with amqp.Connection(self.amqp_creds.hostname,
- userid=self.amqp_creds.username,
- password=self.amqp_creds.password) as amqp_con:
+ with amqp.Connection(
+ self.amqp_creds.hostname,
+ userid=self.amqp_creds.username,
+ password=self.amqp_creds.password,
+ ) as amqp_con:
with amqp_con.channel() as ch:
while True:
message = ch.basic_get(queue)
@@ -217,32 +267,40 @@ class Submit:
if isinstance(message.body, str):
body = message.body
else:
- body = message.body.decode('UTF-8')
+ body = message.body.decode("UTF-8")
this_package, this_params = body.split(None, 1)
this_params = json.loads(this_params)
- del this_params['submit-time']
+ del this_params["submit-time"]
if this_package == package and this_params == params:
ch.basic_ack(message.delivery_tag)
count += 1
return count
- def send_amqp_request(self, release, arch, package, context=None, **params):
+ def send_amqp_request(
+ self, release, arch, package, context=None, **params
+ ):
"""Send autopkgtest AMQP request"""
if context:
- queue = 'debci-%s-%s-%s' % (context, release, arch)
+ queue = "debci-%s-%s-%s" % (context, release, arch)
else:
- queue = 'debci-%s-%s' % (release, arch)
-
- params['submit-time'] = datetime.strftime(datetime.utcnow(), '%Y-%m-%d %H:%M:%S%z')
- body = '%s %s' % (package, json.dumps(params, sort_keys=True))
- with amqp.Connection(self.amqp_creds.hostname,
- userid=self.amqp_creds.username,
- password=self.amqp_creds.password) as amqp_con:
+ queue = "debci-%s-%s" % (release, arch)
+
+ params["submit-time"] = datetime.strftime(
+ datetime.utcnow(), "%Y-%m-%d %H:%M:%S%z"
+ )
+ body = "%s %s" % (package, json.dumps(params, sort_keys=True))
+ with amqp.Connection(
+ self.amqp_creds.hostname,
+ userid=self.amqp_creds.username,
+ password=self.amqp_creds.password,
+ ) as amqp_con:
with amqp_con.channel() as ch:
- ch.basic_publish(amqp.Message(body, delivery_mode=2), # persistent
- routing_key=queue)
+ ch.basic_publish(
+ amqp.Message(body, delivery_mode=2), # persistent
+ routing_key=queue,
+ )
@classmethod
def post_json(klass, url, data, auth_file, project):
@@ -262,22 +320,32 @@ class Submit:
with open(auth_file) as f:
contents = f.read()
for l in contents.splitlines():
- if l.startswith(project + ':'):
- credentials = l.split(':', 1)[1].strip()
+ if l.startswith(project + ":"):
+ credentials = l.split(":", 1)[1].strip()
break
else:
- logging.error('%s does not have password for project %s',
- auth_file, project)
+ logging.error(
+ "%s does not have password for project %s", auth_file, project
+ )
return
req = urllib.request.Request(
- url=url, method='POST',
- data=json.dumps(data).encode('UTF-8'),
- headers={'Content-Type': 'application/json',
- 'Authorization': 'Basic %s' % base64.b64encode(credentials.encode()).decode()})
+ url=url,
+ method="POST",
+ data=json.dumps(data).encode("UTF-8"),
+ headers={
+ "Content-Type": "application/json",
+ "Authorization": "Basic %s"
+ % base64.b64encode(credentials.encode()).decode(),
+ },
+ )
with urllib.request.urlopen(req) as f:
- logging.debug('POST to URL %s succeeded with code %u: %s',
- f.geturl(), f.getcode(), f.read())
+ logging.debug(
+ "POST to URL %s succeeded with code %u: %s",
+ f.geturl(),
+ f.getcode(),
+ f.read(),
+ )
#
# helper methods
@@ -285,21 +353,24 @@ class Submit:
def is_valid_ppa(self, ppa):
"""Check if a ppa exists"""
- team, _, name = ppa.partition('/')
+ team, _, name = ppa.partition("/")
if not NAME.match(team) or not NAME.match(name):
return None
# https://launchpad.net/+apidoc/1.0.html#person-getPPAByName
- (code, response) = self.lp_request('~' + team, {
- 'ws.op': 'getPPAByName',
- 'distribution': json.dumps(LP + 'ubuntu'),
- 'name': json.dumps(name),
- })
+ (code, response) = self.lp_request(
+ "~" + team,
+ {
+ "ws.op": "getPPAByName",
+ "distribution": json.dumps(LP + "ubuntu"),
+ "name": json.dumps(name),
+ },
+ )
logging.debug(
- 'is_valid_ppa(%s): code %u, response %s',
- ppa, code, repr(response))
+ "is_valid_ppa(%s): code %u, response %s", ppa, code, repr(response)
+ )
if code < 200 or code >= 300:
return None
- if response.get('name') == name:
+ if response.get("name") == name:
return True
def is_valid_package_with_results(self, release, arch, package):
@@ -310,13 +381,16 @@ class Submit:
"""
c = self.db_con.cursor()
if release:
- c.execute('SELECT count(arch) FROM test '
- 'WHERE package=? AND arch=? AND release=?',
- (package, arch, release))
+ c.execute(
+ "SELECT count(arch) FROM test "
+ "WHERE package=? AND arch=? AND release=?",
+ (package, arch, release),
+ )
else:
- c.execute('SELECT count(arch) FROM test '
- 'WHERE package=? AND arch=?',
- (package, arch))
+ c.execute(
+ "SELECT count(arch) FROM test " "WHERE package=? AND arch=?",
+ (package, arch),
+ )
return c.fetchone()[0] > 0
def is_valid_package_version(self, release, package, version, ppa=None):
@@ -330,25 +404,32 @@ class Submit:
"""
# https://launchpad.net/+apidoc/1.0.html#archive-getPublishedSources
if ppa:
- team, name = ppa.split('/')
- obj = '~%s/+archive/ubuntu/%s' % (team, name)
+ team, name = ppa.split("/")
+ obj = "~%s/+archive/ubuntu/%s" % (team, name)
else:
- obj = 'ubuntu/+archive/primary'
- req = {'ws.op': 'getPublishedSources',
- 'source_name': json.dumps(package),
- 'distro_series': json.dumps(LP + 'ubuntu/' + release),
- 'status': 'Published',
- 'exact_match': 'true'}
+ obj = "ubuntu/+archive/primary"
+ req = {
+ "ws.op": "getPublishedSources",
+ "source_name": json.dumps(package),
+ "distro_series": json.dumps(LP + "ubuntu/" + release),
+ "status": "Published",
+ "exact_match": "true",
+ }
if version is not None:
- req['version'] = json.dumps(version)
+ req["version"] = json.dumps(version)
(code, response) = self.lp_request(obj, req)
if code < 200 or code >= 300:
return None
logging.debug(
- 'is_valid_package_version(%s, %s, %s): code %u, response %s',
- release, package, version, code, repr(response))
- if response.get('total_size', 0) > 0:
- return response['entries'][0]['component_name']
+ "is_valid_package_version(%s, %s, %s): code %u, response %s",
+ release,
+ package,
+ version,
+ code,
+ repr(response),
+ )
+ if response.get("total_size", 0) > 0:
+ return response["entries"][0]["component_name"]
else:
return None
@@ -356,25 +437,35 @@ class Submit:
"""Check if person can upload package into Ubuntu release"""
# https://launchpad.net/+apidoc/1.0.html#archive-checkUpload
- (code, response) = self.lp_request('ubuntu/+archive/primary',
- {'ws.op': 'checkUpload',
- 'distroseries': json.dumps(LP + 'ubuntu/' + release),
- 'person': json.dumps(LP + '~' + person),
- 'component': component,
- 'pocket': 'Proposed',
- 'sourcepackagename': json.dumps(package),
- })
- logging.debug('can_upload(%s, %s, %s, %s): (%u, %s)',
- person, release, component, package, code, repr(response))
+ (code, response) = self.lp_request(
+ "ubuntu/+archive/primary",
+ {
+ "ws.op": "checkUpload",
+ "distroseries": json.dumps(LP + "ubuntu/" + release),
+ "person": json.dumps(LP + "~" + person),
+ "component": component,
+ "pocket": "Proposed",
+ "sourcepackagename": json.dumps(package),
+ },
+ )
+ logging.debug(
+ "can_upload(%s, %s, %s, %s): (%u, %s)",
+ person,
+ release,
+ component,
+ package,
+ code,
+ repr(response),
+ )
return code >= 200 and code < 300
def in_allowed_team(self, person, package=[], teams=[]):
"""Check if person is in ALLOWED_TEAMS"""
- for team in (teams or ALLOWED_TEAMS):
- (code, response) = self.lp_request('~%s/participants' % team, {})
- for e in response.get('entries', []):
- if e.get('name') == person:
+ for team in teams or ALLOWED_TEAMS:
+ (code, response) = self.lp_request("~%s/participants" % team, {})
+ for e in response.get("entries", []):
+ if e.get("name") == person:
return True
return False
@@ -387,23 +478,31 @@ class Submit:
Return (code, json), where json is defined for successful codes
(200 <= code < 300) and None otherwise.
"""
- url = LP + obj + '?' + urllib.parse.urlencode(query)
+ url = LP + obj + "?" + urllib.parse.urlencode(query)
try:
with urllib.request.urlopen(url, timeout=10) as req:
code = req.getcode()
if code >= 300:
- logging.error('URL %s failed with code %u', req.geturl(), code)
+ logging.error(
+ "URL %s failed with code %u", req.geturl(), code
+ )
return (code, None)
response = req.read()
except HTTPError as e:
- logging.error('%s failed with %u: %s\n%s', url, e.code, e.reason, e.headers)
+ logging.error(
+ "%s failed with %u: %s\n%s", url, e.code, e.reason, e.headers
+ )
return (e.code, None)
try:
- response = json.loads(response.decode('UTF-8'))
+ response = json.loads(response.decode("UTF-8"))
except (UnicodeDecodeError, ValueError) as e:
- logging.error('URL %s gave invalid response %s: %s',
- req.geturl(), response, str(e))
+ logging.error(
+ "URL %s gave invalid response %s: %s",
+ req.geturl(),
+ response,
+ str(e),
+ )
return (500, None)
- logging.debug('lp_request %s succeeded: %s', url, response)
+ logging.debug("lp_request %s succeeded: %s", url, response)
return (code, response)
diff --git a/charms/focal/autopkgtest-web/webcontrol/request/tests/test_app.py b/charms/focal/autopkgtest-web/webcontrol/request/tests/test_app.py
index c3bd16a..44067b9 100644
--- a/charms/focal/autopkgtest-web/webcontrol/request/tests/test_app.py
+++ b/charms/focal/autopkgtest-web/webcontrol/request/tests/test_app.py
@@ -1,9 +1,8 @@
"""Test the Flask app."""
import os
-
from unittest import TestCase
-from unittest.mock import patch, mock_open
+from unittest.mock import mock_open, patch
import request.app
from request.submit import Submit
@@ -11,7 +10,7 @@ from request.submit import Submit
class AppTestBase(TestCase):
def setUp(self):
- request.app.app.config['TESTING'] = True
+ request.app.app.config["TESTING"] = True
self.app = request.app.app.test_client()
@@ -21,11 +20,11 @@ class DistroRequestTests(AppTestBase):
def prep_session(self):
"""Set some commonly needed session data."""
with self.app.session_transaction() as session:
- session['nickname'] = 'person'
+ session["nickname"] = "person"
def test_login(self):
"""Hitting / when not logged in prompts for a login."""
- ret = self.app.get('/')
+ ret = self.app.get("/")
self.assertIn(b'<form action="/login"', ret.data)
def test_secret_key_persistence(self):
@@ -35,308 +34,529 @@ class DistroRequestTests(AppTestBase):
request.app.setup_key(request.app.secret_path)
self.assertEqual(request.app.app.secret_key, orig_key)
- @patch('request.app.Submit')
+ @patch("request.app.Submit")
def test_nickname(self, mock_submit):
"""Hitting / with a nickname in the session prompts for logout."""
- mock_submit.return_value.validate_distro_request.side_effect = ValueError('not 31337 enough')
+ mock_submit.return_value.validate_distro_request.side_effect = (
+ ValueError("not 31337 enough")
+ )
with self.app.session_transaction() as session:
- session['nickname'] = 'person'
- ret = self.app.get('/')
- self.assertIn(b'Logout person', ret.data)
+ session["nickname"] = "person"
+ ret = self.app.get("/")
+ self.assertIn(b"Logout person", ret.data)
- @patch('request.app.Submit')
+ @patch("request.app.Submit")
def test_missing_request(self, mock_submit):
"""Missing GET params should return 400."""
- mock_submit.return_value.validate_distro_request.side_effect = ValueError('not 31337 enough')
+ mock_submit.return_value.validate_distro_request.side_effect = (
+ ValueError("not 31337 enough")
+ )
self.prep_session()
- ret = self.app.get('/')
+ ret = self.app.get("/")
self.assertEqual(ret.status_code, 400)
- self.assertIn(b'You submitted an invalid request', ret.data)
+ self.assertIn(b"You submitted an invalid request", ret.data)
- @patch('request.app.Submit')
+ @patch("request.app.Submit")
def test_invalid_request(self, mock_submit):
"""Invalid GET params should return 400."""
- mock_submit.return_value.validate_distro_request.side_effect = ValueError('not 31337 enough')
+ mock_submit.return_value.validate_distro_request.side_effect = (
+ ValueError("not 31337 enough")
+ )
self.prep_session()
- ret = self.app.get('/?arch=i386&package=hi&release=testy&trigger=foo/1')
+ ret = self.app.get(
+ "/?arch=i386&package=hi&release=testy&trigger=foo/1"
+ )
self.assertEqual(ret.status_code, 400)
- self.assertIn(b'not 31337 enough', ret.data)
+ self.assertIn(b"not 31337 enough", ret.data)
mock_submit.return_value.validate_distro_request.assert_called_once_with(
- release='testy', arch='i386', package='hi',
- triggers=['foo/1'], requester='person')
-
- @patch('request.app.Submit')
+ release="testy",
+ arch="i386",
+ package="hi",
+ triggers=["foo/1"],
+ requester="person",
+ )
+
+ @patch("request.app.Submit")
def test_valid_request(self, mock_submit):
"""Successful distro request with one trigger."""
self.prep_session()
- ret = self.app.get('/?arch=i386&package=hi&release=testy&trigger=foo/1')
+ ret = self.app.get(
+ "/?arch=i386&package=hi&release=testy&trigger=foo/1"
+ )
self.assertEqual(ret.status_code, 200)
- self.assertIn(b'ubmitted', ret.data)
+ self.assertIn(b"ubmitted", ret.data)
mock_submit.return_value.validate_distro_request.assert_called_once_with(
- release='testy', arch='i386', package='hi',
- triggers=['foo/1'], requester='person')
-
- @patch('request.app.Submit')
+ release="testy",
+ arch="i386",
+ package="hi",
+ triggers=["foo/1"],
+ requester="person",
+ )
+
+ @patch("request.app.Submit")
def test_valid_request_multi_trigger(self, mock_submit):
"""Successful distro request with multiple triggers."""
self.prep_session()
- ret = self.app.get('/?arch=i386&package=hi&release=testy&trigger=foo/1&trigger=bar/2')
+ ret = self.app.get(
+ "/?arch=i386&package=hi&release=testy&trigger=foo/1&trigger=bar/2"
+ )
self.assertEqual(ret.status_code, 200)
- self.assertIn(b'ubmitted', ret.data)
+ self.assertIn(b"ubmitted", ret.data)
mock_submit.return_value.validate_distro_request.assert_called_once_with(
- release='testy', arch='i386', package='hi',
- triggers=['foo/1', 'bar/2'], requester='person')
-
- @patch('request.app.Submit')
+ release="testy",
+ arch="i386",
+ package="hi",
+ triggers=["foo/1", "bar/2"],
+ requester="person",
+ )
+
+ @patch("request.app.Submit")
def test_valid_request_with_ppas(self, mock_submit):
"""Return success with all params & ppas."""
self.prep_session()
- ret = self.app.get('/?arch=i386&package=hi&release=testy&trigger=foo/1&ppa=train/overlay&ppa=train/001')
+ ret = self.app.get(
+ "/?arch=i386&package=hi&release=testy&trigger=foo/1&ppa=train/overlay&ppa=train/001"
+ )
self.assertEqual(ret.status_code, 200)
- self.assertIn(b'ubmitted', ret.data)
+ self.assertIn(b"ubmitted", ret.data)
mock_submit.return_value.validate_distro_request.assert_called_once_with(
- release='testy', arch='i386', package='hi',
- triggers=['foo/1'], requester='person',
- ppas=['train/overlay', 'train/001'])
+ release="testy",
+ arch="i386",
+ package="hi",
+ triggers=["foo/1"],
+ requester="person",
+ ppas=["train/overlay", "train/001"],
+ )
mock_submit.return_value.send_amqp_request.assert_called_once_with(
- context='ppa', release='testy', arch='i386', package='hi',
- triggers=['foo/1'], requester='person',
- ppas=['train/overlay', 'train/001'])
-
- @patch('request.app.Submit')
+ context="ppa",
+ release="testy",
+ arch="i386",
+ package="hi",
+ triggers=["foo/1"],
+ requester="person",
+ ppas=["train/overlay", "train/001"],
+ )
+
+ @patch("request.app.Submit")
def test_all_proposed(self, mock_submit):
"""Successful distro request with all-proposed."""
self.prep_session()
- ret = self.app.get('/?arch=i386&package=hi&release=testy&trigger=foo/1&all-proposed=1')
+ ret = self.app.get(
+ "/?arch=i386&package=hi&release=testy&trigger=foo/1&all-proposed=1"
+ )
self.assertEqual(ret.status_code, 200)
- self.assertIn(b'ubmitted', ret.data)
+ self.assertIn(b"ubmitted", ret.data)
mock_submit.return_value.validate_distro_request.assert_called_once_with(
- release='testy', arch='i386', package='hi',
- triggers=['foo/1'], requester='person', **{'all-proposed': '1'})
+ release="testy",
+ arch="i386",
+ package="hi",
+ triggers=["foo/1"],
+ requester="person",
+ **{"all-proposed": "1"}
+ )
mock_submit.return_value.send_amqp_request.assert_called_once_with(
- release='testy', arch='i386', package='hi',
- triggers=['foo/1'], requester='person', **{'all-proposed': '1'})
+ release="testy",
+ arch="i386",
+ package="hi",
+ triggers=["foo/1"],
+ requester="person",
+ **{"all-proposed": "1"}
+ )
class GitHubRequestTests(AppTestBase):
"""Test GitHub test requests (via PSK signatures)."""
- @patch('request.app.open',
- mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
- create=True)
+ @patch(
+ "request.app.open",
+ mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
+ create=True,
+ )
def test_ping(self):
- ret = self.app.post('/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo',
- content_type='application/json',
- headers=[('X-Hub-Signature', 'sha1=cb59904bf33c619ad2c52095deb405c86cc5adfd'),
- ('X-GitHub-Event', 'ping')],
- data=b'{"info": "https://api.github.com/xx"}')
+ ret = self.app.post(
+ "/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo",
+ content_type="application/json",
+ headers=[
+ (
+ "X-Hub-Signature",
+ "sha1=cb59904bf33c619ad2c52095deb405c86cc5adfd",
+ ),
+ ("X-GitHub-Event", "ping"),
+ ],
+ data=b'{"info": "https://api.github.com/xx"}',
+ )
self.assertEqual(ret.status_code, 200, ret.data)
- self.assertIn(b'OK', ret.data)
- self.assertNotIn(b'ubmit', ret.data)
+ self.assertIn(b"OK", ret.data)
+ self.assertNotIn(b"ubmit", ret.data)
- @patch('request.app.Submit')
- @patch('request.app.open', mock_open(None, 'bogus'), create=True)
+ @patch("request.app.Submit")
+ @patch("request.app.open", mock_open(None, "bogus"), create=True)
def test_invalid_secret_file(self, mock_submit):
- ret = self.app.post('/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo',
- content_type='application/json',
- headers=[('X-Hub-Signature', 'sha1=8572f239e05c652710a4f85d2061cc0fcbc7b127')],
- data=b'{"action": "opened", "number": 2, "pr": "https://api.github.com/xx"}')
+ ret = self.app.post(
+ "/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo",
+ content_type="application/json",
+ headers=[
+ (
+ "X-Hub-Signature",
+ "sha1=8572f239e05c652710a4f85d2061cc0fcbc7b127",
+ )
+ ],
+ data=b'{"action": "opened", "number": 2, "pr": "https://api.github.com/xx"}',
+ )
self.assertEqual(ret.status_code, 403, ret.data)
- self.assertIn(b'GitHub signature verification failed', ret.data)
+ self.assertIn(b"GitHub signature verification failed", ret.data)
self.assertFalse(mock_submit.return_value.validate_git_request.called)
self.assertFalse(mock_submit.return_value.send_amqp_request.called)
- @patch('request.app.open',
- mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
- create=True)
+ @patch(
+ "request.app.open",
+ mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
+ create=True,
+ )
def test_bad_signature(self):
- ret = self.app.post('/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo',
- content_type='application/json',
- headers=[('X-Hub-Signature', 'sha1=deadbeef0815'),
- ('X-GitHub-Event', 'ping')],
- data=b'{"info": "https://api.github.com/xx"}')
+ ret = self.app.post(
+ "/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo",
+ content_type="application/json",
+ headers=[
+ ("X-Hub-Signature", "sha1=deadbeef0815"),
+ ("X-GitHub-Event", "ping"),
+ ],
+ data=b'{"info": "https://api.github.com/xx"}',
+ )
self.assertEqual(ret.status_code, 403, ret.data)
- self.assertIn(b'GitHub signature verification failed', ret.data)
+ self.assertIn(b"GitHub signature verification failed", ret.data)
- @patch('request.app.Submit')
- @patch('request.app.check_github_sig')
+ @patch("request.app.Submit")
+ @patch("request.app.check_github_sig")
def test_missing_pr_number(self, mock_check_github_sig, mock_submit):
mock_check_github_sig.return_value = True
- ret = self.app.post('/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo',
- content_type='application/json',
- data=b'{"action": "opened", "pr": "https://api.github.com/xx"}')
+ ret = self.app.post(
+ "/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo",
+ content_type="application/json",
+ data=b'{"action": "opened", "pr": "https://api.github.com/xx"}',
+ )
self.assertEqual(ret.status_code, 400, ret.data)
self.assertIn(b"Missing field in JSON data: 'number'", ret.data)
self.assertFalse(mock_submit.return_value.validate_git_request.called)
self.assertFalse(mock_submit.return_value.send_amqp_request.called)
- @patch('request.app.Submit')
- @patch('request.app.check_github_sig')
+ @patch("request.app.Submit")
+ @patch("request.app.check_github_sig")
def test_ignored_action(self, mock_check_github_sig, mock_submit):
mock_check_github_sig.return_value = True
- ret = self.app.post('/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo',
- content_type='application/json',
- data=b'{"action": "boring", "number": 2, "pr": "https://api.github.com/xx"}')
+ ret = self.app.post(
+ "/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo",
+ content_type="application/json",
+ data=b'{"action": "boring", "number": 2, "pr": "https://api.github.com/xx"}',
+ )
self.assertEqual(ret.status_code, 200, ret.data)
- self.assertIn(b'GitHub PR action boring is not relevant for testing', ret.data)
+ self.assertIn(
+ b"GitHub PR action boring is not relevant for testing", ret.data
+ )
self.assertFalse(mock_submit.return_value.validate_git_request.called)
self.assertFalse(mock_submit.return_value.send_amqp_request.called)
- @patch('request.app.Submit')
- @patch('request.app.check_github_sig')
+ @patch("request.app.Submit")
+ @patch("request.app.check_github_sig")
def test_invalid(self, mock_check_github_sig, mock_submit):
- mock_submit.return_value.validate_git_request.side_effect = ValueError('weird color')
+ mock_submit.return_value.validate_git_request.side_effect = ValueError(
+ "weird color"
+ )
mock_check_github_sig.return_value = True
- ret = self.app.post('/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo',
- content_type='application/json',
- data=b'{"action": "opened", "number": 2, "pull_request":'
- b'{"statuses_url": "https://api.github.com/2"}}')
+ ret = self.app.post(
+ "/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo",
+ content_type="application/json",
+ data=b'{"action": "opened", "number": 2, "pull_request":'
+ b'{"statuses_url": "https://api.github.com/2"}}',
+ )
self.assertEqual(ret.status_code, 400, ret.data)
- self.assertIn(b'invalid request', ret.data)
- self.assertIn(b'weird color', ret.data)
+ self.assertIn(b"invalid request", ret.data)
+ self.assertIn(b"weird color", ret.data)
mock_submit.return_value.validate_git_request.assert_called_once_with(
- release='testy', arch='C51', package='hi',
- env=['UPSTREAM_PULL_REQUEST=2', 'GITHUB_STATUSES_URL=https://api.github.com/2'],
- **{'build-git': 'http://x.com/foo'})
+ release="testy",
+ arch="C51",
+ package="hi",
+ env=[
+ "UPSTREAM_PULL_REQUEST=2",
+ "GITHUB_STATUSES_URL=https://api.github.com/2",
+ ],
+ **{"build-git": "http://x.com/foo"}
+ )
self.assertFalse(mock_submit.return_value.send_amqp_request.called)
- @patch('request.app.Submit')
- @patch('request.app.open',
- mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
- create=True)
+ @patch("request.app.Submit")
+ @patch(
+ "request.app.open",
+ mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
+ create=True,
+ )
def test_valid_simple(self, mock_submit):
- ret = self.app.post('/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo',
- content_type='application/json',
- headers=[('X-Hub-Signature', 'sha1=1dae67d4406d21b498806968a3def61754498a21')],
- data=b'{"action": "opened", "number": 2, "pull_request":'
- b' {"statuses_url": "https://api.github.com/two"}}')
+ ret = self.app.post(
+ "/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo",
+ content_type="application/json",
+ headers=[
+ (
+ "X-Hub-Signature",
+ "sha1=1dae67d4406d21b498806968a3def61754498a21",
+ )
+ ],
+ data=b'{"action": "opened", "number": 2, "pull_request":'
+ b' {"statuses_url": "https://api.github.com/two"}}',
+ )
self.assertEqual(ret.status_code, 200, ret.data)
- self.assertIn(b'Test request submitted.', ret.data)
+ self.assertIn(b"Test request submitted.", ret.data)
mock_submit.return_value.validate_git_request.assert_called_once_with(
- release='testy', arch='C51', package='hi',
- env=['UPSTREAM_PULL_REQUEST=2', 'GITHUB_STATUSES_URL=https://api.github.com/two'],
- **{'build-git': 'http://x.com/foo'})
+ release="testy",
+ arch="C51",
+ package="hi",
+ env=[
+ "UPSTREAM_PULL_REQUEST=2",
+ "GITHUB_STATUSES_URL=https://api.github.com/two",
+ ],
+ **{"build-git": "http://x.com/foo"}
+ )
mock_submit.return_value.send_amqp_request.assert_called_once_with(
- context='upstream', release='testy', arch='C51', package='hi',
- env=['UPSTREAM_PULL_REQUEST=2', 'GITHUB_STATUSES_URL=https://api.github.com/two'],
- **{'build-git': 'http://x.com/foo'})
+ context="upstream",
+ release="testy",
+ arch="C51",
+ package="hi",
+ env=[
+ "UPSTREAM_PULL_REQUEST=2",
+ "GITHUB_STATUSES_URL=https://api.github.com/two",
+ ],
+ **{"build-git": "http://x.com/foo"}
+ )
# we recorded the request
request.app.open.assert_called_with(
- os.path.join(request.app.PATH, 'github-pending', 'testy-C51-hi-2-two'), 'w')
- self.assertIn('GITHUB_STATUSES_URL=https://api.github.com/two', str(request.app.open().write.call_args))
+ os.path.join(
+ request.app.PATH, "github-pending", "testy-C51-hi-2-two"
+ ),
+ "w",
+ )
+ self.assertIn(
+ "GITHUB_STATUSES_URL=https://api.github.com/two",
+ str(request.app.open().write.call_args),
+ )
self.assertIn('"arch": "C51"', str(request.app.open().write.call_args))
# we told GitHub about it
mock_submit.return_value.post_json.assert_called_once_with(
- 'https://api.github.com/two',
- {'context': 'testy-C51', 'state': 'pending',
- 'target_url': 'http://autopkgtest.ubuntu.com/running#pkg-hi',
- 'description': 'autopkgtest running'},
- os.path.expanduser('~/github-status-credentials.txt'), 'hi')
-
- @patch('request.app.Submit')
- @patch('request.app.open',
- mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
- create=True)
+ "https://api.github.com/two",
+ {
+ "context": "testy-C51",
+ "state": "pending",
+ "target_url": "http://autopkgtest.ubuntu.com/running#pkg-hi",
+ "description": "autopkgtest running",
+ },
+ os.path.expanduser("~/github-status-credentials.txt"),
+ "hi",
+ )
+
+ @patch("request.app.Submit")
+ @patch(
+ "request.app.open",
+ mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
+ create=True,
+ )
def test_valid_complex(self, mock_submit):
- ret = self.app.post('/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo&'
- 'ppa=joe/stuff&ppa=mary/misc&env=THIS=a;THAT=b&env=THERE=c',
- content_type='application/json',
- headers=[('X-Hub-Signature', 'sha1=f9041325575127310c304bb65f9befb0d13b1ce6')],
- data=b'{"action": "opened", "number": 2, "pull_request":'
- b' {"statuses_url": "https://api.github.com/2"}}')
+ ret = self.app.post(
+ "/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo&"
+ "ppa=joe/stuff&ppa=mary/misc&env=THIS=a;THAT=b&env=THERE=c",
+ content_type="application/json",
+ headers=[
+ (
+ "X-Hub-Signature",
+ "sha1=f9041325575127310c304bb65f9befb0d13b1ce6",
+ )
+ ],
+ data=b'{"action": "opened", "number": 2, "pull_request":'
+ b' {"statuses_url": "https://api.github.com/2"}}',
+ )
self.assertEqual(ret.status_code, 200, ret.data)
- self.assertIn(b'Test request submitted.', ret.data)
+ self.assertIn(b"Test request submitted.", ret.data)
mock_submit.return_value.validate_git_request.assert_called_once_with(
- release='testy', arch='C51', package='hi',
- env=['THIS=a', 'THAT=b', 'THERE=c', 'UPSTREAM_PULL_REQUEST=2',
- 'GITHUB_STATUSES_URL=https://api.github.com/2'],
- ppas=['joe/stuff', 'mary/misc'],
- **{'build-git': 'http://x.com/foo'})
+ release="testy",
+ arch="C51",
+ package="hi",
+ env=[
+ "THIS=a",
+ "THAT=b",
+ "THERE=c",
+ "UPSTREAM_PULL_REQUEST=2",
+ "GITHUB_STATUSES_URL=https://api.github.com/2",
+ ],
+ ppas=["joe/stuff", "mary/misc"],
+ **{"build-git": "http://x.com/foo"}
+ )
mock_submit.return_value.send_amqp_request.assert_called_once_with(
- context='upstream', release='testy', arch='C51', package='hi',
- env=['THIS=a', 'THAT=b', 'THERE=c', 'UPSTREAM_PULL_REQUEST=2',
- 'GITHUB_STATUSES_URL=https://api.github.com/2'],
- ppas=['joe/stuff', 'mary/misc'],
- **{'build-git': 'http://x.com/foo'})
-
- @patch('request.app.Submit')
- @patch('request.app.open',
- mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
- create=True)
+ context="upstream",
+ release="testy",
+ arch="C51",
+ package="hi",
+ env=[
+ "THIS=a",
+ "THAT=b",
+ "THERE=c",
+ "UPSTREAM_PULL_REQUEST=2",
+ "GITHUB_STATUSES_URL=https://api.github.com/2",
+ ],
+ ppas=["joe/stuff", "mary/misc"],
+ **{"build-git": "http://x.com/foo"}
+ )
+
+ @patch("request.app.Submit")
+ @patch(
+ "request.app.open",
+ mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
+ create=True,
+ )
def test_valid_generated_url(self, mock_submit):
- ret = self.app.post('/?arch=C51&package=hi&release=testy',
- content_type='application/json',
- headers=[('X-Hub-Signature', 'sha1=427a20827d46f5fe8e18f08b9a7fa09ba915ea08')],
- data=b'{"action": "opened", "number": 2, "pull_request":'
- b' {"statuses_url": "https://api.github.com/two",'
- b' "base": {"repo": {"clone_url": "https://github.com/joe/x.git"}}}}')
+ ret = self.app.post(
+ "/?arch=C51&package=hi&release=testy",
+ content_type="application/json",
+ headers=[
+ (
+ "X-Hub-Signature",
+ "sha1=427a20827d46f5fe8e18f08b9a7fa09ba915ea08",
+ )
+ ],
+ data=b'{"action": "opened", "number": 2, "pull_request":'
+ b' {"statuses_url": "https://api.github.com/two",'
+ b' "base": {"repo": {"clone_url": "https://github.com/joe/x.git"}}}}',
+ )
self.assertEqual(ret.status_code, 200, ret.data)
- self.assertIn(b'Test request submitted.', ret.data)
+ self.assertIn(b"Test request submitted.", ret.data)
mock_submit.return_value.validate_git_request.assert_called_once_with(
- release='testy', arch='C51', package='hi',
- env=['UPSTREAM_PULL_REQUEST=2', 'GITHUB_STATUSES_URL=https://api.github.com/two'],
- **{'build-git': 'https://github.com/joe/x.git#refs/pull/2/head'})
+ release="testy",
+ arch="C51",
+ package="hi",
+ env=[
+ "UPSTREAM_PULL_REQUEST=2",
+ "GITHUB_STATUSES_URL=https://api.github.com/two",
+ ],
+ **{"build-git": "https://github.com/joe/x.git#refs/pull/2/head"}
+ )
mock_submit.return_value.send_amqp_request.assert_called_once_with(
- context='upstream', release='testy', arch='C51', package='hi',
- env=['UPSTREAM_PULL_REQUEST=2', 'GITHUB_STATUSES_URL=https://api.github.com/two'],
- **{'build-git': 'https://github.com/joe/x.git#refs/pull/2/head'})
+ context="upstream",
+ release="testy",
+ arch="C51",
+ package="hi",
+ env=[
+ "UPSTREAM_PULL_REQUEST=2",
+ "GITHUB_STATUSES_URL=https://api.github.com/two",
+ ],
+ **{"build-git": "https://github.com/joe/x.git#refs/pull/2/head"}
+ )
def test_post_json_missing_file(self):
- self.assertRaises(IOError, Submit.post_json, 'https://foo', {}, '/non/existing', 'myproj')
-
- @patch('request.submit.open', mock_open(None, 'proj1:user:s3kr1t'), create=True)
- @patch('request.submit.urllib.request')
+ self.assertRaises(
+ IOError,
+ Submit.post_json,
+ "https://foo",
+ {},
+ "/non/existing",
+ "myproj",
+ )
+
+ @patch(
+ "request.submit.open",
+ mock_open(None, "proj1:user:s3kr1t"),
+ create=True,
+ )
+ @patch("request.submit.urllib.request")
def test_post_json_nouser(self, mock_request):
- Submit.post_json('https://example.com', {'bar': 2}, '/the/creds.txt', 'proj')
+ Submit.post_json(
+ "https://example.com", {"bar": 2}, "/the/creds.txt", "proj"
+ )
self.assertEqual(mock_request.urlopen.call_count, 0)
# this can only be tested shallowly in a unit test, this would need a real
# web server
- @patch('request.submit.open', mock_open(None, 'proj:user:s3kr1t'), create=True)
- @patch('request.submit.urllib.request')
+ @patch(
+ "request.submit.open", mock_open(None, "proj:user:s3kr1t"), create=True
+ )
+ @patch("request.submit.urllib.request")
def test_post_json_success(self, mock_request):
- Submit.post_json('https://example.com', {'bar': 2}, '/the/creds.txt', 'proj')
+ Submit.post_json(
+ "https://example.com", {"bar": 2}, "/the/creds.txt", "proj"
+ )
print(mock_request.mock_calls)
mock_request.Request.assert_called_once_with(
- url='https://example.com',
- headers={'Content-Type': 'application/json', 'Authorization': 'Basic dXNlcjpzM2tyMXQ='},
- method='POST',
- data=b'{"bar": 2}')
+ url="https://example.com",
+ headers={
+ "Content-Type": "application/json",
+ "Authorization": "Basic dXNlcjpzM2tyMXQ=",
+ },
+ method="POST",
+ data=b'{"bar": 2}',
+ )
self.assertEqual(mock_request.urlopen.call_count, 1)
- @patch('request.app.Submit')
- @patch('request.app.open',
- mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
- create=True)
+ @patch("request.app.Submit")
+ @patch(
+ "request.app.open",
+ mock_open(None, '{"hi": "1111111111111111111111111111111111111111"}'),
+ create=True,
+ )
def test_valid_testname(self, mock_submit):
- ret = self.app.post('/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo&testname=first',
- content_type='application/json',
- headers=[('X-Hub-Signature', 'sha1=1dae67d4406d21b498806968a3def61754498a21')],
- data=b'{"action": "opened", "number": 2, "pull_request":'
- b' {"statuses_url": "https://api.github.com/two"}}')
+ ret = self.app.post(
+ "/?arch=C51&package=hi&release=testy&build-git=http://x.com/foo&testname=first",
+ content_type="application/json",
+ headers=[
+ (
+ "X-Hub-Signature",
+ "sha1=1dae67d4406d21b498806968a3def61754498a21",
+ )
+ ],
+ data=b'{"action": "opened", "number": 2, "pull_request":'
+ b' {"statuses_url": "https://api.github.com/two"}}',
+ )
self.assertEqual(ret.status_code, 200, ret.data)
- self.assertIn(b'Test request submitted.', ret.data)
+ self.assertIn(b"Test request submitted.", ret.data)
mock_submit.return_value.validate_git_request.assert_called_once_with(
- release='testy', arch='C51', package='hi', testname='first',
- env=['UPSTREAM_PULL_REQUEST=2', 'GITHUB_STATUSES_URL=https://api.github.com/two'],
- **{'build-git': 'http://x.com/foo'})
+ release="testy",
+ arch="C51",
+ package="hi",
+ testname="first",
+ env=[
+ "UPSTREAM_PULL_REQUEST=2",
+ "GITHUB_STATUSES_URL=https://api.github.com/two",
+ ],
+ **{"build-git": "http://x.com/foo"}
+ )
mock_submit.return_value.send_amqp_request.assert_called_once_with(
- context='upstream', release='testy', arch='C51', package='hi', testname='first',
- env=['UPSTREAM_PULL_REQUEST=2', 'GITHUB_STATUSES_URL=https://api.github.com/two'],
- **{'build-git': 'http://x.com/foo'})
+ context="upstream",
+ release="testy",
+ arch="C51",
+ package="hi",
+ testname="first",
+ env=[
+ "UPSTREAM_PULL_REQUEST=2",
+ "GITHUB_STATUSES_URL=https://api.github.com/two",
+ ],
+ **{"build-git": "http://x.com/foo"}
+ )
# we recorded the request
request.app.open.assert_called_with(
- os.path.join(request.app.PATH, 'github-pending', 'testy-C51-hi-2-two'), 'w')
- self.assertIn('GITHUB_STATUSES_URL=https://api.github.com/two', str(request.app.open().write.call_args))
- self.assertIn('"testname": "first"', str(request.app.open().write.call_args))
+ os.path.join(
+ request.app.PATH, "github-pending", "testy-C51-hi-2-two"
+ ),
+ "w",
+ )
+ self.assertIn(
+ "GITHUB_STATUSES_URL=https://api.github.com/two",
+ str(request.app.open().write.call_args),
+ )
+ self.assertIn(
+ '"testname": "first"', str(request.app.open().write.call_args)
+ )
SESSION = {}
@@ -347,39 +567,46 @@ class LoginTests(AppTestBase):
def test_login(self):
"""Ensure correct redirect when initiating login."""
- ret = self.app.post('/login', data=dict(
- openid='test',
- next='/',
- ), follow_redirects=False)
- self.assertIn(b'https://login.ubuntu.com/+openid?', ret.data)
+ ret = self.app.post(
+ "/login",
+ data=dict(
+ openid="test",
+ next="/",
+ ),
+ follow_redirects=False,
+ )
+ self.assertIn(b"https://login.ubuntu.com/+openid?", ret.data)
self.assertEqual(ret.status_code, 302)
def test_login_get(self):
"""Ensure login endpoint accepts GET requests as per SSO spec."""
- ret = self.app.get('/login', follow_redirects=False)
+ ret = self.app.get("/login", follow_redirects=False)
self.assertIn(b'<a href="/">/</a>.', ret.data)
self.assertEqual(ret.status_code, 302)
def test_logged_already(self):
"""Ensure correct redirect when already logged in."""
with self.app.session_transaction() as session:
- session['nickname'] = 'person'
- ret = self.app.get('/login', follow_redirects=False)
- self.assertIn(b'You should be redirected automatically', ret.data)
+ session["nickname"] = "person"
+ ret = self.app.get("/login", follow_redirects=False)
+ self.assertIn(b"You should be redirected automatically", ret.data)
self.assertEqual(ret.status_code, 302)
- @patch('request.app.oid')
- @patch('request.app.session', SESSION)
+ @patch("request.app.oid")
+ @patch("request.app.session", SESSION)
def test_identify(self, oid_mock):
"""Ensure OpenID login can be successfully completed."""
+
class Resp:
"""Fake OpenID response class."""
- identity_url = 'http://example.com'
- nickname = 'lebowski'
- oid_mock.get_next_url.return_value = 'https://localhost/'
+
+ identity_url = "http://example.com"
+ nickname = "lebowski"
+
+ oid_mock.get_next_url.return_value = "https://localhost/"
ret = request.app.identify(Resp)
- self.assertIn(b'>https://localhost/</a>', ret.data)
- for attr in ('identity_url', 'nickname'):
+ self.assertIn(b">https://localhost/</a>", ret.data)
+ for attr in ("identity_url", "nickname"):
self.assertEqual(getattr(Resp, attr), SESSION[attr])
oid_mock.get_next_url.assert_called_once_with()
self.assertEqual(ret.status_code, 302)
@@ -387,9 +614,9 @@ class LoginTests(AppTestBase):
def test_logout(self):
"""Ensure logging out correctly clears session."""
with self.app.session_transaction() as session:
- session['foo'] = 'bar'
- ret = self.app.get('/logout', follow_redirects=False)
- self.assertIn(b'http://localhost/</a>.', ret.data)
+ session["foo"] = "bar"
+ ret = self.app.get("/logout", follow_redirects=False)
+ self.assertIn(b"http://localhost/</a>.", ret.data)
self.assertEqual(ret.status_code, 302)
with self.app.session_transaction() as session:
- self.assertNotIn('foo', session)
+ self.assertNotIn("foo", session)
diff --git a/charms/focal/autopkgtest-web/webcontrol/request/tests/test_submit.py b/charms/focal/autopkgtest-web/webcontrol/request/tests/test_submit.py
index bae9185..790846e 100644
--- a/charms/focal/autopkgtest-web/webcontrol/request/tests/test_submit.py
+++ b/charms/focal/autopkgtest-web/webcontrol/request/tests/test_submit.py
@@ -4,9 +4,8 @@ Test all things related verifying input arguments and sending AMQP requests.
"""
import sqlite3
-
from unittest import TestCase
-from unittest.mock import patch, MagicMock
+from unittest.mock import MagicMock, patch
from urllib.error import HTTPError
import request.submit
@@ -15,25 +14,35 @@ import request.submit
class SubmitTestBase(TestCase):
"""Common setup of tests of Submit class"""
- @patch('request.submit.configparser.ConfigParser')
- @patch('request.submit.sqlite3')
+ @patch("request.submit.configparser.ConfigParser")
+ @patch("request.submit.sqlite3")
def setUp(self, mock_sqlite, mock_configparser):
- test_db = sqlite3.connect(':memory:')
- test_db.execute('CREATE TABLE test ('
- ' id INTEGER PRIMARY KEY, '
- ' release CHAR[20], '
- ' arch CHAR[20], '
- ' package char[120])')
- test_db.execute("INSERT INTO test values(null, 'testy', '6510', 'blue')")
- test_db.execute("INSERT INTO test values(null, 'testy', 'C51', 'blue')")
- test_db.execute("INSERT INTO test values(null, 'grumpy', 'hexium', 'green')")
+ test_db = sqlite3.connect(":memory:")
+ test_db.execute(
+ "CREATE TABLE test ("
+ " id INTEGER PRIMARY KEY, "
+ " release CHAR[20], "
+ " arch CHAR[20], "
+ " package char[120])"
+ )
+ test_db.execute(
+ "INSERT INTO test values(null, 'testy', '6510', 'blue')"
+ )
+ test_db.execute(
+ "INSERT INTO test values(null, 'testy', 'C51', 'blue')"
+ )
+ test_db.execute(
+ "INSERT INTO test values(null, 'grumpy', 'hexium', 'green')"
+ )
test_db.commit()
mock_sqlite.connect.return_value = test_db
# mock config values
- cfg = {'amqp': {'uri': 'amqp://user:s3kr1t@1.2.3.4'},
- 'web': {'database': '/ignored', 'database_ro': '/ignored'},
- 'autopkgtest' : { 'releases': 'testy grumpy' }}
+ cfg = {
+ "amqp": {"uri": "amqp://user:s3kr1t@1.2.3.4"},
+ "web": {"database": "/ignored", "database_ro": "/ignored"},
+ "autopkgtest": {"releases": "testy grumpy"},
+ }
mock_configparser.return_value = MagicMock()
mock_configparser.return_value.__getitem__.side_effect = cfg.get
@@ -46,61 +55,70 @@ class DistroRequestValidationTests(SubmitTestBase):
def test_init(self):
"""Read debci configuration"""
- self.assertEqual(self.submit.releases,
- ['testy', 'grumpy'])
- self.assertEqual(self.submit.architectures,
- {'6510', 'C51', 'hexium'})
- self.assertEqual(self.submit.amqp_creds.hostname,
- '1.2.3.4')
- self.assertEqual(self.submit.amqp_creds.username,
- 'user')
- self.assertEqual(self.submit.amqp_creds.password,
- 's3kr1t')
+ self.assertEqual(self.submit.releases, ["testy", "grumpy"])
+ self.assertEqual(self.submit.architectures, {"6510", "C51", "hexium"})
+ self.assertEqual(self.submit.amqp_creds.hostname, "1.2.3.4")
+ self.assertEqual(self.submit.amqp_creds.username, "user")
+ self.assertEqual(self.submit.amqp_creds.password, "s3kr1t")
def test_bad_release(self):
"""Unknown release"""
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('fooly', 'C51', 'blue', ['ab/1'], 'joe')
- self.assertEqual(str(cme.exception), 'Unknown release fooly')
+ self.submit.validate_distro_request(
+ "fooly", "C51", "blue", ["ab/1"], "joe"
+ )
+ self.assertEqual(str(cme.exception), "Unknown release fooly")
def test_bad_arch(self):
"""Unknown architecture"""
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'wut', 'blue', ['ab/1'], 'joe')
- self.assertEqual(str(cme.exception), 'Unknown architecture wut')
+ self.submit.validate_distro_request(
+ "testy", "wut", "blue", ["ab/1"], "joe"
+ )
+ self.assertEqual(str(cme.exception), "Unknown architecture wut")
def test_bad_package(self):
"""Unknown package"""
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'badpkg', ['ab/1'], 'joe')
- self.assertIn('Package badpkg', str(cme.exception))
+ self.submit.validate_distro_request(
+ "testy", "C51", "badpkg", ["ab/1"], "joe"
+ )
+ self.assertIn("Package badpkg", str(cme.exception))
def test_bad_argument(self):
"""Unknown argument"""
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1'], 'joe', foo='bar')
- self.assertIn('Invalid argument foo', str(cme.exception))
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab/1"], "joe", foo="bar"
+ )
+ self.assertIn("Invalid argument foo", str(cme.exception))
def test_invalid_trigger_syntax(self):
"""Invalid syntax in trigger"""
# invalid trigger format
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab'], 'joe')
- self.assertIn('Malformed trigger', str(cme.exception))
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab"], "joe"
+ )
+ self.assertIn("Malformed trigger", str(cme.exception))
# invalid trigger source package name chars
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['a!b/1'], 'joe')
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["a!b/1"], "joe"
+ )
# invalid trigger version chars
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1!1'], 'joe')
- self.assertIn('Malformed trigger', str(cme.exception))
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab/1!1"], "joe"
+ )
+ self.assertIn("Malformed trigger", str(cme.exception))
def test_disallowed_testname(self):
"""testname not allowed for distro tests"""
@@ -108,54 +126,60 @@ class DistroRequestValidationTests(SubmitTestBase):
# we only allow this for GitHub requests; with distro requests it would
# be cheating as proposed-migration would consider those
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'], 'joe', testname='first')
- self.assertIn('Invalid argument testname', str(cme.exception))
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab/1.2"], "joe", testname="first"
+ )
+ self.assertIn("Invalid argument testname", str(cme.exception))
- @patch('request.submit.urllib.request.urlopen')
+ @patch("request.submit.urllib.request.urlopen")
def test_ppa(self, mock_urlopen):
"""PPA does not exist"""
# invalid name don't even call lp
with self.assertRaises(ValueError) as cme:
self.submit.validate_distro_request(
- 'testy', 'C51', 'foo', ['ab/1.2'], 'joe', ['b~ad/ppa'])
- self.assertEqual(str(cme.exception), 'Unknown PPA b~ad/ppa')
+ "testy", "C51", "foo", ["ab/1.2"], "joe", ["b~ad/ppa"]
+ )
+ self.assertEqual(str(cme.exception), "Unknown PPA b~ad/ppa")
self.assertEqual(mock_urlopen.call_count, 0)
# mock Launchpad response: successful form, but no match
cm = MagicMock()
cm.__enter__.return_value = cm
cm.getcode.return_value = 200
- cm.geturl.return_value = 'http://mock.launchpad.net'
- cm.read.return_value = b'{}'
+ cm.geturl.return_value = "http://mock.launchpad.net"
+ cm.read.return_value = b"{}"
cm.return_value = cm
mock_urlopen.return_value = cm
with self.assertRaises(ValueError) as cme:
self.submit.validate_distro_request(
- 'testy', 'C51', 'foo', ['ab/1.2'], 'joe', ['bad/ppa'])
- self.assertEqual(str(cme.exception), 'Unknown PPA bad/ppa')
+ "testy", "C51", "foo", ["ab/1.2"], "joe", ["bad/ppa"]
+ )
+ self.assertEqual(str(cme.exception), "Unknown PPA bad/ppa")
self.assertEqual(mock_urlopen.call_count, 1)
# success
cm.read.return_value = b'{"name": "there"}'
- self.assertTrue(self.submit.is_valid_ppa('hi/there'))
+ self.assertTrue(self.submit.is_valid_ppa("hi/there"))
# broken JSON response
- cm.read.return_value = b'not { json}'
+ cm.read.return_value = b"not { json}"
with self.assertRaises(ValueError) as cme:
self.submit.validate_distro_request(
- 'testy', 'C51', 'foo', ['ab/1.2'], 'joe', ['broke/ness'])
+ "testy", "C51", "foo", ["ab/1.2"], "joe", ["broke/ness"]
+ )
# same, but entirely failing query -- let's be on the safe side
cm.getcode.return_value = 404
- cm.read.return_value = b'<html>not found</html>'
+ cm.read.return_value = b"<html>not found</html>"
with self.assertRaises(ValueError) as cme:
self.submit.validate_distro_request(
- 'testy', 'C51', 'foo', ['ab/1.2'], 'joe', ['bro/ken'])
- self.assertEqual(str(cme.exception), 'Unknown PPA bro/ken')
+ "testy", "C51", "foo", ["ab/1.2"], "joe", ["bro/ken"]
+ )
+ self.assertEqual(str(cme.exception), "Unknown PPA bro/ken")
- @patch('request.submit.urllib.request.urlopen')
+ @patch("request.submit.urllib.request.urlopen")
def test_nonexisting_trigger(self, mock_urlopen):
"""Trigger source package/version does not exist"""
@@ -164,29 +188,39 @@ class DistroRequestValidationTests(SubmitTestBase):
cm = MagicMock()
cm.__enter__.return_value = cm
cm.getcode.return_value = 200
- cm.geturl.return_value = 'http://mock.launchpad.net'
+ cm.geturl.return_value = "http://mock.launchpad.net"
cm.read.return_value = b'{"total_size": 0}'
cm.return_value = cm
mock_urlopen.return_value = cm
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'], 'joe')
- self.assertEqual(str(cme.exception), 'ab/1.2 is not published in testy')
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab/1.2"], "joe"
+ )
+ self.assertEqual(
+ str(cme.exception), "ab/1.2 is not published in testy"
+ )
self.assertEqual(mock_urlopen.call_count, 1)
# broken JSON response
- cm.read.return_value = b'not { json}'
+ cm.read.return_value = b"not { json}"
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'], 'joe')
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab/1.2"], "joe"
+ )
# same, but entirely failing query -- let's be on the safe side
cm.getcode.return_value = 404
- cm.read.return_value = b'<html>not found</html>'
+ cm.read.return_value = b"<html>not found</html>"
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'], 'joe')
- self.assertEqual(str(cme.exception), 'ab/1.2 is not published in testy')
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab/1.2"], "joe"
+ )
+ self.assertEqual(
+ str(cme.exception), "ab/1.2 is not published in testy"
+ )
- @patch('request.submit.urllib.request.urlopen')
+ @patch("request.submit.urllib.request.urlopen")
def test_bad_package_ppa(self, mock_urlopen):
"""Unknown package with a PPA request"""
@@ -195,19 +229,29 @@ class DistroRequestValidationTests(SubmitTestBase):
cm = MagicMock()
cm.__enter__.return_value = cm
cm.getcode.return_value = 200
- cm.geturl.return_value = 'http://mock.launchpad.net'
- cm.read.side_effect = [b'{"name": "overlay"}',
- b'{"name": "goodstuff"}']
+ cm.geturl.return_value = "http://mock.launchpad.net"
+ cm.read.side_effect = [
+ b'{"name": "overlay"}',
+ b'{"name": "goodstuff"}',
+ ]
cm.return_value = cm
mock_urlopen.return_value = cm
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'badpkg', ['ab/1.2'], 'joe',
- ppas=['team/overlay', 'joe/goodstuff'])
- self.assertEqual(str(cme.exception), 'Package badpkg does not have any test results')
+ self.submit.validate_distro_request(
+ "testy",
+ "C51",
+ "badpkg",
+ ["ab/1.2"],
+ "joe",
+ ppas=["team/overlay", "joe/goodstuff"],
+ )
+ self.assertEqual(
+ str(cme.exception), "Package badpkg does not have any test results"
+ )
self.assertEqual(mock_urlopen.call_count, 2)
- @patch('request.submit.urllib.request.urlopen')
+ @patch("request.submit.urllib.request.urlopen")
def test_nonexisting_trigger_ppa(self, mock_urlopen):
"""Trigger source package/version does not exist in PPA"""
@@ -216,20 +260,31 @@ class DistroRequestValidationTests(SubmitTestBase):
cm = MagicMock()
cm.__enter__.return_value = cm
cm.getcode.return_value = 200
- cm.geturl.return_value = 'http://mock.launchpad.net'
- cm.read.side_effect = [b'{"name": "overlay"}',
- b'{"name": "goodstuff"}',
- b'{"total_size": 0}']
+ cm.geturl.return_value = "http://mock.launchpad.net"
+ cm.read.side_effect = [
+ b'{"name": "overlay"}',
+ b'{"name": "goodstuff"}',
+ b'{"total_size": 0}',
+ ]
cm.return_value = cm
mock_urlopen.return_value = cm
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'], 'joe',
- ppas=['team/overlay', 'joe/goodstuff'])
- self.assertEqual(str(cme.exception), 'ab/1.2 is not published in PPA joe/goodstuff testy')
+ self.submit.validate_distro_request(
+ "testy",
+ "C51",
+ "blue",
+ ["ab/1.2"],
+ "joe",
+ ppas=["team/overlay", "joe/goodstuff"],
+ )
+ self.assertEqual(
+ str(cme.exception),
+ "ab/1.2 is not published in PPA joe/goodstuff testy",
+ )
self.assertEqual(mock_urlopen.call_count, 3)
- @patch('request.submit.urllib.request.urlopen')
+ @patch("request.submit.urllib.request.urlopen")
def test_no_upload_perm(self, mock_urlopen):
"""Requester is not allowed to upload package"""
@@ -238,19 +293,23 @@ class DistroRequestValidationTests(SubmitTestBase):
cm = MagicMock()
cm.__enter__.return_value = cm
cm.getcode.return_value = 200
- cm.read.side_effect = [b'{"total_size": 1, "entries": [{"component_name": "main"}]}',
- HTTPError('https://lp/checkUpload', 403, 'Forbidden', {}, None),
- HTTPError('https://lp/checkUpload', 403, 'Forbidden', {}, None),
- b'{"total_size": 1, "entries": [{"name": "joe2"}]}']
+ cm.read.side_effect = [
+ b'{"total_size": 1, "entries": [{"component_name": "main"}]}',
+ HTTPError("https://lp/checkUpload", 403, "Forbidden", {}, None),
+ HTTPError("https://lp/checkUpload", 403, "Forbidden", {}, None),
+ b'{"total_size": 1, "entries": [{"name": "joe2"}]}',
+ ]
cm.return_value = cm
mock_urlopen.return_value = cm
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'], 'joe')
- self.assertIn('not allowed to upload blue or ab', str(cme.exception))
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab/1.2"], "joe"
+ )
+ self.assertIn("not allowed to upload blue or ab", str(cme.exception))
self.assertEqual(mock_urlopen.call_count, 4)
- @patch('request.submit.urllib.request.urlopen')
+ @patch("request.submit.urllib.request.urlopen")
def test_distro_ok(self, mock_urlopen):
"""Valid distro request is accepted"""
@@ -259,15 +318,19 @@ class DistroRequestValidationTests(SubmitTestBase):
cm = MagicMock()
cm.__enter__.return_value = cm
cm.getcode.return_value = 200
- cm.read.side_effect = [b'{"total_size": 1, "entries": [{"component_name": "main"}]}',
- b'true']
+ cm.read.side_effect = [
+ b'{"total_size": 1, "entries": [{"component_name": "main"}]}',
+ b"true",
+ ]
cm.return_value = cm
mock_urlopen.return_value = cm
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'], 'joe')
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab/1.2"], "joe"
+ )
self.assertEqual(mock_urlopen.call_count, 2)
- @patch('request.submit.urllib.request.urlopen')
+ @patch("request.submit.urllib.request.urlopen")
def test_distro_all_proposed(self, mock_urlopen):
"""Valid distro request with all-proposed is accepted"""
@@ -276,24 +339,33 @@ class DistroRequestValidationTests(SubmitTestBase):
cm = MagicMock()
cm.__enter__.return_value = cm
cm.getcode.return_value = 200
- cm.read.side_effect = [b'{"total_size": 1, "entries": [{"component_name": "main"}]}',
- b'true']
+ cm.read.side_effect = [
+ b'{"total_size": 1, "entries": [{"component_name": "main"}]}',
+ b"true",
+ ]
cm.return_value = cm
mock_urlopen.return_value = cm
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'],
- 'joe', **{'all-proposed': '1'})
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab/1.2"], "joe", **{"all-proposed": "1"}
+ )
self.assertEqual(mock_urlopen.call_count, 2)
def test_distro_all_proposed_bad_value(self):
"""Valid distro request with invalid all-proposed value"""
with self.assertRaises(ValueError) as cme:
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'],
- 'joe', **{'all-proposed': 'bogus'})
- self.assertIn('nvalid all-proposed value', str(cme.exception))
-
- @patch('request.submit.urllib.request.urlopen')
+ self.submit.validate_distro_request(
+ "testy",
+ "C51",
+ "blue",
+ ["ab/1.2"],
+ "joe",
+ **{"all-proposed": "bogus"}
+ )
+ self.assertIn("nvalid all-proposed value", str(cme.exception))
+
+ @patch("request.submit.urllib.request.urlopen")
def test_validate_distro_whitelisted_team(self, mock_urlopen):
"""Valid distro request via whitelisted team is accepted"""
@@ -302,17 +374,21 @@ class DistroRequestValidationTests(SubmitTestBase):
cm = MagicMock()
cm.__enter__.return_value = cm
cm.getcode.return_value = 200
- cm.read.side_effect = [b'{"total_size": 1, "entries": [{"component_name": "main"}]}',
- HTTPError('https://lp/checkUpload', 403, 'Forbidden', {}, None),
- HTTPError('https://lp/checkUpload', 403, 'Forbidden', {}, None),
- b'{"total_size": 1, "entries": [{"name": "joe"}]}']
+ cm.read.side_effect = [
+ b'{"total_size": 1, "entries": [{"component_name": "main"}]}',
+ HTTPError("https://lp/checkUpload", 403, "Forbidden", {}, None),
+ HTTPError("https://lp/checkUpload", 403, "Forbidden", {}, None),
+ b'{"total_size": 1, "entries": [{"name": "joe"}]}',
+ ]
cm.return_value = cm
mock_urlopen.return_value = cm
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'], 'joe')
+ self.submit.validate_distro_request(
+ "testy", "C51", "blue", ["ab/1.2"], "joe"
+ )
self.assertEqual(mock_urlopen.call_count, 4)
- @patch('request.submit.urllib.request.urlopen')
+ @patch("request.submit.urllib.request.urlopen")
def test_ppa_ok(self, mock_urlopen):
"""Valid PPA request is accepted"""
@@ -321,18 +397,26 @@ class DistroRequestValidationTests(SubmitTestBase):
cm = MagicMock()
cm.__enter__.return_value = cm
cm.getcode.return_value = 200
- cm.read.side_effect = [b'{"name": "overlay"}',
- b'{"name": "goodstuff"}',
- # check if package is published in PPA
- b'{"total_size": 1, "entries": [{"component_name": "main"}]}',
- # component name in Ubuntu archive
- b'{"total_size": 1, "entries": [{"component_name": "universe"}]}',
- b'true']
+ cm.read.side_effect = [
+ b'{"name": "overlay"}',
+ b'{"name": "goodstuff"}',
+ # check if package is published in PPA
+ b'{"total_size": 1, "entries": [{"component_name": "main"}]}',
+ # component name in Ubuntu archive
+ b'{"total_size": 1, "entries": [{"component_name": "universe"}]}',
+ b"true",
+ ]
cm.return_value = cm
mock_urlopen.return_value = cm
- self.submit.validate_distro_request('testy', 'C51', 'blue', ['ab/1.2'], 'joe',
- ppas=['team/overlay', 'joe/goodstuff'])
+ self.submit.validate_distro_request(
+ "testy",
+ "C51",
+ "blue",
+ ["ab/1.2"],
+ "joe",
+ ppas=["team/overlay", "joe/goodstuff"],
+ )
self.assertEqual(mock_urlopen.call_count, 5)
@@ -341,140 +425,213 @@ class GitRequestValidationTests(SubmitTestBase):
def test_bad_release(self):
with self.assertRaises(ValueError) as cme:
- self.submit.validate_git_request('fooly', 'C51', 'ab', **{'build-git': 'https://x.com/proj'})
- self.assertEqual(str(cme.exception), 'Unknown release fooly')
+ self.submit.validate_git_request(
+ "fooly", "C51", "ab", **{"build-git": "https://x.com/proj"}
+ )
+ self.assertEqual(str(cme.exception), "Unknown release fooly")
def test_bad_arch(self):
with self.assertRaises(ValueError) as cme:
- self.submit.validate_git_request('testy', 'wut', 'a!b', **{'build-git': 'https://x.com/proj'})
- self.assertEqual(str(cme.exception), 'Unknown architecture wut')
+ self.submit.validate_git_request(
+ "testy", "wut", "a!b", **{"build-git": "https://x.com/proj"}
+ )
+ self.assertEqual(str(cme.exception), "Unknown architecture wut")
def test_bad_package(self):
with self.assertRaises(ValueError) as cme:
- self.submit.validate_git_request('testy', 'C51', 'a!b', **{'build-git': 'https://x.com/proj'})
- self.assertEqual(str(cme.exception), 'Malformed package')
+ self.submit.validate_git_request(
+ "testy", "C51", "a!b", **{"build-git": "https://x.com/proj"}
+ )
+ self.assertEqual(str(cme.exception), "Malformed package")
- @patch('request.submit.urllib.request.urlopen')
+ @patch("request.submit.urllib.request.urlopen")
def test_unknown_ppa(self, mock_urlopen):
# mock Launchpad response: successful form, but no match
cm = MagicMock()
cm.__enter__.return_value = cm
cm.getcode.return_value = 200
- cm.geturl.return_value = 'http://mock.launchpad.net'
- cm.read.return_value = b'{}'
+ cm.geturl.return_value = "http://mock.launchpad.net"
+ cm.read.return_value = b"{}"
cm.return_value = cm
mock_urlopen.return_value = cm
with self.assertRaises(ValueError) as cme:
- self.submit.validate_git_request('testy', 'C51', 'ab', ['bad/ppa'],
- **{'build-git': 'https://x.com/proj'})
- self.assertEqual(str(cme.exception), 'Unknown PPA bad/ppa')
+ self.submit.validate_git_request(
+ "testy",
+ "C51",
+ "ab",
+ ["bad/ppa"],
+ **{"build-git": "https://x.com/proj"}
+ )
+ self.assertEqual(str(cme.exception), "Unknown PPA bad/ppa")
self.assertEqual(mock_urlopen.call_count, 1)
- @patch('request.submit.Submit.is_valid_ppa')
+ @patch("request.submit.Submit.is_valid_ppa")
def test_bad_env(self, is_valid_ppa):
is_valid_ppa.return_value = True
with self.assertRaises(ValueError) as cme:
- self.submit.validate_git_request('testy', 'C51', 'ab',
- env=['foo=1', 'bar=1\n='],
- **{'build-git': 'https://x.com/proj',
- 'ppas': ['a/b']})
- self.assertIn('Invalid environment', str(cme.exception))
- self.assertIn('bar=1', str(cme.exception))
+ self.submit.validate_git_request(
+ "testy",
+ "C51",
+ "ab",
+ env=["foo=1", "bar=1\n="],
+ **{"build-git": "https://x.com/proj", "ppas": ["a/b"]}
+ )
+ self.assertIn("Invalid environment", str(cme.exception))
+ self.assertIn("bar=1", str(cme.exception))
def test_no_ppa(self):
"""No PPA"""
with self.assertRaises(ValueError) as cme:
- self.submit.validate_git_request('testy', 'C51', 'ab',
- **{'build-git': 'https://x.com/proj'})
- self.assertEqual(str(cme.exception), 'Must specify at least one PPA (to associate results with)')
-
- @patch('request.submit.Submit.is_valid_ppa')
+ self.submit.validate_git_request(
+ "testy", "C51", "ab", **{"build-git": "https://x.com/proj"}
+ )
+ self.assertEqual(
+ str(cme.exception),
+ "Must specify at least one PPA (to associate results with)",
+ )
+
+ @patch("request.submit.Submit.is_valid_ppa")
def test_bad_git_url(self, is_valid_ppa):
is_valid_ppa.return_value = True
with self.assertRaises(ValueError) as cme:
- self.submit.validate_git_request('testy', 'C51', 'ab', **{'build-git': 'foo://x.com/proj',
- 'ppas': ['a/b']})
- self.assertEqual(str(cme.exception), 'Malformed build-git')
-
- @patch('request.submit.Submit.is_valid_ppa')
+ self.submit.validate_git_request(
+ "testy",
+ "C51",
+ "ab",
+ **{"build-git": "foo://x.com/proj", "ppas": ["a/b"]}
+ )
+ self.assertEqual(str(cme.exception), "Malformed build-git")
+
+ @patch("request.submit.Submit.is_valid_ppa")
def test_unknown_param(self, is_valid_ppa):
is_valid_ppa.return_value = True
with self.assertRaises(ValueError) as cme:
- self.submit.validate_git_request('testy', 'C51', 'ab',
- **{'build-git': 'http://x.com/proj', 'ppas': ['a/b'],
- 'foo': 'bar'})
- self.assertEqual(str(cme.exception), 'Unsupported arguments: foo')
-
- @patch('request.submit.Submit.is_valid_ppa')
+ self.submit.validate_git_request(
+ "testy",
+ "C51",
+ "ab",
+ **{
+ "build-git": "http://x.com/proj",
+ "ppas": ["a/b"],
+ "foo": "bar",
+ }
+ )
+ self.assertEqual(str(cme.exception), "Unsupported arguments: foo")
+
+ @patch("request.submit.Submit.is_valid_ppa")
def test_bad_testname(self, is_valid_ppa):
is_valid_ppa.return_value = True
with self.assertRaises(ValueError) as cme:
- self.submit.validate_git_request('testy', 'C51', 'ab',
- **{'build-git': 'http://x.com/proj', 'testname': 'a !',
- 'ppas': ['a/b']})
- self.assertEqual(str(cme.exception), 'Malformed testname')
-
- @patch('request.submit.Submit.is_valid_ppa')
+ self.submit.validate_git_request(
+ "testy",
+ "C51",
+ "ab",
+ **{
+ "build-git": "http://x.com/proj",
+ "testname": "a !",
+ "ppas": ["a/b"],
+ }
+ )
+ self.assertEqual(str(cme.exception), "Malformed testname")
+
+ @patch("request.submit.Submit.is_valid_ppa")
def test_valid(self, is_valid_ppa):
is_valid_ppa.return_value = True
- self.submit.validate_git_request('testy', 'C51', 'ab',
- **{'build-git': 'http://x.com/proj',
- 'env': ['STATUS_URL=https://api.github.com/proj/123deadbeef'],
- 'ppas': ['a/b']})
-
- @patch('request.submit.Submit.is_valid_ppa')
+ self.submit.validate_git_request(
+ "testy",
+ "C51",
+ "ab",
+ **{
+ "build-git": "http://x.com/proj",
+ "env": ["STATUS_URL=https://api.github.com/proj/123deadbeef"],
+ "ppas": ["a/b"],
+ }
+ )
+
+ @patch("request.submit.Submit.is_valid_ppa")
def test_branch(self, is_valid_ppa):
is_valid_ppa.return_value = True
- self.submit.validate_git_request('testy', 'C51', 'ab',
- **{'build-git': 'http://x.com/proj#refs/pull/2/head',
- 'env': ['STATUS_URL=https://api.github.com/proj/123deadbeef'],
- 'ppas': ['a/b']})
-
- @patch('request.submit.Submit.is_valid_ppa')
+ self.submit.validate_git_request(
+ "testy",
+ "C51",
+ "ab",
+ **{
+ "build-git": "http://x.com/proj#refs/pull/2/head",
+ "env": ["STATUS_URL=https://api.github.com/proj/123deadbeef"],
+ "ppas": ["a/b"],
+ }
+ )
+
+ @patch("request.submit.Submit.is_valid_ppa")
def test_valid_testname(self, is_valid_ppa):
is_valid_ppa.return_value = True
- self.submit.validate_git_request('testy', 'C51', 'ab',
- **{'build-git': 'http://x.com/proj',
- 'testname': 'first',
- 'env': ['STATUS_URL=https://api.github.com/proj/123deadbeef'],
- 'ppas': ['a/b']})
+ self.submit.validate_git_request(
+ "testy",
+ "C51",
+ "ab",
+ **{
+ "build-git": "http://x.com/proj",
+ "testname": "first",
+ "env": ["STATUS_URL=https://api.github.com/proj/123deadbeef"],
+ "ppas": ["a/b"],
+ }
+ )
class SendAMQPTests(SubmitTestBase):
"""Test test request sending via AMQP"""
- @patch('request.submit.amqp.Connection')
- @patch('request.submit.amqp.Message')
+ @patch("request.submit.amqp.Connection")
+ @patch("request.submit.amqp.Message")
def test_valid_request(self, message_con, mock_con):
# mostly a passthrough, but ensure that we do wrap the string in Message()
- message_con.side_effect = lambda x: '>%s<' % x
-
- self.submit.send_amqp_request('testy', 'C51', 'foo', triggers=['ab/1'],
- requester='joe', ppas=['my/ppa'])
- mock_con.assert_called_once_with('1.2.3.4', userid='user', password='s3kr1t')
+ message_con.side_effect = lambda x: ">%s<" % x
+
+ self.submit.send_amqp_request(
+ "testy",
+ "C51",
+ "foo",
+ triggers=["ab/1"],
+ requester="joe",
+ ppas=["my/ppa"],
+ )
+ mock_con.assert_called_once_with(
+ "1.2.3.4", userid="user", password="s3kr1t"
+ )
# with amqp.Connection() as con:
cm_amqp_con = mock_con.return_value.__enter__.return_value
# with con.channel() as ch:
cm_channel = cm_amqp_con.channel.return_value.__enter__.return_value
cm_channel.basic_publish.assert_called_once_with(
'>foo {"ppas": ["my/ppa"], "requester": "joe", "triggers": ["ab/1"]}<',
- routing_key='debci-testy-C51')
+ routing_key="debci-testy-C51",
+ )
- @patch('request.submit.amqp.Connection')
- @patch('request.submit.amqp.Message')
+ @patch("request.submit.amqp.Connection")
+ @patch("request.submit.amqp.Message")
def test_valid_request_context(self, message_con, mock_con):
# mostly a passthrough, but ensure that we do wrap the string in Message()
- message_con.side_effect = lambda x: '>%s<' % x
-
- self.submit.send_amqp_request('testy', 'C51', 'foo', triggers=['ab/1'],
- requester='joe', context='ppa', ppas=['my/ppa'])
- mock_con.assert_called_once_with('1.2.3.4', userid='user', password='s3kr1t')
+ message_con.side_effect = lambda x: ">%s<" % x
+
+ self.submit.send_amqp_request(
+ "testy",
+ "C51",
+ "foo",
+ triggers=["ab/1"],
+ requester="joe",
+ context="ppa",
+ ppas=["my/ppa"],
+ )
+ mock_con.assert_called_once_with(
+ "1.2.3.4", userid="user", password="s3kr1t"
+ )
# with amqp.Connection() as con:
cm_amqp_con = mock_con.return_value.__enter__.return_value
# with con.channel() as ch:
cm_channel = cm_amqp_con.channel.return_value.__enter__.return_value
cm_channel.basic_publish.assert_called_once_with(
'>foo {"ppas": ["my/ppa"], "requester": "joe", "triggers": ["ab/1"]}<',
- routing_key='debci-ppa-testy-C51')
+ routing_key="debci-ppa-testy-C51",
+ )
diff --git a/charms/focal/autopkgtest-web/webcontrol/setup.py b/charms/focal/autopkgtest-web/webcontrol/setup.py
index 3ba7ad8..5f5a996 100644
--- a/charms/focal/autopkgtest-web/webcontrol/setup.py
+++ b/charms/focal/autopkgtest-web/webcontrol/setup.py
@@ -1,13 +1,13 @@
-'''
+"""
Setup file for autopkgtest-web python package
-'''
+"""
from setuptools import find_packages, setup
setup(
- name='webcontrol',
- version='0.0',
- description='autopkgtest web control',
- author='Ubuntu Foundations',
+ name="webcontrol",
+ version="0.0",
+ description="autopkgtest web control",
+ author="Ubuntu Foundations",
packages=find_packages(),
include_package_data=True,
)
diff --git a/charms/focal/autopkgtest-web/webcontrol/update-github-jobs b/charms/focal/autopkgtest-web/webcontrol/update-github-jobs
index a0119ec..334722f 100755
--- a/charms/focal/autopkgtest-web/webcontrol/update-github-jobs
+++ b/charms/focal/autopkgtest-web/webcontrol/update-github-jobs
@@ -1,22 +1,20 @@
#!/usr/bin/python3
-import os
-import json
import configparser
-import logging
-import time
import io
+import json
+import logging
+import os
import sys
import tarfile
import time
-import urllib.request
import urllib.parse
+import urllib.request
from urllib.error import HTTPError
from request.submit import Submit
-
-PENDING_DIR = '/run/autopkgtest_webcontrol/github-pending'
+PENDING_DIR = "/run/autopkgtest_webcontrol/github-pending"
swift_url = None
external_url = None
@@ -25,65 +23,76 @@ def result_matches_job(result_url, params):
# download result.tar and get exit code and testinfo
for retry in range(5):
try:
- with urllib.request.urlopen(result_url + '/result.tar') as f:
+ with urllib.request.urlopen(result_url + "/result.tar") as f:
tar_bytes = io.BytesIO(f.read())
break
except IOError as e:
- logging.debug('failed to download result %s: %s, retrying...', result_url, e)
+ logging.debug(
+ "failed to download result %s: %s, retrying...", result_url, e
+ )
time.sleep(1)
else:
- logging.error('failed to download result %s', result_url)
+ logging.error("failed to download result %s", result_url)
return
try:
- with tarfile.open(None, 'r', tar_bytes) as tar:
- exitcode = int(tar.extractfile('exitcode').read().strip())
- info = json.loads(tar.extractfile('testinfo.json').read().decode())
+ with tarfile.open(None, "r", tar_bytes) as tar:
+ exitcode = int(tar.extractfile("exitcode").read().strip())
+ info = json.loads(tar.extractfile("testinfo.json").read().decode())
except (KeyError, ValueError, tarfile.TarError) as e:
- logging.error('broken result %s: %s', result_url, e)
+ logging.error("broken result %s: %s", result_url, e)
return
try:
- result_env = info['custom_environment']
+ result_env = info["custom_environment"]
except KeyError:
- logging.info('result has no custom_environment, ignoring')
+ logging.info("result has no custom_environment, ignoring")
return
# if the test result has the same parameters than the job, we have a winner
- if result_env != params['env']:
- logging.debug('exit code: %i, ignoring due to different test env: %s',
- exitcode, result_env)
+ if result_env != params["env"]:
+ logging.debug(
+ "exit code: %i, ignoring due to different test env: %s",
+ exitcode,
+ result_env,
+ )
return
- logging.debug('exit code: %i, test env matches job: %s',
- exitcode, result_env)
+ logging.debug(
+ "exit code: %i, test env matches job: %s", exitcode, result_env
+ )
return exitcode
def finish_job(jobfile, params, code, log_url):
- '''Tell GitHub that job is complete and delete the job file'''
+ """Tell GitHub that job is complete and delete the job file"""
if code in (0, 2):
- state = 'success'
+ state = "success"
elif code in (4, 6, 12):
- state = 'failure'
+ state = "failure"
else:
- state = 'error'
+ state = "error"
- data = {'state': state,
- 'context': '%s-%s' % (params['release'], params['arch']),
- 'description': 'autopkgtest finished (%s)' % state,
- 'target_url': log_url}
+ data = {
+ "state": state,
+ "context": "%s-%s" % (params["release"], params["arch"]),
+ "description": "autopkgtest finished (%s)" % state,
+ "target_url": log_url,
+ }
# find status URL
- for e in params['env']:
- if e.startswith('GITHUB_STATUSES_URL='):
- statuses_url = e.split('=', 1)[1]
+ for e in params["env"]:
+ if e.startswith("GITHUB_STATUSES_URL="):
+ statuses_url = e.split("=", 1)[1]
# tell GitHub about the result
- Submit.post_json(statuses_url, data,
- os.path.expanduser('~/github-status-credentials.txt'),
- params['package'])
-
- logging.debug('removing job file')
+ Submit.post_json(
+ statuses_url,
+ data,
+ os.path.expanduser("~/github-status-credentials.txt"),
+ params["package"],
+ )
+
+ logging.debug("removing job file")
os.unlink(jobfile)
@@ -96,56 +105,73 @@ def process_job(jobfile):
logging.error("couldn't read %s, skipping: %s", jobfile, e)
return
- logging.debug('\n\n--------------------\nprocessing job %s:\n %s',
- os.path.basename(jobfile), params)
+ logging.debug(
+ "\n\n--------------------\nprocessing job %s:\n %s",
+ os.path.basename(jobfile),
+ params,
+ )
# fetch Swift results for this request
- container = 'autopkgtest-' + params['release']
+ container = "autopkgtest-" + params["release"]
try:
- container += '-' + params['ppas'][-1].replace('/', '-')
+ container += "-" + params["ppas"][-1].replace("/", "-")
except (KeyError, IndexError):
pass
container_url = os.path.join(swift_url, container)
- package = params['package']
- pkghash = package.startswith('lib') and package[:4] or package[0]
- timestamp = time.strftime('%Y%m%d_%H%M%S', time.gmtime(mtime))
-
- args = {'format': 'plain',
- 'delimiter': '@',
- 'prefix': '%s/%s/%s/%s/' % (params['release'], params['arch'],
- pkghash, package)}
- args['marker'] = args['prefix'] + timestamp
- query_url = container_url + '?' + urllib.parse.urlencode(args)
- logging.debug('Swift URL query: %s', query_url)
+ package = params["package"]
+ pkghash = package.startswith("lib") and package[:4] or package[0]
+ timestamp = time.strftime("%Y%m%d_%H%M%S", time.gmtime(mtime))
+
+ args = {
+ "format": "plain",
+ "delimiter": "@",
+ "prefix": "%s/%s/%s/%s/"
+ % (params["release"], params["arch"], pkghash, package),
+ }
+ args["marker"] = args["prefix"] + timestamp
+ query_url = container_url + "?" + urllib.parse.urlencode(args)
+ logging.debug("Swift URL query: %s", query_url)
try:
with urllib.request.urlopen(query_url) as f:
for result in f:
- result_url = os.path.join(container_url, result.strip().decode())
- logging.debug('checking result %s for job %s',
- result_url, os.path.basename(jobfile))
+ result_url = os.path.join(
+ container_url, result.strip().decode()
+ )
+ logging.debug(
+ "checking result %s for job %s",
+ result_url,
+ os.path.basename(jobfile),
+ )
code = result_matches_job(result_url, params)
if code is not None:
- finish_job(jobfile, params, code,
- result_url.replace(swift_url, external_url) + '/log.gz')
+ finish_job(
+ jobfile,
+ params,
+ code,
+ result_url.replace(swift_url, external_url)
+ + "/log.gz",
+ )
break
except HTTPError as e:
- logging.error('job %s URL %s failed: %s', os.path.basename(jobfile), query_url, e)
+ logging.error(
+ "job %s URL %s failed: %s", os.path.basename(jobfile), query_url, e
+ )
if e.code == 404:
os.unlink(jobfile)
-if __name__ == '__main__':
- if 'DEBUG' in os.environ:
- logging.basicConfig(level='DEBUG')
+if __name__ == "__main__":
+ if "DEBUG" in os.environ:
+ logging.basicConfig(level="DEBUG")
if not os.path.isdir(PENDING_DIR):
- logging.info('%s does not exist, nothing to do', PENDING_DIR)
+ logging.info("%s does not exist, nothing to do", PENDING_DIR)
sys.exit(0)
config = configparser.ConfigParser()
- config.read(os.path.expanduser('~ubuntu/autopkgtest-cloud.conf'))
- swift_url = config['web']['SwiftURL']
+ config.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
+ swift_url = config["web"]["SwiftURL"]
try:
- external_url = config['web']['ExternalURL']
+ external_url = config["web"]["ExternalURL"]
except KeyError:
external_url = swift_url
diff --git a/ci/lint_test b/ci/lint_test
index 1742fb2..3fee7f5 100755
--- a/ci/lint_test
+++ b/ci/lint_test
@@ -1,19 +1,19 @@
#!/usr/bin/python3
# pylint: disable = invalid-name, broad-except, subprocess-run-check
-'''
+"""
Script to lint the scripts in the autopkgtest-cloud repository in CI
-'''
-import pathlib
-import os
-import sys
+"""
import logging
+import os
+import pathlib
import subprocess
+import sys
def check_for_extension(input_list, output_list, extension):
- '''
+ """
Checks filepaths in a list for a given extension
- '''
+ """
for a in input_list:
if os.path.isfile(a):
# if str(a)[-3:] == extension:
@@ -23,13 +23,13 @@ def check_for_extension(input_list, output_list, extension):
def check_for_shebang(input_list, output_list, shebang):
- '''
+ """
Checks filepaths in a given list for a given shebang
- '''
+ """
for b in input_list:
if os.path.isfile(b):
try:
- with open(b, 'r', encoding='utf-8') as myfile:
+ with open(b, "r", encoding="utf-8") as myfile:
file = myfile.read()
into_list = file.splitlines()
if len(into_list) > 1:
@@ -41,9 +41,9 @@ def check_for_shebang(input_list, output_list, shebang):
def remove_list_from_list(input_list, remove_list):
- '''
+ """
Removes elements from remove_list from input_list
- '''
+ """
for ff in input_list:
if os.path.isfile(ff):
if str(ff) in remove_list:
@@ -52,10 +52,10 @@ def remove_list_from_list(input_list, remove_list):
def run_lint_command(files_to_lint, lint_command, arguments=None):
- '''
+ """
Runs a given lint command over a list of filepaths and stores output
and exit code
- '''
+ """
exit_codes = 0
lint_output = ""
# check lint command exists
@@ -74,9 +74,9 @@ def run_lint_command(files_to_lint, lint_command, arguments=None):
return lint_output, exit_codes
-if __name__=="__main__":
+if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger('autopkgtest-cloud-linter')
+ logger = logging.getLogger("autopkgtest-cloud-linter")
start_dir = "../"
repo_dir = pathlib.Path(start_dir)
@@ -92,21 +92,30 @@ if __name__=="__main__":
"shebangs": ["#!/usr/bin/python3"],
"args": None,
"output": "",
- "code": 0
+ "code": 0,
},
}
for key, item in data.items():
if item["extensions"] is not None:
for extension in item["extensions"]:
- data[key]["files"] = check_for_extension(all_files, data[key]["files"], extension)
- all_files = remove_list_from_list(all_files, data[key]["files"])
+ data[key]["files"] = check_for_extension(
+ all_files, data[key]["files"], extension
+ )
+ all_files = remove_list_from_list(
+ all_files, data[key]["files"]
+ )
if item["shebangs"] is not None:
for shebang in item["shebangs"]:
- data[key]["files"] = check_for_shebang(all_files, data[key]["files"], shebang)
- all_files = remove_list_from_list(all_files, data[key]["files"])
- data[key]["output"], \
- data[key]["code"] = run_lint_command(data[key]["files"], key, data[key]["args"])
+ data[key]["files"] = check_for_shebang(
+ all_files, data[key]["files"], shebang
+ )
+ all_files = remove_list_from_list(
+ all_files, data[key]["files"]
+ )
+ data[key]["output"], data[key]["code"] = run_lint_command(
+ data[key]["files"], key, data[key]["args"]
+ )
ecodesum = 0
for _, oec in data.items():
ecodesum += oec["code"]
diff --git a/docs/conf.py b/docs/conf.py
index a46ffdc..2daedbd 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -1,7 +1,7 @@
-'''
+"""
Configuration file for the Sphinx documentation builder.
-'''
-#pylint: disable=redefined-builtin
+"""
+# pylint: disable=redefined-builtin
#
# This file only contains a selection of the most common options. For a full
# list see the documentation:
@@ -20,9 +20,9 @@ Configuration file for the Sphinx documentation builder.
# -- Project information -----------------------------------------------------
-project = 'autopkgtest-cloud'
-copyright = '2021, Canonical Ltd'
-author = 'Iain Lane <iain.lane@xxxxxxxxxxxxx>'
+project = "autopkgtest-cloud"
+copyright = "2021, Canonical Ltd"
+author = "Iain Lane <iain.lane@xxxxxxxxxxxxx>"
# -- General configuration ---------------------------------------------------
@@ -30,18 +30,15 @@ author = 'Iain Lane <iain.lane@xxxxxxxxxxxxx>'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
-extensions = [
- 'sphinx.ext.autosectionlabel',
- 'sphinx.ext.graphviz'
-]
+extensions = ["sphinx.ext.autosectionlabel", "sphinx.ext.graphviz"]
# Add any paths that contain templates here, relative to this directory.
-templates_path = ['_templates']
+templates_path = ["_templates"]
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
-exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
+exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
# -- Options for HTML output -------------------------------------------------
@@ -49,9 +46,9 @@ exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
-html_theme = 'alabaster'
+html_theme = "alabaster"
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ['_static']
+html_static_path = ["_static"]
diff --git a/mojo/add-floating-ip b/mojo/add-floating-ip
index 01dd53c..57c9839 100755
--- a/mojo/add-floating-ip
+++ b/mojo/add-floating-ip
@@ -11,24 +11,28 @@ import subprocess
import novaclient.client
-
-SECRETS_DIR = "/srv/mojo/LOCAL/{MOJO_PROJECT}/{MOJO_STAGE}/".format(**os.environ)
+SECRETS_DIR = "/srv/mojo/LOCAL/{MOJO_PROJECT}/{MOJO_STAGE}/".format(
+ **os.environ
+)
status = json.loads(
- subprocess.check_output(['juju', 'status', '--format=json']))
+ subprocess.check_output(["juju", "status", "--format=json"])
+)
services = status.get("applications")
if services is None:
services = status["services"]
-nova_tenant = novaclient.client.Client('1.1',
- os.environ['OS_USERNAME'],
- os.environ['OS_PASSWORD'],
- os.environ['OS_TENANT_NAME'],
- os.environ['OS_AUTH_URL'])
+nova_tenant = novaclient.client.Client(
+ "1.1",
+ os.environ["OS_USERNAME"],
+ os.environ["OS_PASSWORD"],
+ os.environ["OS_TENANT_NAME"],
+ os.environ["OS_AUTH_URL"],
+)
def get_ip_pool():
- pool = os.environ.get('MOJO_FLOATING_IP_POOL')
+ pool = os.environ.get("MOJO_FLOATING_IP_POOL")
if pool is not None:
return pool
@@ -43,7 +47,7 @@ def units_in_service(service_name):
def machine_of_unit(unit_name):
- service_name, _ = unit_name.split('/', 1)
+ service_name, _ = unit_name.split("/", 1)
unit = services[service_name]["units"][unit_name]
machine_no = unit["machine"]
@@ -64,10 +68,10 @@ def get_unit_floating_ip(unit_name):
except Exception:
pass
- unitfn = os.path.join(SECRETS_DIR, unit_name.replace('/', '.') + ".ip")
+ unitfn = os.path.join(SECRETS_DIR, unit_name.replace("/", ".") + ".ip")
# Rename older standard
- oldunitfn = os.path.join(SECRETS_DIR, unit_name.replace('/', '_'))
+ oldunitfn = os.path.join(SECRETS_DIR, unit_name.replace("/", "_"))
try:
os.rename(oldunitfn, unitfn)
except Exception:
@@ -93,15 +97,25 @@ def get_unit_floating_ip(unit_name):
except Exception:
# If this happens you're going to need to either get that back in the list,
# or blow away the state file so it gets a new IP.
- raise(RuntimeError("Desired IP {} not in floating ips list!".format(myip)))
+ raise (
+ RuntimeError(
+ "Desired IP {} not in floating ips list!".format(myip)
+ )
+ )
if fip.instance_id:
# If it's already associated, ensure it's associated to us
- machine_id = machine.get('Id')
+ machine_id = machine.get("Id")
if machine_id is None:
- machine_id = machine['instance-id']
- if (fip.instance_id != machine_id):
- raise(RuntimeError("IP {} is associated, but not to {}!".format(myip, unit_name)))
+ machine_id = machine["instance-id"]
+ if fip.instance_id != machine_id:
+ raise (
+ RuntimeError(
+ "IP {} is associated, but not to {}!".format(
+ myip, unit_name
+ )
+ )
+ )
return myip
# Go associate it now
@@ -111,7 +125,8 @@ def get_unit_floating_ip(unit_name):
def usage():
- print("""Usage: {} [SERVICE|UNIT]
+ print(
+ """Usage: {} [SERVICE|UNIT]
# Add a floating IP to the apache2/0 unit:
add-floating-ip apache2/0
@@ -125,7 +140,10 @@ add-floating-ip haproxy squid
# Add floating IPs to the apache2/0 and apache2/1 units:
export targets="apache2/0 apache2/1"
add-floating-ip
-""".format("add-floating-ip"))
+""".format(
+ "add-floating-ip"
+ )
+ )
def main():
@@ -133,8 +151,8 @@ def main():
if len(sys.argv) >= 2:
args = sys.argv[1:]
- elif 'targets' in os.environ:
- args = os.environ['targets'].split()
+ elif "targets" in os.environ:
+ args = os.environ["targets"].split()
else:
return usage()
@@ -153,5 +171,5 @@ def main():
print("{}: {}".format(unit_name, ip))
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..d84cc51
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,6 @@
+[tool.black]
+line-length = 79
+
+[tool.isort]
+profile = "black"
+line_length = 79