From ac314d81bde50ef90c3119ec3502e5cda47a3fbc Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 8 Nov 2021 10:45:41 +0000 Subject: [PATCH 1/2] Trigger a reset failed state if invalidating cache fails #4009 --- flowmachine/flowmachine/core/query.py | 119 +++++++++++--------- flowmachine/flowmachine/core/query_state.py | 19 ++++ 2 files changed, 84 insertions(+), 54 deletions(-) diff --git a/flowmachine/flowmachine/core/query.py b/flowmachine/flowmachine/core/query.py index 4e9d5d923e..6720ec7ff2 100644 --- a/flowmachine/flowmachine/core/query.py +++ b/flowmachine/flowmachine/core/query.py @@ -792,64 +792,75 @@ def invalidate_db_cache(self, name=None, schema=None, cascade=True, drop=True): log("Resetting state machine.") current_state, this_thread_is_owner = q_state_machine.reset() if this_thread_is_owner: - log("Reset state machine.") - con = get_db().engine - try: - log("Getting table reference.") - table_reference_to_this_query = self.get_table() - if table_reference_to_this_query is not self: - log("Invalidating table reference cache.") - table_reference_to_this_query.invalidate_db_cache( - cascade=cascade, drop=drop - ) # Remove any Table pointing as this query - except (ValueError, NotImplementedError) as e: - log("Query not stored - no table..") - pass # This cache record isn't actually stored - try: - log = partial(log, table_name=self.fully_qualified_table_name) - except NotImplementedError: - pass # Not a storable by default table try: - dep_ids = [ - rec[0] - for rec in get_db().fetch( - f"SELECT query_id FROM cache.dependencies WHERE depends_on='{self.query_id}'" - ) - ] - with con.begin(): - con.execute( - "DELETE FROM cache.cached WHERE query_id=%s", (self.query_id,) - ) - log("Deleted cache record.") - if drop: - con.execute( - "DROP TABLE IF EXISTS {}".format( - self.fully_qualified_table_name - ) + log("Reset state machine.") + con = get_db().engine + try: + log("Getting table reference.") + table_reference_to_this_query = self.get_table() + if table_reference_to_this_query is not self: + log("Invalidating table reference cache.") + table_reference_to_this_query.invalidate_db_cache( + cascade=cascade, drop=drop + ) # Remove any Table pointing as this query + except (ValueError, NotImplementedError) as e: + log("Query not stored - no table..") + pass # This cache record isn't actually stored + try: + log = partial(log, table_name=self.fully_qualified_table_name) + except NotImplementedError: + pass # Not a storable by default table + try: + dep_ids = [ + rec[0] + for rec in get_db().fetch( + f"SELECT query_id FROM cache.dependencies WHERE depends_on='{self.query_id}'" ) - log("Dropped cache table.") - - if cascade: - for dep_id in dep_ids: - dep = get_obj_or_stub(get_db(), dep_id) - log( - "Cascading to dependent.", - dependency=dep.fully_qualified_table_name, + ] + with con.begin(): + con.execute( + "DELETE FROM cache.cached WHERE query_id=%s", + (self.query_id,), ) - dep.invalidate_db_cache() + log("Deleted cache record.") + if drop: + con.execute( + "DROP TABLE IF EXISTS {}".format( + self.fully_qualified_table_name + ) + ) + log("Dropped cache table.") + + if cascade: + for dep_id in dep_ids: + dep = get_obj_or_stub(get_db(), dep_id) + log( + "Cascading to dependent.", + dependency=dep.fully_qualified_table_name, + ) + dep.invalidate_db_cache() + else: + log("Not cascading to dependents.") + except NotImplementedError: + logger.info("Table has no standard name.") + # Outside of cache schema table + if schema is not None: + full_name = "{}.{}".format(schema, name) + log("Dropping table outside cache schema.", table_name=full_name) else: - log("Not cascading to dependents.") - except NotImplementedError: - logger.info("Table has no standard name.") - # Outside of cache schema table - if schema is not None: - full_name = "{}.{}".format(schema, name) - else: - full_name = name - log("Dropping table outside cache schema.", table_name=full_name) - with con.begin(): - con.execute("DROP TABLE IF EXISTS {}".format(full_name)) - q_state_machine.finish_resetting() + full_name = name + with con.begin(): + con.execute("DROP TABLE IF EXISTS {}".format(full_name)) + q_state_machine.finish_resetting() + except Exception as exc: + logger.error( + "Query reset failed.", + query_id=self.query_id, + action="invalidate_db_cache", + exception=exc, + ) + q_state_machine.raise_error() + raise exc elif q_state_machine.is_resetting: log("Query is being reset from elsewhere, waiting for reset to finish.") while q_state_machine.is_resetting: diff --git a/flowmachine/flowmachine/core/query_state.py b/flowmachine/flowmachine/core/query_state.py index f1d770ff80..5457d96ad4 100644 --- a/flowmachine/flowmachine/core/query_state.py +++ b/flowmachine/flowmachine/core/query_state.py @@ -32,6 +32,7 @@ class QueryState(str, Enum): EXECUTING = ("executing", "is currently running") ERRORED = ("errored", "finished with an error") RESETTING = ("resetting", "is being reset") + RESET_FAILED = ("reset_failed", "error during reset") KNOWN = ("known", "is known, but has not yet been run") def __new__(cls, name, desc, **kwargs): @@ -78,6 +79,7 @@ class QueryStateMachine: - errored, when a query has been run but failed to succeed - cancelled, when execution was terminated by the user - resetting, when a previously run query is being purged from cache + - reset_failed, when an error occurred during reset When the query is in a queued, executing, or resetting state, methods which need to use the results of the query should wait. The `wait_until_complete` method @@ -139,6 +141,12 @@ def __init__(self, redis_client: StrictRedis, query_id: str, db_id: str): self.state_machine.on( QueryEvent.FINISH_RESET, QueryState.RESETTING, QueryState.KNOWN ) + self.state_machine.on( + QueryEvent.ERROR, QueryState.RESETTING, QueryState.RESET_FAILED + ) + self.state_machine.on( + QueryEvent.RESET, QueryState.RESET_FAILED, QueryState.RESETTING + ) @property def current_query_state(self) -> QueryState: @@ -206,6 +214,17 @@ def is_errored(self) -> bool: """ return self.current_query_state == QueryState.ERRORED + @property + def is_errored_in_reset(self) -> bool: + """ + Returns + ------- + bool + True if the query failed to reset with an error + + """ + return self.current_query_state == QueryState.RESET_FAILED + @property def is_resetting(self) -> bool: """ From 0360181954a8463cb4e749751b167a258766cca6 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Fri, 7 Jan 2022 22:56:33 +0000 Subject: [PATCH 2/2] Add to wait terminate states --- flowmachine/flowmachine/core/query_state.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flowmachine/flowmachine/core/query_state.py b/flowmachine/flowmachine/core/query_state.py index 5457d96ad4..e4677e5a93 100644 --- a/flowmachine/flowmachine/core/query_state.py +++ b/flowmachine/flowmachine/core/query_state.py @@ -377,6 +377,10 @@ def wait_until_complete(self, sleep_duration=1): """ if self.is_executing or self.is_queued or self.is_resetting: while not ( - self.is_finished_executing or self.is_cancelled or self.is_known + self.is_finished_executing + or self.is_cancelled + or self.is_known + or self.is_errored + or self.is_errored_in_reset ): _sleep(sleep_duration)