Skip to content

Commit

Permalink
fix create_table
Browse files Browse the repository at this point in the history
  • Loading branch information
flaneur2020 committed Dec 20, 2024
1 parent dab8e51 commit e63c4be
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 17 deletions.
158 changes: 146 additions & 12 deletions crates/catalog/s3tables/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,18 @@ impl S3TablesCatalog {

#[async_trait]
impl Catalog for S3TablesCatalog {
/// List namespaces from s3tables catalog.
///
/// S3Tables doesn't support nested namespaces. If parent is provided, it will
/// return an empty list.
async fn list_namespaces(
&self,
parent: Option<&NamespaceIdent>,
) -> Result<Vec<NamespaceIdent>> {
if parent.is_some() {
return Ok(vec![]);
}

let mut result = Vec::new();
let mut continuation_token = None;
loop {
Expand All @@ -91,6 +99,22 @@ impl Catalog for S3TablesCatalog {
Ok(result)
}

/// Creates a new namespace with the given identifier and properties.
///
/// Attempts to create a namespace defined by the `namespace`. The `properties`
/// parameter is ignored.
///
/// The following naming rules apply to namespaces:
///
/// - Names must be between 3 (min) and 63 (max) characters long.
/// - Names can consist only of lowercase letters, numbers, and underscores (_).
/// - Names must begin and end with a letter or number.
/// - Names must not contain hyphens (-) or periods (.).
///
/// This function can return an error in the following situations:
///
/// - Errors from the underlying database creation process, converted using
/// `from_aws_sdk_error`.
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
Expand All @@ -108,6 +132,15 @@ impl Catalog for S3TablesCatalog {
))
}

/// Retrieves a namespace by its identifier.
///
/// Validates the given namespace identifier and then queries the
/// underlying database client to fetch the corresponding namespace data.
/// Constructs a `Namespace` object with the retrieved data and returns it.
///
/// This function can return an error in any of the following situations:
/// - If there is an error querying the database, returned by
/// `from_aws_sdk_error`.
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
let req = self
.s3tables_client
Expand All @@ -122,6 +155,18 @@ impl Catalog for S3TablesCatalog {
))
}

/// Checks if a namespace exists within the s3tables catalog.
///
/// Validates the namespace identifier by querying the s3tables catalog
/// to determine if the specified namespace exists.
///
/// # Returns
/// A `Result<bool>` indicating the outcome of the check:
/// - `Ok(true)` if the namespace exists.
/// - `Ok(false)` if the namespace does not exist, identified by a specific
/// `IsNotFoundException` variant.
/// - `Err(...)` if an error occurs during validation or the s3tables catalog
/// query, with the error encapsulating the issue.
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
let req = self
.s3tables_client
Expand All @@ -140,17 +185,29 @@ impl Catalog for S3TablesCatalog {
}
}

/// Updates the properties of an existing namespace.
///
/// S3Tables doesn't support updating namespace properties, so this function
/// will always return an error.
async fn update_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
_namespace: &NamespaceIdent,
_properties: HashMap<String, String>,
) -> Result<()> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Update namespace is not supported for s3tables catalog",
))
}

/// Drops an existing namespace from the s3tables catalog.
///
/// Validates the namespace identifier and then deletes the corresponding
/// namespace from the s3tables catalog.
///
/// This function can return an error in the following situations:
/// - Errors from the underlying database deletion process, converted using
/// `from_aws_sdk_error`.
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
let req = self
.s3tables_client
Expand All @@ -161,6 +218,14 @@ impl Catalog for S3TablesCatalog {
Ok(())
}

/// Lists all tables within a given namespace.
///
/// Retrieves all tables associated with the specified namespace and returns
/// their identifiers.
///
/// This function can return an error in the following situations:
/// - Errors from the underlying database query process, converted using
/// `from_aws_sdk_error`.
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
let mut result = Vec::new();
let mut continuation_token = None;
Expand Down Expand Up @@ -188,37 +253,65 @@ impl Catalog for S3TablesCatalog {
Ok(result)
}

