← Back to team overview

sts-sponsors team mailing list archive

[Merge] ~lloydwaltersj/maas-ci/+git/system-tests:temporal-boilerplate into ~maas-committers/maas-ci/+git/system-tests:master

 

Jack Lloyd-Walters has proposed merging ~lloydwaltersj/maas-ci/+git/system-tests:temporal-boilerplate into ~maas-committers/maas-ci/+git/system-tests:master.

Commit message:
all of the temporal specific backend stuff should live here

Requested reviews:
  MAAS Committers (maas-committers)

For more details, see:
https://code.launchpad.net/~lloydwaltersj/maas-ci/+git/system-tests/+merge/443010
-- 
Your team MAAS Committers is requested to review the proposed merge of ~lloydwaltersj/maas-ci/+git/system-tests:temporal-boilerplate into ~maas-committers/maas-ci/+git/system-tests:master.
diff --git a/README.md b/README.md
index f0fd167..9df9c41 100644
--- a/README.md
+++ b/README.md
@@ -50,7 +50,7 @@ for init in inits:
 	docstring = textwrap.fill(module.__doc__, 180)
 	cog.outl(f" - `{package}`: {docstring}\n")
 ]]] -->
-We have 5 test suites:
+We have 6 test suites:
  - `ansible_tests`:  Prepares a container running Ansible, and clones maas-ansible-playbooks, to test that MAAS topology is configurable with Ansible, and that the MAAS install behaves as expected
 under testing.
 
@@ -60,6 +60,8 @@ under testing.
 
  - `general_tests`:  Uses credentials.yaml info to access a running MAAS deployment, asserts the state is useful to these tests and executes them.
 
+ - `image_tests`:  Prepares a container running a temporal cluster, uses the cluster to execute image tests, and returns the results
+
  - `tests_per_machine`: Contains tests that are per machine, run in parallel by tox for efficiency.
 
 <!-- [[[end]]] -->
diff --git a/setup.py b/setup.py
index 4f8f242..e7b6ba4 100644
--- a/setup.py
+++ b/setup.py
@@ -4,6 +4,7 @@ install_requires = (
     'netaddr',
     'paramiko',
     'pytest',
+    'pytest-asyncio',
     'pytest-dependency',
     'pytest-rerunfailures',
     'pytest-steps',
@@ -11,6 +12,7 @@ install_requires = (
     'requests',
     'retry',
     'ruamel.yaml',
+    'temporalio'
 )
 
 
diff --git a/systemtests/ansible.py b/systemtests/ansible.py
index 46c376b..747eba0 100644
--- a/systemtests/ansible.py
+++ b/systemtests/ansible.py
@@ -109,28 +109,21 @@ def pip_package_exists(instance: Instance, package: str) -> bool:
 def clone_repo(
     instance: Instance,
     repo: str,
-    branch: str,
     clone_path: str,
+    branch: Optional[str] = None,
     environment: Optional[dict[str, str]] = None,
 ) -> None:
     clone_file = instance.files[clone_path]
+    cmd = ["eatmydata", "git", "clone"]
+    if branch:
+        cmd.extend(["--single-branch", "-b", branch])
+    cmd.extend(["--depth", "1", repo, clone_path])
     if not clone_file.exists():
-        instance.execute(
-            [
-                "eatmydata",
-                "git",
-                "clone",
-                "-b",
-                branch,
-                "--depth",
-                "1",
-                repo,
-                clone_path,
-            ],
-            environment=environment,
-        )
+        instance.execute(cmd, environment=environment)
         instance.execute(["git", "-C", clone_path, "show", "-q"])
-        instance.logger.info(f"Cloned {branch} from {repo} to {clone_path}")
+        instance.logger.info(
+            f"Cloned{' '+branch if branch else ''} from {repo} to {clone_path}"
+        )
 
 
 class AnsibleHost:
@@ -324,8 +317,11 @@ class AnsibleMain:
         playbooks_repo: str,
         playbooks_branch: str,
         proxy_env: Optional[dict[str, str]],
         debug: str = "",
         floating_ip_net: Optional[str] = None,
     ) -> None:
         self._lxd = lxd
         self.instance = instance
@@ -340,7 +336,6 @@ class AnsibleMain:
         self._inventory_: set[AnsibleHost] = set()
 
         self.ansible_repo_path = "/home/ubuntu/ansible_repo"
-        self.default_debug = debug or ""
 
     def setup(self) -> None:
         self.logger.info("Installing python3-pip")
