Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: datafusion sanity checker passes when all files filtered out #2830

42 changes: 37 additions & 5 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,15 @@ impl<'a> DeltaScanBuilder<'a> {
let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig {
object_store_url: self.log_store.object_store_url(),
file_schema,
file_groups: file_groups.into_values().collect(),
// If all files were filtered out, we still need to emit at least one partition to
// pass datafusion sanity checks.
//
// See https://github.com/apache/datafusion/issues/11322
file_groups: if file_groups.is_empty() {
vec![vec![]]
} else {
file_groups.into_values().collect()
},
statistics: stats,
projection: self.projection.cloned(),
limit: self.limit,
Expand Down Expand Up @@ -1764,8 +1772,11 @@ impl From<Column> for DeltaColumn {

#[cfg(test)]
mod tests {
use arrow_array::StructArray;
use arrow_schema::Schema;
use crate::operations::create::CreateBuilder;
use crate::operations::write::SchemaMode;
use crate::writer::test_utils::get_delta_schema;
use arrow::array::StructArray;
use arrow::datatypes::{Field, Schema};
use chrono::{TimeZone, Utc};
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::physical_plan::ParquetExec;
Expand All @@ -1774,13 +1785,12 @@ mod tests {
use datafusion_expr::lit;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
use delta_kernel::schema::StructField;
use object_store::path::Path;
use serde_json::json;
use std::ops::Deref;

use super::*;
use crate::operations::write::SchemaMode;
use crate::writer::test_utils::get_delta_schema;

// test deserialization of serialized partition values.
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
Expand Down Expand Up @@ -2566,4 +2576,26 @@ mod tests {
Ok(true)
}
}

#[tokio::test]
async fn passes_sanity_checker_when_all_files_filtered() {
// Run a query that filters out all files and sorts.
// Verify that it returns an empty set of rows without panicing.
//
// Historically, we had a bug that caused us to emit a query plan with 0 partitions, which
// datafusion rejected.
let table = crate::open_table("../test/tests/data/delta-2.2.0-partitioned-types")
.await
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(table)).unwrap();

let df = ctx
.sql("select * from test where c3 = 100 ORDER BY c1 ASC")
.await
.unwrap();
let actual = df.collect().await.unwrap();

assert_eq!(actual.len(), 0);
}
}
Loading