Skip to content

Commit

Permalink
chore: implement Snapshot for current table state
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Nov 11, 2023
1 parent 6d107c9 commit 06e34ab
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 4 deletions.
16 changes: 12 additions & 4 deletions crates/deltalake-core/src/kernel/arrow/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,19 @@ use crate::table::config::TableConfig;
pub trait Snapshot: std::fmt::Display + Send + Sync + std::fmt::Debug + 'static {
/// The version of the table at this [`Snapshot`].
fn version(&self) -> i64;

/// Table [`Schema`](crate::kernel::schema::StructType) at this [`Snapshot`]'s version.
fn schema(&self) -> DeltaResult<StructType>;
fn schema(&self) -> Option<&StructType>;

/// Table [`Metadata`] at this [`Snapshot`]'s version.
fn metadata(&self) -> DeltaResult<Metadata>;

/// Table [`Protocol`] at this [`Snapshot`]'s version.
fn protocol(&self) -> DeltaResult<Protocol>;

/// Iterator over the [`Add`] actions at this [`Snapshot`]'s version.
fn files(&self) -> DeltaResult<Box<dyn Iterator<Item = Add> + '_>>;

/// Well known table [configuration](crate::table::config::TableConfig).
fn table_config(&self) -> TableConfig<'_>;
}
Expand All @@ -49,6 +54,7 @@ pub struct TableStateArrow {
version: i64,
actions: RecordBatch,
metadata: Metadata,
schema: StructType,
protocol: Protocol,
}

Expand All @@ -69,11 +75,13 @@ impl TableStateArrow {
_ => None,
})
.ok_or(Error::Generic("expected protocol".into()))?;
let schema = serde_json::from_str(&metadata.schema_string)?;
Ok(Self {
version,
actions,
metadata,
protocol,
schema,
})
}

Expand Down Expand Up @@ -106,8 +114,8 @@ impl Snapshot for TableStateArrow {
}

/// Table [`Schema`](crate::kernel::schema::StructType) at this [`Snapshot`]'s version.
fn schema(&self) -> DeltaResult<StructType> {
self.metadata()?.schema()
fn schema(&self) -> Option<&StructType> {
Some(&self.schema)
}

/// Table [`Protocol`] at this [`Snapshot`]'s version.
Expand Down Expand Up @@ -520,7 +528,7 @@ mod tests {
let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#;
let expected: StructType = serde_json::from_str(schema_string).unwrap();
let schema = snapshot.schema().unwrap();
assert_eq!(schema, expected);
assert_eq!(schema, &expected);
}

#[tokio::test]
Expand Down
71 changes: 71 additions & 0 deletions crates/deltalake-core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use serde::{Deserialize, Serialize};

use super::config::TableConfig;
use crate::errors::DeltaTableError;
use crate::kernel::arrow::snapshot::Snapshot;
use crate::kernel::error::DeltaResult;
use crate::kernel::{
Action, Add, CommitInfo, DataType, DomainMetadata, ReaderFeatures, Remove, StructType,
WriterFeatures,
Expand Down Expand Up @@ -54,6 +56,75 @@ pub struct DeltaTableState {
enable_expired_log_cleanup: bool,
}

impl std::fmt::Display for DeltaTableState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.schema())
}
}

impl Snapshot for DeltaTableState {
fn version(&self) -> i64 {
self.version
}

fn schema(&self) -> Option<&StructType> {
self.schema()
}

fn files(&self) -> DeltaResult<Box<dyn Iterator<Item = Add> + '_>> {
Ok(Box::new(self.files().clone().into_iter()))
}

fn table_config(&self) -> TableConfig<'_> {
self.table_config()
}

fn protocol(&self) -> DeltaResult<crate::kernel::Protocol> {
Ok(crate::kernel::Protocol {
min_reader_version: self.min_reader_version,
min_writer_version: self.min_writer_version,
reader_features: self.reader_features.clone(),
writer_features: self.writer_features.clone(),
})
}

fn metadata(&self) -> DeltaResult<crate::kernel::Metadata> {
Ok(crate::kernel::Metadata {
id: self
.current_metadata
.as_ref()
.map(|m| m.id.clone())
.unwrap_or("00000000-0000-0000-0000-000000000000".into()),
name: self.current_metadata.as_ref().and_then(|m| m.name.clone()),
description: self
.current_metadata
.as_ref()
.and_then(|m| m.description.clone()),
format: self
.current_metadata
.as_ref()
.map(|m| m.format.clone())
.unwrap_or_default(),
schema_string: self
.current_metadata
.as_ref()
.map(|m| serde_json::to_string(&m.schema).unwrap())
.unwrap_or_default(),
partition_columns: self
.current_metadata
.as_ref()
.map(|m| m.partition_columns.clone())
.unwrap_or_default(),
configuration: self
.current_metadata
.as_ref()
.map(|m| m.configuration.clone())
.unwrap_or_default(),
created_time: self.current_metadata.as_ref().and_then(|m| m.created_time),
})
}
}

impl DeltaTableState {
/// Create Table state with specified version
pub fn with_version(version: i64) -> Self {
Expand Down

0 comments on commit 06e34ab

Please sign in to comment.