From e87fdf790c75495e725a8bbaf61b18fd13191869 Mon Sep 17 00:00:00 2001 From: Sean Stewart Date: Sun, 24 Nov 2024 07:53:29 -0500 Subject: [PATCH] fix(aiohttp): Use a contextvar to track the request span for streaming responses(#8108) Without this change, the request and response objects are not freed from memory until the asyncio Task is freed, which can create a memory leak. This change leverages a contextvar to accomplish the same result as the previous version, while ensuring any memory is freed once the current async context is exited. --- .../contrib/internal/aiohttp/middlewares.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/ddtrace/contrib/internal/aiohttp/middlewares.py b/ddtrace/contrib/internal/aiohttp/middlewares.py index 07c8afbb07d..4f7c151fa72 100644 --- a/ddtrace/contrib/internal/aiohttp/middlewares.py +++ b/ddtrace/contrib/internal/aiohttp/middlewares.py @@ -1,3 +1,5 @@ +import contextvars + from aiohttp import web from aiohttp.web_urldispatcher import SystemRoute @@ -70,8 +72,6 @@ async def attach_context(request): request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY] try: response = await handler(request) - if isinstance(response, web.StreamResponse): - request.task.add_done_callback(lambda _: finish_request_span(request, response)) return response except Exception: request_span.set_traceback() @@ -132,10 +132,24 @@ def finish_request_span(request, response): response_headers=response.headers, route=route, ) + if type(response) is web.StreamResponse and not response.task.done(): + request_span_var.set(request_span) + request.task.add_done_callback(span_done_callback) + request_span.finish() +def span_done_callback(task): + span = request_span_var.get(None) + if span: + span.finish() + request_span_var.set(None) + + +request_span_var = contextvars.ContextVar("__dd_request_span") + + async def on_prepare(request, response): """ The on_prepare signal is used to close the request span that is created during @@ -143,8 +157,6 @@ async def on_prepare(request, response): """ # NB isinstance is not appropriate here because StreamResponse is a parent of the other # aiohttp response types - if type(response) is web.StreamResponse and not response.task.done(): - return finish_request_span(request, response)