Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Delgan committed Jan 10, 2024
1 parent 91a21e0 commit 315a155
Show file tree
Hide file tree
Showing 9 changed files with 831 additions and 126 deletions.
47 changes: 31 additions & 16 deletions loguru/_error_interceptor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import sys
import traceback

from ._locks_machinery import create_error_lock


class ErrorInterceptor:
def __init__(self, should_catch, handler_id):
self._should_catch = should_catch
self._handler_id = handler_id
self._lock = create_error_lock()

def should_catch(self):
return self._should_catch
Expand All @@ -14,21 +17,33 @@ def print(self, record=None, *, exception=None):
if not sys.stderr:
return

if exception is None:
type_, value, traceback_ = sys.exc_info()
else:
type_, value, traceback_ = (type(exception), exception, exception.__traceback__)
# The Lock prevents concurrent writes to standard error. Also, it's registered into the
# machinery to make sure no fork occurs while internal Lock of "sys.stderr" is acquired.
with self._lock:
if exception is None:
type_, value, traceback_ = sys.exc_info()
else:
type_, value, traceback_ = (type(exception), exception, exception.__traceback__)

try:
sys.stderr.write("--- Logging error in Loguru Handler #%d ---\n" % self._handler_id)
try:
record_repr = str(record)
except Exception:
record_repr = "/!\\ Unprintable record /!\\"
sys.stderr.write("Record was: %s\n" % record_repr)
traceback.print_exception(type_, value, traceback_, None, sys.stderr)
sys.stderr.write("--- End of logging error ---\n")
except OSError:
pass
finally:
del type_, value, traceback_
sys.stderr.write("--- Logging error in Loguru Handler #%d ---\n" % self._handler_id)
try:
record_repr = str(record)
except Exception:
record_repr = "/!\\ Unprintable record /!\\"
sys.stderr.write("Record was: %s\n" % record_repr)
traceback.print_exception(type_, value, traceback_, None, sys.stderr)
sys.stderr.write("--- End of logging error ---\n")
except OSError:
pass
finally:
del type_, value, traceback_

def __getstate__(self):
state = self.__dict__.copy()
state["_lock"] = None
return state

def __setstate__(self, state):
self.__dict__.update(state)
self._lock = create_error_lock()
105 changes: 56 additions & 49 deletions loguru/_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ._colorizer import Colorizer
from ._locks_machinery import create_handler_lock
from ._record_queue import RecordQueue


