Skip to content

Commit

Permalink
Fail graph update if graph version didn't change
Browse files Browse the repository at this point in the history
This provides a clear error message to customers showing them that
they need to update graph version when deploying an update to it
if they manually manage graph versions.

Testing:

make fmt
make test
  • Loading branch information
eabatalov committed Jan 13, 2025
1 parent 48bda77 commit ff64883
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 70 deletions.
29 changes: 28 additions & 1 deletion python-sdk/tests/src/tests/test_graph_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import tests.testing
from indexify import RemoteGraph
from indexify.error import GraphStillProcessing
from indexify.error import ApiException, GraphStillProcessing
from indexify.functions_sdk.graph import Graph
from indexify.functions_sdk.indexify_functions import indexify_function
from tests.testing import test_graph_name
Expand Down Expand Up @@ -174,6 +174,33 @@ def end_node(data: dict) -> int:
except GraphStillProcessing:
time.sleep(1)

def test_graph_update_fails_without_version_update(self):
graph_name = test_graph_name(self)

@indexify_function()
def function_a() -> str:
return "success"

g = Graph(
name=graph_name,
description="test description",
start_node=function_a,
)
RemoteGraph.deploy(g)

g.description = "updated description without version update"
try:
RemoteGraph.deploy(g)
self.fail("Expected an exception to be raised")
except ApiException as e:
self.assertIn(
"This graph version already exists, please update the graph version",
str(e),
)
self.assertIn("status code: 400", str(e))
except Exception as e:
self.fail(f"Unexpected exception: {e}")


if __name__ == "__main__":
unittest.main()
105 changes: 71 additions & 34 deletions server/data_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod test_objects;

use std::{
collections::HashMap,
error::Error,
fmt::{self, Display},
hash::{DefaultHasher, Hash, Hasher},
time::{SystemTime, UNIX_EPOCH},
Expand All @@ -15,7 +16,7 @@ use filter::LabelsFilter;
use indexify_utils::{default_creation_time, get_epoch_time_in_ms};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use strum::AsRefStr;
use strum::{AsRefStr, Display};

// Invoke graph for all existing payloads
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -334,42 +335,31 @@ impl ComputeGraph {
format!("{}|{}", namespace, name)
}

/// Update a compute graph by computing fields and ignoring immutable
/// fields.
/// Update the compute graph from all the supplied Graph fields.
///
/// Assumes validated update values.
pub fn update(&mut self, update: ComputeGraph) -> Option<ComputeGraphVersion> {
/// Assumes validated update values. Fails if the new graph version is not
/// updated. Returns the new graph version.
pub fn update(&mut self, update: ComputeGraph) -> Result<ComputeGraphVersion> {
// immutable fields
// self.namespace = other.namespace;
// self.name = other.name;
// self.created_at = other.created_at;
// self.replaying = other.replaying;

let mut graph_version: Option<ComputeGraphVersion> = None;
if self.version != update.version {
self.version = update.version;
self.code = update.code;
self.edges = update.edges;
self.start_fn = update.start_fn;
self.runtime_information = update.runtime_information;
self.nodes = update.nodes.clone();
self.description = update.description;
self.tags = update.tags;

graph_version = Some(self.into_version());
} else {
tracing::info!(
"doing nothing on graph {}:{} update because the version {} didn't change",
self.namespace,
self.name,
self.version.0
);
// TODO: Return a clear error with a message from Server API so
// client understands that no update happened and why
// so.
if self.version == update.version {
return Err(anyhow!(ComputeGraphError::VersionExists));
}

graph_version
self.version = update.version;
self.code = update.code;
self.edges = update.edges;
self.start_fn = update.start_fn;
self.runtime_information = update.runtime_information;
self.nodes = update.nodes.clone();
self.description = update.description;
self.tags = update.tags;

Ok(self.into_version())
}

pub fn into_version(&self) -> ComputeGraphVersion {
Expand All @@ -387,6 +377,13 @@ impl ComputeGraph {
}
}

#[derive(Debug, Display, PartialEq)]
pub enum ComputeGraphError {
VersionExists,
}

impl Error for ComputeGraphError {}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ComputeGraphVersion {
// Graph is currently versioned manually by users.
Expand Down Expand Up @@ -1042,6 +1039,7 @@ mod tests {
test_objects::tests::test_compute_fn,
ComputeGraph,
ComputeGraphCode,
ComputeGraphError,
ComputeGraphVersion,
DynamicEdgeRouter,
GraphVersion,
Expand Down Expand Up @@ -1090,6 +1088,7 @@ mod tests {
update: ComputeGraph,
expected_graph: ComputeGraph,
expected_version: Option<ComputeGraphVersion>,
expected_error: Option<ComputeGraphError>,
}

let test_cases = [
Expand All @@ -1098,6 +1097,7 @@ mod tests {
update: original_graph.clone(),
expected_graph: original_graph.clone(),
expected_version: None,
expected_error: Some(ComputeGraphError::VersionExists),
},
TestCase {
description: "version update",
Expand All @@ -1113,6 +1113,7 @@ mod tests {
version: GraphVersion::from("100"),
..original_graph.into_version()
}),
expected_error: None,
},
TestCase {
description: "immutable fields should not change when version changed",
Expand All @@ -1132,6 +1133,7 @@ mod tests {
version: GraphVersion::from("100"),
..original_graph.into_version()
}),
expected_error: None,
},
// Runtime information.
TestCase {
Expand All @@ -1145,6 +1147,7 @@ mod tests {
},
expected_graph: original_graph.clone(),
expected_version: None,
expected_error: Some(ComputeGraphError::VersionExists),
},
TestCase {
description: "changing runtime information with version change should change runtime information",
Expand Down Expand Up @@ -1172,6 +1175,7 @@ mod tests {
},
..original_graph.into_version()
}),
expected_error: None,
},
// Code.
TestCase {
Expand All @@ -1185,6 +1189,7 @@ mod tests {
},
expected_graph: original_graph.clone(),
expected_version: None,
expected_error: Some(ComputeGraphError::VersionExists),
},
TestCase {
description: "changing code with version change should change code",
Expand Down Expand Up @@ -1212,6 +1217,7 @@ mod tests {
},
..original_graph.into_version()
}),
expected_error: None,
},
// Edges.
TestCase {
Expand All @@ -1225,6 +1231,7 @@ mod tests {
},
expected_graph: original_graph.clone(),
expected_version: None,
expected_error: Some(ComputeGraphError::VersionExists),
},
TestCase {
description: "changing edges with version change should change edges",
Expand Down Expand Up @@ -1252,6 +1259,7 @@ mod tests {
)]),
..original_graph.into_version()
}),
expected_error: None,
},
// start_fn.
TestCase {
Expand All @@ -1262,6 +1270,7 @@ mod tests {
},
expected_graph: original_graph.clone(),
expected_version: None,
expected_error: Some(ComputeGraphError::VersionExists),
},
TestCase {
description: "changing start function with version change should change start function",
Expand All @@ -1280,6 +1289,7 @@ mod tests {
start_fn: Node::Compute(fn_b.clone()),
..original_graph.into_version()
}),
expected_error: None,
},
// Adding a node.
TestCase {
Expand All @@ -1295,6 +1305,7 @@ mod tests {
},
expected_graph: original_graph.clone(),
expected_version: None,
expected_error: Some(ComputeGraphError::VersionExists),
},
TestCase {
description: "adding a node with version change should add node",
Expand Down Expand Up @@ -1328,6 +1339,7 @@ mod tests {
]),
..original_graph.into_version()
}),
expected_error: None,
},
// Removing a node.
TestCase {
Expand All @@ -1342,6 +1354,7 @@ mod tests {
},
expected_graph: original_graph.clone(),
expected_version: None,
expected_error: Some(ComputeGraphError::VersionExists),
},
TestCase {
description: "removing a node with version change should remove the node",
Expand Down Expand Up @@ -1370,6 +1383,7 @@ mod tests {
]),
..original_graph.into_version()
}),
expected_error: None,
},
// Changing a node's image.
TestCase {
Expand All @@ -1384,6 +1398,7 @@ mod tests {
},
expected_graph: original_graph.clone(),
expected_version: None,
expected_error: Some(ComputeGraphError::VersionExists),
},
TestCase {
description: "changing a node's image with version change should update the image and version",
Expand Down Expand Up @@ -1414,22 +1429,44 @@ mod tests {
]),
..original_graph.into_version()
}),
expected_error: None,
},
];

