From 6b0972d7c9a25e602d14ca9dfb8b9088ca6844cd Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 24 Oct 2023 15:00:36 -0700 Subject: [PATCH] Renames / no poll update tracing --- temporalio/client.py | 21 +++++++++------------ temporalio/contrib/opentelemetry.py | 17 +++-------------- temporalio/worker/_interceptor.py | 2 +- tests/contrib/test_opentelemetry.py | 4 ++-- tests/test_client.py | 8 ++++---- 5 files changed, 19 insertions(+), 33 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 2b1d5708..2f989081 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -1795,7 +1795,7 @@ async def _start_update( update_name = str(update) return await self._client._impl.start_workflow_update( - UpdateWorkflowInput( + StartWorkflowUpdateInput( id=self._id, run_id=self._run_id, update_id=id, @@ -3940,13 +3940,12 @@ async def result( ) return await self._client._impl.poll_workflow_update( - PollUpdateWorkflowInput( + PollWorkflowUpdateInput( self.workflow_id, self.workflow_run_id, self.id, self.name, timeout, - {}, self._result_type, rpc_metadata, rpc_timeout, @@ -4157,7 +4156,7 @@ class TerminateWorkflowInput: @dataclass -class UpdateWorkflowInput: +class StartWorkflowUpdateInput: """Input for :py:meth:`OutboundInterceptor.start_workflow_update`.""" id: str @@ -4175,7 +4174,7 @@ class UpdateWorkflowInput: @dataclass -class PollUpdateWorkflowInput: +class PollWorkflowUpdateInput: """Input for :py:meth:`OutboundInterceptor.poll_workflow_update`.""" workflow_id: str @@ -4183,7 +4182,6 @@ class PollUpdateWorkflowInput: update_id: str update: str timeout: Optional[timedelta] - headers: Mapping[str, temporalio.api.common.v1.Payload] ret_type: Optional[Type] rpc_metadata: Mapping[str, str] rpc_timeout: Optional[timedelta] @@ -4434,12 +4432,12 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None: await self.next.terminate_workflow(input) async def start_workflow_update( - self, input: UpdateWorkflowInput + self, input: StartWorkflowUpdateInput ) -> WorkflowUpdateHandle: """Called for every :py:meth:`WorkflowHandle.update` and :py:meth:`WorkflowHandle.start_update` call.""" return await self.next.start_workflow_update(input) - async def poll_workflow_update(self, input: PollUpdateWorkflowInput) -> Any: + async def poll_workflow_update(self, input: PollWorkflowUpdateInput) -> Any: """May be called when calling :py:meth:`WorkflowUpdateHandle.result`.""" return await self.next.poll_workflow_update(input) @@ -4766,7 +4764,7 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None: ) async def start_workflow_update( - self, input: UpdateWorkflowInput + self, input: StartWorkflowUpdateInput ) -> WorkflowUpdateHandle: wait_policy = ( temporalio.api.update.v1.WaitPolicy(lifecycle_stage=input.wait_for_stage) @@ -4819,7 +4817,7 @@ async def start_workflow_update( return update_handle - async def poll_workflow_update(self, input: PollUpdateWorkflowInput) -> Any: + async def poll_workflow_update(self, input: PollWorkflowUpdateInput) -> Any: req = temporalio.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest( namespace=self._client.namespace, update_ref=temporalio.api.update.v1.UpdateRef( @@ -4859,8 +4857,7 @@ async def poll_loop(): # Wait for at most the *overall* timeout return await asyncio.wait_for( - poll_loop(), - input.timeout.total_seconds() if input.timeout else sys.float_info.max, + poll_loop(), input.timeout.total_seconds() if input.timeout else None ) ### Async activity calls diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index 076b21f0..906ec2be 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -245,7 +245,7 @@ async def signal_workflow( return await super().signal_workflow(input) async def start_workflow_update( - self, input: temporalio.client.UpdateWorkflowInput + self, input: temporalio.client.StartWorkflowUpdateInput ) -> temporalio.client.WorkflowUpdateHandle: with self.root._start_as_current_span( f"StartWorkflowUpdate:{input.update}", @@ -255,17 +255,6 @@ async def start_workflow_update( ): return await super().start_workflow_update(input) - async def poll_workflow_update( - self, input: temporalio.client.PollUpdateWorkflowInput - ) -> Any: - with self.root._start_as_current_span( - f"PollWorkflowUpdate:{input.update}", - attributes={"temporalWorkflowID": input.workflow_id}, - input=input, - kind=opentelemetry.trace.SpanKind.CLIENT, - ): - return await super().poll_workflow_update(input) - class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): def __init__( @@ -440,7 +429,7 @@ def handle_update_validator( )[0] with self._top_level_workflow_context(success_is_complete=False): self._completed_span( - f"HandleUpdateValidator:{input.update}", + f"ValidateUpdate:{input.update}", link_context_carrier=link_context_carrier, kind=opentelemetry.trace.SpanKind.SERVER, ) @@ -460,7 +449,7 @@ async def handle_update_handler( )[0] with self._top_level_workflow_context(success_is_complete=False): self._completed_span( - f"HandleUpdateHandler:{input.update}", + f"HandleUpdate:{input.update}", link_context_carrier=link_context_carrier, kind=opentelemetry.trace.SpanKind.SERVER, ) diff --git a/temporalio/worker/_interceptor.py b/temporalio/worker/_interceptor.py index 1c28b611..af85161d 100644 --- a/temporalio/worker/_interceptor.py +++ b/temporalio/worker/_interceptor.py @@ -328,7 +328,7 @@ async def handle_query(self, input: HandleQueryInput) -> Any: def handle_update_validator(self, input: HandleUpdateInput) -> None: """Called to handle an update's validation stage.""" - return self.next.handle_update_validator(input) + self.next.handle_update_validator(input) async def handle_update_handler(self, input: HandleUpdateInput) -> Any: """Called to handle an update's handler.""" diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index aba35f8f..bd986f2d 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -253,8 +253,8 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): " RunWorkflow:TracingWorkflow", " MyCustomSpan", " HandleSignal:signal (links: SignalWorkflow:signal)", - " HandleUpdateValidator:update (links: StartWorkflowUpdate:update)", - " HandleUpdateHandler:update (links: StartWorkflowUpdate:update)", + " ValidateUpdate:update (links: StartWorkflowUpdate:update)", + " HandleUpdate:update (links: StartWorkflowUpdate:update)", " StartActivity:tracing_activity", " RunActivity:tracing_activity", " RunActivity:tracing_activity", diff --git a/tests/test_client.py b/tests/test_client.py index 476d9882..622768a8 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -36,7 +36,7 @@ Client, Interceptor, OutboundInterceptor, - PollUpdateWorkflowInput, + PollWorkflowUpdateInput, QueryWorkflowInput, RPCError, RPCStatusCode, @@ -56,9 +56,9 @@ ScheduleUpdateInput, SignalWorkflowInput, StartWorkflowInput, + StartWorkflowUpdateInput, TaskReachabilityType, TerminateWorkflowInput, - UpdateWorkflowInput, WorkflowContinuedAsNewError, WorkflowExecutionStatus, WorkflowFailureError, @@ -404,13 +404,13 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None: return await super().terminate_workflow(input) async def start_workflow_update( - self, input: UpdateWorkflowInput + self, input: StartWorkflowUpdateInput ) -> WorkflowUpdateHandle: self._parent.traces.append(("start_workflow_update", input)) return await super().start_workflow_update(input) async def poll_workflow_update( - self, input: PollUpdateWorkflowInput + self, input: PollWorkflowUpdateInput ) -> WorkflowUpdateHandle: self._parent.traces.append(("poll_workflow_update", input)) return await super().poll_workflow_update(input)