← Back to team overview

canonical-ubuntu-qa team mailing list archive

Re: [Merge] ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master

 

All inline comments have been addressed.

Diff comments:

> diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> new file mode 100755
> index 0000000..b1bc37e
> --- /dev/null
> +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> @@ -0,0 +1,211 @@
> +#!/usr/bin/python3
> +"""Kills running tests."""
> +
> +import configparser
> +import json
> +import logging
> +import socket
> +import subprocess
> +import time
> +
> +import amqplib.client_0_8 as amqp
> +import requests
> +
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred"
> +MSG_ONLY_KEYS = [
> +    "uuid",
> +    "not-running-on",
> +]
> +NUM_WORKERS = 2

done

> +
> +RABBIT_CFG = configparser.ConfigParser()
> +with open(RABBIT_CREDS, "r") as f:
> +    RABBIT_CFG.read_string("[rabbit]\n" + f.read().replace('"', ""))
> +
> +
> +def amqp_connect():
> +    amqp_con = amqp.Connection(
> +        RABBIT_CFG["rabbit"]["RABBIT_HOST"],
> +        userid=RABBIT_CFG["rabbit"]["RABBIT_USER"],
> +        password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"],
> +        confirm_publish=True,
> +    )
> +    return amqp_con
> +
> +
> +def check_message(msg):
> +    return list(msg.keys()) == MSG_ONLY_KEYS
> +
> +
> +def get_test_pid(uuid):
> +    try:
> +        # get list of running processes
> +        ps_aux_run = subprocess.run(
> +            ["ps", "aux"],
> +            stdout=subprocess.PIPE,
> +            check=True,
> +        )
> +        # Filter the list for only 'runner' processes
> +        runner_run = subprocess.run(
> +            ["grep", "runner"],
> +            input=ps_aux_run.stdout,
> +            stdout=subprocess.PIPE,
> +            check=True,
> +        )
> +        # Check all runner processes for the given uuid
> +        # If this one fails, the test isn't running on this worker
> +        uuid_run = subprocess.run(
> +            ["grep", uuid],
> +            input=runner_run.stdout,
> +            capture_output=True,
> +            check=True,
> +        )
> +    except subprocess.CalledProcessError as _:
> +        # We hit this exception if the test with the given uuid
> +        # isn't running on this cloud worker
> +        return None
> +    search_for_test_output = uuid_run.stdout
> +    search_me = search_for_test_output.splitlines()
> +    # We have to assert the length is 1 otherwise we'll only kill
> +    # the first one in the list - which may be the incorrect one
> +    # if there's two processes with same uuid - something is wrong!
> +    assert len(search_me) == 1
> +    line = search_me[0].decode("utf-8")
> +    if uuid in line:

I think this is fine.

