
Merge pull request #18 from JanKaul/refactor-matview
Refactor matview
JanKaul authored Feb 1, 2024
2 parents 72981eb + 8f8c12c commit 828417f
Showing 32 changed files with 988 additions and 762 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

13 changes: 10 additions & 3 deletions Cargo.toml
@@ -1,5 +1,11 @@
 [workspace]
-members = ["iceberg-rust-spec", "iceberg-rust", "datafusion_iceberg", "datafusion-iceberg-sql", "iceberg-catalog-sql"]
+members = [
+    "iceberg-rust-spec",
+    "iceberg-rust",
+    "datafusion_iceberg",
+    "datafusion-iceberg-sql",
+    "iceberg-catalog-sql",
+]

 workspace.resolver = "2"

@@ -17,8 +23,9 @@ datafusion = "33.0.0"
 datafusion-sql = "33.0.0"
 datafusion-expr = "33.0.0"
 datafusion-common = "33.0.0"
-parquet = { version = "48.0.1", features = ["async","object_store"] }
+parquet = { version = "48.0.1", features = ["async", "object_store"] }
 sqlparser = { version = "0.39.0", features = ["visitor"] }
 thiserror = "1.0"
 url = "^2.2"
-itertools = "0.10.5"
+itertools = "0.10.5"
+derive-getters = "0.3.0"
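
Worth noting for the Cargo.toml change: derive-getters generates reference getters for named struct fields. A minimal sketch of the pattern, assuming a struct shaped like the BaseTable used later in this PR (the derive usage here is illustrative, not taken from the commit):

    use derive_getters::Getters;

    // Hypothetical struct mirroring the BaseTable fields from the diff below;
    // #[derive(Getters)] generates `identifier()` and `snapshot_id()`
    // methods returning references to the fields.
    #[derive(Getters)]
    struct BaseTable {
        identifier: String,
        snapshot_id: i64,
    }

    fn main() {
        let t = BaseTable {
            identifier: "warehouse.db.orders".to_owned(),
            snapshot_id: -1,
        };
        assert_eq!(t.identifier(), "warehouse.db.orders");
        assert_eq!(*t.snapshot_id(), -1);
    }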
146 changes: 101 additions & 45 deletions datafusion_iceberg/src/materialized_view.rs
@@ -5,16 +5,17 @@ use datafusion::{
     datasource::{empty::EmptyTable, TableProvider},
     prelude::SessionContext,
 };
-use futures::TryStreamExt;
+use futures::{stream, StreamExt, TryStreamExt};
 use iceberg_rust::{
     arrow::write::write_parquet_partitioned,
-    catalog::CatalogList,
-    materialized_view::{MaterializedView, STORAGE_POSTFIX},
+    catalog::{identifier::Identifier, tabular::Tabular, CatalogList},
+    materialized_view::{MaterializedView, StorageTableState},
+    sql::find_relations,
 };
-use iceberg_rust_spec::spec::materialized_view_metadata::{
-    BaseTable, MaterializedViewRepresentation,
+use iceberg_rust_spec::spec::{
+    materialized_view_metadata::BaseTable, view_metadata::ViewRepresentation,
 };
-use itertools::Itertools;
+use itertools::{intersperse, Itertools};

 use crate::{
     error::Error,
@@ -23,51 +24,100 @@ use crate::{
 };

 pub async fn refresh_materialized_view(
-    matview: &MaterializedView,
+    matview: &mut MaterializedView,
     catalog_list: Arc<dyn CatalogList>,
     branch: Option<&str>,
 ) -> Result<(), Error> {
     let ctx = SessionContext::new();

     let sql = match &matview.metadata().current_version(branch)?.representations[0] {
-        MaterializedViewRepresentation::SqlMaterialized {
-            sql,
-            dialect: _,
-            format_version: _,
-            storage_table: _,
-        } => sql,
+        ViewRepresentation::Sql { sql, dialect: _ } => sql,
     };

     let version_id = matview.metadata().current_version_id;

-    let mut storage_table = matview.storage_table(branch).await?;
+    let storage_table = matview.storage_table().await?;

     let branch = branch.map(ToString::to_string);

-    let base_tables = if storage_table.version_id(branch.clone())? == Some(version_id) {
-        storage_table
-            .base_tables(catalog_list, branch.clone())
-            .await?
+    let base_tables = match if storage_table.version_id(branch.clone())? == Some(version_id) {
+        storage_table.base_tables(branch.clone()).await?
     } else {
-        storage_table
-            .base_tables(catalog_list, branch.clone())
-            .await?
+        storage_table.base_tables(branch.clone()).await?
+    } {
+        Some(x) => x,
+        None => find_relations(&sql)?
+            .into_iter()
+            .map(|x| BaseTable {
+                identifier: x,
+                snapshot_id: -1,
+            })
+            .collect::<Vec<_>>(),
     };

+    let base_tables = stream::iter(base_tables.iter())
+        .then(|base_table| {
+            let catalog_list = catalog_list.clone();
+            let branch = branch.clone();
+            async move {
+                let mut parts = base_table.identifier.split('.');
+                let catalog_name = parts
+                    .next()
+                    .ok_or(Error::NotFound("".to_owned(), "Catalog".to_owned()))?
+                    .to_owned();
+                let identifier: String = intersperse(parts, ".").collect();
+                let catalog = catalog_list
+                    .catalog(&catalog_name)
+                    .await
+                    .ok_or(Error::NotFound(
+                        "Catalog".to_owned(),
+                        catalog_name.to_owned(),
+                    ))?;

+                let tabular = match catalog.load_table(&Identifier::parse(&identifier)?).await? {
+                    Tabular::View(_) => {
+                        return Err(Error::InvalidFormat("storage table".to_string()))
+                    }
+                    x => x,
+                };
+                let current_snapshot_id = match &tabular {
+                    Tabular::Table(table) => Ok(table
+                        .metadata()
+                        .current_snapshot(branch.as_deref())?
+                        .unwrap()
+                        .snapshot_id),
+                    Tabular::MaterializedView(mv) => Ok(mv
+                        .storage_table()
+                        .await?
+                        .table_metadata
+                        .current_snapshot(branch.as_deref())?
+                        .unwrap()
+                        .snapshot_id),
+                    _ => Err(Error::InvalidFormat("storage table".to_string())),
+                }?;

+                let table_state = if current_snapshot_id == base_table.snapshot_id {
+                    StorageTableState::Fresh
+                } else if base_table.snapshot_id == -1 {
+                    StorageTableState::Invalid
+                } else {
+                    StorageTableState::Outdated(base_table.snapshot_id)
+                };

+                Ok((catalog_name, tabular, table_state, current_snapshot_id))
+            }
+        })
+        .try_collect::<Vec<_>>()
+        .await?;

     // Full refresh

     let new_tables = base_tables
         .into_iter()
-        .flat_map(|(catalog_name, base_table, _)| {
-            let identifier = base_table
-                .identifier()
-                .to_string()
-                .trim_end_matches(STORAGE_POSTFIX)
-                .to_owned();
-
-            let snapshot_id = base_table.metadata().current_snapshot_id.unwrap_or(-1);
+        .flat_map(|(catalog_name, base_table, _, last_snapshot_id)| {
+            let identifier = base_table.identifier().to_string().to_owned();

-            let table = Arc::new(DataFusionTable::new_table(
+            let table = Arc::new(DataFusionTable::new(
                 base_table,
                 None,
                 None,
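
The resolution step added above contains two small pieces of logic: the first dot-separated segment of a base-table identifier names the catalog (the remainder is rejoined with itertools::intersperse), and the recorded snapshot id is compared against the base table's current snapshot to classify the storage table's freshness. A standalone sketch of both rules, with the StorageTableState variants as they appear in this diff and everything else simplified for illustration:

    use itertools::intersperse;

    enum StorageTableState {
        Fresh,
        Outdated(i64),
        Invalid,
    }

    // First dot-separated segment names the catalog; the rest is rejoined
    // with itertools::intersperse, as in the diff above.
    fn split_identifier(full: &str) -> Option<(String, String)> {
        let mut parts = full.split('.');
        let catalog = parts.next()?.to_owned();
        let rest: String = intersperse(parts, ".").collect();
        Some((catalog, rest))
    }

    // Freshness rule from the diff: -1 marks a base table with no recorded
    // snapshot (forcing a full refresh); any other mismatch means incremental
    // work starting from the recorded snapshot.
    fn classify(current_snapshot_id: i64, recorded_snapshot_id: i64) -> StorageTableState {
        if current_snapshot_id == recorded_snapshot_id {
            StorageTableState::Fresh
        } else if recorded_snapshot_id == -1 {
            StorageTableState::Invalid
        } else {
            StorageTableState::Outdated(recorded_snapshot_id)
        }
    }

    fn main() {
        assert_eq!(
            split_identifier("warehouse.db.orders"),
            Some(("warehouse".to_owned(), "db.orders".to_owned()))
        );
        assert!(matches!(classify(7, 7), StorageTableState::Fresh));
        assert!(matches!(classify(7, -1), StorageTableState::Invalid));
        assert!(matches!(classify(7, 3), StorageTableState::Outdated(3)));
    }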
@@ -76,11 +126,16 @@ pub async fn refresh_materialized_view(
             let schema = table.schema().clone();

             vec![
-                (catalog_name.clone(), identifier.clone(), snapshot_id, table),
+                (
+                    catalog_name.clone(),
+                    identifier.clone(),
+                    last_snapshot_id,
+                    table,
+                ),
                 (
                     catalog_name,
                     identifier + "__delta__",
-                    snapshot_id,
+                    last_snapshot_id,
                     Arc::new(EmptyTable::new(schema)) as Arc<dyn TableProvider>,
                 ),
             ]
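
Each base table is registered under two names: its own, backed by the real table, and an "__delta__" suffix backed by DataFusion's EmptyTable, so a full refresh evaluates the view SQL against complete base tables and empty deltas. A small sketch of the placeholder pairing (identifier and schema are illustrative; EmptyTable is DataFusion's built-in empty TableProvider):

    use std::sync::Arc;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::{empty::EmptyTable, TableProvider};

    // Pair a base-table identifier with an empty "__delta__" companion,
    // mirroring the second tuple pushed into the vec! above.
    fn delta_placeholder(
        identifier: String,
        schema: Arc<Schema>,
    ) -> (String, Arc<dyn TableProvider>) {
        (
            identifier + "__delta__",
            Arc::new(EmptyTable::new(schema)) as Arc<dyn TableProvider>,
        )
    }

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "amount",
            DataType::Int64,
            false,
        )]));
        let (name, _provider) = delta_placeholder("db.orders".to_owned(), schema);
        assert_eq!(name, "db.orders__delta__");
    }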
@@ -107,9 +162,15 @@ pub async fn refresh_materialized_view(
         .await?
         .map_err(ArrowError::from);

-    let files = write_parquet_partitioned(&storage_table, batches, branch.as_deref()).await?;
+    let files = write_parquet_partitioned(
+        &storage_table.table_metadata,
+        batches,
+        matview.object_store(),
+        branch.as_deref(),
+    )
+    .await?;

-    storage_table
+    matview
         .full_refresh(files, version_id, new_tables, branch)
         .await?;

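After the refactor, write_parquet_partitioned no longer receives the storage table handle; it takes the table metadata and the object store separately, and the subsequent commit goes through the materialized view itself. An annotated restatement of the new call, with parameter roles inferred from the names in this diff (the exact signature lives in iceberg_rust::arrow::write):

    let files = write_parquet_partitioned(
        &storage_table.table_metadata, // partition layout comes from the table metadata
        batches,                       // stream of Arrow record batches to persist
        matview.object_store(),        // destination store, now passed explicitly
        branch.as_deref(),             // optional branch to write against
    )
    .await?;
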
@@ -192,12 +253,7 @@ mod tests {
         };
         let partition_spec = PartitionSpecBuilder::default()
             .with_spec_id(1)
-            .with_partition_field(PartitionField {
-                source_id: 4,
-                field_id: 1000,
-                name: "day".to_string(),
-                transform: Transform::Day,
-            })
+            .with_partition_field(PartitionField::new(4, 1000, "day", Transform::Day))
             .build()
             .expect("Failed to create partition spec");

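The test now constructs the partition field through a constructor rather than a struct literal; judging from the removed field names, the argument order is source_id, field_id, name, transform. A one-line sketch of the equivalent call, assuming the imports already present in this test module:

    // Equivalent to the removed literal: PartitionField { source_id: 4,
    // field_id: 1000, name: "day".to_string(), transform: Transform::Day }
    let day_field = PartitionField::new(4, 1000, "day", Transform::Day);
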
@@ -242,7 +298,7 @@ mod tests {
         )
         .expect("Failed to create filesystem view builder.");
         builder.location("test/orders_view");
-        let matview = builder
+        let mut matview = builder
             .build()
             .await
             .expect("Failed to create filesystem view");
@@ -277,7 +333,7 @@ mod tests {
         )
         .expect("Failed to create filesystem view builder.");
         total_builder.location("test/total_orders");
-        let total_matview = total_builder
+        let mut total_matview = total_builder
             .build()
             .await
             .expect("Failed to create filesystem view");
@@ -309,7 +365,7 @@ mod tests {
             .await
             .expect("Failed to insert values into table");

-        refresh_materialized_view(&matview, catalog_list.clone(), None)
+        refresh_materialized_view(&mut matview, catalog_list.clone(), None)
             .await
             .expect("Failed to refresh materialized view");

@@ -361,7 +417,7 @@ mod tests {
             .await
             .expect("Failed to insert values into table");

-        refresh_materialized_view(&matview, catalog_list.clone(), None)
+        refresh_materialized_view(&mut matview, catalog_list.clone(), None)
             .await
             .expect("Failed to refresh materialized view");

@@ -401,7 +457,7 @@ mod tests {
             }
         }

-        refresh_materialized_view(&total_matview, catalog_list.clone(), None)
+        refresh_materialized_view(&mut total_matview, catalog_list.clone(), None)
             .await
             .expect("Failed to refresh materialized view");

