Skip to content

Commit

Permalink
register storage table in the catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Feb 20, 2024
1 parent df6e67b commit faaa14d
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 182 deletions.
6 changes: 3 additions & 3 deletions datafusion_iceberg/src/materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ pub async fn refresh_materialized_view(
Tabular::MaterializedView(mv) => {
let storage_table = mv.storage_table().await?;
Ok(*storage_table
.table_metadata
.metadata()
.current_snapshot(branch.as_deref())?
// Fallback to main branch
.or(storage_table.table_metadata.current_snapshot(None)?)
.or(storage_table.metadata().current_snapshot(None)?)
.ok_or(Error::NotFound(
"Snapshot in source table".to_owned(),
format!("{}", table_name),
Expand Down Expand Up @@ -196,7 +196,7 @@ pub async fn refresh_materialized_view(

// Write arrow record batches to datafiles
let files = write_parquet_partitioned(
&storage_table.table_metadata,
&storage_table.metadata(),
batches,
matview.object_store(),
branch.as_deref(),
Expand Down
16 changes: 2 additions & 14 deletions datafusion_iceberg/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use datafusion::{
physical_plan::{ColumnStatistics, Statistics},
scalar::ScalarValue,
};
use iceberg_rust::{
catalog::{identifier::Identifier, tabular::Tabular},
table::Table,
};
use iceberg_rust::{catalog::tabular::Tabular, table::Table};
use iceberg_rust_spec::spec::values::Value;

use crate::error::Error;
Expand All @@ -21,16 +18,7 @@ impl DataFusionTable {
Tabular::Table(table) => table_statistics(table, &self.snapshot_range).await,
Tabular::View(_) => Err(Error::NotSupported("Statistics for views".to_string())),
Tabular::MaterializedView(mv) => {
let table = Table::new(
Identifier::try_new(&["temp".to_owned()]).map_err(Error::from)?,
mv.catalog(),
mv.storage_table()
.await
.map_err(Error::from)?
.table_metadata,
)
.await
.map_err(Error::from)?;
let table = mv.storage_table().await?;
table_statistics(&table, &self.snapshot_range).await
}
}
Expand Down
18 changes: 3 additions & 15 deletions datafusion_iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,8 @@ use crate::{
};

use iceberg_rust::{
arrow::write::write_parquet_partitioned,
catalog::{identifier::Identifier, tabular::Tabular},
materialized_view::MaterializedView,
table::Table,
view::View,
arrow::write::write_parquet_partitioned, catalog::tabular::Tabular,
materialized_view::MaterializedView, table::Table, view::View,
};
use iceberg_rust_spec::spec::{
schema::Schema,
Expand Down Expand Up @@ -193,16 +190,7 @@ impl TableProvider for DataFusionTable {
.await
}
Tabular::MaterializedView(mv) => {
let table = Table::new(
Identifier::try_new(&["temp".to_owned()]).map_err(Error::from)?,
mv.catalog(),
mv.storage_table()
.await
.map_err(Error::from)?
.table_metadata,
)
.await
.map_err(Error::from)?;
let table = mv.storage_table().await.map_err(Error::from)?;
let schema = self.schema();
let statistics = self.statistics().await.map_err(Into::<Error>::into)?;
table_scan(
Expand Down
6 changes: 1 addition & 5 deletions iceberg-rust-spec/src/spec/view_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,7 @@ mod _serde {
current_version_id: value.current_version_id,
versions: value.versions.into_values().collect(),
version_log: value.version_log,
properties: if value.properties.is_empty() {
None
} else {
Some(value.properties)
},
properties: Some(value.properties),
schemas: value.schemas.into_values().map(Into::into).collect(),
}
}
Expand Down
9 changes: 0 additions & 9 deletions iceberg-rust/src/catalog/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,6 @@ pub enum ViewRequirement {
/// Uuid to assert
uuid: Uuid,
},
/// The materialization must match the requirement's materialization
AssertMaterialization {
/// Storage table pointer to assert
materialization: String,
},
}
/// Check table update requirements
pub fn check_table_requirements(
Expand Down Expand Up @@ -294,10 +289,6 @@ pub fn check_view_requirements<T: Clone + Default + Eq + 'static>(
) -> bool {
requirements.iter().all(|x| match x {
ViewRequirement::AssertViewUuid { uuid } => metadata.view_uuid == *uuid,
ViewRequirement::AssertMaterialization { materialization } => {
metadata.properties.storage_table
== *(materialization as &dyn Any).downcast_ref::<T>().unwrap()
}
})
}
/// Apply updates to metadata
Expand Down
48 changes: 20 additions & 28 deletions iceberg-rust/src/materialized_view/materialized_view_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,19 @@ use std::{
sync::Arc,
};

use iceberg_rust_spec::{
spec::{
materialized_view_metadata::MaterializedViewMetadataBuilder,
schema::Schema,
table_metadata::TableMetadataBuilder,
view_metadata::{VersionBuilder, ViewProperties, ViewRepresentation, REF_PREFIX},
},
util::strip_prefix,
use iceberg_rust_spec::spec::{
materialized_view_metadata::MaterializedViewMetadataBuilder,
schema::Schema,
table_metadata::TableMetadataBuilder,
view_metadata::{VersionBuilder, ViewProperties, ViewRepresentation, REF_PREFIX},
};
use uuid::Uuid;

use crate::{
catalog::{bucket::parse_bucket, identifier::Identifier, tabular::Tabular, Catalog},
catalog::{identifier::Identifier, tabular::Tabular, Catalog},
error::Error,
};

use super::MaterializedView;
use super::{MaterializedView, STORAGE_TABLE_POSTFIX};

/// Builder pattern to create a materialized view
pub struct MaterializedViewBuilder {
Expand Down Expand Up @@ -82,8 +78,14 @@ impl MaterializedViewBuilder {

/// Building a materialized view registers its storage table in the catalog and then registers the materialized view itself
pub async fn build(self) -> Result<MaterializedView, Error> {
let mut metadata = self.metadata.build()?;
let bucket = parse_bucket(&metadata.location)?;
let mut metadata = self.metadata;
let table_identifier = self.identifier.to_string() + STORAGE_TABLE_POSTFIX;
let metadata = metadata
.properties(ViewProperties {
storage_table: table_identifier.clone(),
other: Default::default(),
})
.build()?;
let schema_id = &metadata.current_version(None)?.schema_id;
let table_metadata = TableMetadataBuilder::default()
.location(&metadata.location)
Expand All @@ -97,22 +99,12 @@ impl MaterializedViewBuilder {
))
.current_schema_id(*schema_id)
.build()?;
let object_store = self.catalog.object_store(bucket);
let location = &metadata.location;
let table_path = location.to_string()
+ "/metadata/"
+ &table_metadata.last_sequence_number.to_string()
+ "-"
+ &Uuid::new_v4().to_string()
+ ".metadata.json";
metadata.properties.storage_table = table_path.clone();
let table_metadata_json = serde_json::to_string(&table_metadata)?;
object_store
.put(
&strip_prefix(&table_path).into(),
table_metadata_json.into(),
)

self.catalog
.clone()
.register_tabular(Identifier::parse(&table_identifier)?, table_metadata.into())
.await?;

if let Tabular::MaterializedView(matview) = self
.catalog
.register_tabular(self.identifier, metadata.into())
Expand Down
34 changes: 13 additions & 21 deletions iceberg-rust/src/materialized_view/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@

use std::sync::Arc;

use iceberg_rust_spec::{
spec::{
materialized_view_metadata::MaterializedViewMetadata, schema::Schema,
tabular::TabularMetadata,
},
util::strip_prefix,
use iceberg_rust_spec::spec::{
materialized_view_metadata::MaterializedViewMetadata, schema::Schema,
};
use object_store::ObjectStore;

use crate::{
catalog::{bucket::parse_bucket, identifier::Identifier, Catalog},
catalog::{bucket::parse_bucket, identifier::Identifier, tabular::Tabular, Catalog},
error::Error,
};

Expand All @@ -26,6 +22,8 @@ pub mod transaction;

/// Name of table property that stores lineage information
pub static DEPENDS_ON_TABLES: &str = "depends_on_tables";
/// Postfix appended to a materialized view's identifier to form the identifier of its storage table
pub static STORAGE_TABLE_POSTFIX: &str = "__storage";

#[derive(Debug)]
/// An iceberg materialized view
Expand Down Expand Up @@ -90,20 +88,14 @@ impl MaterializedView {
}
/// Get the storage table of the materialized view
pub async fn storage_table(&self) -> Result<StorageTable, Error> {
let storage_table_location = &self.metadata.properties.storage_table;
let bucket = parse_bucket(storage_table_location)?;
if let TabularMetadata::Table(metadata) = serde_json::from_str(std::str::from_utf8(
&self
.catalog()
.object_store(bucket)
.get(&strip_prefix(storage_table_location).into())
.await?
.bytes()
.await?,
)?)? {
Ok(StorageTable {
table_metadata: metadata,
})
if let Tabular::Table(table) = self
.catalog()
.load_table(&Identifier::parse(
&(&self.metadata.properties.storage_table),
)?)
.await?
{
Ok(StorageTable(table))
} else {
Err(Error::InvalidFormat("storage table".to_string()))
}
Expand Down
20 changes: 9 additions & 11 deletions iceberg-rust/src/materialized_view/storage_table.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,32 @@
use std::ops::{Deref, DerefMut};

use iceberg_rust_spec::spec::{
materialized_view_metadata::{depends_on_tables_from_string, SourceTable},
table_metadata::TableMetadata,
use iceberg_rust_spec::spec::materialized_view_metadata::{
depends_on_tables_from_string, SourceTable,
};

use crate::error::Error;
use crate::{error::Error, table::Table};

use super::DEPENDS_ON_TABLES;

pub struct StorageTable {
pub table_metadata: TableMetadata,
}
pub struct StorageTable(pub Table);

impl Deref for StorageTable {
type Target = TableMetadata;
type Target = Table;
fn deref(&self) -> &Self::Target {
&self.table_metadata
&self.0
}
}

impl DerefMut for StorageTable {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.table_metadata
&mut self.0
}
}

impl StorageTable {
pub fn source_tables(&self) -> Result<Option<Vec<SourceTable>>, Error> {
self.properties
self.metadata()
.properties
.get(DEPENDS_ON_TABLES)
.as_ref()
.map(|x| depends_on_tables_from_string(x.as_str()))
Expand Down
Loading

0 comments on commit faaa14d

Please sign in to comment.