← Back to team overview

canonical-ubuntu-qa team mailing list archive

[Merge] ~hyask/autopkgtest-cloud:skia/switch_worker_to_amqp into autopkgtest-cloud:master

 

Skia has proposed merging ~hyask/autopkgtest-cloud:skia/switch_worker_to_amqp into autopkgtest-cloud:master.

Requested reviews:
  Canonical's Ubuntu QA (canonical-ubuntu-qa)

For more details, see:
https://code.launchpad.net/~hyask/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/473908

Switch the worker part to use `python3-amqp` instead of `python3-amqplib`. Take the occasion to modernize some tools.
-- 
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~hyask/autopkgtest-cloud:skia/switch_worker_to_amqp into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp
index a6f7f09..d70dd69 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp
@@ -1,35 +1,54 @@
 #!/usr/bin/python3
 # Filter out AMQP requests that match a given regex
 
+import argparse
 import configparser
 import logging
-import optparse  # pylint: disable=deprecated-module
+import os
 import re
 import sys
 import time
-import urllib.parse
 
-import amqplib.client_0_8 as amqp
+import amqp
 import distro_info
 
 
-def filter_amqp(options, host, queue_name, regex):
-    url_parts = urllib.parse.urlsplit(host, allow_fragments=False)
-    filter_re = re.compile(regex.encode("UTF-8"), re.DOTALL)
-    amqp_con = amqp.Connection(
-        url_parts.hostname,
-        userid=url_parts.username,
-        password=url_parts.password,
-    )
-    ch = amqp_con.channel()
+def get_amqp_channel():
+    try:
+        cp = configparser.ConfigParser()
+        with open("/home/ubuntu/rabbitmq.cred", "r") as f:
+            cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
+        amqp_con = amqp.Connection(
+            cp["rabbit"]["RABBIT_HOST"],
+            cp["rabbit"]["RABBIT_USER"],
+            cp["rabbit"]["RABBIT_PASSWORD"],
+        )
+    except FileNotFoundError:
+        amqp_con = amqp.Connection(
+            os.environ["RABBIT_HOST"],
+            userid=os.environ["RABBIT_USER"],
+            password=os.environ["RABBIT_PASSWORD"],
+        )
+    amqp_con.connect()
+    return amqp_con.channel()
+
+
+def filter_amqp(options, queue_name, regex):
     num_items_deleted = 0
+    filter_re = re.compile(regex.encode("UTF-8"), re.DOTALL)
+
+    channel = get_amqp_channel()
 
     while True:
-        r = ch.basic_get(queue_name)
+        try:
+            r = channel.basic_get(queue_name)
+        except amqp.NotFound:
+            logging.warning(f"Queue {queue_name} not found")
+            return None
         if r is None:
-            logging.debug("r is none, exiting")
-            ch.close()
-            amqp_con.close()
+            logging.info(
+                "Message empty, we probably reached the end of the queue"
+            )
             break
         if isinstance(r.body, str):
             body = r.body.encode("UTF-8")
@@ -45,7 +64,7 @@ def filter_amqp(options, host, queue_name, regex):
                 logging.info("queue item: %s (would delete)", body)
             else:
                 logging.info("queue item: %s (deleting)", body)
-                ch.basic_ack(r.delivery_tag)
+                channel.basic_ack(r.delivery_tag)
                 num_items_deleted += 1
     return num_items_deleted
 
@@ -69,25 +88,31 @@ def generate_queue_names():
 
 
 def main():