/// Creates a new table within a specified namespace.
///
/// Attempts to create a table defined by the `creation` parameter. The metadata
/// location is generated by the s3tables catalog, looks like:
///
/// s3://{RANDOM WAREHOUSE LOCATION}/metadata/{VERSION}-{UUID}.metadata.json
///
/// We have to get this random warehouse location after the table is created.
///
/// This function can return an error in the following situations:
/// - Errors from the underlying database creation process, converted using
/// `from_aws_sdk_error`.
async fn create_table(
&self,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> Result<Table> {
let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());

// create table
let create_resp: CreateTableOutput = self
.s3tables_client
.create_table()
.table_bucket_arn(self.config.table_bucket_arn.clone())
.namespace(namespace.to_url_string())
.name(table_ident.name())
.send()
.await
.map_err(from_aws_sdk_error)?;

// get warehouse location
let get_resp: GetTableOutput = self
.s3tables_client
.get_table()
.table_bucket_arn(self.config.table_bucket_arn.clone())
.namespace(namespace.to_url_string())
.name(table_ident.name())
.send()
.await
.map_err(from_aws_sdk_error)?;

// write metadata to file
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location =
create_metadata_location(namespace.to_url_string(), table_ident.name(), 0)?;
let metadata_location = create_metadata_location(get_resp.warehouse_location(), 0)?;
self.file_io
.new_output(&metadata_location)?
.write(serde_json::to_vec(&metadata)?.into())
.await?;

self.s3tables_client
.create_table()
.table_bucket_arn(self.config.table_bucket_arn.clone())
.namespace(namespace.to_url_string())
.name(table_ident.name())
.send()
.await
.map_err(from_aws_sdk_error)?;
// update metadata location
self.s3tables_client
.update_table_metadata_location()
.table_bucket_arn(self.config.table_bucket_arn.clone())
.namespace(namespace.to_url_string())
.name(table_ident.name())
.metadata_location(metadata_location.clone())
.version_token(create_resp.version_token())
.send()
.await
.map_err(from_aws_sdk_error)?;
Expand All @@ -232,6 +325,16 @@ impl Catalog for S3TablesCatalog {
Ok(table)
}

/// Loads an existing table from the s3tables catalog.
///
/// Retrieves the metadata location of the specified table and constructs a
/// `Table` object with the retrieved metadata.
///
/// This function can return an error in the following situations:
/// - If the table does not have a metadata location, identified by a specific
/// `Unexpected` variant.
/// - Errors from the underlying database query process, converted using
/// `from_aws_sdk_error`.
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
let req = self
.s3tables_client
Expand Down Expand Up @@ -263,6 +366,14 @@ impl Catalog for S3TablesCatalog {
Ok(table)
}

/// Drops an existing table from the s3tables catalog.
///
/// Validates the table identifier and then deletes the corresponding
/// table from the s3tables catalog.
///
/// This function can return an error in the following situations:
/// - Errors from the underlying database deletion process, converted using
/// `from_aws_sdk_error`.
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
let req = self
.s3tables_client
Expand All @@ -274,6 +385,18 @@ impl Catalog for S3TablesCatalog {
Ok(())
}

/// Checks if a table exists within the s3tables catalog.
///
/// Validates the table identifier by querying the s3tables catalog
/// to determine if the specified table exists.
///
/// # Returns
/// A `Result<bool>` indicating the outcome of the check:
/// - `Ok(true)` if the table exists.
/// - `Ok(false)` if the table does not exist, identified by a specific
/// `IsNotFoundException` variant.
/// - `Err(...)` if an error occurs during validation or the s3tables catalog
/// query, with the error encapsulating the issue.
async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
let req = self
.s3tables_client
Expand All @@ -293,6 +416,14 @@ impl Catalog for S3TablesCatalog {
}
}

/// Renames an existing table within the s3tables catalog.
///
/// Validates the source and destination table identifiers and then renames
/// the source table to the destination table.
///
/// This function can return an error in the following situations:
/// - Errors from the underlying database renaming process, converted using
/// `from_aws_sdk_error`.
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
let req = self
.s3tables_client
Expand All @@ -306,6 +437,9 @@ impl Catalog for S3TablesCatalog {
Ok(())
}

/// Updates an existing table within the s3tables catalog.
///
/// This function is still in development and will always return an error.
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
Expand Down
8 changes: 3 additions & 5 deletions crates/catalog/s3tables/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ pub(crate) async fn create_sdk_config(

/// Create metadata location from `location` and `version`
pub(crate) fn create_metadata_location(
namespace: impl AsRef<str>,
table_name: impl AsRef<str>,
warehouse_location: impl AsRef<str>,
version: i32,
) -> Result<String> {
if version < 0 {
Expand All @@ -74,9 +73,8 @@ pub(crate) fn create_metadata_location(
let version = format!("{:0>5}", version);
let id = Uuid::new_v4();
let metadata_location = format!(
"{}/{}/metadata/{}-{}.metadata.json",
namespace.as_ref(),
table_name.as_ref(),
"{}/metadata/{}-{}.metadata.json",
warehouse_location.as_ref(),
version,
id
);
Expand Down

0 comments on commit e63c4be

Please sign in to comment.