-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow the lifetime of the Connection thread to be tied to an event loop #112
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,13 +47,15 @@ def __init__( | |
connector: Callable[[], sqlite3.Connection], | ||
iter_chunk_size: int, | ||
loop: Optional[asyncio.AbstractEventLoop] = None, | ||
parent_loop: Optional[asyncio.AbstractEventLoop] = None, | ||
) -> None: | ||
super().__init__() | ||
self._running = True | ||
self._connection: Optional[sqlite3.Connection] = None | ||
self._connector = connector | ||
self._tx: Queue = Queue() | ||
self._iter_chunk_size = iter_chunk_size | ||
self._parent_loop = parent_loop | ||
|
||
if loop is not None: | ||
warn( | ||
|
@@ -87,7 +89,7 @@ def run(self) -> None: | |
|
||
:meta private: | ||
""" | ||
while True: | ||
while self._parent_loop is None or not self._parent_loop.is_closed(): | ||
# Continues running until all queue items are processed, | ||
# even after connection is closed (so we can finalize all | ||
# futures) | ||
|
@@ -116,6 +118,19 @@ def set_exception(fut, e): | |
|
||
get_loop(future).call_soon_threadsafe(set_exception, future, e) | ||
|
||
# Clean up within this thread only if the parent event loop exits ungracefully | ||
if not self._running or self._connection is None or self._parent_loop is None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't obvious as to when it will return or run the cleanup below. Is there a way to always try cleaning up? |
||
return | ||
|
||
try: | ||
self._conn.close() | ||
except Exception: | ||
LOG.info("exception occurred while closing connection") | ||
raise | ||
finally: | ||
self._running = False | ||
self._connection = None | ||
|
||
async def _execute(self, fn, *args, **kwargs): | ||
"""Queue a function with the given arguments for execution.""" | ||
if not self._running or not self._connection: | ||
|
@@ -376,6 +391,7 @@ def connect( | |
*, | ||
iter_chunk_size=64, | ||
loop: Optional[asyncio.AbstractEventLoop] = None, | ||
parent_loop: Optional[asyncio.AbstractEventLoop] = None, | ||
**kwargs: Any | ||
) -> Connection: | ||
"""Create and return a connection proxy to the sqlite database.""" | ||
|
@@ -396,4 +412,4 @@ def connector() -> sqlite3.Connection: | |
|
||
return sqlite3.connect(loc, **kwargs) | ||
|
||
return Connection(connector, iter_chunk_size) | ||
return Connection(connector, iter_chunk_size, parent_loop=parent_loop) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
import asyncio | ||
import sqlite3 | ||
import sys | ||
import time | ||
from pathlib import Path | ||
from sqlite3 import OperationalError | ||
from threading import Thread | ||
|
@@ -465,3 +466,20 @@ async def test_backup_py36(self): | |
) as db2: | ||
with self.assertRaisesRegex(RuntimeError, "backup().+3.7"): | ||
await db1.backup(db2) | ||
|
||
async def test_no_close_with_parent_event_loop(self): | ||
def runner(): | ||
loop = asyncio.new_event_loop() | ||
db = loop.run_until_complete(aiosqlite.connect(TEST_DB, parent_loop=loop)) | ||
loop.close() | ||
|
||
# Wait long enough for the queue `get` timeout to elapse | ||
time.sleep(0.2) | ||
|
||
# Database has been closed | ||
with self.assertRaises(ValueError): | ||
db.in_transaction | ||
Comment on lines
+480
to
+481
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe validate |
||
|
||
thread = Thread(target=runner) | ||
thread.start() | ||
thread.join() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason or benefit to explicitly passing the parent loop (something that was explicitly deprecated in previous versions), rather than just using
get_loop
to get the active loop when the connection object is made? Usingget_loop
instead will prevent errors if the non-active loop is passed in, and also provide more consistent behavior in therun()
method.I think I would also prefer tracking this as
self._loop
for brevity.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added it as an explicit parameter to preserve existing behavior and because
get_loop
may not immediately run, like ifself._tx
is never filled with any futures, and it would pick the first-seen event loop in the case of multiple loops, which may not be desired.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If
get_loop
is called within the_connect
method, then there's never a chance that it would get the wrong event loop. I'd overall prefer to have consistent behavior with minimal parameters than have more parameters that result in a higher number of distinct codepaths.