diff --git a/datafusion_iceberg/src/materialized_view.rs b/datafusion_iceberg/src/materialized_view.rs
index 2626db3d..5911c329 100644
--- a/datafusion_iceberg/src/materialized_view.rs
+++ b/datafusion_iceberg/src/materialized_view.rs
@@ -13,7 +13,7 @@ use iceberg_rust::{
     sql::find_relations,
 };
 use iceberg_rust_spec::spec::{
-    snapshot::{FullIdentifier, SourceTable},
+    snapshot::{FullIdentifier, Lineage, SourceTable},
     view_metadata::ViewRepresentation,
 };
 use itertools::Itertools;
@@ -60,12 +60,13 @@ pub async fn refresh_materialized_view(
             .collect::<Result<Vec<_>, Error>>()?,
     };
 
+    // Load source tables
     let source_tables = stream::iter(source_tables.iter())
-        .then(|base_table| {
+        .then(|source_table| {
             let catalog_list = catalog_list.clone();
             let branch = branch.clone();
             async move {
-                let identifier = base_table.identifier();
+                let identifier = source_table.identifier();
                 let catalog_name = identifier.catalog();
                 let catalog = catalog_list
                     .catalog(catalog_name)
                     .ok_or(Error::NotFound(
                     ))?;
 
                 let tabular = match catalog
-                    .load_table(&Identifier::try_new(&[identifier.namespace().clone(),
-                        identifier.table_name().clone()])?)
+                    .load_table(&Identifier::try_new(&[
+                        identifier.namespace().clone(),
+                        identifier.table_name().clone(),
+                    ])?)
                     .await?
                 {
                     Tabular::View(_) => {
@@ -101,12 +104,12 @@ pub async fn refresh_materialized_view(
                     _ => Err(Error::InvalidFormat("storage table".to_string())),
                 }?;
 
-                let table_state = if current_snapshot_id == *base_table.snapshot_id() {
+                let table_state = if current_snapshot_id == *source_table.snapshot_id() {
                     StorageTableState::Fresh
-                } else if *base_table.snapshot_id() == -1 {
+                } else if *source_table.snapshot_id() == -1 {
                     StorageTableState::Invalid
                 } else {
-                    StorageTableState::Outdated(*base_table.snapshot_id())
+                    StorageTableState::Outdated(*source_table.snapshot_id())
                 };
 
                 Ok((catalog_name, tabular, table_state, current_snapshot_id))
@@ -115,16 +118,15 @@ pub async fn refresh_materialized_view(
         .try_collect::<Vec<_>>()
         .await?;
 
-    // Full refresh
-
-    let new_tables = source_tables
+    // Register source tables in datafusion context and return lineage information
+    let source_tables = source_tables
         .into_iter()
-        .flat_map(|(catalog_name, base_table, _, last_snapshot_id)| {
-            let identifier = base_table.identifier().to_string().to_owned();
-            let uuid = *base_table.metadata().uuid();
+        .flat_map(|(catalog_name, source_table, _, last_snapshot_id)| {
+            let identifier = source_table.identifier().to_string().to_owned();
+            let uuid = *source_table.metadata().as_ref().uuid();
             let table = Arc::new(DataFusionTable::new(
-                base_table,
+                source_table,
                 None,
                 None,
                 branch.as_deref(),
@@ -167,6 +169,7 @@ pub async fn refresh_materialized_view(
 
     let logical_plan = ctx.state().create_logical_plan(&sql_statements[0]).await?;
 
+    // Calculate arrow record batches from logical plan
     let batches = ctx
         .execute_logical_plan(logical_plan)
         .await?
@@ -174,6 +177,7 @@ pub async fn refresh_materialized_view(
         .await?
         .map_err(ArrowError::from);
 
+    // Write arrow record batches to datafiles
     let files = write_parquet_partitioned(
         &storage_table.table_metadata,
         batches,
     )
     .await?;
 
     matview
-        .full_refresh(files, version_id, new_tables, branch)
+        .new_transaction(branch.as_deref())
+        .full_refresh(files, Lineage::new(version_id, source_tables))
+        .commit()
         .await?;
 
     Ok(())
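For reference, a full refresh is now expressed through the ordinary transaction API rather than a dedicated method on `MaterializedView`. A minimal sketch of the new call shape — `commit_refresh` is a hypothetical helper, and `files`, `version_id`, and `source_tables` are assumed to be produced as in `refresh_materialized_view` above:

use iceberg_rust::{error::Error, materialized_view::MaterializedView};
use iceberg_rust_spec::spec::{
    manifest::DataFile,
    materialized_view_metadata::VersionId,
    snapshot::{Lineage, SourceTable},
};

// Hypothetical helper mirroring the commit step above.
async fn commit_refresh(
    matview: &mut MaterializedView,
    files: Vec<DataFile>,
    version_id: VersionId,
    source_tables: Vec<SourceTable>,
    branch: Option<String>,
) -> Result<(), Error> {
    matview
        .new_transaction(branch.as_deref())
        // Rewrites the storage table and records the source-table snapshots
        // the new data was derived from.
        .full_refresh(files, Lineage::new(version_id, source_tables))
        .commit()
        .await
}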
diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs
index 55f24c93..cae63158 100644
--- a/datafusion_iceberg/src/table.rs
+++ b/datafusion_iceberg/src/table.rs
@@ -597,17 +597,14 @@ mod tests {
     };
     use iceberg_catalog_sql::SqlCatalog;
     use iceberg_rust::{
-        catalog::{
-            identifier::Identifier,
-            tabular::{Tabular, TabularMetadata},
-            Catalog,
-        },
+        catalog::{identifier::Identifier, tabular::Tabular, Catalog},
         table::table_builder::TableBuilder,
         view::view_builder::ViewBuilder,
     };
     use iceberg_rust_spec::spec::{
         partition::{PartitionField, PartitionSpecBuilder, Transform},
         schema::Schema,
+        tabular::TabularMetadata,
         types::{PrimitiveType, StructField, StructType, Type},
     };
     use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
diff --git a/iceberg-catalog-sql/src/lib.rs b/iceberg-catalog-sql/src/lib.rs
index dcae0199..707bc8d7 100644
--- a/iceberg-catalog-sql/src/lib.rs
+++ b/iceberg-catalog-sql/src/lib.rs
@@ -12,12 +12,12 @@ use iceberg_rust::{
         },
         identifier::Identifier,
         namespace::Namespace,
-        tabular::{Tabular, TabularMetadata},
+        tabular::Tabular,
         Catalog, CatalogList,
     },
     error::Error as IcebergError,
     materialized_view::MaterializedView,
-    spec::table_metadata::new_metadata_location,
+    spec::{table_metadata::new_metadata_location, tabular::TabularMetadata},
     table::Table,
     util::strip_prefix,
     view::View,
@@ -245,16 +245,16 @@ impl Catalog for SqlCatalog {
         metadata: TabularMetadata,
     ) -> Result {
         // Create metadata
+        let location = metadata.as_ref().location().to_string();
 
         // Write metadata to object_store
-        let bucket = parse_bucket(metadata.location())?;
+        let bucket = parse_bucket(&location)?;
         let object_store = self.object_store(bucket);
 
-        let location = &metadata.location();
         let uuid = Uuid::new_v4();
-        let version = &metadata.sequence_number();
+        let version = &metadata.as_ref().sequence_number();
         let metadata_json = serde_json::to_string(&metadata)?;
-        let metadata_location = location.to_string()
+        let metadata_location = location
             + "/metadata/"
             + &version.to_string()
             + "-"
@@ -308,7 +308,7 @@ impl Catalog for SqlCatalog {
             ));
         }
         apply_table_updates(&mut metadata, commit.updates)?;
-        let metadata_location = new_metadata_location(&metadata)?;
+        let metadata_location = new_metadata_location((&metadata).into());
         self.object_store
             .put(
                 &strip_prefix(&metadata_location).into(),
diff --git a/iceberg-rust-spec/src/spec/mod.rs b/iceberg-rust-spec/src/spec/mod.rs
index 95efe634..30f97954 100644
--- a/iceberg-rust-spec/src/spec/mod.rs
+++ b/iceberg-rust-spec/src/spec/mod.rs
@@ -9,6 +9,7 @@ pub mod schema;
 pub mod snapshot;
 pub mod sort;
 pub mod table_metadata;
+pub mod tabular;
 pub mod types;
 pub mod values;
 pub mod view_metadata;
diff --git a/iceberg-rust-spec/src/spec/table_metadata.rs b/iceberg-rust-spec/src/spec/table_metadata.rs
index 0b2e194b..87654f84 100644
--- a/iceberg-rust-spec/src/spec/table_metadata.rs
+++ b/iceberg-rust-spec/src/spec/table_metadata.rs
@@ -25,6 +25,7 @@ use derive_builder::Builder;
 use super::{
     schema::Schema,
     snapshot::{Snapshot, SnapshotReference},
+    tabular::TabularMetadataRef,
 };
 
 pub static MAIN_BRANCH: &str = "main";
@@ -214,15 +215,16 @@ impl TableMetadata {
     }
 }
 
-pub fn new_metadata_location(metadata: &TableMetadata) -> Result<String, Error> {
+pub fn new_metadata_location<'a>(metadata: TabularMetadataRef<'a>) -> String {
     let transaction_uuid = Uuid::new_v4();
-    let version = metadata.last_sequence_number;
-    Ok(metadata.location.to_string()
+    let version = metadata.sequence_number();
+
+    metadata.location().to_string()
         + "/metadata/"
         + &version.to_string()
         + "-"
         + &transaction_uuid.to_string()
-        + ".metadata.json")
+        + ".metadata.json"
 }
 
 mod _serde {
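`new_metadata_location` is now infallible and accepts any tabular metadata through `TabularMetadataRef`. A small sketch of the path it produces, assuming an already-loaded `TableMetadata`:

use iceberg_rust_spec::spec::table_metadata::{new_metadata_location, TableMetadata};

fn next_metadata_path(metadata: &TableMetadata) -> String {
    // For a table at "s3://bucket/tbl" with last sequence number 3 this yields
    // "s3://bucket/tbl/metadata/3-<random-uuid>.metadata.json".
    new_metadata_location(metadata.into())
}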
= "main"; @@ -214,15 +215,16 @@ impl TableMetadata { } } -pub fn new_metadata_location(metadata: &TableMetadata) -> Result { +pub fn new_metadata_location<'a>(metadata: TabularMetadataRef<'a>) -> String { let transaction_uuid = Uuid::new_v4(); - let version = metadata.last_sequence_number; - Ok(metadata.location.to_string() + let version = metadata.sequence_number(); + + metadata.location().to_string() + "/metadata/" + &version.to_string() + "-" + &transaction_uuid.to_string() - + ".metadata.json") + + ".metadata.json" } mod _serde { diff --git a/iceberg-rust-spec/src/spec/tabular.rs b/iceberg-rust-spec/src/spec/tabular.rs new file mode 100644 index 00000000..748d94b1 --- /dev/null +++ b/iceberg-rust-spec/src/spec/tabular.rs @@ -0,0 +1,106 @@ +/*! Enum for Metadata of Table, View or Materialized View +*/ + +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::{ + materialized_view_metadata::MaterializedViewMetadata, table_metadata::TableMetadata, + view_metadata::ViewMetadata, +}; +/// Metadata of an iceberg relation +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(untagged)] +#[allow(clippy::large_enum_variant)] +pub enum TabularMetadata { + /// Table metadata + Table(TableMetadata), + /// View metadata + View(ViewMetadata), + /// Materialized view metadata + MaterializedView(MaterializedViewMetadata), +} + +impl TabularMetadata { + pub fn as_ref<'a>(&'a self) -> TabularMetadataRef<'a> { + match self { + TabularMetadata::Table(table) => TabularMetadataRef::Table(table), + TabularMetadata::View(view) => TabularMetadataRef::View(view), + TabularMetadata::MaterializedView(matview) => { + TabularMetadataRef::MaterializedView(matview) + } + } + } +} + +impl From for TabularMetadata { + fn from(value: TableMetadata) -> Self { + TabularMetadata::Table(value) + } +} + +impl From for TabularMetadata { + fn from(value: ViewMetadata) -> Self { + TabularMetadata::View(value) + } +} + +impl From for TabularMetadata { + fn from(value: MaterializedViewMetadata) -> Self { + TabularMetadata::MaterializedView(value) + } +} + +pub enum TabularMetadataRef<'a> { + /// Table metadata + Table(&'a TableMetadata), + /// View metadata + View(&'a ViewMetadata), + /// Materialized view metadata + MaterializedView(&'a MaterializedViewMetadata), +} + +impl<'a> TabularMetadataRef<'a> { + /// Get uuid of tabular + pub fn uuid(&self) -> &Uuid { + match self { + TabularMetadataRef::Table(table) => &table.table_uuid, + TabularMetadataRef::View(view) => &view.view_uuid, + TabularMetadataRef::MaterializedView(matview) => &matview.view_uuid, + } + } + /// Get location for tabular + pub fn location(&self) -> &str { + match self { + TabularMetadataRef::Table(table) => &table.location, + TabularMetadataRef::View(view) => &view.location, + TabularMetadataRef::MaterializedView(matview) => &matview.location, + } + } + /// Get sequence number for tabular + pub fn sequence_number(&self) -> i64 { + match self { + TabularMetadataRef::Table(table) => table.last_sequence_number, + TabularMetadataRef::View(view) => view.current_version_id, + TabularMetadataRef::MaterializedView(matview) => matview.current_version_id, + } + } +} + +impl<'a> From<&'a TableMetadata> for TabularMetadataRef<'a> { + fn from(value: &'a TableMetadata) -> Self { + TabularMetadataRef::Table(value) + } +} + +impl<'a> From<&'a ViewMetadata> for TabularMetadataRef<'a> { + fn from(value: &'a ViewMetadata) -> Self { + TabularMetadataRef::View(value) + } +} + +impl<'a> From<&'a MaterializedViewMetadata> for TabularMetadataRef<'a> { 
diff --git a/iceberg-rust/src/catalog/mod.rs b/iceberg-rust/src/catalog/mod.rs
index b93cc251..9203e29d 100644
--- a/iceberg-rust/src/catalog/mod.rs
+++ b/iceberg-rust/src/catalog/mod.rs
@@ -8,6 +8,7 @@ use std::sync::Arc;
 pub mod identifier;
 pub mod namespace;
 
+use iceberg_rust_spec::spec::tabular::TabularMetadata;
 use identifier::Identifier;
 use object_store::ObjectStore;
 
@@ -17,7 +18,7 @@ use crate::table::Table;
 use self::bucket::Bucket;
 use self::commit::{CommitTable, CommitView};
 use self::namespace::Namespace;
-use self::tabular::{Tabular, TabularMetadata};
+use self::tabular::Tabular;
 
 pub mod bucket;
 pub mod commit;
diff --git a/iceberg-rust/src/catalog/tabular.rs b/iceberg-rust/src/catalog/tabular.rs
index 9eb4507a..78570df8 100644
--- a/iceberg-rust/src/catalog/tabular.rs
+++ b/iceberg-rust/src/catalog/tabular.rs
@@ -4,12 +4,8 @@
 
 use std::sync::Arc;
 
-use iceberg_rust_spec::spec::materialized_view_metadata::MaterializedViewMetadata;
-use iceberg_rust_spec::spec::table_metadata::TableMetadata;
-use iceberg_rust_spec::spec::view_metadata::ViewMetadata;
+use iceberg_rust_spec::spec::tabular::TabularMetadata;
 use object_store::ObjectStore;
-use serde::{self, Deserialize, Serialize};
-use uuid::Uuid;
 
 use crate::error::Error;
 use crate::materialized_view::MaterializedView;
@@ -108,64 +104,6 @@ impl Tabular {
     }
 }
 
-/// Metadata of an iceberg relation
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(untagged)]
-#[allow(clippy::large_enum_variant)]
-pub enum TabularMetadata {
-    /// Table metadata
-    Table(TableMetadata),
-    /// View metadata
-    View(ViewMetadata),
-    /// Materialized view metadata
-    MaterializedView(MaterializedViewMetadata),
-}
-
-impl TabularMetadata {
-    /// Get uuid of tabular
-    pub fn uuid(&self) -> &Uuid {
-        match self {
-            TabularMetadata::Table(table) => &table.table_uuid,
-            TabularMetadata::View(view) => &view.view_uuid,
-            TabularMetadata::MaterializedView(matview) => &matview.view_uuid,
-        }
-    }
-    /// Get location for tabular
-    pub fn location(&self) -> &str {
-        match self {
-            TabularMetadata::Table(table) => &table.location,
-            TabularMetadata::View(view) => &view.location,
-            TabularMetadata::MaterializedView(matview) => &matview.location,
-        }
-    }
-    /// Get sequence number for tabular
-    pub fn sequence_number(&self) -> i64 {
-        match self {
-            TabularMetadata::Table(table) => table.last_sequence_number,
-            TabularMetadata::View(view) => view.current_version_id,
-            TabularMetadata::MaterializedView(matview) => matview.current_version_id,
-        }
-    }
-}
-
-impl From<TableMetadata> for TabularMetadata {
-    fn from(value: TableMetadata) -> Self {
-        TabularMetadata::Table(value)
-    }
-}
-
-impl From<ViewMetadata> for TabularMetadata {
-    fn from(value: ViewMetadata) -> Self {
-        TabularMetadata::View(value)
-    }
-}
-
-impl From<MaterializedViewMetadata> for TabularMetadata {
-    fn from(value: MaterializedViewMetadata) -> Self {
-        TabularMetadata::MaterializedView(value)
-    }
-}
-
 /// Fetch metadata of a tabular (table, view, materialized view) structure from an object_store
 pub async fn get_tabular_metadata(
     metadata_location: &str,
diff --git a/iceberg-rust/src/materialized_view/mod.rs b/iceberg-rust/src/materialized_view/mod.rs
index 0ac63bbf..f82a2363 100644
--- a/iceberg-rust/src/materialized_view/mod.rs
+++ b/iceberg-rust/src/materialized_view/mod.rs
@@ -2,33 +2,20 @@
  * Defines the [MaterializedView] struct that represents an iceberg materialized view.
 */
 
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;
 
-use futures::{lock::Mutex, stream, StreamExt, TryStreamExt};
 use iceberg_rust_spec::{
     spec::{
-        manifest::{Content, DataFile},
-        manifest_list::ManifestListEntry,
-        materialized_view_metadata::{MaterializedViewMetadata, VersionId},
-        schema::Schema,
-        snapshot::{
-            generate_snapshot_id, Lineage, SnapshotBuilder, SnapshotReference, SnapshotRetention,
-            SourceTable,
-        },
-        table_metadata::{new_metadata_location, TableMetadataBuilder},
-        values::Struct,
+        materialized_view_metadata::MaterializedViewMetadata, schema::Schema,
+        tabular::TabularMetadata,
     },
     util::strip_prefix,
 };
 use object_store::ObjectStore;
 
 use crate::{
-    catalog::{bucket::parse_bucket, identifier::Identifier, tabular::TabularMetadata, Catalog},
+    catalog::{bucket::parse_bucket, identifier::Identifier, Catalog},
     error::Error,
-    table::{
-        delete_files,
-        transaction::operation::{write_manifest, ManifestStatus},
-    },
 };
 
 use self::{storage_table::StorageTable, transaction::Transaction as MaterializedViewTransaction};
@@ -118,195 +105,4 @@ impl MaterializedView {
             Err(Error::InvalidFormat("storage table".to_string()))
         }
     }
-
-    /// Replace the entire storage table with new datafiles
-    pub async fn full_refresh(
-        &mut self,
-        files: Vec<DataFile>,
-        version_id: VersionId,
-        base_tables: Vec<SourceTable>,
-        branch: Option<String>,
-    ) -> Result<(), Error> {
-        let object_store = self.object_store();
-
-        let old_storage_table_metadata = self.storage_table().await?.table_metadata;
-        let schema = old_storage_table_metadata
-            .current_schema(branch.as_deref())?
-            .clone();
-
-        // Split datafiles by partition
-        let datafiles = Arc::new(files.into_iter().map(Ok::<_, Error>).try_fold(
-            HashMap::<Struct, Vec<DataFile>>::new(),
-            |mut acc, x| {
-                let x = x?;
-                let partition_value = x.partition().clone();
-                acc.entry(partition_value).or_default().push(x);
-                Ok::<_, Error>(acc)
-            },
-        )?);
-
-        let snapshot_id = generate_snapshot_id();
-        let manifest_list_location = old_storage_table_metadata.location.to_string()
-            + "/metadata/snap-"
-            + &snapshot_id.to_string()
-            + "-"
-            + &uuid::Uuid::new_v4().to_string()
-            + ".avro";
-
-        let manifest_iter = datafiles.keys().enumerate().map(|(i, partition_value)| {
-            let manifest_location = manifest_list_location
-                .to_string()
-                .trim_end_matches(".avro")
-                .to_owned()
-                + "-m"
-                + &(i).to_string()
-                + ".avro";
-            let manifest = ManifestListEntry {
-                format_version: old_storage_table_metadata.format_version.clone(),
-                manifest_path: manifest_location,
-                manifest_length: 0,
-                partition_spec_id: old_storage_table_metadata.default_spec_id,
-                content: Content::Data,
-                sequence_number: old_storage_table_metadata.last_sequence_number,
-                min_sequence_number: 0,
-                added_snapshot_id: snapshot_id,
-                added_files_count: Some(0),
-                existing_files_count: Some(0),
-                deleted_files_count: Some(0),
-                added_rows_count: Some(0),
-                existing_rows_count: Some(0),
-                deleted_rows_count: Some(0),
-                partitions: None,
-                key_metadata: None,
-            };
-            (ManifestStatus::New(manifest), vec![partition_value.clone()])
-        });
-
-        let partition_columns = Arc::new(
-            old_storage_table_metadata
-                .default_partition_spec()?
-                .fields()
-                .iter()
-                .map(|x| schema.fields().get(*x.source_id() as usize))
-                .collect::<Option<Vec<_>>>()
-                .ok_or(Error::InvalidFormat(
-                    "Partition column in schema".to_string(),
-                ))?,
-        );
-
-        let manifest_list_schema =
-            ManifestListEntry::schema(&old_storage_table_metadata.format_version)?;
-
-        let manifest_list_writer = Arc::new(Mutex::new(apache_avro::Writer::new(
-            &manifest_list_schema,
-            Vec::new(),
-        )));
-
-        stream::iter(manifest_iter)
-            .then(|(manifest, files): (ManifestStatus, Vec<Struct>)| {
-                let object_store = object_store.clone();
-                let datafiles = datafiles.clone();
-                let partition_columns = partition_columns.clone();
-                let branch = branch.clone();
-                let schema = &schema;
-                let old_storage_table_metadata = &old_storage_table_metadata;
-                async move {
-                    write_manifest(
-                        old_storage_table_metadata,
-                        manifest,
-                        files,
-                        datafiles,
-                        schema,
-                        &partition_columns,
-                        object_store,
-                        branch,
-                    )
-                    .await
-                }
-            })
-            .try_for_each_concurrent(None, |manifest| {
-                let manifest_list_writer = manifest_list_writer.clone();
-                async move {
-                    manifest_list_writer.lock().await.append_ser(manifest)?;
-                    Ok(())
-                }
-            })
-            .await?;
-
-        let manifest_list_bytes = Arc::into_inner(manifest_list_writer)
-            .unwrap()
-            .into_inner()
-            .into_inner()?;
-
-        object_store
-            .put(
-                &strip_prefix(&manifest_list_location).into(),
-                manifest_list_bytes.into(),
-            )
-            .await?;
-
-        let snapshot = SnapshotBuilder::default()
-            .with_snapshot_id(snapshot_id)
-            .with_sequence_number(0)
-            .with_schema_id(*schema.schema_id())
-            .with_manifest_list(manifest_list_location)
-            .with_lineage(Lineage::new(version_id, base_tables))
-            .build()
-            .map_err(iceberg_rust_spec::error::Error::from)?;
-
-        let table_metadata = TableMetadataBuilder::default()
-            .format_version(old_storage_table_metadata.format_version.clone())
-            .location(old_storage_table_metadata.location.clone())
-            .schemas(old_storage_table_metadata.schemas.clone())
-            .current_schema_id(old_storage_table_metadata.current_schema_id)
-            .partition_specs(old_storage_table_metadata.partition_specs.clone())
-            .default_spec_id(old_storage_table_metadata.default_spec_id)
-            .snapshots(HashMap::from_iter(vec![(snapshot_id, snapshot)]))
-            .current_snapshot_id(Some(snapshot_id))
-            .refs(match branch.as_deref() {
-                None | Some("main") => HashMap::from_iter(vec![(
-                    "main".to_owned(),
-                    SnapshotReference {
-                        snapshot_id,
-                        retention: SnapshotRetention::default(),
-                    },
-                )]),
-                Some(branch) => HashMap::from_iter(vec![
-                    (
-                        branch.to_owned(),
-                        SnapshotReference {
-                            snapshot_id,
-                            retention: SnapshotRetention::default(),
-                        },
-                    ),
-                    (
-                        "main".to_owned(),
-                        SnapshotReference {
-                            snapshot_id,
-                            retention: SnapshotRetention::default(),
-                        },
-                    ),
-                ]),
-            })
-            .build()?;
-        let storage_table_location = new_metadata_location(&table_metadata)?;
-
-        let bytes = serde_json::to_vec(&table_metadata)?;
-
-        object_store
-            .put(
-                &strip_prefix(&storage_table_location.clone()).into(),
-                bytes.into(),
-            )
-            .await?;
-
-        self.new_transaction(branch.as_deref())
-            .update_materialization(&storage_table_location)
-            .commit()
-            .await?;
-
-        delete_files(&old_storage_table_metadata, object_store).await?;
-
-        Ok(())
-    }
 }
diff --git a/iceberg-rust/src/materialized_view/transaction/mod.rs b/iceberg-rust/src/materialized_view/transaction/mod.rs
index 87d1f0fd..f59c9a6c 100644
--- a/iceberg-rust/src/materialized_view/transaction/mod.rs
+++ b/iceberg-rust/src/materialized_view/transaction/mod.rs
@@ -2,11 +2,26 @@
  * Defines the [Transaction] type for materialized views to perform multiple [Operation]s with ACID
 guarantees.
 */
 
-use iceberg_rust_spec::spec::{types::StructType, view_metadata::ViewRepresentation};
+use std::collections::HashMap;
+
+use iceberg_rust_spec::{
+    spec::{
+        manifest::DataFile, snapshot::Lineage, table_metadata::new_metadata_location,
+        types::StructType, view_metadata::ViewRepresentation,
+    },
+    util::strip_prefix,
+};
 
 use crate::{
-    catalog::{commit::CommitView, tabular::Tabular},
+    catalog::{
+        commit::{apply_table_updates, check_table_requirements, CommitView},
+        tabular::Tabular,
+    },
     error::Error,
+    table::{
+        delete_files,
+        transaction::{operation::Operation as TableOperation, REWRITE_KEY},
+    },
     view::transaction::operation::Operation as ViewOperation,
 };
 
@@ -15,7 +30,8 @@ use super::MaterializedView;
 /// Transactions let you perform a sequence of [Operation]s that can be committed to be performed with ACID guarantees.
 pub struct Transaction<'view> {
     materialized_view: &'view mut MaterializedView,
-    operations: Vec<ViewOperation>,
+    view_operations: Vec<ViewOperation>,
+    storage_table_operations: HashMap<String, TableOperation>,
     branch: Option<String>,
 }
 
@@ -24,62 +40,169 @@ impl<'view> Transaction<'view> {
     pub fn new(view: &'view mut MaterializedView, branch: Option<&str>) -> Self {
         Transaction {
             materialized_view: view,
-            operations: vec![],
+            view_operations: vec![],
+            storage_table_operations: HashMap::new(),
             branch: branch.map(ToString::to_string),
         }
    }
+    /// Update the schema of the view
     pub fn update_representation(
         mut self,
         representation: ViewRepresentation,
         schema: StructType,
     ) -> Self {
-        self.operations.push(ViewOperation::UpdateRepresentation {
-            representation,
-            schema,
-            branch: self.branch.clone(),
-        });
+        self.view_operations
+            .push(ViewOperation::UpdateRepresentation {
+                representation,
+                schema,
+                branch: self.branch.clone(),
+            });
         self
     }
+    /// Update view properties
     pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
-        self.operations
+        self.view_operations
             .push(ViewOperation::UpdateProperties(entries));
         self
     }
+    /// Update materialization
     pub fn update_materialization(mut self, materialization: &str) -> Self {
-        self.operations.push(ViewOperation::UpdateMaterialization(
-            materialization.to_owned(),
-        ));
+        self.view_operations
+            .push(ViewOperation::UpdateMaterialization(
+                materialization.to_owned(),
+            ));
+        self
+    }
+
+    /// Perform full refresh operation
+    pub fn full_refresh(mut self, files: Vec<DataFile>, lineage: Lineage) -> Self {
+        self.storage_table_operations
+            .entry(REWRITE_KEY.to_owned())
+            .and_modify(|mut x| {
+                if let TableOperation::Rewrite {
+                    branch: _,
+                    files: old,
+                    lineage: old_lineage,
+                } = &mut x
+                {
+                    old.extend_from_slice(&files);
+                    *old_lineage = Some(lineage.clone());
+                }
+            })
+            .or_insert(TableOperation::Rewrite {
+                branch: self.branch.clone(),
+                files,
+                lineage: Some(lineage),
+            });
+        let materialization = new_metadata_location((self.materialized_view.metadata()).into());
+        self.view_operations
+            .push(ViewOperation::UpdateMaterialization(
+                materialization.to_owned(),
+            ));
         self
     }
 
+    /// Commit the transaction to perform the [Operation]s with ACID guarantees.
     pub async fn commit(self) -> Result<(), Error> {
         let catalog = self.materialized_view.catalog();
         let identifier = self.materialized_view.identifier().clone();
-        // Execute the table operations
-        let (mut requirements, mut updates) = (Vec::new(), Vec::new());
-        for operation in self.operations {
+
+        let delete_data = if !self.storage_table_operations.is_empty() {
+            let (mut table_requirements, mut table_updates) = (Vec::new(), Vec::new());
+
+            let mut storage_table_metadata =
+                self.materialized_view.storage_table().await?.table_metadata;
+
+            // Save old metadata to be able to remove old data after a rewrite operation
+            let delete_data = if self.storage_table_operations.values().any(|x| match x {
+                TableOperation::Rewrite {
+                    branch: _,
+                    files: _,
+                    lineage: _,
+                } => true,
+                _ => false,
+            }) {
+                Some(storage_table_metadata.clone())
+            } else {
+                None
+            };
+
+            // Execute table operations
+            for operation in self.storage_table_operations.into_values() {
+                let (requirement, update) = operation
+                    .execute(
+                        &storage_table_metadata,
+                        self.materialized_view.object_store(),
+                    )
+                    .await?;
+
+                if let Some(requirement) = requirement {
+                    table_requirements.push(requirement);
+                }
+                table_updates.extend(update);
+            }
+
+            if !check_table_requirements(&table_requirements, &storage_table_metadata) {
+                return Err(Error::InvalidFormat(
+                    "Table requirements not valid".to_owned(),
+                ));
+            }
+
+            apply_table_updates(&mut storage_table_metadata, table_updates)?;
+
+            let metadata_location = self
+                .view_operations
+                .iter()
+                .find_map(|x| match x {
+                    ViewOperation::UpdateMaterialization(materialization) => Some(materialization),
+                    _ => None,
+                })
+                .ok_or(Error::NotFound(
+                    "Storage table".to_owned(),
+                    "pointer".to_owned(),
+                ))?;
+
+            self.materialized_view
+                .object_store()
+                .put(
+                    &strip_prefix(&metadata_location).into(),
+                    serde_json::to_string(&storage_table_metadata)?.into(),
+                )
+                .await?;
+
+            delete_data
+        } else {
+            None
+        };
+        // Execute the view operations
+        let (mut view_requirements, mut view_updates) = (Vec::new(), Vec::new());
+        for operation in self.view_operations {
             let (requirement, update) = operation.execute(&self.materialized_view.metadata).await?;
 
             if let Some(requirement) = requirement {
-                requirements.push(requirement);
+                view_requirements.push(requirement);
             }
-            updates.extend(update);
+            view_updates.extend(update);
         }
 
-        if let Tabular::MaterializedView(new_mv) = catalog
+        if let Tabular::MaterializedView(new_matview) = catalog
             .clone()
             .update_view(CommitView {
                 identifier,
-                requirements,
-                updates,
+                requirements: view_requirements,
+                updates: view_updates,
             })
             .await?
         {
-            *self.materialized_view = new_mv;
+            // Delete data files in case of a rewrite operation
+            if let Some(old_metadata) = delete_data {
+                delete_files(&old_metadata, self.materialized_view.object_store()).await?;
+            }
+            *self.materialized_view = new_matview;
             Ok(())
         } else {
             Err(Error::InvalidFormat(
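Both commit implementations in this patch detect a pending rewrite by matching `Rewrite` with every field wildcarded and mapping to `true`/`false`. Because the operation enum is crate-private, an equivalent in-crate form using `matches!` would look like this — a sketch, not part of the diff:

use std::collections::HashMap;

use crate::table::transaction::operation::Operation;

// Equivalent to `.any(|x| match x { Operation::Rewrite { .. } => true, _ => false })`.
fn has_rewrite(operations: &HashMap<String, Operation>) -> bool {
    operations
        .values()
        .any(|op| matches!(op, Operation::Rewrite { .. }))
}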
diff --git a/iceberg-rust/src/table/transaction/mod.rs b/iceberg-rust/src/table/transaction/mod.rs
index 4ba23df5..33b643f2 100644
--- a/iceberg-rust/src/table/transaction/mod.rs
+++ b/iceberg-rust/src/table/transaction/mod.rs
@@ -3,19 +3,26 @@
 */
 use std::collections::HashMap;
 
-use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
+use iceberg_rust_spec::spec::{
+    manifest::DataFile,
+    schema::Schema,
+    snapshot::{Lineage, SnapshotReference},
+};
 
 use crate::{catalog::commit::CommitTable, error::Error, table::Table};
 
 use self::operation::Operation;
 
+use super::delete_files;
+
 pub(crate) mod operation;
 
-static APPEND_KEY: &str = "append";
-static ADD_SCHEMA_KEY: &str = "add-schema";
-static SET_DEFAULT_SPEC_KEY: &str = "set-default-spec";
-static UPDATE_PROPERTIES_KEY: &str = "update-properties";
-static SET_SNAPSHOT_REF_KEY: &str = "set-ref";
+pub(crate) static APPEND_KEY: &str = "append";
+pub(crate) static REWRITE_KEY: &str = "rewrite";
+pub(crate) static ADD_SCHEMA_KEY: &str = "add-schema";
+pub(crate) static SET_DEFAULT_SPEC_KEY: &str = "set-default-spec";
+pub(crate) static UPDATE_PROPERTIES_KEY: &str = "update-properties";
+pub(crate) static SET_SNAPSHOT_REF_KEY: &str = "set-ref";
 
 /// Transactions let you perform a sequence of [Operation]s that can be committed to be performed with ACID guarantees.
 pub struct TableTransaction<'table> {
@@ -55,6 +62,7 @@ impl<'table> TableTransaction<'table> {
                 if let Operation::NewAppend {
                     branch: _,
                     files: old,
+                    lineage: None,
                 } = &mut x
                 {
                     old.extend_from_slice(&files)
@@ -63,6 +71,50 @@ impl<'table> TableTransaction<'table> {
             .or_insert(Operation::NewAppend {
                 branch: self.branch.clone(),
                 files,
+                lineage: None,
             });
         self
     }
+    /// Replace the files in the table
+    pub fn rewrite(mut self, files: Vec<DataFile>) -> Self {
+        self.operations
+            .entry(REWRITE_KEY.to_owned())
+            .and_modify(|mut x| {
+                if let Operation::Rewrite {
+                    branch: _,
+                    files: old,
+                    lineage: None,
+                } = &mut x
+                {
+                    old.extend_from_slice(&files)
+                }
+            })
+            .or_insert(Operation::Rewrite {
+                branch: self.branch.clone(),
+                files,
+                lineage: None,
+            });
+        self
+    }
+    /// Replace the files in the table and set the lineage information
+    pub fn rewrite_with_lineage(mut self, files: Vec<DataFile>, lineage: Lineage) -> Self {
+        self.operations
+            .entry(REWRITE_KEY.to_owned())
+            .and_modify(|mut x| {
+                if let Operation::Rewrite {
+                    branch: _,
+                    files: old,
+                    lineage: old_lineage,
+                } = &mut x
+                {
+                    old.extend_from_slice(&files);
+                    *old_lineage = Some(lineage.clone());
+                }
+            })
+            .or_insert(Operation::Rewrite {
+                branch: self.branch.clone(),
+                files,
+                lineage: Some(lineage),
             });
         self
     }
@@ -89,14 +141,29 @@ impl<'table> TableTransaction<'table> {
     /// Commit the transaction to perform the [Operation]s with ACID guarantees.
     pub async fn commit(self) -> Result<(), Error> {
         let catalog = self.table.catalog();
+        let object_store = self.table.object_store();
         let identifier = self.table.identifier.clone();
-        // Before executing the transactions operations, update the metadata for a new snapshot
+        // Save old metadata to be able to remove old data after a rewrite operation
+        let delete_data = if self.operations.values().any(|x| match x {
+            Operation::Rewrite {
+                branch: _,
+                files: _,
+                lineage: _,
+            } => true,
+            _ => false,
+        }) {
+            Some(self.table.metadata())
+        } else {
+            None
+        };
 
         // Execute the table operations
         let (mut requirements, mut updates) = (Vec::new(), Vec::new());
         for operation in self.operations.into_values() {
-            let (requirement, update) = operation.execute(self.table).await?;
+            let (requirement, update) = operation
+                .execute(self.table.metadata(), self.table.object_store())
+                .await?;
 
             if let Some(requirement) = requirement {
                 requirements.push(requirement);
@@ -112,6 +179,11 @@ impl<'table> TableTransaction<'table> {
                 updates,
             })
             .await?;
+
+        if let Some(old_metadata) = delete_data {
+            delete_files(old_metadata, object_store).await?;
+        }
+
         *self.table = new_table;
         Ok(())
     }
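A sketch of how the table-level rewrite is meant to be driven, assuming the existing `Table::new_transaction` entry point; `rewrite_with_lineage` is the variant used internally for materialized-view storage tables:

use iceberg_rust::{error::Error, table::Table};
use iceberg_rust_spec::spec::manifest::DataFile;

async fn replace_table_contents(table: &mut Table, files: Vec<DataFile>) -> Result<(), Error> {
    table
        .new_transaction(None) // commit to the "main" branch
        // Removes all previous snapshots and makes `files` the table's contents.
        .rewrite(files)
        .commit()
        .await
}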
diff --git a/iceberg-rust/src/table/transaction/operation.rs b/iceberg-rust/src/table/transaction/operation.rs
index 9e7e8462..94349635 100644
--- a/iceberg-rust/src/table/transaction/operation.rs
+++ b/iceberg-rust/src/table/transaction/operation.rs
@@ -15,7 +15,8 @@ use iceberg_rust_spec::spec::{
     partition::PartitionField,
     schema::Schema,
     snapshot::{
-        generate_snapshot_id, SnapshotBuilder, SnapshotReference, SnapshotRetention, Summary,
+        generate_snapshot_id, Lineage, SnapshotBuilder, SnapshotReference, SnapshotRetention,
+        Summary,
     },
     types::StructField,
     values::{Struct, Value},
@@ -27,7 +28,6 @@ use object_store::ObjectStore;
 
 use crate::{
     catalog::commit::{TableRequirement, TableUpdate},
     error::Error,
-    table::Table,
 };
 
 #[derive(Debug)]
@@ -49,6 +49,7 @@ pub enum Operation {
     NewAppend {
         branch: Option<String>,
         files: Vec<DataFile>,
+        lineage: Option<Lineage>,
     },
     // /// Quickly append new files to the table
     // NewFastAppend {
     //     files: Vec<DataFile>,
     //     partition_values: Vec<Struct>,
     // },
     // /// Replace files in the table and commit
-    // NewRewrite,
+    Rewrite {
+        branch: Option<String>,
+        files: Vec<DataFile>,
+        lineage: Option<Lineage>,
+    },
     // /// Replace manifest files and commit
     // RewriteManifests,
     // /// Replace files in the table by a filter expression
@@ -76,12 +81,15 @@ impl Operation {
     pub async fn execute(
         self,
-        table: &Table,
+        table_metadata: &TableMetadata,
+        object_store: Arc<dyn ObjectStore>,
     ) -> Result<(Option<TableRequirement>, Vec<TableUpdate>), Error> {
         match self {
-            Operation::NewAppend { branch, files } => {
-                let object_store = table.object_store();
-                let table_metadata = table.metadata();
+            Operation::NewAppend {
+                branch,
+                files,
+                lineage,
+            } => {
                 let partition_spec = table_metadata.default_partition_spec()?;
                 let schema = table_metadata.current_schema(branch.as_deref())?;
                 let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;
@@ -303,7 +311,8 @@ impl Operation {
                 )
                 .await?;
 
-                let snapshot = SnapshotBuilder::default()
+                let mut snapshot_builder = SnapshotBuilder::default();
+                snapshot_builder
                     .with_snapshot_id(snapshot_id)
                     .with_manifest_list(new_manifest_list_location)
                     .with_sequence_number(
@@ -315,16 +324,175 @@ impl Operation {
                         operation: iceberg_rust_spec::spec::snapshot::Operation::Append,
                         other: HashMap::new(),
                     })
+                    .with_schema_id(*schema.schema_id());
+                if let Some(lineage) = lineage {
+                    snapshot_builder.with_lineage(lineage);
+                }
+                let snapshot = snapshot_builder
+                    .build()
+                    .map_err(iceberg_rust_spec::error::Error::from)?;
+
+                Ok((
+                    old_snapshot.map(|x| TableRequirement::AssertRefSnapshotId {
+                        r#ref: branch.clone().unwrap_or("main".to_owned()),
+                        snapshot_id: *x.snapshot_id(),
+                    }),
+                    vec![
+                        TableUpdate::AddSnapshot { snapshot },
+                        TableUpdate::SetSnapshotRef {
+                            ref_name: branch.unwrap_or("main".to_owned()),
+                            snapshot_reference: SnapshotReference {
+                                snapshot_id,
+                                retention: SnapshotRetention::default(),
+                            },
+                        },
+                    ],
+                ))
+            }
+            Operation::Rewrite {
+                branch,
+                files,
+                lineage,
+            } => {
+                let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;
+                let schema = table_metadata.current_schema(branch.as_deref())?.clone();
+
+                // Split datafiles by partition
+                let datafiles = Arc::new(files.into_iter().map(Ok::<_, Error>).try_fold(
+                    HashMap::<Struct, Vec<DataFile>>::new(),
+                    |mut acc, x| {
+                        let x = x?;
+                        let partition_value = x.partition().clone();
+                        acc.entry(partition_value).or_default().push(x);
+                        Ok::<_, Error>(acc)
+                    },
+                )?);
+
+                let snapshot_id = generate_snapshot_id();
+                let manifest_list_location = table_metadata.location.to_string()
+                    + "/metadata/snap-"
+                    + &snapshot_id.to_string()
+                    + "-"
+                    + &uuid::Uuid::new_v4().to_string()
+                    + ".avro";
+
+                let manifest_iter = datafiles.keys().enumerate().map(|(i, partition_value)| {
+                    let manifest_location = manifest_list_location
+                        .to_string()
+                        .trim_end_matches(".avro")
+                        .to_owned()
+                        + "-m"
+                        + &(i).to_string()
+                        + ".avro";
+                    let manifest = ManifestListEntry {
+                        format_version: table_metadata.format_version.clone(),
+                        manifest_path: manifest_location,
+                        manifest_length: 0,
+                        partition_spec_id: table_metadata.default_spec_id,
+                        content: Content::Data,
+                        sequence_number: table_metadata.last_sequence_number,
+                        min_sequence_number: 0,
+                        added_snapshot_id: snapshot_id,
+                        added_files_count: Some(0),
+                        existing_files_count: Some(0),
+                        deleted_files_count: Some(0),
+                        added_rows_count: Some(0),
+                        existing_rows_count: Some(0),
+                        deleted_rows_count: Some(0),
+                        partitions: None,
+                        key_metadata: None,
+                    };
+                    (ManifestStatus::New(manifest), vec![partition_value.clone()])
+                });
+
+                let partition_columns = Arc::new(
+                    table_metadata
+                        .default_partition_spec()?
+                        .fields()
+                        .iter()
+                        .map(|x| schema.fields().get(*x.source_id() as usize))
+                        .collect::<Option<Vec<_>>>()
+                        .ok_or(Error::InvalidFormat(
+                            "Partition column in schema".to_string(),
+                        ))?,
+                );
+
+                let manifest_list_schema =
+                    ManifestListEntry::schema(&table_metadata.format_version)?;
+
+                let manifest_list_writer = Arc::new(Mutex::new(apache_avro::Writer::new(
+                    &manifest_list_schema,
+                    Vec::new(),
+                )));
+
+                stream::iter(manifest_iter)
+                    .then(|(manifest, files): (ManifestStatus, Vec<Struct>)| {
+                        let object_store = object_store.clone();
+                        let datafiles = datafiles.clone();
+                        let partition_columns = partition_columns.clone();
+                        let branch = branch.clone();
+                        let schema = &schema;
+                        let old_storage_table_metadata = &table_metadata;
+                        async move {
+                            write_manifest(
+                                old_storage_table_metadata,
+                                manifest,
+                                files,
+                                datafiles,
+                                schema,
+                                &partition_columns,
+                                object_store,
+                                branch,
+                            )
+                            .await
+                        }
+                    })
+                    .try_for_each_concurrent(None, |manifest| {
+                        let manifest_list_writer = manifest_list_writer.clone();
+                        async move {
+                            manifest_list_writer.lock().await.append_ser(manifest)?;
+                            Ok(())
+                        }
+                    })
+                    .await?;
+
+                let manifest_list_bytes = Arc::into_inner(manifest_list_writer)
+                    .unwrap()
+                    .into_inner()
+                    .into_inner()?;
+
+                object_store
+                    .put(
+                        &strip_prefix(&manifest_list_location).into(),
+                        manifest_list_bytes.into(),
+                    )
+                    .await?;
+
+                let mut snapshot_builder = SnapshotBuilder::default();
+                snapshot_builder
+                    .with_snapshot_id(snapshot_id)
+                    .with_sequence_number(0)
                     .with_schema_id(*schema.schema_id())
+                    .with_manifest_list(manifest_list_location);
+                if let Some(lineage) = lineage {
+                    snapshot_builder.with_lineage(lineage);
+                }
+                let snapshot = snapshot_builder
                     .build()
                     .map_err(iceberg_rust_spec::error::Error::from)?;
 
+                let old_snapshot_ids: Vec<i64> =
+                    table_metadata.snapshots.keys().map(Clone::clone).collect();
+
                 Ok((
                     old_snapshot.map(|x| TableRequirement::AssertRefSnapshotId {
                         r#ref: branch.clone().unwrap_or("main".to_owned()),
                         snapshot_id: *x.snapshot_id(),
                     }),
                     vec![
+                        TableUpdate::RemoveSnapshots {
+                            snapshot_ids: old_snapshot_ids,
+                        },
                         TableUpdate::AddSnapshot { snapshot },
                         TableUpdate::SetSnapshotRef {
                             ref_name: branch.unwrap_or("main".to_owned()),
@@ -342,20 +510,19 @@ impl Operation {
                 updates: HashMap::from_iter(entries),
             }],
         )),
-            Operation::SetSnapshotRef((key, value)) => {
-                Ok((
-                    table.metadata().refs.get(&key).map(|x| {
-                        TableRequirement::AssertRefSnapshotId {
-                            r#ref: key.clone(),
-                            snapshot_id: x.snapshot_id,
-                        }
+            Operation::SetSnapshotRef((key, value)) => Ok((
+                table_metadata
+                    .refs
+                    .get(&key)
+                    .map(|x| TableRequirement::AssertRefSnapshotId {
+                        r#ref: key.clone(),
+                        snapshot_id: x.snapshot_id,
                     }),
-                vec![TableUpdate::SetSnapshotRef {
-                    ref_name: key,
-                    snapshot_reference: value,
-                }],
-            ))
-            }
+                vec![TableUpdate::SetSnapshotRef {
+                    ref_name: key,
+                    snapshot_reference: value,
+                }],
+            )),
             _ => Ok((None, vec![])),
         }
     }
diff --git a/iceberg-tests/nyc_taxis/home/iceberg/warehouse/nyc/taxis/metadata/0-300c39ad-0663-4f3c-9523-e338aebb25ef.metadata.json b/iceberg-tests/nyc_taxis/home/iceberg/warehouse/nyc/taxis/metadata/0-300c39ad-0663-4f3c-9523-e338aebb25ef.metadata.json
new file mode 100644
index 00000000..52afc021
--- /dev/null
+++ b/iceberg-tests/nyc_taxis/home/iceberg/warehouse/nyc/taxis/metadata/0-300c39ad-0663-4f3c-9523-e338aebb25ef.metadata.json
@@ -0,0 +1 @@
+{"format-version":1,"table-uuid":"df838b92-0b32-465d-a44e-d39936e538b7","location":"/home/iceberg/warehouse/nyc/taxis","last-updated-ms":1662532818843,"last-column-id":5,"schema":{"schema-id":0,"type":"struct","fields":[{"id":1,"name":"vendor_id","required":false,"type":"long"},{"id":2,"name":"trip_id","required":false,"type":"long"},{"id":3,"name":"trip_distance","required":false,"type":"float"},{"id":4,"name":"fare_amount","required":false,"type":"double"},{"id":5,"name":"store_and_fwd_flag","required":false,"type":"string"}]},"schemas":[{"schema-id":0,"type":"struct","fields":[{"id":1,"name":"vendor_id","required":false,"type":"long"},{"id":2,"name":"trip_id","required":false,"type":"long"},{"id":3,"name":"trip_distance","required":false,"type":"float"},{"id":4,"name":"fare_amount","required":false,"type":"double"},{"id":5,"name":"store_and_fwd_flag","required":false,"type":"string"}]}],"current-schema-id":0,"partition-spec":[{"source-id":1,"field-id":1000,"name":"vendor_id_partition","transform":"identity"}],"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"vendor_id_partition","transform":"identity"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"owner":"root"},"current-snapshot-id":638933773299822130,"snapshots":[{"snapshot-id":638933773299822130,"timestamp-ms":1662532818843,"manifest-list":"/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro","summary":{"operation":"append","spark.app.id":"local-1662532784305","added-records":"4","total-data-files":"4","total-delete-files":"0","total-files-size":"6001","total-position-deletes":"0","added-files-size":"6001","changed-partition-count":"2","total-equality-deletes":"0","added-data-files":"4","total-records":"4"},"schema-id":0}],"snapshot-log":[{"snapshot-id":638933773299822130,"timestamp-ms":1662532818843}],"metadata-log":[{"metadata-file":"/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json","timestamp-ms":1662532805245}],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0} \ No newline at end of file