sts-sponsors team mailing list archive
-
sts-sponsors team
-
Mailing list archive
-
Message #08492
Re: [Merge] ~lloydwaltersj/maas-ci/+git/system-tests:temporal-boilerplate into ~maas-committers/maas-ci/+git/system-tests:master
Diff comments:
> diff --git a/systemtests/fixtures.py b/systemtests/fixtures.py
> index 64d3974..fcf6f43 100644
> --- a/systemtests/fixtures.py
> +++ b/systemtests/fixtures.py
> @@ -484,10 +520,15 @@ def maas_region(
> maas_container.execute(["update-ca-certificates"])
>
> if installed_from_snap:
> - version = get_maas_snap_version(maas_container)
> maas_already_initialized = maas_container.files[
> "/var/snap/maas/common/snap_mode"
> ].exists()
> + # just to record which version is running.
> + snap_list = maas_container.execute(["snap", "list", "maas"])
> + try:
> + version = snap_list.stdout.split("\n")[1].split()[1]
> + except IndexError:
> + version = ""
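Is this a bogus merge-conflict resolution? It drops the get_maas_snap_version() call and inlines the parsing of `snap list` output instead.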
> if not maas_already_initialized:
> maas_container.execute(
> [
> diff --git a/systemtests/subprocess.py b/systemtests/subprocess.py
> index de427b0..4519bc9 100644
> --- a/systemtests/subprocess.py
> +++ b/systemtests/subprocess.py
> @@ -16,11 +16,17 @@ def run_with_logging(
> env: Optional[dict[str, str]] = None,
> prefix: Optional[list[str]] = None,
> stdin: Optional[str] = None,
> + parallel: Optional[bool] = False,
> ) -> subprocess.CompletedProcess[str]:
> __tracebackhide__ = True
> + startchar, midchar, endchar = "┌", "|", "└"
> if prefix is None:
> prefix = []
> - logger.info("┌ " + " ".join(repr(arg) if "\n" in arg else arg for arg in cmd))
> + if parallel or False:
Huh? The `or False` here is redundant - `if parallel:` is enough.
> + startchar, midchar, endchar = "╔", "║", "╚"
> + logger.info(
> + f"{startchar} " + " ".join(repr(arg) if "\n" in arg else arg for arg in cmd)
> + )
> process = subprocess.Popen(
> prefix + cmd,
> stdin=subprocess.PIPE,
> diff --git a/systemtests/temporal.py b/systemtests/temporal.py
> new file mode 100644
> index 0000000..e93a309
> --- /dev/null
> +++ b/systemtests/temporal.py
> @@ -0,0 +1,197 @@
> +from __future__ import annotations
> +
> +import time
> +from datetime import datetime
> +from logging import getLogger
> +from subprocess import CalledProcessError
> +from typing import TYPE_CHECKING, Any, Dict, Optional
> +
> +from retry import retry
> +from temporalio.client import Client
> +from temporalio.types import ClassType
> +
> +from .ansible import apt_install, apt_update, clone_repo
This needs a TODO (or similar) noting that these helpers should be extracted to a common location.
> +from .lxd import Instance
> +from .temporal_registry import registry_worker, split_log
> +
> +if TYPE_CHECKING:
> + from logging import Logger
> +
> + from .lxd import CLILXD
> +
> +NAME = "systemtests.temporal"
> +LOG = getLogger(NAME)
> +
> +
> +class TemporalMain:
This isn't a great name... I'm not sure what would be better yet.
> + def __init__(
> + self,
> + ip: str,
> + config: dict[str, Any],
> + logger: Logger,
> + ) -> None:
> + self._ip = ip
> + self.logger = logger
> + self._config = config
> + self.cluster_namespace = config.get("namespace", "default")
> + self.link_temporal_server(ip)
> +
> + @property
> + def ip(self) -> str:
> + return self._ip
> +
> + def link_temporal_server(self, ip: str) -> None:
> + self.cluster_server = f"{ip}:7233"
> + self.cluster_ui = f"{ip}:8233"
> + self.logger.info(f"Linked temporal cluster at {self.cluster_server}")
> + self.logger.info(f"Connect to temporal UI at {self.cluster_ui}")
> + self.logger.info(f"Using namespace {self.cluster_namespace}")
> +
> + def __repr__(self) -> str:
> + return f"<TemporalMain at {self.ip}>"
> +
> + def shutdown(self) -> None:
> + pass
> +
> + async def connect(
> + self, override_ip: str = "", override_namespace: str = ""
> + ) -> Client:
> + @retry(tries=5, delay=10, logger=self.logger)
> + async def _connect() -> None:
> + self.client = await Client.connect(
> + override_ip or self.cluster_server,
> + namespace=override_namespace or self.cluster_namespace,
> + )
> +
> + await _connect()
> + return self.client
> +
> + async def run_workflow(
> + self,
> + client: Client,
> + Workflow: ClassType,
> + params: Any,
> + workflow_id: Optional[str] = "",
> + workflow_dependencies: Optional[Dict[str, Any]] = {},
> + ) -> Any:
> + wflow_name = Workflow.__name__
> + wflow_logger = getLogger(f"{NAME}.workflow.{wflow_name}")
> + wflow_logger.info(f"Request execution with params: {params}")
> + task_queue = "maas-temporal"
> + this_time = datetime.utcnow().strftime("%H%M%S%f")[:-3]
> +
> + # Run a worker for the workflow
> + async with registry_worker(
> + client,
> + task_queue=task_queue,
> + logger=self.logger,
> + dependencies=workflow_dependencies or {},
> + ):
> + result = await client.execute_workflow(
> + Workflow.run,
> + params,
> + id=f"{workflow_id or wflow_name}-{this_time}",
> + task_queue=task_queue,
> + )
> +
> + split_log(self.logger.info, "Execution completed with output:", result)
> + return result
> +
> +
> +# Used to locally host a temporal cluster for the duration of the tests
> +class ManagedTemporalMain(TemporalMain):
> + def __init__(
> + self,
> + lxd: CLILXD,
> + instance: Instance,
> + config: dict[str, Any],
> + proxy_env: Optional[dict[str, str]],
> + ) -> None:
> + self._lxd = lxd
> + self.instance = instance
> + self._proxy_env = proxy_env
> + self._config = config
> +
> + self.temporal_repo = "https://github.com/temporalio/temporalite.git"
> + self.temporal_repo_path = "/home/ubuntu/temporalite"
> +
> + self.cluster_namespace = self._config.get("namespace", "default")
> +
> + def setup(self) -> None:
> + self.logger.info("Installing apt-updates")
> + apt_update(self.instance, environment=self._proxy_env)
> +
> + self.logger.info("Installing golang")
> + apt_install(self.instance, "golang", environment=self._proxy_env)
> +
> + self.logger.info("Cloning temporalite repo")
> + clone_repo(self.instance, self.temporal_repo, "main", self.temporal_repo_path)
> +
> + self.logger.info("Installing temporalite")
> + self.instance.execute(
> + ["go", "build", "./cmd/temporalite"], working_dir=self.temporal_repo_path
> + )
> +
> + def wait_for_host_port(
> + self, host_ip: str = "127.0.0.1", host_port: str = "80", timeout: int = 300
> + ) -> bool:
> + self.logger.info(
> + f"Waiting for {host_ip}:{host_port} to open with timeout {timeout}."
> + )
> + time_out = time.time() + timeout
> + connection = False
> + while time.time() < time_out:
> + try:
> + output = self.instance.quietly_execute(["nc", "-z", host_ip, host_port])
> + if connection := not bool(output.returncode):
> + break
> + except CalledProcessError:
> + time.sleep(1)
> + if not connection:
> + raise ConnectionError(f"Could not connect to {host_ip}:{host_port} in time")
> + self.logger.info(f"{host_ip}:{host_port} reached.")
> + return connection
> +
> + def start_temporal_server(self) -> None:
> + self.logger.info("Starting temporalite cluster")
> + self.instance.parallel_execute(
> + [
> + "./temporalite",
> + "start",
> + "--namespace",
> + self.cluster_namespace,
> + "--ephemeral",
> + "--log-level",
> + "warn",
> + "--log-format",
> + "pretty",
> + ],
> + working_dir=self.temporal_repo_path,
> + logger_name="temporalite",
> + )
> + self.wait_for_host_port(host_port="8233")
> +
> + def return_exec(
> + self, command: list[str], environment: Optional[dict[str, str]] = None
> + ) -> str:
> + return self.instance.execute(
> + command=command, environment=environment
> + ).stdout.rstrip("\n")
> +
> + def __repr__(self) -> str:
> + return f"<ManagedTemporalMain in {self.instance}>"
> +
> + @property
> + def ip(self) -> str:
> + return self.instance.get_ip_address()
> +
> + @property
> + def logger(self) -> Logger:
> + return self.instance.logger
> +
> + @logger.setter
> + def logger(self, logger: Logger) -> None:
> + self.instance.logger = logger
> +
> + def stop(self) -> None:
> + self.instance.delete()
> diff --git a/systemtests/temporal_registry.py b/systemtests/temporal_registry.py
> new file mode 100644
> index 0000000..ff592c5
> --- /dev/null
> +++ b/systemtests/temporal_registry.py
> @@ -0,0 +1,161 @@
> +"""
> + Duplicates and modifies the registry code in @igor-brovtsin's
> + https://code.launchpad.net/~igor-brovtsin/maas-ci/+git/maas-ci/+ref/temporal-sandbox
> +
> +"""
> +
> +from __future__ import annotations
> +
> +import abc
> +import inspect
> +import logging
> +from typing import Any, Callable, Dict, List, Optional, Set, Type
> +
> +from temporalio import activity, workflow
> +from temporalio.client import Client
> +from temporalio.types import CallableType, ClassType
> +from temporalio.worker import Worker
> +
> +
> +def split_log(
> + log_func: Callable[..., Any],
> + heading: str,
> + results: str | list[str],
> + mid_char: str = "│",
> +) -> None:
> + res = results if isinstance(results, list) else results.split("\n")
> + log_func(heading)
> + for idx, line in enumerate(res):
> + log_func(f"{'└' if idx == len(res)-1 else mid_char} {line}")
> +
> +
> +def workflow_registrar() -> Callable[..., Any]:
> + """Generates a wrapper that applies `@temporalio.workflow.defn` to
> + a workflow class, and adds it to the list that can later be passed
> + to the Temporal client"""
> + registered = []
> +
> + def wrapper( # type:ignore
> + cls: Optional[ClassType] = None, *args, sandboxed=False, **kwargs
> + ) -> Callable[..., Any]:
> + def decorator(cls: ClassType) -> ClassType:
> + registered.append(cls)
> + return workflow.defn(*args, sandboxed=sandboxed, **kwargs)(cls)
> +
> + if cls is not None:
> + return decorator(cls)
> + return decorator
> +
> + wrapper.workflows = registered # type:ignore
> + return wrapper
> +
> +
> +def activity_registrar() -> Callable[..., Any]:
> + """Generates a wrapper that applies `@temporalio.activity.defn` to
> + a callable, and adds it to the list that can later be passed
> + to the Temporal client"""
> + registry = []
> +
> + def decorator( # type:ignore
> + f: CallableType, *args, **kwargs
> + ) -> Callable[..., Any]:
> + wrapped = activity.defn(*args, **kwargs)(f)
> + registry.append(wrapped)
> + return wrapped # type:ignore
> +
> + decorator.activities = registry # type:ignore
> + return decorator
> +
> +
> +register_workflow = workflow_registrar()
> +register_activity = activity_registrar()
> +
> +
> +class TemporalActivityClass(abc.ABC):
> + """Abstract class for Temporal activities that require external
> + dependencies: DB connections, LXD containers, HTTP clients, etc.
> + """
> +
> + CLASSES: Set[Type[TemporalActivityClass]] = set()
> +
> + def __init_subclass__(cls, **kwargs) -> None: # type:ignore
> + super().__init_subclass__(**kwargs)
> + TemporalActivityClass.CLASSES.add(cls)
> +
> + def get_activities(self) -> List[Any]:
> + result = []
> + for name in dir(self):
> + member = getattr(self, name)
Eek, this is a code smell - what is it trying to do?
> + if not callable(member) or not hasattr(
> + member, "__temporal_activity_definition"
> + ):
> + continue
> + result.append(member)
> + return result
> +
> +
> +class ActivityClassDependencyMissingException(Exception):
> + """Raised when an activity does not provide a reqiored dependency."""
> +
> +
> +def _initialize_activity_classes(
> + dependencies: Dict[str, Any], repo: Set[TemporalActivityClass]
> +) -> List[CallableType]:
> + results = []
> + for cls in repo:
> + init = getattr(cls, "__init__")
> + kwargs = {}
> + for i, value in enumerate(inspect.signature(init).parameters.values()):
> + if i == 0:
> + continue
> + if value.name in dependencies:
> + kwargs[value.name] = dependencies[value.name]
> + elif value.default is inspect.Parameter.empty:
> + raise ActivityClassDependencyMissingException(
> + f"{cls} requires dependency `{value.name}` that was not provided!"
> + )
> + results += cls(**kwargs).get_activities() # type:ignore
> + return results
> +
> +
> +def registry_worker( # type:ignore
> + client: Client,
> + task_queue: str,
> + logger: logging.Logger,
> + dependencies: Dict[str, Any],
> + **kwargs,
> +) -> Worker:
> + """Helper method that returns worker that uses `register_workflow`
> + and `register_activity` registries."""
> + kwargs.setdefault("workflows", [])
> + kwargs.setdefault("activities", [])
> + assert isinstance(
> + kwargs["workflows"], list
> + ), f"`workflows` kwarg type is `{type(kwargs['workflows'])}`, not `list`"
> + assert isinstance(
> + kwargs["activities"], list
> + ), f"`activities` kwarg type is `{type(kwargs['activities'])}`, not `list`"
> + kwargs["workflows"].extend(register_workflow.workflows) # type:ignore
> + kwargs["activities"].extend(register_activity.activities) # type:ignore
> +
> + class_activities = _initialize_activity_classes( # type:ignore
> + dependencies, TemporalActivityClass.CLASSES # type:ignore
> + )
> + kwargs["activities"].extend(class_activities)
> +
> + workflows_names = [
> + f"{w.__module__}.{w.__qualname__}"
> + if isinstance(w, type)
> + else f"{w} of {w.__module__}"
> + for w in kwargs["workflows"]
> + ]
> + activities_names = [
> + f"{a.__module__}.{a.__qualname__}" for a in kwargs["activities"]
> + ]
> +
> + if workflows_names:
> + split_log(logger.info, "Registered workflows", workflows_names, mid_char="├")
> + if activities_names:
> + split_log(logger.info, "Registered activities", activities_names, mid_char="├")
> +
> + return Worker(client=client, task_queue=task_queue, **kwargs)
> diff --git a/tox.ini b/tox.ini
> index 34ab368..b7d7005 100644
> --- a/tox.ini
> +++ b/tox.ini
> @@ -56,6 +56,11 @@ commands=
> # passenv = {{[base]passenv}}
> # """)
> #]]]
> +
> +[testenv:{opelt,stunky,vm1}]
> +description=Per-machine tests for {envname}
> +passenv = {[base]passenv}
> +
Be sure to clean this up - it shouldn't be checked in.
> #[[[end]]]
>
> [testenv:format]
--
https://code.launchpad.net/~lloydwaltersj/maas-ci/+git/system-tests/+merge/443010
Your team MAAS Committers is requested to review the proposed merge of ~lloydwaltersj/maas-ci/+git/system-tests:temporal-boilerplate into ~maas-committers/maas-ci/+git/system-tests:master.
References