← Back to team overview

canonical-ubuntu-qa team mailing list archive

[Merge] ~andersson123/autopkgtest-cloud:auto-queue-cleanup into autopkgtest-cloud:master

 

Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:auto-queue-cleanup into autopkgtest-cloud:master.

Requested reviews:
  Canonical's Ubuntu QA (canonical-ubuntu-qa)

For more details, see:
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/463146
-- 
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:auto-queue-cleanup into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-web/units/queue-merger.service b/charms/focal/autopkgtest-web/units/queue-merger.service
new file mode 100644
index 0000000..5a89a65
--- /dev/null
+++ b/charms/focal/autopkgtest-web/units/queue-merger.service
@@ -0,0 +1,7 @@
# Oneshot service activated by queue-merger.timer; runs the queue-merger
# script once per activation and exits.
[Unit]
Description=Remove obsolete queue entries.

[Service]
# oneshot: the process does its work and terminates; scheduling comes
# from the companion .timer unit, not from this service.
Type=oneshot
User=ubuntu
ExecStart=/home/ubuntu/webcontrol/queue-merger
diff --git a/charms/focal/autopkgtest-web/units/queue-merger.timer b/charms/focal/autopkgtest-web/units/queue-merger.timer
new file mode 100644
index 0000000..3f405cf
--- /dev/null
+++ b/charms/focal/autopkgtest-web/units/queue-merger.timer
@@ -0,0 +1,8 @@
# Timer that periodically triggers the matching queue-merger.service.
[Unit]
Description=Remove obsolete queue entries.

[Timer]
# Fire every 5 minutes (minutes 0, 5, 10, ... of every hour).
OnCalendar=*:0/5

[Install]
WantedBy=autopkgtest-web.target
diff --git a/charms/focal/autopkgtest-web/webcontrol/queue-merger b/charms/focal/autopkgtest-web/webcontrol/queue-merger
new file mode 100755
index 0000000..96395d7
--- /dev/null
+++ b/charms/focal/autopkgtest-web/webcontrol/queue-merger
@@ -0,0 +1,184 @@
+#!/usr/bin/python3
+# pylint: disable=c-extension-no-member
+
+import argparse
+import configparser
+import itertools
+import json
+import logging
+import os
+import urllib.parse
+import uuid
+from datetime import datetime
+
+import amqplib.client_0_8 as amqp
+import apt.progress.base
+import apt_pkg
+
+
def get_queue(release, arch, context, all_queues):
    """Build a single queue of (package, args) tuples.

    Each raw request is a string of the form "<package> <json-args>";
    the package is the first whitespace-separated token and the rest of
    the line is parsed as JSON.

    :param release: Ubuntu series name (e.g. "noble").
    :param arch: architecture name (e.g. "amd64").
    :param context: queue context ("huge", "ppa", "ubuntu", ...).
    :param all_queues: parsed queued.json structure.
    :returns: list of (package, args_dict) tuples; [] when the
        context/release/arch combination has no queue.
    """
    try:
        requests = all_queues["queues"][context][release][arch]["requests"]
    except KeyError:
        # This combination simply has no queued requests.
        return []
    queue = []
    for entry in requests:
        # Split once instead of twice per entry: first token is the
        # package, the remainder is the JSON arguments blob.
        parts = entry.split(None, 1)
        queue.append((parts[0], json.loads(parts[1])))
    return queue
+
+
def get_proposed_packages(release="noble"):
    """Get all valid triggers in proposed, returns set of package/version strings.

    Builds a throwaway apt root under merge-requests.tmp, points it at
    the snapshot.ubuntu.com -proposed source indexes for *release* and
    downloads them.  Requires network access.

    :param release: Ubuntu series to query (default "noble"); previously
        this was hard-coded even though the rest of the script iterates
        over all releases.
    :returns: set of "package/version" strings found in -proposed.
    """
    tmpdir = "merge-requests.tmp"
    os.makedirs(os.path.join(tmpdir, "etc/apt"), exist_ok=True)
    os.makedirs(os.path.join(tmpdir, "var/lib/apt"), exist_ok=True)
    with open(os.path.join(tmpdir, "etc/apt/sources.list"), "w") as sources:
        # pylint: disable=line-too-long
        print(
            f"deb-src [signed-by=/usr/share/keyrings/ubuntu-archive-keyring.gpg] https://snapshot.ubuntu.com/ubuntu {release}-proposed main universe restricted multiverse",
            file=sources,
        )

    # Point apt at the throwaway directory before initialising it.
    apt_pkg.config["Dir"] = tmpdir
    apt_pkg.init()
    source_list = apt_pkg.SourceList()
    source_list.read_main_list()
    cache = apt_pkg.Cache(None)
    cache.update(apt.progress.base.AcquireProgress(), source_list)

    srcrecords = apt_pkg.SourceRecords()

    proposed = set()
    while srcrecords.step():
        proposed.add(srcrecords.package + "/" + srcrecords.version)
    return proposed
