Skip to content

Commit

Permalink
not updating tasks for graphs which are completed (#1071)
Browse files Browse the repository at this point in the history
* not updating tasks for graphs which are completed

* lint

* add check to ensure we don't update tasks if the invocation was deleted

* checking if graph context is available
  • Loading branch information
diptanu authored Dec 1, 2024
1 parent 4cab1d0 commit 363c87a
Showing 1 changed file with 48 additions and 7 deletions.
55 changes: 48 additions & 7 deletions server/state_store/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use data_model::{
ExecutorId,
GraphInvocationCtx,
GraphInvocationCtxBuilder,
InvocationPayload,
InvokeComputeGraphEvent,
Namespace,
NodeOutput,
Expand All @@ -30,7 +31,7 @@ use rocksdb::{
TransactionDB,
};
use strum::AsRefStr;
use tracing::error;
use tracing::{error, info};

use super::serializer::{JsonEncode, JsonEncoder};
use crate::{
Expand Down Expand Up @@ -717,6 +718,39 @@ pub fn mark_task_completed(
req: FinalizeTaskRequest,
sm_metrics: Arc<StateStoreMetrics>,
) -> Result<bool> {
// Check if the graph exists before proceeding since
// the graph might have been deleted before the task completes
let graph_key = ComputeGraph::key_from(&req.namespace, &req.compute_graph);
let graph = txn
.get_cf(
&IndexifyObjectsColumns::ComputeGraphs.cf_db(&db),
&graph_key,
)
.map_err(|e| anyhow!("failed to get compute graph: {}", e))?;
if graph.is_none() {
info!(
"Compute graph not found: {} for task completion update for task id: {}",
&req.compute_graph, &req.task_id
);
return Ok(false);
}

// Check if the invocation was deleted before the task completes
let invocation_id =
InvocationPayload::key_from(&req.namespace, &req.compute_graph, &req.invocation_id);
let invocation = txn
.get_cf(
&IndexifyObjectsColumns::GraphInvocations.cf_db(&db),
&invocation_id,
)
.map_err(|e| anyhow!("failed to get invocation: {}", e))?;
if invocation.is_none() {
info!(
"Invocation not found: {} for task completion update for task id: {}",
&req.invocation_id, &req.task_id
);
return Ok(false);
}
let task_key = format!(
"{}|{}|{}|{}|{}",
req.namespace, req.compute_graph, req.invocation_id, req.compute_fn, req.task_id
Expand All @@ -737,12 +771,19 @@ pub fn mark_task_completed(
&IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db),
&graph_ctx_key,
true,
)?
.ok_or(anyhow!(
"Graph context not found for task: {}",
&req.task_id
))?;
let mut graph_ctx: GraphInvocationCtx = JsonEncoder::decode(&graph_ctx)?;
)
.map_err(|e| anyhow!("failed to get graph context: {}", e))?;
if graph_ctx.is_none() {
error!(
"Graph context for graph {} and invocation {} not found for task: {}",
&req.compute_graph, &req.invocation_id, &req.task_id
);
return Ok(false);
}
let mut graph_ctx: GraphInvocationCtx = JsonEncoder::decode(&graph_ctx.ok_or(anyhow!(
"Graph context not found for task: {}",
&req.task_id
))?)?;
for mut output in req.node_outputs {
// Update with correct graph version
output.graph_version = graph_ctx.graph_version;
Expand Down

0 comments on commit 363c87a

Please sign in to comment.