diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index e692dd054b..d430ed8655 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -1818,10 +1818,15 @@ impl From for DeltaColumn {
 
 #[cfg(test)]
 mod tests {
+    use crate::kernel::log_segment::PathExt;
+    use crate::logstore::default_logstore::DefaultLogStore;
     use crate::operations::write::SchemaMode;
+    use crate::storage::ObjectStoreRef;
     use crate::writer::test_utils::get_delta_schema;
     use arrow::array::StructArray;
     use arrow::datatypes::{Field, Schema};
+    use arrow_array::cast::AsArray;
+    use bytes::Bytes;
     use chrono::{TimeZone, Utc};
     use datafusion::assert_batches_sorted_eq;
     use datafusion::datasource::physical_plan::ParquetExec;
@@ -1830,9 +1835,15 @@ mod tests {
     use datafusion_expr::lit;
     use datafusion_proto::physical_plan::AsExecutionPlan;
     use datafusion_proto::protobuf;
-    use object_store::path::Path;
+    use futures::{stream::BoxStream, StreamExt};
+    use object_store::{
+        path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore,
+        PutMultipartOpts, PutOptions, PutPayload, PutResult,
+    };
     use serde_json::json;
-    use std::ops::Deref;
+    use std::fmt::{Debug, Display, Formatter};
+    use std::ops::{Deref, Range};
+    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 
     use super::*;
 
@@ -2676,4 +2687,242 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_delta_scan_uses_parquet_column_pruning() {
+        let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
+        let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["b"
+            .repeat(1024)
+            .as_str()]));
+        let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
+        let table = crate::DeltaOps::new_in_memory()
+            .write(vec![batch])
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let config = DeltaScanConfigBuilder::new()
+            .build(table.snapshot().unwrap())
+            .unwrap();
+
+        let (object_store, mut operations) =
+            RecordingObjectStore::new(table.log_store().object_store());
+        let log_store =
+            DefaultLogStore::new(Arc::new(object_store), table.log_store().config().clone());
+        let provider = DeltaTableProvider::try_new(
+            table.snapshot().unwrap().clone(),
+            Arc::new(log_store),
+            config,
+        )
+        .unwrap();
+        let ctx = SessionContext::new();
+        ctx.register_table("test", Arc::new(provider)).unwrap();
+        let state = ctx.state();
+
+        let df = ctx.sql("select small from test").await.unwrap();
+        let plan = df.create_physical_plan().await.unwrap();
+
+        let mut stream = plan.execute(0, state.task_ctx()).unwrap();
+        let Some(Ok(batch)) = stream.next().await else {
+            panic!()
+        };
+        assert!(stream.next().await.is_none());
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(1, batch.num_rows());
+        let small = batch.column_by_name("small").unwrap().as_string::<i32>();
+        assert_eq!("a", small.iter().next().unwrap().unwrap());
+
+        // Only the parquet footer, the file metadata, and the projected "small" column chunk are read
+        let expected = vec![
+            ObjectStoreOperation::GetRange(LocationType::Data, 4920..4928),
+            ObjectStoreOperation::GetRange(LocationType::Data, 2399..4920),
+            ObjectStoreOperation::GetRanges(LocationType::Data, vec![4..58]),
+        ];
+        let mut actual = Vec::new();
+        operations.recv_many(&mut actual, 3).await;
+        assert_eq!(expected, actual);
+    }
+
+    /// Records operations made by the inner object store on a channel obtained at construction
+    struct RecordingObjectStore {
+        inner: ObjectStoreRef,
+        operations: UnboundedSender<ObjectStoreOperation>,
+    }
+
+    impl RecordingObjectStore {
+        /// Returns an object store and a channel recording all operations made by the inner object store
+        fn new(inner: ObjectStoreRef) -> (Self, UnboundedReceiver<ObjectStoreOperation>) {
+            let (operations, operations_receiver) = unbounded_channel();
+            (Self { inner, operations }, operations_receiver)
+        }
+    }
+
+    impl Display for RecordingObjectStore {
+        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+            Display::fmt(&self.inner, f)
+        }
+    }
+
+    impl Debug for RecordingObjectStore {
+        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+            Debug::fmt(&self.inner, f)
+        }
+    }
+
+    #[derive(Debug, PartialEq)]
+    enum ObjectStoreOperation {
+        GetRanges(LocationType, Vec<Range<usize>>),
+        GetRange(LocationType, Range<usize>),
+        GetOpts(LocationType),
+        Get(LocationType),
+    }
+
+    #[derive(Debug, PartialEq)]
+    enum LocationType {
+        Data,
+        Commit,
+    }
+
+    impl From<&Path> for LocationType {
+        fn from(value: &Path) -> Self {
+            if value.is_commit_file() {
+                LocationType::Commit
+            } else if value.to_string().starts_with("part-") {
+                LocationType::Data
+            } else {
+                panic!("Unknown location type: {:?}", value)
+            }
+        }
+    }
+
+    // Currently only read operations are recorded. Extend as necessary.
+    #[async_trait]
+    impl ObjectStore for RecordingObjectStore {
+        async fn put(
+            &self,
+            location: &Path,
+            payload: PutPayload,
+        ) -> object_store::Result<PutResult> {
+            self.inner.put(location, payload).await
+        }
+
+        async fn put_opts(
+            &self,
+            location: &Path,
+            payload: PutPayload,
+            opts: PutOptions,
+        ) -> object_store::Result<PutResult> {
+            self.inner.put_opts(location, payload, opts).await
+        }
+
+        async fn put_multipart(
+            &self,
+            location: &Path,
+        ) -> object_store::Result<Box<dyn MultipartUpload>> {
+            self.inner.put_multipart(location).await
+        }
+
+        async fn put_multipart_opts(
+            &self,
+            location: &Path,
+            opts: PutMultipartOpts,
+        ) -> object_store::Result<Box<dyn MultipartUpload>> {
+            self.inner.put_multipart_opts(location, opts).await
+        }
+
+        async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
+            self.operations
+                .send(ObjectStoreOperation::Get(location.into()))
+                .unwrap();
+            self.inner.get(location).await
+        }
+
+        async fn get_opts(
+            &self,
+            location: &Path,
+            options: GetOptions,
+        ) -> object_store::Result<GetResult> {
+            self.operations
+                .send(ObjectStoreOperation::GetOpts(location.into()))
+                .unwrap();
+            self.inner.get_opts(location, options).await
+        }
+
+        async fn get_range(
+            &self,
+            location: &Path,
+            range: Range<usize>,
+        ) -> object_store::Result<Bytes> {
+            self.operations
+                .send(ObjectStoreOperation::GetRange(
+                    location.into(),
+                    range.clone(),
+                ))
+                .unwrap();
+            self.inner.get_range(location, range).await
+        }
+
+        async fn get_ranges(
+            &self,
+            location: &Path,
+            ranges: &[Range<usize>],
+        ) -> object_store::Result<Vec<Bytes>> {
+            self.operations
+                .send(ObjectStoreOperation::GetRanges(
+                    location.into(),
+                    ranges.to_vec(),
+                ))
+                .unwrap();
+            self.inner.get_ranges(location, ranges).await
+        }
+
+        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
+            self.inner.head(location).await
+        }
+
+        async fn delete(&self, location: &Path) -> object_store::Result<()> {
+            self.inner.delete(location).await
+        }
+
+        fn delete_stream<'a>(
+            &'a self,
+            locations: BoxStream<'a, object_store::Result<Path>>,
+        ) -> BoxStream<'a, object_store::Result<Path>> {
+            self.inner.delete_stream(locations)
+        }
+
+        fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+            self.inner.list(prefix)
+        }
+
+        fn list_with_offset(
+            &self,
+            prefix: Option<&Path>,
+            offset: &Path,
+        ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+            self.inner.list_with_offset(prefix, offset)
+        }
+
+        async fn list_with_delimiter(
+            &self,
+            prefix: Option<&Path>,
+        ) -> object_store::Result<ListResult> {
+            self.inner.list_with_delimiter(prefix).await
+        }
+
+        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
+            self.inner.copy(from, to).await
+        }
+
+        async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
+            self.inner.rename(from, to).await
+        }
+
+        async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
+            self.inner.copy_if_not_exists(from, to).await
+        }
+
+        async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
+            self.inner.rename_if_not_exists(from, to).await
+        }
+    }
 }