Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement deletion of resources cleanly #1154

Merged
merged 36 commits into from
Jan 14, 2025
Merged
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
94b8946
update crates
diptanu Jan 11, 2025
0fc75e7
fmt
diptanu Jan 11, 2025
f408669
updated axum metrics
diptanu Jan 11, 2025
f62e519
update dependencies
diptanu Jan 11, 2025
b96cf68
bring back every other metrics
diptanu Jan 11, 2025
aaf560e
update rocksdb
diptanu Jan 11, 2025
9bfea04
simplifying code for updating compute graphs
diptanu Jan 11, 2025
7d8ba08
Rewired api to state machine requests for user facing APIs
diptanu Jan 12, 2025
3ff5885
updated state store to use graph processor
diptanu Jan 12, 2025
75f7473
update code
diptanu Jan 12, 2025
3804a16
renamed stuff
diptanu Jan 12, 2025
19bebf7
update
diptanu Jan 12, 2025
683c945
implemented deletion
diptanu Jan 12, 2025
b30ec74
merge conflicts
diptanu Jan 12, 2025
d6789c0
scheduling tasks when a new executor comes online
diptanu Jan 12, 2025
64a95b5
making seek more efficient
diptanu Jan 12, 2025
84dd3e7
update graph processor
diptanu Jan 12, 2025
d0f2a5b
fixing GC
diptanu Jan 12, 2025
798ca17
Making graph processor make progress even if there are state change u…
diptanu Jan 12, 2025
2453712
shutting down slatedb cleanly
diptanu Jan 12, 2025
99ab374
refreshing executors less frequently
diptanu Jan 12, 2025
8b9c453
fixed a bunch of tests
diptanu Jan 12, 2025
e2a99a0
fixed tests
diptanu Jan 12, 2025
5a185c8
lint
diptanu Jan 12, 2025
0a4e32d
fix test
diptanu Jan 13, 2025
0648e2e
fix lint
diptanu Jan 13, 2025
2ffbc47
removed test
diptanu Jan 13, 2025
c7237fa
clippy
diptanu Jan 13, 2025
267f1e3
lint
diptanu Jan 13, 2025
135814f
add some comments
diptanu Jan 13, 2025
18719e4
Added a test
diptanu Jan 13, 2025
9fbcbdc
fixing tests
diptanu Jan 13, 2025
1d5aa44
update
diptanu Jan 14, 2025
06476f7
fmt
diptanu Jan 14, 2025
fd7520c
creating a tombstone executor event
diptanu Jan 14, 2025
322e59a
Updated namespace key schema
diptanu Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lint
diptanu committed Jan 13, 2025
commit 5a185c812d9e248aba3985c44be6f6cd85be4e3a
12 changes: 6 additions & 6 deletions server/processor/src/system_tasks.rs
Original file line number Diff line number Diff line change
@@ -101,12 +101,12 @@ impl SystemTasksExecutor {
info!(queuing = invocations.len(), "queueing invocations");

let replay_req = ReplayInvocationsRequest {
namespace: task.namespace.clone(),
compute_graph_name: task.compute_graph_name.clone(),
graph_version: task.graph_version.clone(),
invocation_ids: invocations.iter().map(|i| i.id.clone()).collect(),
restart_key: restart_key.clone(),
};
namespace: task.namespace.clone(),
compute_graph_name: task.compute_graph_name.clone(),
graph_version: task.graph_version.clone(),
invocation_ids: invocations.iter().map(|i| i.id.clone()).collect(),
restart_key: restart_key.clone(),
};
self.state
.write(StateMachineUpdateRequest {
payload: RequestPayload::ReplayInvocations(replay_req),
105 changes: 50 additions & 55 deletions server/src/integration_test.rs
Original file line number Diff line number Diff line change
@@ -58,10 +58,8 @@ mod tests {
.unwrap()
.0;
assert_eq!(tasks.len(), 1);
let unprocessed_state_changes = indexify_state
.reader()
.unprocessed_state_changes()
.unwrap();
let unprocessed_state_changes =
indexify_state.reader().unprocessed_state_changes().unwrap();
// Processes the invoke cg event and creates a task created event
assert_eq!(unprocessed_state_changes.len(), 1);
Ok(())
@@ -75,7 +73,12 @@ mod tests {

// Should have 1 unprocessed state - one task created event
let unprocessed_state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(1, unprocessed_state_changes.len(), "{:?}", unprocessed_state_changes);
assert_eq!(
1,
unprocessed_state_changes.len(),
"{:?}",
unprocessed_state_changes
);
test_srv.process_all().await?;

let tasks = indexify_state
@@ -87,30 +90,38 @@ mod tests {

// Should have 0 unprocessed state changes - the one task would be unallocated
let unprocessed_state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(0, unprocessed_state_changes.len(), "{:?}", unprocessed_state_changes);
assert_eq!(
0,
unprocessed_state_changes.len(),
"{:?}",
unprocessed_state_changes
);
let unallocated_tasks = indexify_state.reader().unallocated_tasks()?;
assert_eq!(1, unallocated_tasks.len(), "{:?}", unallocated_tasks);
let task = &tasks[0];
// Finish the task and check if new tasks are created
indexify_state.write(StateMachineUpdateRequest{
processed_state_changes: vec![],
payload: RequestPayload::FinalizeTask(FinalizeTaskRequest {
namespace: TEST_NAMESPACE.to_string(),
compute_graph: "graph_A".to_string(),
compute_fn: "fn_a".to_string(),
invocation_id: invocation_id.clone(),
task_id: task.id.clone(),
node_outputs: vec![mock_node_fn_output(
invocation_id.as_str(),
"graph_A",
"fn_a",
None,
)],
task_outcome: TaskOutcome::Success,
executor_id: ExecutorId::new(TEST_EXECUTOR_ID.to_string()),
diagnostics: None,
indexify_state
.write(StateMachineUpdateRequest {
processed_state_changes: vec![],
payload: RequestPayload::FinalizeTask(FinalizeTaskRequest {
namespace: TEST_NAMESPACE.to_string(),
compute_graph: "graph_A".to_string(),
compute_fn: "fn_a".to_string(),
invocation_id: invocation_id.clone(),
task_id: task.id.clone(),
node_outputs: vec![mock_node_fn_output(
invocation_id.as_str(),
"graph_A",
"fn_a",
None,
)],
task_outcome: TaskOutcome::Success,
executor_id: ExecutorId::new(TEST_EXECUTOR_ID.to_string()),
diagnostics: None,
}),
})
}).await.unwrap();
.await
.unwrap();
test_srv.process_ns().await?;

let tasks = indexify_state
@@ -119,12 +130,11 @@ mod tests {
.unwrap()
.0;
assert_eq!(tasks.len(), 3);
let unprocessed_state_changes = indexify_state
.reader()
.unprocessed_state_changes()
.unwrap();
let unprocessed_state_changes =
indexify_state.reader().unprocessed_state_changes().unwrap();

// At this point there should be 2 unprocessed task created state changes - for the two new tasks
// At this point there should be 2 unprocessed task created state changes - for
// the two new tasks
assert_eq!(
unprocessed_state_changes.len(),
2,
@@ -157,10 +167,8 @@ mod tests {

test_srv.process_all().await?;

let unprocessed_state_changes = indexify_state
.reader()
.unprocessed_state_changes()
.unwrap();
let unprocessed_state_changes =
indexify_state.reader().unprocessed_state_changes().unwrap();

// no more tasks since invocation failed
assert_eq!(
@@ -402,10 +410,8 @@ mod tests {
.unwrap()
.0;
assert_eq!(tasks.len(), 2);
let unprocessed_state_changes = indexify_state
.reader()
.unprocessed_state_changes()
.unwrap();
let unprocessed_state_changes =
indexify_state.reader().unprocessed_state_changes().unwrap();

// has task created state change in it.
assert_eq!(
@@ -663,9 +669,7 @@ mod tests {
}

{
let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);

let graph_ctx = indexify_state
@@ -971,9 +975,7 @@ mod tests {
}

{
let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);

let graph_ctx = indexify_state
@@ -1244,9 +1246,7 @@ mod tests {
}

{
let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);

let graph_ctx = indexify_state
@@ -1495,9 +1495,8 @@ mod tests {
.find(|t| t.compute_fn_name == "fn_reduce")
.unwrap();

let all_unprocessed_state_changes_before = indexify_state
.reader()
.unprocessed_state_changes()?;
let all_unprocessed_state_changes_before =
indexify_state.reader().unprocessed_state_changes()?;

let request = make_finalize_request("fn_reduce", &reduce_task.id, 1);
indexify_state
@@ -1631,9 +1630,7 @@ mod tests {
test_srv.process_all().await?;
}
{
let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(
state_changes.len(),
0,
@@ -1864,9 +1861,7 @@ mod tests {

// Expect no more tasks and a completed graph
{
let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);

let graph_ctx = indexify_state
24 changes: 6 additions & 18 deletions server/src/system_task_test.rs
Original file line number Diff line number Diff line change
@@ -198,9 +198,7 @@ mod tests {

test_srv.process_all().await?;

let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);

let graph_ctx = indexify_state.reader().invocation_ctx(
@@ -230,9 +228,7 @@ mod tests {
system_tasks_executor.lock().await.run().await?;

// Since graph version is the same it should generate new tasks
let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);

let system_tasks = indexify_state.reader().get_system_tasks(None).unwrap().0;
@@ -287,9 +283,7 @@ mod tests {
assert_eq!(system_tasks.len(), 1);

// Since graph version is different new changes should be generated
let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 1);

// Number of pending system tasks should be incremented
@@ -377,9 +371,7 @@ mod tests {

test_srv.process_all().await?;

let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
assert_eq!(state_changes.len(), 0);

// Number of pending system tasks should be decremented after graph completion
@@ -496,9 +488,7 @@ mod tests {
let incomplete_tasks = tasks
.iter()
.filter(|t: &&data_model::Task| t.outcome == TaskOutcome::Unknown);
let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
if state_changes.is_empty() && incomplete_tasks.count() == 0 {
break;
}
@@ -573,9 +563,7 @@ mod tests {

let system_tasks = indexify_state.reader().get_system_tasks(None).unwrap().0;

let state_changes = indexify_state
.reader()
.unprocessed_state_changes()?;
let state_changes = indexify_state.reader().unprocessed_state_changes()?;
if state_changes.is_empty() && num_incomplete_tasks == 0 && system_tasks.is_empty() {
break;
}
20 changes: 17 additions & 3 deletions server/src/testing.rs
Original file line number Diff line number Diff line change
@@ -44,17 +44,31 @@ impl TestService {
}

pub async fn process_all(&self) -> Result<()> {
while self.service.indexify_state.reader().unprocessed_state_changes()?.len() > 0 {
while self
.service
.indexify_state
.reader()
.unprocessed_state_changes()?
.len() >
0
{
self.process_ns().await?;
}
Ok(())
}

pub async fn process_ns(&self) -> Result<()> {
let notify = Arc::new(tokio::sync::Notify::new());
let mut cached_state_changes: Vec<StateChange> = self.service.indexify_state.reader().unprocessed_state_changes()?;
let mut cached_state_changes: Vec<StateChange> = self
.service
.indexify_state
.reader()
.unprocessed_state_changes()?;
while !cached_state_changes.is_empty() {
self.service.graph_processor.write_sm_update(&mut cached_state_changes, &notify).await?;
self.service
.graph_processor
.write_sm_update(&mut cached_state_changes, &notify)
.await?;
}
Ok(())
}