Skip to content

Commit

Permalink
refactor materialized view full refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Feb 11, 2024
1 parent 9b61c17 commit 08f9337
Show file tree
Hide file tree
Showing 13 changed files with 565 additions and 355 deletions.
38 changes: 22 additions & 16 deletions datafusion_iceberg/src/materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use iceberg_rust::{
sql::find_relations,
};
use iceberg_rust_spec::spec::{
snapshot::{FullIdentifier, SourceTable},
snapshot::{FullIdentifier, Lineage, SourceTable},
view_metadata::ViewRepresentation,
};
use itertools::Itertools;
Expand Down Expand Up @@ -60,12 +60,13 @@ pub async fn refresh_materialized_view(
.collect::<Result<Vec<_>, Error>>()?,
};

// Load source tables
let source_tables = stream::iter(source_tables.iter())
.then(|base_table| {
.then(|source_table| {
let catalog_list = catalog_list.clone();
let branch = branch.clone();
async move {
let identifier = base_table.identifier();
let identifier = source_table.identifier();
let catalog_name = identifier.catalog();
let catalog = catalog_list
.catalog(catalog_name)
Expand All @@ -76,8 +77,10 @@ pub async fn refresh_materialized_view(
))?;

let tabular = match catalog
.load_table(&Identifier::try_new(&[identifier.namespace().clone(),
identifier.table_name().clone()])?)
.load_table(&Identifier::try_new(&[
identifier.namespace().clone(),
identifier.table_name().clone(),
])?)
.await?
{
Tabular::View(_) => {
Expand All @@ -101,12 +104,12 @@ pub async fn refresh_materialized_view(
_ => Err(Error::InvalidFormat("storage table".to_string())),
}?;

let table_state = if current_snapshot_id == *base_table.snapshot_id() {
let table_state = if current_snapshot_id == *source_table.snapshot_id() {
StorageTableState::Fresh
} else if *base_table.snapshot_id() == -1 {
} else if *source_table.snapshot_id() == -1 {
StorageTableState::Invalid
} else {
StorageTableState::Outdated(*base_table.snapshot_id())
StorageTableState::Outdated(*source_table.snapshot_id())
};

Ok((catalog_name, tabular, table_state, current_snapshot_id))
Expand All @@ -115,16 +118,15 @@ pub async fn refresh_materialized_view(
.try_collect::<Vec<_>>()
.await?;

// Full refresh

let new_tables = source_tables
// Register source tables in datafusion context and return lineage information
let source_tables = source_tables
.into_iter()
.flat_map(|(catalog_name, base_table, _, last_snapshot_id)| {
let identifier = base_table.identifier().to_string().to_owned();
let uuid = *base_table.metadata().uuid();
.flat_map(|(catalog_name, source_table, _, last_snapshot_id)| {
let identifier = source_table.identifier().to_string().to_owned();
let uuid = *source_table.metadata().as_ref().uuid();

let table = Arc::new(DataFusionTable::new(
base_table,
source_table,
None,
None,
branch.as_deref(),
Expand Down Expand Up @@ -167,13 +169,15 @@ pub async fn refresh_materialized_view(

let logical_plan = ctx.state().create_logical_plan(&sql_statements[0]).await?;

// Calculate arrow record batches from logical plan
let batches = ctx
.execute_logical_plan(logical_plan)
.await?
.execute_stream()
.await?
.map_err(ArrowError::from);

// Write arrow record batches to datafiles
let files = write_parquet_partitioned(
&storage_table.table_metadata,
batches,
Expand All @@ -183,7 +187,9 @@ pub async fn refresh_materialized_view(
.await?;

matview
.full_refresh(files, version_id, new_tables, branch)
.new_transaction(branch.as_deref())
.full_refresh(files, Lineage::new(version_id, source_tables))
.commit()
.await?;

Ok(())
Expand Down
7 changes: 2 additions & 5 deletions datafusion_iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,17 +597,14 @@ mod tests {
};
use iceberg_catalog_sql::SqlCatalog;
use iceberg_rust::{
catalog::{
identifier::Identifier,
tabular::{Tabular, TabularMetadata},
Catalog,
},
catalog::{identifier::Identifier, tabular::Tabular, Catalog},
table::table_builder::TableBuilder,
view::view_builder::ViewBuilder,
};
use iceberg_rust_spec::spec::{
partition::{PartitionField, PartitionSpecBuilder, Transform},
schema::Schema,
tabular::TabularMetadata,
types::{PrimitiveType, StructField, StructType, Type},
};
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
Expand Down
14 changes: 7 additions & 7 deletions iceberg-catalog-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use iceberg_rust::{
},
identifier::Identifier,
namespace::Namespace,
tabular::{Tabular, TabularMetadata},
tabular::Tabular,
Catalog, CatalogList,
},
error::Error as IcebergError,
materialized_view::MaterializedView,
spec::table_metadata::new_metadata_location,
spec::{table_metadata::new_metadata_location, tabular::TabularMetadata},
table::Table,
util::strip_prefix,
view::View,
Expand Down Expand Up @@ -245,16 +245,16 @@ impl Catalog for SqlCatalog {
metadata: TabularMetadata,
) -> Result<Tabular, IcebergError> {
// Create metadata
let location = metadata.as_ref().location().to_string();

// Write metadata to object_store
let bucket = parse_bucket(metadata.location())?;
let bucket = parse_bucket(&location)?;
let object_store = self.object_store(bucket);

let location = &metadata.location();
let uuid = Uuid::new_v4();
let version = &metadata.sequence_number();
let version = &metadata.as_ref().sequence_number();
let metadata_json = serde_json::to_string(&metadata)?;
let metadata_location = location.to_string()
let metadata_location = location
+ "/metadata/"
+ &version.to_string()
+ "-"
Expand Down Expand Up @@ -308,7 +308,7 @@ impl Catalog for SqlCatalog {
));
}
apply_table_updates(&mut metadata, commit.updates)?;
let metadata_location = new_metadata_location(&metadata)?;
let metadata_location = new_metadata_location((&metadata).into());
self.object_store
.put(
&strip_prefix(&metadata_location).into(),
Expand Down
1 change: 1 addition & 0 deletions iceberg-rust-spec/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod schema;
pub mod snapshot;
pub mod sort;
pub mod table_metadata;
pub mod tabular;
pub mod types;
pub mod values;
pub mod view_metadata;
10 changes: 6 additions & 4 deletions iceberg-rust-spec/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use derive_builder::Builder;
use super::{
schema::Schema,
snapshot::{Snapshot, SnapshotReference},
tabular::TabularMetadataRef,
};

pub static MAIN_BRANCH: &str = "main";
Expand Down Expand Up @@ -214,15 +215,16 @@ impl TableMetadata {
}
}

pub fn new_metadata_location(metadata: &TableMetadata) -> Result<String, Error> {
pub fn new_metadata_location<'a>(metadata: TabularMetadataRef<'a>) -> String {
let transaction_uuid = Uuid::new_v4();
let version = metadata.last_sequence_number;
Ok(metadata.location.to_string()
let version = metadata.sequence_number();

metadata.location().to_string()
+ "/metadata/"
+ &version.to_string()
+ "-"
+ &transaction_uuid.to_string()
+ ".metadata.json")
+ ".metadata.json"
}

mod _serde {
Expand Down
106 changes: 106 additions & 0 deletions iceberg-rust-spec/src/spec/tabular.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*! Enums for the metadata of a table, view or materialized view, in owned and borrowed form
*/

use serde::{Deserialize, Serialize};
use uuid::Uuid;

use super::{
materialized_view_metadata::MaterializedViewMetadata, table_metadata::TableMetadata,
view_metadata::ViewMetadata,
};
/// Metadata of an iceberg relation
///
/// Owns the metadata of exactly one relation kind: a table, a view or a
/// materialized view. A borrowed form is available through
/// [`TabularMetadata::as_ref`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
// Untagged representation: the enum is (de)serialized as the bare inner metadata
// without any variant tag. When deserializing, serde tries the variants in
// declaration order and keeps the first one that matches, so the order of the
// variants below is significant and must not be changed casually.
#[serde(untagged)]
// The variants store their metadata inline (unboxed), which is what this lint flags.
// NOTE(review): presumably kept unboxed so callers can match on the metadata
// directly — confirm before boxing any variant.
#[allow(clippy::large_enum_variant)]
pub enum TabularMetadata {
/// Table metadata
Table(TableMetadata),
/// View metadata
View(ViewMetadata),
/// Materialized view metadata
MaterializedView(MaterializedViewMetadata),
}

impl TabularMetadata {
    /// Borrows this metadata as a [`TabularMetadataRef`].
    ///
    /// Every variant is mapped to the variant of the same name in
    /// [`TabularMetadataRef`], which holds a reference to the inner metadata
    /// instead of the metadata itself. Nothing is cloned.
    pub fn as_ref(&self) -> TabularMetadataRef<'_> {
        match *self {
            Self::Table(ref metadata) => TabularMetadataRef::Table(metadata),
            Self::View(ref metadata) => TabularMetadataRef::View(metadata),
            Self::MaterializedView(ref metadata) => {
                TabularMetadataRef::MaterializedView(metadata)
            }
        }
    }
}

impl From<TableMetadata> for TabularMetadata {
fn from(value: TableMetadata) -> Self {
TabularMetadata::Table(value)
}
}

impl From<ViewMetadata> for TabularMetadata {
fn from(value: ViewMetadata) -> Self {
TabularMetadata::View(value)
}
}

impl From<MaterializedViewMetadata> for TabularMetadata {
fn from(value: MaterializedViewMetadata) -> Self {
TabularMetadata::MaterializedView(value)
}
}

/// Borrowed counterpart of [`TabularMetadata`]
///
/// Holds a shared reference to the metadata of a table, a view or a
/// materialized view. Because every variant only contains a reference, the
/// enum is cheap to copy and can be passed by value.
#[derive(Debug, Clone, Copy)]
pub enum TabularMetadataRef<'a> {
    /// Table metadata
    Table(&'a TableMetadata),
    /// View metadata
    View(&'a ViewMetadata),
    /// Materialized view metadata
    MaterializedView(&'a MaterializedViewMetadata),
}

impl<'a> TabularMetadataRef<'a> {
/// Get uuid of tabular
pub fn uuid(&self) -> &Uuid {
match self {
TabularMetadataRef::Table(table) => &table.table_uuid,
TabularMetadataRef::View(view) => &view.view_uuid,
TabularMetadataRef::MaterializedView(matview) => &matview.view_uuid,
}
}
/// Get location for tabular
pub fn location(&self) -> &str {
match self {
TabularMetadataRef::Table(table) => &table.location,
TabularMetadataRef::View(view) => &view.location,
TabularMetadataRef::MaterializedView(matview) => &matview.location,
}
}
/// Get sequence number for tabular
pub fn sequence_number(&self) -> i64 {
match self {
TabularMetadataRef::Table(table) => table.last_sequence_number,
TabularMetadataRef::View(view) => view.current_version_id,
TabularMetadataRef::MaterializedView(matview) => matview.current_version_id,
}
}
}

impl<'a> From<&'a TableMetadata> for TabularMetadataRef<'a> {
fn from(value: &'a TableMetadata) -> Self {
TabularMetadataRef::Table(value)
}
}

impl<'a> From<&'a ViewMetadata> for TabularMetadataRef<'a> {
fn from(value: &'a ViewMetadata) -> Self {
TabularMetadataRef::View(value)
}
}

impl<'a> From<&'a MaterializedViewMetadata> for TabularMetadataRef<'a> {
fn from(value: &'a MaterializedViewMetadata) -> Self {
TabularMetadataRef::MaterializedView(value)
}
}
3 changes: 2 additions & 1 deletion iceberg-rust/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::Arc;
pub mod identifier;
pub mod namespace;

use iceberg_rust_spec::spec::tabular::TabularMetadata;
use identifier::Identifier;
use object_store::ObjectStore;

Expand All @@ -17,7 +18,7 @@ use crate::table::Table;
use self::bucket::Bucket;
use self::commit::{CommitTable, CommitView};
use self::namespace::Namespace;
use self::tabular::{Tabular, TabularMetadata};
use self::tabular::Tabular;

pub mod bucket;
pub mod commit;
Expand Down
Loading

0 comments on commit 08f9337

Please sign in to comment.