Skip to content

Commit 9410b26

Browse files
authored
feat: Support scheduling Activities within Workflows (cadence-workflow#41)
<!-- Describe what has changed in this PR --> **What changed?** - Add preliminary structure for scheduling activities within Workflow. - Invoking activity functions within a Workflow will schedule them as an activity. - DataConverter is no longer async so that it can be used outside of the main event loop. - CadenceError -> CadenceRpcError to indicate that it's the base error for all Rpc errors, not all errors the client may raise. - State machine restructure - State machines are split into multiple files and moved to `workflow.statemachine` - HistoryEvents are dispatched to state machines based on their class and a field specified in the decorator. This resolves the ambiguity in handling events and the custom handling needed for each type. - DecisionManager stores all state machines in a single ordered dictionary, and reorders it when state machines are manipulated. This is required for the decisions to be ordered correctly. - State machines interact with futures, completing them based on the history - Canceling the futures will cancel the activities/timers <!-- Tell your future self why have you made these changes --> **Why?** - Core workflow functionality <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** - Unit tests. We need the deterministic event loop and workflow execution supported before this can effectively be tested. <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent f91c925 commit 9410b26

37 files changed

+1312
-1492
lines changed

cadence/_internal/activity/_activity_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ async def _report_failure(
7373
_logger.exception("Exception reporting activity failure")
7474

7575
async def _report_success(self, task: PollForActivityTaskResponse, result: Any):
76-
as_payload = await self._data_converter.to_data([result])
76+
as_payload = self._data_converter.to_data([result])
7777

7878
try:
7979
await self._client.worker_stub.RespondActivityTaskCompleted(

cadence/_internal/activity/_context.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ def __init__(
1919
self._activity_fn = activity_fn
2020

2121
async def execute(self, payload: Payload) -> Any:
22-
params = await self._to_params(payload)
22+
params = self._to_params(payload)
2323
with self._activate():
2424
return await self._activity_fn(*params)
2525

26-
async def _to_params(self, payload: Payload) -> list[Any]:
26+
def _to_params(self, payload: Payload) -> list[Any]:
2727
type_hints = [param.type_hint for param in self._activity_fn.params]
28-
return await self._client.data_converter.from_data(payload, type_hints)
28+
return self._client.data_converter.from_data(payload, type_hints)
2929

3030
def client(self) -> Client:
3131
return self._client
@@ -46,7 +46,7 @@ def __init__(
4646
self._executor = executor
4747

4848
async def execute(self, payload: Payload) -> Any:
49-
params = await self._to_params(payload)
49+
params = self._to_params(payload)
5050
loop = asyncio.get_running_loop()
5151
return await loop.run_in_executor(self._executor, self._run, params)
5252

0 commit comments

Comments
 (0)