← Back to team overview

canonical-ubuntu-qa team mailing list archive

Re: [Merge] ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master

 

todo:
- the exchange name is stop-running, not tests-to-kill (tests-to-kill is the queue name)
- add docstrings for all functions and make sure everything is typed
- add hyperlinks on other pages which show running jobs
- address all of my inline comments below

Diff comments:

> diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> new file mode 100755
> index 0000000..b1bc37e
> --- /dev/null
> +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> @@ -0,0 +1,211 @@
> +#!/usr/bin/python3
> +"""Kills running tests."""
> +
> +import configparser
> +import json
> +import logging
> +import socket
> +import subprocess
> +import time
> +
> +import amqplib.client_0_8 as amqp
> +import requests
> +
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred"
> +MSG_ONLY_KEYS = [
> +    "uuid",
> +    "not-running-on",
> +]
> +NUM_WORKERS = 2

Hard-coding the number of workers isn't acceptable - something smarter needs to be implemented.

> +
> +RABBIT_CFG = configparser.ConfigParser()
> +with open(RABBIT_CREDS, "r") as f:
> +    RABBIT_CFG.read_string("[rabbit]\n" + f.read().replace('"', ""))
> +
> +
> +def amqp_connect():
> +    amqp_con = amqp.Connection(
> +        RABBIT_CFG["rabbit"]["RABBIT_HOST"],
> +        userid=RABBIT_CFG["rabbit"]["RABBIT_USER"],
> +        password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"],
> +        confirm_publish=True,
> +    )
> +    return amqp_con
> +
> +
> +def check_message(msg):
> +    return list(msg.keys()) == MSG_ONLY_KEYS
> +
> +
> +def get_test_pid(uuid):
> +    try:
> +        # get list of running processes
> +        ps_aux_run = subprocess.run(
> +            ["ps", "aux"],
> +            stdout=subprocess.PIPE,
> +            check=True,
> +        )
> +        # Filter the list for only 'runner' processes
> +        runner_run = subprocess.run(
> +            ["grep", "runner"],
> +            input=ps_aux_run.stdout,
> +            stdout=subprocess.PIPE,
> +            check=True,
> +        )
> +        # Check all runner processes for the given uuid
> +        # If this one fails, the test isn't running on this worker
> +        uuid_run = subprocess.run(
> +            ["grep", uuid],
> +            input=runner_run.stdout,
> +            capture_output=True,
> +            check=True,
> +        )
> +    except subprocess.CalledProcessError as _:
> +        # We hit this exception if the test with the given uuid
> +        # isn't running on this cloud worker
> +        return None
> +    search_for_test_output = uuid_run.stdout
> +    search_me = search_for_test_output.splitlines()
> +    # We have to assert the length is 1 otherwise we'll only kill
> +    # the first one in the list - which may be the incorrect one
> +    # if there's two processes with same uuid - something is wrong!
> +    assert len(search_me) == 1
> +    line = search_me[0].decode("utf-8")
> +    if uuid in line:
> +        line = line.split(" ")
> +        line = [x for x in line if x]
> +        pid = line[1]
> +        return int(pid)
> +
> +
> +def place_message_in_queue(info: dict, amqp_con: amqp.Connection):
> +    complete_amqp = amqp_con.channel()
> +    complete_amqp.access_request(
> +        "/complete", active=True, read=False, write=True
> +    )
> +    complete_amqp.exchange_declare(
> +        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    complete_amqp.basic_publish(
> +        amqp.Message(json.dumps(info), delivery_mode=2),
> +        WRITER_EXCHANGE_NAME,
> +        "",
> +    )
> +
> +
> +def kill_process(pid: int, uuid: str):
> +    # sends SIGUSR1 to worker
> +    # This causes autopkgtest to exit with code -10
> +    # which the worker then detects, exits the test and kills
> +    # the openstack server, then the worker goes on to the next
> +    # test in the queue
> +    kill_cmd = "kill -USR1 %i" % pid
> +    try:
> +        _ = subprocess.run(
> +            kill_cmd.split(" "),
> +            check=True,
> +        )
> +        while get_test_pid(uuid) is not None:
> +            time.sleep(3)
> +        return True
> +    except subprocess.CalledProcessError as _:
> +        return False
> +
> +
> +def test_is_queued(uuid: str):
> +    influx_cfg = configparser.ConfigParser()

Perhaps reading the influx config here, just to tell staging from production, is rather hacky ...

> +    with open("/home/ubuntu/influx.cred", "r") as f:
> +        influx_cfg.read_string("[influx]\n" + f.read())
> +    if influx_cfg["influx"]["INFLUXDB_CONTEXT"] == "staging":
> +        autopkgtest_url = "https://autopkgtest.staging.ubuntu.com";
> +    else:
> +        autopkgtest_url = "https://autopkgtest.ubuntu.com";
> +    queue_req = requests.get(autopkgtest_url)
> +    if uuid in queue_req.content.decode("utf-8"):
> +        return True
> +    return False
> +
> +
> +def already_checked_this_host(hostnames):
> +    if socket.getfqdn() in hostnames:
> +        return True
> +    return False
> +
> +
> +def process_message(msg, amqp_con):
> +    body = msg.body
> +    if isinstance(body, bytes):
> +        body = body.decode("UTF-8", errors="replace")
> +    info = json.loads(body)
> +    logging.info("Received request to kill test: %s" % json.dumps(info))
> +    if not check_message(info):
> +        logging.error(
> +            "Message %s is invalid. Ignoring.", json.dumps(info, indent=2)
> +        )
> +        # Remove the message from the queue
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        return
> +    if len(info["not-running-on"]) == NUM_WORKERS:
> +        # If the test hasn't been found on any of the workers, we reach this
> +        # Check if the test is currently queued - this could happen in the case
> +        # of infinite looping.
> +        if test_is_queued(info["uuid"]):
> +            msg.channel.basic_ack(msg.delivery_tag)
> +            info["not-running-on"] = []
> +            place_message_in_queue(info, amqp_con)
> +        else:
> +            msg.channel.basic_ack(msg.delivery_tag)
> +        return
> +
> +    if already_checked_this_host(info["not-running-on"]):
> +        # We check to see if we've already checked for the job on this cloud worker unit.
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        logging.info(
> +            "Test already found to not be running on this host, placing back into queue."
> +        )
> +        place_message_in_queue(info, amqp_con)
> +        return
> +    # get the test pid
> +    pid = get_test_pid(info["uuid"])
> +    if pid is None:
> +        # The test isn't running on this unit
> +        # append this hostname to not-running-on
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        info["not-running-on"].append(socket.getfqdn())
> +        if len(info["not-running-on"]) == NUM_WORKERS:
> +            logging.info(
> +                "Job %s not found on any workers, not re-queueing."
> +                % json.dumps(info)
> +            )
> +            return
> +        place_message_in_queue(info, amqp_con)
> +        return
> +    # Kill the process
> +    if kill_process(pid, info["uuid"]):
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        logging.info("Job %s has been killed." % json.dumps(info))
> +    else:
> +        logging.error(
> +            "Job %s couldn't be killed! Ignoring." % json.dumps(info)
> +        )
> +        msg.channel.basic_ack(msg.delivery_tag)
> +
> +
> +if __name__ == "__main__":
> +    logging.basicConfig(level=logging.INFO)
> +    amqp_con = amqp_connect()
> +    status_ch = amqp_con.channel()
> +    status_ch.access_request("/complete", active=True, read=True, write=True)
> +    status_ch.exchange_declare(
> +        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    queue_name = "tests-to-kill"
> +    status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
> +    status_ch.queue_bind(queue_name, WRITER_EXCHANGE_NAME, queue_name)
> +    logging.info("Listening to requests on %s", queue_name)
> +    status_ch.basic_consume(
> +        "", callback=lambda msg: process_message(msg, amqp_con)
> +    )
> +    while status_ch.callbacks:
> +        status_ch.wait()
> diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
> index bfa35e7..5c9105d 100755
> --- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
> +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
> @@ -1174,12 +1208,37 @@ def request(msg):
>                  elif code == 16 or code < 0:
>                      contents = log_contents(out_dir)
>                      if exit_requested is not None:
> -                        logging.warning(
> -                            "Testbed failure and exit %i requested. Log follows:",
> -                            exit_requested,
> -                        )
> -                        logging.error(contents)
> -                        sys.exit(exit_requested)
> +                        # exit_requested is set to 99 when the test is requested to be killed
> +                        if exit_requested != 99:
> +                            logging.warning(
> +                                "Testbed failure and exit %i requested. Log follows:",
> +                                exit_requested,
> +                            )
> +                            logging.error(contents)
> +                            sys.exit(exit_requested)
> +                        else:
> +                            # Test has been requested to be killed
> +                            logging.info(
> +                                "Test has been killed by test-killer, exiting."
> +                            )
> +                            running_test = False
> +                            # ack the message so it doesn't go back in the queue
> +                            msg.channel.basic_ack(msg.delivery_tag)
> +                            # remove the output directory
> +                            shutil.rmtree(out_dir)
> +                            # get the openstack server name from the command line args
> +                            argv_iter = iter(argv)
> +                            while next(argv_iter) != "--name":
> +                                pass
> +                            openstack_server_name = next(argv_iter)
> +                            # make this a function

Leftover comment ("# make this a function") - remove it.

> +                            logging.info(
> +                                "Killing openstack server %s",
> +                                openstack_server_name,
> +                            )
> +                            kill_openstack_server(openstack_server_name)
> +                            logging.info("Deleted %s", openstack_server_name)
> +                            return
>                      # Get the package-specific string for triggers too, since they might have broken the run
>                      trigs = [
>                          t.split("/", 1)[0] for t in params.get("triggers", [])
> diff --git a/charms/focal/autopkgtest-web/webcontrol/browse.cgi b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> index 309fb82..959fdcd 100755
> --- a/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> +++ b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> @@ -10,30 +10,20 @@ from collections import OrderedDict
>  from wsgiref.handlers import CGIHandler
>  
>  import flask
> +from flask import session
>  from helpers.admin import select_abnormally_long_jobs
>  from helpers.exceptions import RunningJSONNotFound
>  from helpers.utils import (
>      get_all_releases,
>      get_autopkgtest_cloud_conf,
>      get_supported_releases,
> -    setup_key,
> +    initialise_app,
>  )
> -from werkzeug.middleware.proxy_fix import ProxyFix
>  
>  # Initialize app
> -PATH = os.path.join(
> -    os.path.sep, os.getenv("XDG_RUNTIME_DIR", "/run"), "autopkgtest_webcontrol"
> -)
> -os.makedirs(PATH, exist_ok=True)
> -app = flask.Flask("browse")
> -# we don't want a long cache, as we only serve files that are regularly updated

I removed this comment by accident - it needs to be restored!

> +PATH, app, secret_path, _ = initialise_app("browse")
>  app.config["SEND_FILE_MAX_AGE_DEFAULT"] = 60
>  
> -app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
> -
> -secret_path = os.path.join(PATH, "secret_key")
> -setup_key(app, secret_path)
> -
>  db_con = None
>  swift_container_url = None
>  
> @@ -42,6 +32,14 @@ SUPPORTED_UBUNTU_RELEASES = get_supported_releases()
>  INDEXED_PACKAGES_FP = ""
>  AMQP_QUEUE_CACHE = "/var/lib/cache-amqp/queued.json"
>  RUNNING_CACHE = "/run/amqp-status-collector/running.json"
> +ADMIN_NICKS = [

remove this, no longer necessary

> +    "brian-murray",
> +    "andersson123",
> +    "paride",
> +    "hyask",
> +    "vorlon",
> +    "sil2000",
> +]
>  
>  
>  def init_config():
> @@ -573,6 +571,8 @@ def running():
>      running_count = 0
>      for pkg in packages:
>          running_count += len(running_info[pkg].keys())
> +    show_stop = False

Remove this unnecessary line - show_stop is reassigned on the very next line.

> +    show_stop = session.get("nickname", "") in ADMIN_NICKS
>  
>      return render(
>          "browse-running.html",
> @@ -582,6 +582,7 @@ def running():
>          queues_lengths=queues_lengths,
>          running=running_info,
>          running_count=running_count,
> +        show_stop=show_stop,

this could also be on other pages which display running jobs

>      )
>  
>  
> diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
> new file mode 100644
> index 0000000..7848e2e
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
> @@ -0,0 +1,131 @@
> +"""
> +# NEW DOCS

Remove this "# NEW DOCS" line.

> +test-manager is an app for autopkgtest-web which sends kill requests to
> +the worker units, detailing the test uuid.
> +
> +The worker units then kill the test with the matching uuid.
> +
> +# NEED TO ADD THE LINK TO RESULTS PAGES TOO?
> +On the running page, admins will have a hyperlink under running jobs which, when clicked,
> +will send the kill request.
> +
> +Before sending the kill request, test_manager checks that the uuid is indeed in running.json
> +
> +After sending the kill request, the request is picked up by a systemd unit named "test-killer"
> +on the cloud worker units.
> +
> +This unit, through all the cloud worker units, will find which unit the test is running on,
> +and kill the test, removing the test request from the queue and making the worker unit move
> +on to the next test in the queue.
> +"""
> +
> +
> +import configparser
> +import json
> +import logging
> +import os
> +import pathlib
> +import urllib
> +
> +import amqplib.client_0_8 as amqp
> +from flask import request, session
> +from helpers.exceptions import RunningJSONNotFound
> +from helpers.utils import HTML, get_all_releases, initialise_app, maybe_escape
> +
> +ALL_UBUNTU_RELEASES = get_all_releases()
> +# This should actually be loaded dynamically. Not in global namespace

Do this: load the admin nicks dynamically rather than in the global namespace.

> +ADMIN_NICKS_PATH = "/home/ubuntu/.config/autopkgtest-web/admin-nicks"
> +try:
> +    ADMIN_NICKS = pathlib.Path(ADMIN_NICKS_PATH).read_text().split(",")
> +except FileNotFoundError as _:
> +    ADMIN_NICKS = []
> +
> +RUNNING_FP = "/run/amqp-status-collector/running.json"
> +RUNNING_FILE = pathlib.Path("/run/amqp-status-collector/running.json")
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +
> +
> +def submit_to_queue(message):
> +    amqp_con = amqp_connect()
> +    complete_amqp = amqp_con.channel()
> +    complete_amqp.access_request(
> +        "/complete", active=True, read=False, write=True
> +    )
> +    complete_amqp.exchange_declare(
> +        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    complete_amqp.basic_publish(
> +        amqp.Message(json.dumps(message), delivery_mode=2),
> +        WRITER_EXCHANGE_NAME,
> +        "",
> +    )
> +
> +
> +def amqp_connect():
> +    """Connect to AMQP server"""
> +    cp = configparser.ConfigParser()
> +    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +    amqp_uri = cp["amqp"]["uri"]
> +    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
> +    amqp_con = amqp.Connection(
> +        parts.hostname, userid=parts.username, password=parts.password
> +    )
> +    logging.info(
> +        "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)
> +    )
> +    return amqp_con
> +
> +
> +PATH, app, secret_path, oid = initialise_app("test_manager")
> +
> +
> +@app.route("/", methods=["GET", "POST"])
> +def index_root():
> +    """Handle stop test requests"""
> +    session.permanent = True
> +    nick = maybe_escape(session.get("nickname"))
> +    if nick not in ADMIN_NICKS:
> +        return (
> +            HTML.format(
> +                (
> +                    "<p>You are not an admin. You are not "
> +                    "allowed to use this endpoint.</p>"
> +                )
> +            ),
> +            200,
> +        )
> +    params = {
> +        maybe_escape(k): maybe_escape(v) for k, v in request.args.items()
> +    }
> +    base = ["uuid"]
> +    if list(params.keys()) != base:
> +        return (
> +            HTML.format(
> +                "<p>You have passed %s, please only pass the uuid</p>"
> +                % ",".join(params.keys())
> +            ),
> +            200,
> +        )
> +    if not RUNNING_FILE.is_file():
> +        raise RunningJSONNotFound
> +    running_data = json.loads(RUNNING_FILE.read_text())
> +    if params["uuid"] not in json.dumps(running_data):
> +        return (
> +            HTML.format(
> +                "<p>uuid %s not found in running jobs</p>" % params["uuid"]
> +            ),
> +            200,
> +        )
> +    queue_message = {
> +        "uuid": params["uuid"],
> +        "not-running-on": [],
> +    }
> +    submit_to_queue(queue_message)
> +    while params["uuid"] in RUNNING_FILE.read_text():
> +        pass
> +    return (
> +        HTML.format(
> +            "<p>Test with uuid %s has been killed.</p>" % params["uuid"]
> +        ),
> +        200,
> +    )


-- 
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master.



References