diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 8d64f85fb2..50983d8400 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -644,7 +644,15 @@ impl<'a> DeltaScanBuilder<'a> { let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig { object_store_url: self.log_store.object_store_url(), file_schema, - file_groups: file_groups.into_values().collect(), + // If all files were filtered out, we still need to emit at least one partition to + // pass datafusion sanity checks. + // + // See https://github.com/apache/datafusion/issues/11322 + file_groups: if file_groups.is_empty() { + vec![vec![]] + } else { + file_groups.into_values().collect() + }, statistics: stats, projection: self.projection.cloned(), limit: self.limit, @@ -1764,8 +1772,11 @@ impl From for DeltaColumn { #[cfg(test)] mod tests { - use arrow_array::StructArray; - use arrow_schema::Schema; + use crate::operations::create::CreateBuilder; + use crate::operations::write::SchemaMode; + use crate::writer::test_utils::get_delta_schema; + use arrow::array::StructArray; + use arrow::datatypes::{Field, Schema}; use chrono::{TimeZone, Utc}; use datafusion::assert_batches_sorted_eq; use datafusion::datasource::physical_plan::ParquetExec; @@ -1774,13 +1785,12 @@ mod tests { use datafusion_expr::lit; use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf; + use delta_kernel::schema::StructField; use object_store::path::Path; use serde_json::json; use std::ops::Deref; use super::*; - use crate::operations::write::SchemaMode; - use crate::writer::test_utils::get_delta_schema; // test deserialization of serialized partition values. // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization @@ -2566,4 +2576,26 @@ mod tests { Ok(true) } } + + #[tokio::test] + async fn passes_sanity_checker_when_all_files_filtered() { + // Run a query that filters out all files and sorts. + // Verify that it returns an empty set of rows without panicing. + // + // Historically, we had a bug that caused us to emit a query plan with 0 partitions, which + // datafusion rejected. + let table = crate::open_table("../test/tests/data/delta-2.2.0-partitioned-types") + .await + .unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("test", Arc::new(table)).unwrap(); + + let df = ctx + .sql("select * from test where c3 = 100 ORDER BY c1 ASC") + .await + .unwrap(); + let actual = df.collect().await.unwrap(); + + assert_eq!(actual.len(), 0); + } }