From a3eae380dcd405758a7993cc61a87916436bf68e Mon Sep 17 00:00:00 2001 From: Edwin Kys Date: Wed, 5 Jun 2024 12:32:09 -0500 Subject: [PATCH] feat: implement tryfrom for api types --- crates/indexify_internal_api/src/lib.rs | 88 +++++++++++++---------- src/api.rs | 94 +++++++++++++++---------- src/coordinator.rs | 17 +++-- src/coordinator_service.rs | 84 +++++++++++++++++----- src/data_manager.rs | 39 +++++----- src/ingest_extracted_content.rs | 12 ++-- 6 files changed, 212 insertions(+), 122 deletions(-) diff --git a/crates/indexify_internal_api/src/lib.rs b/crates/indexify_internal_api/src/lib.rs index a6c53d6e7..7f3a98b9b 100644 --- a/crates/indexify_internal_api/src/lib.rs +++ b/crates/indexify_internal_api/src/lib.rs @@ -29,18 +29,21 @@ pub struct ExtractionGraph { pub extraction_policies: Vec, } -impl From for indexify_coordinator::ExtractionGraph { - fn from(value: ExtractionGraph) -> Self { - Self { +impl TryFrom for indexify_coordinator::ExtractionGraph { + type Error = anyhow::Error; + + fn try_from(value: ExtractionGraph) -> Result { + let mut extraction_policies = vec![]; + for policy in value.extraction_policies { + let policy = indexify_coordinator::ExtractionPolicy::try_from(policy)?; + extraction_policies.push(policy); + } + Ok(Self { id: value.id, name: value.name, namespace: value.namespace, - extraction_policies: value - .extraction_policies - .into_iter() - .map(|p| p.into()) - .collect(), - } + extraction_policies, + }) } } @@ -478,21 +481,23 @@ impl Display for Task { } } -impl From for indexify_coordinator::Task { - fn from(value: Task) -> Self { +impl TryFrom for indexify_coordinator::Task { + type Error = anyhow::Error; + + fn try_from(value: Task) -> Result { let outcome: indexify_coordinator::TaskOutcome = value.outcome.into(); - indexify_coordinator::Task { + Ok(indexify_coordinator::Task { id: value.id, extractor: value.extractor, namespace: value.namespace, - content_metadata: Some(value.content_metadata.into()), + content_metadata: Some(value.content_metadata.try_into()?), input_params: value.input_params.to_string(), extraction_policy_id: value.extraction_policy_id, extraction_graph_name: value.extraction_graph_name, output_index_mapping: value.output_index_table_mapping, outcome: outcome as i32, index_tables: value.index_tables, - } + }) } } @@ -637,13 +642,15 @@ pub struct ExtractionPolicy { pub content_source: ExtractionPolicyContentSource, } -impl From for indexify_coordinator::ExtractionPolicy { - fn from(value: ExtractionPolicy) -> Self { +impl TryFrom for indexify_coordinator::ExtractionPolicy { + type Error = anyhow::Error; + + fn try_from(value: ExtractionPolicy) -> Result { let filters = utils::convert_map_serde_to_prost_json(value.filters) .map_err(|e| anyhow!("unable to convert to protobuff JSON value: {e:?}")) .unwrap(); - Self { + Ok(Self { id: value.id, extractor: value.extractor, name: value.name, @@ -652,7 +659,7 @@ impl From for indexify_coordinator::ExtractionPolicy { content_source: value.content_source.into(), graph_name: value.graph_name, output_table_mapping: value.output_table_mapping, - } + }) } } @@ -918,13 +925,15 @@ impl ContentMetadata { } } -impl From for indexify_coordinator::ContentMetadata { - fn from(value: ContentMetadata) -> Self { +impl TryFrom for indexify_coordinator::ContentMetadata { + type Error = anyhow::Error; + + fn try_from(value: ContentMetadata) -> Result { let labels = utils::convert_map_serde_to_prost_json(value.labels) .map_err(|e| anyhow!("unable to convert to protobuff JSON value: {e:?}")) .unwrap(); - Self { + Ok(Self { id: value.id.id, // don't expose the version on the task parent_id: value.parent_id.map(|id| id.id).unwrap_or_default(), /* don't expose the * version on the @@ -941,15 +950,15 @@ impl From for indexify_coordinator::ContentMetadata { hash: value.hash, extraction_policy_ids: value.extraction_policy_ids, extraction_graph_names: value.extraction_graph_names, - } + }) } } -impl From for ContentMetadata { - fn from(value: indexify_coordinator::ContentMetadata) -> Self { - let labels = utils::convert_map_prost_to_serde_json(value.labels) - .map_err(|e| anyhow!("unable to convert to serde JSON value: {e:?}")) - .unwrap(); +impl TryFrom for ContentMetadata { + type Error = anyhow::Error; + + fn try_from(value: indexify_coordinator::ContentMetadata) -> Result { + let labels = utils::convert_map_prost_to_serde_json(value.labels)?; let root_content_id = if value.root_content_id.is_empty() { None @@ -961,7 +970,7 @@ impl From for ContentMetadata { } else { Some(ContentMetadataId::new(&value.parent_id)) }; - Self { + Ok(Self { id: ContentMetadataId { id: value.id, version: 1, @@ -981,7 +990,7 @@ impl From for ContentMetadata { hash: value.hash, extraction_policy_ids: value.extraction_policy_ids, extraction_graph_names: value.extraction_graph_names, - } + }) } } @@ -1069,16 +1078,19 @@ pub struct Namespace { pub extraction_graphs: Vec, } -impl From for indexify_coordinator::Namespace { - fn from(value: Namespace) -> Self { - indexify_coordinator::Namespace { - name: value.name, - extraction_graphs: value - .extraction_graphs - .into_iter() - .map(|g| g.into()) - .collect(), +impl TryFrom for indexify_coordinator::Namespace { + type Error = anyhow::Error; + + fn try_from(value: Namespace) -> Result { + let mut extraction_graphs = vec![]; + for graph in value.extraction_graphs { + extraction_graphs.push(graph.try_into()?) } + + Ok(indexify_coordinator::Namespace { + name: value.name, + extraction_graphs, + }) } } diff --git a/src/api.rs b/src/api.rs index 12fb599c5..53f3de882 100644 --- a/src/api.rs +++ b/src/api.rs @@ -24,18 +24,22 @@ pub struct ExtractionGraph { pub extraction_policies: Vec, } -impl From for ExtractionGraph { - fn from(value: indexify_coordinator::ExtractionGraph) -> Self { - Self { +impl TryFrom for ExtractionGraph { + type Error = anyhow::Error; + + fn try_from(value: indexify_coordinator::ExtractionGraph) -> Result { + let mut extraction_policies = vec![]; + for policy in value.extraction_policies { + let policy: ExtractionPolicy = policy.try_into()?; + extraction_policies.push(policy); + } + + Ok(Self { id: value.namespace.clone(), namespace: value.namespace, name: value.name, - extraction_policies: value - .extraction_policies - .into_iter() - .map(Into::into) - .collect(), - } + extraction_policies, + }) } } @@ -51,13 +55,12 @@ pub struct ExtractionPolicy { pub graph_name: String, } -impl From for ExtractionPolicy { - fn from(value: indexify_coordinator::ExtractionPolicy) -> Self { - let filters_eq = internal_api::utils::convert_map_prost_to_serde_json(value.filters) - .map_err(|e| anyhow!("unable to convert filters to serde JSON: {e:?}",)) - .unwrap(); +impl TryFrom for ExtractionPolicy { + type Error = anyhow::Error; - Self { + fn try_from(value: indexify_coordinator::ExtractionPolicy) -> Result { + let filters_eq = internal_api::utils::convert_map_prost_to_serde_json(value.filters)?; + Ok(Self { id: value.id, extractor: value.extractor, name: value.name, @@ -65,7 +68,7 @@ impl From for ExtractionPolicy { input_params: Some(serde_json::from_str(&value.input_params).unwrap()), content_source: Some(value.content_source), graph_name: value.graph_name, - } + }) } } @@ -75,16 +78,23 @@ pub struct DataNamespace { pub extraction_graphs: Vec, } -impl From for DataNamespace { - fn from(value: indexify_coordinator::Namespace) -> Self { - Self { +impl TryFrom for DataNamespace { + type Error = anyhow::Error; + + fn try_from(value: indexify_coordinator::Namespace) -> Result { + let extraction_graphs = { + let mut graphs = vec![]; + for graph in value.extraction_graphs { + let graph: ExtractionGraph = graph.try_into()?; + graphs.push(graph); + } + graphs + }; + + Ok(Self { name: value.name, - extraction_graphs: value - .extraction_graphs - .into_iter() - .map(Into::into) - .collect(), - } + extraction_graphs, + }) } } @@ -421,13 +431,13 @@ pub struct ContentMetadata { pub hash: String, } -impl From for ContentMetadata { - fn from(value: indexify_coordinator::ContentMetadata) -> Self { - let labels = internal_api::utils::convert_map_prost_to_serde_json(value.labels) - .map_err(|e| anyhow!("unable to convert labels to serde JSON: {e:?}",)) - .unwrap(); +impl TryFrom for ContentMetadata { + type Error = anyhow::Error; - Self { + fn try_from(value: indexify_coordinator::ContentMetadata) -> Result { + let labels = internal_api::utils::convert_map_prost_to_serde_json(value.labels)?; + + Ok(Self { id: value.id, parent_id: value.parent_id, root_content_id: value.root_content_id, @@ -441,7 +451,7 @@ impl From for ContentMetadata { size: value.size_bytes, hash: value.hash, extraction_graph_names: value.extraction_graph_names, - } + }) } } @@ -478,21 +488,27 @@ pub struct Task { pub index_tables: Vec, } -impl From for Task { - fn from(value: indexify_coordinator::Task) -> Self { - Self { +impl TryFrom for Task { + type Error = anyhow::Error; + + fn try_from(value: indexify_coordinator::Task) -> Result { + let content_metadata = if let Some(metadata) = value.content_metadata { + metadata.try_into()? + } else { + ContentMetadata::default() + }; + + Ok(Self { id: value.id, extractor: value.extractor, extraction_policy_id: value.extraction_policy_id, output_index_table_mapping: value.output_index_mapping, namespace: value.namespace, - content_metadata: value - .content_metadata - .map_or_else(Default::default, Into::into), // EGTODO: Is this correct? + content_metadata, input_params: serde_json::Value::String(value.input_params), outcome: value.outcome, // EGTODO: Is it correct to just return i32 for value outcome? index_tables: value.index_tables, - } + }) } } diff --git a/src/coordinator.rs b/src/coordinator.rs index 658c563e8..9d8ab28d1 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -64,7 +64,7 @@ impl Coordinator { ) -> Result> { let mut content_meta_list = Vec::new(); for content in content_list { - let content: indexify_coordinator::ContentMetadata = content.into(); + let content: indexify_coordinator::ContentMetadata = content.try_into()?; content_meta_list.push(content.clone()); } Ok(content_meta_list) @@ -73,8 +73,13 @@ impl Coordinator { pub fn external_content_metadata_to_internal( &self, content_list: Vec, - ) -> Vec { - content_list.into_iter().map(|v| v.into()).collect() + ) -> Result> { + let mut contents = vec![]; + for content in content_list { + let content: internal_api::ContentMetadata = content.try_into()?; + contents.push(content.clone()); + } + Ok(contents) } pub async fn list_content( @@ -183,7 +188,7 @@ impl Coordinator { .await?; let tasks = tasks .into_iter() - .map(|task| -> Result { Ok(task.into()) }) + .map(|task| -> Result { Ok(task.try_into()?) }) .collect::>>()?; Ok(tasks) } @@ -207,7 +212,7 @@ impl Coordinator { .await?; let tasks = tasks .into_iter() - .map(|task| -> Result { Ok(task.into()) }) + .map(|task| -> Result { Ok(task.try_into()?) }) .collect::>>()?; Ok(tasks) } @@ -316,7 +321,7 @@ impl Coordinator { pub async fn get_task(&self, task_id: &str) -> Result { let task = self.shared_state.task_with_id(task_id).await?; - Ok(task.into()) + Ok(task.try_into()?) } pub async fn get_task_and_root_content( diff --git a/src/coordinator_service.rs b/src/coordinator_service.rs index 50d44f79e..bef0d31f0 100644 --- a/src/coordinator_service.rs +++ b/src/coordinator_service.rs @@ -211,7 +211,10 @@ impl CoordinatorService for CoordinatorServiceServer { .into_inner() .content .ok_or(tonic::Status::aborted("content is missing"))?; - let content_meta: indexify_internal_api::ContentMetadata = content_meta.into(); + let content_meta: indexify_internal_api::ContentMetadata = + content_meta.try_into().map_err(|e| { + tonic::Status::aborted(format!("unable to convert content metadata: {}", e)) + })?; let content_list = vec![content_meta]; let statuses = self .coordinator @@ -268,11 +271,19 @@ impl CoordinatorService for CoordinatorServiceServer { .coordinator .list_content(&req.namespace, &req.source, &req.parent_id, &labels_eq) .await - .map_err(|e| tonic::Status::aborted(e.to_string()))? - .into_iter() - .map(|c| c.into()) - .collect_vec(); - Ok(tonic::Response::new(ListContentResponse { content_list })) + .map_err(|e| tonic::Status::aborted(e.to_string()))?; + + let mut proto_content_list = vec![]; + for content in content_list { + let proto_content = content + .try_into() + .map_err(|e: anyhow::Error| tonic::Status::aborted(e.to_string()))?; + proto_content_list.push(proto_content); + } + + Ok(tonic::Response::new(ListContentResponse { + content_list: proto_content_list, + })) } async fn list_active_contents( @@ -316,12 +327,14 @@ impl CoordinatorService for CoordinatorServiceServer { .create_extraction_graph(graph.clone()) .await .map_err(|e| tonic::Status::aborted(e.to_string()))?; - let policies = creation_result - .extraction_policies - .clone() - .into_iter() - .map(|p| (p.name.clone(), p.clone().into())) - .collect(); + let mut policies = HashMap::new(); + for policy in creation_result.extraction_policies { + let extraction_policy = policy + .clone() + .try_into() + .map_err(|e: anyhow::Error| tonic::Status::aborted(e.to_string()))?; + policies.insert(policy.name.clone(), extraction_policy); + } let extractors: HashMap<_, _> = creation_result .extractors .iter() @@ -349,7 +362,11 @@ impl CoordinatorService for CoordinatorServiceServer { .get_extraction_policy(request.extraction_policy_id) .map_err(|e| tonic::Status::aborted(e.to_string()))?; Ok(tonic::Response::new(GetExtractionPolicyResponse { - policy: Some(extraction_policy.into()), + policy: Some( + extraction_policy + .try_into() + .map_err(|e: anyhow::Error| tonic::Status::aborted(e.to_string()))?, + ), })) } @@ -363,7 +380,13 @@ impl CoordinatorService for CoordinatorServiceServer { .list_policies(&request.namespace) .await .map_err(|e| tonic::Status::aborted(e.to_string()))?; - let policies = extraction_policies.into_iter().map(|p| p.into()).collect(); + let mut policies = vec![]; + for policy in extraction_policies { + let extraction_policy = policy + .try_into() + .map_err(|e: anyhow::Error| tonic::Status::aborted(e.to_string()))?; + policies.push(extraction_policy); + } Ok(tonic::Response::new(ListExtractionPoliciesResponse { policies, })) @@ -395,9 +418,17 @@ impl CoordinatorService for CoordinatorServiceServer { .list_namespaces() .await .map_err(|e| tonic::Status::aborted(e.to_string()))?; - let namespaces = namespaces.into_iter().map(|n| n.into()).collect(); + let mut proto_namespaces = vec![]; + for namespace in namespaces { + let proto_namespace = namespace.try_into().map_err(|e| { + tonic::Status::aborted(format!("unable to convert namespace: {}", e)) + })?; + proto_namespaces.push(proto_namespace); + } Ok(tonic::Response::new( - indexify_coordinator::ListNamespaceResponse { namespaces }, + indexify_coordinator::ListNamespaceResponse { + namespaces: proto_namespaces, + }, )) } @@ -415,7 +446,11 @@ impl CoordinatorService for CoordinatorServiceServer { Ok(tonic::Response::new( indexify_coordinator::GetNamespaceResponse { - namespace: Some(namespace.into()), + namespace: Some( + namespace + .try_into() + .map_err(|e: anyhow::Error| tonic::Status::aborted(e.to_string()))?, + ), }, )) } @@ -805,10 +840,21 @@ impl CoordinatorService for CoordinatorServiceServer { .map_err(|e| tonic::Status::aborted(e.to_string()))?; let root_content: Option = - root_content.map(|c| c.into()); + if let Some(metadata) = root_content { + Some( + metadata + .try_into() + .map_err(|e: anyhow::Error| tonic::Status::aborted(e.to_string()))?, + ) + } else { + None + }; Ok(Response::new(GetIngestionInfoResponse { - task: Some(task.into()), + task: Some( + task.try_into() + .map_err(|e: anyhow::Error| tonic::Status::aborted(e.to_string()))?, + ), root_content, })) } diff --git a/src/data_manager.rs b/src/data_manager.rs index 65cb4013d..b06bfbac1 100644 --- a/src/data_manager.rs +++ b/src/data_manager.rs @@ -85,7 +85,11 @@ impl DataManager { .into_iter() .map(|r| api::DataNamespace { name: r.name, - extraction_graphs: r.extraction_graphs.into_iter().map(Into::into).collect(), + extraction_graphs: r + .extraction_graphs + .into_iter() + .map(|g| g.try_into().unwrap()) + .collect(), }) .collect(); Ok(data_namespaces) @@ -119,7 +123,7 @@ impl DataManager { .await? .into_inner(); let namespace = response.namespace.ok_or(anyhow!("namespace not found"))?; - Ok(namespace.into()) + Ok(namespace.try_into()?) } pub async fn get_extraction_policy(&self, id: &str) -> Result { @@ -136,7 +140,7 @@ impl DataManager { let policy = resp .policy .ok_or_else(|| anyhow!("extraction policy not found"))?; - Ok(policy.into()) + Ok(policy.try_into()?) } pub async fn create_extraction_graph( @@ -254,12 +258,11 @@ impl DataManager { .await? .list_content(req) .await?; - let content_list = response - .into_inner() - .content_list - .into_iter() - .map(|c| c.into()) - .collect_vec(); + let mut content_list = vec![]; + for content in response.into_inner().content_list { + let content: api::ContentMetadata = content.try_into()?; + content_list.push(content); + } Ok(content_list) } @@ -454,7 +457,12 @@ impl DataManager { .get_content_metadata(req) .await? .into_inner(); - Ok(response.content_list.into_iter().map(Into::into).collect()) + let mut content_list = vec![]; + for content in response.content_list { + let content: api::ContentMetadata = content.try_into()?; + content_list.push(content); + } + Ok(content_list) } pub async fn get_content_tree_metadata( @@ -469,12 +477,11 @@ impl DataManager { .await? .get_content_tree_metadata(req) .await?; - let content_list: Vec = response - .into_inner() - .content_list - .into_iter() - .map(|content| content.into()) - .collect(); + let mut content_list: Vec = vec![]; + for content in response.into_inner().content_list { + let content: api::ContentMetadata = content.try_into()?; + content_list.push(content); + } Ok(content_list) } diff --git a/src/ingest_extracted_content.rs b/src/ingest_extracted_content.rs index d5b7f5a14..626bf6d76 100644 --- a/src/ingest_extracted_content.rs +++ b/src/ingest_extracted_content.rs @@ -77,7 +77,11 @@ impl ContentStateWriting { if task.content_metadata.is_none() { return Err(anyhow!("task does not have content metadata")); } - let root_content = root_content.map(|c| c.into()); + let root_content = if let Some(content) = root_content { + Some(content.try_into()?) + } else { + None + }; Ok(Self { ingest_metadata, task, @@ -169,7 +173,7 @@ impl ContentStateWriting { let root_content_metadata = self .root_content_metadata .clone() - .unwrap_or(self.task.content_metadata.clone().unwrap().into()); + .unwrap_or(self.task.content_metadata.clone().unwrap().try_into()?); let extraction_policy = state .data_manager .get_extraction_policy(&self.task.extraction_policy_id) @@ -512,7 +516,7 @@ mod tests { .unwrap(); let content_metadata = test_mock_content_metadata("1", "1", &eg.name); test_coordinator - .create_content(content_metadata.clone().into()) + .create_content(content_metadata.clone().try_into().unwrap()) .await .unwrap(); let internal_content_metadata = test_coordinator @@ -557,7 +561,7 @@ mod tests { content: indexify_coordinator::ContentMetadata, ) -> Result<()> { self.coordinator - .create_content_metadata(vec![content.into()]) + .create_content_metadata(vec![content.try_into()?]) .await .unwrap();