Skip to content

Commit

Permalink
Faster Serde Integration (~80% faster) (#4861)
Browse files Browse the repository at this point in the history
* Store decoded numerics in JSON tape

* Add arrow-json serde benchmarks

* Fix timestamp serialize

* Clippy
  • Loading branch information
tustvold authored Sep 26, 2023
1 parent 2c9e2e9 commit fbd9008
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 37 deletions.
7 changes: 7 additions & 0 deletions arrow-json/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,10 @@ serde = { version = "1.0", default-features = false, features = ["derive"] }
futures = "0.3"
tokio = { version = "1.27", default-features = false, features = ["io-util"] }
bytes = "1.4"
criterion = { version = "0.5", default-features = false }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }

[[bench]]
name = "serde"
harness = false

62 changes: 62 additions & 0 deletions arrow-json/benches/serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use criterion::*;
use rand::{thread_rng, Rng};
use serde::Serialize;
use std::sync::Arc;

#[allow(deprecated)]
fn do_bench<R: Serialize>(c: &mut Criterion, name: &str, rows: &[R], schema: &Schema) {
let schema = Arc::new(schema.clone());
c.bench_function(name, |b| {
b.iter(|| {
let builder = ReaderBuilder::new(schema.clone()).with_batch_size(64);
let mut decoder = builder.build_decoder().unwrap();
decoder.serialize(rows)
})
});
}

fn criterion_benchmark(c: &mut Criterion) {
let mut rng = thread_rng();
let schema = Schema::new(vec![Field::new("i32", DataType::Int32, false)]);
let v: Vec<i32> = (0..2048).map(|_| rng.gen_range(0..10000)).collect();

do_bench(c, "small_i32", &v, &schema);
let v: Vec<i32> = (0..2048).map(|_| rng.gen()).collect();
do_bench(c, "large_i32", &v, &schema);

let schema = Schema::new(vec![Field::new("i64", DataType::Int64, false)]);
let v: Vec<i64> = (0..2048).map(|_| rng.gen_range(0..10000)).collect();
do_bench(c, "small_i64", &v, &schema);
let v: Vec<i64> = (0..2048).map(|_| rng.gen_range(0..i32::MAX as _)).collect();
do_bench(c, "medium_i64", &v, &schema);
let v: Vec<i64> = (0..2048).map(|_| rng.gen()).collect();
do_bench(c, "large_i64", &v, &schema);

let schema = Schema::new(vec![Field::new("f32", DataType::Float32, false)]);
let v: Vec<f32> = (0..2048).map(|_| rng.gen_range(0.0..10000.)).collect();
do_bench(c, "small_f32", &v, &schema);
let v: Vec<f32> = (0..2048).map(|_| rng.gen_range(0.0..f32::MAX)).collect();
do_bench(c, "large_f32", &v, &schema);
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
46 changes: 37 additions & 9 deletions arrow-json/src/reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,20 @@ impl<P: ArrowPrimitiveType> PrimitiveArrayDecoder<P> {
impl<P> ArrayDecoder for PrimitiveArrayDecoder<P>
where
P: ArrowPrimitiveType + Parser,
P::Native: ParseJsonNumber,
P::Native: ParseJsonNumber + NumCast,
{
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
let mut builder = PrimitiveBuilder::<P>::with_capacity(pos.len())
.with_data_type(self.data_type.clone());
let d = &self.data_type;

for p in pos {
match tape.get(*p) {
TapeElement::Null => builder.append_null(),
TapeElement::String(idx) => {
let s = tape.get_string(idx);
let value = P::parse(s).ok_or_else(|| {
ArrowError::JsonError(format!(
"failed to parse \"{s}\" as {}",
self.data_type
))
ArrowError::JsonError(format!("failed to parse \"{s}\" as {d}",))
})?;

builder.append_value(value)
Expand All @@ -115,14 +113,44 @@ where
let s = tape.get_string(idx);
let value =
ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| {
ArrowError::JsonError(format!(
"failed to parse {s} as {}",
self.data_type
))
ArrowError::JsonError(format!("failed to parse {s} as {d}",))
})?;

builder.append_value(value)
}
TapeElement::F32(v) => {
let v = f32::from_bits(v);
let value = NumCast::from(v).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse {v} as {d}",))
})?;
builder.append_value(value)
}
TapeElement::I32(v) => {
let value = NumCast::from(v).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse {v} as {d}",))
})?;
builder.append_value(value)
}
TapeElement::F64(high) => match tape.get(p + 1) {
TapeElement::F32(low) => {
let v = f64::from_bits((high as u64) << 32 | low as u64);
let value = NumCast::from(v).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse {v} as {d}",))
})?;
builder.append_value(value)
}
_ => unreachable!(),
},
TapeElement::I64(high) => match tape.get(p + 1) {
TapeElement::I32(low) => {
let v = (high as i64) << 32 | low as i64;
let value = NumCast::from(v).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse {v} as {d}",))
})?;
builder.append_value(value)
}
_ => unreachable!(),
},
_ => return Err(tape.error(*p, "primitive")),
}
}
Expand Down
56 changes: 30 additions & 26 deletions arrow-json/src/reader/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,6 @@ impl<'a> TapeSerializer<'a> {
}
}

/// The tape stores all values as strings, and so must serialize numeric types
///
/// Formatting to a string only to parse it back again is rather wasteful,
/// it may be possible to tweak the tape representation to avoid this
///
/// Need to use macro as const generic expressions are unstable
/// <https://github.com/rust-lang/rust/issues/76560>
macro_rules! serialize_numeric {
($s:ident, $t:ty, $v:ident) => {{
let mut buffer = [0_u8; <$t>::FORMATTED_SIZE];
let s = lexical_core::write($v, &mut buffer);
$s.serialize_number(s);
Ok(())
}};
}

impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> {
type Ok = ();

Expand All @@ -115,43 +99,63 @@ impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> {
}

fn serialize_i8(self, v: i8) -> Result<(), SerializerError> {
serialize_numeric!(self, i8, v)
self.serialize_i32(v as _)
}

fn serialize_i16(self, v: i16) -> Result<(), SerializerError> {
serialize_numeric!(self, i16, v)
self.serialize_i32(v as _)
}

fn serialize_i32(self, v: i32) -> Result<(), SerializerError> {
serialize_numeric!(self, i32, v)
self.elements.push(TapeElement::I32(v));
Ok(())
}

fn serialize_i64(self, v: i64) -> Result<(), SerializerError> {
serialize_numeric!(self, i64, v)
let low = v as i32;
let high = (v >> 32) as i32;
self.elements.push(TapeElement::I64(high));
self.elements.push(TapeElement::I32(low));
Ok(())
}

fn serialize_u8(self, v: u8) -> Result<(), SerializerError> {
serialize_numeric!(self, u8, v)
self.serialize_i32(v as _)
}

fn serialize_u16(self, v: u16) -> Result<(), SerializerError> {
serialize_numeric!(self, u16, v)
self.serialize_i32(v as _)
}

fn serialize_u32(self, v: u32) -> Result<(), SerializerError> {
serialize_numeric!(self, u32, v)
match i32::try_from(v) {
Ok(v) => self.serialize_i32(v),
Err(_) => self.serialize_i64(v as _),
}
}

fn serialize_u64(self, v: u64) -> Result<(), SerializerError> {
serialize_numeric!(self, u64, v)
match i64::try_from(v) {
Ok(v) => self.serialize_i64(v),
Err(_) => {
let mut buffer = [0_u8; u64::FORMATTED_SIZE];
let s = lexical_core::write(v, &mut buffer);
self.serialize_number(s);
Ok(())
}
}
}

fn serialize_f32(self, v: f32) -> Result<(), SerializerError> {
serialize_numeric!(self, f32, v)
self.elements.push(TapeElement::F32(v.to_bits()));
Ok(())
}

fn serialize_f64(self, v: f64) -> Result<(), SerializerError> {
serialize_numeric!(self, f64, v)
let bits = v.to_bits();
self.elements.push(TapeElement::F64((bits >> 32) as u32));
self.elements.push(TapeElement::F32(bits as u32));
Ok(())
}

fn serialize_char(self, v: char) -> Result<(), SerializerError> {
Expand Down
51 changes: 49 additions & 2 deletions arrow-json/src/reader/tape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::reader::serializer::TapeSerializer;
use arrow_schema::ArrowError;
use serde::Serialize;
use std::fmt::Write;

/// We decode JSON to a flattened tape representation,
/// allowing for efficient traversal of the JSON data
Expand Down Expand Up @@ -54,6 +55,25 @@ pub enum TapeElement {
///
/// Contains the offset into the [`Tape`] string data
Number(u32),

/// The high bits of a i64
///
/// Followed by [`Self::I32`] containing the low bits
I64(i32),

/// A 32-bit signed integer
///
/// May be preceded by [`Self::I64`] containing high bits
I32(i32),

/// The high bits of a 64-bit float
///
/// Followed by [`Self::F32`] containing the low bits
F64(u32),

/// A 32-bit float or the low-bits of a 64-bit float if preceded by [`Self::F64`]
F32(u32),

/// A true literal
True,
/// A false literal
Expand Down Expand Up @@ -104,10 +124,15 @@ impl<'a> Tape<'a> {
| TapeElement::Number(_)
| TapeElement::True
| TapeElement::False
| TapeElement::Null => Ok(cur_idx + 1),
| TapeElement::Null
| TapeElement::I32(_)
| TapeElement::F32(_) => Ok(cur_idx + 1),
TapeElement::I64(_) | TapeElement::F64(_) => Ok(cur_idx + 2),
TapeElement::StartList(end_idx) => Ok(end_idx + 1),
TapeElement::StartObject(end_idx) => Ok(end_idx + 1),
_ => Err(self.error(cur_idx, expected)),
TapeElement::EndObject(_) | TapeElement::EndList(_) => {
Err(self.error(cur_idx, expected))
}
}
}

Expand Down Expand Up @@ -153,6 +178,28 @@ impl<'a> Tape<'a> {
TapeElement::True => out.push_str("true"),
TapeElement::False => out.push_str("false"),
TapeElement::Null => out.push_str("null"),
TapeElement::I64(high) => match self.get(idx + 1) {
TapeElement::I32(low) => {
let val = (high as i64) << 32 | low as i64;
let _ = write!(out, "{val}");
return idx + 2;
}
_ => unreachable!(),
},
TapeElement::I32(val) => {
let _ = write!(out, "{val}");
}
TapeElement::F64(high) => match self.get(idx + 1) {
TapeElement::F32(low) => {
let val = f64::from_bits((high as u64) << 32 | low as u64);
let _ = write!(out, "{val}");
return idx + 2;
}
_ => unreachable!(),
},
TapeElement::F32(val) => {
let _ = write!(out, "{}", f32::from_bits(val));
}
}
idx + 1
}
Expand Down
7 changes: 7 additions & 0 deletions arrow-json/src/reader/timestamp_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ where

builder.append_value(value)
}
TapeElement::I32(v) => builder.append_value(v as i64),
TapeElement::I64(high) => match tape.get(p + 1) {
TapeElement::I32(low) => {
builder.append_value((high as i64) << 32 | low as i64)
}
_ => unreachable!(),
},
_ => return Err(tape.error(*p, "primitive")),
}
}
Expand Down

0 comments on commit fbd9008

Please sign in to comment.