From b69c02a7417dd65828154673dec05f6588cb771c Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 16 Dec 2024 21:52:15 +0800 Subject: [PATCH 01/16] first commit on making s3tables --- crates/catalog/s3tables/Cargo.toml | 41 ++++++++++++++++++++++++++ crates/catalog/s3tables/src/catalog.rs | 0 crates/catalog/s3tables/src/lib.rs | 22 ++++++++++++++ 3 files changed, 63 insertions(+) create mode 100644 crates/catalog/s3tables/Cargo.toml create mode 100644 crates/catalog/s3tables/src/catalog.rs create mode 100644 crates/catalog/s3tables/src/lib.rs diff --git a/crates/catalog/s3tables/Cargo.toml b/crates/catalog/s3tables/Cargo.toml new file mode 100644 index 000000000..0efa51d85 --- /dev/null +++ b/crates/catalog/s3tables/Cargo.toml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iceberg-catalog-s3tables" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Rust S3Tables Catalog" +repository = { workspace = true } +license = { workspace = true } +keywords = ["iceberg", "sql", "catalog"] + +[dependencies] +async-trait = { workspace = true } +iceberg = { workspace = true } +serde_json = { workspace = true } +typed-builder = { workspace = true } +uuid = { workspace = true, features = ["v4"] } + +[dev-dependencies] +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +itertools = { workspace = true } +tokio = { workspace = true } diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/catalog/s3tables/src/lib.rs b/crates/catalog/s3tables/src/lib.rs new file mode 100644 index 000000000..082c290e3 --- /dev/null +++ b/crates/catalog/s3tables/src/lib.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg s3tables catalog implementation. 
+ +#![deny(missing_docs)] + +mod catalog; From a024733f5bae49949532c22cd0ffd388430f87cd Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 16 Dec 2024 22:03:06 +0800 Subject: [PATCH 02/16] add scaffold for todo --- crates/catalog/s3tables/src/catalog.rs | 85 ++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index e69de29bb..30f69a1ef 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -0,0 +1,85 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use iceberg::table::Table; +use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent}; + +/// S3Tables catalog implementation. +#[derive(Debug)] +pub struct S3TablesCatalog {} + +impl S3TablesCatalog { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl Catalog for S3TablesCatalog { + async fn list_namespaces( + &self, + parent: Option<&NamespaceIdent>, + ) -> Result> { + todo!() + } + + async fn create_namespace( + &self, + namespace: &NamespaceIdent, + properties: HashMap, + ) -> Result { + todo!() + } + + async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { + todo!() + } + + async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { + todo!() + } + + async fn update_namespace( + &self, + namespace: &NamespaceIdent, + properties: HashMap, + ) -> Result<()> { + todo!() + } + + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + todo!() + } + + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { + todo!() + } + + async fn create_table( + &self, + namespace: &NamespaceIdent, + creation: TableCreation, + ) -> Result { + todo!() + } + + async fn load_table(&self, table: &TableIdent) -> Result
{ + todo!() + } + + async fn drop_table(&self, table: &TableIdent) -> Result<()> { + todo!() + } + + async fn table_exists(&self, table: &TableIdent) -> Result { + todo!() + } + + async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { + todo!() + } + + async fn update_table(&self, commit: TableCommit) -> Result
{ + todo!() + } +} From 81b4a9bbe971525a82ef81fb935555e84ef859e3 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 16 Dec 2024 22:40:50 +0800 Subject: [PATCH 03/16] draft for s3tables --- crates/catalog/s3tables/Cargo.toml | 3 ++ crates/catalog/s3tables/src/catalog.rs | 50 +++++++++++++++++++++++--- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/crates/catalog/s3tables/Cargo.toml b/crates/catalog/s3tables/Cargo.toml index 0efa51d85..bf4174bff 100644 --- a/crates/catalog/s3tables/Cargo.toml +++ b/crates/catalog/s3tables/Cargo.toml @@ -34,6 +34,9 @@ iceberg = { workspace = true } serde_json = { workspace = true } typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } +aws-config = { workspace = true } +aws-sdk-s3tables = "1.0.0" +anyhow = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 30f69a1ef..202ce8cba 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -1,16 +1,29 @@ use std::collections::HashMap; +use anyhow::anyhow; use async_trait::async_trait; +use aws_config::BehaviorVersion; use iceberg::table::Table; -use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent}; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, + TableIdent, +}; /// S3Tables catalog implementation. 
#[derive(Debug)] -pub struct S3TablesCatalog {} +pub struct S3TablesCatalog { + table_bucket_arn: String, + client: aws_sdk_s3tables::Client, +} impl S3TablesCatalog { - pub fn new() -> Self { - Self {} + pub async fn new(table_bucket_arn: String) -> Self { + let config = aws_config::load_defaults(BehaviorVersion::latest()).await; + let client = aws_sdk_s3tables::Client::new(&config); + Self { + table_bucket_arn, + client, + } } } @@ -20,7 +33,24 @@ impl Catalog for S3TablesCatalog { &self, parent: Option<&NamespaceIdent>, ) -> Result> { - todo!() + let mut req = self + .client + .list_namespaces() + .table_bucket_arn(self.table_bucket_arn.clone()); + if let Some(parent) = parent { + req = req.prefix(parent.to_url_string()); + } + let resp = req.send().await.map_err(from_aws_sdk_error)?; + let mut result = Vec::new(); + for ns in resp.namespaces() { + let ns_names = ns.namespace(); + result.extend( + ns_names + .into_iter() + .map(|name| NamespaceIdent::new(name.to_string())), + ); + } + Ok(result) } async fn create_namespace( @@ -83,3 +113,13 @@ impl Catalog for S3TablesCatalog { todo!() } } + +/// Format AWS SDK error into iceberg error +pub(crate) fn from_aws_sdk_error(error: aws_sdk_s3tables::error::SdkError) -> Error +where T: std::fmt::Debug { + Error::new( + ErrorKind::Unexpected, + "Operation failed for hitting aws skd error".to_string(), + ) + .with_source(anyhow!("aws sdk error: {:?}", error)) +} From 25f5e2ae270dc30714add55804ffc2deeb507ae4 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 16 Dec 2024 22:54:28 +0800 Subject: [PATCH 04/16] add list_namespaces --- crates/catalog/s3tables/src/catalog.rs | 35 ++++++++-------- crates/catalog/s3tables/src/lib.rs | 1 + crates/catalog/s3tables/src/utils.rs | 55 ++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 16 deletions(-) create mode 100644 crates/catalog/s3tables/src/utils.rs diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 202ce8cba..4b25e26ca 
100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -2,27 +2,35 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; -use aws_config::BehaviorVersion; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; +use crate::utils::create_sdk_config; + +#[derive(Debug)] +pub struct S3TablesCatalogConfig { + table_bucket_arn: String, + properties: HashMap, + endpoint_url: Option, +} + /// S3Tables catalog implementation. #[derive(Debug)] pub struct S3TablesCatalog { - table_bucket_arn: String, - client: aws_sdk_s3tables::Client, + config: S3TablesCatalogConfig, + s3tables_client: aws_sdk_s3tables::Client, } impl S3TablesCatalog { - pub async fn new(table_bucket_arn: String) -> Self { - let config = aws_config::load_defaults(BehaviorVersion::latest()).await; - let client = aws_sdk_s3tables::Client::new(&config); + pub async fn new(config: S3TablesCatalogConfig) -> Self { + let aws_config = create_sdk_config(&config.properties, config.endpoint_url.clone()).await; + let s3tables_client = aws_sdk_s3tables::Client::new(&aws_config); Self { - table_bucket_arn, - client, + config, + s3tables_client, } } } @@ -34,21 +42,16 @@ impl Catalog for S3TablesCatalog { parent: Option<&NamespaceIdent>, ) -> Result> { let mut req = self - .client + .s3tables_client .list_namespaces() - .table_bucket_arn(self.table_bucket_arn.clone()); + .table_bucket_arn(self.config.table_bucket_arn.clone()); if let Some(parent) = parent { req = req.prefix(parent.to_url_string()); } let resp = req.send().await.map_err(from_aws_sdk_error)?; let mut result = Vec::new(); for ns in resp.namespaces() { - let ns_names = ns.namespace(); - result.extend( - ns_names - .into_iter() - .map(|name| NamespaceIdent::new(name.to_string())), - ); + result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?); } Ok(result) } diff --git 
a/crates/catalog/s3tables/src/lib.rs b/crates/catalog/s3tables/src/lib.rs index 082c290e3..b5e8b0c32 100644 --- a/crates/catalog/s3tables/src/lib.rs +++ b/crates/catalog/s3tables/src/lib.rs @@ -20,3 +20,4 @@ #![deny(missing_docs)] mod catalog; +mod utils; diff --git a/crates/catalog/s3tables/src/utils.rs b/crates/catalog/s3tables/src/utils.rs new file mode 100644 index 000000000..e84b7fb9a --- /dev/null +++ b/crates/catalog/s3tables/src/utils.rs @@ -0,0 +1,55 @@ +use std::collections::HashMap; + +use aws_config::default_provider::endpoint_url; +use aws_config::{BehaviorVersion, Region, SdkConfig}; +use aws_sdk_s3tables::config::Credentials; + +/// Property aws profile name +pub const AWS_PROFILE_NAME: &str = "profile_name"; +/// Property aws region +pub const AWS_REGION_NAME: &str = "region_name"; +/// Property aws access key +pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id"; +/// Property aws secret access key +pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key"; +/// Property aws session token +pub const AWS_SESSION_TOKEN: &str = "aws_session_token"; + +/// Creates an aws sdk configuration based on +/// provided properties and an optional endpoint URL. 
+pub(crate) async fn create_sdk_config( + properties: &HashMap, + endpoint_url: Option, +) -> SdkConfig { + let mut config = aws_config::defaults(BehaviorVersion::latest()); + + if properties.is_empty() { + return config.load().await; + } + + if let Some(endpoint_url) = endpoint_url { + config = config.endpoint_url(endpoint_url); + } + + if let (Some(access_key), Some(secret_key)) = ( + properties.get(AWS_ACCESS_KEY_ID), + properties.get(AWS_SECRET_ACCESS_KEY), + ) { + let session_token = properties.get(AWS_SESSION_TOKEN).cloned(); + let credentials_provider = + Credentials::new(access_key, secret_key, session_token, None, "properties"); + + config = config.credentials_provider(credentials_provider) + }; + + if let Some(profile_name) = properties.get(AWS_PROFILE_NAME) { + config = config.profile_name(profile_name); + } + + if let Some(region_name) = properties.get(AWS_REGION_NAME) { + let region = Region::new(region_name.clone()); + config = config.region(region); + } + + config.load().await +} From 866de6a08ef23d53b9735f344504439938106185 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Wed, 18 Dec 2024 14:34:09 +0800 Subject: [PATCH 05/16] add impl about more methods --- crates/catalog/s3tables/src/catalog.rs | 280 ++++++++++++++++++++++--- 1 file changed, 253 insertions(+), 27 deletions(-) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 4b25e26ca..0b2d18238 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -2,6 +2,10 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; +use aws_sdk_s3tables::operation::get_table::GetTableOutput; +use aws_sdk_s3tables::operation::list_tables::ListTablesOutput; +use iceberg::io::FileIO; +use iceberg::spec::TableMetadata; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, @@ -22,16 +26,35 @@ pub struct S3TablesCatalogConfig 
{ pub struct S3TablesCatalog { config: S3TablesCatalogConfig, s3tables_client: aws_sdk_s3tables::Client, + file_io: FileIO, } impl S3TablesCatalog { - pub async fn new(config: S3TablesCatalogConfig) -> Self { + pub async fn new(config: S3TablesCatalogConfig) -> Result { let aws_config = create_sdk_config(&config.properties, config.endpoint_url.clone()).await; let s3tables_client = aws_sdk_s3tables::Client::new(&aws_config); - Self { + + // parse bucket name from ARN format like: arn:aws:s3:::bucket/ + let bucket_name = config + .table_bucket_arn + .rsplit(":bucket/") + .next() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid bucket ARN format: {}", config.table_bucket_arn), + ) + })?; + + let file_io = FileIO::from_path(&format!("s3://{}", bucket_name))? + .with_props(&config.properties) + .build()?; + + Ok(Self { config, s3tables_client, - } + file_io, + }) } } @@ -41,17 +64,27 @@ impl Catalog for S3TablesCatalog { &self, parent: Option<&NamespaceIdent>, ) -> Result> { - let mut req = self - .s3tables_client - .list_namespaces() - .table_bucket_arn(self.config.table_bucket_arn.clone()); - if let Some(parent) = parent { - req = req.prefix(parent.to_url_string()); - } - let resp = req.send().await.map_err(from_aws_sdk_error)?; let mut result = Vec::new(); - for ns in resp.namespaces() { - result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?); + let mut continuation_token = None; + loop { + let mut req = self + .s3tables_client + .list_namespaces() + .table_bucket_arn(self.config.table_bucket_arn.clone()); + if let Some(parent) = parent { + req = req.prefix(parent.to_url_string()); + } + if let Some(token) = continuation_token { + req = req.continuation_token(token); + } + let resp = req.send().await.map_err(from_aws_sdk_error)?; + for ns in resp.namespaces() { + result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?); + } + continuation_token = resp.continuation_token().map(|s| s.to_string()); + if 
continuation_token.is_none() { + break; + } } Ok(result) } @@ -59,17 +92,50 @@ impl Catalog for S3TablesCatalog { async fn create_namespace( &self, namespace: &NamespaceIdent, - properties: HashMap, + _properties: HashMap, ) -> Result { - todo!() + let req = self + .s3tables_client + .create_namespace() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()); + req.send().await.map_err(from_aws_sdk_error)?; + Ok(Namespace::with_properties( + namespace.clone(), + HashMap::new(), + )) } async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { - todo!() + let req = self + .s3tables_client + .get_namespace() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()); + let resp = req.send().await.map_err(from_aws_sdk_error)?; + let properties = HashMap::new(); + Ok(Namespace::with_properties( + NamespaceIdent::from_vec(resp.namespace().to_vec())?, + properties, + )) } async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { - todo!() + let req = self + .s3tables_client + .get_namespace() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()); + match req.send().await { + Ok(_) => Ok(true), + Err(err) => { + if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) { + Ok(false) + } else { + Err(from_aws_sdk_error(err)) + } + } + } } async fn update_namespace( @@ -77,15 +143,47 @@ impl Catalog for S3TablesCatalog { namespace: &NamespaceIdent, properties: HashMap, ) -> Result<()> { - todo!() + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Update namespace is not supported for s3tables catalog", + )) } async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { - todo!() + let req = self + .s3tables_client + .delete_namespace() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()); + req.send().await.map_err(from_aws_sdk_error)?; + Ok(()) } 
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { - todo!() + let mut result = Vec::new(); + let mut continuation_token = None; + loop { + let mut req = self + .s3tables_client + .list_tables() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()); + if let Some(token) = continuation_token { + req = req.continuation_token(token); + } + let resp: ListTablesOutput = req.send().await.map_err(from_aws_sdk_error)?; + for table in resp.tables() { + result.push(TableIdent::new( + NamespaceIdent::from_vec(table.namespace().to_vec())?, + table.name().to_string(), + )); + } + continuation_token = resp.continuation_token().map(|s| s.to_string()); + if continuation_token.is_none() { + break; + } + } + Ok(result) } async fn create_table( @@ -96,24 +194,85 @@ impl Catalog for S3TablesCatalog { todo!() } - async fn load_table(&self, table: &TableIdent) -> Result
{ - todo!() + async fn load_table(&self, table_ident: &TableIdent) -> Result
{ + let req = self + .s3tables_client + .get_table() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(table_ident.namespace().to_url_string()) + .name(table_ident.name()); + let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?; + + let metadata_location = resp.metadata_location().ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "Table {} does not have metadata location", + table_ident.name() + ), + ) + })?; + let input_file = self.file_io.new_input(&metadata_location)?; + let metadata_content = input_file.read().await?; + let metadata = serde_json::from_slice::(&metadata_content)?; + + let table = Table::builder() + .identifier(table_ident.clone()) + .metadata(metadata) + .metadata_location(metadata_location) + .file_io(self.file_io.clone()) + .build()?; + Ok(table) } async fn drop_table(&self, table: &TableIdent) -> Result<()> { - todo!() + let req = self + .s3tables_client + .delete_table() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(table.namespace().to_url_string()) + .name(table.name()); + req.send().await.map_err(from_aws_sdk_error)?; + Ok(()) } - async fn table_exists(&self, table: &TableIdent) -> Result { - todo!() + async fn table_exists(&self, table_ident: &TableIdent) -> Result { + let req = self + .s3tables_client + .get_table() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(table_ident.namespace().to_url_string()) + .name(table_ident.name()); + match req.send().await { + Ok(_) => Ok(true), + Err(err) => { + if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) { + Ok(false) + } else { + Err(from_aws_sdk_error(err)) + } + } + } } async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { - todo!() + let req = self + .s3tables_client + .rename_table() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(src.namespace().to_url_string()) + .name(src.name()) + 
.new_namespace_name(dest.namespace().to_url_string()) + .new_name(dest.name()); + req.send().await.map_err(from_aws_sdk_error)?; + Ok(()) } async fn update_table(&self, commit: TableCommit) -> Result
{ - todo!() + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Updating a table is not supported yet", + )) } } @@ -126,3 +285,70 @@ where T: std::fmt::Debug { ) .with_source(anyhow!("aws sdk error: {:?}", error)) } + +#[cfg(test)] +mod tests { + use super::*; + + async fn load_s3tables_catalog_from_env() -> Result> { + let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() { + Some(table_bucket_arn) => table_bucket_arn, + None => return Ok(None), + }; + + let properties = HashMap::new(); + let config = S3TablesCatalogConfig { + table_bucket_arn, + properties, + endpoint_url: None, + }; + + Ok(Some(S3TablesCatalog::new(config).await?)) + } + + #[tokio::test] + async fn test_s3tables_list_namespace() { + let catalog = match load_s3tables_catalog_from_env().await { + Ok(Some(catalog)) => catalog, + Ok(None) => return, + Err(e) => panic!("Error loading catalog: {}", e), + }; + + let namespaces = catalog.list_namespaces(None).await.unwrap(); + assert!(namespaces.len() > 0); + } + + #[tokio::test] + async fn test_s3tables_list_tables() { + let catalog = match load_s3tables_catalog_from_env().await { + Ok(Some(catalog)) => catalog, + Ok(None) => return, + Err(e) => panic!("Error loading catalog: {}", e), + }; + + let tables = catalog + .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string())) + .await + .unwrap(); + println!("{:?}", tables); + assert!(tables.len() > 0); + } + + #[tokio::test] + async fn test_s3tables_load_table() { + let catalog = match load_s3tables_catalog_from_env().await { + Ok(Some(catalog)) => catalog, + Ok(None) => return, + Err(e) => panic!("Error loading catalog: {}", e), + }; + + let table = catalog + .load_table(&TableIdent::new( + NamespaceIdent::new("aws_s3_metadata".to_string()), + "query_storage_metadata".to_string(), + )) + .await + .unwrap(); + println!("{:?}", table); + } +} From dab8e51b370e0f31dc685a077338e3f6241b2dbb Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 20 Dec 2024 15:38:09 +0800 Subject: [PATCH 
06/16] add create table --- crates/catalog/s3tables/src/catalog.rs | 46 +++++++++++++++++++++++--- crates/catalog/s3tables/src/utils.rs | 32 +++++++++++++++++- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 0b2d18238..ab02dba69 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -2,17 +2,19 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; +use aws_sdk_s3tables::operation::create_table::CreateTableOutput; +use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput; use aws_sdk_s3tables::operation::get_table::GetTableOutput; use aws_sdk_s3tables::operation::list_tables::ListTablesOutput; use iceberg::io::FileIO; -use iceberg::spec::TableMetadata; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; -use crate::utils::create_sdk_config; +use crate::utils::{create_metadata_location, create_sdk_config}; #[derive(Debug)] pub struct S3TablesCatalogConfig { @@ -112,7 +114,7 @@ impl Catalog for S3TablesCatalog { .get_namespace() .table_bucket_arn(self.config.table_bucket_arn.clone()) .namespace(namespace.to_url_string()); - let resp = req.send().await.map_err(from_aws_sdk_error)?; + let resp: GetNamespaceOutput = req.send().await.map_err(from_aws_sdk_error)?; let properties = HashMap::new(); Ok(Namespace::with_properties( NamespaceIdent::from_vec(resp.namespace().to_vec())?, @@ -191,7 +193,43 @@ impl Catalog for S3TablesCatalog { namespace: &NamespaceIdent, creation: TableCreation, ) -> Result
{ - todo!() + let table_ident = TableIdent::new(namespace.clone(), creation.name.clone()); + + let metadata = TableMetadataBuilder::from_table_creation(creation)? + .build()? + .metadata; + let metadata_location = + create_metadata_location(namespace.to_url_string(), table_ident.name(), 0)?; + self.file_io + .new_output(&metadata_location)? + .write(serde_json::to_vec(&metadata)?.into()) + .await?; + + self.s3tables_client + .create_table() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()) + .name(table_ident.name()) + .send() + .await + .map_err(from_aws_sdk_error)?; + self.s3tables_client + .update_table_metadata_location() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()) + .name(table_ident.name()) + .metadata_location(metadata_location.clone()) + .send() + .await + .map_err(from_aws_sdk_error)?; + + let table = Table::builder() + .identifier(table_ident) + .metadata_location(metadata_location) + .metadata(metadata) + .file_io(self.file_io.clone()) + .build()?; + Ok(table) } async fn load_table(&self, table_ident: &TableIdent) -> Result
{ diff --git a/crates/catalog/s3tables/src/utils.rs b/crates/catalog/s3tables/src/utils.rs index e84b7fb9a..31ecc54f4 100644 --- a/crates/catalog/s3tables/src/utils.rs +++ b/crates/catalog/s3tables/src/utils.rs @@ -1,8 +1,9 @@ use std::collections::HashMap; -use aws_config::default_provider::endpoint_url; use aws_config::{BehaviorVersion, Region, SdkConfig}; use aws_sdk_s3tables::config::Credentials; +use iceberg::{Error, ErrorKind, Result}; +use uuid::Uuid; /// Property aws profile name pub const AWS_PROFILE_NAME: &str = "profile_name"; @@ -53,3 +54,32 @@ pub(crate) async fn create_sdk_config( config.load().await } + +/// Create metadata location from `location` and `version` +pub(crate) fn create_metadata_location( + namespace: impl AsRef, + table_name: impl AsRef, + version: i32, +) -> Result { + if version < 0 { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Table metadata version: '{}' must be a non-negative integer", + version + ), + )); + }; + + let version = format!("{:0>5}", version); + let id = Uuid::new_v4(); + let metadata_location = format!( + "{}/{}/metadata/{}-{}.metadata.json", + namespace.as_ref(), + table_name.as_ref(), + version, + id + ); + + Ok(metadata_location) +} From 43dd5735ba86a784c63663bb0f0ad18e3ec8fbec Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 20 Dec 2024 15:57:51 +0800 Subject: [PATCH 07/16] fix create_table --- crates/catalog/s3tables/src/catalog.rs | 160 +++++++++++++++++++++++-- crates/catalog/s3tables/src/utils.rs | 8 +- 2 files changed, 150 insertions(+), 18 deletions(-) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index ab02dba69..9f830768a 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -62,10 +62,18 @@ impl S3TablesCatalog { #[async_trait] impl Catalog for S3TablesCatalog { + /// List namespaces from s3tables catalog. + /// + /// S3Tables doesn't support nested namespaces. 
If parent is provided, it will + /// return an empty list. async fn list_namespaces( &self, parent: Option<&NamespaceIdent>, ) -> Result> { + if parent.is_some() { + return Ok(vec![]); + } + let mut result = Vec::new(); let mut continuation_token = None; loop { @@ -91,6 +99,22 @@ impl Catalog for S3TablesCatalog { Ok(result) } + /// Creates a new namespace with the given identifier and properties. + /// + /// Attempts to create a namespace defined by the `namespace`. The `properties` + /// parameter is ignored. + /// + /// The following naming rules apply to namespaces: + /// + /// - Names must be between 3 (min) and 63 (max) characters long. + /// - Names can consist only of lowercase letters, numbers, and underscores (_). + /// - Names must begin and end with a letter or number. + /// - Names must not contain hyphens (-) or periods (.). + /// + /// This function can return an error in the following situations: + /// + /// - Errors from the underlying database creation process, converted using + /// `from_aws_sdk_error`. async fn create_namespace( &self, namespace: &NamespaceIdent, @@ -108,6 +132,15 @@ impl Catalog for S3TablesCatalog { )) } + /// Retrieves a namespace by its identifier. + /// + /// Validates the given namespace identifier and then queries the + /// underlying database client to fetch the corresponding namespace data. + /// Constructs a `Namespace` object with the retrieved data and returns it. + /// + /// This function can return an error in any of the following situations: + /// - If there is an error querying the database, returned by + /// `from_aws_sdk_error`. async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { let req = self .s3tables_client @@ -122,6 +155,18 @@ impl Catalog for S3TablesCatalog { )) } + /// Checks if a namespace exists within the s3tables catalog. + /// + /// Validates the namespace identifier by querying the s3tables catalog + /// to determine if the specified namespace exists. 
+ /// + /// # Returns + /// A `Result` indicating the outcome of the check: + /// - `Ok(true)` if the namespace exists. + /// - `Ok(false)` if the namespace does not exist, identified by a specific + /// `IsNotFoundException` variant. + /// - `Err(...)` if an error occurs during validation or the s3tables catalog + /// query, with the error encapsulating the issue. async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { let req = self .s3tables_client @@ -140,10 +185,14 @@ impl Catalog for S3TablesCatalog { } } + /// Updates the properties of an existing namespace. + /// + /// S3Tables doesn't support updating namespace properties, so this function + /// will always return an error. async fn update_namespace( &self, - namespace: &NamespaceIdent, - properties: HashMap, + _namespace: &NamespaceIdent, + _properties: HashMap, ) -> Result<()> { Err(Error::new( ErrorKind::FeatureUnsupported, @@ -151,6 +200,14 @@ impl Catalog for S3TablesCatalog { )) } + /// Drops an existing namespace from the s3tables catalog. + /// + /// Validates the namespace identifier and then deletes the corresponding + /// namespace from the s3tables catalog. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database deletion process, converted using + /// `from_aws_sdk_error`. async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { let req = self .s3tables_client @@ -161,6 +218,14 @@ impl Catalog for S3TablesCatalog { Ok(()) } + /// Lists all tables within a given namespace. + /// + /// Retrieves all tables associated with the specified namespace and returns + /// their identifiers. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database query process, converted using + /// `from_aws_sdk_error`. 
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { let mut result = Vec::new(); let mut continuation_token = None; @@ -188,6 +253,18 @@ impl Catalog for S3TablesCatalog { Ok(result) } + /// Creates a new table within a specified namespace. + /// + /// Attempts to create a table defined by the `creation` parameter. The metadata + /// location is generated by the s3tables catalog, looks like: + /// + /// s3://{RANDOM WAREHOUSE LOCATION}/metadata/{VERSION}-{UUID}.metadata.json + /// + /// We have to get this random warehouse location after the table is created. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database creation process, converted using + /// `from_aws_sdk_error`. async fn create_table( &self, namespace: &NamespaceIdent, @@ -195,30 +272,46 @@ impl Catalog for S3TablesCatalog { ) -> Result
{ let table_ident = TableIdent::new(namespace.clone(), creation.name.clone()); + // create table + let create_resp: CreateTableOutput = self + .s3tables_client + .create_table() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()) + .name(table_ident.name()) + .send() + .await + .map_err(from_aws_sdk_error)?; + + // get warehouse location + let get_resp: GetTableOutput = self + .s3tables_client + .get_table() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()) + .name(table_ident.name()) + .send() + .await + .map_err(from_aws_sdk_error)?; + + // write metadata to file let metadata = TableMetadataBuilder::from_table_creation(creation)? .build()? .metadata; - let metadata_location = - create_metadata_location(namespace.to_url_string(), table_ident.name(), 0)?; + let metadata_location = create_metadata_location(get_resp.warehouse_location(), 0)?; self.file_io .new_output(&metadata_location)? .write(serde_json::to_vec(&metadata)?.into()) .await?; - self.s3tables_client - .create_table() - .table_bucket_arn(self.config.table_bucket_arn.clone()) - .namespace(namespace.to_url_string()) - .name(table_ident.name()) - .send() - .await - .map_err(from_aws_sdk_error)?; + // update metadata location self.s3tables_client .update_table_metadata_location() .table_bucket_arn(self.config.table_bucket_arn.clone()) .namespace(namespace.to_url_string()) .name(table_ident.name()) .metadata_location(metadata_location.clone()) + .version_token(create_resp.version_token()) .send() .await .map_err(from_aws_sdk_error)?; @@ -232,6 +325,16 @@ impl Catalog for S3TablesCatalog { Ok(table) } + /// Loads an existing table from the s3tables catalog. + /// + /// Retrieves the metadata location of the specified table and constructs a + /// `Table` object with the retrieved metadata. 
+ /// + /// This function can return an error in the following situations: + /// - If the table does not have a metadata location, identified by a specific + /// `Unexpected` variant. + /// - Errors from the underlying database query process, converted using + /// `from_aws_sdk_error`. async fn load_table(&self, table_ident: &TableIdent) -> Result
{ let req = self .s3tables_client @@ -263,6 +366,14 @@ impl Catalog for S3TablesCatalog { Ok(table) } + /// Drops an existing table from the s3tables catalog. + /// + /// Validates the table identifier and then deletes the corresponding + /// table from the s3tables catalog. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database deletion process, converted using + /// `from_aws_sdk_error`. async fn drop_table(&self, table: &TableIdent) -> Result<()> { let req = self .s3tables_client @@ -274,6 +385,18 @@ impl Catalog for S3TablesCatalog { Ok(()) } + /// Checks if a table exists within the s3tables catalog. + /// + /// Validates the table identifier by querying the s3tables catalog + /// to determine if the specified table exists. + /// + /// # Returns + /// A `Result` indicating the outcome of the check: + /// - `Ok(true)` if the table exists. + /// - `Ok(false)` if the table does not exist, identified by a specific + /// `IsNotFoundException` variant. + /// - `Err(...)` if an error occurs during validation or the s3tables catalog + /// query, with the error encapsulating the issue. async fn table_exists(&self, table_ident: &TableIdent) -> Result { let req = self .s3tables_client @@ -293,6 +416,14 @@ impl Catalog for S3TablesCatalog { } } + /// Renames an existing table within the s3tables catalog. + /// + /// Validates the source and destination table identifiers and then renames + /// the source table to the destination table. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database renaming process, converted using + /// `from_aws_sdk_error`. async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { let req = self .s3tables_client @@ -306,7 +437,10 @@ impl Catalog for S3TablesCatalog { Ok(()) } - async fn update_table(&self, commit: TableCommit) -> Result
{ + /// Updates an existing table within the s3tables catalog. + /// + /// This function is still in development and will always return an error. + async fn update_table(&self, _commit: TableCommit) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, "Updating a table is not supported yet", diff --git a/crates/catalog/s3tables/src/utils.rs b/crates/catalog/s3tables/src/utils.rs index 31ecc54f4..9e67f2d62 100644 --- a/crates/catalog/s3tables/src/utils.rs +++ b/crates/catalog/s3tables/src/utils.rs @@ -57,8 +57,7 @@ pub(crate) async fn create_sdk_config( /// Create metadata location from `location` and `version` pub(crate) fn create_metadata_location( - namespace: impl AsRef, - table_name: impl AsRef, + warehouse_location: impl AsRef, version: i32, ) -> Result { if version < 0 { @@ -74,9 +73,8 @@ pub(crate) fn create_metadata_location( let version = format!("{:0>5}", version); let id = Uuid::new_v4(); let metadata_location = format!( - "{}/{}/metadata/{}-{}.metadata.json", - namespace.as_ref(), - table_name.as_ref(), + "{}/metadata/{}-{}.metadata.json", + warehouse_location.as_ref(), version, id ); From 4a9817b43502ea0dd259bb5e94efdb82ce2b20b6 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 20 Dec 2024 16:29:20 +0800 Subject: [PATCH 08/16] add test about namespace --- crates/catalog/s3tables/src/catalog.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 9f830768a..35023dba8 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -523,4 +523,22 @@ mod tests { .unwrap(); println!("{:?}", table); } + + #[tokio::test] + async fn test_s3tables_create_delete_namespace() { + let catalog = match load_s3tables_catalog_from_env().await { + Ok(Some(catalog)) => catalog, + Ok(None) => return, + Err(e) => panic!("Error loading catalog: {}", e), + }; + + let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string()); + catalog + .create_namespace(&namespace, HashMap::new()) + .await + .unwrap(); + assert!(catalog.namespace_exists(&namespace).await.unwrap()); + 
catalog.drop_namespace(&namespace).await.unwrap(); + assert!(!catalog.namespace_exists(&namespace).await.unwrap()); + } } From ad0160de9874d0dc0649f0b5a3ced5120f898f63 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 20 Dec 2024 16:46:01 +0800 Subject: [PATCH 09/16] fix create tables --- crates/catalog/s3tables/src/catalog.rs | 89 ++++++++++++++++++++++---- 1 file changed, 76 insertions(+), 13 deletions(-) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 35023dba8..715ceb2a0 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -6,6 +6,7 @@ use aws_sdk_s3tables::operation::create_table::CreateTableOutput; use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput; use aws_sdk_s3tables::operation::get_table::GetTableOutput; use aws_sdk_s3tables::operation::list_tables::ListTablesOutput; +use aws_sdk_s3tables::types::OpenTableFormat; use iceberg::io::FileIO; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; @@ -263,12 +264,14 @@ impl Catalog for S3TablesCatalog { /// We have to get this random warehouse location after the table is created. /// /// This function can return an error in the following situations: + /// - If the location of the table is set by user, identified by a specific + /// `DataInvalid` variant. /// - Errors from the underlying database creation process, converted using /// `from_aws_sdk_error`. async fn create_table( &self, namespace: &NamespaceIdent, - creation: TableCreation, + mut creation: TableCreation, ) -> Result
{ let table_ident = TableIdent::new(namespace.clone(), creation.name.clone()); @@ -278,27 +281,41 @@ impl Catalog for S3TablesCatalog { .create_table() .table_bucket_arn(self.config.table_bucket_arn.clone()) .namespace(namespace.to_url_string()) + .format(OpenTableFormat::Iceberg) .name(table_ident.name()) .send() .await .map_err(from_aws_sdk_error)?; - // get warehouse location - let get_resp: GetTableOutput = self - .s3tables_client - .get_table() - .table_bucket_arn(self.config.table_bucket_arn.clone()) - .namespace(namespace.to_url_string()) - .name(table_ident.name()) - .send() - .await - .map_err(from_aws_sdk_error)?; + // prepare metadata location. the warehouse location is generated by s3tables catalog, + // which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3 + let metadata_location = match &creation.location { + Some(_) => { + return Err(Error::new( + ErrorKind::DataInvalid, + "The location of the table is generated by s3tables catalog, can't be set by user.", + )); + } + None => { + let get_resp: GetTableOutput = self + .s3tables_client + .get_table() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()) + .name(table_ident.name()) + .send() + .await + .map_err(from_aws_sdk_error)?; + let warehouse_location = get_resp.warehouse_location().to_string(); + create_metadata_location(warehouse_location, 0)? + } + }; // write metadata to file + creation.location = Some(metadata_location.clone()); let metadata = TableMetadataBuilder::from_table_creation(creation)? .build()? .metadata; - let metadata_location = create_metadata_location(get_resp.warehouse_location(), 0)?; self.file_io .new_output(&metadata_location)? 
.write(serde_json::to_vec(&metadata)?.into()) @@ -344,6 +361,7 @@ impl Catalog for S3TablesCatalog { .name(table_ident.name()); let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?; + // when a table is created, it's possible that the metadata location is not set. let metadata_location = resp.metadata_location().ok_or_else(|| { Error::new( ErrorKind::Unexpected, @@ -353,7 +371,7 @@ impl Catalog for S3TablesCatalog { ), ) })?; - let input_file = self.file_io.new_input(&metadata_location)?; + let input_file = self.file_io.new_input(metadata_location)?; let metadata_content = input_file.read().await?; let metadata = serde_json::from_slice::(&metadata_content)?; @@ -460,6 +478,8 @@ where T: std::fmt::Debug { #[cfg(test)] mod tests { + use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; + use super::*; async fn load_s3tables_catalog_from_env() -> Result> { @@ -541,4 +561,47 @@ mod tests { catalog.drop_namespace(&namespace).await.unwrap(); assert!(!catalog.namespace_exists(&namespace).await.unwrap()); } + + #[tokio::test] + async fn test_s3tables_create_delete_table() { + let catalog = match load_s3tables_catalog_from_env().await { + Ok(Some(catalog)) => catalog, + Ok(None) => return, + Err(e) => panic!("Error loading catalog: {}", e), + }; + + let creation = { + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + TableCreation::builder() + .name("test_s3tables_create_delete_table".to_string()) + .properties(HashMap::new()) + .schema(schema) + .build() + }; + + let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string()); + let table_ident = TableIdent::new( + namespace.clone(), + "test_s3tables_create_delete_table".to_string(), + ); + catalog.drop_namespace(&namespace).await.ok(); + 
catalog.drop_table(&table_ident).await.ok(); + + catalog + .create_namespace(&namespace, HashMap::new()) + .await + .unwrap(); + catalog.create_table(&namespace, creation).await.unwrap(); + assert!(catalog.table_exists(&table_ident).await.unwrap()); + catalog.drop_table(&table_ident).await.unwrap(); + assert!(!catalog.table_exists(&table_ident).await.unwrap()); + catalog.drop_namespace(&namespace).await.unwrap(); + } } From 98c47bef7975966da858e3405a54bbce025f2aa3 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 20 Dec 2024 16:51:07 +0800 Subject: [PATCH 10/16] fix license header --- crates/catalog/s3tables/src/catalog.rs | 17 +++++++++++++++++ crates/catalog/s3tables/src/utils.rs | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 715ceb2a0..8d1902b02 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ use std::collections::HashMap; use anyhow::anyhow; diff --git a/crates/catalog/s3tables/src/utils.rs b/crates/catalog/s3tables/src/utils.rs index 9e67f2d62..d0195dccf 100644 --- a/crates/catalog/s3tables/src/utils.rs +++ b/crates/catalog/s3tables/src/utils.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ use std::collections::HashMap; use aws_config::{BehaviorVersion, Region, SdkConfig}; From 120a5e4f92255734a30a90fb6981d9f5693463e5 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 20 Dec 2024 16:57:35 +0800 Subject: [PATCH 11/16] fix clippy --- crates/catalog/s3tables/src/catalog.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 8d1902b02..368b2106e 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -524,7 +524,7 @@ mod tests { }; let namespaces = catalog.list_namespaces(None).await.unwrap(); - assert!(namespaces.len() > 0); + assert!(!namespaces.is_empty()); } #[tokio::test] @@ -539,8 +539,7 @@ mod tests { .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string())) .await .unwrap(); - println!("{:?}", tables); - assert!(tables.len() > 0); + assert!(!tables.is_empty()); } #[tokio::test] From d5bc77c71bf5a2479a13020de3a7c8dd06799e3d Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 20 Dec 2024 17:05:24 +0800 Subject: [PATCH 12/16] cargo sort --- crates/catalog/s3tables/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/catalog/s3tables/Cargo.toml b/crates/catalog/s3tables/Cargo.toml index bf4174bff..8e0f10904 100644 --- a/crates/catalog/s3tables/Cargo.toml +++ b/crates/catalog/s3tables/Cargo.toml @@ -29,14 +29,14 @@ license = { workspace = true } keywords = ["iceberg", "sql", "catalog"] [dependencies] +anyhow = { workspace = true } async-trait = { workspace = true } +aws-config = { workspace = true } +aws-sdk-s3tables = "1.0.0" iceberg = { workspace = true } serde_json = { workspace = true } typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } -aws-config = { workspace = true } -aws-sdk-s3tables = "1.0.0" -anyhow = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } 
From 78eed428afd29f0c5a80c62aa1fcfacc87ae0cc2 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 20 Dec 2024 17:24:35 +0800 Subject: [PATCH 13/16] fix cargo-machete --- crates/catalog/s3tables/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/catalog/s3tables/Cargo.toml b/crates/catalog/s3tables/Cargo.toml index 8e0f10904..64b332611 100644 --- a/crates/catalog/s3tables/Cargo.toml +++ b/crates/catalog/s3tables/Cargo.toml @@ -35,7 +35,6 @@ aws-config = { workspace = true } aws-sdk-s3tables = "1.0.0" iceberg = { workspace = true } serde_json = { workspace = true } -typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } [dev-dependencies] From 1d4772a5a7b019eaf3906afcd101b9f2a6d06cde Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 20 Dec 2024 17:30:44 +0800 Subject: [PATCH 14/16] fix typo --- crates/catalog/s3tables/src/catalog.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 368b2106e..2a9f92df1 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -99,9 +99,6 @@ impl Catalog for S3TablesCatalog { .s3tables_client .list_namespaces() .table_bucket_arn(self.config.table_bucket_arn.clone()); - if let Some(parent) = parent { - req = req.prefix(parent.to_url_string()); - } if let Some(token) = continuation_token { req = req.continuation_token(token); } From fd6b0d29a1df4cc1fbff00c519a16de56dd698b8 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 27 Dec 2024 18:51:41 +0800 Subject: [PATCH 15/16] add comments for all the pub structs & methods --- crates/catalog/s3tables/src/catalog.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 2a9f92df1..1fa5ffa6f 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -34,6 +34,7 @@ use iceberg::{ use 
crate::utils::{create_metadata_location, create_sdk_config}; +/// S3Tables catalog configuration. #[derive(Debug)] pub struct S3TablesCatalogConfig { table_bucket_arn: String, @@ -50,6 +51,7 @@ pub struct S3TablesCatalog { } impl S3TablesCatalog { + /// Creates a new S3Tables catalog. pub async fn new(config: S3TablesCatalogConfig) -> Result { let aws_config = create_sdk_config(&config.properties, config.endpoint_url.clone()).await; let s3tables_client = aws_sdk_s3tables::Client::new(&aws_config); From 27c0e28525b24fdcd733b101798385a886435e17 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 27 Dec 2024 18:55:13 +0800 Subject: [PATCH 16/16] add more comments --- crates/catalog/s3tables/src/catalog.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 1fa5ffa6f..b3fab91f9 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -37,8 +37,19 @@ use crate::utils::{create_metadata_location, create_sdk_config}; /// S3Tables catalog configuration. #[derive(Debug)] pub struct S3TablesCatalogConfig { + /// Unlike other buckets, S3Tables bucket is not a physical bucket, but a virtual bucket + /// that is managed by s3tables. We can't directly access the bucket with path like + /// s3://{bucket_name}/{file_path}; all the operations are done with respect to the bucket + /// ARN. table_bucket_arn: String, + /// Properties for the catalog. The available properties are: + /// - `profile_name`: The name of the AWS profile to use. + /// - `region_name`: The AWS region to use. + /// - `aws_access_key_id`: The AWS access key ID to use. + /// - `aws_secret_access_key`: The AWS secret access key to use. + /// - `aws_session_token`: The AWS session token to use. properties: HashMap, + /// Endpoint URL for the catalog. endpoint_url: Option, }