Skip to content

Commit

Permalink
Renames / no poll update tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Oct 24, 2023
1 parent 5d2b614 commit 6b0972d
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 33 deletions.
21 changes: 9 additions & 12 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1795,7 +1795,7 @@ async def _start_update(
update_name = str(update)

return await self._client._impl.start_workflow_update(
UpdateWorkflowInput(
StartWorkflowUpdateInput(
id=self._id,
run_id=self._run_id,
update_id=id,
Expand Down Expand Up @@ -3940,13 +3940,12 @@ async def result(
)

return await self._client._impl.poll_workflow_update(
PollUpdateWorkflowInput(
PollWorkflowUpdateInput(
self.workflow_id,
self.workflow_run_id,
self.id,
self.name,
timeout,
{},
self._result_type,
rpc_metadata,
rpc_timeout,
Expand Down Expand Up @@ -4157,7 +4156,7 @@ class TerminateWorkflowInput:


@dataclass
class UpdateWorkflowInput:
class StartWorkflowUpdateInput:
"""Input for :py:meth:`OutboundInterceptor.start_workflow_update`."""

id: str
Expand All @@ -4175,15 +4174,14 @@ class UpdateWorkflowInput:


@dataclass
class PollUpdateWorkflowInput:
class PollWorkflowUpdateInput:
"""Input for :py:meth:`OutboundInterceptor.poll_workflow_update`."""

workflow_id: str
run_id: Optional[str]
update_id: str
update: str
timeout: Optional[timedelta]
headers: Mapping[str, temporalio.api.common.v1.Payload]
ret_type: Optional[Type]
rpc_metadata: Mapping[str, str]
rpc_timeout: Optional[timedelta]
Expand Down Expand Up @@ -4434,12 +4432,12 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
await self.next.terminate_workflow(input)

async def start_workflow_update(
self, input: UpdateWorkflowInput
self, input: StartWorkflowUpdateInput
) -> WorkflowUpdateHandle:
"""Called for every :py:meth:`WorkflowHandle.update` and :py:meth:`WorkflowHandle.start_update` call."""
return await self.next.start_workflow_update(input)

async def poll_workflow_update(self, input: PollUpdateWorkflowInput) -> Any:
async def poll_workflow_update(self, input: PollWorkflowUpdateInput) -> Any:
"""May be called when calling :py:meth:`WorkflowUpdateHandle.result`."""
return await self.next.poll_workflow_update(input)

Expand Down Expand Up @@ -4766,7 +4764,7 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
)

async def start_workflow_update(
self, input: UpdateWorkflowInput
self, input: StartWorkflowUpdateInput
) -> WorkflowUpdateHandle:
wait_policy = (
temporalio.api.update.v1.WaitPolicy(lifecycle_stage=input.wait_for_stage)
Expand Down Expand Up @@ -4819,7 +4817,7 @@ async def start_workflow_update(

return update_handle

async def poll_workflow_update(self, input: PollUpdateWorkflowInput) -> Any:
async def poll_workflow_update(self, input: PollWorkflowUpdateInput) -> Any:
req = temporalio.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest(
namespace=self._client.namespace,
update_ref=temporalio.api.update.v1.UpdateRef(
Expand Down Expand Up @@ -4859,8 +4857,7 @@ async def poll_loop():

# Wait for at most the *overall* timeout
return await asyncio.wait_for(
poll_loop(),
input.timeout.total_seconds() if input.timeout else sys.float_info.max,
poll_loop(), input.timeout.total_seconds() if input.timeout else None
)

### Async activity calls
Expand Down
17 changes: 3 additions & 14 deletions temporalio/contrib/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async def signal_workflow(
return await super().signal_workflow(input)

async def start_workflow_update(
self, input: temporalio.client.UpdateWorkflowInput
self, input: temporalio.client.StartWorkflowUpdateInput
) -> temporalio.client.WorkflowUpdateHandle:
with self.root._start_as_current_span(
f"StartWorkflowUpdate:{input.update}",
Expand All @@ -255,17 +255,6 @@ async def start_workflow_update(
):
return await super().start_workflow_update(input)

async def poll_workflow_update(
self, input: temporalio.client.PollUpdateWorkflowInput
) -> Any:
with self.root._start_as_current_span(
f"PollWorkflowUpdate:{input.update}",
attributes={"temporalWorkflowID": input.workflow_id},
input=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().poll_workflow_update(input)


class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor):
def __init__(
Expand Down Expand Up @@ -440,7 +429,7 @@ def handle_update_validator(
)[0]
with self._top_level_workflow_context(success_is_complete=False):
self._completed_span(
f"HandleUpdateValidator:{input.update}",
f"ValidateUpdate:{input.update}",
link_context_carrier=link_context_carrier,
kind=opentelemetry.trace.SpanKind.SERVER,
)
Expand All @@ -460,7 +449,7 @@ async def handle_update_handler(
)[0]
with self._top_level_workflow_context(success_is_complete=False):
self._completed_span(
f"HandleUpdateHandler:{input.update}",
f"HandleUpdate:{input.update}",
link_context_carrier=link_context_carrier,
kind=opentelemetry.trace.SpanKind.SERVER,
)
Expand Down
2 changes: 1 addition & 1 deletion temporalio/worker/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ async def handle_query(self, input: HandleQueryInput) -> Any:

def handle_update_validator(self, input: HandleUpdateInput) -> None:
"""Called to handle an update's validation stage."""
return self.next.handle_update_validator(input)
self.next.handle_update_validator(input)

async def handle_update_handler(self, input: HandleUpdateInput) -> Any:
"""Called to handle an update's handler."""
Expand Down
4 changes: 2 additions & 2 deletions tests/contrib/test_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
" RunWorkflow:TracingWorkflow",
" MyCustomSpan",
" HandleSignal:signal (links: SignalWorkflow:signal)",
" HandleUpdateValidator:update (links: StartWorkflowUpdate:update)",
" HandleUpdateHandler:update (links: StartWorkflowUpdate:update)",
" ValidateUpdate:update (links: StartWorkflowUpdate:update)",
" HandleUpdate:update (links: StartWorkflowUpdate:update)",
" StartActivity:tracing_activity",
" RunActivity:tracing_activity",
" RunActivity:tracing_activity",
Expand Down
8 changes: 4 additions & 4 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
Client,
Interceptor,
OutboundInterceptor,
PollUpdateWorkflowInput,
PollWorkflowUpdateInput,
QueryWorkflowInput,
RPCError,
RPCStatusCode,
Expand All @@ -56,9 +56,9 @@
ScheduleUpdateInput,
SignalWorkflowInput,
StartWorkflowInput,
StartWorkflowUpdateInput,
TaskReachabilityType,
TerminateWorkflowInput,
UpdateWorkflowInput,
WorkflowContinuedAsNewError,
WorkflowExecutionStatus,
WorkflowFailureError,
Expand Down Expand Up @@ -404,13 +404,13 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
return await super().terminate_workflow(input)

async def start_workflow_update(
self, input: UpdateWorkflowInput
self, input: StartWorkflowUpdateInput
) -> WorkflowUpdateHandle:
self._parent.traces.append(("start_workflow_update", input))
return await super().start_workflow_update(input)

async def poll_workflow_update(
self, input: PollUpdateWorkflowInput
self, input: PollWorkflowUpdateInput
) -> WorkflowUpdateHandle:
self._parent.traces.append(("poll_workflow_update", input))
return await super().poll_workflow_update(input)
Expand Down

0 comments on commit 6b0972d

Please sign in to comment.