From e355b8ab7245567e2c3dbd55aa1f6d90378cb8eb Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Mon, 9 Oct 2023 16:06:34 +0100
Subject: [PATCH] Support Arbitrary JSON values in JSON Reader (#4905)

---
 arrow-json/src/reader/mod.rs  | 108 ++++++++++++++++++++++++++--------
 arrow-json/src/reader/tape.rs |  60 +++++++++----------
 arrow-json/src/writer.rs      |   6 +-
 3 files changed, 113 insertions(+), 61 deletions(-)

diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs
index 4e98e2fd873a..8b4435c85205 100644
--- a/arrow-json/src/reader/mod.rs
+++ b/arrow-json/src/reader/mod.rs
@@ -17,9 +17,13 @@
 
 //! JSON reader
 //!
-//! This JSON reader allows JSON line-delimited files to be read into the Arrow memory
-//! model. Records are loaded in batches and are then converted from row-based data to
-//! columnar data.
+//! This JSON reader allows JSON records to be read into the Arrow memory
+//! model. Records are loaded in batches and are then converted from the record-oriented
+//! representation to the columnar arrow data model.
+//!
+//! The reader is agnostic to whitespace, including `\n` and `\r`, and will ignore such characters.
+//! This allows parsing sequences of one or more arbitrarily formatted JSON values, including
+//! but not limited to newline-delimited JSON.
 //!
 //! # Basic Usage
 //!
@@ -130,6 +134,7 @@
 //!
 
 use std::io::BufRead;
+use std::sync::Arc;
 
 use chrono::Utc;
 use serde::Serialize;
@@ -137,9 +142,11 @@ use serde::Serialize;
 use arrow_array::timezone::Tz;
 use arrow_array::types::Float32Type;
 use arrow_array::types::*;
-use arrow_array::{downcast_integer, RecordBatch, RecordBatchReader, StructArray};
+use arrow_array::{
+    downcast_integer, make_array, RecordBatch, RecordBatchReader, StructArray,
+};
 use arrow_data::ArrayData;
-use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit};
+use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
 pub use schema::*;
 
 use crate::reader::boolean_array::BooleanArrayDecoder;
@@ -150,7 +157,7 @@ use crate::reader::null_array::NullArrayDecoder;
 use crate::reader::primitive_array::PrimitiveArrayDecoder;
 use crate::reader::string_array::StringArrayDecoder;
 use crate::reader::struct_array::StructArrayDecoder;
-use crate::reader::tape::{Tape, TapeDecoder, TapeElement};
+use crate::reader::tape::{Tape, TapeDecoder};
 use crate::reader::timestamp_array::TimestampArrayDecoder;
 
 mod boolean_array;
@@ -171,6 +178,7 @@ pub struct ReaderBuilder {
     batch_size: usize,
     coerce_primitive: bool,
     strict_mode: bool,
+    is_field: bool,
     schema: SchemaRef,
 }
 
@@ -189,10 +197,51 @@ impl ReaderBuilder {
             batch_size: 1024,
             coerce_primitive: false,
             strict_mode: false,
+            is_field: false,
             schema,
         }
     }
 
+    /// Create a new [`ReaderBuilder`] with the provided [`FieldRef`]
+    ///
+    /// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON schema
+    /// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::cast::AsArray;
+    /// # use arrow_array::types::Int32Type;
+    /// # use arrow_json::ReaderBuilder;
+    /// # use arrow_schema::{DataType, Field};
+    /// // Root of JSON schema is a numeric type
+    /// let data = "1\n2\n3\n";
+    /// let field = Arc::new(Field::new("int", DataType::Int32, true));
+    /// let mut reader = ReaderBuilder::new_field(field.clone()).build(data.as_bytes()).unwrap();
+    /// let b = reader.next().unwrap().unwrap();
+    /// let values = b.column(0).as_primitive::<Int32Type>().values();
+    /// assert_eq!(values, &[1, 2, 3]);
+    ///
+    /// // Root of JSON schema is a list type
+    /// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
+    /// let field = Field::new_list("int", field.clone(), true);
+    /// let mut reader = ReaderBuilder::new_field(field).build(data.as_bytes()).unwrap();
+    /// let b = reader.next().unwrap().unwrap();
+    /// let list = b.column(0).as_list::<i32>();
+    ///
+    /// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
+    /// let list_values = list.values().as_primitive::<Int32Type>();
+    /// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
+    /// ```
+    pub fn new_field(field: impl Into<FieldRef>) -> Self {
+        Self {
+            batch_size: 1024,
+            coerce_primitive: false,
+            strict_mode: false,
+            is_field: true,
+            schema: Arc::new(Schema::new([field.into()])),
+        }
+    }
+
     /// Sets the batch size in rows to read
     pub fn with_batch_size(self, batch_size: usize) -> Self {
         Self { batch_size, ..self }
@@ -233,16 +282,22 @@ impl ReaderBuilder {
 
     /// Create a [`Decoder`]
     pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
-        let decoder = make_decoder(
-            DataType::Struct(self.schema.fields.clone()),
-            self.coerce_primitive,
-            self.strict_mode,
-            false,
-        )?;
+        let (data_type, nullable) = match self.is_field {
+            false => (DataType::Struct(self.schema.fields.clone()), false),
+            true => {
+                let field = &self.schema.fields[0];
+                (field.data_type().clone(), field.is_nullable())
+            }
+        };
+
+        let decoder =
+            make_decoder(data_type, self.coerce_primitive, self.strict_mode, nullable)?;
+
         let num_fields = self.schema.all_fields().len();
 
         Ok(Decoder {
             decoder,
+            is_field: self.is_field,
             tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
             batch_size: self.batch_size,
             schema: self.schema,
@@ -344,6 +399,7 @@ pub struct Decoder {
     tape_decoder: TapeDecoder,
     decoder: Box<dyn ArrayDecoder>,
     batch_size: usize,
+    is_field: bool,
     schema: SchemaRef,
 }
 
@@ -563,24 +619,20 @@ impl Decoder {
         let mut next_object = 1;
         let pos: Vec<_> = (0..tape.num_rows())
             .map(|_| {
-                let end = match tape.get(next_object) {
-                    TapeElement::StartObject(end) => end,
-                    _ => unreachable!("corrupt tape"),
-                };
-                std::mem::replace(&mut next_object, end + 1)
+                let next = tape.next(next_object, "row").unwrap();
+                std::mem::replace(&mut next_object, next)
             })
             .collect();
 
         let decoded = self.decoder.decode(&tape, &pos)?;
         self.tape_decoder.clear();
 
-        // Sanity check
-        assert!(matches!(decoded.data_type(), DataType::Struct(_)));
-        assert_eq!(decoded.null_count(), 0);
-        assert_eq!(decoded.len(), pos.len());
+        let batch = match self.is_field {
+            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
+            false => RecordBatch::from(StructArray::from(decoded))
+                .with_schema(self.schema.clone())?,
+        };
 
-        let batch = RecordBatch::from(StructArray::from(decoded))
-            .with_schema(self.schema.clone())?;
         Ok(Some(batch))
     }
 }
@@ -2175,4 +2227,14 @@ mod tests {
         let values = batch.column(0).as_primitive::<TimestampSecondType>();
         assert_eq!(values.values(), &[1681319393, -7200]);
     }
+
+    #[test]
+    fn test_serde_field() {
+        let field = Field::new("int", DataType::Int32, true);
+        let mut decoder = ReaderBuilder::new_field(field).build_decoder().unwrap();
+        decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
+        let b = decoder.flush().unwrap().unwrap();
+        let values = b.column(0).as_primitive::<Int32Type>().values();
+        assert_eq!(values, &[1, 2, 3, 4]);
+    }
 }

diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs
index 801e8f29d525..599190429515 100644
--- a/arrow-json/src/reader/tape.rs
+++ b/arrow-json/src/reader/tape.rs
@@ -297,7 +297,7 @@ macro_rules! next {
 pub struct TapeDecoder {
     elements: Vec<TapeElement>,
 
-    num_rows: usize,
+    cur_row: usize,
 
     /// Number of rows to read per batch
     batch_size: usize,
@@ -330,36 +330,34 @@ impl TapeDecoder {
             offsets,
             elements,
             batch_size,
-            num_rows: 0,
+            cur_row: 0,
             bytes: Vec::with_capacity(num_fields * 2 * 8),
             stack: Vec::with_capacity(10),
         }
     }
 
     pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
-        if self.num_rows >= self.batch_size {
-            return Ok(0);
-        }
-
         let mut iter = BufIter::new(buf);
 
         while !iter.is_empty() {
-            match self.stack.last_mut() {
-                // Start of row
+            let state = match self.stack.last_mut() {
+                Some(l) => l,
                 None => {
-                    // Skip over leading whitespace
                     iter.skip_whitespace();
-                    match next!(iter) {
-                        b'{' => {
-                            let idx = self.elements.len() as u32;
-                            self.stack.push(DecoderState::Object(idx));
-                            self.elements.push(TapeElement::StartObject(u32::MAX));
-                        }
-                        b => return Err(err(b, "trimming leading whitespace")),
+                    if iter.is_empty() || self.cur_row >= self.batch_size {
+                        break;
                     }
+
+                    // Start of row
+                    self.cur_row += 1;
+                    self.stack.push(DecoderState::Value);
+                    self.stack.last_mut().unwrap()
                 }
+            };
+
+            match state {
                 // Decoding an object
-                Some(DecoderState::Object(start_idx)) => {
+                DecoderState::Object(start_idx) => {
                     iter.advance_until(|b| !json_whitespace(b) && b != b',');
                     match next!(iter) {
                         b'"' => {
@@ -374,16 +372,12 @@ impl TapeDecoder {
                                 TapeElement::StartObject(end_idx);
                             self.elements.push(TapeElement::EndObject(start_idx));
                             self.stack.pop();
-                            self.num_rows += self.stack.is_empty() as usize;
-                            if self.num_rows >= self.batch_size {
-                                break;
-                            }
                         }
                         b => return Err(err(b, "parsing object")),
                     }
                 }
                 // Decoding a list
-                Some(DecoderState::List(start_idx)) => {
+                DecoderState::List(start_idx) => {
                     iter.advance_until(|b| !json_whitespace(b) && b != b',');
                     match iter.peek() {
                         Some(b']') => {
@@ -400,7 +394,7 @@ impl TapeDecoder {
                     }
                 }
                 // Decoding a string
-                Some(DecoderState::String) => {
+                DecoderState::String => {
                     let s = iter.advance_until(|b| matches!(b, b'\\' | b'"'));
                     self.bytes.extend_from_slice(s);
 
@@ -415,7 +409,7 @@ impl TapeDecoder {
                         b => unreachable!("{}", b),
                     }
                 }
-                Some(state @ DecoderState::Value) => {
+                state @ DecoderState::Value => {
                     iter.skip_whitespace();
                     *state = match next!(iter) {
                         b'"' => DecoderState::String,
@@ -439,7 +433,7 @@ impl TapeDecoder {
                         b => return Err(err(b, "parsing value")),
                     };
                 }
-                Some(DecoderState::Number) => {
+                DecoderState::Number => {
                     let s = iter.advance_until(|b| {
                         !matches!(b, b'0'..=b'9' | b'-' | b'+' | b'.' | b'e' | b'E')
                     });
@@ -452,14 +446,14 @@ impl TapeDecoder {
                         self.offsets.push(self.bytes.len());
                     }
                 }
-                Some(DecoderState::Colon) => {
+                DecoderState::Colon => {
                     iter.skip_whitespace();
                     match next!(iter) {
                         b':' => self.stack.pop(),
                         b => return Err(err(b, "parsing colon")),
                     };
                 }
-                Some(DecoderState::Literal(literal, idx)) => {
+                DecoderState::Literal(literal, idx) => {
                     let bytes = literal.bytes();
                     let expected = bytes.iter().skip(*idx as usize).copied();
                     for (expected, b) in expected.zip(&mut iter) {
@@ -474,7 +468,7 @@ impl TapeDecoder {
                         self.elements.push(element);
                     }
                 }
-                Some(DecoderState::Escape) => {
+                DecoderState::Escape => {
                     let v = match next!(iter) {
                         b'u' => {
                             self.stack.pop();
@@ -496,7 +490,7 @@ impl TapeDecoder {
                     self.bytes.push(v);
                 }
                 // Parse a unicode escape sequence
-                Some(DecoderState::Unicode(high, low, idx)) => loop {
+                DecoderState::Unicode(high, low, idx) => loop {
                     match *idx {
                         0..=3 => *high = *high << 4 | parse_hex(next!(iter))? as u16,
                         4 => {
@@ -547,7 +541,7 @@ impl TapeDecoder {
             .try_for_each(|row| row.serialize(&mut serializer))
             .map_err(|e| ArrowError::JsonError(e.to_string()))?;
 
-        self.num_rows += rows.len();
+        self.cur_row += rows.len();
         Ok(())
     }
 
@@ -591,7 +585,7 @@ impl TapeDecoder {
             strings,
             elements: &self.elements,
             string_offsets: &self.offsets,
-            num_rows: self.num_rows,
+            num_rows: self.cur_row,
         })
     }
 
@@ -599,7 +593,7 @@ impl TapeDecoder {
     pub fn clear(&mut self) {
         assert!(self.stack.is_empty());
 
-        self.num_rows = 0;
+        self.cur_row = 0;
         self.bytes.clear();
         self.elements.clear();
         self.elements.push(TapeElement::Null);
@@ -837,7 +831,7 @@ mod tests {
         let err = decoder.decode(b"hello").unwrap_err().to_string();
        assert_eq!(
             err,
-            "Json error: Encountered unexpected 'h' whilst trimming leading whitespace"
+            "Json error: Encountered unexpected 'h' whilst parsing value"
         );
 
         let mut decoder = TapeDecoder::new(16, 2);

diff --git a/arrow-json/src/writer.rs b/arrow-json/src/writer.rs
index db371b59080a..8c4145bc95b4 100644
--- a/arrow-json/src/writer.rs
+++ b/arrow-json/src/writer.rs
@@ -1338,11 +1338,7 @@ mod tests {
 
         let batch = reader.next().unwrap().unwrap();
 
-        let list_row = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<ListArray>()
-            .unwrap();
+        let list_row = batch.column(0).as_list::<i32>();
         let values = list_row.values();
         assert_eq!(values.len(), 4);
         assert_eq!(values.null_count(), 1);
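
Editor's note: the following is a minimal usage sketch of the API introduced by this patch, not part of the patch itself. It assumes a version of `arrow-json` containing this change; the field name `"int"` and the sample data are illustrative only. It exercises the two behaviours the patch advertises: whitespace-agnostic parsing of top-level values, and nullability taken from the root field.

```rust
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field};

fn main() {
    // Top-level values separated by arbitrary whitespace, not just newlines,
    // with `null` exercising the nullability of the root field
    let data = "1 null 2\t3\nnull 4";

    let field = Arc::new(Field::new("int", DataType::Int32, true));
    let mut reader = ReaderBuilder::new_field(field)
        .build(data.as_bytes())
        .unwrap();

    // Each top-level JSON value becomes one row of a single-column batch
    let batch = reader.next().unwrap().unwrap();
    let col = batch.column(0).as_primitive::<Int32Type>();

    assert_eq!(col.len(), 6);
    assert_eq!(col.null_count(), 2);
    assert_eq!(col.value(0), 1);
}
```

Because `is_field` wraps the provided field into a single-column `Schema`, `Decoder::flush` can hand the decoded array straight to `RecordBatch::try_new` instead of unwrapping a `StructArray`, which is exactly the branch added to `flush` above.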