canonical-ubuntu-qa team mailing list archive
Message #03055
Re: [Merge] ~andersson123/autopkgtest-cloud:d-r-d-a-r-merging into autopkgtest-cloud:master
I should have a read up on AMQP so I can review the queue stuff; in the meantime, I've left one comment on the handling of failed INSERTs.
Diff comments:
> diff --git a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
> new file mode 100755
> index 0000000..3939bde
> --- /dev/null
> +++ b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
> @@ -0,0 +1,136 @@
> +#!/usr/bin/python3
> +
> +import configparser
> +import json
> +import logging
> +import os
> +import socket
> +import sqlite3
> +import urllib.parse
> +
> +import amqplib.client_0_8 as amqp
> +from helpers.utils import init_db
> +
> +EXCHANGE_NAME = "sqlite-write-me.fanout"
> +
> +config = None
> +db_con = None
> +
> +INSERT_INTO_KEYS = [
> +    "test_id",
> +    "run_id",
> +    "version",
> +    "triggers",
> +    "duration",
> +    "exitcode",
> +    "requester",
> +    "env",
> +    "uuid",
> +]
> +
> +
> +def amqp_connect():
> +    """Connect to AMQP server"""
> +
> +    cp = configparser.ConfigParser()
> +    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +    amqp_uri = cp["amqp"]["uri"]
> +    parts = urllib.parse.urlsplit(amqp_uri, allow_fragments=False)
> +    amqp_con = amqp.Connection(
> +        parts.hostname, userid=parts.username, password=parts.password
> +    )
> +    logging.info(
> +        "Connected to AMQP server at %s@%s" % (parts.username, parts.hostname)
> +    )
> +
> +    return amqp_con
> +
> +
> +def db_connect():
> +    """Connect to SQLite DB"""
> +    cp = configparser.ConfigParser()
> +    cp.read(os.path.expanduser("~ubuntu/autopkgtest-cloud.conf"))
> +
> +    db_con = init_db(cp["web"]["database"])
> +
> +    return db_con
> +
> +
> +def check_msg(queue_msg):
> +    required_keys = [
> +        "test_id",
> +        "run_id",
> +        "version",
> +        "triggers",
> +        "duration",
> +        "exitcode",
> +        "requester",
> +        "env",
> +        "uuid",
> +    ]
> +    queue_msg_keys = list(queue_msg.keys())
> +    required_keys.sort()
> +    queue_msg_keys.sort()
> +    if queue_msg_keys == required_keys:
> +        return True
> +    return False
> +
> +
> +def process_message(msg, db_con):
> +    # aight, time to test this with download-results and download-all-results now.
> +    body = msg.body
> +    if isinstance(body, bytes):
> +        body = body.decode("UTF-8", errors="replace")
> +    info = json.loads(body)
> +    logging.info("Message is: \n%s" % json.dumps(info, indent=2))
> +    print(check_msg(info))
> +    if not check_msg(info):
> +        logging.info(
> +            "Message has incorrect keys!\n%s" % json.dumps(info, indent=2)
> +        )
> +    # insert into db
> +    try:
> +        with db_con:
> +            c = db_con.cursor()
> +            c.execute(
> +                "INSERT INTO result VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
> +                (
> +                    info["test_id"],
> +                    info["run_id"],
> +                    info["version"],
> +                    info["triggers"],
> +                    info["duration"],
> +                    info["exitcode"],
> +                    info["requester"],
> +                    info["env"],
> +                    info["uuid"],
> +                ),
> +            )
> +            db_con.commit()
> +        logging.info("Inserted the following entry into the db:\n%s" % body)
> +    except sqlite3.OperationalError as e:
> +        logging.info("Insert operation failed with: %s" % str(e))
> +    except sqlite3.IntegrityError as e:
> +        logging.info("Insert operation failed with: %s" % str(e))
I'm not convinced the problematic ones will stay in the queue. Having failed the INSERT and logged the error, we just carry on and basic_ack() the message, which should remove it from queue storage and tell the broker that we've successfully dealt with it. In fact, crashing out here would preserve the message, because we *wouldn't* ack it, and the broker requeues unacknowledged messages once the consumer's channel closes.
You could choose to ack only those messages that we successfully INSERT, which should preserve the ones that fail (assuming that's not a violation of the AMQP protocol; it's not one I'm overly familiar with). But consider the likely INSERT failure scenarios. If INSERTs start failing, it probably won't be a case of "some succeed and some fail"; it'll be something like the database becoming inaccessible (due to storage failure) or its schema changing to an incompatible structure.
In either case we can pretty much expect all future INSERTs to fail, so whether we crash out with the relevant error or just start spamming the log with failures, the effect on the incoming queue would be the same: it builds and builds until either someone fixes things or it runs out of storage.
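For reference, a minimal sketch of the "only ack on success" option. It assumes a hypothetical two-column `result` table and hypothetical `FakeChannel`/`FakeMessage` stand-ins for the amqplib channel and message (only `basic_ack()` and `delivery_tag` mirror what the diff uses), so it runs without a broker. A failed INSERT returns without acking, which leaves the message with the broker rather than dropping it:

```python
import sqlite3


class FakeChannel:
    """Hypothetical stand-in for an amqplib channel: just records acks."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class FakeMessage:
    """Hypothetical stand-in for an amqplib message carrying one DB row."""

    def __init__(self, channel, delivery_tag, row):
        self.channel = channel
        self.delivery_tag = delivery_tag
        self.row = row


def process_message(msg, db_con):
    """INSERT the row; ack only if the INSERT succeeded."""
    try:
        with db_con:  # commits on success, rolls back on exception
            db_con.execute("INSERT INTO result VALUES (?, ?)", msg.row)
    except (sqlite3.OperationalError, sqlite3.IntegrityError) as e:
        # No ack: the broker keeps the message and redelivers it
        # once this consumer's channel closes.
        print("Insert operation failed with: %s" % e)
        return False
    msg.channel.basic_ack(msg.delivery_tag)
    return True


if __name__ == "__main__":
    # Hypothetical schema; the real table has nine columns.
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE result (test_id INTEGER PRIMARY KEY, run_id TEXT)")
    ch = FakeChannel()
    process_message(FakeMessage(ch, 1, (1, "run-a")), con)  # succeeds
    process_message(FakeMessage(ch, 2, (1, "run-b")), con)  # duplicate key
    print(ch.acked)  # [1]
```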
> +
> +    msg.channel.basic_ack(msg.delivery_tag)
> +
> +
> +if __name__ == "__main__":
> +    logging.basicConfig(level=logging.INFO)
> +    db_con = db_connect()
> +    amqp_con = amqp_connect()
> +    status_ch = amqp_con.channel()
> +    status_ch.access_request("/complete", active=True, read=True, write=False)
> +    status_ch.exchange_declare(
> +        EXCHANGE_NAME, "fanout", durable=True, auto_delete=False
> +    )
> +    queue_name = "sqlite-writer-listener-%s" % socket.getfqdn()
> +    status_ch.queue_declare(queue_name, durable=True, auto_delete=False)
> +    status_ch.queue_bind(queue_name, EXCHANGE_NAME, queue_name)
> +    logging.info("Listening to requests on %s" % queue_name)
> +    status_ch.basic_consume(
> +        "", callback=lambda msg: process_message(msg, db_con)
> +    )
> +    while status_ch.callbacks:
> +        status_ch.wait()
--
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/460847
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:d-r-d-a-r-merging into autopkgtest-cloud:master.