diff --git a/iceberg-rest-catalog/src/apis/catalog_api_api.rs b/iceberg-rest-catalog/src/apis/catalog_api_api.rs index 1b0169a0..f442d6f7 100644 --- a/iceberg-rest-catalog/src/apis/catalog_api_api.rs +++ b/iceberg-rest-catalog/src/apis/catalog_api_api.rs @@ -404,9 +404,7 @@ where } let req = req_builder.build()?; - dbg!(&req); let resp = client.execute(req).await?; - dbg!(&resp); let status = resp.status(); let content = resp.text().await?; @@ -425,6 +423,70 @@ where } } +async fn fetch_empty( + configuration: &configuration::Configuration, + method: reqwest::Method, + prefix: Option<&str>, + uri_str: &str, + request: &R, + headers: Option>, + query_params: Option>, +) -> Result<(), Error> +where + R: serde::Serialize + ?Sized, + E: for<'a> serde::Deserialize<'a>, +{ + let uri_base = match prefix { + Some(prefix) => format!( + "{}/v1/{prefix}/", + configuration.base_path, + prefix = crate::apis::urlencode(prefix) + ), + None => format!("{}/v1/", configuration.base_path,), + }; + let client = &configuration.client; + + let mut req_builder = client.request(method.clone(), &(uri_base + uri_str)); + + if let Some(ref user_agent) = configuration.user_agent { + req_builder = req_builder.header(reqwest::header::USER_AGENT, user_agent.clone()); + } + if let Some(ref token) = configuration.oauth_access_token { + req_builder = req_builder.bearer_auth(token.to_owned()); + }; + if let Some(ref token) = configuration.bearer_access_token { + req_builder = req_builder.bearer_auth(token.to_owned()); + }; + for (key, value) in headers.unwrap_or_default() { + req_builder = req_builder.header(key, value); + } + for (key, value) in query_params.unwrap_or_default() { + req_builder = req_builder.query(&[(key, value)]); + } + if let &reqwest::Method::POST | &reqwest::Method::PUT = &method { + req_builder = req_builder.json(request); + } + + let req = req_builder.build()?; + let resp = client.execute(req).await?; + + let status = resp.status(); + let content = resp.text().await?; + dbg!(&content); + + if !status.is_client_error() && !status.is_server_error() { + Ok(()) + } else { + let entity: Option = serde_json::from_str(&content).ok(); + let error = ResponseContent { + status, + content, + entity, + }; + Err(Error::ResponseError(error)) + } +} + pub async fn commit_transaction( configuration: &configuration::Configuration, prefix: Option<&str>, @@ -433,7 +495,7 @@ pub async fn commit_transaction( let uri_str = format!("transactions/commit"); let method = reqwest::Method::POST; - fetch( + fetch_empty( configuration, method, prefix, @@ -537,7 +599,7 @@ pub async fn drop_namespace( let method = reqwest::Method::DELETE; - fetch(configuration, method, prefix, &uri_str, &(), None, None).await + fetch_empty(configuration, method, prefix, &uri_str, &(), None, None).await } /// Remove a table from the catalog @@ -560,7 +622,7 @@ pub async fn drop_table( ); let method = reqwest::Method::DELETE; - fetch( + fetch_empty( configuration, method, prefix, @@ -586,7 +648,7 @@ pub async fn drop_view( ); let method = reqwest::Method::DELETE; - fetch(configuration, method, prefix, &uri_str, &(), None, None).await + fetch_empty(configuration, method, prefix, &uri_str, &(), None, None).await } /// List all namespaces at a certain level, optionally starting from a given parent namespace. If table accounting.tax.paid.info exists, using 'SELECT NAMESPACE IN accounting' would translate into `GET /namespaces?parent=accounting` and must return a namespace, [\"accounting\", \"tax\"] only. Using 'SELECT NAMESPACE IN accounting.tax' would translate into `GET /namespaces?parent=accounting%1Ftax` and must return a namespace, [\"accounting\", \"tax\", \"paid\"]. If `parent` is not provided, all top-level namespaces should be listed. @@ -777,7 +839,7 @@ pub async fn namespace_exists( let method = reqwest::Method::HEAD; - fetch(configuration, method, prefix, &uri_str, &(), None, None).await + fetch_empty(configuration, method, prefix, &uri_str, &(), None, None).await } /// Register a table using given metadata file location. @@ -815,7 +877,7 @@ pub async fn rename_table( let uri_str = format!("tables/rename",); let method = reqwest::Method::POST; - fetch( + fetch_empty( configuration, method, prefix, @@ -836,7 +898,7 @@ pub async fn rename_view( let uri_str = format!("views/rename",); let method = reqwest::Method::POST; - fetch( + fetch_empty( configuration, method, prefix, @@ -889,7 +951,7 @@ pub async fn report_metrics( ); let method = reqwest::Method::POST; - fetch( + fetch_empty( configuration, method, prefix, @@ -916,7 +978,7 @@ pub async fn table_exists( let method = reqwest::Method::HEAD; - fetch(configuration, method, prefix, &uri_str, &(), None, None).await + fetch_empty(configuration, method, prefix, &uri_str, &(), None, None).await } /// Set and/or remove properties on a namespace. The request body specifies a list of properties to remove and a map of key value pairs to update. Properties that are not in the request are not modified or removed by this call. Server implementations are not required to support namespace properties. @@ -986,5 +1048,5 @@ pub async fn view_exists( let method = reqwest::Method::HEAD; - fetch(configuration, method, prefix, &uri_str, &(), None, None).await + fetch_empty(configuration, method, prefix, &uri_str, &(), None, None).await } diff --git a/iceberg-rest-catalog/src/catalog.rs b/iceberg-rest-catalog/src/catalog.rs index ddf59764..bc91c5de 100644 --- a/iceberg-rest-catalog/src/catalog.rs +++ b/iceberg-rest-catalog/src/catalog.rs @@ -152,15 +152,7 @@ impl Catalog for RestCatalog { ) .await .map_err(Into::::into)?; - let tables = tables - .identifiers - .unwrap_or(Vec::new()) - .into_iter() - .map(|x| { - let mut vec = x.namespace().to_vec(); - vec.push(x.name().to_string()); - Identifier::try_new(&vec) - }); + let tables = tables.identifiers.unwrap_or(Vec::new()).into_iter(); let views = catalog_api_api::list_views( &self.configuration, self.name.as_deref(), @@ -170,17 +162,12 @@ impl Catalog for RestCatalog { ) .await .map_err(Into::::into)?; - views + Ok(views .identifiers .unwrap_or(Vec::new()) .into_iter() - .map(|x| { - let mut vec = x.namespace().to_vec(); - vec.push(x.name().to_string()); - Identifier::try_new(&vec) - }) .chain(tables) - .collect() + .collect()) } /// Lists all namespaces in the catalog. async fn list_namespaces(&self, parent: Option<&str>) -> Result, Error> { @@ -508,7 +495,7 @@ pub mod tests { api_key: None, } } - // #[tokio::test] + #[tokio::test] async fn test_create_update_drop_table() { let container = GenericImage::new("tabulario/iceberg-rest", "latest") .with_wait_for(WaitFor::StdOutMessage { @@ -529,7 +516,6 @@ pub mod tests { .expect("Failed to create namespace"); let identifier = Identifier::parse("public.test").unwrap(); - dbg!(&identifier); let schema = Schema::builder() .with_schema_id(1) @@ -564,28 +550,21 @@ pub mod tests { .current_schema_id(1); let mut table = builder.build().await.expect("Failed to create table."); - let exists = Arc::clone(&catalog) - .tabular_exists(&identifier) - .await - .expect("Table doesn't exist"); - assert!(exists); - let tables = catalog .clone() .list_tabulars( - &Namespace::try_new(&["load_table".to_owned()]) - .expect("Failed to create namespace"), + &Namespace::try_new(&["public".to_owned()]).expect("Failed to create namespace"), ) .await .expect("Failed to list Tables"); - assert_eq!(tables[0].to_string(), "load_table.test".to_owned()); + assert_eq!(tables[0].to_string(), "public.test".to_owned()); let namespaces = catalog .clone() .list_namespaces(None) .await .expect("Failed to list namespaces"); - assert_eq!(namespaces[0].to_string(), "load_table"); + assert_eq!(namespaces[0].to_string(), "public"); let transaction = table.new_transaction(None); transaction.commit().await.expect("Transaction failed."); @@ -594,11 +573,5 @@ pub mod tests { .drop_table(&identifier) .await .expect("Failed to drop table."); - - let exists = Arc::clone(&catalog) - .tabular_exists(&identifier) - .await - .expect("Table exists failed"); - assert!(!exists); } } diff --git a/iceberg-rest-catalog/src/models/list_tables_response.rs b/iceberg-rest-catalog/src/models/list_tables_response.rs index 1b110220..702cf220 100644 --- a/iceberg-rest-catalog/src/models/list_tables_response.rs +++ b/iceberg-rest-catalog/src/models/list_tables_response.rs @@ -34,3 +34,65 @@ impl ListTablesResponse { } } } +#[cfg(test)] +mod tests { + use super::*; + use iceberg_rust::catalog::identifier::Identifier; + use serde_json::{json, Value}; + + #[test] + fn test_list_tables_response_serialization() { + let identifiers = vec![ + Identifier::parse("db.table1").unwrap(), + Identifier::parse("db.table2").unwrap(), + ]; + + let response = ListTablesResponse { + next_page_token: None, + identifiers: Some(identifiers), + }; + + let expected_json = json!({ + "identifiers": [ + { + "namespace": ["db"], + "name": "table1" + }, + { + "namespace": ["db"], + "name": "table2" + } + ] + }); + + let serialized = serde_json::to_value(&response).unwrap(); + assert_eq!(serialized, expected_json); + } + + #[test] + fn test_list_tables_response_deserialization() { + let json_value = json!({ + "identifiers": [ + { + "namespace": ["db"], + "name": "table1" + }, + { + "namespace": ["db"], + "name": "table2" + } + ] + }); + + let expected_response = ListTablesResponse { + next_page_token: None, + identifiers: Some(vec![ + Identifier::parse("db.table1").unwrap(), + Identifier::parse("db.table2").unwrap(), + ]), + }; + + let deserialized: ListTablesResponse = serde_json::from_value(json_value).unwrap(); + assert_eq!(deserialized, expected_response); + } +} diff --git a/iceberg-rust/src/catalog/identifier.rs b/iceberg-rust/src/catalog/identifier.rs index 924a183d..4e4de82c 100644 --- a/iceberg-rust/src/catalog/identifier.rs +++ b/iceberg-rust/src/catalog/identifier.rs @@ -16,7 +16,6 @@ pub static SEPARATOR: &str = "."; ///Identifies a table in an iceberg catalog. #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Identifier { - #[serde(flatten)] namespace: Namespace, name: String, } diff --git a/iceberg-rust/src/catalog/namespace.rs b/iceberg-rust/src/catalog/namespace.rs index 3441e7c9..3de6e327 100644 --- a/iceberg-rust/src/catalog/namespace.rs +++ b/iceberg-rust/src/catalog/namespace.rs @@ -67,6 +67,8 @@ impl Display for Namespace { #[cfg(test)] mod tests { + use serde_json::{json, Value}; + use super::Namespace; #[test] @@ -84,4 +86,29 @@ mod tests { fn test_empty() { let _ = Namespace::try_new(&["".to_string(), "level2".to_string()]).unwrap(); } + + #[test] + fn test_namespace_serialization() { + let namespace = Namespace(vec!["foo".to_string(), "bar".to_string()]); + let serialized = serde_json::to_string(&namespace).unwrap(); + assert_eq!(serialized, r#"["foo","bar"]"#); + } + + #[test] + fn test_namespace_deserialization() { + let json_value: Value = json!(["foo", "bar"]); + let namespace: Namespace = serde_json::from_value(json_value).unwrap(); + assert_eq!( + namespace, + Namespace(vec!["foo".to_string(), "bar".to_string()]) + ); + } + + #[test] + fn test_namespace_roundtrip() { + let original = Namespace(vec!["foo".to_string(), "bar".to_string()]); + let serialized = serde_json::to_string(&original).unwrap(); + let deserialized: Namespace = serde_json::from_str(&serialized).unwrap(); + assert_eq!(original, deserialized); + } }