diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index 977ed4390c99..df38a52811c2 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -54,3 +54,10 @@ serde = { version = "1.0", default-features = false, features = ["derive"] } futures = "0.3" tokio = { version = "1.27", default-features = false, features = ["io-util"] } bytes = "1.4" +criterion = { version = "0.5", default-features = false } +rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } + +[[bench]] +name = "serde" +harness = false + diff --git a/arrow-json/benches/serde.rs b/arrow-json/benches/serde.rs new file mode 100644 index 000000000000..7636b9c9dff9 --- /dev/null +++ b/arrow-json/benches/serde.rs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_json::ReaderBuilder; +use arrow_schema::{DataType, Field, Schema}; +use criterion::*; +use rand::{thread_rng, Rng}; +use serde::Serialize; +use std::sync::Arc; + +#[allow(deprecated)] +fn do_bench(c: &mut Criterion, name: &str, rows: &[R], schema: &Schema) { + let schema = Arc::new(schema.clone()); + c.bench_function(name, |b| { + b.iter(|| { + let builder = ReaderBuilder::new(schema.clone()).with_batch_size(64); + let mut decoder = builder.build_decoder().unwrap(); + decoder.serialize(rows) + }) + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut rng = thread_rng(); + let schema = Schema::new(vec![Field::new("i32", DataType::Int32, false)]); + let v: Vec = (0..2048).map(|_| rng.gen_range(0..10000)).collect(); + + do_bench(c, "small_i32", &v, &schema); + let v: Vec = (0..2048).map(|_| rng.gen()).collect(); + do_bench(c, "large_i32", &v, &schema); + + let schema = Schema::new(vec![Field::new("i64", DataType::Int64, false)]); + let v: Vec = (0..2048).map(|_| rng.gen_range(0..10000)).collect(); + do_bench(c, "small_i64", &v, &schema); + let v: Vec = (0..2048).map(|_| rng.gen_range(0..i32::MAX as _)).collect(); + do_bench(c, "medium_i64", &v, &schema); + let v: Vec = (0..2048).map(|_| rng.gen()).collect(); + do_bench(c, "large_i64", &v, &schema); + + let schema = Schema::new(vec![Field::new("f32", DataType::Float32, false)]); + let v: Vec = (0..2048).map(|_| rng.gen_range(0.0..10000.)).collect(); + do_bench(c, "small_f32", &v, &schema); + let v: Vec = (0..2048).map(|_| rng.gen_range(0.0..f32::MAX)).collect(); + do_bench(c, "large_f32", &v, &schema); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/arrow-json/src/reader/primitive_array.rs b/arrow-json/src/reader/primitive_array.rs index c78e4d914060..6cf0bac86737 100644 --- a/arrow-json/src/reader/primitive_array.rs +++ b/arrow-json/src/reader/primitive_array.rs @@ -91,11 +91,12 @@ impl PrimitiveArrayDecoder

