← Back to team overview

canonical-ubuntu-qa team mailing list archive

[Merge] ~andersson123/autopkgtest-cloud:d-a-r-make-me-faster into autopkgtest-cloud:master

 

Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:d-a-r-make-me-faster into autopkgtest-cloud:master.

Requested reviews:
  Canonical's Ubuntu QA (canonical-ubuntu-qa)

For more details, see:
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/461146
-- 
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:d-a-r-make-me-faster into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-web/units/sqlite-writer.service b/charms/focal/autopkgtest-web/units/sqlite-writer.service
new file mode 100644
index 0000000..3a47c08
--- /dev/null
+++ b/charms/focal/autopkgtest-web/units/sqlite-writer.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=Write test results to db
+StartLimitIntervalSec=60s
+StartLimitBurst=60
+
+[Service]
+User=ubuntu
+ExecStart=/home/ubuntu/webcontrol/sqlite-writer
+Restart=on-failure
+RestartSec=1s
+
+[Install]
+WantedBy=autopkgtest-web.target
diff --git a/charms/focal/autopkgtest-web/webcontrol/download-all-results b/charms/focal/autopkgtest-web/webcontrol/download-all-results
index 1af7918..485f6bd 100755
--- a/charms/focal/autopkgtest-web/webcontrol/download-all-results
+++ b/charms/focal/autopkgtest-web/webcontrol/download-all-results
@@ -11,73 +11,58 @@
 # script can be used to find any results which were missed and insert them.
 
 import configparser
-import http
 import io
+import itertools
 import json
 import logging
 import os
-import random
 import sqlite3
-import sys
 import tarfile
-import time
 import urllib.parse
-from urllib.request import urlopen
 
+import amqplib.client_0_8 as amqp
+import swiftclient
 from distro_info import UbuntuDistroInfo
 from helpers.utils import get_test_id, init_db
 
 LOGGER = logging.getLogger(__name__)
+WRITER_EXCHANGE_NAME = "sqlite-write-me.fanout"
+SWIFT_CREDS_FILE = "/home/ubuntu/public-swift-creds"
 
 config = None
 db_con = None
+amqp_con = None
 
 
-def list_remote_container(container_url):
-    LOGGER.debug("Listing container %s", container_url)
-    out = []
+def amqp_connect():
+    """Connect to AMQP server"""
 
-    def get_batch(start=None):
-        url = f"{container_url}/?format=json"
-        if start is not None:
-            url += f"&marker={urllib.parse.quote(start)}"
-
-        LOGGER.debug('Retrieving "%s"', url)
-        for _ in range(5):
-            try:
-                resp = urlopen(url)
-            except http.client.RemoteDisconnected:
-                LOGGER.debug("Got disconnected, sleeping")
-                time.sleep(5)
-                continue
-            else:
-                break
-        json_string = resp.read()
-        json_data = json.loads(json_string)
-
-        if not json_data:
-            return None
-
-        out.extend([e["name"] for e in json_data])
-        name = out[-1]
-
-        return name
+    cp = configparser.ConfigParser()
+    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
+    amqp_uri = cp["amqp"]["uri"]
+    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
+    amqp_con = amqp.Connection(
+        parts.hostname, userid=parts.username, password=parts.password
+    )
+    logging.info(
+        "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)
+    )
 
-    marker = get_batch()
+    return amqp_con
 
-    while True:
-        new_marker = get_batch(marker)
-        if not new_marker or new_marker == marker:
-            break
-        marker = new_marker
 
-    out = [name for name in out if name.endswith("result.tar")]
-    LOGGER.debug("Found %d items in %s", len(out), container_url)
-    ret = {}
-    for r in out:
-        (_, _, _, _, run_id, _) = r.split("/")
-        ret[run_id] = r
-    return ret
+# Refactored: lists container objects via swiftclient instead of paginated HTTP requests.
+def list_remote_container(container_name, swift_conn):
+    LOGGER.debug("Listing container %s", container_name)
+    _, object_list = swift_conn.get_container(
+        container_name, full_listing=True
+    )
+    ret_me = {}
+    for obj in object_list:
+        if "result.tar" in obj["name"]:
+            obj_splitname = obj["name"].split("/")
+            ret_me[obj_splitname[4]] = obj["name"]
+    return ret_me
 
 
 def list_our_results(release):
@@ -91,29 +76,16 @@ def list_our_results(release):
     return {run_id for (run_id,) in c.fetchall()}
 
 
-def fetch_one_result(url):
+def fetch_one_result(container_name, object_name, swift_conn):
     """Download one result URL from swift and add it to the DB"""
