canonical-ubuntu-qa team mailing list archive
-
canonical-ubuntu-qa team
-
Mailing list archive
-
Message #03313
[Merge] ~andersson123/autopkgtest-cloud:sqlite-writer-bugfixes into autopkgtest-cloud:master
Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:sqlite-writer-bugfixes into autopkgtest-cloud:master.
Requested reviews:
Canonical's Ubuntu QA (canonical-ubuntu-qa)
For more details, see:
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/462218
--
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:sqlite-writer-bugfixes into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-web/webcontrol/download-all-results b/charms/focal/autopkgtest-web/webcontrol/download-all-results
index 0c0e569..457fcd5 100755
--- a/charms/focal/autopkgtest-web/webcontrol/download-all-results
+++ b/charms/focal/autopkgtest-web/webcontrol/download-all-results
@@ -21,6 +21,7 @@ import tarfile
import urllib.parse
import amqplib.client_0_8 as amqp
+from amqplib.client_0_8 import AMQPException
import swiftclient
from distro_info import UbuntuDistroInfo
from helpers.utils import SqliteWriterConfig, get_test_id
@@ -50,7 +51,6 @@ def amqp_connect():
return amqp_con
-# def list_remote_container(container_url):
def list_remote_container(container_name, swift_conn):
LOGGER.debug("Listing container %s", container_name)
_, list_of_test_results = swift_conn.get_container(
@@ -78,7 +78,6 @@ def list_our_results(release):
def fetch_one_result(container_name, object_name, swift_conn):
"""Download one result URL from swift and add it to the DB"""
(release, arch, _, src, run_id, _) = object_name.split("/")
- test_id = get_test_id(db_con, release, arch, src)
_, contents = swift_conn.get_object(container_name, object_name)
tar_bytes = io.BytesIO(contents)
try:
@@ -142,32 +141,42 @@ def fetch_one_result(container_name, object_name, swift_conn):
env_vars.append("=".join([env, value]))
# Insert the write request into the queue
- complete_amqp = amqp_con.channel()
- complete_amqp.access_request(
- "/complete", active=True, read=False, write=True
- )
- complete_amqp.exchange_declare(
- SqliteWriterConfig.writer_exchange_name,
- "fanout",
- durable=True,
- auto_delete=False,
- )
- write_me_msg = {
- "test_id": test_id,
- "run_id": run_id,
- "version": ver,
- "triggers": test_triggers,
- "duration": duration,
- "exitcode": exitcode,
- "requester": requester,
- "env": ",".join(env_vars),
- "uuid": test_uuid,
- }
- complete_amqp.basic_publish(
- amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
- SqliteWriterConfig.writer_exchange_name,
- "",
- )
+ while True:
+ try:
+ complete_amqp = amqp_con.channel()
+ complete_amqp.access_request(
+ "/complete", active=True, read=False, write=True
+ )
+ complete_amqp.exchange_declare(
+ SqliteWriterConfig.writer_exchange_name,
+ "fanout",
+ durable=True,
+ auto_delete=False,
+ )
+ write_me_msg = {
+ "run_id": run_id,
+ "version": ver,
+ "triggers": test_triggers,
+ "duration": duration,
+ "exitcode": exitcode,
+ "requester": requester,
+ "env": ",".join(env_vars),
+ "uuid": test_uuid,
+ "release": release,
+ "arch": arch,
+ "package": src,
+ }
+ complete_amqp.basic_publish(
+ amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
+ SqliteWriterConfig.writer_exchange_name,
+ "",
+ )
+ return
+ except Exception as _:
+ amqp_con = amqp_connect()
+ # maybe we should sleep here?
+ # or re-init the amqp_con
+ pass
def fetch_container(release, swift_conn):
@@ -246,9 +255,7 @@ if __name__ == "__main__":
for release in releases:
fetch_container(
release,
- os.path.join(
- config["web"]["SwiftURL"], f"autopkgtest-{release}"
- ),
+ swift_conn,
)
finally:
if db_con:
diff --git a/charms/focal/autopkgtest-web/webcontrol/download-results b/charms/focal/autopkgtest-web/webcontrol/download-results
index e744eff..14d522c 100755
--- a/charms/focal/autopkgtest-web/webcontrol/download-results
+++ b/charms/focal/autopkgtest-web/webcontrol/download-results
@@ -9,7 +9,7 @@ import sqlite3
import urllib.parse
import amqplib.client_0_8 as amqp
-from helpers.utils import SqliteWriterConfig, get_test_id
+from helpers.utils import SqliteWriterConfig
EXCHANGE_NAME = "testcomplete.fanout"
@@ -81,36 +81,43 @@ def process_message(msg, db_con):
msg.channel.basic_ack(msg.delivery_tag)
return
- test_id = get_test_id(db_con, release, arch, package)
- # add to queue instead of writing to db
- complete_amqp = amqp_con.channel()
- complete_amqp.access_request(
- "/complete", active=True, read=False, write=True
- )
- complete_amqp.exchange_declare(
- SqliteWriterConfig.writer_exchange_name,
- "fanout",
- durable=True,
- auto_delete=False,
- )
- write_me_msg = {
- "test_id": test_id,
- "run_id": run_id,
- "version": version,
- "triggers": triggers,
- "duration": duration,
- "exitcode": exitcode,
- "requester": requester,
- "env": info.get("env", ""),
- "uuid": test_uuid,
- }
- complete_amqp.basic_publish(
- amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
- SqliteWriterConfig.writer_exchange_name,
- "",
- )
+ while True:
+ try:
+ # add to queue instead of writing to db
+ complete_amqp = amqp_con.channel()
+ complete_amqp.access_request(
+ "/complete", active=True, read=False, write=True
+ )
+ complete_amqp.exchange_declare(
+ SqliteWriterConfig.writer_exchange_name,
+ "fanout",
+ durable=True,
+ auto_delete=False,
+ )
+ write_me_msg = {
+ "run_id": run_id,
+ "version": version,
+ "triggers": triggers,
+ "duration": duration,
+ "exitcode": exitcode,
+ "requester": requester,
+ "env": info.get("env", ""),
+ "uuid": test_uuid,
+ "release": release,
+ "arch": arch,
+ "package": package,
+ }
+ complete_amqp.basic_publish(
+ amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
+ SqliteWriterConfig.writer_exchange_name,
+ "",
+ )
+
+ msg.channel.basic_ack(msg.delivery_tag)
+ return
+ except Exception as _:
+ amqp_con = amqp_connect()
- msg.channel.basic_ack(msg.delivery_tag)
if __name__ == "__main__":
diff --git a/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py b/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py
index 4306a79..3deca9a 100644
--- a/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py
+++ b/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py
@@ -19,7 +19,6 @@ class SqliteWriterConfig:
writer_exchange_name = "sqlite-write-me.fanout"
checkpoint_interval = 5 # minutes
test_column_names = [
- "test_id",
"run_id",
"version",
"triggers",
@@ -28,6 +27,9 @@ class SqliteWriterConfig:
"requester",
"env",
"uuid",
+ "release",
+ "arch",
+ "package",
]
diff --git a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
index b0046db..a08d459 100755
--- a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
+++ b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
@@ -13,7 +13,7 @@ sqlite3.paramstyle = "named"
import urllib.parse
import amqplib.client_0_8 as amqp
-from helpers.utils import SqliteWriterConfig, init_db
+from helpers.utils import SqliteWriterConfig, init_db, get_test_id
LAST_CHECKPOINT = datetime.datetime.now()
@@ -72,13 +72,19 @@ def process_message(msg, db_con):
if isinstance(body, bytes):
body = body.decode("UTF-8", errors="replace")
info = json.loads(body)
- logging.info("Message is: \n%s" % json.dumps(info, indent=2))
+ # need to make it so the test_id isn't retrieved by download-results or download-all-results, but here
+ logging.info("Message is: \n%s" % json.dumps(info))
if not check_msg(info):
logging.error(
"Message has incorrect keys! Ignoring\n%s"
% json.dumps(info, indent=2)
)
return
+ # these aren't currently in the messages!
+ info["test_id"] = get_test_id(db_con, info["release"], info["arch"], info["package"])
+ del info["release"]
+ del info["arch"]
+ del info["package"]
with db_con:
c = db_con.cursor()
c.execute(
@@ -89,7 +95,7 @@ def process_message(msg, db_con):
),
info,
)
- logging.info("Inserted the following entry into the db:\n%s" % body)
+ logging.info("Inserted the following entry into the db:\n%s" % json.dumps(info))
def msg_callback(msg, db_con):