Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for workflow ID conflict policy #579

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ async def start_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -328,6 +329,7 @@ async def start_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -360,6 +362,7 @@ async def start_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -392,6 +395,7 @@ async def start_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -422,6 +426,7 @@ async def start_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -455,6 +460,10 @@ async def start_workflow(
run_timeout: Timeout of a single workflow run.
task_timeout: Timeout of a single workflow task.
id_reuse_policy: How already-existing IDs are treated.
id_conflict_policy: How already-running workflows of the same ID are
treated. Default is unspecified which effectively means fail the
start attempt. This cannot be set if ``id_reuse_policy`` is set
to terminate if running.
Comment on lines +463 to +466
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just default it to FAIL then? Otherwise we're sort of at the mercy of whatever server feels like, which could either be a good thing or a bad thing depending on how you look at it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what we wanted to do, but unfortunately we can't. As of temporalio/temporal#5507, the server rejects the request if the ID conflict policy is set to anything other than unspecified while the ID reuse policy is set to terminate-if-running. This is an admittedly unfortunate server case where unspecified is not treated the same as its default.

So if we default this to fail, a user's code that sets id reuse policy to terminate-if-running will start failing (they both cannot be set). This is the rare case where we have to leave it unset/unspecified as the default. This is why I document what the real default is, because users can't see it as the default param option.

retry_policy: Retry policy for the workflow.
cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
memo: Memo for the workflow.
Expand Down Expand Up @@ -510,6 +519,7 @@ async def start_workflow(
run_timeout=run_timeout,
task_timeout=task_timeout,
id_reuse_policy=id_reuse_policy,
id_conflict_policy=id_conflict_policy,
retry_policy=retry_policy,
cron_schedule=cron_schedule,
memo=memo,
Expand Down Expand Up @@ -537,6 +547,7 @@ async def execute_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -567,6 +578,7 @@ async def execute_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -599,6 +611,7 @@ async def execute_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -631,6 +644,7 @@ async def execute_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -661,6 +675,7 @@ async def execute_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -696,6 +711,7 @@ async def execute_workflow(
run_timeout=run_timeout,
task_timeout=task_timeout,
id_reuse_policy=id_reuse_policy,
id_conflict_policy=id_conflict_policy,
retry_policy=retry_policy,
cron_schedule=cron_schedule,
memo=memo,
Expand Down Expand Up @@ -4487,6 +4503,7 @@ class StartWorkflowInput:
run_timeout: Optional[timedelta]
task_timeout: Optional[timedelta]
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy
retry_policy: Optional[temporalio.common.RetryPolicy]
cron_schedule: str
memo: Optional[Mapping[str, Any]]
Expand Down Expand Up @@ -5008,6 +5025,10 @@ async def start_workflow(
"temporalio.api.enums.v1.WorkflowIdReusePolicy.ValueType",
int(input.id_reuse_policy),
)
req.workflow_id_conflict_policy = cast(
"temporalio.api.enums.v1.WorkflowIdConflictPolicy.ValueType",
int(input.id_conflict_policy),
)
if input.retry_policy is not None:
input.retry_policy.apply_to_proto(req.retry_policy)
req.cron_schedule = input.cron_schedule
Expand Down
20 changes: 20 additions & 0 deletions temporalio/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,26 @@ class WorkflowIDReusePolicy(IntEnum):
)


class WorkflowIDConflictPolicy(IntEnum):
    """How already-running workflows of the same ID are handled on start.

    See :py:class:`temporalio.api.enums.v1.WorkflowIdConflictPolicy`.
    """

    # No explicit policy is sent. Per the client's ``start_workflow`` docs this
    # effectively means the start attempt fails when a workflow with the same
    # ID is already running. It is also the only value that can be combined
    # with an ID reuse policy of terminate-if-running.
    UNSPECIFIED = int(
        temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
    )
    # Explicitly fail the start attempt when a workflow with the same ID is
    # already running.
    FAIL = int(
        temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL
    )
    # Do not start a new run; the start call resolves to the already-running
    # workflow (same run ID) instead.
    USE_EXISTING = int(
        temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
    )
    # Terminate the already-running workflow and start a new run under the
    # same workflow ID.
    TERMINATE_EXISTING = int(
        temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
    )


class QueryRejectCondition(IntEnum):
"""Whether a query should be rejected in certain conditions.

Expand Down
67 changes: 67 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
SearchAttributes,
SearchAttributeValues,
TypedSearchAttributes,
WorkflowIDConflictPolicy,
)
from temporalio.converter import (
DataConverter,
Expand Down Expand Up @@ -5505,3 +5506,69 @@ def _unfinished_handler_warning_cls(self) -> Type:
"update": workflow.UnfinishedUpdateHandlersWarning,
"signal": workflow.UnfinishedSignalHandlersWarning,
}[self.handler_type]


@workflow.defn
class IDConflictWorkflow:
    """Workflow that never completes on its own.

    Used to keep a run open so that a second start with the same workflow ID
    conflicts with it.
    """

    @workflow.run
    async def run(self) -> None:
        """Block forever by waiting on a condition that is never satisfied."""

        def never_satisfied() -> bool:
            return False

        await workflow.wait_condition(never_satisfied)


async def test_workflow_id_conflict(client: Client):
    """Exercise every workflow ID conflict policy against one running workflow.

    A never-ending workflow is started first; subsequent starts reuse its ID
    with, in order: no policy (the default), ``FAIL``, ``USE_EXISTING`` and
    ``TERMINATE_EXISTING``.
    """
    async with new_worker(client, IDConflictWorkflow) as worker:

        async def start_pinned(workflow_id, **start_options):
            # Start the workflow, then re-fetch the handle pinned to the exact
            # run that the start call resolved to, so later describe() calls
            # target that run rather than whatever is latest for the ID.
            started = await client.start_workflow(
                IDConflictWorkflow.run,
                id=workflow_id,
                task_queue=worker.task_queue,
                **start_options,
            )
            return client.get_workflow_handle_for(
                IDConflictWorkflow.run, started.id, run_id=started.result_run_id
            )

        async def status_of(pinned_handle):
            description = await pinned_handle.describe()
            return description.status

        # Initial run that every later start attempt will conflict with
        original = await start_pinned(f"wf-{uuid.uuid4()}")

        # With no conflict policy given, a second start must be rejected
        with pytest.raises(WorkflowAlreadyStartedError):
            await start_pinned(original.id)

        # Same rejection when the FAIL policy is requested explicitly
        with pytest.raises(WorkflowAlreadyStartedError):
            await start_pinned(
                original.id,
                id_conflict_policy=WorkflowIDConflictPolicy.FAIL,
            )

        # USE_EXISTING hands back the run that is already in progress
        reused = await start_pinned(
            original.id,
            id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
        )
        assert reused.run_id == original.run_id
        assert await status_of(original) == WorkflowExecutionStatus.RUNNING
        assert await status_of(reused) == WorkflowExecutionStatus.RUNNING

        # TERMINATE_EXISTING ends the old run and begins a fresh one
        replacement = await start_pinned(
            original.id,
            id_conflict_policy=WorkflowIDConflictPolicy.TERMINATE_EXISTING,
        )
        assert replacement.run_id != original.run_id
        assert await status_of(original) == WorkflowExecutionStatus.TERMINATED
        assert await status_of(replacement) == WorkflowExecutionStatus.RUNNING
Loading