sts-sponsors team mailing list archive

Re: [Merge] ~lloydwaltersj/maas-ci/+git/system-tests:temporal-boilerplate into ~maas-committers/maas-ci/+git/system-tests:master

Diff comments:

> diff --git a/systemtests/fixtures.py b/systemtests/fixtures.py
> index 64d3974..fcf6f43 100644
> --- a/systemtests/fixtures.py
> +++ b/systemtests/fixtures.py
> @@ -484,10 +520,15 @@ def maas_region(
>          maas_container.execute(["update-ca-certificates"])
>  
>      if installed_from_snap:
> -        version = get_maas_snap_version(maas_container)
>          maas_already_initialized = maas_container.files[
>              "/var/snap/maas/common/snap_mode"
>          ].exists()
> +        # just to record which version is running.
> +        snap_list = maas_container.execute(["snap", "list", "maas"])
> +        try:
> +            version = snap_list.stdout.split("\n")[1].split()[1]
> +        except IndexError:
> +            version = ""

Bogus conflict merge? This drops the existing get_maas_snap_version() call and re-implements the version lookup by parsing snap list output by hand.
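
If the goal is just to record the running version, keeping the existing helper looks simpler - rough sketch, assuming get_maas_snap_version() is still available in this module:

    if installed_from_snap:
        # Record which snap version is running, purely for the logs.
        version = get_maas_snap_version(maas_container)
        maas_already_initialized = maas_container.files[
            "/var/snap/maas/common/snap_mode"
        ].exists()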

>          if not maas_already_initialized:
>              maas_container.execute(
>                  [
> diff --git a/systemtests/subprocess.py b/systemtests/subprocess.py
> index de427b0..4519bc9 100644
> --- a/systemtests/subprocess.py
> +++ b/systemtests/subprocess.py
> @@ -16,11 +16,17 @@ def run_with_logging(
>      env: Optional[dict[str, str]] = None,
>      prefix: Optional[list[str]] = None,
>      stdin: Optional[str] = None,
> +    parallel: Optional[bool] = False,
>  ) -> subprocess.CompletedProcess[str]:
>      __tracebackhide__ = True
> +    startchar, midchar, endchar = "┌", "|", "└"
>      if prefix is None:
>          prefix = []
> -    logger.info("┌ " + " ".join(repr(arg) if "\n" in arg else arg for arg in cmd))
> +    if parallel or False:

Huh? The or False here is redundant - parallel already defaults to False, so a plain if parallel: is enough.
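
A minimal simplification sketch:

    if parallel:
        startchar, midchar, endchar = "╔", "║", "╚"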

> +        startchar, midchar, endchar = "╔", "║", "╚"
> +    logger.info(
> +        f"{startchar} " + " ".join(repr(arg) if "\n" in arg else arg for arg in cmd)
> +    )
>      process = subprocess.Popen(
>          prefix + cmd,
>          stdin=subprocess.PIPE,
> diff --git a/systemtests/temporal.py b/systemtests/temporal.py
> new file mode 100644
> index 0000000..e93a309
> --- /dev/null
> +++ b/systemtests/temporal.py
> @@ -0,0 +1,197 @@
> +from __future__ import annotations
> +
> +import time
> +from datetime import datetime
> +from logging import getLogger
> +from subprocess import CalledProcessError
> +from typing import TYPE_CHECKING, Any, Dict, Optional
> +
> +from retry import retry
> +from temporalio.client import Client
> +from temporalio.types import ClassType
> +
> +from .ansible import apt_install, apt_update, clone_repo

This needs a TODO or similar so these ansible helpers get extracted to a common location.
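
Something along these lines would do (the exact wording is just a suggestion):

    # TODO: extract apt_install/apt_update/clone_repo to a common
    # location shared with systemtests.ansible.
    from .ansible import apt_install, apt_update, clone_repo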

> +from .lxd import Instance
> +from .temporal_registry import registry_worker, split_log
> +
> +if TYPE_CHECKING:
> +    from logging import Logger
> +
> +    from .lxd import CLILXD
> +
> +NAME = "systemtests.temporal"
> +LOG = getLogger(NAME)
> +
> +
> +class TemporalMain:

This isn't a great name... I'm not sure what would be better yet.

> +    def __init__(
> +        self,
> +        ip: str,
> +        config: dict[str, Any],
> +        logger: Logger,
> +    ) -> None:
> +        self._ip = ip
> +        self.logger = logger
> +        self._config = config
> +        self.cluster_namespace = config.get("namespace", "default")
> +        self.link_temporal_server(ip)
> +
> +    @property
> +    def ip(self) -> str:
> +        return self._ip
> +
> +    def link_temporal_server(self, ip: str) -> None:
> +        self.cluster_server = f"{ip}:7233"
> +        self.cluster_ui = f"{ip}:8233"
> +        self.logger.info(f"Linked temporal cluster at {self.cluster_server}")
> +        self.logger.info(f"Connect to temporal UI at {self.cluster_ui}")
> +        self.logger.info(f"Using namespace {self.cluster_namespace}")
> +
> +    def __repr__(self) -> str:
> +        return f"<TemporalMain at {self.ip}>"
> +
> +    def shutdown(self) -> None:
> +        pass
> +
> +    async def connect(
> +        self, override_ip: str = "", override_namespace: str = ""
> +    ) -> Client:
> +        @retry(tries=5, delay=10, logger=self.logger)
> +        async def _connect() -> None:
> +            self.client = await Client.connect(
> +                override_ip or self.cluster_server,
> +                namespace=override_namespace or self.cluster_namespace,
> +            )
> +
> +        await _connect()
> +        return self.client
> +
> +    async def run_workflow(
> +        self,
> +        client: Client,
> +        Workflow: ClassType,
> +        params: Any,
> +        workflow_id: Optional[str] = "",
> +        workflow_dependencies: Optional[Dict[str, Any]] = {},
> +    ) -> Any:
> +        wflow_name = Workflow.__name__
> +        wflow_logger = getLogger(f"{NAME}.workflow.{wflow_name}")
> +        wflow_logger.info(f"Request execution with params: {params}")
> +        task_queue = "maas-temporal"
> +        this_time = datetime.utcnow().strftime("%H%M%S%f")[:-3]
> +
> +        # Run a worker for the workflow
> +        async with registry_worker(
> +            client,
> +            task_queue=task_queue,
> +            logger=self.logger,
> +            dependencies=workflow_dependencies or {},
> +        ):
> +            result = await client.execute_workflow(
> +                Workflow.run,
> +                params,
> +                id=f"{workflow_id or wflow_name}-{this_time}",
> +                task_queue=task_queue,
> +            )
> +
> +            split_log(self.logger.info, "Execution completed with output:", result)
> +            return result
> +
> +
> +# Used to locally host a temporal cluster for the duration of the tests
> +class ManagedTemporalMain(TemporalMain):
> +    def __init__(
> +        self,
> +        lxd: CLILXD,
> +        instance: Instance,
> +        config: dict[str, Any],
> +        proxy_env: Optional[dict[str, str]],
> +    ) -> None:
> +        self._lxd = lxd
> +        self.instance = instance
> +        self._proxy_env = proxy_env
> +        self._config = config
> +
> +        self.temporal_repo = "https://github.com/temporalio/temporalite.git"
> +        self.temporal_repo_path = "/home/ubuntu/temporalite"
> +
> +        self.cluster_namespace = self._config.get("namespace", "default")
> +
> +    def setup(self) -> None:
> +        self.logger.info("Installing apt-updates")
> +        apt_update(self.instance, environment=self._proxy_env)
> +
> +        self.logger.info("Installing golang")
> +        apt_install(self.instance, "golang", environment=self._proxy_env)
> +
> +        self.logger.info("Cloning temporalite repo")
> +        clone_repo(self.instance, self.temporal_repo, "main", self.temporal_repo_path)
> +
> +        self.logger.info("Installing temporalite")
> +        self.instance.execute(
> +            ["go", "build", "./cmd/temporalite"], working_dir=self.temporal_repo_path
> +        )
> +
> +    def wait_for_host_port(
> +        self, host_ip: str = "127.0.0.1", host_port: str = "80", timeout: int = 300
> +    ) -> bool:
> +        self.logger.info(
> +            f"Waiting for {host_ip}:{host_port} to open with timeout {timeout}."
> +        )
> +        time_out = time.time() + timeout
> +        connection = False
> +        while time.time() < time_out:
> +            try:
> +                output = self.instance.quietly_execute(["nc", "-z", host_ip, host_port])
> +                if connection := not bool(output.returncode):
> +                    break
> +            except CalledProcessError:
> +                time.sleep(1)
> +        if not connection:
> +            raise ConnectionError(f"Could not connect to {host_ip}:{host_port} in time")
> +        self.logger.info(f"{host_ip}:{host_port} reached.")
> +        return connection
> +
> +    def start_temporal_server(self) -> None:
> +        self.logger.info("Starting temporalite cluster")
> +        self.instance.parallel_execute(
> +            [
> +                "./temporalite",
> +                "start",
> +                "--namespace",
> +                self.cluster_namespace,
> +                "--ephemeral",
> +                "--log-level",
> +                "warn",
> +                "--log-format",
> +                "pretty",
> +            ],
> +            working_dir=self.temporal_repo_path,
> +            logger_name="temporalite",
> +        )
> +        self.wait_for_host_port(host_port="8233")
> +
> +    def return_exec(
> +        self, command: list[str], environment: Optional[dict[str, str]] = None
> +    ) -> str:
> +        return self.instance.execute(
> +            command=command, environment=environment
> +        ).stdout.rstrip("\n")
> +
> +    def __repr__(self) -> str:
> +        return f"<ManagedTemporalMain in {self.instance}>"
> +
> +    @property
> +    def ip(self) -> str:
> +        return self.instance.get_ip_address()
> +
> +    @property
> +    def logger(self) -> Logger:
> +        return self.instance.logger
> +
> +    @logger.setter
> +    def logger(self, logger: Logger) -> None:
> +        self.instance.logger = logger
> +
> +    def stop(self) -> None:
> +        self.instance.delete()
> diff --git a/systemtests/temporal_registry.py b/systemtests/temporal_registry.py
> new file mode 100644
> index 0000000..ff592c5
> --- /dev/null
> +++ b/systemtests/temporal_registry.py
> @@ -0,0 +1,161 @@
> +"""
> +    Duplicates and modifies the registry code in @igor-brovtsin's
> +    https://code.launchpad.net/~igor-brovtsin/maas-ci/+git/maas-ci/+ref/temporal-sandbox
> +
> +"""
> +
> +from __future__ import annotations
> +
> +import abc
> +import inspect
> +import logging
> +from typing import Any, Callable, Dict, List, Optional, Set, Type
> +
> +from temporalio import activity, workflow
> +from temporalio.client import Client
> +from temporalio.types import CallableType, ClassType
> +from temporalio.worker import Worker
> +
> +
> +def split_log(
> +    log_func: Callable[..., Any],
> +    heading: str,
> +    results: str | list[str],
> +    mid_char: str = "│",
> +) -> None:
> +    res = results if isinstance(results, list) else results.split("\n")
> +    log_func(heading)
> +    for idx, line in enumerate(res):
> +        log_func(f"{'└' if idx == len(res)-1 else mid_char} {line}")
> +
> +
> +def workflow_registrar() -> Callable[..., Any]:
> +    """Generates a wrapper that applies `@temporalio.workflow.defn` to
> +    a workflow class, and adds it to the list that can later be passed
> +    to the Temporal client"""
> +    registered = []
> +
> +    def wrapper(  # type:ignore
> +        cls: Optional[ClassType] = None, *args, sandboxed=False, **kwargs
> +    ) -> Callable[..., Any]:
> +        def decorator(cls: ClassType) -> ClassType:
> +            registered.append(cls)
> +            return workflow.defn(*args, sandboxed=sandboxed, **kwargs)(cls)
> +
> +        if cls is not None:
> +            return decorator(cls)
> +        return decorator
> +
> +    wrapper.workflows = registered  # type:ignore
> +    return wrapper
> +
> +
> +def activity_registrar() -> Callable[..., Any]:
> +    """Generates a wrapper that applies `@temporalio.activity.defn` to
> +    a callable, and adds it to the list that can later be passed
> +    to the Temporal client"""
> +    registry = []
> +
> +    def decorator(  # type:ignore
> +        f: CallableType, *args, **kwargs
> +    ) -> Callable[..., Any]:
> +        wrapped = activity.defn(*args, **kwargs)(f)
> +        registry.append(wrapped)
> +        return wrapped  # type:ignore
> +
> +    decorator.activities = registry  # type:ignore
> +    return decorator
> +
> +
> +register_workflow = workflow_registrar()
> +register_activity = activity_registrar()
> +
> +
> +class TemporalActivityClass(abc.ABC):
> +    """Abstract class for Temporal activities that require external
> +    dependencies: DB connections, LXD containers, HTTP clients, etc.
> +    """
> +
> +    CLASSES: Set[Type[TemporalActivityClass]] = set()
> +
> +    def __init_subclass__(cls, **kwargs) -> None:  # type:ignore
> +        super().__init_subclass__(**kwargs)
> +        TemporalActivityClass.CLASSES.add(cls)
> +
> +    def get_activities(self) -> List[Any]:
> +        result = []
> +        for name in dir(self):
> +            member = getattr(self, name)

Eek, this is a smell - what is it trying to do? It looks like it walks dir(self) to pick out the methods that activity.defn has marked.
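
If the intent is just to collect the bound methods carrying the Temporal activity marker, a comprehension would make that clearer - untested sketch:

    def get_activities(self) -> List[Any]:
        # Bound methods that temporalio's activity.defn decorator has marked.
        return [
            member
            for name in dir(self)
            if callable(member := getattr(self, name))
            and hasattr(member, "__temporal_activity_definition")
        ]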

> +            if not callable(member) or not hasattr(
> +                member, "__temporal_activity_definition"
> +            ):
> +                continue
> +            result.append(member)
> +        return result
> +
> +
> +class ActivityClassDependencyMissingException(Exception):
> +    """Raised when an activity does not provide a reqiored dependency."""
> +
> +
> +def _initialize_activity_classes(
> +    dependencies: Dict[str, Any], repo: Set[TemporalActivityClass]
> +) -> List[CallableType]:
> +    results = []
> +    for cls in repo:
> +        init = getattr(cls, "__init__")
> +        kwargs = {}
> +        for i, value in enumerate(inspect.signature(init).parameters.values()):
> +            if i == 0:
> +                continue
> +            if value.name in dependencies:
> +                kwargs[value.name] = dependencies[value.name]
> +            elif value.default is inspect.Parameter.empty:
> +                raise ActivityClassDependencyMissingException(
> +                    f"{cls} requires dependency `{value.name}` that was not provided!"
> +                )
> +        results += cls(**kwargs).get_activities()  # type:ignore
> +    return results
> +
> +
> +def registry_worker(  # type:ignore
> +    client: Client,
> +    task_queue: str,
> +    logger: logging.Logger,
> +    dependencies: Dict[str, Any],
> +    **kwargs,
> +) -> Worker:
> +    """Helper method that returns worker that uses `register_workflow`
> +    and `register_activity` registries."""
> +    kwargs.setdefault("workflows", [])
> +    kwargs.setdefault("activities", [])
> +    assert isinstance(
> +        kwargs["workflows"], list
> +    ), f"`workflows` kwarg type is `{type(kwargs['workflows'])}`, not `list`"
> +    assert isinstance(
> +        kwargs["activities"], list
> +    ), f"`activities` kwarg type is `{type(kwargs['activities'])}`, not `list`"
> +    kwargs["workflows"].extend(register_workflow.workflows)  # type:ignore
> +    kwargs["activities"].extend(register_activity.activities)  # type:ignore
> +
> +    class_activities = _initialize_activity_classes(  # type:ignore
> +        dependencies, TemporalActivityClass.CLASSES  # type:ignore
> +    )
> +    kwargs["activities"].extend(class_activities)
> +
> +    workflows_names = [
> +        f"{w.__module__}.{w.__qualname__}"
> +        if isinstance(w, type)
> +        else f"{w} of {w.__module__}"
> +        for w in kwargs["workflows"]
> +    ]
> +    activities_names = [
> +        f"{a.__module__}.{a.__qualname__}" for a in kwargs["activities"]
> +    ]
> +
> +    if workflows_names:
> +        split_log(logger.info, "Registered workflows", workflows_names, mid_char="├")
> +    if activities_names:
> +        split_log(logger.info, "Registered activities", activities_names, mid_char="├")
> +
> +    return Worker(client=client, task_queue=task_queue, **kwargs)
> diff --git a/tox.ini b/tox.ini
> index 34ab368..b7d7005 100644
> --- a/tox.ini
> +++ b/tox.ini
> @@ -56,6 +56,11 @@ commands=
>  #    passenv = {{[base]passenv}}
>  #    """)
>  #]]]
> +
> +[testenv:{opelt,stunky,vm1}]
> +description=Per-machine tests for {envname}
> +passenv = {[base]passenv}
> +

Be sure to clean this up - shouldn't be checked in

>  #[[[end]]]
>  
>  [testenv:format]


-- 
https://code.launchpad.net/~lloydwaltersj/maas-ci/+git/system-tests/+merge/443010
Your team MAAS Committers is requested to review the proposed merge of ~lloydwaltersj/maas-ci/+git/system-tests:temporal-boilerplate into ~maas-committers/maas-ci/+git/system-tests:master.