feat: Add mutual exclusion for synchronized stream access in logging handlers and CLPLoglevelTimeout (fixes #55). #59

Merged 12 commits on Feb 28, 2025
87 changes: 59 additions & 28 deletions src/clp_logging/handlers.py
@@ -10,7 +10,7 @@
from pathlib import Path
from queue import Empty, Queue
from signal import SIGINT, signal, SIGTERM
from threading import Thread, Timer
from threading import RLock, Thread, Timer
from types import FrameType
from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union

@@ -175,6 +175,11 @@ class CLPLogLevelTimeout:
the last timeout. Therefore, if we've seen a log level with a low
delta, that delta will continue to be used to calculate the soft
timer until a timeout occurs.

Thread safety:
- This class guards all operations on the stream set by `set_ostream`
  with an internal lock.
- Any logging handler configured with a timeout object should guard its
  stream operations using the lock returned by `get_lock()`.
"""

# delta times in milliseconds
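
A minimal sketch, not part of this diff, of the locking contract described in the docstring above; the helper name `guarded_write` and its arguments are illustrative:

from typing import IO

def guarded_write(timeout: "CLPLogLevelTimeout", ostream: IO[bytes], event: bytes) -> None:
    # Take the same RLock that the timeout's timer threads acquire before
    # flushing, so a flush can never interleave with this write.
    with timeout.get_lock():
        ostream.write(event)
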
@@ -226,29 +231,34 @@ def __init__(
self.ostream: Optional[Union[ZstdCompressionWriter, IO[bytes]]] = None
self.hard_timeout_thread: Optional[Timer] = None
self.soft_timeout_thread: Optional[Timer] = None
self.lock: RLock = RLock()

def set_ostream(self, ostream: Union[ZstdCompressionWriter, IO[bytes]]) -> None:
self.ostream = ostream

def get_lock(self) -> RLock:
return self.lock

def timeout(self) -> None:
"""
Wraps the call to the user-supplied `timeout_fn`, ensuring that any
existing timeout threads are cancelled, `next_hard_timeout_ts` and
`min_soft_timeout_delta` are reset, and the zstandard frame is flushed.
"""
if self.hard_timeout_thread:
self.hard_timeout_thread.cancel()
if self.soft_timeout_thread:
self.soft_timeout_thread.cancel()
self.next_hard_timeout_ts = ULONG_MAX
self.min_soft_timeout_delta = ULONG_MAX

if self.ostream:
if isinstance(self.ostream, ZstdCompressionWriter):
self.ostream.flush(FLUSH_FRAME)
else:
self.ostream.flush()
self.timeout_fn()
with self.get_lock():
if self.hard_timeout_thread:
self.hard_timeout_thread.cancel()
if self.soft_timeout_thread:
self.soft_timeout_thread.cancel()
self.next_hard_timeout_ts = ULONG_MAX
self.min_soft_timeout_delta = ULONG_MAX

if self.ostream:
if isinstance(self.ostream, ZstdCompressionWriter):
self.ostream.flush(FLUSH_FRAME)
else:
self.ostream.flush()
self.timeout_fn()

def update(self, loglevel: int, log_timestamp_ms: int, log_fn: Callable[[str], None]) -> None:
"""
@@ -302,6 +312,21 @@ def update(self, loglevel: int, log_timestamp_ms: int, log_fn: Callable[[str], N
self.soft_timeout_thread.start()


def _get_mutex_context_from_loglevel_timeout(loglevel_timeout: Optional[CLPLogLevelTimeout]) -> Any:
"""
Gets a context manager providing mutually exclusive access to the IR stream.

NOTE: The return type should be `AbstractContextManager[Optional[bool]]`,
but it is annotated as `Any` to satisfy the linter in Python 3.7 and 3.8,
as subscripting `AbstractContextManager` is only supported from Python 3.9 (#18239).

:param loglevel_timeout: An optional `CLPLogLevelTimeout` object.
:return: A context manager that either provides the lock from
`loglevel_timeout` or a `nullcontext` if `loglevel_timeout` is `None`.
"""
return loglevel_timeout.get_lock() if loglevel_timeout else nullcontext()
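
The helper relies on both `threading.RLock` objects and `contextlib.nullcontext()` implementing the context-manager protocol, so callers can unconditionally use `with`. A standalone sketch of the same pattern (the name `_mutex_context` is hypothetical):

from contextlib import nullcontext
from threading import RLock
from typing import Any, Optional

def _mutex_context(lock: Optional[RLock] = None) -> Any:
    # A real lock when one exists, otherwise a no-op context manager.
    return lock if lock is not None else nullcontext()

with _mutex_context():         # no lock configured: nullcontext, nothing blocks
    pass
with _mutex_context(RLock()):  # lock configured: acquired and released by `with`
    pass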


class CLPSockListener:
"""
Server that listens to a named Unix domain socket for `CLPSockHandler`
@@ -451,18 +476,21 @@ def log_fn(msg: str) -> None:
timestamp_ms - last_timestamp_ms
)
last_timestamp_ms = timestamp_ms
if loglevel_timeout:
loglevel_timeout.update(loglevel, last_timestamp_ms, log_fn)
buf += timestamp_buf
ostream.write(buf)
with _get_mutex_context_from_loglevel_timeout(loglevel_timeout):
if loglevel_timeout:
loglevel_timeout.update(loglevel, last_timestamp_ms, log_fn)
ostream.write(buf)
if loglevel_timeout:
loglevel_timeout.timeout()
ostream.write(EOF_CHAR)

if enable_compression:
# Since we are not using a context manager, the ostream must be
# explicitly closed.
ostream.close()
with _get_mutex_context_from_loglevel_timeout(loglevel_timeout):
ostream.write(EOF_CHAR)

if enable_compression:
# Since we are not using a context manager, the ostream must be
# explicitly closed.
ostream.close()
# tell _server to exit
CLPSockListener._signaled = True
return 0
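
A self-contained sketch, with hypothetical names, of the interleaving the locking above prevents: a `Timer` thread firing the timeout and the listener thread writing an event now contend on one lock, so a frame flush cannot land in the middle of an event write:

from threading import RLock, Timer

lock = RLock()
stream: list = []                 # stands in for the shared IR byte stream

def write_event(buf: str) -> None:
    with lock:
        stream.append(buf)        # analogous to ostream.write(buf)

def fire_timeout() -> None:
    with lock:
        stream.append("<flush>")  # analogous to ostream.flush(FLUSH_FRAME)

t = Timer(0.01, fire_timeout)     # CLPLogLevelTimeout schedules Timer threads
t.start()
for i in range(100):
    write_event(f"event {i}")
t.join()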
@@ -740,17 +768,19 @@ def _direct_write(self, msg: str) -> None:
raise RuntimeError("Stream already closed")
clp_msg: bytearray
clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms)
self.ostream.write(clp_msg)
with _get_mutex_context_from_loglevel_timeout(self.loglevel_timeout):
self.ostream.write(clp_msg)

# override
def _write(self, loglevel: int, msg: str) -> None:
if self.closed:
raise RuntimeError("Stream already closed")
clp_msg: bytearray
clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms)
if self.loglevel_timeout:
self.loglevel_timeout.update(loglevel, self.last_timestamp_ms, self._direct_write)
self.ostream.write(clp_msg)
with _get_mutex_context_from_loglevel_timeout(self.loglevel_timeout):
if self.loglevel_timeout:
self.loglevel_timeout.update(loglevel, self.last_timestamp_ms, self._direct_write)
self.ostream.write(clp_msg)
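
A note on why the shared lock is an `RLock` rather than a plain `Lock`, with a minimal runnable sketch (not from this diff): `_write` holds the lock while calling `loglevel_timeout.update(...)`, which may call back into `_direct_write` on the same thread and take the lock again; a reentrant lock permits that nested acquisition, while a plain `Lock` would deadlock:

from threading import RLock

lock = RLock()

def inner() -> None:
    with lock:   # nested acquisition on the same thread: fine with an RLock
        pass

def outer() -> None:
    with lock:
        inner()  # analogous to _write -> update -> _direct_write

outer()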

# Added to logging.StreamHandler in python 3.7
# override
@@ -775,8 +805,9 @@ def setStream(self, stream: IO[bytes]) -> Optional[IO[bytes]]:
def close(self) -> None:
if self.loglevel_timeout:
self.loglevel_timeout.timeout()
self.ostream.write(EOF_CHAR)
self.ostream.close()
with _get_mutex_context_from_loglevel_timeout(self.loglevel_timeout):
self.ostream.write(EOF_CHAR)
self.ostream.close()
self.closed = True
super().close()
