canonical-ubuntu-qa team mailing list archive
Message #03555
Re: [Merge] ~andersson123/autopkgtest-cloud:auto-queue-cleanup into autopkgtest-cloud:master
Review: Needs Fixing
I think that's too often. The script is fairly intense and will mess up the KPIs by purging and requeuing, so I think it makes more sense to run it manually when needed.
Merging triggers also does not _normally_ work:
That the script can merge triggers right now is not the general case. We can merge triggers when we run with all-proposed, since we use all of proposed anyway. But in normal archive situations we don't actually want to merge triggers: we want to be able to trigger e.g. apt for a new gnutls and a new db5.3 individually, so that we get more relevant results rather than testing combinations.
For a regular cleanup, my suggestion would be to just walk the AMQP queue and ack any message that has an obsolete trigger, or _potentially_ any message for which we have already seen a superset of its triggers. We used to have a filter-amqp script that did that on the old infrastructure, or rather it did the first part of it.
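A minimal sketch of that filtering decision, kept free of any AMQP calls so it can be checked on its own. The names `parse_request` and `select_obsolete` are hypothetical and this is not the old filter-amqp script; it assumes message bodies of the form `package\n{json}` as in the proposed diff, and that superset matching should never cross the all-proposed boundary.

```python
import json


def parse_request(body):
    """Split a queue message of the form 'package\\n{json params}'."""
    package, params = body.split(None, 1)
    return package, json.loads(params)


def select_obsolete(bodies, valid_triggers):
    """Return the indices of messages that could be acked without requeuing.

    A message is selected if any of its triggers is no longer valid, or if
    an earlier message for the same package and the same all-proposed
    setting already carried a superset of its triggers.
    """
    seen = {}  # (package, all-proposed value) -> trigger sets kept so far
    obsolete = []
    for index, body in enumerate(bodies):
        package, params = parse_request(body)
        triggers = frozenset(params.get("triggers", []))
        if not triggers:
            # Requests without triggers are left alone (assumption).
            continue
        stale = any(
            t not in valid_triggers and t != "migration-reference/0"
            for t in triggers
        )
        key = (package, params.get("all-proposed"))
        covered = any(triggers <= kept for kept in seen.get(key, []))
        if stale or covered:
            obsolete.append(index)
            continue
        seen.setdefault(key, []).append(triggers)
    return obsolete
```

A real walk would ack the selected messages as it reads them and leave everything else in place, so nothing gets purged and requeued.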
Note that, as you saw before, two concurrent walks of the queue will also mess up the KPIs, so it's annoying to run this as a regular script too. I guess you'd want to extend cache-amqp to also be able to do maintenance, or make sure the units have After= relationships.
Finally, this of course doesn't just potentially run in parallel with cache-amqp; AFAICT it also runs on all nodes right now rather than just on the leader, which is unnecessary and again will cause KPI confusion due to parallel full queue reads.
Diff comments:
> diff --git a/charms/focal/autopkgtest-web/units/queue-merger.timer b/charms/focal/autopkgtest-web/units/queue-merger.timer
> new file mode 100644
> index 0000000..3f405cf
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/units/queue-merger.timer
> @@ -0,0 +1,8 @@
> +[Unit]
> +Description=Remove obsolete queue entries.
> +
> +[Timer]
> +OnCalendar=*:0/5
I think that's too often
> +
> +[Install]
> diff --git a/charms/focal/autopkgtest-web/webcontrol/queue-merger b/charms/focal/autopkgtest-web/webcontrol/queue-merger
> new file mode 100755
> index 0000000..96395d7
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/webcontrol/queue-merger
> @@ -0,0 +1,184 @@
> +#!/usr/bin/python3
> +# pylint: disable=c-extension-no-member
> +
> +import argparse
> +import configparser
> +import itertools
> +import json
> +import logging
> +import os
> +import urllib.parse
> +import uuid
> +from datetime import datetime
> +
> +import amqplib.client_0_8 as amqp
> +import apt.progress.base
> +import apt_pkg
> +
> +
> +def get_queue(release, arch, context, all_queues):
> + """Build a single queue of package, args tuples."""
> + try:
> + queue = [
> + (i.split()[0], json.loads(i.split(None, 1)[1]))
> + for i in all_queues["queues"][context][release][arch]["requests"]
> + ]
> + except KeyError as _:
> + return []
> + return queue
> +
> +
> +def get_proposed_packages():
> + """Get all valid triggers in proposed, returns set of package/version strings."""
> + tmpdir = "merge-requests.tmp"
> + os.makedirs(os.path.join(tmpdir, "etc/apt"), exist_ok=True)
> + os.makedirs(os.path.join(tmpdir, "var/lib/apt"), exist_ok=True)
> + with open(os.path.join(tmpdir, "etc/apt/sources.list"), "w") as sources:
> + # pylint: disable=line-too-long
> + print(
> + "deb-src [signed-by=/usr/share/keyrings/ubuntu-archive-keyring.gpg] noble-proposed main universe restricted multiverse",
You are going to have to pass the release you are actually testing into this function and write it here.
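Roughly along these lines, as a sketch only. `proposed_sources_line` is a hypothetical helper, and the `mirror` default is an assumption, since the quoted line does not show an archive URI.

```python
def proposed_sources_line(release, mirror="http://archive.ubuntu.com/ubuntu"):
    """Build the deb-src line for <release>-proposed.

    The mirror default is an assumption; pass whatever archive the
    deployment actually uses.
    """
    keyring = "/usr/share/keyrings/ubuntu-archive-keyring.gpg"
    components = "main universe restricted multiverse"
    return (
        f"deb-src [signed-by={keyring}] {mirror} "
        f"{release}-proposed {components}"
    )
```

get_proposed_packages() would then take `release` as a parameter and be called once per release in the loop in main(), rather than once overall.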
> + file=sources,
> + )
> +
> + apt_pkg.config["Dir"] = tmpdir
> + apt_pkg.init()
> + sl = apt_pkg.SourceList()
> + sl.read_main_list()
> + cache = apt_pkg.Cache(None)
> + cache.update(apt.progress.base.AcquireProgress(), sl)
> +
> + srcrecords = apt_pkg.SourceRecords()
> +
> + proposed = set()
> + while srcrecords.step():
> + proposed.add(srcrecords.package + "/" + srcrecords.version)
> + return proposed
> +
> +
> +def merge_queue(queue, proposed):
> + """Calculate the merged queue"""
> + out = []
> + for package, runs in itertools.groupby(
> + sorted(queue, key=lambda i: i[0]), lambda i: i[0]
> + ):
> + triggers = set()
> + for package, args in runs:
> + for trigger in args["triggers"]:
> + if trigger in proposed or trigger == "migration-reference/0":
> + triggers.add(trigger)
> + # uuid in here I believe.
> + test_uuid = args.get("uuid", str(uuid.uuid4()))
> +
> + if "migration-reference/0" in triggers:
> + out.append(
> + (
> + package,
> + {"triggers": ["migration-reference/0"], "uuid": test_uuid},
> + )
> + )
> + triggers.remove("migration-reference/0")
> + if triggers:
> + out.append(
> + (
> + package,
> + {
> + "all-proposed": "1",
We don't usually want all-proposed. My script worked fine as a hack for the current noble situation, but here you need to ensure that you do not merge all-proposed requests with normal ones.
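One way to keep the two kinds apart, as a sketch under assumptions: `merge_by_mode` is a hypothetical name, requests are the (package, args) tuples from get_queue(), and an all-proposed request is one carrying "all-proposed": "1". The uuid and migration-reference handling from merge_queue() is left out for brevity.

```python
import itertools


def merge_by_mode(queue):
    """Merge triggers per package, never mixing all-proposed and normal runs."""

    def key(item):
        package, args = item
        return package, args.get("all-proposed") == "1"

    out = []
    grouped = itertools.groupby(sorted(queue, key=key), key)
    for (package, all_proposed), runs in grouped:
        runs = list(runs)
        if not all_proposed:
            # Normal requests stay separate so each trigger is tested alone.
            out.extend(runs)
            continue
        # All-proposed runs use all of proposed anyway, so merging is safe.
        triggers = sorted({t for _, args in runs for t in args["triggers"]})
        out.append((package, {"all-proposed": "1", "triggers": triggers}))
    return out
```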
> + "triggers": sorted(triggers),
> + "uuid": test_uuid,
> + },
> + )
> + )
> + return out
> +
> +
> +def amqp_setup():
> + cfg = configparser.ConfigParser()
> + with open("/home/ubuntu/autopkgtest-cloud.conf", "r") as f:
> + cfg.read_file(f)
> + amqp_url = cfg["amqp"]["uri"]
> + url_parts = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
> + amqp_con = amqp.Connection(
> + url_parts.hostname,
> + userid=url_parts.username,
> + password=url_parts.password,
> + )
> + return amqp_con
> +
> +
> +def clear_queue(amqp_con: amqp.Connection, queue_name: str):
> + with amqp_con.channel() as ch:
> + while True:
> + r = ch.basic_get(queue_name)
> + if r is None:
> + return
> + ch.basic_ack(r.delivery_tag)
> +
> +
> +def queue_a_test(
> + amqp_message: str, queue_name: str, amqp_con: amqp.Connection
> +):
> + with amqp_con.channel() as ch:
> + ch.basic_publish(
> + amqp.Message(amqp_message, delivery_mode=2),
> + routing_key=queue_name,
> + )
> +
> +
> +def main():
> + logging.basicConfig(level=logging.INFO)
> + logging.info("Starting queue merging...")
> + # just needs logging.
> + parser = argparse.ArgumentParser()
> + parser.add_argument("--dry-run", action="store_true", help="Dry run")
> + args = parser.parse_args()
> + with open("/var/lib/cache-amqp/queued.json", "r") as f:
Since we actually clear the queue using amqp, we should maybe just collect the queue state when reading it. This avoids some test losses between last cache-amqp update and the script running.
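Something like the following, as a sketch. `drain_queue` is a hypothetical name, and `ch` is assumed to behave like the channel used in clear_queue(): basic_get() returns None once the queue is empty, and each message exposes `body` and `delivery_tag`.

```python
import json


def drain_queue(ch, queue_name):
    """Ack every message in queue_name and return the parsed requests."""
    requests = []
    while True:
        msg = ch.basic_get(queue_name)
        if msg is None:
            return requests
        body = msg.body
        if isinstance(body, bytes):
            body = body.decode("utf-8")
        # Parse before acking so a malformed message is not silently lost.
        package, params = body.split(None, 1)
        requests.append((package, json.loads(params)))
        ch.basic_ack(msg.delivery_tag)
```

The returned list would replace the get_queue() result, so what gets merged is exactly what was removed from the queue.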
> + all_queue_info = json.load(f)
> + # we're ignoring upstream requests for this.
> + contexts = ["huge", "ppa", "ubuntu"]
> + arches = all_queue_info.get("arches")
> + releases = all_queue_info.get("releases")
> + proposed_packages = get_proposed_packages()
> + amqp_con = amqp_setup()
> + for context in contexts:
> + for release in releases:
> + for arch in arches:
> + queue_name = (
> + f"debci-{context}-{release}-{arch}"
> + if context != "ubuntu"
> + else f"debci-{release}-{arch}"
> + )
> + individual_queue = get_queue(
> + release, arch, context, all_queue_info
> + )
> + merged_queue = merge_queue(individual_queue, proposed_packages)
> + # We need to empty the specified queue here.
> + if args.dry_run:
> + logging.info(
> + f"Would empty {len(individual_queue)} items from {queue_name}"
> + )
> + logging.info(
> + f"Would submit {len(merged_queue)} items to {queue_name}"
> + )
> + else:
> + logging.info(
> + f"Emptying {len(individual_queue)} items from {queue_name}"
> + )
> + clear_queue(amqp_con, queue_name)
> + # Add the pruned queue
> + logging.info(
> + f"Adding {len(merged_queue)} items to {queue_name}"
> + )
> + for pkg, run_args in merged_queue:
> + message_params = run_args
> + message_params["submit-time"] = datetime.strftime(
> + datetime.utcnow(), "%Y-%m-%d %H:%M:%S%z"
> + )
> + amqp_message = f"{pkg}\n{json.dumps(message_params)}"
> + if args.dry_run:
> + print(f"Would submit {amqp_message} to {queue_name}")
> + else:
> + queue_a_test(amqp_message, queue_name, amqp_con)
> +
> +
> +if __name__ == "__main__":
> + main()
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:auto-queue-cleanup into autopkgtest-cloud:master.