canonical-ubuntu-qa team mailing list archive
-
canonical-ubuntu-qa team
-
Mailing list archive
-
Message #03852
Re: [Merge] ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master
All inline comments have been addressed.
Diff comments:
> diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> new file mode 100755
> index 0000000..b1bc37e
> --- /dev/null
> +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> @@ -0,0 +1,211 @@
> +#!/usr/bin/python3
> +"""Kills running tests."""
> +
> +import configparser
> +import json
> +import logging
> +import socket
> +import subprocess
> +import time
> +
> +import amqplib.client_0_8 as amqp
> +import requests
> +
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred"
> +MSG_ONLY_KEYS = [
> + "uuid",
> + "not-running-on",
> +]
> +NUM_WORKERS = 2
done
> +
> +RABBIT_CFG = configparser.ConfigParser()
> +with open(RABBIT_CREDS, "r") as f:
> + RABBIT_CFG.read_string("[rabbit]\n" + f.read().replace('"', ""))
> +
> +
> +def amqp_connect():
> + amqp_con = amqp.Connection(
> + RABBIT_CFG["rabbit"]["RABBIT_HOST"],
> + userid=RABBIT_CFG["rabbit"]["RABBIT_USER"],
> + password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"],
> + confirm_publish=True,
> + )
> + return amqp_con
> +
> +
> +def check_message(msg):
> + return list(msg.keys()) == MSG_ONLY_KEYS
> +
> +
> +def get_test_pid(uuid):
> + try:
> + # get list of running processes
> + ps_aux_run = subprocess.run(
> + ["ps", "aux"],
> + stdout=subprocess.PIPE,
> + check=True,
> + )
> + # Filter the list for only 'runner' processes
> + runner_run = subprocess.run(
> + ["grep", "runner"],
> + input=ps_aux_run.stdout,
> + stdout=subprocess.PIPE,
> + check=True,
> + )
> + # Check all runner processes for the given uuid
> + # If this one fails, the test isn't running on this worker
> + uuid_run = subprocess.run(
> + ["grep", uuid],
> + input=runner_run.stdout,
> + capture_output=True,
> + check=True,
> + )
> + except subprocess.CalledProcessError as _:
> + # We hit this exception if the test with the given uuid
> + # isn't running on this cloud worker
> + return None
> + search_for_test_output = uuid_run.stdout
> + search_me = search_for_test_output.splitlines()
> + # We have to assert the length is 1 otherwise we'll only kill
> + # the first one in the list - which may be the incorrect one
> + # if there's two processes with same uuid - something is wrong!
> + assert len(search_me) == 1
> + line = search_me[0].decode("utf-8")
> + if uuid in line:
I think this is fine.
> + line = line.split(" ")
> + line = [x for x in line if x]
> + pid = line[1]
> + return int(pid)
> +
> +
> +def place_message_in_queue(info: dict, amqp_con: amqp.Connection):
> + complete_amqp = amqp_con.channel()
> + complete_amqp.access_request(
> + "/complete", active=True, read=False, write=True
> + )
> + complete_amqp.exchange_declare(
> + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> + )
> + complete_amqp.basic_publish(
> + amqp.Message(json.dumps(info), delivery_mode=2),
> + WRITER_EXCHANGE_NAME,
> + "",
> + )
> +
> +
> +def kill_process(pid: int, uuid: str):
> + # sends SIGUSR1 to worker
> + # This causes autopkgtest to exit with code -10
> + # which the worker then detects, exits the test and kills
> + # the openstack server, then the worker goes on to the next
> + # test in the queue
> + kill_cmd = "kill -USR1 %i" % pid
> + try:
> + _ = subprocess.run(
> + kill_cmd.split(" "),
> + check=True,
> + )
> + while get_test_pid(uuid) is not None:
> + time.sleep(3)
done
> + return True
> + except subprocess.CalledProcessError as _:
> + return False
> +
> +
> +def test_is_queued(uuid: str):
done
> + influx_cfg = configparser.ConfigParser()
> + with open("/home/ubuntu/influx.cred", "r") as f:
> + influx_cfg.read_string("[influx]\n" + f.read())
> + if influx_cfg["influx"]["INFLUXDB_CONTEXT"] == "staging":
> + autopkgtest_url = "https://autopkgtest.staging.ubuntu.com"
> + else:
> + autopkgtest_url = "https://autopkgtest.ubuntu.com"
> + queue_req = requests.get(autopkgtest_url)
done
> + if uuid in queue_req.content.decode("utf-8"):
> + return True
> + return False
> +
> +
> +def already_checked_this_host(hostnames):
done
> + if socket.getfqdn() in hostnames:
> + return True
> + return False
> +
> +
> +def process_message(msg, amqp_con):
> + body = msg.body
> + if isinstance(body, bytes):
> + body = body.decode("UTF-8", errors="replace")
> + info = json.loads(body)
> + logging.info("Received request to kill test: %s" % json.dumps(info))
> + if not check_message(info):
> + logging.error(
> + "Message %s is invalid. Ignoring.", json.dumps(info, indent=2)
> + )
> + # Remove the message from the queue
> + msg.channel.basic_ack(msg.delivery_tag)
> + return
> + if len(info["not-running-on"]) == NUM_WORKERS:
> + # If the test hasn't been found on any of the workers, we reach this
> + # Check if the test is currently queued - this could happen in the case
I think I need to talk to the team about this change.
> + # of infinite looping.
> + if test_is_queued(info["uuid"]):
> + msg.channel.basic_ack(msg.delivery_tag)
> + info["not-running-on"] = []
> + place_message_in_queue(info, amqp_con)
> + else:
> + msg.channel.basic_ack(msg.delivery_tag)
> + return
> +
> + if already_checked_this_host(info["not-running-on"]):
> + # We check to see if we've already checked for the job on this cloud worker unit.
> + msg.channel.basic_ack(msg.delivery_tag)
> + logging.info(
> + "Test already found to not be running on this host, placing back into queue."
> + )
> + place_message_in_queue(info, amqp_con)
> + return
> + # get the test pid
> + pid = get_test_pid(info["uuid"])
> + if pid is None:
> + # The test isn't running on this unit
> + # append this hostname to not-running-on
> + msg.channel.basic_ack(msg.delivery_tag)
> + info["not-running-on"].append(socket.getfqdn())
done
> + if len(info["not-running-on"]) == NUM_WORKERS:
> + logging.info(
> + "Job %s not found on any workers, not re-queueing."
> + % json.dumps(info)
> + )
> + return
> + place_message_in_queue(info, amqp_con)
> + return
> + # Kill the process
> + if kill_process(pid, info["uuid"]):
done
> + msg.channel.basic_ack(msg.delivery_tag)
> + logging.info("Job %s has been killed." % json.dumps(info))
> + else:
> + logging.error(
> + "Job %s couldn't be killed! Ignoring." % json.dumps(info)
> + )
> + msg.channel.basic_ack(msg.delivery_tag)
> +
> +
> +if __name__ == "__main__":
> + logging.basicConfig(level=logging.INFO)
> + amqp_con = amqp_connect()
> + status_ch = amqp_con.channel()
> + status_ch.access_request("/complete", active=True, read=True, write=True)
> + status_ch.exchange_declare(
> + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> + )
> + queue_name = "tests-to-kill"
> + status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
> + status_ch.queue_bind(queue_name, WRITER_EXCHANGE_NAME, queue_name)
> + logging.info("Listening to requests on %s", queue_name)
> + status_ch.basic_consume(
> + "", callback=lambda msg: process_message(msg, amqp_con)
> + )
> + while status_ch.callbacks:
> + status_ch.wait()
> diff --git a/charms/focal/autopkgtest-web/webcontrol/browse.cgi b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> index 309fb82..959fdcd 100755
> --- a/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> +++ b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> @@ -42,6 +32,14 @@ SUPPORTED_UBUNTU_RELEASES = get_supported_releases()
> INDEXED_PACKAGES_FP = ""
> AMQP_QUEUE_CACHE = "/var/lib/cache-amqp/queued.json"
> RUNNING_CACHE = "/run/amqp-status-collector/running.json"
> +ADMIN_NICKS = [
Actually, this should be made into a shared function.
EDIT: done
> + "brian-murray",
> + "andersson123",
> + "paride",
> + "hyask",
> + "vorlon",
> + "sil2000",
> +]
>
>
> def init_config():
> diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
> new file mode 100644
> index 0000000..7848e2e
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
> @@ -0,0 +1,131 @@
> +"""
> +# NEW DOCS
done
> +test-manager is an app for autopkgtest-web which sends kill requests to
> +the worker units, detailing the test uuid.
> +
> +The worker units then kill the test with the matching uuid.
> +
> +# NEED TO ADD THE LINK TO RESULTS PAGES TOO?
done
> +On the running page, admins will have a hyperlink under running jobs which, when clicked,
> +will send the kill request.
> +
> +Before sending the kill request, test_manager checks that the uuid is indeed in running.json
> +
> +After sending the kill request, the request is picked up by a systemd unit named "test-killer"
> +on the cloud worker units.
> +
> +This unit, through all the cloud worker units, will find which unit the test is running on,
> +and kill the test, removing the test request from the queue and making the worker unit move
> +on to the next test in the queue.
> +"""
> +
> +
> +import configparser
> +import json
> +import logging
> +import os
> +import pathlib
> +import urllib
> +
> +import amqplib.client_0_8 as amqp
> +from flask import request, session
> +from helpers.exceptions import RunningJSONNotFound
> +from helpers.utils import HTML, get_all_releases, initialise_app, maybe_escape
> +
> +ALL_UBUNTU_RELEASES = get_all_releases()
> +# This should actually be loaded dynamically. Not in global namespace
done
> +ADMIN_NICKS_PATH = "/home/ubuntu/.config/autopkgtest-web/admin-nicks"
> +try:
> + ADMIN_NICKS = pathlib.Path(ADMIN_NICKS_PATH).read_text().split(",")
> +except FileNotFoundError as _:
> + ADMIN_NICKS = []
> +
> +RUNNING_FP = "/run/amqp-status-collector/running.json"
> +RUNNING_FILE = pathlib.Path("/run/amqp-status-collector/running.json")
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +
> +
> +def submit_to_queue(message):
> + amqp_con = amqp_connect()
> + complete_amqp = amqp_con.channel()
> + complete_amqp.access_request(
> + "/complete", active=True, read=False, write=True
> + )
> + complete_amqp.exchange_declare(
> + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> + )
> + complete_amqp.basic_publish(
> + amqp.Message(json.dumps(message), delivery_mode=2),
> + WRITER_EXCHANGE_NAME,
> + "",
> + )
> +
> +
> +def amqp_connect():
> + """Connect to AMQP server"""
> + cp = configparser.ConfigParser()
> + cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> + amqp_uri = cp["amqp"]["uri"]
> + parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
> + amqp_con = amqp.Connection(
> + parts.hostname, userid=parts.username, password=parts.password
> + )
> + logging.info(
> + "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)
> + )
> + return amqp_con
> +
> +
> +PATH, app, secret_path, oid = initialise_app("test_manager")
> +
> +
> +@app.route("/", methods=["GET", "POST"])
> +def index_root():
> + """Handle stop test requests"""
> + session.permanent = True
> + nick = maybe_escape(session.get("nickname"))
> + if nick not in ADMIN_NICKS:
> + return (
> + HTML.format(
> + (
> + "<p>You are not an admin. You are not "
> + "allowed to use this endpoint.</p>"
> + )
> + ),
> + 200,
> + )
> + params = {
> + maybe_escape(k): maybe_escape(v) for k, v in request.args.items()
> + }
> + base = ["uuid"]
done
> + if list(params.keys()) != base:
> + return (
> + HTML.format(
> + "<p>You have passed %s, please only pass the uuid</p>"
> + % ",".join(params.keys())
> + ),
> + 200,
> + )
> + if not RUNNING_FILE.is_file():
> + raise RunningJSONNotFound
> + running_data = json.loads(RUNNING_FILE.read_text())
> + if params["uuid"] not in json.dumps(running_data):
> + return (
> + HTML.format(
> + "<p>uuid %s not found in running jobs</p>" % params["uuid"]
> + ),
> + 200,
> + )
> + queue_message = {
> + "uuid": params["uuid"],
> + "not-running-on": [],
> + }
> + submit_to_queue(queue_message)
> + while params["uuid"] in RUNNING_FILE.read_text():
> + pass
> + return (
> + HTML.format(
> + "<p>Test with uuid %s has been killed.</p>" % params["uuid"]
> + ),
> + 200,
> + )
--
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master.
References