Skip to content

Commit

Permalink
Support Arbitrary JSON values in JSON Reader (#4905)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 9, 2023
1 parent 431df70 commit e355b8a
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 61 deletions.
108 changes: 85 additions & 23 deletions arrow-json/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

//! JSON reader
//!
//! This JSON reader allows JSON line-delimited files to be read into the Arrow memory
//! model. Records are loaded in batches and are then converted from row-based data to
//! columnar data.
//! This JSON reader allows JSON records to be read into the Arrow memory
//! model. Records are loaded in batches and are then converted from the record-oriented
//! representation to the columnar arrow data model.
//!
//! The reader is agnostic to whitespace, including `\n` and `\r`, and will ignore such characters.
//! This allows parsing sequences of one or more arbitrarily formatted JSON values, including
//! but not limited to newline-delimited JSON.
//!
//! # Basic Usage
//!
Expand Down Expand Up @@ -130,16 +134,19 @@
//!
use std::io::BufRead;
use std::sync::Arc;

use chrono::Utc;
use serde::Serialize;

use arrow_array::timezone::Tz;
use arrow_array::types::Float32Type;
use arrow_array::types::*;
use arrow_array::{downcast_integer, RecordBatch, RecordBatchReader, StructArray};
use arrow_array::{
downcast_integer, make_array, RecordBatch, RecordBatchReader, StructArray,
};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit};
use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
pub use schema::*;

use crate::reader::boolean_array::BooleanArrayDecoder;
Expand All @@ -150,7 +157,7 @@ use crate::reader::null_array::NullArrayDecoder;
use crate::reader::primitive_array::PrimitiveArrayDecoder;
use crate::reader::string_array::StringArrayDecoder;
use crate::reader::struct_array::StructArrayDecoder;
use crate::reader::tape::{Tape, TapeDecoder, TapeElement};
use crate::reader::tape::{Tape, TapeDecoder};
use crate::reader::timestamp_array::TimestampArrayDecoder;

mod boolean_array;
Expand All @@ -171,6 +178,7 @@ pub struct ReaderBuilder {
batch_size: usize,
coerce_primitive: bool,
strict_mode: bool,
is_field: bool,

schema: SchemaRef,
}
Expand All @@ -189,10 +197,51 @@ impl ReaderBuilder {
batch_size: 1024,
coerce_primitive: false,
strict_mode: false,
is_field: false,
schema,
}
}

/// Create a new [`ReaderBuilder`] with the provided [`FieldRef`]
///
/// Unlike [`ReaderBuilder::new`] this does not require the root of the JSON schema
/// to be an object, i.e. `{..}`, allowing for parsing of any valid JSON value(s)
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::types::Int32Type;
/// # use arrow_json::ReaderBuilder;
/// # use arrow_schema::{DataType, Field};
/// // Root of JSON schema is a numeric type
/// let data = "1\n2\n3\n";
/// let field = Arc::new(Field::new("int", DataType::Int32, true));
/// let mut reader = ReaderBuilder::new_field(field.clone()).build(data.as_bytes()).unwrap();
/// let b = reader.next().unwrap().unwrap();
/// let values = b.column(0).as_primitive::<Int32Type>().values();
/// assert_eq!(values, &[1, 2, 3]);
///
/// // Root of JSON schema is a list type
/// let data = "[1, 2, 3, 4, 5, 6, 7]\n[1, 2, 3]";
/// let field = Field::new_list("int", field.clone(), true);
/// let mut reader = ReaderBuilder::new_field(field).build(data.as_bytes()).unwrap();
/// let b = reader.next().unwrap().unwrap();
/// let list = b.column(0).as_list::<i32>();
///
/// assert_eq!(list.offsets().as_ref(), &[0, 7, 10]);
/// let list_values = list.values().as_primitive::<Int32Type>();
/// assert_eq!(list_values.values(), &[1, 2, 3, 4, 5, 6, 7, 1, 2, 3]);
/// ```
pub fn new_field(field: impl Into<FieldRef>) -> Self {
Self {
batch_size: 1024,
coerce_primitive: false,
strict_mode: false,
is_field: true,
schema: Arc::new(Schema::new([field.into()])),
}
}

