sts-sponsors team mailing list archive
-
sts-sponsors team
-
Mailing list archive
-
Message #08758
[Merge] ~lloydwaltersj/maas-ci/+git/system-tests:add-temporal-workers into ~maas-committers/maas-ci/+git/system-tests:master
Jack Lloyd-Walters has proposed merging ~lloydwaltersj/maas-ci/+git/system-tests:add-temporal-workers into ~maas-committers/maas-ci/+git/system-tests:master.
Commit message:
add ability to execute single workflows from tox for debugging
Requested reviews:
MAAS Committers (maas-committers)
For more details, see:
https://code.launchpad.net/~lloydwaltersj/maas-ci/+git/system-tests/+merge/443669
--
Your team MAAS Committers is requested to review the proposed merge of ~lloydwaltersj/maas-ci/+git/system-tests:add-temporal-workers into ~maas-committers/maas-ci/+git/system-tests:master.
diff --git a/setup.py b/setup.py
index 4f8f242..06ef27c 100644
--- a/setup.py
+++ b/setup.py
@@ -11,6 +11,7 @@ install_requires = (
'requests',
'retry',
'ruamel.yaml',
+ 'temporalio'
)
diff --git a/systemtests/temporal/run_workflow.py b/systemtests/temporal/run_workflow.py
new file mode 100644
index 0000000..bffe1cf
--- /dev/null
+++ b/systemtests/temporal/run_workflow.py
@@ -0,0 +1,74 @@
+"""Used to run a workflow independently as a debugging tool."""
+
+import argparse
+import asyncio
+import sys
+from datetime import datetime
+from logging import getLogger
+from typing import Any
+
+from temporal_registry import registry_worker # type: ignore
+from temporalio.client import Client
+from workflows.workflow_test import ( # type: ignore
+ hello_world_param,
+ hello_world_workflow,
+)
+
+
+def return_workflow(worflow_name: str) -> Any:
+    """Look up a runnable workflow by its command-line name.
+
+    Args:
+        worflow_name: Short name given to ``--workflow``; only
+            ``"hello_world"`` is currently known.
+
+    Returns:
+        A ``(workflow class, parameter class)`` tuple.
+
+    Raises:
+        ValueError: If the name does not match a known workflow. Falling
+            through used to return ``None``, which made the caller's tuple
+            unpacking fail with an unrelated ``TypeError``.
+    """
+    match worflow_name:
+        case "hello_world":
+            return hello_world_workflow, hello_world_param
+        case _:
+            raise ValueError(f"Unknown workflow: {worflow_name!r}")
+
+
+async def main(args: Any, wflow_args: Any) -> None:
+    """Start a local worker and execute one workflow against the server.
+
+    Args:
+        args: Parsed command-line options; ``ip``, ``workflow`` and
+            ``task_queue`` are read, plus ``namespace`` when present.
+        wflow_args: Keyword arguments used to build the workflow's
+            parameter object.
+    """
+    # `--namespace` was parsed but never forwarded, so the option had no
+    # effect. getattr keeps callers that build `args` without a namespace
+    # attribute working.
+    client = await Client.connect(
+        f"{args.ip}:7233",
+        namespace=getattr(args, "namespace", "default"),
+    )
+    wflow, param = return_workflow(args.workflow)
+    wflow_name = wflow.__name__
+    # Time of day down to milliseconds, used as the workflow id suffix.
+    this_time = datetime.utcnow().strftime("%H%M%S%f")[:-3]
+    wflow_param = param(**wflow_args)
+    print("starting worker")
+    # The worker runs only for the duration of this block, on the same task
+    # queue the workflow is submitted to.
+    async with registry_worker(
+        client,
+        task_queue=args.task_queue,
+        logger=getLogger("temporal-worker"),
+        dependencies={},
+    ):
+        print("awaiting workflow execution")
+        result = await client.execute_workflow(
+            wflow.run,
+            wflow_param,
+            id=f"{wflow_name}-{this_time}",
+            task_queue=args.task_queue,
+        )
+        print(f"Result: {result}")
+
+
+if __name__ == "__main__":
+    # Known options configure the connection; everything else on the command
+    # line is treated as `key=value` input for the workflow parameter class.
+    parser = argparse.ArgumentParser(
+        prog="Temporal workflow Executor",
+        description="Request a temporal workflow is run on the server",
+    )
+    # Required: without a workflow name there is nothing to execute.
+    parser.add_argument(
+        "--workflow",
+        help="The temporal workflow to execute.",
+        required=True,
+    )
+    parser.add_argument(
+        "--ip",
+        help="IP address of the temporal server",
+        default="localhost",
+    )
+    parser.add_argument(
+        "--namespace",
+        help="Namespace to use in the temporal server",
+        default="default",
+    )
+    parser.add_argument(
+        "--task-queue",
+        help="Task queue to use for this workflow",
+        default="test-queue",
+    )
+    args, extra = parser.parse_known_args(sys.argv[1:])
+    wflow_args = {}
+    for keyval in extra:
+        # Split on the first `=` only so values may themselves contain `=`;
+        # a token without `=` is reported as a usage error instead of
+        # crashing the dict construction.
+        key, sep, value = keyval.partition("=")
+        if not sep or not key:
+            parser.error(
+                f"expected key=value for workflow argument, got {keyval!r}"
+            )
+        wflow_args[key] = value
+    asyncio.run(main(args, wflow_args))
diff --git a/systemtests/temporal/temporal_registry.py b/systemtests/temporal/temporal_registry.py
new file mode 100644
index 0000000..ff592c5
--- /dev/null
+++ b/systemtests/temporal/temporal_registry.py
@@ -0,0 +1,161 @@
+"""
+ Duplicates and modifies the registry code in @igor-brovtsin's
+ https://code.launchpad.net/~igor-brovtsin/maas-ci/+git/maas-ci/+ref/temporal-sandbox
+
+"""
+
+from __future__ import annotations
+
+import abc
+import inspect
+import logging
+from typing import Any, Callable, Dict, List, Optional, Set, Type
+
+from temporalio import activity, workflow
+from temporalio.client import Client
+from temporalio.types import CallableType, ClassType
+from temporalio.worker import Worker
+
+
+def split_log(
+    log_func: Callable[..., Any],
+    heading: str,
+    results: str | list[str],
+    mid_char: str = "│",
+) -> None:
+    """Emit a heading followed by one tree-style line per result.
+
+    Args:
+        log_func: Called once with the heading, then once per result line.
+        heading: First line to emit.
+        results: Lines to emit; a string is split on newlines.
+        mid_char: Prefix for every line except the last, which gets ``└``.
+    """
+    lines = results.split("\n") if not isinstance(results, list) else results
+    log_func(heading)
+    final_index = len(lines) - 1
+    for position, text in enumerate(lines):
+        marker = mid_char if position != final_index else "└"
+        log_func(f"{marker} {text}")
+
+
+def workflow_registrar() -> Callable[..., Any]:
+    """Build a decorator that records workflow classes and applies
+    `@temporalio.workflow.defn` to them.
+
+    The returned callable works both bare (`@register`) and with options
+    (`@register(sandboxed=True)`); the recorded classes are exposed on its
+    `workflows` attribute."""
+    known_workflows = []
+
+    def wrapper(  # type:ignore
+        cls: Optional[ClassType] = None, *args, sandboxed=False, **kwargs
+    ) -> Callable[..., Any]:
+        def decorator(workflow_cls: ClassType) -> ClassType:
+            # Record first, then let temporalio define the workflow.
+            known_workflows.append(workflow_cls)
+            apply_defn = workflow.defn(*args, sandboxed=sandboxed, **kwargs)
+            return apply_defn(workflow_cls)
+
+        # Called with options only: hand back the decorator for later use.
+        return decorator if cls is None else decorator(cls)
+
+    wrapper.workflows = known_workflows  # type:ignore
+    return wrapper
+
+
+def activity_registrar() -> Callable[..., Any]:
+    """Build a decorator that applies `@temporalio.activity.defn` to a
+    callable and records the result.
+
+    The recorded activities are exposed on the returned decorator's
+    `activities` attribute."""
+    collected = []
+
+    def decorator(  # type:ignore
+        f: CallableType, *args, **kwargs
+    ) -> Callable[..., Any]:
+        as_activity = activity.defn(*args, **kwargs)
+        defined = as_activity(f)
+        collected.append(defined)
+        return defined  # type:ignore
+
+    decorator.activities = collected  # type:ignore
+    return decorator
+
+
+# Module-level registries: decorating a workflow class or an activity callable
+# with these records it, and `registry_worker` later passes everything
+# recorded here to the Worker it builds.
+register_workflow = workflow_registrar()
+register_activity = activity_registrar()
+
+
+class TemporalActivityClass(abc.ABC):
+    """Base class for Temporal activities that need external dependencies
+    (DB connections, LXD containers, HTTP clients, etc.).
+
+    Every subclass is recorded in `CLASSES` when it is defined.
+    """
+
+    CLASSES: Set[Type[TemporalActivityClass]] = set()
+
+    def __init_subclass__(cls, **kwargs) -> None:  # type:ignore
+        super().__init_subclass__(**kwargs)
+        TemporalActivityClass.CLASSES.add(cls)
+
+    def get_activities(self) -> List[Any]:
+        """Return the callable members that carry the
+        `__temporal_activity_definition` marker attribute."""
+        members = (getattr(self, attr) for attr in dir(self))
+        return [
+            member
+            for member in members
+            if callable(member)
+            and hasattr(member, "__temporal_activity_definition")
+        ]
+
+
+class ActivityClassDependencyMissingException(Exception):
+ """Raised when an activity class requires a dependency that was not provided."""
+
+
+def _initialize_activity_classes(
+    dependencies: Dict[str, Any], repo: Set[Type[TemporalActivityClass]]
+) -> List[CallableType]:
+    """Instantiate every activity class and collect its activities.
+
+    Each class's `__init__` parameters are matched by name against
+    `dependencies`; parameters with a default may be left unsupplied.
+
+    Args:
+        dependencies: Objects available for injection, keyed by parameter name.
+        repo: Activity classes to instantiate.
+
+    Returns:
+        The activities reported by `get_activities()` of every instance.
+
+    Raises:
+        ActivityClassDependencyMissingException: If a parameter without a
+            default has no matching entry in `dependencies`.
+    """
+    # *args / **kwargs can never be "missing". Without this check a class that
+    # inherits object.__init__ (signature `(self, /, *args, **kwargs)`) was
+    # rejected for lacking an `args` dependency.
+    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
+    results: List[CallableType] = []
+    for cls in repo:
+        init = getattr(cls, "__init__")
+        kwargs = {}
+        for i, value in enumerate(inspect.signature(init).parameters.values()):
+            # Index 0 is `self`.
+            if i == 0 or value.kind in variadic:
+                continue
+            if value.name in dependencies:
+                kwargs[value.name] = dependencies[value.name]
+            elif value.default is inspect.Parameter.empty:
+                raise ActivityClassDependencyMissingException(
+                    f"{cls} requires dependency `{value.name}` that was not provided!"
+                )
+        results += cls(**kwargs).get_activities()  # type:ignore
+    return results
+
+
+def registry_worker(  # type:ignore
+    client: Client,
+    task_queue: str,
+    logger: logging.Logger,
+    dependencies: Dict[str, Any],
+    **kwargs,
+) -> Worker:
+    """Build a Worker from the `register_workflow` and `register_activity`
+    registries plus every `TemporalActivityClass` subclass.
+
+    Args:
+        client: Temporal client the worker is created with.
+        task_queue: Task queue passed to the worker.
+        logger: Receives the lists of registered workflows and activities.
+        dependencies: Objects injected by name into activity class
+            constructors.
+        **kwargs: Forwarded to `Worker`. Optional `workflows` and
+            `activities` lists are combined with the registered ones; the
+            caller's lists are not modified.
+
+    Returns:
+        The configured `Worker`.
+    """
+    kwargs.setdefault("workflows", [])
+    kwargs.setdefault("activities", [])
+    assert isinstance(
+        kwargs["workflows"], list
+    ), f"`workflows` kwarg type is `{type(kwargs['workflows'])}`, not `list`"
+    assert isinstance(
+        kwargs["activities"], list
+    ), f"`activities` kwarg type is `{type(kwargs['activities'])}`, not `list`"
+
+    class_activities = _initialize_activity_classes(  # type:ignore
+        dependencies, TemporalActivityClass.CLASSES  # type:ignore
+    )
+
+    # Build fresh lists instead of extending in place: the caller's lists used
+    # to grow on every call, so reusing one list for a second worker
+    # accumulated duplicate registrations.
+    kwargs["workflows"] = [
+        *kwargs["workflows"],
+        *register_workflow.workflows,  # type:ignore
+    ]
+    kwargs["activities"] = [
+        *kwargs["activities"],
+        *register_activity.activities,  # type:ignore
+        *class_activities,
+    ]
+
+    workflows_names = [
+        f"{w.__module__}.{w.__qualname__}"
+        if isinstance(w, type)
+        else f"{w} of {w.__module__}"
+        for w in kwargs["workflows"]
+    ]
+    activities_names = [
+        f"{a.__module__}.{a.__qualname__}" for a in kwargs["activities"]
+    ]
+
+    if workflows_names:
+        split_log(logger.info, "Registered workflows", workflows_names, mid_char="├")
+    if activities_names:
+        split_log(logger.info, "Registered activities", activities_names, mid_char="├")
+
+    return Worker(client=client, task_queue=task_queue, **kwargs)
diff --git a/systemtests/temporal/workflows/__init__.py b/systemtests/temporal/workflows/__init__.py
new file mode 100644
index 0000000..b334705
--- /dev/null
+++ b/systemtests/temporal/workflows/__init__.py
@@ -0,0 +1 @@
+"""Folder containing temporal workflows."""
diff --git a/systemtests/temporal/workflows/workflow_test.py b/systemtests/temporal/workflows/workflow_test.py
new file mode 100644
index 0000000..bb8c445
--- /dev/null
+++ b/systemtests/temporal/workflows/workflow_test.py
@@ -0,0 +1,27 @@
+from dataclasses import dataclass
+from datetime import timedelta
+
+from temporal_registry import register_activity, register_workflow # type: ignore
+from temporalio import workflow
+
+
+# Input for the hello-world workflow and activity: carries the single `name`
+# string that the activity interpolates into its greeting.
+@dataclass
+class hello_world_param:
+ name: str
+
+
+@register_activity  # type:ignore
+async def hello_world_activity(param: hello_world_param) -> str:
+    """Return the greeting for `param.name`."""
+    recipient = param.name
+    return f"Hello {recipient}"
+
+
+@register_workflow
+class hello_world_workflow:
+    """Workflow that runs `hello_world_activity` and returns its greeting."""
+
+    @workflow.run
+    async def run(self, param: hello_world_param) -> str:
+        """Execute the greeting activity with `param` and return the result
+        as a string."""
+        greeting = await workflow.execute_activity(
+            hello_world_activity,
+            param,
+            schedule_to_close_timeout=timedelta(minutes=1),
+        )
+        return str(greeting)
diff --git a/tox.ini b/tox.ini
index 34ab368..cb66822 100644
--- a/tox.ini
+++ b/tox.ini
@@ -105,6 +105,13 @@ description=Filter environments by valid tox ones.
skip_install = true
commands=python utils/filter_envs.py {posargs}
+[testenv:run_workflow]
+description=Execute a temporal workflow
+deps=
+ temporalio
+commands =
+ python systemtests/temporal/run_workflow.py {posargs}
+
[flake8]
max-line-length = 88
per-file-ignores =
Follow ups