Skip to content

Commit 25610ca

Browse files
committed
lint
1 parent ebe7f79 commit 25610ca

File tree

1 file changed

+21
-48
lines changed

1 file changed

+21
-48
lines changed

flowmachine/flowmachine/core/cache.py

Lines changed: 21 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ def write_query_to_cache(
199199
con = connection.engine
200200
with con.begin() as trans:
201201
try:
202-
plan_time = run_ops_list_and_return_execution_time(query_ddl_ops, trans)
202+
plan_time = run_ops_list_and_return_execution_time(
203+
query_ddl_ops, trans
204+
)
203205
logger.debug("Executed queries.")
204206
except Exception as exc:
205207
q_state_machine.raise_error()
@@ -278,7 +280,6 @@ def write_cache_metadata(
278280
logger.debug(f"Can't pickle ({e}), attempting to cache anyway.")
279281
pass
280282

281-
<<<<<<< HEAD
282283
cache_record_insert = """
283284
INSERT INTO cache.cached
284285
(query_id, version, query, created, access_count, last_accessed, compute_time,
@@ -297,6 +298,7 @@ def write_cache_metadata(
297298
psycopg2.Binary(self_storage),
298299
),
299300
)
301+
logger.debug("Touching cache.", query_id=query.query_id, query=str(query))
300302
connection.exec_driver_sql(
301303
"SELECT touch_cache(%(ident)s);", dict(ident=query.query_id)
302304
)
@@ -310,39 +312,6 @@ def write_cache_metadata(
310312
logger.debug(f"{query.fully_qualified_table_name} added to cache.")
311313
else:
312314
logger.debug(f"Touched cache for {query.fully_qualified_table_name}.")
313-
=======
314-
with con.begin():
315-
cache_record_insert = """
316-
INSERT INTO cache.cached
317-
(query_id, version, query, created, access_count, last_accessed, compute_time,
318-
cache_score_multiplier, class, schema, tablename, obj)
319-
VALUES (%s, %s, %s, NOW(), 0, NOW(), %s, 0, %s, %s, %s, %s)
320-
ON CONFLICT (query_id) DO UPDATE SET last_accessed = NOW();"""
321-
con.execute(
322-
cache_record_insert,
323-
(
324-
query.query_id,
325-
__version__,
326-
query._make_query(),
327-
compute_time,
328-
query.__class__.__name__,
329-
*query.fully_qualified_table_name.split("."),
330-
psycopg2.Binary(self_storage),
331-
),
332-
)
333-
logger.debug("Touching cache.", query_id=query.query_id, query=str(query))
334-
con.execute("SELECT touch_cache(%s);", query.query_id)
335-
336-
if not in_cache:
337-
for dep in query._get_stored_dependencies(exclude_self=True):
338-
con.execute(
339-
"INSERT INTO cache.dependencies values (%s, %s) ON CONFLICT DO NOTHING",
340-
(query.query_id, dep.query_id),
341-
)
342-
logger.debug(f"{query.fully_qualified_table_name} added to cache.")
343-
else:
344-
logger.debug(f"Touched cache for {query.fully_qualified_table_name}.")
345-
>>>>>>> 5b45f1582 (Match cache half life)
346315
except NotImplementedError:
347316
logger.debug("Table has no standard name.")
348317

@@ -363,17 +332,13 @@ def touch_cache(connection: "Connection", query_id: str) -> float:
363332
The new cache score
364333
"""
365334
try:
366-
<<<<<<< HEAD
335+
logger.debug("Touching cache.", query_id=query_id)
367336
with connection.engine.begin() as trans:
368337
return float(
369338
trans.exec_driver_sql(f"SELECT touch_cache('{query_id}')").fetchall()[
370339
0
371340
][0]
372341
)
373-
=======
374-
logger.debug("Touching cache.", query_id=query_id)
375-
return float(connection.fetch(f"SELECT touch_cache('{query_id}')")[0][0])
376-
>>>>>>> 5b45f1582 (Match cache half life)
377342
except (IndexError, psycopg2.InternalError):
378343
raise ValueError(f"Query id '{query_id}' is not in cache on this connection.")
379344

@@ -515,6 +480,18 @@ def get_query_object_by_id(connection: "Connection", query_id: str) -> "Query":
515480
raise ValueError(f"Query id '{query_id}' is not in cache on this connection.")
516481

517482

483+
def _get_protected_classes():
484+
from flowmachine.core.events_table import events_table_map
485+
from flowmachine.core.infrastructure_table import infrastructure_table_map
486+
487+
return [
488+
"Table",
489+
"GeoTable",
490+
*[cls.__name__ for cls in events_table_map.values()],
491+
*[cls.__name__ for cls in infrastructure_table_map.values()],
492+
]
493+
494+
518495
def get_cached_query_objects_ordered_by_score(
519496
connection: "Connection",
520497
protected_period: Optional[int] = None,
@@ -542,11 +519,9 @@ def get_cached_query_objects_ordered_by_score(
542519
if protected_period is not None
543520
else " AND NOW()-created > (cache_protected_period()*INTERVAL '1 seconds')"
544521
)
545-
qry = f"""
546-
WITH no_score AS (SELECT array_agg(object_class) as classes FROM cache.zero_cache)
547-
SELECT query_id, table_size(tablename, schema) as table_size
522+
qry = f"""SELECT query_id, table_size(tablename, schema) as table_size
548523
FROM cache.cached
549-
WHERE NOT (cached.class=ANY((SELECT classes FROM no_score)::TEXT[]))
524+
WHERE NOT (cached.class=ANY(ARRAY{_get_protected_classes()}))
550525
{protected_period_clause}
551526
ORDER BY cache_score(cache_score_multiplier, compute_time, table_size(tablename, schema)) ASC
552527
"""
@@ -726,11 +701,9 @@ def get_size_of_cache(connection: "Connection") -> int:
726701
Number of bytes in total used by cache tables
727702
728703
"""
729-
sql = f"""
730-
WITH no_score AS (SELECT array_agg(object_class) as classes FROM cache.zero_cache)
731-
SELECT sum(table_size(tablename, schema)) as total_bytes
704+
sql = f"""SELECT sum(table_size(tablename, schema)) as total_bytes
732705
FROM cache.cached
733-
WHERE NOT (cached.class=ANY((SELECT classes FROM no_score)::TEXT[]))"""
706+
WHERE NOT (cached.class=ANY(ARRAY{_get_protected_classes()}))"""
734707
cache_bytes = connection.fetch(sql)[0][0]
735708
return 0 if cache_bytes is None else int(cache_bytes)
736709

0 commit comments

Comments
 (0)