return status from create_content() (#610)
Return a status from create_content() so the caller can determine whether
the content was added or a duplicate was detected. In the duplicate case,
delete the uploaded content and skip adding new embeddings/metadata.
maxkozlovsky authored May 20, 2024
1 parent ac2479e · commit a095556
Showing 6 changed files with 112 additions and 27 deletions.
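In short: `create_content_metadata` (and the underlying `create_content_batch`) now returns one `CreateContentStatus` per input item instead of `()`. A minimal sketch of how a caller might use this, assuming a `coordinator` handle and a prepared `content` value like the ones in the tests below:

```rust
use indexify_proto::indexify_coordinator::CreateContentStatus;

// Sketch only; `coordinator` and `content` are assumed to be set up as in
// the tests further down in this diff.
let statuses = coordinator
    .create_content_metadata(vec![content.clone()])
    .await?;
// One status comes back per input item, in order.
assert_eq!(statuses.len(), 1);
if statuses[0] == CreateContentStatus::Duplicate {
    // Identical content already exists: skip embeddings/metadata and
    // clean up the uploaded bytes (see data_manager.rs below).
}
```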
30 changes: 28 additions & 2 deletions crates/indexify_proto/src/indexify_coordinator.rs
@@ -555,8 +555,8 @@ pub struct CreateContentRequest {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateContentResponse {
#[prost(string, tag = "1")]
pub id: ::prost::alloc::string::String,
#[prost(enumeration = "CreateContentStatus", tag = "2")]
pub status: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -775,6 +775,32 @@ impl TaskOutcome {
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum CreateContentStatus {
Created = 0,
Duplicate = 1,
}
impl CreateContentStatus {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
CreateContentStatus::Created => "CREATED",
CreateContentStatus::Duplicate => "DUPLICATE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"CREATED" => Some(Self::Created),
"DUPLICATE" => Some(Self::Duplicate),
_ => None,
}
}
}
/// Generated client implementations.
pub mod coordinator_service_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
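The generated enum is stored on the message as a raw `i32`; prost also generates a typed accessor (used in `data_manager.rs` below) alongside the string helpers shown above. A quick illustrative sketch, not part of the commit:

```rust
use indexify_proto::indexify_coordinator::{CreateContentResponse, CreateContentStatus};

// prost keeps enum fields as i32 in the struct and on the wire; the
// generated status() accessor recovers the typed value.
let resp = CreateContentResponse {
    status: CreateContentStatus::Duplicate as i32,
};
assert_eq!(resp.status(), CreateContentStatus::Duplicate);

// The string helpers round-trip the names from the proto definition.
assert_eq!(CreateContentStatus::Created.as_str_name(), "CREATED");
assert_eq!(
    CreateContentStatus::from_str_name("DUPLICATE"),
    Some(CreateContentStatus::Duplicate)
);
```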
7 changes: 6 additions & 1 deletion protos/coordinator_service.proto
@@ -397,12 +397,17 @@ message ContentMetadata {
repeated string extraction_graph_names = 14;
}

enum CreateContentStatus {
CREATED = 0;
DUPLICATE = 1;
}

message CreateContentRequest {
ContentMetadata content = 2;
}

message CreateContentResponse {
string id = 1;
CreateContentStatus status = 2;
}

message TombstoneContentRequest {
36 changes: 25 additions & 11 deletions src/coordinator.rs
@@ -7,7 +7,7 @@ use std::{

use anyhow::{anyhow, Ok, Result};
use indexify_internal_api as internal_api;
use indexify_proto::indexify_coordinator;
use indexify_proto::indexify_coordinator::{self, CreateContentStatus};
use internal_api::{
ContentMetadataId,
ExtractionGraph,
@@ -539,9 +539,8 @@ impl Coordinator {
pub async fn create_content_metadata(
&self,
content_list: Vec<indexify_internal_api::ContentMetadata>,
) -> Result<()> {
self.shared_state.create_content_batch(content_list).await?;
Ok(())
) -> Result<Vec<CreateContentStatus>> {
self.shared_state.create_content_batch(content_list).await
}

pub async fn tombstone_content_metadatas(&self, content_ids: &[String]) -> Result<()> {
@@ -588,6 +587,7 @@ mod tests {
use std::{collections::HashMap, fs, sync::Arc, time::Duration, vec};

use indexify_internal_api as internal_api;
use indexify_proto::indexify_coordinator::CreateContentStatus;
use internal_api::{ContentMetadataId, ContentSource, Task, TaskOutcome};
use test_util::db_utils::Parent::{Child, Root};

@@ -1293,9 +1293,11 @@
executor_id: &str,
) -> Result<(), anyhow::Error> {
let content = create_content_for_task(coordinator, task, id).await?;
coordinator
let create_res = coordinator
.create_content_metadata(vec![content.clone()])
.await?;
assert_eq!(create_res.len(), 1);
assert_eq!(*create_res.first().unwrap(), CreateContentStatus::Created);
complete_task(coordinator, task, executor_id).await
}

@@ -1426,9 +1428,11 @@
coordinator.run_scheduler().await?;

let parent_content = test_mock_content_metadata("test_parent_id", "", &eg.name);
coordinator
let create_res = coordinator
.create_content_metadata(vec![parent_content.clone()])
.await?;
assert_eq!(create_res.len(), 1);
assert_eq!(*create_res.first().unwrap(), CreateContentStatus::Created);
coordinator.run_scheduler().await?;
let all_tasks = coordinator.shared_state.list_all_unfinished_tasks().await?;
assert_eq!(all_tasks.len(), 1);
@@ -1444,9 +1448,11 @@
// update root content
let mut parent_content_1 = parent_content.clone();
parent_content_1.hash = "test_parent_id_1".into();
coordinator
let create_res = coordinator
.create_content_metadata(vec![parent_content_1.clone()])
.await?;
assert_eq!(create_res.len(), 1);
assert_eq!(*create_res.first().unwrap(), CreateContentStatus::Created);
coordinator.run_scheduler().await?;
let all_tasks = coordinator.shared_state.list_all_unfinished_tasks().await?;
assert_eq!(all_tasks.len(), 1);
@@ -1523,20 +1529,24 @@
// update root content and have the first child be identical to previous version
let mut parent_content_2 = parent_content_1.clone();
parent_content_2.hash = "test_parent_id_2".into();
coordinator
let create_res = coordinator
.create_content_metadata(vec![parent_content_2.clone()])
.await?;
coordinator.run_scheduler().await?;
assert_eq!(create_res.len(), 1);
assert_eq!(*create_res.first().unwrap(), CreateContentStatus::Created);
let all_tasks = coordinator.shared_state.list_all_unfinished_tasks().await?;
assert_eq!(all_tasks.len(), 1);

let mut child_content =
create_content_for_task(&coordinator, &all_tasks[0], &next_child(&mut child_id))
.await?;
child_content.hash = tree[1].hash.clone();
coordinator
let create_res = coordinator
.create_content_metadata(vec![child_content])
.await?;
assert_eq!(create_res.len(), 1);
assert_eq!(*create_res.first().unwrap(), CreateContentStatus::Duplicate);
complete_task(&coordinator, &all_tasks[0], "test_executor_id_1").await?;

coordinator.run_scheduler().await?;
@@ -1605,9 +1615,11 @@
// previous version
let mut parent_content_3 = parent_content_2.clone();
parent_content_3.hash = "test_parent_id_3".into();
coordinator
let create_res = coordinator
.create_content_metadata(vec![parent_content_3.clone()])
.await?;
assert_eq!(create_res.len(), 1);
assert_eq!(*create_res.first().unwrap(), CreateContentStatus::Created);
coordinator.run_scheduler().await?;
let all_tasks = coordinator.shared_state.list_all_unfinished_tasks().await?;
assert_eq!(all_tasks.len(), 1);
@@ -1643,7 +1655,9 @@
.find(|c| c.source == ContentSource::ExtractionPolicyName(policy.name.clone()))
.unwrap();
content.hash = prev_content.hash.clone();
coordinator.create_content_metadata(vec![content]).await?;
let create_res = coordinator.create_content_metadata(vec![content]).await?;
assert_eq!(create_res.len(), 1);
assert_eq!(*create_res.first().unwrap(), CreateContentStatus::Duplicate);
complete_task(&coordinator, &all_tasks[0], "test_executor_id_1").await?;
coordinator.run_scheduler().await?;
// No new task should be created
10 changes: 7 additions & 3 deletions src/coordinator_service.rs
@@ -201,14 +201,18 @@ impl CoordinatorService for CoordinatorServiceServer {
.content
.ok_or(tonic::Status::aborted("content is missing"))?;
let content_meta: indexify_internal_api::ContentMetadata = content_meta.into();
let id = content_meta.id.clone();
let content_list = vec![content_meta];
let _ = self
let statuses = self
.coordinator
.create_content_metadata(content_list)
.await
.map_err(|e| tonic::Status::aborted(e.to_string()))?;
Ok(tonic::Response::new(CreateContentResponse { id: id.id }))
Ok(tonic::Response::new(CreateContentResponse {
status: *statuses
.first()
.ok_or_else(|| tonic::Status::aborted("result invalid"))?
as i32,
}))
}

async fn tombstone_content(
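For context, this is roughly what decoding the new response looks like through the generated tonic client; the connection setup, endpoint address, and `content_meta` value here are assumptions, not part of the commit:

```rust
use indexify_proto::indexify_coordinator::{
    coordinator_service_client::CoordinatorServiceClient,
    CreateContentRequest,
    CreateContentStatus,
};

// Hypothetical client call; the address and `content_meta` are placeholders.
let mut client = CoordinatorServiceClient::connect("http://localhost:8950").await?;
let resp = client
    .create_content(CreateContentRequest {
        content: Some(content_meta),
    })
    .await?
    .into_inner();
// status() decodes the raw i32 field back into the enum.
if resp.status() == CreateContentStatus::Duplicate {
    // The coordinator kept the existing copy; nothing new was indexed.
}
```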
18 changes: 15 additions & 3 deletions src/data_manager.rs
@@ -11,7 +11,7 @@ use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use indexify_internal_api as internal_api;
use indexify_proto::indexify_coordinator::{self};
use indexify_proto::indexify_coordinator::{self, CreateContentStatus};
use itertools::Itertools;
use mime::Mime;
use nanoid::nanoid;
@@ -732,7 +732,8 @@ impl DataManager {
let req = indexify_coordinator::CreateContentRequest {
content: Some(content_metadata.clone()),
};
self.coordinator_client
let res = self
.coordinator_client
.get()
.await?
.create_content(GrpcHelper::into_req(req))
@@ -742,7 +743,18 @@
"unable to write content metadata to coordinator {}",
e.to_string()
)
})?;
})?
.into_inner();
if res.status() == CreateContentStatus::Duplicate {
if let Err(e) = self.delete_file(&content_metadata.storage_url).await {
tracing::warn!(
"unable to delete duplicate file for {:?}: {}",
content_metadata.id,
e
);
}
return Ok(());
}
let content_metadata_labels = content_metadata
.labels
.iter()
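Worth noting in the hunk above: a failed delete of the duplicate upload is logged with `tracing::warn!` rather than propagated, and the function still returns `Ok(())`. A duplicate upload therefore succeeds from the caller's point of view; only the embedding and metadata writes that follow are skipped.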
38 changes: 31 additions & 7 deletions src/state/mod.rs
@@ -13,7 +13,10 @@ use std::{
use anyhow::{anyhow, Result};
use grpc_server::RaftGrpcServer;
use indexify_internal_api as internal_api;
use indexify_proto::indexify_raft::raft_api_server::RaftApiServer;
use indexify_proto::{
indexify_coordinator::CreateContentStatus,
indexify_raft::raft_api_server::RaftApiServer,
};
use internal_api::{
ContentMetadataId,
ExtractionGraph,
@@ -146,6 +149,7 @@ pub struct RaftConfigOverrides {
fn add_update_entry(
update_entries: &mut Vec<CreateOrUpdateContentEntry>,
state_changes: &mut Vec<StateChange>,
statuses: &mut Vec<CreateContentStatus>,
content: internal_api::ContentMetadata,
) {
// Hold a reference to the content until the tasks are created if any.
@@ -159,6 +163,7 @@
content,
previous_parent: None,
});
statuses.push(CreateContentStatus::Created);
}

impl App {
@@ -780,9 +785,9 @@ impl App {
pub async fn create_content_batch(
&self,
content_metadata: Vec<internal_api::ContentMetadata>,
) -> Result<()> {
) -> Result<Vec<CreateContentStatus>> {
if content_metadata.is_empty() {
return Ok(());
return Ok(Vec::new());
}
let ns = &content_metadata.first().unwrap().namespace.clone();
let extraction_graph_names = &content_metadata
@@ -802,6 +807,8 @@
}
}

let mut statuses = Vec::new();

let content_ids: Vec<String> = content_metadata.iter().map(|c| c.id.id.clone()).collect();
let existing_content = self.get_content_metadata_batch(content_ids.clone()).await?;
let existing_content_map: HashMap<String, internal_api::ContentMetadata> = existing_content
@@ -820,7 +827,12 @@
// This is a root node that is being updated. Mark existing content as no
// longer latest and write both existing and new content.
incoming_content.id.version = existing_content.id.version + 1;
add_update_entry(&mut update_entries, &mut state_changes, incoming_content);
add_update_entry(
&mut update_entries,
&mut state_changes,
&mut statuses,
incoming_content,
);
let mut existing_content = existing_content.clone();
existing_content.latest = false;
update_entries.push(CreateOrUpdateContentEntry {
@@ -829,13 +841,19 @@
});
} else {
tracing::warn!("Content with the same id and hash has been received");
statuses.push(CreateContentStatus::Duplicate);
}
continue;
}
let incoming_content_parent_id = match incoming_content.parent_id.clone() {
None => {
// This is a new root node, create the content
add_update_entry(&mut update_entries, &mut state_changes, incoming_content);
add_update_entry(
&mut update_entries,
&mut state_changes,
&mut statuses,
incoming_content,
);
continue;
}
Some(parent_id) => parent_id,
@@ -875,7 +893,12 @@
.find(|content| content.hash == incoming_content.hash)
{
None => {
add_update_entry(&mut update_entries, &mut state_changes, incoming_content);
add_update_entry(
&mut update_entries,
&mut state_changes,
&mut statuses,
incoming_content,
);
}
Some(mut content) => {
// No new content state change is needed since content is identical.
@@ -884,6 +907,7 @@
content,
previous_parent,
});
statuses.push(CreateContentStatus::Duplicate);
}
};
}
@@ -900,7 +924,7 @@
.await
.map_err(|e| anyhow!("unable to create new content metadata: {}", e.to_string()))?;

Ok(())
Ok(statuses)
}

async fn tombstone_content_root_batch(
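Because each branch in `create_content_batch` pushes exactly one status before moving to the next item, the returned vector is position-aligned with the input batch. A sketch of a batch caller, assuming an `app: App` handle and two prepared `ContentMetadata` values `a` and `b`:

```rust
use indexify_proto::indexify_coordinator::CreateContentStatus;

// Sketch only; `app`, `a`, and `b` are assumed to be set up elsewhere.
// The returned vector lines up with the input by position.
let statuses = app.create_content_batch(vec![a.clone(), b.clone()]).await?;
for (content, status) in [a, b].iter().zip(statuses.iter()) {
    match status {
        CreateContentStatus::Created => {
            tracing::info!("stored new content {}", content.id.id);
        }
        CreateContentStatus::Duplicate => {
            tracing::info!("skipped duplicate of {}", content.id.id);
        }
    }
}
```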
