← Back to team overview

sts-sponsors team mailing list archive

[Merge] ~lloydwaltersj/maas-ci/+git/system-tests:add-temporal-workers into ~maas-committers/maas-ci/+git/system-tests:master

 

Jack Lloyd-Walters has proposed merging ~lloydwaltersj/maas-ci/+git/system-tests:add-temporal-workers into ~maas-committers/maas-ci/+git/system-tests:master.

Commit message:
add ability to execute single workflows from tox for debugging

Requested reviews:
  MAAS Committers (maas-committers)

For more details, see:
https://code.launchpad.net/~lloydwaltersj/maas-ci/+git/system-tests/+merge/443669
-- 
Your team MAAS Committers is requested to review the proposed merge of ~lloydwaltersj/maas-ci/+git/system-tests:add-temporal-workers into ~maas-committers/maas-ci/+git/system-tests:master.
diff --git a/setup.py b/setup.py
index 4f8f242..06ef27c 100644
--- a/setup.py
+++ b/setup.py
@@ -11,6 +11,7 @@ install_requires = (
     'requests',
     'retry',
     'ruamel.yaml',
+    'temporalio'
 )
 
 
diff --git a/systemtests/temporal/run_workflow.py b/systemtests/temporal/run_workflow.py
new file mode 100644
index 0000000..bffe1cf
--- /dev/null
+++ b/systemtests/temporal/run_workflow.py
@@ -0,0 +1,131 @@
+"""Used to run a workflow independently as a debugging tool.
+
+Starts a worker serving everything in the temporal registry, submits one
+execution of the requested workflow and prints its result.  Parameters for
+the workflow are given as extra ``key=value`` arguments, for example::
+
+    python run_workflow.py --workflow hello_world name=World
+"""
+
+import argparse
+import asyncio
+import sys
+from datetime import datetime, timezone
+from logging import getLogger
+from typing import Any
+
+from temporal_registry import registry_worker  # type: ignore
+from temporalio.client import Client
+from workflows.workflow_test import (  # type: ignore
+    hello_world_param,
+    hello_world_workflow,
+)
+
+# Name accepted by ``--workflow`` -> (workflow class, parameter class).
+WORKFLOWS = {
+    "hello_world": (hello_world_workflow, hello_world_param),
+}
+
+
+def return_workflow(worflow_name: str) -> Any:
+    """Return the ``(workflow class, parameter class)`` pair for a name.
+
+    The misspelt parameter name is kept so keyword callers keep working.
+
+    Raises:
+        ValueError: if ``worflow_name`` is not a key of ``WORKFLOWS``.
+    """
+    try:
+        return WORKFLOWS[worflow_name]
+    except KeyError:
+        known = ", ".join(sorted(WORKFLOWS))
+        raise ValueError(
+            f"Unknown workflow {worflow_name!r}; expected one of: {known}"
+        ) from None
+
+
+def parse_workflow_args(extra: list[str]) -> dict[str, str]:
+    """Turn leftover ``key=value`` command-line tokens into a dict.
+
+    Only the first ``=`` separates key from value, so a value may itself
+    contain ``=``.
+
+    Raises:
+        ValueError: if a token has no ``=`` or an empty key.
+    """
+    parsed = {}
+    for token in extra:
+        key, sep, value = token.partition("=")
+        if not sep or not key:
+            raise ValueError(
+                f"Expected workflow argument as key=value, got {token!r}"
+            )
+        parsed[key] = value
+    return parsed
+
+
+async def main(args: Any, wflow_args: Any) -> None:
+    """Run the requested workflow once and print its result.
+
+    ``args`` is the parsed command line (``workflow``, ``ip``, ``namespace``
+    and ``task_queue`` are read); ``wflow_args`` are the keyword arguments
+    used to build the workflow's parameter object.
+    """
+    # Resolve the workflow and build its parameters before connecting, so
+    # that bad input fails without needing a reachable server.
+    wflow, param = return_workflow(args.workflow)
+    wflow_param = param(**wflow_args)
+    wflow_name = wflow.__name__
+    # Time of day to the millisecond, appended to the workflow id.
+    this_time = datetime.now(timezone.utc).strftime("%H%M%S%f")[:-3]
+    client = await Client.connect(f"{args.ip}:7233", namespace=args.namespace)
+    print("starting worker")
+    async with registry_worker(
+        client,
+        task_queue=args.task_queue,
+        logger=getLogger("temporal-worker"),
+        dependencies={},
+    ):
+        print("awaiting workflow execution")
+        result = await client.execute_workflow(
+            wflow.run,
+            wflow_param,
+            id=f"{wflow_name}-{this_time}",
+            task_queue=args.task_queue,
+        )
+        print(f"Result: {result}")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        prog="Temporal workflow Executor",
+        description="Request a temporal workflow is run on the server",
+    )
+    parser.add_argument(
+        "--workflow",
+        help="The temporal workflow to execute.",
+        choices=sorted(WORKFLOWS),
+        required=True,
+    )
+    parser.add_argument(
+        "--ip",
+        help="IP address of the temporal server",
+        default="localhost",
+    )
+    parser.add_argument(
+        "--namespace",
+        help="Namespace to use in the temporal server",
+        default="default",
+    )
+    parser.add_argument(
+        "--task-queue",
+        help="Task queue to use for this workflow",
+        default="test-queue",
+    )
+    args, extra = parser.parse_known_args(sys.argv[1:])
+    try:
+        wflow_args = parse_workflow_args(extra)
+    except ValueError as err:
+        # Report malformed arguments as a usage error, not a traceback.
+        parser.error(str(err))
+    asyncio.run(main(args, wflow_args))
diff --git a/systemtests/temporal/temporal_registry.py b/systemtests/temporal/temporal_registry.py
new file mode 100644
index 0000000..ff592c5
--- /dev/null
+++ b/systemtests/temporal/temporal_registry.py
@@ -0,0 +1,225 @@
+"""
+    Duplicates and modifies the registry code in @igor-brovtsin's
+    https://code.launchpad.net/~igor-brovtsin/maas-ci/+git/maas-ci/+ref/temporal-sandbox
+
+    Workflows and activities are collected in module-level registries as
+    they are defined (`register_workflow`, `register_activity`,
+    `TemporalActivityClass`); `registry_worker` builds a worker that
+    serves everything registered so far.
+"""
+
+from __future__ import annotations
+
+import abc
+import inspect
+import logging
+from typing import Any, Callable, Dict, List, Optional, Set, Type
+
+from temporalio import activity, workflow
+from temporalio.client import Client
+from temporalio.types import CallableType, ClassType
+from temporalio.worker import Worker
+
+
+def split_log(
+    log_func: Callable[..., Any],
+    heading: str,
+    results: str | list[str],
+    mid_char: str = "│",
+) -> None:
+    """Log `heading`, then one tree-style line per entry of `results`.
+
+    `results` is either a list of lines or a single string that is split
+    on newlines.  Each line is prefixed with `mid_char`, except the last
+    one, which is prefixed with `└`.
+    """
+    res = results if isinstance(results, list) else results.split("\n")
+    log_func(heading)
+    last = len(res) - 1
+    for idx, line in enumerate(res):
+        log_func(f"{'└' if idx == last else mid_char} {line}")
+
+
+def workflow_registrar() -> Callable[..., Any]:
+    """Generates a wrapper that applies `@temporalio.workflow.defn` to
+    a workflow class, and adds it to the list that can later be passed
+    to the Temporal client.
+
+    The wrapper works both bare (`@wrapper`) and with arguments
+    (`@wrapper(name=...)`); workflows are unsandboxed unless
+    `sandboxed=True` is given.  The collected classes are exposed as
+    `wrapper.workflows`.
+    """
+    registered = []
+
+    def wrapper(  # type:ignore
+        cls: Optional[ClassType] = None, *args, sandboxed=False, **kwargs
+    ) -> Callable[..., Any]:
+        def decorator(cls: ClassType) -> ClassType:
+            defined = workflow.defn(*args, sandboxed=sandboxed, **kwargs)(cls)
+            # Record the class only after `workflow.defn` has returned, so
+            # a definition that raises is not left in the registry.
+            registered.append(cls)
+            return defined
+
+        if cls is not None:
+            return decorator(cls)
+        return decorator
+
+    wrapper.workflows = registered  # type:ignore
+    return wrapper
+
+
+def activity_registrar() -> Callable[..., Any]:
+    """Generates a wrapper that applies `@temporalio.activity.defn` to
+    a callable, and adds it to the list that can later be passed
+    to the Temporal client.
+
+    Like the workflow registrar, the wrapper works both bare
+    (`@wrapper`) and with arguments (`@wrapper(name=...)`).  The
+    collected callables are exposed as `wrapper.activities`.
+    """
+    registry = []
+
+    def decorator(  # type:ignore
+        f: Optional[CallableType] = None, *args, **kwargs
+    ) -> Callable[..., Any]:
+        def register(fn: CallableType) -> CallableType:
+            wrapped = activity.defn(*args, **kwargs)(fn)
+            registry.append(wrapped)
+            return wrapped  # type:ignore
+
+        if f is not None:
+            return register(f)
+        return register
+
+    decorator.activities = registry  # type:ignore
+    return decorator
+
+
+register_workflow = workflow_registrar()
+register_activity = activity_registrar()
+
+
+class TemporalActivityClass(abc.ABC):
+    """Abstract class for Temporal activities that require external
+    dependencies: DB connections, LXD containers, HTTP clients, etc.
+
+    Every subclass is recorded in `CLASSES` when it is defined.
+    """
+
+    CLASSES: Set[Type[TemporalActivityClass]] = set()
+
+    def __init_subclass__(cls, **kwargs) -> None:  # type:ignore
+        super().__init_subclass__(**kwargs)
+        TemporalActivityClass.CLASSES.add(cls)
+
+    def get_activities(self) -> List[Any]:
+        """Return the callable members of this instance that carry the
+        `__temporal_activity_definition` attribute."""
+        result = []
+        for name in dir(self):
+            member = getattr(self, name)
+            if not callable(member) or not hasattr(
+                member, "__temporal_activity_definition"
+            ):
+                continue
+            result.append(member)
+        return result
+
+
+class ActivityClassDependencyMissingException(Exception):
+    """Raised when a dependency required by an activity class is not provided."""
+
+
+def _initialize_activity_classes(
+    dependencies: Dict[str, Any], repo: Set[Type[TemporalActivityClass]]
+) -> List[CallableType]:
+    """Instantiate every class in `repo` and collect its activities.
+
+    Each `__init__` parameter whose name is a key of `dependencies` is
+    passed by keyword; parameters with a default may be left out.
+
+    Raises:
+        ActivityClassDependencyMissingException: if a parameter without
+            a default has no matching entry in `dependencies`.
+    """
+    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
+    results = []
+    for cls in repo:
+        init = getattr(cls, "__init__")
+        kwargs = {}
+        for i, value in enumerate(inspect.signature(init).parameters.values()):
+            # Skip `self`, and `*args`/`**kwargs`: those are never required,
+            # and a class without its own `__init__` reports
+            # `object.__init__(self, /, *args, **kwargs)`.
+            if i == 0 or value.kind in variadic:
+                continue
+            if value.name in dependencies:
+                kwargs[value.name] = dependencies[value.name]
+            elif value.default is inspect.Parameter.empty:
+                raise ActivityClassDependencyMissingException(
+                    f"{cls} requires dependency `{value.name}` that was not provided!"
+                )
+        results += cls(**kwargs).get_activities()  # type:ignore
+    return results
+
+
+def registry_worker(  # type:ignore
+    client: Client,
+    task_queue: str,
+    logger: logging.Logger,
+    dependencies: Dict[str, Any],
+    **kwargs,
+) -> Worker:
+    """Helper method that returns worker that uses `register_workflow`
+    and `register_activity` registries.
+
+    Optional `workflows` and `activities` lists passed as keyword
+    arguments are served in addition to the registered ones; they are
+    copied, never modified.  Activity classes are instantiated with
+    `dependencies`.  Remaining keyword arguments go to `Worker`.
+
+    Raises:
+        TypeError: if `workflows` or `activities` is not a list.
+    """
+    workflows = kwargs.pop("workflows", [])
+    activities = kwargs.pop("activities", [])
+    # Explicit raises rather than `assert`, which `python -O` strips.
+    if not isinstance(workflows, list):
+        raise TypeError(
+            f"`workflows` kwarg type is `{type(workflows)}`, not `list`"
+        )
+    if not isinstance(activities, list):
+        raise TypeError(
+            f"`activities` kwarg type is `{type(activities)}`, not `list`"
+        )
+
+    # Build new lists instead of extending the caller's in place, so that
+    # reusing a list for a second worker does not duplicate registrations.
+    workflows = workflows + register_workflow.workflows  # type:ignore
+    activities = activities + register_activity.activities  # type:ignore
+    activities += _initialize_activity_classes(  # type:ignore
+        dependencies, TemporalActivityClass.CLASSES  # type:ignore
+    )
+
+    workflows_names = [
+        f"{w.__module__}.{w.__qualname__}"
+        if isinstance(w, type)
+        else f"{w} of {w.__module__}"
+        for w in workflows
+    ]
+    activities_names = [f"{a.__module__}.{a.__qualname__}" for a in activities]
+
+    if workflows_names:
+        split_log(logger.info, "Registered workflows", workflows_names, mid_char="├")
+    if activities_names:
+        split_log(logger.info, "Registered activities", activities_names, mid_char="├")
+
+    return Worker(
+        client=client,
+        task_queue=task_queue,
+        workflows=workflows,
+        activities=activities,
+        **kwargs,
+    )
diff --git a/systemtests/temporal/workflows/__init__.py b/systemtests/temporal/workflows/__init__.py
new file mode 100644
index 0000000..b334705
--- /dev/null
+++ b/systemtests/temporal/workflows/__init__.py
@@ -0,0 +1 @@
+"""Folder containing temporal workflows."""
diff --git a/systemtests/temporal/workflows/workflow_test.py b/systemtests/temporal/workflows/workflow_test.py
new file mode 100644
index 0000000..bb8c445
--- /dev/null
+++ b/systemtests/temporal/workflows/workflow_test.py
@@ -0,0 +1,35 @@
+"""Example "hello world" workflow, activity and parameter definitions."""
+
+from dataclasses import dataclass
+from datetime import timedelta
+
+from temporal_registry import register_activity, register_workflow  # type: ignore
+from temporalio import workflow
+
+
+@dataclass
+class hello_world_param:
+    """Input for `hello_world_workflow`: the name to greet."""
+
+    name: str
+
+
+@register_activity  # type:ignore
+async def hello_world_activity(param: hello_world_param) -> str:
+    """Build the greeting for `param.name`."""
+    return f"Hello {param.name}"
+
+
+@register_workflow
+class hello_world_workflow:
+    """Workflow that delegates to `hello_world_activity`."""
+
+    @workflow.run
+    async def run(self, param: hello_world_param) -> str:
+        """Execute the greeting activity and return its result as a string."""
+        greeting = await workflow.execute_activity(
+            hello_world_activity,
+            param,
+            schedule_to_close_timeout=timedelta(seconds=60),
+        )
+        return str(greeting)
diff --git a/tox.ini b/tox.ini
index 34ab368..cb66822 100644
--- a/tox.ini
+++ b/tox.ini
@@ -105,6 +105,13 @@ description=Filter environments by valid tox ones.
 skip_install = true
 commands=python utils/filter_envs.py {posargs}
 
+[testenv:run_workflow]
+description=Execute a temporal workflow
+deps=
+  temporalio
+commands =
+  python systemtests/temporal/run_workflow.py {posargs}
+
 [flake8]
 max-line-length = 88
 per-file-ignores =

Follow ups