@@ -353,8 +348,8 @@ class AnsibleMain:
         clone_repo(
             self.instance,
             self._playbooks_repo,
-            self._playbooks_branch,
             self.ansible_repo_path,
+            self._playbooks_branch,
             environment=self._proxy_env,
         )
 
@@ -519,7 +514,6 @@ class AnsibleMain:
                 ["maas", "apikey", "--username", user]
             ).stdout.rstrip("\n")
             _, authd_client = maas_client.log_in("admin", api_key)
-            authd_client.refresh()
             return authd_client
         raise HostWithoutRegion()
 
@@ -560,10 +554,7 @@ class AnsibleMain:
                 ]
             )
         file_content = "\n".join(inv)
-        if self.default_debug:
-            self.logger.info("Ansible hosts file generated:")
-            for line in file_content:
-                self.logger.info(line)
+        LOG.info(f"Ansible hosts file generated:\n{file_content}")
         self._hosts_file.write(file_content)
 
     def create_config_file(self) -> None:
diff --git a/systemtests/ansible_tests/test_ansible.py b/systemtests/ansible_tests/test_ansible.py
index d0477b0..a543f52 100644
--- a/systemtests/ansible_tests/test_ansible.py
+++ b/systemtests/ansible_tests/test_ansible.py
@@ -11,7 +11,6 @@ if TYPE_CHECKING:
 
 DEFAULT_VERSION = "3.3"
 
-
 @pytest.mark.skip_if_ansible_playbooks_unconfigured(
     "Needs Ansible playbook configuration"
 )
@@ -100,8 +99,11 @@ class TestAnsibleMAAS:
             ansible_main.logger = testlog.getChild(f"[{upgrade_version}]")
             ansible_main.update_config({"maas_version": upgrade_version})
             ansible_main.run_playbook()
             region = ansible_main.fetch_region(host)
-            region.refresh()
             assert region.read_version_information()["version"][:3] == upgrade_version
         assert not ansible_main._inventory_
         assert not host.instance.exists()
diff --git a/systemtests/api.py b/systemtests/api.py
index 9696fda..6a205a6 100644
--- a/systemtests/api.py
+++ b/systemtests/api.py
@@ -158,9 +158,6 @@ class AuthenticatedAPIClient:
                 raise
         return output
 
-    def refresh(self) -> None:
-        self.api_client.execute(["refresh"])
-
     def list_subnets(self) -> list[Subnet]:
         subnets: list[Subnet] = self.execute(["subnets", "read"])
         return subnets
@@ -683,6 +680,19 @@ class AuthenticatedAPIClient:
         )
         return result["state"]
 
+    def create_storage_layout(
+        self, machine: Machine, layout_type: str, options: dict[str, str]
+    ) -> None:
+        self.execute(
+            [
+                "machine",
+                "set-storage-layout",
+                machine["system_id"],
+                f"storage_layout={layout_type}",
+            ]
+            + [f"{k}={v}" for k, v in options.items()]
+        )
+
 
 class QuietAuthenticatedAPIClient(AuthenticatedAPIClient):
     """An Authenticated API Client that is quiet."""
