From e63c4bed0fa8a74123ce64ebe6be669f18a4d805 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Fri, 20 Dec 2024 15:57:51 +0800 Subject: [PATCH] fix create_table --- crates/catalog/s3tables/src/catalog.rs | 158 +++++++++++++++++++++++-- crates/catalog/s3tables/src/utils.rs | 8 +- 2 files changed, 149 insertions(+), 17 deletions(-) diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index ab02dba69..0a7af8039 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -62,10 +62,18 @@ impl S3TablesCatalog { #[async_trait] impl Catalog for S3TablesCatalog { + /// List namespaces from s3tables catalog. + /// + /// S3Tables doesn't support nested namespaces. If parent is provided, it will + /// return an empty list. async fn list_namespaces( &self, parent: Option<&NamespaceIdent>, ) -> Result> { + if parent.is_some() { + return Ok(vec![]); + } + let mut result = Vec::new(); let mut continuation_token = None; loop { @@ -91,6 +99,22 @@ impl Catalog for S3TablesCatalog { Ok(result) } + /// Creates a new namespace with the given identifier and properties. + /// + /// Attempts to create a namespace defined by the `namespace`. The `properties` + /// parameter is ignored. + /// + /// The following naming rules apply to namespaces: + /// + /// - Names must be between 3 (min) and 63 (max) characters long. + /// - Names can consist only of lowercase letters, numbers, and underscores (_). + /// - Names must begin and end with a letter or number. + /// - Names must not contain hyphens (-) or periods (.). + /// + /// This function can return an error in the following situations: + /// + /// - Errors from the underlying database creation process, converted using + /// `from_aws_sdk_error`. async fn create_namespace( &self, namespace: &NamespaceIdent, @@ -108,6 +132,15 @@ impl Catalog for S3TablesCatalog { )) } + /// Retrieves a namespace by its identifier. + /// + /// Validates the given namespace identifier and then queries the + /// underlying database client to fetch the corresponding namespace data. + /// Constructs a `Namespace` object with the retrieved data and returns it. + /// + /// This function can return an error in any of the following situations: + /// - If there is an error querying the database, returned by + /// `from_aws_sdk_error`. async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { let req = self .s3tables_client @@ -122,6 +155,18 @@ impl Catalog for S3TablesCatalog { )) } + /// Checks if a namespace exists within the s3tables catalog. + /// + /// Validates the namespace identifier by querying the s3tables catalog + /// to determine if the specified namespace exists. + /// + /// # Returns + /// A `Result` indicating the outcome of the check: + /// - `Ok(true)` if the namespace exists. + /// - `Ok(false)` if the namespace does not exist, identified by a specific + /// `IsNotFoundException` variant. + /// - `Err(...)` if an error occurs during validation or the s3tables catalog + /// query, with the error encapsulating the issue. async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { let req = self .s3tables_client @@ -140,10 +185,14 @@ impl Catalog for S3TablesCatalog { } } + /// Updates the properties of an existing namespace. + /// + /// S3Tables doesn't support updating namespace properties, so this function + /// will always return an error. async fn update_namespace( &self, - namespace: &NamespaceIdent, - properties: HashMap, + _namespace: &NamespaceIdent, + _properties: HashMap, ) -> Result<()> { Err(Error::new( ErrorKind::FeatureUnsupported, @@ -151,6 +200,14 @@ impl Catalog for S3TablesCatalog { )) } + /// Drops an existing namespace from the s3tables catalog. + /// + /// Validates the namespace identifier and then deletes the corresponding + /// namespace from the s3tables catalog. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database deletion process, converted using + /// `from_aws_sdk_error`. async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { let req = self .s3tables_client @@ -161,6 +218,14 @@ impl Catalog for S3TablesCatalog { Ok(()) } + /// Lists all tables within a given namespace. + /// + /// Retrieves all tables associated with the specified namespace and returns + /// their identifiers. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database query process, converted using + /// `from_aws_sdk_error`. async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { let mut result = Vec::new(); let mut continuation_token = None; @@ -188,6 +253,18 @@ impl Catalog for S3TablesCatalog { Ok(result) } + /// Creates a new table within a specified namespace. + /// + /// Attempts to create a table defined by the `creation` parameter. The metadata + /// location is generated by the s3tables catalog, looks like: + /// + /// s3://{RANDOM WAREHOUSE LOCATION}/metadata/{VERSION}-{UUID}.metadata.json + /// + /// We have to get this random warehouse location after the table is created. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database creation process, converted using + /// `from_aws_sdk_error`. async fn create_table( &self, namespace: &NamespaceIdent, @@ -195,30 +272,46 @@ impl Catalog for S3TablesCatalog { ) -> Result { let table_ident = TableIdent::new(namespace.clone(), creation.name.clone()); + // create table + let create_resp: CreateTableOutput = self + .s3tables_client + .create_table() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()) + .name(table_ident.name()) + .send() + .await + .map_err(from_aws_sdk_error)?; + + // get warehouse location + let get_resp: GetTableOutput = self + .s3tables_client + .get_table() + .table_bucket_arn(self.config.table_bucket_arn.clone()) + .namespace(namespace.to_url_string()) + .name(table_ident.name()) + .send() + .await + .map_err(from_aws_sdk_error)?; + + // write metadata to file let metadata = TableMetadataBuilder::from_table_creation(creation)? .build()? .metadata; - let metadata_location = - create_metadata_location(namespace.to_url_string(), table_ident.name(), 0)?; + let metadata_location = create_metadata_location(get_resp.warehouse_location(), 0)?; self.file_io .new_output(&metadata_location)? .write(serde_json::to_vec(&metadata)?.into()) .await?; - self.s3tables_client - .create_table() - .table_bucket_arn(self.config.table_bucket_arn.clone()) - .namespace(namespace.to_url_string()) - .name(table_ident.name()) - .send() - .await - .map_err(from_aws_sdk_error)?; + // update metadata location self.s3tables_client .update_table_metadata_location() .table_bucket_arn(self.config.table_bucket_arn.clone()) .namespace(namespace.to_url_string()) .name(table_ident.name()) .metadata_location(metadata_location.clone()) + .version_token(create_resp.version_token()) .send() .await .map_err(from_aws_sdk_error)?; @@ -232,6 +325,16 @@ impl Catalog for S3TablesCatalog { Ok(table) } + /// Loads an existing table from the s3tables catalog. + /// + /// Retrieves the metadata location of the specified table and constructs a + /// `Table` object with the retrieved metadata. + /// + /// This function can return an error in the following situations: + /// - If the table does not have a metadata location, identified by a specific + /// `Unexpected` variant. + /// - Errors from the underlying database query process, converted using + /// `from_aws_sdk_error`. async fn load_table(&self, table_ident: &TableIdent) -> Result
{ let req = self .s3tables_client @@ -263,6 +366,14 @@ impl Catalog for S3TablesCatalog { Ok(table) } + /// Drops an existing table from the s3tables catalog. + /// + /// Validates the table identifier and then deletes the corresponding + /// table from the s3tables catalog. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database deletion process, converted using + /// `from_aws_sdk_error`. async fn drop_table(&self, table: &TableIdent) -> Result<()> { let req = self .s3tables_client @@ -274,6 +385,18 @@ impl Catalog for S3TablesCatalog { Ok(()) } + /// Checks if a table exists within the s3tables catalog. + /// + /// Validates the table identifier by querying the s3tables catalog + /// to determine if the specified table exists. + /// + /// # Returns + /// A `Result` indicating the outcome of the check: + /// - `Ok(true)` if the table exists. + /// - `Ok(false)` if the table does not exist, identified by a specific + /// `IsNotFoundException` variant. + /// - `Err(...)` if an error occurs during validation or the s3tables catalog + /// query, with the error encapsulating the issue. async fn table_exists(&self, table_ident: &TableIdent) -> Result { let req = self .s3tables_client @@ -293,6 +416,14 @@ impl Catalog for S3TablesCatalog { } } + /// Renames an existing table within the s3tables catalog. + /// + /// Validates the source and destination table identifiers and then renames + /// the source table to the destination table. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database renaming process, converted using + /// `from_aws_sdk_error`. async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { let req = self .s3tables_client @@ -306,6 +437,9 @@ impl Catalog for S3TablesCatalog { Ok(()) } + /// Updates an existing table within the s3tables catalog. + /// + /// This function is still in development and will always return an error. async fn update_table(&self, commit: TableCommit) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, diff --git a/crates/catalog/s3tables/src/utils.rs b/crates/catalog/s3tables/src/utils.rs index 31ecc54f4..9e67f2d62 100644 --- a/crates/catalog/s3tables/src/utils.rs +++ b/crates/catalog/s3tables/src/utils.rs @@ -57,8 +57,7 @@ pub(crate) async fn create_sdk_config( /// Create metadata location from `location` and `version` pub(crate) fn create_metadata_location( - namespace: impl AsRef, - table_name: impl AsRef, + warehouse_location: impl AsRef, version: i32, ) -> Result { if version < 0 { @@ -74,9 +73,8 @@ pub(crate) fn create_metadata_location( let version = format!("{:0>5}", version); let id = Uuid::new_v4(); let metadata_location = format!( - "{}/{}/metadata/{}-{}.metadata.json", - namespace.as_ref(), - table_name.as_ref(), + "{}/metadata/{}-{}.metadata.json", + warehouse_location.as_ref(), version, id );