From d54b29a092664cb3073736ce5cc336bf7ae68774 Mon Sep 17 00:00:00 2001
From: camilesing
Date: Fri, 25 Apr 2025 08:55:43 +0800
Subject: [PATCH] feat: add catalog API

---
 crates/paimon/src/catalog/mod.rs | 328 +++++++++++++++++++++++++++++++
 crates/paimon/src/error.rs       |  31 +++
 crates/paimon/src/lib.rs         |   1 +
 3 files changed, 360 insertions(+)
 create mode 100644 crates/paimon/src/catalog/mod.rs

diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs
new file mode 100644
index 0000000..39281f1
--- /dev/null
+++ b/crates/paimon/src/catalog/mod.rs
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt;
+use std::hash::Hash;
+
+use async_trait::async_trait;
+use chrono::Duration;
+
+use crate::error::Result;
+use crate::io::FileIO;
+use crate::spec::{RowType, SchemaChange, TableSchema};
+
+/// This interface is responsible for reading and writing metadata such as database/table from a paimon catalog.
+///
+/// Impl References:
+#[async_trait]
+pub trait Catalog: Send + Sync {
+    /// Returns the default database name, `default`.
+    fn default_database_info(&self) -> &str {
+        "default"
+    }
+
+    /// Returns the default system table splitter, `$`.
+    fn default_system_table_splitter_info(&self) -> &str {
+        "$"
+    }
+
+    /// Returns the default system database name, `sys`.
+    fn default_system_database_name_info(&self) -> &str {
+        "sys"
+    }
+
+    /// Returns the warehouse root path containing all database directories in this catalog.
+    fn warehouse(&self) -> &str;
+
+    /// Returns the catalog options.
+    fn options(&self) -> &HashMap<String, String>;
+
+    /// Returns the FileIO instance.
+    fn file_io(&self) -> &FileIO;
+
+    /// Lists all databases in this catalog.
+    async fn list_databases(&self) -> Result<Vec<String>>;
+
+    /// Checks if a database exists in this catalog.
+    async fn database_exists(&self, database_name: &str) -> Result<bool>;
+
+    /// Creates a new database.
+    async fn create_database(
+        &self,
+        name: &str,
+        ignore_if_exists: bool,
+        properties: Option<HashMap<String, String>>,
+    ) -> Result<()>;
+
+    /// Loads database properties.
+    async fn load_database_properties(&self, name: &str) -> Result<HashMap<String, String>>;
+
+    /// Drops a database.
+    async fn drop_database(
+        &self,
+        name: &str,
+        ignore_if_not_exists: bool,
+        cascade: bool,
+    ) -> Result<()>;
+
+    /// Returns a Table instance for the specified identifier.
+    async fn get_table(&self, identifier: &Identifier) -> Result<Box<dyn Table>>;
+
+    /// Lists all tables in the specified database.
+    async fn list_tables(&self, database_name: &str) -> Result<Vec<String>>;
+
+    /// Checks if a table exists.
+    ///
+    /// A `TableNotExist` error from `get_table` becomes `Ok(false)`; other errors propagate.
+    async fn table_exists(&self, identifier: &Identifier) -> Result<bool> {
+        match self.get_table(identifier).await {
+            Ok(_) => Ok(true),
+            Err(crate::error::Error::TableNotExist { .. }) => Ok(false),
+            Err(e) => Err(e),
+        }
+    }
+
+    /// Drops a table.
+    async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()>;
+
+    /// Creates a new table.
+    async fn create_table(
+        &self,
+        identifier: &Identifier,
+        schema: TableSchema,
+        ignore_if_exists: bool,
+    ) -> Result<()>;
+
+    /// Renames a table.
+    async fn rename_table(
+        &self,
+        from_table: &Identifier,
+        to_table: &Identifier,
+        ignore_if_not_exists: bool,
+    ) -> Result<()>;
+
+    /// Alters an existing table.
+    async fn alter_table(
+        &self,
+        identifier: &Identifier,
+        changes: Vec<SchemaChange>,
+        ignore_if_not_exists: bool,
+    ) -> Result<()>;
+
+    /// Drops a partition from the specified table.
+    async fn drop_partition(
+        &self,
+        identifier: &Identifier,
+        partitions: &HashMap<String, String>,
+    ) -> Result<()>;
+
+    /// Returns whether this catalog is case-sensitive.
+    fn case_sensitive(&self) -> bool {
+        true
+    }
+}
+
+/// Identifies an object in a catalog.
+///
+/// Impl References:
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Identifier {
+    database: String,
+    table: String,
+}
+
+impl Identifier {
+    /// Database name for which `full_name` omits the database prefix.
+    pub const UNKNOWN_DATABASE: &'static str = "unknown";
+
+    /// Create a new identifier.
+    pub fn new(database: String, table: String) -> Self {
+        Self { database, table }
+    }
+
+    /// Get the database name.
+    pub fn database_name(&self) -> &str {
+        &self.database
+    }
+
+    /// Get the table name.
+    pub fn object_name(&self) -> &str {
+        &self.table
+    }
+
+    /// Get the full name of the identifier.
+    ///
+    /// Omits the database prefix when the database is `UNKNOWN_DATABASE`.
+    pub fn full_name(&self) -> String {
+        if self.database == Self::UNKNOWN_DATABASE {
+            self.table.clone()
+        } else {
+            format!("{}.{}", self.database, self.table)
+        }
+    }
+
+    /// Get the full name of the identifier with both parts quoted by backticks.
+    pub fn escaped_full_name(&self) -> String {
+        self.escaped_full_name_with_char('`')
+    }
+
+    /// Get the full name of the identifier with both parts quoted by the specified character.
+    pub fn escaped_full_name_with_char(&self, escape_char: char) -> String {
+        format!(
+            "{0}{1}{0}.{0}{2}{0}",
+            escape_char, self.database, self.table
+        )
+    }
+
+    /// Create a new identifier from borrowed names.
+    pub fn create(db: &str, table: &str) -> Self {
+        Self::new(db.to_string(), table.to_string())
+    }
+}
+
+impl fmt::Display for Identifier {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.full_name())
+    }
+}
+
+/// A table provides basic abstraction for a table type and table scan, and table read.
+///
+/// Impl Reference:
+pub trait Table {
+    // ================== Table Metadata =====================
+
+    /// A name to identify this table.
+    fn name(&self) -> &str;
+
+    /// Returns the row type of this table.
+    fn row_type(&self) -> &RowType;
+
+    /// Partition keys of this table.
+    fn partition_keys(&self) -> Vec<String>;
+
+    /// Primary keys of this table.
+    fn primary_keys(&self) -> Vec<String>;
+
+    /// Options of this table.
+    fn options(&self) -> HashMap<String, String>;
+
+    /// Optional comment of this table.
+    fn comment(&self) -> Option<&String>;
+
+    // ================= Table Operations ====================
+
+    /// Copy this table with adding dynamic options.
+    fn copy(&self, dynamic_options: HashMap<String, String>) -> Box<dyn Table>;
+
+    /// Rollback table's state to a specific snapshot.
+    fn rollback_to(&mut self, snapshot_id: u64);
+
+    /// Create a tag from given snapshot.
+    fn create_tag(&mut self, tag_name: &str, from_snapshot_id: u64);
+
+    /// Create a tag from given snapshot, taking a retention duration.
+    fn create_tag_with_retention(
+        &mut self,
+        tag_name: &str,
+        from_snapshot_id: u64,
+        time_retained: Duration,
+    );
+
+    /// Create a tag from the latest snapshot.
+    fn create_tag_from_latest(&mut self, tag_name: &str);
+
+    /// Create a tag from the latest snapshot, taking a retention duration.
+    fn create_tag_from_latest_with_retention(&mut self, tag_name: &str, time_retained: Duration);
+
+    /// Delete a tag by name.
+    fn delete_tag(&mut self, tag_name: &str);
+
+    /// Rollback table's state to a specific tag.
+    fn rollback_to_tag(&mut self, tag_name: &str);
+
+    /// Create an empty branch.
+    fn create_branch(&mut self, branch_name: &str);
+
+    /// Create a branch from given snapshot.
+    fn create_branch_from_snapshot(&mut self, branch_name: &str, snapshot_id: u64);
+
+    /// Create a branch from given tag.
+    fn create_branch_from_tag(&mut self, branch_name: &str, tag_name: &str);
+
+    /// Delete a branch by name.
+    fn delete_branch(&mut self, branch_name: &str);
+}
+
+#[cfg(test)]
+mod catalog_tests {
+    use super::*;
+
+    #[test]
+    fn test_full_name_identifier() {
+        let database_name = "trade".to_string();
+        let table_name = "dwv_xxxxx".to_string();
+        let my_sign = "`".to_string();
+
+        let identifier = Identifier {
+            database: database_name.clone(),
+            table: table_name.clone(),
+        };
+
+        assert_eq!(identifier.database_name(), database_name);
+        assert_eq!(identifier.object_name(), table_name);
+        assert_eq!(
+            identifier.full_name(),
+            format!("{}.{}", database_name.clone(), table_name.clone())
+        );
+        assert_eq!(
+            identifier.escaped_full_name(),
+            format!(
+                "{0}{1}{0}.{0}{2}{0}",
+                my_sign,
+                database_name.clone(),
+                table_name.clone()
+            )
+        );
+    }
+
+    #[test]
+    fn test_unknown_name_identifier() {
+        let database_name = Identifier::UNKNOWN_DATABASE.to_string();
+        let table_name = "dwv_xxxxx".to_string();
+        let my_sign = "`".to_string();
+
+        let identifier = Identifier {
+            database: database_name.clone(),
+            table: table_name.clone(),
+        };
+
+        assert_eq!(identifier.database_name(), database_name);
+        assert_eq!(identifier.object_name(), table_name);
+        assert_eq!(identifier.full_name(), table_name.clone());
+        assert_eq!(
+            identifier.escaped_full_name(),
+            format!(
+                "{0}{1}{0}.{0}{2}{0}",
+                my_sign,
+                database_name.clone(),
+                table_name.clone()
+            )
+        );
+    }
+}
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index d7cfd18..ca05fa6 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::catalog::Identifier;
 use snafu::prelude::*;
 
 /// Result type used in paimon.