> +        line = line.split(" ")
> +        line = [x for x in line if x]
> +        pid = line[1]
> +        return int(pid)
> +
> +
> +def place_message_in_queue(info: dict, amqp_con: amqp.Connection):
> +    complete_amqp = amqp_con.channel()
> +    complete_amqp.access_request(
> +        "/complete", active=True, read=False, write=True
> +    )
> +    complete_amqp.exchange_declare(
> +        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    complete_amqp.basic_publish(
> +        amqp.Message(json.dumps(info), delivery_mode=2),
> +        WRITER_EXCHANGE_NAME,
> +        "",
> +    )
> +
> +
> +def kill_process(pid: int, uuid: str):
> +    # sends SIGUSR1 to worker
> +    # This causes autopkgtest to exit with code -10
> +    # which the worker then detects, exits the test and kills
> +    # the openstack server, then the worker goes on to the next
> +    # test in the queue
> +    kill_cmd = "kill -USR1 %i" % pid
> +    try:
> +        _ = subprocess.run(
> +            kill_cmd.split(" "),
> +            check=True,
> +        )
> +        while get_test_pid(uuid) is not None:
> +            time.sleep(3)

done

> +        return True
> +    except subprocess.CalledProcessError as _:
> +        return False
> +
> +
> +def test_is_queued(uuid: str):

done

> +    influx_cfg = configparser.ConfigParser()
> +    with open("/home/ubuntu/influx.cred", "r") as f:
> +        influx_cfg.read_string("[influx]\n" + f.read())
> +    if influx_cfg["influx"]["INFLUXDB_CONTEXT"] == "staging":
> +        autopkgtest_url = "https://autopkgtest.staging.ubuntu.com";
> +    else:
> +        autopkgtest_url = "https://autopkgtest.ubuntu.com";
> +    queue_req = requests.get(autopkgtest_url)

done

> +    if uuid in queue_req.content.decode("utf-8"):
> +        return True
> +    return False
> +
> +
> +def already_checked_this_host(hostnames):

done

> +    if socket.getfqdn() in hostnames:
> +        return True
> +    return False
> +
> +
> +def process_message(msg, amqp_con):
> +    body = msg.body
> +    if isinstance(body, bytes):
> +        body = body.decode("UTF-8", errors="replace")
> +    info = json.loads(body)
> +    logging.info("Received request to kill test: %s" % json.dumps(info))
> +    if not check_message(info):
> +        logging.error(
> +            "Message %s is invalid. Ignoring.", json.dumps(info, indent=2)
> +        )
> +        # Remove the message from the queue
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        return
> +    if len(info["not-running-on"]) == NUM_WORKERS:
> +        # If the test hasn't been found on any of the workers, we reach this
> +        # Check if the test is currently queued - this could happen in the case

I think this change needs to be discussed with the team.

> +        # of infinite looping.
> +        if test_is_queued(info["uuid"]):
> +            msg.channel.basic_ack(msg.delivery_tag)
> +            info["not-running-on"] = []
> +            place_message_in_queue(info, amqp_con)
> +        else:
> +            msg.channel.basic_ack(msg.delivery_tag)
> +        return
> +
> +    if already_checked_this_host(info["not-running-on"]):
> +        # We check to see if we've already checked for the job on this cloud worker unit.
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        logging.info(
> +            "Test already found to not be running on this host, placing back into queue."
> +        )
> +        place_message_in_queue(info, amqp_con)
> +        return
> +    # get the test pid
> +    pid = get_test_pid(info["uuid"])
> +    if pid is None:
> +        # The test isn't running on this unit
> +        # append this hostname to not-running-on
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        info["not-running-on"].append(socket.getfqdn())

done

> +        if len(info["not-running-on"]) == NUM_WORKERS:
> +            logging.info(
> +                "Job %s not found on any workers, not re-queueing."
> +                % json.dumps(info)
> +            )
> +            return
> +        place_message_in_queue(info, amqp_con)
> +        return
> +    # Kill the process
> +    if kill_process(pid, info["uuid"]):

done

> +        msg.channel.basic_ack(msg.delivery_tag)
> +        logging.info("Job %s has been killed." % json.dumps(info))
> +    else:
> +        logging.error(
> +            "Job %s couldn't be killed! Ignoring." % json.dumps(info)
> +        )
> +        msg.channel.basic_ack(msg.delivery_tag)
> +
> +
> +if __name__ == "__main__":
> +    logging.basicConfig(level=logging.INFO)
> +    amqp_con = amqp_connect()
> +    status_ch = amqp_con.channel()
> +    status_ch.access_request("/complete", active=True, read=True, write=True)
> +    status_ch.exchange_declare(
> +        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    queue_name = "tests-to-kill"
> +    status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
> +    status_ch.queue_bind(queue_name, WRITER_EXCHANGE_NAME, queue_name)
> +    logging.info("Listening to requests on %s", queue_name)
> +    status_ch.basic_consume(
> +        "", callback=lambda msg: process_message(msg, amqp_con)
> +    )
> +    while status_ch.callbacks:
> +        status_ch.wait()
> diff --git a/charms/focal/autopkgtest-web/webcontrol/browse.cgi b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> index 309fb82..959fdcd 100755
> --- a/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> +++ b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> @@ -42,6 +32,14 @@ SUPPORTED_UBUNTU_RELEASES = get_supported_releases()
>  INDEXED_PACKAGES_FP = ""
>  AMQP_QUEUE_CACHE = "/var/lib/cache-amqp/queued.json"
>  RUNNING_CACHE = "/run/amqp-status-collector/running.json"
> +ADMIN_NICKS = [

Actually, this should be made a shared function.

EDIT: done

> +    "brian-murray",
> +    "andersson123",
> +    "paride",
> +    "hyask",
> +    "vorlon",
> +    "sil2000",
> +]
>  
>  
>  def init_config():
> diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
> new file mode 100644
> index 0000000..7848e2e
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
> @@ -0,0 +1,131 @@
> +"""
> +# NEW DOCS

done

> +test-manager is an app for autopkgtest-web which sends kill requests to
> +the worker units, detailing the test uuid.
> +
> +The worker units then kill the test with the matching uuid.
> +
> +# NEED TO ADD THE LINK TO RESULTS PAGES TOO?

done

> +On the running page, admins will have a hyperlink under running jobs which, when clicked,
> +will send the kill request.
> +
> +Before sending the kill request, test_manager checks that the uuid is indeed in running.json
> +
> +After sending the kill request, the request is picked up by a systemd unit named "test-killer"
> +on the cloud worker units.
> +
> +This unit, through all the cloud worker units, will find which unit the test is running on,
> +and kill the test, removing the test request from the queue and making the worker unit move
> +on to the next test in the queue.
> +"""
> +
> +
> +import configparser
> +import json
> +import logging
> +import os
> +import pathlib
> +import urllib
> +
> +import amqplib.client_0_8 as amqp
> +from flask import request, session
> +from helpers.exceptions import RunningJSONNotFound
> +from helpers.utils import HTML, get_all_releases, initialise_app, maybe_escape
> +
> +ALL_UBUNTU_RELEASES = get_all_releases()
> +# This should actually be loaded dynamically. Not in global namespace

done

> +ADMIN_NICKS_PATH = "/home/ubuntu/.config/autopkgtest-web/admin-nicks"
> +try:
> +    ADMIN_NICKS = pathlib.Path(ADMIN_NICKS_PATH).read_text().split(",")
> +except FileNotFoundError as _:
> +    ADMIN_NICKS = []
> +
> +RUNNING_FP = "/run/amqp-status-collector/running.json"
> +RUNNING_FILE = pathlib.Path("/run/amqp-status-collector/running.json")
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +
> +
> +def submit_to_queue(message):
> +    amqp_con = amqp_connect()
> +    complete_amqp = amqp_con.channel()
> +    complete_amqp.access_request(
> +        "/complete", active=True, read=False, write=True
> +    )
> +    complete_amqp.exchange_declare(
> +        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    complete_amqp.basic_publish(
> +        amqp.Message(json.dumps(message), delivery_mode=2),
> +        WRITER_EXCHANGE_NAME,
> +        "",
> +    )
> +
> +
> +def amqp_connect():
> +    """Connect to AMQP server"""
> +    cp = configparser.ConfigParser()
> +    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +    amqp_uri = cp["amqp"]["uri"]
> +    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
> +    amqp_con = amqp.Connection(
> +        parts.hostname, userid=parts.username, password=parts.password
> +    )
> +    logging.info(
> +        "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)
> +    )
> +    return amqp_con
> +
> +
> +PATH, app, secret_path, oid = initialise_app("test_manager")
> +
> +
> +@app.route("/", methods=["GET", "POST"])
> +def index_root():
> +    """Handle stop test requests"""
> +    session.permanent = True
> +    nick = maybe_escape(session.get("nickname"))
> +    if nick not in ADMIN_NICKS:
> +        return (
> +            HTML.format(
> +                (
> +                    "<p>You are not an admin. You are not "
> +                    "allowed to use this endpoint.</p>"
> +                )
> +            ),
> +            200,
> +        )
> +    params = {
> +        maybe_escape(k): maybe_escape(v) for k, v in request.args.items()
> +    }
> +    base = ["uuid"]

done

> +    if list(params.keys()) != base:
> +        return (
> +            HTML.format(
> +                "<p>You have passed %s, please only pass the uuid</p>"
> +                % ",".join(params.keys())
> +            ),
> +            200,
> +        )
> +    if not RUNNING_FILE.is_file():
> +        raise RunningJSONNotFound
> +    running_data = json.loads(RUNNING_FILE.read_text())
> +    if params["uuid"] not in json.dumps(running_data):
> +        return (
> +            HTML.format(
> +                "<p>uuid %s not found in running jobs</p>" % params["uuid"]
> +            ),
> +            200,
> +        )
> +    queue_message = {
> +        "uuid": params["uuid"],
> +        "not-running-on": [],
> +    }
> +    submit_to_queue(queue_message)
> +    while params["uuid"] in RUNNING_FILE.read_text():
> +        pass
> +    return (
> +        HTML.format(
> +            "<p>Test with uuid %s has been killed.</p>" % params["uuid"]
> +        ),
> +        200,
> +    )


-- 
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master.



References