diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 1a01d0c6ec..50983d8400 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -644,7 +644,15 @@ impl<'a> DeltaScanBuilder<'a> { let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig { object_store_url: self.log_store.object_store_url(), file_schema, - file_groups: file_groups.into_values().collect(), + // If all files were filtered out, we still need to emit at least one partition to + // pass datafusion sanity checks. + // + // See https://github.com/apache/datafusion/issues/11322 + file_groups: if file_groups.is_empty() { + vec![vec![]] + } else { + file_groups.into_values().collect() + }, statistics: stats, projection: self.projection.cloned(), limit: self.limit, @@ -2570,55 +2578,24 @@ mod tests { } #[tokio::test] - async fn parent_distribution_requirements_bug() { - let arr: Arc = Arc::new(arrow::array::StringArray::from(vec!["s"])); - let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap(); - - let path = "/tmp/table"; - - let mut table = CreateBuilder::new() - .with_location(path) - .with_columns([StructField { - name: "a".to_string(), - data_type: delta_kernel::schema::DataType::STRING, - nullable: false, - metadata: HashMap::new(), - }]) - .await - .unwrap(); - - table = crate::DeltaOps(table) - .write(vec![batch.clone()]) - .with_save_mode(crate::protocol::SaveMode::Append) - .await - .unwrap(); - - table = crate::DeltaOps(table) - .write(vec![batch]) - .with_save_mode(crate::protocol::SaveMode::Append) + async fn passes_sanity_checker_when_all_files_filtered() { + // Run a query that filters out all files and sorts. + // Verify that it returns an empty set of rows without panicing. + // + // Historically, we had a bug that caused us to emit a query plan with 0 partitions, which + // datafusion rejected. + let table = crate::open_table("../test/tests/data/delta-2.2.0-partitioned-types") .await .unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("test", Arc::new(table)).unwrap(); - let config = SessionConfig::default(); - let ctx = SessionContext::new_with_config(config); - - ctx.register_table("table", Arc::new(table)).unwrap(); - ctx.sql("SELECT * FROM `table` WHERE `a` > 's' ORDER BY `a` ASC") - .await - .unwrap() - .collect() + let df = ctx + .sql("select * from test where c3 = 100 ORDER BY c1 ASC") .await .unwrap(); + let actual = df.collect().await.unwrap(); - let re_opened_table = open_table(path).await.unwrap(); - ctx.register_table("re_opened_table", Arc::new(re_opened_table)) - .unwrap(); - ctx.sql("SELECT * FROM `re_opened_table` WHERE `a` > 's' ORDER BY `a` ASC") - .await - .unwrap() - .collect() - .await - .unwrap(); - assert!(false); + assert_eq!(actual.len(), 0); } }