Skip to content

Commit

Permalink
test for rest catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed May 22, 2024
1 parent 2a1b24c commit 5a8e4c8
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 47 deletions.
86 changes: 74 additions & 12 deletions iceberg-rest-catalog/src/apis/catalog_api_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,7 @@ where
}

let req = req_builder.build()?;
dbg!(&req);
let resp = client.execute(req).await?;
dbg!(&resp);

let status = resp.status();
let content = resp.text().await?;
Expand All @@ -425,6 +423,70 @@ where
}
}

async fn fetch_empty<R, E>(
configuration: &configuration::Configuration,
method: reqwest::Method,
prefix: Option<&str>,
uri_str: &str,
request: &R,
headers: Option<HashMap<String, String>>,
query_params: Option<HashMap<String, String>>,
) -> Result<(), Error<E>>
where
R: serde::Serialize + ?Sized,
E: for<'a> serde::Deserialize<'a>,
{
let uri_base = match prefix {
Some(prefix) => format!(
"{}/v1/{prefix}/",
configuration.base_path,
prefix = crate::apis::urlencode(prefix)
),
None => format!("{}/v1/", configuration.base_path,),
};
let client = &configuration.client;

let mut req_builder = client.request(method.clone(), &(uri_base + uri_str));

if let Some(ref user_agent) = configuration.user_agent {
req_builder = req_builder.header(reqwest::header::USER_AGENT, user_agent.clone());
}
if let Some(ref token) = configuration.oauth_access_token {
req_builder = req_builder.bearer_auth(token.to_owned());
};
if let Some(ref token) = configuration.bearer_access_token {
req_builder = req_builder.bearer_auth(token.to_owned());
};
for (key, value) in headers.unwrap_or_default() {
req_builder = req_builder.header(key, value);
}
for (key, value) in query_params.unwrap_or_default() {
req_builder = req_builder.query(&[(key, value)]);
}
if let &reqwest::Method::POST | &reqwest::Method::PUT = &method {
req_builder = req_builder.json(request);
}

let req = req_builder.build()?;
let resp = client.execute(req).await?;

let status = resp.status();
let content = resp.text().await?;
dbg!(&content);

if !status.is_client_error() && !status.is_server_error() {
Ok(())
} else {
let entity: Option<E> = serde_json::from_str(&content).ok();
let error = ResponseContent {
status,
content,
entity,
};
Err(Error::ResponseError(error))
}
}

