diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
index 475da7be6..5a032215a 100644
--- a/crates/catalog/hms/Cargo.toml
+++ b/crates/catalog/hms/Cargo.toml
@@ -31,14 +31,17 @@ keywords = ["iceberg", "hive", "catalog"]
 [dependencies]
 anyhow = { workspace = true }
 async-trait = { workspace = true }
+chrono = { workspace = true }
 hive_metastore = { workspace = true }
 iceberg = { workspace = true }
 log = { workspace = true }
 pilota = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
 typed-builder = { workspace = true }
+uuid = { workspace = true }
 volo-thrift = { workspace = true }

 [dev-dependencies]
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
 port_scanner = { workspace = true }
-tokio = { workspace = true }
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index aba6a45cb..4ca66bd21 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -15,11 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.

+use crate::error::from_io_error;
+use crate::error::from_thrift_error;
+
 use super::utils::*;
 use async_trait::async_trait;
 use hive_metastore::ThriftHiveMetastoreClient;
 use hive_metastore::ThriftHiveMetastoreClientBuilder;
 use hive_metastore::ThriftHiveMetastoreGetDatabaseException;
+use hive_metastore::ThriftHiveMetastoreGetTableException;
+use iceberg::io::FileIO;
+use iceberg::spec::TableMetadata;
+use iceberg::spec::TableMetadataBuilder;
 use iceberg::table::Table;
 use iceberg::{
     Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
@@ -28,6 +35,8 @@ use iceberg::{
 use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
 use std::net::ToSocketAddrs;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
 use typed_builder::TypedBuilder;
 use volo_thrift::ResponseError;

@@ -47,6 +56,9 @@ pub enum HmsThriftTransport {
 pub struct HmsCatalogConfig {
     address: String,
     thrift_transport: HmsThriftTransport,
+    warehouse: String,
+    #[builder(default)]
+    props: HashMap<String, String>,
 }

 struct HmsClient(ThriftHiveMetastoreClient);

@@ -55,6 +67,7 @@ struct HmsClient(ThriftHiveMetastoreClient);
 pub struct HmsCatalog {
     config: HmsCatalogConfig,
     client: HmsClient,
+    file_io: FileIO,
 }

 impl Debug for HmsCatalog {
@@ -92,11 +105,20 @@ impl HmsCatalog {
                 .build(),
         };

+        let file_io = FileIO::from_path(&config.warehouse)?
+            .with_props(&config.props)
+            .build()?;
+
         Ok(Self {
             config,
             client: HmsClient(client),
+            file_io,
         })
     }
+
+    /// Get the catalog's `FileIO`
+    pub fn file_io(&self) -> FileIO {
+        self.file_io.clone()
+    }
 }

 #[async_trait]
@@ -173,7 +195,7 @@ impl Catalog for HmsCatalog {
         let db = self
             .client
             .0
-            .get_database(name.clone().into())
+            .get_database(name.into())
             .await
             .map_err(from_thrift_error)?;

@@ -197,7 +219,7 @@ impl Catalog for HmsCatalog {
     async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
         let name = validate_namespace(namespace)?;

-        let resp = self.client.0.get_database(name.clone().into()).await;
+        let resp = self.client.0.get_database(name.into()).await;

         match resp {
             Ok(_) => Ok(true),
@@ -269,13 +291,22 @@ impl Catalog for HmsCatalog {
         Ok(())
     }

+    /// Asynchronously lists all tables within a specified namespace.
+    ///
+    /// # Returns
+    ///
+    /// A `Result<Vec<TableIdent>>`, which is:
+    /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
+    ///   representing a table within the specified namespace.
+    /// - `Err(...)` if an error occurs during namespace validation or while
+    ///   querying the database.
     async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
         let name = validate_namespace(namespace)?;

         let tables = self
             .client
             .0
-            .get_all_tables(name.clone().into())
+            .get_all_tables(name.into())
             .await
             .map_err(from_thrift_error)?;

@@ -287,31 +318,201 @@ impl Catalog for HmsCatalog {
         Ok(tables)
     }

+    /// Creates a new table within a specified namespace using the provided
+    /// table creation settings.
+    ///
+    /// # Returns
+    /// A `Result` wrapping a `Table` object representing the newly created
+    /// table.
+    ///
+    /// # Errors
+    /// This function may return an error in several cases, including invalid
+    /// namespace identifiers, failure to determine a default storage location,
+    /// issues generating or writing table metadata, and errors communicating
+    /// with the Hive Metastore.
     async fn create_table(
         &self,
-        _namespace: &NamespaceIdent,
-        _creation: TableCreation,
+        namespace: &NamespaceIdent,
+        creation: TableCreation,
     ) -> Result<Table> {
-        todo!()
+        let db_name = validate_namespace(namespace)?;
+        let table_name = creation.name.clone();
+
+        let location = match &creation.location {
+            Some(location) => location.clone(),
+            None => {
+                let ns = self.get_namespace(namespace).await?;
+                get_default_table_location(&ns, &table_name, &self.config.warehouse)
+            }
+        };
+
+        let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
+        let metadata_location = create_metadata_location(&location, 0)?;
+
+        let mut file = self
+            .file_io
+            .new_output(&metadata_location)?
+            .writer()
+            .await?;
+        file.write_all(&serde_json::to_vec(&metadata)?).await?;
+        file.shutdown().await?;
+
+        let hive_table = convert_to_hive_table(
+            db_name.clone(),
+            metadata.current_schema(),
+            table_name.clone(),
+            location,
+            metadata_location.clone(),
+            metadata.properties(),
+        )?;
+
+        self.client
+            .0
+            .create_table(hive_table)
+            .await
+            .map_err(from_thrift_error)?;
+
+        let table = Table::builder()
+            .file_io(self.file_io())
+            .metadata_location(metadata_location)
+            .metadata(metadata)
+            .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
+            .build();
+
+        Ok(table)
     }

-    async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
-        todo!()
+    /// Loads a table from the Hive Metastore and constructs a `Table` object
+    /// based on its metadata.
+    ///
+    /// # Returns
+    /// A `Result` wrapping a `Table` object that represents the loaded table.
+    ///
+    /// # Errors
+    /// This function may return an error in several scenarios, including:
+    /// - Failure to validate the namespace.
+    /// - Failure to retrieve the table from the Hive Metastore.
+    /// - Absence of metadata location information in the table's properties.
+    /// - Issues reading or deserializing the table's metadata file.
+    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
+        let db_name = validate_namespace(table.namespace())?;
+
+        let hive_table = self
+            .client
+            .0
+            .get_table(db_name.clone().into(), table.name.clone().into())
+            .await
+            .map_err(from_thrift_error)?;
+
+        let metadata_location = get_metadata_location(&hive_table.parameters)?;
+
+        let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
+        let mut metadata_str = String::new();
+        reader.read_to_string(&mut metadata_str).await?;
+        let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;
+
+        let table = Table::builder()
+            .file_io(self.file_io())
+            .metadata_location(metadata_location)
+            .metadata(metadata)
+            .identifier(TableIdent::new(
+                NamespaceIdent::new(db_name),
+                table.name.clone(),
+            ))
+            .build();
+
+        Ok(table)
     }

-    async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
-        todo!()
+    /// Asynchronously drops a table from the database.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The namespace provided in `table` cannot be validated
+    ///   or does not exist.
+    /// - The underlying database client encounters an error while
+    ///   attempting to drop the table. This includes scenarios where
+    ///   the table does not exist.
+    /// - Any network or communication error occurs with the database backend.
+    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+        let db_name = validate_namespace(table.namespace())?;
+
+        self.client
+            .0
+            .drop_table(db_name.into(), table.name.clone().into(), false)
+            .await
+            .map_err(from_thrift_error)?;
+
+        Ok(())
     }

-    async fn table_exists(&self, _table: &TableIdent) -> Result<bool> {
-        todo!()
+    /// Asynchronously checks the existence of a specified table
+    /// in the database.
+    ///
+    /// # Returns
+    /// - `Ok(true)` if the table exists in the database.
+    /// - `Ok(false)` if the table does not exist in the database.
+    /// - `Err(...)` if an error occurs during the process.
+    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
+        let db_name = validate_namespace(table.namespace())?;
+        let table_name = table.name.clone();
+
+        let resp = self
+            .client
+            .0
+            .get_table(db_name.into(), table_name.into())
+            .await;
+
+        match resp {
+            Ok(_) => Ok(true),
+            Err(err) => {
+                if let ResponseError::UserException(ThriftHiveMetastoreGetTableException::O2(_)) =
+                    &err
+                {
+                    Ok(false)
+                } else {
+                    Err(from_thrift_error(err))
+                }
+            }
+        }
     }

-    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
-        todo!()
+    /// Asynchronously renames a table within the database
+    /// or moves it between namespaces (databases).
+    ///
+    /// # Returns
+    /// - `Ok(())` on successful rename or move of the table.
+    /// - `Err(...)` if an error occurs during the process.
+    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
+        let src_dbname = validate_namespace(src.namespace())?;
+        let dest_dbname = validate_namespace(dest.namespace())?;
+
+        let src_tbl_name = src.name.clone();
+        let dest_tbl_name = dest.name.clone();
+
+        let mut tbl = self
+            .client
+            .0
+            .get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
+            .await
+            .map_err(from_thrift_error)?;
+
+        tbl.db_name = Some(dest_dbname.into());
+        tbl.table_name = Some(dest_tbl_name.into());
+
+        self.client
+            .0
+            .alter_table(src_dbname.into(), src_tbl_name.into(), tbl)
+            .await
+            .map_err(from_thrift_error)?;
+
+        Ok(())
     }

     async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
-        todo!()
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Updating a table is not supported yet",
+        ))
     }
 }
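Reviewer note: a minimal usage sketch of the new table API, not part of the diff. The address, warehouse URI, endpoint, and credentials below are placeholders, and `namespace`/`creation` are assumed to be a `NamespaceIdent` and a `TableCreation` built elsewhere (see the integration tests further down).

    // Hypothetical setup: point the catalog at an HMS instance and an
    // S3-compatible warehouse (all values here are placeholders).
    let config = HmsCatalogConfig::builder()
        .address("127.0.0.1:9083".to_string())
        .thrift_transport(HmsThriftTransport::Buffered)
        .warehouse("s3a://warehouse/hive".to_string())
        .props(HashMap::from([(
            S3_ENDPOINT.to_string(),
            "http://127.0.0.1:9000".to_string(),
        )]))
        .build();
    let catalog = HmsCatalog::new(config)?;

    // `namespace` and `creation` are assumed to exist already.
    let table = catalog.create_table(&namespace, creation).await?;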
diff --git a/crates/catalog/hms/src/error.rs b/crates/catalog/hms/src/error.rs
new file mode 100644
index 000000000..a0f393c62
--- /dev/null
+++ b/crates/catalog/hms/src/error.rs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use anyhow::anyhow;
+use iceberg::{Error, ErrorKind};
+use std::fmt::Debug;
+use std::io;
+
+/// Format a thrift error into iceberg error.
+pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> Error
+where
+    T: Debug,
+{
+    Error::new(
+        ErrorKind::Unexpected,
+        "Operation failed for hitting thrift error".to_string(),
+    )
+    .with_source(anyhow!("thrift error: {:?}", error))
+}
+
+/// Format an io error into iceberg error.
+pub fn from_io_error(error: io::Error) -> Error {
+    Error::new(
+        ErrorKind::Unexpected,
+        "Operation failed for hitting io error".to_string(),
+    )
+    .with_source(error)
+}
diff --git a/crates/catalog/hms/src/lib.rs b/crates/catalog/hms/src/lib.rs
index b75e74977..db0034d46 100644
--- a/crates/catalog/hms/src/lib.rs
+++ b/crates/catalog/hms/src/lib.rs
@@ -22,4 +22,6 @@
 mod catalog;
 pub use catalog::*;

+mod error;
+mod schema;
 mod utils;
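Reviewer note: a sketch of how these helpers read at a call site, mirroring the `catalog.rs` changes above; `client` stands in for the wrapped `ThriftHiveMetastoreClient`.

    // A transport-level failure becomes an `iceberg::Error` with
    // `ErrorKind::Unexpected`, keeping the thrift error as its source.
    let db = client
        .get_database("default".into())
        .await
        .map_err(from_thrift_error)?;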
diff --git a/crates/catalog/hms/src/schema.rs b/crates/catalog/hms/src/schema.rs
new file mode 100644
index 000000000..77caaf715
--- /dev/null
+++ b/crates/catalog/hms/src/schema.rs
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use hive_metastore::FieldSchema;
+use iceberg::spec::{visit_schema, PrimitiveType, Schema, SchemaVisitor};
+use iceberg::{Error, ErrorKind, Result};
+
+type HiveSchema = Vec<FieldSchema>;
+
+#[derive(Debug, Default)]
+pub(crate) struct HiveSchemaBuilder {
+    schema: HiveSchema,
+    depth: usize,
+}
+
+impl HiveSchemaBuilder {
+    /// Creates a new `HiveSchemaBuilder` from an iceberg `Schema`
+    pub fn from_iceberg(schema: &Schema) -> Result<HiveSchemaBuilder> {
+        let mut builder = Self::default();
+        visit_schema(schema, &mut builder)?;
+        Ok(builder)
+    }
+
+    /// Returns the newly converted `HiveSchema`
+    pub fn build(self) -> HiveSchema {
+        self.schema
+    }
+
+    /// Checks whether the visitor is currently inside a `StructType`
+    /// while traversing the schema
+    fn is_inside_struct(&self) -> bool {
+        self.depth > 0
+    }
+}
+
+impl SchemaVisitor for HiveSchemaBuilder {
+    type T = String;
+
+    fn schema(
+        &mut self,
+        _schema: &iceberg::spec::Schema,
+        value: String,
+    ) -> iceberg::Result<String> {
+        Ok(value)
+    }
+
+    fn before_struct_field(
+        &mut self,
+        _field: &iceberg::spec::NestedFieldRef,
+    ) -> iceberg::Result<()> {
+        self.depth += 1;
+        Ok(())
+    }
+
+    fn r#struct(
+        &mut self,
+        r#_struct: &iceberg::spec::StructType,
+        results: Vec<String>,
+    ) -> iceberg::Result<String> {
+        Ok(format!("struct<{}>", results.join(", ")))
+    }
+
+    fn after_struct_field(
+        &mut self,
+        _field: &iceberg::spec::NestedFieldRef,
+    ) -> iceberg::Result<()> {
+        self.depth -= 1;
+        Ok(())
+    }
+
+    fn field(
+        &mut self,
+        field: &iceberg::spec::NestedFieldRef,
+        value: String,
+    ) -> iceberg::Result<String> {
+        if self.is_inside_struct() {
+            return Ok(format!("{}:{}", field.name, value));
+        }
+
+        self.schema.push(FieldSchema {
+            name: Some(field.name.clone().into()),
+            r#type: Some(value.clone().into()),
+            comment: field.doc.clone().map(|doc| doc.into()),
+        });
+
+        Ok(value)
+    }
+
+    fn list(&mut self, _list: &iceberg::spec::ListType, value: String) -> iceberg::Result<String> {
+        Ok(format!("array<{}>", value))
+    }
+
+    fn map(
+        &mut self,
+        _map: &iceberg::spec::MapType,
+        key_value: String,
+        value: String,
+    ) -> iceberg::Result<String> {
+        Ok(format!("map<{},{}>", key_value, value))
+    }
+
+    fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<String> {
+        let hive_type = match p {
+            PrimitiveType::Boolean => "boolean".to_string(),
+            PrimitiveType::Int => "int".to_string(),
+            PrimitiveType::Long => "bigint".to_string(),
+            PrimitiveType::Float => "float".to_string(),
+            PrimitiveType::Double => "double".to_string(),
+            PrimitiveType::Date => "date".to_string(),
+            PrimitiveType::Timestamp => "timestamp".to_string(),
+            PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => {
+                "string".to_string()
+            }
+            PrimitiveType::Binary | PrimitiveType::Fixed(_) => "binary".to_string(),
+            PrimitiveType::Decimal { precision, scale } => {
+                format!("decimal({},{})", precision, scale)
+            }
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    "Conversion from 'Timestamptz' is not supported",
+                ))
+            }
+        };
+
+        Ok(hive_type)
+    }
+}
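Reviewer note: a sketch of what the visitor produces for a nested type; the schema contents are illustrative.

    // Each top-level field becomes one Hive `FieldSchema`; nested types are
    // rendered as Hive type strings, e.g. an Iceberg map<string, list<int>>
    // field yields the r#type string "map<string,array<int>>".
    let cols: Vec<FieldSchema> = HiveSchemaBuilder::from_iceberg(&schema)?.build();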
+
+#[cfg(test)]
+mod tests {
+    use iceberg::{spec::Schema, Result};
+
+    use super::*;
+
+    #[test]
+    fn test_schema_with_nested_maps() -> Result<()> {
+        let record = r#"
+            {
+                "schema-id": 1,
+                "type": "struct",
+                "fields": [
+                    {
+                        "id": 1,
+                        "name": "quux",
+                        "required": true,
+                        "type": {
+                            "type": "map",
+                            "key-id": 2,
+                            "key": "string",
+                            "value-id": 3,
+                            "value-required": true,
+                            "value": {
+                                "type": "map",
+                                "key-id": 4,
+                                "key": "string",
+                                "value-id": 5,
+                                "value-required": true,
+                                "value": "int"
+                            }
+                        }
+                    }
+                ]
+            }
+        "#;
+
+        let schema = serde_json::from_str::<Schema>(record)?;
+
+        let result = HiveSchemaBuilder::from_iceberg(&schema)?.build();
+
+        let expected = vec![FieldSchema {
+            name: Some("quux".into()),
+            r#type: Some("map<string,map<string,int>>".into()),
+            comment: None,
+        }];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schema_with_struct_inside_list() -> Result<()> {
+        let record = r#"
+        {
+            "schema-id": 1,
+            "type": "struct",
+            "fields": [
+                {
+                    "id": 1,
+                    "name": "location",
+                    "required": true,
+                    "type": {
+                        "type": "list",
+                        "element-id": 2,
+                        "element-required": true,
+                        "element": {
+                            "type": "struct",
+                            "fields": [
+                                {
+                                    "id": 3,
+                                    "name": "latitude",
+                                    "required": false,
+                                    "type": "float"
+                                },
+                                {
+                                    "id": 4,
+                                    "name": "longitude",
+                                    "required": false,
+                                    "type": "float"
+                                }
+                            ]
+                        }
+                    }
+                }
+            ]
+        }
+        "#;
+
+        let schema = serde_json::from_str::<Schema>(record)?;
+
+        let result = HiveSchemaBuilder::from_iceberg(&schema)?.build();
+
+        let expected = vec![FieldSchema {
+            name: Some("location".into()),
+            r#type: Some("array<struct<latitude:float, longitude:float>>".into()),
+            comment: None,
+        }];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schema_with_structs() -> Result<()> {
+        let record = r#"{
+            "type": "struct",
+            "schema-id": 1,
+            "fields": [
+                {
+                    "id": 1,
+                    "name": "person",
+                    "required": true,
+                    "type": {
+                        "type": "struct",
+                        "fields": [
+                            {
+                                "id": 2,
+                                "name": "name",
+                                "required": true,
+                                "type": "string"
+                            },
+                            {
+                                "id": 3,
+                                "name": "age",
+                                "required": false,
+                                "type": "int"
+                            }
+                        ]
+                    }
+                }
+            ]
+        }"#;
+
+        let schema = serde_json::from_str::<Schema>(record)?;
+
+        let result = HiveSchemaBuilder::from_iceberg(&schema)?.build();
+
+        let expected = vec![FieldSchema {
+            name: Some("person".into()),
+            r#type: Some("struct<name:string, age:int>".into()),
+            comment: None,
+        }];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schema_with_simple_fields() -> Result<()> {
+        let record = r#"{
+            "type": "struct",
+            "schema-id": 1,
+            "fields": [
+                {
+                    "id": 1,
+                    "name": "c1",
+                    "required": true,
+                    "type": "boolean"
+                },
+                {
+                    "id": 2,
+                    "name": "c2",
+                    "required": true,
+                    "type": "int"
+                },
+                {
+                    "id": 3,
+                    "name": "c3",
+                    "required": true,
+                    "type": "long"
+                },
+                {
+                    "id": 4,
+                    "name": "c4",
+                    "required": true,
+                    "type": "float"
+                },
+                {
+                    "id": 5,
+                    "name": "c5",
+                    "required": true,
+                    "type": "double"
+                },
+                {
+                    "id": 6,
+                    "name": "c6",
+                    "required": true,
+                    "type": "decimal(2,2)"
+                },
+                {
+                    "id": 7,
+                    "name": "c7",
+                    "required": true,
+                    "type": "date"
+                },
+                {
+                    "id": 8,
+                    "name": "c8",
+                    "required": true,
+                    "type": "time"
+                },
+                {
+                    "id": 9,
+                    "name": "c9",
+                    "required": true,
+                    "type": "timestamp"
+                },
+                {
+                    "id": 10,
+                    "name": "c10",
+                    "required": true,
+                    "type": "string"
+                },
+                {
+                    "id": 11,
+                    "name": "c11",
+                    "required": true,
+                    "type": "uuid"
+                },
+                {
+                    "id": 12,
+                    "name": "c12",
+                    "required": true,
+                    "type": "fixed[4]"
+                },
+                {
+                    "id": 13,
+                    "name": "c13",
+                    "required": true,
+                    "type": "binary"
+                }
+            ]
+        }"#;
+
+        let schema = serde_json::from_str::<Schema>(record)?;
+
+        let result = HiveSchemaBuilder::from_iceberg(&schema)?.build();
+
+        let expected = vec![
+            FieldSchema {
+                name: Some("c1".into()),
+                r#type: Some("boolean".into()),
+                comment: None,
+            },
+            FieldSchema {
+                name: Some("c2".into()),
+                r#type: Some("int".into()),
+                comment: None,
+            },
+            FieldSchema {
+                name: Some("c3".into()),
+                r#type: Some("bigint".into()),
+                comment: None,
+            },
+            FieldSchema {
+                name: Some("c4".into()),
+                r#type: Some("float".into()),
+                comment: None,
+            },
+            FieldSchema {
+                name: Some("c5".into()),
+                r#type: Some("double".into()),
+                comment: None,
+            },
+            FieldSchema {
+                name: Some("c6".into()),
+                r#type: Some("decimal(2,2)".into()),
+                comment: None,
+            },
Some("decimal(2,2)".into()), + comment: None, + }, + FieldSchema { + name: Some("c7".into()), + r#type: Some("date".into()), + comment: None, + }, + FieldSchema { + name: Some("c8".into()), + r#type: Some("string".into()), + comment: None, + }, + FieldSchema { + name: Some("c9".into()), + r#type: Some("timestamp".into()), + comment: None, + }, + FieldSchema { + name: Some("c10".into()), + r#type: Some("string".into()), + comment: None, + }, + FieldSchema { + name: Some("c11".into()), + r#type: Some("string".into()), + comment: None, + }, + FieldSchema { + name: Some("c12".into()), + r#type: Some("binary".into()), + comment: None, + }, + FieldSchema { + name: Some("c13".into()), + r#type: Some("binary".into()), + comment: None, + }, + ]; + + assert_eq!(result, expected); + + Ok(()) + } +} diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index 02f32c658..04ee5d4b3 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -15,59 +15,52 @@ // specific language governing permissions and limitations // under the License. -use anyhow::anyhow; -use hive_metastore::{Database, PrincipalType}; -use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result}; +use chrono::Utc; +use hive_metastore::{Database, PrincipalType, SerDeInfo, StorageDescriptor}; +use iceberg::{spec::Schema, Error, ErrorKind, Namespace, NamespaceIdent, Result}; use pilota::{AHashMap, FastStr}; use std::collections::HashMap; -use std::fmt::Debug; -use std::io; +use uuid::Uuid; + +use crate::schema::HiveSchemaBuilder; /// hive.metastore.database.owner setting -pub const HMS_DB_OWNER: &str = "hive.metastore.database.owner"; +const HMS_DB_OWNER: &str = "hive.metastore.database.owner"; /// hive.metastore.database.owner default setting -pub const HMS_DEFAULT_DB_OWNER: &str = "user.name"; +const HMS_DEFAULT_DB_OWNER: &str = "user.name"; /// hive.metastore.database.owner-type setting -pub const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type"; +const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type"; +/// hive metatore `owner` property +const OWNER: &str = "owner"; /// hive metatore `description` property -pub const COMMENT: &str = "comment"; +const COMMENT: &str = "comment"; /// hive metatore `location` property -pub const LOCATION: &str = "location"; - -/// Format a thrift error into iceberg error. -pub fn from_thrift_error(error: volo_thrift::error::ResponseError) -> Error -where - T: Debug, -{ - Error::new( - ErrorKind::Unexpected, - "operation failed for hitting thrift error".to_string(), - ) - .with_source(anyhow!("thrift error: {:?}", error)) -} - -/// Format an io error into iceberg error. 
-pub fn from_io_error(error: io::Error) -> Error {
-    Error::new(
-        ErrorKind::Unexpected,
-        "operation failed for hitting io error".to_string(),
-    )
-    .with_source(error)
-}
+const LOCATION: &str = "location";
+/// hive metastore `metadata_location` property
+const METADATA_LOCATION: &str = "metadata_location";
+/// hive metastore `external` property
+const EXTERNAL: &str = "EXTERNAL";
+/// hive metastore `external_table` property
+const EXTERNAL_TABLE: &str = "EXTERNAL_TABLE";
+/// hive metastore `table_type` property
+const TABLE_TYPE: &str = "table_type";
+/// hive metastore `SerDeInfo` serialization_lib parameter
+const SERIALIZATION_LIB: &str = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+/// hive metastore input format
+const INPUT_FORMAT: &str = "org.apache.hadoop.mapred.FileInputFormat";
+/// hive metastore output format
+const OUTPUT_FORMAT: &str = "org.apache.hadoop.mapred.FileOutputFormat";

 /// Returns a `Namespace` by extracting database name and properties
 /// from `hive_metastore::hms::Database`
 pub(crate) fn convert_to_namespace(database: &Database) -> Result<Namespace> {
     let mut properties = HashMap::new();

-    let name = if let Some(name) = &database.name {
-        name.to_string()
-    } else {
-        return Err(Error::new(
-            ErrorKind::DataInvalid,
-            "Database name must be specified",
-        ));
-    };
+    let name = database
+        .name
+        .as_ref()
+        .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Database name must be specified"))?
+        .to_string();

     if let Some(description) = &database.description {
         properties.insert(COMMENT.to_string(), description.to_string());
@@ -157,6 +150,57 @@ pub(crate) fn convert_to_database(
     Ok(db)
 }

+pub(crate) fn convert_to_hive_table(
+    db_name: String,
+    schema: &Schema,
+    table_name: String,
+    location: String,
+    metadata_location: String,
+    properties: &HashMap<String, String>,
+) -> Result<hive_metastore::Table> {
+    let serde_info = SerDeInfo {
+        serialization_lib: Some(SERIALIZATION_LIB.into()),
+        ..Default::default()
+    };
+
+    let hive_schema = HiveSchemaBuilder::from_iceberg(schema)?.build();
+
+    let storage_descriptor = StorageDescriptor {
+        location: Some(location.into()),
+        cols: Some(hive_schema),
+        input_format: Some(INPUT_FORMAT.into()),
+        output_format: Some(OUTPUT_FORMAT.into()),
+        serde_info: Some(serde_info),
+        ..Default::default()
+    };
+
+    let parameters = AHashMap::from([
+        (FastStr::from(EXTERNAL), FastStr::from("TRUE")),
+        (FastStr::from(TABLE_TYPE), FastStr::from("ICEBERG")),
+        (
+            FastStr::from(METADATA_LOCATION),
+            FastStr::from(metadata_location),
+        ),
+    ]);
+
+    let current_time_ms = get_current_time()?;
+    let owner = properties
+        .get(OWNER)
+        .map_or(HMS_DEFAULT_DB_OWNER.to_string(), |v| v.into());
+
+    Ok(hive_metastore::Table {
+        table_name: Some(table_name.into()),
+        db_name: Some(db_name.into()),
+        table_type: Some(EXTERNAL_TABLE.into()),
+        owner: Some(owner.into()),
+        create_time: Some(current_time_ms),
+        last_access_time: Some(current_time_ms),
+        sd: Some(storage_descriptor),
+        parameters: Some(parameters),
+        ..Default::default()
+    })
+}
+
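Reviewer note: the Hive table registered by `convert_to_hive_table` is an external table whose parameters carry the Iceberg pointers; per the code above, roughly:

    // Parameters set on the registered Hive table:
    //   "EXTERNAL"          => "TRUE"
    //   "table_type"        => "ICEBERG"
    //   "metadata_location" => the metadata file written by `create_table`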
 /// Checks if provided `NamespaceIdent` is valid.
 pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> {
     let name = namespace.as_ref();
@@ -183,6 +227,65 @@ pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> {
     Ok(name)
 }

+/// Get the default table location from `Namespace` properties
+pub(crate) fn get_default_table_location(
+    namespace: &Namespace,
+    table_name: impl AsRef<str>,
+    warehouse: impl AsRef<str>,
+) -> String {
+    let properties = namespace.properties();
+
+    let location = match properties.get(LOCATION) {
+        Some(location) => location,
+        None => warehouse.as_ref(),
+    };
+
+    format!("{}/{}", location, table_name.as_ref())
+}
+
+/// Create a metadata location from `location` and `version`
+pub(crate) fn create_metadata_location(location: impl AsRef<str>, version: i32) -> Result<String> {
+    if version < 0 {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Table metadata version: '{}' must be a non-negative integer",
+                version
+            ),
+        ));
+    };
+
+    let version = format!("{:0>5}", version);
+    let id = Uuid::new_v4();
+    let metadata_location = format!(
+        "{}/metadata/{}-{}.metadata.json",
+        location.as_ref(),
+        version,
+        id
+    );
+
+    Ok(metadata_location)
+}
+
+/// Get the metadata location from `HiveTable` parameters
+pub(crate) fn get_metadata_location(
+    parameters: &Option<AHashMap<FastStr, FastStr>>,
+) -> Result<String> {
+    match parameters {
+        Some(properties) => match properties.get(METADATA_LOCATION) {
+            Some(location) => Ok(location.to_string()),
+            None => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("No '{}' set on table", METADATA_LOCATION),
+            )),
+        },
+        None => Err(Error::new(
+            ErrorKind::DataInvalid,
+            "No 'parameters' set on table. Location of metadata is undefined",
+        )),
+    }
+}
+
 /// Formats location_uri by e.g. removing trailing slashes.
 fn format_location_uri(location: String) -> String {
     let mut location = location;
@@ -217,12 +320,141 @@ fn validate_owner_settings(properties: &HashMap<String, String>) -> Result<()> {
     Ok(())
 }

+fn get_current_time() -> Result<i32> {
+    let now = Utc::now();
+    now.timestamp().try_into().map_err(|_| {
+        Error::new(
+            ErrorKind::Unexpected,
+            "Current time is out of range for i32",
+        )
+    })
+}
+
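Reviewer note: for illustration, the location shapes these helpers produce (the UUID suffix is random, shown here as a placeholder):

    // A namespace without a `location` property falls back to the warehouse:
    //   get_default_table_location(&ns, "t", "s3a://warehouse/hive")
    //     => "s3a://warehouse/hive/t"
    // create_metadata_location("s3a://warehouse/hive/t", 0)
    //   => "s3a://warehouse/hive/t/metadata/00000-<uuid>.metadata.json"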
 #[cfg(test)]
 mod tests {
-    use iceberg::{Namespace, NamespaceIdent};
+    use iceberg::{
+        spec::{NestedField, PrimitiveType, Type},
+        Namespace, NamespaceIdent,
+    };

     use super::*;

+    #[test]
+    fn test_get_metadata_location() -> Result<()> {
+        let params_valid = Some(AHashMap::from([(
+            FastStr::new(METADATA_LOCATION),
+            FastStr::new("my_location"),
+        )]));
+        let params_missing_key = Some(AHashMap::from([(
+            FastStr::new("not_here"),
+            FastStr::new("my_location"),
+        )]));
+
+        let result_valid = get_metadata_location(&params_valid)?;
+        let result_missing_key = get_metadata_location(&params_missing_key);
+        let result_no_params = get_metadata_location(&None);
+
+        assert_eq!(result_valid, "my_location");
+        assert!(result_missing_key.is_err());
+        assert!(result_no_params.is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_convert_to_hive_table() -> Result<()> {
+        let db_name = "my_db".to_string();
+        let table_name = "my_table".to_string();
+        let location = "s3a://warehouse/hms".to_string();
+        let metadata_location = create_metadata_location(location.clone(), 0)?;
+        let properties = HashMap::new();
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            ])
+            .build()?;
+
+        let result = convert_to_hive_table(
+            db_name.clone(),
+            &schema,
+            table_name.clone(),
+            location.clone(),
+            metadata_location,
+            &properties,
+        )?;
+
+        let serde_info = SerDeInfo {
+            serialization_lib: Some(SERIALIZATION_LIB.into()),
+            ..Default::default()
+        };
+
+        let hive_schema = HiveSchemaBuilder::from_iceberg(&schema)?.build();
+
+        let sd = StorageDescriptor {
+            location: Some(location.into()),
+            cols: Some(hive_schema),
+            input_format: Some(INPUT_FORMAT.into()),
+            output_format: Some(OUTPUT_FORMAT.into()),
+            serde_info: Some(serde_info),
+            ..Default::default()
+        };
+
+        assert_eq!(result.db_name, Some(db_name.into()));
+        assert_eq!(result.table_name, Some(table_name.into()));
+        assert_eq!(result.table_type, Some(EXTERNAL_TABLE.into()));
+        assert_eq!(result.owner, Some(HMS_DEFAULT_DB_OWNER.into()));
+        assert_eq!(result.sd, Some(sd));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_create_metadata_location() -> Result<()> {
+        let location = "my_base_location";
+        let valid_version = 0;
+        let invalid_version = -1;
+
+        let valid_result = create_metadata_location(location, valid_version)?;
+        let invalid_result = create_metadata_location(location, invalid_version);
+
+        assert!(valid_result.starts_with("my_base_location/metadata/00000-"));
+        assert!(valid_result.ends_with(".metadata.json"));
+        assert!(invalid_result.is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_get_default_table_location() -> Result<()> {
+        let properties = HashMap::from([(LOCATION.to_string(), "db_location".to_string())]);
+
+        let namespace =
+            Namespace::with_properties(NamespaceIdent::new("default".into()), properties);
+        let table_name = "my_table";
+
+        let expected = "db_location/my_table";
+        let result = get_default_table_location(&namespace, table_name, "warehouse_location");
+
+        assert_eq!(expected, result);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_get_default_table_location_warehouse() -> Result<()> {
+        let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+        let table_name = "my_table";
+
+        let expected = "warehouse_location/my_table";
+        let result = get_default_table_location(&namespace, table_name, "warehouse_location");
+
+        assert_eq!(expected, result);
+
+        Ok(())
+    }
+
     #[test]
     fn test_convert_to_namespace() -> Result<()> {
         let properties = HashMap::from([
diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs
index bab83a955..a48d0568c 100644
--- a/crates/catalog/hms/tests/hms_catalog_test.rs
+++ b/crates/catalog/hms/tests/hms_catalog_test.rs
@@ -19,7 +19,9 @@

 use std::collections::HashMap;

-use iceberg::{Catalog, Namespace, NamespaceIdent};
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
 use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
 use iceberg_test_utils::docker::DockerCompose;
 use iceberg_test_utils::{normalize_test_name, set_up};
@@ -27,6 +29,7 @@ use port_scanner::scan_port_addr;
 use tokio::time::sleep;

 const HMS_CATALOG_PORT: u16 = 9083;
+const MINIO_PORT: u16 = 9000;

 type Result<T> = std::result::Result<T, iceberg::Error>;

 struct TestFixture {
@@ -45,6 +48,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
     docker_compose.run();

     let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore");
+    let minio_ip = docker_compose.get_container_ip("minio");

     let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT);
     loop {
@@ -56,9 +60,21 @@ async fn set_test_fixture(func: &str) -> TestFixture {
         }
     }

+    let props = HashMap::from([
+        (
+            S3_ENDPOINT.to_string(),
+            format!("http://{}:{}", minio_ip, MINIO_PORT),
+        ),
+        (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+        (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+        (S3_REGION.to_string(), "us-east-1".to_string()),
+    ]);
+
     let config = HmsCatalogConfig::builder()
         .address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT))
         .thrift_transport(HmsThriftTransport::Buffered)
+        .warehouse("s3a://warehouse/hive".to_string())
+        .props(props)
         .build();

     let hms_catalog = HmsCatalog::new(config).unwrap();
@@ -69,6 +85,163 @@ async fn set_test_fixture(func: &str) -> TestFixture {
     }
 }

+fn set_table_creation(location: impl ToString, name: impl ToString) -> Result<TableCreation> {
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
+        ])
+        .build()?;
+
+    let creation = TableCreation::builder()
+        .location(location.to_string())
+        .name(name.to_string())
+        .properties(HashMap::new())
+        .schema(schema)
+        .build();
+
+    Ok(creation)
+}
+
+#[tokio::test]
+async fn test_rename_table() -> Result<()> {
+    let fixture = set_test_fixture("test_rename_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let table = fixture
+        .hms_catalog
+        .create_table(namespace.name(), creation)
+        .await?;
+
+    let dest = TableIdent::new(namespace.name().clone(), "my_table_rename".to_string());
+
+    fixture
+        .hms_catalog
+        .rename_table(table.identifier(), &dest)
+        .await?;
+
+    let result = fixture.hms_catalog.table_exists(&dest).await?;
+
+    assert!(result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_table_exists() -> Result<()> {
+    let fixture = set_test_fixture("test_table_exists").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let table = fixture
+        .hms_catalog
+        .create_table(namespace.name(), creation)
+        .await?;
+
+    let result = fixture.hms_catalog.table_exists(table.identifier()).await?;
+
+    assert!(result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_drop_table() -> Result<()> {
+    let fixture = set_test_fixture("test_drop_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let table = fixture
+        .hms_catalog
+        .create_table(namespace.name(), creation)
+        .await?;
+
+    fixture.hms_catalog.drop_table(table.identifier()).await?;
+
+    let result = fixture.hms_catalog.table_exists(table.identifier()).await?;
+
+    assert!(!result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_load_table() -> Result<()> {
+    let fixture = set_test_fixture("test_load_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let expected = fixture
+        .hms_catalog
+        .create_table(namespace.name(), creation)
+        .await?;
+
+    let result = fixture
+        .hms_catalog
+        .load_table(&TableIdent::new(
+            namespace.name().clone(),
+            "my_table".to_string(),
+        ))
+        .await?;
+
+    assert_eq!(result.identifier(), expected.identifier());
+    assert_eq!(result.metadata_location(), expected.metadata_location());
+    assert_eq!(result.metadata(), expected.metadata());
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_create_table() -> Result<()> {
{ + let fixture = set_test_fixture("test_create_table").await; + let creation = set_table_creation("s3a://warehouse/hive", "my_table")?; + let namespace = Namespace::new(NamespaceIdent::new("default".into())); + + let result = fixture + .hms_catalog + .create_table(namespace.name(), creation) + .await?; + + assert_eq!(result.identifier().name(), "my_table"); + assert!(result + .metadata_location() + .is_some_and(|location| location.starts_with("s3a://warehouse/hive/metadata/00000-"))); + assert!( + fixture + .hms_catalog + .file_io() + .is_exist("s3a://warehouse/hive/metadata/") + .await? + ); + + Ok(()) +} + +#[tokio::test] +async fn test_list_tables() -> Result<()> { + let fixture = set_test_fixture("test_list_tables").await; + let ns = Namespace::new(NamespaceIdent::new("default".into())); + let result = fixture.hms_catalog.list_tables(ns.name()).await?; + + assert_eq!(result, vec![]); + + let creation = set_table_creation("s3a://warehouse/hive", "my_table")?; + fixture + .hms_catalog + .create_table(ns.name(), creation) + .await?; + let result = fixture.hms_catalog.list_tables(ns.name()).await?; + + assert_eq!( + result, + vec![TableIdent::new(ns.name().clone(), "my_table".to_string())] + ); + + Ok(()) +} + #[tokio::test] async fn test_list_namespace() -> Result<()> { let fixture = set_test_fixture("test_list_namespace").await; @@ -208,16 +381,3 @@ async fn test_drop_namespace() -> Result<()> { Ok(()) } - -#[tokio::test] -async fn test_list_tables() -> Result<()> { - let fixture = set_test_fixture("test_list_tables").await; - - let ns = Namespace::new(NamespaceIdent::new("default".into())); - - let result = fixture.hms_catalog.list_tables(ns.name()).await?; - - assert_eq!(result, vec![]); - - Ok(()) -}