Description
Is your feature request related to a problem? Please describe.
Heartbeat timeout failures can be a major pain point when working with async Temporal activities. It would be great if the OpenTelemetry `TracingInterceptor` contrib were able to mark spans as failed for cancellations due to heartbeat timeouts, so we can identify workers with heartbeat issues more easily and assess the scale of the issue.
The SDK metrics also don't help much in this case, since activity_execution_failed doesn't provide a reason/code label.
Describe the solution you'd like
Something like this works for the async/threaded cases, though it doesn't distinguish different kinds of cancellation reasons. It doesn't work for sync, multi-processed activities since they don't raise cancellation errors.
```python
import asyncio
from typing import Any

import opentelemetry.trace
import temporalio.activity
import temporalio.worker
from temporalio.contrib.opentelemetry import TracingInterceptor


class _TracingActivityInboundInterceptor(
    temporalio.worker.ActivityInboundInterceptor
):
    def __init__(
        self,
        next: temporalio.worker.ActivityInboundInterceptor,
        root: TracingInterceptor,
    ) -> None:
        super().__init__(next)
        self.root = root

    async def execute_activity(
        self, input: temporalio.worker.ExecuteActivityInput
    ) -> Any:
        info = temporalio.activity.info()
        with self.root.tracer.start_as_current_span(
            f"RunActivity:{info.activity_type}",
            context=self.root._context_from_headers(input.headers),
            attributes={
                "temporalWorkflowID": info.workflow_id,
                "temporalRunID": info.workflow_run_id,
                "temporalActivityID": info.activity_id,
            },
            kind=opentelemetry.trace.SpanKind.SERVER,
        ) as span:
            try:
                return await super().execute_activity(input)
            except BaseException as e:
                # Catch heartbeat cancellation errors to mark the span as failed.
                # Note: asyncio.CancelledError extends BaseException, which
                # would otherwise be ignored by the span.
                if isinstance(e, asyncio.CancelledError):
                    span.record_exception(e)
                    span.set_status(
                        opentelemetry.trace.Status(
                            status_code=opentelemetry.trace.StatusCode.ERROR,
                            description=f"{type(e).__name__}: {e}",
                        )
                    )
                # Bare raise preserves the original traceback.
                raise
```
Additional context
This only really applies in the case where the worker/activities are blocked for some reason and fail to heartbeat. Since the activity is still actually running, they receive a cancellation request from the server on their next, eventual heartbeat. If the worker completely died, I wouldn't expect to see any spans at all.
The problem is that `asyncio.CancelledError` extends `BaseException`, but the OTEL Tracer's `start_as_current_span` only catches `Exception` type errors (reference), since exceptions directly subclassing `BaseException` are usually not considered errors, e.g. `GeneratorExit`, `KeyboardInterrupt`, `CancelledError` (according to that code).
However, in this context, a `CancelledError` (both async and the sync Temporal kind) can represent a major failure of the system to behave as intended.
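The `BaseException` point can be reproduced with the standard library alone. The following is a minimal sketch, not the OTEL code itself: `blocked_activity` is a made-up stand-in for a stuck activity, and the inner `except Exception` plays the role of the handler in `start_as_current_span`.

```python
import asyncio


async def blocked_activity() -> None:
    # Hypothetical stand-in for an activity that is still running when
    # the cancellation arrives.
    await asyncio.sleep(3600)


async def main() -> str:
    task = asyncio.create_task(blocked_activity())
    await asyncio.sleep(0)  # let the task start before cancelling it
    task.cancel()
    try:
        try:
            await task
        except Exception:
            # Mirrors a handler that only looks at Exception subclasses.
            return "caught by except Exception"
    except asyncio.CancelledError:
        # CancelledError derives from BaseException, so it skips the
        # inner handler and lands here.
        return "escaped except Exception"


outcome = asyncio.run(main())
print(outcome)
```

On Python 3.8 and later the cancellation bypasses the inner handler, which is why the interceptor above has to catch `BaseException` explicitly.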
The second problem is that it doesn't look like there is enough context in the `TracingInterceptor` to distinguish different kinds of cancellation reasons, including:
- cancelled by user (e.g. in Temporal UI, SDK, or CLI) - shouldn't be marked as failed
- cancelled by workflow due to a failure in some other parallel branch, e.g. `await asyncio.gather(activity1, activity2, ...)` - probably not a failure for this activity?
- cancelled by heartbeat timeout - IMO is a failure
- cancelled by activity timeout - could be context specific, leaning towards a failure though
- cancelled due to activity pause - not a failure
- cancelled due to worker shutdown event - not a failure
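To make the list above concrete, here is one way the mapping could be written down once the reason is known. This is only a sketch of my current leaning: `CancelReason` and `span_should_fail` are hypothetical names, not part of the SDK, and the grouping into "involuntary" reasons is an assumption that is still open for discussion.

```python
from enum import Enum
from typing import Optional


class CancelReason(Enum):
    """Hypothetical cancellation reasons; not an SDK type."""

    USER_REQUEST = "user_request"
    SIBLING_FAILURE = "sibling_failure"
    HEARTBEAT_TIMEOUT = "heartbeat_timeout"
    ACTIVITY_TIMEOUT = "activity_timeout"
    PAUSED = "paused"
    WORKER_SHUTDOWN = "worker_shutdown"


# Assumption: only involuntary cancellations mark the span as failed.
INVOLUNTARY = frozenset(
    {CancelReason.HEARTBEAT_TIMEOUT, CancelReason.ACTIVITY_TIMEOUT}
)


def span_should_fail(
    reason: Optional[CancelReason], default: bool = False
) -> bool:
    """Return True if a cancellation for this reason should fail the span.

    When the reason is unknown (None), fall back to `default`.
    """
    if reason is None:
        return default
    return reason in INVOLUNTARY


print(span_should_fail(CancelReason.HEARTBEAT_TIMEOUT))
print(span_should_fail(CancelReason.USER_REQUEST))
```

The `default` argument covers the situation today, where the interceptor cannot tell the reasons apart: passing `default=True` would reproduce the "fail every cancellation" behaviour.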
I'm not sure what the correct semantics should be for each of these cases. It would probably help to align them with the existing SDK metrics, e.g. `activity_execution_cancelled` and `activity_execution_failed`. (Note: I've not actually seen any `activity_execution_cancelled` in my app as it's Java only...) So I'm not sure whether all these cancellation errors are currently being emitted as `activity_execution_failed` or not. If they are, then it might make sense to simply make all cancellations fail the span, though I do think it should be more refined than that, e.g. involuntary cancellations due to something bad happening vs. voluntary cancellations initiated by the user/system.
The `_Context` object's `cancellation_details` doesn't seem to contain anything useful, as it's not set during the `_ActivityWorker`'s `_heartbeat_async` method, and the interceptor doesn't have access to the info in `_RunningActivity`, from what I could tell. Updating the `cancellation_details` when a heartbeat error is received could solve this part of the issue, since we can access it in the interceptor via the `_Context` / `cancellation_details` accessor.
Lastly, it doesn't look like cancellation happens at all in multi-process workers. After debugging, I saw that even after the whole workflow failed, the sync activity continued until completion. The `thread_event=manager.new_event()` event set in `_ActivityWorker` doesn't seem to be actively used yet. The result returned through the `TracingInterceptor` was the expected return value, not an exception, so these aren't even caught as exception events. However, I rarely have issues with the multi-process worker, since I run its auto-heartbeating in a background thread. That said, I have seen these fail occasionally, maybe due to GIL blocking or CPU throttling.
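For reference, the background-thread auto-heartbeating I mean looks roughly like the sketch below. It is simplified and SDK-agnostic: `auto_heartbeat` is a hypothetical helper, and `heartbeat_fn` is a stand-in for whatever callable actually sends the heartbeat in your setup (a plain list append here, so the snippet runs on its own).

```python
import threading
import time
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def auto_heartbeat(
    heartbeat_fn: Callable[[], None], interval_s: float
) -> Iterator[None]:
    """Call `heartbeat_fn` every `interval_s` seconds until the block exits."""
    stop = threading.Event()

    def _loop() -> None:
        # Event.wait returns False on timeout, so this fires once per
        # interval and exits promptly once `stop` is set.
        while not stop.wait(interval_s):
            heartbeat_fn()

    thread = threading.Thread(target=_loop, daemon=True)
    thread.start()
    try:
        yield
    finally:
        stop.set()
        thread.join()


# Usage: record timestamps instead of sending real heartbeats.
beats = []
with auto_heartbeat(lambda: beats.append(time.monotonic()), interval_s=0.01):
    time.sleep(0.1)  # stand-in for the blocking activity body
```

The heartbeat thread still competes for the GIL with the activity body, so long-running C extensions that hold the GIL, or a CPU-throttled container, can delay it past the heartbeat timeout.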
It would be nice if cancellations could be applied properly to sync, multi-processed activities to save time and resources. I've raised that as a separate issue: #1048.
Since this fix is unlikely to be straightforward, I'll probably patch this in my own interceptor for now. This has been suggested in these other tracing-related issues:
- fix(opentelemetry): trace context propagation in process-pool workers #1017
- [Feature Request] Reclassify Benign Application errors in OpenTelemetry #1041
Cheers,
I'm happy to contribute towards all the issues mentioned in this area, if the solution/requirements are clarified. I also want to extend the SDK's runtime metrics support to sync, mp activities too. So I'm not shy to pick up fairly challenging bits of work to help out 🙏🏻