diff --git a/crates/deltalake-core/Cargo.toml b/crates/deltalake-core/Cargo.toml
index a64832e00b..cb938c9ff6 100644
--- a/crates/deltalake-core/Cargo.toml
+++ b/crates/deltalake-core/Cargo.toml
@@ -33,6 +33,7 @@ parquet = { workspace = true, features = [
     "async",
     "object_store",
 ], optional = true }
+pin-project-lite = "^0.2.7"
 
 # datafusion
 datafusion = { workspace = true, optional = true }
@@ -49,6 +50,7 @@ serde_json = { workspace = true }
 # "stdlib"
 bytes = { workspace = true }
 chrono = { workspace = true, default-features = false, features = ["clock"] }
+hashbrown = "*"
 regex = { workspace = true }
 thiserror = { workspace = true }
 uuid = { workspace = true, features = ["serde", "v4"] }
diff --git a/crates/deltalake-core/src/kernel/mod.rs b/crates/deltalake-core/src/kernel/mod.rs
index 7d8f301d6d..2fa43c0fb6 100644
--- a/crates/deltalake-core/src/kernel/mod.rs
+++ b/crates/deltalake-core/src/kernel/mod.rs
@@ -14,6 +14,7 @@ pub use actions::*;
 pub use error::*;
 pub use expressions::*;
 pub use schema::*;
+pub use snapshot::*;
 
 /// A trait for all kernel types that are used as part of data checking
 pub trait DataCheck {
diff --git a/crates/deltalake-core/src/kernel/snapshot/extract.rs b/crates/deltalake-core/src/kernel/snapshot/extract.rs
new file mode 100644
index 0000000000..04420cfeb5
--- /dev/null
+++ b/crates/deltalake-core/src/kernel/snapshot/extract.rs
@@ -0,0 +1,177 @@
+use std::sync::Arc;
+
+use arrow_array::{
+    Array, ArrowNativeTypeOp, ArrowNumericType, BooleanArray, ListArray, MapArray, PrimitiveArray,
+    RecordBatch, StringArray, StructArray,
+};
+use arrow_schema::{ArrowError, DataType};
+
+use crate::{DeltaResult, DeltaTableError};
+
+pub(crate) trait ProvidesColumnByName {
+    fn column_by_name(&self, name: &str) -> Option<&Arc<dyn Array>>;
+}
+
+impl ProvidesColumnByName for RecordBatch {
+    fn column_by_name(&self, name: &str) -> Option<&Arc<dyn Array>> {
+        self.column_by_name(name)
+    }
+}
+
+impl ProvidesColumnByName for StructArray {
+    fn column_by_name(&self, name: &str) -> Option<&Arc<dyn Array>> {
+        self.column_by_name(name)
+    }
+}
+
+pub(super) fn extract_and_cast<'a, T: Array + 'static>(
+    arr: &'a dyn ProvidesColumnByName,
+    name: &'a str,
+) -> DeltaResult<&'a T> {
+    extract_and_cast_opt::<T>(arr, name).ok_or(DeltaTableError::Generic(format!(
+        "missing-column: {}",
+        name
+    )))
+}
+
+pub(super) fn extract_and_cast_opt<'a, T: Array + 'static>(
+    array: &'a dyn ProvidesColumnByName,
+    name: &'a str,
+) -> Option<&'a T> {
+    let mut path_steps = name.split('.');
+    let first = path_steps.next()?;
+    extract_column(array, first, &mut path_steps)
+        .ok()?
+        .as_any()
+        .downcast_ref::<T>()
+}
+
+pub(super) fn extract_column<'a>(
+    array: &'a dyn ProvidesColumnByName,
+    path_step: &str,
+    remaining_path_steps: &mut impl Iterator<Item = &'a str>,
+) -> Result<&'a Arc<dyn Array>, ArrowError> {
+    let child = array
+        .column_by_name(path_step)
+        .ok_or(ArrowError::SchemaError(format!(
+            "No such field: {}",
+            path_step,
+        )))?;
+
+    if let Some(next_path_step) = remaining_path_steps.next() {
+        match child.data_type() {
+            DataType::Map(_, _) => {
+                // NOTE a map has exactly one child, but we want to be agnostic of its name,
+                // so we cast the current array as a map and use the entries accessor.
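+                // The entries of a map are a struct of (key, value) columns, so recursing
+                // into it lets the next path step address either column by name.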
+                let maparr = column_as_map(path_step, &Some(child))?;
+                if let Some(next_path) = remaining_path_steps.next() {
+                    extract_column(maparr.entries(), next_path, remaining_path_steps)
+                } else {
+                    Ok(child)
+                    // if maparr.entries().num_columns() != 2 {
+                    //     return Err(ArrowError::SchemaError(format!(
+                    //         "Map {} has {} columns, expected 2",
+                    //         path_step,
+                    //         maparr.entries().num_columns()
+                    //     )));
+                    // }
+                    // if next_path_step == *maparr.entries().column_names().first().unwrap() {
+                    //     Ok(maparr.entries().column(0))
+                    // } else {
+                    //     Ok(maparr.entries().column(1))
+                    // }
+                }
+            }
+            DataType::List(_) => {
+                let listarr = column_as_list(path_step, &Some(child))?;
+                if let Some(next_path) = remaining_path_steps.next() {
+                    extract_column(
+                        column_as_struct(next_path_step, &Some(listarr.values()))?,
+                        next_path,
+                        remaining_path_steps,
+                    )
+                } else {
+                    Ok(listarr.values())
+                }
+            }
+            _ => extract_column(
+                column_as_struct(path_step, &Some(child))?,
+                next_path_step,
+                remaining_path_steps,
+            ),
+        }
+    } else {
+        Ok(child)
+    }
+}
+
+fn column_as_struct<'a>(
+    name: &str,
+    column: &Option<&'a Arc<dyn Array>>,
+) -> Result<&'a StructArray, ArrowError> {
+    column
+        .ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
+        .as_any()
+        .downcast_ref::<StructArray>()
+        .ok_or(ArrowError::SchemaError(format!("{} is not a struct", name)))
+}
+
+fn column_as_map<'a>(
+    name: &str,
+    column: &Option<&'a Arc<dyn Array>>,
+) -> Result<&'a MapArray, ArrowError> {
+    column
+        .ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
+        .as_any()
+        .downcast_ref::<MapArray>()
+        .ok_or(ArrowError::SchemaError(format!("{} is not a map", name)))
+}
+
+fn column_as_list<'a>(
+    name: &str,
+    column: &Option<&'a Arc<dyn Array>>,
+) -> Result<&'a ListArray, ArrowError> {
+    column
+        .ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .ok_or(ArrowError::SchemaError(format!("{} is not a list", name)))
+}
+
+#[inline]
+pub(super) fn read_str<'a>(arr: &'a StringArray, idx: usize) -> DeltaResult<&'a str> {
+    read_str_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into()))
+}
+
+#[inline]
+pub(super) fn read_str_opt<'a>(arr: &'a StringArray, idx: usize) -> Option<&'a str> {
+    arr.is_valid(idx).then(|| arr.value(idx))
+}
+
+#[inline]
+pub(super) fn read_primitive<T>(arr: &PrimitiveArray<T>, idx: usize) -> DeltaResult<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: ArrowNativeTypeOp,
+{
+    read_primitive_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into()))
+}
+
+#[inline]
+pub(super) fn read_primitive_opt<T>(arr: &PrimitiveArray<T>, idx: usize) -> Option<T::Native>
+where
+    T: ArrowNumericType,
+    T::Native: ArrowNativeTypeOp,
+{
+    arr.is_valid(idx).then(|| arr.value(idx))
+}
+
+#[inline]
+pub(super) fn read_bool(arr: &BooleanArray, idx: usize) -> DeltaResult<bool> {
+    read_bool_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into()))
+}
+
+#[inline]
+pub(super) fn read_bool_opt(arr: &BooleanArray, idx: usize) -> Option<bool> {
+    arr.is_valid(idx).then(|| arr.value(idx))
+}
diff --git a/crates/deltalake-core/src/kernel/snapshot/log_segment.rs b/crates/deltalake-core/src/kernel/snapshot/log_segment.rs
index 5828c86538..c66a5f4400 100644
--- a/crates/deltalake-core/src/kernel/snapshot/log_segment.rs
+++ b/crates/deltalake-core/src/kernel/snapshot/log_segment.rs
@@ -118,7 +118,7 @@ impl LogSegment {
         })
     }
 
-    pub(super) fn log_stream(
+    pub(super) fn commit_stream(
         &self,
         store: Arc<dyn ObjectStore>,
         read_schema: &Schema,
diff --git a/crates/deltalake-core/src/kernel/snapshot/mod.rs b/crates/deltalake-core/src/kernel/snapshot/mod.rs
index a31c208e6f..2075dbe961 100644
--- a/crates/deltalake-core/src/kernel/snapshot/mod.rs
+++ b/crates/deltalake-core/src/kernel/snapshot/mod.rs
@@ -1,2 +1,96 @@
+use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
+use object_store::path::Path;
+use object_store::ObjectStore;
+
+use self::log_segment::LogSegment;
+use self::replay::ReplayStream;
+use crate::kernel::{actions::ActionType, StructType};
+use crate::{DeltaResult, DeltaTableConfig};
+
 mod extract;
 mod log_segment;
+mod replay;
+
+/// A snapshot of a Delta table
+pub struct Snapshot {
+    log_segment: LogSegment,
+    store: Arc<dyn ObjectStore>,
+    config: DeltaTableConfig,
+}
+
+impl Snapshot {
+    /// Create a new snapshot from a log segment
+    pub async fn try_new(
+        table_root: &Path,
+        store: Arc<dyn ObjectStore>,
+        config: DeltaTableConfig,
+        version: Option<i64>,
+    ) -> DeltaResult<Self> {
+        let log_segment = LogSegment::try_new(table_root, version, store.as_ref()).await?;
+        Ok(Self {
+            log_segment,
+            store,
+            config,
+        })
+    }
+
+    /// Get the table version of the snapshot
+    pub fn version(&self) -> i64 {
+        self.log_segment.version
+    }
+
+    /// Get the files in the snapshot
+    pub fn files(&self) -> DeltaResult<ReplayStream<BoxStream<'_, DeltaResult<RecordBatch>>>> {
+        let read_schema = Arc::new(StructType::new(vec![
+            ActionType::Add.schema_field().clone(),
+            ActionType::Remove.schema_field().clone(),
+        ]));
+
+        let batches =
+            self.log_segment
+                .commit_stream(self.store.clone(), &read_schema, &self.config)?;
+
+        Ok(ReplayStream::new(batches))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use deltalake_test::utils::*;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_snapshot_files() -> TestResult {
+        let context = IntegrationContext::new(Box::new(LocalStorageIntegration::default()))?;
+        context.load_table(TestTables::Checkpoints).await?;
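+        // NOTE: only the Simple table is read in this test.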
+        context.load_table(TestTables::Simple).await?;
+
+        let store = context
+            .table_builder(TestTables::Simple)
+            .build_storage()?
+            .object_store();
+
+        let snapshot =
+            Snapshot::try_new(&Path::default(), store.clone(), Default::default(), None).await?;
+        let batches = snapshot.files()?.try_collect::<Vec<_>>().await?;
+
+        let expected = [
+            "+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
+            "| add                                                                                                                                                                                                                                                             |",
+            "+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
+            "| {path: part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet, partitionValues: {}, size: 262, modificationTime: 1587968626000, dataChange: true, stats: , tags: , deletionVector: , baseRowId: , defaultRowCommitVersion: , clusteringProvider: } |",
+            "| {path: part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet, partitionValues: {}, size: 262, modificationTime: 1587968602000, dataChange: true, stats: , tags: , deletionVector: , baseRowId: , defaultRowCommitVersion: , clusteringProvider: } |",
+            "| {path: part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet, partitionValues: {}, size: 429, modificationTime: 1587968602000, dataChange: true, stats: , tags: , deletionVector: , baseRowId: , defaultRowCommitVersion: , clusteringProvider: } |",
+            "| {path: part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet, partitionValues: {}, size: 429, modificationTime: 1587968602000, dataChange: true, stats: , tags: , deletionVector: , baseRowId: , defaultRowCommitVersion: , clusteringProvider: } |",
+            "| {path: part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet, partitionValues: {}, size: 429, modificationTime: 1587968602000, dataChange: true, stats: , tags: , deletionVector: , baseRowId: , defaultRowCommitVersion: , clusteringProvider: } |",
+            "+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+
+        Ok(())
+    }
+}
diff --git a/crates/deltalake-core/src/kernel/snapshot/replay.rs b/crates/deltalake-core/src/kernel/snapshot/replay.rs
new file mode 100644
index 0000000000..23b7f3c478
--- /dev/null
+++ b/crates/deltalake-core/src/kernel/snapshot/replay.rs
@@ -0,0 +1,299 @@
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+use arrow_arith::boolean::{is_not_null, or};
+use arrow_array::{Array, BooleanArray, Int32Array, RecordBatch, StringArray, StructArray};
+use arrow_select::filter::filter_record_batch;
+use futures::Stream;
+use hashbrown::HashSet;
+use pin_project_lite::pin_project;
+use tracing::debug;
+
+use super::extract::{
+    extract_and_cast, extract_and_cast_opt, read_primitive_opt, read_str, ProvidesColumnByName,
+};
+use crate::{DeltaResult, DeltaTableError};
+
+pin_project! {
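+    // Stream adapter around the raw commit stream: every polled batch is run through
+    // the log replay scanner so removed and duplicate files are filtered out before
+    // the batch reaches the caller.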
+    pub struct ReplayStream<S> {
+        scanner: LogReplayScanner,
+
+        #[pin]
+        stream: S,
+    }
+}
+
+impl<S> ReplayStream<S> {
+    pub(super) fn new(stream: S) -> Self {
+        Self {
+            stream,
+            scanner: LogReplayScanner::new(),
+        }
+    }
+}
+
+impl<S> Stream for ReplayStream<S>
+where
+    S: Stream<Item = DeltaResult<RecordBatch>>,
+{
+    type Item = DeltaResult<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+        this.stream.poll_next(cx).map(|b| match b {
+            Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, true) {
+                Ok(filtered) => Some(Ok(filtered)),
+                Err(e) => Some(Err(e)),
+            },
+            Some(Err(e)) => Some(Err(e)),
+            None => None,
+        })
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.stream.size_hint()
+    }
+}
+
+#[derive(Debug)]
+pub(super) struct FileInfo<'a> {
+    pub path: &'a str,
+    pub dv: Option<DVInfo<'a>>,
+}
+
+#[derive(Debug)]
+pub(super) struct DVInfo<'a> {
+    pub storage_type: &'a str,
+    pub path_or_inline_dv: &'a str,
+    pub offset: Option<i32>,
+    // pub size_in_bytes: i32,
+    // pub cardinality: i64,
+}
+
+fn seen_key(info: &FileInfo<'_>) -> String {
+    if let Some(dv) = &info.dv {
+        if let Some(offset) = dv.offset {
+            format!(
+                "{}::{}{}@{offset}",
+                info.path, dv.storage_type, dv.path_or_inline_dv
+            )
+        } else {
+            format!("{}::{}{}", info.path, dv.storage_type, dv.path_or_inline_dv)
+        }
+    } else {
+        format!("{}", info.path)
+    }
+}
+
+struct LogReplayScanner {
+    // filter: Option,
+    /// A set of (data file path, dv_unique_id) pairs that have been seen thus
+    /// far in the log. This is used to filter out files with Remove actions as
+    /// well as duplicate entries in the log.
+    seen: HashSet<String>,
+}
+
+impl LogReplayScanner {
+    /// Creates a new [`LogReplayScanner`] instance.
+    fn new() -> Self {
+        Self {
+            seen: HashSet::new(),
+        }
+    }
+
+    /// Takes a record batch of add and potentially remove actions and returns a
+    /// filtered batch of actions that contains only active rows.
+    fn process_files_batch<'a>(
+        &mut self,
+        batch: &'a RecordBatch,
+        is_log_batch: bool,
+    ) -> DeltaResult<RecordBatch> {
+        let add_col = extract_and_cast::<StructArray>(batch, "add")?;
+        let maybe_remove_col = extract_and_cast_opt::<StructArray>(batch, "remove");
+        let filter = if let Some(remove_col) = maybe_remove_col {
+            or(&is_not_null(add_col)?, &is_not_null(remove_col)?)?
+        } else {
+            is_not_null(add_col)?
+        };
+
+        let filtered = filter_record_batch(batch, &filter)?;
+        let add_col = extract_and_cast::<StructArray>(&filtered, "add")?;
+        let maybe_remove_col = extract_and_cast_opt::<StructArray>(&filtered, "remove");
+        let add_actions = read_file_info(add_col)?;
+
+        let mut keep = Vec::with_capacity(filtered.num_rows());
+        if let Some(remove_col) = maybe_remove_col {
+            let remove_actions = read_file_info(remove_col)?;
+            for (a, r) in add_actions.into_iter().zip(remove_actions.into_iter()) {
+                match (a, r) {
+                    (Some(a), None) => {
+                        let file_id = seen_key(&a);
+                        if !self.seen.contains(&file_id) {
+                            is_log_batch.then(|| self.seen.insert(file_id));
+                            keep.push(true);
+                        } else {
+                            keep.push(false);
+                        }
+                    }
+                    (None, Some(r)) => {
+                        self.seen.insert(seen_key(&r));
+                        keep.push(false);
+                    }
+                    // NOTE: there should always be only one action per row.
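+                    // Unexpected combinations are only logged for debugging rather
+                    // than surfaced as errors.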
+                    (None, None) => debug!("WARNING: no action found for row"),
+                    (Some(a), Some(r)) => {
+                        debug!(
+                            "WARNING: both add and remove actions found for row: {:?} {:?}",
+                            a, r
+                        )
+                    }
+                }
+            }
+        } else {
+            for a in add_actions.into_iter() {
+                if let Some(a) = a {
+                    let file_id = seen_key(&a);
+                    if !self.seen.contains(&file_id) {
+                        is_log_batch.then(|| self.seen.insert(file_id));
+                        keep.push(true);
+                    } else {
+                        keep.push(false);
+                    }
+                }
+            }
+        };
+
+        let projection = filtered
+            .schema()
+            .fields()
+            .iter()
+            .enumerate()
+            .filter_map(|(idx, field)| (field.name() == "add").then(|| idx))
+            .collect::<Vec<_>>();
+        let filtered = filtered.project(&projection)?;
+
+        Ok(filter_record_batch(&filtered, &BooleanArray::from(keep))?)
+    }
+}
+
+fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult<Vec<Option<FileInfo<'a>>>> {
+    let path = extract_and_cast::<StringArray>(arr, "path")?;
+    let mut adds = Vec::with_capacity(path.len());
+
+    let base = extract_and_cast_opt::<StructArray>(arr, "deletionVector");
+
+    if let Some(base) = base {
+        let storage_type = extract_and_cast::<StringArray>(base, "storageType")?;
+        let path_or_inline_dv = extract_and_cast::<StringArray>(base, "pathOrInlineDv")?;
+        let offset = extract_and_cast::<Int32Array>(base, "offset")?;
+        // let size_in_bytes = extract_and_cast::<Int32Array>(base, "sizeInBytes")?;
+        // let cardinality = extract_and_cast::<Int64Array>(base, "cardinality")?;
+
+        for idx in 0..path.len() {
+            let value = path
+                .is_valid(idx)
+                .then(|| {
+                    let dv = if read_str(storage_type, idx).is_ok() {
+                        Some(DVInfo {
+                            storage_type: read_str(storage_type, idx)?,
+                            path_or_inline_dv: read_str(path_or_inline_dv, idx)?,
+                            offset: read_primitive_opt(offset, idx),
+                            // size_in_bytes: read_primitive(size_in_bytes, idx)?,
+                            // cardinality: read_primitive(cardinality, idx)?,
+                        })
+                    } else {
+                        None
+                    };
+                    Ok::<_, DeltaTableError>(FileInfo {
+                        path: read_str(path, idx)?,
+                        dv,
+                    })
+                })
+                .transpose()?;
+            adds.push(value);
+        }
+    } else {
+        for idx in 0..path.len() {
+            let value = path
+                .is_valid(idx)
+                .then(|| {
+                    Ok::<_, DeltaTableError>(FileInfo {
+                        path: read_str(path, idx)?,
+                        dv: None,
+                    })
+                })
+                .transpose()?;
+            adds.push(value);
+        }
+    }
+
+    Ok(adds)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow_cast::pretty::print_batches;
+    use deltalake_test::utils::*;
+    use futures::StreamExt;
+    use object_store::path::Path;
+
+    use super::super::log_segment::LogSegment;
+    use super::*;
+    use crate::kernel::{actions::ActionType, StructType};
+
+    #[tokio::test]
+    async fn test_read_log_stream() -> TestResult {
+        let context = IntegrationContext::new(Box::new(LocalStorageIntegration::default()))?;
+        context.load_table(TestTables::Checkpoints).await?;
+        context.load_table(TestTables::Simple).await?;
+        let read_schema = Arc::new(StructType::new(vec![
+            ActionType::Add.schema_field().clone(),
+            ActionType::Remove.schema_field().clone(),
+        ]));
+
+        // let store = context
+        //     .table_builder(TestTables::Checkpoints)
+        //     .build_storage()?
+        //     .object_store();
+
+        // let segment = LogSegment::try_new(&Path::default(), Some(9), store.as_ref()).await?;
+        // let mut batches = segment.log_stream(store, &read_schema, &Default::default())?;
+        // let mut scanner = LogReplayScanner::new();
+        // while let Some(batch) = batches.next().await {
+        //     let batch = batch?;
+        //     print_batches(&[batch.clone()])?;
+        //     let filtered = scanner.process_batch(&batch, true)?;
+        //     print_batches(&[filtered])?;
+        // }
+
+        // let segment = LogSegment::try_new(&Path::default(), Some(9), store.as_ref()).await?;
+        // let mut batches = segment.log_stream(store, &read_schema, &Default::default())?;
+        // let mut scanner = LogReplayScanner::new();
+        // while let Some(batch) = batches.next().await {
+        //     let batch = batch?;
+        //     print_batches(&[batch.clone()])?;
+        //     let filtered = scanner.process_batch(&batch, true)?;
+        //     print_batches(&[filtered])?;
+        // }
+
+        let store = context
+            .table_builder(TestTables::Simple)
+            .build_storage()?
+            .object_store();
+
+        let segment = LogSegment::try_new(&Path::default(), None, store.as_ref()).await?;
+        let mut batches = segment.commit_stream(store, &read_schema, &Default::default())?;
+        let mut scanner = LogReplayScanner::new();
+        while let Some(batch) = batches.next().await {
+            let batch = batch?;
+            print_batches(&[batch.clone()])?;
+            let filtered = scanner.process_files_batch(&batch, true)?;
+            print_batches(&[filtered])?;
+        }
+
+        Ok(())
+    }
+}
diff --git a/crates/deltalake-test/src/utils.rs b/crates/deltalake-test/src/utils.rs
index f56197d1bc..6aac4b7a95 100644
--- a/crates/deltalake-test/src/utils.rs
+++ b/crates/deltalake-test/src/utils.rs
@@ -303,3 +303,37 @@ pub mod hdfs_cli {
         child.wait()
     }
 }
+
+#[macro_export]
+macro_rules! assert_batches_sorted_eq {
+    ($EXPECTED_LINES: expr, $CHUNKS: expr) => {
+        let mut expected_lines: Vec<String> = $EXPECTED_LINES.iter().map(|&s| s.into()).collect();
+
+        // sort except for header + footer
+        let num_lines = expected_lines.len();
+        if num_lines > 3 {
+            expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
+        }
+
+        let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS)
+            .unwrap()
+            .to_string();
+        // fix for windows: \r\n -->
+
+        let mut actual_lines: Vec<&str> = formatted.trim().lines().collect();
+
+        // sort except for header + footer
+        let num_lines = actual_lines.len();
+        if num_lines > 3 {
+            actual_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
+        }
+
+        assert_eq!(
+            expected_lines, actual_lines,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected_lines, actual_lines
+        );
+    };
+}
+
+pub use assert_batches_sorted_eq;
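A minimal usage sketch of the new snapshot API introduced above (not part of the patch; it assumes, as the tests do, an object store already rooted at the table location, and uses `crate::`-relative paths from within `deltalake-core`):

```rust
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::path::Path;
use object_store::ObjectStore;

use crate::kernel::Snapshot;
use crate::{DeltaResult, DeltaTableConfig};

/// Count the record batches of `add` actions that survive log replay for the
/// latest table version.
async fn active_file_batches(store: Arc<dyn ObjectStore>) -> DeltaResult<usize> {
    // `None` resolves the latest version; a concrete version can be pinned instead.
    let snapshot =
        Snapshot::try_new(&Path::default(), store, DeltaTableConfig::default(), None).await?;
    // `files()` streams batches of surviving `add` actions after replaying the commit log.
    let batches: Vec<_> = snapshot.files()?.try_collect().await?;
    Ok(batches.len())
}
```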