Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 65 additions & 54 deletions flowmachine/flowmachine/core/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,64 +792,75 @@ def invalidate_db_cache(self, name=None, schema=None, cascade=True, drop=True):
log("Resetting state machine.")
current_state, this_thread_is_owner = q_state_machine.reset()
if this_thread_is_owner:
log("Reset state machine.")
con = get_db().engine
try:
log("Getting table reference.")
table_reference_to_this_query = self.get_table()
if table_reference_to_this_query is not self:
log("Invalidating table reference cache.")
table_reference_to_this_query.invalidate_db_cache(
cascade=cascade, drop=drop
) # Remove any Table pointing as this query
except (ValueError, NotImplementedError) as e:
log("Query not stored - no table..")
pass # This cache record isn't actually stored
try:
log = partial(log, table_name=self.fully_qualified_table_name)
except NotImplementedError:
pass # Not a storable by default table
try:
dep_ids = [
rec[0]
for rec in get_db().fetch(
f"SELECT query_id FROM cache.dependencies WHERE depends_on='{self.query_id}'"
)
]
with con.begin():
con.execute(
"DELETE FROM cache.cached WHERE query_id=%s", (self.query_id,)
)
log("Deleted cache record.")
if drop:
con.execute(
"DROP TABLE IF EXISTS {}".format(
self.fully_qualified_table_name
)
log("Reset state machine.")
con = get_db().engine
try:
log("Getting table reference.")
table_reference_to_this_query = self.get_table()
if table_reference_to_this_query is not self:
log("Invalidating table reference cache.")
table_reference_to_this_query.invalidate_db_cache(
cascade=cascade, drop=drop
) # Remove any Table pointing as this query
except (ValueError, NotImplementedError) as e:
log("Query not stored - no table..")
pass # This cache record isn't actually stored
try:
log = partial(log, table_name=self.fully_qualified_table_name)
except NotImplementedError:
pass # Not a storable by default table
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a log here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No harm in it for sure

try:
dep_ids = [
rec[0]
for rec in get_db().fetch(
f"SELECT query_id FROM cache.dependencies WHERE depends_on='{self.query_id}'"
)
log("Dropped cache table.")

if cascade:
for dep_id in dep_ids:
dep = get_obj_or_stub(get_db(), dep_id)
log(
"Cascading to dependent.",
dependency=dep.fully_qualified_table_name,
]
with con.begin():
con.execute(
"DELETE FROM cache.cached WHERE query_id=%s",
(self.query_id,),
)
dep.invalidate_db_cache()
log("Deleted cache record.")
if drop:
con.execute(
"DROP TABLE IF EXISTS {}".format(
self.fully_qualified_table_name
)
)
log("Dropped cache table.")

if cascade:
for dep_id in dep_ids:
dep = get_obj_or_stub(get_db(), dep_id)
log(
"Cascading to dependent.",
dependency=dep.fully_qualified_table_name,
)
dep.invalidate_db_cache()
else:
log("Not cascading to dependents.")
except NotImplementedError:
logger.info("Table has no standard name.")
# Outside of cache schema table
if schema is not None:
full_name = "{}.{}".format(schema, name)
log("Dropping table outside cache schema.", table_name=full_name)
else:
log("Not cascading to dependents.")
except NotImplementedError:
logger.info("Table has no standard name.")
# Outside of cache schema table
if schema is not None:
full_name = "{}.{}".format(schema, name)
else:
full_name = name
log("Dropping table outside cache schema.", table_name=full_name)
with con.begin():
con.execute("DROP TABLE IF EXISTS {}".format(full_name))
q_state_machine.finish_resetting()
full_name = name
with con.begin():
con.execute("DROP TABLE IF EXISTS {}".format(full_name))
q_state_machine.finish_resetting()
except Exception as exc:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be a BaseException to catch shutdown as well

logger.error(
"Query reset failed.",
query_id=self.query_id,
action="invalidate_db_cache",
exception=exc,
)
q_state_machine.raise_error()
raise exc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a potential for hang here, if a query can reset from elsewhere? This thread throws Exception, some cache service sees it and tries again, this thrown Exception, ect....

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, something to be wary of - should have enough information to avoid that because we arrive in an explicit reset failed state.

elif q_state_machine.is_resetting:
log("Query is being reset from elsewhere, waiting for reset to finish.")
while q_state_machine.is_resetting:
Expand Down
25 changes: 24 additions & 1 deletion flowmachine/flowmachine/core/query_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class QueryState(str, Enum):
EXECUTING = ("executing", "is currently running")
ERRORED = ("errored", "finished with an error")
RESETTING = ("resetting", "is being reset")
RESET_FAILED = ("reset_failed", "error during reset")
KNOWN = ("known", "is known, but has not yet been run")

def __new__(cls, name, desc, **kwargs):
Expand Down Expand Up @@ -78,6 +79,7 @@ class QueryStateMachine:
- errored, when a query has been run but failed to succeed
- cancelled, when execution was terminated by the user
- resetting, when a previously run query is being purged from cache
- reset_failed, when an error occurred during reset

When the query is in a queued, executing, or resetting state, methods which need
to use the results of the query should wait. The `wait_until_complete` method
Expand Down Expand Up @@ -139,6 +141,12 @@ def __init__(self, redis_client: StrictRedis, query_id: str, db_id: str):
self.state_machine.on(
QueryEvent.FINISH_RESET, QueryState.RESETTING, QueryState.KNOWN
)
self.state_machine.on(
QueryEvent.ERROR, QueryState.RESETTING, QueryState.RESET_FAILED
)
self.state_machine.on(
QueryEvent.RESET, QueryState.RESET_FAILED, QueryState.RESETTING
)

@property
def current_query_state(self) -> QueryState:
Expand Down Expand Up @@ -206,6 +214,17 @@ def is_errored(self) -> bool:
"""
return self.current_query_state == QueryState.ERRORED

@property
def is_errored_in_reset(self) -> bool:
"""
Returns
-------
bool
True if the query failed to reset with an error

"""
return self.current_query_state == QueryState.RESET_FAILED

@property
def is_resetting(self) -> bool:
"""
Expand Down Expand Up @@ -358,6 +377,10 @@ def wait_until_complete(self, sleep_duration=1):
"""
if self.is_executing or self.is_queued or self.is_resetting:
while not (
self.is_finished_executing or self.is_cancelled or self.is_known
self.is_finished_executing
or self.is_cancelled
or self.is_known
or self.is_errored
or self.is_errored_in_reset
):
_sleep(sleep_duration)