Skip to content

Commit

Permalink
feat: protocol & metadata query
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Jan 5, 2024
1 parent 521f594 commit 25a2ca0
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 114 deletions.
8 changes: 4 additions & 4 deletions crates/deltalake-core/src/kernel/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,10 @@ impl Display for PrimitiveType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
PrimitiveType::String => write!(f, "string"),
PrimitiveType::Long => write!(f, "bigint"),
PrimitiveType::Integer => write!(f, "int"),
PrimitiveType::Short => write!(f, "smallint"),
PrimitiveType::Byte => write!(f, "tinyint"),
PrimitiveType::Long => write!(f, "long"),
PrimitiveType::Integer => write!(f, "integer"),
PrimitiveType::Short => write!(f, "short"),
PrimitiveType::Byte => write!(f, "byte"),
PrimitiveType::Float => write!(f, "float"),
PrimitiveType::Double => write!(f, "double"),
PrimitiveType::Boolean => write!(f, "boolean"),
Expand Down
277 changes: 174 additions & 103 deletions crates/deltalake-core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::cmp::Ordering;
use std::sync::Arc;
use std::task::{ready, Poll};

use arrow_array::RecordBatch;
use arrow_array::{
Array, Int32Array, Int64Array, ListArray, RecordBatch, StringArray, StructArray,
};
use arrow_json::{reader::Decoder, ReaderBuilder};
use bytes::{Buf, Bytes};
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
Expand All @@ -18,8 +20,12 @@ use regex::Regex;
use serde::{Deserialize, Serialize};

use crate::kernel::schema::Schema;
use crate::kernel::snapshot::extract::{extract_and_cast, extract_and_cast_opt};
use crate::kernel::{ActionType, Metadata, Protocol, StructType};
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

use super::extract::{read_primitive, read_primitive_opt, read_str, read_str_opt};

const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
const BATCH_SIZE: usize = 1024;

