diff --git a/crates/sparrow-batch/src/batch.rs b/crates/sparrow-batch/src/batch.rs index 8fc8dbdba..82f311d4d 100644 --- a/crates/sparrow-batch/src/batch.rs +++ b/crates/sparrow-batch/src/batch.rs @@ -385,49 +385,53 @@ pub(crate) fn validate_bounds( key_hash: &ArrayRef, up_to_time: RowTime, ) -> error_stack::Result<(), Error> { + use arrow::compute::SortColumn; + if time.len() == 0 { // No more validation necessary for empty batches. return Ok(()); } - let time: &TimestampNanosecondArray = time.as_primitive(); - let subsort: &UInt64Array = subsort.as_primitive(); - let key_hash: &UInt64Array = key_hash.as_primitive(); - - let mut prev_time = time.value(0); - let mut prev_subsort = subsort.value(0); - let mut prev_key_hash = key_hash.value(0); - - for i in 1..time.len() { - let curr_time = time.value(i); - let curr_subsort = subsort.value(i); - let curr_key_hash = key_hash.value(i); - - let order = prev_time - .cmp(&curr_time) - .then(prev_subsort.cmp(&curr_subsort)) - .then(prev_key_hash.cmp(&curr_key_hash)); - - error_stack::ensure!( - order.is_lt(), - Error::internal_msg(format!( - "Expected data[i - 1] < data[i], but ({}, {}, {}) >= ({}, {}, {})", - prev_time, prev_subsort, prev_key_hash, curr_time, curr_subsort, curr_key_hash - )) - ); + let sort_indices = arrow::compute::lexsort_to_indices( + &[ + SortColumn { + values: time.clone(), + options: None, + }, + SortColumn { + values: subsort.clone(), + options: None, + }, + SortColumn { + values: key_hash.clone(), + options: None, + }, + ], + None, + ) + .into_report() + .change_context(Error::internal_msg("lexsort_to_indices".to_owned()))?; + + let is_sorted = sort_indices + .values() + .iter() + .enumerate() + .all(|(i, x)| i == *x as usize); - prev_time = curr_time; - prev_subsort = curr_subsort; - prev_key_hash = curr_key_hash; - } + error_stack::ensure!( + is_sorted, + Error::internal_msg("Expected sorted batch".to_owned()) + ); - let curr_time: i64 = up_to_time.into(); - let order = prev_time.cmp(&curr_time); + let time: &TimestampNanosecondArray = time.as_primitive(); + let last_time = time.values()[time.len() - 1]; + let up_to_time: i64 = up_to_time.into(); + let order = last_time.cmp(&up_to_time); error_stack::ensure!( order.is_le(), Error::internal_msg(format!( "Expected last data <= upper bound, but ({}) > ({})", - prev_time, curr_time + last_time, up_to_time )) );