Skip to content

Commit

Permalink
move materialization to view properties
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Feb 20, 2024
1 parent 343f570 commit 5a14a9d
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 38 deletions.
6 changes: 3 additions & 3 deletions iceberg-rust-spec/src/spec/materialized_view_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ mod tests {
"location" : "s3://bucket/warehouse/default.db/event_agg",
"current-version-id" : 1,
"properties" : {
"comment" : "Daily event counts"
"comment" : "Daily event counts",
"storage_table": "s3://bucket/path/to/metadata.json"
},
"versions" : [ {
"version-id" : 1,
Expand Down Expand Up @@ -113,8 +114,7 @@ mod tests {
"version-log" : [ {
"timestamp-ms" : 1573518431292,
"version-id" : 1
} ],
"materialization": "s3://bucket/path/to/metadata.json"
} ]
}
"#;
let metadata = serde_json::from_str::<MaterializedViewMetadata>(data)
Expand Down
48 changes: 33 additions & 15 deletions iceberg-rust-spec/src/spec/view_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::{
collections::HashMap,
ops::{Deref, DerefMut},
time::{SystemTime, UNIX_EPOCH},
};

Expand All @@ -29,7 +30,7 @@ pub type ViewMetadataBuilder = GeneralViewMetadataBuilder<Option<()>>;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(try_from = "ViewMetadataEnum<T>", into = "ViewMetadataEnum<T>")]
/// Fields for the version 1 of the view metadata.
pub struct GeneralViewMetadata<T: Clone> {
pub struct GeneralViewMetadata<T: Clone + Default> {
#[builder(default = "Uuid::new_v4()")]
/// A UUID that identifies the view, generated when the view is created. Implementations must throw an exception if a view’s UUID does not match the expected UUID after refreshing metadata
pub view_uuid: Uuid,
Expand All @@ -54,11 +55,10 @@ pub struct GeneralViewMetadata<T: Clone> {
#[builder(default)]
/// A string to string map of view properties. This is used for metadata such as “comment” and for settings that affect view maintenance.
/// This is not intended to be used for arbitrary metadata.
pub properties: HashMap<String, String>,
pub materialization: T,
pub properties: ViewProperties<T>,
}

impl<T: Clone> GeneralViewMetadata<T> {
impl<T: Clone + Default> GeneralViewMetadata<T> {
/// Get current schema
#[inline]
pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
Expand Down Expand Up @@ -112,20 +112,20 @@ mod _serde {
spec::{schema::SchemaV2, table_metadata::VersionNumber},
};

use super::{FormatVersion, GeneralViewMetadata, Version, VersionLogStruct};
use super::{FormatVersion, GeneralViewMetadata, Version, VersionLogStruct, ViewProperties};

/// Metadata of an iceberg view
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub(super) enum ViewMetadataEnum<T> {
pub(super) enum ViewMetadataEnum<T: Clone> {
/// Version 1 of the table metadata
V1(ViewMetadataV1<T>),
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
/// Fields for the version 1 of the view metadata.
pub struct ViewMetadataV1<T> {
pub struct ViewMetadataV1<T: Clone> {
/// A UUID that identifies the view, generated when the view is created. Implementations must throw an exception if a view’s UUID does not match the expected UUID after refreshing metadata
pub view_uuid: Uuid,
/// An integer version number for the view format; must be 1
Expand All @@ -143,11 +143,10 @@ mod _serde {
pub schemas: Vec<SchemaV2>,
/// A string to string map of view properties. This is used for metadata such as “comment” and for settings that affect view maintenance.
/// This is not intended to be used for arbitrary metadata.
pub properties: Option<HashMap<String, String>>,
pub materialization: T,
pub properties: Option<ViewProperties<T>>,
}

impl<T: Clone> TryFrom<ViewMetadataEnum<T>> for GeneralViewMetadata<T> {
impl<T: Clone + Default> TryFrom<ViewMetadataEnum<T>> for GeneralViewMetadata<T> {
type Error = Error;
fn try_from(value: ViewMetadataEnum<T>) -> Result<Self, Self::Error> {
match value {
Expand All @@ -156,15 +155,15 @@ mod _serde {
}
}

impl<T: Clone> From<GeneralViewMetadata<T>> for ViewMetadataEnum<T> {
impl<T: Clone + Default> From<GeneralViewMetadata<T>> for ViewMetadataEnum<T> {
fn from(value: GeneralViewMetadata<T>) -> Self {
match value.format_version {
FormatVersion::V1 => ViewMetadataEnum::V1(value.into()),
}
}
}

impl<T: Clone> TryFrom<ViewMetadataV1<T>> for GeneralViewMetadata<T> {
impl<T: Clone + Default> TryFrom<ViewMetadataV1<T>> for GeneralViewMetadata<T> {
type Error = Error;
fn try_from(value: ViewMetadataV1<T>) -> Result<Self, Self::Error> {
Ok(GeneralViewMetadata {
Expand All @@ -175,7 +174,6 @@ mod _serde {
versions: HashMap::from_iter(value.versions.into_iter().map(|x| (x.version_id, x))),
version_log: value.version_log,
properties: value.properties.unwrap_or_default(),
materialization: value.materialization,
schemas: HashMap::from_iter(
value
.schemas
Expand All @@ -187,7 +185,7 @@ mod _serde {
}
}

impl<T: Clone> From<GeneralViewMetadata<T>> for ViewMetadataV1<T> {
impl<T: Clone + Default> From<GeneralViewMetadata<T>> for ViewMetadataV1<T> {
fn from(value: GeneralViewMetadata<T>) -> Self {
ViewMetadataV1 {
view_uuid: value.view_uuid,
Expand All @@ -201,12 +199,32 @@ mod _serde {
} else {
Some(value.properties)
},
materialization: value.materialization,
schemas: value.schemas.into_values().map(Into::into).collect(),
}
}
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
pub struct ViewProperties<T: Clone> {
pub storage_table: T,
#[serde(flatten)]
pub other: HashMap<String, String>,
}

impl<T: Clone> Deref for ViewProperties<T> {
type Target = HashMap<String, String>;
fn deref(&self) -> &Self::Target {
&self.other
}
}

impl<T: Clone> DerefMut for ViewProperties<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.other
}
}

#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
#[repr(u8)]
/// Iceberg format version
Expand Down
9 changes: 5 additions & 4 deletions iceberg-rust/src/catalog/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,15 @@ pub fn check_table_requirements(
}

/// Check table update requirements
pub fn check_view_requirements<T: Clone + Eq + 'static>(
pub fn check_view_requirements<T: Clone + Default + Eq + 'static>(
requirements: &[ViewRequirement],
metadata: &GeneralViewMetadata<T>,
) -> bool {
requirements.iter().all(|x| match x {
ViewRequirement::AssertViewUuid { uuid } => metadata.view_uuid == *uuid,
ViewRequirement::AssertMaterialization { materialization } => {
metadata.materialization == *(materialization as &dyn Any).downcast_ref::<T>().unwrap()
metadata.properties.storage_table
== *(materialization as &dyn Any).downcast_ref::<T>().unwrap()
}
})
}
Expand Down Expand Up @@ -373,7 +374,7 @@ pub fn apply_table_updates(
}

/// Apply updates to metadata
pub fn apply_view_updates<T: Clone + 'static>(
pub fn apply_view_updates<T: Clone + Default + 'static>(
metadata: &mut GeneralViewMetadata<T>,
updates: Vec<ViewUpdate>,
) -> Result<(), Error> {
Expand Down Expand Up @@ -411,7 +412,7 @@ pub fn apply_view_updates<T: Clone + 'static>(
metadata.current_version_id = view_version_id;
}
ViewUpdate::SetMaterialization { materialization } => {
metadata.materialization = (&materialization as &dyn Any)
metadata.properties.storage_table = (&materialization as &dyn Any)
.downcast_ref::<T>()
.cloned()
.ok_or(Error::InvalidFormat(
Expand Down
13 changes: 6 additions & 7 deletions iceberg-rust/src/materialized_view/materialized_view_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use iceberg_rust_spec::{
materialized_view_metadata::MaterializedViewMetadataBuilder,
schema::Schema,
table_metadata::TableMetadataBuilder,
view_metadata::{VersionBuilder, ViewRepresentation, REF_PREFIX},
view_metadata::{VersionBuilder, ViewProperties, ViewRepresentation, REF_PREFIX},
},
util::strip_prefix,
};
Expand Down Expand Up @@ -68,12 +68,11 @@ impl MaterializedViewBuilder {
.schema_id(1)
.build()?,
))
.materialization("".to_owned())
.current_version_id(1)
.properties(HashMap::from_iter(vec![(
REF_PREFIX.to_string() + "main",
1.to_string(),
)]));
.properties(ViewProperties {
storage_table: "".to_owned(),
other: HashMap::from_iter(vec![(REF_PREFIX.to_string() + "main", 1.to_string())]),
});
Ok(Self {
identifier: Identifier::parse(&identifier.to_string())?,
catalog,
Expand Down Expand Up @@ -106,7 +105,7 @@ impl MaterializedViewBuilder {
+ "-"
+ &Uuid::new_v4().to_string()
+ ".metadata.json";
metadata.materialization = table_path.clone();
metadata.properties.storage_table = table_path.clone();
let table_metadata_json = serde_json::to_string(&table_metadata)?;
object_store
.put(
Expand Down
2 changes: 1 addition & 1 deletion iceberg-rust/src/materialized_view/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl MaterializedView {
}
/// Get the storage table of the materialized view
pub async fn storage_table(&self) -> Result<StorageTable, Error> {
let storage_table_location = &self.metadata.materialization;
let storage_table_location = &self.metadata.properties.storage_table;
let bucket = parse_bucket(storage_table_location)?;
if let TabularMetadata::Table(metadata) = serde_json::from_str(std::str::from_utf8(
&self
Expand Down
4 changes: 2 additions & 2 deletions iceberg-rust/src/view/transaction/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum Operation<T: Clone> {
UpdateMaterialization(T),
}

impl<T: Clone + 'static> Operation<T> {
impl<T: Clone + Default + 'static> Operation<T> {
/// Execute operation
pub async fn execute(
self,
Expand Down Expand Up @@ -105,7 +105,7 @@ impl<T: Clone + 'static> Operation<T> {
)),
Operation::UpdateMaterialization(materialization) => {
let previous_materialization =
(&metadata.materialization as &dyn Any).downcast_ref::<String>();
(&metadata.properties.storage_table as &dyn Any).downcast_ref::<String>();
let materialization = (&materialization as &dyn Any)
.downcast_ref::<String>()
.ok_or(Error::InvalidFormat(
Expand Down
11 changes: 5 additions & 6 deletions iceberg-rust/src/view/view_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::catalog::tabular::Tabular;
use crate::error::Error;
use iceberg_rust_spec::spec::schema::Schema;
use iceberg_rust_spec::spec::view_metadata::{
VersionBuilder, ViewMetadataBuilder, ViewRepresentation, REF_PREFIX,
VersionBuilder, ViewMetadataBuilder, ViewProperties, ViewRepresentation, REF_PREFIX,
};

use super::Catalog;
Expand Down Expand Up @@ -60,12 +60,11 @@ impl ViewBuilder {
.schema_id(1)
.build()?,
))
.materialization(None)
.current_version_id(1)
.properties(HashMap::from_iter(vec![(
REF_PREFIX.to_string() + "main",
1.to_string(),
)]));
.properties(ViewProperties {
storage_table: None,
other: HashMap::from_iter(vec![(REF_PREFIX.to_string() + "main", 1.to_string())]),
});
Ok(ViewBuilder {
metadata: builder,
identifier,
Expand Down

0 comments on commit 5a14a9d

Please sign in to comment.