From c3b5fc9951fb3a9c453b905b2e1173665acac4c4 Mon Sep 17 00:00:00 2001 From: James Gill Date: Tue, 15 Oct 2024 11:09:18 -0400 Subject: [PATCH] Add support for deserializing list-encoded JSON structs [#6558] Currently, a StructArray can only be deserialized from a JSON object (e.g. `{a: 1, b: "c"}`), but some services (e.g. Presto and Trino) encode ROW types as JSON lists (e.g. `[1, "c"]`) because this is more compact, and the schema is known. This PR adds the ability to parse JSON lists into StructArrays, if the StructParseMode is set to ListOnly. In ListOnly mode, object-encoded structs raise an error. Setting to ObjectOnly (the default) has the original parsing behavior. Some notes/questions/points for discussion: 1. I've made a JsonParseMode struct instead of a bool flag for two reasons. One is that it's self-descriptive (what would `true` be?), and the other is that it allows a future Mixed mode that could deserialize either. The latter isn't currently requested by anyone. 2. I kept the error messages as similar to the old messages as possible. I considered having more specific error messages (like "Encountered a '[' when parsing a Struct, but the StructParseMode is ObjectOnly" or similar), but wanted to hear opinions before I went that route. 3. I'm not attached to any name/code-style/etc, so happy to modify to fit local conventions. Fixes #6558 --- arrow-json/src/reader/list_array.rs | 4 +- arrow-json/src/reader/map_array.rs | 5 +- arrow-json/src/reader/mod.rs | 264 +++++++++++++++++++++++++- arrow-json/src/reader/struct_array.rs | 82 +++++--- 4 files changed, 322 insertions(+), 33 deletions(-) diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index b6f8c18ea9c3..92484e23c062 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -16,7 +16,7 @@ // under the License. use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{make_decoder, ArrayDecoder}; +use crate::reader::{make_decoder, ArrayDecoder, StructParseMode}; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; use arrow_array::OffsetSizeTrait; use arrow_buffer::buffer::NullBuffer; @@ -37,6 +37,7 @@ impl ListArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_parse_mode: StructParseMode, ) -> Result { let field = match &data_type { DataType::List(f) if !O::IS_LARGE => f, @@ -48,6 +49,7 @@ impl ListArrayDecoder { coerce_primitive, strict_mode, field.is_nullable(), + struct_parse_mode, )?; Ok(Self { diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index cd1ca5f71fa9..4433dad05c08 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -16,7 +16,7 @@ // under the License. use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{make_decoder, ArrayDecoder}; +use crate::reader::{make_decoder, ArrayDecoder, StructParseMode}; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; use arrow_buffer::buffer::NullBuffer; use arrow_buffer::ArrowNativeType; @@ -36,6 +36,7 @@ impl MapArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_parse_mode: StructParseMode, ) -> Result { let fields = match &data_type { DataType::Map(_, true) => { @@ -59,12 +60,14 @@ impl MapArrayDecoder { coerce_primitive, strict_mode, fields[0].is_nullable(), + struct_parse_mode, )?; let values = make_decoder( fields[1].data_type().clone(), coerce_primitive, strict_mode, fields[1].is_nullable(), + struct_parse_mode, )?; Ok(Self { diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index f857e8813c7e..6fb5631b1646 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -170,12 +170,30 @@ mod struct_array; mod tape; mod timestamp_array; +/// Specifies what is considered valid JSON when parsing StructArrays. +/// +/// If a struct with fields `("a", Int32)` and `("b", Utf8)`, it could be represented as +/// a JSON object (`{"a": 1, "b": "c"}`) or a JSON list (`[1, "c"]`). This enum controls +/// which form(s) the Reader will accept. +/// +/// For objects, the order of the key does not matter. +/// For lists, the entries must be the same number and in the same order as the struct fields. +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +pub enum StructParseMode { + #[default] + /// Only parse objects (e.g., {"a": 1, "b": "c"}) + ObjectOnly, + /// Only parse lists (e.g., [1, "c"]) + ListOnly, +} + /// A builder for [`Reader`] and [`Decoder`] pub struct ReaderBuilder { batch_size: usize, coerce_primitive: bool, strict_mode: bool, is_field: bool, + struct_parse_mode: StructParseMode, schema: SchemaRef, } @@ -195,6 +213,7 @@ impl ReaderBuilder { coerce_primitive: false, strict_mode: false, is_field: false, + struct_parse_mode: StructParseMode::ObjectOnly, schema, } } @@ -235,6 +254,7 @@ impl ReaderBuilder { coerce_primitive: false, strict_mode: false, is_field: true, + struct_parse_mode: StructParseMode::ObjectOnly, schema: Arc::new(Schema::new([field.into()])), } } @@ -262,6 +282,15 @@ impl ReaderBuilder { } } + /// Set the [`StructParseMode`] for the reader, which determines whether + /// structs can be represented by JSON objects, lists, or either. + pub fn with_struct_parse_mode(self, struct_parse_mode: StructParseMode) -> Self { + Self { + struct_parse_mode, + ..self + } + } + /// Create a [`Reader`] with the provided [`BufRead`] pub fn build(self, reader: R) -> Result, ArrowError> { Ok(Reader { @@ -280,7 +309,13 @@ impl ReaderBuilder { } }; - let decoder = make_decoder(data_type, self.coerce_primitive, self.strict_mode, nullable)?; + let decoder = make_decoder( + data_type, + self.coerce_primitive, + self.strict_mode, + nullable, + self.struct_parse_mode, + )?; let num_fields = self.schema.flattened_fields().len(); @@ -643,6 +678,7 @@ fn make_decoder( coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_parse_mode: StructParseMode, ) -> Result, ArrowError> { downcast_integer! { data_type => (primitive_decoder, data_type), @@ -693,13 +729,13 @@ fn make_decoder( DataType::Boolean => Ok(Box::::default()), DataType::Utf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), - DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), - DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), - DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), + DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)), + DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)), + DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)), DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => { Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON"))) } - DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), + DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)), d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader"))) } } @@ -715,7 +751,7 @@ mod tests { use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_cast::display::{ArrayFormatter, FormatOptions}; use arrow_data::ArrayDataBuilder; - use arrow_schema::Field; + use arrow_schema::{Field, Fields}; use super::*; @@ -2343,4 +2379,220 @@ mod tests { .unwrap() ); } + + #[test] + fn test_struct_decoding_list_length() { + use arrow_array::array; + + let row = "[1, 2]"; + + let mut fields = vec![Field::new("a", DataType::Int32, true)]; + let too_few_fields = Fields::from(fields.clone()); + fields.push(Field::new("b", DataType::Int32, true)); + let correct_fields = Fields::from(fields.clone()); + fields.push(Field::new("c", DataType::Int32, true)); + let too_many_fields = Fields::from(fields.clone()); + + let parse = |fields: Fields, as_field: bool| { + let builder = if as_field { + ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true)) + } else { + ReaderBuilder::new(Arc::new(Schema::new(fields))) + }; + builder + .with_struct_parse_mode(StructParseMode::ListOnly) + .build(Cursor::new(row.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + let expected_row = StructArray::new( + correct_fields.clone(), + vec![ + Arc::new(array::Int32Array::from(vec![1])), + Arc::new(array::Int32Array::from(vec![2])), + ], + None, + ); + let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true); + + assert_eq!( + parse(too_few_fields.clone(), true).unwrap_err().to_string(), + "Json error: found extra columns for 1 fields".to_string() + ); + assert_eq!( + parse(too_few_fields, false).unwrap_err().to_string(), + "Json error: found extra columns for 1 fields".to_string() + ); + assert_eq!( + parse(correct_fields.clone(), true).unwrap(), + RecordBatch::try_new( + Arc::new(Schema::new(vec![row_field])), + vec![Arc::new(expected_row.clone())] + ) + .unwrap() + ); + assert_eq!( + parse(correct_fields, false).unwrap(), + RecordBatch::from(expected_row) + ); + assert_eq!( + parse(too_many_fields.clone(), true) + .unwrap_err() + .to_string(), + "Json error: found 2 columns for 3 fields".to_string() + ); + assert_eq!( + parse(too_many_fields, false).unwrap_err().to_string(), + "Json error: found 2 columns for 3 fields".to_string() + ); + } + + #[test] + fn test_struct_decoding() { + use arrow_array::builder; + + let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#; + let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#; + let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#; + + let struct_fields = Fields::from(vec![ + Field::new("b", DataType::new_list(DataType::Int32, true), true), + Field::new_map( + "c", + "entries", + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Int32, true), + false, + false, + ), + ]); + + let list_array = + ListArray::from_iter_primitive::(vec![Some(vec![Some(1), Some(2)])]); + + let map_array = { + let mut map_builder = builder::MapBuilder::new( + None, + builder::StringBuilder::new(), + builder::Int32Builder::new(), + ); + map_builder.keys().append_value("d"); + map_builder.values().append_value(3); + map_builder.append(true).unwrap(); + map_builder.finish() + }; + + let struct_array = StructArray::new( + struct_fields.clone(), + vec![Arc::new(list_array), Arc::new(map_array)], + None, + ); + + let schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Struct(struct_fields), + true, + )])); + let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap(); + + let parse = |s: &str, mode: StructParseMode| { + ReaderBuilder::new(schema.clone()) + .with_struct_parse_mode(mode) + .build(Cursor::new(s.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + assert_eq!( + parse(nested_object_json, StructParseMode::ObjectOnly).unwrap(), + expected + ); + assert_eq!( + parse(nested_list_json, StructParseMode::ObjectOnly) + .unwrap_err() + .to_string(), + "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned() + ); + assert_eq!( + parse(nested_mixed_json, StructParseMode::ObjectOnly) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned() + ); + + assert_eq!( + parse(nested_list_json, StructParseMode::ListOnly).unwrap(), + expected + ); + assert_eq!( + parse(nested_object_json, StructParseMode::ListOnly) + .unwrap_err() + .to_string(), + "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned() + ); + assert_eq!( + parse(nested_mixed_json, StructParseMode::ListOnly) + .unwrap_err() + .to_string(), + "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned() + ); + } + + // Test cases: + // [] -> RecordBatch row with no entries. Schema = [('a', Int32)] -> Error + // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error + // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error + // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error + #[test] + fn test_struct_decoding_empty_list() { + let int_field = Field::new("a", DataType::Int32, true); + let struct_field = Field::new( + "r", + DataType::Struct(Fields::from(vec![int_field.clone()])), + true, + ); + + let parse = |json: &str, as_field: bool, field: Field| { + let builder = if as_field { + ReaderBuilder::new_with_field(field.clone()) + } else { + ReaderBuilder::new(Arc::new(Schema::new(vec![field].clone()))) + }; + builder + .with_struct_parse_mode(StructParseMode::ListOnly) + .build(Cursor::new(json.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + assert_eq!( + parse("[]", true, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + assert_eq!( + parse("[]", false, int_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + assert_eq!( + parse("[]", false, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + + assert_eq!( + parse("[[]]", false, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned() + ); + } } diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 6c805591d390..9931744aaf81 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -16,7 +16,7 @@ // under the License. use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{make_decoder, ArrayDecoder}; +use crate::reader::{make_decoder, ArrayDecoder, StructParseMode}; use arrow_array::builder::BooleanBufferBuilder; use arrow_buffer::buffer::NullBuffer; use arrow_data::{ArrayData, ArrayDataBuilder}; @@ -27,6 +27,7 @@ pub struct StructArrayDecoder { decoders: Vec>, strict_mode: bool, is_nullable: bool, + struct_parse_mode: StructParseMode, } impl StructArrayDecoder { @@ -35,6 +36,7 @@ impl StructArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_parse_mode: StructParseMode, ) -> Result { let decoders = struct_fields(&data_type) .iter() @@ -48,6 +50,7 @@ impl StructArrayDecoder { coerce_primitive, strict_mode, nullable, + struct_parse_mode, ) }) .collect::, ArrowError>>()?; @@ -57,6 +60,7 @@ impl StructArrayDecoder { decoders, strict_mode, is_nullable, + struct_parse_mode, }) } } @@ -71,42 +75,70 @@ impl ArrayDecoder for StructArrayDecoder { .then(|| BooleanBufferBuilder::new(pos.len())); for (row, p) in pos.iter().enumerate() { - let end_idx = match (tape.get(*p), nulls.as_mut()) { - (TapeElement::StartObject(end_idx), None) => end_idx, - (TapeElement::StartObject(end_idx), Some(nulls)) => { + let end_idx = match (tape.get(*p), nulls.as_mut(), self.struct_parse_mode) { + (TapeElement::StartObject(end_idx), None, StructParseMode::ObjectOnly) => end_idx, + (TapeElement::StartObject(end_idx), Some(nulls), StructParseMode::ObjectOnly) => { nulls.append(true); end_idx } - (TapeElement::Null, Some(nulls)) => { + (TapeElement::StartList(end_idx), None, StructParseMode::ListOnly) => end_idx, + (TapeElement::StartList(end_idx), Some(nulls), StructParseMode::ListOnly) => { + nulls.append(true); + end_idx + } + (TapeElement::Null, Some(nulls), _) => { nulls.append(false); continue; } - _ => return Err(tape.error(*p, "{")), + (_, _, StructParseMode::ObjectOnly) => return Err(tape.error(*p, "{")), + (_, _, StructParseMode::ListOnly) => return Err(tape.error(*p, "[")), }; let mut cur_idx = *p + 1; - while cur_idx < end_idx { - // Read field name - let field_name = match tape.get(cur_idx) { - TapeElement::String(s) => tape.get_string(s), - _ => return Err(tape.error(cur_idx, "field name")), - }; - - // Update child pos if match found - match fields.iter().position(|x| x.name() == field_name) { - Some(field_idx) => child_pos[field_idx][row] = cur_idx + 1, - None => { - if self.strict_mode { - return Err(ArrowError::JsonError(format!( - "column '{}' missing from schema", - field_name - ))); + if self.struct_parse_mode == StructParseMode::ObjectOnly { + while cur_idx < end_idx { + // Read field name + let field_name = match tape.get(cur_idx) { + TapeElement::String(s) => tape.get_string(s), + _ => return Err(tape.error(cur_idx, "field name")), + }; + + // Update child pos if match found + match fields.iter().position(|x| x.name() == field_name) { + Some(field_idx) => child_pos[field_idx][row] = cur_idx + 1, + None => { + if self.strict_mode { + return Err(ArrowError::JsonError(format!( + "column '{}' missing from schema", + field_name + ))); + } } } + // Advance to next field + cur_idx = tape.next(cur_idx + 1, "field value")?; + } + } else { + let mut entry_idx = 0; + while cur_idx < end_idx { + if entry_idx >= fields.len() { + return Err(ArrowError::JsonError(format!( + "found extra columns for {} fields", + fields.len() + ))); + } + child_pos[entry_idx][row] = cur_idx; + entry_idx += 1; + // Advance to next field + cur_idx = tape.next(cur_idx, "field value")?; + } + if entry_idx != fields.len() { + return Err(ArrowError::JsonError(format!( + "found {} columns for {} fields", + entry_idx, + fields.len() + ))); } - - // Advance to next field - cur_idx = tape.next(cur_idx + 1, "field value")?; } }