/// Sets the batch size in rows to read
pub fn with_batch_size(self, batch_size: usize) -> Self {
Self { batch_size, ..self }
Expand Down Expand Up @@ -233,16 +282,22 @@ impl ReaderBuilder {

/// Create a [`Decoder`]
pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
let decoder = make_decoder(
DataType::Struct(self.schema.fields.clone()),
self.coerce_primitive,
self.strict_mode,
false,
)?;
let (data_type, nullable) = match self.is_field {
false => (DataType::Struct(self.schema.fields.clone()), false),
true => {
let field = &self.schema.fields[0];
(field.data_type().clone(), field.is_nullable())
}
};

let decoder =
make_decoder(data_type, self.coerce_primitive, self.strict_mode, nullable)?;

let num_fields = self.schema.all_fields().len();

Ok(Decoder {
decoder,
is_field: self.is_field,
tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
batch_size: self.batch_size,
schema: self.schema,
Expand Down Expand Up @@ -344,6 +399,7 @@ pub struct Decoder {
tape_decoder: TapeDecoder,
decoder: Box<dyn ArrayDecoder>,
batch_size: usize,
is_field: bool,
schema: SchemaRef,
}

Expand Down Expand Up @@ -563,24 +619,20 @@ impl Decoder {
let mut next_object = 1;
let pos: Vec<_> = (0..tape.num_rows())
.map(|_| {
let end = match tape.get(next_object) {
TapeElement::StartObject(end) => end,
_ => unreachable!("corrupt tape"),
};
std::mem::replace(&mut next_object, end + 1)
let next = tape.next(next_object, "row").unwrap();
std::mem::replace(&mut next_object, next)
})
.collect();

let decoded = self.decoder.decode(&tape, &pos)?;
self.tape_decoder.clear();

// Sanity check
assert!(matches!(decoded.data_type(), DataType::Struct(_)));
assert_eq!(decoded.null_count(), 0);
assert_eq!(decoded.len(), pos.len());
let batch = match self.is_field {
true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
false => RecordBatch::from(StructArray::from(decoded))
.with_schema(self.schema.clone())?,
};

let batch = RecordBatch::from(StructArray::from(decoded))
.with_schema(self.schema.clone())?;
Ok(Some(batch))
}
}
Expand Down Expand Up @@ -2175,4 +2227,14 @@ mod tests {
let values = batch.column(0).as_primitive::<TimestampSecondType>();
assert_eq!(values.values(), &[1681319393, -7200]);
}

#[test]
fn test_serde_field() {
    // Build a decoder whose root schema is a bare nullable Int32 field
    // rather than the usual struct/object root.
    let int_field = Field::new("int", DataType::Int32, true);
    let mut decoder = ReaderBuilder::new_field(int_field)
        .build_decoder()
        .unwrap();

    // Serialize a slice of plain integers and flush into a RecordBatch.
    let rows = [1_i32, 2, 3, 4];
    decoder.serialize(&rows[..]).unwrap();
    let batch = decoder.flush().unwrap().unwrap();

    // The single column must contain exactly the serialized values.
    let column = batch.column(0).as_primitive::<Int32Type>();
    assert_eq!(column.values(), &[1, 2, 3, 4]);
}
}
60 changes: 27 additions & 33 deletions arrow-json/src/reader/tape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ macro_rules! next {
pub struct TapeDecoder {
elements: Vec<TapeElement>,

num_rows: usize,
cur_row: usize,

/// Number of rows to read per batch
batch_size: usize,
Expand Down Expand Up @@ -330,36 +330,34 @@ impl TapeDecoder {
offsets,
elements,
batch_size,
num_rows: 0,
cur_row: 0,
bytes: Vec::with_capacity(num_fields * 2 * 8),
stack: Vec::with_capacity(10),
}
}

pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
if self.num_rows >= self.batch_size {
return Ok(0);
}

let mut iter = BufIter::new(buf);

while !iter.is_empty() {
match self.stack.last_mut() {
// Start of row
let state = match self.stack.last_mut() {
Some(l) => l,
None => {
// Skip over leading whitespace
iter.skip_whitespace();
match next!(iter) {
b'{' => {
let idx = self.elements.len() as u32;
self.stack.push(DecoderState::Object(idx));
self.elements.push(TapeElement::StartObject(u32::MAX));
}
b => return Err(err(b, "trimming leading whitespace")),
if iter.is_empty() || self.cur_row >= self.batch_size {
break;
}

// Start of row
self.cur_row += 1;
self.stack.push(DecoderState::Value);
self.stack.last_mut().unwrap()
}
};

match state {
// Decoding an object
Some(DecoderState::Object(start_idx)) => {
DecoderState::Object(start_idx) => {
iter.advance_until(|b| !json_whitespace(b) && b != b',');
match next!(iter) {
b'"' => {
Expand All @@ -374,16 +372,12 @@ impl TapeDecoder {
TapeElement::StartObject(end_idx);
self.elements.push(TapeElement::EndObject(start_idx));
self.stack.pop();
self.num_rows += self.stack.is_empty() as usize;
if self.num_rows >= self.batch_size {
break;
}
}
b => return Err(err(b, "parsing object")),
}
}
// Decoding a list
Some(DecoderState::List(start_idx)) => {
DecoderState::List(start_idx) => {
iter.advance_until(|b| !json_whitespace(b) && b != b',');
match iter.peek() {
Some(b']') => {
Expand All @@ -400,7 +394,7 @@ impl TapeDecoder {
}
}
// Decoding a string
Some(DecoderState::String) => {
DecoderState::String => {
let s = iter.advance_until(|b| matches!(b, b'\\' | b'"'));
self.bytes.extend_from_slice(s);

Expand All @@ -415,7 +409,7 @@ impl TapeDecoder {
b => unreachable!("{}", b),
}
}
Some(state @ DecoderState::Value) => {
state @ DecoderState::Value => {
iter.skip_whitespace();
*state = match next!(iter) {
b'"' => DecoderState::String,
Expand All @@ -439,7 +433,7 @@ impl TapeDecoder {
b => return Err(err(b, "parsing value")),
};
}
Some(DecoderState::Number) => {
DecoderState::Number => {
let s = iter.advance_until(|b| {
!matches!(b, b'0'..=b'9' | b'-' | b'+' | b'.' | b'e' | b'E')
});
Expand All @@ -452,14 +446,14 @@ impl TapeDecoder {
self.offsets.push(self.bytes.len());
}
}
Some(DecoderState::Colon) => {
DecoderState::Colon => {
iter.skip_whitespace();
match next!(iter) {
b':' => self.stack.pop(),
b => return Err(err(b, "parsing colon")),
};
}
Some(DecoderState::Literal(literal, idx)) => {
DecoderState::Literal(literal, idx) => {
let bytes = literal.bytes();
let expected = bytes.iter().skip(*idx as usize).copied();
for (expected, b) in expected.zip(&mut iter) {
Expand All @@ -474,7 +468,7 @@ impl TapeDecoder {
self.elements.push(element);
}
}
Some(DecoderState::Escape) => {
DecoderState::Escape => {
let v = match next!(iter) {
b'u' => {
self.stack.pop();
Expand All @@ -496,7 +490,7 @@ impl TapeDecoder {
self.bytes.push(v);
}
// Parse a unicode escape sequence
Some(DecoderState::Unicode(high, low, idx)) => loop {
DecoderState::Unicode(high, low, idx) => loop {
match *idx {
0..=3 => *high = *high << 4 | parse_hex(next!(iter))? as u16,
4 => {
Expand Down Expand Up @@ -547,7 +541,7 @@ impl TapeDecoder {
.try_for_each(|row| row.serialize(&mut serializer))
.map_err(|e| ArrowError::JsonError(e.to_string()))?;

self.num_rows += rows.len();
self.cur_row += rows.len();

Ok(())
}
Expand Down Expand Up @@ -591,15 +585,15 @@ impl TapeDecoder {
strings,
elements: &self.elements,
string_offsets: &self.offsets,
num_rows: self.num_rows,
num_rows: self.cur_row,
})
}

/// Clears this [`TapeDecoder`] in preparation to read the next batch
pub fn clear(&mut self) {
assert!(self.stack.is_empty());

self.num_rows = 0;
self.cur_row = 0;
self.bytes.clear();
self.elements.clear();
self.elements.push(TapeElement::Null);
Expand Down Expand Up @@ -837,7 +831,7 @@ mod tests {
let err = decoder.decode(b"hello").unwrap_err().to_string();
assert_eq!(
err,
"Json error: Encountered unexpected 'h' whilst trimming leading whitespace"
"Json error: Encountered unexpected 'h' whilst parsing value"
);

let mut decoder = TapeDecoder::new(16, 2);
Expand Down
6 changes: 1 addition & 5 deletions arrow-json/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,11 +1338,7 @@ mod tests {

let batch = reader.next().unwrap().unwrap();

let list_row = batch
.column(0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let list_row = batch.column(0).as_list::<i32>();
let values = list_row.values();
assert_eq!(values.len(), 4);
assert_eq!(values.null_count(), 1);
Expand Down

0 comments on commit e355b8a

Please sign in to comment.