diff --git a/systemtests/conftest.py b/systemtests/conftest.py
index 0db6532..405cd46 100644
--- a/systemtests/conftest.py
+++ b/systemtests/conftest.py
@@ -49,6 +49,7 @@ from .fixtures import (
     skip_if_installed_from_snap,
     ssh_key,
     tag_all,
+    temporal_main,
     testlog,
     unauthenticated_maas_api_client,
     vault,
@@ -96,6 +97,7 @@ __all__ = [
     "skip_if_installed_from_deb_package",
     "skip_if_installed_from_snap",
     "tag_all",
+    "temporal_main",
     "testlog",
     "unauthenticated_maas_api_client",
     "vault",
diff --git a/systemtests/fixtures.py b/systemtests/fixtures.py
index 64d3974..6871772 100644
--- a/systemtests/fixtures.py
+++ b/systemtests/fixtures.py
@@ -2,10 +2,9 @@ from __future__ import annotations
 
 import io
 import os
-import re
 from logging import Logger, StreamHandler, getLogger
 from textwrap import dedent
-from typing import Any, Iterator, Optional, TextIO
+from typing import Any, Iterator, Optional, TextIO, Type
 
 import paramiko
 import pytest
@@ -18,6 +17,7 @@ from .config import ADMIN_EMAIL, ADMIN_PASSWORD, ADMIN_USER
 from .lxd import Instance, get_lxd
 from .o11y import is_o11y_enabled, setup_o11y
 from .region import MAASRegion
+from .temporal import ManagedTemporalMain, TemporalMain
 from .tls import MAAS_CONTAINER_CERTS_PATH, is_tls_enabled, setup_tls
 from .vault import Vault, VaultNotReadyError, is_vault_enabled, setup_vault
 
@@ -25,11 +25,14 @@ LOG_NAME = "systemtests.fixtures"
 
 LXD_PROFILE = os.environ.get("MAAS_SYSTEMTESTS_LXD_PROFILE", "prof-maas-lab")
 
-_DEB_GITREV_RE = re.compile(r"^\s+Installed:\s+\S+-g\.(?P<gitrev>\S+)-\S+$", re.M)
-_SNAP_GITREV_RE = re.compile(r"^maas\s+.+-g\.(?P<gitrev>\S+)\s+.+$", re.M)
-
-MAAS_VERSION_KEY = pytest.StashKey[str]()
-
 
 def _add_maas_ppa(
     instance: Instance,
@@ -58,6 +61,7 @@ def ansible_main(config: dict[str, Any]) -> Optional[Iterator[AnsibleMain]]:
     instance = Instance(lxd, "ansible-main")
     instance.create_container(config["containers-image"])
     proxy_env = get_proxy_env(config.get("proxy", {}))
     main = AnsibleMain(
         lxd,
         instance,
@@ -71,6 +75,21 @@ def ansible_main(config: dict[str, Any]) -> Optional[Iterator[AnsibleMain]]:
     yield main
     main.remove_hosts(main._inventory_)
     instance.delete()
 
 
 @pytest.fixture(scope="session")
@@ -86,10 +105,63 @@ def maas_from_ansible(ansible_main: AnsibleMain) -> Iterator[AuthenticatedAPICli
                 "maas_url": f"http://{host.ip}:5240/MAAS",
             }
         )
         ansible_main.run_playbook()
         yield ansible_main.fetch_region(host)
 
 
+def image_results(
+    log: Logger, temporal_main: TemporalMain | ManagedTemporalMain
+) -> None:
+    if hasattr(temporal_main, "image_results"):
+        log.info("Image results:")
+        # TODO: Better image summary, and send results to frontend
+        # Maybe save as some local file to be sent out later?
+        if temporal_main.image_results:
+            for image, result in temporal_main.image_results.items():
+                log.info(f"{image}: {result}")
+        else:
+            log.info("No Image tests completed")
+    else:
+        log.info("Could not launch temporal")
+
+
+@pytest.fixture(scope="session")
+def temporal_main(config: dict[str, Any]) -> Optional[Iterator[Type[TemporalMain]]]:
+    """Create a reference to a temporal cluster."""
+    log = getLogger(f"{LOG_NAME}.temporal")
+    lxd = get_lxd(log)
+    temporal_cfg = config.get("temporal-config", {})
+    proxy_env = get_proxy_env(config.get("proxy", {}))
+    cluster_ip = temporal_cfg.get("ip-addr")
+    try:
+        # create a temporal container with everything setup if not given
+        if not cluster_ip:
+            instance = Instance(lxd, "temporal-container")
+            instance.create_container(config["containers-image"])
+            main = ManagedTemporalMain(lxd, instance, temporal_cfg, proxy_env)
+            main.setup()
+            main.start_temporal_server()
+            log.info("Temporal enclosure started")
+            yield main
+        else:
+            external_main = TemporalMain(
+                cluster_ip,
+                temporal_cfg,
+                getLogger(f"{LOG_NAME}.external_temporal_cluster"),
+            )
+            log.info("Temporal enclosure started")
+            yield external_main
+    except Exception as e:
+        log.error(f"Image tests failure: {e}")
+    finally:
+        if not cluster_ip:
+            main.stop()
+
+
 @pytest.fixture(scope="session")
 def build_container(config: dict[str, Any]) -> Optional[Iterator[Instance]]:
     """Create a container for building MAAS package in."""
@@ -437,6 +509,7 @@ def install_deb(
         ["apt-cache", "policy", "maas"],
         environment={"DEBIAN_FRONTEND": "noninteractive"},
     )  # just to record which version is running.
-    match = _DEB_GITREV_RE.search(policy.stdout)
-    if not match:
-        return ""
@@ -451,6 +524,13 @@ def get_maas_snap_version(maas_container: Instance) -> str:
-        return ""
-    entry = match.groupdict()
-    return entry["gitrev"]
+    try:
+        version = policy.stdout.split("\n")[1].strip().split(" ")[1][2:]
+    except IndexError:
+        version = ""
+    return version
 
 
 @pytest.fixture(scope="session")
@@ -484,10 +564,15 @@ def maas_region(
         maas_container.execute(["update-ca-certificates"])
 
     if installed_from_snap:
-        version = get_maas_snap_version(maas_container)
         maas_already_initialized = maas_container.files[
             "/var/snap/maas/common/snap_mode"
         ].exists()
+        # just to record which version is running.
+        snap_list = maas_container.execute(["snap", "list", "maas"])
+        try:
+            version = snap_list.stdout.split("\n")[1].split()[1]
+        except IndexError:
+            version = ""
         if not maas_already_initialized:
             maas_container.execute(
                 [
@@ -503,6 +588,11 @@ def maas_region(
     else:
         assert maas_deb_repo is not None
         version = install_deb(maas_container, maas_deb_repo, config)
+    with open("version_under_test", "w") as fh:
+        fh.write(f"{version}\n")
 
     # We never want to access the region via the system proxy
     if "no_proxy" not in os.environ:
diff --git a/systemtests/image_tests/__init__.py b/systemtests/image_tests/__init__.py
new file mode 100644
index 0000000..8c18b57
--- /dev/null
+++ b/systemtests/image_tests/__init__.py
@@ -0,0 +1,4 @@
+"""
+Prepares a container running a temporal cluster,
+uses the cluster to execute image tests, and returns the results
+"""
diff --git a/systemtests/image_tests/test_images.py b/systemtests/image_tests/test_images.py
new file mode 100644
index 0000000..dd0e8ef
--- /dev/null
+++ b/systemtests/image_tests/test_images.py
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+import pytest
+
+from systemtests.temporal import TemporalMain
+
+from .workflows.workflows import TestTemporalWorkflow
+
+
+@pytest.mark.asyncio
+class TestImages:
+    async def test_completes_prototype_workflow(
+        self, temporal_main: TemporalMain
+    ) -> None:
+        name = "test"
+        client = await temporal_main.connect()
+        result = await temporal_main.run_workflow(
+            client, TestTemporalWorkflow, name, workflow_id="prototype-workflow"
+        )
+        assert result == f"Hello, {name}"
diff --git a/systemtests/image_tests/workflows/workflows.py b/systemtests/image_tests/workflows/workflows.py
new file mode 100644
index 0000000..ea24020
--- /dev/null
+++ b/systemtests/image_tests/workflows/workflows.py
@@ -0,0 +1,28 @@
+from __future__ import annotations
+
+import time
+from datetime import timedelta
+
+from temporalio import workflow
+
+from ...temporal_registry import register_activity, register_workflow
+
+NAME = "systemtests.temporal.workflow"
+
+
+@register_activity
+async def test_temporal(name: str) -> str:
+    time.sleep(1)
+    return f"Hello, {name}"
+
+
+@register_workflow
+class TestTemporalWorkflow:
+    @workflow.run
+    async def run(self, name: str) -> str:
+        res = await workflow.execute_activity(
+            test_temporal,
+            name,
+            schedule_to_close_timeout=timedelta(seconds=10),
+        )
+        return str(res)
diff --git a/systemtests/lxd.py b/systemtests/lxd.py
index 4ea9298..3b9a553 100644
--- a/systemtests/lxd.py
+++ b/systemtests/lxd.py
@@ -1,12 +1,13 @@
 from __future__ import annotations
 
 import json
+import multiprocessing
 import subprocess
 import textwrap
 from functools import partial
 from itertools import chain
 from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, List, Optional
 
 from netaddr import IPNetwork
 from retry import retry
@@ -146,6 +147,7 @@ class Instance:
         self._lxd = lxd
         self._instance_name = instance_name
         self.logger = lxd.logger.getChild(instance_name)
+        self.parallel_processes: List[multiprocessing.Process] = []
 
     def __repr__(self) -> str:
         return f"<Instance {self.name}>"
@@ -160,11 +162,14 @@ class Instance:
         stdin: Optional[str] = None,
         prefix: Optional[list[str]] = None,
         logger: Optional[logging.Logger] = None,
+        parallel: Optional[bool] = False,
     ) -> subprocess.CompletedProcess[str]:
         __tracebackhide__ = True
         if logger is None:
             logger = self.logger
-        return run_with_logging(cmd, logger, prefix=prefix, stdin=stdin)
+        return run_with_logging(
+            cmd, logger, prefix=prefix, stdin=stdin, parallel=parallel
+        )
 
     def exists(self) -> bool:
         try:
@@ -247,6 +252,36 @@ class Instance:
         )
         return self._run_with_logger(executor, None)
 
+    def parallel_execute(
+        self,
+        command: list[str],
+        environment: Optional[dict[str, str]] = None,
+        working_dir: Optional[str] = None,
+        logger_name: str = "parallel-process",
+        completion_callback: Optional[Callable[..., Any]] = None,
+    ) -> None:
+        __tracebackhide__ = True
+
+        def run_in_parallel() -> None:
+            __tracebackhide__ = True
+            self.logger.info(f"Parallel process {process.name} started")
+            self._run_with_logger(executor, self.logger.getChild(logger_name))
+            if completion_callback:
+                completion_callback()
+            complete_parallel()
+
+        def complete_parallel() -> None:
+            self.logger.info(f"Parallel process {process.name} complete")
+            self.parallel_processes.remove(process)
+
+        lxc_command = self._get_lxc_command(environment, working_dir)
+        executor = partial(self._run, command, prefix=lxc_command, parallel=True)
+        process = multiprocessing.Process(
+            target=run_in_parallel, name=f"'{' '.join(command[:2])}'"
+        )
+        process.start()
+        self.parallel_processes.append(process)
+
     @property
     def files(self) -> _FilesWrapper:
         return _FilesWrapper(self, self._run)
@@ -318,6 +353,7 @@ class Instance:
         return self._run(["lxc", "start", self._instance_name])
 
     def stop(self, force: bool = False) -> subprocess.CompletedProcess[str]:
+        self.terminate_parallel()
         argv = ["lxc", "stop", self._instance_name]
         if force:
             argv.append("--force")
@@ -329,7 +365,17 @@ class Instance:
             argv.append("--force")
         return self._run(argv)
 
+    def terminate_parallel(self) -> None:
+        if self.parallel_processes:
+            self.logger.warning(
+                f"Terminating {len(self.parallel_processes)} parallel processes."
+            )
+            for proc in self.parallel_processes[::-1]:
+                proc.terminate()
+                self.parallel_processes.remove(proc)
+
     def delete(self) -> None:
+        self.terminate_parallel()
         self._run(["lxc", "delete", "--force", self._instance_name])
 
     def status(self) -> str:
diff --git a/systemtests/subprocess.py b/systemtests/subprocess.py
index de427b0..4519bc9 100644
--- a/systemtests/subprocess.py
+++ b/systemtests/subprocess.py
@@ -16,11 +16,17 @@ def run_with_logging(
     env: Optional[dict[str, str]] = None,
     prefix: Optional[list[str]] = None,
     stdin: Optional[str] = None,
+    parallel: Optional[bool] = False,
 ) -> subprocess.CompletedProcess[str]:
     __tracebackhide__ = True
+    startchar, midchar, endchar = "┌", "|", "└"
     if prefix is None:
         prefix = []
-    logger.info("┌ " + " ".join(repr(arg) if "\n" in arg else arg for arg in cmd))
+    if parallel or False:
+        startchar, midchar, endchar = "╔", "║", "╚"
+    logger.info(
+        f"{startchar} " + " ".join(repr(arg) if "\n" in arg else arg for arg in cmd)
+    )
     process = subprocess.Popen(
         prefix + cmd,
         stdin=subprocess.PIPE,
@@ -43,8 +49,8 @@ def run_with_logging(
     stdout = io.StringIO()
     stderr = io.StringIO()
     sel = selectors.DefaultSelector()
-    sel.register(process.stdout, selectors.EVENT_READ, ("|", stdout.write))
-    sel.register(process.stderr, selectors.EVENT_READ, ("|E", stderr.write))
+    sel.register(process.stdout, selectors.EVENT_READ, (f"{midchar} ", stdout.write))
+    sel.register(process.stderr, selectors.EVENT_READ, (f"{midchar}E", stderr.write))
     with sel:
         while sel.get_map():
             for select, mask in sel.select():
@@ -69,11 +75,11 @@ def run_with_logging(
 
     returncode = process.wait()
     if returncode != 0:
-        logger.warning(f"└ ❌ Return code: {returncode}")
+        logger.warning(f"{endchar} ❌ Return code: {returncode}")
         raise subprocess.CalledProcessError(
             returncode, prefix + cmd, stdout.getvalue(), stderr.getvalue()
         )
-    logger.info("└ ✔")
+    logger.info(f"{endchar} ✔")
     return subprocess.CompletedProcess(
         process.args, returncode, stdout.getvalue(), stderr.getvalue()
     )
diff --git a/systemtests/temporal.py b/systemtests/temporal.py
new file mode 100644
index 0000000..c7d6196
--- /dev/null
+++ b/systemtests/temporal.py
@@ -0,0 +1,197 @@
+from __future__ import annotations
+
+import time
+from datetime import datetime
+from logging import getLogger
+from subprocess import CalledProcessError
+from typing import TYPE_CHECKING, Any, Dict, Optional
+
+from retry import retry
+from temporalio.client import Client
+from temporalio.types import ClassType
+
+from .ansible import apt_install, apt_update, clone_repo
+from .lxd import Instance
+from .temporal_registry import registry_worker, split_log
+
+if TYPE_CHECKING:
+    from logging import Logger
+
+    from .lxd import CLILXD
+
+NAME = "systemtests.temporal"
+LOG = getLogger(NAME)
+
+
+class TemporalMain:
+    def __init__(
+        self,
+        ip: str,
+        config: dict[str, Any],
+        logger: Logger,
+    ) -> None:
+        self._ip = ip
+        self.logger = logger
+        self._config = config
+        self.cluster_namespace = config.get("namespace", "default")
+        self.link_temporal_server(ip)
+
+    @property
+    def ip(self) -> str:
+        return self._ip
+
+    def link_temporal_server(self, ip: str) -> None:
+        self.cluster_server = f"{ip}:7233"
+        self.cluster_ui = f"{ip}:8233"
+        self.logger.info(f"Linked temporal cluster at {self.cluster_server}")
+        self.logger.info(f"Connect to temporal UI at {self.cluster_ui}")
+        self.logger.info(f"Using namespace {self.cluster_namespace}")
+
+    def __repr__(self) -> str:
+        return f"<TemporalMain at {self.ip}>"
+
+    def shutdown(self) -> None:
+        pass
+
+    async def connect(
+        self, override_ip: str = "", override_namespace: str = ""
+    ) -> Client:
+        # NOTE(review): the `retry` package is synchronous — decorating an
+        # async callable returns the coroutine without awaiting it, so
+        # failures would never be retried. Retry explicitly instead.
+        last_error: Optional[Exception] = None
+        for attempt in range(5):
+            try:
+                self.client = await Client.connect(
+                    override_ip or self.cluster_server,
+                    namespace=override_namespace or self.cluster_namespace,
+                )
+                break
+            except Exception as error:
+                last_error = error
+                self.logger.warning(f"Temporal connect failed ({error}); retrying")
+                time.sleep(10)
+        else:
+            assert last_error is not None
+            raise last_error
+
+        return self.client
+
+    async def run_workflow(
+        self,
+        client: Client,
+        Workflow: ClassType,
+        params: Any,
+        workflow_id: Optional[str] = "",
+        workflow_dependencies: Optional[Dict[str, Any]] = None,
+    ) -> Any:
+        wflow_name = Workflow.__name__
+        wflow_logger = getLogger(f"{NAME}.workflow.{wflow_name}")
+        wflow_logger.info(f"Request execution with params: {params}")
+        task_queue = "maas-temporal"
+        this_time = datetime.utcnow().strftime("%H%M%S%f")[:-3]
+
+        # Run a worker for the workflow
+        async with registry_worker(
+            client,
+            task_queue=task_queue,
+            logger=self.logger,
+            dependencies=workflow_dependencies or {},
+        ):
+            result = await client.execute_workflow(
+                Workflow.run,
+                params,
+                id=f"{workflow_id or wflow_name}-{this_time}",
+                task_queue=task_queue,
+            )
+
+            split_log(self.logger.info, "Execution completed with output:", result)
+            return result
+
+
+# Used to locally host a temporal cluster for the duration of the tests
+class ManagedTemporalMain(TemporalMain):
+    def __init__(
+        self,
+        lxd: CLILXD,
+        instance: Instance,
+        config: dict[str, Any],
+        proxy_env: Optional[dict[str, str]],
+    ) -> None:
+        self._lxd = lxd
+        self.instance = instance
+        self._proxy_env = proxy_env
+        self._config = config
+
+        self.temporal_repo = "https://github.com/temporalio/temporalite.git"
+        self.temporal_repo_path = "/home/ubuntu/temporalite"
+
+        self.cluster_namespace = self._config.get("namespace", "default")
+
+    def setup(self) -> None:
+        self.logger.info("Installing apt-updates")
+        apt_update(self.instance, environment=self._proxy_env)
+
+        self.logger.info("Installing golang")
+        apt_install(self.instance, "golang", environment=self._proxy_env)
+
+        self.logger.info("Cloning temporalite repo")
+        clone_repo(self.instance, self.temporal_repo, self.temporal_repo_path)
+
+        self.logger.info("Installing temporalite")
+        self.instance.execute(
+            ["go", "build", "./cmd/temporalite"], working_dir=self.temporal_repo_path
+        )
+
+    def wait_for_host_port(
+        self, host_ip: str = "127.0.0.1", host_port: str = "80", timeout: int = 300
+    ) -> bool:
+        self.logger.info(
+            f"Waiting for {host_ip}:{host_port} to open with timeout {timeout}."
+        )
+        time_out = time.time() + timeout
+        connection = False
+        while time.time() < time_out:
+            try:
+                output = self.instance.quietly_execute(["nc", "-z", host_ip, host_port])
+                if connection := not bool(output.returncode):
+                    break
+            except CalledProcessError:
+                time.sleep(1)
+        if not connection:
+            raise ConnectionError(f"Could not connect to {host_ip}:{host_port} in time")
+        self.logger.info(f"{host_ip}:{host_port} reached.")
+        return connection
+
+    def start_temporal_server(self) -> None:
+        self.logger.info("Starting temporalite cluster")
+        self.instance.parallel_execute(
+            [
+                "./temporalite",
+                "start",
+                "--namespace",
+                self.cluster_namespace,
+                "--ephemeral",
+                "--log-level",
+                "warn",
+                "--log-format",
+                "pretty",
+            ],
+            working_dir=self.temporal_repo_path,
+            logger_name="temporalite",
+        )
+        self.wait_for_host_port(host_port="8233")
+        # The endpoints are only known once the container is up; publish
+        # them so connect() has self.cluster_server set (otherwise it
+        # raises AttributeError — link_temporal_server is never called here).
+        self.link_temporal_server(self.ip)
+
+    def return_exec(
+        self, command: list[str], environment: Optional[dict[str, str]] = None
+    ) -> str:
+        return self.instance.execute(
+            command=command, environment=environment
+        ).stdout.rstrip("\n")
+
+    def __repr__(self) -> str:
+        return f"<ManagedTemporalMain in {self.instance}>"
+
+    @property
+    def ip(self) -> str:
+        return self.instance.get_ip_address()
+
+    @property
+    def logger(self) -> Logger:
+        return self.instance.logger
+
+    @logger.setter
+    def logger(self, logger: Logger) -> None:
+        self.instance.logger = logger
+
+    def stop(self) -> None:
+        self.instance.delete()
diff --git a/systemtests/temporal_registry.py b/systemtests/temporal_registry.py
new file mode 100644
index 0000000..ff592c5
--- /dev/null
+++ b/systemtests/temporal_registry.py
@@ -0,0 +1,161 @@
+"""
+    Duplicates and modifies the registry code in @igor-brovtsin's
+    https://code.launchpad.net/~igor-brovtsin/maas-ci/+git/maas-ci/+ref/temporal-sandbox
+
+"""
+
+from __future__ import annotations
+
+import abc
+import inspect
+import logging
+from typing import Any, Callable, Dict, List, Optional, Set, Type
+
+from temporalio import activity, workflow
+from temporalio.client import Client
+from temporalio.types import CallableType, ClassType
+from temporalio.worker import Worker
+
+
+def split_log(
+    log_func: Callable[..., Any],
+    heading: str,
+    results: str | list[str],
+    mid_char: str = "│",
+) -> None:
+    res = results if isinstance(results, list) else results.split("\n")
+    log_func(heading)
+    for idx, line in enumerate(res):
+        log_func(f"{'└' if idx == len(res)-1 else mid_char} {line}")
+
+
+def workflow_registrar() -> Callable[..., Any]:
+    """Generates a wrapper that applies `@temporalio.workflow.defn` to
+    a workflow class, and adds it to the list that can later be passed
+    to the Temporal client"""
+    registered = []
+
+    def wrapper(  # type:ignore
+        cls: Optional[ClassType] = None, *args, sandboxed=False, **kwargs
+    ) -> Callable[..., Any]:
+        def decorator(cls: ClassType) -> ClassType:
+            registered.append(cls)
+            return workflow.defn(*args, sandboxed=sandboxed, **kwargs)(cls)
+
+        if cls is not None:
+            return decorator(cls)
+        return decorator
+
+    wrapper.workflows = registered  # type:ignore
+    return wrapper
+
+
+def activity_registrar() -> Callable[..., Any]:
+    """Generates a wrapper that applies `@temporalio.activity.defn` to
+    a callable, and adds it to the list that can later be passed
+    to the Temporal client"""
+    registry = []
+
+    def decorator(  # type:ignore
+        f: CallableType, *args, **kwargs
+    ) -> Callable[..., Any]:
+        wrapped = activity.defn(*args, **kwargs)(f)
+        registry.append(wrapped)
+        return wrapped  # type:ignore
+
+    decorator.activities = registry  # type:ignore
+    return decorator
+
+
+register_workflow = workflow_registrar()
+register_activity = activity_registrar()
+
+
+class TemporalActivityClass(abc.ABC):
+    """Abstract class for Temporal activities that require external
+    dependencies: DB connections, LXD containers, HTTP clients, etc.
+    """
+
+    CLASSES: Set[Type[TemporalActivityClass]] = set()
+
+    def __init_subclass__(cls, **kwargs) -> None:  # type:ignore
+        super().__init_subclass__(**kwargs)
+        TemporalActivityClass.CLASSES.add(cls)
+
+    def get_activities(self) -> List[Any]:
+        result = []
+        for name in dir(self):
+            member = getattr(self, name)
+            if not callable(member) or not hasattr(
+                member, "__temporal_activity_definition"
+            ):
+                continue
+            result.append(member)
+        return result
+
+
+class ActivityClassDependencyMissingException(Exception):
+    """Raised when an activity does not provide a required dependency."""
+
+
+def _initialize_activity_classes(
+    dependencies: Dict[str, Any], repo: Set[TemporalActivityClass]
+) -> List[CallableType]:
+    results = []
+    for cls in repo:
+        init = getattr(cls, "__init__")
+        kwargs = {}
+        for i, value in enumerate(inspect.signature(init).parameters.values()):
+            if i == 0:
+                continue
+            if value.name in dependencies:
+                kwargs[value.name] = dependencies[value.name]
+            elif value.default is inspect.Parameter.empty:
+                raise ActivityClassDependencyMissingException(
+                    f"{cls} requires dependency `{value.name}` that was not provided!"
+                )
+        results += cls(**kwargs).get_activities()  # type:ignore
+    return results
+
+
+def registry_worker(  # type:ignore
+    client: Client,
+    task_queue: str,
+    logger: logging.Logger,
+    dependencies: Dict[str, Any],
+    **kwargs,
+) -> Worker:
+    """Helper method that returns worker that uses `register_workflow`
+    and `register_activity` registries."""
+    kwargs.setdefault("workflows", [])
+    kwargs.setdefault("activities", [])
+    assert isinstance(
+        kwargs["workflows"], list
+    ), f"`workflows` kwarg type is `{type(kwargs['workflows'])}`, not `list`"
+    assert isinstance(
+        kwargs["activities"], list
+    ), f"`activities` kwarg type is `{type(kwargs['activities'])}`, not `list`"
+    kwargs["workflows"].extend(register_workflow.workflows)  # type:ignore
+    kwargs["activities"].extend(register_activity.activities)  # type:ignore
+
+    class_activities = _initialize_activity_classes(  # type:ignore
+        dependencies, TemporalActivityClass.CLASSES  # type:ignore
+    )
+    kwargs["activities"].extend(class_activities)
+
+    workflows_names = [
+        f"{w.__module__}.{w.__qualname__}"
+        if isinstance(w, type)
+        else f"{w} of {w.__module__}"
+        for w in kwargs["workflows"]
+    ]
+    activities_names = [
+        f"{a.__module__}.{a.__qualname__}" for a in kwargs["activities"]
+    ]
+
+    if workflows_names:
+        split_log(logger.info, "Registered workflows", workflows_names, mid_char="├")
+    if activities_names:
+        split_log(logger.info, "Registered activities", activities_names, mid_char="├")
+
+    return Worker(client=client, task_queue=task_queue, **kwargs)
diff --git a/tox.ini b/tox.ini
index 34ab368..b7d7005 100644
--- a/tox.ini
+++ b/tox.ini
@@ -26,7 +26,7 @@ passenv =
   MAAS_SYSTEMTESTS_CLIENT_CONTAINER
   MAAS_SYSTEMTESTS_LXD_PROFILE
 
-[testenv:{env_builder,collect_sos_report,general_tests,ansible_tests}]
+[testenv:{env_builder,collect_sos_report,general_tests,ansible_tests,image_tests}]
 passenv = {[base]passenv}
 
 [testenv:cog]
@@ -56,6 +56,11 @@ commands=
 #    passenv = {{[base]passenv}}
 #    """)
 #]]]
+
+[testenv:{opelt,stunky,vm1}]
+description=Per-machine tests for {envname}
+passenv = {[base]passenv}
+
 #[[[end]]]
 
 [testenv:format]

Follow ups