Expand Down Expand Up @@ -68,7 +74,7 @@ impl PathExt for Path {

/// Groups a version with the commit and checkpoint files used to read the
/// table log at that version.
pub(super) struct LogSegment {
/// Version of this segment; the constructor stores its effective version
/// (`version_eff`) here.
pub version: i64,
/// Location of the log; the constructor populates it from `log_url`.
pub log_root: Path,
/// Root location of the table; the constructor stores a clone of its
/// `table_root` value here.
pub table_root: Path,
/// Object-store metadata of the commit files belonging to this segment.
pub commit_files: Vec<ObjectMeta>,
/// Object-store metadata of the checkpoint files belonging to this segment.
pub checkpoint_files: Vec<ObjectMeta>,
}
Expand Down Expand Up @@ -115,7 +121,7 @@ impl LogSegment {

Ok(Self {
version: version_eff,
log_root: log_url,
table_root: table_root.clone(),
commit_files,
checkpoint_files,
})
Expand Down Expand Up @@ -163,6 +169,146 @@ impl LogSegment {
.map_err(Into::into)
.boxed()
}

/// Read Protocol and Metadata actions
pub(super) async fn read_metadata(
&self,
store: Arc<dyn ObjectStore>,
config: &DeltaTableConfig,
) -> DeltaResult<(Protocol, Metadata)> {
lazy_static::lazy_static! {
static ref READ_SCHEMA: StructType = StructType::new(vec![
ActionType::Protocol.schema_field().clone(),
ActionType::Metadata.schema_field().clone(),
]);
}

let mut maybe_protocol = None;
let mut maybe_metadata = None;

let mut commit_stream = self.commit_stream(store.clone(), &READ_SCHEMA, config)?;
while let Some(batch) = commit_stream.next().await {
let batch = batch?;
if maybe_protocol.is_none() {
if let Some(p) = read_protocol(&batch)? {
maybe_protocol.replace(p);
};
}
if maybe_metadata.is_none() {
if let Some(m) = read_metadata(&batch)? {
maybe_metadata.replace(m);
};
}
if maybe_protocol.is_some() && maybe_metadata.is_some() {
return Ok((maybe_protocol.unwrap(), maybe_metadata.unwrap()));
}
}

let mut checkpoint_stream = self.checkpoint_stream(store.clone(), &READ_SCHEMA, config);
while let Some(batch) = checkpoint_stream.next().await {
let batch = batch?;
if maybe_protocol.is_none() {
if let Some(p) = read_protocol(&batch)? {
maybe_protocol.replace(p);
};
}
if maybe_metadata.is_none() {
if let Some(m) = read_metadata(&batch)? {
maybe_metadata.replace(m);
};
}
if maybe_protocol.is_some() && maybe_metadata.is_some() {
return Ok((maybe_protocol.unwrap(), maybe_metadata.unwrap()));
}
}

match (maybe_protocol, maybe_metadata) {
(Some(protocol), Some(metadata)) => Ok((protocol, metadata)),
(Some(_), None) => Err(DeltaTableError::Generic(
"Missing metadata action".to_string(),
)),
(None, Some(_)) => Err(DeltaTableError::Generic(
"Missing protocol action".to_string(),
)),
(None, None) => Err(DeltaTableError::Generic(
"Missing protocol and metadata actions".to_string(),
)),
}
}
}

/// Collects the string entries of the list stored at row `idx` of `features`
/// into a set of `T`.
///
/// Null entries inside the list are skipped. If the list values are not a
/// [`StringArray`], an empty set is returned.
fn collect_features<T>(features: &ListArray, idx: usize) -> std::collections::HashSet<T>
where
    T: for<'s> From<&'s str> + Eq + std::hash::Hash,
{
    let values = features.value(idx);
    match values.as_any().downcast_ref::<StringArray>() {
        Some(strings) => strings.iter().flatten().map(T::from).collect(),
        None => std::collections::HashSet::new(),
    }
}

/// Extracts the first non-null `protocol` action from `batch`.
///
/// Returns `Ok(None)` when the batch has no `protocol` column or when every
/// row of that column is null.
///
/// `readerFeatures` / `writerFeatures` are `None` only when the respective
/// column is absent from the `protocol` struct.
/// NOTE(review): a null feature list in an existing column presumably yields
/// `Some(empty set)` rather than `None` — confirm that this is intended.
///
/// # Errors
///
/// Fails when `minReaderVersion` or `minWriterVersion` cannot be extracted as
/// `Int32Array`, or when reading their value at the selected row fails.
fn read_protocol(batch: &RecordBatch) -> DeltaResult<Option<Protocol>> {
    if let Some(arr) = extract_and_cast_opt::<StructArray>(batch, "protocol") {
        let min_reader_version = extract_and_cast::<Int32Array>(arr, "minReaderVersion")?;
        let min_writer_version = extract_and_cast::<Int32Array>(arr, "minWriterVersion")?;
        let maybe_reader_features = extract_and_cast_opt::<ListArray>(arr, "readerFeatures");
        let maybe_writer_features = extract_and_cast_opt::<ListArray>(arr, "writerFeatures");

        // Only the first valid row is decoded; any further rows are ignored.
        if let Some(idx) = (0..arr.len()).find(|&idx| arr.is_valid(idx)) {
            return Ok(Some(Protocol {
                min_reader_version: read_primitive(min_reader_version, idx)?,
                min_writer_version: read_primitive(min_writer_version, idx)?,
                reader_features: maybe_reader_features.map(|list| collect_features(list, idx)),
                writer_features: maybe_writer_features.map(|list| collect_features(list, idx)),
            }));
        }
    }
    Ok(None)
}

/// Extracts the first non-null `metaData` action from `batch`.
///
/// Yields `Ok(None)` if the batch carries no `metaData` column or if all rows
/// of that column are null.
///
/// NOTE: `format`, `partition_columns` and `configuration` are not decoded
/// from the batch yet; the returned [`Metadata`] carries their default values.
fn read_metadata(batch: &RecordBatch) -> DeltaResult<Option<Metadata>> {
    let arr = match extract_and_cast_opt::<StructArray>(batch, "metaData") {
        Some(arr) => arr,
        None => return Ok(None),
    };

    let id = extract_and_cast::<StringArray>(arr, "id")?;
    let name = extract_and_cast::<StringArray>(arr, "name")?;
    let description = extract_and_cast::<StringArray>(arr, "description")?;
    let schema_string = extract_and_cast::<StringArray>(arr, "schemaString")?;
    let created_time = extract_and_cast::<Int64Array>(arr, "createdTime")?;

    // Only the first valid row is of interest.
    let row = match (0..arr.len()).find(|&row| arr.is_valid(row)) {
        Some(row) => row,
        None => return Ok(None),
    };

    let metadata = Metadata {
        id: read_str(id, row)?.to_string(),
        name: read_str_opt(name, row).map(|value| value.to_string()),
        description: read_str_opt(description, row).map(|value| value.to_string()),
        format: Default::default(),
        schema_string: read_str(schema_string, row)?.to_string(),
        partition_columns: Default::default(),
        configuration: Default::default(),
        created_time: read_primitive_opt(created_time, row),
    };
    Ok(Some(metadata))
}

fn decode_stream<S: Stream<Item = ObjectStoreResult<Bytes>> + Unpin>(
Expand Down Expand Up @@ -389,104 +535,29 @@ mod tests {

Ok(())
}
}

// #[cfg(test)]
// mod tests {
// use super::*;
//
// use std::collections::HashMap;
// use std::path::PathBuf;
//
// use object_store::local::LocalFileSystem;
// use object_store::path::Path;
//
// use crate::kernel::executor::tokio::TokioBackgroundExecutor;
// use crate::kernel::filesystem::ObjectStoreFileSystemClient;
// use crate::kernel::schema::StructType;
//
// #[test]
// fn test_snapshot_read_metadata() {
// let path =
// std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
// let url = url::Url::from_directory_path(path).unwrap();
//
// let client = default_table_client(&url);
// let snapshot = Snapshot::try_new(url, &client, Some(1)).unwrap();
//
// let expected = Protocol {
// min_reader_version: 3,
// min_writer_version: 7,
// reader_features: Some(vec!["deletionVectors".into()]),
// writer_features: Some(vec!["deletionVectors".into()]),
// };
// assert_eq!(snapshot.protocol(), &expected);
//
// let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#;
// let expected: StructType = serde_json::from_str(schema_string).unwrap();
// assert_eq!(snapshot.schema(), &expected);
// }
//
// #[test]
// fn test_new_snapshot() {
// let path =
// std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
// let url = url::Url::from_directory_path(path).unwrap();
//
// let client = default_table_client(&url);
// let snapshot = Snapshot::try_new(url, &client, None).unwrap();
//
// let expected = Protocol {
// min_reader_version: 3,
// min_writer_version: 7,
// reader_features: Some(vec!["deletionVectors".into()]),
// writer_features: Some(vec!["deletionVectors".into()]),
// };
// assert_eq!(snapshot.protocol(), &expected);
//
// let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#;
// let expected: StructType = serde_json::from_str(schema_string).unwrap();
// assert_eq!(snapshot.schema(), &expected);
// }
//
// #[test]
// fn test_read_table_with_last_checkpoint() {
// let path = std::fs::canonicalize(PathBuf::from(
// "./tests/data/table-with-dv-small/_delta_log/",
// ))
// .unwrap();
// let url = url::Url::from_directory_path(path).unwrap();
//
// let store = Arc::new(LocalFileSystem::new());
// let prefix = Path::from(url.path());
// let client = ObjectStoreFileSystemClient::new(
// store,
// prefix,
// Arc::new(TokioBackgroundExecutor::new()),
// );
// let cp = read_last_checkpoint(&client, &url).unwrap();
// assert!(cp.is_none())
// }
//
// #[test]
// fn test_read_table_with_checkpoint() {
// let path = std::fs::canonicalize(PathBuf::from(
// "./tests/data/with_checkpoint_no_last_checkpoint/",
// ))
// .unwrap();
// let location = url::Url::from_directory_path(path).unwrap();
// let table_client = default_table_client(&location);
// let snapshot = Snapshot::try_new(location, &table_client, None).unwrap();
//
// assert_eq!(snapshot.log_segment.checkpoint_files.len(), 1);
// assert_eq!(
// LogPath(&snapshot.log_segment.checkpoint_files[0].location).commit_version(),
// Some(2)
// );
// assert_eq!(snapshot.log_segment.commit_files.len(), 1);
// assert_eq!(
// LogPath(&snapshot.log_segment.commit_files[0].location).commit_version(),
// Some(3)
// );
// }
// }
#[tokio::test]
async fn test_snapshot_read_metadata() -> TestResult {
let context = IntegrationContext::new(Box::new(LocalStorageIntegration::default()))?;
context.load_table(TestTables::WithDvSmall).await?;

let store = context
.table_builder(TestTables::WithDvSmall)
.build_storage()?
.object_store();
let segment = LogSegment::try_new(&Path::default(), None, store.as_ref()).await?;
let (protocol, _metadata) = segment
.read_metadata(store.clone(), &Default::default())
.await?;

let expected = Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: Some(vec!["deletionVectors".into()].into_iter().collect()),
writer_features: Some(vec!["deletionVectors".into()].into_iter().collect()),
};
assert_eq!(protocol, expected);

Ok(())
}
}
Loading

0 comments on commit 25a2ca0

Please sign in to comment.