From 67a46a771b5b1dbfea03dabd5c6dd10c55ab3987 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 24 Oct 2023 13:48:13 -0500 Subject: [PATCH 1/2] Support workflow start delay Fixes #404 --- temporalio/client.py | 16 ++++++++++++++++ tests/test_client.py | 23 +++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/temporalio/client.py b/temporalio/client.py index 008cdbcc..d34ef866 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -275,6 +275,7 @@ async def start_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -299,6 +300,7 @@ async def start_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -325,6 +327,7 @@ async def start_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -351,6 +354,7 @@ async def start_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -375,6 +379,7 @@ async def start_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -400,6 +405,7 @@ async def start_workflow( cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/ memo: Memo for the workflow. search_attributes: Search attributes for the workflow. + start_delay: Amount of time to wait before starting the workflow. start_signal: If present, this signal is sent as signal-with-start instead of traditional workflow start. start_signal_args: Arguments for start_signal if start_signal @@ -444,6 +450,7 @@ async def start_workflow( cron_schedule=cron_schedule, memo=memo, search_attributes=search_attributes, + start_delay=start_delay, headers={}, start_signal=start_signal, start_signal_args=start_signal_args, @@ -469,6 +476,7 @@ async def execute_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -493,6 +501,7 @@ async def execute_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -519,6 +528,7 @@ async def execute_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -545,6 +555,7 @@ async def execute_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -569,6 +580,7 @@ async def execute_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -597,6 +609,7 @@ async def execute_workflow( cron_schedule=cron_schedule, memo=memo, search_attributes=search_attributes, + start_delay=start_delay, start_signal=start_signal, start_signal_args=start_signal_args, rpc_metadata=rpc_metadata, @@ -3753,6 +3766,7 @@ class StartWorkflowInput: cron_schedule: str memo: Optional[Mapping[str, Any]] search_attributes: Optional[temporalio.common.SearchAttributes] + start_delay: Optional[timedelta] headers: Mapping[str, temporalio.api.common.v1.Payload] start_signal: Optional[str] start_signal_args: Sequence[Any] @@ -4233,6 +4247,8 @@ async def start_workflow( temporalio.converter.encode_search_attributes( input.search_attributes, req.search_attributes ) + if input.start_delay is not None: + req.workflow_start_delay.FromTimedelta(input.start_delay) if input.headers is not None: temporalio.common._apply_headers(input.headers, req.header.fields) diff --git a/tests/test_client.py b/tests/test_client.py index 7ce9d307..e24c7ab3 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -140,6 +140,29 @@ async def test_start_with_signal(client: Client, worker: ExternalWorker): assert "some signal arg" == await handle.result() +async def test_start_delay( + client: Client, worker: ExternalWorker, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip("Java test server does not support start delay") + start_delay = timedelta(hours=1, minutes=20, seconds=30) + handle = await client.start_workflow( + "kitchen_sink", + KSWorkflowParams( + actions=[KSAction(result=KSResultAction(value="some result"))] + ), + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + start_delay=start_delay, + ) + # Check that first event has start delay + first_event = [e async for e in handle.fetch_history_events()][0] + assert ( + start_delay + == first_event.workflow_execution_started_event_attributes.first_workflow_task_backoff.ToTimedelta() + ) + + async def test_result_follow_continue_as_new( client: Client, worker: ExternalWorker, env: WorkflowEnvironment ): From 55953e32a776a0a4f44c496263139370c6edec45 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 24 Oct 2023 16:10:04 -0500 Subject: [PATCH 2/2] Minor updates --- temporalio/client.py | 2 ++ tests/test_client.py | 24 ++++++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/temporalio/client.py b/temporalio/client.py index d34ef866..aceb3301 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -406,6 +406,8 @@ async def start_workflow( memo: Memo for the workflow. search_attributes: Search attributes for the workflow. start_delay: Amount of time to wait before starting the workflow. + This does not work with ``cron_schedule``. This is currently + experimental. start_signal: If present, this signal is sent as signal-with-start instead of traditional workflow start. start_signal_args: Arguments for start_signal if start_signal diff --git a/tests/test_client.py b/tests/test_client.py index e24c7ab3..5760f7a6 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -163,6 +163,30 @@ async def test_start_delay( ) +async def test_signal_with_start_delay( + client: Client, worker: ExternalWorker, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip("Java test server does not support start delay") + start_delay = timedelta(hours=1, minutes=20, seconds=30) + handle = await client.start_workflow( + "kitchen_sink", + KSWorkflowParams( + actions=[KSAction(result=KSResultAction(value="some result"))] + ), + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + start_delay=start_delay, + start_signal="some-signal", + ) + # Check that first event has start delay + first_event = [e async for e in handle.fetch_history_events()][0] + assert ( + start_delay + == first_event.workflow_execution_started_event_attributes.first_workflow_task_backoff.ToTimedelta() + ) + + async def test_result_follow_continue_as_new( client: Client, worker: ExternalWorker, env: WorkflowEnvironment ):