Skip to content

Commit

Permalink
Fix possible deadlock with "complete()" and async sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Delgan committed Sep 2, 2023
1 parent f339282 commit 1d8101f
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

- Add a new ``context`` optional argument to ``logger.add()`` specifying ``multiprocessing`` context (like ``"spawn"`` or ``"fork"``) to be used internally instead of the default one (`#851 <https://github.com/Delgan/loguru/issues/851>`_).
- Add support for true colors on Windows using ANSI/VT console when available (`#934 <https://github.com/Delgan/loguru/issues/934>`_, thanks `@tunaflsh <https://github.com/tunaflsh>`_).
- Fix possible deadlock when calling ``logger.complete()`` with concurrent logging of an asynchronous sink (`#906 <https://github.com/Delgan/loguru/issues/906>`_).
- Fix file possibly rotating too early or too late when re-starting an application around midnight (`#894 <https://github.com/Delgan/loguru/issues/894>`_).
- Fix inverted ``"<hide>"`` and ``"<strike>"`` color tags (`#943 <https://github.com/Delgan/loguru/pull/943>`_, thanks `@tunaflsh <https://github.com/tunaflsh>`_).
- Fix possible untraceable errors raised when logging non-unpicklable ``Exception`` instances while using ``enqueue=True`` (`#329 <https://github.com/Delgan/loguru/issues/329>`_).
Expand Down
4 changes: 2 additions & 2 deletions loguru/_file_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ def stop(self):

self._terminate_file(is_rotating=False)

async def complete(self):
pass
def tasks_to_complete(self):
return []

def _create_path(self):
path = self._path.format_map({"time": FileDateFormatter()})
Expand Down
7 changes: 3 additions & 4 deletions loguru/_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,11 @@ def complete_queue(self):
self._confirmation_event.wait()
self._confirmation_event.clear()

async def complete_async(self):
def tasks_to_complete(self):
if self._enqueue and self._owner_process_pid != os.getpid():
return

return []
with self._protected_lock():
await self._sink.complete()
return self._sink.tasks_to_complete()

def update_format(self, level_id):
if not self._colorize or self._is_formatter_dynamic:
Expand Down
10 changes: 4 additions & 6 deletions loguru/_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,20 +1109,18 @@ def complete(self):
>>> process.join()
Message sent from the child
"""
tasks = []

with self._core.lock:
handlers = self._core.handlers.copy()
for handler in handlers.values():
handler.complete_queue()

logger = self
tasks.extend(handler.tasks_to_complete())

class AwaitableCompleter:
def __await__(self):
with logger._core.lock:
handlers = logger._core.handlers.copy()
for handler in handlers.values():
yield from handler.complete_async().__await__()
for task in tasks:
yield from task.__await__()

return AwaitableCompleter()

Expand Down
40 changes: 26 additions & 14 deletions loguru/_simple_sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ def stop(self):
if self._stoppable:
self._stream.stop()

async def complete(self):
if self._completable:
await self._stream.complete()
def tasks_to_complete(self):
if not self._completable:
return []
return [self._stream.complete()]


class StandardSink:
Expand Down Expand Up @@ -52,8 +53,8 @@ def write(self, message):
def stop(self):
self._handler.close()

async def complete(self):
pass
def tasks_to_complete(self):
return []


class AsyncSink:
Expand Down Expand Up @@ -86,14 +87,25 @@ def stop(self):
for task in self._tasks:
task.cancel()

async def complete(self):
loop = get_running_loop()
def tasks_to_complete(self):
tasks = []
# To avoid errors due to "self._tasks" being mutated while iterated, the
# "tasks_to_complete()" method must be protected by the same lock as "write()" (which
# happens to be the handler lock). However, the tasks must not be awaited under the lock,
# as this could lead to a deadlock. Therefore, we first need to collect the tasks to
# complete, then return them so that they can be awaited outside of the lock.
for task in self._tasks:
if get_task_loop(task) is loop:
try:
await task
except Exception:
pass # Handled in "check_exception()"
tasks.append(self._complete_task(task))
return tasks

async def _complete_task(self, task):
loop = get_running_loop()
if get_task_loop(task) is not loop:
return
try:
await task
except Exception:
pass # Handled in "check_exception()"

def __getstate__(self):
state = self.__dict__.copy()
Expand All @@ -115,5 +127,5 @@ def write(self, message):
def stop(self):
pass

async def complete(self):
pass
def tasks_to_complete(self):
return []
63 changes: 62 additions & 1 deletion tests/test_coroutine_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,67 @@ async def complete():
assert err == ""


def test_complete_and_sink_write_concurrency():
count = 1000
n = 0

async def sink(message):
nonlocal n
n += 1

async def some_task():
for _ in range(count):
logger.info("Message")
await asyncio.sleep(0)

async def another_task():
for _ in range(count):
await logger.complete()
await asyncio.sleep(0)

async def main():
logger.remove()
logger.add(sink, catch=False)

await asyncio.gather(some_task(), another_task())

asyncio.run(main())

assert n == count


def test_complete_and_contextualize_concurrency():
called = False

async def main():
logging_event = asyncio.Event()
contextualize_event = asyncio.Event()

async def sink(message):
nonlocal called
logging_event.set()
await contextualize_event.wait()
called = True

async def logging_task():
logger.info("Message")
await logger.complete()

async def contextualize_task():
with logger.contextualize():
contextualize_event.set()
await logging_event.wait()

logger.remove()
logger.add(sink, catch=False)

await asyncio.gather(logging_task(), contextualize_task())

asyncio.run(main())

assert called


async def async_subworker(logger_):
logger_.info("Child")
await logger_.complete()
Expand All @@ -588,7 +649,7 @@ async def write(self, message):
self.output += message


def test_complete_with_sub_processes(monkeypatch, capsys):
def test_complete_with_sub_processes(capsys):
spawn_context = multiprocessing.get_context("spawn")

loop = asyncio.new_event_loop()
Expand Down

0 comments on commit 1d8101f

Please sign in to comment.