Skip to content

Commit 6bf973b

Browse files
committed
another bunch of minor changes
Signed-off-by: Tim Li <[email protected]>
1 parent dae76e6 commit 6bf973b

File tree

8 files changed

+172
-89
lines changed

8 files changed

+172
-89
lines changed

cadence/_internal/workflow/workflow_engine.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ class DecisionResult:
2020
decisions: list[Decision]
2121

2222
class WorkflowEngine:
23-
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None):
23+
def __init__(self, info: WorkflowInfo, client: Client, workflow_definition=None):
2424
self._context = Context(client, info)
25-
self._workflow_func = workflow_func
25+
self._workflow_definition = workflow_definition
26+
self._workflow_instance = None
27+
if workflow_definition:
28+
self._workflow_instance = workflow_definition.cls()
2629
self._decision_manager = DecisionManager()
2730
self._decisions_helper = DecisionsHelper(self._decision_manager)
2831
self._is_workflow_complete = False
@@ -250,19 +253,17 @@ def _fallback_process_workflow_history(self, history) -> None:
250253
async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None:
251254
"""
252255
Execute the workflow function to generate new decisions.
253-
256+
254257
This blocks until the workflow schedules an activity or completes.
255-
258+
256259
Args:
257260
decision_task: The decision task containing workflow context
258261
"""
259262
try:
260-
# Execute the workflow function
261-
# The workflow function should block until it schedules an activity
262-
workflow_func = self._workflow_func
263-
if workflow_func is None:
263+
# Execute the workflow function from the workflow instance
264+
if self._workflow_definition is None or self._workflow_instance is None:
264265
logger.warning(
265-
"No workflow function available",
266+
"No workflow definition or instance available",
266267
extra={
267268
"workflow_type": self._context.info().workflow_type,
268269
"workflow_id": self._context.info().workflow_id,
@@ -271,6 +272,9 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
271272
)
272273
return
273274

275+
# Get the workflow run method from the instance
276+
workflow_func = self._workflow_definition.get_run_method(self._workflow_instance)
277+
274278
# Extract workflow input from history
275279
workflow_input = await self._extract_workflow_input(decision_task)
276280

@@ -290,7 +294,7 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
290294
"completion_type": "success"
291295
}
292296
)
293-
297+
294298
except Exception as e:
295299
logger.error(
296300
"Error executing workflow function",

cadence/worker/_decision_task_handler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
103103
workflow_engine = self._workflow_engines.get(cache_key)
104104
if workflow_engine is None:
105105
workflow_engine = WorkflowEngine(
106-
info=workflow_info,
107-
client=self._client,
108-
workflow_func=workflow_definition.fn
106+
info=workflow_info,
107+
client=self._client,
108+
workflow_definition=workflow_definition
109109
)
110110
self._workflow_engines[cache_key] = workflow_engine
111111

cadence/workflow.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from contextlib import contextmanager
33
from contextvars import ContextVar
44
from dataclasses import dataclass
5-
from typing import Iterator, Callable, TypeVar, TypedDict, Type
5+
from typing import Iterator, Callable, TypeVar, TypedDict, Type, cast, Any
66
from functools import wraps
77

88
from cadence.client import Client
@@ -37,6 +37,16 @@ def cls(self) -> Type:
3737
"""Get the workflow class."""
3838
return self._cls
3939

40+
def get_run_method(self, instance: Any) -> Callable:
41+
"""Get the workflow run method from an instance of the workflow class."""
42+
for attr_name in dir(instance):
43+
if attr_name.startswith('_'):
44+
continue
45+
attr = getattr(instance, attr_name)
46+
if callable(attr) and hasattr(attr, '_workflow_run'):
47+
return cast(Callable, attr)
48+
raise ValueError(f"No @workflow.run method found in class {self._cls.__name__}")
49+
4050
@staticmethod
4151
def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition':
4252
"""

tests/cadence/_internal/workflow/test_workflow_engine_integration.py

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,25 @@ def workflow_info(self):
3636
)
3737

3838
@pytest.fixture
39-
def mock_workflow_func(self):
40-
"""Create a mock workflow function."""
41-
def workflow_func(input_data):
42-
return f"processed: {input_data}"
43-
return workflow_func
39+
def mock_workflow_definition(self):
40+
"""Create a mock workflow definition."""
41+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
42+
43+
class TestWorkflow:
44+
@workflow.run
45+
def weird_name(self, input_data):
46+
return f"processed: {input_data}"
47+
48+
workflow_opts = WorkflowDefinitionOptions(name="test_workflow")
49+
return WorkflowDefinition.wrap(TestWorkflow, workflow_opts)
4450

4551
@pytest.fixture
46-
def workflow_engine(self, mock_client, workflow_info, mock_workflow_func):
52+
def workflow_engine(self, mock_client, workflow_info, mock_workflow_definition):
4753
"""Create a WorkflowEngine instance."""
4854
return WorkflowEngine(
4955
info=workflow_info,
5056
client=mock_client,
51-
workflow_func=mock_workflow_func
57+
workflow_definition=mock_workflow_definition
5258
)
5359

5460
def create_mock_decision_task(self, workflow_id="test-workflow", run_id="test-run", workflow_type="test_workflow"):
@@ -211,10 +217,13 @@ async def test_extract_workflow_input_deserialization_error(self, workflow_engin
211217
def test_execute_workflow_function_sync(self, workflow_engine):
212218
"""Test synchronous workflow function execution."""
213219
input_data = "test-input"
214-
220+
221+
# Get the workflow function from the instance
222+
workflow_func = workflow_engine._workflow_definition.get_run_method(workflow_engine._workflow_instance)
223+
215224
# Execute the workflow function
216-
result = workflow_engine._execute_workflow_function_once(workflow_engine._workflow_func, input_data)
217-
225+
result = workflow_engine._execute_workflow_function_once(workflow_func, input_data)
226+
218227
# Verify the result
219228
assert result == "processed: test-input"
220229

@@ -239,20 +248,21 @@ def test_execute_workflow_function_none(self, workflow_engine):
239248
with pytest.raises(TypeError, match="'NoneType' object is not callable"):
240249
workflow_engine._execute_workflow_function_once(None, input_data)
241250

242-
def test_workflow_engine_initialization(self, workflow_engine, workflow_info, mock_client, mock_workflow_func):
251+
def test_workflow_engine_initialization(self, workflow_engine, workflow_info, mock_client, mock_workflow_definition):
243252
"""Test WorkflowEngine initialization."""
244253
assert workflow_engine._context is not None
245-
assert workflow_engine._workflow_func == mock_workflow_func
254+
assert workflow_engine._workflow_definition == mock_workflow_definition
255+
assert workflow_engine._workflow_instance is not None
246256
assert workflow_engine._decision_manager is not None
247257
assert workflow_engine._is_workflow_complete is False
248258

249259
@pytest.mark.asyncio
250-
async def test_workflow_engine_without_workflow_func(self, mock_client, workflow_info):
251-
"""Test WorkflowEngine without workflow function."""
260+
async def test_workflow_engine_without_workflow_definition(self, mock_client, workflow_info):
261+
"""Test WorkflowEngine without workflow definition."""
252262
engine = WorkflowEngine(
253263
info=workflow_info,
254264
client=mock_client,
255-
workflow_func=None
265+
workflow_definition=None
256266
)
257267

258268
decision_task = self.create_mock_decision_task()
@@ -269,12 +279,21 @@ async def test_workflow_engine_without_workflow_func(self, mock_client, workflow
269279
async def test_workflow_engine_workflow_completion(self, workflow_engine, mock_client):
270280
"""Test workflow completion detection."""
271281
decision_task = self.create_mock_decision_task()
272-
273-
# Mock workflow function to return a result (indicating completion)
274-
def completing_workflow_func(input_data):
275-
return "workflow-completed"
276-
277-
workflow_engine._workflow_func = completing_workflow_func
282+
283+
# Create a workflow definition that returns a result (indicating completion)
284+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
285+
286+
class CompletingWorkflow:
287+
@workflow.run
288+
def run(self, input_data):
289+
return "workflow-completed"
290+
291+
workflow_opts = WorkflowDefinitionOptions(name="completing_workflow")
292+
completing_definition = WorkflowDefinition.wrap(CompletingWorkflow, workflow_opts)
293+
294+
# Replace the workflow definition and instance
295+
workflow_engine._workflow_definition = completing_definition
296+
workflow_engine._workflow_instance = completing_definition.cls()
278297

279298
with patch.object(workflow_engine._decision_manager, 'collect_pending_decisions', return_value=[]):
280299
# Process the decision

tests/cadence/worker/test_decision_task_handler.py

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,15 @@ def test_initialization(self, mock_client, mock_registry):
8383
async def test_handle_task_implementation_success(self, handler, sample_decision_task, mock_registry):
8484
"""Test successful decision task handling."""
8585
# Create actual workflow definition
86-
def mock_workflow_func():
87-
return "test_result"
86+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
87+
88+
class MockWorkflow:
89+
@workflow.run
90+
async def run(self):
91+
return "test_result"
8892

89-
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions
9093
workflow_opts = WorkflowDefinitionOptions(name="test_workflow")
91-
workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts)
94+
workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts)
9295
mock_registry.get_workflow.return_value = workflow_definition
9396

9497
# Mock workflow engine
@@ -148,12 +151,15 @@ async def test_handle_task_implementation_workflow_not_found(self, handler, samp
148151
async def test_handle_task_implementation_caches_engines(self, handler, sample_decision_task, mock_registry):
149152
"""Test that decision task handler caches workflow engines for same workflow execution."""
150153
# Create actual workflow definition
151-
def mock_workflow_func():
152-
return "test_result"
154+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
155+
156+
class MockWorkflow:
157+
@workflow.run
158+
async def run(self):
159+
return "test_result"
153160

154-
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions
155161
workflow_opts = WorkflowDefinitionOptions(name="test_workflow")
156-
workflow_definition = WorkflowDefinition.wrap(mock_workflow_func, workflow_opts)
162+
workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts)
157163
mock_registry.get_workflow.return_value = workflow_definition
158164

159165
# Mock workflow engine
@@ -182,9 +188,17 @@ def mock_workflow_func():
182188
@pytest.mark.asyncio
183189
async def test_handle_task_implementation_different_executions_get_separate_engines(self, handler, mock_registry):
184190
"""Test that different workflow executions get separate engines."""
185-
# Mock workflow function
186-
mock_workflow_func = Mock()
187-
mock_registry.get_workflow.return_value = mock_workflow_func
191+
# Create actual workflow definition
192+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
193+
194+
class MockWorkflow:
195+
@workflow.run
196+
async def run(self):
197+
return "test_result"
198+
199+
workflow_opts = WorkflowDefinitionOptions(name="test_workflow")
200+
workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts)
201+
mock_registry.get_workflow.return_value = workflow_definition
188202

189203
# Create two different decision tasks
190204
task1 = Mock(spec=PollForDecisionTaskResponse)
@@ -333,10 +347,17 @@ async def test_respond_decision_task_completed_error(self, handler, sample_decis
333347
@pytest.mark.asyncio
334348
async def test_workflow_engine_creation_with_workflow_info(self, handler, sample_decision_task, mock_registry):
335349
"""Test that WorkflowEngine is created with correct WorkflowInfo."""
336-
mock_workflow_func = Mock()
337-
mock_workflow_definition = Mock()
338-
mock_workflow_definition.fn = mock_workflow_func
339-
mock_registry.get_workflow.return_value = mock_workflow_definition
350+
# Create actual workflow definition
351+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions, workflow
352+
353+
class MockWorkflow:
354+
@workflow.run
355+
async def run(self):
356+
return "test_result"
357+
358+
workflow_opts = WorkflowDefinitionOptions(name="test_workflow")
359+
workflow_definition = WorkflowDefinition.wrap(MockWorkflow, workflow_opts)
360+
mock_registry.get_workflow.return_value = workflow_definition
340361

341362
mock_engine = Mock(spec=WorkflowEngine)
342363
mock_engine._is_workflow_complete = False # Add missing attribute
@@ -363,4 +384,4 @@ async def test_workflow_engine_creation_with_workflow_info(self, handler, sample
363384
call_args = mock_workflow_engine_class.call_args
364385
assert call_args[1]['info'] is not None
365386
assert call_args[1]['client'] == handler._client
366-
assert call_args[1]['workflow_func'] == mock_workflow_func
387+
assert call_args[1]['workflow_definition'] == workflow_definition

tests/cadence/worker/test_decision_task_handler_integration.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from cadence.api.v1.decision_pb2 import Decision
1414
from cadence.worker._decision_task_handler import DecisionTaskHandler
1515
from cadence.worker._registry import Registry
16+
from cadence.workflow import workflow
1617
from cadence.client import Client
1718

1819

@@ -37,9 +38,11 @@ def registry(self):
3738
reg = Registry()
3839

3940
@reg.workflow(name="test_workflow")
40-
def test_workflow(input_data):
41-
"""Simple test workflow that returns the input."""
42-
return f"processed: {input_data}"
41+
class TestWorkflow:
42+
@workflow.run
43+
async def run(self, input_data):
44+
"""Simple test workflow that returns the input."""
45+
return f"processed: {input_data}"
4346

4447
return reg
4548

tests/cadence/worker/test_decision_worker_integration.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes
1212
from cadence.worker._decision import DecisionWorker
1313
from cadence.worker._registry import Registry
14+
from cadence.workflow import workflow
1415
from cadence.client import Client
1516

1617

@@ -34,12 +35,14 @@ def mock_client(self):
3435
def registry(self):
3536
"""Create a registry with a test workflow."""
3637
reg = Registry()
37-
38+
3839
@reg.workflow
39-
def test_workflow(input_data):
40-
"""Simple test workflow that returns the input."""
41-
return f"processed: {input_data}"
42-
40+
class TestWorkflow:
41+
@workflow.run
42+
async def run(self, input_data):
43+
"""Simple test workflow that returns the input."""
44+
return f"processed: {input_data}"
45+
4346
return reg
4447

4548
@pytest.fixture
@@ -236,8 +239,10 @@ async def test_decision_worker_with_different_workflow_types(self, decision_work
236239
"""Test decision worker with different workflow types."""
237240
# Add another workflow to the registry
238241
@registry.workflow
239-
def another_workflow(input_data):
240-
return f"another-processed: {input_data}"
242+
class AnotherWorkflow:
243+
@workflow.run
244+
async def run(self, input_data):
245+
return f"another-processed: {input_data}"
241246

242247
# Create decision tasks for different workflow types
243248
task1 = self.create_mock_decision_task(workflow_type="test_workflow")

0 commit comments

Comments
 (0)