-    (release, arch, _, src, run_id, _) = url.split("/")[-6:]
+    # Results are fetched via swiftclient using the credentials in
+    # SWIFT_CREDS_FILE (public-swift-creds), not via plain urllib.
+    (release, arch, _, src, run_id, _) = object_name.split("/")
     test_id = get_test_id(db_con, release, arch, src)
-
-    try:
-        f = urlopen(url, timeout=30)
-        if f.getcode() == 200:
-            tar_bytes = io.BytesIO(f.read())
-            f.close()
-        else:
-            raise NotImplementedError(
-                "fetch_one_result(%s): cannot handle HTTP code %i"
-                % (url, f.getcode())
-            )
-    except IOError as e:
-        LOGGER.error("Failure to fetch %s: %s", url, str(e))
-        # we tolerate "not found" (something went wrong on uploading the
-        # result), but other things indicate infrastructure problems
-        if hasattr(e, "code") and e.code == 404:  # pylint: disable=no-member
-            return
-        sys.exit(1)
-
+    # Fetch the result tarball via swiftclient (same approach as
+    # update-github-jobs) rather than urllib.
+    _, contents = swift_conn.get_object(container_name, object_name)
+    tar_bytes = io.BytesIO(contents)
     try:
         with tarfile.open(None, "r", tar_bytes) as tar:
             exitcode = int(tar.extractfile("exitcode").read().strip())
@@ -128,14 +100,12 @@ def fetch_one_result(url):
                     srcver = "%s unknown" % (src)
                 else:
                     raise
-            (ressrc, ver) = srcver.split()
+            (_, ver) = srcver.split()
             testinfo = json.loads(
                 tar.extractfile("testinfo.json").read().decode()
             )
             test_uuid = testinfo.get("uuid", "")
             duration = int(tar.extractfile("duration").read().strip())