def prepare_colored_format(format_, ansi_level):
Expand Down Expand Up @@ -44,8 +45,7 @@ def __init__(
multiprocessing_context,
error_interceptor,
exception_formatter,
id_,
levels_ansi_codes
id_
):
self._name = name
self._sink = sink
Expand All @@ -60,69 +60,73 @@ def __init__(
self._error_interceptor = error_interceptor
self._exception_formatter = exception_formatter
self._id = id_
self._levels_ansi_codes = levels_ansi_codes # Warning, reference shared among handlers
self._levels_ansi_codes = {}

self._decolorized_format = None
self._precolorized_formats = {}
self._memoize_dynamic_format = None

self._stopped = False
self._lock = create_handler_lock()
self._lock_acquired = threading.local()
self._thread_locals = threading.local()
self._queue = None
self._queue_lock = None
self._confirmation_event = None
self._confirmation_lock = None
self._owner_process_pid = None
self._thread = None
self._writer_thread = None

# We can't use "object()" because their identity doesn't survive pickling.
self._confirmation_sentinel = True
self._stop_sentinel = None

if self._is_formatter_dynamic:
if self._colorize:
self._memoize_dynamic_format = memoize(prepare_colored_format)
else:
self._memoize_dynamic_format = memoize(prepare_stripped_format)
else:
if self._colorize:
for level_name in self._levels_ansi_codes:
self.update_format(level_name)
else:
self._decolorized_format = self._formatter.strip()
elif not self._colorize:
self._decolorized_format = self._formatter.strip()

if self._enqueue:
if self._multiprocessing_context is None:
self._queue = multiprocessing.SimpleQueue()
self._queue = RecordQueue(
self._multiprocessing_context, self._error_interceptor, self._id
)
self._confirmation_event = multiprocessing.Event()
self._confirmation_lock = multiprocessing.Lock()
else:
self._queue = self._multiprocessing_context.SimpleQueue()
self._queue = RecordQueue(
self._multiprocessing_context, self._error_interceptor, self._id
)
self._confirmation_event = self._multiprocessing_context.Event()
self._confirmation_lock = self._multiprocessing_context.Lock()
self._queue_lock = create_handler_lock()
self._owner_process_pid = os.getpid()
self._thread = Thread(
target=self._queued_writer, daemon=True, name="loguru-writer-%d" % self._id
self._writer_thread = Thread(
target=self._threaded_writer, daemon=True, name="loguru-writer-%d" % self._id
)
self._thread.start()
self._writer_thread.start()

def __repr__(self):
return "(id=%d, level=%d, sink=%s)" % (self._id, self._levelno, self._name)

@contextmanager
def _protected_lock(self):
"""Acquire the lock, but fail fast if its already acquired by the current thread."""
if getattr(self._lock_acquired, "acquired", False):
if getattr(self._thread_locals, "lock_acquired", False):
raise RuntimeError(
"Could not acquire internal lock because it was already in use (deadlock avoided). "
"This likely happened because the logger was re-used inside a sink, a signal "
"handler or a '__del__' method. This is not permitted because the logger and its "
"handlers are not re-entrant."
)
self._lock_acquired.acquired = True
self._thread_locals.lock_acquired = True
try:
with self._lock:
yield
finally:
self._lock_acquired.acquired = False
self._thread_locals.lock_acquired = False

def emit(self, record, level_id, from_decorator, is_raw, colored_message):
try:
Expand Down Expand Up @@ -214,10 +218,15 @@ def stop(self):
self._stopped = True
if self._enqueue:
if self._owner_process_pid != os.getpid():
self._queue.stop()
return
self._queue.put(None)
self._thread.join()
if hasattr(self._queue, "close"):
# Although we're not waiting for any confirmation, we still need to acquire
# the underlying Lock to ensure that not two processes try to stop and complete
# the queue at the same time (would possibly cause deadlock).
with self._confirmation_lock:
self._queue.put_final(self._stop_sentinel)
self._writer_thread.join()
self._queue.stop()
self._queue.close()

self._sink.stop()
Expand All @@ -227,7 +236,10 @@ def complete_queue(self):
return

with self._confirmation_lock:
self._queue.put(True)
if self._queue.is_closed():
return
with self._protected_lock():
self._queue.put(self._confirmation_sentinel)
self._confirmation_event.wait()
self._confirmation_event.clear()

Expand All @@ -238,11 +250,11 @@ def tasks_to_complete(self):
with lock:
return self._sink.tasks_to_complete()

def update_format(self, level_id):
if not self._colorize or self._is_formatter_dynamic:
return
ansi_code = self._levels_ansi_codes[level_id]
self._precolorized_formats[level_id] = self._formatter.colorize(ansi_code)
def update_format(self, level_id, ansi_code):
with self._protected_lock():
self._levels_ansi_codes[level_id] = ansi_code
if self._colorize and not self._is_formatter_dynamic:
self._precolorized_formats[level_id] = self._formatter.colorize(ansi_code)

@property
def levelno(self):
Expand Down Expand Up @@ -287,53 +299,48 @@ def _serialize_record(text, record):

return json.dumps(serializable, default=str, ensure_ascii=False) + "\n"

def _queued_writer(self):
message = None
queue = self._queue

# We need to use a lock to protect sink during fork.
# Particularly, writing to stderr may lead to deadlock in child process.
lock = self._queue_lock

def _threaded_writer(self):
while True:
try:
message = queue.get()
message = self._queue.get()
except Exception:
with lock:
with self._queue_lock:
self._error_interceptor.print(None)
continue

if message is None:
if message is self._stop_sentinel:
break

if message is True:
if message is self._confirmation_sentinel:
self._confirmation_event.set()
continue

with lock:
try:
try:
# We need to use a registered Lock to protect sink during fork. In particular, if
# this thread is writing to stderr while the main thread is forked, the lock
# internally used by stderr might be copied while being in locked state. That would
# cause a deadlock in the child process.
with self._queue_lock:
self._sink.write(message)
except Exception:
self._error_interceptor.print(message.record)
except Exception:
self._error_interceptor.print(message.record)

def __getstate__(self):
state = self.__dict__.copy()
state["_lock"] = None
state["_lock_acquired"] = None
state["_thread_locals"] = None
state["_memoize_dynamic_format"] = None
if self._enqueue:
state["_sink"] = None
state["_thread"] = None
state["_writer_thread"] = None
state["_owner_process"] = None
state["_queue_lock"] = None
return state

def __setstate__(self, state):
self.__dict__.update(state)
self._lock = create_handler_lock()
self._lock_acquired = threading.local()
if self._enqueue:
self._queue_lock = create_handler_lock()
self._thread_locals = threading.local()
if self._is_formatter_dynamic:
if self._colorize:
self._memoize_dynamic_format = memoize(prepare_colored_format)
Expand Down
Loading

0 comments on commit 315a155

Please sign in to comment.