{ impl

ArrayDecoder for PrimitiveArrayDecoder

where P: ArrowPrimitiveType + Parser, - P::Native: ParseJsonNumber, + P::Native: ParseJsonNumber + NumCast, { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = PrimitiveBuilder::

::with_capacity(pos.len()) .with_data_type(self.data_type.clone()); + let d = &self.data_type; for p in pos { match tape.get(*p) { @@ -103,10 +104,7 @@ where TapeElement::String(idx) => { let s = tape.get_string(idx); let value = P::parse(s).ok_or_else(|| { - ArrowError::JsonError(format!( - "failed to parse \"{s}\" as {}", - self.data_type - )) + ArrowError::JsonError(format!("failed to parse \"{s}\" as {d}",)) })?; builder.append_value(value) @@ -115,14 +113,44 @@ where let s = tape.get_string(idx); let value = ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| { - ArrowError::JsonError(format!( - "failed to parse {s} as {}", - self.data_type - )) + ArrowError::JsonError(format!("failed to parse {s} as {d}",)) })?; builder.append_value(value) } + TapeElement::F32(v) => { + let v = f32::from_bits(v); + let value = NumCast::from(v).ok_or_else(|| { + ArrowError::JsonError(format!("failed to parse {v} as {d}",)) + })?; + builder.append_value(value) + } + TapeElement::I32(v) => { + let value = NumCast::from(v).ok_or_else(|| { + ArrowError::JsonError(format!("failed to parse {v} as {d}",)) + })?; + builder.append_value(value) + } + TapeElement::F64(high) => match tape.get(p + 1) { + TapeElement::F32(low) => { + let v = f64::from_bits((high as u64) << 32 | low as u64); + let value = NumCast::from(v).ok_or_else(|| { + ArrowError::JsonError(format!("failed to parse {v} as {d}",)) + })?; + builder.append_value(value) + } + _ => unreachable!(), + }, + TapeElement::I64(high) => match tape.get(p + 1) { + TapeElement::I32(low) => { + let v = (high as i64) << 32 | low as i64; + let value = NumCast::from(v).ok_or_else(|| { + ArrowError::JsonError(format!("failed to parse {v} as {d}",)) + })?; + builder.append_value(value) + } + _ => unreachable!(), + }, _ => return Err(tape.error(*p, "primitive")), } } diff --git a/arrow-json/src/reader/serializer.rs b/arrow-json/src/reader/serializer.rs index 2aa72de943f7..2fd250bdfcc3 100644 --- a/arrow-json/src/reader/serializer.rs +++ b/arrow-json/src/reader/serializer.rs @@ -77,22 +77,6 @@ impl<'a> TapeSerializer<'a> { } } -/// The tape stores all values as strings, and so must serialize numeric types -/// -/// Formatting to a string only to parse it back again is rather wasteful, -/// it may be possible to tweak the tape representation to avoid this -/// -/// Need to use macro as const generic expressions are unstable -/// -macro_rules! serialize_numeric { - ($s:ident, $t:ty, $v:ident) => {{ - let mut buffer = [0_u8; <$t>::FORMATTED_SIZE]; - let s = lexical_core::write($v, &mut buffer); - $s.serialize_number(s); - Ok(()) - }}; -} - impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> { type Ok = (); @@ -115,43 +99,63 @@ impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> { } fn serialize_i8(self, v: i8) -> Result<(), SerializerError> { - serialize_numeric!(self, i8, v) + self.serialize_i32(v as _) } fn serialize_i16(self, v: i16) -> Result<(), SerializerError> { - serialize_numeric!(self, i16, v) + self.serialize_i32(v as _) } fn serialize_i32(self, v: i32) -> Result<(), SerializerError> { - serialize_numeric!(self, i32, v) + self.elements.push(TapeElement::I32(v)); + Ok(()) } fn serialize_i64(self, v: i64) -> Result<(), SerializerError> { - serialize_numeric!(self, i64, v) + let low = v as i32; + let high = (v >> 32) as i32; + self.elements.push(TapeElement::I64(high)); + self.elements.push(TapeElement::I32(low)); + Ok(()) } fn serialize_u8(self, v: u8) -> Result<(), SerializerError> { - serialize_numeric!(self, u8, v) + self.serialize_i32(v as _) } fn serialize_u16(self, v: u16) -> Result<(), SerializerError> { - serialize_numeric!(self, u16, v) + self.serialize_i32(v as _) } fn serialize_u32(self, v: u32) -> Result<(), SerializerError> { - serialize_numeric!(self, u32, v) + match i32::try_from(v) { + Ok(v) => self.serialize_i32(v), + Err(_) => self.serialize_i64(v as _), + } } fn serialize_u64(self, v: u64) -> Result<(), SerializerError> { - serialize_numeric!(self, u64, v) + match i64::try_from(v) { + Ok(v) => self.serialize_i64(v), + Err(_) => { + let mut buffer = [0_u8; u64::FORMATTED_SIZE]; + let s = lexical_core::write(v, &mut buffer); + self.serialize_number(s); + Ok(()) + } + } } fn serialize_f32(self, v: f32) -> Result<(), SerializerError> { - serialize_numeric!(self, f32, v) + self.elements.push(TapeElement::F32(v.to_bits())); + Ok(()) } fn serialize_f64(self, v: f64) -> Result<(), SerializerError> { - serialize_numeric!(self, f64, v) + let bits = v.to_bits(); + self.elements.push(TapeElement::F64((bits >> 32) as u32)); + self.elements.push(TapeElement::F32(bits as u32)); + Ok(()) } fn serialize_char(self, v: char) -> Result<(), SerializerError> { diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index 5eca7b43dcc7..801e8f29d525 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -18,6 +18,7 @@ use crate::reader::serializer::TapeSerializer; use arrow_schema::ArrowError; use serde::Serialize; +use std::fmt::Write; /// We decode JSON to a flattened tape representation, /// allowing for efficient traversal of the JSON data @@ -54,6 +55,25 @@ pub enum TapeElement { /// /// Contains the offset into the [`Tape`] string data Number(u32), + + /// The high bits of a i64 + /// + /// Followed by [`Self::I32`] containing the low bits + I64(i32), + + /// A 32-bit signed integer + /// + /// May be preceded by [`Self::I64`] containing high bits + I32(i32), + + /// The high bits of a 64-bit float + /// + /// Followed by [`Self::F32`] containing the low bits + F64(u32), + + /// A 32-bit float or the low-bits of a 64-bit float if preceded by [`Self::F64`] + F32(u32), + /// A true literal True, /// A false literal @@ -104,10 +124,15 @@ impl<'a> Tape<'a> { | TapeElement::Number(_) | TapeElement::True | TapeElement::False - | TapeElement::Null => Ok(cur_idx + 1), + | TapeElement::Null + | TapeElement::I32(_) + | TapeElement::F32(_) => Ok(cur_idx + 1), + TapeElement::I64(_) | TapeElement::F64(_) => Ok(cur_idx + 2), TapeElement::StartList(end_idx) => Ok(end_idx + 1), TapeElement::StartObject(end_idx) => Ok(end_idx + 1), - _ => Err(self.error(cur_idx, expected)), + TapeElement::EndObject(_) | TapeElement::EndList(_) => { + Err(self.error(cur_idx, expected)) + } } } @@ -153,6 +178,28 @@ impl<'a> Tape<'a> { TapeElement::True => out.push_str("true"), TapeElement::False => out.push_str("false"), TapeElement::Null => out.push_str("null"), + TapeElement::I64(high) => match self.get(idx + 1) { + TapeElement::I32(low) => { + let val = (high as i64) << 32 | low as i64; + let _ = write!(out, "{val}"); + return idx + 2; + } + _ => unreachable!(), + }, + TapeElement::I32(val) => { + let _ = write!(out, "{val}"); + } + TapeElement::F64(high) => match self.get(idx + 1) { + TapeElement::F32(low) => { + let val = f64::from_bits((high as u64) << 32 | low as u64); + let _ = write!(out, "{val}"); + return idx + 2; + } + _ => unreachable!(), + }, + TapeElement::F32(val) => { + let _ = write!(out, "{}", f32::from_bits(val)); + } } idx + 1 } diff --git a/arrow-json/src/reader/timestamp_array.rs b/arrow-json/src/reader/timestamp_array.rs index b80915f6a56a..09672614107c 100644 --- a/arrow-json/src/reader/timestamp_array.rs +++ b/arrow-json/src/reader/timestamp_array.rs @@ -96,6 +96,13 @@ where builder.append_value(value) } + TapeElement::I32(v) => builder.append_value(v as i64), + TapeElement::I64(high) => match tape.get(p + 1) { + TapeElement::I32(low) => { + builder.append_value((high as i64) << 32 | low as i64) + } + _ => unreachable!(), + }, _ => return Err(tape.error(*p, "primitive")), } }