diff --git a/temporalio/client.py b/temporalio/client.py index 822b9ac4..f83d138c 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -6078,11 +6078,24 @@ def on_start( None, ) if status and status.code in RPCStatusCode: - err = RPCError( - status.message, - RPCStatusCode(status.code), - err.raw_grpc_status, - ) + if ( + status.code == RPCStatusCode.ALREADY_EXISTS + and status.details + ): + details = temporalio.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure() + if status.details[0].Unpack(details): + err = temporalio.exceptions.WorkflowAlreadyStartedError( + input.start_workflow_input.id, + input.start_workflow_input.workflow, + run_id=details.run_id, + ) + else: + err = RPCError( + status.message, + RPCStatusCode(status.code), + err.raw_grpc_status, + ) + raise err finally: if not seen_start: diff --git a/tests/worker/test_update_with_start.py b/tests/worker/test_update_with_start.py index 2ab16a93..ff57723d 100644 --- a/tests/worker/test_update_with_start.py +++ b/tests/worker/test_update_with_start.py @@ -14,7 +14,6 @@ Client, Interceptor, OutboundInterceptor, - RPCError, StartWorkflowUpdateWithStartInput, WithStartWorkflowOperation, WorkflowUpdateFailedError, @@ -24,8 +23,7 @@ from temporalio.common import ( WorkflowIDConflictPolicy, ) -from temporalio.exceptions import ApplicationError -from temporalio.service import RPCStatusCode +from temporalio.exceptions import ApplicationError, WorkflowAlreadyStartedError from tests.helpers import ( new_worker, ) @@ -230,15 +228,14 @@ async def _do_execute_update_test( await start_op_2.workflow_handle() ).first_execution_run_id is not None else: - with pytest.raises(RPCError) as e: - await client.execute_update_with_start_workflow( + for aw in [ + client.execute_update_with_start_workflow( update_handler, "21", start_workflow_operation=start_op_2 - ) - - assert e.value.grpc_status.code == RPCStatusCode.ALREADY_EXISTS - with pytest.raises(RPCError) as e2: - await start_op_2.workflow_handle() - assert e2.value.grpc_status.code == RPCStatusCode.ALREADY_EXISTS + ), + start_op_2.workflow_handle(), + ]: + with pytest.raises(WorkflowAlreadyStartedError): + await aw assert ( await start_op_1.workflow_handle() @@ -337,18 +334,18 @@ def make_start_op(workflow_id: str): # Second UWS start fails because the workflow already exists # first execution run ID is not set on the second UWS handle start_op_2 = make_start_op("wid-1") - with pytest.raises(RPCError) as exc_info: - await client.start_update_with_start_workflow( + + for aw in [ + client.start_update_with_start_workflow( WorkflowForUpdateWithStartTest.my_non_blocking_update, "2", wait_for_stage=WorkflowUpdateStage.COMPLETED, start_workflow_operation=start_op_2, - ) - assert exc_info.value.status == RPCStatusCode.ALREADY_EXISTS - - with pytest.raises(RPCError) as e2: - await start_op_2.workflow_handle() - assert e2.value.grpc_status.code == RPCStatusCode.ALREADY_EXISTS + ), + start_op_2.workflow_handle(), + ]: + with pytest.raises(WorkflowAlreadyStartedError): + await aw # Third UWS start succeeds, but the update fails after acceptance start_op_3 = make_start_op("wid-2") @@ -421,18 +418,18 @@ def make_start_op(workflow_id: str): ) start_op_2 = make_start_op(wid) - with pytest.raises(RPCError) as exc_info: - await client.start_update_with_start_workflow( + + for aw in [ + client.start_update_with_start_workflow( WorkflowForUpdateWithStartTest.my_non_blocking_update, "2", wait_for_stage=WorkflowUpdateStage.COMPLETED, start_workflow_operation=start_op_2, - ) - assert exc_info.value.grpc_status.code == RPCStatusCode.ALREADY_EXISTS - - with pytest.raises(RPCError) as exc_info2: - await start_op_2.workflow_handle() - assert exc_info2.value.grpc_status.code == RPCStatusCode.ALREADY_EXISTS + ), + start_op_2.workflow_handle(), + ]: + with pytest.raises(WorkflowAlreadyStartedError): + await aw async def test_workflow_update_poll_loop(client: Client):