Skip to content

Commit

Permalink
Failing test that demonstrates distribution requirements bug
Browse files Browse the repository at this point in the history
  • Loading branch information
adamfaulkner-at authored and ion-elgreco committed Oct 3, 2024
1 parent 8f4232f commit 59ef7e0
Showing 1 changed file with 59 additions and 4 deletions.
63 changes: 59 additions & 4 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1764,8 +1764,11 @@ impl From<Column> for DeltaColumn {

#[cfg(test)]
mod tests {
use arrow_array::StructArray;
use arrow_schema::Schema;
use crate::operations::create::CreateBuilder;
use crate::operations::write::SchemaMode;
use crate::writer::test_utils::get_delta_schema;
use arrow::array::StructArray;
use arrow::datatypes::{Field, Schema};
use chrono::{TimeZone, Utc};
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::physical_plan::ParquetExec;
Expand All @@ -1774,13 +1777,12 @@ mod tests {
use datafusion_expr::lit;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
use delta_kernel::schema::StructField;
use object_store::path::Path;
use serde_json::json;
use std::ops::Deref;

use super::*;
use crate::operations::write::SchemaMode;
use crate::writer::test_utils::get_delta_schema;

// test deserialization of serialized partition values.
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
Expand Down Expand Up @@ -2566,4 +2568,57 @@ mod tests {
Ok(true)
}
}

#[tokio::test]
async fn parent_distribution_requirements_bug() {
let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();

let path = "/tmp/table";

let mut table = CreateBuilder::new()
.with_location(path)
.with_columns([StructField {
name: "a".to_string(),
data_type: delta_kernel::schema::DataType::STRING,
nullable: false,
metadata: HashMap::new(),
}])
.await
.unwrap();

table = crate::DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();

table = crate::DeltaOps(table)
.write(vec![batch])
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();

let config = SessionConfig::default();
let ctx = SessionContext::new_with_config(config);

ctx.register_table("table", Arc::new(table)).unwrap();
ctx.sql("SELECT * FROM `table` WHERE `a` > 's' ORDER BY `a` ASC")
.await
.unwrap()
.collect()
.await
.unwrap();

let re_opened_table = open_table(path).await.unwrap();
ctx.register_table("re_opened_table", Arc::new(re_opened_table))
.unwrap();
ctx.sql("SELECT * FROM `re_opened_table` WHERE `a` > 's' ORDER BY `a` ASC")
.await
.unwrap()
.collect()
.await
.unwrap();
assert!(false);
}
}

0 comments on commit 59ef7e0

Please sign in to comment.