← Back to team overview

canonical-ubuntu-qa team mailing list archive

[Merge] ~andersson123/autopkgtest-cloud:sqlite-writer-bugfixes into autopkgtest-cloud:master

 

Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:sqlite-writer-bugfixes into autopkgtest-cloud:master.

Requested reviews:
  Canonical's Ubuntu QA (canonical-ubuntu-qa)

For more details, see:
https://code.launchpad.net/~andersson123/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/462218
-- 
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:sqlite-writer-bugfixes into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-web/webcontrol/download-all-results b/charms/focal/autopkgtest-web/webcontrol/download-all-results
index 0c0e569..457fcd5 100755
--- a/charms/focal/autopkgtest-web/webcontrol/download-all-results
+++ b/charms/focal/autopkgtest-web/webcontrol/download-all-results
@@ -21,6 +21,7 @@ import tarfile
 import urllib.parse
 
 import amqplib.client_0_8 as amqp
+from amqplib.client_0_8 import AMQPException
 import swiftclient
 from distro_info import UbuntuDistroInfo
 from helpers.utils import SqliteWriterConfig, get_test_id
@@ -50,7 +51,6 @@ def amqp_connect():
     return amqp_con
 
 
-# def list_remote_container(container_url):
 def list_remote_container(container_name, swift_conn):
     LOGGER.debug("Listing container %s", container_name)
     _, list_of_test_results = swift_conn.get_container(
@@ -78,7 +78,6 @@ def list_our_results(release):
 def fetch_one_result(container_name, object_name, swift_conn):
     """Download one result URL from swift and add it to the DB"""
     (release, arch, _, src, run_id, _) = object_name.split("/")
-    test_id = get_test_id(db_con, release, arch, src)
     _, contents = swift_conn.get_object(container_name, object_name)
     tar_bytes = io.BytesIO(contents)
     try:
@@ -142,32 +141,42 @@ def fetch_one_result(container_name, object_name, swift_conn):
             env_vars.append("=".join([env, value]))
 
     # Insert the write request into the queue
-    complete_amqp = amqp_con.channel()
-    complete_amqp.access_request(
-        "/complete", active=True, read=False, write=True
-    )
-    complete_amqp.exchange_declare(
-        SqliteWriterConfig.writer_exchange_name,
-        "fanout",
-        durable=True,
-        auto_delete=False,
-    )
-    write_me_msg = {
-        "test_id": test_id,
-        "run_id": run_id,
-        "version": ver,
-        "triggers": test_triggers,
-        "duration": duration,
-        "exitcode": exitcode,
-        "requester": requester,
-        "env": ",".join(env_vars),
-        "uuid": test_uuid,
-    }
-    complete_amqp.basic_publish(
-        amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
-        SqliteWriterConfig.writer_exchange_name,
-        "",
-    )
+    while True:
+        try:
+            complete_amqp = amqp_con.channel()
+            complete_amqp.access_request(
+                "/complete", active=True, read=False, write=True
+            )
+            complete_amqp.exchange_declare(
+                SqliteWriterConfig.writer_exchange_name,
+                "fanout",
+                durable=True,
+                auto_delete=False,
+            )
+            write_me_msg = {
+                "run_id": run_id,
+                "version": ver,
+                "triggers": test_triggers,
+                "duration": duration,
+                "exitcode": exitcode,
+                "requester": requester,
+                "env": ",".join(env_vars),
+                "uuid": test_uuid,
+                "release": release,
+                "arch": arch,
+                "package": src,
+            }
+            complete_amqp.basic_publish(
+                amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
+                SqliteWriterConfig.writer_exchange_name,
+                "",
+            )
+            return
+        except Exception as _:
+            amqp_con = amqp_connect()
+            # The connection was re-initialised above; loop round and retry.
+            # TODO: maybe we should sleep here to avoid a tight retry loop?
+            pass
 
 
 def fetch_container(release, swift_conn):
@@ -246,9 +255,7 @@ if __name__ == "__main__":
         for release in releases:
             fetch_container(
                 release,
-                os.path.join(
-                    config["web"]["SwiftURL"], f"autopkgtest-{release}"
-                ),
+                swift_conn,
             )
     finally:
         if db_con:
diff --git a/charms/focal/autopkgtest-web/webcontrol/download-results b/charms/focal/autopkgtest-web/webcontrol/download-results
index e744eff..14d522c 100755
--- a/charms/focal/autopkgtest-web/webcontrol/download-results
+++ b/charms/focal/autopkgtest-web/webcontrol/download-results
@@ -9,7 +9,7 @@ import sqlite3
 import urllib.parse
 
 import amqplib.client_0_8 as amqp
-from helpers.utils import SqliteWriterConfig, get_test_id
+from helpers.utils import SqliteWriterConfig
 
 EXCHANGE_NAME = "testcomplete.fanout"
 
@@ -81,36 +81,43 @@ def process_message(msg, db_con):
         msg.channel.basic_ack(msg.delivery_tag)
         return
 
-    test_id = get_test_id(db_con, release, arch, package)
-    # add to queue instead of writing to db
-    complete_amqp = amqp_con.channel()
-    complete_amqp.access_request(
-        "/complete", active=True, read=False, write=True
-    )
-    complete_amqp.exchange_declare(
-        SqliteWriterConfig.writer_exchange_name,
-        "fanout",
-        durable=True,
-        auto_delete=False,
-    )
-    write_me_msg = {
-        "test_id": test_id,
-        "run_id": run_id,
-        "version": version,
-        "triggers": triggers,
-        "duration": duration,
-        "exitcode": exitcode,
-        "requester": requester,
-        "env": info.get("env", ""),
-        "uuid": test_uuid,
-    }
-    complete_amqp.basic_publish(
-        amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
-        SqliteWriterConfig.writer_exchange_name,
-        "",
-    )
+    while True:
+        try:
+            # add to queue instead of writing to db
+            complete_amqp = amqp_con.channel()
+            complete_amqp.access_request(
+                "/complete", active=True, read=False, write=True
+            )
+            complete_amqp.exchange_declare(
+                SqliteWriterConfig.writer_exchange_name,
+                "fanout",
+                durable=True,
+                auto_delete=False,
+            )
+            write_me_msg = {
+                "run_id": run_id,
+                "version": version,
+                "triggers": triggers,
+                "duration": duration,
+                "exitcode": exitcode,
+                "requester": requester,
+                "env": info.get("env", ""),
+                "uuid": test_uuid,
+                "release": release,
+                "arch": arch,
+                "package": package,
+            }
+            complete_amqp.basic_publish(
+                amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
+                SqliteWriterConfig.writer_exchange_name,
+                "",
+            )
+
+            msg.channel.basic_ack(msg.delivery_tag)
+            return
+        except Exception as _:
+            amqp_con = amqp_connect()
 
-    msg.channel.basic_ack(msg.delivery_tag)
 
 
 if __name__ == "__main__":
diff --git a/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py b/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py
index 4306a79..3deca9a 100644
--- a/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py
+++ b/charms/focal/autopkgtest-web/webcontrol/helpers/utils.py
@@ -19,7 +19,6 @@ class SqliteWriterConfig:
     writer_exchange_name = "sqlite-write-me.fanout"
     checkpoint_interval = 5  # minutes
     test_column_names = [
-        "test_id",
         "run_id",
         "version",
         "triggers",
@@ -28,6 +27,9 @@ class SqliteWriterConfig:
         "requester",
         "env",
         "uuid",
+        "release",
+        "arch",
+        "package",
     ]
 
 
diff --git a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
index b0046db..a08d459 100755
--- a/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
+++ b/charms/focal/autopkgtest-web/webcontrol/sqlite-writer
@@ -13,7 +13,7 @@ sqlite3.paramstyle = "named"
 import urllib.parse
 
 import amqplib.client_0_8 as amqp
-from helpers.utils import SqliteWriterConfig, init_db
+from helpers.utils import SqliteWriterConfig, init_db, get_test_id
 
 LAST_CHECKPOINT = datetime.datetime.now()
 
@@ -72,13 +72,19 @@ def process_message(msg, db_con):
     if isinstance(body, bytes):
         body = body.decode("UTF-8", errors="replace")
     info = json.loads(body)
-    logging.info("Message is: \n%s" % json.dumps(info, indent=2))
+    # test_id is looked up here rather than by download-results or download-all-results
+    logging.info("Message is: \n%s" % json.dumps(info))
     if not check_msg(info):
         logging.error(
             "Message has incorrect keys! Ignoring\n%s"
             % json.dumps(info, indent=2)
         )
         return
+    # release/arch/package are only used to look up test_id, then dropped from info
+    info["test_id"] = get_test_id(db_con, info["release"], info["arch"], info["package"])
+    del info["release"]
+    del info["arch"]
+    del info["package"]
     with db_con:
         c = db_con.cursor()
         c.execute(
@@ -89,7 +95,7 @@ def process_message(msg, db_con):
             ),
             info,
         )
-    logging.info("Inserted the following entry into the db:\n%s" % body)
+    logging.info("Inserted the following entry into the db:\n%s" % json.dumps(info))
 
 
 def msg_callback(msg, db_con):