sts-sponsors team mailing list archive
-
sts-sponsors team
-
Mailing list archive
-
Message #08457
[Merge] ~lloydwaltersj/maas-ci/+git/system-tests:temporal-boilerplate into ~maas-committers/maas-ci/+git/system-tests:master
Jack Lloyd-Walters has proposed merging ~lloydwaltersj/maas-ci/+git/system-tests:temporal-boilerplate into ~maas-committers/maas-ci/+git/system-tests:master.
Commit message:
All of the Temporal-specific backend code should live here.
Requested reviews:
MAAS Committers (maas-committers)
For more details, see:
https://code.launchpad.net/~lloydwaltersj/maas-ci/+git/system-tests/+merge/443010
--
Your team MAAS Committers is requested to review the proposed merge of ~lloydwaltersj/maas-ci/+git/system-tests:temporal-boilerplate into ~maas-committers/maas-ci/+git/system-tests:master.
diff --git a/README.md b/README.md
index f0fd167..9df9c41 100644
--- a/README.md
+++ b/README.md
@@ -50,7 +50,7 @@ for init in inits:
docstring = textwrap.fill(module.__doc__, 180)
cog.outl(f" - `{package}`: {docstring}\n")
]]] -->
-We have 5 test suites:
+We have 6 test suites:
- `ansible_tests`: Prepares a container running Ansible, and clones maas-ansible-playbooks, to test that MAAS topology is configurable with Ansible, and that the MAAS install behaves as expected
under testing.
@@ -60,6 +60,8 @@ under testing.
- `general_tests`: Uses credentials.yaml info to access a running MAAS deployment, asserts the state is useful to these tests and executes them.
+ - `image_tests`: Prepares a container running a temporal cluster, uses the cluster to execute image tests, and returns the results
+
- `tests_per_machine`: Contains tests that are per machine, run in parallel by tox for efficiency.
<!-- [[[end]]] -->
diff --git a/setup.py b/setup.py
index 4f8f242..e7b6ba4 100644
--- a/setup.py
+++ b/setup.py
@@ -4,6 +4,7 @@ install_requires = (
'netaddr',
'paramiko',
'pytest',
+ 'pytest-asyncio',
'pytest-dependency',
'pytest-rerunfailures',
'pytest-steps',
@@ -11,6 +12,7 @@ install_requires = (
'requests',
'retry',
'ruamel.yaml',
+ 'temporalio'
)
diff --git a/systemtests/ansible.py b/systemtests/ansible.py
index 46c376b..747eba0 100644
--- a/systemtests/ansible.py
+++ b/systemtests/ansible.py
@@ -109,28 +109,21 @@ def pip_package_exists(instance: Instance, package: str) -> bool:
def clone_repo(
instance: Instance,
repo: str,
- branch: str,
clone_path: str,
+ branch: Optional[str] = None,
environment: Optional[dict[str, str]] = None,
) -> None:
clone_file = instance.files[clone_path]
+ cmd = ["eatmydata", "git", "clone"]
+ if branch:
+ cmd.extend(["--single-branch", "-b", branch])
+ cmd.extend(["--depth", "1", repo, clone_path])
if not clone_file.exists():
- instance.execute(
- [
- "eatmydata",
- "git",
- "clone",
- "-b",
- branch,
- "--depth",
- "1",
- repo,
- clone_path,
- ],
- environment=environment,
- )
+ instance.execute(cmd, environment=environment)
instance.execute(["git", "-C", clone_path, "show", "-q"])
- instance.logger.info(f"Cloned {branch} from {repo} to {clone_path}")
+ instance.logger.info(
+ f"Cloned{' '+branch if branch else ''} from {repo} to {clone_path}"
+ )
class AnsibleHost:
@@ -324,8 +317,11 @@ class AnsibleMain:
playbooks_repo: str,
playbooks_branch: str,
proxy_env: Optional[dict[str, str]],
+<<<<<<< systemtests/ansible.py
debug: str = "",
floating_ip_net: Optional[str] = None,
+=======
+>>>>>>> systemtests/ansible.py
) -> None:
self._lxd = lxd
self.instance = instance
@@ -340,7 +336,6 @@ class AnsibleMain:
self._inventory_: set[AnsibleHost] = set()
self.ansible_repo_path = "/home/ubuntu/ansible_repo"
- self.default_debug = debug or ""
def setup(self) -> None:
self.logger.info("Installing python3-pip")
@@ -353,8 +348,8 @@ class AnsibleMain:
clone_repo(
self.instance,
self._playbooks_repo,
- self._playbooks_branch,
self.ansible_repo_path,
+ self._playbooks_branch,
environment=self._proxy_env,
)
@@ -519,7 +514,6 @@ class AnsibleMain:
["maas", "apikey", "--username", user]
).stdout.rstrip("\n")
_, authd_client = maas_client.log_in("admin", api_key)
- authd_client.refresh()
return authd_client
raise HostWithoutRegion()
@@ -560,10 +554,7 @@ class AnsibleMain:
]
)
file_content = "\n".join(inv)
- if self.default_debug:
- self.logger.info("Ansible hosts file generated:")
- for line in file_content:
- self.logger.info(line)
+ LOG.info(f"Ansible hosts file generated:\n{file_content}")
self._hosts_file.write(file_content)
def create_config_file(self) -> None:
diff --git a/systemtests/ansible_tests/test_ansible.py b/systemtests/ansible_tests/test_ansible.py
index d0477b0..a543f52 100644
--- a/systemtests/ansible_tests/test_ansible.py
+++ b/systemtests/ansible_tests/test_ansible.py
@@ -11,7 +11,6 @@ if TYPE_CHECKING:
DEFAULT_VERSION = "3.3"
-
@pytest.mark.skip_if_ansible_playbooks_unconfigured(
"Needs Ansible playbook configuration"
)
@@ -100,8 +99,11 @@ class TestAnsibleMAAS:
ansible_main.logger = testlog.getChild(f"[{upgrade_version}]")
ansible_main.update_config({"maas_version": upgrade_version})
ansible_main.run_playbook()
+<<<<<<< systemtests/ansible_tests/test_ansible.py
region = ansible_main.fetch_region(host)
region.refresh()
+=======
+>>>>>>> systemtests/ansible_tests/test_ansible.py
assert region.read_version_information()["version"][:3] == upgrade_version
assert not ansible_main._inventory_
assert not host.instance.exists()
diff --git a/systemtests/api.py b/systemtests/api.py
index 9696fda..6a205a6 100644
--- a/systemtests/api.py
+++ b/systemtests/api.py
@@ -158,9 +158,6 @@ class AuthenticatedAPIClient:
raise
return output
- def refresh(self) -> None:
- self.api_client.execute(["refresh"])
-
def list_subnets(self) -> list[Subnet]:
subnets: list[Subnet] = self.execute(["subnets", "read"])
return subnets
@@ -683,6 +680,19 @@ class AuthenticatedAPIClient:
)
return result["state"]
+ def create_storage_layout(
+ self, machine: Machine, layout_type: str, options: dict[str, str]
+ ) -> None:
+ self.execute(
+ [
+ "machine",
+ "set-storage-layout",
+ machine["system_id"],
+ f"storage_layout={layout_type}",
+ ]
+ + [f"{k}={v}" for k, v in options.items()]
+ )
+
class QuietAuthenticatedAPIClient(AuthenticatedAPIClient):
"""An Authenticated API Client that is quiet."""
diff --git a/systemtests/conftest.py b/systemtests/conftest.py
index 0db6532..405cd46 100644
--- a/systemtests/conftest.py
+++ b/systemtests/conftest.py
@@ -49,6 +49,7 @@ from .fixtures import (
skip_if_installed_from_snap,
ssh_key,
tag_all,
+ temporal_main,
testlog,
unauthenticated_maas_api_client,
vault,
@@ -96,6 +97,7 @@ __all__ = [
"skip_if_installed_from_deb_package",
"skip_if_installed_from_snap",
"tag_all",
+ "temporal_main",
"testlog",
"unauthenticated_maas_api_client",
"vault",
diff --git a/systemtests/fixtures.py b/systemtests/fixtures.py
index 64d3974..6871772 100644
--- a/systemtests/fixtures.py
+++ b/systemtests/fixtures.py
@@ -2,10 +2,9 @@ from __future__ import annotations
import io
import os
-import re
from logging import Logger, StreamHandler, getLogger
from textwrap import dedent
-from typing import Any, Iterator, Optional, TextIO
+from typing import Any, Iterator, Optional, TextIO, Type
import paramiko
import pytest
@@ -18,6 +17,7 @@ from .config import ADMIN_EMAIL, ADMIN_PASSWORD, ADMIN_USER
from .lxd import Instance, get_lxd
from .o11y import is_o11y_enabled, setup_o11y
from .region import MAASRegion
+from .temporal import ManagedTemporalMain, TemporalMain
from .tls import MAAS_CONTAINER_CERTS_PATH, is_tls_enabled, setup_tls
from .vault import Vault, VaultNotReadyError, is_vault_enabled, setup_vault
@@ -25,11 +25,14 @@ LOG_NAME = "systemtests.fixtures"
LXD_PROFILE = os.environ.get("MAAS_SYSTEMTESTS_LXD_PROFILE", "prof-maas-lab")
+<<<<<<< systemtests/fixtures.py
_DEB_GITREV_RE = re.compile(r"^\s+Installed:\s+\S+-g\.(?P<gitrev>\S+)-\S+$", re.M)
_SNAP_GITREV_RE = re.compile(r"^maas\s+.+-g\.(?P<gitrev>\S+)\s+.+$", re.M)
MAAS_VERSION_KEY = pytest.StashKey[str]()
+=======
+>>>>>>> systemtests/fixtures.py
def _add_maas_ppa(
instance: Instance,
@@ -58,6 +61,7 @@ def ansible_main(config: dict[str, Any]) -> Optional[Iterator[AnsibleMain]]:
instance = Instance(lxd, "ansible-main")
instance.create_container(config["containers-image"])
proxy_env = get_proxy_env(config.get("proxy", {}))
+<<<<<<< systemtests/fixtures.py
main = AnsibleMain(
lxd,
instance,
@@ -71,6 +75,21 @@ def ansible_main(config: dict[str, Any]) -> Optional[Iterator[AnsibleMain]]:
yield main
main.remove_hosts(main._inventory_)
instance.delete()
+=======
+ try:
+ main = AnsibleMain(
+ lxd,
+ instance,
+ playbooks_repo=playbooks_repo,
+ playbooks_branch=playbooks_branch,
+ proxy_env=proxy_env,
+ )
+ main.setup()
+ yield main
+ finally:
+ main.remove_hosts(main._inventory_)
+ instance.delete()
+>>>>>>> systemtests/fixtures.py
@pytest.fixture(scope="session")
@@ -86,10 +105,63 @@ def maas_from_ansible(ansible_main: AnsibleMain) -> Iterator[AuthenticatedAPICli
"maas_url": f"http://{host.ip}:5240/MAAS",
}
)
+<<<<<<< systemtests/fixtures.py
ansible_main.run_playbook()
+=======
+ ansible_main.run_playbook("site.yaml", "-vv")
+>>>>>>> systemtests/fixtures.py
yield ansible_main.fetch_region(host)
+def image_results(
+ log: Logger, temporal_main: TemporalMain | ManagedTemporalMain
+) -> None:
+ if hasattr(temporal_main, "image_results"):
+ log.info("Image results:")
+ # TODO: Better image summary, and send results to frontend
+ # Maybe save as some local file to be sent out later?
+ if temporal_main.image_results:
+ for image, result in temporal_main.image_results.items():
+ log.info(f"{image}: {result}")
+ log.info("No Image tests completed")
+ else:
+ log.info("Could not launch temporal")
+
+
+@pytest.fixture(scope="session")
+def temporal_main(config: dict[str, Any]) -> Optional[Iterator[Type[TemporalMain]]]:
+ """Create a reference to a temporal cluster."""
+ log = getLogger(f"{LOG_NAME}.temporal")
+ lxd = get_lxd(log)
+ temporal_cfg = config.get("temporal-config", {})
+ proxy_env = get_proxy_env(config.get("proxy", {}))
+ cluster_ip = temporal_cfg.get("ip-addr")
+ try:
+ # create a temporal container with everything set up if no cluster IP address ("ip-addr") is configured
+ if not cluster_ip:
+ instance = Instance(lxd, "temporal-container")
+ instance.create_container(config["containers-image"])
+ main = ManagedTemporalMain(lxd, instance, temporal_cfg, proxy_env)
+ main.setup()
+ main.start_temporal_server()
+ log.info("Temporal enclosure started")
+ yield main
+ main.stop()
+ else:
+ external_main = TemporalMain(
+ cluster_ip,
+ temporal_cfg,
+ getLogger(f"{LOG_NAME}.external_temporal_cluster"),
+ )
+ log.info("Temporal enclosure started")
+ yield external_main
+ except Exception as e:
+ log.error(f"Image tests failure: {e}")
+ finally:
+ if not cluster_ip:
+ main.stop()
+
+
@pytest.fixture(scope="session")
def build_container(config: dict[str, Any]) -> Optional[Iterator[Instance]]:
"""Create a container for building MAAS package in."""
@@ -437,6 +509,7 @@ def install_deb(
["apt-cache", "policy", "maas"],
environment={"DEBIAN_FRONTEND": "noninteractive"},
) # just to record which version is running.
+<<<<<<< systemtests/fixtures.py
match = _DEB_GITREV_RE.search(policy.stdout)
if not match:
return ""
@@ -451,6 +524,13 @@ def get_maas_snap_version(maas_container: Instance) -> str:
return ""
entry = match.groupdict()
return entry["gitrev"]
+=======
+ try:
+ version = policy.stdout.split("\n")[1].strip().split(" ")[1][2:]
+ except IndexError:
+ version = ""
+ return version
+>>>>>>> systemtests/fixtures.py
@pytest.fixture(scope="session")
@@ -484,10 +564,15 @@ def maas_region(
maas_container.execute(["update-ca-certificates"])
if installed_from_snap:
- version = get_maas_snap_version(maas_container)
maas_already_initialized = maas_container.files[
"/var/snap/maas/common/snap_mode"
].exists()
+ # just to record which version is running.
+ snap_list = maas_container.execute(["snap", "list", "maas"])
+ try:
+ version = snap_list.stdout.split("\n")[1].split()[1]
+ except IndexError:
+ version = ""
if not maas_already_initialized:
maas_container.execute(
[
@@ -503,6 +588,11 @@ def maas_region(
else:
assert maas_deb_repo is not None
version = install_deb(maas_container, maas_deb_repo, config)
+<<<<<<< systemtests/fixtures.py
+=======
+ with open("version_under_test", "w") as fh:
+ fh.write(f"{version}\n")
+>>>>>>> systemtests/fixtures.py
# We never want to access the region via the system proxy
if "no_proxy" not in os.environ:
diff --git a/systemtests/image_tests/__init__.py b/systemtests/image_tests/__init__.py
new file mode 100644
index 0000000..8c18b57
--- /dev/null
+++ b/systemtests/image_tests/__init__.py
@@ -0,0 +1,4 @@
+"""
+Prepares a container running a temporal cluster,
+uses the cluster to execute image tests, and returns the results
+"""
diff --git a/systemtests/image_tests/test_images.py b/systemtests/image_tests/test_images.py
new file mode 100644
index 0000000..dd0e8ef
--- /dev/null
+++ b/systemtests/image_tests/test_images.py
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+import pytest
+
+from systemtests.temporal import TemporalMain
+
+from .workflows.workflows import TestTemporalWorkflow
+
+
+@pytest.mark.asyncio
+class TestImages:
+ async def test_completes_prototype_workflow(
+ self, temporal_main: TemporalMain
+ ) -> None:
+ name = "test"
+ client = await temporal_main.connect()
+ result = await temporal_main.run_workflow(
+ client, TestTemporalWorkflow, name, workflow_id="prototype-workflow"
+ )
+ assert result == f"Hello, {name}"
diff --git a/systemtests/image_tests/workflows/workflows.py b/systemtests/image_tests/workflows/workflows.py
new file mode 100644
index 0000000..ea24020
--- /dev/null
+++ b/systemtests/image_tests/workflows/workflows.py
@@ -0,0 +1,28 @@
+from __future__ import annotations
+
+import time
+from datetime import timedelta
+
+from temporalio import workflow
+
+from ...temporal_registry import register_activity, register_workflow
+
+NAME = "systemtests.temporal.workflow"
+
+
+@register_activity
+async def test_temporal(name: str) -> str:
+ time.sleep(1)
+ return f"Hello, {name}"
+
+
+@register_workflow
+class TestTemporalWorkflow:
+ @workflow.run
+ async def run(self, name: str) -> str:
+ res = await workflow.execute_activity(
+ test_temporal,
+ name,
+ schedule_to_close_timeout=timedelta(seconds=10),
+ )
+ return str(res)
diff --git a/systemtests/lxd.py b/systemtests/lxd.py
index 4ea9298..3b9a553 100644
--- a/systemtests/lxd.py
+++ b/systemtests/lxd.py
@@ -1,12 +1,13 @@
from __future__ import annotations
import json
+import multiprocessing
import subprocess
import textwrap
from functools import partial
from itertools import chain
from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, List, Optional
from netaddr import IPNetwork
from retry import retry
@@ -146,6 +147,7 @@ class Instance:
self._lxd = lxd
self._instance_name = instance_name
self.logger = lxd.logger.getChild(instance_name)
+ self.parallel_processes: List[multiprocessing.Process] = []
def __repr__(self) -> str:
return f"<Instance {self.name}>"
@@ -160,11 +162,14 @@ class Instance:
stdin: Optional[str] = None,
prefix: Optional[list[str]] = None,
logger: Optional[logging.Logger] = None,
+ parallel: Optional[bool] = False,
) -> subprocess.CompletedProcess[str]:
__tracebackhide__ = True
if logger is None:
logger = self.logger
- return run_with_logging(cmd, logger, prefix=prefix, stdin=stdin)
+ return run_with_logging(
+ cmd, logger, prefix=prefix, stdin=stdin, parallel=parallel
+ )
def exists(self) -> bool:
try:
@@ -247,6 +252,36 @@ class Instance:
)
return self._run_with_logger(executor, None)
+ def parallel_execute(
+ self,
+ command: list[str],
+ environment: Optional[dict[str, str]] = None,
+ working_dir: Optional[str] = None,
+ logger_name: str = "parallel-process",
+ completion_callback: Optional[Callable[..., Any]] = None,
+ ) -> None:
+ __tracebackhide__ = True
+
+ def run_in_parallel() -> None:
+ __tracebackhide__ = True
+ self.logger.info(f"Parallel process {process.name} started")
+ self._run_with_logger(executor, self.logger.getChild(logger_name))
+ if completion_callback:
+ completion_callback()
+ complete_parallel()
+
+ def complete_parallel() -> None:
+ self.logger.info(f"Parallel process {process.name} complete")
+ self.parallel_processes.remove(process)
+
+ lxc_command = self._get_lxc_command(environment, working_dir)
+ executor = partial(self._run, command, prefix=lxc_command, parallel=True)
+ process = multiprocessing.Process(
+ target=run_in_parallel, name=f"'{' '.join(command[:2])}'"
+ )
+ process.start()
+ self.parallel_processes.append(process)
+
@property
def files(self) -> _FilesWrapper:
return _FilesWrapper(self, self._run)
@@ -318,6 +353,7 @@ class Instance:
return self._run(["lxc", "start", self._instance_name])
def stop(self, force: bool = False) -> subprocess.CompletedProcess[str]:
+ self.terminate_parallel()
argv = ["lxc", "stop", self._instance_name]
if force:
argv.append("--force")
@@ -329,7 +365,17 @@ class Instance:
argv.append("--force")
return self._run(argv)
+ def terminate_parallel(self) -> None:
+ if self.parallel_processes:
+ self.logger.warn(
+ f"Terminating {len(self.parallel_processes)} parallel processes."
+ )
+ for proc in self.parallel_processes[::-1]:
+ proc.terminate()
+ self.parallel_processes.remove(proc)
+
def delete(self) -> None:
+ self.terminate_parallel()
self._run(["lxc", "delete", "--force", self._instance_name])
def status(self) -> str:
diff --git a/systemtests/subprocess.py b/systemtests/subprocess.py
index de427b0..4519bc9 100644
--- a/systemtests/subprocess.py
+++ b/systemtests/subprocess.py
@@ -16,11 +16,17 @@ def run_with_logging(
env: Optional[dict[str, str]] = None,
prefix: Optional[list[str]] = None,
stdin: Optional[str] = None,
+ parallel: Optional[bool] = False,
) -> subprocess.CompletedProcess[str]:
__tracebackhide__ = True
+ startchar, midchar, endchar = "┌", "|", "└"
if prefix is None:
prefix = []
- logger.info("┌ " + " ".join(repr(arg) if "\n" in arg else arg for arg in cmd))
+ if parallel or False:
+ startchar, midchar, endchar = "╔", "║", "╚"
+ logger.info(
+ f"{startchar} " + " ".join(repr(arg) if "\n" in arg else arg for arg in cmd)
+ )
process = subprocess.Popen(
prefix + cmd,
stdin=subprocess.PIPE,
@@ -43,8 +49,8 @@ def run_with_logging(
stdout = io.StringIO()
stderr = io.StringIO()
sel = selectors.DefaultSelector()
- sel.register(process.stdout, selectors.EVENT_READ, ("|", stdout.write))
- sel.register(process.stderr, selectors.EVENT_READ, ("|E", stderr.write))
+ sel.register(process.stdout, selectors.EVENT_READ, (f"{midchar} ", stdout.write))
+ sel.register(process.stderr, selectors.EVENT_READ, (f"{midchar}E", stderr.write))
with sel:
while sel.get_map():
for select, mask in sel.select():
@@ -69,11 +75,11 @@ def run_with_logging(
returncode = process.wait()
if returncode != 0:
- logger.warning(f"└ ❌ Return code: {returncode}")
+ logger.warning(f"{endchar} ❌ Return code: {returncode}")
raise subprocess.CalledProcessError(
returncode, prefix + cmd, stdout.getvalue(), stderr.getvalue()
)
- logger.info("└ ✔")
+ logger.info(f"{endchar} ✔")
return subprocess.CompletedProcess(
process.args, returncode, stdout.getvalue(), stderr.getvalue()
)
diff --git a/systemtests/temporal.py b/systemtests/temporal.py
new file mode 100644
index 0000000..c7d6196
--- /dev/null
+++ b/systemtests/temporal.py
@@ -0,0 +1,197 @@
+from __future__ import annotations
+
+import time
+from datetime import datetime
+from logging import getLogger
+from subprocess import CalledProcessError
+from typing import TYPE_CHECKING, Any, Dict, Optional
+
+from retry import retry
+from temporalio.client import Client
+from temporalio.types import ClassType
+
+from .ansible import apt_install, apt_update, clone_repo
+from .lxd import Instance
+from .temporal_registry import registry_worker, split_log
+
+if TYPE_CHECKING:
+ from logging import Logger
+
+ from .lxd import CLILXD
+
+NAME = "systemtests.temporal"
+LOG = getLogger(NAME)
+
+
+class TemporalMain:
+ def __init__(
+ self,
+ ip: str,
+ config: dict[str, Any],
+ logger: Logger,
+ ) -> None:
+ self._ip = ip
+ self.logger = logger
+ self._config = config
+ self.cluster_namespace = config.get("namespace", "default")
+ self.link_temporal_server(ip)
+
+ @property
+ def ip(self) -> str:
+ return self._ip
+
+ def link_temporal_server(self, ip: str) -> None:
+ self.cluster_server = f"{ip}:7233"
+ self.cluster_ui = f"{ip}:8233"
+ self.logger.info(f"Linked temporal cluster at {self.cluster_server}")
+ self.logger.info(f"Connect to temporal UI at {self.cluster_ui}")
+ self.logger.info(f"Using namespace {self.cluster_namespace}")
+
+ def __repr__(self) -> str:
+ return f"<TemporalMain at {self.ip}>"
+
+ def shutdown(self) -> None:
+ pass
+
+ async def connect(
+ self, override_ip: str = "", override_namespace: str = ""
+ ) -> Client:
+ @retry(tries=5, delay=10, logger=self.logger)
+ async def _connect() -> None:
+ self.client = await Client.connect(
+ override_ip or self.cluster_server,
+ namespace=override_namespace or self.cluster_namespace,
+ )
+
+ await _connect()
+ return self.client
+
+ async def run_workflow(
+ self,
+ client: Client,
+ Workflow: ClassType,
+ params: Any,
+ workflow_id: Optional[str] = "",
+ workflow_dependencies: Optional[Dict[str, Any]] = {},
+ ) -> Any:
+ wflow_name = Workflow.__name__
+ wflow_logger = getLogger(f"{NAME}.workflow.{wflow_name}")
+ wflow_logger.info(f"Request execution with params: {params}")
+ task_queue = "maas-temporal"
+ this_time = datetime.utcnow().strftime("%H%M%S%f")[:-3]
+
+ # Run a worker for the workflow
+ async with registry_worker(
+ client,
+ task_queue=task_queue,
+ logger=self.logger,
+ dependencies=workflow_dependencies or {},
+ ):
+ result = await client.execute_workflow(
+ Workflow.run,
+ params,
+ id=f"{workflow_id or wflow_name}-{this_time}",
+ task_queue=task_queue,
+ )
+
+ split_log(self.logger.info, "Execution completed with output:", result)
+ return result
+
+
+# Used to locally host a temporal cluster for the duration of the tests
+class ManagedTemporalMain(TemporalMain):
+ def __init__(
+ self,
+ lxd: CLILXD,
+ instance: Instance,
+ config: dict[str, Any],
+ proxy_env: Optional[dict[str, str]],
+ ) -> None:
+ self._lxd = lxd
+ self.instance = instance
+ self._proxy_env = proxy_env
+ self._config = config
+
+ self.temporal_repo = "https://github.com/temporalio/temporalite.git"
+ self.temporal_repo_path = "/home/ubuntu/temporalite"
+
+ self.cluster_namespace = self._config.get("namespace", "default")
+
+ def setup(self) -> None:
+ self.logger.info("Installing apt-updates")
+ apt_update(self.instance, environment=self._proxy_env)
+
+ self.logger.info("Installing golang")
+ apt_install(self.instance, "golang", environment=self._proxy_env)
+
+ self.logger.info("Cloning temporalite repo")
+ clone_repo(self.instance, self.temporal_repo, self.temporal_repo_path)
+
+ self.logger.info("Installing temporalite")
+ self.instance.execute(
+ ["go", "build", "./cmd/temporalite"], working_dir=self.temporal_repo_path
+ )
+
+ def wait_for_host_port(
+ self, host_ip: str = "127.0.0.1", host_port: str = "80", timeout: int = 300
+ ) -> bool:
+ self.logger.info(
+ f"Waiting for {host_ip}:{host_port} to open with timeout {timeout}."
+ )
+ time_out = time.time() + timeout
+ connection = False
+ while time.time() < time_out:
+ try:
+ output = self.instance.quietly_execute(["nc", "-z", host_ip, host_port])
+ if connection := not bool(output.returncode):
+ break
+ except CalledProcessError:
+ time.sleep(1)
+ if not connection:
+ raise ConnectionError(f"Could not connect to {host_ip}:{host_port} in time")
+ self.logger.info(f"{host_ip}:{host_port} reached.")
+ return connection
+
+ def start_temporal_server(self) -> None:
+ self.logger.info("Starting temporalite cluster")
+ self.instance.parallel_execute(
+ [
+ "./temporalite",
+ "start",
+ "--namespace",
+ self.cluster_namespace,
+ "--ephemeral",
+ "--log-level",
+ "warn",
+ "--log-format",
+ "pretty",
+ ],
+ working_dir=self.temporal_repo_path,
+ logger_name="temporalite",
+ )
+ self.wait_for_host_port(host_port="8233")
+
+ def return_exec(
+ self, command: list[str], environment: Optional[dict[str, str]] = None
+ ) -> str:
+ return self.instance.execute(
+ command=command, environment=environment
+ ).stdout.rstrip("\n")
+
+ def __repr__(self) -> str:
+ return f"<ManagedTemporalMain in {self.instance}>"
+
+ @property
+ def ip(self) -> str:
+ return self.instance.get_ip_address()
+
+ @property
+ def logger(self) -> Logger:
+ return self.instance.logger
+
+ @logger.setter
+ def logger(self, logger: Logger) -> None:
+ self.instance.logger = logger
+
+ def stop(self) -> None:
+ self.instance.delete()
diff --git a/systemtests/temporal_registry.py b/systemtests/temporal_registry.py
new file mode 100644
index 0000000..ff592c5
--- /dev/null
+++ b/systemtests/temporal_registry.py
@@ -0,0 +1,161 @@
+"""
+ Duplicates and modifies the registry code in @igor-brovtsin's
+ https://code.launchpad.net/~igor-brovtsin/maas-ci/+git/maas-ci/+ref/temporal-sandbox
+
+"""
+
+from __future__ import annotations
+
+import abc
+import inspect
+import logging
+from typing import Any, Callable, Dict, List, Optional, Set, Type
+
+from temporalio import activity, workflow
+from temporalio.client import Client
+from temporalio.types import CallableType, ClassType
+from temporalio.worker import Worker
+
+
+def split_log(
+ log_func: Callable[..., Any],
+ heading: str,
+ results: str | list[str],
+ mid_char: str = "│",
+) -> None:
+ res = results if isinstance(results, list) else results.split("\n")
+ log_func(heading)
+ for idx, line in enumerate(res):
+ log_func(f"{'└' if idx == len(res)-1 else mid_char} {line}")
+
+
+def workflow_registrar() -> Callable[..., Any]:
+ """Generates a wrapper that applies `@temporalio.workflow.defn` to
+ a workflow class, and adds it to the list that can later be passed
+ to a Temporal `Worker`"""
+ registered = []
+
+ def wrapper( # type:ignore
+ cls: Optional[ClassType] = None, *args, sandboxed=False, **kwargs
+ ) -> Callable[..., Any]:
+ def decorator(cls: ClassType) -> ClassType:
+ registered.append(cls)
+ return workflow.defn(*args, sandboxed=sandboxed, **kwargs)(cls)
+
+ if cls is not None:
+ return decorator(cls)
+ return decorator
+
+ wrapper.workflows = registered # type:ignore
+ return wrapper
+
+
+def activity_registrar() -> Callable[..., Any]:
+ """Generates a wrapper that applies `@temporalio.activity.defn` to
+ a callable, and adds it to the list that can later be passed
+ to a Temporal `Worker`"""
+ registry = []
+
+ def decorator( # type:ignore
+ f: CallableType, *args, **kwargs
+ ) -> Callable[..., Any]:
+ wrapped = activity.defn(*args, **kwargs)(f)
+ registry.append(wrapped)
+ return wrapped # type:ignore
+
+ decorator.activities = registry # type:ignore
+ return decorator
+
+
+register_workflow = workflow_registrar()
+register_activity = activity_registrar()
+
+
+class TemporalActivityClass(abc.ABC):
+ """Abstract class for Temporal activities that require external
+ dependencies: DB connections, LXD containers, HTTP clients, etc.
+ """
+
+ CLASSES: Set[Type[TemporalActivityClass]] = set()
+
+ def __init_subclass__(cls, **kwargs) -> None: # type:ignore
+ super().__init_subclass__(**kwargs)
+ TemporalActivityClass.CLASSES.add(cls)
+
+ def get_activities(self) -> List[Any]:
+ result = []
+ for name in dir(self):
+ member = getattr(self, name)
+ if not callable(member) or not hasattr(
+ member, "__temporal_activity_definition"
+ ):
+ continue
+ result.append(member)
+ return result
+
+
+class ActivityClassDependencyMissingException(Exception):
+ """Raised when an activity does not provide a required dependency."""
+
+
+def _initialize_activity_classes(
+ dependencies: Dict[str, Any], repo: Set[TemporalActivityClass]
+) -> List[CallableType]:
+ results = []
+ for cls in repo:
+ init = getattr(cls, "__init__")
+ kwargs = {}
+ for i, value in enumerate(inspect.signature(init).parameters.values()):
+ if i == 0:
+ continue
+ if value.name in dependencies:
+ kwargs[value.name] = dependencies[value.name]
+ elif value.default is inspect.Parameter.empty:
+ raise ActivityClassDependencyMissingException(
+ f"{cls} requires dependency `{value.name}` that was not provided!"
+ )
+ results += cls(**kwargs).get_activities() # type:ignore
+ return results
+
+
+def registry_worker( # type:ignore
+ client: Client,
+ task_queue: str,
+ logger: logging.Logger,
+ dependencies: Dict[str, Any],
+ **kwargs,
+) -> Worker:
+ """Helper function that returns a worker that uses the `register_workflow`
+ and `register_activity` registries."""
+ kwargs.setdefault("workflows", [])
+ kwargs.setdefault("activities", [])
+ assert isinstance(
+ kwargs["workflows"], list
+ ), f"`workflows` kwarg type is `{type(kwargs['workflows'])}`, not `list`"
+ assert isinstance(
+ kwargs["activities"], list
+ ), f"`activities` kwarg type is `{type(kwargs['activities'])}`, not `list`"
+ kwargs["workflows"].extend(register_workflow.workflows) # type:ignore
+ kwargs["activities"].extend(register_activity.activities) # type:ignore
+
+ class_activities = _initialize_activity_classes( # type:ignore
+ dependencies, TemporalActivityClass.CLASSES # type:ignore
+ )
+ kwargs["activities"].extend(class_activities)
+
+ workflows_names = [
+ f"{w.__module__}.{w.__qualname__}"
+ if isinstance(w, type)
+ else f"{w} of {w.__module__}"
+ for w in kwargs["workflows"]
+ ]
+ activities_names = [
+ f"{a.__module__}.{a.__qualname__}" for a in kwargs["activities"]
+ ]
+
+ if workflows_names:
+ split_log(logger.info, "Registered workflows", workflows_names, mid_char="├")
+ if activities_names:
+ split_log(logger.info, "Registered activities", activities_names, mid_char="├")
+
+ return Worker(client=client, task_queue=task_queue, **kwargs)
diff --git a/tox.ini b/tox.ini
index 34ab368..b7d7005 100644
--- a/tox.ini
+++ b/tox.ini
@@ -26,7 +26,7 @@ passenv =
MAAS_SYSTEMTESTS_CLIENT_CONTAINER
MAAS_SYSTEMTESTS_LXD_PROFILE
-[testenv:{env_builder,collect_sos_report,general_tests,ansible_tests}]
+[testenv:{env_builder,collect_sos_report,general_tests,ansible_tests,image_tests}]
passenv = {[base]passenv}
[testenv:cog]
@@ -56,6 +56,11 @@ commands=
# passenv = {{[base]passenv}}
# """)
#]]]
+
+[testenv:{opelt,stunky,vm1}]
+description=Per-machine tests for {envname}
+passenv = {[base]passenv}
+
#[[[end]]]
[testenv:format]
Follow ups