← Back to team overview

canonical-ubuntu-qa team mailing list archive

[Merge] ~andersson123/autopkgtest-cloud:amqp-status-collector-oneshot into autopkgtest-cloud:master


Tim Andersson has proposed merging ~andersson123/autopkgtest-cloud:amqp-status-collector-oneshot into autopkgtest-cloud:master.

Requested reviews:
  Canonical's Ubuntu QA (canonical-ubuntu-qa)

For more details, see:

Make amqp-status-collector a oneshot service.
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~andersson123/autopkgtest-cloud:amqp-status-collector-oneshot into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-web/units/amqp-status-collector.service b/charms/focal/autopkgtest-web/units/amqp-status-collector.service
index 5b98105..11f8157 100644
--- a/charms/focal/autopkgtest-web/units/amqp-status-collector.service
+++ b/charms/focal/autopkgtest-web/units/amqp-status-collector.service
@@ -1,15 +1,10 @@
 Description=Read running test status from RabbitMQ (running.json)
diff --git a/charms/focal/autopkgtest-web/units/amqp-status-collector.timer b/charms/focal/autopkgtest-web/units/amqp-status-collector.timer
new file mode 100644
index 0000000..6ff52ec
--- /dev/null
+++ b/charms/focal/autopkgtest-web/units/amqp-status-collector.timer
@@ -0,0 +1,8 @@
+Description=Read running test status from RabbitMQ (running.json)
diff --git a/charms/focal/autopkgtest-web/webcontrol/amqp-status-collector b/charms/focal/autopkgtest-web/webcontrol/amqp-status-collector
index ae2226c..0329b88 100755
--- a/charms/focal/autopkgtest-web/webcontrol/amqp-status-collector
+++ b/charms/focal/autopkgtest-web/webcontrol/amqp-status-collector
@@ -1,41 +1,20 @@
 # Pick up running tests, their status and logtail from the "teststatus" fanout
-# queue, and regularly write it into /run/amqp-status-collector/running.json
+# queue, and periodically write it into /run/amqp-status-collector/running.json
 import json
 import logging
 import os
 import socket
-import time
 from helpers.utils import amqp_connect, get_autopkgtest_cloud_conf
-exchange_name = "teststatus.fanout"
-cp = get_autopkgtest_cloud_conf()
-running_name = cp["web"]["running_cache"]
-running_name_new = "{}.new".format(running_name)
-# package -> runhash -> release -> arch -> (params, duration, logtail)
-running_tests = {}
-last_update = 0
-def update_output(amqp_channel, force_update=False):
-    """Update report"""
-    global last_update
-    # update at most every 10 s
-    now = time.time()
-    if not force_update and now - last_update < 10:
-        return
-    with open(running_name_new, "w", encoding="utf-8") as f:
-        json.dump(running_tests, f)
-    os.rename(running_name_new, running_name)
+    level=(logging.DEBUG if "DEBUG" in os.environ else logging.INFO)
-def process_message(msg):
+def process_message(msg, running_tests):
     """Process AMQP status message, update running_tests"""
     body = msg.body
@@ -73,27 +52,34 @@ def process_message(msg):
         except KeyError:
-    update_output(msg.channel, not info["running"])
+    return running_tests
+def main():
+    amqp_con = amqp_connect()
+    status_ch = amqp_con.channel()
+    status_ch.exchange_declare(
+        "teststatus.fanout", "fanout", durable=False, auto_delete=True
+    )
+    queue_name = "running-listener-%s" % socket.getfqdn()
+    # package -> runhash -> release -> arch -> (params, duration, logtail)
+    running_tests = {}
+    queue_empty = False
+    while not queue_empty:
+        msg = status_ch.basic_get(queue_name)
+        if msg is None:
+            queue_empty = True
+            continue
+        running_tests = process_message(msg, running_tests)
+    cp = get_autopkgtest_cloud_conf()
+    running_name = cp["web"]["running_cache"]
+    running_name_new = "{}.new".format(running_name)
-# main
+    with open(running_name_new, "w", encoding="utf-8") as f:
+        json.dump(running_tests, f)
+    os.rename(running_name_new, running_name)
-    level=(logging.DEBUG if "DEBUG" in os.environ else logging.INFO)
-amqp_con = amqp_connect()
-status_ch = amqp_con.channel()
-    exchange_name, "fanout", durable=False, auto_delete=True
-queue_name = "running-listener-%s" % socket.getfqdn()
-status_ch.queue_declare(queue_name, durable=False, auto_delete=True)
-status_ch.queue_bind(queue_name, exchange_name, queue_name)
-logging.info("Listening to requests on %s" % queue_name)
-status_ch.basic_consume("", callback=process_message, no_ack=True)
-while status_ch.callbacks:
-    amqp_con.drain_events()
+if __name__ == "__main__":
+    main()
