Skip to content

Commit

Permalink
use lexsort to verify batch order
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 18, 2023
1 parent 8f598ba commit 77dad69
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions crates/sparrow-batch/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,49 +385,53 @@ pub(crate) fn validate_bounds(
key_hash: &ArrayRef,
up_to_time: RowTime,
) -> error_stack::Result<(), Error> {
use arrow::compute::SortColumn;

if time.len() == 0 {
// No more validation necessary for empty batches.
return Ok(());
}

let time: &TimestampNanosecondArray = time.as_primitive();
let subsort: &UInt64Array = subsort.as_primitive();
let key_hash: &UInt64Array = key_hash.as_primitive();

let mut prev_time = time.value(0);
let mut prev_subsort = subsort.value(0);
let mut prev_key_hash = key_hash.value(0);

for i in 1..time.len() {
let curr_time = time.value(i);
let curr_subsort = subsort.value(i);
let curr_key_hash = key_hash.value(i);

let order = prev_time
.cmp(&curr_time)
.then(prev_subsort.cmp(&curr_subsort))
.then(prev_key_hash.cmp(&curr_key_hash));

error_stack::ensure!(
order.is_lt(),
Error::internal_msg(format!(
"Expected data[i - 1] < data[i], but ({}, {}, {}) >= ({}, {}, {})",
prev_time, prev_subsort, prev_key_hash, curr_time, curr_subsort, curr_key_hash
))
);
let sort_indices = arrow::compute::lexsort_to_indices(
&[
SortColumn {
values: time.clone(),
options: None,
},
SortColumn {
values: subsort.clone(),
options: None,
},
SortColumn {
values: key_hash.clone(),
options: None,
},
],
None,
)
.into_report()
.change_context(Error::internal_msg("lexsort_to_indices".to_owned()))?;

let is_sorted = sort_indices
.values()
.iter()
.enumerate()
.all(|(i, x)| i == *x as usize);

prev_time = curr_time;
prev_subsort = curr_subsort;
prev_key_hash = curr_key_hash;
}
error_stack::ensure!(
is_sorted,
Error::internal_msg("Expected sorted batch".to_owned())
);

let curr_time: i64 = up_to_time.into();
let order = prev_time.cmp(&curr_time);
let time: &TimestampNanosecondArray = time.as_primitive();
let last_time = time.values()[time.len() - 1];
let up_to_time: i64 = up_to_time.into();
let order = last_time.cmp(&up_to_time);
error_stack::ensure!(
order.is_le(),
Error::internal_msg(format!(
"Expected last data <= upper bound, but ({}) > ({})",
prev_time, curr_time
last_time, up_to_time
))
);

Expand Down

0 comments on commit 77dad69

Please sign in to comment.