diff --git a/crates/core/src/kernel/arrow/json.rs b/crates/core/src/kernel/arrow/json.rs index dcb56d308a..ed31a7b64e 100644 --- a/crates/core/src/kernel/arrow/json.rs +++ b/crates/core/src/kernel/arrow/json.rs @@ -62,9 +62,10 @@ pub(crate) fn parse_json( for it in 0..json_strings.len() { if json_strings.is_null(it) { if value_count > 0 { - let slice = json_strings.slice(value_start, value_count); - let batch = decode_reader(&mut decoder, get_reader(slice.value_data())) - .collect::, _>>()?; + let slice_data = get_nonnull_slice_data(json_strings, value_start, value_count); + let batch = + decode_reader(&mut decoder, get_reader(&slice_data)) + .collect::, _>>()?; batches.extend(batch); value_count = 0; } @@ -86,15 +87,28 @@ pub(crate) fn parse_json( } if value_count > 0 { - let slice = json_strings.slice(value_start, value_count); - let batch = decode_reader(&mut decoder, get_reader(slice.value_data())) - .collect::, _>>()?; + let slice_data = get_nonnull_slice_data(json_strings, value_start, value_count); + let batch = + decode_reader(&mut decoder, get_reader(&slice_data)).collect::, _>>()?; batches.extend(batch); } Ok(concat_batches(&output_schema, &batches)?) } +/// Get the data of a slice of non-null JSON strings. +fn get_nonnull_slice_data( + json_strings: &StringArray, + value_start: usize, + value_count: usize, +) -> Vec { + let slice = json_strings.slice(value_start, value_count); + slice.iter().fold(Vec::new(), |mut acc, s| { + acc.extend_from_slice(s.unwrap().as_bytes()); + acc + }) +} + /// Decode a stream of bytes into a stream of record batches. pub(crate) fn decode_stream> + Unpin>( mut decoder: Decoder, @@ -148,3 +162,42 @@ pub(crate) fn decode_reader<'a, R: BufRead + 'a>( }; std::iter::from_fn(move || next().map_err(DeltaTableError::from).transpose()) } + +#[cfg(test)] +mod tests { + use crate::kernel::arrow::json::parse_json; + use crate::DeltaTableConfig; + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use std::sync::Arc; + + #[test] + fn json_to_struct() { + let json_strings = StringArray::from(vec![ + Some(r#"{"a": 1, "b": "foo"}"#), + Some(r#"{"a": 2, "b": "bar"}"#), + None, + Some(r#"{"a": 3, "b": "baz"}"#), + ]); + let struct_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + ])); + let config = DeltaTableConfig::default(); + let result = parse_json(&json_strings, struct_schema.clone(), &config).unwrap(); + let expected = RecordBatch::try_new( + struct_schema, + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(3)])), + Arc::new(StringArray::from(vec![ + Some("foo"), + Some("bar"), + None, + Some("baz"), + ])), + ], + ) + .unwrap(); + assert_eq!(result, expected); + } +} diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 71408b27d5..61cdab4c09 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -79,7 +79,7 @@ fn map_batch( config: &DeltaTableConfig, ) -> DeltaResult { let stats_col = ex::extract_and_cast_opt::(&batch, "add.stats"); - let stats_parsed_col = ex::extract_and_cast_opt::(&batch, "add.stats_parsed"); + let stats_parsed_col = ex::extract_and_cast_opt::(&batch, "add.stats_parsed"); if stats_parsed_col.is_some() { return Ok(batch); }