From 521f59429da118ae3e511511cf6101b8335471d1 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Fri, 5 Jan 2024 16:24:57 +0100 Subject: [PATCH] feat: basic checkpoint file reading --- .../src/kernel/snapshot/log_segment.rs | 32 +++++++++++-- .../deltalake-core/src/kernel/snapshot/mod.rs | 48 +++++++++++++++---- .../src/kernel/snapshot/replay.rs | 22 +++++++-- 3 files changed, 87 insertions(+), 15 deletions(-) diff --git a/crates/deltalake-core/src/kernel/snapshot/log_segment.rs b/crates/deltalake-core/src/kernel/snapshot/log_segment.rs index c66a5f4400..374d20e86a 100644 --- a/crates/deltalake-core/src/kernel/snapshot/log_segment.rs +++ b/crates/deltalake-core/src/kernel/snapshot/log_segment.rs @@ -12,6 +12,8 @@ use object_store::path::Path; use object_store::{ Error as ObjectStoreError, ObjectMeta, ObjectStore, Result as ObjectStoreResult, }; +use parquet::arrow::arrow_reader::ArrowReaderOptions; +use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -19,6 +21,7 @@ use crate::kernel::schema::Schema; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; +const BATCH_SIZE: usize = 1024; lazy_static! { static ref CHECKPOINT_FILE_PATTERN: Regex = @@ -63,7 +66,7 @@ impl PathExt for Path { } } -pub(crate) struct LogSegment { +pub(super) struct LogSegment { pub version: i64, pub log_root: Path, pub commit_files: Vec, @@ -72,7 +75,7 @@ pub(crate) struct LogSegment { impl LogSegment { /// Try to create a new [`LogSegment`] - pub async fn try_new( + pub(super) async fn try_new( table_root: &Path, version: Option, store: &dyn ObjectStore, @@ -125,7 +128,7 @@ impl LogSegment { config: &DeltaTableConfig, ) -> DeltaResult>> { let decoder = ReaderBuilder::new(Arc::new(read_schema.try_into()?)) - .with_batch_size(1024) + .with_batch_size(BATCH_SIZE) .build_decoder()?; let stream = futures::stream::iter(self.commit_files.iter()) @@ -137,6 +140,29 @@ impl LogSegment { Ok(decode_stream(decoder, stream).boxed()) } + + pub(super) fn checkpoint_stream( + &self, + store: Arc, + _read_schema: &Schema, + config: &DeltaTableConfig, + ) -> BoxStream<'_, DeltaResult> { + futures::stream::iter(self.checkpoint_files.clone()) + .map(move |meta| { + let store = store.clone(); + async move { + let reader = ParquetObjectReader::new(store, meta); + let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index); + let builder = + ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?; + builder.with_batch_size(BATCH_SIZE).build() + } + }) + .buffered(config.log_buffer_size) + .try_flatten() + .map_err(Into::into) + .boxed() + } } fn decode_stream> + Unpin>( diff --git a/crates/deltalake-core/src/kernel/snapshot/mod.rs b/crates/deltalake-core/src/kernel/snapshot/mod.rs index 2075dbe961..bde42fdb12 100644 --- a/crates/deltalake-core/src/kernel/snapshot/mod.rs +++ b/crates/deltalake-core/src/kernel/snapshot/mod.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use arrow_array::RecordBatch; -use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt}; +use futures::stream::BoxStream; use object_store::path::Path; use object_store::ObjectStore; @@ -44,22 +44,34 @@ impl Snapshot { /// Get the files in the snapshot pub fn files(&self) -> DeltaResult>>> { - let read_schema = Arc::new(StructType::new(vec![ - ActionType::Add.schema_field().clone(), - ActionType::Remove.schema_field().clone(), - ])); - - let batches = + lazy_static::lazy_static! { + static ref COMMIT_SCHEMA: StructType = StructType::new(vec![ + ActionType::Add.schema_field().clone(), + ActionType::Remove.schema_field().clone(), + ]); + static ref CHECKPOINT_SCHEMA: StructType = StructType::new(vec![ + ActionType::Add.schema_field().clone(), + ]); + } + + let log_stream = self.log_segment - .commit_stream(self.store.clone(), &read_schema, &self.config)?; + .commit_stream(self.store.clone(), &COMMIT_SCHEMA, &self.config)?; + + let checkpoint_stream = self.log_segment.checkpoint_stream( + self.store.clone(), + &CHECKPOINT_SCHEMA, + &self.config, + ); - Ok(ReplayStream::new(batches)) + Ok(ReplayStream::new(log_stream, checkpoint_stream)) } } #[cfg(test)] mod tests { use deltalake_test::utils::*; + use futures::TryStreamExt; use super::*; @@ -91,6 +103,24 @@ mod tests { ]; assert_batches_sorted_eq!(expected, &batches); + let store = context + .table_builder(TestTables::Checkpoints) + .build_storage()? + .object_store(); + + for version in 0..=12 { + let snapshot = Snapshot::try_new( + &Path::default(), + store.clone(), + Default::default(), + Some(version), + ) + .await?; + let batches = snapshot.files()?.try_collect::>().await?; + let num_files = batches.iter().map(|b| b.num_rows() as i64).sum::(); + assert_eq!(num_files, version); + } + Ok(()) } } diff --git a/crates/deltalake-core/src/kernel/snapshot/replay.rs b/crates/deltalake-core/src/kernel/snapshot/replay.rs index 23b7f3c478..58e0e68384 100644 --- a/crates/deltalake-core/src/kernel/snapshot/replay.rs +++ b/crates/deltalake-core/src/kernel/snapshot/replay.rs @@ -21,14 +21,18 @@ pin_project! { #[pin] stream: S, + + #[pin] + checkpoint: S, } } impl ReplayStream { - pub(super) fn new(stream: S) -> Self { + pub(super) fn new(stream: S, checkpoint: S) -> Self { Self { stream, scanner: LogReplayScanner::new(), + checkpoint, } } } @@ -41,14 +45,26 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - this.stream.poll_next(cx).map(|b| match b { + let res = this.stream.poll_next(cx).map(|b| match b { Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, true) { Ok(filtered) => Some(Ok(filtered)), Err(e) => Some(Err(e)), }, Some(Err(e)) => Some(Err(e)), None => None, - }) + }); + if matches!(res, Poll::Ready(None)) { + this.checkpoint.poll_next(cx).map(|b| match b { + Some(Ok(batch)) => match this.scanner.process_files_batch(&batch, false) { + Ok(filtered) => Some(Ok(filtered)), + Err(e) => Some(Err(e)), + }, + Some(Err(e)) => Some(Err(e)), + None => None, + }) + } else { + res + } } fn size_hint(&self) -> (usize, Option) {