Skip to content

Commit

Permalink
Added a test
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanu committed Jan 13, 2025
1 parent 135814f commit 18719e4
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 6 deletions.
62 changes: 60 additions & 2 deletions server/state_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,12 +555,22 @@ pub fn task_stream(state: Arc<IndexifyState>, executor: ExecutorId, limit: usize
#[cfg(test)]
mod tests {
use data_model::{
test_objects::tests::{mock_graph_a, TEST_NAMESPACE},
test_objects::tests::{
mock_executor,
mock_graph_a,
mock_invocation_payload,
TEST_NAMESPACE,
},
ComputeGraph,
GraphVersion,
Namespace,
};
use requests::{CreateOrUpdateComputeGraphRequest, NamespaceRequest};
use requests::{
CreateOrUpdateComputeGraphRequest,
InvokeComputeGraphRequest,
NamespaceRequest,
RegisterExecutorRequest,
};
use test_state_store::TestStateStore;
use tokio;

Expand Down Expand Up @@ -648,6 +658,54 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_order_state_changes() -> Result<()> {
let indexify_state = TestStateStore::new().await?.indexify_state;
let tx = indexify_state.db.transaction();
let state_change_1 = state_changes::invoke_compute_graph(
&indexify_state.last_state_change_id,
&InvokeComputeGraphRequest {
namespace: "namespace".to_string(),
compute_graph_name: "graph_A".to_string(),
invocation_payload: mock_invocation_payload(),
},
)
.unwrap();
state_machine::save_state_changes(indexify_state.db.clone(), &tx, &state_change_1).unwrap();
tx.commit().unwrap();
let tx = indexify_state.db.transaction();
let state_change_2 = state_changes::register_executor(
&indexify_state.last_state_change_id,
&RegisterExecutorRequest {
executor: mock_executor(),
},
)
.unwrap();
state_machine::save_state_changes(indexify_state.db.clone(), &tx, &state_change_2).unwrap();
tx.commit().unwrap();
let tx = indexify_state.db.transaction();
let state_change_3 = state_changes::invoke_compute_graph(
&indexify_state.last_state_change_id,
&InvokeComputeGraphRequest {
namespace: "namespace".to_string(),
compute_graph_name: "graph_A".to_string(),
invocation_payload: mock_invocation_payload(),
},
)
.unwrap();
state_machine::save_state_changes(indexify_state.db.clone(), &tx, &state_change_3).unwrap();
tx.commit().unwrap();
let state_changes = indexify_state.reader().unprocessed_state_changes().unwrap();
assert_eq!(state_changes.len(), 3);
// global state_change_2
assert_eq!(state_changes[0].id, StateChangeId::new(1));
// state_change_1
assert_eq!(state_changes[1].id, StateChangeId::new(0));
// state_change_3
assert_eq!(state_changes[2].id, StateChangeId::new(2));
Ok(())
}

fn _read_cgs_from_state_store(indexify_state: &IndexifyState) -> Vec<ComputeGraph> {
let reader = indexify_state.reader();
let result = reader
Expand Down
11 changes: 7 additions & 4 deletions server/state_store/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ impl StateReader {
pub fn unprocessed_state_changes(&self) -> Result<Vec<StateChange>> {
let kvs = &[KeyValue::new("op", "get_next_state_change")];
let _timer = Timer::start_with_labels(&self.metrics.state_read, kvs);
let mut state_changes = Vec::new();
let global_state_changes: Vec<StateChange> = self
.get_rows_from_cf_with_limits(
"global".as_bytes(),
Expand All @@ -432,18 +433,20 @@ impl StateReader {
None,
)?
.0;
if global_state_changes.len() > 0 {
return Ok(global_state_changes);
state_changes.extend(global_state_changes);
if state_changes.len() >= 100 {
return Ok(state_changes);
}
let ns_state_changes: Vec<StateChange> = self
.get_rows_from_cf_with_limits(
"ns".as_bytes(),
None,
IndexifyObjectsColumns::UnprocessedStateChanges,
Some(100),
Some(100 - state_changes.len()),
)?
.0;
Ok(ns_state_changes)
state_changes.extend(ns_state_changes);
Ok(state_changes)
}

pub fn get_all_rows_from_cf<V>(
Expand Down

0 comments on commit 18719e4

Please sign in to comment.