canonical-ubuntu-qa team mailing list archive
-
canonical-ubuntu-qa team
-
Mailing list archive
-
Message #03847
Re: [Merge] ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master
TODO (aside from inline comments):
- type all functions with docstrings and param explanations
- add the stop test hyperlink to all other pages which display running jobs
Diff comments:
> diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> new file mode 100755
> index 0000000..b1bc37e
> --- /dev/null
> +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> @@ -0,0 +1,211 @@
> +#!/usr/bin/python3
> +"""Kills running tests."""
> +
> +import configparser
> +import json
> +import logging
> +import socket
> +import subprocess
> +import time
> +
> +import amqplib.client_0_8 as amqp
> +import requests
> +
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred"
> +MSG_ONLY_KEYS = [
> + "uuid",
> + "not-running-on",
> +]
> +NUM_WORKERS = 2
this should be stored as juju config option and preserved in worker conf file or something
> +
> +RABBIT_CFG = configparser.ConfigParser()
> +with open(RABBIT_CREDS, "r") as f:
> + RABBIT_CFG.read_string("[rabbit]\n" + f.read().replace('"', ""))
> +
> +
> +def amqp_connect():
> + amqp_con = amqp.Connection(
> + RABBIT_CFG["rabbit"]["RABBIT_HOST"],
> + userid=RABBIT_CFG["rabbit"]["RABBIT_USER"],
> + password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"],
> + confirm_publish=True,
> + )
> + return amqp_con
> +
> +
> +def check_message(msg):
> + return list(msg.keys()) == MSG_ONLY_KEYS
> +
> +
> +def get_test_pid(uuid):
> + try:
> + # get list of running processes
> + ps_aux_run = subprocess.run(
> + ["ps", "aux"],
> + stdout=subprocess.PIPE,
> + check=True,
> + )
> + # Filter the list for only 'runner' processes
> + runner_run = subprocess.run(
> + ["grep", "runner"],
> + input=ps_aux_run.stdout,
> + stdout=subprocess.PIPE,
> + check=True,
> + )
> + # Check all runner processes for the given uuid
> + # If this one fails, the test isn't running on this worker
> + uuid_run = subprocess.run(
> + ["grep", uuid],
> + input=runner_run.stdout,
> + capture_output=True,
> + check=True,
> + )
> + except subprocess.CalledProcessError as _:
> + # We hit this exception if the test with the given uuid
> + # isn't running on this cloud worker
> + return None
> + search_for_test_output = uuid_run.stdout
> + search_me = search_for_test_output.splitlines()
> + # We have to assert the length is 1 otherwise we'll only kill
> + # the first one in the list - which may be the incorrect one
> + # if there's two processes with same uuid - something is wrong!
> + assert len(search_me) == 1
> + line = search_me[0].decode("utf-8")
> + if uuid in line:
perhaps brittle parsing? idk I don't think so tho
> + line = line.split(" ")
> + line = [x for x in line if x]
> + pid = line[1]
> + return int(pid)
> +
> +
> +def place_message_in_queue(info: dict, amqp_con: amqp.Connection):
> + complete_amqp = amqp_con.channel()
> + complete_amqp.access_request(
> + "/complete", active=True, read=False, write=True
> + )
> + complete_amqp.exchange_declare(
> + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> + )
> + complete_amqp.basic_publish(
> + amqp.Message(json.dumps(info), delivery_mode=2),
> + WRITER_EXCHANGE_NAME,
> + "",
> + )
> +
> +
> +def kill_process(pid: int, uuid: str):
> + # sends SIGUSR1 to worker
> + # This causes autopkgtest to exit with code -10
this is inaccurate, exit code doesn't need to be stated, but need to say it hits the fallback option which cancels the test request and kills the openstack server if it's up yet
> + # which the worker then detects, exits the test and kills
> + # the openstack server, then the worker goes on to the next
> + # test in the queue
> + kill_cmd = "kill -USR1 %i" % pid
> + try:
> + _ = subprocess.run(
> + kill_cmd.split(" "),
> + check=True,
> + )
> + while get_test_pid(uuid) is not None:
> + time.sleep(3)
I think this sleep can be shorter - get_test_pid is a very quick function
> + return True
> + except subprocess.CalledProcessError as _:
> + return False
> +
> +
> +def test_is_queued(uuid: str):
ugly function ... mojo stage ([production | staging]) should be in a config file somewhere
> + influx_cfg = configparser.ConfigParser()
> + with open("/home/ubuntu/influx.cred", "r") as f:
> + influx_cfg.read_string("[influx]\n" + f.read())
> + if influx_cfg["influx"]["INFLUXDB_CONTEXT"] == "staging":
> + autopkgtest_url = "https://autopkgtest.staging.ubuntu.com"
> + else:
> + autopkgtest_url = "https://autopkgtest.ubuntu.com"
> + queue_req = requests.get(autopkgtest_url)
lol this isn't even checking queued.json hehehe that's silly
> + if uuid in queue_req.content.decode("utf-8"):
> + return True
> + return False
> +
> +
> +def already_checked_this_host(hostnames):
just return socket.fqdn() in hostnames
> + if socket.getfqdn() in hostnames:
> + return True
> + return False
> +
> +
> +def process_message(msg, amqp_con):
> + body = msg.body
> + if isinstance(body, bytes):
> + body = body.decode("UTF-8", errors="replace")
> + info = json.loads(body)
> + logging.info("Received request to kill test: %s" % json.dumps(info))
> + if not check_message(info):
> + logging.error(
> + "Message %s is invalid. Ignoring.", json.dumps(info, indent=2)
> + )
> + # Remove the message from the queue
> + msg.channel.basic_ack(msg.delivery_tag)
> + return
> + if len(info["not-running-on"]) == NUM_WORKERS:
> + # If the test hasn't been found on any of the workers, we reach this
> + # Check if the test is currently queued - this could happen in the case
probs can't happen now with recent worker changes - maybe the check for the message in the queue isn't necessary anymore
> + # of infinite looping.
> + if test_is_queued(info["uuid"]):
> + msg.channel.basic_ack(msg.delivery_tag)
> + info["not-running-on"] = []
> + place_message_in_queue(info, amqp_con)
> + else:
> + msg.channel.basic_ack(msg.delivery_tag)
> + return
> +
> + if already_checked_this_host(info["not-running-on"]):
> + # We check to see if we've already checked for the job on this cloud worker unit.
> + msg.channel.basic_ack(msg.delivery_tag)
> + logging.info(
> + "Test already found to not be running on this host, placing back into queue."
> + )
> + place_message_in_queue(info, amqp_con)
> + return
> + # get the test pid
> + pid = get_test_pid(info["uuid"])
> + if pid is None:
> + # The test isn't running on this unit
> + # append this hostname to not-running-on
> + msg.channel.basic_ack(msg.delivery_tag)
> + info["not-running-on"].append(socket.getfqdn())
don't append yet, check len == NUM_WORKERS-1 first
> + if len(info["not-running-on"]) == NUM_WORKERS:
> + logging.info(
> + "Job %s not found on any workers, not re-queueing."
> + % json.dumps(info)
> + )
> + return
> + place_message_in_queue(info, amqp_con)
> + return
> + # Kill the process
> + if kill_process(pid, info["uuid"]):
move the ack after the if since both blocks ack
> + msg.channel.basic_ack(msg.delivery_tag)
> + logging.info("Job %s has been killed." % json.dumps(info))
> + else:
> + logging.error(
> + "Job %s couldn't be killed! Ignoring." % json.dumps(info)
> + )
> + msg.channel.basic_ack(msg.delivery_tag)
> +
> +
> +if __name__ == "__main__":
> + logging.basicConfig(level=logging.INFO)
> + amqp_con = amqp_connect()
> + status_ch = amqp_con.channel()
> + status_ch.access_request("/complete", active=True, read=True, write=True)
> + status_ch.exchange_declare(
> + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> + )
> + queue_name = "tests-to-kill"
> + status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
> + status_ch.queue_bind(queue_name, WRITER_EXCHANGE_NAME, queue_name)
> + logging.info("Listening to requests on %s", queue_name)
> + status_ch.basic_consume(
> + "", callback=lambda msg: process_message(msg, amqp_con)
> + )
> + while status_ch.callbacks:
> + status_ch.wait()
> diff --git a/charms/focal/autopkgtest-web/webcontrol/browse.cgi b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> index 309fb82..959fdcd 100755
> --- a/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> +++ b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> @@ -42,6 +32,14 @@ SUPPORTED_UBUNTU_RELEASES = get_supported_releases()
> INDEXED_PACKAGES_FP = ""
> AMQP_QUEUE_CACHE = "/var/lib/cache-amqp/queued.json"
> RUNNING_CACHE = "/run/amqp-status-collector/running.json"
> +ADMIN_NICKS = [
remove this variable, get from config file also
> + "brian-murray",
> + "andersson123",
> + "paride",
> + "hyask",
> + "vorlon",
> + "sil2000",
> +]
>
>
> def init_config():
> diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
> new file mode 100644
> index 0000000..7848e2e
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
> @@ -0,0 +1,131 @@
> +"""
> +# NEW DOCS
remove NEW DOCS
> +test-manager is an app for autopkgtest-web which sends kill requests to
> +the worker units, detailing the test uuid.
> +
> +The worker units then kill the test with the matching uuid.
> +
> +# NEED TO ADD THE LINK TO RESULTS PAGES TOO?
remove comment
> +On the running page, admins will have a hyperlink under running jobs which, when clicked,
> +will send the kill request.
> +
> +Before sending the kill request, test_manager checks that the uuid is indeed in running.json
> +
> +After sending the kill request, the request is picked up by a systemd unit named "test-killer"
> +on the cloud worker units.
> +
> +This unit, through all the cloud worker units, will find which unit the test is running on,
> +and kill the test, removing the test request from the queue and making the worker unit move
> +on to the next test in the queue.
> +"""
> +
> +
> +import configparser
> +import json
> +import logging
> +import os
> +import pathlib
> +import urllib
> +
> +import amqplib.client_0_8 as amqp
> +from flask import request, session
> +from helpers.exceptions import RunningJSONNotFound
> +from helpers.utils import HTML, get_all_releases, initialise_app, maybe_escape
> +
> +ALL_UBUNTU_RELEASES = get_all_releases()
> +# This should actually be loaded dynamically. Not in global namespace
load ADMIN_NICKS dynamically
> +ADMIN_NICKS_PATH = "/home/ubuntu/.config/autopkgtest-web/admin-nicks"
> +try:
> + ADMIN_NICKS = pathlib.Path(ADMIN_NICKS_PATH).read_text().split(",")
> +except FileNotFoundError as _:
> + ADMIN_NICKS = []
> +
> +RUNNING_FP = "/run/amqp-status-collector/running.json"
> +RUNNING_FILE = pathlib.Path("/run/amqp-status-collector/running.json")
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +
> +
> +def submit_to_queue(message):
> + amqp_con = amqp_connect()
> + complete_amqp = amqp_con.channel()
> + complete_amqp.access_request(
> + "/complete", active=True, read=False, write=True
> + )
> + complete_amqp.exchange_declare(
> + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> + )
> + complete_amqp.basic_publish(
> + amqp.Message(json.dumps(message), delivery_mode=2),
> + WRITER_EXCHANGE_NAME,
> + "",
> + )
> +
> +
> +def amqp_connect():
> + """Connect to AMQP server"""
> + cp = configparser.ConfigParser()
> + cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> + amqp_uri = cp["amqp"]["uri"]
> + parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
> + amqp_con = amqp.Connection(
> + parts.hostname, userid=parts.username, password=parts.password
> + )
> + logging.info(
> + "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)
> + )
> + return amqp_con
> +
> +
> +PATH, app, secret_path, oid = initialise_app("test_manager")
> +
> +
> +@app.route("/", methods=["GET", "POST"])
> +def index_root():
> + """Handle stop test requests"""
> + session.permanent = True
> + nick = maybe_escape(session.get("nickname"))
> + if nick not in ADMIN_NICKS:
> + return (
> + HTML.format(
> + (
> + "<p>You are not an admin. You are not "
> + "allowed to use this endpoint.</p>"
> + )
> + ),
> + 200,
> + )
> + params = {
> + maybe_escape(k): maybe_escape(v) for k, v in request.args.items()
> + }
> + base = ["uuid"]
unnecessary variable
> + if list(params.keys()) != base:
> + return (
> + HTML.format(
> + "<p>You have passed %s, please only pass the uuid</p>"
> + % ",".join(params.keys())
> + ),
> + 200,
> + )
> + if not RUNNING_FILE.is_file():
> + raise RunningJSONNotFound
> + running_data = json.loads(RUNNING_FILE.read_text())
> + if params["uuid"] not in json.dumps(running_data):
> + return (
> + HTML.format(
> + "<p>uuid %s not found in running jobs</p>" % params["uuid"]
> + ),
> + 200,
> + )
> + queue_message = {
> + "uuid": params["uuid"],
> + "not-running-on": [],
> + }
> + submit_to_queue(queue_message)
> + while params["uuid"] in RUNNING_FILE.read_text():
> + pass
> + return (
> + HTML.format(
> + "<p>Test with uuid %s has been killed.</p>" % params["uuid"]
> + ),
> + 200,
> + )
--
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master.
References