canonical-ubuntu-qa team mailing list archive

[Merge] ~hyask/autopkgtest-cloud:skia/close_channels into autopkgtest-cloud:master

Skia has proposed merging ~hyask/autopkgtest-cloud:skia/close_channels into autopkgtest-cloud:master.

Requested reviews:
  Canonical's Ubuntu QA (canonical-ubuntu-qa)

For more details, see:
https://code.launchpad.net/~hyask/autopkgtest-cloud/+git/autopkgtest-cloud/+merge/468265

We ran into a situation where RabbitMQ hit its `memory alarm` threshold of 3GB (https://www.rabbitmq.com/docs/memory). Investigation showed a huge number of open channels (more than 65k). Closing the two worker channels seems to stabilize the situation at around 1350 open channels and 700MB of RAM, but we're not running at full capacity right now.

More channel closing might be required elsewhere, but these three (two in the worker, one in `download-results`) are already a very good start.

EDIT after a night:
Closing the additional channel in `download-results` stabilized the situation at about 900 open channels.

EDIT after an additional day:
The situation seems to remain stable. I'm now opening this MP for review.
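
For reviewers who want the leak pattern in isolation, here is a minimal sketch of why a channel opened per message and never closed accumulates on a long-lived connection, and how the `with` form used in `download-results` avoids that. `FakeConnection` and `FakeChannel` are hypothetical stand-ins written for this illustration; they are not the real `amqp` classes and only mimic the open/close bookkeeping.

```python
class FakeChannel:
    """Hypothetical stand-in for an AMQP channel (not the real amqp API)."""

    def __init__(self, connection, channel_id):
        self.connection = connection
        self.channel_id = channel_id

    def close(self):
        # Closing drops the channel from the connection's bookkeeping.
        self.connection.channels.pop(self.channel_id, None)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Close on normal exit and on exceptions alike.
        self.close()


class FakeConnection:
    """Hypothetical long-lived connection that tracks its open channels."""

    def __init__(self):
        self.channels = {}
        self._next_id = 1

    def channel(self):
        ch = FakeChannel(self, self._next_id)
        self.channels[self._next_id] = ch
        self._next_id += 1
        return ch


def publish_leaky(con, n):
    # One channel per message, never closed: open channels grow with n.
    for _ in range(n):
        con.channel()


def publish_closing(con, n):
    # One channel per message, closed when the with-block exits.
    for _ in range(n):
        with con.channel():
            pass  # publishing would happen here


leaky = FakeConnection()
publish_leaky(leaky, 1000)

closing = FakeConnection()
publish_closing(closing, 1000)

print(len(leaky.channels), len(closing.channels))
```

In this toy model the leaky variant ends with one open channel per processed message, while the closing variant ends with none. The real broker-side numbers will differ, since other long-lived channels stay open by design.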
-- 
Your team Canonical's Ubuntu QA is requested to review the proposed merge of ~hyask/autopkgtest-cloud:skia/close_channels into autopkgtest-cloud:master.
diff --git a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
index e9bc152..7354be2 100755
--- a/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
+++ b/charms/focal/autopkgtest-cloud-worker/autopkgtest-cloud/worker/worker
@@ -603,6 +603,8 @@ def call_autopkgtest(
         private,
     )
 
+    status_amqp.close()
+
     return ret
 
 
@@ -1583,6 +1585,7 @@ def request(msg):
     complete_amqp.basic_publish(
         amqp.Message(complete_msg, delivery_mode=2), complete_exchange_name, ""
     )
+    complete_amqp.close()
 
     logging.info("Acknowledging request %s" % body)
     msg.channel.basic_ack(msg.delivery_tag)
diff --git a/charms/focal/autopkgtest-web/webcontrol/download-results b/charms/focal/autopkgtest-web/webcontrol/download-results
index 1297a66..2c4ea47 100755
--- a/charms/focal/autopkgtest-web/webcontrol/download-results
+++ b/charms/focal/autopkgtest-web/webcontrol/download-results
@@ -76,38 +76,38 @@ def process_message(msg):
     while True:
         try:
             # add to queue instead of writing to db
-            complete_amqp = amqp_con.channel()
-            complete_amqp.access_request(
-                "/complete", active=True, read=False, write=True
-            )
-            complete_amqp.exchange_declare(
-                SqliteWriterConfig.writer_exchange_name,
-                "fanout",
-                durable=True,
-                auto_delete=False,
-            )
-            write_me_msg = {
-                "run_id": run_id,
-                "version": version,
-                "triggers": triggers,
-                "duration": duration,
-                "exitcode": exitcode,
-                "requester": requester,
-                "env": info.get("env", ""),
-                "uuid": test_uuid,
-                "release": release,
-                "arch": arch,
-                "package": package,
-            }
-            complete_amqp.basic_publish(
-                amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
-                SqliteWriterConfig.writer_exchange_name,
-                "",
-            )
-
-            msg.channel.basic_ack(msg.delivery_tag)
-            return
-        except Exception as _:
+            with amqp_con.channel() as complete_amqp:
+                complete_amqp.access_request(
+                    "/complete", active=True, read=False, write=True
+                )
+                complete_amqp.exchange_declare(
+                    SqliteWriterConfig.writer_exchange_name,
+                    "fanout",
+                    durable=True,
+                    auto_delete=False,
+                )
+                write_me_msg = {
+                    "run_id": run_id,
+                    "version": version,
+                    "triggers": triggers,
+                    "duration": duration,
+                    "exitcode": exitcode,
+                    "requester": requester,
+                    "env": info.get("env", ""),
+                    "uuid": test_uuid,
+                    "release": release,
+                    "arch": arch,
+                    "package": package,
+                }
+                complete_amqp.basic_publish(
+                    amqp.Message(json.dumps(write_me_msg), delivery_mode=2),
+                    SqliteWriterConfig.writer_exchange_name,
+                    "",
+                )
+
+                msg.channel.basic_ack(msg.delivery_tag)
+                return
+        except Exception:
             time.sleep(3)
             now = datetime.datetime.now()
             if (