diff --git a/datafusion_iceberg/src/materialized_view.rs b/datafusion_iceberg/src/materialized_view.rs index f074e1cf..611645b4 100644 --- a/datafusion_iceberg/src/materialized_view.rs +++ b/datafusion_iceberg/src/materialized_view.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use datafusion::{ arrow::error::ArrowError, @@ -7,7 +7,7 @@ use datafusion::{ }; use futures::{stream, StreamExt, TryStreamExt}; use iceberg_rust::spec::{ - materialized_view_metadata::RefreshTable, + materialized_view_metadata::RefreshState, view_metadata::{FullIdentifier, ViewRepresentation}, }; use iceberg_rust::{ @@ -132,7 +132,7 @@ pub async fn refresh_materialized_view( } // Register source tables in datafusion context and return lineage information - let refresh_tables = source_tables + let source_table_states = source_tables .into_iter() .flat_map(|(identifier, source_table, _, last_snapshot_id)| { let table = Arc::new(DataFusionTable::new( @@ -164,16 +164,13 @@ pub async fn refresh_materialized_view( .filter_ok(|(identifier, _)| !identifier.name().ends_with("__delta__")) .map(|x| { let (identifer, snapshot_id) = x?; - let sequence_id = *lineage.get(&identifer).ok_or(Error::NotFound( + let sequence_id = lineage.get(&identifer).ok_or(Error::NotFound( "Lineage entry".to_owned(), identifer.to_string(), ))?; - Ok(RefreshTable { - sequence_id, - revision_id: snapshot_id, - }) + Ok((sequence_id.clone(), snapshot_id)) }) - .collect::>()?; + .collect::, Error>>()?; let sql_statements = transform_relations(sql)?; @@ -196,10 +193,16 @@ pub async fn refresh_materialized_view( ) .await?; - let version_id = matview.metadata().current_version_id; + let refresh_version_id = matview.metadata().current_version_id; + + let refresh_state = RefreshState { + refresh_version_id, + source_table_states, + source_view_states: HashMap::new(), + }; matview .new_transaction(branch.as_deref()) - .full_refresh(files, refresh_tables, version_id)? + .full_refresh(files, refresh_state)? .commit() .await?; diff --git a/iceberg-rust-spec/src/spec/materialized_view_metadata.rs b/iceberg-rust-spec/src/spec/materialized_view_metadata.rs index 7bc21f9e..2c381cae 100644 --- a/iceberg-rust-spec/src/spec/materialized_view_metadata.rs +++ b/iceberg-rust-spec/src/spec/materialized_view_metadata.rs @@ -2,14 +2,15 @@ * A Struct for the materialized view metadata */ +use std::collections::HashMap; + use serde::{Deserialize, Serialize}; use crate::view_metadata::FullIdentifier; use super::view_metadata::{GeneralViewMetadata, GeneralViewMetadataBuilder}; -/// Property for the metadata location -pub static STORAGE_TABLE: &str = "storage_table"; +pub static REFRESH_STATE: &str = "refresh-state"; /// Fields for the version 1 of the view metadata. pub type MaterializedViewMetadata = GeneralViewMetadata; @@ -19,11 +20,13 @@ pub type MaterializedViewMetadataBuilder = GeneralViewMetadataBuilder, + /// A map from sequence-id (as defined in the view lineage) to the source views’ version-id of when the last refresh operation was performed. + pub source_view_states: HashMap, } #[cfg(test)] @@ -97,44 +100,4 @@ mod tests { Ok(()) } - - // #[test] - // fn test_depends_on_tables_try_from_str() { - // let input = "table1=1,table2=2"; - - // let result = depends_on_tables_from_string(input).unwrap(); - - // assert_eq!( - // result, - // vec![ - // RefreshTable { - // identifier: "table1".to_string(), - // revision_id: 1 - // }, - // RefreshTable { - // identifier: "table2".to_string(), - // revision_id: 2 - // } - // ] - // ); - // } - - // #[test] - // fn test_try_from_depends_on_tables_to_string() { - // let depends_on_tables = vec![ - // RefreshTable { - // identifier: "table1".to_string(), - // revision_id: 1, - // }, - // RefreshTable { - // identifier: "table2".to_string(), - // revision_id: 2, - // }, - // ]; - - // let result = depends_on_tables_to_string(&depends_on_tables); - - // assert!(result.is_ok()); - // assert_eq!(result.unwrap(), "table1=1,table2=2"); - // } } diff --git a/iceberg-rust-spec/src/spec/snapshot.rs b/iceberg-rust-spec/src/spec/snapshot.rs index f132fb40..8640a5b8 100644 --- a/iceberg-rust-spec/src/spec/snapshot.rs +++ b/iceberg-rust-spec/src/spec/snapshot.rs @@ -24,9 +24,6 @@ use super::{ use _serde::SnapshotEnum; -pub static REFRESH_TABLES: &str = "refresh-tables"; -pub static REFRESH_VERSION_ID: &str = "refresh-version-id"; - #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Builder, Getters)] #[serde(from = "SnapshotEnum", into = "SnapshotEnum")] #[builder(setter(prefix = "with"))] diff --git a/iceberg-rust-spec/src/spec/view_metadata.rs b/iceberg-rust-spec/src/spec/view_metadata.rs index bd415b55..6f316840 100644 --- a/iceberg-rust-spec/src/spec/view_metadata.rs +++ b/iceberg-rust-spec/src/spec/view_metadata.rs @@ -490,18 +490,18 @@ impl Display for FullIdentifier { #[serde(rename_all = "kebab-case")] pub struct SourceTable { identifier: FullIdentifier, - sequence_id: i64, + sequence_id: String, } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(from = "Vec", into = "Vec")] -pub struct Lineage(HashMap); +pub struct Lineage(HashMap); impl Lineage { pub fn new() -> Self { Lineage(HashMap::new()) } - pub fn from_iter>(iter: T) -> Self { + pub fn from_iter>(iter: T) -> Self { Lineage(HashMap::from_iter(iter)) } } @@ -528,7 +528,7 @@ impl From for Vec { } impl Deref for Lineage { - type Target = HashMap; + type Target = HashMap; fn deref(&self) -> &Self::Target { &self.0 } diff --git a/iceberg-rust/src/catalog/create.rs b/iceberg-rust/src/catalog/create.rs index ec8a6013..6b335f2e 100644 --- a/iceberg-rust/src/catalog/create.rs +++ b/iceberg-rust/src/catalog/create.rs @@ -283,7 +283,7 @@ impl CreateMaterializedViewBuilder { create.view_version.default_namespace().as_deref(), create.view_version.default_catalog().as_deref(), )?, - i as i64, + i.to_string(), )) }) .collect::, _>>()?, diff --git a/iceberg-rust/src/materialized_view/storage_table.rs b/iceberg-rust/src/materialized_view/storage_table.rs index 0cf2e476..fe35df79 100644 --- a/iceberg-rust/src/materialized_view/storage_table.rs +++ b/iceberg-rust/src/materialized_view/storage_table.rs @@ -3,10 +3,7 @@ use std::{ ops::{Deref, DerefMut}, }; -use iceberg_rust_spec::{ - snapshot::REFRESH_VERSION_ID, - spec::{materialized_view_metadata::RefreshTable, snapshot::REFRESH_TABLES}, -}; +use iceberg_rust_spec::materialized_view_metadata::{RefreshState, REFRESH_STATE}; use crate::{error::Error, table::Table}; @@ -35,23 +32,17 @@ impl StorageTable { &self, version_id: i64, branch: Option, - ) -> Result>, Error> { + ) -> Result>, Error> { let current_snapshot = self.metadata().current_snapshot(branch.as_deref())?; - if Some(version_id) - == current_snapshot - .and_then(|snapshot| snapshot.summary().other.get(REFRESH_VERSION_ID)) - .map(|x| str::parse::(x)) - .transpose()? - { - current_snapshot - .and_then(|snapshot| snapshot.summary().other.get(REFRESH_TABLES)) - .map(|x| { - serde_json::from_str::>(x).map(|x| { - HashMap::from_iter(x.into_iter().map(|y| (y.sequence_id, y.revision_id))) - }) - }) - .transpose() - .map_err(Error::from) + let refresh_state = current_snapshot + .and_then(|snapshot| snapshot.summary().other.get(REFRESH_STATE)) + .map(|x| serde_json::from_str::(&x)) + .transpose()?; + let Some(refresh_state) = refresh_state else { + return Ok(None); + }; + if version_id == refresh_state.refresh_version_id { + Ok(Some(refresh_state.source_table_states)) } else { Ok(None) } diff --git a/iceberg-rust/src/materialized_view/transaction/mod.rs b/iceberg-rust/src/materialized_view/transaction/mod.rs index 5a0bb8c4..1fc8cb07 100644 --- a/iceberg-rust/src/materialized_view/transaction/mod.rs +++ b/iceberg-rust/src/materialized_view/transaction/mod.rs @@ -5,11 +5,8 @@ use std::collections::HashMap; use iceberg_rust_spec::{ - snapshot::{REFRESH_TABLES, REFRESH_VERSION_ID}, - spec::{ - manifest::DataFile, materialized_view_metadata::RefreshTable, types::StructType, - view_metadata::ViewRepresentation, - }, + materialized_view_metadata::{RefreshState, REFRESH_STATE}, + spec::{manifest::DataFile, types::StructType, view_metadata::ViewRepresentation}, }; use crate::{ @@ -69,10 +66,9 @@ impl<'view> Transaction<'view> { pub fn full_refresh( mut self, files: Vec, - lineage: Vec, - version_id: i64, + refresh_state: RefreshState, ) -> Result { - let refresh_tables = serde_json::to_string(&lineage)?; + let refresh_state = serde_json::to_string(&refresh_state)?; self.storage_table_operations .entry(REWRITE_KEY.to_owned()) .and_modify(|mut x| { @@ -83,19 +79,19 @@ impl<'view> Transaction<'view> { } = &mut x { old.extend_from_slice(&files); - *old_lineage = Some(HashMap::from_iter(vec![ - (REFRESH_VERSION_ID.to_owned(), version_id.to_string()), - (REFRESH_TABLES.to_owned(), refresh_tables.clone()), - ])); + *old_lineage = Some(HashMap::from_iter(vec![( + REFRESH_STATE.to_owned(), + refresh_state.clone(), + )])); } }) .or_insert(TableOperation::Rewrite { branch: self.branch.clone(), files, - additional_summary: Some(HashMap::from_iter(vec![ - (REFRESH_VERSION_ID.to_owned(), version_id.to_string()), - (REFRESH_TABLES.to_owned(), refresh_tables), - ])), + additional_summary: Some(HashMap::from_iter(vec![( + REFRESH_STATE.to_owned(), + refresh_state, + )])), }); Ok(self) } diff --git a/iceberg-rust/src/view/transaction/operation.rs b/iceberg-rust/src/view/transaction/operation.rs index 825914bb..00da2071 100644 --- a/iceberg-rust/src/view/transaction/operation.rs +++ b/iceberg-rust/src/view/transaction/operation.rs @@ -70,7 +70,7 @@ impl Operation { version.default_namespace().as_deref(), version.default_catalog().as_deref(), )?, - i as i64, + i.to_string(), )) }) .collect::, _>>()?,