From ff2ddc9431f293d213561c834e1a171edf40b30a Mon Sep 17 00:00:00 2001 From: "m.kindritskiy" Date: Wed, 4 Dec 2024 14:57:26 +0200 Subject: [PATCH] Make EntityCache per request Since we can remove projects via UI, we need to update worker cache somehow: 1. We either need some mechanism to tell all workers to update its local cache 2. Or move cache to Redis 3. Or make cache per request (which mostly makes it useless) for now. We choose 3 option because it fixes the but and is the fastest to implement. Later we can move to 1 or 2 option if we need to or completely remove the cache. --- featureflags/http/repositories/flags.py | 6 +++--- featureflags/rpc/servicer.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/featureflags/http/repositories/flags.py b/featureflags/http/repositories/flags.py index 409522a..c8e25d1 100644 --- a/featureflags/http/repositories/flags.py +++ b/featureflags/http/repositories/flags.py @@ -82,8 +82,6 @@ def __init__( self._db_engine = db_engine self._graph_engine = graph_engine - self._entity_cache = EntityCache() - async def get_project_version(self, project: str) -> int: async with self._db_engine.acquire() as conn: return await select_scalar( @@ -99,11 +97,13 @@ async def prepare_project( Initialize project from request, create/update entities in the database. """ + entity_cache = EntityCache() + async with self._db_engine.acquire() as conn: await prepare_flags_project( request, conn=conn, - entity_cache=self._entity_cache, + entity_cache=entity_cache, ) async def load(self, request: PreloadFlagsRequest) -> PreloadFlagsResponse: diff --git a/featureflags/rpc/servicer.py b/featureflags/rpc/servicer.py index a189e8d..be93947 100644 --- a/featureflags/rpc/servicer.py +++ b/featureflags/rpc/servicer.py @@ -30,7 +30,6 @@ def __init__( db_engine: aiopg.sa.Engine, graph_engine: Engine, ) -> None: - self._entity_cache = EntityCache() self._flag_agg_stats = FlagAggStats() self._graph_engine = graph_engine self._db_engine = db_engine @@ -45,6 +44,7 @@ async def exchange(self, stream: Stream) -> None: @debug_cancellation @track async def Exchange(self, stream: Stream) -> None: # noqa: N802 + entity_cache = EntityCache() self._tasks.add(asyncio.current_task()) try: request: service_pb2.ExchangeRequest = await stream.recv_message() @@ -74,7 +74,7 @@ async def Exchange(self, stream: Stream) -> None: # noqa: N802 await add_statistics( request, conn=conn, - entity_cache=self._entity_cache, + entity_cache=entity_cache, flag_agg_stats=self._flag_agg_stats, ) version = await select_scalar(