pub async fn commit_transaction(
configuration: &configuration::Configuration,
prefix: Option<&str>,
Expand All @@ -433,7 +495,7 @@ pub async fn commit_transaction(
let uri_str = format!("transactions/commit");
let method = reqwest::Method::POST;

fetch(
fetch_empty(
configuration,
method,
prefix,
Expand Down Expand Up @@ -537,7 +599,7 @@ pub async fn drop_namespace(

let method = reqwest::Method::DELETE;

fetch(configuration, method, prefix, &uri_str, &(), None, None).await
fetch_empty(configuration, method, prefix, &uri_str, &(), None, None).await
}

/// Remove a table from the catalog
Expand All @@ -560,7 +622,7 @@ pub async fn drop_table(
);
let method = reqwest::Method::DELETE;

fetch(
fetch_empty(
configuration,
method,
prefix,
Expand All @@ -586,7 +648,7 @@ pub async fn drop_view(
);
let method = reqwest::Method::DELETE;

fetch(configuration, method, prefix, &uri_str, &(), None, None).await
fetch_empty(configuration, method, prefix, &uri_str, &(), None, None).await
}

/// List all namespaces at a certain level, optionally starting from a given parent namespace. If table accounting.tax.paid.info exists, using 'SELECT NAMESPACE IN accounting' would translate into `GET /namespaces?parent=accounting` and must return a namespace, [\"accounting\", \"tax\"] only. Using 'SELECT NAMESPACE IN accounting.tax' would translate into `GET /namespaces?parent=accounting%1Ftax` and must return a namespace, [\"accounting\", \"tax\", \"paid\"]. If `parent` is not provided, all top-level namespaces should be listed.
Expand Down Expand Up @@ -777,7 +839,7 @@ pub async fn namespace_exists(

let method = reqwest::Method::HEAD;

fetch(configuration, method, prefix, &uri_str, &(), None, None).await
fetch_empty(configuration, method, prefix, &uri_str, &(), None, None).await
}

/// Register a table using given metadata file location.
Expand Down Expand Up @@ -815,7 +877,7 @@ pub async fn rename_table(
let uri_str = format!("tables/rename",);
let method = reqwest::Method::POST;

fetch(
fetch_empty(
configuration,
method,
prefix,
Expand All @@ -836,7 +898,7 @@ pub async fn rename_view(
let uri_str = format!("views/rename",);
let method = reqwest::Method::POST;

fetch(
fetch_empty(
configuration,
method,
prefix,
Expand Down Expand Up @@ -889,7 +951,7 @@ pub async fn report_metrics(
);
let method = reqwest::Method::POST;

fetch(
fetch_empty(
configuration,
method,
prefix,
Expand All @@ -916,7 +978,7 @@ pub async fn table_exists(

let method = reqwest::Method::HEAD;

fetch(configuration, method, prefix, &uri_str, &(), None, None).await
fetch_empty(configuration, method, prefix, &uri_str, &(), None, None).await
}

/// Set and/or remove properties on a namespace. The request body specifies a list of properties to remove and a map of key value pairs to update. Properties that are not in the request are not modified or removed by this call. Server implementations are not required to support namespace properties.
Expand Down Expand Up @@ -986,5 +1048,5 @@ pub async fn view_exists(

let method = reqwest::Method::HEAD;

fetch(configuration, method, prefix, &uri_str, &(), None, None).await
fetch_empty(configuration, method, prefix, &uri_str, &(), None, None).await
}
41 changes: 7 additions & 34 deletions iceberg-rest-catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,7 @@ impl Catalog for RestCatalog {
)
.await
.map_err(Into::<Error>::into)?;
let tables = tables
.identifiers
.unwrap_or(Vec::new())
.into_iter()
.map(|x| {
let mut vec = x.namespace().to_vec();
vec.push(x.name().to_string());
Identifier::try_new(&vec)
});
let tables = tables.identifiers.unwrap_or(Vec::new()).into_iter();
let views = catalog_api_api::list_views(
&self.configuration,
self.name.as_deref(),
Expand All @@ -170,17 +162,12 @@ impl Catalog for RestCatalog {
)
.await
.map_err(Into::<Error>::into)?;
views
Ok(views
.identifiers
.unwrap_or(Vec::new())
.into_iter()
.map(|x| {
let mut vec = x.namespace().to_vec();
vec.push(x.name().to_string());
Identifier::try_new(&vec)
})
.chain(tables)
.collect()
.collect())
}
/// Lists all namespaces in the catalog.
async fn list_namespaces(&self, parent: Option<&str>) -> Result<Vec<Namespace>, Error> {
Expand Down Expand Up @@ -508,7 +495,7 @@ pub mod tests {
api_key: None,
}
}
// #[tokio::test]
#[tokio::test]
async fn test_create_update_drop_table() {
let container = GenericImage::new("tabulario/iceberg-rest", "latest")
.with_wait_for(WaitFor::StdOutMessage {
Expand All @@ -529,7 +516,6 @@ pub mod tests {
.expect("Failed to create namespace");

let identifier = Identifier::parse("public.test").unwrap();
dbg!(&identifier);

let schema = Schema::builder()
.with_schema_id(1)
Expand Down Expand Up @@ -564,28 +550,21 @@ pub mod tests {
.current_schema_id(1);
let mut table = builder.build().await.expect("Failed to create table.");

let exists = Arc::clone(&catalog)
.tabular_exists(&identifier)
.await
.expect("Table doesn't exist");
assert!(exists);

let tables = catalog
.clone()
.list_tabulars(
&Namespace::try_new(&["load_table".to_owned()])
.expect("Failed to create namespace"),
&Namespace::try_new(&["public".to_owned()]).expect("Failed to create namespace"),
)
.await
.expect("Failed to list Tables");
assert_eq!(tables[0].to_string(), "load_table.test".to_owned());
assert_eq!(tables[0].to_string(), "public.test".to_owned());

let namespaces = catalog
.clone()
.list_namespaces(None)
.await
.expect("Failed to list namespaces");
assert_eq!(namespaces[0].to_string(), "load_table");
assert_eq!(namespaces[0].to_string(), "public");

let transaction = table.new_transaction(None);
transaction.commit().await.expect("Transaction failed.");
Expand All @@ -594,11 +573,5 @@ pub mod tests {
.drop_table(&identifier)
.await
.expect("Failed to drop table.");

let exists = Arc::clone(&catalog)
.tabular_exists(&identifier)
.await
.expect("Table exists failed");
assert!(!exists);
}
}
62 changes: 62 additions & 0 deletions iceberg-rest-catalog/src/models/list_tables_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,65 @@ impl ListTablesResponse {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use iceberg_rust::catalog::identifier::Identifier;
use serde_json::{json, Value};

#[test]
fn test_list_tables_response_serialization() {
let identifiers = vec![
Identifier::parse("db.table1").unwrap(),
Identifier::parse("db.table2").unwrap(),
];

let response = ListTablesResponse {
next_page_token: None,
identifiers: Some(identifiers),
};

let expected_json = json!({
"identifiers": [
{
"namespace": ["db"],
"name": "table1"
},
{
"namespace": ["db"],
"name": "table2"
}
]
});

let serialized = serde_json::to_value(&response).unwrap();
assert_eq!(serialized, expected_json);
}

#[test]
fn test_list_tables_response_deserialization() {
let json_value = json!({
"identifiers": [
{
"namespace": ["db"],
"name": "table1"
},
{
"namespace": ["db"],
"name": "table2"
}
]
});

let expected_response = ListTablesResponse {
next_page_token: None,
identifiers: Some(vec![
Identifier::parse("db.table1").unwrap(),
Identifier::parse("db.table2").unwrap(),
]),
};

let deserialized: ListTablesResponse = serde_json::from_value(json_value).unwrap();
assert_eq!(deserialized, expected_response);
}
}
1 change: 0 additions & 1 deletion iceberg-rust/src/catalog/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub static SEPARATOR: &str = ".";
///Identifies a table in an iceberg catalog.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Identifier {
#[serde(flatten)]
namespace: Namespace,
name: String,
}
Expand Down
27 changes: 27 additions & 0 deletions iceberg-rust/src/catalog/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ impl Display for Namespace {
#[cfg(test)]

mod tests {
use serde_json::{json, Value};

use super::Namespace;

#[test]
Expand All @@ -84,4 +86,29 @@ mod tests {
fn test_empty() {
let _ = Namespace::try_new(&["".to_string(), "level2".to_string()]).unwrap();
}

#[test]
fn test_namespace_serialization() {
let namespace = Namespace(vec!["foo".to_string(), "bar".to_string()]);
let serialized = serde_json::to_string(&namespace).unwrap();
assert_eq!(serialized, r#"["foo","bar"]"#);
}

#[test]
fn test_namespace_deserialization() {
let json_value: Value = json!(["foo", "bar"]);
let namespace: Namespace = serde_json::from_value(json_value).unwrap();
assert_eq!(
namespace,
Namespace(vec!["foo".to_string(), "bar".to_string()])
);
}

#[test]
fn test_namespace_roundtrip() {
let original = Namespace(vec!["foo".to_string(), "bar".to_string()]);
let serialized = serde_json::to_string(&original).unwrap();
let deserialized: Namespace = serde_json::from_str(&serialized).unwrap();
assert_eq!(original, deserialized);
}
}

0 comments on commit 5a8e4c8

Please sign in to comment.