-            # KeyError means the file is not there, i.e. there isn't a human
-            # requester
             try:
                 requester = (
                     tar.extractfile("requester").read().decode().strip()
@@ -143,16 +113,7 @@ def fetch_one_result(url):
             except KeyError:
                 requester = ""
     except (KeyError, ValueError, tarfile.TarError) as e:
-        LOGGER.debug("%s is damaged, ignoring: %s", url, str(e))
-        return
-
-    if src != ressrc:
-        LOGGER.error(
-            "%s is a result for package %s, but expected package %s",
-            url,
-            ressrc,
-            src,
-        )
+        LOGGER.debug("%s is damaged, ignoring: %s" % (object_name, str(e)))
         return
 
     # parse recorded triggers in test result
@@ -161,7 +122,9 @@ def fetch_one_result(url):
             test_triggers = e.split("=", 1)[1]
             break
     else:
-        LOGGER.error("%s result has no ADT_TEST_TRIGGERS, ignoring", url)
+        LOGGER.error(
+            "%s result has no ADT_TEST_TRIGGERS, ignoring", object_name
+        )
         return
 
     LOGGER.debug(
@@ -182,63 +145,56 @@ def fetch_one_result(url):
         if env in testinfo.keys():
             env_vars.append(spec)
 
-    while True:
-        try:
-            with (
-                db_con
-            ):  # this starts a transaction, making sure we release the lock at the end
-                c = db_con.cursor()
-                c.execute(
-                    "INSERT INTO result VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
-                    (
-                        test_id,
-                        run_id,
-                        ver,
-                        test_triggers,
-                        duration,
-                        exitcode,
-                        requester,
-                        ",".join(env_vars),
-                        test_uuid,
-                    ),
-                )
-                db_con.commit()
-            break
-        except sqlite3.OperationalError as e:
-            if "database is locked" in str(e):
-                sleep_time = random.uniform(0.1, 2)
-                LOGGER.info(
-                    "database is currently locked, waiting %f seconds and trying again..."
-                    % sleep_time
-                )
-                time.sleep(sleep_time)
-            else:
-                logging.info("insert operation failed with: %s" % str(e))
-                break
-        except sqlite3.IntegrityError:
-            LOGGER.info("%s was already recorded - skipping", run_id)
-            break
+    # Insert the write request into the queue
+    complete_amqp = amqp_con.channel()
+    complete_amqp.access_request(
+        "/complete", active=True, read=False, write=True
+    )
+    complete_amqp.exchange_declare(
+        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+    )
+    write_me_msg = {
+        "test_id": test_id,
+        "run_id": run_id,
+        "version": ver,
+        "triggers": test_triggers,
+        "duration": duration,
+        "exitcode": exitcode,
+        "requester": requester,
+        "env": ",".join(env_vars),
+        "uuid": test_uuid,
+    }
+    complete_amqp.basic_publish(
+        amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
+        WRITER_EXCHANGE_NAME,
+        "",
+    )
 
 
-def fetch_container(release, container_url):
+def fetch_container(release, swift_conn):
     """Download new results from a swift container"""
+    container_name = "autopkgtest-" + release
 
     try:
         our_results = list_our_results(release)
-        known_results = list_remote_container(container_url)
+        known_results = list_remote_container(container_name, swift_conn)
 
+        # known_results maps run_id -> object name, so its keys are run_ids
         need_to_fetch = set(known_results.keys()) - our_results
 
         LOGGER.debug("Need to download %d items", len(need_to_fetch))
 
         for run_id in need_to_fetch:
             fetch_one_result(
-                os.path.join(container_url, known_results[run_id])
+                container_name=container_name,
+                object_name=known_results[run_id],
+                swift_conn=swift_conn,
             )
-    except urllib.error.HTTPError as e:
-        if e.code == 401 or e.code == 404:
-            LOGGER.warning(f"Couldn't access {container_url} - doesn't exist?")
-            return
+    except swiftclient.ClientException as e:
+        LOGGER.warning(
+            "Something went wrong accessing container %s\nTraceback: %s"
+            % (container_name, str(e))
+        )
         raise
 
 
@@ -260,6 +216,31 @@ if __name__ == "__main__":
 
     config = configparser.ConfigParser()
     config.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
+    amqp_con = amqp_connect()
+
+    swift_cfg = configparser.ConfigParser()
+
+    with open(SWIFT_CREDS_FILE) as fp:
+        swift_cfg.read_file(
+            itertools.chain(["[swift]"], fp), source=SWIFT_CREDS_FILE
+        )
+
+    # Build the swiftclient.Connection kwargs from the parsed credentials
+    swift_creds = {
+        "authurl": swift_cfg["swift"]["OS_AUTH_URL"],
+        "user": swift_cfg["swift"]["OS_USERNAME"],
+        "key": swift_cfg["swift"]["OS_PASSWORD"],
+        "os_options": {
+            "region_name": swift_cfg["swift"]["OS_REGION_NAME"],
+            "project_domain_name": swift_cfg["swift"][
+                "OS_PROJECT_DOMAIN_NAME"
+            ],
+            "project_name": swift_cfg["swift"]["OS_PROJECT_NAME"],
+            "user_domain_name": swift_cfg["swift"]["OS_USER_DOMAIN_NAME"],
+        },
+        "auth_version": 3,
+    }
+    swift_conn = swiftclient.Connection(**swift_creds)
 
     try:
         for release in releases:
@@ -275,9 +256,7 @@ if __name__ == "__main__":
                     c.execute("ALTER TABLE result ADD COLUMN env TEXT")
             fetch_container(
                 release,
-                os.path.join(
-                    config["web"]["SwiftURL"], f"autopkgtest-{release}"
-                ),
+                swift_conn=swift_conn,
             )
     finally:
         if db_con:
diff --git a/charms/focal/autopkgtest-web/webcontrol/download-results b/charms/focal/autopkgtest-web/webcontrol/download-results
index e71d4a0..4b9b11e 100755
--- a/charms/focal/autopkgtest-web/webcontrol/download-results
+++ b/charms/focal/autopkgtest-web/webcontrol/download-results
@@ -4,16 +4,15 @@ import configparser
 import json
 import logging
 import os
-import random
 import socket
 import sqlite3
-import time
 import urllib.parse
 
 import amqplib.client_0_8 as amqp
 from helpers.utils import get_test_id, init_db
 
 EXCHANGE_NAME = "testcomplete.fanout"
+WRITER_EXCHANGE_NAME = "sqlite-write-me.fanout"
 
 
 def amqp_connect():
@@ -83,43 +82,30 @@ def process_message(msg, db_con):
         return
 
     test_id = get_test_id(db_con, release, arch, package)
-
-    while True:
-        try:
-            with (
-                db_con
-            ):  # this starts a transaction, making sure we release the lock at the end
-                c = db_con.cursor()
-                c.execute(
-                    "INSERT INTO result VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
-                    (
-                        test_id,
-                        run_id,
-                        version,
-                        triggers,
-                        duration,
-                        exitcode,
-                        requester,
-                        info.get("env", ""),
-                        test_uuid,
-                    ),
-                )
-                db_con.commit()
-            break
-        except sqlite3.OperationalError as e:
-            if "database is locked" in str(e):
-                sleep_time = random.uniform(0.1, 2)
-                logging.info(
-                    "database is currently locked, waiting %f seconds and trying again..."
-                    % sleep_time
-                )
-                time.sleep(sleep_time)
-            else:
-                logging.info("insert operation failed with: %s" % str(e))
-                break
-        except sqlite3.IntegrityError:
-            logging.info("...which was already recorded - skipping")
-            break
+    # add to queue instead of writing to db
+    complete_amqp = amqp_con.channel()
+    complete_amqp.access_request(
+        "/complete", active=True, read=False, write=True
+    )
+    complete_amqp.exchange_declare(
+        WRITER_EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+    )
+    write_me_msg = {
+        "test_id": test_id,
+        "run_id": run_id,
+        "version": version,
+        "triggers": triggers,
+        "duration": duration,
+        "exitcode": exitcode,
+        "requester": requester,
+        "env": info.get("env", ""),
+        "uuid": test_uuid,
+    }
+    complete_amqp.basic_publish(
+        amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
+        WRITER_EXCHANGE_NAME,
+        "",
+    )
 
     msg.channel.basic_ack(msg.delivery_tag)
 
diff --git a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
new file mode 100755
index 0000000..4ba8bdf
--- /dev/null
+++ b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
@@ -0,0 +1,138 @@
+#!/usr/bin/python3
+
+import configparser
+import json
+import logging
+import os
+import socket
+import sqlite3
+import urllib.parse
+
+import amqplib.client_0_8 as amqp
+from helpers.utils import init_db
+
+EXCHANGE_NAME = "sqlite-write-me.fanout"
+
+config = None
+db_con = None
+
+INSERT_INTO_KEYS = [
+    "test_id",
+    "run_id",
+    "version",
+    "triggers",
+    "duration",
+    "exitcode",
+    "requester",
+    "env",
+    "uuid",
+]
+
+
+def amqp_connect():
+    """Connect to AMQP server"""
+
+    cp = configparser.ConfigParser()
+    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
+    amqp_uri = cp["amqp"]["uri"]
+    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
+    amqp_con = amqp.Connection(
+        parts.hostname, userid=parts.username, password=parts.password
+    )
+    logging.info(
+        "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)
+    )
+
+    return amqp_con
+
+
+def db_connect():
+    """Connect to SQLite DB"""
+    cp = configparser.ConfigParser()
+    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
+
+    db_con = init_db(cp["web"]["database"])
+
+    return db_con
+
+
+def check_msg(queue_msg):
+    required_keys = set(
+        [
+            "test_id",
+            "run_id",
+            "version",
+            "triggers",
+            "duration",
+            "exitcode",
+            "requester",
+            "env",
+            "uuid",
+        ]
+    )
+    queue_keys = set(queue_msg.keys())
+    if required_keys == queue_keys:
+        return True
+    return False
+
+
+def process_message(msg, db_con):
+    # NOTE: the message is only acked after a successful insert, so a failed
+    # insert leaves it unacked for redelivery — confirm this is intended.
+    body = msg.body
+    if isinstance(body, bytes):
+        body = body.decode("UTF-8", errors="replace")
+    info = json.loads(body)
+    logging.info("Message is: \n%s" % json.dumps(info, indent=2))
+    if not check_msg(info):
+        logging.error(
+            "Message has incorrect keys! Ignoring\n%s"
+            % json.dumps(info, indent=2)
+        )
+        msg.channel.basic_ack(msg.delivery_tag)
+        return
+    # insert into db
+    sqlite3.paramstyle = "named"
+    with db_con:
+        c = db_con.cursor()
+        # Explicit column names guard against schema/column-order drift
+        c.execute(
+            (
+                "INSERT INTO result(test_id, run_id, version, triggers, duration, "
+                "exitcode, requester, env, uuid) VALUES (:test_id, :run_id, "
+                ":version, :triggers, :duration, :exitcode, :requester, :env, :uuid)"
+            ),
+            {
+                "test_id": info["test_id"],
+                "run_id": info["run_id"],
+                "version": info["version"],
+                "triggers": info["triggers"],
+                "duration": info["duration"],
+                "exitcode": info["exitcode"],
+                "requester": info["requester"],
+                "env": info["env"],
+                "uuid": info["uuid"],
+            },
+        )
+    logging.info("Inserted the following entry into the db:\n%s" % body)
+
+    msg.channel.basic_ack(msg.delivery_tag)
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    db_con = db_connect()
+    amqp_con = amqp_connect()
+    status_ch = amqp_con.channel()
+    status_ch.access_request("/complete", active=True, read=True, write=False)
+    status_ch.exchange_declare(
+        EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
+    )
+    queue_name = "sqlite-writer-listener-%s" % socket.getfqdn()
+    status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
+    status_ch.queue_bind(queue_name, EXCHANGE_NAME, queue_name)
+    logging.info("Listening to requests on %s" % queue_name)
+    status_ch.basic_consume(
+        "", callback=lambda msg: process_message(msg, db_con)
+    )
+    while status_ch.callbacks:
+        status_ch.wait()

Follow ups