diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index e1e20976..2b7d7966 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -3,6 +3,7 @@
 
 - Add a new ``context`` optional argument to ``logger.add()`` specifying ``multiprocessing`` context (like ``"spawn"`` or ``"fork"``) to be used internally instead of the default one (`#851 <https://github.com/Delgan/loguru/issues/851>`_).
 - Add support for true colors on Windows using ANSI/VT console when available (`#934 <https://github.com/Delgan/loguru/issues/934>`_, thanks `@tunaflsh <https://github.com/tunaflsh>`_).
+- Fix possible deadlock when calling ``logger.complete()`` with concurrent logging of an asynchronous sink (`#906 <https://github.com/Delgan/loguru/issues/906>`_).
 - Fix file possibly rotating too early or too late when re-starting an application around midnight (`#894 <https://github.com/Delgan/loguru/issues/894>`_).
 - Fix inverted ``""`` and ``""`` color tags (`#943 <https://github.com/Delgan/loguru/issues/943>`_, thanks `@tunaflsh <https://github.com/tunaflsh>`_).
 - Fix possible untraceable errors raised when logging non-unpicklable ``Exception`` instances while using ``enqueue=True`` (`#329 <https://github.com/Delgan/loguru/issues/329>`_).
diff --git a/loguru/_file_sink.py b/loguru/_file_sink.py
index 564e93f3..bdc6ccd4 100644
--- a/loguru/_file_sink.py
+++ b/loguru/_file_sink.py
@@ -211,8 +211,8 @@
     def stop(self):
         self._terminate_file(is_rotating=False)
 
-    async def complete(self):
-        pass
+    def tasks_to_complete(self):
+        return []
 
     def _create_path(self):
         path = self._path.format_map({"time": FileDateFormatter()})
diff --git a/loguru/_handler.py b/loguru/_handler.py
index e6545a90..6b684d21 100644
--- a/loguru/_handler.py
+++ b/loguru/_handler.py
@@ -69,6 +69,7 @@ def __init__(
         self._lock = create_handler_lock()
         self._lock_acquired = threading.local()
         self._queue = None
+        self._queue_lock = None
         self._confirmation_event = None
         self._confirmation_lock = None
         self._owner_process_pid = None
@@ -88,6 +89,7 @@
 
         if self._enqueue:
             self._queue = self._multiprocessing_context.SimpleQueue()
+            self._queue_lock = create_handler_lock()
             self._confirmation_event = self._multiprocessing_context.Event()
             self._confirmation_lock = self._multiprocessing_context.Lock()
             self._owner_process_pid = os.getpid()
@@ -223,12 +225,12 @@ def complete_queue(self):
             self._confirmation_event.wait()
             self._confirmation_event.clear()
 
-    async def complete_async(self):
+    def tasks_to_complete(self):
         if self._enqueue and self._owner_process_pid != os.getpid():
-            return
-
-        with self._protected_lock():
-            await self._sink.complete()
+            return []
+        lock = self._queue_lock if self._enqueue else self._protected_lock()
+        with lock:
+            return self._sink.tasks_to_complete()
 
     def update_format(self, level_id):
         if not self._colorize or self._is_formatter_dynamic:
@@ -285,7 +287,7 @@ def _queued_writer(self):
 
         # We need to use a lock to protect sink during fork.
         # Particularly, writing to stderr may lead to deadlock in child process.
-        lock = create_handler_lock()
+        lock = self._queue_lock
 
         while True:
             try:
@@ -317,12 +319,15 @@ def __getstate__(self):
         state["_sink"] = None
         state["_thread"] = None
         state["_owner_process"] = None
+        state["_queue_lock"] = None
         return state
 
     def __setstate__(self, state):
         self.__dict__.update(state)
         self._lock = create_handler_lock()
         self._lock_acquired = threading.local()
+        if self._enqueue:
+            self._queue_lock = create_handler_lock()
         if self._is_formatter_dynamic:
             if self._colorize:
                 self._memoize_dynamic_format = memoize(prepare_colored_format)
diff --git a/loguru/_logger.py b/loguru/_logger.py
index b2516494..89146432 100644
--- a/loguru/_logger.py
+++ b/loguru/_logger.py
@@ -1109,20 +1109,18 @@
         >>> process.join()
         Message sent from the child
         """
+        tasks = []
         with self._core.lock:
             handlers = self._core.handlers.copy()
             for handler in handlers.values():
                 handler.complete_queue()
-
-        logger = self
+                tasks.extend(handler.tasks_to_complete())
 
         class AwaitableCompleter:
             def __await__(self):
-                with logger._core.lock:
-                    handlers = logger._core.handlers.copy()
-                    for handler in handlers.values():
-                        yield from handler.complete_async().__await__()
+                for task in tasks:
+                    yield from task.__await__()
 
         return AwaitableCompleter()
diff --git a/loguru/_simple_sinks.py b/loguru/_simple_sinks.py
index 46d4490c..068f1e13 100644
--- a/loguru/_simple_sinks.py
+++ b/loguru/_simple_sinks.py
@@ -21,9 +21,10 @@ def stop(self):
         if self._stoppable:
             self._stream.stop()
 
-    async def complete(self):
-        if self._completable:
-            await self._stream.complete()
+    def tasks_to_complete(self):
+        if not self._completable:
+            return []
+        return [self._stream.complete()]
 
 
 class StandardSink:
@@ -52,8 +53,8 @@ def write(self, message):
     def stop(self):
         self._handler.close()
 
-    async def complete(self):
-        pass
+    def tasks_to_complete(self):
+        return []
 
 
 class AsyncSink:
@@ -86,14 +87,22 @@ def stop(self):
         for task in self._tasks:
             task.cancel()
 
-    async def complete(self):
+    def tasks_to_complete(self):
+        # To avoid errors due to "self._tasks" being mutated while iterated, the
+        # "tasks_to_complete()" method must be protected by the same lock as "write()" (which
+        # happens to be the handler lock). However, the tasks must not be awaited while the lock is
+        # acquired as this could lead to a deadlock. Therefore, we first need to collect the tasks
+        # to complete, then return them so that they can be awaited outside of the lock.
+        return [self._complete_task(task) for task in self._tasks]
+
+    async def _complete_task(self, task):
         loop = get_running_loop()
-        for task in self._tasks:
-            if get_task_loop(task) is loop:
-                try:
-                    await task
-                except Exception:
-                    pass  # Handled in "check_exception()"
+        if get_task_loop(task) is not loop:
+            return
+        try:
+            await task
+        except Exception:
+            pass  # Handled in "check_exception()"
 
     def __getstate__(self):
         state = self.__dict__.copy()
@@ -115,5 +124,5 @@ def write(self, message):
     def stop(self):
         pass
 
-    async def complete(self):
-        pass
+    def tasks_to_complete(self):
+        return []
diff --git a/tests/test_coroutine_sink.py b/tests/test_coroutine_sink.py
index 373b7a2d..8573bc47 100644
--- a/tests/test_coroutine_sink.py
+++ b/tests/test_coroutine_sink.py
@@ -565,6 +565,67 @@ async def complete():
     assert err == ""
 
 
+def test_complete_and_sink_write_concurrency():
+    count = 1000
+    n = 0
+
+    async def sink(message):
+        nonlocal n
+        n += 1
+
+    async def some_task():
+        for _ in range(count):
+            logger.info("Message")
+            await asyncio.sleep(0)
+
+    async def another_task():
+        for _ in range(count):
+            await logger.complete()
+            await asyncio.sleep(0)
+
+    async def main():
+        logger.remove()
+        logger.add(sink, catch=False)
+
+        await asyncio.gather(some_task(), another_task())
+
+    asyncio.run(main())
+
+    assert n == count
+
+
+def test_complete_and_contextualize_concurrency():
+    called = False
+
+    async def main():
+        logging_event = asyncio.Event()
+        contextualize_event = asyncio.Event()
+
+        async def sink(message):
+            nonlocal called
+            logging_event.set()
+            await contextualize_event.wait()
+            called = True
+
+        async def logging_task():
+            logger.info("Message")
+            await logger.complete()
+
+        async def contextualize_task():
+            with logger.contextualize():
+                contextualize_event.set()
+                await logging_event.wait()
+
+        logger.remove()
+        logger.add(sink, catch=False)
+
+        await asyncio.gather(logging_task(), contextualize_task())
+
+    asyncio.run(main())
+
+    assert called
+
+
 async def async_subworker(logger_):
     logger_.info("Child")
     await logger_.complete()
@@ -588,7 +649,7 @@
     async def write(self, message):
         self.output += message
 
 
-def test_complete_with_sub_processes(monkeypatch, capsys):
+def test_complete_with_sub_processes(capsys):
     spawn_context = multiprocessing.get_context("spawn")
     loop = asyncio.new_event_loop()
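
The core of the fix is visible in ``Logger.complete()`` and ``AsyncSink``: completion coroutines are collected while the relevant locks are held, but ``AwaitableCompleter`` awaits them only after every lock has been released. Awaiting under the handler lock can deadlock because the lock is a plain thread lock: while one coroutine awaits with the lock held, the event loop may schedule another coroutine that calls ``write()``, which then blocks the entire thread on that same lock. The following self-contained sketch illustrates the pattern in isolation; the names (``DemoSink``, ``_emit``) are illustrative only and not part of Loguru.

import asyncio
import threading


class DemoSink:
    # Illustrative sink: "write()" and task collection share one thread lock,
    # analogous to the handler lock protecting "AsyncSink.write()".

    def __init__(self):
        self._lock = threading.Lock()
        self._tasks = []

    def write(self, message):
        with self._lock:
            task = asyncio.get_running_loop().create_task(self._emit(message))
            self._tasks.append(task)

    async def _emit(self, message):
        await asyncio.sleep(0)  # stand-in for real asynchronous I/O
        print(message, end="")

    async def complete(self):
        # Snapshot the pending tasks under the lock, so the list cannot be
        # mutated by a concurrent "write()" while it is iterated...
        with self._lock:
            pending, self._tasks = self._tasks, []
        # ...then await them only once the lock is released. Awaiting inside
        # the "with" block would freeze the event-loop thread as soon as a
        # concurrent coroutine blocked on "write()", i.e. deadlock.
        for task in pending:
            await task


async def main():
    sink = DemoSink()
    for i in range(3):
        sink.write("message %d\n" % i)
    await sink.complete()


asyncio.run(main())

Snapshotting under the lock keeps the task list safe from concurrent mutation while guaranteeing that no ``await`` ever runs with the lock held, which is the invariant the patch establishes for ``AsyncSink.tasks_to_complete()`` and exercises in ``test_complete_and_sink_write_concurrency``.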