canonical-ubuntu-qa team mailing list archive
-
canonical-ubuntu-qa team
-
Mailing list archive
-
Message #03550
[Merge] ~andersson123/autopkgtest-cloud:auto-queue-cleanup into autopkgtest-cloud:master
Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:auto-queue-cleanup into autopkgtest-cloud:master.
Requested reviews:
Canonical's Ubuntu QA (canonical-ubuntu-qa)
For more details, see:
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/463146
--
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:auto-queue-cleanup into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-web/units/queue-merger.service b/charms/focal/autopkgtest-web/units/queue-merger.service
new file mode 100644
index 0000000..5a89a65
--- /dev/null
+++ b/charms/focal/autopkgtest-web/units/queue-merger.service
@@ -0,0 +1,7 @@
+# Runs the queue-merger script once per activation.  There is no [Install]
+# section, so the unit is not enabled on its own and has to be started by
+# something else (presumably the timer unit of the same name -- confirm).
+[Unit]
+Description=Remove obsolete queue entries.
+
+[Service]
+# oneshot: the unit is considered started only once the script has exited.
+Type=oneshot
+# NOTE(review): no WorkingDirectory= is set; for a system service systemd
+# then starts the process in "/", which matters for any relative path the
+# script uses -- confirm this is intended.
+User=ubuntu
+ExecStart=/home/ubuntu/webcontrol/queue-merger
diff --git a/charms/focal/autopkgtest-web/units/queue-merger.timer b/charms/focal/autopkgtest-web/units/queue-merger.timer
new file mode 100644
index 0000000..3f405cf
--- /dev/null
+++ b/charms/focal/autopkgtest-web/units/queue-merger.timer
@@ -0,0 +1,8 @@
+# Timer unit.  No Unit= is given, so systemd activates the service that
+# shares this timer's name (queue-merger.service).
+[Unit]
+Description=Remove obsolete queue entries.
+
+[Timer]
+# Every hour, at minute 0 and then every 5 minutes after that.
+OnCalendar=*:0/5
+
+[Install]
+# Enabling the timer hooks it into autopkgtest-web.target.
+WantedBy=autopkgtest-web.target
diff --git a/charms/focal/autopkgtest-web/webcontrol/queue-merger b/charms/focal/autopkgtest-web/webcontrol/queue-merger
new file mode 100755
index 0000000..96395d7
--- /dev/null
+++ b/charms/focal/autopkgtest-web/webcontrol/queue-merger
@@ -0,0 +1,184 @@
+#!/usr/bin/python3
+# pylint: disable=c-extension-no-member
+
+import argparse
+import configparser
+import itertools
+import json
+import logging
+import os
+import urllib.parse
+import uuid
+from datetime import datetime
+
+import amqplib.client_0_8 as amqp
+import apt.progress.base
+import apt_pkg
+
+
+def get_queue(release, arch, context, all_queues):
+    """Return the requests of one queue as (package, params) tuples.
+
+    The requests are looked up as
+    ``all_queues["queues"][context][release][arch]["requests"]``.  Each
+    request is a string whose first whitespace-separated token is the
+    package name and whose remainder is a JSON document holding the test
+    parameters.  An empty list is returned when any key along the lookup
+    path is missing.
+    """
+    try:
+        requests = all_queues["queues"][context][release][arch]["requests"]
+    except KeyError:
+        return []
+    entries = []
+    for request in requests:
+        # Split once: fields[0] is the package, fields[1] the JSON payload.
+        fields = request.split(None, 1)
+        entries.append((fields[0], json.loads(fields[1])))
+    return entries
+
+
+def get_proposed_packages(release="noble", tmpdir=None):
+    """Return the source packages published in the <release>-proposed pocket.
+
+    A private apt configuration tree is created below *tmpdir* with a
+    single deb-src entry for ``<release>-proposed``; the source indices
+    are then downloaded and every source record is collected.
+
+    Args:
+        release: Series whose -proposed pocket is read.  Defaults to
+            "noble", the value that used to be hard-coded.
+        tmpdir: Directory for the private apt tree.  Defaults to
+            ``~/.cache/queue-merger``.  The path is made absolute because
+            a systemd system service without WorkingDirectory= starts in
+            "/", where an unprivileged user cannot create a relative
+            directory.
+
+    Returns:
+        A set of "package/version" strings.
+    """
+    if tmpdir is None:
+        tmpdir = os.path.join(
+            os.path.expanduser("~"), ".cache", "queue-merger"
+        )
+    tmpdir = os.path.abspath(tmpdir)
+    os.makedirs(os.path.join(tmpdir, "etc/apt"), exist_ok=True)
+    os.makedirs(os.path.join(tmpdir, "var/lib/apt"), exist_ok=True)
+    sources_path = os.path.join(tmpdir, "etc/apt/sources.list")
+    with open(sources_path, "w", encoding="utf-8") as sources:
+        # pylint: disable=line-too-long
+        print(
+            f"deb-src [signed-by=/usr/share/keyrings/ubuntu-archive-keyring.gpg] https://snapshot.ubuntu.com/ubuntu {release}-proposed main universe restricted multiverse",
+            file=sources,
+        )
+
+    # Point apt at the private tree before initialising it, so that the
+    # system-wide sources and caches are left alone.
+    apt_pkg.config["Dir"] = tmpdir
+    apt_pkg.init()
+    sl = apt_pkg.SourceList()
+    sl.read_main_list()
+    cache = apt_pkg.Cache(None)
+    cache.update(apt.progress.base.AcquireProgress(), sl)
+
+    srcrecords = apt_pkg.SourceRecords()
+
+    proposed = set()
+    while srcrecords.step():
+        proposed.add(srcrecords.package + "/" + srcrecords.version)
+    return proposed
+
+
+def merge_queue(queue, proposed):
+    """Collapse a queue to at most two requests per package.
+
+    Requests are grouped by package name.  Of all triggers requested for a
+    package only "migration-reference/0" and those contained in *proposed*
+    are kept; a package left without any trigger is dropped from the
+    result.
+
+    Args:
+        queue: Iterable of (package, params) tuples.  params is a dict
+            that may carry "triggers" (list of strings) and "uuid".
+        proposed: Container of trigger strings that are still valid.
+
+    Returns:
+        A list of (package, params) tuples ordered by package name.  For
+        each package there is an optional migration-reference request,
+        followed by an optional request that combines all remaining
+        triggers (sorted) and sets "all-proposed" to "1".
+    """
+    merged = []
+    # groupby() only groups adjacent items, hence the sort on the same key.
+    by_package = itertools.groupby(
+        sorted(queue, key=lambda item: item[0]), lambda item: item[0]
+    )
+    for package, runs in by_package:
+        triggers = set()
+        test_uuid = None
+        for _, run_args in runs:
+            # A request without "triggers" contributes nothing.
+            for trigger in run_args.get("triggers", []):
+                if trigger in proposed or trigger == "migration-reference/0":
+                    triggers.add(trigger)
+            # The uuid of the last request in the group wins.
+            test_uuid = run_args.get("uuid")
+
+        if not triggers:
+            continue
+        if test_uuid is None:
+            # Mint a uuid only when one is needed and none was supplied.
+            test_uuid = str(uuid.uuid4())
+
+        # NOTE(review): both requests emitted for a package share one uuid,
+        # and only "triggers"/"uuid" survive the merge (any other request
+        # parameter is dropped) -- confirm this is intended.
+        if "migration-reference/0" in triggers:
+            merged.append(
+                (
+                    package,
+                    {"triggers": ["migration-reference/0"], "uuid": test_uuid},
+                )
+            )
+            triggers.remove("migration-reference/0")
+        if triggers:
+            merged.append(
+                (
+                    package,
+                    {
+                        "all-proposed": "1",
+                        "triggers": sorted(triggers),
+                        "uuid": test_uuid,
+                    },
+                )
+            )
+    return merged
+
+
+def amqp_setup():
+    """Create a connection to the AMQP broker named in the cloud config.
+
+    The broker URI is read from the "uri" key of the [amqp] section of
+    /home/ubuntu/autopkgtest-cloud.conf.  Its host name, user name and
+    password are passed to amqp.Connection.
+
+    Returns:
+        The newly created amqp.Connection.
+    """
+    config = configparser.ConfigParser()
+    with open("/home/ubuntu/autopkgtest-cloud.conf", "r") as conf_file:
+        config.read_file(conf_file)
+    broker = urllib.parse.urlsplit(
+        config["amqp"]["uri"], allow_fragments=False
+    )
+    return amqp.Connection(
+        broker.hostname, userid=broker.username, password=broker.password
+    )
+
+
+def clear_queue(amqp_con: amqp.Connection, queue_name: str):
+    """Drain *queue_name* by fetching and acknowledging each message.
+
+    Messages are fetched one at a time on a fresh channel until
+    basic_get() returns None.
+    """
+    with amqp_con.channel() as channel:
+        message = channel.basic_get(queue_name)
+        while message is not None:
+            channel.basic_ack(message.delivery_tag)
+            message = channel.basic_get(queue_name)
+
+
+def queue_a_test(
+    amqp_message: str, queue_name: str, amqp_con: amqp.Connection
+):
+    """Publish *amqp_message* with *queue_name* as the routing key.
+
+    A fresh channel is opened on *amqp_con* for the single publish.
+    """
+    with amqp_con.channel() as channel:
+        # delivery_mode=2 asks the broker to treat the message as persistent.
+        request = amqp.Message(amqp_message, delivery_mode=2)
+        channel.basic_publish(request, routing_key=queue_name)
+
+
+def main():
+    """Rebuild every known test queue from its merged, pruned contents.
+
+    The queue snapshot is read from /var/lib/cache-amqp/queued.json.  For
+    each context/release/architecture combination the queued requests are
+    merged with merge_queue(), the AMQP queue is emptied and the merged
+    requests are submitted again with a fresh "submit-time".
+
+    With --dry-run no broker connection is opened; the actions that would
+    be taken are only logged and printed.
+    """
+    # Imported here so this function does not depend on the module header.
+    from datetime import timezone  # pylint: disable=import-outside-toplevel
+
+    logging.basicConfig(level=logging.INFO)
+    logging.info("Starting queue merging...")
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--dry-run", action="store_true", help="Dry run")
+    args = parser.parse_args()
+    with open(
+        "/var/lib/cache-amqp/queued.json", "r", encoding="utf-8"
+    ) as f:
+        all_queue_info = json.load(f)
+    # we're ignoring upstream requests for this.
+    contexts = ["huge", "ppa", "ubuntu"]
+    # A snapshot lacking these keys simply leaves nothing to iterate over.
+    arches = all_queue_info.get("arches") or []
+    releases = all_queue_info.get("releases") or []
+    # NOTE(review): one set of proposed packages is applied to the queues
+    # of every release and context -- confirm that a single series is the
+    # right reference for all of them.
+    proposed_packages = get_proposed_packages()
+    # A dry run never talks to the broker, so it needs no connection.
+    amqp_con = None if args.dry_run else amqp_setup()
+    try:
+        # product() walks the combinations in the same order as nested
+        # context/release/arch loops would.
+        for context, release, arch in itertools.product(
+            contexts, releases, arches
+        ):
+            queue_name = (
+                f"debci-{context}-{release}-{arch}"
+                if context != "ubuntu"
+                else f"debci-{release}-{arch}"
+            )
+            individual_queue = get_queue(
+                release, arch, context, all_queue_info
+            )
+            merged_queue = merge_queue(individual_queue, proposed_packages)
+            if args.dry_run:
+                logging.info(
+                    "Would empty %d items from %s",
+                    len(individual_queue),
+                    queue_name,
+                )
+                logging.info(
+                    "Would submit %d items to %s",
+                    len(merged_queue),
+                    queue_name,
+                )
+            else:
+                # NOTE(review): the live queue is drained completely while
+                # the replacement comes from the snapshot file; requests
+                # queued after the snapshot was written would be lost --
+                # confirm this window is acceptable.
+                logging.info(
+                    "Emptying %d items from %s",
+                    len(individual_queue),
+                    queue_name,
+                )
+                clear_queue(amqp_con, queue_name)
+                # Add the pruned queue
+                logging.info(
+                    "Adding %d items to %s", len(merged_queue), queue_name
+                )
+            for pkg, run_args in merged_queue:
+                message_params = run_args
+                # An aware datetime is required: with a naive one "%z"
+                # expands to an empty string and the offset is lost.
+                message_params["submit-time"] = datetime.now(
+                    timezone.utc
+                ).strftime("%Y-%m-%d %H:%M:%S%z")
+                amqp_message = f"{pkg}\n{json.dumps(message_params)}"
+                if args.dry_run:
+                    print(f"Would submit {amqp_message} to {queue_name}")
+                else:
+                    queue_a_test(amqp_message, queue_name, amqp_con)
+    finally:
+        # Release the broker connection even when a queue operation fails.
+        if amqp_con is not None:
+            amqp_con.close()
+
+
+# Run the merge only when executed as a script, not when imported.
+if __name__ == "__main__":
+    main()
Follow ups