-    parser = optparse.OptionParser(
-        usage="usage: %prog [options] queue_name regex\n"
-        "Pass `all` for queue_name to filter all queues"
+    parser = argparse.ArgumentParser(
+        description="""Filter queue based on a regex
+
+This script can be used whenever a new upload happens, obsoleting a previous
+one, and that previous upload still had a lot of tests scheduled. To avoid
+processing useless jobs, the queue can be filtered on the trigger of the
+obsolete upload.
+""",
+        formatter_class=argparse.RawTextHelpFormatter,
     )
-    parser.add_option(
+    parser.add_argument(
         "-n",
         "--dry-run",
         default=False,
         action="store_true",
         help="only show the operations that would be performed",
     )
-    parser.add_option(
+    parser.add_argument(
         "-v",
         "--verbose",
         default=False,
         action="store_true",
         help="additionally show queue items that are not removed",
     )
-    parser.add_option(
+    parser.add_argument(
         "-a",
         "--all-items-in-queue",
         default=False,
@@ -97,25 +122,21 @@ def main():
             "When using this option, the provided regex will be ignored."
         ),
     )
-    cp = configparser.ConfigParser()
-    with open("/home/ubuntu/rabbitmq.cred", "r") as f:
-        cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
-    creds = "amqp://%s:%s@%s" % (
-        cp["rabbit"]["RABBIT_USER"],
-        cp["rabbit"]["RABBIT_PASSWORD"],
-        cp["rabbit"]["RABBIT_HOST"],
+    parser.add_argument(
+        "queue_name",
+        help="The name of the queue to filter. `all` is a valid value.",
+    )
+    parser.add_argument(
+        "regex", help="The regex with which to filter the queue"
     )
 
-    opts, args = parser.parse_args()
+    opts = parser.parse_args()
 
     logging.basicConfig(
         level=logging.DEBUG if opts.verbose else logging.INFO,
         format="%(asctime)s - %(message)s",
     )
 
-    if len(args) != 2:
-        parser.error("Need to specify queue name and regex")
-
     if opts.all_items_in_queue:
         print("""Do you really want to flush this queue? [yN]""", end="")
         sys.stdout.flush()
@@ -123,16 +144,25 @@ def main():
         if not response.strip().lower().startswith("y"):
             print("""Exiting""")
             sys.exit(1)
-    queues = [args[0]] if args[0] != "all" else generate_queue_names()
+    queues = (
+        [opts.queue_name]
+        if opts.queue_name != "all"
+        else generate_queue_names()
+    )
 
-    deletion_count_history = []
     for this_queue in queues:
+        deletion_count_history = []
         while True:
-            num_deleted = filter_amqp(opts, creds, this_queue, args[1])
+            num_deleted = filter_amqp(opts, this_queue, opts.regex)
+            if num_deleted is None:
+                logging.info("Skipping")
+                break
             deletion_count_history.append(num_deleted)
             if opts.dry_run:
                 break
-            if all([x == 0 for x in deletion_count_history[-5:]]):
+            if len(deletion_count_history) >= 5 and all(
+                [x == 0 for x in deletion_count_history[-5:]]
+            ):
                 logging.info(
                     "Finished filtering queue objects, run history:\n%s"
                     % "\n".join(str(x) for x in deletion_count_history)
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream
index c255239..e7441cc 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream
@@ -1,14 +1,14 @@
 #!/usr/bin/python3
 # Filter out all but the latest request for a given upstream PR
 
+import argparse
+import configparser
 import json
 import logging
-import optparse  # pylint: disable=deprecated-module
 import os
-import urllib.parse
 from collections import defaultdict
 
-import amqplib.client_0_8 as amqp
+import amqp
 import dateutil.parser
 import distro_info
 
@@ -19,13 +19,23 @@ SUPPORTED_UBUNTU_RELEASES = sorted(
 )
 
 
-def filter_amqp(options, host):
-    url_parts = urllib.parse.urlsplit(host, allow_fragments=False)
-    amqp_con = amqp.Connection(
-        url_parts.hostname,
-        userid=url_parts.username,
-        password=url_parts.password,
-    )
+def filter_amqp(options):
+    try:
+        cp = configparser.ConfigParser()
+        with open("/home/ubuntu/rabbitmq.cred", "r") as f:
+            cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
+        amqp_con = amqp.Connection(
+            cp["rabbit"]["RABBIT_HOST"],
+            cp["rabbit"]["RABBIT_USER"],
+            cp["rabbit"]["RABBIT_PASSWORD"],
+        )
+    except FileNotFoundError:
+        amqp_con = amqp.Connection(
+            os.environ["RABBIT_HOST"],
+            userid=os.environ["RABBIT_USER"],
+            password=os.environ["RABBIT_PASSWORD"],
+        )
+    amqp_con.connect()
     dry_run = "[dry-run] " if options.dry_run else ""
 
     queues = (
@@ -40,10 +50,7 @@ def filter_amqp(options, host):
         while True:
             try:
                 r = ch.basic_get(queue_name)
-            except amqp.AMQPChannelException as e:
-                (code, _, _, _) = e.args
-                if code != 404:
-                    raise
+            except amqp.NotFound:
                 logging.debug(f"No such queue {queue_name}")
                 break
             if r is None:
@@ -52,7 +59,7 @@ def filter_amqp(options, host):
                 body = r.body.decode("UTF-8")
             else:
                 body = r.body
-            (pkg, params) = body.split(" ", 1)
+            (pkg, params) = body.split("\n", 1)
             params_j = json.loads(params)
             submit_time = dateutil.parser.parse(params_j["submit-time"])
             pr = [
@@ -80,37 +87,35 @@ def filter_amqp(options, host):
 
 
 def main():
-    parser = optparse.OptionParser(
-        usage="usage: %prog [options] amqp://user:pass@host queue_name regex"
+    parser = argparse.ArgumentParser(
+        description="""Deduplicates jobs in the upstream queue.
+
+The upstream integration is different than regular jobs pushed by Britney.
+If a developer pushes two times in a row on a pull request, then two test
+requests get queued. This script is here to deduplicate those requests.
+""",
+        formatter_class=argparse.RawTextHelpFormatter,
     )
-    parser.add_option(
-        "-n",
+    parser.add_argument(
         "--dry-run",
-        default=False,
         action="store_true",
         help="only show the operations that would be performed",
     )
-    parser.add_option(
+    parser.add_argument(
         "-v",
         "--verbose",
-        default=False,
         action="store_true",
         help="additionally show queue items that are not removed",
     )
 
-    # pylint: disable=unused-variable
-    opts, args = parser.parse_args()
+    opts = parser.parse_args()
 
     logging.basicConfig(
         level=logging.DEBUG if opts.verbose else logging.INFO,
         format="%(asctime)s - %(message)s",
     )
 
-    user = os.environ["RABBIT_USER"]
-    password = os.environ["RABBIT_PASSWORD"]
-    host = os.environ["RABBIT_HOST"]
-    uri = f"amqp://{user}:{password}@{host}"
-    filter_amqp(opts, uri)
+    filter_amqp(opts)
 
 
 if __name__ == "__main__":
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp
index cdda67a..fbd2092 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp
@@ -2,10 +2,11 @@
 
 import argparse
 import configparser
+import os
 import re
 import sys
 
-import amqplib.client_0_8 as amqp
+import amqp
 
 
 def parse_args():
@@ -45,14 +46,22 @@ You can alter the queue messages however you please, but be careful :)
 
 def main():
     args = parse_args()
-    cp = configparser.ConfigParser()
-    with open("/home/ubuntu/rabbitmq.cred", "r") as f:
-        cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
-    amqp_con = amqp.Connection(
-        cp["rabbit"]["RABBIT_HOST"],
-        cp["rabbit"]["RABBIT_USER"],
-        cp["rabbit"]["RABBIT_PASSWORD"],
-    )
+    try:
+        cp = configparser.ConfigParser()
+        with open("/home/ubuntu/rabbitmq.cred", "r") as f:
+            cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
+        amqp_con = amqp.Connection(
+            cp["rabbit"]["RABBIT_HOST"],
+            cp["rabbit"]["RABBIT_USER"],
+            cp["rabbit"]["RABBIT_PASSWORD"],
+        )
+    except FileNotFoundError:
+        amqp_con = amqp.Connection(
+            os.environ["RABBIT_HOST"],
+            userid=os.environ["RABBIT_USER"],
+            password=os.environ["RABBIT_PASSWORD"],
+        )
+    amqp_con.connect()
     if args.regex is not None:
         filter_re = re.compile(args.regex.encode("UTF-8"), re.DOTALL)
     with amqp_con.channel() as ch:
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp
index 80f70e2..10d94a1 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp
@@ -3,9 +3,10 @@
 import argparse
 import ast
 import configparser
+import os
 import sys
 
-import amqplib.client_0_8 as amqp
+import amqp
 
 
 def parse_args():
@@ -68,14 +69,22 @@ def main():
                     file=sys.stderr,
                 )
 
-    cp = configparser.ConfigParser()
-    with open("/home/ubuntu/rabbitmq.cred", "r") as f:
-        cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
-    amqp_con = amqp.Connection(
-        cp["rabbit"]["RABBIT_HOST"],
-        cp["rabbit"]["RABBIT_USER"],
-        cp["rabbit"]["RABBIT_PASSWORD"],
-    )
+    try:
+        cp = configparser.ConfigParser()
+        with open("/home/ubuntu/rabbitmq.cred", "r") as f:
+            cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
+        amqp_con = amqp.Connection(
+            cp["rabbit"]["RABBIT_HOST"],
+            cp["rabbit"]["RABBIT_USER"],
+            cp["rabbit"]["RABBIT_PASSWORD"],
+        )
+    except FileNotFoundError:
+        amqp_con = amqp.Connection(
+            os.environ["RABBIT_HOST"],
+            userid=os.environ["RABBIT_USER"],
+            password=os.environ["RABBIT_PASSWORD"],
+        )
+    amqp_con.connect()
     ch = amqp_con.channel()
     queue_name = args.queue_name
     if args.message:
@@ -103,10 +112,7 @@ def main():
                 continue
             try:
                 push(message, queue_name, ch)
-            except (
-                amqp.AMQPChannelException,
-                amqp.AMQPConnectionException,
-            ) as _:
+            except amqp.AMQPError:
                 print(
                     f"Pushing message `{message}` to queue {queue_name} failed.",
                     file=sys.stderr,
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest
index 9383aa2..06ab51b 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest
@@ -9,9 +9,9 @@ import os
 import sys
 import urllib.parse
 import uuid
-from datetime import datetime
+from datetime import datetime, timezone
 
-import amqplib.client_0_8 as amqp
+import amqp
 
 my_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
 
@@ -175,7 +175,7 @@ if __name__ == "__main__":
     except KeyError:
         pass
     params["submit-time"] = datetime.strftime(
-        datetime.utcnow(), "%Y-%m-%d %H:%M:%S%z"
+        datetime.now().astimezone(timezone.utc), "%Y-%m-%d %H:%M:%S%z"
     )
     params["uuid"] = str(uuid.uuid4())
     params = "\n" + json.dumps(params)
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock
index 442e914..aaca3d2 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock
@@ -15,7 +15,7 @@ import os
 import subprocess
 import sys
 
-import amqplib.client_0_8 as amqp
+import amqp
 
 
 @contextlib.contextmanager
@@ -33,15 +33,14 @@ def amqp_lock(name):
         userid=os.environ["RABBIT_USER"],
         password=os.environ["RABBIT_PASSWORD"],
     )
+    amqp_con.connect()
     channel = amqp_con.channel()
-    channel.queue_declare(
-        name, arguments={"args.queue.x-single-active-consumer": True}
-    )
+    channel.queue_declare(name, arguments={"x-single-active-consumer": True})
     channel.basic_publish(amqp.Message(""), routing_key=name)
     consumer_tag = channel.basic_consume(queue=name, callback=callback)
 
     while channel.callbacks and not callback.called:
-        channel.wait()
+        amqp_con.drain_events()
 
     try:
         yield
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
index 09ade16..893f860 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
@@ -2,7 +2,7 @@
 # autopkgtest cloud worker
 # Author: Martin Pitt <martin.pitt@xxxxxxxxxx>
 #
-# Requirements: python3-amqplib python3-swiftclient python3-influxdb
+# Requirements: python3-amqp python3-swiftclient python3-influxdb
 # Requirements for running autopkgtest from git: python3-debian libdpkg-perl
 #
 # pylint: disable=too-many-lines,line-too-long
@@ -27,7 +27,7 @@ import urllib.request
 import uuid
 from urllib.error import HTTPError
 
-import amqplib.client_0_8 as amqp
+import amqp
 import distro_info
 import novaclient.client
 import novaclient.exceptions
@@ -562,7 +562,6 @@ def call_autopkgtest(
     # set up status AMQP exchange
     global amqp_con
     status_amqp = amqp_con.channel()
-    status_amqp.access_request("/data", active=True, read=False, write=True)
     status_amqp.exchange_declare(
         status_exchange_name, "fanout", durable=False, auto_delete=True
     )
@@ -1576,9 +1575,6 @@ def request(msg):
 
     global amqp_con
     complete_amqp = amqp_con.channel()
-    complete_amqp.access_request(
-        "/complete", active=True, read=False, write=True
-    )
     complete_amqp.exchange_declare(
         complete_exchange_name, "fanout", durable=True, auto_delete=False
     )
@@ -1629,6 +1625,7 @@ def amqp_connect(cfg, callback):
         password=os.environ["RABBIT_PASSWORD"],
         confirm_publish=True,
     )
+    amqp_con.connect()
     queue = amqp_con.channel()
     # avoids greedy grabbing of the entire queue while being too busy
     queue.basic_qos(0, 1, True)
@@ -1665,7 +1662,7 @@ def amqp_connect(cfg, callback):
         queue.queue_declare(queue_name, durable=True, auto_delete=False)
         queue.basic_consume(queue=queue_name, callback=request)
 
-    return queue
+    return amqp_con
 
 
 def main():
@@ -1740,13 +1737,13 @@ def main():
         swiftclient.Connection(**swift_creds).close()
 
     # connect to AMQP queues
-    queue = amqp_connect(cfg, request)
+    amqp_con = amqp_connect(cfg, request)
 
     # process queues forever
     try:
         while exit_requested is None:
             logging.info("Waiting for and processing AMQP requests")
-            queue.wait()
+            amqp_con.drain_events()
     except IOError:
         if exit_requested is None:
             raise
diff --git a/charms/focal/autopkgtest-cloud-worker/layer.yaml b/charms/focal/autopkgtest-cloud-worker/layer.yaml
index d8f8c0a..fd41b99 100644
--- a/charms/focal/autopkgtest-cloud-worker/layer.yaml
+++ b/charms/focal/autopkgtest-cloud-worker/layer.yaml
@@ -18,7 +18,7 @@ options:
       - libdpkg-perl
       - lxd-client
       - make
-      - python3-amqplib
+      - python3-amqp
       - python3-debian
       - python3-distro-info
       - python3-glanceclient