Skip to content

Commit

Permalink
Run init of app.storage.general as early as possible (#4355)
Browse files Browse the repository at this point in the history
When introducing Redis storage in #4074, the initialization of
persistent storage was made async to not block the main thread when
loading data. For that to work the init must be done async -- which
introduced #4352. This PR fixes the issue by moving the init of
`app.storage.general` from app startup to import time and using
`asyncio.run` to create a temporary event loop for the init.

---------

Co-authored-by: Falko Schindler <[email protected]>
  • Loading branch information
rodja and falkoschindler authored Feb 27, 2025
1 parent fc203e0 commit b71074e
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 5 deletions.
1 change: 0 additions & 1 deletion nicegui/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def __init__(self, **kwargs) -> None:
self._disconnect_handlers: List[Union[Callable[..., Any], Awaitable]] = []
self._exception_handlers: List[Callable[..., Any]] = [log.exception]

self.on_startup(self.storage.general.initialize)
self.on_shutdown(self.storage.on_shutdown)

@property
Expand Down
4 changes: 2 additions & 2 deletions nicegui/nicegui.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ async def __call__(self, scope, receive, send):


core.app = app = App(default_response_class=NiceGUIJSONResponse, lifespan=_lifespan)
# NOTE we use custom json module which wraps orjson
core.sio = sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*', json=json)
core.app.storage.general.initialize_sync()
core.sio = sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*', json=json) # custom orjson wrapper
sio_app = SocketIoApp(socketio_server=sio, socketio_path='/socket.io')
app.mount('/_nicegui_ws/', sio_app)

Expand Down
10 changes: 10 additions & 0 deletions nicegui/persistence/file_persistent_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ async def initialize(self) -> None:
except Exception:
log.warning(f'Could not load storage file {self.filepath}')

def initialize_sync(self) -> None:
try:
if self.filepath.exists():
data = json.loads(self.filepath.read_text(encoding=self.encoding))
else:
data = {}
self.update(data)
except Exception:
log.warning(f'Could not load storage file {self.filepath}')

def backup(self) -> None:
"""Back up the data to the given file path."""
if not self.filepath.exists():
Expand Down
4 changes: 4 additions & 0 deletions nicegui/persistence/persistent_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,9 @@ class PersistentDict(observables.ObservableDict, abc.ABC):
async def initialize(self) -> None:
"""Load initial data from the persistence layer."""

@abc.abstractmethod
def initialize_sync(self) -> None:
"""Load initial data from the persistence layer in a synchronous context."""

async def close(self) -> None:
"""Clean up the persistence layer."""
27 changes: 25 additions & 2 deletions nicegui/persistence/redis_persistent_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .persistent_dict import PersistentDict

try:
import redis as redis_sync
import redis.asyncio as redis
optional_features.register('redis')
except ImportError:
Expand All @@ -14,6 +15,7 @@ class RedisPersistentDict(PersistentDict):
def __init__(self, *, url: str, id: str, key_prefix: str = 'nicegui:') -> None: # pylint: disable=redefined-builtin
if not optional_features.has('redis'):
raise ImportError('Redis is not installed. Please run "pip install nicegui[redis]".')
self.url = url
self.redis_client = redis.from_url(
url,
health_check_interval=10,
Expand All @@ -30,18 +32,39 @@ async def initialize(self) -> None:
try:
data = await self.redis_client.get(self.key)
self.update(json.loads(data) if data else {})
self._start_listening()
except Exception:
log.warning(f'Could not load data from Redis with key {self.key}')
await self.pubsub.subscribe(self.key + 'changes')

def initialize_sync(self) -> None:
"""Load initial data from Redis and start listening for changes in a synchronous context."""
with redis_sync.from_url(
self.url,
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True,
) as redis_client_sync:
try:
data = redis_client_sync.get(self.key)
self.update(json.loads(data) if data else {})
self._start_listening()
except Exception:
log.warning(f'Could not load data from Redis with key {self.key}')

def _start_listening(self) -> None:
async def listen():
await self.pubsub.subscribe(self.key + 'changes')
async for message in self.pubsub.listen():
if message['type'] == 'message':
new_data = json.loads(message['data'])
if new_data != self:
self.update(new_data)

background_tasks.create(listen(), name=f'redis-listen-{self.key}')
if core.loop and core.loop.is_running():
background_tasks.create(listen(), name=f'redis-listen-{self.key}')
else:
core.app.on_startup(listen())

def publish(self) -> None:
"""Publish the data to Redis and notify other instances."""
Expand Down

0 comments on commit b71074e

Please sign in to comment.