-
Notifications
You must be signed in to change notification settings - Fork 396
Store the LoggingContext
in a ContextVar
#18871
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
224cb3f
3742b3b
4303879
0c8759b
3e66e0a
1f384b0
c7a80b6
93044f4
6ce2f3e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Store the `LoggingContext` in a `ContextVar` instead of a thread-local variable. |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -601,6 +601,12 @@ def run_sighup(*args: Any, **kwargs: Any) -> None: | |
hs.get_datastores().main.db_pool.start_profiling() | ||
hs.get_pusherpool().start() | ||
|
||
# Register background tasks required by this server. This must be done | ||
# somewhat manually due to the background tasks not being registered | ||
# unless handlers are instantiated. | ||
if hs.config.worker.run_background_tasks: | ||
hs.start_background_tasks() | ||
Comment on lines
+604
to
+608
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Split out this change to #18886 since it seems good in any case And this PR may get stale |
||
|
||
# Log when we start the shut down process. | ||
hs.get_reactor().addSystemEventTrigger( | ||
"before", "shutdown", logger.info, "Shutting down..." | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ | |
import threading | ||
import typing | ||
import warnings | ||
from contextvars import ContextVar | ||
from types import TracebackType | ||
from typing import ( | ||
TYPE_CHECKING, | ||
|
@@ -653,13 +654,12 @@ def __exit__( | |
) | ||
|
||
|
||
_thread_local = threading.local() | ||
_thread_local.current_context = SENTINEL_CONTEXT | ||
_current_context: ContextVar[LoggingContextOrSentinel] = ContextVar("current_context") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error:
|
||
|
||
|
||
def current_context() -> LoggingContextOrSentinel: | ||
"""Get the current logging context from thread local storage""" | ||
return getattr(_thread_local, "current_context", SENTINEL_CONTEXT) | ||
return _current_context.get(SENTINEL_CONTEXT) | ||
|
||
|
||
def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: The docstring needs to be updated |
||
|
@@ -680,7 +680,7 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe | |
if current is not context: | ||
rusage = get_thread_resource_usage() | ||
current.stop(rusage) | ||
_thread_local.current_context = context | ||
_current_context.set(context) | ||
context.start(rusage) | ||
|
||
return current | ||
|
@@ -796,7 +796,6 @@ def run_in_background( | |
CRITICAL error about an unhandled error will be logged without much | ||
indication about where it came from. | ||
""" | ||
current = current_context() | ||
try: | ||
res = f(*args, **kwargs) | ||
except Exception: | ||
|
@@ -825,23 +824,6 @@ def run_in_background( | |
# optimise out the messing about | ||
return d | ||
|
||
# The function may have reset the context before returning, so | ||
# we need to restore it now. | ||
ctx = set_current_context(current) | ||
|
||
# The original context will be restored when the deferred | ||
# completes, but there is nothing waiting for it, so it will | ||
# get leaked into the reactor or some other function which | ||
# wasn't expecting it. We therefore need to reset the context | ||
# here. | ||
# | ||
# (If this feels asymmetric, consider it this way: we are | ||
# effectively forking a new thread of execution. We are | ||
# probably currently within a ``with LoggingContext()`` block, | ||
# which is supposed to have a single entry and exit point. But | ||
# by spawning off another deferred, we are effectively | ||
# adding a new exit point.) | ||
d.addBoth(_set_context_cb, ctx) | ||
return d | ||
|
||
|
||
|
@@ -861,65 +843,20 @@ def run_coroutine_in_background( | |
cannot change the log contexts. | ||
""" | ||
|
||
current = current_context() | ||
d = defer.ensureDeferred(coroutine) | ||
|
||
# The function may have reset the context before returning, so | ||
# we need to restore it now. | ||
ctx = set_current_context(current) | ||
|
||
# The original context will be restored when the deferred | ||
# completes, but there is nothing waiting for it, so it will | ||
# get leaked into the reactor or some other function which | ||
# wasn't expecting it. We therefore need to reset the context | ||
# here. | ||
# | ||
# (If this feels asymmetric, consider it this way: we are | ||
# effectively forking a new thread of execution. We are | ||
# probably currently within a ``with LoggingContext()`` block, | ||
# which is supposed to have a single entry and exit point. But | ||
# by spawning off another deferred, we are effectively | ||
# adding a new exit point.) | ||
d.addBoth(_set_context_cb, ctx) | ||
return d | ||
return defer.ensureDeferred(coroutine) | ||
|
||
|
||
T = TypeVar("T") | ||
|
||
|
||
# TODO: This function is a no-op now and should be removed in a follow-up PR. | ||
def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]": | ||
Comment on lines
+852
to
853
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
"""Given a deferred, make it follow the Synapse logcontext rules: | ||
|
||
If the deferred has completed, essentially does nothing (just returns another | ||
completed deferred with the result/failure). | ||
|
||
If the deferred has not yet completed, resets the logcontext before | ||
returning a deferred. Then, when the deferred completes, restores the | ||
current logcontext before running callbacks/errbacks. | ||
|
||
(This is more-or-less the opposite operation to run_in_background.) | ||
""" | ||
if deferred.called and not deferred.paused: | ||
# it looks like this deferred is ready to run any callbacks we give it | ||
# immediately. We may as well optimise out the logcontext faffery. | ||
return deferred | ||
|
||
# ok, we can't be sure that a yield won't block, so let's reset the | ||
# logcontext, and add a callback to the deferred to restore it. | ||
prev_context = set_current_context(SENTINEL_CONTEXT) | ||
deferred.addBoth(_set_context_cb, prev_context) | ||
return deferred | ||
|
||
|
||
ResultT = TypeVar("ResultT") | ||
|
||
|
||
def _set_context_cb(result: ResultT, context: LoggingContextOrSentinel) -> ResultT: | ||
"""A callback function which just sets the logging context""" | ||
set_current_context(context) | ||
return result | ||
|
||
|
||
def defer_to_thread( | ||
reactor: "ISynapseReactor", f: Callable[P, R], *args: P.args, **kwargs: P.kwargs | ||
) -> "defer.Deferred[R]": | ||
|
@@ -931,9 +868,6 @@ def defer_to_thread( | |
logcontext (so its CPU usage metrics will get attributed to the current | ||
logcontext). `f` should preserve the logcontext it is given. | ||
|
||
The result deferred follows the Synapse logcontext rules: you should `yield` | ||
on it. | ||
|
||
Args: | ||
reactor: The reactor in whose main thread the Deferred will be invoked, | ||
and whose threadpool we should use for the function. | ||
|
@@ -971,9 +905,6 @@ def defer_to_threadpool( | |
logcontext (so its CPU usage metrics will get attributed to the current | ||
logcontext). `f` should preserve the logcontext it is given. | ||
|
||
The result deferred follows the Synapse logcontext rules: you should `yield` | ||
on it. | ||
|
||
Args: | ||
reactor: The reactor in whose main thread the Deferred will be invoked. | ||
Normally this will be hs.get_reactor(). | ||
|
@@ -991,18 +922,6 @@ def defer_to_threadpool( | |
A Deferred which fires a callback with the result of `f`, or an | ||
errback if `f` throws an exception. | ||
""" | ||
curr_context = current_context() | ||
if not curr_context: | ||
logger.warning( | ||
"Calling defer_to_threadpool from sentinel context: metrics will be lost" | ||
) | ||
parent_context = None | ||
else: | ||
assert isinstance(curr_context, LoggingContext) | ||
parent_context = curr_context | ||
|
||
def g() -> R: | ||
with LoggingContext(str(curr_context), parent_context=parent_context): | ||
return f(*args, **kwargs) | ||
|
||
return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g)) | ||
return make_deferred_yieldable( | ||
threads.deferToThreadPool(reactor, threadpool, f, *args, **kwargs) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See #18871 (comment) for why we've removed the
patch_inline_callbacks