Skip to content

Commit

Permalink
Fix check_not_null_constraints null detection (apache#13033)
Browse files Browse the repository at this point in the history
* Fix function name typo

* Fix check_not_null_constraints null detection

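`check_not_null_constraints` (previously misspelled `check_not_null_contraits`)
checked for nulls using `Array::null_count`, which counts only physical nulls and
therefore does not return the logical null count (e.g. for dictionary,
run-end encoded, or `Null`-typed arrays).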

* Drop assertor dependency
  • Loading branch information
findepi authored Oct 23, 2024
1 parent a4e6b07 commit 211e76e
Showing 1 changed file with 144 additions and 7 deletions.
151 changes: 144 additions & 7 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
use futures::stream::{StreamExt, TryStreamExt};
use tokio::task::JoinSet;

Expand Down Expand Up @@ -852,7 +853,7 @@ pub fn execute_input_stream(
Ok(Box::pin(RecordBatchStreamAdapter::new(
sink_schema,
input_stream
.map(move |batch| check_not_null_contraits(batch?, &risky_columns)),
.map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
)))
}
}
Expand All @@ -872,7 +873,7 @@ pub fn execute_input_stream(
/// This function iterates over the specified column indices and ensures that none
/// of the columns contain null values. If any column contains null values, an error
/// is returned.
pub fn check_not_null_contraits(
pub fn check_not_null_constraints(
batch: RecordBatch,
column_indices: &Vec<usize>,
) -> Result<RecordBatch> {
Expand All @@ -885,7 +886,13 @@ pub fn check_not_null_contraits(
);
}

if batch.column(index).null_count() > 0 {
if batch
.column(index)
.logical_nulls()
.map(|nulls| nulls.null_count())
.unwrap_or_default()
> 0
{
return exec_err!(
"Invalid batch column at '{}' has null but schema specifies non-nullable",
index
Expand Down Expand Up @@ -920,11 +927,11 @@ pub enum CardinalityEffect {
#[cfg(test)]
mod tests {
use super::*;
use arrow_array::{DictionaryArray, Int32Array, NullArray, RunArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use std::any::Any;
use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};

use datafusion_common::{Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};

Expand Down Expand Up @@ -1068,6 +1075,136 @@ mod tests {
// Calls `name()` on the given `&dyn ExecutionPlan` and discards the result.
// NOTE(review): presumably exists only to prove at compile time that `name`
// is callable through a trait object — confirm against the enclosing test.
fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
let _ = plan.name();
}
}

// pub mod test;
#[test]
fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
    // An Int32 column in which every slot holds a value must pass the check.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
    )?;

    check_not_null_constraints(batch, &vec![0])?;
    Ok(())
}

#[test]
fn test_check_not_null_constraints_reject_null() -> Result<()> {
    // The middle slot of the Int32 column is null, so checking column 0 must fail.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
    )?;

    let outcome = check_not_null_constraints(batch, &vec![0]);
    assert!(outcome.is_err());
    assert_starts_with(
        outcome.err().unwrap().message().as_ref(),
        "Invalid batch column at '0' has null but schema specifies non-nullable",
    );
    Ok(())
}

#[test]
fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
    // Run-end encoded column: two of the four runs carry a null value, which is
    // only visible through the values array, not the run array itself.
    let ends = Int32Array::from(vec![1, 2, 3, 4]);
    let run_values = Int32Array::from(vec![Some(0), None, Some(1), None]);
    let ree = RunArray::try_new(&ends, &run_values)?;

    let field = Field::new("a", ree.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![field]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ree)])?;

    let outcome = check_not_null_constraints(batch, &vec![0]);
    assert!(outcome.is_err());
    assert_starts_with(
        outcome.err().unwrap().message().as_ref(),
        "Invalid batch column at '0' has null but schema specifies non-nullable",
    );
    Ok(())
}

#[test]
fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
    // Key 1 points at the null dictionary value, so the column is logically null there.
    let dict_values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
    let dict_keys = Int32Array::from(vec![0, 1, 2, 3]);
    let dict = DictionaryArray::new(dict_keys, dict_values);

    let field = Field::new("a", dict.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![field]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(dict)])?;

    let outcome = check_not_null_constraints(batch, &vec![0]);
    assert!(outcome.is_err());
    assert_starts_with(
        outcome.err().unwrap().message().as_ref(),
        "Invalid batch column at '0' has null but schema specifies non-nullable",
    );
    Ok(())
}

#[test]
fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
    // The dictionary values contain a null at index 1, but it is masked out:
    // no key references that index, so the column must be accepted.
    let dict_values = Arc::new(Int32Array::from(vec![
        Some(1),
        None, // never referenced by the keys below
        Some(3),
        Some(4),
    ]));
    let dict_keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
    let dict = DictionaryArray::new(dict_keys, dict_values);

    let field = Field::new("a", dict.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![field]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(dict)])?;

    check_not_null_constraints(batch, &vec![0])?;
    Ok(())
}

#[test]
fn test_check_not_null_constraints_on_null_type() -> Result<()> {
    // A three-row column of the `Null` data type must be rejected.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(NullArray::new(3))])?;

    let outcome = check_not_null_constraints(batch, &vec![0]);
    assert!(outcome.is_err());
    assert_starts_with(
        outcome.err().unwrap().message().as_ref(),
        "Invalid batch column at '0' has null but schema specifies non-nullable",
    );
    Ok(())
}

/// Asserts that `actual` begins with `expected_prefix`.
///
/// Both arguments accept anything that can be viewed as a `str`.
///
/// # Panics
///
/// Panics with a message containing both strings when `actual` does not
/// start with `expected_prefix`.
fn assert_starts_with(actual: impl AsRef<str>, expected_prefix: impl AsRef<str>) {
    let (text, prefix) = (actual.as_ref(), expected_prefix.as_ref());
    assert!(
        text.starts_with(prefix),
        "Expected '{}' to start with '{}'",
        text,
        prefix
    );
}
}

0 comments on commit 211e76e

Please sign in to comment.