← Back to team overview

canonical-ubuntu-qa team mailing list archive

Re: [Merge] ~andersson123/autopkgtest-cloud:d-r-d-a-r-merging into autopkgtest-cloud:master

 


Diff comments:

> diff --git a/charms/focal/autopkgtest-web/webcontrol/download-all-results b/charms/focal/autopkgtest-web/webcontrol/download-all-results
> index e0a11b0..561e8d9 100755
> --- a/charms/focal/autopkgtest-web/webcontrol/download-all-results
> +++ b/charms/focal/autopkgtest-web/webcontrol/download-all-results
> @@ -259,19 +266,14 @@ if __name__ == "__main__":
>  
>      config = configparser.ConfigParser()
>      config.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +    amqp_con = amqp_connect()
>  
>      try:
>          for release in releases:
> -            db_con = init_db(config["web"]["database"])
> -            # check if env column exists
> -            # if not, create it
> -            try:
> -                c = db_con.cursor()
> -                c.execute("SELECT env FROM result")
> -            except sqlite3.OperationalError as e:
> -                if "no such column" in str(e):
> -                    c = db_con.cursor()
> -                    c.execute("ALTER TABLE result ADD COLUMN env TEXT")
> +            # explicitly connect as read only
> +            db_con = sqlite3.connect(

Don't connect inside the loop; open the database connection once, before iterating over the releases.

> +                "file:%s%s" % (config["web"]["database"], "?mode=ro"), uri=True
> +            )
>              fetch_container(
>                  release,
>                  os.path.join(
> diff --git a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
> new file mode 100755
> index 0000000..b3fca6e
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
> @@ -0,0 +1,148 @@
> +#!/usr/bin/python3
> +
> +import configparser
> +import datetime
> +import json
> +import logging
> +import os
> +import socket
> +import sqlite3
> +import urllib.parse
> +

Remove the global variables.

> +import amqplib.client_0_8 as amqp
> +from helpers.utils import init_db
> +
> +EXCHANGE_NAME = "sqlite-write-me.fanout"

Refactor this into utils so that it is not duplicated in two scripts.

> +LAST_CHECKPOINT = datetime.datetime.now()
> +CHECKPOINT_MINUTES = 5
> +
> +config = None
> +db_con = None
> +
> +INSERT_INTO_KEYS = [
> +    "test_id",
> +    "run_id",
> +    "version",
> +    "triggers",
> +    "duration",
> +    "exitcode",
> +    "requester",
> +    "env",
> +    "uuid",
> +]
> +
> +
> +def amqp_connect():
> +    """Connect to AMQP server"""
> +
> +    cp = configparser.ConfigParser()
> +    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +    amqp_uri = cp["amqp"]["uri"]
> +    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
> +    amqp_con = amqp.Connection(
> +        parts.hostname, userid=parts.username, password=parts.password
> +    )
> +    logging.info(
> +        "Connected to AMQP server at %s@%s", parts.username, parts.hostname
> +    )
> +
> +    return amqp_con
> +
> +
> +def db_connect():
> +    """Connect to SQLite DB"""
> +    cp = configparser.ConfigParser()
> +    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +
> +    db_con = init_db(cp["web"]["database"])
> +
> +    return db_con
> +
> +
> +def check_msg(queue_msg):
> +    required_keys = set(

Use INSERT_INTO_KEYS here instead of repeating the list of keys.

> +        [
> +            "test_id",
> +            "run_id",
> +            "version",
> +            "triggers",
> +            "duration",
> +            "exitcode",
> +            "requester",
> +            "env",
> +            "uuid",
> +        ]
> +    )
> +    queue_keys = set(queue_msg.keys())
> +    if required_keys == queue_keys:
> +        return True
> +    return False
> +
> +
> +def process_message(msg, db_con):

Add a docstring.

> +    # We want to ack and re-send messages if insert fails?
> +    global LAST_CHECKPOINT
> +    body = msg.body
> +    if isinstance(body, bytes):
> +        body = body.decode("UTF-8", errors="replace")
> +    info = json.loads(body)
> +    logging.info("Message is: \n%s" % json.dumps(info, indent=2))
> +    if not check_msg(info):
> +        logging.error(
> +            "Message has incorrect keys! Ignoring\n%s"
> +            % json.dumps(info, indent=2)
> +        )
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        return
> +    # insert into db
> +    sqlite3.paramstyle = "named"

This needs to be moved to module level, after the import of sqlite3.

> +    with db_con:
> +        c = db_con.cursor()
> +        # change this to column names
> +        c.execute(
> +            (
> +                "INSERT INTO result(test_id, run_id, version, triggers, duration, "
> +                "exitcode, requester, env, uuid) VALUES (:test_id, :run_id, "
> +                ":version, :triggers, :duration, :exitcode, :requester, :env, :uuid)"
> +            ),
> +            {
> +                "test_id": info["test_id"],
> +                "run_id": info["run_id"],
> +                "version": info["version"],
> +                "triggers": info["triggers"],
> +                "duration": info["duration"],
> +                "exitcode": info["exitcode"],
> +                "requester": info["requester"],
> +                "env": info["env"],
> +                "uuid": info["uuid"],
> +            },
> +        )
> +        now = datetime.datetime.now()

Move the checkpoint into a separate transaction: add another `with db_con:` block for it.

> +        if (now - LAST_CHECKPOINT) > datetime.timedelta(
> +            minutes=CHECKPOINT_MINUTES
> +        ):
> +            c.execute("PRAGMA wal_checkpoint(TRUNCATE)")

See if checkpointing can be made part of the SQLite configuration, or otherwise moved out of this function, with separate functions so that each one has clear semantics.

> +            LAST_CHECKPOINT = now
> +    logging.info("Inserted the following entry into the db:\n%s" % body)
> +
> +    msg.channel.basic_ack(msg.delivery_tag)
> +
> +
> +if __name__ == "__main__":

Refactor this into a main() function.

> +    logging.basicConfig(level=logging.INFO)
> +    db_con = db_connect()
> +    amqp_con = amqp_connect()
> +    status_ch = amqp_con.channel()
> +    status_ch.access_request("/complete", active=True, read=True, write=False)
> +    status_ch.exchange_declare(
> +        EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    queue_name = "sqlite-writer-listener-%s" % socket.getfqdn()
> +    status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
> +    status_ch.queue_bind(queue_name, EXCHANGE_NAME, queue_name)
> +    logging.info("Listening to requests on %s" % queue_name)
> +    status_ch.basic_consume(
> +        "", callback=lambda msg: process_message(msg, db_con)
> +    )
> +    while status_ch.callbacks:
> +        status_ch.wait()


-- 
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/460847
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:d-r-d-a-r-merging into autopkgtest-cloud:master.



References