canonical-ubuntu-qa team mailing list archive
-
canonical-ubuntu-qa team
-
Mailing list archive
-
Message #05495
[Merge] ~hyask/autopkgtest-cloud:skia/switch_worker_to_amqp into autopkgtest-cloud:master
Skia has proposed merging ~hyask/autopkgtest-cloud:skia/switch_worker_to_amqp into autopkgtest-cloud:master.
Requested reviews:
Canonical's Ubuntu QA (canonical-ubuntu-qa)
For more details, see:
https://code.launchpad.net/~hyask/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/473908
Switch the worker part to use `python3-amqp` instead of `python3-amqplib`. Take the occasion to modernize some tools.
--
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~hyask/autopkgtest-cloud:skia/switch_worker_to_amqp into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp
index a6f7f09..d70dd69 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp
@@ -1,35 +1,54 @@
#!/usr/bin/python3
# Filter out AMQP requests that match a given regex
+import argparse
import configparser
import logging
-import optparse # pylint: disable=deprecated-module
+import os
import re
import sys
import time
-import urllib.parse
-import amqplib.client_0_8 as amqp
+import amqp
import distro_info
-def filter_amqp(options, host, queue_name, regex):
- url_parts = urllib.parse.urlsplit(host, allow_fragments=False)
- filter_re = re.compile(regex.encode("UTF-8"), re.DOTALL)
- amqp_con = amqp.Connection(
- url_parts.hostname,
- userid=url_parts.username,
- password=url_parts.password,
- )
- ch = amqp_con.channel()
+def get_amqp_channel():
+ try:
+ cp = configparser.ConfigParser()
+ with open("/home/ubuntu/rabbitmq.cred", "r") as f:
+ cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
+ amqp_con = amqp.Connection(
+ cp["rabbit"]["RABBIT_HOST"],
+ cp["rabbit"]["RABBIT_USER"],
+ cp["rabbit"]["RABBIT_PASSWORD"],
+ )
+ except FileNotFoundError:
+ amqp_con = amqp.Connection(
+ os.environ["RABBIT_HOST"],
+ userid=os.environ["RABBIT_USER"],
+ password=os.environ["RABBIT_PASSWORD"],
+ )
+ amqp_con.connect()
+ return amqp_con.channel()
+
+
+def filter_amqp(options, queue_name, regex):
num_items_deleted = 0
+ filter_re = re.compile(regex.encode("UTF-8"), re.DOTALL)
+
+ channel = get_amqp_channel()
while True:
- r = ch.basic_get(queue_name)
+ try:
+ r = channel.basic_get(queue_name)
+ except amqp.NotFound:
+ logging.warning(f"Queue {queue_name} not found")
+ return None
if r is None:
- logging.debug("r is none, exiting")
- ch.close()
- amqp_con.close()
+ logging.info(
+ "Message empty, we probably reached the end of the queue"
+ )
break
if isinstance(r.body, str):
body = r.body.encode("UTF-8")
@@ -45,7 +64,7 @@ def filter_amqp(options, host, queue_name, regex):
logging.info("queue item: %s (would delete)", body)
else:
logging.info("queue item: %s (deleting)", body)
- ch.basic_ack(r.delivery_tag)
+ channel.basic_ack(r.delivery_tag)
num_items_deleted += 1
return num_items_deleted
@@ -69,25 +88,31 @@ def generate_queue_names():
def main():
- parser = optparse.OptionParser(
- usage="usage: %prog [options] queue_name regex\n"
- "Pass `all` for queue_name to filter all queues"
+ parser = argparse.ArgumentParser(
+ description="""Filter queue based on a regex
+
+This script can be used whenever a new upload happens, obsoleting a previous
+one, and that previous upload still had a lot of tests scheduled. To avoid
+processing useless jobs, the queue can be filtered on the trigger of the
+obsolete upload.
+""",
+ formatter_class=argparse.RawTextHelpFormatter,
)
- parser.add_option(
+ parser.add_argument(
"-n",
"--dry-run",
default=False,
action="store_true",
help="only show the operations that would be performed",
)
- parser.add_option(
+ parser.add_argument(
"-v",
"--verbose",
default=False,
action="store_true",
help="additionally show queue items that are not removed",
)
- parser.add_option(
+ parser.add_argument(
"-a",
"--all-items-in-queue",
default=False,
@@ -97,25 +122,21 @@ def main():
"When using this option, the provided regex will be ignored."
),
)
- cp = configparser.ConfigParser()
- with open("/home/ubuntu/rabbitmq.cred", "r") as f:
- cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
- creds = "amqp://%s:%s@%s" % (
- cp["rabbit"]["RABBIT_USER"],
- cp["rabbit"]["RABBIT_PASSWORD"],
- cp["rabbit"]["RABBIT_HOST"],
+ parser.add_argument(
+ "queue_name",
+ help="The name of the queue to filter. `all` is a valid value.",
+ )
+ parser.add_argument(
+ "regex", help="The regex with which to filter the queue"
)
- opts, args = parser.parse_args()
+ opts = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if opts.verbose else logging.INFO,
format="%(asctime)s - %(message)s",
)
- if len(args) != 2:
- parser.error("Need to specify queue name and regex")
-
if opts.all_items_in_queue:
print("""Do you really want to flush this queue? [yN]""", end="")
sys.stdout.flush()
@@ -123,16 +144,25 @@ def main():
if not response.strip().lower().startswith("y"):
print("""Exiting""")
sys.exit(1)
- queues = [args[0]] if args[0] != "all" else generate_queue_names()
+ queues = (
+ [opts.queue_name]
+ if opts.queue_name != "all"
+ else generate_queue_names()
+ )
- deletion_count_history = []
for this_queue in queues:
+ deletion_count_history = []
while True:
- num_deleted = filter_amqp(opts, creds, this_queue, args[1])
+ num_deleted = filter_amqp(opts, this_queue, opts.regex)
+ if num_deleted is None:
+ logging.info("Skipping")
+ break
deletion_count_history.append(num_deleted)
if opts.dry_run:
break
- if all([x == 0 for x in deletion_count_history[-5:]]):
+ if len(deletion_count_history) >= 5 and all(
+ [x == 0 for x in deletion_count_history[-5:]]
+ ):
logging.info(
"Finished filtering queue objects, run history:\n%s"
% "\n".join(str(x) for x in deletion_count_history)
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream
index c255239..e7441cc 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/filter-amqp-dupes-upstream
@@ -1,14 +1,14 @@
#!/usr/bin/python3
# Filter out all but the latest request for a given upstream PR
+import argparse
+import configparser
import json
import logging
-import optparse # pylint: disable=deprecated-module
import os
-import urllib.parse
from collections import defaultdict
-import amqplib.client_0_8 as amqp
+import amqp
import dateutil.parser
import distro_info
@@ -19,13 +19,23 @@ SUPPORTED_UBUNTU_RELEASES = sorted(
)
-def filter_amqp(options, host):
- url_parts = urllib.parse.urlsplit(host, allow_fragments=False)
- amqp_con = amqp.Connection(
- url_parts.hostname,
- userid=url_parts.username,
- password=url_parts.password,
- )
+def filter_amqp(options):
+ try:
+ cp = configparser.ConfigParser()
+ with open("/home/ubuntu/rabbitmq.cred", "r") as f:
+ cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
+ amqp_con = amqp.Connection(
+ cp["rabbit"]["RABBIT_HOST"],
+ cp["rabbit"]["RABBIT_USER"],
+ cp["rabbit"]["RABBIT_PASSWORD"],
+ )
+ except FileNotFoundError:
+ amqp_con = amqp.Connection(
+ os.environ["RABBIT_HOST"],
+ userid=os.environ["RABBIT_USER"],
+ password=os.environ["RABBIT_PASSWORD"],
+ )
+ amqp_con.connect()
dry_run = "[dry-run] " if options.dry_run else ""
queues = (
@@ -40,10 +50,7 @@ def filter_amqp(options, host):
while True:
try:
r = ch.basic_get(queue_name)
- except amqp.AMQPChannelException as e:
- (code, _, _, _) = e.args
- if code != 404:
- raise
+ except amqp.NotFound:
logging.debug(f"No such queue {queue_name}")
break
if r is None:
@@ -52,7 +59,7 @@ def filter_amqp(options, host):
body = r.body.decode("UTF-8")
else:
body = r.body
- (pkg, params) = body.split(" ", 1)
+ (pkg, params) = body.split("\n", 1)
params_j = json.loads(params)
submit_time = dateutil.parser.parse(params_j["submit-time"])
pr = [
@@ -80,37 +87,35 @@ def filter_amqp(options, host):
def main():
- parser = optparse.OptionParser(
- usage="usage: %prog [options] amqp://user:pass@host queue_name regex"
+ parser = argparse.ArgumentParser(
+ description="""Deduplicates jobs in the upstream queue.
+
+The upstream integration is different than regular jobs pushed by Britney.
+If a developer pushes two times in a row on a pull request, then two test
+requests get queued. This script is here to deduplicate those requests.
+""",
+ formatter_class=argparse.RawTextHelpFormatter,
)
- parser.add_option(
- "-n",
+ parser.add_argument(
"--dry-run",
- default=False,
action="store_true",
help="only show the operations that would be performed",
)
- parser.add_option(
+ parser.add_argument(
"-v",
"--verbose",
- default=False,
action="store_true",
help="additionally show queue items that are not removed",
)
- # pylint: disable=unused-variable
- opts, args = parser.parse_args()
+ opts = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if opts.verbose else logging.INFO,
format="%(asctime)s - %(message)s",
)
- user = os.environ["RABBIT_USER"]
- password = os.environ["RABBIT_PASSWORD"]
- host = os.environ["RABBIT_HOST"]
- uri = f"amqp://{user}:{password}@{host}"
- filter_amqp(opts, uri)
+ filter_amqp(opts)
if __name__ == "__main__":
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp
index cdda67a..fbd2092 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/pull-amqp
@@ -2,10 +2,11 @@
import argparse
import configparser
+import os
import re
import sys
-import amqplib.client_0_8 as amqp
+import amqp
def parse_args():
@@ -45,14 +46,22 @@ You can alter the queue messages however you please, but be careful :)
def main():
args = parse_args()
- cp = configparser.ConfigParser()
- with open("/home/ubuntu/rabbitmq.cred", "r") as f:
- cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
- amqp_con = amqp.Connection(
- cp["rabbit"]["RABBIT_HOST"],
- cp["rabbit"]["RABBIT_USER"],
- cp["rabbit"]["RABBIT_PASSWORD"],
- )
+ try:
+ cp = configparser.ConfigParser()
+ with open("/home/ubuntu/rabbitmq.cred", "r") as f:
+ cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
+ amqp_con = amqp.Connection(
+ cp["rabbit"]["RABBIT_HOST"],
+ cp["rabbit"]["RABBIT_USER"],
+ cp["rabbit"]["RABBIT_PASSWORD"],
+ )
+ except FileNotFoundError:
+ amqp_con = amqp.Connection(
+ os.environ["RABBIT_HOST"],
+ userid=os.environ["RABBIT_USER"],
+ password=os.environ["RABBIT_PASSWORD"],
+ )
+ amqp_con.connect()
if args.regex is not None:
filter_re = re.compile(args.regex.encode("UTF-8"), re.DOTALL)
with amqp_con.channel() as ch:
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp
index 80f70e2..10d94a1 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/push-amqp
@@ -3,9 +3,10 @@
import argparse
import ast
import configparser
+import os
import sys
-import amqplib.client_0_8 as amqp
+import amqp
def parse_args():
@@ -68,14 +69,22 @@ def main():
file=sys.stderr,
)
- cp = configparser.ConfigParser()
- with open("/home/ubuntu/rabbitmq.cred", "r") as f:
- cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
- amqp_con = amqp.Connection(
- cp["rabbit"]["RABBIT_HOST"],
- cp["rabbit"]["RABBIT_USER"],
- cp["rabbit"]["RABBIT_PASSWORD"],
- )
+ try:
+ cp = configparser.ConfigParser()
+ with open("/home/ubuntu/rabbitmq.cred", "r") as f:
+ cp.read_string("[rabbit]\n" + f.read().replace('"', ""))
+ amqp_con = amqp.Connection(
+ cp["rabbit"]["RABBIT_HOST"],
+ cp["rabbit"]["RABBIT_USER"],
+ cp["rabbit"]["RABBIT_PASSWORD"],
+ )
+ except FileNotFoundError:
+ amqp_con = amqp.Connection(
+ os.environ["RABBIT_HOST"],
+ userid=os.environ["RABBIT_USER"],
+ password=os.environ["RABBIT_PASSWORD"],
+ )
+ amqp_con.connect()
ch = amqp_con.channel()
queue_name = args.queue_name
if args.message:
@@ -103,10 +112,7 @@ def main():
continue
try:
push(message, queue_name, ch)
- except (
- amqp.AMQPChannelException,
- amqp.AMQPConnectionException,
- ) as _:
+ except amqp.AMQPError:
print(
f"Pushing message `{message}` to queue {queue_name} failed.",
file=sys.stderr,
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest
index 9383aa2..06ab51b 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/run-autopkgtest
@@ -9,9 +9,9 @@ import os
import sys
import urllib.parse
import uuid
-from datetime import datetime
+from datetime import datetime, timezone
-import amqplib.client_0_8 as amqp
+import amqp
my_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
@@ -175,7 +175,7 @@ if __name__ == "__main__":
except KeyError:
pass
params["submit-time"] = datetime.strftime(
- datetime.utcnow(), "%Y-%m-%d %H:%M:%S%z"
+ datetime.now().astimezone(timezone.utc), "%Y-%m-%d %H:%M:%S%z"
)
params["uuid"] = str(uuid.uuid4())
params = "\n" + json.dumps(params)
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock
index 442e914..aaca3d2 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/with-distributed-lock
@@ -15,7 +15,7 @@ import os
import subprocess
import sys
-import amqplib.client_0_8 as amqp
+import amqp
@contextlib.contextmanager
@@ -33,15 +33,14 @@ def amqp_lock(name):
userid=os.environ["RABBIT_USER"],
password=os.environ["RABBIT_PASSWORD"],
)
+ amqp_con.connect()
channel = amqp_con.channel()
- channel.queue_declare(
- name, arguments={"args.queue.x-single-active-consumer": True}
- )
+ channel.queue_declare(name, arguments={"x-single-active-consumer": True})
channel.basic_publish(amqp.Message(""), routing_key=name)
consumer_tag = channel.basic_consume(queue=name, callback=callback)
while channel.callbacks and not callback.called:
- channel.wait()
+ amqp_con.drain_events()
try:
yield
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
index 09ade16..893f860 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
@@ -2,7 +2,7 @@
# autopkgtest cloud worker
# Author: Martin Pitt <martin.pitt@xxxxxxxxxx>
#
-# Requirements: python3-amqplib python3-swiftclient python3-influxdb
+# Requirements: python3-amqp python3-swiftclient python3-influxdb
# Requirements for running autopkgtest from git: python3-debian libdpkg-perl
#
# pylint: disable=too-many-lines,line-too-long
@@ -27,7 +27,7 @@ import urllib.request
import uuid
from urllib.error import HTTPError
-import amqplib.client_0_8 as amqp
+import amqp
import distro_info
import novaclient.client
import novaclient.exceptions
@@ -562,7 +562,6 @@ def call_autopkgtest(
# set up status AMQP exchange
global amqp_con
status_amqp = amqp_con.channel()
- status_amqp.access_request("/data", active=True, read=False, write=True)
status_amqp.exchange_declare(
status_exchange_name, "fanout", durable=False, auto_delete=True
)
@@ -1576,9 +1575,6 @@ def request(msg):
global amqp_con
complete_amqp = amqp_con.channel()
- complete_amqp.access_request(
- "/complete", active=True, read=False, write=True
- )
complete_amqp.exchange_declare(
complete_exchange_name, "fanout", durable=True, auto_delete=False
)
@@ -1629,6 +1625,7 @@ def amqp_connect(cfg, callback):
password=os.environ["RABBIT_PASSWORD"],
confirm_publish=True,
)
+ amqp_con.connect()
queue = amqp_con.channel()
# avoids greedy grabbing of the entire queue while being too busy
queue.basic_qos(0, 1, True)
@@ -1665,7 +1662,7 @@ def amqp_connect(cfg, callback):
queue.queue_declare(queue_name, durable=True, auto_delete=False)
queue.basic_consume(queue=queue_name, callback=request)
- return queue
+ return amqp_con
def main():
@@ -1740,13 +1737,13 @@ def main():
swiftclient.Connection(**swift_creds).close()
# connect to AMQP queues
- queue = amqp_connect(cfg, request)
+ amqp_con = amqp_connect(cfg, request)
# process queues forever
try:
while exit_requested is None:
logging.info("Waiting for and processing AMQP requests")
- queue.wait()
+ amqp_con.drain_events()
except IOError:
if exit_requested is None:
raise
diff --git a/charms/focal/autopkgtest-cloud-worker/layer.yaml b/charms/focal/autopkgtest-cloud-worker/layer.yaml
index d8f8c0a..fd41b99 100644
--- a/charms/focal/autopkgtest-cloud-worker/layer.yaml
+++ b/charms/focal/autopkgtest-cloud-worker/layer.yaml
@@ -18,7 +18,7 @@ options:
- libdpkg-perl
- lxd-client
- make
- - python3-amqplib
+ - python3-amqp
- python3-debian
- python3-distro-info
- python3-glanceclient