Skip to content

Commit

Permalink
fix(rust): stats_parsed has different number of records with stats (#…
Browse files Browse the repository at this point in the history
…2405)

# Description
- `stats_parsed` is a StructArray instead of StringArray
- Parsing the `Add` action's `stats` into `stats_parsed` would panic due to the
use of `slice.value_data()` on a sliced array.

# Related Issue(s)
<!---
For example:

- 
--->

closes #2312 

# Documentation

<!---
Share links to useful documentation
--->

https://docs.rs/arrow/51.0.0/arrow/array/struct.GenericByteArray.html#method.value_data

---------

Co-authored-by: R. Tyler Croy <[email protected]>
  • Loading branch information
yjshen and rtyler authored Apr 15, 2024
1 parent faa743a commit aa8f4d5
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 7 deletions.
65 changes: 59 additions & 6 deletions crates/core/src/kernel/arrow/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ pub(crate) fn parse_json(
for it in 0..json_strings.len() {
if json_strings.is_null(it) {
if value_count > 0 {
let slice = json_strings.slice(value_start, value_count);
let batch = decode_reader(&mut decoder, get_reader(slice.value_data()))
.collect::<Result<Vec<_>, _>>()?;
let slice_data = get_nonnull_slice_data(json_strings, value_start, value_count);
let batch =
decode_reader(&mut decoder, get_reader(&slice_data))
.collect::<Result<Vec<_>, _>>()?;
batches.extend(batch);
value_count = 0;
}
Expand All @@ -86,15 +87,28 @@ pub(crate) fn parse_json(
}

if value_count > 0 {
let slice = json_strings.slice(value_start, value_count);
let batch = decode_reader(&mut decoder, get_reader(slice.value_data()))
.collect::<Result<Vec<_>, _>>()?;
let slice_data = get_nonnull_slice_data(json_strings, value_start, value_count);
let batch =
decode_reader(&mut decoder, get_reader(&slice_data)).collect::<Result<Vec<_>, _>>()?;
batches.extend(batch);
}

Ok(concat_batches(&output_schema, &batches)?)
}

/// Concatenate the raw UTF-8 bytes of a contiguous run of non-null JSON strings.
///
/// `value_start` and `value_count` delimit a window of `json_strings` that the
/// caller guarantees contains no null entries (the callers in `parse_json`
/// split the input at every null before invoking this helper).
///
/// # Panics
///
/// Panics if the selected window contains a null value, since a null entry has
/// no bytes to concatenate.
fn get_nonnull_slice_data(
    json_strings: &StringArray,
    value_start: usize,
    value_count: usize,
) -> Vec<u8> {
    let slice = json_strings.slice(value_start, value_count);
    // Pre-size the buffer with the exact total byte length so the append loop
    // below never has to reallocate.
    let total_len: usize = slice.iter().map(|s| s.map_or(0, str::len)).sum();
    let mut data = Vec::with_capacity(total_len);
    for value in slice.iter() {
        let value = value.expect("get_nonnull_slice_data called on a slice containing a null");
        data.extend_from_slice(value.as_bytes());
    }
    data
}

/// Decode a stream of bytes into a stream of record batches.
pub(crate) fn decode_stream<S: Stream<Item = ObjectStoreResult<Bytes>> + Unpin>(
mut decoder: Decoder,
Expand Down Expand Up @@ -148,3 +162,42 @@ pub(crate) fn decode_reader<'a, R: BufRead + 'a>(
};
std::iter::from_fn(move || next().map_err(DeltaTableError::from).transpose())
}

#[cfg(test)]
mod tests {
    use crate::kernel::arrow::json::parse_json;
    use crate::DeltaTableConfig;
    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Parsing a string array with an interleaved null must produce a batch
    /// with a matching null row — records must neither be dropped nor shifted.
    #[test]
    fn json_to_struct() {
        // Three JSON documents with a null sandwiched between them.
        let input = StringArray::from(vec![
            Some(r#"{"a": 1, "b": "foo"}"#),
            Some(r#"{"a": 2, "b": "bar"}"#),
            None,
            Some(r#"{"a": 3, "b": "baz"}"#),
        ]);
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        let parsed = parse_json(&input, schema.clone(), &DeltaTableConfig::default()).unwrap();

        // The null input row must surface as nulls in both output columns.
        let col_a = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let col_b = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
        let expected =
            RecordBatch::try_new(schema, vec![Arc::new(col_a), Arc::new(col_b)]).unwrap();
        assert_eq!(parsed, expected);
    }
}
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn map_batch(
config: &DeltaTableConfig,
) -> DeltaResult<RecordBatch> {
let stats_col = ex::extract_and_cast_opt::<StringArray>(&batch, "add.stats");
let stats_parsed_col = ex::extract_and_cast_opt::<StringArray>(&batch, "add.stats_parsed");
let stats_parsed_col = ex::extract_and_cast_opt::<StructArray>(&batch, "add.stats_parsed");
if stats_parsed_col.is_some() {
return Ok(batch);
}
Expand Down

0 comments on commit aa8f4d5

Please sign in to comment.