Skip to content

Commit

Permalink
Add column_offsets to row format
Browse files Browse the repository at this point in the history
This allows a user to more easily extract the encoded data for a single
column value from a row in the row format.
  • Loading branch information
judahrand committed Sep 22, 2023
1 parent f9cd26f commit a729f47
Showing 1 changed file with 63 additions and 23 deletions.
86 changes: 63 additions & 23 deletions arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ impl Codec {

let owned = OwnedRow {
data: nulls.buffer.into(),
offsets: nulls.column_offsets.first().unwrap()[..].into(),
config: nulls.config,
};
Ok(Self::Dictionary(converter, owned))
Expand Down Expand Up @@ -436,6 +437,7 @@ impl Codec {
let nulls = converter.convert_columns(&nulls)?;
let owned = OwnedRow {
data: nulls.buffer.into(),
offsets: nulls.column_offsets.first().unwrap()[..].into(),
config: nulls.config,
};

Expand Down Expand Up @@ -641,7 +643,19 @@ impl RowConverter {
.collect::<Result<Vec<_>, _>>()?;

let write_offset = rows.num_rows();
let lengths = row_lengths(columns, &encoders);
let lengths = column_lengths(columns, &encoders);

// These are offsets within the row's data buffer.
for (row_idx, col_lengths) in lengths.iter().enumerate() {
rows.column_offsets.reserve(columns.len() + 1);
rows.column_offsets[row_idx].push(0);

let mut cur_offset = 0_usize;
for l in col_lengths {
cur_offset = cur_offset.checked_add(*l).expect("overflow");
rows.column_offsets[row_idx].push(cur_offset);
}
}

// We initialize the offsets shifted down by one row index.
//
Expand All @@ -662,7 +676,9 @@ impl RowConverter {
let mut cur_offset = rows.offsets[write_offset];
for l in lengths {
rows.offsets.push(cur_offset);
cur_offset = cur_offset.checked_add(l).expect("overflow");
cur_offset = cur_offset
.checked_add(l.iter().sum::<usize>())
.expect("overflow");
}

// Note this will not zero out any trailing data in `rows.buffer`,
Expand Down Expand Up @@ -755,6 +771,7 @@ impl RowConverter {

Rows {
offsets,
column_offsets: vec![vec![]; row_capacity],
buffer: Vec::with_capacity(data_capacity),
config: RowConfig {
fields: self.fields.clone(),
Expand Down Expand Up @@ -819,6 +836,7 @@ impl RowParser {
pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
    // Externally supplied bytes carry no per-column boundaries, so the row
    // only records the starting offset.
    const START_ONLY: &[usize] = &[0];
    let config = &self.config;
    Row {
        data: bytes,
        offsets: START_ONLY,
        config,
    }
}
Expand All @@ -842,6 +860,8 @@ pub struct Rows {
buffer: Vec<u8>,
/// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
offsets: Vec<usize>,
/// Column `j` of row `i` has data `&buffer[offsets[i]..offsets[i+1]][column_offsets[i][j]..column_offsets[i][j+1]]`
column_offsets: Vec<Vec<usize>>,
/// The config for these rows
config: RowConfig,
}
Expand All @@ -864,6 +884,7 @@ impl Rows {
let start = self.offsets[row];
Row {
data: &self.buffer[start..end],
offsets: &self.column_offsets[row],
config: &self.config,
}
}
Expand Down Expand Up @@ -962,6 +983,7 @@ impl<'a> DoubleEndedIterator for RowsIter<'a> {
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
/// The encoded bytes of this row
data: &'a [u8],
/// Column boundaries within `data`. For a row obtained from [`Rows`],
/// column `j` has data `&data[offsets[j]..offsets[j+1]]`; a row produced by
/// `RowParser::parse` only carries `[0]`.
offsets: &'a [usize],
/// The config for this row
config: &'a RowConfig,
}

Expand All @@ -970,6 +992,7 @@ impl<'a> Row<'a> {
pub fn owned(&self) -> OwnedRow {
OwnedRow {
data: self.data.into(),
offsets: self.offsets.into(),
config: self.config.clone(),
}
}
Expand Down Expand Up @@ -1020,6 +1043,7 @@ impl<'a> AsRef<[u8]> for Row<'a> {
#[derive(Debug, Clone)]
pub struct OwnedRow {
/// Owned copy of the row's encoded bytes
data: Box<[u8]>,
/// Owned copy of the row's column offsets into `data`
offsets: Box<[usize]>,
/// Owned copy of the config for this row
config: RowConfig,
}

Expand All @@ -1030,6 +1054,7 @@ impl OwnedRow {
pub fn row(&self) -> Row<'_> {
Row {
data: &self.data,
offsets: &self.offsets,
config: &self.config,
}
}
Expand Down Expand Up @@ -1084,42 +1109,42 @@ fn null_sentinel(options: SortOptions) -> u8 {
}

/// Computes the encoded length of every column value, returning for each row a `Vec` with one length per column
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
fn column_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<Vec<usize>> {
use fixed::FixedLengthEncoding;

let num_columns = cols.len();
let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
let mut lengths = vec![0; num_rows];

let mut lengths = vec![Vec::with_capacity(num_columns); num_rows];
for (array, encoder) in cols.iter().zip(encoders) {
match encoder {
Encoder::Stateless => {
downcast_primitive_array! {
array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
DataType::Null => {},
DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
array => lengths.iter_mut().for_each(|x| x.push(fixed::encoded_len(array))),
DataType::Null => {lengths.iter_mut().for_each(|x| x.push(0))},
DataType::Boolean => lengths.iter_mut().for_each(|x| x.push(bool::ENCODED_LEN)),
DataType::Binary => as_generic_binary_array::<i32>(array)
.iter()
.zip(lengths.iter_mut())
.for_each(|(slice, length)| *length += variable::encoded_len(slice)),
.for_each(|(slice, length)| length.push(variable::encoded_len(slice))),
DataType::LargeBinary => as_generic_binary_array::<i64>(array)
.iter()
.zip(lengths.iter_mut())
.for_each(|(slice, length)| *length += variable::encoded_len(slice)),
.for_each(|(slice, length)| length.push(variable::encoded_len(slice))),
DataType::Utf8 => array.as_string::<i32>()
.iter()
.zip(lengths.iter_mut())
.for_each(|(slice, length)| {
*length += variable::encoded_len(slice.map(|x| x.as_bytes()))
length.push(variable::encoded_len(slice.map(|x| x.as_bytes())))
}),
DataType::LargeUtf8 => array.as_string::<i64>()
.iter()
.zip(lengths.iter_mut())
.for_each(|(slice, length)| {
*length += variable::encoded_len(slice.map(|x| x.as_bytes()))
length.push(variable::encoded_len(slice.map(|x| x.as_bytes())))
}),
DataType::FixedSizeBinary(len) => {
let len = len.to_usize().unwrap();
lengths.iter_mut().for_each(|x| *x += 1 + len)
lengths.iter_mut().for_each(|x| x.push(1 + len))
}
_ => unreachable!(),
}
Expand All @@ -1128,30 +1153,45 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
downcast_dictionary_array! {
array => {
for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
*length += match v {
length.push(match v {
Some(k) => values.row(k.as_usize()).data.len(),
None => null.data.len(),
}
})
}
}
_ => unreachable!(),
}
}
Encoder::Struct(rows, null) => {
let array = as_struct_array(array);
lengths.iter_mut().enumerate().for_each(|(idx, length)| {
match array.is_valid(idx) {
true => *length += 1 + rows.row(idx).as_ref().len(),
false => *length += 1 + null.data.len(),
}
});
lengths
.iter_mut()
.enumerate()
.for_each(|(row_idx, length)| match array.is_valid(row_idx) {
true => length.push(1 + rows.row(row_idx).as_ref().len()),
false => length.push(1 + null.data.len()),
});
}
Encoder::List(rows) => match array.data_type() {
DataType::List(_) => {
list::compute_lengths(&mut lengths, rows, as_list_array(array))
let mut x = vec![0; num_rows];
list::compute_lengths(&mut x, rows, as_list_array(array));
lengths
.iter_mut()
.enumerate()
.for_each(|(row_idx, length)| {
length.push(x[row_idx]);
});
}
DataType::LargeList(_) => {
list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
let mut x = vec![0; num_rows];
list::compute_lengths(&mut x, rows, as_large_list_array(array));
lengths
.iter_mut()
.enumerate()
.for_each(|(row_idx, length)| {
length.push(x[row_idx]);
});
}
_ => unreachable!(),
},
Expand Down

0 comments on commit a729f47

Please sign in to comment.