Skip to content

Commit

Permalink
update materialized views
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Aug 1, 2024
1 parent 076e2e7 commit a200100
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 103 deletions.
25 changes: 14 additions & 11 deletions datafusion_iceberg/src/materialized_view.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use datafusion::{
arrow::error::ArrowError,
Expand All @@ -7,7 +7,7 @@ use datafusion::{
};
use futures::{stream, StreamExt, TryStreamExt};
use iceberg_rust::spec::{
materialized_view_metadata::RefreshTable,
materialized_view_metadata::RefreshState,
view_metadata::{FullIdentifier, ViewRepresentation},
};
use iceberg_rust::{
Expand Down Expand Up @@ -132,7 +132,7 @@ pub async fn refresh_materialized_view(
}

// Register source tables in datafusion context and return lineage information
let refresh_tables = source_tables
let source_table_states = source_tables
.into_iter()
.flat_map(|(identifier, source_table, _, last_snapshot_id)| {
let table = Arc::new(DataFusionTable::new(
Expand Down Expand Up @@ -164,16 +164,13 @@ pub async fn refresh_materialized_view(
.filter_ok(|(identifier, _)| !identifier.name().ends_with("__delta__"))
.map(|x| {
let (identifer, snapshot_id) = x?;
let sequence_id = *lineage.get(&identifer).ok_or(Error::NotFound(
let sequence_id = lineage.get(&identifer).ok_or(Error::NotFound(
"Lineage entry".to_owned(),
identifer.to_string(),
))?;
Ok(RefreshTable {
sequence_id,
revision_id: snapshot_id,
})
Ok((sequence_id.clone(), snapshot_id))
})
.collect::<Result<_, Error>>()?;
.collect::<Result<HashMap<String, i64>, Error>>()?;

let sql_statements = transform_relations(sql)?;

Expand All @@ -196,10 +193,16 @@ pub async fn refresh_materialized_view(
)
.await?;

let version_id = matview.metadata().current_version_id;
let refresh_version_id = matview.metadata().current_version_id;

let refresh_state = RefreshState {
refresh_version_id,
source_table_states,
source_view_states: HashMap::new(),
};
matview
.new_transaction(branch.as_deref())
.full_refresh(files, refresh_tables, version_id)?
.full_refresh(files, refresh_state)?
.commit()
.await?;

Expand Down
57 changes: 10 additions & 47 deletions iceberg-rust-spec/src/spec/materialized_view_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
* A Struct for the materialized view metadata
*/

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use crate::view_metadata::FullIdentifier;

use super::view_metadata::{GeneralViewMetadata, GeneralViewMetadataBuilder};

/// Property for the metadata location
pub static STORAGE_TABLE: &str = "storage_table";
pub static REFRESH_STATE: &str = "refresh-state";

/// Fields for the version 1 of the view metadata.
pub type MaterializedViewMetadata = GeneralViewMetadata<FullIdentifier>;
Expand All @@ -19,11 +20,13 @@ pub type MaterializedViewMetadataBuilder = GeneralViewMetadataBuilder<FullIdenti
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Freshness information of the materialized view
pub struct RefreshTable {
/// Sequence id in the materialized view lineage
pub sequence_id: i64,
/// Snapshot id of the base table when the refresh operation was performed.
pub revision_id: i64,
pub struct RefreshState {
/// The version-id of the materialized view when the refresh operation was performed.
pub refresh_version_id: i64,
/// A map from sequence-id (as defined in the view lineage) to the source tables’ snapshot-id of when the last refresh operation was performed.
pub source_table_states: HashMap<String, i64>,
/// A map from sequence-id (as defined in the view lineage) to the source views’ version-id of when the last refresh operation was performed.
pub source_view_states: HashMap<String, i64>,
}

#[cfg(test)]
Expand Down Expand Up @@ -97,44 +100,4 @@ mod tests {

Ok(())
}

// #[test]
// fn test_depends_on_tables_try_from_str() {
// let input = "table1=1,table2=2";

// let result = depends_on_tables_from_string(input).unwrap();

// assert_eq!(
// result,
// vec![
// RefreshTable {
// identifier: "table1".to_string(),
// revision_id: 1
// },
// RefreshTable {
// identifier: "table2".to_string(),
// revision_id: 2
// }
// ]
// );
// }

// #[test]
// fn test_try_from_depends_on_tables_to_string() {
// let depends_on_tables = vec![
// RefreshTable {
// identifier: "table1".to_string(),
// revision_id: 1,
// },
// RefreshTable {
// identifier: "table2".to_string(),
// revision_id: 2,
// },
// ];

// let result = depends_on_tables_to_string(&depends_on_tables);

// assert!(result.is_ok());
// assert_eq!(result.unwrap(), "table1=1,table2=2");
// }
}
3 changes: 0 additions & 3 deletions iceberg-rust-spec/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ use super::{

use _serde::SnapshotEnum;

pub static REFRESH_TABLES: &str = "refresh-tables";
pub static REFRESH_VERSION_ID: &str = "refresh-version-id";

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Builder, Getters)]
#[serde(from = "SnapshotEnum", into = "SnapshotEnum")]
#[builder(setter(prefix = "with"))]
Expand Down
8 changes: 4 additions & 4 deletions iceberg-rust-spec/src/spec/view_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,18 +490,18 @@ impl Display for FullIdentifier {
#[serde(rename_all = "kebab-case")]
pub struct SourceTable {
identifier: FullIdentifier,
sequence_id: i64,
sequence_id: String,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(from = "Vec<SourceTable>", into = "Vec<SourceTable>")]
pub struct Lineage(HashMap<FullIdentifier, i64>);
pub struct Lineage(HashMap<FullIdentifier, String>);

impl Lineage {
pub fn new() -> Self {
Lineage(HashMap::new())
}
pub fn from_iter<T: IntoIterator<Item = (FullIdentifier, i64)>>(iter: T) -> Self {
pub fn from_iter<T: IntoIterator<Item = (FullIdentifier, String)>>(iter: T) -> Self {
Lineage(HashMap::from_iter(iter))
}
}
Expand All @@ -528,7 +528,7 @@ impl From<Lineage> for Vec<SourceTable> {
}

impl Deref for Lineage {
type Target = HashMap<FullIdentifier, i64>;
type Target = HashMap<FullIdentifier, String>;
fn deref(&self) -> &Self::Target {
&self.0
}
Expand Down
2 changes: 1 addition & 1 deletion iceberg-rust/src/catalog/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl CreateMaterializedViewBuilder {
create.view_version.default_namespace().as_deref(),
create.view_version.default_catalog().as_deref(),
)?,
i as i64,
i.to_string(),
))
})
.collect::<Result<Vec<_>, _>>()?,
Expand Down
31 changes: 11 additions & 20 deletions iceberg-rust/src/materialized_view/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use std::{
ops::{Deref, DerefMut},
};

use iceberg_rust_spec::{
snapshot::REFRESH_VERSION_ID,
spec::{materialized_view_metadata::RefreshTable, snapshot::REFRESH_TABLES},
};
use iceberg_rust_spec::materialized_view_metadata::{RefreshState, REFRESH_STATE};

use crate::{error::Error, table::Table};

Expand Down Expand Up @@ -35,23 +32,17 @@ impl StorageTable {
&self,
version_id: i64,
branch: Option<String>,
) -> Result<Option<HashMap<i64, i64>>, Error> {
) -> Result<Option<HashMap<String, i64>>, Error> {
let current_snapshot = self.metadata().current_snapshot(branch.as_deref())?;
if Some(version_id)
== current_snapshot
.and_then(|snapshot| snapshot.summary().other.get(REFRESH_VERSION_ID))
.map(|x| str::parse::<i64>(x))
.transpose()?
{
current_snapshot
.and_then(|snapshot| snapshot.summary().other.get(REFRESH_TABLES))
.map(|x| {
serde_json::from_str::<Vec<RefreshTable>>(x).map(|x| {
HashMap::from_iter(x.into_iter().map(|y| (y.sequence_id, y.revision_id)))
})
})
.transpose()
.map_err(Error::from)
let refresh_state = current_snapshot
.and_then(|snapshot| snapshot.summary().other.get(REFRESH_STATE))
.map(|x| serde_json::from_str::<RefreshState>(&x))
.transpose()?;
let Some(refresh_state) = refresh_state else {
return Ok(None);
};
if version_id == refresh_state.refresh_version_id {
Ok(Some(refresh_state.source_table_states))
} else {
Ok(None)
}
Expand Down
28 changes: 12 additions & 16 deletions iceberg-rust/src/materialized_view/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
use std::collections::HashMap;

use iceberg_rust_spec::{
snapshot::{REFRESH_TABLES, REFRESH_VERSION_ID},
spec::{
manifest::DataFile, materialized_view_metadata::RefreshTable, types::StructType,
view_metadata::ViewRepresentation,
},
materialized_view_metadata::{RefreshState, REFRESH_STATE},
spec::{manifest::DataFile, types::StructType, view_metadata::ViewRepresentation},
};

use crate::{
Expand Down Expand Up @@ -69,10 +66,9 @@ impl<'view> Transaction<'view> {
pub fn full_refresh(
mut self,
files: Vec<DataFile>,
lineage: Vec<RefreshTable>,
version_id: i64,
refresh_state: RefreshState,
) -> Result<Self, Error> {
let refresh_tables = serde_json::to_string(&lineage)?;
let refresh_state = serde_json::to_string(&refresh_state)?;
self.storage_table_operations
.entry(REWRITE_KEY.to_owned())
.and_modify(|mut x| {
Expand All @@ -83,19 +79,19 @@ impl<'view> Transaction<'view> {
} = &mut x
{
old.extend_from_slice(&files);
*old_lineage = Some(HashMap::from_iter(vec![
(REFRESH_VERSION_ID.to_owned(), version_id.to_string()),
(REFRESH_TABLES.to_owned(), refresh_tables.clone()),
]));
*old_lineage = Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state.clone(),
)]));
}
})
.or_insert(TableOperation::Rewrite {
branch: self.branch.clone(),
files,
additional_summary: Some(HashMap::from_iter(vec![
(REFRESH_VERSION_ID.to_owned(), version_id.to_string()),
(REFRESH_TABLES.to_owned(), refresh_tables),
])),
additional_summary: Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state,
)])),
});
Ok(self)
}
Expand Down
2 changes: 1 addition & 1 deletion iceberg-rust/src/view/transaction/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Operation {
version.default_namespace().as_deref(),
version.default_catalog().as_deref(),
)?,
i as i64,
i.to_string(),
))
})
.collect::<Result<Vec<_>, _>>()?,
Expand Down

0 comments on commit a200100

Please sign in to comment.