Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add mutual exclusion for synchronized stream access in logging handlers and CLPLoglevelTimeout (fixes #55). #59

Merged
merged 12 commits into from
Feb 28, 2025
36 changes: 31 additions & 5 deletions src/clp_logging/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pathlib import Path
from queue import Empty, Queue
from signal import SIGINT, signal, SIGTERM
from threading import Thread, Timer
from threading import Lock, Thread, Timer
from types import FrameType
from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union

Expand Down Expand Up @@ -89,6 +89,32 @@ def _encode_log_event(msg: str, last_timestamp_ms: int) -> Tuple[bytearray, int]
return clp_msg, timestamp_ms


class _LockedStream:
"""
A wrapper class for output streams that locks before `write` or `flush`.
"""

def __init__(self, stream: Union[ZstdCompressionWriter, IO[bytes]]) -> None:
self._stream: Union[ZstdCompressionWriter, IO[bytes]] = stream
self._lock: Lock = Lock()

def write(self, b: Union[bytes, bytearray]) -> int:
with self._lock:
return self._stream.write(b)

def flush(self, flush_mode: Optional[int] = None) -> None:
with self._lock:
if flush_mode is None:
self._stream.flush()
else:
assert isinstance(self._stream, ZstdCompressionWriter)
self._stream.flush(flush_mode)

def close(self) -> None:
with self._lock:
self._stream.close()


class CLPBaseHandler(logging.Handler, metaclass=ABCMeta):
def __init__(self) -> None:
super().__init__()
Expand Down Expand Up @@ -223,11 +249,11 @@ def __init__(
self.timeout_fn: Callable[[], None] = timeout_fn
self.next_hard_timeout_ts: int = ULONG_MAX
self.min_soft_timeout_delta: int = ULONG_MAX
self.ostream: Optional[Union[ZstdCompressionWriter, IO[bytes]]] = None
self.ostream: Optional[_LockedStream] = None
self.hard_timeout_thread: Optional[Timer] = None
self.soft_timeout_thread: Optional[Timer] = None

def set_ostream(self, ostream: Union[ZstdCompressionWriter, IO[bytes]]) -> None:
def set_ostream(self, ostream: _LockedStream) -> None:
self.ostream = ostream

def timeout(self) -> None:
Expand Down Expand Up @@ -421,7 +447,7 @@ def _aggregator(

with log_path.open("ab") as log:
# Since the compression may be disabled, context manager is not used
ostream: Union[ZstdCompressionWriter, IO[bytes]] = (
ostream: _LockedStream = _LockedStream(
cctx.stream_writer(log) if enable_compression else log
)

Expand Down Expand Up @@ -702,7 +728,7 @@ class CLPStreamHandler(CLPBaseHandler):

def init(self, stream: IO[bytes]) -> None:
self.cctx: ZstdCompressor = ZstdCompressor()
self.ostream: Union[ZstdCompressionWriter, IO[bytes]] = (
self.ostream: _LockedStream = _LockedStream(
self.cctx.stream_writer(stream) if self.enable_compression else stream
)
self.last_timestamp_ms: int = floor(time.time() * 1000) # convert to ms and truncate
Expand Down