for test_case in test_cases.iter() {
let mut updated_graph = original_graph.clone();
let updated_version = updated_graph.update(test_case.update.clone());
let update_version_result = updated_graph.update(test_case.update.clone());

assert_eq!(
updated_graph, test_case.expected_graph,
"{}",
test_case.description
);
assert_eq!(
updated_version, test_case.expected_version,
"{}",
test_case.description
);
match update_version_result {
Ok(updated_version) => {
assert_eq!(
&updated_version,
test_case.expected_version.as_ref().unwrap(),
"{}",
test_case.description
);
}
Err(err) => match err.root_cause().downcast_ref::<ComputeGraphError>() {
Some(err) => {
assert_eq!(
err,
test_case.expected_error.as_ref().unwrap(),
"{}",
test_case.description
);
}
None => assert!(
false,
"{}, unexpected error type: {}",
test_case.description, err
),
},
}
}
}

Expand Down
16 changes: 10 additions & 6 deletions server/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use axum_tracing_opentelemetry::{
middleware::{OtelAxumLayer, OtelInResponseLayer},
};
use blob_store::PutResult;
use data_model::ExecutorId;
use data_model::{ComputeGraphError, ExecutorId};
use futures::StreamExt;
use hyper::StatusCode;
use indexify_ui::Assets as UiAssets;
Expand Down Expand Up @@ -453,11 +453,15 @@ async fn create_or_update_compute_graph(
namespace,
compute_graph,
});
state
.dispatcher
.dispatch_requests(request)
.await
.map_err(IndexifyAPIError::internal_error)?;
let result = state.dispatcher.dispatch_requests(request).await;
if let Err(e) = result {
return match e.root_cause().downcast_ref::<ComputeGraphError>() {
Some(ComputeGraphError::VersionExists) => Err(IndexifyAPIError::bad_request(
"This graph version already exists, please update the graph version",
)),
_ => Err(IndexifyAPIError::internal_error(e)),
};
}

info!("compute graph created: {}", name);

Expand Down
Loading

0 comments on commit ff64883

Please sign in to comment.