Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(datafusion-7181): enable slicing of rows #4817

Closed
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,46 @@ impl Rows {
+ self.buffer.len()
+ self.offsets.len() * std::mem::size_of::<usize>()
}

/// Return a new Rows sliced according to `offset` and `length`
/// This copies memory from the original Rows
///
/// # Panics
///
/// Panics if `offset` with `length` is greater than row length.
pub fn slice(&self, offset: usize, length: usize) -> Self {
assert!(
offset + length <= self.num_rows(),
wiedld marked this conversation as resolved.
Show resolved Hide resolved
"offset + length is greater than row length"
);

let Rows {
buffer: current_buffer,
offsets: current_offsets,
config,
} = self;

let start = current_offsets[offset];
let end = current_offsets[offset + length];
let mut buffer: Vec<u8> = vec![0_u8; end - start];
buffer.copy_from_slice(&current_buffer[start..end]);
wiedld marked this conversation as resolved.
Show resolved Hide resolved

let mut offsets: Vec<usize> = vec![0_usize; length + 1];
offsets.copy_from_slice(&current_offsets[offset..offset + length + 1]);
// mutate offsets to match new buffer length
offsets = offsets.iter_mut().map(|x| *x - start).collect();

if length > 0 {
assert_eq!(offsets.last().unwrap(), &buffer.len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are just sanity checks correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, exactly.

assert_eq!(offsets.first().unwrap(), &0_usize);
}

Self {
buffer,
offsets,
config: config.clone(),
}
}
}

impl<'a> IntoIterator for &'a Rows {
Expand Down Expand Up @@ -2224,6 +2264,101 @@ mod tests {
}
}

/// Checks that `Rows::slice` yields the same row bytes as converting the
/// corresponding input rows directly, and that the source rows are unchanged.
#[test]
fn test_row_slice() {
    let fields = vec![
        SortField::new(DataType::Int16),
        SortField::new(DataType::Float32),
    ];
    let mut converter = RowConverter::new(fields).unwrap();

    let int_col: ArrayRef =
        Arc::new(Int16Array::from_iter([Some(1), Some(2), Some(3), Some(4)]));
    let float_col: ArrayRef = Arc::new(Float32Array::from_iter([
        Some(1.1),
        Some(1.2),
        Some(1.3),
        Some(1.4),
    ]));
    let cols = [int_col, float_col];
    let rows = converter.convert_columns(&cols).unwrap();

    let rows_first_half = rows.slice(0, 2);
    let rows_last_one = rows.slice(3, 1);

    assert_eq!(rows_first_half.num_rows(), 2);
    assert_eq!(rows_last_one.num_rows(), 1);

    // First two rows: compare raw row bytes against a direct conversion.
    let first_half_cols = [
        Arc::new(Int16Array::from_iter([Some(1), Some(2)])) as ArrayRef,
        Arc::new(Float32Array::from_iter([Some(1.1), Some(1.2)])) as ArrayRef,
    ];
    let expected = converter.convert_columns(&first_half_cols).unwrap();
    let actual_bytes = rows_first_half
        .into_iter()
        .map(|row| row.as_ref().to_owned())
        .collect::<Vec<_>>();
    let expected_bytes = expected
        .into_iter()
        .map(|row| row.as_ref().to_owned())
        .collect::<Vec<_>>();
    assert_eq!(actual_bytes, expected_bytes);

    // Last row only.
    let last_one_cols = [
        Arc::new(Int16Array::from_iter([Some(4)])) as ArrayRef,
        Arc::new(Float32Array::from_iter([Some(1.4)])) as ArrayRef,
    ];
    let expected = converter.convert_columns(&last_one_cols).unwrap();
    let actual_bytes = rows_last_one
        .into_iter()
        .map(|row| row.as_ref().to_owned())
        .collect::<Vec<_>>();
    let expected_bytes = expected
        .into_iter()
        .map(|row| row.as_ref().to_owned())
        .collect::<Vec<_>>();
    assert_eq!(actual_bytes, expected_bytes);

    // Slicing copies, so the source must still hold all four rows.
    assert_eq!(
        rows.num_rows(),
        4,
        "expected the original rows to remain intact"
    );
    let expected = converter.convert_columns(&cols).unwrap();
    let actual_bytes = rows
        .into_iter()
        .map(|row| row.as_ref().to_owned())
        .collect::<Vec<_>>();
    let expected_bytes = expected
        .into_iter()
        .map(|row| row.as_ref().to_owned())
        .collect::<Vec<_>>();
    assert_eq!(actual_bytes, expected_bytes);
}

/// Requesting a range that runs past the last row must panic with the
/// documented message.
#[test]
#[should_panic(expected = "offset + length is greater than row length")]
fn test_row_slice_invalid_slice() {
    let fields = vec![
        SortField::new(DataType::Int16),
        SortField::new(DataType::Float32),
    ];
    let mut converter = RowConverter::new(fields).unwrap();

    let int_col: ArrayRef = Arc::new(Int16Array::from_iter([Some(1), Some(2)]));
    let float_col: ArrayRef = Arc::new(Float32Array::from_iter([Some(1.1), Some(1.2)]));
    let cols = [int_col, float_col];
    let rows = converter.convert_columns(&cols).unwrap();

    // Two rows exist; rows 1..3 is out of range.
    let _ = rows.slice(1, 2);
}

fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
where
K: ArrowPrimitiveType,
Expand Down