canonical-ubuntu-qa team mailing list archive
-
canonical-ubuntu-qa team
-
Mailing list archive
-
Message #03190
[Merge] ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master
Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master.
Requested reviews:
Canonical's Ubuntu QA (canonical-ubuntu-qa)
For more details, see:
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461654
--
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:stop-tests-from-webpage into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
new file mode 100644
index 0000000..a0dab6a
--- /dev/null
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/tools/test-killer
@@ -0,0 +1,154 @@
+#!/usr/bin/python3
+"""Kills running tests."""
+
+import configparser
+import json
+import logging
+import subprocess
+import time
+import os
+import socket
+import sqlite3
+import urllib.parse
+
+import amqplib.client_0_8 as amqp
+from helpers.utils import init_db
+
+
+# Name of the exchange that stop requests are published to; it is declared
+# with type "fanout" wherever it is used in this script.
+WRITER_EXCHANGE_NAME = "stop-running.fanout"
+# needs rabbitmq.cred
+RABBIT_CREDS = "/home/ubuntu/rabbitmq.cred"
+# Keys a stop request must carry; check_message() compares incoming
+# messages against this list.
+MSG_ONLY_KEYS = [
+    "uuid",
+    "not-running-on",
+]
+
+# The credentials file is read at import time. A "[rabbit]" section header is
+# prepended so that configparser can parse it (presumably the file itself is
+# a plain KEY=VALUE list without a section header - confirm).
+RABBIT_CFG = configparser.ConfigParser()
+with open(RABBIT_CREDS, "r") as f:
+    RABBIT_CFG.read_string("[rabbit]\n" + f.read())
+
+
+
+def amqp_connect():
+ amqp_con = amqp.Connection(
+ RABBIT_CFG["rabbit"]["RABBIT_HOST"],
+ userid=RABBIT_CFG["rabbit"]["RABBIT_USER"],
+ password=RABBIT_CFG["rabbit"]["RABBIT_PASSWORD"],
+ confirm_publish=True,
+ )
+ return amqp_con
+
+
+def check_message(msg):
+ return list(msg.keys()) == MSG_ONLY_KEYS
+
+
+def get_test_pid(uuid):
+ # ps aux | grep runner | grep uuid
+ ps_aux = "ps aux"
+ grep_runner = "grep runner"
+ grep_for_uuid = "grep " + uuid
+ try:
+ ps_aux_run = subprocess.run(
+ ps_aux,
+ stdout=subprocess.PIPE,
+ check=True
+ )
+ runner_run = subprocess.run(
+ grep_runner,
+ stdin=ps_aux_run.stdout,
+ stdout=subprocess.PIPE,
+ check=True,
+ )
+ # If this one fails, the test isn't running on this worker
+ uuid_run = subprocess.run(
+ grep_for_uuid,
+ stdin=runner_run.stdout,
+ capture_output=True,
+ check=True
+ )
+ except subprocess.CalledProcessError as _:
+ return None
+ search_for_test_output = uuid_run.stdout
+ search_me = search_for_test_output.splitlines()
+ for line in search_me:
+ if uuid in line:
+ line = line.split(" ")
+ line = [x for x in line if x]
+ pid = line[1]
+ return int(pid)
+
+
+def place_new_message_in_queue(info: dict, amqp_con: amqp.Connection):
+ complete_amqp = amqp_con.channel()
+ complete_amqp.access_request(
+ "/complete", active=True, read=False, write=True
+ )
+ complete_amqp.exchange_declare(
+ WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+ )
+ complete_amqp.basic_publish(
+ amqp.Message(json.dumps(info), delivery_mode=2),
+ WRITER_EXCHANGE_NAME,
+ ""
+ )
+
+
+def kill_process(pid: int):
+ kill_cmd = "kill %i -15" % pid
+ _ = subprocess.run(
+ kill_cmd.split(" "),
+ check=True,
+ )
+
+
+def process_message(msg, amqp_con):
+ body = msg.body
+ if isinstance(body, bytes):
+ body = body.decode("UTF-8", errors="replace")
+ info = json.loads(body)
+ logging.info("Message is: \n%s", json.dumps(info, indent=2))
+ if not check_message(info):
+ logging.error("Message %s is invalid. Ignoring.", json.dumps(info, indent=2))
+ # removes from queue!
+ msg.channel.basic_reject(msg.delivery_tag)
+ return
+ if socket.getfqdn() in info["not-running-on"]:
+ # places back into queue as is
+ msg.channel.basic_nack(msg.delivery_tag)
+ time.sleep(30) # <- needs some thought
+ pid = get_test_pid(info["uuid"])
+ if pid is None:
+ info["not-running-on"].append(socket.getfqdn())
+ place_new_message_in_queue(info, amqp_con)
+ # removes from queue without saying it failed or anything
+ msg.channel.basic_ack(msg.delivery_tag)
+ kill_process(pid)
+ while get_test_pid(info["uuid"]) is not None:
+ time.sleep(3)
+ # grats, we killed the test, we shilling
+ msg.channel.basic_ack(msg.delivery_tag)
+ # all left to do is make sure worker handles the kill command properly.
+
+
+if __name__ == "__main__":
+ logging.basicConfig(level=logging.INFO)
+ amqp_con = amqp_connect()
+ status_ch = amqp_con.channel()
+ status_ch.access_request("/complete", active=True, read=True, write=True)
+ # need to go back and check the functionality of durable=True and auto_delete=False
+ status_ch.exchange_declare(
+ WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+ )
+ queue_name = "tests-to-kill"
+ status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
+ status_ch.queue_bind(queue_name, WRITER_EXCHANGE_NAME, queue_name)
+ logging.info("Listening to requests on %s", queue_name)
+ status_ch.basic_consume(
+ "", callback=lambda msg: process_message(msg, amqp_con)
+ )
+ while status_ch.callbacks:
+ status_ch.wait()
+
+
+ pass
\ No newline at end of file
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
index 9ff2ff1..147a331 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
@@ -219,8 +219,16 @@ def getglob(d, glob, default=None):
return default
+# signal.
+# https://www.ibm.com/docs/en/aix/7.2?topic=management-process-termination
def term_handler(signum, frame):
+ # kill -15 sends TERM
+ # so we gotta modify this to kill test immediately
+ # well, it just has to basic_ack the message
+ # need to modify this to terminate the test immediately?
+ # and make sure the amqp message is ack'd so it doesn't go back into queue
+ # maybe it needs to be hup_handler instead
"""SIGTERM handler, for clean exit after current test"""
logging.info("Caught SIGTERM, requesting exit")
diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/__init__.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/__init__.py
diff --git a/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
new file mode 100644
index 0000000..45a9d8d
--- /dev/null
+++ b/charms/focal/autopkgtest-web/webcontrol/test_manager/app.py
@@ -0,0 +1,137 @@
+"""
+Web endpoint for stopping tests that are already running.
+
+Intended workflow:
+
+1. The running page shows a "stop test" option to privileged users OR to the
+   requester of that specific test.
+2. Clicking that button redirects to
+   a.u.c/test-manager.cgi?uuid=$uuid
+3. The test_manager module submits a message containing just the uuid to a
+   rabbitmq queue - perhaps called kill-these-tests.
+4. A systemd service on the worker - test-killer or something similar -
+   consumes the rabbitmq messages, kills the test with the specified uuid,
+   and removes the uuid item from the queue when done.
+
+We need to make sure that when the worker exits, it acks the message so that
+the test doesn't go back into the queue.
+"""
+
+
+from flask import Flask, redirect, request, session
+from flask_openid import OpenID
+from html import escape as _escape
+import os
+import json
+import configparser
+import logging
+import urllib
+from werkzeug.middleware.proxy_fix import ProxyFix
+import amqplib.client_0_8 as amqp
+
+
+from helpers.utils import setup_key
+
+RUNNING_FP = "/run/amqp-status-collector/running.json"
+WRITER_EXCHANGE_NAME = "stop-running.fanout"
+
+
+def maybe_escape(value):
+ """Escape the value if it is True-ish"""
+ return _escape(value) if value else value
+
+
+def submit_to_queue(message):
+ amqp_con = amqp_connect()
+ complete_amqp = amqp_con.channel()
+ complete_amqp.access_request(
+ "/complete", active=True, read=False, write=True
+ )
+ complete_amqp.exchange_declare(
+ WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+ )
+ complete_amqp.basic_publish(
+ amqp.Message(json.dumps(message), delivery_mode=2),
+ WRITER_EXCHANGE_NAME,
+ "",
+ )
+
+
+def amqp_connect():
+ """Connect to AMQP server"""
+ cp = configparser.ConfigParser()
+ cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
+ amqp_uri = cp["amqp"]["uri"]
+ parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
+ amqp_con = amqp.Connection(
+ parts.hostname, userid=parts.username, password=parts.password
+ )
+ logging.info(
+ "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)
+ )
+ return amqp_con
+
+
+# Initialize app
+# Runtime directory for this web app: $XDG_RUNTIME_DIR/autopkgtest_webcontrol,
+# falling back to /run/autopkgtest_webcontrol when the variable is unset.
+PATH = os.path.join(
+    os.path.sep, os.getenv("XDG_RUNTIME_DIR", "/run"), "autopkgtest_webcontrol"
+)
+os.makedirs(PATH, exist_ok=True)
+# NOTE(review): the Flask import name is "request" - looks carried over from
+# another webcontrol app; confirm it is intended here.
+app = Flask("request")
+# Wrap the WSGI app in ProxyFix with x_proto=1.
+app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
+# keep secret persistent between CGI invocations
+secret_path = os.path.join(PATH, "secret_key")
+# setup_key comes from helpers.utils; presumably it loads or creates the key
+# stored at secret_path and installs it on the app - confirm.
+setup_key(app, secret_path)
+# The OpenID store lives under PATH/openid; no safe redirect roots are set.
+oid = OpenID(app, os.path.join(PATH, "openid"), safe_roots=[])
+
+
+@app.route("/", methods=["POST"])
+def index_root():
+ """Handle stop test requests"""
+ session.permanent = True
+ nick = maybe_escape(session.get("nickname"))
+ params = {
+ maybe_escape(k): maybe_escape(v) for k, v in request.args.items()
+ }
+ base = ["uuid"]
+ if list(params.keys()) != base:
+ # all we need is the uuid tyvm
+ return
+ if not os.path.isfile(RUNNING_FP):
+ return
+ running_data = {}
+ with open(RUNNING_FP, "r") as f:
+ running_data = json.load(f)
+ str_running_data = str(running_data)
+ if params["uuid"] not in str_running_data:
+ # test uuid doesn't correspond with any test listed as running in running.json
+ return
+ # now we submit to queue...
+ queue_message = {
+ "uuid": params["uuid"],
+ "not-running-on": [],
+ }
+ submit_to_queue(queue_message)
+ # need to add return here with message to the user
+
+
+ # running_info =
+ # check args
+ # check uuid is valid?
+ # check uuid is on running page? acc in running.json
+ # then submit to deleter queue
+ # right?
+ # what should the queue name be?
+ # 2 cloud workers, 2 web workers ...
+ # need to figure out ...
+ # gotta be one queue, as it could be either cloud worker
+ # maybe some tags?
+ # separate queue for cloud and lxd worker
+ # check arch - if armhf, separate queue for lxd worker.
+ # maybe don't need to check, and can have one queue
+ # add hostname to tried...
+ # message goes to cloud worker without the test, append hostname to not-running-on if
+ # the test isn't running on that cloud worker.
+ # place the message back in the queue
+ # sleep a bit? wait for other cloud worker to get the message?
+ # if hostname in "not-running-on" just place message back in the queue and sleep.
+ # {
+ # "uuid": str,
+ # "not-running-on": [""]
+ # }
References