
canonical-ubuntu-qa team mailing list archive

Re: [Merge] ~andersson123/autopkgtest-cloud:d-r-d-a-r-merging into autopkgtest-cloud:master

 

Review: Needs Fixing

Sorry for the delay!

> also just FYI - not ack'ing a message in the queue listener callback WILL cause the queue item to stay in the queue, so it is an option in callback functions to leave the message in the queue if you choose not to do something with it :)

Yeah, I suspected not ACKing would leave it in the queue. My main concern was whether the AMQP protocol permits further messages to be received after refusing to ACK one (i.e. whether it's strictly an in-order protocol like, say, REQ sockets in ZeroMQ).
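For what it's worth, my understanding is that the broker doesn't strictly serialise delivery on acks: it will keep handing out messages up to the channel's prefetch limit, which is unbounded unless basic_qos has been set. A rough sketch on the consuming channel (status_ch below), if I'm remembering the amqplib signature correctly:

    # ask the broker for at most one unacknowledged message at a time;
    # without this there's no per-channel cap on unacked deliveries
    status_ch.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)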

Left some more inline comments

Diff comments:

> diff --git a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
> new file mode 100755
> index 0000000..4ba8bdf
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
> @@ -0,0 +1,138 @@
> +#!/usr/bin/python3
> +
> +import configparser
> +import json
> +import logging
> +import os
> +import socket
> +import sqlite3
> +import urllib.parse
> +
> +import amqplib.client_0_8 as amqp
> +from helpers.utils import init_db
> +
> +EXCHANGE_NAME = "sqlite-write-me.fanout"
> +
> +config = None
> +db_con = None
> +
> +INSERT_INTO_KEYS = [
> +    "test_id",
> +    "run_id",
> +    "version",
> +    "triggers",
> +    "duration",
> +    "exitcode",
> +    "requester",
> +    "env",
> +    "uuid",
> +]
> +
> +
> +def amqp_connect():
> +    """Connect to AMQP server"""
> +
> +    cp = configparser.ConfigParser()
> +    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +    amqp_uri = cp["amqp"]["uri"]
> +    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
> +    amqp_con = amqp.Connection(
> +        parts.hostname, userid=parts.username, password=parts.password
> +    )
> +    logging.info(
> +        "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)

Don't use % formatting in logging calls; just pass the items as extra arguments. That lets the logging infrastructure skip the interpolation entirely if log-filtering suppresses the message (it's one of the things pylint whinges about, correctly in this case). For example, here I'd use:

logging.info("Connected to AMQP server at %s@%s", parts.username, parts.hostname)

> +    )
> +
> +    return amqp_con
> +
> +
> +def db_connect():
> +    """Connect to SQLite DB"""
> +    cp = configparser.ConfigParser()
> +    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +
> +    db_con = init_db(cp["web"]["database"])
> +
> +    return db_con
> +
> +
> +def check_msg(queue_msg):
> +    required_keys = set(

Isn't this just set(INSERT_INTO_KEYS)? Also, even if it isn't, there's no need to do set(['foo', 'bar', 'baz']) -- you can just do {'foo', 'bar', 'baz'}, which is a set literal (deliberately similar syntax to a dict, but without colons).
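For example, assuming the exact key match is the behaviour you want, the whole function could collapse to:

    def check_msg(queue_msg):
        # exact match against the columns we INSERT INTO result
        return set(queue_msg.keys()) == set(INSERT_INTO_KEYS)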

> +        [
> +            "test_id",
> +            "run_id",
> +            "version",
> +            "triggers",
> +            "duration",
> +            "exitcode",
> +            "requester",
> +            "env",
> +            "uuid",
> +        ]
> +    )
> +    queue_keys = set(queue_msg.keys())
> +    if required_keys == queue_keys:
> +        return True
> +    return False
> +
> +
> +def process_message(msg, db_con):
> +    # We want to ack and re-send messages if insert fails?
> +    body = msg.body
> +    if isinstance(body, bytes):
> +        body = body.decode("UTF-8", errors="replace")
> +    info = json.loads(body)
> +    logging.info("Message is: \n%s" % json.dumps(info, indent=2))

Another place where pylint'll whinge about %-interpolation vs parameters.

> +    if not check_msg(info):
> +        logging.error(
> +            "Message has incorrect keys! Ignoring\n%s"
> +            % json.dumps(info, indent=2)

Another place where pylint'll whinge about %-interpolation vs parameters.

> +        )
> +        msg.channel.basic_ack(msg.delivery_tag)
> +        return
> +    # insert into db
> +    sqlite3.paramstyle = "named"

paramstyle is a module-level constant, so this should be set once after importing the module. And yes, this is a *horrid* design because it obviously affects every single user of the module in the process ... but that's the Python DB-API.
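i.e. set it once near the top of the script, something like:

    import sqlite3

    sqlite3.paramstyle = "named"  # module-wide; affects every sqlite3 user in this process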

> +    with db_con:
> +        c = db_con.cursor()
> +        # change this to column names
> +        c.execute(
> +            (
> +                "INSERT INTO result(test_id, run_id, version, triggers, duration, "
> +                "exitcode, requester, env, uuid) VALUES (:test_id, :run_id, "
> +                ":version, :triggers, :duration, :exitcode, :requester, :env, :uuid)"
> +            ),
> +            {

This dict is just "info", right? So instead of re-defining it, just pass info (and yes, this was one of my sneaky intents in using the named paramstyle -- knowing you already had a dict of the fields you wanted :)
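Something like this (a sketch, assuming check_msg() has already guaranteed that info holds exactly these keys):

    with db_con:
        c = db_con.cursor()
        c.execute(
            "INSERT INTO result(test_id, run_id, version, triggers, duration, "
            "exitcode, requester, env, uuid) VALUES (:test_id, :run_id, "
            ":version, :triggers, :duration, :exitcode, :requester, :env, :uuid)",
            info,
        )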

> +                "test_id": info["test_id"],
> +                "run_id": info["run_id"],
> +                "version": info["version"],
> +                "triggers": info["triggers"],
> +                "duration": info["duration"],
> +                "exitcode": info["exitcode"],
> +                "requester": info["requester"],
> +                "env": info["env"],
> +                "uuid": info["uuid"],
> +            },
> +        )
> +    logging.info("Inserted the following entry into the db:\n%s" % body)

Another place where pylint'll whinge about %-interpolation vs parameters.

> +
> +    msg.channel.basic_ack(msg.delivery_tag)
> +
> +
> +if __name__ == "__main__":
> +    logging.basicConfig(level=logging.INFO)
> +    db_con = db_connect()
> +    amqp_con = amqp_connect()
> +    status_ch = amqp_con.channel()
> +    status_ch.access_request("/complete", active=True, read=True, write=False)
> +    status_ch.exchange_declare(
> +        EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    queue_name = "sqlite-writer-listener-%s" % socket.getfqdn()
> +    status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
> +    status_ch.queue_bind(queue_name, EXCHANGE_NAME, queue_name)
> +    logging.info("Listening to requests on %s" % queue_name)

Another place where pylint'll whinge about %-interpolation vs parameters.

> +    status_ch.basic_consume(
> +        "", callback=lambda msg: process_message(msg, db_con)
> +    )
> +    while status_ch.callbacks:
> +        status_ch.wait()


-- 
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/460847
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:d-r-d-a-r-merging into autopkgtest-cloud:master.


