Skip to content

Commit 2e10fc3

Browse files
authored
Merge branch 'main' into integration-test-1
2 parents 3dcb003 + 2d2accb commit 2e10fc3

File tree

12 files changed

+515
-204
lines changed

12 files changed

+515
-204
lines changed

Makefile

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
.PHONY: pr install lint type-check test integration-test clean generate help
2+
3+
# Run all PR checks locally
4+
pr: install generate lint type-check test integration-test
5+
@echo "All PR checks passed!"
6+
7+
# Install dependencies
8+
install:
9+
@echo "Installing dependencies..."
10+
uv sync --extra dev
11+
12+
# Generate idl files
13+
generate:
14+
@echo "Generating type files based on IDL..."
15+
uv run python scripts/generate_proto.py
16+
17+
# Run linter
18+
lint:
19+
@echo "Running Ruff linter and fixing lint issues..."
20+
uv tool run ruff check --fix
21+
22+
# Run type checker
23+
type-check:
24+
@echo "Running mypy type checker..."
25+
uv tool run mypy cadence/
26+
27+
# Run unit tests
28+
test:
29+
@echo "Running unit tests..."
30+
uv run pytest -v
31+
32+
# Run integration tests
33+
integration-test:
34+
@echo "Running integration tests..."
35+
uv run pytest -v --integration-tests
36+
37+
# Clean generated files and caches
38+
clean:
39+
@echo "Cleaning up..."
40+
find . -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true
41+
find . -type f -name "*.pyc" -delete 2>/dev/null || true
42+
find . -type d -name "*.egg-info" -exec rm -rf {} + 2>/dev/null || true
43+
44+
# Show help
45+
help:
46+
@echo "Available targets:"
47+
@echo " make pr - Run all PR checks (recommended before submitting PR)"
48+
@echo " make install - Install dependencies"
49+
@echo " make lint - Run Ruff linter"
50+
@echo " make type-check - Run mypy type checker"
51+
@echo " make test - Run unit tests"
52+
@echo " make integration-test - Run integration tests"
53+
@echo " make clean - Remove generated files and caches"
54+
@echo " make help - Show this help message"
55+

cadence/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@
66

77
# Import main client functionality
88
from .client import Client
9+
from .worker import Registry
10+
from . import workflow
911

1012
__version__ = "0.1.0"
1113

1214
__all__ = [
1315
"Client",
16+
"Registry",
17+
"workflow",
1418
]

cadence/_internal/workflow/workflow_engine.py

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ class DecisionResult:
2020
decisions: list[Decision]
2121

