canonical-ubuntu-qa team mailing list archive
-
canonical-ubuntu-qa team
-
Mailing list archive
-
Message #03824
Re: [Merge] ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master
Left to do:
- test again
- finish some minor refactoring, and make sure it is split into sensible commits
- use pathlib instead of os.path in test_manager/app.py
- use a knife emoji for the "Stop this test" hyperlink?
Diff comments:
> diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> new file mode 100755
> index 0000000..7eefac5
> --- /dev/null
> +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
> @@ -0,0 +1,181 @@
> +#!/usr/bin/python3
> +"""Kills running tests."""
> +
> +import configparser
> +import json
> +import logging
> +import os
> +import socket
> +import subprocess
> +import time
> +
> +import amqplib.client_0_8 as amqp
> +
> +# from helpers.utils import init_db
> +
> +
> +# Fanout exchange carrying stop-test requests; test_manager/app.py publishes
> +# to it and this script both consumes from and re-publishes to it.
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +# needs rabbitmq.cred
done
> +RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred"
> +# The exact, ordered key list a valid request must have (see check_message).
> +MSG_ONLY_KEYS = [
> + "uuid",
> + "not-running-on",
> +]
> +# NOTE(review): hard-coded count of worker hosts; a request whose
> +# "not-running-on" list reaches this length is treated as "not found on any
> +# host". It must match the real number of hosts consuming the queue (the
> +# test_manager/app.py docstring says 3) - consider reading it from config.
> +NUM_WORKERS = 2
> +# NOTE(review): despite the name this is running.json (tests currently
> +# running), not a queue; the app.py docstring refers to queued.json instead.
> +QUEUE_PATH = "/run/amqp-status-collector/running.json"
> +
> +# Parsed at import time: double quotes are stripped and a "[rabbit]" section
> +# header is prepended so ConfigParser accepts the file (presumably
> +# shell-style KEY="value" lines - confirm). Startup fails if it is missing.
> +RABBIT_CFG = configparser.ConfigParser()
> +with open(RABBIT_CREDS, "r") as f:
> + RABBIT_CFG.read_string("[rabbit]\n" + f.read().replace('"', ""))
> +
> +
> +def amqp_connect():
> + """Open an AMQP connection to the broker named in rabbitmq.cred.
> +
> + Host, user and password come from the RABBIT_HOST, RABBIT_USER and
> + RABBIT_PASSWORD keys of RABBIT_CFG. confirm_publish=True is passed,
> + presumably so basic_publish waits for the broker to confirm each
> + message. The connection is never closed here; the caller owns it.
> + """
> + amqp_con = amqp.Connection(
> + RABBIT_CFG["rabbit"]["RABBIT_HOST"],
> + userid=RABBIT_CFG["rabbit"]["RABBIT_USER"],
> + password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"],
> + confirm_publish=True,
> + )
> + return amqp_con
> +
> +
> +def check_message(msg):
> + """Return True if msg has exactly the keys in MSG_ONLY_KEYS, in that order."""
> + # NOTE(review): the list comparison is order-sensitive, so a message with
> + # the right keys in a different order is rejected. It works today only
> + # because every publisher builds the dict in the same order;
> + # set(msg.keys()) == set(MSG_ONLY_KEYS) would be more robust. Value types
> + # are not checked, and a non-dict JSON body raises AttributeError here.
> + return list(msg.keys()) == MSG_ONLY_KEYS
> +
> +
> +def get_test_pid_and_server_name(uuid):
> + """Find the PID of the "runner" process whose command line contains uuid.
> +
> + Emulates `ps aux | grep runner | grep <uuid>` with three subprocess.run
> + calls. On success it returns the PID (second column of the ps output) as
> + an int. If either grep matches nothing (non-zero exit, so
> + CalledProcessError), it returns the tuple (None, None).
> +
> + NOTE(review): despite the name, no server name is ever returned, and the
> + failure value (None, None) is a tuple, which is never None. The callers
> + in process_message test `... is None` / `... is not None`, so they never
> + see the "not running" case. The tuple is passed to kill_process, and the
> + wait loop after a kill never terminates. Return a plain None on failure
> + (or return a pair consistently and unpack it at the call sites).
> + """
> + # ps aux | grep runner | grep uuid
done
> + ps_aux = "ps aux"
done
> + grep_runner = "grep runner"
> + # NOTE(review): uuid comes straight from the AMQP message; an empty value
> + # makes this grep match every runner line, and a short one matches
> + # unrelated tests. Validate it before use.
> + grep_for_uuid = "grep " + uuid
> + try:
done
> + ps_aux_run = subprocess.run(
> + ps_aux.split(" "), stdout=subprocess.PIPE, check=True
> + )
> + runner_run = subprocess.run(
> + grep_runner.split(" "),
> + input=ps_aux_run.stdout,
> + stdout=subprocess.PIPE,
> + check=True,
> + )
> + # If this one fails, the test isn't running on this worker
> + uuid_run = subprocess.run(
> + grep_for_uuid.split(" "),
> + input=runner_run.stdout,
> + capture_output=True,
> + check=True,
> + )
> + except subprocess.CalledProcessError as _:
> + return None, None
> + search_for_test_output = uuid_run.stdout
> + search_me = search_for_test_output.splitlines()
> + # NOTE(review): asserts are skipped under `python -O`, and when this one
> + # fires it raises out of the AMQP callback and stops the consumer. Handle
> + # "more than one matching process" explicitly instead.
> + assert len(search_me) == 1
done
> + line = search_me[0].decode("utf-8")
> + # Expected to hold whenever grep matched; if it does not, the function
> + # falls off the end and implicitly returns None.
> + if uuid in line:
> + # Split on runs of spaces; column 1 of `ps aux` output is the PID.
> + line = line.split(" ")
> + line = [x for x in line if x]
> + pid = line[1]
> + return int(pid)
> +
> +
> +def place_new_message_in_queue(info: dict, amqp_con: amqp.Connection):
done
> + """Publish info as a persistent JSON message to WRITER_EXCHANGE_NAME.
> +
> + Used to put a (possibly updated) stop request back on the exchange so
> + that another worker host can look for the test.
> + """
> + # NOTE(review): a new channel is opened on every call and never closed, so
> + # a long-running test-killer keeps accumulating channels. Open one publish
> + # channel once and reuse it, or close this one after publishing.
> + complete_amqp = amqp_con.channel()
> + complete_amqp.access_request(
> + "/complete", active=True, read=False, write=True
> + )
> + complete_amqp.exchange_declare(
> + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> + )
> + complete_amqp.basic_publish(
> + amqp.Message(json.dumps(info), delivery_mode=2),
> + WRITER_EXCHANGE_NAME,
> + "",
> + )
> +
> +
> +def kill_process(pid: int):
> + """Send SIGUSR1 to pid by running `kill -USR1 <pid>`.
> +
> + check=True makes this raise CalledProcessError if the kill fails (for
> + example because the process has already exited).
> +
> + NOTE(review): os.kill(pid, signal.SIGUSR1) does the same without a
> + subprocess. Also confirm which process pid really is: it comes from
> + `ps aux | grep runner`. The worker's note that autopkgtest.wait() returns
> + -10 suggests the signal lands on the autopkgtest child (SIGUSR1 is signal
> + 10 on Linux, and its default action is to terminate), not on the worker
> + process that installs usr1_handler.
> + """
> + # kill_cmd = "kill %i -15" % pid
done
> + # sends SIGUSR1 to worker
done
> + kill_cmd = "kill -USR1 %i" % pid
> + _ = subprocess.run(
done
> + kill_cmd.split(" "),
> + check=True,
> + )
> +
> +
> +def kill_server(server_name: str):
done
> + """Placeholder for deleting the test's cloud server; currently a no-op.
> +
> + NOTE(review): not called anywhere in this script. Server clean-up is
> + attempted in the worker's request() instead. Remove the stub or implement
> + it before merging.
> + """
> + pass
> +
> +
> +def process_message(msg, amqp_con):
> + """AMQP callback: handle one stop-test request.
> +
> + The body must be a JSON object with exactly the keys "uuid" and
> + "not-running-on".
> + - Invalid requests are rejected without requeue.
> + - If the test is running on this host, it is signalled via kill_process
> + and the callback waits (polling every 3s) until the process is gone.
> + - Otherwise this host's FQDN is appended to "not-running-on" and the
> + request is re-published.
> + - Once NUM_WORKERS hosts have reported "not running", the request is
> + re-published with an empty list if the uuid still appears in
> + running.json, and dropped if not.
> +
> + NOTE(review): the archive lost this block's indentation, so the nesting
> + below is inferred. Problems flagged inline: a double ack, a "not
> + running" branch that can never be reached, an infinite wait loop, and
> + re-publishing with no delay. In addition, json.loads or kill_process
> + errors escape the callback and stop the consumer. With Restart=on-failure
> + in test-killer.service, a malformed message that is presumably
> + redelivered unacked would then crash-loop the service.
> + """
> + body = msg.body
> + if isinstance(body, bytes):
> + body = body.decode("UTF-8", errors="replace")
> + info = json.loads(body)
> + logging.info("Message is: \n%s", json.dumps(info, indent=2))
done
> + if not check_message(info):
> + logging.error(
> + "Message %s is invalid. Ignoring.", json.dumps(info, indent=2)
> + )
> + # removes from queue!
> + msg.channel.basic_reject(msg.delivery_tag, requeue=False)
done
> + return
> + if len(info["not-running-on"]) == NUM_WORKERS:
done
> + queue_data = {}
> + if os.path.isfile(QUEUE_PATH):
> + with open(QUEUE_PATH, "r") as f:
> + queue_data = json.load(f)
> + # NOTE(review): substring test against str() of the whole running.json;
> + # an empty or partial uuid matches. Compare against the uuid fields.
> + if info["uuid"] in str(queue_data):
> + msg.channel.basic_ack(msg.delivery_tag)
> + # we need to reconstruct message with not-running-on = []
> + info["not-running-on"] = []
> + # NOTE(review): if the `return` below belongs only to the inner
> + # `else:`, this path falls through and acks/re-publishes a second time.
> + place_new_message_in_queue(info, amqp_con)
> + else:
> + # looks like the test has finished before we've had a chance to kill it
> + msg.channel.basic_ack(msg.delivery_tag)
> + return
> + else:
done
> + # NOTE(review): every path below acks this same delivery_tag again.
> + # RabbitMQ treats a second ack of one tag as a channel error, which closes
> + # the channel. Ack exactly once, after deciding what to do.
> + msg.channel.basic_ack(msg.delivery_tag)
> +
> + if socket.getfqdn() in info["not-running-on"]:
I don't think functions necessary, done though
> + # places back into queue as is
> + # NOTE(review): re-published immediately with no delay, so a request for a
> + # test that is not running anywhere bounces between hosts in a tight loop
> + # (the app.py docstring says test-killer should sleep first).
> + msg.channel.basic_ack(msg.delivery_tag)
> + logging.info(
> + "Test already found to not be running on this host, placing back into queue."
> + )
> + place_new_message_in_queue(info, amqp_con)
done
> + return
> + # NOTE(review): get_test_pid_and_server_name returns (None, None), not
> + # None, when the test is absent, so the check below never matches.
> + pid = get_test_pid_and_server_name(info["uuid"])
> + if pid is None:
> + info["not-running-on"].append(socket.getfqdn())
> + msg.channel.basic_ack(msg.delivery_tag)
> + place_new_message_in_queue(info, amqp_con)
> + # removes from queue without saying it failed or anything
done
> + return
> + # NOTE(review): a CalledProcessError from kill_process (e.g. the test just
> + # finished) is not caught here.
> + kill_process(pid)
> + ##########################################
done
> + # after killing the test, we need to remove the openstack server
done
> + # need to save the output of ps_aux
> + # NOTE(review): (None, None) is not None, so this loop never exits once the
> + # process is gone; it also blocks the consumer for the whole wait.
> + while get_test_pid_and_server_name(info["uuid"]) is not None:
done
> + time.sleep(3)
> + msg.channel.basic_ack(msg.delivery_tag)
> +
> +
> +if __name__ == "__main__":
> + # Consume stop-test requests from the durable "tests-to-kill" queue bound
> + # to WRITER_EXCHANGE_NAME. Every host running this service declares the
> + # same queue name, so the hosts share (compete for) one queue.
> + logging.basicConfig(level=logging.INFO)
> + amqp_con = amqp_connect()
> + status_ch = amqp_con.channel()
> + status_ch.access_request("/complete", active=True, read=True, write=True)
> + # need to go back and check the functionality of durable=True and auto_delete=False
done
> + status_ch.exchange_declare(
> + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> + )
> + queue_name = "tests-to-kill"
> + status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
> + # The routing key is ignored by a fanout exchange.
> + status_ch.queue_bind(queue_name, WRITER_EXCHANGE_NAME, queue_name)
> + logging.info("Listening to requests on %s", queue_name)
> + # NOTE(review): the queue argument is "", which relies on the broker
> + # treating an empty name as the channel's most recently declared queue.
> + # Pass queue_name explicitly. No basic_qos prefetch limit is set either.
> + status_ch.basic_consume(
> + "", no_ack=False, callback=lambda msg: process_message(msg, amqp_con)
> + )
> + while status_ch.callbacks:
> + status_ch.wait()
> diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
> index bfa35e7..9f60bce 100755
> --- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
> +++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
> @@ -222,6 +229,30 @@ def getglob(d, glob, default=None):
> return default
>
>
> +# signal.
done
> +# https://www.ibm.com/docs/en/aix/7.2?topic=management-process-termination
> +
> +
> +# looks like we don't even need this lol... not that it even works.
done
> +def usr1_handler(signum, frame):
> + """SIGUSR1 handler, for killing a test.
> +
> + Kills the running autopkgtest child (the `autopkgtest` global assigned in
> + call_autopkgtest), waits for it, deletes and recreates `out_dir` (the
> + global declared in request) and exits the worker with status 0.
> + """
> + # okay so this doesn't seem to work at all
> + # no log messaged, exit_requested doesn't get modified
> + # oh wait nvm it actually does work?
> + # sending this RN, it gets interpreted as a testbed failure
> + # OH SHIT THAT'S ACTUALLY REALLY GOOD HAHAHAHAHAH
> + # YAAAAAAAAAAAAAAAAAAAAAAAAAAY
> + # NOTE(review): the scratch notes above contradict each other. Replace them
> + # with a description of the final behaviour before merging.
> + # NOTE(review): if SIGUSR1 arrives while no test is running, `autopkgtest`
> + # and `out_dir` may be unset (NameError) or refer to a previous, finished
> + # test.
> + # NOTE(review): `exit_requested` is declared global but never assigned,
> + # while request() branches on `exit_requested != 99`. Nothing visible sets
> + # 99, so that branch is not reached through this handler. As far as the
> + # visible code shows, exiting here also leaves the test request message
> + # unacked, so it would presumably be redelivered and the test re-run.
> + logging.error("Caught SIGUSR1, requesting exit")
> + global autopkgtest
> + global exit_requested
> + autopkgtest.kill()
> + _, _ = autopkgtest.communicate()
> + shutil.rmtree(out_dir)
> + os.mkdir(out_dir)
> + logging.info("Exiting...")
> + sys.exit(0)
> +
> +
> def term_handler(signum, frame):
> """SIGTERM handler, for clean exit after current test"""
>
> @@ -554,6 +585,8 @@ def call_autopkgtest(
> """
> # set up status AMQP exchange
> global amqp_con
> + ###########################################
done
> + global autopkgtest
> status_amqp = amqp_con.channel()
> status_amqp.access_request("/data", active=True, read=False, write=True)
> status_amqp.exchange_declare(
> @@ -585,6 +618,7 @@ def call_autopkgtest(
> )
>
> ret = autopkgtest.wait()
> + # looks like when we sigterm ret is -10, we can use this.
done
> send_status_info(
> status_amqp,
> release,
> @@ -628,6 +662,7 @@ def request(msg):
> for extra in list(systemd_logging_handler._extra.keys()):
> if extra.startswith("ADT_"):
> del systemd_logging_handler._extra[extra]
> + global out_dir
done
>
> # Re-read in case the big/long/no run lists changed, would be better to
> # this only when needed via inotify.
> @@ -1174,12 +1211,66 @@ def request(msg):
> elif code == 16 or code < 0:
> contents = log_contents(out_dir)
> if exit_requested is not None:
> - logging.warning(
> - "Testbed failure and exit %i requested. Log follows:",
> - exit_requested,
> - )
> - logging.error(contents)
> - sys.exit(exit_requested)
> + if exit_requested != 99:
> + logging.warning(
> + "Testbed failure and exit %i requested. Log follows:",
> + exit_requested,
> + )
> + logging.error(contents)
> + sys.exit(exit_requested)
> + else:
done
> + # okay, this works, but the server is still leftover :/
done
> + # we can use python-novaclient here to kill the server.
> + logging.info(
> + "Test has been killed by test-killer, exiting."
> + )
> + # for some reason, the novaclient doesn't work here. Need to figure out why.
done
> + running_test = False
> + msg.channel.basic_ack(msg.delivery_tag)
> + shutil.rmtree(out_dir)
> + # get server name
> + argv_iter = iter(argv)
> + while next(argv_iter) != "--name":
> + pass
> + openstack_server_name = next(argv_iter)
> + logging.info(
> + "Killing openstack server %s",
> + openstack_server_name,
> + )
> + if (
done
> + int(os.environ.get("OS_IDENTITY_API_VERSION"))
> + == 3
> + ):
> + auth = v3.Password(
> + auth_url=os.environ["OS_AUTH_URL"],
> + username=os.environ["OS_USERNAME"],
> + password=os.environ["OS_PASSWORD"],
> + project_name=os.environ["OS_PROJECT_NAME"],
> + user_domain_name=os.environ[
> + "OS_USER_DOMAIN_NAME"
> + ],
> + project_domain_name=os.environ[
> + "OS_PROJECT_DOMAIN_NAME"
> + ],
> + )
> + else:
> + auth = v2.Password(
> + auth_url=os.environ["OS_AUTH_URL"],
> + username=os.environ["OS_USERNAME"],
> + password=os.environ["OS_PASSWORD"],
> + tenant_name=os.environ["OS_TENANT_NAME"],
> + )
> + sess = session.Session(auth=auth)
> + nova = novaclient.client.Client(
> + "2",
> + session=sess,
> + region_name=os.environ["OS_REGION_NAME"],
> + )
> + for instance in nova.servers.list():
> + if instance.name == openstack_server_name:
> + instance.delete()
> + logging.info("Deleted %s", openstack_server_name)
> + return
> # Get the package-specific string for triggers too, since they might have broken the run
> trigs = [
> t.split("/", 1)[0] for t in params.get("triggers", [])
> @@ -1507,6 +1598,7 @@ def main():
>
> signal.signal(signal.SIGTERM, term_handler)
> signal.signal(signal.SIGHUP, hup_handler)
> + signal.signal(signal.SIGUSR1, usr1_handler)
done
>
> # load configuration
> cfg = configparser.ConfigParser(
> diff --git a/charms/focal/autopkgtest-cloud-worker/units/test-killer.service b/charms/focal/autopkgtest-cloud-worker/units/test-killer.service
> new file mode 100644
> index 0000000..3c5ed87
> --- /dev/null
> +++ b/charms/focal/autopkgtest-cloud-worker/units/test-killer.service
> @@ -0,0 +1,13 @@
> +[Unit]
> +Description=Test killer
> +StartLimitIntervalSec=60s
> +StartLimitBurst=60
done
> +
> +[Service]
> +User=ubuntu
> +ExecStart=/home/ubuntu/autopkgtest-cloud/tools/test-killer
> +Restart=on-failure
> +RestartSec=1s
> +
> +[Install]
> +WantedBy=autopkgtest.target
> diff --git a/charms/focal/autopkgtest-web/webcontrol/browse.cgi b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> index 309fb82..55b0a8d 100755
> --- a/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> +++ b/charms/focal/autopkgtest-web/webcontrol/browse.cgi
> @@ -573,7 +584,17 @@ def running():
> running_count = 0
> for pkg in packages:
> running_count += len(running_info[pkg].keys())
> -
> + #########################################
> + # NEED TO GO BACK N MAKE SURE THAT I AMEND ALL USES OF THIS
> + # modify the above.
> + show_stop = False
> + if session.get("nickname"):
> + # modify running_info
done
> + nick = session.get("nickname")
> + if nick in ADMIN_NICKS:
done
> + show_stop = True
> +
> + # maybe need to pass site url too?
done
> return render(
> "browse-running.html",
> releases=releases,
> diff --git a/charms/focal/autopkgtest-web/webcontrol/templates/macros.html b/charms/focal/autopkgtest-web/webcontrol/templates/macros.html
> index 941dc77..3556acb 100644
> --- a/charms/focal/autopkgtest-web/webcontrol/templates/macros.html
> +++ b/charms/focal/autopkgtest-web/webcontrol/templates/macros.html
> @@ -7,14 +7,19 @@
> <tr><th>Architecture:</th><td>{{ arch }}</td></tr>
> {% for param, v in params.items() %}
> {% if param == "requester" %}
> - <tr><th>{{ param|capitalize }}:</th><td><a href="https://launchpad.net/~{{ v }}">{{ v }}</a></td></tr>
> - {% elif param == "uuid" %}
> - <tr><th>{{ param|upper }}:</th><td>{{ v }}</td></tr>
> - {% else %}
> - <tr><th>{{ param|capitalize }}:</th><td>{{ v }}</td></tr>
> - {% endif %}
> + <tr><th>{{ param|capitalize }}:</th><td><a href="https://launchpad.net/~{{ v }}">{{ v }}</a></td></tr>
> + {% elif param == "uuid" %}
> + <tr><th>{{ param|upper }}:</th><td>{{ v }}</td></tr>
> + {% else %}
> + <tr><th>{{ param|capitalize }}:</th><td>{{ v }}</td></tr>
> + {% endif %}
> {% endfor %}
> <tr><th>Running for:</th><td>{{ duration//3600 }}h {{ duration % 3600//60 }}m {{ duration % 60 }}s ({{ duration }}s)</td></tr>
> + {% if "uuid" in params.keys() %}
done
> + {% if show_stop %}
> + <tr><td><a href="{{ base_url }}test-manager.cgi?uuid={{ params.get("uuid") }}">Stop this test</a></td></tr>
> + {% endif %}
> + {% endif %}
> </table>
> <pre>
> {{ logtail }}
> diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
> new file mode 100644
> index 0000000..9bda8c6
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
> @@ -0,0 +1,205 @@
> +"""
done
> + Method doc embedded here for now because why not
> + Beware ye who read this code, this could all go so horribly wrong <3
> + test-manager is a new app in autopkgtest-web
> + It takes a request like dis:
> + {
> + "uuid": uuid
> + }
> + the requests will be generated on the running page, with a hyperlink / button much like the
> + retries fing.
> + peeps can also just manually run the post request if they want
> + The request first checks that the test of the specified uuid is actually running
> + If it is currently running, test-manager then places a request like this in the
> + stop-running.fanout exchange
> + {
> + "uuid": uuid,
> + "not-running-on": []
> + }
> + The queue message will then get picked up by a worker unit, running the
> + test-killer script as a systemd service
> + test-killer checks if the test with that uuid is running on that specific cloud worker
> + if it is, test-killer kills the test *
> + if it is not running on that specific cloud worker, test killer modifies the message like dis:
> + {
> + "uuid": uuid,
> + "not-running-on": [cloud-worker-hostname]
> + }
> + (and then any cloud worker that doesn't find it also adds their hostname to the list)
> + (test killer first checks this list and then sleeps so we don't check the same cloud worker twice)
> + test-killer acks the original message, and then places the new message, at the back of the queue
> + I know what you're thinking. Back of the queue? But there could be issues with that, what if the
> + test finishes before a test-killer systemd unit kills it? And to that I say:
> + queue iz smol , dusnt matta
> + test-killer should also ack the message if no cloud worker units find it (len(not-running-on) == 3)
> + Using one queue means we don't need to differentiate between cloud worker and lxd worker or
> + whatever
> + what if found on no cloud workers ?
> + - check queued.json. See if uuid in there
> + - if is in there, test likely looping. Sleep for some time and place message back into queue
> + - if not, the test finished already. ack the message and place back into queue.
> + * NEED TO ENSURE WORKER EXITS PROPERLY AND ACKS TEST REQUEST MESSAGE FROM QUEUE, instead of
> + placing it back
> +
> + NEEDS TESTING STILL OVVI!
> +"""
> +
> +
> +import configparser
> +import json
> +import logging
> +import os
> +import urllib
> +from collections import OrderedDict
> +from html import escape as _escape
> +
> +import amqplib.client_0_8 as amqp
> +import flask
> +from flask import Flask, request, session # , redirect
> +from flask_openid import OpenID
> +from helpers.utils import get_all_releases, setup_key
> +from werkzeug.middleware.proxy_fix import ProxyFix
> +
> +# Computed at import time; render() uses it to order releases.
> +ALL_UBUNTU_RELEASES = get_all_releases()
> +# Launchpad nicknames allowed to stop tests.
> +# NOTE(review): browse.cgi also checks ADMIN_NICKS to decide whether to show
> +# the "Stop this test" link. Keep a single shared definition (e.g. in
> +# helpers) so the two lists cannot drift apart.
> +ADMIN_NICKS = [
> + "brian-murray",
> + "andersson123",
> + "paride",
> + "hyask",
> + "vorlon",
> + "sil2000",
> +]
> +# Same file tools/test-killer reads as QUEUE_PATH.
> +RUNNING_FP = "/run/amqp-status-collector/running.json"
> +# Must match WRITER_EXCHANGE_NAME in tools/test-killer.
> +WRITER_EXCHANGE_NAME = "stop-running.fanout"
> +
> +
> +# can move to utils in the future or something. When we refactor.
> +# COPIED FUNCTIONS AND VARIABLES!
> +def maybe_escape(value):
> + """HTML-escape value if it is truthy; falsy values (None, "") are returned as-is."""
> + return _escape(value) if value else value
> +
> +
> +def render(template, code=200, **kwargs):
> + """Render a Jinja template and return a (body, status code) tuple.
> +
> + If present, kwargs["release_arches"] (a mapping keyed by release) and
> + kwargs["releases"] are re-ordered by position in ALL_UBUNTU_RELEASES.
> + base_url and static_url are always passed to the template.
> +
> + NOTE(review): copied from browse.cgi, and not called anywhere in this
> + module as shown (index_root returns HTML.format(...) instead). Either
> + use it or drop it. A release missing from ALL_UBUNTU_RELEASES raises
> + ValueError from list.index, which is not caught here.
> + """
> + # sort the values passed in, so that releases are in the right order
> + try:
> + release_arches = OrderedDict()
> + for k in sorted(
> + kwargs["release_arches"], key=ALL_UBUNTU_RELEASES.index
> + ):
> + release_arches[k] = kwargs["release_arches"][k]
> + kwargs["release_arches"] = release_arches
> + except KeyError:
> + pass
> + try:
> + kwargs["releases"] = sorted(
> + kwargs["releases"], key=ALL_UBUNTU_RELEASES.index
> + )
> + except KeyError:
> + pass
> + return (
> + flask.render_template(
> + template,
> + base_url=flask.url_for("index_root"),
> + static_url=flask.url_for("static", filename=""),
> + **kwargs,
> + ),
> + code,
> + )
> +
> +
> +# Minimal HTML page wrapper; index_root fills the single "{}" placeholder
> +# with str.format(). The title is carried over from the test-request page.
> +HTML = """
done
> +<!doctype html>
> +<html>
> +<head>
> +<meta charset="utf-8">
> +<title>Autopkgtest Test Request</title>
> +</head>
> +<body>
> +{}
> +</body>
> +</html>
> +"""
> +
> +
> +def submit_to_queue(message):
> + """Publish message as persistent JSON to the WRITER_EXCHANGE_NAME fanout.
> +
> + NOTE(review): opens a new AMQP connection and channel per request and
> + closes neither. Close them (e.g. in try/finally) once published. Unlike
> + test-killer's connection, confirm_publish is not enabled here, so
> + publishes are not confirmed by the broker.
> + """
> + amqp_con = amqp_connect()
> + complete_amqp = amqp_con.channel()
> + complete_amqp.access_request(
> + "/complete", active=True, read=False, write=True
> + )
> + complete_amqp.exchange_declare(
> + WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> + )
> + complete_amqp.basic_publish(
> + amqp.Message(json.dumps(message), delivery_mode=2),
> + WRITER_EXCHANGE_NAME,
> + "",
> + )
> +
> +
> +def amqp_connect():
> + """Connect to the AMQP server given by [amqp] uri in ~ubuntu/autopkgtest-cloud.conf.
> +
> + NOTE(review):
> + - cp.read() silently ignores a missing file, which then surfaces as
> + KeyError on cp["amqp"].
> + - Only hostname, username and password are taken from the URI, so any
> + port or vhost it carries is ignored.
> + - The module only does `import urllib`, which does not by itself make
> + urllib.parse available. This works only if some other import has
> + already loaded it; import urllib.parse explicitly.
> + """
> + cp = configparser.ConfigParser()
> + cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> + amqp_uri = cp["amqp"]["uri"]
> + parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
> + amqp_con = amqp.Connection(
> + parts.hostname, userid=parts.username, password=parts.password
> + )
> + logging.info(
> + "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)
> + )
> + return amqp_con
> +
> +
> +# Initialize app
done
> +# State directory for the secret key and the OpenID store. The leading
> +# os.path.sep is discarded by os.path.join whenever the next component is
> +# already absolute (as the "/run" default is).
> +# NOTE(review): this presumably matches the directory browse.cgi uses, so both
> +# apps share the secret key and therefore the session cookie. That is the
> +# only way session["nickname"] can be set here, since this module defines no
> +# login route. Confirm.
> +PATH = os.path.join(
> + os.path.sep, os.getenv("XDG_RUNTIME_DIR", "/run"), "autopkgtest_webcontrol"
> +)
> +os.makedirs(PATH, exist_ok=True)
> +app = Flask("test_manager")
> +app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
> +# keep secret persistent between CGI invocations
> +secret_path = os.path.join(PATH, "secret_key")
> +setup_key(app, secret_path)
> +oid = OpenID(app, os.path.join(PATH, "openid"), safe_roots=[])
> +
> +
> +@app.route("/", methods=["GET", "POST"])
> +def index_root():
> + """Handle a stop-test request of the form ?uuid=<uuid>.
> +
> + The request is accepted only when all of these hold:
> + - the session nickname is in ADMIN_NICKS;
> + - the query string has exactly one parameter, "uuid";
> + - that uuid appears in running.json.
> + It is then published to WRITER_EXCHANGE_NAME with an empty
> + "not-running-on" list, and a short confirmation page is returned.
> +
> + NOTE(review):
> + - Every rejection path is a bare `return`, so the view returns None.
> + Flask refuses that (TypeError, so HTTP 500). Return explicit
> + responses: 403 for non-admins, 400 for bad parameters, 404 when the
> + test is not running.
> + - The state change happens on a plain GET (the "Stop this test" link),
> + so any page an admin visits can trigger it (CSRF). Require a POST
> + carrying a token.
> + - Only request.args is read; a POST body is ignored.
> + - The running check is a substring test on str(running_data). An empty
> + or partial uuid passes and is forwarded to test-killer, which greps
> + for it. Validate the uuid format and compare against the real uuid
> + fields.
> + """
> + session.permanent = True
doesn't need refactor I think
> + nick = maybe_escape(session.get("nickname"))
> + # amend me
> + if nick not in ADMIN_NICKS:
done
> + return
> + # check that nick is acceptable
> + params = {
I don't think refactor necessary
> + maybe_escape(k): maybe_escape(v) for k, v in request.args.items()
> + }
> + base = ["uuid"]
> + if list(params.keys()) != base:
> + # all we need is the uuid tyvm
> + return
> + if not os.path.isfile(RUNNING_FP):
> + # return properly
> + return
> + running_data = {}
> + with open(RUNNING_FP, "r") as f:
> + running_data = json.load(f)
> + str_running_data = str(running_data)
done
> + if params["uuid"] not in str_running_data:
> + # test uuid doesn't correspond with any test listed as running in running.json
> + return
> + # now we submit to queue...
> + queue_message = {
> + "uuid": params["uuid"],
> + "not-running-on": [],
> + }
> + submit_to_queue(queue_message)
> + myhtml = "<p>Submitted %s to exchange %s</p>" % (
> + json.dumps(queue_message),
> + WRITER_EXCHANGE_NAME,
> + )
> + return HTML.format(myhtml), 200
--
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master.
References