+
+
def merge_queue(queue, proposed):
    """Collapse duplicate per-package requests into one merged entry each.

    Triggers that are no longer in -proposed (and are not the special
    "migration-reference/0") are dropped.  A package with a
    migration-reference trigger gets a dedicated entry for it; any
    remaining triggers are merged into a single all-proposed entry.

    :param queue: list of (package, args_dict) tuples.
    :param proposed: set of "package/version" strings still in -proposed.
    :returns: merged list of (package, args_dict) tuples, ordered by
        package name.
    """
    by_package = {}
    for package, args in queue:
        by_package.setdefault(package, []).append(args)

    merged = []
    for package in sorted(by_package):
        kept_triggers = set()
        test_uuid = None
        for args in by_package[package]:
            kept_triggers.update(
                trigger
                for trigger in args["triggers"]
                if trigger in proposed or trigger == "migration-reference/0"
            )
            # Reuse the request's uuid when present; the last request for
            # the package wins, matching the original accumulation order.
            test_uuid = args.get("uuid", str(uuid.uuid4()))

        if "migration-reference/0" in kept_triggers:
            merged.append(
                (
                    package,
                    {"triggers": ["migration-reference/0"], "uuid": test_uuid},
                )
            )
            kept_triggers.discard("migration-reference/0")
        if kept_triggers:
            merged.append(
                (
                    package,
                    {
                        "all-proposed": "1",
                        "triggers": sorted(kept_triggers),
                        "uuid": test_uuid,
                    },
                )
            )
    return merged
+
+
def amqp_setup():
    """Open an AMQP connection using the URI from the charm config file.

    Reads /home/ubuntu/autopkgtest-cloud.conf and connects with the
    credentials embedded in its [amqp] uri setting.
    """
    config = configparser.ConfigParser()
    with open("/home/ubuntu/autopkgtest-cloud.conf", "r") as conf_file:
        config.read_file(conf_file)
    parts = urllib.parse.urlsplit(
        config["amqp"]["uri"], allow_fragments=False
    )
    return amqp.Connection(
        parts.hostname,
        userid=parts.username,
        password=parts.password,
    )
+
+
def clear_queue(amqp_con: amqp.Connection, queue_name: str):
    """Drain *queue_name*, acknowledging every message until it is empty."""
    with amqp_con.channel() as channel:
        message = channel.basic_get(queue_name)
        while message is not None:
            channel.basic_ack(message.delivery_tag)
            message = channel.basic_get(queue_name)
+
+
def queue_a_test(
    amqp_message: str, queue_name: str, amqp_con: amqp.Connection
):
    """Publish one persistent (delivery_mode=2) request onto *queue_name*."""
    message = amqp.Message(amqp_message, delivery_mode=2)
    with amqp_con.channel() as channel:
        channel.basic_publish(message, routing_key=queue_name)
+
+
def main():
    """Merge every known test queue down to its still-relevant requests.

    For each context/release/arch queue: read the cached queue contents,
    drop triggers no longer in -proposed, collapse duplicate requests,
    then (unless --dry-run) empty the live AMQP queue and re-submit the
    merged entries.
    """
    logging.basicConfig(level=logging.INFO)
    logging.info("Starting queue merging...")
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="Dry run")
    args = parser.parse_args()
    with open("/var/lib/cache-amqp/queued.json", "r") as f:
        all_queue_info = json.load(f)
    # we're ignoring upstream requests for this.
    contexts = ["huge", "ppa", "ubuntu"]
    # Default to empty lists so a cache file missing these keys degrades
    # to a no-op instead of a TypeError when iterating None.
    arches = all_queue_info.get("arches", [])
    releases = all_queue_info.get("releases", [])
    proposed_packages = get_proposed_packages()
    amqp_con = amqp_setup()
    try:
        for context in contexts:
            for release in releases:
                for arch in arches:
                    # The plain "ubuntu" context has no context segment
                    # in its queue name.
                    queue_name = (
                        f"debci-{context}-{release}-{arch}"
                        if context != "ubuntu"
                        else f"debci-{release}-{arch}"
                    )
                    individual_queue = get_queue(
                        release, arch, context, all_queue_info
                    )
                    merged_queue = merge_queue(
                        individual_queue, proposed_packages
                    )
                    if args.dry_run:
                        logging.info(
                            "Would empty %d items from %s",
                            len(individual_queue),
                            queue_name,
                        )
                        logging.info(
                            "Would submit %d items to %s",
                            len(merged_queue),
                            queue_name,
                        )
                    else:
                        logging.info(
                            "Emptying %d items from %s",
                            len(individual_queue),
                            queue_name,
                        )
                        clear_queue(amqp_con, queue_name)
                        # Only log "Adding" when we actually add; the
                        # dry-run branch above already reported its plan.
                        logging.info(
                            "Adding %d items to %s",
                            len(merged_queue),
                            queue_name,
                        )
                    for pkg, run_args in merged_queue:
                        message_params = run_args
                        # NOTE(review): utcnow() is naive, so "%z" renders
                        # as an empty string here; kept as-is in case
                        # consumers parse this exact format — confirm
                        # before switching to an aware datetime.
                        message_params["submit-time"] = datetime.strftime(
                            datetime.utcnow(), "%Y-%m-%d %H:%M:%S%z"
                        )
                        amqp_message = f"{pkg}\n{json.dumps(message_params)}"
                        if args.dry_run:
                            print(
                                f"Would submit {amqp_message} to {queue_name}"
                            )
                        else:
                            queue_a_test(amqp_message, queue_name, amqp_con)
    finally:
        # Always release the broker connection, even on errors.
        amqp_con.close()


if __name__ == "__main__":
    main()

Follow ups