@@ -65,6 +66,36 @@ pub enum Error {
         display("Paimon hitting invalid file index format: {}", message)
     )]
     FileIndexFormatInvalid { message: String },
+
+    #[snafu(display("Database {} is not empty.", database))]
+    DatabaseNotEmpty { database: String },
+
+    #[snafu(display("Database {} already exists.", database))]
+    DatabaseAlreadyExist { database: String },
+
+    #[snafu(display("Database {} does not exist.", database))]
+    DatabaseNotExist { database: String },
+
+    #[snafu(display("Can't do operation on system database."))]
+    ProcessSystemDatabase,
+
+    #[snafu(display("Table {} already exists.", identifier.full_name()))]
+    TableAlreadyExist { identifier: Identifier },
+
+    #[snafu(display("Table {} does not exist.", identifier.full_name()))]
+    TableNotExist { identifier: Identifier },
+
+    #[snafu(display("Partition {} do not exist in the table {}.", partitions, identifier.full_name()))]
+    PartitionNotExist {
+        identifier: Identifier,
+        partitions: String,
+    },
+
+    #[snafu(display("Column {} already exists.", column_name))]
+    ColumnAlreadyExist { column_name: String },
+
+    #[snafu(display("Column {} does not exist.", column_name))]
+    ColumnNotExist { column_name: String },
 }
 
 impl From<opendal::Error> for Error {
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index 5296a02..46cf190 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -18,7 +18,8 @@
 mod error;
 pub use error::Error;
 pub use error::Result;
 
+pub mod catalog;
 pub mod file_index;
 pub mod io;
 pub mod spec;