Skip to content

Commit

Permalink
reduce number of conditions and indentation
Browse files Browse the repository at this point in the history
  • Loading branch information
maxkozlovsky committed May 21, 2024
1 parent 8683858 commit ba2629c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 66 deletions.
114 changes: 62 additions & 52 deletions src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,68 +45,78 @@ impl Scheduler {
Ok(tables)
}

async fn gc_state_change(
&self,
content: indexify_internal_api::ContentMetadata,
) -> Result<Vec<StateChange>> {
let root_content_id = if let Some(root_id) = content.root_content_id {
self.shared_state
.state_machine
.get_latest_version_of_content(&root_id)?
.map(|c| c.id)
} else {
Some(content.id)
};
// Since we processed NewContent without creating a task, need to trigger
// garbage collection for previous content if root content was updated.
match root_content_id {
Some(id) if id.version > 1 => Ok(vec![StateChange::new(
id.to_string(),
indexify_internal_api::ChangeType::TaskCompleted {
root_content_id: id,
},
timestamp_secs(),
)]),
_ => Ok(Vec::new()),
}
}

pub async fn create_new_tasks(&self, state_change: StateChange) -> Result<()> {
let mut tasks: Vec<internal_api::Task> = Vec::new();
if let Some(content) = self
let content = match self
.shared_state
.state_machine
.get_latest_version_of_content(&state_change.object_id)?
{
let extraction_policies = self
.shared_state
.match_extraction_policies_for_content(&content)
.await?;
let tables = self.tables_for_policies(&extraction_policies).await?;
for extraction_policy in extraction_policies {
let task = self
.create_task(&extraction_policy.id, &content, &tables)
.await?;
tasks.push(task);
}
if !tasks.is_empty() {
self.shared_state
.create_tasks(tasks.clone(), state_change.id)
.await?;
let allocation_plan = self.allocate_tasks(tasks).await?;
if !allocation_plan.0.is_empty() {
self.shared_state
.commit_task_assignments(allocation_plan.0, state_change.id)
.await
} else {
Ok(())
}
} else {
let root_content_id = if let Some(root_id) = content.root_content_id {
self.shared_state
.state_machine
.get_latest_version_of_content(&&root_id)?
.map(|c| c.id)
} else {
Some(content.id)
};
// Since we processed NewContent without creating a task, need to trigger
// garbage collection for previous content if root content was updated.
let new_state_changes = if let Some(root_content_id) = root_content_id {
if root_content_id.version > 1 {
vec![StateChange::new(
root_content_id.to_string(),
indexify_internal_api::ChangeType::TaskCompleted { root_content_id },
timestamp_secs(),
)]
} else {
Vec::new()
}
} else {
Vec::new()
};
self.shared_state
.mark_change_events_as_processed(vec![state_change], new_state_changes)
Some(content) => content,
None => {
return self
.shared_state
.mark_change_events_as_processed(vec![state_change], Vec::new())
.await
}
} else {
};
let extraction_policies = self
.shared_state
.match_extraction_policies_for_content(&content)
.await?;
let tables = self.tables_for_policies(&extraction_policies).await?;
for extraction_policy in extraction_policies {
let task = self
.create_task(&extraction_policy.id, &content, &tables)
.await?;
tasks.push(task);
}
if tasks.is_empty() {
return self
.shared_state
.mark_change_events_as_processed(
vec![state_change],
self.gc_state_change(content).await?,
)
.await;
}

self.shared_state
.create_tasks(tasks.clone(), state_change.id)
.await?;
let allocation_plan = self.allocate_tasks(tasks).await?;
if !allocation_plan.0.is_empty() {
self.shared_state
.mark_change_events_as_processed(vec![state_change], Vec::new())
.commit_task_assignments(allocation_plan.0, state_change.id)
.await
} else {
Ok(())
}
}

Expand Down
25 changes: 11 additions & 14 deletions src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,23 +599,20 @@ impl App {
};
// Trigger garbage collection for previous content if the root content has been
// updated.
let new_state_changes = if let Some(root_content_id) = root_content_id {
if root_content_id.version > 1 {
vec![StateChange::new(
root_content_id.to_string(),
indexify_internal_api::ChangeType::TaskCompleted { root_content_id },
timestamp_secs(),
)]
} else {
Vec::new()
}
} else {
Vec::new()
let new_state_changes = match root_content_id {
Some(id) if id.version > 1 => vec![StateChange::new(
id.to_string(),
indexify_internal_api::ChangeType::TaskCompleted {
root_content_id: id,
},
timestamp_secs(),
)],
_ => Vec::new(),
};
let req = StateMachineUpdateRequest {
payload: RequestPayload::UpdateTask {
task: task.clone(),
executor_id: executor_id.clone(),
task,
executor_id,
update_time: SystemTime::now(),
},
new_state_changes,
Expand Down

0 comments on commit ba2629c

Please sign in to comment.