2222
class WorkflowEngine:
23-
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None):
23+
def __init__(self, info: WorkflowInfo, client: Client, workflow_definition=None):
2424
self._context = Context(client, info)
25-
self._workflow_func = workflow_func
25+
self._workflow_definition = workflow_definition
26+
self._workflow_instance = None
27+
if workflow_definition:
28+
self._workflow_instance = workflow_definition.cls()
2629
self._decision_manager = DecisionManager()
2730
self._decisions_helper = DecisionsHelper(self._decision_manager)
2831
self._is_workflow_complete = False
@@ -250,19 +253,17 @@ def _fallback_process_workflow_history(self, history) -> None:
250253
async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None:
251254
"""
252255
Execute the workflow function to generate new decisions.
253-
256+
254257
This blocks until the workflow schedules an activity or completes.
255-
258+
256259
Args:
257260
decision_task: The decision task containing workflow context
258261
"""
259262
try:
260-
# Execute the workflow function
261-
# The workflow function should block until it schedules an activity
262-
workflow_func = self._workflow_func
263-
if workflow_func is None:
263+
# Execute the workflow function from the workflow instance
264+
if self._workflow_definition is None or self._workflow_instance is None:
264265
logger.warning(
265-
"No workflow function available",
266+
"No workflow definition or instance available",
266267
extra={
267268
"workflow_type": self._context.info().workflow_type,
268269
"workflow_id": self._context.info().workflow_id,
@@ -271,11 +272,14 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
271272
)
272273
return
273274

275+
# Get the workflow run method from the instance
276+
workflow_func = self._workflow_definition.get_run_method(self._workflow_instance)
277+
274278
# Extract workflow input from history
275279
workflow_input = await self._extract_workflow_input(decision_task)
276280

277281
# Execute workflow function
278-
result = self._execute_workflow_function_once(workflow_func, workflow_input)
282+
result = await self._execute_workflow_function_once(workflow_func, workflow_input)
279283

280284
# Check if workflow is complete
281285
if result is not None:
@@ -290,7 +294,7 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
290294
"completion_type": "success"
291295
}
292296
)
293-
297+
294298
except Exception as e:
295299
logger.error(
296300
"Error executing workflow function",
@@ -337,7 +341,7 @@ async def _extract_workflow_input(self, decision_task: PollForDecisionTaskRespon
337341
logger.warning("No WorkflowExecutionStarted event found in history")
338342
return None
339343

340-
def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any:
344+
async def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any:
341345
"""
342346
Execute the workflow function once (not during replay).
343347
@@ -351,23 +355,9 @@ def _execute_workflow_function_once(self, workflow_func: Callable, workflow_inpu
351355
logger.debug(f"Executing workflow function with input: {workflow_input}")
352356
result = workflow_func(workflow_input)
353357

354-
# If the workflow function is async, we need to handle it properly
358+
# If the workflow function is async, await it properly
355359
if asyncio.iscoroutine(result):
356-
# For now, use asyncio.run for async workflow functions
357-
# TODO: Implement proper deterministic event loop for workflow execution
358-
try:
359-
result = asyncio.run(result)
360-
except RuntimeError:
361-
# If we're already in an event loop, create a new task
362-
loop = asyncio.get_event_loop()
363-
if loop.is_running():
364-
# We can't use asyncio.run inside a running loop
365-
# For now, just get the result (this may not be deterministic)
366-
logger.warning("Async workflow function called within running event loop - may not be deterministic")
367-
# This is a workaround - in a real implementation, we'd need proper task scheduling
368-
result = None
369-
else:
370-
result = loop.run_until_complete(result)
360+
result = await result
371361

372362
return result
373363

cadence/worker/_decision_task_handler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
7676
)
7777

7878
try:
79-
workflow_func = self._registry.get_workflow(workflow_type_name)
79+
workflow_definition = self._registry.get_workflow(workflow_type_name)
8080
except KeyError:
8181
logger.error(
8282
"Workflow type not found in registry",
@@ -103,9 +103,9 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
103103
workflow_engine = self._workflow_engines.get(cache_key)
104104
if workflow_engine is None:
105105
workflow_engine = WorkflowEngine(
106-
info=workflow_info,
107-
client=self._client,
108-
workflow_func=workflow_func
106+
info=workflow_info,
107+
client=self._client,
108+
workflow_definition=workflow_definition
109109
)
110110
self._workflow_engines[cache_key] = workflow_engine
111111

cadence/worker/_registry.py

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@
77
"""
88

99
import logging
10-
from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload
10+
from typing import Callable, Dict, Optional, Unpack, TypedDict, overload, Type, Union, TypeVar
1111
from cadence.activity import ActivityDefinitionOptions, ActivityDefinition, ActivityDecorator, P, T
12+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions
1213

1314
logger = logging.getLogger(__name__)
1415

16+
# TypeVar for workflow class types
17+
W = TypeVar('W')
18+
1519

1620
class RegisterWorkflowOptions(TypedDict, total=False):
1721
"""Options for registering a workflow."""
@@ -28,53 +32,58 @@ class Registry:
2832

2933
def __init__(self) -> None:
3034
"""Initialize the registry."""
31-
self._workflows: Dict[str, Callable] = {}
35+
self._workflows: Dict[str, WorkflowDefinition] = {}
3236
self._activities: Dict[str, ActivityDefinition] = {}
3337
self._workflow_aliases: Dict[str, str] = {} # alias -> name mapping
3438

3539
def workflow(
3640
self,
37-
func: Optional[Callable] = None,
41+
cls: Optional[Type[W]] = None,
3842
**kwargs: Unpack[RegisterWorkflowOptions]
39-
) -> Callable:
43+
) -> Union[Type[W], Callable[[Type[W]], Type[W]]]:
4044
"""
41-
Register a workflow function.
42-
45+
Register a workflow class.
46+
4347
This method can be used as a decorator or called directly.
44-
48+
Only supports class-based workflows.
49+
4550
Args:
46-
func: The workflow function to register
51+
cls: The workflow class to register
4752
**kwargs: Options for registration (name, alias)
48-
53+
4954
Returns:
50-
The decorated function or the function itself
51-
55+
The decorated class
56+
5257
Raises:
5358
KeyError: If workflow name already exists
59+
ValueError: If class workflow is invalid
5460
"""
5561
options = RegisterWorkflowOptions(**kwargs)
56-
57-
def decorator(f: Callable) -> Callable:
58-
workflow_name = options.get('name') or f.__name__
59-
62+
63+
def decorator(target: Type[W]) -> Type[W]:
64+
workflow_name = options.get('name') or target.__name__
65+
6066
if workflow_name in self._workflows:
6167
raise KeyError(f"Workflow '{workflow_name}' is already registered")
62-
63-
self._workflows[workflow_name] = f
64-
68+
69+
# Create WorkflowDefinition with type information
70+
workflow_opts = WorkflowDefinitionOptions(name=workflow_name)
71+
workflow_def = WorkflowDefinition.wrap(target, workflow_opts)
72+
self._workflows[workflow_name] = workflow_def
73+
6574
# Register alias if provided
6675
alias = options.get('alias')
6776
if alias:
6877
if alias in self._workflow_aliases:
6978
raise KeyError(f"Workflow alias '{alias}' is already registered")
7079
self._workflow_aliases[alias] = workflow_name
71-
80+
7281
logger.info(f"Registered workflow '{workflow_name}'")
73-
return f
74-
75-
if func is None:
82+
return target
83+
84+
if cls is None:
7685
return decorator
77-
return decorator(func)
86+
return decorator(cls)
7887

7988
@overload
8089
def activity(self, func: Callable[P, T]) -> ActivityDefinition[P, T]:
@@ -135,25 +144,25 @@ def _register_activity(self, defn: ActivityDefinition) -> None:
135144
self._activities[defn.name] = defn
136145

137146

138-
def get_workflow(self, name: str) -> Callable:
147+
def get_workflow(self, name: str) -> WorkflowDefinition:
139148
"""
140149
Get a registered workflow by name.
141-
150+
142151
Args:
143152
name: Name or alias of the workflow
144-
153+
145154
Returns:
146-
The workflow function
147-
155+
The workflow definition
156+
148157
Raises:
149158
KeyError: If workflow is not found
150159
"""
151160
# Check if it's an alias
152161
actual_name = self._workflow_aliases.get(name, name)
153-
162+
154163
if actual_name not in self._workflows:
155164
raise KeyError(f"Workflow '{name}' not found in registry")
156-
165+
157166
return self._workflows[actual_name]
158167

159168
def get_activity(self, name: str) -> ActivityDefinition:
@@ -188,7 +197,7 @@ def of(*args: 'Registry') -> 'Registry':
188197

189198
return result
190199

191-
def _find_activity_definitions(instance: object) -> Sequence[ActivityDefinition]:
200+
def _find_activity_definitions(instance: object) -> list[ActivityDefinition]:
192201
attr_to_def = {}
193202
for t in instance.__class__.__mro__:
194203
for attr in dir(t):
@@ -200,10 +209,7 @@ def _find_activity_definitions(instance: object) -> Sequence[ActivityDefinition]
200209
raise ValueError(f"'{attr}' was overridden with a duplicate activity definition")
201210
attr_to_def[attr] = value
202211

203-
# Create new definitions, copying the attributes from the declaring type but using the function
204-
# from the specific object. This allows for the decorator to be applied to the base class and the
205-
# function to be overridden
206-
result = []
212+
result: list[ActivityDefinition] = []
207213
for attr, definition in attr_to_def.items():
208214
result.append(ActivityDefinition(getattr(instance, attr), definition.name, definition.strategy, definition.params))
209215

0 commit comments

Comments
 (0)