Skip to content

Commit 60b1444

Browse files
committed
Redefine the workflow type to class instead of callable
Signed-off-by: Tim Li <[email protected]>
1 parent adb578b commit 60b1444

File tree

4 files changed

+184
-170
lines changed

4 files changed

+184
-170
lines changed

cadence/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@
66

77
# Import main client functionality
88
from .client import Client
9+
from .worker import Registry
10+
from .workflow import workflow
911

1012
__version__ = "0.1.0"
1113

1214
__all__ = [
1315
"Client",
16+
"Registry",
17+
"workflow",
1418
]

cadence/worker/_registry.py

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"""
88

99
import logging
10-
from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload
10+
from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload, Type
1111
from cadence.activity import ActivityDefinitionOptions, ActivityDefinition, ActivityDecorator, P, T
1212
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions
1313

@@ -35,50 +35,52 @@ def __init__(self) -> None:
3535

3636
def workflow(
3737
self,
38-
func: Optional[Callable] = None,
38+
cls: Optional[Type] = None,
3939
**kwargs: Unpack[RegisterWorkflowOptions]
40-
) -> Callable:
40+
) -> Type:
4141
"""
42-
Register a workflow function.
43-
42+
Register a workflow class.
43+
4444
This method can be used as a decorator or called directly.
45-
45+
Only supports class-based workflows.
46+
4647
Args:
47-
func: The workflow function to register
48+
cls: The workflow class to register
4849
**kwargs: Options for registration (name, alias)
49-
50+
5051
Returns:
51-
The decorated function or the function itself
52-
52+
The decorated class
53+
5354
Raises:
5455
KeyError: If workflow name already exists
56+
ValueError: If class workflow is invalid
5557
"""
5658
options = RegisterWorkflowOptions(**kwargs)
57-
58-
def decorator(f: Callable) -> Callable:
59-
workflow_name = options.get('name') or f.__name__
60-
59+
60+
def decorator(target: Type) -> Type:
61+
workflow_name = options.get('name') or target.__name__
62+
6163
if workflow_name in self._workflows:
6264
raise KeyError(f"Workflow '{workflow_name}' is already registered")
63-
65+
6466
# Create WorkflowDefinition with type information
6567
workflow_opts = WorkflowDefinitionOptions(name=workflow_name)
66-
workflow_def = WorkflowDefinition.wrap(f, workflow_opts)
68+
workflow_def = WorkflowDefinition.wrap(target, workflow_opts)
6769
self._workflows[workflow_name] = workflow_def
68-
70+
6971
# Register alias if provided
7072
alias = options.get('alias')
7173
if alias:
7274
if alias in self._workflow_aliases:
7375
raise KeyError(f"Workflow alias '{alias}' is already registered")
7476
self._workflow_aliases[alias] = workflow_name
75-
77+
7678
logger.info(f"Registered workflow '{workflow_name}'")
77-
return f
78-
79-
if func is None:
79+
return target
80+
81+
if cls is None:
8082
return decorator
81-
return decorator(func)
83+
return decorator(cls)
8284

8385
@overload
8486
def activity(self, func: Callable[P, T]) -> ActivityDefinition[P, T]:
@@ -142,22 +144,22 @@ def _register_activity(self, defn: ActivityDefinition) -> None:
142144
def get_workflow(self, name: str) -> WorkflowDefinition:
143145
"""
144146
Get a registered workflow by name.
145-
147+
146148
Args:
147149
name: Name or alias of the workflow
148-
150+
149151
Returns:
150-
The workflow definition with type information
151-
152+
The workflow definition
153+
152154
Raises:
153155
KeyError: If workflow is not found
154156
"""
155157
# Check if it's an alias
156158
actual_name = self._workflow_aliases.get(name, name)
157-
159+
158160
if actual_name not in self._workflows:
159161
raise KeyError(f"Workflow '{name}' not found in registry")
160-
162+
161163
return self._workflows[actual_name]
162164

163165
def get_activity(self, name: str) -> ActivityDefinition:

cadence/workflow.py

Lines changed: 53 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -2,145 +2,106 @@
22
from contextlib import contextmanager
33
from contextvars import ContextVar
44
from dataclasses import dataclass
5-
from functools import update_wrapper
6-
from inspect import signature, Parameter
7-
from typing import Iterator, Callable, TypeVar, ParamSpec, Generic, TypedDict, Unpack, overload, get_type_hints, Type, Any
5+
from typing import Iterator, Callable, TypeVar, TypedDict, Type
6+
from functools import wraps
87

98
from cadence.client import Client
109

11-
12-
@dataclass(frozen=True)
13-
class WorkflowParameter:
14-
"""Parameter information for a workflow function."""
15-
name: str
16-
type_hint: Type | None
17-
default_value: Any | None
10+
T = TypeVar('T')
1811

1912

2013
class WorkflowDefinitionOptions(TypedDict, total=False):
2114
"""Options for defining a workflow."""
2215
name: str
2316

2417

25-
P = ParamSpec('P')
26-
T = TypeVar('T')
27-
28-
29-
class WorkflowDefinition(Generic[P, T]):
18+
class WorkflowDefinition:
3019
"""
31-
Definition of a workflow function with metadata.
20+
Definition of a workflow class with metadata.
3221
33-
Similar to ActivityDefinition but for workflows.
34-
Provides type safety and metadata for workflow functions.
22+
Similar to ActivityDefinition but for workflow classes.
23+
Provides type safety and metadata for workflow classes.
3524
"""
3625

37-
def __init__(self, wrapped: Callable[P, T], name: str, params: list[WorkflowParameter]):
38-
self._wrapped = wrapped
26+
def __init__(self, cls: Type, name: str):
27+
self._cls = cls
3928
self._name = name
40-
self._params = params
41-
update_wrapper(self, wrapped)
42-
43-
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
44-
return self._wrapped(*args, **kwargs)
4529

4630
@property
4731
def name(self) -> str:
4832
"""Get the workflow name."""
4933
return self._name
5034

5135
@property
52-
def params(self) -> list[WorkflowParameter]:
53-
"""Get the workflow parameters."""
54-
return self._params
55-
56-
@property
57-
def fn(self) -> Callable[P, T]:
58-
"""Get the underlying workflow function."""
59-
return self._wrapped
36+
def cls(self) -> Type:
37+
"""Get the workflow class."""
38+
return self._cls
6039

61-
@classmethod
62-
def wrap(cls, fn: Callable[P, T], opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition[P, T]':
40+
@staticmethod
41+
def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition':
6342
"""
64-
Wrap a function as a WorkflowDefinition.
43+
Wrap a class as a WorkflowDefinition.
6544
6645
Args:
67-
fn: The workflow function to wrap
46+
cls: The workflow class to wrap
6847
opts: Options for the workflow definition
6948
7049
Returns:
7150
A WorkflowDefinition instance
51+
52+
Raises:
53+
ValueError: If no run method is found or multiple run methods exist
7254
"""
73-
name = fn.__qualname__
55+
name = cls.__name__
7456
if "name" in opts and opts["name"]:
7557
name = opts["name"]
7658

77-
params = _get_workflow_params(fn)
78-
return cls(fn, name, params)
59+
# Validate that the class has exactly one run method
60+
run_method_count = 0
61+
for attr_name in dir(cls):
62+
if attr_name.startswith('_'):
63+
continue
7964

65+
attr = getattr(cls, attr_name)
66+
if not callable(attr):
67+
continue
8068

81-
WorkflowDecorator = Callable[[Callable[P, T]], WorkflowDefinition[P, T]]
69+
# Check for workflow run method
70+
if hasattr(attr, '_workflow_run'):
71+
run_method_count += 1
8272

73+
if run_method_count == 0:
74+
raise ValueError(f"No @workflow.run method found in class {cls.__name__}")
75+
elif run_method_count > 1:
76+
raise ValueError(f"Multiple @workflow.run methods found in class {cls.__name__}")
8377

84-
@overload
85-
def defn(fn: Callable[P, T]) -> WorkflowDefinition[P, T]:
86-
...
78+
return WorkflowDefinition(cls, name)
8779

8880

89-
@overload
90-
def defn(**kwargs: Unpack[WorkflowDefinitionOptions]) -> WorkflowDecorator:
91-
...
81+
def run(func: Callable[..., T]) -> Callable[..., T]:
82+
"""
83+
Decorator to mark a method as the main workflow run method.
9284
85+
Args:
86+
func: The method to mark as the workflow run method
9387
94-
def defn(fn: Callable[P, T] | None = None, **kwargs: Unpack[WorkflowDefinitionOptions]) -> WorkflowDecorator | WorkflowDefinition[P, T]:
88+
Returns:
89+
The decorated method with workflow run metadata
9590
"""
96-
Decorator to define a workflow function.
91+
@wraps(func)
92+
def wrapper(*args, **kwargs):
93+
return func(*args, **kwargs)
9794

98-
Usage:
99-
@defn
100-
def my_workflow(input_data: str) -> str:
101-
return f"processed: {input_data}"
95+
# Attach metadata to the function
96+
wrapper._workflow_run = True # type: ignore
97+
return wrapper
10298

103-
@defn(name="custom_workflow_name")
104-
def my_other_workflow(input_data: str) -> str:
105-
return f"custom: {input_data}"
10699

107-
Args:
108-
fn: The workflow function (when used without parentheses)
109-
**kwargs: Workflow definition options
100+
# Create a simple namespace object for the workflow decorators
101+
class _WorkflowNamespace:
102+
run = staticmethod(run)
110103

111-
Returns:
112-
Either a WorkflowDefinition (direct decoration) or a decorator function
113-
"""
114-
opts = WorkflowDefinitionOptions(**kwargs)
115-
116-
def decorator(inner_fn: Callable[P, T]) -> WorkflowDefinition[P, T]:
117-
return WorkflowDefinition.wrap(inner_fn, opts)
118-
119-
if fn is not None:
120-
return decorator(fn)
121-
122-
return decorator
123-
124-
125-
def _get_workflow_params(fn: Callable) -> list[WorkflowParameter]:
126-
"""Extract parameter information from a workflow function."""
127-
args = signature(fn).parameters
128-
hints = get_type_hints(fn)
129-
result = []
130-
for name, param in args.items():
131-
# Filter out self parameter
132-
if param.name == "self":
133-
continue
134-
default = None
135-
if param.default != Parameter.empty:
136-
default = param.default
137-
if param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD):
138-
type_hint = hints.get(name, None)
139-
result.append(WorkflowParameter(name, type_hint, default))
140-
else:
141-
raise ValueError(f"Parameters must be positional. {name} is {param.kind}, and not valid")
142-
143-
return result
104+
workflow = _WorkflowNamespace()
144105

145106

146107
@dataclass

0 commit comments

Comments
 (0)