From 363c87a5c40a4e06c0ab7574605dbabad8a5c850 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sun, 1 Dec 2024 12:34:01 -0800 Subject: [PATCH] not updating tasks for graphs which are completed (#1071) * not updating tasks for graphs which are completed * lint * add check to ensure we don't update tasks if the invocation was deleted * checking if graph context is available --- server/state_store/src/state_machine.rs | 55 +++++++++++++++++++++---- 1 file changed, 48 insertions(+), 7 deletions(-) diff --git a/server/state_store/src/state_machine.rs b/server/state_store/src/state_machine.rs index fccdb347a..de4d2e8c8 100644 --- a/server/state_store/src/state_machine.rs +++ b/server/state_store/src/state_machine.rs @@ -7,6 +7,7 @@ use data_model::{ ExecutorId, GraphInvocationCtx, GraphInvocationCtxBuilder, + InvocationPayload, InvokeComputeGraphEvent, Namespace, NodeOutput, @@ -30,7 +31,7 @@ use rocksdb::{ TransactionDB, }; use strum::AsRefStr; -use tracing::error; +use tracing::{error, info}; use super::serializer::{JsonEncode, JsonEncoder}; use crate::{ @@ -717,6 +718,39 @@ pub fn mark_task_completed( req: FinalizeTaskRequest, sm_metrics: Arc, ) -> Result { + // Check if the graph exists before proceeding since + // the graph might have been deleted before the task completes + let graph_key = ComputeGraph::key_from(&req.namespace, &req.compute_graph); + let graph = txn + .get_cf( + &IndexifyObjectsColumns::ComputeGraphs.cf_db(&db), + &graph_key, + ) + .map_err(|e| anyhow!("failed to get compute graph: {}", e))?; + if graph.is_none() { + info!( + "Compute graph not found: {} for task completion update for task id: {}", + &req.compute_graph, &req.task_id + ); + return Ok(false); + } + + // Check if the invocation was deleted before the task completes + let invocation_id = + InvocationPayload::key_from(&req.namespace, &req.compute_graph, &req.invocation_id); + let invocation = txn + .get_cf( + &IndexifyObjectsColumns::GraphInvocations.cf_db(&db), + &invocation_id, + ) + .map_err(|e| anyhow!("failed to get invocation: {}", e))?; + if invocation.is_none() { + info!( + "Invocation not found: {} for task completion update for task id: {}", + &req.invocation_id, &req.task_id + ); + return Ok(false); + } let task_key = format!( "{}|{}|{}|{}|{}", req.namespace, req.compute_graph, req.invocation_id, req.compute_fn, req.task_id @@ -737,12 +771,19 @@ pub fn mark_task_completed( &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db), &graph_ctx_key, true, - )? - .ok_or(anyhow!( - "Graph context not found for task: {}", - &req.task_id - ))?; - let mut graph_ctx: GraphInvocationCtx = JsonEncoder::decode(&graph_ctx)?; + ) + .map_err(|e| anyhow!("failed to get graph context: {}", e))?; + if graph_ctx.is_none() { + error!( + "Graph context for graph {} and invocation {} not found for task: {}", + &req.compute_graph, &req.invocation_id, &req.task_id + ); + return Ok(false); + } + let mut graph_ctx: GraphInvocationCtx = JsonEncoder::decode(&graph_ctx.ok_or(anyhow!( + "Graph context not found for task: {}", + &req.task_id + ))?)?; for mut output in req.node_outputs { // Update with correct graph version output.graph_version = graph_ctx.graph_version;